日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

java 集成 kafka 0.8.2.1 适配jdk1.6

發布時間:2024/9/27 编程问答 52 豆豆
生活随笔 收集整理的這篇文章主要介紹了 java 集成 kafka 0.8.2.1 适配jdk1.6 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

文章目錄

          • 一、版本說明
          • 二、實戰
            • 2.1. 依賴
            • 2.2. 生產者代碼
            • 2.3. 消費端代碼
            • 2.4. 測試
          • 三、小伙伴疑難解答
            • 3.1. 首先新建一個maven項目
            • 3.2. 把我的依賴和代碼復制過去
            • 3.3. 把我寫的case調試通
            • 3.4. 找到左邊External Libraries
            • 3.5. jar處理
            • 3.6. 打開非maven項目,添加jar
            • 3.7. 等待項目編譯
          • 四、 項目jar和引入的jar沖突建議
            • 4.1. 定位問題
            • 4.2. 分析問題
          • 五、解決問題
            • 5.1. 解決問題場景
            • 5.2. 方案1
            • 5.2. 方案2
            • 5.2. 方案3

一、版本說明
linux服務器環境軟件版本
jdkjdk-8u144-linux-x64.tar.gz
kafkakafka_2.9.2-0.8.2.1.tgz
應用服務器軟件版本
jdkjdk1.6.0_24
kafkakafka_2.9.2-0.8.2.1
二、實戰
2.1. 依賴
<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka_2.9.2</artifactId><version>0.8.2.1</version></dependency><dependency><groupId>log4j</groupId><artifactId>log4j</artifactId><version>1.2.15</version><exclusions><exclusion><artifactId>jmxtools</artifactId><groupId>com.sun.jdmk</groupId></exclusion><exclusion><artifactId>jmxri</artifactId><groupId>com.sun.jmx</groupId></exclusion><exclusion><artifactId>jms</artifactId><groupId>javax.jms</groupId></exclusion><exclusion><artifactId>mail</artifactId><groupId>javax.mail</groupId></exclusion></exclusions></dependency>
2.2. 生產者代碼
package com.sinosoft.d;import kafka.javaapi.producer.Producer; import kafka.producer.KeyedMessage; import kafka.producer.ProducerConfig;import java.util.Properties;/*** kafka生產端** @author gblfy* @date 2020-08-07** Kafka生產者測試* http://kafka.apache.org/documentation.html#introduction* http://blog.csdn.net/hmsiwtv/article/details/46960053*/ public class KafkaProducetest {private final Producer<String, String> producer;public final static String TOPIC = "clicki_info_topic";private KafkaProducetest() {Properties props = new Properties();//此處配置的是kafka的端口props.put("metadata.broker.list", "10.5.6.19:9092");//配置value的序列化類props.put("serializer.class", "kafka.serializer.StringEncoder");//配置key的序列化類props.put("key.serializer.class", "kafka.serializer.StringEncoder");//0表示不確認主服務器是否收到消息,馬上返回,低延遲但最弱的持久性,數據可能會丟失//1表示確認主服務器收到消息后才返回,持久性稍強,可是如果主服務器死掉,從服務器數據尚未同步,數據可能會丟失//-1表示確認所有服務器都收到數據,完美!props.put("request.required.acks", "-1");//異步生產,批量存入緩存后再發到服務器去props.put("producer.type", "async");//填充配置,初始化生產者producer = new Producer<String, String>(new ProducerConfig(props));}void produce() {int messageNo = 1000;final int COUNT = 2000;while (messageNo < COUNT) {String key = String.valueOf(messageNo);String data = "hello kafka message " + key;String data1 = "{\"c\":0,\"i\":16114765323924126,\"n\":\"http://www.abbo.cn/clicki.html\",\"s\":0,\"sid\":0,\"t\":\"info_url\",\"tid\":0,\"unix\":0,\"viewId\":0}";// 發送消息// producer.send(new KeyedMessage<String, String>(TOPIC,data1));// 消息類型key:valueproducer.send(new KeyedMessage<String, String>(TOPIC, key, data));System.out.println(data);messageNo++;}producer.close();//必須關閉}public static void main(String[] args) {new KafkaProducetest().produce();} }
2.3. 消費端代碼
package com.sinosoft.d;import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties;import kafka.consumer.ConsumerConfig; import kafka.consumer.ConsumerIterator; import kafka.consumer.KafkaStream; import kafka.javaapi.consumer.ConsumerConnector; import kafka.serializer.StringDecoder; import kafka.utils.VerifiableProperties;/*** kafka生產端** @author gblfy* @date 2020-08-07* <p>* Kafka消費者測試*/ public class KafkaConsumertest {private final ConsumerConnector consumer;private KafkaConsumertest() {Properties props = new Properties();//zookeeper 配置props.put("zookeeper.connect", "10.5.6.19:2181");//group 代表一個消費組,加入組里面,消息只能被該組的一個消費者消費//如果所有消費者在一個組內,就是傳統的隊列模式,排隊拿消息//如果所有的消費者都不在同一個組內,就是發布-訂閱模式,消息廣播給所有組//如果介于兩者之間,那么廣播的消息在組內也是要排隊的props.put("group.id", "jd-group");//zk連接超時props.put("zookeeper.session.timeout.ms", "4000");//ZooKeeper的最大超時時間,就是心跳的間隔,若是沒有反映,那么認為已經死了,不易過大props.put("zookeeper.sync.time.ms", "200");//zk follower落后于zk leader的最長時間props.put("auto.commit.interval.ms", "1000");//往zookeeper上寫offset的頻率/** 此配置參數表示當此groupId下的消費者,在ZK中沒有offset值時(比如新的groupId,或者是zk數據被清空),consumer應該從哪個offset開始消費.* largest表示接受接收最大的offset(即最新消息),smallest表示最小offset,即從topic的開始位置消費所有消息.* */props.put("auto.offset.reset", "smallest"); //消費最老消息,最新為largest//序列化類props.put("serializer.class", "kafka.serializer.StringEncoder");ConsumerConfig config = new ConsumerConfig(props);consumer = kafka.consumer.Consumer.createJavaConsumerConnector(config);}void consume() {// 描述讀取哪個topic,需要幾個線程讀Map<String, Integer> topicCountMap = new HashMap<String, Integer>();topicCountMap.put(KafkaProducetest.TOPIC, new Integer(1));/* 默認消費時的數據是byte[]形式的,可以傳入String編碼器*/StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties());StringDecoder valueDecoder = new StringDecoder(new VerifiableProperties());Map<String, List<KafkaStream<String, String>>> consumerMap =consumer.createMessageStreams(topicCountMap, keyDecoder, valueDecoder);//消費數據時每個Topic有多個線程在讀,所以取List第一個流KafkaStream<String, String> stream = consumerMap.get(KafkaProducetest.TOPIC).get(0);ConsumerIterator<String, String> it = stream.iterator();while (it.hasNext()) {System.out.println(it.next().topic() + ":" + it.next().partition() + ":" + it.next().offset() + ":" + it.next().key() + ":" + it.next().message());}}public static void main(String[] args) {new KafkaConsumertest().consume();} }
2.4. 測試

先啟動消費端,在啟動生產端

三、小伙伴疑難解答

有的小伙伴問我,我們的工程師非maven項目該怎么辦呢?
我給這個小伙伴的建議是以下幾點:

3.1. 首先新建一個maven項目
3.2. 把我的依賴和代碼復制過去
3.3. 把我寫的case調試通
3.4. 找到左邊External Libraries

3.5. jar處理

在本地的maven倉庫中把這些以來的jar復制出來,建議先發到一個空的文件夾里面,建議和我一樣

3.6. 打開非maven項目,添加jar

打開你的非maven項目,把這些jar添加進去

3.7. 等待項目編譯
四、 項目jar和引入的jar沖突建議

關于這個問題,很多小伙伴應該也遇到過很正常,首先要保證引入的jar不能和以前的jar產生沖突?
那又該怎么辦?
有的小伙伴說,把以前和本次引入jar沖突的jar刪除唄!要三思

4.1. 定位問題

首先定位引入的jar和項目中的哪一個jar發生沖突

4.2. 分析問題

沖突的原因是什么?
1.引用的對象的包路徑一樣并且對象名也一樣
2.二個沖突的kar項目中加入都存在,但是,代碼中引入jar的優先級問題,引入的jar非自己需要的jar,而自己需要的jar默認不會引入,導致代碼報錯
3.版本問題,jar向下不兼容

五、解決問題
5.1. 解決問題場景

首先解決問題要基于場景:

我引入的jar,只有我自己用,但是,我的代碼中默認引入的jar非我需要的,但是,把項目中以前的jar(沖突的jar)刪除,代碼就不報錯。

5.2. 方案1

這種場景有3種解決方案
方案1(風險可控):如果,可以確保刪除以前的jar不會項目的其他功能造成影響,可以考慮刪除以前舊的jar

5.2. 方案2

方案2(風險不可控):如果,不能評估刪除以前jar的風險范圍,建議換一種方式,完成你的需求

5.2. 方案3

方案3(風險不可控):這種方案在方案2的基礎上做的,找到剛引入沖突jar的源碼,把需要的類和api(方法),單獨復制出來,這樣也可以解決,但是,需要小伙伴具備閱讀源碼的能力和調試的時間。

總結

以上是生活随笔為你收集整理的java 集成 kafka 0.8.2.1 适配jdk1.6的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。