Apache Flink from Scratch (Part 20): The Flink Kafka Connector


Built-in sources and sinks

Built-in sources can read from files, directories, sockets, and from collections or iterators. Built-in sinks can write to files, to the console (standard output), and to sockets.
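
To make the list concrete, here is a minimal Scala sketch that wires a few of these built-in sources and sinks together in one job; the file paths are hypothetical placeholders.

import org.apache.flink.streaming.api.scala._

object BuiltInSourceSinkApp {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Built-in sources: file, collection, socket
    val fromFile = env.readTextFile("file:///tmp/input.txt")     // hypothetical path
    val fromCollection = env.fromCollection(List("a", "b", "c"))
    val fromSocket = env.socketTextStream("localhost", 9999)

    // Built-in sinks: file and console output
    fromFile.writeAsText("file:///tmp/output")                   // hypothetical path
    fromCollection.print()
    fromSocket.print()

    env.execute("BuiltInSourceSinkApp")
  }
}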

Built-in connectors

  • Apache Kafka (source/sink)
  • Apache Cassandra (sink)
  • Amazon Kinesis Streams (source/sink)
  • Elasticsearch (sink)
  • Hadoop FileSystem (sink)
  • RabbitMQ (source/sink)
  • Apache NiFi (source/sink)
  • Twitter Streaming API (source)

HDFS Connector

This connector provides a sink that writes partitioned files to any filesystem supported by Hadoop FileSystem.
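
A minimal sketch of such a sink, assuming the StreamingFileSink API (available since Flink 1.6) and a hypothetical HDFS address:

import org.apache.flink.api.common.serialization.SimpleStringEncoder
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.streaming.api.scala._

object HdfsSinkApp {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val data = env.socketTextStream("localhost", 9999)

    // Write each record as a UTF-8 line; the HDFS path is a placeholder
    val sink = StreamingFileSink
      .forRowFormat(new Path("hdfs://localhost:9000/flink/output"),
        new SimpleStringEncoder[String]("UTF-8"))
      .build()

    data.addSink(sink)
    env.execute("HdfsSinkApp")
  }
}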

Kafka Connector

Add the dependency:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>
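
For sbt users, a rough equivalent (an assumption on my part; the version is hypothetical and %% resolves to the _2.11 artifact only when scalaVersion is 2.11):

// build.sbt - hypothetical sbt equivalent of the Maven dependency above
val flinkVersion = "1.7.2" // assumption: any 1.7+ release matching ${flink.version}
libraryDependencies += "org.apache.flink" %% "flink-connector-kafka" % flinkVersion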

Download and install ZooKeeper

  • Copy conf/zoo_sample.cfg under zookeeper-3.4.5 to zoo.cfg and change dataDir=/home/vincent/tmp
  • Start ZooKeeper: ./zkServer.sh start
  • Run jps; a QuorumPeerMain process means ZooKeeper started successfully.
  • Download and install Kafka

    • Likewise edit the server.properties file under kafka_2.11-2.0.1/config, changing:

      log.dirs=/home/vincent/tmp/kafka-logs

      zookeeper.connect=localhost:2181

    • Start Kafka:
    ./bin/kafka-server-start.sh ./config/server.properties
    • Run jps; a Kafka process means Kafka started successfully.
    • Create a topic:
    vincent@ubuntu:~/kafka_2.11-2.0.1$ ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic mytest
    • After creating it, list all topics; the new topic appears in the output:
    vincent@ubuntu:~/kafka_2.11-2.0.1$ ./bin/kafka-topics.sh --list --zookeeper localhost:2181
    mytest
    • Start a console producer:
    vincent@ubuntu:~/kafka_2.11-2.0.1$ ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic mytest
    • Start a console consumer:
    vincent@ubuntu:~/kafka_2.11-2.0.1$ ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic mytest

Connecting Flink to Kafka

Source: reading data from Kafka

Scala:

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

object KafkaConnectorConsumerApp {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Kafka consumer configuration
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "192.168.227.128:9092")
    properties.setProperty("group.id", "test")

    // Read from the "mytest" topic and print each record
    env.addSource(new FlinkKafkaConsumer[String]("mytest", new SimpleStringSchema(), properties))
      .print()

    env.execute("KafkaConnectorConsumerApp")
  }
}

Java:

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class JavaKafkaConsumerApp {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Kafka consumer configuration
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "192.168.227.128:9092");
        properties.setProperty("group.id", "test");

        // Read from the "mytest" topic and print each record
        DataStream<String> stream = env.addSource(
                new FlinkKafkaConsumer<>("mytest", new SimpleStringSchema(), properties));
        stream.print();

        env.execute("JavaKafkaConsumerApp");
    }
}

Sink: writing data to Kafka

Scala:

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper

object KafkaConnectorProducerApp {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Receive data from a socket and sink it to Kafka through Flink
    val data = env.socketTextStream("192.168.227.128", 9999)

    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "192.168.227.128:9092")
    properties.setProperty("group.id", "test")

    val kafkaSink = new FlinkKafkaProducer[String](
      "mytest",
      new KeyedSerializationSchemaWrapper[String](new SimpleStringSchema()),
      properties)

    data.addSink(kafkaSink)
    env.execute("KafkaConnectorProducerApp")
  }
}

Java:

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

public class JavaKafkaProducerApp {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Receive data from a socket and sink it to Kafka through Flink
        DataStreamSource<String> data = env.socketTextStream("192.168.227.128", 9999);

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "192.168.227.128:9092");
        properties.setProperty("group.id", "test");

        // This constructor variant takes the broker list directly,
        // so the properties above are not passed to it
        data.addSink(new FlinkKafkaProducer<String>(
                "192.168.227.128:9092", "mytest", new SimpleStringSchema()));

        env.execute("JavaKafkaProducerApp");
    }
}
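
Beyond these basic producers, the universal FlinkKafkaProducer can also be given an explicit delivery Semantic (AT_LEAST_ONCE is the default; EXACTLY_ONCE uses Kafka transactions and requires checkpointing). A hedged Scala sketch of the producer above with the semantic spelled out, reusing the topic, schema, and broker placeholders from earlier:

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper

object ExactlyOnceProducerSketch {
  def main(args: Array[String]): Unit = {
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "192.168.227.128:9092")

    // Same producer as above, but with the delivery guarantee made explicit.
    // EXACTLY_ONCE relies on Kafka transactions and enabled checkpointing.
    val kafkaSink = new FlinkKafkaProducer[String](
      "mytest",
      new KeyedSerializationSchemaWrapper[String](new SimpleStringSchema()),
      properties,
      FlinkKafkaProducer.Semantic.EXACTLY_ONCE)
  }
}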

The Flink Kafka consumer's default start position is setStartFromGroupOffsets (the default behaviour): it resumes from the offsets last committed by the consumer group, so consumption continues from where it previously left off (falling back to the auto.offset.reset property when no committed offsets exist).
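
A short sketch of this default and the other start-position options, reusing the topic and properties from above:

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

object ConsumerStartPositionSketch {
  def main(args: Array[String]): Unit = {
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "192.168.227.128:9092")
    properties.setProperty("group.id", "test")

    val consumer = new FlinkKafkaConsumer[String]("mytest", new SimpleStringSchema(), properties)

    consumer.setStartFromGroupOffsets() // default: resume from the group's committed offsets
    // consumer.setStartFromEarliest()  // alternative: replay the topic from the beginning
    // consumer.setStartFromLatest()    // alternative: only read records produced from now on
  }
}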
