java 集成 kafka 0.8.2.1 适配jdk1.6
文章目錄
- 一、版本說明
- 二、實戰
- 2.1. 依賴
- 2.2. 生產者代碼
- 2.3. 消費端代碼
- 2.4. 測試
- 三、小伙伴疑難解答
- 3.1. 首先新建一個maven項目
- 3.2. 把我的依賴和代碼復制過去
- 3.3. 把我寫的case調試通
- 3.4. 找到左邊External Libraries
- 3.5. jar處理
- 3.6. 打開非maven項目,添加jar
- 3.7. 等待項目編譯
- 四、 項目jar和引入的jar沖突建議
- 4.1. 定位問題
- 4.2. 分析問題
- 五、解決問題
- 5.1. 解決問題場景
- 5.2. 方案1
- 5.2. 方案2
- 5.2. 方案3
一、版本說明
| jdk | jdk-8u144-linux-x64.tar.gz |
| kafka | kafka_2.9.2-0.8.2.1.tgz |
| jdk | jdk1.6.0_24 |
| kafka | kafka_2.9.2-0.8.2.1 |
二、實戰
2.1. 依賴
<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka_2.9.2</artifactId><version>0.8.2.1</version></dependency><dependency><groupId>log4j</groupId><artifactId>log4j</artifactId><version>1.2.15</version><exclusions><exclusion><artifactId>jmxtools</artifactId><groupId>com.sun.jdmk</groupId></exclusion><exclusion><artifactId>jmxri</artifactId><groupId>com.sun.jmx</groupId></exclusion><exclusion><artifactId>jms</artifactId><groupId>javax.jms</groupId></exclusion><exclusion><artifactId>mail</artifactId><groupId>javax.mail</groupId></exclusion></exclusions></dependency>2.2. 生產者代碼
package com.sinosoft.d;import kafka.javaapi.producer.Producer; import kafka.producer.KeyedMessage; import kafka.producer.ProducerConfig;import java.util.Properties;/*** kafka生產端** @author gblfy* @date 2020-08-07** Kafka生產者測試* http://kafka.apache.org/documentation.html#introduction* http://blog.csdn.net/hmsiwtv/article/details/46960053*/ public class KafkaProducetest {private final Producer<String, String> producer;public final static String TOPIC = "clicki_info_topic";private KafkaProducetest() {Properties props = new Properties();//此處配置的是kafka的端口props.put("metadata.broker.list", "10.5.6.19:9092");//配置value的序列化類props.put("serializer.class", "kafka.serializer.StringEncoder");//配置key的序列化類props.put("key.serializer.class", "kafka.serializer.StringEncoder");//0表示不確認主服務器是否收到消息,馬上返回,低延遲但最弱的持久性,數據可能會丟失//1表示確認主服務器收到消息后才返回,持久性稍強,可是如果主服務器死掉,從服務器數據尚未同步,數據可能會丟失//-1表示確認所有服務器都收到數據,完美!props.put("request.required.acks", "-1");//異步生產,批量存入緩存后再發到服務器去props.put("producer.type", "async");//填充配置,初始化生產者producer = new Producer<String, String>(new ProducerConfig(props));}void produce() {int messageNo = 1000;final int COUNT = 2000;while (messageNo < COUNT) {String key = String.valueOf(messageNo);String data = "hello kafka message " + key;String data1 = "{\"c\":0,\"i\":16114765323924126,\"n\":\"http://www.abbo.cn/clicki.html\",\"s\":0,\"sid\":0,\"t\":\"info_url\",\"tid\":0,\"unix\":0,\"viewId\":0}";// 發送消息// producer.send(new KeyedMessage<String, String>(TOPIC,data1));// 消息類型key:valueproducer.send(new KeyedMessage<String, String>(TOPIC, key, data));System.out.println(data);messageNo++;}producer.close();//必須關閉}public static void main(String[] args) {new KafkaProducetest().produce();} }2.3. 消費端代碼
package com.sinosoft.d;import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties;import kafka.consumer.ConsumerConfig; import kafka.consumer.ConsumerIterator; import kafka.consumer.KafkaStream; import kafka.javaapi.consumer.ConsumerConnector; import kafka.serializer.StringDecoder; import kafka.utils.VerifiableProperties;/*** kafka生產端** @author gblfy* @date 2020-08-07* <p>* Kafka消費者測試*/ public class KafkaConsumertest {private final ConsumerConnector consumer;private KafkaConsumertest() {Properties props = new Properties();//zookeeper 配置props.put("zookeeper.connect", "10.5.6.19:2181");//group 代表一個消費組,加入組里面,消息只能被該組的一個消費者消費//如果所有消費者在一個組內,就是傳統的隊列模式,排隊拿消息//如果所有的消費者都不在同一個組內,就是發布-訂閱模式,消息廣播給所有組//如果介于兩者之間,那么廣播的消息在組內也是要排隊的props.put("group.id", "jd-group");//zk連接超時props.put("zookeeper.session.timeout.ms", "4000");//ZooKeeper的最大超時時間,就是心跳的間隔,若是沒有反映,那么認為已經死了,不易過大props.put("zookeeper.sync.time.ms", "200");//zk follower落后于zk leader的最長時間props.put("auto.commit.interval.ms", "1000");//往zookeeper上寫offset的頻率/** 此配置參數表示當此groupId下的消費者,在ZK中沒有offset值時(比如新的groupId,或者是zk數據被清空),consumer應該從哪個offset開始消費.* largest表示接受接收最大的offset(即最新消息),smallest表示最小offset,即從topic的開始位置消費所有消息.* */props.put("auto.offset.reset", "smallest"); //消費最老消息,最新為largest//序列化類props.put("serializer.class", "kafka.serializer.StringEncoder");ConsumerConfig config = new ConsumerConfig(props);consumer = kafka.consumer.Consumer.createJavaConsumerConnector(config);}void consume() {// 描述讀取哪個topic,需要幾個線程讀Map<String, Integer> topicCountMap = new HashMap<String, Integer>();topicCountMap.put(KafkaProducetest.TOPIC, new Integer(1));/* 默認消費時的數據是byte[]形式的,可以傳入String編碼器*/StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties());StringDecoder valueDecoder = new StringDecoder(new VerifiableProperties());Map<String, List<KafkaStream<String, String>>> consumerMap =consumer.createMessageStreams(topicCountMap, keyDecoder, valueDecoder);//消費數據時每個Topic有多個線程在讀,所以取List第一個流KafkaStream<String, String> stream = consumerMap.get(KafkaProducetest.TOPIC).get(0);ConsumerIterator<String, String> it = stream.iterator();while (it.hasNext()) {System.out.println(it.next().topic() + ":" + it.next().partition() + ":" + it.next().offset() + ":" + it.next().key() + ":" + it.next().message());}}public static void main(String[] args) {new KafkaConsumertest().consume();} }2.4. 測試
先啟動消費端,在啟動生產端
三、小伙伴疑難解答
有的小伙伴問我,我們的工程師非maven項目該怎么辦呢?
我給這個小伙伴的建議是以下幾點:
3.1. 首先新建一個maven項目
3.2. 把我的依賴和代碼復制過去
3.3. 把我寫的case調試通
3.4. 找到左邊External Libraries
3.5. jar處理
在本地的maven倉庫中把這些以來的jar復制出來,建議先發到一個空的文件夾里面,建議和我一樣
3.6. 打開非maven項目,添加jar
打開你的非maven項目,把這些jar添加進去
3.7. 等待項目編譯
四、 項目jar和引入的jar沖突建議
關于這個問題,很多小伙伴應該也遇到過很正常,首先要保證引入的jar不能和以前的jar產生沖突?
那又該怎么辦?
有的小伙伴說,把以前和本次引入jar沖突的jar刪除唄!要三思
4.1. 定位問題
首先定位引入的jar和項目中的哪一個jar發生沖突
4.2. 分析問題
沖突的原因是什么?
1.引用的對象的包路徑一樣并且對象名也一樣
2.二個沖突的kar項目中加入都存在,但是,代碼中引入jar的優先級問題,引入的jar非自己需要的jar,而自己需要的jar默認不會引入,導致代碼報錯
3.版本問題,jar向下不兼容
五、解決問題
5.1. 解決問題場景
首先解決問題要基于場景:
我引入的jar,只有我自己用,但是,我的代碼中默認引入的jar非我需要的,但是,把項目中以前的jar(沖突的jar)刪除,代碼就不報錯。
5.2. 方案1
這種場景有3種解決方案
方案1(風險可控):如果,可以確保刪除以前的jar不會項目的其他功能造成影響,可以考慮刪除以前舊的jar
5.2. 方案2
方案2(風險不可控):如果,不能評估刪除以前jar的風險范圍,建議換一種方式,完成你的需求
5.2. 方案3
方案3(風險不可控):這種方案在方案2的基礎上做的,找到剛引入沖突jar的源碼,把需要的類和api(方法),單獨復制出來,這樣也可以解決,但是,需要小伙伴具備閱讀源碼的能力和調試的時間。
總結
以上是生活随笔為你收集整理的java 集成 kafka 0.8.2.1 适配jdk1.6的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: mybatis-plus大批量数据插入缓
- 下一篇: Git bash 编码格式配置_02