Flink数据清洗(Kafka事实表+Redis维度表)
調(diào)研
從網(wǎng)上的調(diào)研來看,其實(shí)整個(gè)百度有清洗流程的只有[1]其他都是抄的[1]中的內(nèi)容。
?
實(shí)驗(yàn)流程
這個(gè)流程的話,不要去研究redis的Flink SQL Client的操作方法,因?yàn)樵趍vn repository中
沒有看到flink-sql-connector-redis之類 的jar
所以該流程適可而止吧
####################################################################
Redis數(shù)據(jù)準(zhǔn)備
127.0.0.1:6379> hset areas AREA_US US
127.0.0.1:6379> hset areas AREA_CT TW,HK
127.0.0.1:6379> hset areas AREA_IN IN
127.0.0.1:6379> hset areas AREA_AR PK,SA,KW
127.0.0.1:6379> hset areas AREA_IN IN
127.0.0.1:6379> hgetall areas
1) "AREA_US"
2) "US"
3) "AREA_CT"
4) "TW,HK"
5) "AREA_IN"
6) "IN"
7) "AREA_AR"
8) "PK,SA,KW"
?
本實(shí)驗(yàn)的redis對(duì)象是沒有密碼的,如果事先設(shè)置了密碼,可以根據(jù)[14]去除
?
Redis代碼中的注意事項(xiàng)
代碼中有這么一句話:
this.jedis = new Jedis("127.0.0.1", 6379);
注意,這里的127.0.0.1如果改成redis所在節(jié)點(diǎn)的域名的話,必須是該redis支持外網(wǎng)訪問,否則此處不要修改,會(huì)導(dǎo)致數(shù)據(jù)讀取失敗
####################################################################
本實(shí)驗(yàn)注意事項(xiàng)
①redis相關(guān)的jar依賴其實(shí)目前官方?jīng)]有在維護(hù)了.所以不要做太深入的鉆研
②需要導(dǎo)入flink-shaded-hadoop-3-uber-3.1.1.7.0.3.0-79-7.0.jar
Project Structure->Global Libraries中間一列導(dǎo)入上述的jar
否則會(huì)報(bào)錯(cuò)找不到hdfs這個(gè)file system
####################################################################
數(shù)據(jù)清洗目標(biāo)
kafka(存放事實(shí)表)中數(shù)據(jù)示范:
{"dt":"2021-01-11 12:30:32","countryCode":"PK","data":[{"type":"s3","score":0.8,"level":"C"},{"type":"s5","score":0.1,"level":"C"}]}
格式化后如下:
{"dt": "2021-01-11 12:30:32","countryCode": "PK","data": [{"type": "s3","score": 0.8,"level": "C"},{"type": "s5","score": 0.1,"level": "C"}] }這樣的一條數(shù)據(jù),根據(jù)countryCode轉(zhuǎn)化為redis(存放維度表)中的具體地區(qū)AREA_AR
后面list中的數(shù)據(jù)打散,最終想要的效果如下:
{"area":"AREA_AR","dt":"2021-01-11 12:30:32","score":0.8,"level":"C","type":"s3"}
{"area":"AREA_AR","dt":"2021-01-11 12:30:32","score":0.1,"level":"C","type":"s5"}
也就是想要根據(jù)上述要求,把一條數(shù)據(jù)轉(zhuǎn)化為兩條數(shù)據(jù)
####################################################################
完整實(shí)驗(yàn)操作與代碼
https://gitee.com/appleyuchi/Flink_Code/tree/master/flink清洗數(shù)據(jù)案例/FlinkProj
####################################################################
可能涉及到的Kafka操作
| 操作 | 命令 | 備注 |
| 查看topic | $KAFKA/bin/kafka-topics.sh --list --zookeeper Desktop:2181 | 無 ?
|
| 往allData這個(gè) topic發(fā)送 json 消息 | $KAFKA/bin/kafka-console-producer.sh --broker-list Desktop:9091 --topic allData | 這里可能碰到[2]中的報(bào)錯(cuò),注意檢查命令中端口與配置文件server.properties中的listeners的端口嚴(yán)格保持一致 [2]中的報(bào)錯(cuò)還可能是某個(gè)節(jié)點(diǎn)的kafka掛掉導(dǎo)致的. ? 可能碰到[3] 注意關(guān)閉防火墻 ? ? |
| 使用kafka自帶消費(fèi)端測(cè)試下消費(fèi) | $KAFKA/bin/kafka-console-consumer.sh --bootstrap-server Desktop:9091 --from-beginning --topic allData | 如果kafka自帶消費(fèi)者測(cè)試有問題,那么就不用繼續(xù)往下面做了, 此時(shí)如果使用Flink SQL Client來消費(fèi)也必然會(huì)出現(xiàn)問題 |
| 清除topic中所有數(shù)據(jù)[13] | $KAFKA/bin/kafka-topics.sh --zookeeper Desktop:2181 --delete --topic allData | 需要$KAFKA/config/server.properties設(shè)置 delete.topic.enable=true |
?
Reference:
[1]【19】Flink 實(shí)戰(zhàn)案例開發(fā)(一):數(shù)據(jù)清洗(完整代碼+數(shù)據(jù),依賴有問題)
[2]Flink清洗Kafka數(shù)據(jù)存入MySQL測(cè)試(數(shù)據(jù)好像不太完整)
[3]Flink案例開發(fā)之?dāng)?shù)據(jù)清洗、數(shù)據(jù)報(bào)表展現(xiàn)(與[1]內(nèi)容重復(fù))
[4]Flink繼續(xù)實(shí)踐:從日志清洗到實(shí)時(shí)統(tǒng)計(jì)內(nèi)容PV等多個(gè)指標(biāo)(代碼不完整)
[5]Flink清洗日志服務(wù)SLS的數(shù)據(jù)并求ACU&PCU(工程文件不完整)
下面的最后考慮(博主說是完整的.下面的實(shí)驗(yàn)的原始出處其實(shí)是[1])
[6]Flink入門及實(shí)戰(zhàn)(20)- 數(shù)據(jù)清洗實(shí)時(shí)ETL(1)
[7]Flink入門及實(shí)戰(zhàn)(21)- 數(shù)據(jù)清洗實(shí)時(shí)ETL(2)
[8]Flink入門及實(shí)戰(zhàn)(22)- 數(shù)據(jù)清洗實(shí)時(shí)ETL(3)[10]Flink 清理過期 Checkpoint 目錄的正確姿勢(shì)
[11]Flink學(xué)習(xí)(二):實(shí)驗(yàn)一數(shù)據(jù)清洗(代碼不完整,涉及到了elasticsearch)
[12]網(wǎng)站日志實(shí)時(shí)分析之Flink處理實(shí)時(shí)熱門和PVUV統(tǒng)計(jì)(缺數(shù)據(jù))
[13]Is there a way to delete all the data from a topic or delete the topic before every run?
[14]redis設(shè)置密碼
?
總結(jié)
以上是生活随笔為你收集整理的Flink数据清洗(Kafka事实表+Redis维度表)的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 五氯酚酸钠的用途
- 下一篇: Mysql查看和修改时区