日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

kafka消费者如何读同一生产者消息_Kafka入门之生产者消费者

發(fā)布時間:2023/12/15 编程问答 22 豆豆
生活随笔 收集整理的這篇文章主要介紹了 kafka消费者如何读同一生产者消息_Kafka入门之生产者消费者 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

一、Kafka安裝與使用 ( kafka介紹? ? ?)

1. 下載Kafka

2. 安裝

Kafka是使用scala編寫的運行與jvm虛擬機上的程序,雖然也可以在windows上使用,但是kafka基本上是運行在linux服務(wù)器上,(也可以運行在windows上)因此我們這里也使用linux來開始今天的實戰(zhàn)。首先確保你的機器上安裝了jdk,kafka需要java運行環(huán)境,以前的kafka還需要zookeeper,新版的kafka已經(jīng)內(nèi)置了一個zookeeper環(huán)境,所以我們可以直接使用。說是安裝,如果只需要進行最簡單的嘗試的話我們只需要解壓到任意目錄即可,這里我們將kafka壓縮包解壓到/home目錄

Kafka目錄如下:

1 其中bin是執(zhí)行文件目錄,包括linux下的執(zhí)行文件,以及bin/window目錄下包含windows執(zhí)行的批處理命令;2 config中包含kafka的配置文件;3 libs中是kafka的各種依賴包。

配置Kafka文件(不配置也能在本地機上執(zhí)行,不配置默認主機是localhost )

命令行輸入:?vi server.properties #編輯修改相應(yīng)的參數(shù)

1 broker.id=0

2

3 port=9092#端口號4

5 host.name=192.168.0.11#服務(wù)器IP地址,修改為自己的服務(wù)器IP6

7 log.dirs=/usr/local/kafka/log/kafka #日志存放路徑,上面創(chuàng)建的目錄 (改成自己的目錄)8

9 zookeeper.connect=localhost:2181 #zookeeper地址和端口,單機配置部署,localhost:2181

4. 命令行運行

4.1? 啟動zookeeper

cd進入kafka解壓目錄,輸入

bin/zookeeper-server-start.sh config/zookeeper.properties

啟動zookeeper成功后會看到如下的輸出

4.2 啟動kafka

cd進入kafka解壓目錄,輸入

bin/kafka-server-start.sh config/server.properties

啟動kafka成功后會看到如下的輸出

5. 第一個消息(Linux)

5.1? ?創(chuàng)建一個topic

Kafka通過topic對同一類的數(shù)據(jù)進行管理,同一類的數(shù)據(jù)使用同一個topic可以在處理數(shù)據(jù)時更加的便捷

在kafka解壓目錄打開終端,輸入

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topictest

創(chuàng)建一個名為test的topic

在創(chuàng)建topic后可以通過輸入

bin/kafka-topics.sh --list --zookeeper localhost:2181

來查看已經(jīng)創(chuàng)建的topic

5.2創(chuàng)建一個消息消費者

在kafka解壓目錄打開終端,輸入

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topictest?--from-beginning

可以創(chuàng)建一個用于消費topic為test的消費者

消費者創(chuàng)建完成之后,因為還沒有發(fā)送任何數(shù)據(jù),因此這里在執(zhí)行后沒有打印出任何數(shù)據(jù)

不過別著急,不要關(guān)閉這個終端,打開一個新的終端,接下來我們創(chuàng)建第一個消息生產(chǎn)者

5.3? ? 創(chuàng)建一個消息生產(chǎn)者

在kafka解壓目錄打開一個新的終端,輸入

bin/kafka-console-producer.sh --broker-list localhost:9092 --topictest

在執(zhí)行完畢后會進入的編輯器頁面

在發(fā)送完消息之后,可以回到我們的消息消費者終端中,可以看到,終端中已經(jīng)打印出了我們剛才發(fā)送的消息

第4步可以通過腳本文件進行實現(xiàn):

1) 創(chuàng)建啟動腳本,假設(shè)我們的Kafka在/usr/local/目錄下

cd /usr/local/kafka??#創(chuàng)建啟動腳本

vi kafkastart.sh?#編輯,添加以下代碼

1 #!/bin/sh

2 #創(chuàng)建啟動腳本3 #啟動zookeeper4 /usr/local/kafka/bin/zookeeper-server-start.sh /usr/local/kafka/config/zookeeper.properties &

5 sleep 3#等3秒后執(zhí)行6

7 #啟動kafka8 /usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties &

2) 創(chuàng)建關(guān)閉腳本

vi kafkastop.sh #編輯,添加以下代碼

1 #!/bin/sh

2 #創(chuàng)建關(guān)閉腳本3 #關(guān)閉kafka4 /usr/local/kafka/bin/kafka-server-stop.sh /usr/local/kafka/config/server.properties &

5 sleep 3#等3秒后執(zhí)行6

7 #關(guān)閉zookeeper8 /usr/local/kafka/bin/zookeeper-server-stop.sh /usr/local/kafka/config/zookeeper.properties &

3)命令行添加執(zhí)行權(quán)限

1 #添加腳本執(zhí)行權(quán)限2 chmod +x kafkastart.sh3 chmod +x kafkastop.sh

4)命令行執(zhí)行腳本

1 sh /usr/local/kafka/kafkastart.sh #啟動kafka2

3 sh /usr/local/kafka/kafkastop.sh #關(guān)閉kafka

進入Kafka安裝目錄D:\Kafka\kafka_2.12-0.11.0.0,按下Shift+右鍵,選擇“打開命令窗口”選項,打開命令行,輸入:

6.1 啟動zookeeper

.\bin\windows\zookeeper-server-start.bat .\config\server.properties

6.2 啟動Kafka

.\bin\windows\kafka-server-start.bat .\config\server.properties

注意:注意:不要關(guān)了這個窗口,啟用Kafka前請確保ZooKeeper實例已經(jīng)準(zhǔn)備好并開始運行

6.3 創(chuàng)建主題

進入Kafka安裝目錄D:\Kafka\kafka_2.12-0.11.0.0,按下Shift+右鍵,選擇“打開命令窗口”選項,打開命令行,輸入:

.\bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

注意:不要關(guān)了這個窗口

查看主題輸入:

.\bin\windows\kafka-topics.bat --list --zookeeper localhost:2181

6.4 創(chuàng)建生產(chǎn)者

進入Kafka安裝目錄D:\Kafka\kafka_2.12-0.11.0.0,按下Shift+右鍵,選擇“打開命令窗口”選項,打開命令行,輸入:

.\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic test

注意:不要關(guān)了這個窗口

6.5 創(chuàng)建消費者

進入Kafka安裝目錄D:\Kafka\kafka_2.12-0.11.0.0,按下Shift+右鍵,選擇“打開命令窗口”選項,打開命令行,輸入:

.\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning

其中6.1和6.2可以使用批處理文件

1) 創(chuàng)建啟動腳本,假設(shè)我們的Kafka在D:\Kafka\kafka_2.12-0.11.0.0目錄下

切換到?D:\Kafka\kafka_2.12-0.11.0.0目錄下? #創(chuàng)建啟動腳本

用文本編輯器編輯kafkastart.bat #編輯,添加以下代碼

#創(chuàng)建啟動腳本

# ...自己添加

vi kafkastop.sh #編輯,添加以下代碼

#創(chuàng)建關(guān)閉腳本

# 自己添加

雙擊即可運行。

7. 使用Java程序(模擬真實生產(chǎn)環(huán)境;生產(chǎn)者在Kafka服務(wù)器上,消費者在客戶端; 可以推廣到分布式環(huán)境中)

如果是生產(chǎn)者以及消費者都在本機進行測試,則Kafka中配置文件不需要改變;且生產(chǎn)者和消費者都在同一臺機器上。

否則:

7.1 創(chuàng)建Topic

7.2 生產(chǎn)者

eclipse中創(chuàng)建一個名為KafkaProduce的Java Project;接著右擊該項目new一個名為lib的Folder;然后將我們部署的kafka的libs中的所有Jar包拷貝到該項目的lib目錄下;接著右擊該項目,build Path,然后選擇configure build path中的Libraries,接著Add Jars;將本項目lib下的所有Jar包添加進來。

packagecom.zc.kafka.producer.main;importjava.util.Properties;importorg.apache.kafka.clients.producer.KafkaProducer;importorg.apache.kafka.clients.producer.Producer;importorg.apache.kafka.clients.producer.ProducerRecord;/*** Kafka生產(chǎn)者

* 先啟動生產(chǎn)者,發(fā)送消息到broker,這里簡單發(fā)送了10條從0-9的消息,再啟動消費者,控制臺輸出如下:*/

public classSimpleKafkaProducer {public static voidmain(String[] args) {//TODO Auto-generated method stub

Properties props= newProperties();//broker地址

props.put("bootstrap.servers", "192.168.0.11:9092"); // "localhost:9092"//請求時候需要驗證

props.put("acks", "all");//請求失敗時候需要重試

props.put("retries", 0);//內(nèi)存緩存區(qū)大小

props.put("buffer.memory", 33554432);//指定消息key序列化方式

props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");//指定消息本身的序列化方式

props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");

Producer producer = new KafkaProducer<>(props);for (int i = 0; i < 10; i++) { //i < 10//生產(chǎn)一條消息的時間有點長

producer.send(new ProducerRecord<>("test", Integer.toString(i), Integer.toString(i)));//System.out.println(i);

}

System.out.println("Message sent successfully");

producer.close();

}

}

7.3 消費者

eclipse中創(chuàng)建一個名為KafkaConsumer的Java Project;接著右擊該項目new一個名為lib的Folder;然后將我們部署的kafka的libs中的所有Jar包拷貝到該項目的lib目錄下;接著右擊該項目,build Path,然后選擇configure build path中的Libraries,接著Add Jars;將本項目lib下的所有Jar包添加進來。

packagecom.zc.kafka.consumer.main;importjava.util.Collections;importjava.util.Properties;importorg.apache.kafka.clients.consumer.ConsumerRecord;importorg.apache.kafka.clients.consumer.ConsumerRecords;importorg.apache.kafka.clients.consumer.KafkaConsumer;/*** kafka消費者*/

public classSimpleKafkaConsumer {

@SuppressWarnings({"deprecation", "resource"})public static voidmain(String[] args) {//TODO Auto-generated method stub

Properties props = newProperties();

props.put("bootstrap.servers", "192.168.0.11:9092"); // "localhost:9092"//每個消費者分配獨立的組號

props.put("group.id", "test");//如果value合法,則自動提交偏移量

props.put("enable.auto.commit", "true");//設(shè)置多久一次更新被消費消息的偏移量

props.put("auto.commit.interval.ms", "1000");//設(shè)置會話響應(yīng)的時間,超過這個時間kafka可以選擇放棄消費或者消費下一條消息

props.put("session.timeout.ms", "30000");//

//props.put("auto.offset.reset", "earliest");

props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");

props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer consumer = new KafkaConsumer<>(props);

consumer.subscribe(Collections.singletonList("test")); //核心函數(shù)1:訂閱topic

System.out.println("Subscribed to topic " + "test");//int i = 0;

while (true) {//System.out.println(i++);//核心函數(shù)2:long poll,一次拉取回來多個消息

/*讀取數(shù)據(jù),讀取超時時間為100ms*/ConsumerRecords records = consumer.poll(100);//System.out.println(records.count());

for (ConsumerRecordrecord : records)//print the offset,key and value for the consumer records.

System.out.printf("offset = %d, key = %s, value = %s\n",

record.offset(), record.key(), record.value());

}

}

}

7.4 打Jar包執(zhí)行

1)打Jar包

右擊該項目,選擇Export;之后選擇Runnable JAR file,接著next; 然后在Launch configuration中選擇主類(含main方法),如果沒有,則需要先運行該主類,接著Export destination選擇Jar包的存放位置和名稱,接著Library handling 選擇第二個,Finish;會生成相應(yīng)Jar包。

通過 java -jar XXX.jar 運行該Jar包。

2)執(zhí)行

將生產(chǎn)者與消費者都打成相應(yīng)Jar包;都可以在服務(wù)器(有Kafka環(huán)境)和客戶機(沒有Kafka環(huán)境)上執(zhí)行;并且生產(chǎn)者和消費者可以在不同客戶機上也可以在相同客戶機上執(zhí)行。

就是我們編程以及運行的kafka項目,跟有沒有Kafka環(huán)境是無關(guān)的。

1. 服務(wù)器上先啟動Kafka

2. 服務(wù)器或者客戶機上啟動生產(chǎn)者?java -jar KafkaProducer.jar

3. 服務(wù)器或者客戶機上啟動消費者?java -jar KafkaConsumer.jar

創(chuàng)作挑戰(zhàn)賽新人創(chuàng)作獎勵來咯,堅持創(chuàng)作打卡瓜分現(xiàn)金大獎

總結(jié)

以上是生活随笔為你收集整理的kafka消费者如何读同一生产者消息_Kafka入门之生产者消费者的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。