日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 人文社科 > 生活经验 >内容正文

生活经验

2021年大数据Flink(十五):流批一体API Connectors ​​​​​​​Kafka

發布時間:2023/11/28 生活经验 24 豆豆
生活随笔 收集整理的這篇文章主要介紹了 2021年大数据Flink(十五):流批一体API Connectors ​​​​​​​Kafka 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

目錄

Kafka

pom依賴

參數設置

參數說明

Kafka命令

代碼實現-Kafka Consumer

代碼實現-Kafka Producer

代碼實現-實時ETL


Kafka

pom依賴

Flink 里已經提供了一些綁定的 Connector,例如 kafka source 和 sink,Es sink 等。讀寫 kafka、es、rabbitMQ 時可以直接使用相應 connector 的 api 即可,雖然該部分是 Flink 項目源代碼里的一部分,但是真正意義上不算作 Flink 引擎相關邏輯,并且該部分沒有打包在二進制的發布包里面。所以在提交 Job 時候需要注意, job 代碼 jar 包中一定要將相應的 connetor 相關類打包進去,否則在提交作業時就會失敗,提示找不到相應的類,或初始化某些類異常。

//ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/datastream/kafka/

參數設置

以下參數都必須/建議設置上

1.訂閱的主題

2.反序列化規則

3.消費者屬性-集群地址

4.消費者屬性-消費者組id(如果不設置,會有默認的,但是默認的不方便管理)

5.消費者屬性-offset重置規則,如earliest/latest...

6.動態分區檢測(當kafka的分區數變化/增加時,Flink能夠檢測到!)

7.如果沒有設置Checkpoint,那么可以設置自動提交offset,后續學習了Checkpoint會把offset隨著做Checkpoint的時候提交到Checkpoint和默認主題中

???????參數說明

實際的生產環境中可能有這樣一些需求,比如:

l場景一:有一個 Flink 作業需要將五份數據聚合到一起,五份數據對應五個 kafka topic,隨著業務增長,新增一類數據,同時新增了一個 kafka topic,如何在不重啟作業的情況下作業自動感知新的 topic。

l場景二:作業從一個固定的 kafka topic 讀數據,開始該 topic 有 10 個 partition,但隨著業務的增長數據量變大,需要對 kafka partition 個數進行擴容,由 10 個擴容到 20。該情況下如何在不重啟作業情況下動態感知新擴容的 partition?

針對上面的兩種場景,首先需要在構建 FlinkKafkaConsumer 時的 properties 中設置 flink.partition-discovery.interval-millis 參數為非負值,表示開啟動態發現的開關,以及設置的時間間隔。此時 FlinkKafkaConsumer 內部會啟動一個單獨的線程定期去 kafka 獲取最新的 meta 信息。

l針對場景一,還需在構建 FlinkKafkaConsumer 時,topic 的描述可以傳一個正則表達式描述的 pattern。每次獲取最新 kafka meta 時獲取正則匹配的最新 topic 列表。

l針對場景二,設置前面的動態發現參數,在定期獲取 kafka 最新 meta 信息時會匹配新的 partition。為了保證數據的正確性,新發現的 partition 從最早的位置開始讀取。

注意:

開啟 checkpoint 時 offset 是 Flink 通過狀態 state 管理和恢復的,并不是從 kafka 的 offset 位置恢復。在 checkpoint 機制下,作業從最近一次checkpoint 恢復,本身是會回放部分歷史數據,導致部分數據重復消費,Flink 引擎僅保證計算狀態的精準一次,要想做到端到端精準一次需要依賴一些冪等的存儲系統或者事務操作。

???????Kafka命令

??● 查看當前服務器中的所有topic

/export/server/kafka/bin/kafka-topics.sh --list --zookeeper ?node1:2181

??● 創建topic

/export/server/kafka/bin/kafka-topics.sh --create --zookeeper node1:2181 --replication-factor 2?--partitions?3?--topic flink_kafka

??● 查看某個Topic的詳情

/export/server/kafka/bin/kafka-topics.sh --topic flink_kafka --describe --zookeeper node1:2181

??● 刪除topic

/export/server/kafka/bin/kafka-topics.sh --delete --zookeeper node1:2181 --topic flink_kafka

??● 通過shell命令發送消息

/export/server/kafka/bin/kafka-console-producer.sh --broker-list node1:9092 --topic flink_kafka

??● 通過shell消費消息

/export/server/kafka/bin/kafka-console-consumer.sh --bootstrap-server node1:9092 --topic flink_kafka?--from-beginning

??● 修改分區

?/export/server/kafka/bin/kafka-topics.sh --alter --partitions 4?--topic flink_kafka?--zookeeper node1:2181

???????代碼實現-Kafka Consumer

package cn.it.connectors;import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;import java.util.Properties;/*** Author lanson* Desc* 需求:使用flink-connector-kafka_2.12中的FlinkKafkaConsumer消費Kafka中的數據做WordCount* 需要設置如下參數:* 1.訂閱的主題* 2.反序列化規則* 3.消費者屬性-集群地址* 4.消費者屬性-消費者組id(如果不設置,會有默認的,但是默認的不方便管理)* 5.消費者屬性-offset重置規則,如earliest/latest...* 6.動態分區檢測(當kafka的分區數變化/增加時,Flink能夠檢測到!)* 7.如果沒有設置Checkpoint,那么可以設置自動提交offset,后續學習了Checkpoint會把offset隨著做Checkpoint的時候提交到Checkpoint和默認主題中*/
public class ConnectorsDemo_KafkaConsumer {public static void main(String[] args) throws Exception {//1.envStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//2.SourceProperties props ?= new Properties();props.setProperty("bootstrap.servers", "node1:9092");props.setProperty("group.id", "flink");props.setProperty("auto.offset.reset","latest");props.setProperty("flink.partition-discovery.interval-millis","5000");//會開啟一個后臺線程每隔5s檢測一下Kafka的分區情況props.setProperty("enable.auto.commit", "true");props.setProperty("auto.commit.interval.ms", "2000");//kafkaSource就是KafkaConsumerFlinkKafkaConsumer<String> kafkaSource = new FlinkKafkaConsumer<>("flink_kafka", new SimpleStringSchema(), props);kafkaSource.setStartFromGroupOffsets();//設置從記錄的offset開始消費,如果沒有記錄從auto.offset.reset配置開始消費//kafkaSource.setStartFromEarliest();//設置直接從Earliest消費,和auto.offset.reset配置無關DataStreamSource<String> kafkaDS = env.addSource(kafkaSource);//3.Transformation//3.1切割并記為1SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOneDS = kafkaDS.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {@Overridepublic void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {String[] words = value.split(" ");for (String word : words) {out.collect(Tuple2.of(word, 1));}}});//3.2分組KeyedStream<Tuple2<String, Integer>, Tuple> groupedDS = wordAndOneDS.keyBy(0);//3.3聚合SingleOutputStreamOperator<Tuple2<String, Integer>> result = groupedDS.sum(1);//4.Sinkresult.print();//5.executeenv.execute();}
}

???????代碼實現-Kafka Producer

  • 需求:

將Flink集合中的數據通過自定義Sink保存到Kafka

  • 代碼實現
package cn.it.connectors;import com.alibaba.fastjson.JSON;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;import java.util.Properties;/*** Author lanson* Desc* 使用自定義sink-官方提供的flink-connector-kafka_2.12-將數據保存到Kafka*/
public class ConnectorsDemo_KafkaProducer {public static void main(String[] args) throws Exception {//1.envStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//2.SourceDataStreamSource<Student> studentDS = env.fromElements(new Student(1, "tonyma", 18));//3.Transformation//注意:目前來說我們使用Kafka使用的序列化和反序列化都是直接使用最簡單的字符串,所以先將Student轉為字符串//可以直接調用Student的toString,也可以轉為JSONSingleOutputStreamOperator<String> jsonDS = studentDS.map(new MapFunction<Student, String>() {@Overridepublic String map(Student value) throws Exception {//String str = value.toString();String jsonStr = JSON.toJSONString(value);return jsonStr;}});//4.SinkjsonDS.print();//根據參數創建KafkaProducer/KafkaSinkProperties props = new Properties();props.setProperty("bootstrap.servers", "node1:9092");FlinkKafkaProducer<String> kafkaSink = new FlinkKafkaProducer<>("flink_kafka", ?new SimpleStringSchema(), ?props);jsonDS.addSink(kafkaSink);//5.executeenv.execute();// /export/server/kafka/bin/kafka-console-consumer.sh --bootstrap-server node1:9092 --topic flink_kafka}@Data@NoArgsConstructor@AllArgsConstructorpublic static class Student {private Integer id;private String name;private Integer age;}
}

???????代碼實現-實時ETL

package cn.it.connectors;import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;import java.util.Properties;/*** Author lanson* Desc 演示Flink-Connectors-KafkaComsumer/Source + KafkaProducer/Sink*/
public class KafkaETLDemo {public static void main(String[] args) throws Exception {//TODO 0.envStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//TODO 1.source//準備kafka連接參數Properties props ?= new Properties();props.setProperty("bootstrap.servers", "node1:9092");//集群地址props.setProperty("group.id", "flink");//消費者組idprops.setProperty("auto.offset.reset","latest");//latest有offset記錄從記錄位置開始消費,沒有記錄從最新的/最后的消息開始消費?/earliest有offset記錄從記錄位置開始消費,沒有記錄從最早的/最開始的消息開始消費props.setProperty("flink.partition-discovery.interval-millis","5000");//會開啟一個后臺線程每隔5s檢測一下Kafka的分區情況,實現動態分區檢測props.setProperty("enable.auto.commit", "true");//自動提交(提交到默認主題,后續學習了Checkpoint后隨著Checkpoint存儲在Checkpoint和默認主題中)props.setProperty("auto.commit.interval.ms", "2000");//自動提交的時間間隔//使用連接參數創建FlinkKafkaConsumer/kafkaSourceFlinkKafkaConsumer<String> kafkaSource = new FlinkKafkaConsumer<String>("flink_kafka", new SimpleStringSchema(), props);//使用kafkaSourceDataStream<String> kafkaDS = env.addSource(kafkaSource);//TODO 2.transformationSingleOutputStreamOperator<String> etlDS = kafkaDS.filter(new FilterFunction<String>() {@Overridepublic boolean filter(String value) throws Exception {return value.contains("success");}});//TODO 3.sinketlDS.print();Properties props2 = new Properties();props2.setProperty("bootstrap.servers", "node1:9092");FlinkKafkaProducer<String> kafkaSink = new FlinkKafkaProducer<>("flink_kafka2", new SimpleStringSchema(), props2);etlDS.addSink(kafkaSink);//TODO 4.executeenv.execute();}
}
//控制臺生成者?---> flink_kafka主題?--> Flink -->etl ---> flink_kafka2主題--->控制臺消費者
//準備主題?/export/server/kafka/bin/kafka-topics.sh --create --zookeeper node1:2181 --replication-factor 2 --partitions 3 --topic flink_kafka
//準備主題?/export/server/kafka/bin/kafka-topics.sh --create --zookeeper node1:2181 --replication-factor 2 --partitions 3 --topic flink_kafka2
//啟動控制臺生產者發送數據?/export/server/kafka/bin/kafka-console-producer.sh --broker-list node1:9092 --topic flink_kafka
//log:2020-10-10 success xxx
//log:2020-10-10 success xxx
//log:2020-10-10 success xxx
//log:2020-10-10 fail xxx
//啟動控制臺消費者消費數據?/export/server/kafka/bin/kafka-console-consumer.sh --bootstrap-server node1:9092 --topic flink_kafka2 --from-beginning
//啟動程序FlinkKafkaConsumer
//觀察控制臺輸出結果

總結

以上是生活随笔為你收集整理的2021年大数据Flink(十五):流批一体API Connectors ​​​​​​​Kafka的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。