日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

消息队列之Kafka

發布時間:2023/12/31 编程问答 40 豆豆
生活随笔 收集整理的這篇文章主要介紹了 消息队列之Kafka 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

背景:Dubbo遠程調用的性能問題

Dubbo調用普遍存在于我們的微服務項目中, 這些Dubbo調用全部是同步的操作, 這里的"同步"指:消費者A調用生產者B之后,A的線程會進入阻塞狀態,等待生產者B運行結束返回之后,A才能運行之后的代碼, Dubbo消費者發送調用后進入阻塞狀態,這個狀態表示該線程仍占用內存資源,但是什么動作都不做, 如果生產者運行耗時較久,消費者就一直等待,如果消費者利用這個時間,那么可以處理更多請求,業務整體效率會提升

實際情況下,Dubbo有些必要的返回值必須等待,但是不必要等待的服務返回值,我們可以不等待去做別的事情, 這種情況下我們就要使用消息隊列

什么是消息隊列

消息隊列(Message Queue)簡稱MQ,也稱:"消息中間件",? 消息隊列是采用"異步"的方式來傳遞數據完成業務操作流程的業務處理方式(要求兩個微服務項目并不需要同時完成請求)

消息隊列的特征

  • 利用異步的特性, 提高服務器的運行效率, 減少因為遠程調用出現的線程等待\阻塞時間

  • 削峰填谷:在并發峰值超過當前系統處理能力時,我們將沒處理的信息保存在消息隊列中,在后面出現的較閑的時間中去處理,直到所有數據依次處理完成,能夠防止在并發峰值時短時間大量請求而導致的系統不穩定

  • 消息隊列的延時:因為是異步執行,請求的發起者并不知道消息何時能處理完,如果業務不能接受這種延遲,就不要使用消息隊列

常見消息隊列軟件

  • Kafka:性能好\功能弱:適合大數據量,高并發的情況,大數據領域使用較多

  • RabbitMQ:功能強\性能一般:適合發送業務需求復雜的消息隊列,java業務中使用較多

  • RocketMQ:阿里的

  • ActiveMQ:前幾年流行的,老項目可能用到

消息隊列的事務處理

當接收消息隊列中信息的模塊運行發送異常時,怎么完成事務的回滾? ?

在處理消息隊列異常時,經常會設置一個"死信隊列",將無法處理的異常信息發送到這個隊列中, 死信隊列沒有任何處理者,通常情況下會有專人周期性的處理死信隊列的消息

什么是Kafka

Kafka是由Apache軟件基金會開發的一個開源流處理平臺,由Scala和Java編寫。該項目的目標是為處理實時數據提供一個統一、高吞吐、低延遲的平臺。Kafka最初是由LinkedIn開發,并隨后于2011年初開源。 ?

Kafka Cluster(Kafka集群)

Producer:消息的發送方,也就是消息的來源,Kafka中的生產者

order就是消息的發送方,在Dubbo中order是消費者,這個身份變化了

Consumer:消息的接收方,也是消息的目標,Kafka中的消費者

stock就是消息的接收方,在Dubbo中stock是生產者,這個身份變化了

Topic:話題或主題的意思,消息的收發雙方要依據同一個話題名稱,才不會將信息錯發給別人

Record:消息記錄,就是生產者和消費者傳遞的信息內容,保存在指定的Topic中

Kafka的特征與優勢

Kafka作為消息隊列,它和其他同類產品相比,突出的特點就是性能強大

Kafka將消息隊列中的信息保存在硬盤

Kafka對硬盤的讀取規則進行優化后,效率能夠接近內存

硬盤的優化規則主要依靠"順序讀寫,零拷貝,日志壓縮等技術"

Kafka處理隊列中數據的默認設置:

  • Kafka隊列信息能夠一直向硬盤中保存(理論上沒有大小限制)

  • Kafka默認隊列中的信息保存7天,可以配置這個時間,縮短這個時間可以減少Kafka的磁盤消耗

Kafka的安裝和配置

必須將我們kafka軟件的解壓位置設置在一個根目錄,文件夾名稱盡量短(例如:kafka), 然后路徑不要有空格和中文,?我們要創建一個空目錄用于保存Kafka運行過程中產生的數據,本次創建名稱為data的空目錄,下面進行Kafka啟動前的配置,先到D:\kafka\config下配置有文件zookeeper.properties,找到dataDir屬性修改如下

dataDir=D:/data

修改完畢之后要Ctrl+S進行保存,否則修改無效!!!!

注意D盤和data文件夾名稱,匹配自己電腦的真實路徑和文件夾名稱

還要修改server.properties配置文件

log.dirs=D:/data

啟動kafka

要想啟動Kafka必須先啟動Zookeeper ?

Zookeeper介紹

Linux服務器中安裝的各種軟件,很多都是有動物形象的

如果這些軟件在Linux中需要修改配置信息的話,就需要進入這個軟件,去修改配置,每個軟件都需要單獨修改配置的話,工作量很大

我們使用Zookeeper之后,可以創建一個新的管理各種軟件配置的文件管理系統

Linux系統中各個軟件的配置文件集中到Zookeeper中

實現在Zookeeper中,可以修改服務器系統中的各個軟件配置信息

長此以往,很多軟件就刪除了自己寫配置文件的功能,而直接從Zookeeper中獲取

Kafka就是需要將配置編寫在Zookeeper中的軟件之一

所以要先啟動zookeeper才能啟動kafka

Zookeeper啟動

進入路徑D:\kafka\bin\windows

輸入cmd進入dos命令行

D:\kafka\bin\windows>kafka-server-start.bat ..\..\config\server.properties

kafka啟動

D:\kafka\bin\windows>kafka-server-start.bat ..\..\config\server.properties

Kafka使用演示

啟動的zookeeper和kafka的窗口不要關閉, 我們在csmall項目中編寫一個kafka使用的演示, csmall-cart-webapi模塊, 添加依賴

<!-- Google JSON API --> <!-- 它是java對象和json格式字符串相互轉換的工具類 --> <dependency><groupId>com.google.code.gson</groupId><artifactId>gson</artifactId> </dependency> <!-- Kafka API --> <!-- Spring整合支持Kafka的依賴 --> <dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId> </dependency>

修改yml文件進行配置 ?

spring:kafka:# 定義kafka的位置bootstrap-servers: localhost:9092# consumer.group-id是一個必須配置的設置,不配置的話啟動時會報錯# 意思是"話題分組",這配置的目的是為了區分不同項目的話題名稱# 本質上,這個分組名稱會在消息發送時,自動前綴在話題名稱前# 例如當前項目發送了一個話題名稱為message的消息,實際傳輸的話題名稱為csmall.messageconsumer:group-id: csmall

?在SpringBoot啟動類中添加啟動Kafka的注解

@SpringBootApplication @EnableDubbo // 啟動對kafka的支持 @EnableKafka // 為了測試kafka收發消息的功能 // 我們利用SpringBoot自帶的任務調用工具,周期性的向kafka發送消息 // 明確下面的注解和kafka沒有必然的支持關系 @EnableScheduling public class CsmallCartWebapiApplication {public static void main(String[] args) {SpringApplication.run(CsmallCartWebapiApplication.class, args);}}

下面我們就可以實現周期性的向kafka發送消息并接收的操作了

編寫消息的發送

cart-webapi包下創建kafka包

包中創建Producer類來發送消息

?生產者

// 這個類中要編寫代碼進行周期運行,所以要交由Spring管理 @Component public class Producer {// 直接從Spring容器中獲取能夠操作Kafka的對象// 這個對象是在添加好依賴和yml配合后,啟動SpringBoot時自動添加到Spring容器的// KafkaTemplate<[話題名稱的類型],[消息的類型]>@Autowiredprivate KafkaTemplate<String,String> kafkaTemplate;int i=1;// 實現每隔10秒鐘(10000毫秒)運行一次的方法@Scheduled(fixedRate = 10000)public void sendMessage(){// 實例化Cart對象并賦值,嘗試發送給KafkaCart cart=new Cart();cart.setId(i++);cart.setCommodityCode("PC100");cart.setUserId("UU100");cart.setPrice(RandomUtils.nextInt(90)+10);cart.setCount(RandomUtils.nextInt(10)+1);// {"id":"1","userId":"UU100",.....}// 利用gson工具,將cart對象轉換為json格式字符串方便發送Gson gson=new Gson();String json=gson.toJson(cart);System.out.println("要發送的消息為:"+json);// 執行發送kafkaTemplate.send("myCart",json);}}

kafka包中創建一個叫Consumer的類來接收消息, 接收消息的類可以是本模塊的類,也可以是其它模塊的類,編寫的代碼是完全一致

// 要接收kafka的消息需要講接收消息的對象保存到Spring容器中 // 因為KafkaTemplate是spring在管理的 @Component public class Consumer {// SpringKafka接收消息依靠了框架提供的"監聽機制"// 框架中有一個線程,一直實時關注kafka的消息情況// 如果我們指定的話題名稱(myCart)接收了消息,那么這條線程就會自動調用下面的方法@KafkaListener(topics = "myCart")// 下面定義的方法就是接收到消息后運行的方法// 這個方法有參數,參數類型是固定的,參數的值就是監聽器接收到的消息內容public void received(ConsumerRecord<String,String> record){// 參數類型必須是ConsumerRecord// 泛型<[話題名稱的類型],[消息的類型]>// 我們可以將record視為從kafka中接收到的消息對象String json=record.value();// json可能的值: {"id":2,"commodityCode":"PC100","price":74,"count":8,"userId":"UU100"}Gson gson=new Gson();// gson也可以將json字符串轉換為java對象Cart cart=gson.fromJson(json,Cart.class);System.out.println("接收到了消息:"+cart);}}

總結

以上是生活随笔為你收集整理的消息队列之Kafka的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。