日韩av黄I国产麻豆传媒I国产91av视频在线观看I日韩一区二区三区在线看I美女国产在线I麻豆视频国产在线观看I成人黄色短片

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

HBase 数据导入功能实现方式解释

發布時間:2025/4/5 编程问答 49 豆豆
生活随笔 收集整理的這篇文章主要介紹了 HBase 数据导入功能实现方式解释 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

https://www.ibm.com/developerworks/cn/opensource/os-cn-data-import/index.html

預備知識:啟動 HBase

清單 1. 修改 hosts 文件
1 2 3 [root@node1:2 hbase-0.96.1.1-cdh5.0.1]# cat /etc/hosts 10.17.139.186 node1 10.17.139.185 scheduler2
清單 2. 啟動 HBase 服務
1 2 3 4 5 6 7 8 9 10 11 12 [root@node1:2 bin]# ./start-hbase.sh starting master, logging to /home/zhoumingyao/hbase-0.96.1.1-cdh5.0.1/bin/../logs/hbase-root-master-node1.out [root@node1:2 bin]# jps 2981 SchedulerServer 46776 Jps 29242 org.eclipse.equinox.launcher_1.1.0.v20100507.jar 2686 IvmsSchedulerDog 46430 HMaster [root@node1:2 bin]# ps -ef | grep hbase root 46415 1 0 09:34 pts/2 00:00:00 bash /home/zhoumingyao/hbase-0.96.1.1-cdh5.0.1/bin/hbase-daemon.sh --config /home/zhoumingyao/hbase-0.96.1.1-cdh5.0.1/bin/../conf internal_start master root 46430 46415 91 09:34 pts/2 00:00:19 /usr/share/jdk1.8.0_45/bin/java -Dproc_master -XX:OnOutOfMemoryError=kill -9 %p -Xmx1000m -XX:+UseConcMarkSweepGC -Dhbase.log.dir=/home/zhoumingyao/hbase-0.96.1.1-cdh5.0.1/bin/../logs -Dhbase.log.file=hbase-root-master-node1.log -Dhbase.home.dir=/home/zhoumingyao/hbase-0.96.1.1-cdh5.0.1/bin/.. -Dhbase.id.str=root -Dhbase.root.logger=INFO,RFA -Dhbase.security.logger=INFO,RFAS org.apache.hadoop.hbase.master.HMaster start root 47464 1078 0 09:34 pts/2 00:00:00 grep hbase
清單 3. 插入若干數據
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 hbase(main):002:0> put 'test', 'row1', 'cf:a', 'value1' 0 row(s) in 0.1180 seconds => ["test"] hbase(main):004:0> scan 'test' ROW COLUMN+CELL row1 column=cf:a, timestamp=1439861879625, value=value1 1row(s) in 0.0380 seconds hbase(main):005:0> put 'test', 'row2', 'cf:b', 'value2' 0 row(s) in 0.0170 seconds hbase(main):006:0> put 'test', 'row3', 'cf:c', 'value3' 0 row(s) in 0.0130 seconds hbase(main):007:0> scan 'test' ROW COLUMN+CELL ?row1 column=cf:a, timestamp=1439861879625, value=value1 ?row2 column=cf:b, timestamp=1439861962080, value=value2 row3 column=cf:c, timestamp=1439861968096, value=value3 3 row(s) in 0.0270 seconds hbase(main):008:0> put 'test', 'row2', 'cf:b', 'value2' 0 row(s) in 0.0080 seconds hbase(main):009:0> scan 'test' ROW COLUMN+CELL row1 column=cf:a, timestamp=1439861879625, value=value1 row2 column=cf:b, timestamp=1439861984176, value=value2 ?row3 column=cf:c, timestamp=1439861968096, value=value3 3 row(s) in 0.0230 seconds hbase(main):013:0> put 'test','row1','cf:a','value2' 0 row(s) in 0.0150 seconds hbase(main):014:0> scan 'test' ROW COLUMN+CELL row1 column=cf:1, timestamp=1439862083677, value=value1 row1 column=cf:a, timestamp=1439862100401, value=value2 row2 column=cf:b, timestamp=1439861984176, value=value2 row3 column=cf:c, timestamp=1439861968096, value=value3

向 HBase 導入數據

注意:本文代碼基于 HBase0.94 版本。

數據導入到 HBase,我們必須考慮分布式環境下的數據合并問題,而數據合并問題一直是 HBase 的難題,因為數據合并需要頻繁執行寫操作任務,解決方案是我們可以通過生成 HBase 的內部數據文件,這樣可以做到直接把數據文件加載到 HBase 數據庫對應的數據表。這樣的做法寫入 HBase 的速度確實很快,但是如果合并過程中 HBase 的配置不是很正確,可能會造成寫操作阻塞。目前我們常用的數據導入方法有 HBase Client 調用方式、MapReduce 任務方式、Bulk Load 工具方式、Sqoop 工具方式這四種。下面的文章內容會逐一展開講解。

下面的幾種方式都可以通過 HFile 的幫助做到快速數據導入,我們首先在這里先給出生成 HFile 的 Java 代碼,后面各個方法內部再按照各自方式插入 HFile 文件到 HBase 數據庫。代碼如清單 4 所示。

清單 4. 生成 HFile 代碼
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 import org.apache.hadoop.conf.Configuration; ??…… public class generateHFile { public static class generateHFileMapper extends Mapper<LongWritable, ????????????????Text, ImmutableBytesWritable, KeyValue> { ?@Override ?protected void map(LongWritable key, Text value, Context context) ?throws IOException, InterruptedException { ?String line = value.toString(); ?String[] items = line.split(",", -1); ?ImmutableBytesWritable rowkey = new ImmutableBytesWritable(items[0].getBytes()); ?KeyValue kvProtocol = new KeyValue(items[0].getBytes(), "colfam1".getBytes(), ?????????????????????????"colfam1".getBytes(), items[0].getBytes()); ?if (null != kvProtocol) { ?context.write(rowkey, kvProtocol); ?} ?} ?} public static void main(String[] args) throws IOException, ??????????????????????InterruptedException, ClassNotFoundException { Configuration conf = HBaseConfiguration.create(); System.out.println("conf="+conf); HTable table = new HTable(conf, "testtable1"); System.out.println("table="+table); Job job = new Job(conf, "generateHFile"); job.setJarByClass(generateHFile.class); job.setOutputKeyClass(ImmutableBytesWritable.class); job.setOutputValueClass(KeyValue.class); job.setMapperClass(generateHFileMapper.class); job.setReducerClass(KeyValueSortReducer.class); job.setOutputFormatClass(HFileOutputFormat.class);//組織成 HFile 文件 //自動對 job 進行配置,SimpleTotalOrderPartitioner 是需要先對 key 進行整體排序, //然后劃分到每個 reduce 中,保證每一個 reducer 中的的 key 最小最大值區間范圍,是不會有交集的。 HFileOutputFormat.configureIncrementalLoad(job, table); ?FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); System.exit(job.waitForCompletion(true) ? 0 : 1); } }

運行代碼后生成的 HFile 文件放著后面要用。

Client API 方法

使用 HBase 的 API 中的 Put 方法是最直接的數據導入方式,如清單 3 我們就是采用 HBase 自帶的 Shell 工具,調用 put 命令插入了幾條數據作為演示。該方式的缺點是當需要將海量數據在規定時間內導入 HBase 中時,需要消耗較大的 CPU 和網絡資源,所以這個方式適用于數據量較小的應用環境。

使用 Put 方法將數據插入 HBase 中的方式,由于所有的操作均是在一個單獨的客戶端執行,所以不會使用到 MapReduce 的 job 概念,即沒有任務的概念,所有的操作都是逐條插入到數據庫中的。大致的流程可以分解為 HBase Client--->HTable---->Hmastermanager/ZK(獲取-root-,--meta--)------>HregionServer----->Hregion------>Hlog/Hmemstore----->HFile。即 HBase Client 調用 HTable 類訪問到 HMaster 的原數據保存地點,然后通過找到相應的 Region Server,并分配具體的 Region,最后操作到 HFile 這一層級。當連接上 HRegionServer 后,首先獲得鎖,然后調用 HRegion 類對應的 put 命令開始執行數據導入操作,數據插入后還要寫時間戳、寫 Hlog,WAL(Write Ahead Log)、Hmemstore。具體實現代碼如清單 5 所示,在代碼中我們嘗試插入了 10 萬條數據,打印出插入過程消耗的時間。

清單 5. 采用 HBase Client 方式代碼
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.util.Bytes; import java.io.IOException; public class PutDemo { ?public static void main(String[] args) throws IOException { ?//創建 HBase 上下文環境 ?Configuration conf = HBaseConfiguration.create(); ?System.out.println("conf="+conf); ?int count=0; ?? ?HBaseHelper helper = HBaseHelper.getHelper(conf); ?System.out.println("helper="+helper); ?helper.dropTable("testtable1"); ?helper.createTable("testtable1", "colfam1"); ?? ?HTable table = new HTable(conf, "testtable1"); ?long start = System.currentTimeMillis(); for(int i=1;i<100000;i++){ //設置 rowkey 的值 ?Put put = new Put(Bytes.toBytes("row"+i)); // 設置 family:qualifier:value ?put.add(Bytes.toBytes("colfam1"), Bytes.toBytes("qual1"), ?Bytes.toBytes("val1")); ?put.add(Bytes.toBytes("colfam1"), Bytes.toBytes("qual2"), ?Bytes.toBytes("val2")); ?//調用 put 方法,插入數據導 HBase 數據表 testtable1 里 ?table.put(put); ?count++; ?if(count%10000==0){ ?System.out.println("Completed 10000 rows insetion"); ?} ?} ?? ?System.out.println(System.currentTimeMillis() - start); ?} }
清單 6. 采用 HBase Client 方式代碼運行輸出
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 conf=Configuration: core-default.xml, core-site.xml, hbase-default.xml, hbase-site.xml 2015-08-20 18:58:18,184 WARN [main] util.NativeCodeLoader (NativeCodeLoader.java:<clinit>(62)) - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 2015-08-20 18:58:18,272 INFO [main] zookeeper.ZooKeeper (Environment.java:logEnv(100)) - Client environment:zookeeper.version=3.4.5-cdh4.6.0--1, built on 02/26/2014 09:15 GMT 2015-08-20 18:58:18,273 INFO [main] zookeeper.ZooKeeper (Environment.java:logEnv(100)) - Client environment:host.name=node3 2015-08-20 18:58:18,273 INFO [main] zookeeper.ZooKeeper (Environment.java:logEnv(100)) - Client environment:java.version=1.7.0_79 2015-08-20 18:58:18,273 INFO [main] zookeeper.ZooKeeper (Environment.java:logEnv(100)) - Client environment:java.vendor=Oracle Corporation 2015-08-20 18:58:18,273 INFO [main] zookeeper.ZooKeeper (Environment.java:logEnv(100)) - Client environment:java.home=/usr/lib/jdk1.7.0_79/jre 2015-08-20 18:58:18,273 INFO [main] zookeeper.ZooKeeper (Environment.java:logEnv(100)) - Client environment:java.class.path=./zz.jar 2015-08-20 18:58:18,273 INFO [main] zookeeper.ZooKeeper (Environment.java:logEnv(100)) - Client environment:java.library.path=/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib 2015-08-20 18:58:18,273 INFO [main] zookeeper.ZooKeeper (Environment.java:logEnv(100)) - Client environment:java.io.tmpdir=/tmp 2015-08-20 18:58:18,274 INFO [main] zookeeper.ZooKeeper (Environment.java:logEnv(100)) - Client environment:java.compiler=<NA> 2015-08-20 18:58:18,274 INFO [main] zookeeper.ZooKeeper (Environment.java:logEnv(100)) - Client environment:os.name=Linux 2015-08-20 18:58:18,274 INFO [main] zookeeper.ZooKeeper (Environment.java:logEnv(100)) - Client environment:os.arch=amd64 2015-08-20 18:58:18,274 INFO [main] zookeeper.ZooKeeper (Environment.java:logEnv(100)) - Client environment:os.version=2.6.32-220.el6.x86_64 2015-08-20 18:58:18,274 INFO [main] zookeeper.ZooKeeper (Environment.java:logEnv(100)) - Client environment:user.name=root 2015-08-20 18:58:18,274 INFO [main] zookeeper.ZooKeeper (Environment.java:logEnv(100)) - Client environment:user.home=/root 2015-08-20 18:58:18,274 INFO [main] zookeeper.ZooKeeper (Environment.java:logEnv(100)) - Client environment:user.dir=/home/zhoumingyao 2015-08-20 18:58:18,277 INFO [main] zookeeper.ZooKeeper (ZooKeeper.java:<init>(438)) - Initiating client connection, connectString=localhost:2181 sessionTimeout=180000 watcher=hconnection 2015-08-20 18:58:18,294 INFO [main] zookeeper.RecoverableZooKeeper (RecoverableZooKeeper.java:<init>(104)) - The identifier of this process is 32390@node3 2015-08-20 18:58:18,300 INFO [main-SendThread(localhost:2181)] zookeeper.ClientCnxn (ClientCnxn.java:logStartConnect(973)) - Opening socket connection to server localhost/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error) 2015-08-20 18:58:18,308 INFO [main-SendThread(localhost:2181)] zookeeper.ClientCnxn (ClientCnxn.java:primeConnection(852)) - Socket connection established to localhost/127.0.0.1:2181, initiating session 2015-08-20 18:58:18,317 INFO [main-SendThread(localhost:2181)] zookeeper.ClientCnxn (ClientCnxn.java:onConnected(1214)) - Session establishment complete on server localhost/127.0.0.1:2181, sessionid = 0x24f2624839f0023, negotiated timeout = 180000 2015-08-20 18:58:18,394 WARN [main] conf.Configuration (Configuration.java:warnOnceIfDeprecated(981)) - hadoop.native.lib is deprecated. Instead, use io.native.lib.available helper=HBaseHelper@5d48e5d6 2015-08-20 18:58:18,570 INFO [main] zookeeper.ZooKeeper (ZooKeeper.java:<init>(438)) - Initiating client connection, connectString=localhost:2181 sessionTimeout=180000 watcher=catalogtracker-on-org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation@6c521576 2015-08-20 18:58:18,571 INFO [main] zookeeper.RecoverableZooKeeper (RecoverableZooKeeper.java:<init>(104)) - The identifier of this process is 32390@node3 2015-08-20 18:58:18,572 INFO [main-SendThread(localhost:2181)] zookeeper.ClientCnxn (ClientCnxn.java:logStartConnect(973)) - Opening socket connection to server localhost/0:0:0:0:0:0:0:1:2181. Will not attempt to authenticate using SASL (unknown error) 2015-08-20 18:58:18,572 INFO [main-SendThread(localhost:2181)] zookeeper.ClientCnxn (ClientCnxn.java:primeConnection(852)) - Socket connection established to localhost/0:0:0:0:0:0:0:1:2181, initiating session 2015-08-20 18:58:18,575 INFO [main-SendThread(localhost:2181)] zookeeper.ClientCnxn (ClientCnxn.java:onConnected(1214)) - Session establishment complete on server localhost/0:0:0:0:0:0:0:1:2181, sessionid = 0x24f2624839f0024, negotiated timeout = 180000 2015-08-20 18:58:18,647 INFO [main] zookeeper.ZooKeeper (ZooKeeper.java:close(684)) - Session: 0x24f2624839f0024 closed 2015-08-20 18:58:18,647 INFO [main-EventThread] zookeeper.ClientCnxn (ClientCnxn.java:run(512)) - EventThread shut down 2015-08-20 18:58:18,672 INFO [main] client.HBaseAdmin (HBaseAdmin.java:disableTableAsync(858)) - Started disable of testtable1 2015-08-20 18:58:18,676 INFO [main] zookeeper.ZooKeeper (ZooKeeper.java:<init>(438)) - Initiating client connection, connectString=localhost:2181 sessionTimeout=180000 watcher=catalogtracker-on-org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation@6c521576 2015-08-20 18:58:18,678 INFO [main] zookeeper.RecoverableZooKeeper (RecoverableZooKeeper.java:<init>(104)) - The identifier of this process is 32390@node3 2015-08-20 18:58:18,679 INFO [main-SendThread(localhost:2181)] zookeeper.ClientCnxn (ClientCnxn.java:logStartConnect(973)) - Opening socket connection to server localhost/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error) 2015-08-20 18:58:18,680 INFO [main-SendThread(localhost:2181)] zookeeper.ClientCnxn (ClientCnxn.java:primeConnection(852)) - Socket connection established to localhost/127.0.0.1:2181, initiating session 2015-08-20 18:58:18,683 INFO [main-SendThread(localhost:2181)] zookeeper.ClientCnxn (ClientCnxn.java:onConnected(1214)) - Session establishment complete on server localhost/127.0.0.1:2181, sessionid = 0x24f2624839f0025, negotiated timeout = 180000 2015-08-20 18:58:18,705 INFO [main] zookeeper.ZooKeeper (ZooKeeper.java:close(684)) - Session: 0x24f2624839f0025 closed 2015-08-20 18:58:18,705 INFO [main-EventThread] zookeeper.ClientCnxn (ClientCnxn.java:run(512)) - EventThread shut down 2015-08-20 18:58:19,713 INFO [main] zookeeper.ZooKeeper (ZooKeeper.java:<init>(438)) - Initiating client connection, connectString=localhost:2181 sessionTimeout=180000 watcher=catalogtracker-on-org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation@6c521576 2015-08-20 18:58:19,714 INFO [main] zookeeper.RecoverableZooKeeper (RecoverableZooKeeper.java:<init>(104)) - The identifier of this process is 32390@node3 2015-08-20 18:58:19,715 INFO [main-SendThread(localhost:2181)] zookeeper.ClientCnxn (ClientCnxn.java:logStartConnect(973)) - Opening socket connection to server localhost/0:0:0:0:0:0:0:1:2181. Will not attempt to authenticate using SASL (unknown error) 2015-08-20 18:58:19,716 INFO [main-SendThread(localhost:2181)] zookeeper.ClientCnxn (ClientCnxn.java:primeConnection(852)) - Socket connection established to localhost/0:0:0:0:0:0:0:1:2181, initiating session 2015-08-20 18:58:19,720 INFO [main-SendThread(localhost:2181)] zookeeper.ClientCnxn (ClientCnxn.java:onConnected(1214)) - Session establishment complete on server localhost/0:0:0:0:0:0:0:1:2181, sessionid = 0x24f2624839f0026, negotiated timeout = 180000 2015-08-20 18:58:19,733 INFO [main] zookeeper.ZooKeeper (ZooKeeper.java:close(684)) - Session: 0x24f2624839f0026 closed 2015-08-20 18:58:19,733 INFO [main-EventThread] zookeeper.ClientCnxn (ClientCnxn.java:run(512)) - EventThread shut down 2015-08-20 18:58:19,735 INFO [main] client.HBaseAdmin (HBaseAdmin.java:disableTable(905)) - Disabled testtable1 2015-08-20 18:58:20,763 INFO [main] client.HBaseAdmin (HBaseAdmin.java:deleteTable(656)) - Deleted testtable1 table=testtable1 2015-08-20 18:58:21,809 INFO [main] zookeeper.ZooKeeper (ZooKeeper.java:<init>(438)) - Initiating client connection, connectString=localhost:2181 sessionTimeout=180000 watcher=catalogtracker-on-org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation@6c521576 2015-08-20 18:58:21,810 INFO [main] zookeeper.RecoverableZooKeeper (RecoverableZooKeeper.java:<init>(104)) - The identifier of this process is 32390@node3 2015-08-20 18:58:21,811 INFO [main-SendThread(localhost:2181)] zookeeper.ClientCnxn (ClientCnxn.java:logStartConnect(973)) - Opening socket connection to server localhost/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error) 2015-08-20 18:58:21,812 INFO [main-SendThread(localhost:2181)] zookeeper.ClientCnxn (ClientCnxn.java:primeConnection(852)) - Socket connection established to localhost/127.0.0.1:2181, initiating session 2015-08-20 18:58:21,816 INFO [main-SendThread(localhost:2181)] zookeeper.ClientCnxn (ClientCnxn.java:onConnected(1214)) - Session establishment complete on server localhost/127.0.0.1:2181, sessionid = 0x24f2624839f0027, negotiated timeout = 180000 2015-08-20 18:58:21,828 INFO [main] zookeeper.ZooKeeper (ZooKeeper.java:close(684)) - Session: 0x24f2624839f0027 closed 2015-08-20 18:58:21,828 INFO [main-EventThread] zookeeper.ClientCnxn (ClientCnxn.java:run(512)) - EventThread shut down Completed 10000 rows insetion Completed 10000 rows insetion Completed 10000 rows insetion Completed 10000 rows insetion Completed 10000 rows insetion Completed 10000 rows insetion Completed 10000 rows insetion Completed 10000 rows insetion Completed 10000 rows insetion 127073ms

整個插入 10 萬條數據的耗時達到了 127 秒,即 2 分鐘。清單 7 所示是清單 5 代碼中用到的類源代碼。

清單 7.HBaseHelper 類代碼部分相關代碼
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 import org.apache.hadoop.conf.Configuration; …… /** ?* Used by the book examples to generate tables and fill them with test data. ?*/ public class HBaseHelper { //在 Java 代碼中,為了連接到 HBase,我們首先創建一個配置(Configuration)對象,使用該對象創建一個 HTable 實例。 //這個 HTable 對象用于處理所有的客戶端 API 調用。 ?private Configuration conf = null; ?private HBaseAdmin admin = null; ?protected HBaseHelper(Configuration conf) throws IOException { ?this.conf = conf; ?this.admin = new HBaseAdmin(conf); ?} ?public static HBaseHelper getHelper(Configuration conf) throws IOException { ?return new HBaseHelper(conf); ?} ?public void put(String table, String row, String fam, String qual, long ts, ?String val) throws IOException { ?HTable tbl = new HTable(conf, table); ?Put put = new Put(Bytes.toBytes(row)); ?put.add(Bytes.toBytes(fam), Bytes.toBytes(qual), ts, ?Bytes.toBytes(val)); ?tbl.put(put); ?tbl.close(); ?} ?public void put(String table, String[] rows, String[] fams, String[] quals, ?long[] ts, String[] vals) throws IOException { ?HTable tbl = new HTable(conf, table); ?for (String row : rows) { ?Put put = new Put(Bytes.toBytes(row)); ?for (String fam : fams) { ?int v = 0; ?for (String qual : quals) { ?String val = vals[v < vals.length ? v : vals.length]; ?long t = ts[v < ts.length ? v : ts.length - 1]; ?put.add(Bytes.toBytes(fam), Bytes.toBytes(qual), t, ?Bytes.toBytes(val)); ?v++; ?} ?} ?tbl.put(put); ?} ?tbl.close(); ?} ?public void dump(String table, String[] rows, String[] fams, String[] quals) ?throws IOException { ?HTable tbl = new HTable(conf, table); ?List<Get> gets = new ArrayList<Get>(); ?for (String row : rows) { ?Get get = new Get(Bytes.toBytes(row)); ?get.setMaxVersions(); ?if (fams != null) { ?for (String fam : fams) { ?for (String qual : quals) { ?get.addColumn(Bytes.toBytes(fam), Bytes.toBytes(qual)); ?} ?} ?} ?gets.add(get); ?} ?Result[] results = tbl.get(gets); ?for (Result result : results) { ?for (KeyValue kv : result.raw()) { ?System.out.println("KV: " + kv + ?", Value: " + Bytes.toString(kv.getValue())); ?} ?} ?} } public void dropTable(String table) throws IOException { ?if (existsTable(table)) { ?disableTable(table); ?admin.deleteTable(table); ?} ?} public void put(String table, String row, String fam, String qual, long ts, ?String val) throws IOException { ?HTable tbl = new HTable(conf, table); ?Put put = new Put(Bytes.toBytes(row)); ?put.add(Bytes.toBytes(fam), Bytes.toBytes(qual), ts, ?Bytes.toBytes(val)); ?tbl.put(put); ?tbl.close(); ?}

MapReduce 方法

如果需要通過編程來生成數據,那么用 importtsv 工具不是很方便,這時候可以使用 MapReduce 向 HBase 導入數據,但海量的數據集會讓 MapReduce Job 變得很繁重,若處理不當,則可能使得 MapReduce 的 job 運行時的吞吐量很小。由于 MapReduce 在寫 HBase 是采用的是 TableOutputFormat 方式,這樣在寫入數據庫的時候容易對寫入塊進行頻繁的刷新、分割、合并操作,這些操作都是較為耗費磁盤 I/O 的操作,最終導致 HBase 節點的不穩定性。

前面介紹過生成 HFile 的代碼,生成 HFile 后,我們可以采用 MapReduce 方式把數據導入到 HBase 數據表里,具體代碼如清單 8 所示。

清單 8.MapReduce 方式導入 HFile 到 HBase 數據表
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 import java.io.IOException; …… public class HBaseImportByMapReduce extends Configured implements Tool { static final Log LOG = LogFactory.getLog(HBaseImportByMapReduce.class); public static final String JOBNAME = "MapReduceImport"; public static class Map extends Mapper<LongWritable , ??????????????????????Text, NullWritable, NullWritable>{ ?Configuration configuration = null; ?HTable xTable = null; ?static long count = 0; ?? ?@Override ?protected void cleanup(Context context) throws IOException,InterruptedException { ?// TODO Auto-generated method stub ?super.cleanup(context); ?xTable.flushCommits(); ?xTable.close(); ?} ?? ?@Override ?protected void map(LongWritable key, Text value, Context context) ??????????????????????????????throws IOException, InterruptedException { ?String all[] = value.toString().split("/t"); ?Put put = new Put(Bytes.toBytes(all[0])); ?put.add(Bytes.toBytes("colfam1"),Bytes.toBytes("value1"), null); ?xTable.put(put); ?if ((++count % 100)==0) { ?context.setStatus(count +" DOCUMENTS done!"); ?context.progress(); ?System.out.println(count +" DOCUMENTS done!"); ?} ?} ?@Override ?protected void setup(Context context) throws IOException,InterruptedException { ?// TODO Auto-generated method stub ?super.setup(context); ?configuration = context.getConfiguration(); ?xTable = new HTable(configuration,"testtable2"); ?xTable.setAutoFlush(false); ?xTable.setWriteBufferSize(12*1024*1024); ?} } @Override public int run(String[] args) throws Exception { ?String input = args[0]; ?Configuration conf = HBaseConfiguration.create(getConf()); ?conf.set("hbase.master", "node1:60000"); ?Job job = new Job(conf,JOBNAME); ?job.setJarByClass(HBaseImportByMapReduce.class); ?job.setMapperClass(Map.class); ?job.setNumReduceTasks(0); ?job.setInputFormatClass(TextInputFormat.class); ?TextInputFormat.setInputPaths(job, input); ?job.setOutputFormatClass(NullOutputFormat.class); ?return job.waitForCompletion(true)?0:1; } public static void main(String[] args) throws IOException { ?Configuration conf = new Configuration(); ?String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); ?int res = 1; ?try { ?res = ToolRunner.run(conf, new HBaseImportByMapReduce(), otherArgs); ?} catch (Exception e) { ?e.printStackTrace(); ?} ?System.exit(res); } }

清單 8 所示的 MapReduce 方式,啟動任務需要一些時間,如果數據量較大,整個 Map 過程也會消耗較多時間。

其實一般來說 MapReduce 方式和后面要介紹的 Bulk Load 方式是配合使用的,MapReduce 負責生成 HFile 文件,Bulk Load 負責導入 HBase。

Bulk Load方式

總的來說,使用 Bulk Load 方式由于利用了 HBase 的數據信息是按照特定格式存儲在 HDFS 里的這一特性,直接在 HDFS 中生成持久化的 HFile 數據格式文件,然后完成巨量數據快速入庫的操作,配合 MapReduce 完成這樣的操作,不占用 Region 資源,不會產生巨量的寫入 I/O,所以需要較少的 CPU 和網絡資源。Bulk Load 的實現原理是通過一個 MapReduce Job 來實現的,通過 Job 直接生成一個 HBase 的內部 HFile 格式文件,用來形成一個特殊的 HBase 數據表,然后直接將數據文件加載到運行的集群中。使用 Bulk Load 功能最簡單的方式就是使用 ImportTsv 工具,ImportTsv 是 HBase 的一個內置工具,目的是從 TSV 文件直接加載內容至 HBase。它通過運行一個 MapReduce Job, 將數據從 TSV 文件中直接寫入 HBase 的表或者寫入一個 HBase 的自有格式數據文件。

ImportTsv 本身是一個在 HBase 的 JAR 文件中的 Java 類,使用 ImportTsv 工具,首先創建一個數據文件,如清單 9 所示,我們創建了一個 data.tsv 文件,包含 4 條數據。

清單 9.data.tsv
1 2 3 4 5 [root@node3 zhoumingyao]# vi data.tsv 1001 name1 17 00000000001 1002 name2 16 00000000002 1003 name3 16 00000000003 1004 name4 16 00000000004

由于 ImportTsv 工具只支持從 HDFS 中讀取數據,所以一開始我們需要將 TSV 文件從本地文件系統拷貝到 HDFS 中,接下來我們在 HDFS 里新建文件夾后上傳 data.tsv 文件到該文件夾,由于讀和寫的操作是在多臺服務器上并行執行,所以相比從單臺節點讀取速度快很多。需要指定輸出 (-Dimporttsv.bulk.output), 否則默認會采用 HBase API 方式插入數據。代碼如清單 10 所示。

清單 10. 調用 ImportTsv
1 2 3 4 5 6 7 $HADOOP_HOME/bin/hadoop fs -mkdir /user/test 創建數據表 create 'student', {NAME => 'info'} 調用 importtsv 命令導入數據, $HADOOP_HOME/bin/hadoop jar /usr/lib/cdh/hbase/hbase-0.94.15-hdh4.6.0.jar ??????????importtsv -Dimporttsv.columns=HBASE_ROW_KEY,info:name,info:age,info:phone ??????????????????????-Dimporttsv.bulk.output=/user/test/output/ student /user/test/data.tsv

記住需要啟動 YARN,否則會報錯,如清單 11 所示。

清單 11. 錯誤提示
1 2 3 15/08/21 13:41:27 INFO ipc.Client: Retrying connect to ??????????????server: node1/172.10.201.62:18040. Already tried 0 time(s); ????retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1 SECONDS)

ImportTsv 工具默認使用了 HBase 的 Put API 來將數據插入 HBase 表中,在 Map 階段使用的是 TableOutputFormat。但是當-Dimporttsv.bulk. 輸入選項被指定時,會使用 HFileOutputFormat 來代替在 HDFS 中生成 HBase 的自有格式文件(HFile)。而后我們能夠使用 completebulkload 來加載生成的文件到一個運行的集群中。根據清單 12 可以使用 bulk 輸出以及加載工具。

清單 12. 調用 completebulkload
1 2 3 4 5 6 7 8 9 創建生成文件的文件夾: $HADOOP_HOME/bin/hadoop fs -mkdir /user/hac/output 開始導入數據: $HADOOP_HOME/bin/hadoop jar /usr/lib/cdh/hbase/hbase-0.94.15-hdh4.6.0.jar ??importtsv -Dimporttsv.bulk.output=/user/hac/output/2-1 -Dimporttsv.columns= ????????????HBASE_ROW_KEY,info:name,info:age,info:phone student /user/hac/input/2-1 完成 bulk load 導入 $HADOOP_HOME/bin/hadoop jar /usr/lib/cdh/hbase/hbase-0.94.15-hdh4.6.0.jar ??????????????completebulkload /user/hac/output/2-1 student

Completebulkload 工具讀取生成的文件,判斷它們歸屬的 Resgion Server 族群,然后訪問適當的族群服務器。族群服務器會將 HFile 文件轉移進自身存儲目錄中,并且為客戶端建立在線數據。

HBase 說明文檔里面記載,Bulk Load 方法分為兩個主要步驟:

1. 使用 HFileOutputFormat 類通過一個 MapReduce 任務方式生成 HBase 的數據文件,就是英文稱為“StoreFiles”的數據文件。由于輸出的時候按照 HBase 內部的存儲格式來輸出數據,所以后面讀入 HBase 集群的時候就非常高效了。為了保證高效性,HFileOutputFormat 借助 configureIncrementalLoad 函數,基于當前 Table 的各 Region 邊界自動匹配 MapReduce 的分區類 TotalOrderPartitioner,這樣每一個輸出的 HFile 都會是在一個單獨的 Region 里面的。

為了實現這樣的設計,所有任務的輸出都需要使用 Hadoop 的 TotalOrderPartitioner 類去對輸出進行分區,按照 Regions 的主鍵范圍進行分區。HFileOutputFormat 類包含了一個快捷方法,即 configureIncrementalLoad(),它自動基于數據表的當前 region 間隔生成一個 TotalOrderPartitioner。

2. 完成數據載入到 HBase。當所有的數據都被用 HFileOutputFormat 方式準備好以后,我們可以使用 completebulkload 讀入到集群。這個命令行工具迭代循環數據文件,對于每一個數據文件迅速找到屬于它的 region,然后 Region 服務器會讀入這些 HFile。如果在生成文件的過程當中 region 被修改了,那 completebulkload 工具會自動切分數據文件到新的區域,這個過程需要花費一些時間。如果數據表 (此處是 mytable) 不存在,工具會自動創建該數據表。

如清單 13 所示,我們也調用方法直接載入 HFile 文件到 HBase,采用 Bulk Load 方式完成這個實驗。

清單 13.Bulk Load 方式載入 HFile
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles; ?? public class loadIncrementalHFileToHBase { ?? ?public static void main(String[] args) throws Exception { ?Configuration conf = HBaseConfiguration.create(); ?HBaseHelper helper = HBaseHelper.getHelper(conf); ?helper.dropTable("testtable2"); ?helper.createTable("testtable2", "colfam1"); ?HTable table = new HTable("testtable2"); ?LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf); ?loader.doBulkLoad(new Path(args[0]), table); ?} ?? }

特別提醒:

1. 一定記得建 HBase 數據表時做 Region 的預切分,HFileOutputFormat.configureIncrementalLoad 方法會根據 Region 的數量來決定 Reduce 的數量以及每個 Reduce 覆蓋的 RowKey 范圍,否則單個 Reduce 過大,容易造成任務處理不均衡。造成這個的原因是,創建 HBase 表的時候, 默認只有一個 Region, 只有等到這個 Region 的大小超過一定的閾值之后, 才會進行 split,所以為了利用完全分布式加快生成 HFile 和導入 HBase 中以及數據負載均衡, 我們需要在創建表的時候預先進行分區, 而進行分區時要利用 startKey 與 endKey 進行 rowKey 區間劃分 (因為導入 HBase 中, 需要 rowKey 整體有序)。解決方法是在數據導入之前, 自己先寫一個 MapReduce 的 Job 求最小與最大的 rowKey,即 startKey 與 endKey。

2. 單個 RowKey 下的子列不要過多,否則在 reduce 階段排序的時候會造成內存溢出異常,有一種辦法是通過二次排序來避免 reduce 階段的排序,這個解決方案需要視具體應用而定。

Sqoop 方法

Sqoop 是 Apache 頂級項目,主要用于在 Hadoop(Hive) 與傳統的數據庫 (mysql、postgresql 等等) 之間進行數據的傳遞,可以將一個關系型數據庫,例如 MySQL,Oracle,Postgres 等中的數據導入到 Hadoop 的 HDFS 中,也可以將 HDFS 的數據導進到關系型數據庫中。Sqoop 支持多種導入方式,包括指定列導入,指定格式導入,支持增量導入(有更新才導入)等等。Sqoop 的一個特點就是可以通過 Hadoop 的 MapReduce 把數據從關系型數據庫中導入數據到 HDFS。

Sqoop 的架構較為簡單,通過整合 Hive,實現 SQL 方式的操作,通過整合 HBase,可以向 HBase 寫入數據,通過整合 Oozie,擁有了任務流的概念。而 Sqoop 本身是通過 MapReduce 機制來保證傳輸數據,從而提供并發特性和容錯機制,系統架構圖如圖 1 所示,來源 Apache 官方網站。

圖 1.Sqoop 系統架構圖

在使用上,Sqoop 對外提供了一組操作命令,只需要簡單配置就可以進行數據的轉移。

首先配置 Sqoop,如清單 14 所示,對/etc/profile 文件添加兩行,然后執行命令。

清單 14. 配置 Sqoop
1 2 3 export SQOOP_HOME=/home/zhoumingyao/sqoop2-1.99.3-cdh5.0.1 export PATH = $SQOOP_HOME/bin:$PATH source /etc/profile

我們這次做的實驗使用了 Sqoop 的 import 功能,用于將 Oracle 中的人員信息導入到 HBase。在 Hadoop 和 HBase 正常運行的環境里,我們首先需要配置好 Sqoop,然后調用如下的命令即可將 Oracle 中的表導入到 HBase 中,代碼如清單 15 所示。

清單 15.Sqoop 導入 Oracle 數據到 HBase
1 2 3 4 5 6 7 8 9 10 11 12 sqoop import ?--connect jdbc:oracle:thin:@172.7.27.225:1521:testzmy //JDBC URL ?--username SYSTEM //Oracle username(必須大寫) ?--password hik123456 //Oracle password ?--query 'SELECT RYID, HZCZRK_JBXXB.ZPID, HZCZRK_JBXXB.GMSFHM, HZCZRK_JBXXB.XM, HZCZRK_JBXXB.XB, ?HZCZRK_JBXXB.CSRQ, HZCZRK_ZPXXB.ZP AS ZP FROM HZCZRK_JBXXB ?JOIN HZCZRK_ZPXXB USING(RYID) WHERE $CONDITIONS' // Oracle 數據,Sqoop 支持多表 query ?--split-by RYID //指定并行處理切分任務的列名,通常為主鍵 --map-column-java ZP=String //ZP 為 LONG RAW 類型,sqoop 不支持,需要映射成 String ?--hbase-table TESTHZ //HBase 中的 Table ?--column-family INFO //HBase 中的 column-family

清單 15 所示代碼從兩張數據表 HZCZRK_JBXXB 和 HZCZRK_ZPXXB 讀取數據并寫入到 HBase 數據表 TESTHZ,該數據表有一個列祖 INFO。我們在 VMWare CentOS5.6 單節點偽分布式環境下進行了測試。測試結果顯示,單表 HZCZRK_ZPXXB 導入 90962 條數據耗時約 27 分鐘,兩表 HZCZRK_JBXXB 和 HZCZRK_ZPXXB JOIN 導入 90962 條數據耗時約 50 分鐘。

該實驗顯示 Sqoop 使用過程中的局限性:

1. Import 中進行多表 query 的方式效率會受到影響;

2. 不支持從數據庫的視圖導出數據;

3. 不支持 BLOB、RAW 等大數據塊類型直接導入到 HBase,需要通過--map-column-java 將對應的列映射成 Java 的基本類型 String 來處理;

4. 每次 import 只能導入到 HBase 的一個 column family。

總的來說,Sqoop 類似于其他 ETL 工具,使用元數據模型來判斷數據類型并在數據從數據源轉移到 Hadoop 時確保類型安全的數據處理。Sqoop 專為大數據批量傳輸設計,能夠分割數據集并創建 Hadoop 任務來處理每個區塊。

除了上面介紹的 4 種方法的實現,我這里還想多提一些關于數據分布、合并的注意事項。HBase 數據庫不適用于經常更新的應用場景,寫操作很頻繁的任務可能引起的另一個問題是將數據寫入了單一的族群服務器 (Region Server),這種情況經常出現在將海量數據導入到一個新建的 HBase 數據庫中時。一旦數據集中在相同的服務器上,整個集群就變得不平衡,并且寫速度會顯著的降低。

結束語

數據導入環節屬于大數據應用的數據清洗部分,需要嘗試多種方式將數據導入進去,沒有哪一種方法是唯一的選擇,我們首先要根據用戶的實際環境選擇正確的方式。總的來說,Bulk Load 方式是最快速的,我們可以優先選擇它。

轉載于:https://www.cnblogs.com/davidwang456/articles/9181251.html

總結

以上是生活随笔為你收集整理的HBase 数据导入功能实现方式解释的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。

男女激情麻豆 | 特级片免费看 | 天堂网av在线 | 超碰在线观看99 | 国产精品久久久久一区二区三区 | 免费看十八岁美女 | 色婷婷狠狠五月综合天色拍 | 亚洲国产三级 | 成人在线免费av | 五月婷婷综合网 | 免费网站黄 | 国产一区二区视频在线播放 | av中文字幕在线电影 | 激情久久久久 | 伊人色综合网 | 99精品国产在热久久下载 | 亚洲成人精品久久久 | av一区在线| 免费黄色激情视频 | 国内精品免费久久影院 | www蜜桃视频 | 天天操 夜夜操 | 成人在线播放免费观看 | 99久久国产免费,99久久国产免费大片 | 97精品国产一二三产区 | 亚洲精品在线播放视频 | 丁香高清视频在线看看 | 91久久黄色 | 91专区在线观看 | 特级片免费看 | 在线视频a| 亚洲精品国产精品久久99热 | 日韩高清在线一区二区 | 天天在线视频色 | 激情网在线视频 | 亚洲国产三级在线观看 | 亚洲综合视频网 | 国产精品a久久 | 中文字幕制服丝袜av久久 | 欧美日韩视频在线一区 | 一区二区国产精品 | 96亚洲精品久久 | 亚洲精品一区二区三区高潮 | 久久精品久久久久久久 | 久久综合色影院 | 成人免费观看网站 | 免费黄色a网站 | 人人插人人看 | 成年人视频在线免费观看 | 欧美巨乳网 | 久久久久亚洲精品中文字幕 | 亚洲成av | 久久人人爽爽人人爽人人片av | 热久在线| www.色的| 69成人在线| 日本一区二区三区视频在线播放 | 摸bbb搡bbb搡bbbb| 四虎视频| 国产91免费在线 | 五月天亚洲婷婷 | 日韩女同一区二区三区在线观看 | 99热精品国产 | 日本mv大片欧洲mv大片 | 午夜精品一区二区三区四区 | 亚洲欧美国产精品va在线观看 | 蜜臀av网址 | 久久久福利 | 右手影院亚洲欧美 | 午夜国产福利在线 | 国产日韩精品在线 | 六月天综合网 | 成人va天堂 | 久久久久久久久福利 | 国产成人一区二区三区 | 欧美另类性 | 亚州日韩中文字幕 | 久久婷婷综合激情 | 免费h精品视频在线播放 | 精品视频中文字幕 | 制服丝袜欧美 | 欧美亚洲国产一卡 | 激情网第四色 | 麻豆一区二区三区视频 | 成人久久久久久久久 | 99热精品视 | 一区二区 久久 | 久久久久女人精品毛片九一 | 伊人黄 | 国产又粗又猛又黄又爽 | 天天操天天操天天操天天操天天操天天操 | 岛国一区在线 | 91视视频在线直接观看在线看网页在线看 | 久久免费视频精品 | 蜜桃av久久久亚洲精品 | 国产一级做a | 色资源在线观看 | 日韩电影中文字幕在线观看 | 午夜视频在线观看网站 | 精品 激情 | 美女久久99 | 日本久久久久久久久久 | 99视频精品免费视频 | 中文字幕一区二区三区四区久久 | 免费欧美高清视频 | 色网站在线看 | 一区二区中文字幕在线观看 | 99久热在线精品视频观看 | 亚洲视频分类 | 国产精品一区欧美 | 精品国产aⅴ麻豆 | 精品国产精品一区二区夜夜嗨 | 黄色国产区 | 97视频在线观看视频免费视频 | 亚洲理论视频 | 成人av在线网 | 在线 国产 亚洲 欧美 | 日本在线中文 | 2000xxx影视 | 国产一级三级 | 国产精品一区二区免费看 | av在线一二三区 | 国产成人亚洲在线观看 | 国产成人精品一区二区三区福利 | 免费精品视频在线 | 免费av在线网站 | 顶级欧美色妇4khd | 五月天激情综合 | av+在线播放在线播放 | 伊人天堂久久 | 91av99| 久草视频资源 | 欧美日韩一区二区在线观看 | 久久久国产精品久久久 | 久草男人天堂 | 日韩午夜电影院 | 欧美aa一级| 天天色宗合 | 亚洲午夜久久久久 | 欧美精品日韩 | 91看毛片 | av三级在线播放 | 欧美日韩视频免费 | 国产精品久久久久av福利动漫 | 欧美另类一二三四区 | 高清日韩一区二区 | 中文字幕在线观看91 | 国产精品久久久久久久久久白浆 | 久久视频网 | 超碰在线人人艹 | 久久99国产综合精品免费 | 亚洲精品一区二区三区高潮 | 国产男女爽爽爽免费视频 | 国产精品久久在线 | 天天色天天搞 | 夜夜视频资源 | 国产精品普通话 | 久久a v视频 | 全黄色一级片 | 欧美成年网站 | av电影在线观看 | 成人天堂网 | 国产精品系列在线播放 | 日韩城人在线 | 五月婷婷激情网 | 久久九九久久 | 欧美日韩精品在线播放 | 在线观看黄色免费视频 | 国产亚州精品视频 | 99热在线国产 | 亚洲精品1区2区3区 超碰成人网 | 亚洲国产成人精品久久 | 日韩有码在线观看视频 | 亚洲国产精品电影 | 久久免费视频这里只有精品 | 日韩一区二区三区观看 | 日韩色一区二区三区 | 波多野结衣在线播放视频 | 日韩精品黄 | 中文av字幕在线观看 | 麻豆视频免费入口 | 日本一区二区三区免费观看 | 最近中文字幕免费视频 | 精品麻豆入口免费 | 91成人免费看片 | 久久欧美视频 | 视频成人免费 | 久久久久www| 日日躁夜夜躁aaaaxxxx | 色在线网站 | 日韩va在线观看 | 一区二区三区四区五区在线 | 亚洲精品乱码久久久久久蜜桃不爽 | 久久国产精品99久久久久久进口 | 国产另类av| 免费国产在线精品 | 婷婷免费在线视频 | 精品视频在线免费 | 亚洲经典中文字幕 | 99视频网址| 久久国产一二区 | 天天操天 | 高清有码中文字幕 | 视频国产一区二区三区 | 狠狠干夜夜操天天爽 | 18国产精品福利片久久婷 | 亚洲九九影院 | 亚洲最新在线 | 国产精品毛片久久久久久久 | 日本午夜免费福利视频 | 久久免费精彩视频 | 亚洲狠狠丁香婷婷综合久久久 | 天天操网站 | 亚州精品天堂中文字幕 | 亚洲国产字幕 | 久久久久伊人 | 欧美午夜寂寞影院 | 久久久久免费 | 国产69精品久久99不卡的观看体验 | 香蕉网在线观看 | 国产一区在线免费观看视频 | 色99视频 | 探花视频在线观看免费 | 亚洲人av免费网站 | 久久久免费观看视频 | 99久久精品久久亚洲精品 | 久久久久久免费 | 日韩精品中文字幕在线观看 | 久久精品视频在线观看免费 | 深夜免费福利网站 | 国产精品久久一区二区三区, | 亚洲国内精品在线 | 色五月色开心色婷婷色丁香 | 欧美性护士 | 亚洲精品久久久久999中文字幕 | 国产精品mv | 精品国产一区二区三区四 | 免费一级片在线观看 | 亚洲狠狠丁香婷婷综合久久久 | 中文字幕一区二区三区在线观看 | 成人av高清在线观看 | 久久精品久久精品久久 | 黄色字幕网 | 7777精品伊人久久久大香线蕉 | 免费欧美 | 人人干人人做 | 欧美一级视频免费看 | 香蕉看片| 天天操操操操操操 | 日韩高清精品免费观看 | 91精品导航 | 亚洲视频大全 | 亚洲精品国产日韩 | 久久精品牌麻豆国产大山 | 精品国产区在线 | av高清一区二区三区 | 亚洲精品久久久久久久不卡四虎 | 色网站国产精品 | 一级片观看 | 色姑娘综合 | 91亚洲精品在线观看 | 欧美91片| 嫩小bbbb摸bbb摸bbb | 亚洲国产欧美在线人成大黄瓜 | 国产成人一二片 | 91丨九色丨蝌蚪丰满 | 国产精品99精品久久免费 | 91在线免费看片 | 久久成人精品电影 | 在线激情av电影 | 久久精品视频日本 | 久久综合色播五月 | 亚洲国产精品一区二区久久,亚洲午夜 | 中文在线字幕观看电影 | 久久黄页| 免费日韩一区二区 | 国内综合精品午夜久久资源 | 国内精品国产三级国产aⅴ久 | 人人干人人添 | 亚洲电影第一页av | 玖玖视频在线 | 五月婷婷色丁香 | 99热99| 国产精品久久久久久久久大全 | 免费观看一级成人毛片 | 欧美成年性 | 中文字幕日本在线 | 91av在线播放| 伊人五月婷 | 在线免费观看国产黄色 | 精品国自产在线观看 | 亚洲天堂自拍视频 | 国产精品九九视频 | 一区二区三区免费在线观看视频 | 国产在线v | 新版资源中文在线观看 | 三级动态视频在线观看 | 国产亚洲成人网 | 欧美精品成人在线 | 国产精品久久久久av福利动漫 | 亚洲欧美成人网 | 国产精品入口a级 | 国产高清成人av | 午夜精品中文字幕 | 天天激情天天干 | 97超视频在线观看 | 国产一区二区免费在线观看 | 日韩在线激情 | 一区二区视 | 国产精品久久久久久久久久久免费看 | 欧美在线一二 | 天天色棕合合合合合合 | 日本午夜在线亚洲.国产 | 成人黄大片 | 在线成人高清电影 | 国产精品手机在线播放 | 中文字幕二区在线观看 | a久久久久 | 特级毛片网 | 国产高清网站 | 99久久9| 九九国产精品视频 | av电影不卡 | 国产一卡二卡四卡国 | 久久精品看 | 黄色录像av | 九九热视频在线免费观看 | 在线亚洲天堂网 | 手机av在线免费观看 | 国产一级淫片免费看 | 亚洲国内在线 | 欧美精品久久久久a | 97精品欧美91久久久久久 | 久久精品高清视频 | 91桃色在线免费观看 | 免费视频一二三区 | 精品国产一区二区三区久久影院 | 99精品在线免费观看 | 久草在线观看 | 毛片视频电影 | 免费人成在线观看 | 热99在线视频 | 激情亚洲综合在线 | 九月婷婷综合网 | 久草在线最新免费 | 国产黄色片免费看 | 欧美日韩中文在线 | 欧美a级片网站 | 最新国产在线视频 | 中文字幕国产精品一区二区 | 精品视频一区在线 | 亚洲91av| 特级西西444www大精品视频免费看 | 亚洲国产精久久久久久久 | 三级黄色a| 亚洲欧美日韩国产一区二区三区 | www.久久久.com | 天天摸天天操天天舔 | 永久免费在线 | 日韩精品久久久久久久电影竹菊 | 国产视频一二三 | 国产a免费 | 日韩视频在线不卡 | 国产亚洲午夜高清国产拍精品 | 国产视频精选在线 | 99久久激情 | av色网站 | 亚洲永久国产精品 | 亚洲专区在线 | 久久精品视频一 | 99久久综合国产精品二区 | 色婷婷激情电影 | 国产精品自在欧美一区 | 欧美成人基地 | 好看av在线 | 在线看片中文字幕 | 99精品视频在线看 | 超碰av在线播放 | 欧女人精69xxxxxx | 激情五月婷婷丁香 | 日韩高清不卡一区二区三区 | 色五婷婷 | 欧美十八| 日韩中文在线电影 | 99热在线免费观看 | 天海冀一区二区三区 | 天天碰天天操视频 | 久久久久观看 | 五月婷婷激情六月 | 99日韩精品| 国产又粗又猛又爽 | 人人超碰在线 | 久久久免费精品 | 天天五月天色 | 九九电影在线 | 五月婷婷av在线 | 在线观看亚洲免费视频 | 久草电影在线观看 | 中文字幕成人在线 | 久久久久电影 | 中文成人字幕 | 欧美在线资源 | 黄色av观看 | 五月婷婷久久丁香 | 天天干婷婷 | 精产嫩模国品一二三区 | 亚洲最大激情中文字幕 | 亚洲国产美女久久久久 | 久久高清国产视频 | 午夜12点| 亚洲国产精品va在线看黑人 | 色的网站在线观看 | 狠狠干夜夜操天天爽 | 日韩一区在线免费观看 | 中文字幕有码在线观看 | 在线精品在线 | 国产无吗一区二区三区在线欢 | 99热手机在线 | 久久久在线 | 日韩视频免费 | 激情丁香久久 | 精品国产成人av | 成人国产精品久久久春色 | 成人免费网视频 | 91精品久久久久久综合乱菊 | 久久久综合色 | 欧美一区二区在线刺激视频 | 日本一区二区三区免费观看 | av网址最新 | 亚洲视频999 | 人人爽人人av | 操操日 | 狠狠激情中文字幕 | 国产精品一区在线观看 | 日日夜夜干 | 韩国精品在线观看 | 国内精品一区二区 | av片中文 | 992tv在线观看 | 96亚洲精品久久久蜜桃 | 91色偷偷 | 精品久久片 | 久久久久久久久久久综合 | 国产亚洲精品免费 | 日韩黄色免费在线观看 | 四虎国产精品成人免费影视 | 黄色三级免费片 | 最新中文字幕 | av电影亚洲 | 不卡av在线 | 97干com| 女人18毛片a级毛片一区二区 | 97精品国自产拍在线观看 | 一区二区三区在线观看 | 91久久精品日日躁夜夜躁国产 | 国产福利一区二区三区在线观看 | 夜夜操天天 | 国产精品亚洲成人 | 五月婷亚洲 | 在线有码中文 | 欧洲视频一区 | 亚洲伦理电影在线 | 欧美久久久久久久久久久久久 | 日韩专区视频 | 一区二区三高清 | 亚洲精品88欧美一区二区 | 国产123区在线观看 国产精品麻豆91 | 久久久久亚洲精品 | 免费成人在线观看视频 | 国产黄色片免费 | 日日干干夜夜 | 毛片区 | 人人舔人人爽 | 久久免费成人 | 亚洲涩涩涩 | www.天天色 | 97精品视频在线播放 | 91丨九色丨首页 | 午夜精品久久久久久久爽 | 日韩欧美视频免费看 | 在线色视频小说 | 最近日本中文字幕a | 日韩精品电影在线播放 | 成人在线观看资源 | 不卡视频一区二区三区 | 日日夜夜天天操 | 永久免费看av | 国产一线二线三线性视频 | 免费在线一区二区 | 中国黄色一级大片 | 91一区二区在线 | 97超碰超碰久久福利超碰 | 国产人成看黄久久久久久久久 | 日本高清中文字幕有码在线 | 手机看片1042 | 国产精品色 | 亚洲爱视频 | 色噜噜狠狠狠狠色综合久不 | 日本在线观看中文字幕无线观看 | 最近免费中文字幕mv在线视频3 | 日日操天天操夜夜操 | 国产精品久久久久久久久久新婚 | 欧美精品小视频 | 国产高清视频 | 中文字幕a∨在线乱码免费看 | 欧美日韩在线视频观看 | 免费a级毛片在线看 | 精品欧美一区二区三区久久久 | 91精品无人成人www | 69人人| 久草在线视频在线观看 | 精品嫩模福利一区二区蜜臀 | 亚洲精品国偷拍自产在线观看蜜桃 | 97视频入口免费观看 | 亚洲综合在线五月 | 中文字幕在线观看视频一区 | 中文字幕免费观看全部电影 | 欧美日韩在线观看一区二区 | 久久综合久久综合这里只有精品 | 丁香九月激情综合 | 精品国自产在线观看 | 中文免费 | 精品福利在线视频 | 不卡日韩av | 午夜精品视频一区 | 日韩精品91偷拍在线观看 | 日韩激情在线视频 | 亚洲婷婷网 | 国产视频 亚洲视频 | 久久中文字幕视频 | 色片网站在线观看 | 最近免费中文视频 | 日日麻批40分钟视频免费观看 | 日韩在线视频一区 | 视频在线观看国产 | 日韩视频一区二区在线 | 午夜国产福利在线 | 中文字幕第一页av | 国产伦理久久精品久久久久_ | 国产亚洲精品久久久久久 | a'aaa级片在线观看 | 欧美一级久久 | 91视频免费网站 | 依人成人综合网 | 日韩www在线 | 日韩簧片在线观看 | 国产精品一区免费看8c0m | 在线电影av | 国产不卡av在线 | 在线日韩| 操少妇视频 | 国产精品网红直播 | www亚洲国产 | 91丨九色丨91啦蝌蚪老版 | 综合网天天射 | 99re8这里有精品热视频免费 | 中文亚洲欧美日韩 | 少妇激情久久 | 欧洲精品码一区二区三区免费看 | 久久久久久毛片 | 日本三级久久久 | 欧美日韩高清一区二区 国产亚洲免费看 | 午夜精品一区二区三区免费视频 | 国内精品久久影院 | 中文字幕在线免费观看 | 日韩中文字幕a | 91丨九色丨91啦蝌蚪老版 | 天天操天天射天天插 | 在线va网站 | 成人欧美一区二区三区在线观看 | 久久99国产综合精品 | 久久一区国产 | 亚洲精品久久久久久中文传媒 | 成年人视频在线免费播放 | 成人蜜桃| 久久av高清 | 亚洲综合欧美日韩狠狠色 | 国产乱对白刺激视频在线观看女王 | 808电影免费观看三年 | 视频一区二区精品 | 最新99热 | 久久久穴 | 婷婷丁香狠狠爱 | 99riav1国产精品视频 | www.五月天 | 国产精品video爽爽爽爽 | 夜夜躁日日躁 | 免费高清男女打扑克视频 | 91精品国产99久久久久久红楼 | 久久国产色 | 日韩久久网站 | 99精品福利视频 | 国产专区视频在线 | 国产黄网在线 | 在线国产视频一区 | 国产黄色精品在线观看 | 国产丝袜高跟 | 日韩午夜视频在线观看 | 91高清完整版在线观看 | 天天躁天天躁天天躁婷 | 国产系列在线观看 | 黄a网站 | 91手机在线看片 | av888av.com | 国产成人精品女人久久久 | 免费在线中文字幕 | 国产精品9999久久久久仙踪林 | 91人人在线| 中文字幕在线看人 | 黄色福利网 | 91插插影库 | 日韩网站视频 | 97视频播放| 日韩美视频 | 国产黄色大片 | 天天色天天射天天操 | 在线观看黄色国产 | 国产一级片播放 | 观看免费av | 欧美日韩中文在线视频 | 久久免费精彩视频 | 98久久| 久久er99热精品一区二区三区 | 久久人人爽av | 四虎影视成人精品国库在线观看 | 91麻豆国产福利在线观看 | 亚洲成人av一区二区 | 伊人天天干 | 国产精品久久久久久久久久东京 | 国产精品久久久久一区二区国产 | 在线电影日韩 | 精品国产区在线 | 啪嗒啪嗒免费观看完整版 | 国产欧美精品一区二区三区 | 中文字幕在线观看第三页 | 日韩毛片在线一区二区毛片 | 美女黄视频免费 | 亚洲在线观看av | 欧美福利片在线观看 | 国产成人精品综合久久久久99 | 国产精品粉嫩 | 一区二区理论片 | 在线观看中文字幕dvd播放 | 91欧美在线 | 国产精品一区二区av日韩在线 | 狠狠色噜噜狠狠狠狠2022 | 一级黄色片在线 | 亚洲三级影院 | 欧美 亚洲 另类 激情 另类 | 国产成人精品综合 | 久久精品久久精品久久39 | 久久公开免费视频 | 免费成人在线电影 | 日韩久久久久久久久久 | 久久精品一区二 | 国产日韩欧美在线 | 人人草人人草 | 亚洲亚洲精品在线观看 | 99热99| 国产精品18久久久 | 在线亚洲欧美日韩 | 91久久爱热色涩涩 | 九九视频免费在线观看 | 久久久首页 | 国产精品 久久 | 999超碰| 在线免费观看的av | 午夜国产成人 | 99久久影视 | 在线观看免费色 | 一级片在线 | 国产超碰97| 免费看的黄色网 | 成人a在线观看高清电影 | 综合影视 | 97超碰中文字幕 | 91精品国产麻豆国产自产影视 | 久久伊人精品一区二区三区 | av噜噜噜在线播放 | www.久久免费视频 | 欧美日韩伦理一区 | 国产中的精品av小宝探花 | 9999精品| 又爽又黄又刺激的视频 | 国产精品黄 | 91在线麻豆| 国产精品18久久久久久久久久久久 | 日韩经典一区二区三区 | 亚洲精品动漫在线 | 日韩免费大片 | 五月婷婷播播 | 亚洲国产精品va在线看黑人 | 日韩综合第一页 | 99c视频高清免费观看 | www久久 | 国产一级电影网 | 成人黄视频 | 看片在线亚洲 | 天天插天天操天天干 | 久久久久欠精品国产毛片国产毛生 | 97香蕉超级碰碰久久免费软件 | 久久精品综合 | 国产精品久久艹 | 国内精品久久久久久久 | 久久这里只有精品23 | 国产精品99久久久久久大便 | 久久免费av电影 | 在线观看中文字幕一区二区 | 成人av网站在线 | 欧美大片mv免费 | 国产资源在线观看 | 久久精品网站免费观看 | 亚洲午夜精品久久久久久久久 | 91国内在线视频 | 六月婷婷久香在线视频 | 免费黄色a级毛片 | 91插插插免费视频 | 国产黄色精品网站 | 亚洲黄网址 | 久久精品久久久精品美女 | 亚洲资源片 | 2019免费中文字幕 | www.黄色片网站 | 亚a在线 | 狠狠躁夜夜躁人人爽视频 | 高清一区二区三区av | 国产精品一区电影 | 91精品久久久久久综合乱菊 | 国产亚洲精品久久久久动 | 国产成人免费观看久久久 | 国产精品成人a免费观看 | 日韩av一区二区三区四区 | 91麻豆精品国产91久久久久久 | 丁香六月欧美 | 色之综合网 | 欧美网址在线观看 | 黄色小说视频在线 | 精品国产欧美一区二区三区不卡 | 日韩欧美区 | 成人av影视| 亚洲一区二区视频在线播放 | 国产精品久久久精品 | 精品国产自在精品国产精野外直播 | 91精品在线免费观看 | 国产精品久久久久久久久费观看 | 99这里只有久久精品视频 | 天天爱天天射 | 91视频网址入口 | 欧美成人在线免费观看 | 91伊人久久大香线蕉蜜芽人口 | 成年人看片网站 | 久草在线视频国产 | 久久久午夜剧场 | 久草手机视频 | 国产色在线视频 | 91入口在线观看 | 99热国内精品 | 久久久久免费网站 | 超碰在线官网 | 久色婷婷 | 久久99这里只有精品 | 国产精品1区 | 免费在线观看av的网站 | 久久综合狠狠综合久久综合88 | 九色琪琪久久综合网天天 | 国产一区免费视频 | 国产精品99久久免费黑人 | 国产九九精品视频 | 国产精久久久久久妇女av | 中文字幕999 | 久久精品精品 | 99色| 久久国产三级 | 丝袜美女在线观看 | 91精品国产乱码久久 | 久久久久久久久久久成人 | 国产色在线,com | 日韩欧美电影在线观看 | 国产成人精品久久久久蜜臀 | 日韩xxxxxxxxx| 99久久夜色精品国产亚洲 | 最新中文字幕在线播放 | 中文字幕制服丝袜av久久 | 欧美激情综合五月色丁香小说 | 国产亚洲激情视频在线 | 欧美有色 | 六月激情 | www夜夜操 | 国产一区二区三精品久久久无广告 | 五月天激情视频在线观看 | 国产视频精品久久 | 91久久偷偷做嫩草影院 | 婷婷综合 | 99久久久国产精品美女 | 久久亚洲免费 | av综合av| 欧美大片aaa | 亚洲精品视频播放 | 国产色在线视频 | 日本一区二区三区视频在线播放 | 免费福利影院 | 亚洲天堂精品视频 | 精品亚洲一区二区三区 | 国产剧情亚洲 | 中文字幕无吗 | 亚洲视频在线观看免费 | 国产精品综合久久久 | 国产精品乱码久久久久久1区2区 | 日韩中文字幕免费看 | 精品久久久网 | 最近日韩免费视频 | 亚洲成人av在线 | 91视频 - 88av | 96精品视频 | 国产黄色视 | 欧美激情亚洲综合 | 99精品成人| 亚洲精品玖玖玖av在线看 | 成人免费在线视频观看 | 97视频免费看| 久久久影院一区二区三区 | 久久精品www人人爽人人 | 一级精品视频在线观看宜春院 | 国产午夜视频在线观看 | 国产精品观看在线亚洲人成网 | 欧美日韩视频在线播放 | 99精品免费久久久久久久久 | 中文字幕一区二区三区四区 | 欧美最猛性xxxx | 久久99精品一区二区三区三区 | 日韩一区二区三区高清免费看看 | 婷婷六月色 | 狠狠狠干 | 久久久91精品国产一区二区三区 | 日韩av电影手机在线观看 | 欧美亚洲国产日韩 | 欧美午夜理伦三级在线观看 | 97国产一区二区 | 国产日韩精品在线观看 | 91夫妻视频 | 干综合网 | 国产手机视频 | 在线免费av网 | 国产91对白在线 | 国产麻豆视频 | 99人久久精品视频最新地址 | 在线视频观看你懂的 | 中文字幕av全部资源www中文字幕在线观看 | 免费观看www视频 | 国产麻豆精品久久一二三 | 99久久er热在这里只有精品66 | 国产精品美女久久久久久2018 | 免费视频三区 | 人人爱爱人人 | 一区二区中文字幕在线观看 | 九九爱免费视频 | 国产区精品视频 | 超碰97人人干| 亚洲成年人av | 一区二区精品在线 | 97在线播放 | www色片 | 亚洲综合网站在线观看 | 99热九九这里只有精品10 | 97碰碰精品嫩模在线播放 | 日韩网站一区二区 | 亚洲国产网站 | 国产又黄又硬又爽 | 91av在线免费 | 丁香六月在线观看 | 香蕉久草| 免费观看av | 蜜臀av性久久久久蜜臀aⅴ涩爱 | 国产99自拍| 色婷婷激情电影 | 又黄又爽又湿又无遮挡的在线视频 | 97国产情侣爱久久免费观看 | 亚洲欧美精品一区二区 | 香蕉日日 | 婷婷天天色 | 国产精品成人国产乱一区 | 国产999精品久久久久久绿帽 | 成人免费观看在线视频 | 96精品视频| 天天鲁天天干天天射 | 久久久穴 | 麻豆视频在线播放 | 婷婷免费在线视频 | 国产99久久久国产精品成人免费 | 日韩精品最新在线观看 | 天天综合色天天综合 | 亚洲成年人av | 久久www免费视频 | 69久久99精品久久久久婷婷 | 成人影片在线免费观看 | 久久看视频| 中文字幕亚洲欧美 | 精品久久久久久国产91 | 亚洲三级在线播放 | 日韩精品最新在线观看 | 天天干天天拍天天操 | 婷婷丁香七月 | 久久久久久久久久久久久久电影 | 日韩在线小视频 | 麻豆视频入口 | 免费日韩电影 | 香蕉影视app | 激情久久网 | 丁香资源影视免费观看 | 韩日视频在线 | 国产又粗又猛又黄又爽的视频 | 国产精品涩涩屋www在线观看 | 久久99精品久久久久久 | 91成年视频 | 国产亚洲日 | 婷婷激情五月 | 亚洲精品国产成人 | 欧美大片mv免费 | 91视频成人免费 | 欧美日韩在线观看一区二区 | 亚州激情视频 | 精品国产伦一区二区三区免费 | 久草网视频 | 午夜精品一区二区三区在线视频 | 久久有精品 | 欧美久久久| 久久视频免费 | 欧美日韩网站 | 91香蕉亚洲精品 | 在线观看911视频 | 亚洲va韩国va欧美va精四季 | 黄色免费观看视频 | 国产精品美女视频网站 | 欧美久久电影 | 青青草国产免费 | 中文字幕一区二区三区在线播放 | 911香蕉 | 色多多在线观看 | 91av在线电影 | 国产一级片在线播放 | 久久久www成人免费毛片 | 久久99深爱久久99精品 | 色射爱| 一区三区在线欧 | 精品99免费 | 久久久国际精品 | 国产亚洲一级高清 | 久久中文网 | 久久久人 | 精品国产乱码久久久久久三级人 | 国产精品乱码一区二区视频 | 国产一区二区在线视频观看 | 精品视频国产一区 | 一区二区影院 | 亚洲乱码一区 | 欧美婷婷综合 | 中文字幕在线日亚洲9 | 天天做天天爱天天爽综合网 | 免费 在线 中文 日本 | 成人va视频 | 永久免费av在线播放 | 在线播放亚洲激情 | 天天爽夜夜爽人人爽曰av | 国产香蕉视频在线观看 | 成年人免费电影在线观看 | 日日夜夜狠狠操 | 97综合视频| 色噜噜在线观看 | 久久人人看 | 在线播放 日韩专区 | 日韩在线观看高清 | 久操免费视频 | 中文字幕在线观看一区 | 亚洲免费资源 | 国产96在线观看 | 免费瑟瑟网站 | 国产91综合一区在线观看 | 韩日视频在线 | 韩国视频一区二区三区 | 久久精品视频网站 | 日韩 国产 | 久久视频这里只有精品 | 日本夜夜草视频网站 | 亚洲男模gay裸体gay | 99免费在线观看视频 | 九九久久久久久久久激情 | 青春草免费在线视频 | 久久婷婷亚洲 | 97久久精品午夜一区二区 | 少妇自拍av | 日日日日 | 五月天国产 | 免费在线观看亚洲视频 | 毛片网站观看 | 久久精品99国产精品酒店日本 | 在线观看日韩免费视频 | 久久尤物电影视频在线观看 | 精品国产一区二区三区日日嗨 |