

Sending data with flume, avro and java: a summary of using Flume to send data to kafka, hdfs, hive, http, netcat, and other sources/sinks


1. Source in http mode, sink in logger mode: print the data to the console.

The conf file is as follows:

# Name the components on this agent

a1.sources = r1

a1.sinks = k1

a1.channels = c1

# Describe/configure the source

# Receive data sent via HTTP
a1.sources.r1.type = http

# Hostname or IP address of the machine running Flume
a1.sources.r1.bind = hadoop-master

# Listening port
a1.sources.r1.port = 9000

#a1.sources.r1.fileHeader = true

# Describe the sink

# Print the events to the console
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

Start Flume with:

bin/flume-ng agent -c conf -f conf/http.conf -n a1 -Dflume.root.logger=INFO,console

Output like the following indicates that Flume started successfully:

895 (lifecycleSupervisor-1-3) [INFO -org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:96)] Component type: SOURCE, name: r1 started

Open another terminal and send data with an HTTP POST:

curl -X POST -d '[{"headers":{"timestampe":"1234567","host":"master"},"body":"badou flume"}]' hadoop-master:9000

hadoop-master is the hostname bound in the Flume configuration file, and 9000 is the bound port.

The window running Flume then shows:

2018-06-12 08:24:04,472 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{timestampe=1234567, host=master} body: 62 61 64 6F 75 20 66 6C 75 6D 65 badou flume }
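For reference, the same event can also be posted from Java rather than curl. Below is a minimal sketch using only the JDK's built-in HTTP client (Java 11+); the class name is made up for illustration, and the host, port, and JSON payload simply mirror the curl command above.

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class HttpSourceSendExample {
    public static void main(String[] args) throws Exception {
        // A JSON array of events, the format the HTTP source's default JSONHandler accepts
        String payload = "[{\"headers\":{\"timestampe\":\"1234567\",\"host\":\"master\"},"
                + "\"body\":\"badou flume\"}]";

        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://hadoop-master:9000"))   // host and port bound by the source
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(payload))
                .build();

        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println("HTTP status: " + response.statusCode());  // 200 means the event was accepted
    }
}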

2. Source in netcat mode (tcp/udp), sink in logger mode: print the data to the console.

The conf file is as follows:

a1.sources = r1

a1.sinks = k1

a1.channels = c1

a1.sources.r1.type = netcat

# Hostname or IP address to bind
a1.sources.r1.bind = hadoop-master

a1.sources.r1.port = 44444

a1.sinks.k1.type = logger

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

Start Flume:

bin/flume-ng agent -c conf -f conf/netcat.conf -n a1 -Dflume.root.logger=INFO,console

Then, in another terminal, send data with telnet:

The command is: telnet hadoop-master 44444

[root@hadoop-master ~]# telnet hadoop-master 44444

Trying 192.168.194.6...

Connected to hadoop-master.

Escape character is '^]'.

The output above means the connection to Flume succeeded. Now type:

12213213213

OK

12321313

OK

Flume then receives the corresponding events:

2018-06-12 08:38:51,129 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: 31 32 32 31 33 32 31 33 32 31 33 0D 12213213213. }

2018-06-12 08:38:51,130 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: 31 32 33 32 31 33 31 33 0D 12321313. }
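If telnet is not available, the same lines can be sent from Java over a plain TCP socket. This is a minimal sketch under the same assumptions as above; the class name is made up, and the host and port are the ones the netcat source binds.

import java.io.OutputStream;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

public class NetcatSourceSendExample {
    public static void main(String[] args) throws Exception {
        // Connect to the netcat source bound to hadoop-master:44444
        try (Socket socket = new Socket("hadoop-master", 44444)) {
            OutputStream out = socket.getOutputStream();
            // Each newline-terminated line becomes one Flume event;
            // the netcat source replies "OK" for every line it accepts
            out.write("12213213213\n".getBytes(StandardCharsets.UTF_8));
            out.write("12321313\n".getBytes(StandardCharsets.UTF_8));
            out.flush();
        }
    }
}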

3. Source in netcat/http mode, sink in hdfs mode: store the data in HDFS.

The conf file, named hdfs.conf, is as follows:

# Name the components on this agent

a1.sources = r1

a1.sinks = k1

a1.channels = c1

# Describe/configure the source

a1.sources.r1.type = netcat

a1.sources.r1.bind = hadoop-master

a1.sources.r1.port = 44444

a1.sources.r1.interceptors = i1

a1.sources.r1.interceptors.i1.type =regex_filter

a1.sources.r1.interceptors.i1.regex =^[0-9]*$

a1.sources.r1.interceptors.i1.excludeEvents =true

# Describe the sink

#a1.sinks.k1.type = logger

a1.channels = c1

a1.sinks = k1

a1.sinks.k1.type = hdfs

a1.sinks.k1.channel = c1

# Where the files are stored in the HDFS file system
a1.sinks.k1.hdfs.path = hdfs:/flume/events

# File name prefix
a1.sinks.k1.hdfs.filePrefix = events-

a1.sinks.k1.hdfs.round = true

a1.sinks.k1.hdfs.roundValue = 10

a1.sinks.k1.hdfs.roundUnit = minute

# File format; DataStream stores the data received from Flume as plain text
a1.sinks.k1.hdfs.fileType = DataStream

# Use a channel which buffers events in memory

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

Create the target path in the HDFS file system:

hadoop fs -mkdir -p /flume/events

Start Flume:

bin/flume-ng agent -c conf -f conf/hdfs.conf -n a1 -Dflume.root.logger=INFO,console

Send data to Flume via telnet:

telnet hadoop-master 44444

Then type:

aaaaaaaa

bbbbbbb

ccccccccc

dddddddddd

Check the files in HDFS with hadoop fs -ls /flume/events/; you can see that /flume/events contains the following files:

-rw-r--r-- 3 root supergroup 16 2018-06-05 06:02 /flume/events/events-.1528203709070

-rw-r--r-- 3 root supergroup 5 2018-06-05 06:02 /flume/events/events-.1528203755556

-rw-r--r-- 3 root supergroup 11 2018-06-05 06:03 /flume/events/events-.1528203755557

-rw-r--r-- 3 root supergroup 26 2018-06-13 07:28 /flume/events/events-.1528900112215

-rw-r--r-- 3 root supergroup 209 2018-06-13 07:29 /flume/events/events-.1528900112216

-rw-r--r-- 3 root supergroup 72 2018-06-13 07:29 /flume/events/events-.1528900112217

View the contents of events-.1528900112216 with hadoop fs -cat /flume/events/events-.1528900112216:

aaaaaaaaaaaaaaaaa

bbbbbbbbbbbbbbbb

ccccccccccccccccccc

dddddddddddddddd

eeeeeeeeeeeeeeeeeee

fffffffffffffffffffffff

gggggggggggggggggg

hhhhhhhhhhhhhhhhhhhhhhh

iiiiiiiiiiiiiiiiiii

jjjjjjjjjjjjjjjjjjj

For http mode, just change netcat to http in hdfs.conf, and send the data with curl instead of telnet:

curl -X POST -d '[{"headers":{"timestampe":"1234567","host":"master"},"body":"badou flume"}]' hadoop-master:44444

The content sent by the command above, badou flume, then appears in the HDFS file.

4. Source in netcat/http mode, sink in hive mode: store the data in Hive, partitioned.

The conf file, named hive.conf, is as follows:

# Name the components on this agent
a1.sources = r1

# Describe/configure the source

a1.sources.r1.type = netcat

a1.sources.r1.bind = hadoop-master

a1.sources.r1.port = 44444

# Describe the sink

#a1.sinks.k1.type = logger

a1.channels = c1

a1.sinks = k1

a1.sinks.k1.type = hive

a1.sinks.k1.hive.metastore=thrift://hadoop-master:9083

# Hive database name
a1.sinks.k1.hive.database=default

a1.sinks.k1.hive.table=flume_user1

a1.sinks.k1.serializer=DELIMITED

# With a netcat source the partition value can only be set statically, because netcat transfers data without carrying individual field values, only positional columns. Here the age partition value is set to 3.
a1.sinks.k1.hive.partition=3

# With an http (JSON) source the partition value can be set dynamically, because the age value can be carried in the event headers.
#a1.sinks.k1.hive.partition=%{age}

a1.sinks.k1.serializer.delimiter=" "

a1.sinks.k1.serializer.serderSeparator=' '

a1.sinks.k1.serializer.fieldnames=user_id,user_name

a1.sinks.k1.hive.txnsPerBatchAsk = 10

a1.sinks.k1.hive.batchSize = 1500

# Use a channel which buffers events in memory

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

Create the table in Hive:

create table flume_user1(

user_id int

,user_name string

)

partitioned by(age int)

clustered by (user_id) into 2 buckets

stored as orc;

Add the following properties to hive-site.xml:

<property>
  <name>javax.jdo.option.ConnectionPassword</name>
  <value>hive</value>
  <description>password to use against metastore database</description>
</property>

<property>
  <name>hive.support.concurrency</name>
  <value>true</value>
</property>

<property>
  <name>hive.exec.dynamic.partition.mode</name>
  <value>nonstrict</value>
</property>

<property>
  <name>hive.txn.manager</name>
  <value>org.apache.hadoop.hive.ql.lockmgr.DbTxnManager</value>
</property>

<property>
  <name>hive.compactor.initiator.on</name>
  <value>true</value>
</property>

<property>
  <name>hive.compactor.worker.threads</name>
  <value>1</value>
</property>

Copy the three packages under the hcatalog/share/hcatalog folder of the Hive installation directory into Flume's lib directory.

Run Flume:

bin/flume-ng agent -c conf -f conf/hive.conf -n a1 -Dflume.root.logger=INFO,console

Open a new window and start the metastore service:

hive --service metastore &

Open another client and connect to Flume via telnet:

telnet hadoop-master 44444

Then type:

1 1

3 3

Two rows then appear in Hive:

flume_user1.user_id flume_user1.user_name flume_user1.age

1 1 3

3 3 3

age is the value 3 set statically in hive.conf.

Now switch the Flume source to http mode and pass the Hive partition value dynamically through a header parameter.

In hive.conf, change

a1.sources.r1.type = netcat to a1.sources.r1.type = http

and a1.sinks.k1.hive.partition=3 to a1.sinks.k1.hive.partition=%{age}.

Then start Flume:

bin/flume-ng agent -c conf -f conf/hive.conf -n a1 -Dflume.root.logger=INFO,console

In a newly opened window, send data to Flume over http:

curl -X POST -d '[{"headers":{"age":"109"},"body":"11 ligongong"}]' hadoop-master:44444

Hive then contains the following data:

flume_user1.user_id flume_user1.user_name flume_user1.age

11 ligongong 109

This shows that when data is sent to Hive over http, the partition field travels in the event headers, while the other fields travel in the body, separated by the delimiter defined in hive.conf.

5. Use avro mode and print the data to the console.

Data is transferred between different Flume agents using avro.

Two servers are needed to demonstrate avro; here they are hadoop-master and hadoop-slave2.

hadoop-master runs agent2, whose sink is set to avro and whose destination host is set to hadoop-slave2. The Flume conf file on hadoop-master, named push.conf, is as follows:

#Name the components on this agent

a2.sources= r1

a2.sinks= k1

a2.channels= c1

#Describe/configure the source

a2.sources.r1.type= netcat

a2.sources.r1.bind= hadoop-master

a2.sources.r1.port = 44444

a2.sources.r1.channels= c1

#Use a channel which buffers events in memory

a2.channels.c1.type= memory

a2.channels.c1.keep-alive= 10

a2.channels.c1.capacity= 100000

a2.channels.c1.transactionCapacity= 100000

#Describe/configure the source

# Set the sink type to avro
a2.sinks.k1.type= avro

a2.sinks.k1.channel= c1

# Hostname of the destination server the sink sends data to
a2.sinks.k1.hostname= hadoop-slave2

# Port on the destination server
a2.sinks.k1.port= 44444

hadoop-slave2 runs agent1, whose source is avro. Its Flume configuration, named pull.conf, is as follows:

#Name the components on this agent

a1.sources= r1

a1.sinks= k1

a1.channels= c1

#Describe/configure the source

a1.sources.r1.type= avro

a1.sources.r1.channels= c1

a1.sources.r1.bind= hadoop-slave2

a1.sources.r1.port= 44444

#Describe the sink

a1.sinks.k1.type= logger

a1.sinks.k1.channel = c1

#Use a channel which buffers events in memory

a1.channels.c1.type= memory

a1.channels.c1.keep-alive= 10

a1.channels.c1.capacity= 100000

a1.channels.c1.transactionCapacity= 100000

Start Flume on hadoop-slave2 first, then on hadoop-master. The order matters; otherwise you get the following error: org.apache.flume.FlumeException: java.net.SocketException: Unresolved address

Start Flume on hadoop-slave2:

bin/flume-ng agent -c conf -f conf/pull.conf -n a1 -Dflume.root.logger=INFO,console

Start Flume on hadoop-master:

bin/flume-ng agent -c conf -f conf/push.conf -n a2 -Dflume.root.logger=INFO,console

Open a new window and connect to hadoop-master via telnet:

telnet hadoop-master 44444

Then send 11111aaaa.

The console on hadoop-slave2 then shows the 11111aaaa that was just sent:

2018-06-14 06:43:00,686 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: 31 31 31 31 31 61 61 61 61 0D 11111aaaa. }
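The article title mentions sending data from Java over avro; with the Flume client SDK (the flume-ng-sdk artifact) a minimal sketch would look roughly as follows. The class name is made up, and the host and port are those of the avro source defined in pull.conf.

import java.nio.charset.StandardCharsets;

import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientFactory;
import org.apache.flume.event.EventBuilder;

public class AvroSendExample {
    public static void main(String[] args) throws EventDeliveryException {
        // The default RPC client speaks avro, matching the avro source on hadoop-slave2:44444
        RpcClient client = RpcClientFactory.getDefaultInstance("hadoop-slave2", 44444);
        try {
            Event event = EventBuilder.withBody("11111aaaa", StandardCharsets.UTF_8);
            client.append(event);   // sends one event; throws EventDeliveryException on failure
        } finally {
            client.close();
        }
    }
}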

6. Send data through Flume to Kafka, then store it in HDFS and Hive via Kafka.

Start zookeeper on hadoop-master, hadoop-slave1, and hadoop-slave2.

The command is:
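On a standard ZooKeeper installation this is usually run from the ZooKeeper home directory on each of the three nodes:

bin/zkServer.sh start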

Then start kafka; from the kafka installation directory run:

./bin/kafka-server-start.sh config/server.properties &

Create a topic in kafka:

bin/kafka-topics.sh --create --zookeeper hadoop-master:2181,hadoop-slave1:2181,hadoop-slave2:2181 --replication-factor 1 --partitions 2 --topic flume_kafka

List the topics in kafka:

bin/kafka-topics.sh --list --zookeeper hadoop-master:2181,hadoop-slave1:2181,hadoop-slave2:2181

Start a kafka console consumer:

./kafka-console-consumer.sh --zookeeper hadoop-master:2181,hadoop-slave1:2181,hadoop-slave2:2181 --topic flume_kafka

Configure the Flume conf file: set the source type to exec, the sink to org.apache.flume.sink.kafka.KafkaSink, and the kafka topic to the flume_kafka topic created above. The configuration is as follows:

# Name the components on this agent

a1.sources = r1

a1.sinks = k1

a1.channels = c1

# Describe/configure the source

# Set the source type to exec, i.e. run a command

a1.sources.r1.type = exec

# The command the source runs

a1.sources.r1.command = tail -f /home/hadoop/flumeHomeWork/flumeCode/flume_exec_test.txt

# Kafka sink

a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink

# Kafka broker address and port

a1.sinks.k1.brokerList=hadoop-master:9092

# Kafka topic

a1.sinks.k1.topic=flume_kafka

# Serializer

a1.sinks.k1.serializer.class=kafka.serializer.StringEncoder

# use a channel which buffers events in memory

a1.channels.c1.type=memory

a1.channels.c1.capacity = 100000

a1.channels.c1.transactionCapacity = 1000

# Bind the source and sink to the channel

a1.sources.r1.channels=c1

a1.sinks.k1.channel=c1

Start Flume:
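The command follows the same pattern as the earlier sections; assuming the configuration above is saved as conf/kafka.conf:

bin/flume-ng agent -c conf -f conf/kafka.conf -n a1 -Dflume.root.logger=INFO,console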

Whenever new data appears in /home/hadoop/flumeHomeWork/flumeCode/flume_exec_test.txt, Flume loads it into Kafka, where it is consumed by the consumer started above.

The file /home/hadoop/flumeHomeWork/flumeCode/flume_exec_test.txt contains data such as:

131,dry pasta

131,dry pasta

132,beauty

133,muscles joints pain relief

133,muscles joints pain relief

133,muscles joints pain relief

133,muscles joints pain relief

134,specialty wines champagnes

134,specialty wines champagnes

134,specialty wines champagnes
