日韩av黄I国产麻豆传媒I国产91av视频在线观看I日韩一区二区三区在线看I美女国产在线I麻豆视频国产在线观看I成人黄色短片

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 >

Java hdfs连接池_Java使用连接池管理Hdfs连接

發布時間:2025/3/12 32 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Java hdfs连接池_Java使用连接池管理Hdfs连接 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

記錄一下Java API 連接hadoop操作hdfs的實現流程(使用連接池管理)。

以前做過這方面的開發,本來以為不會有什么問題,但是做的還是坑坑巴巴,內心有些懊惱,記錄下這煩人的過程,警示自己切莫眼高手低!

一:引入相關jar包如下

org.apache.hadoop

hadoop-common

2.8.2

org.apache.hadoop

hadoop-hdfs

2.8.2

org.apache.commons

commons-pool2

2.6.0

二:連接池開發的基本流程

2.1項目基本環境是SpringBoot大集成···

2.2hadoop相關包結構如下(自己感覺這結構劃分的也是凸顯了low逼水平【手動笑哭】)

2.2 畫個圖表達下開發思路

三、上代碼

importcom.cmcc.datacenter.hdfs.client.HdfsClient;importcom.cmcc.datacenter.hdfs.client.HdfsFactory;importorg.springframework.beans.factory.annotation.Value;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;

@Configurationpublic classHdfsConfig {

@Value("${hadoop.hdfs.ip}")privateString hdfsServerIp;

@Value("${hadoop.hdfs.port}")privateString hdfsServerPort;

@Value("${hadoop.hdfs.pool.maxTotal}")private intmaxTotal;

@Value("${hadoop.hdfs.pool.maxIdle}")private intmaxIdle;

@Value("${hadoop.hdfs.pool.minIdle}")private intminIdle;

@Value("${hadoop.hdfs.pool.maxWaitMillis}")private intmaxWaitMillis;

@Value("${hadoop.hdfs.pool.testWhileIdle}")private booleantestWhileIdle;

@Value("${hadoop.hdfs.pool.minEvictableIdleTimeMillis}")private long minEvictableIdleTimeMillis = 60000;

@Value("${hadoop.hdfs.pool.timeBetweenEvictionRunsMillis}")private long timeBetweenEvictionRunsMillis = 30000;

@Value("${hadoop.hdfs.pool.numTestsPerEvictionRun}")private int numTestsPerEvictionRun = -1;

@Bean(initMethod= "init", destroyMethod = "stop")publicHdfsClient HdfsClient(){

HdfsClient client= newHdfsClient();returnclient;

}/*** TestWhileConfig - 在空閑時檢查有效性, 默認false

* MinEvictableIdleTimeMillis - 逐出連接的最小空閑時間

* TimeBetweenEvictionRunsMillis - 逐出掃描的時間間隔(毫秒) 如果為負數則不運行逐出線程,默認-1

* NumTestsPerEvictionRun - 每次逐出檢查時 逐出的最大數目

**/@BeanpublicHdfsPoolConfig HdfsPoolConfig(){

HdfsPoolConfig hdfsPoolConfig= newHdfsPoolConfig();

hdfsPoolConfig.setTestWhileIdle(testWhileIdle);

hdfsPoolConfig.setMinEvictableIdleTimeMillis(minEvictableIdleTimeMillis);

hdfsPoolConfig.setTimeBetweenEvictionRunsMillis(timeBetweenEvictionRunsMillis);

hdfsPoolConfig.setNumTestsPerEvictionRun(numTestsPerEvictionRun);

hdfsPoolConfig.setMaxTotal(maxTotal);

hdfsPoolConfig.setMaxIdle(maxIdle);

hdfsPoolConfig.setMinIdle(minIdle);

hdfsPoolConfig.setMaxWaitMillis(maxWaitMillis);returnhdfsPoolConfig;

}

@BeanpublicHdfsFactory HdfsFactory(){return new HdfsFactory("hdfs://" + hdfsServerIp + ":" +hdfsServerPort);

}

}

importorg.apache.commons.pool2.impl.GenericObjectPoolConfig;public class HdfsPoolConfig extendsGenericObjectPoolConfig {publicHdfsPoolConfig(){}/*** TestWhileConfig - 在空閑時檢查有效性, 默認false

* MinEvictableIdleTimeMillis - 逐出連接的最小空閑時間

* TimeBetweenEvictionRunsMillis - 逐出掃描的時間間隔(毫秒) 如果為負數則不運行逐出線程,默認-1

* NumTestsPerEvictionRun - 每次逐出檢查時 逐出的最大數目

**/

public HdfsPoolConfig(boolean testWhileIdle, long minEvictableIdleTimeMillis, long timeBetweenEvictionRunsMillis, intnumTestsPerEvictionRun){this.setTestWhileIdle(testWhileIdle);this.setMinEvictableIdleTimeMillis(minEvictableIdleTimeMillis);this.setTimeBetweenEvictionRunsMillis(timeBetweenEvictionRunsMillis);this.setNumTestsPerEvictionRun(numTestsPerEvictionRun);

}

}

packagecom.cmcc.datacenter.hdfs.client;importcom.cmcc.datacenter.hdfs.config.HdfsPoolConfig;importorg.slf4j.Logger;importorg.slf4j.LoggerFactory;importorg.springframework.beans.factory.annotation.Autowired;importjava.util.List;public classHdfsClient {private Logger logger = LoggerFactory.getLogger(this.getClass());privateHdfsPool hdfsPool;

@AutowiredprivateHdfsPoolConfig hdfsPoolConfig;

@AutowiredprivateHdfsFactory hdfsFactory;public voidinit(){

hdfsPool= newHdfsPool(hdfsFactory,hdfsPoolConfig);

}public voidstop(){

hdfsPool.close();

}public long getPathSize(String path) throwsException {

Hdfs hdfs= null;try{

hdfs=hdfsPool.borrowObject();returnhdfs.getContentSummary(path).getLength();

}catch(Exception e) {

logger.error("[HDFS]獲取路徑大小失敗", e);throwe;

}finally{if (null !=hdfs) {

hdfsPool.returnObject(hdfs);

}

}

}public ListgetBasePath(){

Hdfs hdfs= null;try{

hdfs=hdfsPool.borrowObject();returnhdfs.listFileName();

}catch(Exception e) {

e.printStackTrace();return null;

}finally{if (null !=hdfs) {

hdfsPool.returnObject(hdfs);

}

}

}

}

importorg.apache.commons.pool2.PooledObject;importorg.apache.commons.pool2.PooledObjectFactory;importorg.apache.commons.pool2.impl.DefaultPooledObject;importjava.io.IOException;public class HdfsFactory implements PooledObjectFactory{private finalString url;publicHdfsFactory(String url){this.url =url;

}

@Overridepublic PooledObject makeObject() throwsException {

Hdfs hdfs= newHdfs(url);

hdfs.open();return new DefaultPooledObject(hdfs);

}

@Overridepublic void destroyObject(PooledObject pooledObject) throwsException {

Hdfs hdfs=pooledObject.getObject();

hdfs.close();

}

@Overridepublic boolean validateObject(PooledObjectpooledObject) {

Hdfs hdfs=pooledObject.getObject();try{returnhdfs.isConnected();

}catch(IOException e) {

e.printStackTrace();return false;

}

}

@Overridepublic void activateObject(PooledObject pooledObject) throwsException {

}

@Overridepublic void passivateObject(PooledObject pooledObject) throwsException {

}

}

packagecom.cmcc.datacenter.hdfs.client;importorg.apache.commons.pool2.PooledObjectFactory;importorg.apache.commons.pool2.impl.AbandonedConfig;importorg.apache.commons.pool2.impl.GenericObjectPool;importorg.apache.commons.pool2.impl.GenericObjectPoolConfig;public class HdfsPool extends GenericObjectPool{public HdfsPool(PooledObjectFactoryfactory) {super(factory);

}public HdfsPool(PooledObjectFactory factory, GenericObjectPoolConfigconfig) {super(factory, config);

}public HdfsPool(PooledObjectFactory factory, GenericObjectPoolConfigconfig, AbandonedConfig abandonedConfig) {super(factory, config, abandonedConfig);

}

}

importcom.cmcc.datacenter.hdfs.config.HdfsConfig;importcom.google.common.collect.Lists;importorg.apache.hadoop.conf.Configuration;importorg.apache.hadoop.fs.ContentSummary;importorg.apache.hadoop.fs.FileStatus;importorg.apache.hadoop.fs.FileSystem;importorg.apache.hadoop.fs.Path;importorg.slf4j.Logger;importorg.slf4j.LoggerFactory;importorg.springframework.beans.factory.annotation.Autowired;importjava.io.IOException;importjava.util.List;public classHdfs {private Logger logger = LoggerFactory.getLogger(this.getClass());privateFileSystem fs;privateString coreResource;privateString hdfsResource;private finalString url;private static final String NAME = "fs.hdfs.impl";publicHdfs(String url) {this.url =url;

}public voidopen() {try{

Configuration conf= newConfiguration();

conf.set("fs.defaultFS", url);

System.out.println("url is "+url);

fs=FileSystem.get(conf);

logger.info("[Hadoop]創建實例成功");

}catch(Exception e) {

logger.error("[Hadoop]創建實例失敗", e);

}

}public voidclose() {try{if (null !=fs) {

fs.close();

logger.info("[Hadoop]關閉實例成功");

}

}catch(Exception e) {

logger.error("[Hadoop]關閉實例失敗", e);

}

}public boolean isConnected() throwsIOException {return fs.exists(new Path("/"));

}public boolean exists(String path) throwsIOException {

Path hdfsPath= newPath(path);returnfs.exists(hdfsPath);

}public FileStatus getFileStatus(String path) throwsIOException {

Path hdfsPath= newPath(path);returnfs.getFileStatus(hdfsPath);

}public ContentSummary getContentSummary(String path) throwsIOException {

ContentSummary contentSummary= null;

Path hdfsPath= newPath(path);if(fs.exists(hdfsPath)) {

contentSummary=fs.getContentSummary(hdfsPath);

}returncontentSummary;

}public List listFileName() throwsIOException {

List res =Lists.newArrayList();

FileStatus[] fileStatuses= fs.listStatus(new Path("/"));for(FileStatus fileStatus : fileStatuses){

res.add(fileStatus.getPath()+":類型--"+ (fileStatus.isDirectory()? "文件夾":"文件"));

}returnres;

}

}

四、總結:

一共六個類,理清思路看是很easy的。

這里就是spring對類的管理和commons-pool2對連接類的管理混著用了,所以顯得有點亂。

1.@Configuration注解加到Hdfsconfig類上,作為一個配置類,作用類似于spring-xml文件中的標簽,springboot會掃描并注入它名下管理的類,其中

@Bean(initMethod = "init", destroyMethod = "stop") 標簽表示spring在初始化這個類時調用他的init方法,銷毀時調用他的stop方法。

2.HdfsClient 是業務方法調用的類,spring在初始化這個類時,調用它的init方法,這個方法會創建HdfsPool(即Hdfs的連接池)。其他方法是對Hdfs中方法的二次封裝,即先使用連接池獲取實例,再調用實例方法。

3.HdfsPoolConfig繼承commons-pool2包中的GenericObjectConfig,受spring管理,作為線程池的配置類,創建HdfsPool時作為參數傳入。

4.HdfsFactory繼承commons-pool2包中的GenericObjectFactory,受spring管理,作為創建連接實例的工廠類,創建HdfsPool時作為參數傳入。實際上連接池就是通過它獲取的連接實例。

5.HdfsPool繼承commons-pool2包中的GenericObjectPool,是連接池。

6.Hdfs,是底層的連接實例,所有增刪改查的方法都要在這里實現,只不過獲取/銷毀連接交給池管理。

聲明:這里用spring管理一些類是應為項目本身用的springboot,spring管理方便,并不是強制使用,愿意完全可以自己new。

五、不得不說的一些不是坑的坑。

1.我真的不記得windows上用Java API連接遠程的hadoop還要有一些神操作。

報錯如下:java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset

解決如下:

1. 將已下載的?hadoop-2.9.0.tar 這個壓縮文件解壓,放到你想要的位置(本機任意位置);

2. 下載 windows 環境下所需的其他文件(hadoop2.9.0對應的hadoop.dll,winutils.exe 等),這步真是關鍵,吐槽某SDN想錢想瘋了啊,霸占百度前10頁,各種下載各種C幣,各種要錢。

不多說了,附上github地址:github地址

3. 拿到上面下載的windows所需文件,執行以下步驟:

3.1:將文件解壓到你解壓的 hadoop-2.9.0.tar 的bin目錄下(沒有的放進去,有的不要替換,以免花式作死,想學習嘗試的除外)

3.2:將hadoop.dll復制到C:\Window\System32下

3.3:添加環境變量HADOOP_HOME,指向hadoop目錄

3.4:將%HADOOP_HOME%\bin加入到path里面,不管用的話將%HADOOP_HOME%\sbin也加進去。

3.5:重啟 IDE(你的編輯工具,例如eclipse,intellij idea)

原文:https://www.cnblogs.com/peripateticism/p/10895903.html

總結

以上是生活随笔為你收集整理的Java hdfs连接池_Java使用连接池管理Hdfs连接的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。