日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問(wèn) 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 编程资源 > 编程问答 >内容正文

编程问答

Flume监听文件夹中的文件变化,并把文件下沉到hdfs

發(fā)布時(shí)間:2024/9/27 编程问答 29 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Flume监听文件夹中的文件变化,并把文件下沉到hdfs 小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

1、采集目錄到HDFS

采集需求:某服務(wù)器的某特定目錄下,會(huì)不斷產(chǎn)生新的文件,每當(dāng)有新文件出現(xiàn),就需要把文件采集到HDFS中去
根據(jù)需求,首先定義以下3大要素
采集源,即source——監(jiān)控文件目錄 : spooldir
下沉目標(biāo),即sink——HDFS文件系統(tǒng) : hdfs sink
source和sink之間的傳遞通道——channel,可用file channel 也可以用內(nèi)存channel

配置文件spooldir-hdfs.conf編寫:

# Name the components on this agent a1.sources = r1 a1.sinks = k1 a1.channels = c1# Describe/configure the source ##注意:不能往監(jiān)控目中重復(fù)丟同名文件 ## 通過(guò)spooldir來(lái)監(jiān)控文件內(nèi)容的變化 a1.sources.r1.type = spooldir a1.sources.r1.spoolDir = /home/tuzq/software/flumedata a1.sources.r1.fileHeader = true# Describe the sink ## 表示下沉到hdfs,下面配置的類型不同,type下面的參數(shù)就不同 a1.sinks.k1.type = hdfs #sinks.k1只能連接一個(gè)channel,source可以配置多個(gè) a1.sinks.k1.channel = c1 #下面的配置告訴用hdfs去寫文件的時(shí)候?qū)懙绞裁次恢?#xff0c;下面的表示不是寫死的,而是動(dòng)態(tài)變化的。表示輸出的目錄名稱是可變的 a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/ #表示文件的前綴 a1.sinks.k1.hdfs.filePrefix = events- #表示到了需要觸發(fā)的時(shí)間時(shí),是否要更新文件夾,true:表示要更新 a1.sinks.k1.hdfs.round = true ##表示每隔1分鐘改變一下文件夾 a1.sinks.k1.hdfs.roundValue = 1 ##切換文件的時(shí)候單位是分鐘 a1.sinks.k1.hdfs.roundUnit = minute ##表示只要過(guò)了3秒鐘,就切換生成一個(gè)新的文件 a1.sinks.k1.hdfs.rollInterval = 3 ##如果記錄的文件大于20字節(jié)時(shí)切換一次 a1.sinks.k1.hdfs.rollSize = 20 ##當(dāng)寫了5個(gè)事件時(shí)觸發(fā) a1.sinks.k1.hdfs.rollCount = 5 ##收到了多少條消息往hdfs中追加內(nèi)容 a1.sinks.k1.hdfs.batchSize = 1 #使用本地時(shí)間戳 a1.sinks.k1.hdfs.useLocalTimeStamp = true #生成的文件類型,默認(rèn)是Sequencefile,可用DataStream,則為普通文本 a1.sinks.k1.hdfs.fileType = DataStream# Use a channel which buffers events in memory ##使用內(nèi)存的方式 a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100# Bind the source and sink to the channel a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1

Channel參數(shù)解釋:
capacity:默認(rèn)該通道中最大的可以存儲(chǔ)的event數(shù)量
trasactionCapacity:每次最大可以從source中拿到或者送到sink中的event數(shù)量
keep-alive:event添加到通道中或者移出的允許時(shí)間

Flume有HDFS Sink,可以將Source進(jìn)來(lái)的數(shù)據(jù)寫入到hdfs中。HDFS Sink具體的邏輯代碼是在HDFSEventSink這個(gè)類中。HDFS Sink跟寫文件相關(guān)的配置如下:hdfs.path -> hdfs目錄路徑hdfs.filePrefix -> 文件前綴。默認(rèn)值FlumeDatahdfs.fileSuffix -> 文件后綴hdfs.rollInterval -> 多久時(shí)間后close hdfs文件。單位是秒,默認(rèn)30秒。設(shè)置為0的話表示不根據(jù)時(shí)間close hdfs文件hdfs.rollSize -> 文件大小超過(guò)一定值后,close文件。默認(rèn)值1024,單位是字節(jié)。設(shè)置為0的話表示不基于文件大小hdfs.rollCount -> 寫入了多少個(gè)事件后close文件。默認(rèn)值是10個(gè)。設(shè)置為0的話表示不基于事件個(gè)數(shù)hdfs.fileType -> 文件格式, 有3種格式可選擇:SequenceFile, DataStream or CompressedStreamhdfs.batchSize -> 批次數(shù),HDFS Sink每次從Channel中拿的事件個(gè)數(shù)。默認(rèn)值100hdfs.minBlockReplicas -> HDFS每個(gè)塊最小的replicas數(shù)字,不設(shè)置的話會(huì)取hadoop中的配置hdfs.maxOpenFiles -> 允許最多打開(kāi)的文件數(shù),默認(rèn)是5000。如果超過(guò)了這個(gè)值,越早的文件會(huì)被關(guān)閉serializer -> HDFS Sink寫文件的時(shí)候會(huì)進(jìn)行序列化操作。會(huì)調(diào)用對(duì)應(yīng)的Serializer借口,可以自定義符合需求的Serializerhdfs.retryInterval -> 關(guān)閉HDFS文件失敗后重新嘗試關(guān)閉的延遲數(shù),單位是秒hdfs.callTimeout -> HDFS操作允許的時(shí)間,比如hdfs文件的open,write,flush,close操作。單位是毫秒,默認(rèn)值是10000

執(zhí)行命令

[root@hadoop1 apache-flume-1.6.0-bin]#cd /home/tuzq/software/apache-flume-1.6.0-bin [root@hadoop1 apache-flume-1.6.0-bin]#bin/flume-ng agent -c ./conf -f ./agentconf/spool-logger.conf -n a1 -Dflume.root.logger=INFO,console;

接著往/home/tuzq/software/flumedata文件夾中扔文件

[root@hadoop1 flumedata]# pwd /home/tuzq/software/flumedata [root@hadoop1 flumedata]# echo 111111111 >> 1.txt [root@hadoop1 flumedata]# ls 1.txt.COMPLETED test.log.COMPLETED [root@hadoop1 flumedata]# echo 22222222 >> 2.txt [root@hadoop1 flumedata]# echo 33333333 >> 3.txt [root@hadoop1 flumedata]# echo 44444444 >> 4.txt [root@hadoop1 flumedata]# ls 1.txt.COMPLETED 2.txt.COMPLETED 3.txt.COMPLETED 4.txt.COMPLETED test.log.COMPLETED [root@hadoop1 flumedata]#

扔了之后,現(xiàn)象是
1、/home/tuzq/software/flumedata文件文件夾下的文件倍加了一個(gè)一個(gè)后綴.COMPLETED,
2、在flume的監(jiān)控位置,出現(xiàn)類似下圖一樣的文件:
3、到hdfs上查看文件:

[root@hadoop1 flumedata]# hdfs dfs -ls / Found 5 items drwxr-xr-x - root supergroup 0 2017-06-13 12:01 /40000 drwxr-xr-x - root supergroup 0 2017-06-13 23:43 /flume -rw-r--r-- 3 root supergroup 3719 2017-06-10 12:11 /kms.sh drwxrwxrwx - root supergroup 0 2017-06-10 22:06 /tmp drwxr-xr-x - root supergroup 0 2017-06-10 22:27 /user [root@hadoop1 flumedata]# hdfs dfs -ls /flume Found 2 items drwxr-xr-x - root supergroup 0 2017-06-13 23:43 /flume/events drwxr-xr-x - root supergroup 0 2017-06-13 22:01 /flume/tailout [root@hadoop1 flumedata]# hdfs dfs -ls /flume/events Found 1 items drwxr-xr-x - root supergroup 0 2017-06-13 23:47 /flume/events/17-06-13 [root@hadoop1 flumedata]# hdfs dfs -ls /flume/events/17-06-13 Found 3 items drwxr-xr-x - root supergroup 0 2017-06-13 23:43 /flume/events/17-06-13/2343 drwxr-xr-x - root supergroup 0 2017-06-13 23:46 /flume/events/17-06-13/2346 drwxr-xr-x - root supergroup 0 2017-06-13 23:47 /flume/events/17-06-13/2347 [root@hadoop1 flumedata]#

綜上所述:說(shuō)明通過(guò)flume已經(jīng)把新增的文件下沉到了hdfs中。

總結(jié)

以上是生活随笔為你收集整理的Flume监听文件夹中的文件变化,并把文件下沉到hdfs的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。

如果覺(jué)得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。