日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

本地日志数据实时接入到hadoop集群的数据接入方案

發布時間:2024/4/17 编程问答 41 豆豆
生活随笔 收集整理的這篇文章主要介紹了 本地日志数据实时接入到hadoop集群的数据接入方案 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

1.?概述

本手冊主要介紹了,一個將傳統數據接入到Hadoop集群的數據接入方案和實施方法。供數據接入和集群運維人員參考。

1.1.??整體方案

Flume作為日志收集工具,監控一個文件目錄或者一個文件,當有新數據加入時,收集新數據發送給KafkaKafka用來做數據緩存和消息訂閱。Kafka里面的消息可以定時落地到HDFS上,也可以用Spark?Streaming來做實時處理,然后將處理后的數據落地到HDFS上。

1.2.?數據接入流程

本數據接入方案,分為以下幾個步驟:

l?安裝部署Flume:在每個數據采集節點上安裝數據采集工具Flume。詳見“2、安裝部署Flume”。

l?數據預處理:生成特定格式的數據,供Flume采集。詳見“3、數據預處理”。

l?Flume采集數據到Kafka:?Flume采集數據并發送到Kafka消息隊列。詳見“4Flume采集數據到Kafka”。

l?Kafka數據落地:將Kafka數據落地到HDFS。詳見“5Kafka數據落地”。

?

2.?安裝部署Flume

若要采集數據節點的本地數據,每個節點都需要安裝一個Flume工具,用來做數據采集。

2.2.1下載并安裝

到官網去下載最新版本的Flume

下載地址為:http://flume.apache.org/,目前最新版本為1.6.0,需要1.7及以上版本的JDK

1、解壓

tar?-xzvf?apache-flume-1.6.0-bin.tar.gz??-C?/usr/local/

2、安裝JDK1.7

???如果節點上JDK版本低于1.7,需要安裝1.7或以上版本的JDK

JDK?1.7?下載地址:

http://www.oracle.com/technetwork/java/javase/downloads/jdk7-downloads-1880260.html

Flume目錄下創建一個java目錄,存放JDK

cd?/usr/local/apache-flume-1.6.0-bin

mkdir?java

cd?java

tar?-xzvf?jdk-7u79-linux-x64.tar.gz

?

2.2.2配置Flume系統參數

修改?flume-env.sh?配置文件,主要是JAVA_HOME變量設置

cd?/usr/local/apache-flume-1.6.0-bin/conf

cpflume-env.sh.templateflume-env.sh

flume-env.sh里面設置FLUME_CLASSPATH變量和JAVA_HOME變量,

示例:

export?JAVA_HOME=/usr/local/apache-flume-1.6.0-bin/java/jdk1.7.0_79

FLUME_CLASSPATH="/usr/local/apache-flume-1.6.0-bin/"

變量具體內容根據實際修改

?

2.2.3添加Flume第三方依賴

添加第三方依賴包flume-plugins-1.0-SNAPSHOT.jar,此包實現了一個Flume攔截器,將Flume采集到的數據進行序列化、結構化等預處理,最后每條數據生成一條Event數據返回。

?

cd?/usr/local/apache-flume-1.6.0-bin

mkdir?plugins.d????--創建依賴目錄,目錄名必須為plugins.d

cd?plugins.d?

mkdir?flume-plugins??????????--項目目錄,目錄名隨意

cd?flume-plugins

mkdir?lib???????????--jar目錄,目錄名必須為lib

將第三方jarflume-plugins-1.0-SNAPSHOT.jar放在lib目錄下

2.2.4添加Hive配置文件

hive-site.xml文件拷貝到/usr/local/apache-flume-1.6.0-bin/conf目錄下,并修改hive元數據地址與真實地址對應。如下所示:

?<property>

? <name>hive.metastore.uris</name>

? <value>thrift://m103:9083,thrift://m105:9083</value>

?</property>

?

2.2.5創建Flume?agent配置文件

創建flume啟動配置文件,指定sourcechannelsink?3個組件內容。每個組件都有好幾種配置選項,具體配置請查看Flume官網。創建配置文件flume.conf,示例如下:

?

vim?flume.conf

a1.sources?=?x1

a1.sinks?=?y1

a1.channels?=?z1

#?Describe/configure?the?source

a1.sources.x1.type?=?exec

a1.sources.x1.channels?=?z1

a1.sources.x1.command?=?tail?-F?/home/xdf/exec.txt

#?Describe?the?sink

a1.sinks.y1.type?=?logger

#?Use?a?channel?which?buffers?events?in?memory

a1.channels.z1.type?=?memory

a1.channels.z1.capacity?=?1000

a1.channels.z1.transactionCapacity?=?100

#?Bind?the?source?and?sink?to?the?channel

a1.sources.x1.channels?=?z1

a1.sinks.y1.channel?=?z1

?

2.2.6啟動Flume?Agent

生產環境下,參數-Dflume.root.logger=INFO,console去掉console,此處只為方便查看測試結果,選擇將日志打印到控制臺。若Flume?agent正常啟動,說明Flume安裝成功。

?

?

cd?/usr/local/apache-flume-1.6.0-bin

./bin/flume-ng?agent?--conf?conf?--conf-file?flume.conf?--name?a3?-Dflume.root.logger=INFO,console

?

2.2.7?測試

上面配置的example.conf文件,實現的功能是監控文件/home/xdf/exec.txt,如果有新數據寫入時,Flume就會采集到新數據并打印在控制臺上。

測試用例:向/home/xdf/exec.txt文件中寫入內容“hello?flume”,查看控制臺是否打印出“hello?flume”。正常輸出如下:

?

echo?'hello?flume'?>>?/home/xdf/exec.txt

2015-06-30?16:01:52,910?(SinkRunner-PollingRunner-DefaultSinkProcessor)?[INFO?-?org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)]?Event:?{?headers:{}?body:?68?65?6C?6C?6F?20?66?6C?75?6D?65?hello?flume?}

?

至此,Flume安裝部署完畢。

3.?數據預處理

1、Flume采集數據都是按行分割的,一行代表一條記錄。如果原始數據不符合要求,需要對數據進行預處理。示例如下:

原始數據格式為:

out:?===?START?OF?INFORMATION?SECTION?===

out:?Vendor:???????????????TOSHIBA

out:?Product:??????????????MBF2300RC

out:?Revision:?????????????0109

out:?User?Capacity:????????300,000,000,000?bytes?[300?GB]

out:?Logical?block?size:???512?bytes

???經過預處理,我們將數據變為一條5個字段的記錄:

TOSHIBA;MBF2300RC;0109;300;512

?

2、如果要將上面數據接入到hive中,我們還需要下面幾個處理:

a.?創建一張hive

create?table?test(Vendor?string,Product?string,Revision?string,User_Capacity?string,block?string);

b.?在Kafka節點上創建一個topic,名字與上面hive表名對應,格式為“hive-數據庫名-表名”。示例如下:

bin/kafka-topics?--create?--zookeeper?localhost:2181/kafka?????--topic?hive-xdf-test??--partitions?1?--replication-factor?1

c.?將第一步得到的記錄數據與topic整合成一條記錄,用“@@”分割。示例如下:

hive-xdf-test?@@TOSHIBA;MBF2300RC;0109;300;512

d.?Flume采集整合后的一條數據,通過topic獲取hive表的元數據,根據元數據對記錄數據進行結構化、序列化處理,然后經過Kafka存入到hive表中。具體操作參考下面具體步驟所示。

4.?Flume采集數據到Kafka

Flume如果要將采集到的數據發送到Kafka,需要指定配置文件(如下:flume_test.conf)的sink類型為KafkaSink,并且指定Kafka?broker?list。配置文件示例如下,紅色標注的為KafkaSink配置項:

vim?flume_test.conf

a3.channels?=?c3

a3.sources?=?r3

a3.sinks?=?k3

?

a3.sources.r3.type?=?exec

a3.sources.r3.channels?=?c3

a3.sources.r3.command?=?tail?-F?/home/xdf/exec.txt

a3.sources.r3.fileHeader?=?false

a3.sources.r3.basenameHeader?=?false

a3.sources.r3.interceptors?=?i3

a3.sources.r3.interceptors.i3.type?=iie.flume.interceptor.CSVInterceptor$Builder

a3.sources.r3.interceptors.i3.separator?=?;

a3.sources.r3.decodeErrorPolicy=IGNORE

?

a3.channels.c3.type?=?memory

a3.channels.c3.capacity?=?10000

a3.channels.c3.transactionCapacity?=?1000

?

a3.sinks.k3.channel?=?c3

#?a3.sinks.k3.type?=?logger

#a3.sinks.k3.batchSize?=?10

a3.sinks.k3.type?=?org.apache.flume.sink.kafka.KafkaSink

a3.sinks.k3.brokerList?=?localhost:9092

?

?

注意:此處有一個攔截器插件的定義,它就是用來做結構化、序列化數據預處理的。此插件由上面配置的Flume第三方jar包中獲得。

a3.sources.r3.interceptors.i3.type?=iie.flume.interceptor.CSVInterceptor$Builder

?

5.?Kafka數據落地

我們提供了一個Camus工具,來定時將Kafka中的數據落地到hive表中。

Camus工具包含以下三個文件:

文件

說明

camus-example-0.1.0-cdh-SNAPSHOT-shaded.jar

程序運行jar

camus.properties

配置文件

camusrun.sh

運行腳本

?

配置文件需要根據實際情況,修改以下兩個參數

kafka.whitelist.topics=hive-xdf-test?????????----數據對應的topic

kafka.brokers=m105:9092,m103:9092????????????----kafka?broker?lists

需要指定多個topic時,用逗號間隔,示例:

Kafka.whitelist.topics=topic1,topic2,topic3

修改完配置文件后,定時運行camusrun.sh腳本,就會將新生成的數據接入到topic所對應的hive表中了。

6.?具體案例

6.1?Smart數據接入

6.1.2?創建hive

最終我們要將smart數據接入到hive表中,所以我們首先要創建一個滿足smart數據結構的hive表。

create?table?smart_data(serial_number?String?,update_time?string,smart_health_status?string?,current_drive_temperature?int,drive_trip_temperature?int,elements_in_grown_defect_list?int,manufactured_time?string?,cycle_count?int????,start_stop_cycles?int????,load_unload_count?int????,load_unload_cycles?int????,blocks_sent_to_initiator?bigint?,blocks_received_from_initiator?bigint?,blocks_read_from_cache?bigint?,num_commands_size_not_larger_than_segment_size?bigint?,num_commands_size_larger_than_segment_size?bigint?,num_hours_powered_up?string??????,num_minutes_next_test?int????,read_corrected_ecc_fast?bigint?,read_corrected_ecc_delayed?bigint?,read_corrected_re?bigint?,read_total_errors_corrected?bigint?,read_correction_algo_invocations?bigint?,read_gigabytes_processed?bigint?,read_total_uncorrected_errors?string?,write_corrected_ecc_fast?bigint?,write_corrected_ecc_delayed?bigint?,write_corrected_re?bigint?,write_total_errors_corrected?bigint?,write_correction_algo_invocations?bigint?,write_gigabytes_processed?bigint?,write_total_uncorrected_errors?string?,verify_corrected_ecc_fast?bigint?,verify_corrected_ecc_delayed?bigint?,verify_corrected_re?bigint?,verify_total_errors_corrected?bigint?,verify_correction_algo_invocations?bigint?,verify_gigabytes_processed?bigint?,verify_total_uncorrected_errors?bigint?,non_medium_error_count?bigint);

6.1.2?創建topic

Flume采集到的數據要生成一條條的event數據傳給kafka消息系統保存,kafka需要事先創建一個topic來生產和消費指定數據。為系統正常運行,我們統一定義topic的名字結構為“hive-數據庫名-表名”。需要在kafka集群節點上創建topic,示例如下:

bin/kafka-topics?--create?--zookeeper?localhost:2181/kafka?????--topic?hive-xdf-smart_data??--partitions?1?

--replication-factor?1

注意:此處的數據庫名、表名,必須為上一步創建的hive表,因為Flume會通過此topic名來獲取hive表的元數據信息,從而生成對應event數據。

6.1.2?配置Flume?agent啟動參數

生成參數文件smart_test.conf如下:

vim?smart_test.conf

a3.channels?=?c3

a3.sources?=?r3

a3.sinks?=?k3

?

a3.sources.r3.type?=?exec

a3.sources.r3.channels?=?c3

a3.sources.r3.command?=?tail?-F?/home/xdf/exec.txt

a3.sources.r3.fileHeader?=?false

a3.sources.r3.basenameHeader?=?false

a3.sources.r3.interceptors?=?i3

a3.sources.r3.interceptors.i3.type?=iie.flume.interceptor.CSVInterceptor$Builder

a3.sources.r3.interceptors.i3.separator?=?;

a3.sources.r3.decodeErrorPolicy=IGNORE

?

a3.channels.c3.type?=?memory

a3.channels.c3.capacity?=?10000

a3.channels.c3.transactionCapacity?=?1000

?

a3.sinks.k3.channel?=?c3

#?a3.sinks.k3.type?=?logger

#a3.sinks.k3.batchSize?=?10

a3.sinks.k3.type?=?org.apache.flume.sink.kafka.KafkaSink

a3.sinks.k3.brokerList?=?localhost:9092

?

注意:

1、此處數據源sources的類型為exec。具體命令為:

a3.sources.r3.command?=?tail?-F?/home/xdf/exec.txt

我們定時在每個節點運行一個腳本生成一條smart數據,將數據寫入/home/xdf/exec.txt文件。

?

flume用上面那個命令一直監控文件/home/xdf/exec.txt,如有新數據寫入,則采集傳輸到kafka里。

?

2、指定了一個自定義的第三方插件,Flume過濾器CSVInterceptor,將CSV格式的數據轉化成結構化,序列化的Event格式。

?

3、SinkKafkaSink,數據會寫到kafka里面,特別注意:這里需要指定對應的brokerList,示例如下:

a3.sinks.k3.brokerList?=?m103:9092,m105:9092

6.1.3?開啟Flume?Agent

執行命令:

cd?/usr/local/apache-flume-1.6.0-bin

./bin/flume-ng?agent?--conf?conf?--conf-file?smart_test.conf?--name?a3?-Dflume.root.logger=INFO

6.1.4?生成Smart數據

在每個數據節點上運行createEvent.py腳本,生成一條結構化好的smart數據。

腳本有兩個參數smart_data.loghive-xdf-smart_data,前者為smart命令輸出的原始信息文件,后者是topic名字,即上一步生成的topic名。

python?createEvent.py?smart_data.log?hive-xdf-smart_data?>?

/home/xdf/exec.txt

?

此腳本會解析smart原始信息,生成一條帶topic字段的結構化smart數據寫入到/home/xdf/exec.txt文件中,數據格式如下:

hive-xdf-smart_data@@EB00PC208HFC;2015-06-23?18:56:09;OK;28;65;0;week?08?of?year?2012;50000;21;200000;69;-1;-1;-1;-1;-1;-1;-1;0;0;0;0;0;0;300744.962;0;0;0;0;0;0;10841.446;0;-1;-1;-1;-1;-1;-1;-1

用符號“@@”將topicsmart數據分開,smart數據每列間用逗號隔開。

6.1.5?測試時查看Kafka數據

查看數據是否成功生成到kafka中,可在kafka節點上,通過下面命令查看:

kafka-console-consumer?--zookeeper?localhost:2181/kafka?--topic?hive-xdf-smart_data?--from-beginning

結果展示:

6.1.6?Kafka數據落地到hive表中

打開camus.properties配置文件,修改以下兩個參數

kafka.whitelist.topics=hive-xdf-smart_data?????----smart數據對應topic

kafka.brokers=m105:9092,m103:9092???????????????----kafka?broker?lists

修改完配置文件后,定時運行camusrun.sh腳本,就會將新生成的smart數據接入到topic所對應的hive表中了。

至此,數據接入流程完畢。

轉載于:https://www.cnblogs.com/xiaodf/p/5027167.html

總結

以上是生活随笔為你收集整理的本地日志数据实时接入到hadoop集群的数据接入方案的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。