日韩av黄I国产麻豆传媒I国产91av视频在线观看I日韩一区二区三区在线看I美女国产在线I麻豆视频国产在线观看I成人黄色短片

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 >

java利用kafka生产消费消息

發布時間:2025/4/5 44 豆豆
生活随笔 收集整理的這篇文章主要介紹了 java利用kafka生产消费消息 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

2019獨角獸企業重金招聘Python工程師標準>>>

1.producer程序

package com.test.frame.kafka.controller;import kafka.javaapi.producer.Producer; import kafka.producer.KeyedMessage; import kafka.producer.ProducerConfig;import java.util.Properties;public class KafkaProducer {private final Producer<String, String> producer;public final static String TOPIC = "my-multi-topic";//構造方法private KafkaProducer() {Properties props = new Properties();props.put("metadata.broker.list", "localhost:9092");props.put("serializer.class", "kafka.serializer.StringEncoder");props.put("key.serializer.class", "kafka.serializer.StringEncoder");props.put("request.required.acks", "-1");producer = new Producer<String, String>(new ProducerConfig(props));}void produce() {int messageNo = 90;final int COUNT = 100;while (messageNo < COUNT) {String key = String.valueOf(messageNo);String data = "hello kafka message" + key;producer.send(new KeyedMessage<String, String>(TOPIC, key ,data));System.out.println(data);messageNo++;}}public static void main(String[] args) throws Exception {new KafkaProducer().produce();}}

運行結果:

消費方接收到的消息如下:

2.consumer端程序:

package com.test.frame.kafka.controller;import kafka.consumer.ConsumerConfig; import kafka.consumer.ConsumerIterator; import kafka.consumer.KafkaStream; import kafka.javaapi.consumer.ConsumerConnector; import kafka.serializer.StringDecoder; import kafka.utils.VerifiableProperties;import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties;public class KafkaConsumer {private final ConsumerConnector consumer;private KafkaConsumer() {Properties props = new Properties();//zookeeper 配置props.put("zookeeper.connect", "localhost:2181");//group 代表一個消費組props.put("group.id", "jd-group");//zk連接超時props.put("zookeeper.session.timeout.ms", "4000");props.put("zookeeper.sync.time.ms", "200");props.put("auto.commit.interval.ms", "1000");props.put("auto.offset.reset", "smallest");//序列化類props.put("serializer.class", "kafka.serializer.StringEncoder");ConsumerConfig config = new ConsumerConfig(props);consumer = kafka.consumer.Consumer.createJavaConsumerConnector(config); }void consume() {Map<String, Integer> topicCountMap = new HashMap<String, Integer>();topicCountMap.put(KafkaProducer.TOPIC, new Integer(1));StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties());StringDecoder valueDecoder = new StringDecoder(new VerifiableProperties());Map<String, List<KafkaStream<String, String>>> consumerMap =consumer.createMessageStreams(topicCountMap,keyDecoder,valueDecoder);KafkaStream<String, String> stream = consumerMap.get(KafkaProducer.TOPIC).get(0);ConsumerIterator<String, String> it = stream.iterator();while (it.hasNext())System.out.println(it.next().message());}public static void main(String[] args) {new KafkaConsumer().consume();}}

運行結果如下:

此時已經聯通成功。

?

?

?

?

?

轉載于:https://my.oschina.net/u/2263272/blog/1527979

總結

以上是生活随笔為你收集整理的java利用kafka生产消费消息的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。