kafka0.8消费者实例
生活随笔
收集整理的這篇文章主要介紹了
kafka0.8消费者实例
小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.
為什么80%的碼農(nóng)都做不了架構(gòu)師?>>> ??
序
這里簡(jiǎn)單展示一下如何使用kafka0.8的client去消費(fèi)一個(gè)topic。
maven
<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka_2.10</artifactId><version>0.8.2.2</version></dependency>初始化客戶端
Properties props = new Properties();props.put("zookeeper.connect", zk); // props.put("auto.offset.reset","smallest");props.put("group.id",group);props.put("zookeeper.session.timeout.ms", "10000");props.put("zookeeper.sync.time.ms", "2000");props.put("auto.commit.interval.ms", "10000");props.put("consumer.timeout.ms","10000"); //設(shè)置ConsumerIterator的hasNext的超時(shí)時(shí)間,不設(shè)置則永遠(yuǎn)阻塞直到有新消息來(lái)props.put(org.apache.kafka.clients.consumer.ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY, "range");ConsumerConfig consumerConfig = new kafka.consumer.ConsumerConfig(props);ConsumerConnector consumerConnector = kafka.consumer.Consumer.createJavaConsumerConnector(consumerConfig);Map<String, Integer> topicCountMap = new HashMap<String, Integer>();topicCountMap.put(topic, consumerCount);Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumerConnector.createMessageStreams(topicCountMap);并發(fā)消費(fèi)
consumerMap.get(topic).stream().forEach(stream -> {pool.submit(new Runnable() {@Overridepublic void run() {ConsumerIterator<byte[], byte[]> it = stream.iterator();//it.hasNext()取決于consumer.timeout.ms的值,默認(rèn)為-1try{while (it.hasNext()) {System.out.println(Thread.currentThread().getName()+" hello");//是hasNext拋出異常,而不是next拋出System.out.println(Thread.currentThread().getName()+":"+new String(it.next().message()));}}catch (ConsumerTimeoutException e){e.printStackTrace();}System.out.println(Thread.currentThread().getName()+" end");}});});注意事項(xiàng)
消費(fèi)者實(shí)例數(shù)*每個(gè)實(shí)例的消費(fèi)線程數(shù) <= topic的partition數(shù)量,否則多余的就浪費(fèi)了。
轉(zhuǎn)載于:https://my.oschina.net/go4it/blog/1544496
總結(jié)
以上是生活随笔為你收集整理的kafka0.8消费者实例的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 车损险要不要买 车损险能够买吗
- 下一篇: 完全虚拟化和半虚拟化区别