kafka->Flink->ElasticSearch(Java形式)
概述
本文主要是對[1]中內容的復現
環境
| 開源組件 | 版本 |
| KAFKA | 2.5.0 |
| Flink | 1.6.0 |
| Zookeeper | 3.6.0 |
| ElasticSearch | 7.10.1 |
| KIBANA | 7.10.1 |
因為ElasticSearch7的寫法似乎不兼容 ES6的,所以代碼中依然是ES6的寫法。
代碼中依然是ElasticSearch6的驅動
這個實驗注意,不要追求太新的版本,最新版本Flink1.12的kafka驅動依賴包都還沒有開發出來。
流程圖
? lateLog用來保存側邊流輸出的遲到的數據
ElasticSearch準備工作
| KIBANA操作 | 講人話 | 具體命令 |
| 刪除原有的index索引 | 刪除原有的數據庫 | curl -XDELETE 'Desktop:9201/auditindex' |
| 新建index | 新建數據庫 | curl -XPUT 'http://Desktop:9201/auditindex?pretty' |
| 創建type的mapping信息 | 新建表格的字段信息 | curl -H "Content-Type: application/json" -XPOST 'http://Desktop:9201/auditindex/audittype/_mapping?include_type_name=true' -d ' { ?"audittype":{ ? ? "properties":{ ? ? ? ? "area":{"type":"keyword"}, ? ? ? ? "type":{"type":"keyword"}, ? ? ? ? "count":{"type":"long"}, ? ? ? ? "time":{"type":"date","format": "yyyy-MM-dd HH:mm:ss"} ? ? ? ? } ? ? ?} } ' |
上述命令尤其是最后一個,不要直接粘貼到terminal中運行,而要寫入一個bash腳本中再運行
代碼與運行流程
https://gitee.com/appleyuchi/Flink_Code/tree/master/flink清洗數據案例/FlinkProj
ElasticSearch查看接收到的數據
http://desktop:9201/auditindex/_search?pretty=true
打開后可以看到一個大大的JSON
KIBANA設置時區
進入http://desktop:5601/app/management/kibana/settings
然后下面的dateFormat設置為
Etc/UTC
然后點擊上圖右下角的Save Changes
?
KIBANA可視化
可視化效果如下:
注意
[1]中的設置是area,這里復現的用的是_id
因為KIBANA操作的時候沒有找到area,只有area.keyword
版本差異,暫時無法解決。
本實驗相關的JPS進程
106851 TaskManagerRunner
85543 NailgunRunner
84330 ZooKeeperMain
81133 NameNode
87055 Kafka
106575 StandaloneSessionClusterEntrypoint
82193 NodeManager
81617 SecondaryNameNode
38320 RemoteMavenServer
81968 ResourceManager
36945 Main
83639 Elasticsearch
81078 QuorumPeerMain
89016 Launcher
89019 DataReport
126490 Jps
81341 DataNode
87391 kafkaProducerDataReport
?
Reference:
[1]【20】Flink 實戰案例開發(二):數據報表
[2]kibana7.10.1基本操作(餅圖+直方圖)
創作挑戰賽新人創作獎勵來咯,堅持創作打卡瓜分現金大獎總結
以上是生活随笔為你收集整理的kafka->Flink->ElasticSearch(Java形式)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 剑三汗血宝马是什么 《剑网3》官网
- 下一篇: Java程序优化之享元模式