日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 编程资源 > 编程问答 >内容正文

编程问答

牛客网项目——前置技术(八):Kafka

發(fā)布時(shí)間:2023/12/14 编程问答 49 豆豆
生活随笔 收集整理的這篇文章主要介紹了 牛客网项目——前置技术(八):Kafka 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

文章目錄

  • 1. 阻塞隊(duì)列
  • 2. Kafka入門
    • 2.1 基本概念
    • 2.2 基本操作
  • 3. Spring整合kafka
    • 3.1 引入依賴
    • 3.2 application.properties配置
    • 3.3 Kafka測(cè)試

1. 阻塞隊(duì)列

  • 生產(chǎn)者線程
  • 線程需要實(shí)現(xiàn) Runnable 接口
  • 重寫接口的run方法
  • 聲明變量private BlockingQueue<Integer> queue接受傳入的阻塞隊(duì)列
  • 創(chuàng)建有參構(gòu)造器
  • 實(shí)現(xiàn)示例邏輯,生產(chǎn)100個(gè)數(shù)據(jù),put進(jìn)阻塞隊(duì)列,每生產(chǎn)一個(gè)數(shù)據(jù)停頓20毫秒,輸出信息
  • class Producer implements Runnable {private BlockingQueue<Integer> queue;public Producer(BlockingQueue<Integer> queue) {this.queue = queue;}@Overridepublic void run() {try {for (int i = 0; i < 100; i++) {Thread.sleep(20);queue.put(i);System.out.println(Thread.currentThread().getName() + "生產(chǎn):" + queue.size());}} catch (Exception e) {e.printStackTrace();}}}
  • 消費(fèi)者線程
  • 線程需要實(shí)現(xiàn) Runnable 接口
  • 重寫接口的run方法
  • 聲明變量private BlockingQueue<Integer> queue接受傳入的阻塞隊(duì)列
  • 創(chuàng)建有參構(gòu)造器
  • 實(shí)現(xiàn)示例邏輯,不停的從隊(duì)列中take,每生產(chǎn)一個(gè)數(shù)據(jù)停頓0-1000隨機(jī)毫秒,輸出信息
  • class Consumer implements Runnable {private BlockingQueue<Integer> queue;public Consumer(BlockingQueue<Integer> queue) {this.queue = queue;}@Overridepublic void run() {try {while (true) {Thread.sleep(new Random().nextInt(1000));queue.take();System.out.println(Thread.currentThread().getName() + "消費(fèi):" + queue.size());}} catch (Exception e) {e.printStackTrace();}} }
  • main函數(shù)
  • 實(shí)例化阻塞隊(duì)列BlockingQueue queue = new ArrayBlockingQueue(10);
  • 實(shí)例化一個(gè)生產(chǎn)者線程
  • 實(shí)例化三個(gè)消費(fèi)者線程
  • public static void main(String[] args) {BlockingQueue queue = new ArrayBlockingQueue(10);new Thread(new Producer(queue)).start();new Thread(new Consumer(queue)).start();new Thread(new Consumer(queue)).start();new Thread(new Consumer(queue)).start();}

    2. Kafka入門

    2.1 基本概念

    • Kafka簡(jiǎn)介
    • 早先只是消息隊(duì)列,慢慢擴(kuò)展功能不止消息隊(duì)列
    • 消息系統(tǒng):消息隊(duì)列的功能,核心功能
    • 通過(guò)日志可以分析很多內(nèi)容,用戶追蹤等
    • Kfaka特點(diǎn)
    • 高吞吐量:可以處理TB級(jí)別數(shù)據(jù)
    • 消息持久化:把數(shù)據(jù)永久保存到類似硬盤的某一介質(zhì)。硬盤空間大,價(jià)格低。誤解,讀取硬盤速率高與低取決于對(duì)硬盤使用,對(duì)硬盤的順序讀取效率甚至高于對(duì)內(nèi)存的隨機(jī)讀取,Kafka利用這一點(diǎn)保證能處理海量數(shù)據(jù)
    • 高可靠性:分布式的服務(wù),可以做集群部署,有容錯(cuò)能力
    • 高擴(kuò)展性:集群服務(wù)器不夠用了簡(jiǎn)單的加一個(gè)服務(wù)器就可以
    • Kafka術(shù)語(yǔ)
    • Broker:Kafka的服務(wù)器,集群中每一臺(tái)服務(wù)器成為一個(gè)Broker
    • Zookeeper:管理集群軟件,Kafka內(nèi)置了Zookeeper
    • Topic:消息隊(duì)列實(shí)現(xiàn)的方式兩種,一種點(diǎn)對(duì)點(diǎn),如上面的BlockingQueue,生產(chǎn)者把消息放到一個(gè)隊(duì)列里,消費(fèi)者就從這里面取值,消費(fèi)者可能有多個(gè),如果A消費(fèi)者取到了這個(gè)數(shù)據(jù)這數(shù)據(jù)就出隊(duì)了,每個(gè)數(shù)據(jù)只被一個(gè)消費(fèi)者消費(fèi);還有一種方式發(fā)布訂閱方式,生產(chǎn)者把消息隊(duì)列放到某一個(gè)位置,消息可以被多個(gè)消費(fèi)者讀到。生產(chǎn)者把消息發(fā)布到的位置(空間)就叫Topic
    • Partition:分區(qū),對(duì)主題位置的分區(qū),增強(qiáng)了并發(fā)能力
    • Offsrt:消息在分區(qū)內(nèi)存放的索引
    • Leader Replica:主副本,從分區(qū)讀數(shù)據(jù)時(shí),主副本做響應(yīng)
    • Follower Replica:從副本只是備份,不負(fù)責(zé)響應(yīng)

    2.2 基本操作

    以官網(wǎng)下載的2.12為例。首先更改配置文件中的data地址和log地址。

  • 啟動(dòng)zookeeper
    bin\windows\zookeeper-server-start.bat config\zookeeper.properties

  • 啟動(dòng)kafka
    bin\windows\kafka-server-start.bat config\server.properties

  • 創(chuàng)建主題
    kafka-topics.bat --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test

  • 查看某一服務(wù)器端口下所有topic
    kafka-topics.bat --list --bootstrap-server localhost:9092

  • 以生產(chǎn)者身份發(fā)送消息
    kafka-console-producer.bat --broker-list localhost:9092 --topic test

  • 以消費(fèi)者身份讀取消息
    kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning

    消息通訊成功

  • 3. Spring整合kafka

    3.1 引入依賴

    <dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId> </dependency>

    3.2 application.properties配置

  • 服務(wù)器端口
  • 消費(fèi)者分組id
  • 是否自動(dòng)提交消費(fèi)者的偏移量
  • 自動(dòng)提交頻率
  • # KafkaProperties spring.kafka.bootstrap-servers=localhost:9092 spring.kafka.consumer.group-id=community-consumer-group spring.kafka.consumer.enable-auto-commit=true spring.kafka.consumer.auto-commit-interval=3000

    3.3 Kafka測(cè)試

  • 生產(chǎn)者代碼
  • 注入容器KafkaTemplate
  • 調(diào)用方法發(fā)消息
  • @Component class KafkaProducer {@Autowiredprivate KafkaTemplate kafkaTemplate;public void sendMessage(String topic, String content) {kafkaTemplate.send(topic, content);} }
  • 消費(fèi)者代碼
  • 注解,標(biāo)明監(jiān)聽的主題@KafkaListener(topics = {"test"})
  • 封裝消息
  • @Component class KafkaConsumer {@KafkaListener(topics = {"test"})public void handleMessage(ConsumerRecord record) {System.out.println(record.value());} }
  • Test方法
  • @Test public void testKafka() {kafkaProducer.sendMessage("test", "你好");kafkaProducer.sendMessage("test", "在嗎");try {Thread.sleep(1000 * 10);} catch (InterruptedException e) {e.printStackTrace();} } package com.nowcoder.community;import org.apache.kafka.clients.consumer.ConsumerRecord; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Component; import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.junit4.SpringRunner;@RunWith(SpringRunner.class) @SpringBootTest @ContextConfiguration(classes = CommunityApplication.class) public class KafkaTests {@Autowiredprivate KafkaProducer kafkaProducer;@Testpublic void testKafka() {kafkaProducer.sendMessage("test", "你好");kafkaProducer.sendMessage("test", "在嗎");try {Thread.sleep(1000 * 10);} catch (InterruptedException e) {e.printStackTrace();}}}@Component class KafkaProducer {@Autowiredprivate KafkaTemplate kafkaTemplate;public void sendMessage(String topic, String content) {kafkaTemplate.send(topic, content);}}@Component class KafkaConsumer {@KafkaListener(topics = {"test"})public void handleMessage(ConsumerRecord record) {System.out.println(record.value());}}

    總結(jié)

    以上是生活随笔為你收集整理的牛客网项目——前置技术(八):Kafka的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。