Java Kafka multi-threaded consumption
Let's start with the code for a simple Kafka producer and consumer.
Producer: KafkaProducer
import java.util.Properties;
import java.util.concurrent.TimeUnit;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import kafka.serializer.StringEncoder;

/**
 * @author xiaofeng
 * @version V1.0
 * @title: KafkaProducer.java
 * @package: com.yingda.xsignal.app.test
 * @description: Kafka producer demo
 * @date 2018/4/4 11:20 AM
 */
public class KafkaProducer extends Thread {

    private final String topic;

    public KafkaProducer(String topic) {
        super();
        this.topic = topic;
    }

    @Override
    public void run() {
        Producer<Integer, String> producer = createProducer();
        int i = 0;
        // Send one message per second: message0, message1, ...
        while (true) {
            String msg = "message";
            producer.send(new KeyedMessage<Integer, String>(topic, msg + (i++)));
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                // Stop producing when the thread is interrupted
                Thread.currentThread().interrupt();
                producer.close();
                return;
            }
        }
    }

    private Producer<Integer, String> createProducer() {
        Properties properties = new Properties();
        // Encode message payloads as strings
        properties.put("serializer.class", StringEncoder.class.getName());
        // The 0.8 producer discovers brokers through metadata.broker.list
        // (it does not connect to ZooKeeper)
        properties.put("metadata.broker.list", "10.0.2.22:9092");
        return new Producer<Integer, String>(new ProducerConfig(properties));
    }

    public static void main(String[] args) {
        new KafkaProducer("TEST_TOPIC").start();
    }
}

Consumer: KafkaConsumer
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import com.google.common.collect.Maps;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

/**
 * @author xiaofeng
 * @version V1.0
 * @title: KafkaConsumer.java
 * @package: com.yingda.xsignal.app.test
 * @description: single-threaded consumption
 * @date 2018/4/4 11:18 AM
 */
public class KafkaConsumer extends Thread {

    private final String topic;

    public KafkaConsumer(String topic) {
        super();
        this.topic = topic;
    }

    @Override
    public void run() {
        ConsumerConnector consumer = createConsumer();
        Map<String, Integer> topicCountMap = Maps.newHashMap();
        // Request a single stream (that is, one consuming thread) for the topic
        topicCountMap.put(topic, 1);
        Map<String, List<KafkaStream<byte[], byte[]>>> messageStreams =
                consumer.createMessageStreams(topicCountMap);
        // Take that single stream and iterate over its messages
        KafkaStream<byte[], byte[]> stream = messageStreams.get(topic).get(0);
        ConsumerIterator<byte[], byte[]> iterator = stream.iterator();
        // hasNext() blocks until the next message arrives
        while (iterator.hasNext()) {
            String message = new String(iterator.next().message(), StandardCharsets.UTF_8);
            System.out.println("Received: " + message);
        }
    }

    private ConsumerConnector createConsumer() {
        Properties properties = new Properties();
        // ZooKeeper address
        properties.put("zookeeper.connect", "10.0.2.22:2181");
        // Consumer group
        properties.put("group.id", "test-group");
        // This high-level consumer delivers raw byte[] payloads, which run() decodes itself
        return Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));
    }

    public static void main(String[] args) {
        new KafkaConsumer("TEST_TOPIC").start();
    }
}

Start the producer and the consumer separately.
The consumer console then prints each message as it arrives.
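Production and consumption both work, but this consumer clearly handles a single topic on a single thread. What if we need to subscribe to several topics at once and consume them with multiple threads? Let's find out.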
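How many threads are actually useful depends on the partition count: within one consumer group, each partition is read by at most one stream, so threads beyond the number of partitions receive nothing. The following is a minimal sketch of a range-style split of partitions across threads. It is a simplified model for illustration, not Kafka's own code; the class name `RangeAssignmentSketch` and the method `assign` are hypothetical, and a real assignment also spans every consumer process in the group.

```java
import java.util.ArrayList;
import java.util.List;

public class RangeAssignmentSketch {

    /**
     * Returns, for each consumer thread, the partition ids it would own when
     * `partitions` partitions are split into contiguous ranges across `threads` threads.
     * Hypothetical helper: a simplified model, not part of any Kafka API.
     */
    public static List<List<Integer>> assign(int partitions, int threads) {
        List<List<Integer>> result = new ArrayList<>();
        int perThread = partitions / threads;   // every thread gets at least this many
        int withExtra = partitions % threads;   // the first few threads get one more
        for (int t = 0; t < threads; t++) {
            int start = perThread * t + Math.min(t, withExtra);
            int count = perThread + (t < withExtra ? 1 : 0);
            List<Integer> owned = new ArrayList<>();
            for (int p = start; p < start + count; p++) {
                owned.add(p);
            }
            result.add(owned);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(assign(6, 6)); // [[0], [1], [2], [3], [4], [5]]
        System.out.println(assign(6, 4)); // [[0, 1], [2, 3], [4], [5]]
        System.out.println(assign(6, 8)); // [[0], [1], [2], [3], [4], [5], [], []]
    }
}
```

With 6 partitions, 6 threads give one partition per thread, fewer threads share the partitions unevenly, and any threads past the sixth stay idle.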
Building a multi-threaded KafkaConsumer
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import com.google.common.collect.Maps;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// BaseSpringApp and KafkaOrderConsumer belong to the author's project and are not shown in this article.

/**
 * @author xiaofeng
 * @version V1.0
 * @title: OrderBackConsumer.java
 * @package: com.yingda.xsignal.app.consumer
 * @description: order backup consumer
 * @date 2018/3/16 4:46 PM
 */
public class OrderBackConsumer extends BaseSpringApp {

    protected static final Logger logger = LoggerFactory.getLogger(OrderBackConsumer.class);

    private final ConsumerConnector consumer;
    private final String signalTopic = "SIGNAL_ORDERINFO";
    private final String followTopic = "FOLLOW_ORDERINFO";
    private final String signalHisTopic = "HIS_ORDERINFO";
    private final String followHisTopic = "FOLLOW_HIS_ORDERINFO";
    private ConsumerConfig consumerConfig;
    private static int threadNum = 6;

    /** Set the ThreadPoolExecutor's core pool size. */
    private int corePoolSize = 6;
    /** Set the ThreadPoolExecutor's maximum pool size. */
    private int maxPoolSize = 200;
    /** Set the capacity for the ThreadPoolExecutor's BlockingQueue. */
    private int queueCapacity = 1024;
    /** Thread name prefix. */
    private String threadNamePrefix = "kafka-consumer-pool-%d";

    ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat(threadNamePrefix).build();

    /** One thread pool per topic, kept so that shutdown() can stop them all. */
    private final List<ExecutorService> pools = new ArrayList<>();

    public OrderBackConsumer(String[] args) {
        super(args, "classpath:app-KafkaConsumer.xml");
        Properties properties = new Properties();
        // Development environment: 10.0.2.22:2181
        properties.put("zookeeper.connect", "10.0.2.22:2181");
        // Consumer group name
        properties.put("group.id", "back_consumer_group");
        consumerConfig = new ConsumerConfig(properties);
        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(consumerConfig);
    }

    @Override
    public void shutdown() {
        // Stopping the connector ends the streams, which lets the worker tasks finish
        if (consumer != null) {
            consumer.shutdown();
        }
        for (ExecutorService pool : pools) {
            pool.shutdown();
        }
        try {
            for (ExecutorService pool : pools) {
                if (!pool.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS)) {
                    logger.warn("Timed out waiting for consumer threads to shut down, exiting uncleanly");
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            logger.warn("Interrupted during shutdown, exiting uncleanly");
        }
    }

    public void run(int numThreads) {
        // Request numThreads streams for each of the four topics
        Map<String, Integer> topicCountMap = Maps.newHashMap();
        topicCountMap.put(signalTopic, numThreads);
        topicCountMap.put(followTopic, numThreads);
        topicCountMap.put(signalHisTopic, numThreads);
        topicCountMap.put(followHisTopic, numThreads);
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
                consumer.createMessageStreams(topicCountMap);
        consumerMap.values().forEach(streams -> {
            /*
             * Each topic gets its own thread pool, because every topic is configured with 6 partitions.
             * A Kafka consumer scales by adding threads, but only up to the partition count:
             * with 6 partitions, at most 6 threads can consume a topic concurrently.
             * A single shared pool with corePoolSize = 6 would start only 6 of the 24 stream tasks
             * and leave the rest queued, since a stream task never returns.
             */
            ExecutorService pool = createThreadPool(streams.size());
            pools.add(pool);
            int threadNumber = 0;
            for (final KafkaStream<byte[], byte[]> stream : streams) {
                pool.submit(new KafkaOrderConsumer(stream, threadNumber));
                threadNumber++;
            }
        });
    }

    /**
     * Creates a thread pool whose core size covers every stream of one topic.
     *
     * @param streamCount number of streams the pool has to serve
     * @return the new pool
     */
    private ExecutorService createThreadPool(int streamCount) {
        int core = Math.max(corePoolSize, streamCount);
        return new ThreadPoolExecutor(core, Math.max(maxPoolSize, core),
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(queueCapacity),
                namedThreadFactory, new ThreadPoolExecutor.AbortPolicy());
    }

    public static void main(String[] args) {
        // Threads per topic: first argument if given, otherwise the default of 6
        int threads = args.length < 1 ? threadNum : Integer.parseInt(args[0]);
        OrderBackConsumer example = new OrderBackConsumer(args);
        example.run(threads);
        try {
            Thread.sleep(Long.MAX_VALUE);
        } catch (InterruptedException ie) {
            // Interrupted: fall through and shut down
        }
        example.shutdown();
    }
}

The code above creates a thread pool of 6 threads for each topic, because every topic is configured with 6 partitions, and it subscribes to 4 topics at once (the topics must actually exist, of course). During testing, writing data to any of these topics produces the corresponding consumed output.
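One detail is worth checking when sizing such a pool. A `ThreadPoolExecutor` only creates threads beyond `corePoolSize` once its queue is full, and a stream-consuming task normally never returns. With `corePoolSize = 6` and a queue capacity of 1024, submitting 4 topics × 6 streams = 24 tasks to one shared pool would leave 18 of them waiting in the queue, so each topic needs its own pool, or the core size must be at least topics × threads. The sketch below reproduces this with plain JDK classes; the class `PoolSizingSketch`, the method `submitBlockingTasks`, and the latch that stands in for a blocked stream iterator are all assumptions made for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class PoolSizingSketch {

    /**
     * Submits `tasks` tasks that block until released (a stand-in for consumer loops)
     * and returns {threads created, tasks left in the queue} at that moment.
     * Hypothetical helper for illustration only.
     */
    public static int[] submitBlockingTasks(int corePoolSize, int maxPoolSize,
                                            int queueCapacity, int tasks) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(corePoolSize, maxPoolSize,
                0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(queueCapacity));
        CountDownLatch release = new CountDownLatch(1);
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                try {
                    release.await(); // blocks like a stream iterator waiting for messages
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        // No task has finished yet, so these numbers show who got a thread and who is queued
        int[] snapshot = {pool.getPoolSize(), pool.getQueue().size()};
        // Release the tasks and stop the pool so the demo exits cleanly
        release.countDown();
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return snapshot;
    }

    public static void main(String[] args) {
        int[] shared = submitBlockingTasks(6, 200, 1024, 24);
        System.out.println("core=6:  threads=" + shared[0] + ", queued=" + shared[1]);
        // prints "core=6:  threads=6, queued=18"
        int[] sized = submitBlockingTasks(24, 200, 1024, 24);
        System.out.println("core=24: threads=" + sized[0] + ", queued=" + sized[1]);
        // prints "core=24: threads=24, queued=0"
    }
}
```

A `maxPoolSize` of 200 does not help in the first case, because the queue never fills up; only the core size decides how many long-running consumers actually start.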