日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

kafka java 多线程_20. 多线程开发者实例

發布時間:2025/3/15 编程问答 27 豆豆
生活随笔 收集整理的這篇文章主要介紹了 kafka java 多线程_20. 多线程开发者实例 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

# 多線程 Consumer Instance

## Kafka Java Consumer 設計原理

* Kafka Java Consumer 是單線程設計

* 從 Kafka V0.10.1.0,KafkaConsumer 是雙線程:用戶主線程 & 心跳線程

* 用戶主線程

* 啟動 Consumer 應用 main 方法的線程

* 心跳線程

* 只負責定期給對應的 Broker 發送心跳,標示 Consumer 的存活性(liveness)

* 新版本設計:單線程 + 輪詢機制:

* 實現非阻塞式的消息獲取

## 多線程方案

* KafkaConsumer 類不是 thread-safe

* 所有的網絡 IO 處理都是發生在用戶主線程中

* 不能在多個線程中共享同一個 KafkaConsumer 實例

* 可以使用 `KafkaConsumer.wakeup()` 在其他線程中喚醒 Consumer

基于非 thread-safe,兩套多線程方案

* 消費者程序啟動多個線程,每個線程維護專屬的 KafkaConsumer Instance,負責完整的消息獲取、消息處理流程

* 消費者程序使用單或多線程獲取消息,同時創建多個消費線程執行消息處理邏輯

* 處理消息交由特定的線程池來做

* 將消息獲取與處理解耦

![](https://img.kancloud.cn/40/70/4070c15055bf275c44cb7b470fb1f850_696x326.jpeg)

## Code

### 方案 1

```

public class KafkaConsumerRunner implements Runnable {

private final AtomicBoolean closed = new AtomicBoolean(false);

private final KafkaConsumer consumer;

public void run() {

try {

consumer.subscribe(Arrays.asList("topic"));

while (!closed.get()) {

ConsumerRecords records =

consumer.poll(Duration.ofMillis(10000));

// 執行消息處理邏輯

}

} catch (WakeupException e) {

// Ignore exception if closing

if (!closed.get()) throw e;

} finally {

consumer.close();

}

}

// Shutdown hook which can be called from a separate thread

public void shutdown() {

closed.set(true);

consumer.wakeup();

}

```

### 方案 2

```

private final KafkaConsumer consumer;

private ExecutorService executors;

...

private int workerNum = ...;

executors = new ThreadPoolExecutor(

workerNum, workerNum, 0L, TimeUnit.MILLISECONDS,

new ArrayBlockingQueue<>(1000),

new ThreadPoolExecutor.CallerRunsPolicy());

...

while (true) {

ConsumerRecords records =

consumer.poll(Duration.ofSeconds(1));

for (final ConsumerRecord record : records) {

executors.submit(new Worker(record));

}

}

..

```

總結

以上是生活随笔為你收集整理的kafka java 多线程_20. 多线程开发者实例的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。