日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 运维知识 > 数据库 >内容正文

数据库

flume mysql hdfs_利用Flume将MySQL表数据准实时抽取到HDFS

發布時間:2023/12/10 数据库 22 豆豆
生活随笔 收集整理的這篇文章主要介紹了 flume mysql hdfs_利用Flume将MySQL表数据准实时抽取到HDFS 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

一、為什么要用到Flume

在以前搭建HAWQ數據倉庫實驗環境時,我使用Sqoop抽取從MySQL數據庫增量抽取數據到HDFS,然后用HAWQ的外部表進行訪問。這種方式只需要很少量的配置即可完成數據抽取任務,但缺點同樣明顯,那就是實時性。Sqoop使用MapReduce讀寫數據,而MapReduce是為了批處理場景設計的,目標是大吞吐量,并不太關心低延時問題。就像實驗中所做的,每天定時增量抽取數據一次。

Flume是一個海量日志采集、聚合和傳輸的系統,支持在日志系統中定制各類數據發送方,用于收集數據。同時,Flume提供對數據進行簡單處理,并寫到各種數據接受方的能力。Flume以流方式處理數據,可作為代理持續運行。當新的數據可用時,Flume能夠立即獲取數據并輸出至目標,這樣就可以在很大程度上解決實時性問題。

Flume是最初只是一個日志收集器,但隨著flume-ng-sql-source插件的出現,使得Flume從關系數據庫采集數據成為可能。下面簡單介紹Flume,并詳細說明如何配置Flume將MySQL表數據準實時抽取到HDFS。

二、Flume簡介

1. Flume的概念

Flume是分布式的日志收集系統,它將各個服務器中的數據收集起來并送到指定的地方去,比如說送到HDFS,簡單來說flume就是收集日志的,其架構如圖1所示。

圖1

2. Event的概念

在這里有必要先介紹一下Flume中event的相關概念:Flume的核心是把數據從數據源(source)收集過來,在將收集到的數據送到指定的目的地(sink)。為了保證輸送的過程一定成功,在送到目的地(sink)之前,會先緩存數據(channel),待數據真正到達目的地(sink)后,Flume再刪除自己緩存的數據。

在整個數據的傳輸的過程中,流動的是event,即事務保證是在event級別進行的。那么什么是event呢?Event將傳輸的數據進行封裝,是Flume傳輸數據的基本單位,如果是文本文件,通常是一行記錄。Event也是事務的基本單位。Event從source,流向channel,再到sink,本身為一個字節數組,并可攜帶headers(頭信息)信息。Event代表著一個數據的最小完整單元,從外部數據源來,向外部的目的地去。

3. Flume架構介紹

Flume之所以這么神奇,是源于它自身的一個設計,這個設計就是agent。Agent本身是一個Java進程,運行在日志收集節點——所謂日志收集節點就是服務器節點。 Agent里面包含3個核心的組件:source、channel和sink,類似生產者、倉庫、消費者的架構。

Source:source組件是專門用來收集數據的,可以處理各種類型、各種格式的日志數據,包括avro、thrift、exec、jms、spooling directory、netcat、sequence generator、syslog、http、legacy、自定義。

Channel:source組件把數據收集來以后,臨時存放在channel中,即channel組件在agent中是專門用來存放臨時數據的——對采集到的數據進行簡單的緩存,可以存放在memory、jdbc、file等等。

Sink:sink組件是用于把數據發送到目的地的組件,目的地包括hdfs、logger、avro、thrift、ipc、file、null、Hbase、solr、自定義。

4. Flume的運行機制

Flume的核心就是一個agent,這個agent對外有兩個進行交互的地方,一個是接受數據輸入的source,一個是數據輸出的sink,sink負責將數據發送到外部指定的目的地。source接收到數據之后,將數據發送給channel,chanel作為一個數據緩沖區會臨時存放這些數據,隨后sink會將channel中的數據發送到指定的地方,例如HDFS等。注意:只有在sink將channel中的數據成功發送出去之后,channel才會將臨時數據進行刪除,這種機制保證了數據傳輸的可靠性與安全性。

三、安裝Hadoop和Flume

我的實驗在HDP 2.5.0上進行,HDP安裝中包含Flume,只要配置Flume服務即可。HDP的安裝步驟參見“HAWQ技術解析(二) —— 安裝部署”

四、配置與測試

1. 建立MySQL數據庫表

建立測試表并添加數據。

use?test;

create?table??wlslog

(id?????????int?not?null,

time_stamp?varchar(40),

category???varchar(40),

type???????varchar(40),

servername?varchar(40),

code???????varchar(40),

msg????????varchar(40),

primary?key?(?id?)

);

insert?into?wlslog(id,time_stamp,category,type,servername,code,msg)?values(1,'apr-8-2014-7:06:16-pm-pdt','notice','weblogicserver','adminserver','bea-000365','server?state?changed?to?standby');

insert?into?wlslog(id,time_stamp,category,type,servername,code,msg)?values(2,'apr-8-2014-7:06:17-pm-pdt','notice','weblogicserver','adminserver','bea-000365','server?state?changed?to?starting');

insert?into?wlslog(id,time_stamp,category,type,servername,code,msg)?values(3,'apr-8-2014-7:06:18-pm-pdt','notice','weblogicserver','adminserver','bea-000365','server?state?changed?to?admin');

insert?into?wlslog(id,time_stamp,category,type,servername,code,msg)?values(4,'apr-8-2014-7:06:19-pm-pdt','notice','weblogicserver','adminserver','bea-000365','server?state?changed?to?resuming');

insert?into?wlslog(id,time_stamp,category,type,servername,code,msg)?values(5,'apr-8-2014-7:06:20-pm-pdt','notice','weblogicserver','adminserver','bea-000361','started?weblogic?adminserver');

insert?into?wlslog(id,time_stamp,category,type,servername,code,msg)?values(6,'apr-8-2014-7:06:21-pm-pdt','notice','weblogicserver','adminserver','bea-000365','server?state?changed?to?running');

insert?into?wlslog(id,time_stamp,category,type,servername,code,msg)?values(7,'apr-8-2014-7:06:22-pm-pdt','notice','weblogicserver','adminserver','bea-000360','server?started?in?running?mode');

commit;

2. 建立相關目錄與文件

(1)創建本地狀態文件

mkdir?-p?/var/lib/flume

cd?/var/lib/flume

touch?sql-source.status

chmod?-R?777?/var/lib/flume

(2)建立HDFS目標目錄

hdfs?dfs?-mkdir?-p?/flume/mysql

hdfs?dfs?-chmod?-R?777?/flume/mysql

3. 準備JAR包

從http://book2s.com/java/jar/f/flume-ng-sql-source/download-flume-ng-sql-source-1.3.7.html下載flume-ng-sql-source-1.3.7.jar文件,并復制到Flume庫目錄。

cp?flume-ng-sql-source-1.3.7.jar?/usr/hdp/current/flume-server/lib/

將MySQL JDBC驅動JAR包也復制到Flume庫目錄。

cp?mysql-connector-java-5.1.17.jar?/usr/hdp/current/flume-server/lib/mysql-connector-java.jar

4. 建立HAWQ外部表

create?external?table?ext_wlslog

(id?????????int,

time_stamp?varchar(40),

category???varchar(40),

type???????varchar(40),

servername?varchar(40),

code???????varchar(40),

msg????????varchar(40)

)?location?('pxf://mycluster/flume/mysql?profile=hdfstextmulti')?format?'csv'?(quote=e'"');

5. 配置Flume

在Ambari -> Flume -> Configs -> flume.conf中配置如下屬性:

agent.channels.ch1.type?=?memory

agent.sources.sql-source.channels?=?ch1

agent.channels?=?ch1

agent.sinks?=?HDFS

agent.sources?=?sql-source

agent.sources.sql-source.type?=?org.keedio.flume.source.SQLSource

agent.sources.sql-source.connection.url?=?jdbc:mysql://172.16.1.127:3306/test

agent.sources.sql-source.user?=?root

agent.sources.sql-source.password?=?123456

agent.sources.sql-source.table?=?wlslog

agent.sources.sql-source.columns.to.select?=?*

agent.sources.sql-source.incremental.column.name?=?id

agent.sources.sql-source.incremental.value?=?0

agent.sources.sql-source.run.query.delay=5000

agent.sources.sql-source.status.file.path?=?/var/lib/flume

agent.sources.sql-source.status.file.name?=?sql-source.status

agent.sinks.HDFS.channel?=?ch1

agent.sinks.HDFS.type?=?hdfs

agent.sinks.HDFS.hdfs.path?=?hdfs://mycluster/flume/mysql

agent.sinks.HDFS.hdfs.fileType?=?DataStream

agent.sinks.HDFS.hdfs.writeFormat?=?Text

agent.sinks.HDFS.hdfs.rollSize?=?268435456

agent.sinks.HDFS.hdfs.rollInterval?=?0

agent.sinks.HDFS.hdfs.rollCount?=?0

Flume在flume.conf文件中指定Source、Channel和Sink相關的配置,各屬性描述如表1所示。

屬性

描述

agent.channels.ch1.type

Agent的channel類型

agent.sources.sql-source.channels

Source對應的channel名稱

agent.channels

Channel名稱

agent.sinks

Sink名稱

agent.sources

Source名稱

agent.sources.sql-source.type

Source類型

agent.sources.sql-source.connection.url

數據庫URL

agent.sources.sql-source.user

數據庫用戶名

agent.sources.sql-source.password

數據庫密碼

agent.sources.sql-source.table

數據庫表名

agent.sources.sql-source.columns.to.select

查詢的列

agent.sources.sql-source.incremental.column.name

增量列名

agent.sources.sql-source.incremental.value

增量初始值

agent.sources.sql-source.run.query.delay

發起查詢的時間間隔,單位是毫秒

agent.sources.sql-source.status.file.path

狀態文件路徑

agent.sources.sql-source.status.file.name

狀態文件名稱

agent.sinks.HDFS.channel

Sink對應的channel名稱

agent.sinks.HDFS.type

Sink類型

agent.sinks.HDFS.hdfs.path

Sink路徑

agent.sinks.HDFS.hdfs.fileType

流數據的文件類型

agent.sinks.HDFS.hdfs.writeFormat

數據寫入格式

agent.sinks.HDFS.hdfs.rollSize

目標文件輪轉大小,單位是字節

agent.sinks.HDFS.hdfs.rollInterval

hdfs sink間隔多長將臨時文件滾動成最終目標文件,單位是秒;如果設置成0,則表示不根據時間來滾動文件

agent.sinks.HDFS.hdfs.rollCount

當events數據達到該數量時候,將臨時文件滾動成目標文件;如果設置成0,則表示不根據events數據來滾動文件

表1

6. 運行Flume代理

保存上一步的設置,然后重啟Flume服務,如圖2所示。

圖2

重啟后,狀態文件已經記錄了將最新的id值7,如圖3所示。

圖3

查看目標路徑,生成了一個臨時文件,其中有7條記錄,如圖4所示。

圖4

查詢HAWQ外部表,結果也有全部7條數據,如圖5所示。

圖5

至此,初始數據抽取已經完成。

7. 測試準實時增量抽取

在源表中新增id為8、9、10的三條記錄。

use?test;

insert?into?wlslog(id,time_stamp,category,type,servername,code,msg)?values(8,'apr-8-2014-7:06:22-pm-pdt','notice','weblogicserver','adminserver','bea-000360','server?started?in?running?mode');

insert?into?wlslog(id,time_stamp,category,type,servername,code,msg)?values(9,'apr-8-2014-7:06:22-pm-pdt','notice','weblogicserver','adminserver','bea-000360','server?started?in?running?mode');

insert?into?wlslog(id,time_stamp,category,type,servername,code,msg)?values(10,'apr-8-2014-7:06:22-pm-pdt','notice','weblogicserver','adminserver','bea-000360','server?started?in?running?mode');

commit;

5秒之后查詢HAWQ外部表,從圖6可以看到,已經查詢出全部10條數據,準實時增量抽取成功。

圖6

五、方案優缺點

利用Flume采集關系數據庫表數據最大的優點是配置簡單,不用編程。相比tungsten-replicator的復雜性,Flume只要在flume.conf文件中配置source、channel及sink的相關屬性,已經沒什么難度了。而與現在很火的canal比較,雖然不夠靈活,但畢竟一行代碼也不用寫。再有該方案采用普通SQL輪詢的方式實現,具有通用性,適用于所有關系庫數據源。

這種方案的缺點與其優點一樣突出,主要體現在以下幾方面。

在源庫上執行了查詢,具有入侵性。

通過輪詢的方式實現增量,只能做到準實時,而且輪詢間隔越短,對源庫的影響越大。

只能識別新增數據,檢測不到刪除與更新。

要求源庫必須有用于表示增量的字段。

即便有諸多局限,但用Flume抽取關系庫數據的方案還是有一定的價值,特別是在要求快速部署、簡化編程,又能滿足需求的應用場景,對傳統的Sqoop方式也不失為一種有效的補充。

參考:

Flume架構以及應用介紹

Streaming MySQL Database Table Data to HDFS with Flume

how to read data from oracle using FLUME to kafka broker

https://github.com/keedio/flume-ng-sql-source

總結

以上是生活随笔為你收集整理的flume mysql hdfs_利用Flume将MySQL表数据准实时抽取到HDFS的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。

主站蜘蛛池模板: 四季av一区二区夜夜嗨 | 久久www视频 | 91成品视频 | 天天做夜夜爱 | 香蕉传媒 | 亚洲综合日韩 | 亚州av网 | 熟妇一区二区三区 | 苍井空亚洲精品aa片在线播放 | 中文字幕久久精品 | 久操综合 | 黄一区二区三区 | 日韩专区视频 | 国产成人av一区二区三区在线观看 | 日韩一区二区影院 | 亚洲另类欧美日韩 | 久久综合av | 九热视频在线观看 | 久久久精品视频免费 | 亚洲精品一区二区三区精华液 | 神马午夜场 | 国产一区二区三区免费观看 | 欧洲一级片 | 乱老熟女一区二区三区 | 人妻精品无码一区二区 | 国产免费视频 | 尤物网站在线播放 | 91传媒视频在线观看 | 亚洲天堂va| 在线免费日韩av | 91秘密入口 | 久久免费精彩视频 | 天堂网资源 | 国产精品野外户外 | 男女做网站 | 91香蕉国产在线观看软件 | 美女一区二区三区视频 | 日韩怡红院| 看成人片 | 日韩经典一区二区三区 | 91精品国产综合久久久久久久 | 久久精品亚洲一区 | 男女插插插视频 | 欧美七区 | 成年人福利网站 | 一区二区三区视频在线观看免费 | 久草色视频| 性猛交娇小69hd| 成人午夜在线播放 | 免费在线看黄网站 | 国产伦精品一区二区三区视频孕妇 | 日韩成人高清视频在线观看 | 国产aⅴ爽av久久久久成人 | 男生脱女生衣服 | 久草这里只有精品 | 综合久色 | 亚洲一区二区三区人妻 | 中文字幕人妻伦伦 | 精品在线观看视频 | 午夜国产小视频 | 麻豆视频免费入口 | 国产男女无遮挡猛进猛出 | 大陆农村乡下av | 国产亚洲精品久久久久久777 | 91精品国自产在线观看 | 亚洲日本在线观看视频 | 天堂资源在线 | 欧美大色 | 国产黄色片视频 | 狠狠躁天天躁夜夜躁婷婷 | 国产女人和拘做受视频免费 | 欧美福利视频导航 | 人人干人 | 超污视频软件 | av一区二区三区四区 | 麻豆高清免费国产一区 | 日本一区二区精品 | jizz中国女人高潮 | 日本黄视频在线观看 | 国产不卡在线播放 | 一区二区三区精品 | 啪啪自拍视频 | 国产在线拍揄自揄拍无码 | 日韩不卡高清视频 | 国产人妖ts重口系列网站观看 | 天天射夜夜骑 | caobi视频 | 国产馆视频| 韩国三级hd中文字幕的背景音乐 | 一区在线观看 | 日韩欧美电影一区二区三区 | 国产精品视频免费在线观看 | 亚洲欧美日韩精品色xxx | 久久国产综合 | 亚洲成人精品久久 | 国产精品8888 | 精品产国自在拍 | 夜间福利在线观看 | 黄色网页入口 |