日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

kafka的简单使用

發(fā)布時間:2025/7/14 编程问答 29 豆豆
生活随笔 收集整理的這篇文章主要介紹了 kafka的简单使用 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

為什么80%的碼農(nóng)都做不了架構(gòu)師?>>> ??

在eclipse中新建kafka-demo的maven項目,pom.xml依賴如下

<dependencies><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.11</version><scope>test</scope></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka_2.9.1</artifactId><version>0.8.2.1</version></dependency><!--?<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>0.8.2.1</version></dependency>?--></dependencies>

ProducerDemo.java

package?com.leech.kafka.demo;import?java.util.Date; import?java.util.Properties;import?kafka.javaapi.producer.Producer; import?kafka.producer.KeyedMessage; import?kafka.producer.ProducerConfig;public?class?ProducerDemo?{public?static?void?main(String[]?args)?{//Random?rnd?=?new?Random();int?events=100;//?設(shè)置配置屬性Properties?props?=?new?Properties();//props.put("metadata.broker.list","172.168.63.221:9092,172.168.63.233:9092,172.168.63.234:9092");props.put("metadata.broker.list","192.168.1.82:9092");props.put("serializer.class",?"kafka.serializer.StringEncoder");//?key.serializer.class默認為serializer.classprops.put("key.serializer.class",?"kafka.serializer.StringEncoder");//?可選配置,如果不配置,則使用默認的partitionerprops.put("partitioner.class",?"com.leech.kafka.demo.PartitionerDemo");//?觸發(fā)acknowledgement機制,否則是fire?and?forget,可能會引起數(shù)據(jù)丟失//?值為0,1,-1,可以參考//?http://kafka.apache.org/08/configuration.htmlprops.put("request.required.acks",?"1");ProducerConfig?config?=?new?ProducerConfig(props);//?創(chuàng)建producerProducer<String,?String>?producer?=?new?Producer<String,?String>(config);//?產(chǎn)生并發(fā)送消息long?start=System.currentTimeMillis();for?(long?i?=?0;?i?<?events;?i++)?{long?runtime?=?new?Date().getTime();String?ip?=?"192.168.2."?+?i;//rnd.nextInt(255);String?msg?=?runtime?+?",www.example.com,"?+?ip;//如果topic不存在,則會自動創(chuàng)建,默認replication-factor為1,partitions為0KeyedMessage<String,?String>?data?=?new?KeyedMessage<String,?String>("page_visits",?ip,?msg);producer.send(data);}System.out.println("耗時:"?+?(System.currentTimeMillis()?-?start));//?關(guān)閉producerproducer.close();} }

ConsumerDemo.java

package?com.leech.kafka.demo;import?java.util.HashMap; import?java.util.List; import?java.util.Map; import?java.util.Properties; import?java.util.concurrent.ExecutorService; import?java.util.concurrent.Executors;import?kafka.consumer.Consumer; import?kafka.consumer.ConsumerConfig; import?kafka.consumer.KafkaStream; import?kafka.javaapi.consumer.ConsumerConnector;/***?詳細可以參考:https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example*?*?@author?Fung**/ public?class?ConsumerDemo?{private?final?ConsumerConnector?consumer;private?final?String?topic;private?ExecutorService?executor;public?ConsumerDemo(String?a_zookeeper,?String?a_groupId,?String?a_topic)?{consumer?=?Consumer.createJavaConsumerConnector(createConsumerConfig(a_zookeeper,a_groupId));this.topic?=?a_topic;}public?void?shutdown()?{if?(consumer?!=?null)consumer.shutdown();if?(executor?!=?null)executor.shutdown();}public?void?run(int?numThreads)?{Map<String,?Integer>?topicCountMap?=?new?HashMap<String,?Integer>();topicCountMap.put(topic,?new?Integer(numThreads));Map<String,?List<KafkaStream<byte[],?byte[]>>>?consumerMap?=?consumer.createMessageStreams(topicCountMap);List<KafkaStream<byte[],?byte[]>>?streams?=?consumerMap.get(topic);//?now?launch?all?the?threadsexecutor?=?Executors.newFixedThreadPool(numThreads);//?now?create?an?object?to?consume?the?messages//int?threadNumber?=?0;for?(final?KafkaStream?stream?:?streams)?{executor.submit(new?ConsumerMsgTask(stream,?threadNumber));threadNumber++;}}private?static?ConsumerConfig?createConsumerConfig(String?a_zookeeper,String?a_groupId)?{Properties?props?=?new?Properties();props.put("zookeeper.connect",?a_zookeeper);props.put("group.id",?a_groupId);props.put("zookeeper.session.timeout.ms",?"400");props.put("zookeeper.sync.time.ms",?"200");props.put("auto.commit.interval.ms",?"1000");return?new?ConsumerConfig(props);}public?static?void?main(String[]?arg)?{String[]?args?=?{?"192.168.1.82:2181",?"group-1",?"page_visits",?"12"?};String?zooKeeper?=?args[0];String?groupId?=?args[1];String?topic?=?args[2];int?threads?=?Integer.parseInt(args[3]);ConsumerDemo?demo?=?new?ConsumerDemo(zooKeeper,?groupId,?topic);demo.run(threads);try?{Thread.sleep(10000);}?catch?(InterruptedException?ie)?{}demo.shutdown();} }

ConsumerMsgTask.java

package?com.leech.kafka.demo;import?kafka.consumer.ConsumerIterator; import?kafka.consumer.KafkaStream;public?class?ConsumerMsgTask?implements?Runnable?{private?KafkaStream?m_stream;private?int?m_threadNumber;public?ConsumerMsgTask(KafkaStream?stream,?int?threadNumber)?{m_threadNumber?=?threadNumber;m_stream?=?stream;}@Overridepublic?void?run()?{ConsumerIterator<byte[],?byte[]>?it?=?m_stream.iterator();while?(it.hasNext())System.out.println("Thread?"?+?m_threadNumber?+?":?"+?new?String(it.next().message()));System.out.println("Shutting?down?Thread:?"?+?m_threadNumber);} }

PartitionerDemo.java

package?com.leech.kafka.demo;import?kafka.producer.Partitioner; import?kafka.utils.VerifiableProperties;public?class?PartitionerDemo?implements?Partitioner?{public?PartitionerDemo(VerifiableProperties?props)?{}@Overridepublic?int?partition(Object?obj,?int?numPartitions)?{int?partition?=?0;if?(obj?instanceof?String)?{String?key=(String)obj;int?offset?=?key.lastIndexOf('.');if?(offset?>?0)?{partition?=?Integer.parseInt(key.substring(offset?+?1))?%?numPartitions;}}else{partition?=?obj.toString().length()?%?numPartitions;}return?partition;}}


轉(zhuǎn)載于:https://my.oschina.net/chaun/blog/408511

總結(jié)

以上是生活随笔為你收集整理的kafka的简单使用的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。