日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問(wèn) 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 编程资源 > 编程问答 >内容正文

编程问答

HBase 数据导入功能实现方式解释

發(fā)布時(shí)間:2025/4/5 编程问答 42 豆豆
生活随笔 收集整理的這篇文章主要介紹了 HBase 数据导入功能实现方式解释 小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

https://www.ibm.com/developerworks/cn/opensource/os-cn-data-import/index.html

預(yù)備知識(shí):啟動(dòng) HBase

清單 1. 修改 hosts 文件
1 2 3 [root@node1:2 hbase-0.96.1.1-cdh5.0.1]# cat /etc/hosts 10.17.139.186 node1 10.17.139.185 scheduler2
清單 2. 啟動(dòng) HBase 服務(wù)
1 2 3 4 5 6 7 8 9 10 11 12 [root@node1:2 bin]# ./start-hbase.sh starting master, logging to /home/zhoumingyao/hbase-0.96.1.1-cdh5.0.1/bin/../logs/hbase-root-master-node1.out [root@node1:2 bin]# jps 2981 SchedulerServer 46776 Jps 29242 org.eclipse.equinox.launcher_1.1.0.v20100507.jar 2686 IvmsSchedulerDog 46430 HMaster [root@node1:2 bin]# ps -ef | grep hbase root 46415 1 0 09:34 pts/2 00:00:00 bash /home/zhoumingyao/hbase-0.96.1.1-cdh5.0.1/bin/hbase-daemon.sh --config /home/zhoumingyao/hbase-0.96.1.1-cdh5.0.1/bin/../conf internal_start master root 46430 46415 91 09:34 pts/2 00:00:19 /usr/share/jdk1.8.0_45/bin/java -Dproc_master -XX:OnOutOfMemoryError=kill -9 %p -Xmx1000m -XX:+UseConcMarkSweepGC -Dhbase.log.dir=/home/zhoumingyao/hbase-0.96.1.1-cdh5.0.1/bin/../logs -Dhbase.log.file=hbase-root-master-node1.log -Dhbase.home.dir=/home/zhoumingyao/hbase-0.96.1.1-cdh5.0.1/bin/.. -Dhbase.id.str=root -Dhbase.root.logger=INFO,RFA -Dhbase.security.logger=INFO,RFAS org.apache.hadoop.hbase.master.HMaster start root 47464 1078 0 09:34 pts/2 00:00:00 grep hbase
清單 3. 插入若干數(shù)據(jù)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 hbase(main):002:0> put 'test', 'row1', 'cf:a', 'value1' 0 row(s) in 0.1180 seconds => ["test"] hbase(main):004:0> scan 'test' ROW COLUMN+CELL row1 column=cf:a, timestamp=1439861879625, value=value1 1row(s) in 0.0380 seconds hbase(main):005:0> put 'test', 'row2', 'cf:b', 'value2' 0 row(s) in 0.0170 seconds hbase(main):006:0> put 'test', 'row3', 'cf:c', 'value3' 0 row(s) in 0.0130 seconds hbase(main):007:0> scan 'test' ROW COLUMN+CELL ?row1 column=cf:a, timestamp=1439861879625, value=value1 ?row2 column=cf:b, timestamp=1439861962080, value=value2 row3 column=cf:c, timestamp=1439861968096, value=value3 3 row(s) in 0.0270 seconds hbase(main):008:0> put 'test', 'row2', 'cf:b', 'value2' 0 row(s) in 0.0080 seconds hbase(main):009:0> scan 'test' ROW COLUMN+CELL row1 column=cf:a, timestamp=1439861879625, value=value1 row2 column=cf:b, timestamp=1439861984176, value=value2 ?row3 column=cf:c, timestamp=1439861968096, value=value3 3 row(s) in 0.0230 seconds hbase(main):013:0> put 'test','row1','cf:a','value2' 0 row(s) in 0.0150 seconds hbase(main):014:0> scan 'test' ROW COLUMN+CELL row1 column=cf:1, timestamp=1439862083677, value=value1 row1 column=cf:a, timestamp=1439862100401, value=value2 row2 column=cf:b, timestamp=1439861984176, value=value2 row3 column=cf:c, timestamp=1439861968096, value=value3

向 HBase 導(dǎo)入數(shù)據(jù)

注意:本文代碼基于 HBase0.94 版本。

數(shù)據(jù)導(dǎo)入到 HBase,我們必須考慮分布式環(huán)境下的數(shù)據(jù)合并問(wèn)題,而數(shù)據(jù)合并問(wèn)題一直是 HBase 的難題,因?yàn)閿?shù)據(jù)合并需要頻繁執(zhí)行寫操作任務(wù),解決方案是我們可以通過(guò)生成 HBase 的內(nèi)部數(shù)據(jù)文件,這樣可以做到直接把數(shù)據(jù)文件加載到 HBase 數(shù)據(jù)庫(kù)對(duì)應(yīng)的數(shù)據(jù)表。這樣的做法寫入 HBase 的速度確實(shí)很快,但是如果合并過(guò)程中 HBase 的配置不是很正確,可能會(huì)造成寫操作阻塞。目前我們常用的數(shù)據(jù)導(dǎo)入方法有 HBase Client 調(diào)用方式、MapReduce 任務(wù)方式、Bulk Load 工具方式、Sqoop 工具方式這四種。下面的文章內(nèi)容會(huì)逐一展開講解。

下面的幾種方式都可以通過(guò) HFile 的幫助做到快速數(shù)據(jù)導(dǎo)入,我們首先在這里先給出生成 HFile 的 Java 代碼,后面各個(gè)方法內(nèi)部再按照各自方式插入 HFile 文件到 HBase 數(shù)據(jù)庫(kù)。代碼如清單 4 所示。

清單 4. 生成 HFile 代碼
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 import org.apache.hadoop.conf.Configuration; ??…… public class generateHFile { public static class generateHFileMapper extends Mapper<LongWritable, ????????????????Text, ImmutableBytesWritable, KeyValue> { ?@Override ?protected void map(LongWritable key, Text value, Context context) ?throws IOException, InterruptedException { ?String line = value.toString(); ?String[] items = line.split(",", -1); ?ImmutableBytesWritable rowkey = new ImmutableBytesWritable(items[0].getBytes()); ?KeyValue kvProtocol = new KeyValue(items[0].getBytes(), "colfam1".getBytes(), ?????????????????????????"colfam1".getBytes(), items[0].getBytes()); ?if (null != kvProtocol) { ?context.write(rowkey, kvProtocol); ?} ?} ?} public static void main(String[] args) throws IOException, ??????????????????????InterruptedException, ClassNotFoundException { Configuration conf = HBaseConfiguration.create(); System.out.println("conf="+conf); HTable table = new HTable(conf, "testtable1"); System.out.println("table="+table); Job job = new Job(conf, "generateHFile"); job.setJarByClass(generateHFile.class); job.setOutputKeyClass(ImmutableBytesWritable.class); job.setOutputValueClass(KeyValue.class); job.setMapperClass(generateHFileMapper.class); job.setReducerClass(KeyValueSortReducer.class); job.setOutputFormatClass(HFileOutputFormat.class);//組織成 HFile 文件 //自動(dòng)對(duì) job 進(jìn)行配置,SimpleTotalOrderPartitioner 是需要先對(duì) key 進(jìn)行整體排序, //然后劃分到每個(gè) reduce 中,保證每一個(gè) reducer 中的的 key 最小最大值區(qū)間范圍,是不會(huì)有交集的。 HFileOutputFormat.configureIncrementalLoad(job, table); ?FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); System.exit(job.waitForCompletion(true) ? 0 : 1); } }

運(yùn)行代碼后生成的 HFile 文件放著后面要用。

Client API 方法

使用 HBase 的 API 中的 Put 方法是最直接的數(shù)據(jù)導(dǎo)入方式,如清單 3 我們就是采用 HBase 自帶的 Shell 工具,調(diào)用 put 命令插入了幾條數(shù)據(jù)作為演示。該方式的缺點(diǎn)是當(dāng)需要將海量數(shù)據(jù)在規(guī)定時(shí)間內(nèi)導(dǎo)入 HBase 中時(shí),需要消耗較大的 CPU 和網(wǎng)絡(luò)資源,所以這個(gè)方式適用于數(shù)據(jù)量較小的應(yīng)用環(huán)境。

使用 Put 方法將數(shù)據(jù)插入 HBase 中的方式,由于所有的操作均是在一個(gè)單獨(dú)的客戶端執(zhí)行,所以不會(huì)使用到 MapReduce 的 job 概念,即沒(méi)有任務(wù)的概念,所有的操作都是逐條插入到數(shù)據(jù)庫(kù)中的。大致的流程可以分解為 HBase Client--->HTable---->Hmastermanager/ZK(獲取-root-,--meta--)------>HregionServer----->Hregion------>Hlog/Hmemstore----->HFile。即 HBase Client 調(diào)用 HTable 類訪問(wèn)到 HMaster 的原數(shù)據(jù)保存地點(diǎn),然后通過(guò)找到相應(yīng)的 Region Server,并分配具體的 Region,最后操作到 HFile 這一層級(jí)。當(dāng)連接上 HRegionServer 后,首先獲得鎖,然后調(diào)用 HRegion 類對(duì)應(yīng)的 put 命令開始執(zhí)行數(shù)據(jù)導(dǎo)入操作,數(shù)據(jù)插入后還要寫時(shí)間戳、寫 Hlog,WAL(Write Ahead Log)、Hmemstore。具體實(shí)現(xiàn)代碼如清單 5 所示,在代碼中我們嘗試插入了 10 萬(wàn)條數(shù)據(jù),打印出插入過(guò)程消耗的時(shí)間。

清單 5. 采用 HBase Client 方式代碼
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.util.Bytes; import java.io.IOException; public class PutDemo { ?public static void main(String[] args) throws IOException { ?//創(chuàng)建 HBase 上下文環(huán)境 ?Configuration conf = HBaseConfiguration.create(); ?System.out.println("conf="+conf); ?int count=0; ?? ?HBaseHelper helper = HBaseHelper.getHelper(conf); ?System.out.println("helper="+helper); ?helper.dropTable("testtable1"); ?helper.createTable("testtable1", "colfam1"); ?? ?HTable table = new HTable(conf, "testtable1"); ?long start = System.currentTimeMillis(); for(int i=1;i<100000;i++){ //設(shè)置 rowkey 的值 ?Put put = new Put(Bytes.toBytes("row"+i)); // 設(shè)置 family:qualifier:value ?put.add(Bytes.toBytes("colfam1"), Bytes.toBytes("qual1"), ?Bytes.toBytes("val1")); ?put.add(Bytes.toBytes("colfam1"), Bytes.toBytes("qual2"), ?Bytes.toBytes("val2")); ?//調(diào)用 put 方法,插入數(shù)據(jù)導(dǎo) HBase 數(shù)據(jù)表 testtable1 里 ?table.put(put); ?count++; ?if(count%10000==0){ ?System.out.println("Completed 10000 rows insetion"); ?} ?} ?? ?System.out.println(System.currentTimeMillis() - start); ?} }
清單 6. 采用 HBase Client 方式代碼運(yùn)行輸出
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 conf=Configuration: core-default.xml, core-site.xml, hbase-default.xml, hbase-site.xml 2015-08-20 18:58:18,184 WARN [main] util.NativeCodeLoader (NativeCodeLoader.java:<clinit>(62)) - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 2015-08-20 18:58:18,272 INFO [main] zookeeper.ZooKeeper (Environment.java:logEnv(100)) - Client environment:zookeeper.version=3.4.5-cdh4.6.0--1, built on 02/26/2014 09:15 GMT 2015-08-20 18:58:18,273 INFO [main] zookeeper.ZooKeeper (Environment.java:logEnv(100)) - Client environment:host.name=node3 2015-08-20 18:58:18,273 INFO [main] zookeeper.ZooKeeper (Environment.java:logEnv(100)) - Client environment:java.version=1.7.0_79 2015-08-20 18:58:18,273 INFO [main] zookeeper.ZooKeeper (Environment.java:logEnv(100)) - Client environment:java.vendor=Oracle Corporation 2015-08-20 18:58:18,273 INFO [main] zookeeper.ZooKeeper (Environment.java:logEnv(100)) - Client environment:java.home=/usr/lib/jdk1.7.0_79/jre 2015-08-20 18:58:18,273 INFO [main] zookeeper.ZooKeeper (Environment.java:logEnv(100)) - Client environment:java.class.path=./zz.jar 2015-08-20 18:58:18,273 INFO [main] zookeeper.ZooKeeper (Environment.java:logEnv(100)) - Client environment:java.library.path=/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib 2015-08-20 18:58:18,273 INFO [main] zookeeper.ZooKeeper (Environment.java:logEnv(100)) - Client environment:java.io.tmpdir=/tmp 2015-08-20 18:58:18,274 INFO [main] zookeeper.ZooKeeper (Environment.java:logEnv(100)) - Client environment:java.compiler=<NA> 2015-08-20 18:58:18,274 INFO [main] zookeeper.ZooKeeper (Environment.java:logEnv(100)) - Client environment:os.name=Linux 2015-08-20 18:58:18,274 INFO [main] zookeeper.ZooKeeper (Environment.java:logEnv(100)) - Client environment:os.arch=amd64 2015-08-20 18:58:18,274 INFO [main] zookeeper.ZooKeeper (Environment.java:logEnv(100)) - Client environment:os.version=2.6.32-220.el6.x86_64 2015-08-20 18:58:18,274 INFO [main] zookeeper.ZooKeeper (Environment.java:logEnv(100)) - Client environment:user.name=root 2015-08-20 18:58:18,274 INFO [main] zookeeper.ZooKeeper (Environment.java:logEnv(100)) - Client environment:user.home=/root 2015-08-20 18:58:18,274 INFO [main] zookeeper.ZooKeeper (Environment.java:logEnv(100)) - Client environment:user.dir=/home/zhoumingyao 2015-08-20 18:58:18,277 INFO [main] zookeeper.ZooKeeper (ZooKeeper.java:<init>(438)) - Initiating client connection, connectString=localhost:2181 sessionTimeout=180000 watcher=hconnection 2015-08-20 18:58:18,294 INFO [main] zookeeper.RecoverableZooKeeper (RecoverableZooKeeper.java:<init>(104)) - The identifier of this process is 32390@node3 2015-08-20 18:58:18,300 INFO [main-SendThread(localhost:2181)] zookeeper.ClientCnxn (ClientCnxn.java:logStartConnect(973)) - Opening socket connection to server localhost/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error) 2015-08-20 18:58:18,308 INFO [main-SendThread(localhost:2181)] zookeeper.ClientCnxn (ClientCnxn.java:primeConnection(852)) - Socket connection established to localhost/127.0.0.1:2181, initiating session 2015-08-20 18:58:18,317 INFO [main-SendThread(localhost:2181)] zookeeper.ClientCnxn (ClientCnxn.java:onConnected(1214)) - Session establishment complete on server localhost/127.0.0.1:2181, sessionid = 0x24f2624839f0023, negotiated timeout = 180000 2015-08-20 18:58:18,394 WARN [main] conf.Configuration (Configuration.java:warnOnceIfDeprecated(981)) - hadoop.native.lib is deprecated. Instead, use io.native.lib.available helper=HBaseHelper@5d48e5d6 2015-08-20 18:58:18,570 INFO [main] zookeeper.ZooKeeper (ZooKeeper.java:<init>(438)) - Initiating client connection, connectString=localhost:2181 sessionTimeout=180000 watcher=catalogtracker-on-org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation@6c521576 2015-08-20 18:58:18,571 INFO [main] zookeeper.RecoverableZooKeeper (RecoverableZooKeeper.java:<init>(104)) - The identifier of this process is 32390@node3 2015-08-20 18:58:18,572 INFO [main-SendThread(localhost:2181)] zookeeper.ClientCnxn (ClientCnxn.java:logStartConnect(973)) - Opening socket connection to server localhost/0:0:0:0:0:0:0:1:2181. Will not attempt to authenticate using SASL (unknown error) 2015-08-20 18:58:18,572 INFO [main-SendThread(localhost:2181)] zookeeper.ClientCnxn (ClientCnxn.java:primeConnection(852)) - Socket connection established to localhost/0:0:0:0:0:0:0:1:2181, initiating session 2015-08-20 18:58:18,575 INFO [main-SendThread(localhost:2181)] zookeeper.ClientCnxn (ClientCnxn.java:onConnected(1214)) - Session establishment complete on server localhost/0:0:0:0:0:0:0:1:2181, sessionid = 0x24f2624839f0024, negotiated timeout = 180000 2015-08-20 18:58:18,647 INFO [main] zookeeper.ZooKeeper (ZooKeeper.java:close(684)) - Session: 0x24f2624839f0024 closed 2015-08-20 18:58:18,647 INFO [main-EventThread] zookeeper.ClientCnxn (ClientCnxn.java:run(512)) - EventThread shut down 2015-08-20 18:58:18,672 INFO [main] client.HBaseAdmin (HBaseAdmin.java:disableTableAsync(858)) - Started disable of testtable1 2015-08-20 18:58:18,676 INFO [main] zookeeper.ZooKeeper (ZooKeeper.java:<init>(438)) - Initiating client connection, connectString=localhost:2181 sessionTimeout=180000 watcher=catalogtracker-on-org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation@6c521576 2015-08-20 18:58:18,678 INFO [main] zookeeper.RecoverableZooKeeper (RecoverableZooKeeper.java:<init>(104)) - The identifier of this process is 32390@node3 2015-08-20 18:58:18,679 INFO [main-SendThread(localhost:2181)] zookeeper.ClientCnxn (ClientCnxn.java:logStartConnect(973)) - Opening socket connection to server localhost/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error) 2015-08-20 18:58:18,680 INFO [main-SendThread(localhost:2181)] zookeeper.ClientCnxn (ClientCnxn.java:primeConnection(852)) - Socket connection established to localhost/127.0.0.1:2181, initiating session 2015-08-20 18:58:18,683 INFO [main-SendThread(localhost:2181)] zookeeper.ClientCnxn (ClientCnxn.java:onConnected(1214)) - Session establishment complete on server localhost/127.0.0.1:2181, sessionid = 0x24f2624839f0025, negotiated timeout = 180000 2015-08-20 18:58:18,705 INFO [main] zookeeper.ZooKeeper (ZooKeeper.java:close(684)) - Session: 0x24f2624839f0025 closed 2015-08-20 18:58:18,705 INFO [main-EventThread] zookeeper.ClientCnxn (ClientCnxn.java:run(512)) - EventThread shut down 2015-08-20 18:58:19,713 INFO [main] zookeeper.ZooKeeper (ZooKeeper.java:<init>(438)) - Initiating client connection, connectString=localhost:2181 sessionTimeout=180000 watcher=catalogtracker-on-org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation@6c521576 2015-08-20 18:58:19,714 INFO [main] zookeeper.RecoverableZooKeeper (RecoverableZooKeeper.java:<init>(104)) - The identifier of this process is 32390@node3 2015-08-20 18:58:19,715 INFO [main-SendThread(localhost:2181)] zookeeper.ClientCnxn (ClientCnxn.java:logStartConnect(973)) - Opening socket connection to server localhost/0:0:0:0:0:0:0:1:2181. Will not attempt to authenticate using SASL (unknown error) 2015-08-20 18:58:19,716 INFO [main-SendThread(localhost:2181)] zookeeper.ClientCnxn (ClientCnxn.java:primeConnection(852)) - Socket connection established to localhost/0:0:0:0:0:0:0:1:2181, initiating session 2015-08-20 18:58:19,720 INFO [main-SendThread(localhost:2181)] zookeeper.ClientCnxn (ClientCnxn.java:onConnected(1214)) - Session establishment complete on server localhost/0:0:0:0:0:0:0:1:2181, sessionid = 0x24f2624839f0026, negotiated timeout = 180000 2015-08-20 18:58:19,733 INFO [main] zookeeper.ZooKeeper (ZooKeeper.java:close(684)) - Session: 0x24f2624839f0026 closed 2015-08-20 18:58:19,733 INFO [main-EventThread] zookeeper.ClientCnxn (ClientCnxn.java:run(512)) - EventThread shut down 2015-08-20 18:58:19,735 INFO [main] client.HBaseAdmin (HBaseAdmin.java:disableTable(905)) - Disabled testtable1 2015-08-20 18:58:20,763 INFO [main] client.HBaseAdmin (HBaseAdmin.java:deleteTable(656)) - Deleted testtable1 table=testtable1 2015-08-20 18:58:21,809 INFO [main] zookeeper.ZooKeeper (ZooKeeper.java:<init>(438)) - Initiating client connection, connectString=localhost:2181 sessionTimeout=180000 watcher=catalogtracker-on-org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation@6c521576 2015-08-20 18:58:21,810 INFO [main] zookeeper.RecoverableZooKeeper (RecoverableZooKeeper.java:<init>(104)) - The identifier of this process is 32390@node3 2015-08-20 18:58:21,811 INFO [main-SendThread(localhost:2181)] zookeeper.ClientCnxn (ClientCnxn.java:logStartConnect(973)) - Opening socket connection to server localhost/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error) 2015-08-20 18:58:21,812 INFO [main-SendThread(localhost:2181)] zookeeper.ClientCnxn (ClientCnxn.java:primeConnection(852)) - Socket connection established to localhost/127.0.0.1:2181, initiating session 2015-08-20 18:58:21,816 INFO [main-SendThread(localhost:2181)] zookeeper.ClientCnxn (ClientCnxn.java:onConnected(1214)) - Session establishment complete on server localhost/127.0.0.1:2181, sessionid = 0x24f2624839f0027, negotiated timeout = 180000 2015-08-20 18:58:21,828 INFO [main] zookeeper.ZooKeeper (ZooKeeper.java:close(684)) - Session: 0x24f2624839f0027 closed 2015-08-20 18:58:21,828 INFO [main-EventThread] zookeeper.ClientCnxn (ClientCnxn.java:run(512)) - EventThread shut down Completed 10000 rows insetion Completed 10000 rows insetion Completed 10000 rows insetion Completed 10000 rows insetion Completed 10000 rows insetion Completed 10000 rows insetion Completed 10000 rows insetion Completed 10000 rows insetion Completed 10000 rows insetion 127073ms

整個(gè)插入 10 萬(wàn)條數(shù)據(jù)的耗時(shí)達(dá)到了 127 秒,即 2 分鐘。清單 7 所示是清單 5 代碼中用到的類源代碼。

清單 7.HBaseHelper 類代碼部分相關(guān)代碼
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 import org.apache.hadoop.conf.Configuration; …… /** ?* Used by the book examples to generate tables and fill them with test data. ?*/ public class HBaseHelper { //在 Java 代碼中,為了連接到 HBase,我們首先創(chuàng)建一個(gè)配置(Configuration)對(duì)象,使用該對(duì)象創(chuàng)建一個(gè) HTable 實(shí)例。 //這個(gè) HTable 對(duì)象用于處理所有的客戶端 API 調(diào)用。 ?private Configuration conf = null; ?private HBaseAdmin admin = null; ?protected HBaseHelper(Configuration conf) throws IOException { ?this.conf = conf; ?this.admin = new HBaseAdmin(conf); ?} ?public static HBaseHelper getHelper(Configuration conf) throws IOException { ?return new HBaseHelper(conf); ?} ?public void put(String table, String row, String fam, String qual, long ts, ?String val) throws IOException { ?HTable tbl = new HTable(conf, table); ?Put put = new Put(Bytes.toBytes(row)); ?put.add(Bytes.toBytes(fam), Bytes.toBytes(qual), ts, ?Bytes.toBytes(val)); ?tbl.put(put); ?tbl.close(); ?} ?public void put(String table, String[] rows, String[] fams, String[] quals, ?long[] ts, String[] vals) throws IOException { ?HTable tbl = new HTable(conf, table); ?for (String row : rows) { ?Put put = new Put(Bytes.toBytes(row)); ?for (String fam : fams) { ?int v = 0; ?for (String qual : quals) { ?String val = vals[v < vals.length ? v : vals.length]; ?long t = ts[v < ts.length ? v : ts.length - 1]; ?put.add(Bytes.toBytes(fam), Bytes.toBytes(qual), t, ?Bytes.toBytes(val)); ?v++; ?} ?} ?tbl.put(put); ?} ?tbl.close(); ?} ?public void dump(String table, String[] rows, String[] fams, String[] quals) ?throws IOException { ?HTable tbl = new HTable(conf, table); ?List<Get> gets = new ArrayList<Get>(); ?for (String row : rows) { ?Get get = new Get(Bytes.toBytes(row)); ?get.setMaxVersions(); ?if (fams != null) { ?for (String fam : fams) { ?for (String qual : quals) { ?get.addColumn(Bytes.toBytes(fam), Bytes.toBytes(qual)); ?} ?} ?} ?gets.add(get); ?} ?Result[] results = tbl.get(gets); ?for (Result result : results) { ?for (KeyValue kv : result.raw()) { ?System.out.println("KV: " + kv + ?", Value: " + Bytes.toString(kv.getValue())); ?} ?} ?} } public void dropTable(String table) throws IOException { ?if (existsTable(table)) { ?disableTable(table); ?admin.deleteTable(table); ?} ?} public void put(String table, String row, String fam, String qual, long ts, ?String val) throws IOException { ?HTable tbl = new HTable(conf, table); ?Put put = new Put(Bytes.toBytes(row)); ?put.add(Bytes.toBytes(fam), Bytes.toBytes(qual), ts, ?Bytes.toBytes(val)); ?tbl.put(put); ?tbl.close(); ?}

MapReduce 方法

如果需要通過(guò)編程來(lái)生成數(shù)據(jù),那么用 importtsv 工具不是很方便,這時(shí)候可以使用 MapReduce 向 HBase 導(dǎo)入數(shù)據(jù),但海量的數(shù)據(jù)集會(huì)讓 MapReduce Job 變得很繁重,若處理不當(dāng),則可能使得 MapReduce 的 job 運(yùn)行時(shí)的吞吐量很小。由于 MapReduce 在寫 HBase 是采用的是 TableOutputFormat 方式,這樣在寫入數(shù)據(jù)庫(kù)的時(shí)候容易對(duì)寫入塊進(jìn)行頻繁的刷新、分割、合并操作,這些操作都是較為耗費(fèi)磁盤 I/O 的操作,最終導(dǎo)致 HBase 節(jié)點(diǎn)的不穩(wěn)定性。

前面介紹過(guò)生成 HFile 的代碼,生成 HFile 后,我們可以采用 MapReduce 方式把數(shù)據(jù)導(dǎo)入到 HBase 數(shù)據(jù)表里,具體代碼如清單 8 所示。

清單 8.MapReduce 方式導(dǎo)入 HFile 到 HBase 數(shù)據(jù)表
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 import java.io.IOException; …… public class HBaseImportByMapReduce extends Configured implements Tool { static final Log LOG = LogFactory.getLog(HBaseImportByMapReduce.class); public static final String JOBNAME = "MapReduceImport"; public static class Map extends Mapper<LongWritable , ??????????????????????Text, NullWritable, NullWritable>{ ?Configuration configuration = null; ?HTable xTable = null; ?static long count = 0; ?? ?@Override ?protected void cleanup(Context context) throws IOException,InterruptedException { ?// TODO Auto-generated method stub ?super.cleanup(context); ?xTable.flushCommits(); ?xTable.close(); ?} ?? ?@Override ?protected void map(LongWritable key, Text value, Context context) ??????????????????????????????throws IOException, InterruptedException { ?String all[] = value.toString().split("/t"); ?Put put = new Put(Bytes.toBytes(all[0])); ?put.add(Bytes.toBytes("colfam1"),Bytes.toBytes("value1"), null); ?xTable.put(put); ?if ((++count % 100)==0) { ?context.setStatus(count +" DOCUMENTS done!"); ?context.progress(); ?System.out.println(count +" DOCUMENTS done!"); ?} ?} ?@Override ?protected void setup(Context context) throws IOException,InterruptedException { ?// TODO Auto-generated method stub ?super.setup(context); ?configuration = context.getConfiguration(); ?xTable = new HTable(configuration,"testtable2"); ?xTable.setAutoFlush(false); ?xTable.setWriteBufferSize(12*1024*1024); ?} } @Override public int run(String[] args) throws Exception { ?String input = args[0]; ?Configuration conf = HBaseConfiguration.create(getConf()); ?conf.set("hbase.master", "node1:60000"); ?Job job = new Job(conf,JOBNAME); ?job.setJarByClass(HBaseImportByMapReduce.class); ?job.setMapperClass(Map.class); ?job.setNumReduceTasks(0); ?job.setInputFormatClass(TextInputFormat.class); ?TextInputFormat.setInputPaths(job, input); ?job.setOutputFormatClass(NullOutputFormat.class); ?return job.waitForCompletion(true)?0:1; } public static void main(String[] args) throws IOException { ?Configuration conf = new Configuration(); ?String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); ?int res = 1; ?try { ?res = ToolRunner.run(conf, new HBaseImportByMapReduce(), otherArgs); ?} catch (Exception e) { ?e.printStackTrace(); ?} ?System.exit(res); } }

清單 8 所示的 MapReduce 方式,啟動(dòng)任務(wù)需要一些時(shí)間,如果數(shù)據(jù)量較大,整個(gè) Map 過(guò)程也會(huì)消耗較多時(shí)間。

其實(shí)一般來(lái)說(shuō) MapReduce 方式和后面要介紹的 Bulk Load 方式是配合使用的,MapReduce 負(fù)責(zé)生成 HFile 文件,Bulk Load 負(fù)責(zé)導(dǎo)入 HBase。

Bulk Load方式

總的來(lái)說(shuō),使用 Bulk Load 方式由于利用了 HBase 的數(shù)據(jù)信息是按照特定格式存儲(chǔ)在 HDFS 里的這一特性,直接在 HDFS 中生成持久化的 HFile 數(shù)據(jù)格式文件,然后完成巨量數(shù)據(jù)快速入庫(kù)的操作,配合 MapReduce 完成這樣的操作,不占用 Region 資源,不會(huì)產(chǎn)生巨量的寫入 I/O,所以需要較少的 CPU 和網(wǎng)絡(luò)資源。Bulk Load 的實(shí)現(xiàn)原理是通過(guò)一個(gè) MapReduce Job 來(lái)實(shí)現(xiàn)的,通過(guò) Job 直接生成一個(gè) HBase 的內(nèi)部 HFile 格式文件,用來(lái)形成一個(gè)特殊的 HBase 數(shù)據(jù)表,然后直接將數(shù)據(jù)文件加載到運(yùn)行的集群中。使用 Bulk Load 功能最簡(jiǎn)單的方式就是使用 ImportTsv 工具,ImportTsv 是 HBase 的一個(gè)內(nèi)置工具,目的是從 TSV 文件直接加載內(nèi)容至 HBase。它通過(guò)運(yùn)行一個(gè) MapReduce Job, 將數(shù)據(jù)從 TSV 文件中直接寫入 HBase 的表或者寫入一個(gè) HBase 的自有格式數(shù)據(jù)文件。

ImportTsv 本身是一個(gè)在 HBase 的 JAR 文件中的 Java 類,使用 ImportTsv 工具,首先創(chuàng)建一個(gè)數(shù)據(jù)文件,如清單 9 所示,我們創(chuàng)建了一個(gè) data.tsv 文件,包含 4 條數(shù)據(jù)。

清單 9.data.tsv
1 2 3 4 5 [root@node3 zhoumingyao]# vi data.tsv 1001 name1 17 00000000001 1002 name2 16 00000000002 1003 name3 16 00000000003 1004 name4 16 00000000004

由于 ImportTsv 工具只支持從 HDFS 中讀取數(shù)據(jù),所以一開始我們需要將 TSV 文件從本地文件系統(tǒng)拷貝到 HDFS 中,接下來(lái)我們?cè)?HDFS 里新建文件夾后上傳 data.tsv 文件到該文件夾,由于讀和寫的操作是在多臺(tái)服務(wù)器上并行執(zhí)行,所以相比從單臺(tái)節(jié)點(diǎn)讀取速度快很多。需要指定輸出 (-Dimporttsv.bulk.output), 否則默認(rèn)會(huì)采用 HBase API 方式插入數(shù)據(jù)。代碼如清單 10 所示。

清單 10. 調(diào)用 ImportTsv
1 2 3 4 5 6 7 $HADOOP_HOME/bin/hadoop fs -mkdir /user/test 創(chuàng)建數(shù)據(jù)表 create 'student', {NAME => 'info'} 調(diào)用 importtsv 命令導(dǎo)入數(shù)據(jù), $HADOOP_HOME/bin/hadoop jar /usr/lib/cdh/hbase/hbase-0.94.15-hdh4.6.0.jar ??????????importtsv -Dimporttsv.columns=HBASE_ROW_KEY,info:name,info:age,info:phone ??????????????????????-Dimporttsv.bulk.output=/user/test/output/ student /user/test/data.tsv

記住需要啟動(dòng) YARN,否則會(huì)報(bào)錯(cuò),如清單 11 所示。

清單 11. 錯(cuò)誤提示
1 2 3 15/08/21 13:41:27 INFO ipc.Client: Retrying connect to ??????????????server: node1/172.10.201.62:18040. Already tried 0 time(s); ????retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1 SECONDS)

ImportTsv 工具默認(rèn)使用了 HBase 的 Put API 來(lái)將數(shù)據(jù)插入 HBase 表中,在 Map 階段使用的是 TableOutputFormat。但是當(dāng)-Dimporttsv.bulk. 輸入選項(xiàng)被指定時(shí),會(huì)使用 HFileOutputFormat 來(lái)代替在 HDFS 中生成 HBase 的自有格式文件(HFile)。而后我們能夠使用 completebulkload 來(lái)加載生成的文件到一個(gè)運(yùn)行的集群中。根據(jù)清單 12 可以使用 bulk 輸出以及加載工具。

清單 12. 調(diào)用 completebulkload
1 2 3 4 5 6 7 8 9 創(chuàng)建生成文件的文件夾: $HADOOP_HOME/bin/hadoop fs -mkdir /user/hac/output 開始導(dǎo)入數(shù)據(jù): $HADOOP_HOME/bin/hadoop jar /usr/lib/cdh/hbase/hbase-0.94.15-hdh4.6.0.jar ??importtsv -Dimporttsv.bulk.output=/user/hac/output/2-1 -Dimporttsv.columns= ????????????HBASE_ROW_KEY,info:name,info:age,info:phone student /user/hac/input/2-1 完成 bulk load 導(dǎo)入 $HADOOP_HOME/bin/hadoop jar /usr/lib/cdh/hbase/hbase-0.94.15-hdh4.6.0.jar ??????????????completebulkload /user/hac/output/2-1 student

Completebulkload 工具讀取生成的文件,判斷它們歸屬的 Resgion Server 族群,然后訪問(wèn)適當(dāng)?shù)淖迦悍?wù)器。族群服務(wù)器會(huì)將 HFile 文件轉(zhuǎn)移進(jìn)自身存儲(chǔ)目錄中,并且為客戶端建立在線數(shù)據(jù)。

HBase 說(shuō)明文檔里面記載,Bulk Load 方法分為兩個(gè)主要步驟:

1. 使用 HFileOutputFormat 類通過(guò)一個(gè) MapReduce 任務(wù)方式生成 HBase 的數(shù)據(jù)文件,就是英文稱為“StoreFiles”的數(shù)據(jù)文件。由于輸出的時(shí)候按照 HBase 內(nèi)部的存儲(chǔ)格式來(lái)輸出數(shù)據(jù),所以后面讀入 HBase 集群的時(shí)候就非常高效了。為了保證高效性,HFileOutputFormat 借助 configureIncrementalLoad 函數(shù),基于當(dāng)前 Table 的各 Region 邊界自動(dòng)匹配 MapReduce 的分區(qū)類 TotalOrderPartitioner,這樣每一個(gè)輸出的 HFile 都會(huì)是在一個(gè)單獨(dú)的 Region 里面的。

為了實(shí)現(xiàn)這樣的設(shè)計(jì),所有任務(wù)的輸出都需要使用 Hadoop 的 TotalOrderPartitioner 類去對(duì)輸出進(jìn)行分區(qū),按照 Regions 的主鍵范圍進(jìn)行分區(qū)。HFileOutputFormat 類包含了一個(gè)快捷方法,即 configureIncrementalLoad(),它自動(dòng)基于數(shù)據(jù)表的當(dāng)前 region 間隔生成一個(gè) TotalOrderPartitioner。

2. 完成數(shù)據(jù)載入到 HBase。當(dāng)所有的數(shù)據(jù)都被用 HFileOutputFormat 方式準(zhǔn)備好以后,我們可以使用 completebulkload 讀入到集群。這個(gè)命令行工具迭代循環(huán)數(shù)據(jù)文件,對(duì)于每一個(gè)數(shù)據(jù)文件迅速找到屬于它的 region,然后 Region 服務(wù)器會(huì)讀入這些 HFile。如果在生成文件的過(guò)程當(dāng)中 region 被修改了,那 completebulkload 工具會(huì)自動(dòng)切分?jǐn)?shù)據(jù)文件到新的區(qū)域,這個(gè)過(guò)程需要花費(fèi)一些時(shí)間。如果數(shù)據(jù)表 (此處是 mytable) 不存在,工具會(huì)自動(dòng)創(chuàng)建該數(shù)據(jù)表。

如清單 13 所示,我們也調(diào)用方法直接載入 HFile 文件到 HBase,采用 Bulk Load 方式完成這個(gè)實(shí)驗(yàn)。

清單 13.Bulk Load 方式載入 HFile
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles; ?? public class loadIncrementalHFileToHBase { ?? ?public static void main(String[] args) throws Exception { ?Configuration conf = HBaseConfiguration.create(); ?HBaseHelper helper = HBaseHelper.getHelper(conf); ?helper.dropTable("testtable2"); ?helper.createTable("testtable2", "colfam1"); ?HTable table = new HTable("testtable2"); ?LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf); ?loader.doBulkLoad(new Path(args[0]), table); ?} ?? }

特別提醒:

1. 一定記得建 HBase 數(shù)據(jù)表時(shí)做 Region 的預(yù)切分,HFileOutputFormat.configureIncrementalLoad 方法會(huì)根據(jù) Region 的數(shù)量來(lái)決定 Reduce 的數(shù)量以及每個(gè) Reduce 覆蓋的 RowKey 范圍,否則單個(gè) Reduce 過(guò)大,容易造成任務(wù)處理不均衡。造成這個(gè)的原因是,創(chuàng)建 HBase 表的時(shí)候, 默認(rèn)只有一個(gè) Region, 只有等到這個(gè) Region 的大小超過(guò)一定的閾值之后, 才會(huì)進(jìn)行 split,所以為了利用完全分布式加快生成 HFile 和導(dǎo)入 HBase 中以及數(shù)據(jù)負(fù)載均衡, 我們需要在創(chuàng)建表的時(shí)候預(yù)先進(jìn)行分區(qū), 而進(jìn)行分區(qū)時(shí)要利用 startKey 與 endKey 進(jìn)行 rowKey 區(qū)間劃分 (因?yàn)閷?dǎo)入 HBase 中, 需要 rowKey 整體有序)。解決方法是在數(shù)據(jù)導(dǎo)入之前, 自己先寫一個(gè) MapReduce 的 Job 求最小與最大的 rowKey,即 startKey 與 endKey。

2. 單個(gè) RowKey 下的子列不要過(guò)多,否則在 reduce 階段排序的時(shí)候會(huì)造成內(nèi)存溢出異常,有一種辦法是通過(guò)二次排序來(lái)避免 reduce 階段的排序,這個(gè)解決方案需要視具體應(yīng)用而定。

Sqoop 方法

Sqoop 是 Apache 頂級(jí)項(xiàng)目,主要用于在 Hadoop(Hive) 與傳統(tǒng)的數(shù)據(jù)庫(kù) (mysql、postgresql 等等) 之間進(jìn)行數(shù)據(jù)的傳遞,可以將一個(gè)關(guān)系型數(shù)據(jù)庫(kù),例如 MySQL,Oracle,Postgres 等中的數(shù)據(jù)導(dǎo)入到 Hadoop 的 HDFS 中,也可以將 HDFS 的數(shù)據(jù)導(dǎo)進(jìn)到關(guān)系型數(shù)據(jù)庫(kù)中。Sqoop 支持多種導(dǎo)入方式,包括指定列導(dǎo)入,指定格式導(dǎo)入,支持增量導(dǎo)入(有更新才導(dǎo)入)等等。Sqoop 的一個(gè)特點(diǎn)就是可以通過(guò) Hadoop 的 MapReduce 把數(shù)據(jù)從關(guān)系型數(shù)據(jù)庫(kù)中導(dǎo)入數(shù)據(jù)到 HDFS。

Sqoop 的架構(gòu)較為簡(jiǎn)單,通過(guò)整合 Hive,實(shí)現(xiàn) SQL 方式的操作,通過(guò)整合 HBase,可以向 HBase 寫入數(shù)據(jù),通過(guò)整合 Oozie,擁有了任務(wù)流的概念。而 Sqoop 本身是通過(guò) MapReduce 機(jī)制來(lái)保證傳輸數(shù)據(jù),從而提供并發(fā)特性和容錯(cuò)機(jī)制,系統(tǒng)架構(gòu)圖如圖 1 所示,來(lái)源 Apache 官方網(wǎng)站。

圖 1.Sqoop 系統(tǒng)架構(gòu)圖

在使用上,Sqoop 對(duì)外提供了一組操作命令,只需要簡(jiǎn)單配置就可以進(jìn)行數(shù)據(jù)的轉(zhuǎn)移。

首先配置 Sqoop,如清單 14 所示,對(duì)/etc/profile 文件添加兩行,然后執(zhí)行命令。

清單 14. 配置 Sqoop
1 2 3 export SQOOP_HOME=/home/zhoumingyao/sqoop2-1.99.3-cdh5.0.1 export PATH = $SQOOP_HOME/bin:$PATH source /etc/profile

我們這次做的實(shí)驗(yàn)使用了 Sqoop 的 import 功能,用于將 Oracle 中的人員信息導(dǎo)入到 HBase。在 Hadoop 和 HBase 正常運(yùn)行的環(huán)境里,我們首先需要配置好 Sqoop,然后調(diào)用如下的命令即可將 Oracle 中的表導(dǎo)入到 HBase 中,代碼如清單 15 所示。

清單 15.Sqoop 導(dǎo)入 Oracle 數(shù)據(jù)到 HBase
1 2 3 4 5 6 7 8 9 10 11 12 sqoop import ?--connect jdbc:oracle:thin:@172.7.27.225:1521:testzmy //JDBC URL ?--username SYSTEM //Oracle username(必須大寫) ?--password hik123456 //Oracle password ?--query 'SELECT RYID, HZCZRK_JBXXB.ZPID, HZCZRK_JBXXB.GMSFHM, HZCZRK_JBXXB.XM, HZCZRK_JBXXB.XB, ?HZCZRK_JBXXB.CSRQ, HZCZRK_ZPXXB.ZP AS ZP FROM HZCZRK_JBXXB ?JOIN HZCZRK_ZPXXB USING(RYID) WHERE $CONDITIONS' // Oracle 數(shù)據(jù),Sqoop 支持多表 query ?--split-by RYID //指定并行處理切分任務(wù)的列名,通常為主鍵 --map-column-java ZP=String //ZP 為 LONG RAW 類型,sqoop 不支持,需要映射成 String ?--hbase-table TESTHZ //HBase 中的 Table ?--column-family INFO //HBase 中的 column-family

清單 15 所示代碼從兩張數(shù)據(jù)表 HZCZRK_JBXXB 和 HZCZRK_ZPXXB 讀取數(shù)據(jù)并寫入到 HBase 數(shù)據(jù)表 TESTHZ,該數(shù)據(jù)表有一個(gè)列祖 INFO。我們?cè)?VMWare CentOS5.6 單節(jié)點(diǎn)偽分布式環(huán)境下進(jìn)行了測(cè)試。測(cè)試結(jié)果顯示,單表 HZCZRK_ZPXXB 導(dǎo)入 90962 條數(shù)據(jù)耗時(shí)約 27 分鐘,兩表 HZCZRK_JBXXB 和 HZCZRK_ZPXXB JOIN 導(dǎo)入 90962 條數(shù)據(jù)耗時(shí)約 50 分鐘。

該實(shí)驗(yàn)顯示 Sqoop 使用過(guò)程中的局限性:

1. Import 中進(jìn)行多表 query 的方式效率會(huì)受到影響;

2. 不支持從數(shù)據(jù)庫(kù)的視圖導(dǎo)出數(shù)據(jù);

3. 不支持 BLOB、RAW 等大數(shù)據(jù)塊類型直接導(dǎo)入到 HBase,需要通過(guò)--map-column-java 將對(duì)應(yīng)的列映射成 Java 的基本類型 String 來(lái)處理;

4. 每次 import 只能導(dǎo)入到 HBase 的一個(gè) column family。

總的來(lái)說(shuō),Sqoop 類似于其他 ETL 工具,使用元數(shù)據(jù)模型來(lái)判斷數(shù)據(jù)類型并在數(shù)據(jù)從數(shù)據(jù)源轉(zhuǎn)移到 Hadoop 時(shí)確保類型安全的數(shù)據(jù)處理。Sqoop 專為大數(shù)據(jù)批量傳輸設(shè)計(jì),能夠分割數(shù)據(jù)集并創(chuàng)建 Hadoop 任務(wù)來(lái)處理每個(gè)區(qū)塊。

除了上面介紹的 4 種方法的實(shí)現(xiàn),我這里還想多提一些關(guān)于數(shù)據(jù)分布、合并的注意事項(xiàng)。HBase 數(shù)據(jù)庫(kù)不適用于經(jīng)常更新的應(yīng)用場(chǎng)景,寫操作很頻繁的任務(wù)可能引起的另一個(gè)問(wèn)題是將數(shù)據(jù)寫入了單一的族群服務(wù)器 (Region Server),這種情況經(jīng)常出現(xiàn)在將海量數(shù)據(jù)導(dǎo)入到一個(gè)新建的 HBase 數(shù)據(jù)庫(kù)中時(shí)。一旦數(shù)據(jù)集中在相同的服務(wù)器上,整個(gè)集群就變得不平衡,并且寫速度會(huì)顯著的降低。

結(jié)束語(yǔ)

數(shù)據(jù)導(dǎo)入環(huán)節(jié)屬于大數(shù)據(jù)應(yīng)用的數(shù)據(jù)清洗部分,需要嘗試多種方式將數(shù)據(jù)導(dǎo)入進(jìn)去,沒(méi)有哪一種方法是唯一的選擇,我們首先要根據(jù)用戶的實(shí)際環(huán)境選擇正確的方式。總的來(lái)說(shuō),Bulk Load 方式是最快速的,我們可以優(yōu)先選擇它。

轉(zhuǎn)載于:https://www.cnblogs.com/davidwang456/articles/9181251.html

總結(jié)

以上是生活随笔為你收集整理的HBase 数据导入功能实现方式解释的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。

如果覺(jué)得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。