kafka消费者如何读同一生产者消息_Kafka入门之生产者消费者
一、Kafka安裝與使用 ( kafka介紹? ? ?)
1. 下載Kafka
2. 安裝
Kafka是使用scala編寫的運行與jvm虛擬機上的程序,雖然也可以在windows上使用,但是kafka基本上是運行在linux服務(wù)器上,(也可以運行在windows上)因此我們這里也使用linux來開始今天的實戰(zhàn)。首先確保你的機器上安裝了jdk,kafka需要java運行環(huán)境,以前的kafka還需要zookeeper,新版的kafka已經(jīng)內(nèi)置了一個zookeeper環(huán)境,所以我們可以直接使用。說是安裝,如果只需要進行最簡單的嘗試的話我們只需要解壓到任意目錄即可,這里我們將kafka壓縮包解壓到/home目錄
Kafka目錄如下:
1 其中bin是執(zhí)行文件目錄,包括linux下的執(zhí)行文件,以及bin/window目錄下包含windows執(zhí)行的批處理命令;2 config中包含kafka的配置文件;3 libs中是kafka的各種依賴包。
配置Kafka文件(不配置也能在本地機上執(zhí)行,不配置默認主機是localhost )
命令行輸入:?vi server.properties #編輯修改相應(yīng)的參數(shù)
1 broker.id=0
2
3 port=9092#端口號4
5 host.name=192.168.0.11#服務(wù)器IP地址,修改為自己的服務(wù)器IP6
7 log.dirs=/usr/local/kafka/log/kafka #日志存放路徑,上面創(chuàng)建的目錄 (改成自己的目錄)8
9 zookeeper.connect=localhost:2181 #zookeeper地址和端口,單機配置部署,localhost:2181
4. 命令行運行
4.1? 啟動zookeeper
cd進入kafka解壓目錄,輸入
bin/zookeeper-server-start.sh config/zookeeper.properties
啟動zookeeper成功后會看到如下的輸出
4.2 啟動kafka
cd進入kafka解壓目錄,輸入
bin/kafka-server-start.sh config/server.properties
啟動kafka成功后會看到如下的輸出
5. 第一個消息(Linux)
5.1? ?創(chuàng)建一個topic
Kafka通過topic對同一類的數(shù)據(jù)進行管理,同一類的數(shù)據(jù)使用同一個topic可以在處理數(shù)據(jù)時更加的便捷
在kafka解壓目錄打開終端,輸入
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topictest
創(chuàng)建一個名為test的topic
在創(chuàng)建topic后可以通過輸入
bin/kafka-topics.sh --list --zookeeper localhost:2181
來查看已經(jīng)創(chuàng)建的topic
5.2創(chuàng)建一個消息消費者
在kafka解壓目錄打開終端,輸入
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topictest?--from-beginning
可以創(chuàng)建一個用于消費topic為test的消費者
消費者創(chuàng)建完成之后,因為還沒有發(fā)送任何數(shù)據(jù),因此這里在執(zhí)行后沒有打印出任何數(shù)據(jù)
不過別著急,不要關(guān)閉這個終端,打開一個新的終端,接下來我們創(chuàng)建第一個消息生產(chǎn)者
5.3? ? 創(chuàng)建一個消息生產(chǎn)者
在kafka解壓目錄打開一個新的終端,輸入
bin/kafka-console-producer.sh --broker-list localhost:9092 --topictest
在執(zhí)行完畢后會進入的編輯器頁面
在發(fā)送完消息之后,可以回到我們的消息消費者終端中,可以看到,終端中已經(jīng)打印出了我們剛才發(fā)送的消息
第4步可以通過腳本文件進行實現(xiàn):
1) 創(chuàng)建啟動腳本,假設(shè)我們的Kafka在/usr/local/目錄下
cd /usr/local/kafka??#創(chuàng)建啟動腳本
vi kafkastart.sh?#編輯,添加以下代碼
1 #!/bin/sh
2 #創(chuàng)建啟動腳本3 #啟動zookeeper4 /usr/local/kafka/bin/zookeeper-server-start.sh /usr/local/kafka/config/zookeeper.properties &
5 sleep 3#等3秒后執(zhí)行6
7 #啟動kafka8 /usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties &
2) 創(chuàng)建關(guān)閉腳本
vi kafkastop.sh #編輯,添加以下代碼
1 #!/bin/sh
2 #創(chuàng)建關(guān)閉腳本3 #關(guān)閉kafka4 /usr/local/kafka/bin/kafka-server-stop.sh /usr/local/kafka/config/server.properties &
5 sleep 3#等3秒后執(zhí)行6
7 #關(guān)閉zookeeper8 /usr/local/kafka/bin/zookeeper-server-stop.sh /usr/local/kafka/config/zookeeper.properties &
3)命令行添加執(zhí)行權(quán)限
1 #添加腳本執(zhí)行權(quán)限2 chmod +x kafkastart.sh3 chmod +x kafkastop.sh
4)命令行執(zhí)行腳本
1 sh /usr/local/kafka/kafkastart.sh #啟動kafka2
3 sh /usr/local/kafka/kafkastop.sh #關(guān)閉kafka
進入Kafka安裝目錄D:\Kafka\kafka_2.12-0.11.0.0,按下Shift+右鍵,選擇“打開命令窗口”選項,打開命令行,輸入:
6.1 啟動zookeeper
.\bin\windows\zookeeper-server-start.bat .\config\server.properties
6.2 啟動Kafka
.\bin\windows\kafka-server-start.bat .\config\server.properties
注意:注意:不要關(guān)了這個窗口,啟用Kafka前請確保ZooKeeper實例已經(jīng)準(zhǔn)備好并開始運行
6.3 創(chuàng)建主題
進入Kafka安裝目錄D:\Kafka\kafka_2.12-0.11.0.0,按下Shift+右鍵,選擇“打開命令窗口”選項,打開命令行,輸入:
.\bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
注意:不要關(guān)了這個窗口
查看主題輸入:
.\bin\windows\kafka-topics.bat --list --zookeeper localhost:2181
6.4 創(chuàng)建生產(chǎn)者
進入Kafka安裝目錄D:\Kafka\kafka_2.12-0.11.0.0,按下Shift+右鍵,選擇“打開命令窗口”選項,打開命令行,輸入:
.\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic test
注意:不要關(guān)了這個窗口
6.5 創(chuàng)建消費者
進入Kafka安裝目錄D:\Kafka\kafka_2.12-0.11.0.0,按下Shift+右鍵,選擇“打開命令窗口”選項,打開命令行,輸入:
.\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning
其中6.1和6.2可以使用批處理文件
1) 創(chuàng)建啟動腳本,假設(shè)我們的Kafka在D:\Kafka\kafka_2.12-0.11.0.0目錄下
切換到?D:\Kafka\kafka_2.12-0.11.0.0目錄下? #創(chuàng)建啟動腳本
用文本編輯器編輯kafkastart.bat #編輯,添加以下代碼
#創(chuàng)建啟動腳本
# ...自己添加
vi kafkastop.sh #編輯,添加以下代碼
#創(chuàng)建關(guān)閉腳本
# 自己添加
雙擊即可運行。
7. 使用Java程序(模擬真實生產(chǎn)環(huán)境;生產(chǎn)者在Kafka服務(wù)器上,消費者在客戶端; 可以推廣到分布式環(huán)境中)
如果是生產(chǎn)者以及消費者都在本機進行測試,則Kafka中配置文件不需要改變;且生產(chǎn)者和消費者都在同一臺機器上。
否則:
7.1 創(chuàng)建Topic
7.2 生產(chǎn)者
eclipse中創(chuàng)建一個名為KafkaProduce的Java Project;接著右擊該項目new一個名為lib的Folder;然后將我們部署的kafka的libs中的所有Jar包拷貝到該項目的lib目錄下;接著右擊該項目,build Path,然后選擇configure build path中的Libraries,接著Add Jars;將本項目lib下的所有Jar包添加進來。
packagecom.zc.kafka.producer.main;importjava.util.Properties;importorg.apache.kafka.clients.producer.KafkaProducer;importorg.apache.kafka.clients.producer.Producer;importorg.apache.kafka.clients.producer.ProducerRecord;/*** Kafka生產(chǎn)者
* 先啟動生產(chǎn)者,發(fā)送消息到broker,這里簡單發(fā)送了10條從0-9的消息,再啟動消費者,控制臺輸出如下:*/
public classSimpleKafkaProducer {public static voidmain(String[] args) {//TODO Auto-generated method stub
Properties props= newProperties();//broker地址
props.put("bootstrap.servers", "192.168.0.11:9092"); // "localhost:9092"//請求時候需要驗證
props.put("acks", "all");//請求失敗時候需要重試
props.put("retries", 0);//內(nèi)存緩存區(qū)大小
props.put("buffer.memory", 33554432);//指定消息key序列化方式
props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");//指定消息本身的序列化方式
props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
Producer producer = new KafkaProducer<>(props);for (int i = 0; i < 10; i++) { //i < 10//生產(chǎn)一條消息的時間有點長
producer.send(new ProducerRecord<>("test", Integer.toString(i), Integer.toString(i)));//System.out.println(i);
}
System.out.println("Message sent successfully");
producer.close();
}
}
7.3 消費者
eclipse中創(chuàng)建一個名為KafkaConsumer的Java Project;接著右擊該項目new一個名為lib的Folder;然后將我們部署的kafka的libs中的所有Jar包拷貝到該項目的lib目錄下;接著右擊該項目,build Path,然后選擇configure build path中的Libraries,接著Add Jars;將本項目lib下的所有Jar包添加進來。
packagecom.zc.kafka.consumer.main;importjava.util.Collections;importjava.util.Properties;importorg.apache.kafka.clients.consumer.ConsumerRecord;importorg.apache.kafka.clients.consumer.ConsumerRecords;importorg.apache.kafka.clients.consumer.KafkaConsumer;/*** kafka消費者*/
public classSimpleKafkaConsumer {
@SuppressWarnings({"deprecation", "resource"})public static voidmain(String[] args) {//TODO Auto-generated method stub
Properties props = newProperties();
props.put("bootstrap.servers", "192.168.0.11:9092"); // "localhost:9092"//每個消費者分配獨立的組號
props.put("group.id", "test");//如果value合法,則自動提交偏移量
props.put("enable.auto.commit", "true");//設(shè)置多久一次更新被消費消息的偏移量
props.put("auto.commit.interval.ms", "1000");//設(shè)置會話響應(yīng)的時間,超過這個時間kafka可以選擇放棄消費或者消費下一條消息
props.put("session.timeout.ms", "30000");//
//props.put("auto.offset.reset", "earliest");
props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("test")); //核心函數(shù)1:訂閱topic
System.out.println("Subscribed to topic " + "test");//int i = 0;
while (true) {//System.out.println(i++);//核心函數(shù)2:long poll,一次拉取回來多個消息
/*讀取數(shù)據(jù),讀取超時時間為100ms*/ConsumerRecords records = consumer.poll(100);//System.out.println(records.count());
for (ConsumerRecordrecord : records)//print the offset,key and value for the consumer records.
System.out.printf("offset = %d, key = %s, value = %s\n",
record.offset(), record.key(), record.value());
}
}
}
7.4 打Jar包執(zhí)行
1)打Jar包
右擊該項目,選擇Export;之后選擇Runnable JAR file,接著next; 然后在Launch configuration中選擇主類(含main方法),如果沒有,則需要先運行該主類,接著Export destination選擇Jar包的存放位置和名稱,接著Library handling 選擇第二個,Finish;會生成相應(yīng)Jar包。
通過 java -jar XXX.jar 運行該Jar包。
2)執(zhí)行
將生產(chǎn)者與消費者都打成相應(yīng)Jar包;都可以在服務(wù)器(有Kafka環(huán)境)和客戶機(沒有Kafka環(huán)境)上執(zhí)行;并且生產(chǎn)者和消費者可以在不同客戶機上也可以在相同客戶機上執(zhí)行。
就是我們編程以及運行的kafka項目,跟有沒有Kafka環(huán)境是無關(guān)的。
1. 服務(wù)器上先啟動Kafka
2. 服務(wù)器或者客戶機上啟動生產(chǎn)者?java -jar KafkaProducer.jar
3. 服務(wù)器或者客戶機上啟動消費者?java -jar KafkaConsumer.jar
創(chuàng)作挑戰(zhàn)賽新人創(chuàng)作獎勵來咯,堅持創(chuàng)作打卡瓜分現(xiàn)金大獎總結(jié)
以上是生活随笔為你收集整理的kafka消费者如何读同一生产者消息_Kafka入门之生产者消费者的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 不愧是玩无人机的 大疆车载产品矩阵揭秘:
- 下一篇: cocos2d 屏幕適配_Cocos2d