日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程语言 > java >内容正文

java

Kafka Java consumer动态修改topic订阅

發布時間:2023/12/20 java 23 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Kafka Java consumer动态修改topic订阅 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

  前段時間在Kafka QQ群中有人問及此事——關于Java consumer如何動態修改topic訂閱的問題。仔細一想才發現這的確是個好問題,因為如果簡單地在另一個線程中直接持有consumer實例然后調用subscribe進行修改,consumer端必然會拋出異常ConcurrentModificationException:KafkaConsumer is not safe for multi-threaded access

  和KafkaProducer不同的是,KafkaConsumer不是線程安全的,所以我們不能直接在沒有同步保護的機制下直接啟用另一個線程調用consumer的任何方法(除了wakeup)。因此,實現這個需求有兩種途徑:

  • 使用重量級的synchorinzed機制來實現線程安全
  • 借助Java類庫已有的線程安全數據結構來實現

  如果是第一種方式,則無論哪個線程訪問consumer都必須要配備必要的同步保護機制,代價相當大且極易出錯。本文選取第二種方式,我們可以借助Java提供的ConcurrentLinkedQueue來幫助我們實現。具體的步驟為:

  • 構建ConcurrentLinkedQueue對象分別給兩個線程使用(這里并不限定于兩個線程,但這個需求最可能的實際場景是consumer主線程和一個后臺管理類的用戶線程,而后者負責觸發“動態修改訂閱”邏輯)
  • 調用KafkaConsumer.poll(timeout)來不斷消費消息。經常有人問這里的timeout到底是做什么用的?這里統一回答一下:這里的timeout賦予了用戶在consumer讀取消息后可以執行其他一些操作的能力,比如定期的記錄日志等。如果你的consumer沒有這樣的需求,那么調用KafkaConsumer.poll(1000)和KafkaConsumer.poll(Integer.MAX)沒有任何區別。事實上, 我們更加推薦用戶使用KafkaConsumer.poll(Integer.MAX) + wakeup的方式來響應后端其他邏輯
  • 每次poll之后嘗試去探查一下ConcurrentLinkedQueue有沒有新東西(如果有說明訂閱topic列表發生變化),響應之
  • 使用另一個線程往ConcurrentLinkedQueue中插入新的訂閱信息
  • 完整樣例代碼如下:

    public class ConsumerTest {public static void main(String[] args) {final ConcurrentLinkedQueue<String> subscribedTopics = new ConcurrentLinkedQueue<>();// 創建另一個測試線程,啟動后首先暫停10秒然后變更topic訂閱Runnable runnable = new Runnable() {@Overridepublic void run() {try {Thread.sleep(10000);} catch (InterruptedException e) {// swallow it.}// 變更為訂閱topic: btopic, ctopicsubscribedTopics.addAll(Arrays.asList("btopic", "ctopic"));}};new Thread(runnable).start();Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("group.id", "my-group1");props.put("auto.offset.reset", "earliest");props.put("enable.auto.commit", "true");props.put("auto.commit.interval.ms", "1000");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);// 最開始的訂閱列表:atopic、btopicconsumer.subscribe(Arrays.asList("atopic", "btopic"));while (true) {consumer.poll(2000); //表示每2秒consumer就有機會去輪詢一下訂閱狀態是否需要變更// 本例不關注消息消費,因此每次只是打印訂閱結果!System.out.println(consumer.subscription());if (!subscribedTopics.isEmpty()) {Iterator<String> iter = subscribedTopics.iterator();List<String> topics = new ArrayList<>();while (iter.hasNext()) {topics.add(iter.next());}subscribedTopics.clear();consumer.subscribe(topics); // 重新訂閱topic}}// 本例只是測試之用,使用了while(true),所以這里沒有顯式關閉consumer // consumer.close();} }

      

    輸出如下:?

    [atopic, btopic]
    [atopic, btopic]
    [atopic, btopic]
    [ctopic, btopic]
    [ctopic, btopic]

    由此可見,本consumer在沒有關閉的情況下動態進行了topic的訂閱變更。另外需要說一下,動態變更時最好不要直接調用subscribe(topics),而是要顯式地定義ConsumerRebalanceListener以避免位移提交的混亂。



    轉載于:https://www.cnblogs.com/huxi2b/p/7040617.html

    總結

    以上是生活随笔為你收集整理的Kafka Java consumer动态修改topic订阅的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。