Flume整合Kafka采集滚动的日志
背景:
從Nginx中間件采集web項目產生的滾動日志。通過本地服務器(簡稱:A服務)的Flume采集日志,然后傳輸到另外一臺服務器(簡稱:B服務器)的Flume上,然后暫存到 B服務器 的Kafka中。
為了展示效果,通過Kafka的consumer進行消費,打印到字符界面。
1、Flume的配置文件如下:
1.1、A服務器上,[exec-memory-avro.conf] 配置文件如下:
# agent properties
exec-memory-avro.sources = r1
exec-memory-avro.sinks = k1
exec-memory-avro.channels = c1
# sourecs properties
exec-memory-avro.sources.r1.type = exec
exec-memory-avro.sources.r1.command = tail -F /home/cyx/data/flume-logs-tmp/logs.txt
exec-memory-avro.sources.r1.shell = /bin/sh -c
# sinks properties
# eg: exec-memory-avro.sinks.k1.type = logger
exec-memory-avro.sinks.k1.type = avro
exec-memory-avro.sinks.k1.hostname =? h6 # B服務器hostname
exec-memory-avro.sinks.k1.port = 44444
# channels properties
exec-memory-avro.channels.c1.type = memory
# Bind the source and sink to channel
exec-memory-avro.sources.r1.channels = c1
exec-memory-avro.sinks.k1.channel = c1
1.2、B服務器上,[avro-memory-kafka.conf] 配置文件如下:
# agent properties
avro-memory-kafka.sources = r1
avro-memory-kafka.sinks = k1
avro-memory-kafka.channels = c1
# source properties
avro-memory-kafka.sources.r1.type = avro
avro-memory-kafka.sources.r1.bind = 0.0.0.0 # 或者直接寫 B服務器hostname
avro-memory-kafka.sources.r1.port = 44444
# sink properties
avro-memory-kafka.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
avro-memory-kafka.sinks.k1.kafka.topic = my-topic
# flume1.6版本后改成 bootstrap.servers了,1.6之前是 brokerList
avro-memory-kafka.sinks.k1.kafka.bootstrap.servers = h5:9092,h6:9092,h7:9092
avro-memory-kafka.sinks.k1.kafka.flumeBatchSize = 5
avro-memory-kafka.sinks.k1.kafka.producer.acks = 1
avro-memory-kafka.sinks.k1.kafka.producer.linger.ms = 1
avro-memory-kafka.sinks.k1.kafka.producer.compression.type = snappy
# channels properties
avro-memory-kafka.channels.c1.type = memory
# bind the sources and sinks to channels
avro-memory-kafka.sources.r1.channels = c1
avro-memory-kafka.sinks.k1.channel = c1
2、步驟:
2.1、需要先啟動B服務器上的Flume
flume-ng agent -n avro-memory-kafka -c /opt/flume/conf/ -f /opt/flume/conf/avro-memory-kafka.conf -Dflume.root.logger=INFO,console
命令行最后一部分會有信息如下:[INFO - org.apache.flume.source.AvroSource.start(AvroSource.java:260)] Avro source r1 started.
?2.2、然后啟動A服務器上的Flume
?flume-ng agent -n exec-memory-avro -c /opt/flume/conf/ -f /opt/flume/conf/exec-memory-avro.conf -Dflume.root.logger=INFO,console
這時會看到B服務器上顯示兩臺服務器連接成功的信息:。。。。B服務器IP:44444] CONNECTED: /A服務器IP:33478
?2.3、然后啟動B服務器上的Kafka
kafka-console-consumer.sh --zookeeper h6:2181 --topic my-topic
?3、測試:
?3.1、向A服務器的文件寫入信息進行測試
echo word count1? >> /home/cyx/data/flume-logs-tmp/logs.txt
echo word count2? >> /home/cyx/data/flume-logs-tmp/logs.txt
echo word count3? >> /home/cyx/data/flume-logs-tmp/logs.txt
echo word count4? >> /home/cyx/data/flume-logs-tmp/logs.txt
?3.2、查看B服務器的控制臺,可以看到
word count1
word count2
word count3
word count4
如果沒有到,需要等一會,因為flumeBatchSize = 5,需要超過設置緩存的時間后,才會傳到Kafka中
?
注釋:
1).tail -F /home/cyx/data/flume-logs-tmp/logs.txt 執行Linux命令tail,實時采集logs.txt中新增的日志
2).參數 flume.root.logger=INFO,console 是讓打印日志消息到終端
3).flume 的啟動參數 -n 后為flume的agent的名稱,必須和對應的文件一直,否則啟動不成功
轉載于:https://www.cnblogs.com/chenyongxiang/p/8722704.html
總結
以上是生活随笔為你收集整理的Flume整合Kafka采集滚动的日志的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 推荐:Java性能优化系列集锦
- 下一篇: modernizr.js的介绍和使用