Kafka->Flink->Hbase(纯DDL/DML形式)
概述
最近看到有位自稱阿里的工程師在gitbook收費(fèi)4元[12]:
DDL形式實(shí)現(xiàn)kafka->Flink->Hbase
于是自己琢磨了下具體的流程,流程如下:
kafka的主題user_behavior中的內(nèi)容,
通過Flink SQL Client,
傳遞給hbase的表venn
#########################################################################################################
開發(fā)環(huán)境
| 組件 | 版本 |
| Flink(HA) | 1.12 |
| Zookeeper | 3.6.0 |
| Hadoop | 3.1.2 |
| Hbase(HA) | 2.2.4 |
| Kafka(HA) | 2.5.0 |
這里解釋下Hbase的版本號為什么不能再高了(最多2.2.6),因?yàn)楣俜轿臋n[9]以及官方Repository[10]中Flink1.12對Hbase支持的最高版本是2.2.x
注意需要啟動上述表格中的所有集群,
勿忘關(guān)閉防火墻,或者開啟zookeeper/kafka集群需要的端口,否則有的尷尬的。
關(guān)閉防火牆命令:
service firewalld stop
hbase底層是hdfs,所以這里需要hadoop
###########################################################################################################
hbase常用操作
| 命令行 | 作用 |
| hbase shell | 進(jìn)入命令行客戶端 |
| create 'venn','cf' | 新建表格venn,其中cf是列簇 |
| scan 'venn',{LIMIT=>1} | 查看新建的表格中的數(shù)據(jù)內(nèi)容 |
##########################################################################################################
資料調(diào)研(代碼形式)
| 寫入/讀取方式 | 相關(guān)參考文獻(xiàn) |
| 繼承RichSourceFunction(flink streaming) | [1][3][4] |
| 繼承RichSinkFunction重寫父類方法(flink streaming) | [4][5][8][16] |
| 重寫TableInputFormat方法(flink?streaming和flink dataSet) | [1][2][3][4] |
| 重寫TableOutputFormat(flink streaming和flink dataSet) | [1][2][3] |
本文我們先不關(guān)心代碼形式,
先設(shè)法處理DDL形式的kafka->Flink SQL Client->hbase
########################################可能用到的kafka命令##################################
?
可能用到的kafka命令
| 操作 | 命令 | 備注 |
| 查看topic | $KAFKA/bin/kafka-topics.sh --list --zookeeper Desktop:2181 | 無 ?
|
| 往user_behavior這個 topic發(fā)送 json 消息 | ? $KAFKA/bin/kafka-console-producer.sh --broker-list Desktop:9091 --topic user_behavior | 這里可能碰到[2]中的報(bào)錯,注意檢查命令中端口與配置文件server.properties中的listeners的端口嚴(yán)格保持一致 [2]中的報(bào)錯還可能是某個節(jié)點(diǎn)的kafka掛掉導(dǎo)致的. ? 可能碰到[3] 注意關(guān)閉防火墻 ? ? |
| 使用kafka自帶消費(fèi)端測試下消費(fèi) | $KAFKA/bin/kafka-console-consumer.sh --bootstrap-server Desktop:9091 --from-beginning --topic user_behavior | 如果kafka自帶消費(fèi)者測試有問題,那么就不用繼續(xù)往下面做了, 此時如果使用Flink SQL Client來消費(fèi)也必然會出現(xiàn)問題 |
| 清除topic中所有數(shù)據(jù)[6](因?yàn)?萬一你輸錯了呢?對吧) | $KAFKA/bin/kafka-topics.sh --zookeeper Desktop:2181 --delete --topic user_behavior | 需要$KAFKA/config/server.properties設(shè)置 delete.topic.enable=true |
kafka消費(fèi)端用自帶的上述命令行先確定能消費(fèi),先確保自己集群沒有問題再往下走。
######################################一些資料調(diào)研############################################
關(guān)于DDL的寫法調(diào)研
對于[14],群里大佬指出寫法太老了.
###############################################最終整個流程完整步驟和DDL/SQL代碼##################???????#######???????#######???????#######???????####
最終整個流程完整步驟和DDL/SQL代碼
本文最終決定以[19]為準(zhǔn),從user_behavior這個topic的名字來看,這個實(shí)驗(yàn)其實(shí)是修改自Jark云邪的實(shí)驗(yàn)[20]
①往kafka的user_behavior傳入數(shù)據(jù)
$KAFKA/bin/kafka-console-producer.sh --broker-list Desktop:9091 --topic user_behavior
數(shù)據(jù)示例如下(傳一條按下回車):
{"user_id": "543462", "item_id":"1715", "category_id": "1464116", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
{"user_id": "662867", "item_id":"2244074", "category_id": "1575622", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
{"user_id": "561558", "item_id":"3611281", "category_id": "965809", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
{"user_id": "894923", "item_id":"3076029", "category_id": "1879194", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
{"user_id": "834377", "item_id":"4541270", "category_id": "3738615", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
{"user_id": "315321", "item_id":"942195", "category_id": "4339722", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
{"user_id": "625915", "item_id":"1162383", "category_id": "570735", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
{"user_id": "578814", "item_id":"176722", "category_id": "982926", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
{"user_id": "873335", "item_id":"1256540", "category_id": "1451783", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
{"user_id": "429984", "item_id":"4625350", "category_id": "2355072", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
{"user_id": "866796", "item_id":"534083", "category_id": "4203730", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
{"user_id": "937166", "item_id":"321683", "category_id": "2355072", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
②終端輸入hbase shell
建表參考[21]
hbase(main):014:0> create 'venn','cf'
③啟動Flink SQL Client(注意附帶要啟動Flink集群,本實(shí)驗(yàn)室standalone模式)
| Flink SQL Client中依次輸入 | 輸入后的實(shí)驗(yàn)效果 |
| -- 讀 kafka ,source定義 (具體見下方gitee鏈接) | |
| --流入hbase,sink 定義 (具體見下方gitee鏈接) | ? |
| --提交任務(wù),user_id當(dāng)做row key,其他的當(dāng)做列簇 (具體見下方gitee鏈接) |
上述 DDL可能在粘貼復(fù)制中被網(wǎng)頁污染導(dǎo)致無法運(yùn)行,以下方鏈接為準(zhǔn):
https://gitee.com/appleyuchi/Flink_Code/blob/master/FLINK讀寫各種數(shù)據(jù)源/Kafka_Flink_Hbase.txt
提交后,終端輸入hbase shell登錄hbase 客戶端,
這里解釋下為什么上面的DDL是2.2.3,而環(huán)境版本里面是2.2.4,這是因?yàn)檫@篇博文采用的是老版本的DDL寫法,會校對最高小版本號為2.2.3
如果采用新的DDL option寫法(本文沒有采用,DDL option寫法會無視小版本號校對,option寫法是本文附錄中flink開發(fā)者提及的.)
?
查看結(jié)果:
hbase(main):002:0> scan 'venn',{LIMIT=>1}
#######???????#######???????#######???????#######???????#######???????#######???????#############實(shí)驗(yàn)結(jié)果#???????#######???????#######???????#######???????#######???????#######???????##############???????#######???????#######???????#######???????###
#######???????#######???????#######???????#######???????#######???????#######???????#######???????#附錄######???????#######???????#######???????#######???????#######???????#######???????#######???????#######???????#######???????#######
附錄-依賴問題
關(guān)于依賴包問題,我首先在[10]發(fā)現(xiàn)了一些hbase相關(guān)的jar
| jar包名字 | 是否更新到 Flink1.12版本 | 作用 |
| flink-hbase_2.12-1.10.2.jar | 否 | 用于gradoop |
| flink-connector-hbase-base_2.12-1.12.0.jar | 是 | 是一個hbase驅(qū)動基礎(chǔ)包 |
| flink-connector-hbase-2.2_2.12-1.12.0.jar??????? | 是 | 猜測是上面一個jar的完整版 |
| flink-sql-connector-hbase-2.2_2.12-1.12.0.jar | 是 | 猜測是為了支持Flink SQL Client的一個依賴包 |
先說結(jié)論,上面最后一個jar必須放入$FLINK_HOME/lib下面.
這里寫一個小筆記,怎么知道上面這些jar是干嘛的呢?
以flink-hbase_2.12-1.10.2.jar為例:
點(diǎn)開https://mvnrepository.com/search?q=flink-hbase中的上圖中的4 usages
會來到新的頁面,如下:
此時我們就知道這個包是被gradoop這個工具包調(diào)用的,和我們即將進(jìn)行的實(shí)驗(yàn)很可能關(guān)系不大
?
最后,我決定在$FLINK_HOME/lib下面放兩個依賴包:
flink-connector-hbase-2.2_2.12-1.12.0.jar
flink-sql-connector-hbase-2.2_2.12-1.12.0.jar
釘釘群里問開發(fā)者的時候時候,雲(yún)邪說上述兩個Jar只保留后者即可,下面是對話
其實(shí)我是看了Flink1.12的2.11的文檔,去嘗試2.12的實(shí)驗(yàn)(所以并非我搞混2.11和2.12)
發(fā)現(xiàn)Flink1.12并不支持Hbase2.2.6和Hbase2.2.4,于是去官方群里提問,
他們解釋說由于我采用了老版本的DDL的寫法,導(dǎo)致flink對小版本號開始校驗(yàn)了.
也就是說從1.11開始,DDL出了一種他們稱為option的寫法,
老版本的寫法會校對小版本號(本文采用了老版本的DDL寫法),
option寫法[9]不會校對小版本號(官方推薦)
文中使用的是老版本的寫法
從這里也可以看出,DDL的寫法發(fā)生了不小的變化,從一開始的支持 Schema以及上述gitee中的老版本寫法以及后續(xù)的option,至少有三個版本的DDL寫法,
???????在閱讀其他資料的時候也需要注意這點(diǎn).
#######???????#######???????#######???????#######???????#######???????#######???????#######???????#######???????#######???????#######???????#######???????#######???????#######???????#######???????#######???????#######???????#######
Reference:
[1]HBase讀寫的幾種方式(三)flink篇
[2]Flink操作HBase
[3]Flink讀寫系列之-讀HBase并寫入HBase
[4]flink與hbase交互
[5]Flink讀取數(shù)據(jù)存入Hbase
[6]HBase讀寫的幾種方式(三)flink篇
[7]HBase2實(shí)戰(zhàn):HBase Flink和Kafka整合
[8]FlinkSQL讀取Hbase數(shù)據(jù)
[9]HBase SQL Connector
[10]https://repo1.maven.org/maven2/org/apache/flink/
[11]Flink SQL 化實(shí)時任務(wù)實(shí)戰(zhàn):讀取Kafka,計(jì)算并寫入 HBase
[12]從 0 到 1:構(gòu)建全 SQL 化 Flink 實(shí)時任務(wù)
[13]hbase創(chuàng)建表_Flink SQL實(shí)戰(zhàn) — HBase的結(jié)合應(yīng)用(余敖的,不完整)
[14]Flink SQL 將kafka數(shù)據(jù)寫入HBase(用的老式DDL寫法, 廣聯(lián)達(dá)的二把手說該寫法已經(jīng)淘汰)
[15]FlinkSQL 數(shù)據(jù)去重,讀寫HBase,Kafka(這個要盡可能復(fù)現(xiàn)一下)
[16]flink實(shí)戰(zhàn)(一) flink-sql關(guān)聯(lián)hbase維度數(shù)據(jù)處理(用的RichSinkFunction)
[17]Flink SQL 實(shí)戰(zhàn):HBase 的結(jié)合應(yīng)用(余敖的,不完整)
[18]Flink sql 基于hbase,mysql的維表實(shí)戰(zhàn) -未完(hbase和kafka做join 然后爛尾了)
[19]Flink 1.10 SQL 寫HBase???????(本文重點(diǎn)復(fù)現(xiàn))
[20]Kafka2.5->Flink1.12->Mysql8(Jark實(shí)驗(yàn)改為DDL形式)
[21]Flink 1.10 SQL 寫HBase
?
?
總結(jié)
以上是生活随笔為你收集整理的Kafka->Flink->Hbase(纯DDL/DML形式)的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: leetcode 全解(python版)
- 下一篇: hbase启动后在log中出现canno