日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Flume整合Kafka采集滚动的日志

發布時間:2024/1/17 编程问答 31 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Flume整合Kafka采集滚动的日志 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

背景:

從Nginx中間件采集web項目產生的滾動日志。通過本地服務器(簡稱:A服務)的Flume采集日志,然后傳輸到另外一臺服務器(簡稱:B服務器)的Flume上,然后暫存到 B服務器 的Kafka中。

為了展示效果,通過Kafka的consumer進行消費,打印到字符界面。

1、Flume的配置文件如下:

1.1、A服務器上,[exec-memory-avro.conf] 配置文件如下:

# agent properties
exec-memory-avro.sources = r1
exec-memory-avro.sinks = k1
exec-memory-avro.channels = c1

# sourecs properties
exec-memory-avro.sources.r1.type = exec
exec-memory-avro.sources.r1.command = tail -F /home/cyx/data/flume-logs-tmp/logs.txt
exec-memory-avro.sources.r1.shell = /bin/sh -c

# sinks properties
# eg: exec-memory-avro.sinks.k1.type = logger
exec-memory-avro.sinks.k1.type = avro
exec-memory-avro.sinks.k1.hostname =? h6        # B服務器hostname
exec-memory-avro.sinks.k1.port = 44444

# channels properties
exec-memory-avro.channels.c1.type = memory

# Bind the source and sink to channel
exec-memory-avro.sources.r1.channels = c1
exec-memory-avro.sinks.k1.channel = c1

1.2、B服務器上,[avro-memory-kafka.conf] 配置文件如下:

# agent properties
avro-memory-kafka.sources = r1
avro-memory-kafka.sinks = k1
avro-memory-kafka.channels = c1

# source properties
avro-memory-kafka.sources.r1.type = avro
avro-memory-kafka.sources.r1.bind = 0.0.0.0                  # 或者直接寫 B服務器hostname
avro-memory-kafka.sources.r1.port = 44444

# sink properties
avro-memory-kafka.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
avro-memory-kafka.sinks.k1.kafka.topic = my-topic

# flume1.6版本后改成 bootstrap.servers了,1.6之前是 brokerList
avro-memory-kafka.sinks.k1.kafka.bootstrap.servers = h5:9092,h6:9092,h7:9092
avro-memory-kafka.sinks.k1.kafka.flumeBatchSize = 5
avro-memory-kafka.sinks.k1.kafka.producer.acks = 1
avro-memory-kafka.sinks.k1.kafka.producer.linger.ms = 1
avro-memory-kafka.sinks.k1.kafka.producer.compression.type = snappy

# channels properties
avro-memory-kafka.channels.c1.type = memory

# bind the sources and sinks to channels
avro-memory-kafka.sources.r1.channels = c1
avro-memory-kafka.sinks.k1.channel = c1

2、步驟:

2.1、需要先啟動B服務器上的Flume

flume-ng agent -n avro-memory-kafka -c /opt/flume/conf/ -f /opt/flume/conf/avro-memory-kafka.conf -Dflume.root.logger=INFO,console

命令行最后一部分會有信息如下:[INFO - org.apache.flume.source.AvroSource.start(AvroSource.java:260)] Avro source r1 started.

?2.2、然后啟動A服務器上的Flume

?flume-ng agent -n exec-memory-avro -c /opt/flume/conf/ -f /opt/flume/conf/exec-memory-avro.conf -Dflume.root.logger=INFO,console

這時會看到B服務器上顯示兩臺服務器連接成功的信息:。。。。B服務器IP:44444] CONNECTED: /A服務器IP:33478

?2.3、然后啟動B服務器上的Kafka

kafka-console-consumer.sh --zookeeper h6:2181 --topic my-topic

?3、測試:

?3.1、向A服務器的文件寫入信息進行測試

echo word count1? >> /home/cyx/data/flume-logs-tmp/logs.txt

echo word count2? >> /home/cyx/data/flume-logs-tmp/logs.txt

echo word count3? >> /home/cyx/data/flume-logs-tmp/logs.txt

echo word count4? >> /home/cyx/data/flume-logs-tmp/logs.txt

?3.2、查看B服務器的控制臺,可以看到

word count1
word count2
word count3
word count4
如果沒有到,需要等一會,因為flumeBatchSize = 5,需要超過設置緩存的時間后,才會傳到Kafka中

?

注釋:

1).tail -F /home/cyx/data/flume-logs-tmp/logs.txt 執行Linux命令tail,實時采集logs.txt中新增的日志

2).參數 flume.root.logger=INFO,console 是讓打印日志消息到終端

3).flume 的啟動參數 -n 后為flume的agent的名稱,必須和對應的文件一直,否則啟動不成功

轉載于:https://www.cnblogs.com/chenyongxiang/p/8722704.html

總結

以上是生活随笔為你收集整理的Flume整合Kafka采集滚动的日志的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。