日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

java kafka 多线程消费

發布時間:2023/12/15 编程问答 27 豆豆
生活随笔 收集整理的這篇文章主要介紹了 java kafka 多线程消费 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

我們先來看下簡單的kafka生產者和消費者模式代碼:

生產者KafkaProducer

/** * @author xiaofeng * @version V1.0 * @title: KafkaProducer.java * @package: com.yingda.xsignal.app.test * @description: kafka生產者demo * @date 2018/4/4 0004 上午 11:20 */ public class KafkaProducer extends Thread {private String topic; public KafkaProducer(String topic) {super(); this.topic = topic; }@Override public void run() {Producer producer = createProducer(); int i = 0; while (true) {String msg = "message"; producer.send(new KeyedMessage<Integer, String>(topic, msg + (i++))); try {TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) {e.printStackTrace(); }}}private Producer createProducer() {Properties properties = new Properties(); //聲明zk properties.put("zookeeper.connect", "10.0.2.22:2181"); properties.put("serializer.class", StringEncoder.class.getName()); properties.put("metadata.broker.list", "10.0.2.22:9092"); properties.put("batch.size", 4096); return new Producer<Integer, String>(new ProducerConfig(properties)); }public static void main(String[] args) {new KafkaProducer("TEST_TOPIC").start(); }}

消費者KafkaConsumer

/** * @author xiaofeng * @version V1.0 * @title: KafkaConsumer.java * @package: com.yingda.xsignal.app.test * @description: 單線程消費模式 * @date 2018/4/4 0004 上午 11:18 */ public class KafkaConsumer extends Thread {private String topic; public KafkaConsumer(String topic) {super(); this.topic = topic; }@Override public void run() {ConsumerConnector consumer = createConsumer(); Map<String, Integer> topicCountMap = Maps.newHashMap(); // 一次從主題中獲取一個數據 topicCountMap.put(topic, 1); Map<String, List<KafkaStream<byte[], byte[]>>> messageStreams = consumer.createMessageStreams(topicCountMap); // 獲取每次接收到的這個數據 KafkaStream<byte[], byte[]> stream = messageStreams.get(topic).get(0); ConsumerIterator<byte[], byte[]> iterator = stream.iterator(); while (iterator.hasNext()) {String message = new String(iterator.next().message()); System.out.println("接收到: " + message); }}private ConsumerConnector createConsumer() {Properties properties = new Properties(); // //聲明zk properties.put("zookeeper.connect", "10.0.2.22:2181"); // // 消費組 properties.put("group.id", "test-group"); properties.put("key.deserializer", StringDeserializer.class); properties.put("value.deserializer", StringDeserializer.class); return Consumer.createJavaConsumerConnector(new ConsumerConfig(properties)); }public static void main(String[] args) {new KafkaConsumer("TEST_TOPIC").start(); }

分別啟動producer 和consumer

查看消費者控制臺:


雖然已成功生產和消費,但是這種消費模式很明顯是單個topic和單線程的形式,那么如果我一次性要訂閱多個topic 而且需要多線程消費該怎樣做呢?接下來讓我們一探究竟吧!

構建多線程消費KafkaConsumer

/** * @author xiaofeng * @version V1.0 * @title: OrderBackConsumer.java * @package: com.yingda.xsignal.app.consumer * @description: 訂單備份消費者 * @date 2018/3/16 0016 下午 4:46 */ public class OrderBackConsumer extends BaseSpringApp {protected static final Logger logger = LoggerFactory.getLogger(OrderBackConsumer.class); private final ConsumerConnector consumer; private final String signalTopic = "SIGNAL_ORDERINFO"; private final String followTopic = "FOLLOW_ORDERINFO"; private final String signalHisTopic = "HIS_ORDERINFO"; private final String followHisTopic = "FOLLOW_HIS_ORDERINFO"; private ConsumerConfig consumerConfig; private static int threadNum = 6; /** * Set the ThreadPoolExecutor's core pool size. */ private int corePoolSize = 6; /** * Set the ThreadPoolExecutor's maximum pool size. */ private int maxPoolSize = 200; /** * Set the capacity for the ThreadPoolExecutor's BlockingQueue. */ private int queueCapacity = 1024; /** * thread prefix name */ private String ThreadNamePrefix = "kafka-consumer-pool-%d"; ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat(ThreadNamePrefix).build(); /** * Common Thread Pool */ ExecutorService pool = new ThreadPoolExecutor(corePoolSize, maxPoolSize, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(queueCapacity), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy()); public OrderBackConsumer(String[] args) {super(args, "classpath:app-KafkaConsumer.xml"); Properties properties = new Properties(); //開發環境:10.0.2.22:2181 properties.put("zookeeper.connect", "10.0.2.22:2181"); // 組名稱 properties.put("group.id", "back_consumer_group"); properties.put("key.deserializer", StringDeserializer.class); properties.put("value.deserializer", StringDeserializer.class); consumerConfig = new ConsumerConfig(properties); consumer = kafka.consumer.Consumer.createJavaConsumerConnector(consumerConfig); }@Override public void shutdown() {if (consumer != null) {consumer.shutdown(); }if (pool != null) {pool.shutdown(); }try {if (!pool.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS)) {System.out.println("Timed out waiting for consumer threads to shut down, exiting uncleanly"); }} catch (InterruptedException e) {System.out.println("Interrupted during shutdown, exiting uncleanly"); }}public void run(int numThreads) {Map<String, Integer> topicCountMap = Maps.newHashMap(); topicCountMap.put(signalTopic, new Integer(numThreads)); topicCountMap.put(followTopic, new Integer(numThreads)); topicCountMap.put(signalHisTopic, new Integer(numThreads)); topicCountMap.put(followHisTopic, new Integer(numThreads)); Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap); consumerMap.values().stream().forEach(value -> {List<KafkaStream<byte[], byte[]>> streams = value; int threadNumber = 0; /** * 可以為每隔topic創建一個線程池,因為每個topic我設置的partition=6 * kafka consumer通過增加線程數來增加消費能力,但是需要足夠的分區,如目前我設置的partition=6,那么并發可以啟動6個線程同時消費) * ExecutorService pool = createThreadPool(); */ for (final KafkaStream stream : streams) {pool.submit(new KafkaOrderConsumer(stream, threadNumber)); threadNumber++; }}); }/** * 創建線程池 * * @return */ private ExecutorService createThreadPool() {ExecutorService pool = new ThreadPoolExecutor(corePoolSize, maxPoolSize, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(queueCapacity), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy()); return pool; }public static void main(String[] args) {int threads = 1; if (args.length < 1) {threads = threadNum; } else {threads = Integer.parseInt(args[0]); }OrderBackConsumer example = new OrderBackConsumer(args); example.run(threads); try {Thread.sleep(Long.MAX_VALUE); } catch (InterruptedException ie) {}example.shutdown(); }


/** * @author xiaofeng * @version V1.0 * @title: KafkaOrderConsumer.java * @package: com.yingda.xsignal.app.service.impl * @description: kafka消費服務 * @date 2018/3/20 0020 下午 8:03 */ public class KafkaOrderConsumer implements Runnable {protected static final Logger logger = LoggerFactory.getLogger(KafkaOrderConsumer.class); private KafkaStream stream; private int threadNumber; public KafkaOrderConsumer(KafkaStream stream, int threadNumber) {this.stream = stream; this.threadNumber = threadNumber; }@Override public void run() {ConsumerIterator<byte[], byte[]> it = stream.iterator(); while (it.hasNext()) {//消費隊列內容 final MessageAndMetadata<byte[], byte[]> messageAndMetadata = it.next(); try {final byte[] messageBytes = (byte[]) messageAndMetadata.message(); if (messageBytes != null && messageBytes.length > 0) {String content = new String(messageBytes); logger.info("message:'" + content + "'"); /** * @// TODO: 2018/3/20 0020 消費入庫 */ }} catch (Exception e) {logger.error("kafka back order consumer error", e); }}logger.info("Shutting down Thread: " + threadNumber); } }

以上代碼中,我們創建了一個線程池,線程數為6,因為我設置的partition=6,而且一次性訂閱了4個topic(當然這些topic要真實存在哦),測試的時候隨便往哪個topic中寫數據都可以收到相應的消費數據哦。




總結

以上是生活随笔為你收集整理的java kafka 多线程消费的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。