生活随笔
收集整理的這篇文章主要介紹了
牛客网项目——前置技术(八):Kafka
小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.
文章目錄
- 1. 阻塞隊(duì)列
- 2. Kafka入門
- 3. Spring整合kafka
- 3.1 引入依賴
- 3.2 application.properties配置
- 3.3 Kafka測(cè)試
1. 阻塞隊(duì)列
生產(chǎn)者線程 線程需要實(shí)現(xiàn) Runnable 接口重寫接口的run方法聲明變量private BlockingQueue<Integer> queue接受傳入的阻塞隊(duì)列創(chuàng)建有參構(gòu)造器實(shí)現(xiàn)示例邏輯,生產(chǎn)100個(gè)數(shù)據(jù),put進(jìn)阻塞隊(duì)列,每生產(chǎn)一個(gè)數(shù)據(jù)停頓20毫秒,輸出信息
class Producer implements Runnable {private BlockingQueue<Integer> queue
;public Producer(BlockingQueue<Integer> queue
) {this.queue
= queue
;}@Overridepublic void run() {try {for (int i
= 0; i
< 100; i
++) {Thread.sleep(20);queue
.put(i
);System.out
.println(Thread.currentThread().getName() + "生產(chǎn):" + queue
.size());}} catch (Exception e
) {e
.printStackTrace();}}}
消費(fèi)者線程 線程需要實(shí)現(xiàn) Runnable 接口重寫接口的run方法聲明變量private BlockingQueue<Integer> queue接受傳入的阻塞隊(duì)列創(chuàng)建有參構(gòu)造器實(shí)現(xiàn)示例邏輯,不停的從隊(duì)列中take,每生產(chǎn)一個(gè)數(shù)據(jù)停頓0-1000隨機(jī)毫秒,輸出信息
class Consumer implements Runnable {private BlockingQueue<Integer> queue
;public Consumer(BlockingQueue<Integer> queue
) {this.queue
= queue
;}@Overridepublic void run() {try {while (true) {Thread.sleep(new Random().nextInt(1000));queue
.take();System.out
.println(Thread.currentThread().getName() + "消費(fèi):" + queue
.size());}} catch (Exception e
) {e
.printStackTrace();}}
}
main函數(shù) 實(shí)例化阻塞隊(duì)列BlockingQueue queue = new ArrayBlockingQueue(10);實(shí)例化一個(gè)生產(chǎn)者線程實(shí)例化三個(gè)消費(fèi)者線程
public static void main(String[] args
) {BlockingQueue queue
= new ArrayBlockingQueue(10);new Thread(new Producer(queue
)).start();new Thread(new Consumer(queue
)).start();new Thread(new Consumer(queue
)).start();new Thread(new Consumer(queue
)).start();}
2. Kafka入門
2.1 基本概念
- Kafka簡(jiǎn)介
- 早先只是消息隊(duì)列,慢慢擴(kuò)展功能不止消息隊(duì)列
- 消息系統(tǒng):消息隊(duì)列的功能,核心功能
- 通過(guò)日志可以分析很多內(nèi)容,用戶追蹤等
- Kfaka特點(diǎn)
- 高吞吐量:可以處理TB級(jí)別數(shù)據(jù)
- 消息持久化:把數(shù)據(jù)永久保存到類似硬盤的某一介質(zhì)。硬盤空間大,價(jià)格低。誤解,讀取硬盤速率高與低取決于對(duì)硬盤使用,對(duì)硬盤的順序讀取效率甚至高于對(duì)內(nèi)存的隨機(jī)讀取,Kafka利用這一點(diǎn)保證能處理海量數(shù)據(jù)
- 高可靠性:分布式的服務(wù),可以做集群部署,有容錯(cuò)能力
- 高擴(kuò)展性:集群服務(wù)器不夠用了簡(jiǎn)單的加一個(gè)服務(wù)器就可以
- Kafka術(shù)語(yǔ)
- Broker:Kafka的服務(wù)器,集群中每一臺(tái)服務(wù)器成為一個(gè)Broker
- Zookeeper:管理集群軟件,Kafka內(nèi)置了Zookeeper
- Topic:消息隊(duì)列實(shí)現(xiàn)的方式兩種,一種點(diǎn)對(duì)點(diǎn),如上面的BlockingQueue,生產(chǎn)者把消息放到一個(gè)隊(duì)列里,消費(fèi)者就從這里面取值,消費(fèi)者可能有多個(gè),如果A消費(fèi)者取到了這個(gè)數(shù)據(jù)這數(shù)據(jù)就出隊(duì)了,每個(gè)數(shù)據(jù)只被一個(gè)消費(fèi)者消費(fèi);還有一種方式發(fā)布訂閱方式,生產(chǎn)者把消息隊(duì)列放到某一個(gè)位置,消息可以被多個(gè)消費(fèi)者讀到。生產(chǎn)者把消息發(fā)布到的位置(空間)就叫Topic
- Partition:分區(qū),對(duì)主題位置的分區(qū),增強(qiáng)了并發(fā)能力
- Offsrt:消息在分區(qū)內(nèi)存放的索引
- Leader Replica:主副本,從分區(qū)讀數(shù)據(jù)時(shí),主副本做響應(yīng)
- Follower Replica:從副本只是備份,不負(fù)責(zé)響應(yīng)
2.2 基本操作
以官網(wǎng)下載的2.12為例。首先更改配置文件中的data地址和log地址。
啟動(dòng)zookeeper
bin\windows\zookeeper-server-start.bat config\zookeeper.properties
啟動(dòng)kafka
bin\windows\kafka-server-start.bat config\server.properties
創(chuàng)建主題
kafka-topics.bat --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
查看某一服務(wù)器端口下所有topic
kafka-topics.bat --list --bootstrap-server localhost:9092
以生產(chǎn)者身份發(fā)送消息
kafka-console-producer.bat --broker-list localhost:9092 --topic test
以消費(fèi)者身份讀取消息
kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning
消息通訊成功
3. Spring整合kafka
3.1 引入依賴
<dependency><groupId>org.springframework.kafka
</groupId><artifactId>spring-kafka
</artifactId>
</dependency>
3.2 application.properties配置
服務(wù)器端口消費(fèi)者分組id是否自動(dòng)提交消費(fèi)者的偏移量自動(dòng)提交頻率
# KafkaProperties
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=community-consumer-group
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval=3000
3.3 Kafka測(cè)試
生產(chǎn)者代碼 注入容器KafkaTemplate調(diào)用方法發(fā)消息
@Component
class KafkaProducer {@Autowiredprivate KafkaTemplate kafkaTemplate
;public void sendMessage(String topic
, String content
) {kafkaTemplate
.send(topic
, content
);}
}
消費(fèi)者代碼 注解,標(biāo)明監(jiān)聽的主題@KafkaListener(topics = {"test"})封裝消息
@Component
class KafkaConsumer {@KafkaListener(topics
= {"test"})public void handleMessage(ConsumerRecord record
) {System.out
.println(record
.value());}
}
Test方法
@Test
public void testKafka() {kafkaProducer
.sendMessage("test", "你好");kafkaProducer
.sendMessage("test", "在嗎");try {Thread.sleep(1000 * 10);} catch (InterruptedException e
) {e
.printStackTrace();}
}
package com.nowcoder.community;import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringRunner;@RunWith(SpringRunner.class)
@SpringBootTest
@ContextConfiguration(classes
= CommunityApplication.class)
public class KafkaTests {@Autowiredprivate KafkaProducer kafkaProducer
;@Testpublic void testKafka() {kafkaProducer
.sendMessage("test", "你好");kafkaProducer
.sendMessage("test", "在嗎");try {Thread.sleep(1000 * 10);} catch (InterruptedException e
) {e
.printStackTrace();}}}@Component
class KafkaProducer {@Autowiredprivate KafkaTemplate kafkaTemplate
;public void sendMessage(String topic
, String content
) {kafkaTemplate
.send(topic
, content
);}}@Component
class KafkaConsumer {@KafkaListener(topics
= {"test"})public void handleMessage(ConsumerRecord record
) {System.out
.println(record
.value());}}
總結(jié)
以上是生活随笔為你收集整理的牛客网项目——前置技术(八):Kafka的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。