centos7安装flink集群_《从0到1学习Flink》—— Flink 写入数据到 Kafka
前言
之前文章 《從0到1學(xué)習(xí)Flink》—— Flink 寫入數(shù)據(jù)到 ElasticSearch 寫了如何將 Kafka 中的數(shù)據(jù)存儲到 ElasticSearch 中,里面其實就已經(jīng)用到了 Flink 自帶的 Kafka source connector(FlinkKafkaConsumer)。存入到 ES 只是其中一種情況,那么如果我們有多個地方需要這份通過 Flink 轉(zhuǎn)換后的數(shù)據(jù),是不是又要我們繼續(xù)寫個 sink 的插件呢?確實,所以 Flink 里面就默認(rèn)支持了不少 sink,比如也支持 Kafka sink connector(FlinkKafkaProducer),那么這篇文章我們就講講如何將數(shù)據(jù)寫入到 Kafka。
準(zhǔn)備
添加依賴
Flink 里面支持 Kafka 0.8、0.9、0.10、0.11 ,以后有時間可以分析下源碼的實現(xiàn)。
這里我們需要安裝下 Kafka,請對應(yīng)添加對應(yīng)的 Flink Kafka connector 依賴的版本,這里我們使用的是 0.11 版本:
<dependency>Kafka 安裝
這里就不寫這塊內(nèi)容了,可以參考我以前的文章 Kafka 安裝及快速入門。
這里我們演示把其他 Kafka 集群中 topic 數(shù)據(jù)原樣寫入到自己本地起的 Kafka 中去。
配置文件
kafka.brokers=xxx:9092,xxx:9092,xxx:9092 kafka.group.id=metrics-group-test kafka.zookeeper.connect=xxx:2181 metrics.topic=xxx stream.parallelism=5 kafka.sink.brokers=localhost:9092 kafka.sink.topic=metric-test stream.checkpoint.interval=1000 stream.checkpoint.enable=false stream.sink.parallelism=5目前我們先看下本地 Kafka 是否有這個 metric-test topic 呢?需要執(zhí)行下這個命令:
bin/kafka-topics.sh --list --zookeeper localhost:2181可以看到本地的 Kafka 是沒有任何 topic 的,如果等下我們的程序運(yùn)行起來后,再次執(zhí)行這個命令出現(xiàn) metric-test topic,那么證明我的程序確實起作用了,已經(jīng)將其他集群的 Kafka 數(shù)據(jù)寫入到本地 Kafka 了。
程序代碼
Main.java
public運(yùn)行結(jié)果
啟動程序,查看運(yùn)行結(jié)果,不段執(zhí)行上面命令,查看是否有新的 topic 出來:
執(zhí)行命令可以查看該 topic 的信息:
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic metric-test分析
上面代碼我們使用 Flink Kafka Producer 只傳了三個參數(shù):brokerList、topicId、serializationSchema(序列化)
其實也可以傳入多個參數(shù)進(jìn)去,現(xiàn)在有的參數(shù)用的是默認(rèn)參數(shù),因為這個內(nèi)容比較多,后面可以抽出一篇文章單獨來講。
總結(jié)
本篇文章寫了 Flink 讀取其他 Kafka 集群的數(shù)據(jù),然后寫入到本地的 Kafka 上。我在 Flink 這層沒做什么數(shù)據(jù)轉(zhuǎn)換,只是原樣的將數(shù)據(jù)轉(zhuǎn)發(fā)了下,如果你們有什么其他的需求,是可以在 Flink 這層將數(shù)據(jù)進(jìn)行各種轉(zhuǎn)換操作,比如這篇文章中的一些轉(zhuǎn)換:《從0到1學(xué)習(xí)Flink》—— Flink Data transformation(轉(zhuǎn)換),然后將轉(zhuǎn)換后的數(shù)據(jù)發(fā)到 Kafka 上去。
本文原創(chuàng)地址是: http://www.54tianzhisheng.cn/2019/01/06/Flink-Kafka-sink/ , 未經(jīng)允許禁止轉(zhuǎn)載。
關(guān)注我
微信公眾號:zhisheng
另外我自己整理了些 Flink 的學(xué)習(xí)資料,目前已經(jīng)全部放到微信公眾號了。你可以加我的微信:zhisheng_tian,然后回復(fù)關(guān)鍵字:Flink 即可無條件獲取到。
Github 代碼倉庫
https://github.com/zhisheng17/flink-learning/
以后這個項目的所有代碼都將放在這個倉庫里,包含了自己學(xué)習(xí) flink 的一些 demo 和博客
相關(guān)文章
1、《從0到1學(xué)習(xí)Flink》—— Apache Flink 介紹
2、《從0到1學(xué)習(xí)Flink》—— Mac 上搭建 Flink 1.6.0 環(huán)境并構(gòu)建運(yùn)行簡單程序入門
3、《從0到1學(xué)習(xí)Flink》—— Flink 配置文件詳解
4、《從0到1學(xué)習(xí)Flink》—— Data Source 介紹
5、《從0到1學(xué)習(xí)Flink》—— 如何自定義 Data Source ?
6、《從0到1學(xué)習(xí)Flink》—— Data Sink 介紹
7、《從0到1學(xué)習(xí)Flink》—— 如何自定義 Data Sink ?
8、《從0到1學(xué)習(xí)Flink》—— Flink Data transformation(轉(zhuǎn)換)
9、《從0到1學(xué)習(xí)Flink》—— 介紹Flink中的Stream Windows
10、《從0到1學(xué)習(xí)Flink》—— Flink 中的幾種 Time 詳解
11、《從0到1學(xué)習(xí)Flink》—— Flink 寫入數(shù)據(jù)到 ElasticSearch
12、《從0到1學(xué)習(xí)Flink》—— Flink 項目如何運(yùn)行?
13、《從0到1學(xué)習(xí)Flink》—— Flink 寫入數(shù)據(jù)到 Kafka
創(chuàng)作挑戰(zhàn)賽新人創(chuàng)作獎勵來咯,堅持創(chuàng)作打卡瓜分現(xiàn)金大獎總結(jié)
以上是生活随笔為你收集整理的centos7安装flink集群_《从0到1学习Flink》—— Flink 写入数据到 Kafka的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 交通银行信用卡好享贷
- 下一篇: 正则表达式来判断Sql语句中Select