Java+Kafka消息队列
生活随笔
收集整理的這篇文章主要介紹了
Java+Kafka消息队列
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
本文主要針對Java端對Kafka消息隊列的生產和消費。Kafka的安裝部署,請查看相關文章。
筆者最近所用的是Spring MVC,監聽文件路徑,然後將讀取到的文件內容發送到消息隊列中,由另外的系統去消費消息。
當然消息隊列作為消息交換機,本系統既有生產消息也有消費消息。不做詳述。
生產者代碼相對簡單很多。
package com.dhc.test.kafka;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.log4j.Logger;

import java.util.Properties;

/**
 * Sends a single message to a Kafka topic.
 *
 * <p>Constructing an instance creates a producer, sends one record and then
 * closes the producer again, so the instance holds no open resources once
 * the constructor returns.
 */
public class ProducerHandler {

    /** Producer used for the one send; already closed when the constructor returns. */
    private final KafkaProducer<String, String> producer;

    private static Logger logger = Logger.getLogger(ProducerHandler.class.getName());

    /**
     * Sends {@code message} to {@code topic} and waits for the send to complete.
     *
     * <p>A failed send is logged at error level and is not rethrown.
     *
     * @param topic   name of the topic to publish to
     * @param message record value to publish (the record has no key)
     */
    public ProducerHandler(final String topic, final String message) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "127.0.0.1:9092");
        props.put("acks", "all");
        props.put("retries", "0");
        props.put("batch.size", "16384");
        props.put("linger.ms", "1");
        props.put("buffer.memory", "33554432");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producer = new KafkaProducer<String, String>(props);
        try {
            // Build the record.
            ProducerRecord<String, String> record =
                    new ProducerRecord<String, String>(topic, message);
            logger.info("【kafka】向Kafka的TOPIC【" + topic + "】中發送消息");
            logger.info("【kafka】消息內容:" + message);
            // send() is asynchronous: report the outcome from the completion
            // callback instead of assuming success right after the call.
            producer.send(record, new Callback() {
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception != null) {
                        logger.error("【kafka】Failed to send message to topic [" + topic + "]", exception);
                    } else {
                        logger.info("【kafka】推送成功");
                    }
                }
            });
        } finally {
            // close() waits for outstanding sends (so the callback above has run)
            // and releases the producer's network and buffer resources.
            producer.close();
        }
    }
}

消費者代碼

package com.dhc.test.kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.log4j.Logger;

import java.util.List;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * Subscribes to a list of topics, polls them on a dedicated thread and hands
 * every received record to a {@link ConsumerWorker} running on a worker pool.
 *
 * <p>The consumer is only ever used (and closed) by the polling thread;
 * {@link #shutdown()} merely signals that thread via {@code wakeup()}.
 */
public class ConsumerHandler {

    static Logger logger = Logger.getLogger(ConsumerHandler.class.getName());

    private final KafkaConsumer<String, String> consumer;
    private ExecutorService executors;
    /** Thread running the poll loop; set by {@link #execute(int)}. */
    private Thread pollThread;
    /** Set by {@link #shutdown()} to ask the poll loop to stop. */
    private volatile boolean closed = false;

    /**
     * Creates the consumer, subscribes to {@code topics} and starts polling
     * with a single worker.
     *
     * @param topics topics to subscribe to
     */
    public ConsumerHandler(List<String> topics) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "127.0.0.1:9092");
        props.put("group.id", "test");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(topics);
        execute(1);
    }

    /**
     * Creates the worker pool and starts the polling thread.
     *
     * <p>NOTE(review): this is invoked from the constructor; calling it again
     * would start a second thread polling the same consumer.
     *
     * @param workerNum number of worker threads that process records
     */
    public void execute(int workerNum) {
        // Bounded queue + CallerRunsPolicy: when the queue is full the polling
        // thread runs the task itself.
        final ExecutorService workers = new ThreadPoolExecutor(workerNum, workerNum, 0L,
                TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(1000),
                new ThreadPoolExecutor.CallerRunsPolicy());
        executors = workers;
        pollThread = new Thread(new Runnable() {
            // Background thread that listens for Kafka messages.
            public void run() {
                try {
                    while (!closed) {
                        ConsumerRecords<String, String> records = consumer.poll(200);
                        for (final ConsumerRecord<String, String> record : records) {
                            logger.info("【Kafka】監聽到kafka的TOPIC【" + record.topic() + "】的消息");
                            logger.info("【Kafka】消息內容:" + record.value());
                            workers.submit(new ConsumerWorker(record));
                        }
                    }
                } catch (WakeupException e) {
                    // Expected when shutdown() interrupts a blocking poll();
                    // anything else is a genuine error.
                    if (!closed) {
                        throw e;
                    }
                } finally {
                    // Close on the polling thread: the consumer must not be
                    // used from two threads at once.
                    consumer.close();
                }
            }
        });
        pollThread.start();
    }

    /**
     * Stops polling, lets the polling thread close the consumer, then shuts
     * the worker pool down, waiting up to 10 seconds for each step.
     */
    public void shutdown() {
        closed = true;
        // wakeup() may be called from another thread and aborts a blocking poll().
        consumer.wakeup();
        try {
            if (pollThread != null) {
                // Wait for the poll loop to finish so no record is submitted
                // to a pool that is already shut down.
                pollThread.join(TimeUnit.SECONDS.toMillis(10));
            }
            if (executors != null) {
                executors.shutdown();
                if (!executors.awaitTermination(10, TimeUnit.SECONDS)) {
                    logger.info("【Kafka】Timeout.... Ignore for this case ");
                }
            }
        } catch (InterruptedException ignored) {
            logger.info("【Kafka】Other thread interrupted this shutdown, ignore for this case.");
            if (executors != null) {
                // Still stop accepting work even though the wait was cut short.
                executors.shutdown();
            }
            Thread.currentThread().interrupt();
        }
    }
}

package com.dhc.test.kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.log4j.Logger;

/**
 * Task that processes one consumed record; currently prints its value to
 * standard output.
 */
public class ConsumerWorker implements Runnable {

    private ConsumerRecord<String, String> consumerRecord;

    /**
     * @param record the record this task will process
     */
    public ConsumerWorker(ConsumerRecord<String, String> record) {
        this.consumerRecord = record;
    }

    private static Logger logger = Logger.getLogger(ConsumerWorker.class.getName());

    public void run() {
        // Business logic for a received message goes here.
        System.out.println(consumerRecord.value());
    }
}

main方法啟動

package com.dhc.test;

import com.dhc.test.kafka.ConsumerHandler;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

/**
 * Entry point: starts a consumer that listens on the {@code test} topic.
 */
public class Start {

    public static void main(String[] args) throws Exception {
        // Start the Kafka consumer listener.
        List<String> topics = new ArrayList<String>();
        // Topic to listen on.
        topics.add("test");
        final ConsumerHandler handler = new ConsumerHandler(topics);
        // Stop polling and release the consumer when the JVM exits.
        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
            public void run() {
                handler.shutdown();
            }
        }));
    }
}
謝謝關注!
總結
以上是生活随笔為你收集整理的Java+Kafka消息队列的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Java+面板颜色分块_地图区块颜色分块
- 下一篇: 快速排序原理及Java实现