日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程语言 > java >内容正文

java

Java程序创建Kafka Topic,以及数据生产消费,常用的命令

發布時間:2023/12/3 java 25 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Java程序创建Kafka Topic,以及数据生产消费,常用的命令 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

轉自: Java程序創建Kafka Topic,以及數據生產消費,常用的命令_Zyy_z_的博客-CSDN博客_java kafka創建topicKafka簡介: Kafka是一個分布式發布——訂閱消息傳遞系統。Kafka快速、可擴展且耐用。它保留主題中的消息源。生產者將數據寫入主題,消費者從主題中讀取數據。Kafka的特點: 1. 同時為分布和訂閱提供高吞吐量。據了解,Kafka每秒可以生產約25萬條消息(50MB),每秒處理55萬條消息...https://blog.csdn.net/Zyy_z_/article/details/101680138


【1】Kafka簡介

Kafka是一個分布式發布——訂閱消息傳遞系統。Kafka快速、可擴展且耐用。它保留主題中的消息源。生產者將數據寫入主題,消費者從主題中讀取數據。

1)Kafka的特點:

  • 1. 同時為分布和訂閱提供高吞吐量。 據了解,Kafka每秒可以生產約25萬條消息(50MB),每秒處理55萬條消息(110MB)這里說條數,可能不上特別準確,因為消息的大小可能不一致;
  • 2. 可進行持久化操作,將消息持久化到到磁盤,以日志的形式存儲,因此可用于批量消費,例如ETL,以及實時應用程序。 通過將數據持久化到硬盤以及replication防止數據丟失。
  • 3. 分布式系統,易于向外拓展。所有的Producer、broker和consumer都會有多個,均為分布式。無需停機即可拓展 機器。
  • 4. 消息被處理的狀態是在consumer端維護,而不是由server端維護,當失敗時能自動平衡。

2)Kafka名詞解釋:

  • producer:消息的生成者
  • consumer:消息的消費者
  • topic:你把它理解為標簽
  • broker:Kafka處理資源的消息源(feeds of messages)的不同分類
  • 3)Kafka常用命令:

  • 創建主題(4個分區,2個副本):? kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 4 --topic test
  • 查詢所有Topic:kafka-topics.sh --zookeeper localhost:2181 --list
  • 查看指定得Topic:kafka-topics.sh --zookeeper localhost:2181 --describe --topic t_cdr
  • 刪除Topic:kafka-run-class.sh kafka.admin.DeleteTopicCommand --zookeeper localhost:2181 --topic t_cdr
  • 生產者 :kafka-console-producer.sh --broker-list localhost:9092 --topic test
  • 消費者 : kafka-console-consumer.sh --zookeeper localhost:2181 --topic test
  • 新生產者(支持0.9版本+):? kafka-console-producer.sh --broker-list localhost:9092 --topic test --producer.config config/producer.properties
  • 新消費者(支持0.9版本+):? kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --new-consumer --from-beginning --consumer.config config/consumer.properties

  • 【2】kafka java api

    【2.1】Java程序操作創建Topic:? ?

    Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("acks", "all"); props.put("retries", 0); props.put("batch.size", 16384); props.put("linger.ms", 1); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); AdminClient create = KafkaAdminClient.create(props);//創建Topic create.createTopics(Lists.newArrayList(new NewTopic("Topic名稱"),1,(short)1));//一個分區 create.close();//關閉

    其他創建Topic得方式Java API:

    https://blog.csdn.net/meng984611383/article/details/80500761https://blog.csdn.net/meng984611383/article/details/80500761


    【2.2】Kafka生產數據:

    Producer<String, String> producer = new KafkaProducer<>(props); for(int i = 0; i < 100; i++) //生產數據producer.send(new ProducerRecord<String, String>("Topic名稱", Integer.toString(i), Integer.toString(i))); producer.close(); //關閉

    【2.3】消費數據

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList("foo", "bar")); while (true) {ConsumerRecords<String, String> records = consumer.poll(100);for (ConsumerRecord<String, String> record : records)System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());

    生產者的緩沖空間池保留尚未發送到服務器的消息,后臺I/O線程負責將這些消息轉換成請求發送到集群。如果使用后不關閉生產者,則會泄露這些資源。

    send()方法是異步的,添加消息到緩沖區等待發送,并立即返回。生產者將單個的消息批量在一起發送來提高效率。

  • ack是判別請求是否為完整的條件(就是是判斷是不是成功發送了)。我們指定了“all”將會阻塞消息,這種設置性能最低,但是是最可靠的。
  • retries,如果請求失敗,生產者會自動重試,我們指定是0次,如果啟用重試,則會有重復消息的可能性。
  • producer(生產者)緩存每個分區未發送的消息。緩存的大小是通過 batch.size 配置指定的。值較大的話將會產生更大的批。并需要更多的內存(因為每個“活躍”的分區都有1個緩沖區)。
  • 默認緩沖可立即發送,即便緩沖空間還沒有滿,但是,如果你想減少請求的數量,可以設置linger.ms大于0。這將指示生產者發送請求之前等待一段時間,希望更多的消息填補到未滿的批中。這類似于TCP的算法,例如上面的代碼段,可能100條消息在一個請求發送,因為我們設置了linger(逗留)時間為1毫秒,然后,如果我們沒有填滿緩沖區,這個設置將增加1毫秒的延遲請求以等待更多的消息。需要注意的是,在高負載下,相近的時間一般也會組成批,即使是 linger.ms=0。在不處于高負載的情況下,如果設置比0大,以少量的延遲代價換取更少的,更有效的請求。
  • buffer.memory 控制生產者可用的緩存總量,如果消息發送速度比其傳輸到服務器的快,將會耗盡這個緩存空間。當緩存空間耗盡,其他發送調用將被阻塞,阻塞時間的閾值通過max.block.ms設定,之后它將拋出一個TimeoutException。
  • key.serializer和value.serializer示例,將用戶提供的key和value對象ProducerRecord轉換成字節,你可以使用附帶的ByteArraySerializaer或StringSerializer處理簡單的string或byte類型。
  • ?
    ?

    總結

    以上是生活随笔為你收集整理的Java程序创建Kafka Topic,以及数据生产消费,常用的命令的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。