日韩av黄I国产麻豆传媒I国产91av视频在线观看I日韩一区二区三区在线看I美女国产在线I麻豆视频国产在线观看I成人黄色短片

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 >

kafka的简单使用

發布時間:2025/7/14 42 豆豆
生活随笔 收集整理的這篇文章主要介紹了 kafka的简单使用 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

為什么80%的碼農都做不了架構師?>>> ??

在eclipse中新建kafka-demo的maven項目,pom.xml依賴如下

<dependencies>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.11</version>
        <scope>test</scope>
    </dependency>
    <!-- Legacy Scala-based Kafka artifact; brings in both broker and old client APIs. -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.9.1</artifactId>
        <version>0.8.2.1</version>
    </dependency>
    <!-- The pure-Java client, not needed alongside kafka_2.9.1:
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.8.2.1</version>
    </dependency>
    -->
</dependencies>

ProducerDemo.java

package?com.leech.kafka.demo;import?java.util.Date; import?java.util.Properties;import?kafka.javaapi.producer.Producer; import?kafka.producer.KeyedMessage; import?kafka.producer.ProducerConfig;public?class?ProducerDemo?{public?static?void?main(String[]?args)?{//Random?rnd?=?new?Random();int?events=100;//?設置配置屬性Properties?props?=?new?Properties();//props.put("metadata.broker.list","172.168.63.221:9092,172.168.63.233:9092,172.168.63.234:9092");props.put("metadata.broker.list","192.168.1.82:9092");props.put("serializer.class",?"kafka.serializer.StringEncoder");//?key.serializer.class默認為serializer.classprops.put("key.serializer.class",?"kafka.serializer.StringEncoder");//?可選配置,如果不配置,則使用默認的partitionerprops.put("partitioner.class",?"com.leech.kafka.demo.PartitionerDemo");//?觸發acknowledgement機制,否則是fire?and?forget,可能會引起數據丟失//?值為0,1,-1,可以參考//?http://kafka.apache.org/08/configuration.htmlprops.put("request.required.acks",?"1");ProducerConfig?config?=?new?ProducerConfig(props);//?創建producerProducer<String,?String>?producer?=?new?Producer<String,?String>(config);//?產生并發送消息long?start=System.currentTimeMillis();for?(long?i?=?0;?i?<?events;?i++)?{long?runtime?=?new?Date().getTime();String?ip?=?"192.168.2."?+?i;//rnd.nextInt(255);String?msg?=?runtime?+?",www.example.com,"?+?ip;//如果topic不存在,則會自動創建,默認replication-factor為1,partitions為0KeyedMessage<String,?String>?data?=?new?KeyedMessage<String,?String>("page_visits",?ip,?msg);producer.send(data);}System.out.println("耗時:"?+?(System.currentTimeMillis()?-?start));//?關閉producerproducer.close();} }

ConsumerDemo.java

package?com.leech.kafka.demo;import?java.util.HashMap; import?java.util.List; import?java.util.Map; import?java.util.Properties; import?java.util.concurrent.ExecutorService; import?java.util.concurrent.Executors;import?kafka.consumer.Consumer; import?kafka.consumer.ConsumerConfig; import?kafka.consumer.KafkaStream; import?kafka.javaapi.consumer.ConsumerConnector;/***?詳細可以參考:https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example*?*?@author?Fung**/ public?class?ConsumerDemo?{private?final?ConsumerConnector?consumer;private?final?String?topic;private?ExecutorService?executor;public?ConsumerDemo(String?a_zookeeper,?String?a_groupId,?String?a_topic)?{consumer?=?Consumer.createJavaConsumerConnector(createConsumerConfig(a_zookeeper,a_groupId));this.topic?=?a_topic;}public?void?shutdown()?{if?(consumer?!=?null)consumer.shutdown();if?(executor?!=?null)executor.shutdown();}public?void?run(int?numThreads)?{Map<String,?Integer>?topicCountMap?=?new?HashMap<String,?Integer>();topicCountMap.put(topic,?new?Integer(numThreads));Map<String,?List<KafkaStream<byte[],?byte[]>>>?consumerMap?=?consumer.createMessageStreams(topicCountMap);List<KafkaStream<byte[],?byte[]>>?streams?=?consumerMap.get(topic);//?now?launch?all?the?threadsexecutor?=?Executors.newFixedThreadPool(numThreads);//?now?create?an?object?to?consume?the?messages//int?threadNumber?=?0;for?(final?KafkaStream?stream?:?streams)?{executor.submit(new?ConsumerMsgTask(stream,?threadNumber));threadNumber++;}}private?static?ConsumerConfig?createConsumerConfig(String?a_zookeeper,String?a_groupId)?{Properties?props?=?new?Properties();props.put("zookeeper.connect",?a_zookeeper);props.put("group.id",?a_groupId);props.put("zookeeper.session.timeout.ms",?"400");props.put("zookeeper.sync.time.ms",?"200");props.put("auto.commit.interval.ms",?"1000");return?new?ConsumerConfig(props);}public?static?void?main(String[]?arg)?{String[]?args?=?{?"192.168.1.82:2181",?"group-1",?"page_visits",?"12"?};String?zooKeeper?=?args[0];St
ring?groupId?=?args[1];String?topic?=?args[2];int?threads?=?Integer.parseInt(args[3]);ConsumerDemo?demo?=?new?ConsumerDemo(zooKeeper,?groupId,?topic);demo.run(threads);try?{Thread.sleep(10000);}?catch?(InterruptedException?ie)?{}demo.shutdown();} }

ConsumerMsgTask.java

package?com.leech.kafka.demo;import?kafka.consumer.ConsumerIterator; import?kafka.consumer.KafkaStream;public?class?ConsumerMsgTask?implements?Runnable?{private?KafkaStream?m_stream;private?int?m_threadNumber;public?ConsumerMsgTask(KafkaStream?stream,?int?threadNumber)?{m_threadNumber?=?threadNumber;m_stream?=?stream;}@Overridepublic?void?run()?{ConsumerIterator<byte[],?byte[]>?it?=?m_stream.iterator();while?(it.hasNext())System.out.println("Thread?"?+?m_threadNumber?+?":?"+?new?String(it.next().message()));System.out.println("Shutting?down?Thread:?"?+?m_threadNumber);} }

PartitionerDemo.java

package?com.leech.kafka.demo;import?kafka.producer.Partitioner; import?kafka.utils.VerifiableProperties;public?class?PartitionerDemo?implements?Partitioner?{public?PartitionerDemo(VerifiableProperties?props)?{}@Overridepublic?int?partition(Object?obj,?int?numPartitions)?{int?partition?=?0;if?(obj?instanceof?String)?{String?key=(String)obj;int?offset?=?key.lastIndexOf('.');if?(offset?>?0)?{partition?=?Integer.parseInt(key.substring(offset?+?1))?%?numPartitions;}}else{partition?=?obj.toString().length()?%?numPartitions;}return?partition;}}


轉載于:https://my.oschina.net/chaun/blog/408511

總結

以上是生活随笔為你收集整理的kafka的简单使用的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。