日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

通过Flume简单实现Kafka与Hive对接(Json格式)

發(fā)布時間:2025/3/8 编程问答 21 豆豆
生活随笔 收集整理的這篇文章主要介紹了 通过Flume简单实现Kafka与Hive对接(Json格式) 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

將以下存儲在kafka的topic中的JSON格式字符串,對接存儲到Hive的表中

{"id":1,"name":"小李"} {"id":2,"name":"小張"} {"id":3,"name":"小劉"} {"id":4,"name":"小王"}

1、在hive/conf/hive-site.xml中添加或修改如下內(nèi)容:

<property><name>hive.txn.manager</name><value>org.apache.hadoop.hive.ql.lockmgr.DbTxnManager</value></property><property><name>hive.support.concurrency</name><value>true</value></property><property><name>hive.metastore.uris</name><value>thrift://localhost:9083</value></property>

2、創(chuàng)建database、table,其中表有id、name這個兩個字段

hive> create database hivetokafka;hive> create table kafkatable(id int,name string) hive> clustered by(id) into 2 buckets stored as orc tblproperties('transactional'='true');

3、執(zhí)行 hive --service metastore & 啟動元數(shù)據(jù)服務(wù)

hive --service metastore &

4、配置conf文件,這里文件名和位置可以隨意(我的是放在hive/myconf/新建的目錄下,名字為kafkatohive.conf),添加如下內(nèi)容

a.sources=source_from_kafka a.channels=mem_channel a.sinks=hive_sink#kafka為souce的配置 a.sources.source_from_kafka.type=org.apache.flume.source.kafka.KafkaSource a.sources.source_from_kafka.zookeeperConnect=localhost:2181 a.sources.source_from_kafka.bootstrap.servers=localhost:9092 a.sources.source_from_kafka.topic=testtopic a.sources.source_from_kafka.channels=mem_channel a.sources.source_from_kafka.consumer.timeout.ms=1000 #hive為sink的配置 a.sinks.hive_sink.type=hive a.sinks.hive_sink.hive.metastore=thrift://localhost:9083 a.sinks.hive_sink.hive.database=hivetokafka a.sinks.hive_sink.hive.table=kafkatable a.sinks.hive_sink.hive.txnsPerBatchAsk=2 a.sinks.hive_sink.batchSize=10 a.sinks.hive_sink.serializer=JSON a.sinks.hive_sink.serializer.fieldnames=id,name #channel的配置 a.channels.mem_channel.type=memory a.channels.mem_channel.capacity=1500 a.channels.mem_channel.transactionCapacity=1000 #三者之間的關(guān)系 a.sources.source_from_kafka.channels=mem_channel a.sinks.hive_sink.channel=mem_channel

5、將/hive/hcatalog/share/hcatalog/hive-hcatalog-streaming-x.x.x.jar拷貝到/flume/lib/下

此外還需要注意/hive/lib/guava-xx.x-jre.jar下與/flume/lib/下的版本是否一致。

6、啟動flume,命令格式如下

flume-ng agent --conf conf/ --conf-file conf/…. --name a -Dflume.root.logger=INFO,console;

我這里就是(在flume/路徑下 ):

bin/flume-ng agent --conf myconf/ --conf-file myconf/kafkatohive.conf --name a -Dflume.root.logger=INFO,console;

7、新建終端窗口,創(chuàng)建topic(默認(rèn)已經(jīng)啟動了zookeeper和kafka服務(wù)了)

kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic testtopic

8、啟動kafka生產(chǎn)者,進(jìn)行生產(chǎn)消息

啟動命令:

kafka-console-producer.sh --broker-list localhost:9092 --topic testtopic

生產(chǎn)消息:

>{"id":1,"name":"小李"} >{"id":2,"name":"小張"} >{"id":3,"name":"小劉"} >{"id":4,"name":"小王"}

9、查看結(jié)果

hive> select * from student; OK 1 小李 2 小張 3 小劉 4 小王Time taken: 0.589 seconds, Fetched: 10 row(s)

總結(jié)

以上是生活随笔為你收集整理的通过Flume简单实现Kafka与Hive对接(Json格式)的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。