日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 前端技术 > javascript >内容正文

javascript

Spring Cloud构建微服务架构(七)消息总线(续:Kafka)

發(fā)布時間:2025/3/21 javascript 48 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Spring Cloud构建微服务架构(七)消息总线(续:Kafka) 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

Spring Cloud Bus除了支持RabbitMQ的自動化配置之外,還支持現(xiàn)在被廣泛應(yīng)用的Kafka。在本文中,我們將搭建一個Kafka的本地環(huán)境,并通過它來嘗試使用Spring Cloud Bus對Kafka的支持,實(shí)現(xiàn)消息總線的功能。由于本文會以之前Rabbit的實(shí)現(xiàn)作為基礎(chǔ)來修改,所以先閱讀《Spring Cloud構(gòu)建微服務(wù)架構(gòu)(七)消息總線》有助于理解本文。

Kafka簡介

Kafka是一個由LinkedIn開發(fā)的分布式消息系統(tǒng),它于2011年初開源,現(xiàn)在由著名的Apache基金會維護(hù)與開發(fā)。Kafka使用Scala實(shí)現(xiàn),被用作LinkedIn的活動流和運(yùn)營數(shù)據(jù)處理的管道,現(xiàn)在也被諸多互聯(lián)網(wǎng)企業(yè)廣泛地用作為數(shù)據(jù)流管道和消息系統(tǒng)。

Kafka是基于消息發(fā)布/訂閱模式實(shí)現(xiàn)的消息系統(tǒng),其主要設(shè)計目標(biāo)如下:

  • 消息持久化:以時間復(fù)雜度為O(1)的方式提供消息持久化能力,即使對TB級以上數(shù)據(jù)也能保證常數(shù)時間復(fù)雜度的訪問性能。
  • 高吞吐:在廉價的商用機(jī)器上也能支持單機(jī)每秒100K條以上的吞吐量
  • 分布式:支持消息分區(qū)以及分布式消費(fèi),并保證分區(qū)內(nèi)的消息順序
  • 跨平臺:支持不同技術(shù)平臺的客戶端(如:Java、PHP、Python等)
  • 實(shí)時性:支持實(shí)時數(shù)據(jù)處理和離線數(shù)據(jù)處理
  • 伸縮性:支持水平擴(kuò)展

Kafka中涉及的一些基本概念:

  • Broker:Kafka集群包含一個或多個服務(wù)器,這些服務(wù)器被稱為Broker。
  • Topic:邏輯上同Rabbit的Queue隊列相似,每條發(fā)布到Kafka集群的消息都必須有一個Topic。(物理上不同Topic的消息分開存儲,邏輯上一個Topic的消息雖然保存于一個或多個Broker上,但用戶只需指定消息的Topic即可生產(chǎn)或消費(fèi)數(shù)據(jù)而不必關(guān)心數(shù)據(jù)存于何處)
  • Partition:Partition是物理概念上的分區(qū),為了提供系統(tǒng)吞吐率,在物理上每個Topic會分成一個或多個Partition,每個Partition對應(yīng)一個文件夾(存儲對應(yīng)分區(qū)的消息內(nèi)容和索引文件)。
  • Producer:消息生產(chǎn)者,負(fù)責(zé)生產(chǎn)消息并發(fā)送到Kafka Broker。
  • Consumer:消息消費(fèi)者,向Kafka Broker讀取消息并處理的客戶端。
  • Consumer Group:每個Consumer屬于一個特定的組(可為每個Consumer指定屬于一個組,若不指定則屬于默認(rèn)組),組可以用來實(shí)現(xiàn)一條消息被組內(nèi)多個成員消費(fèi)等功能。

快速入門

在對Kafka有了一些基本了解之后,下面我們來嘗試構(gòu)建一個Kafka服務(wù)端,并體驗(yàn)一下基于Kafka的消息生產(chǎn)與消費(fèi)。

環(huán)境安裝

首先,我們需要從官網(wǎng)上下載安裝介質(zhì)。下載地址為:http://kafka.apache.org/downloads.html。本例中采用的版本為:Kafka-0.10.0.1

在解壓Kafka的安裝包之后,可以看到其目錄結(jié)構(gòu)如下:

kafka+-bin +-windows+-config+-libs+-logs+-site-docs

由于Kafka的設(shè)計中依賴了ZooKeeper,所以我們可以在bin和config目錄中除了看到Kafka相關(guān)的內(nèi)容之外,還有ZooKeeper相關(guān)的內(nèi)容。其中bin目錄存放了Kafka和ZooKeeper的命令行工具,bin根目錄下是適用于Linux/Unix的shell,而bin/windows下的則是適用于windows下的bat。我們可以根據(jù)實(shí)際的系統(tǒng)來設(shè)置環(huán)境變量,以方便后續(xù)的使用和操作。而在config目錄中,則是用來存放了關(guān)于Kafka與ZooKeeper的配置信息。

啟動測試

下面我們來嘗試啟動ZooKeeper和Kafka來進(jìn)行消息的生產(chǎn)和消費(fèi)。示例中所有的命令均已配置了Kafka的環(huán)境變量為例。

  • 啟動ZooKeeper,執(zhí)行命令:zookeeper-server-start config/zookeeper.properties,該命令需要指定zookeeper的配置文件位置才能正確啟動,kafka的壓縮包中包含了其默認(rèn)配置,開發(fā)與測試環(huán)境不需要修改。
[2016-09-28 08:05:34,849] INFO Reading configuration from: config\zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig) [2016-09-28 08:05:34,850] INFO autopurge.snapRetainCount set to 3 (org.apache.zookeeper.server.DatadirCleanupManager) [2016-09-28 08:05:34,851] INFO autopurge.purgeInterval set to 0 (org.apache.zookeeper.server.DatadirCleanupManager) [2016-09-28 08:05:34,851] INFO Purge task is not scheduled. (org.apache.zookeeper.server.DatadirCleanupManager) [2016-09-28 08:05:34,852] WARN Either no config or no quorum defined in config, running in standalone mode (org.apache.zookeeper.server.quorum.QuorumPeerMain) [2016-09-28 08:05:34,868] INFO Reading configuration from: config\zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig) [2016-09-28 08:05:34,869] INFO Starting server (org.apache.zookeeper.server.ZooKeeperServerMain) ... [2016-09-28 08:05:34,940] INFO binding to port 0.0.0.0/0.0.0.0:2181 (org.apache.zookeeper.server.NIOServerCnxnFactory)

從控制臺信息中,我們可以看到ZooKeeper從指定的config/zookeeper.properties配置文件中讀取信息并綁定2181端口啟動服務(wù)。有時候啟動失敗,可查看一下端口是否被占用,可以殺掉占用進(jìn)程或通過修改config/zookeeper.properties配置文件中的clientPort內(nèi)容以綁定其他端口號來啟動ZooKeeper。

  • 啟動Kafka,執(zhí)行命令:kafka-server-start config/server.properties,該命令也需要指定Kafka配置文件的正確位置,如上命令中指向了解壓目錄包含的默認(rèn)配置。若在測試時,使用外部集中環(huán)境的ZooKeeper的話,我們可以在該配置文件中通過zookeeper.connect參數(shù)來設(shè)置ZooKeeper的地址和端口,它默認(rèn)會連接本地2181端口的ZooKeeper;如果需要設(shè)置多個ZooKeeper節(jié)點(diǎn),可以為這個參數(shù)配置多個ZooKeeper地址,并用逗號分割。比如:zookeeper.connect=127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002。

  • 創(chuàng)建Topic,執(zhí)行命令:kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test,通過該命令,創(chuàng)建一個名為“test”的Topic,該Topic包含一個分區(qū)一個Replica。在創(chuàng)建完成后,可以使用kafka-topics --list --zookeeper localhost:2181命令來查看當(dāng)前的Topic。

另外,如果我們不使用kafka-topics命令來手工創(chuàng)建,直接進(jìn)行下面的內(nèi)容進(jìn)行消息創(chuàng)建時也會自動創(chuàng)建Topics來使用。

  • 創(chuàng)建消息生產(chǎn)者,執(zhí)行命令:kafka-console-producer --broker-list localhost:9092 --topic test。kafka-console-producer命令可以啟動Kafka基于命令行的消息生產(chǎn)客戶端,啟動后可以直接在控制臺中輸入消息來發(fā)送,控制臺中的每一行數(shù)據(jù)都會被視為一條消息來發(fā)送。我們可以嘗試輸入幾行消息,由于此時并沒有消費(fèi)者,所以這些輸入的消息都會被阻塞在名為test的Topics中,直到有消費(fèi)者將其消費(fèi)掉位置。

  • 創(chuàng)建消息消費(fèi)者,執(zhí)行命令:kafka-console-consumer --zookeeper localhost:2181 --topic test --from-beginning。kafka-console-consumer命令啟動的是Kafka基于命令行的消息消費(fèi)客戶端,在啟動之后,我們馬上可以在控制臺中看到輸出了之前我們在消息生產(chǎn)客戶端中發(fā)送的消息。我們可以再次打開之前的消息生產(chǎn)客戶端來發(fā)送消息,并觀察消費(fèi)者這邊對消息的輸出來體驗(yàn)Kafka對消息的基礎(chǔ)處理。

整合Spring Cloud Bus

在上一篇使用Rabbit實(shí)現(xiàn)消息總線的案例中,我們已經(jīng)通過引入spring-cloud-starter-bus-amqp模塊,完成了使用RabbitMQ來實(shí)現(xiàn)的消息總線。若我們要使用Kafka來實(shí)現(xiàn)消息總線時,只需要把spring-cloud-starter-bus-amqp替換成spring-cloud-starter-bus-kafka模塊,在pom.xml的dependenies節(jié)點(diǎn)中進(jìn)行修改,具體如下:

<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-bus-kafka</artifactId> </dependency>

如果我們在啟動Kafka時均采用了默認(rèn)配置,那么我們不需要再做任何其他配置就能在本地實(shí)現(xiàn)從RabbitMQ到Kafka的切換。我們可以嘗試把剛剛搭建的ZooKeeper、Kafka啟動起來,并將修改為spring-cloud-starter-bus-kafka模塊的config-server和config-client啟動起來。

在config-server啟動時,我們可以在控制臺中看到如下輸出:

2016-09-28 22:11:29.627 INFO 15144 --- [ main] o.s.c.s.b.k.KafkaMessageChannelBinder : Using kafka topic for outbound: springCloudBus 2016-09-28 22:11:29.642 INFO 15144 --- [-localhost:2181] org.I0Itec.zkclient.ZkEventThread : Starting ZkClient event thread. ... 016-09-28 22:11:30.290 INFO 15144 --- [ main] o.s.i.kafka.support.ProducerFactoryBean : Using producer properties => {bootstrap.servers=localhost:9092, linger.ms=0, acks=1, compression.type=none, batch.size=16384} 2016-09-28 22:11:30.298 INFO 15144 --- [ main] o.a.k.clients.producer.ProducerConfig : ProducerConfig values: ... 2016-09-28 22:11:30.322 INFO 15144 --- [ main] o.s.c.s.b.k.KafkaMessageChannelBinder$1 : Adding {message-handler:outbound.springCloudBus} as a subscriber to the 'springCloudBusOutput' channel 2016-09-28 22:11:30.322 INFO 15144 --- [ main] o.s.integration.channel.DirectChannel : Channel 'config-server:7001.springCloudBusOutput' has 1 subscriber(s). 2016-09-28 22:11:30.322 INFO 15144 --- [ main] o.s.c.s.b.k.KafkaMessageChannelBinder$1 : started outbound.springCloudBus ... 2016-09-28 22:11:31.465 INFO 15144 --- [ main] s.i.k.i.KafkaMessageDrivenChannelAdapter : started org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter@4178cb34 2016-09-28 22:11:31.467 INFO 15144 --- [ main] o.s.c.s.b.k.KafkaMessageChannelBinder$7 : Adding {message-handler:inbound.springCloudBus.anonymous.8b9e6c7b-6a50-48c5-b981-8282a0d5a30b} as a subscriber to the 'bridge.springCloudBus' channel 2016-09-28 22:11:31.467 INFO 15144 --- [ main] o.s.c.s.b.k.KafkaMessageChannelBinder$7 : started inbound.springCloudBus.anonymous.8b9e6c7b-6a50-48c5-b981-8282a0d5a30b

從控制臺的輸出內(nèi)容,我們可以看到config-server連接到了Kafka中,并使用了名為springCloudBus的Topic。

此時,我們可以使用kafka-topics --list --zookeeper localhost:2181命令來查看當(dāng)前Kafka中的Topic,若已成功啟動了config-server并配置正確,我們就可以在Kafka中看到已經(jīng)多了一個名為springCloudBus的Topic。

我們再啟動配置了spring-cloud-starter-bus-kafka模塊的config-client,可以看到控制臺中輸出如下內(nèi)容:

2016-09-28 22:43:55.067 INFO 6136 --- [ main] o.s.c.s.b.k.KafkaMessageChannelBinder : Using kafka topic for outbound: springCloudBus 2016-09-28 22:43:55.078 INFO 6136 --- [-localhost:2181] org.I0Itec.zkclient.ZkEventThread : Starting ZkClient event thread. ... 2016-09-28 22:50:38.584 INFO 828 --- [ main] o.s.i.kafka.support.ProducerFactoryBean : Using producer properties => {bootstrap.servers=localhost:9092, linger.ms=0, acks=1, compression.type=none, batch.size=16384} 2016-09-28 22:50:38.592 INFO 828 --- [ main] o.a.k.clients.producer.ProducerConfig : ProducerConfig values: ... 2016-09-28 22:50:38.615 INFO 828 --- [ main] o.s.c.s.b.k.KafkaMessageChannelBinder$1 : Adding {message-handler:outbound.springCloudBus} as a subscriber to the 'springCloudBusOutput' channel 2016-09-28 22:50:38.616 INFO 828 --- [ main] o.s.integration.channel.DirectChannel : Channel 'didispace:7002.springCloudBusOutput' has 1 subscriber(s). 2016-09-28 22:50:38.616 INFO 828 --- [ main] o.s.c.s.b.k.KafkaMessageChannelBinder$1 : started outbound.springCloudBus ... 2016-09-28 22:50:39.162 INFO 828 --- [ main] s.i.k.i.KafkaMessageDrivenChannelAdapter : started org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter@60cf855e 2016-09-28 22:50:39.162 INFO 828 --- [ main] o.s.c.s.b.k.KafkaMessageChannelBinder$7 : Adding {message-handler:inbound.springCloudBus.anonymous.f8fc9c0c-ccd3-46dd-9537-07198f4ee216} as a subscriber to the 'bridge.springCloudBus' channel 2016-09-28 22:50:39.163 INFO 828 --- [ main] o.s.c.s.b.k.KafkaMessageChannelBinder$7 : started inbound.springCloudBus.anonymous.f8fc9c0c-ccd3-46dd-9537-07198f4ee216

可以看到,config-client啟動時輸出了類似的內(nèi)容,他們都訂閱了名為springCloudBus的Topic。

在啟動了config-server和config-client之后,為了更明顯地觀察消息總線刷新配置的效果,我們可以在本地啟動多個不同端口的config-client。此時,我們的config-server以及多個config-client都已經(jīng)連接到了由Kafka實(shí)現(xiàn)的消息總線上。我們可以先訪問各個config-client上的/from請求,查看他獲取到的配置內(nèi)容。然后,修改Git中對應(yīng)的參數(shù)內(nèi)容,再訪問各個config-client上的/from請求,可以看到配置內(nèi)容并沒有改變。最后,我們向config-server發(fā)送POST請求:/bus/refresh,此時我們再去訪問各個config-client上的/from請求,就能獲得到最新的配置信息,各客戶端上的配置都已經(jīng)加載為最新的Git配置內(nèi)容。

從config-client的控制臺中,我們可以看到如下內(nèi)容:

2016-09-29 08:20:34.361 INFO 21256 --- [ kafka-binder-1] o.s.cloud.bus.event.RefreshListener : Received remote refresh request. Keys refreshed [from]

RefreshListener監(jiān)聽類記錄了收到遠(yuǎn)程刷新請求,并刷新了from屬性的日志。

Kafka配置

在上面的例子中,由于Kafka、ZooKeeper均運(yùn)行于本地,所以我們沒有在測試程序中通過配置信息來指定Kafka和ZooKeeper的配置信息,就完成了本地消息總線的試驗(yàn)。但是我們實(shí)際應(yīng)用中,Kafka和ZooKeeper一般都會獨(dú)立部署,所以在應(yīng)用中都需要來為Kafka和ZooKeeper配置一些連接信息等。Kafka的整合與RabbitMQ不同,在Spring Boot 1.3.7中并沒有直接提供的Starter模塊,而是采用了Spring Cloud Stream的Kafka模塊,所以對于Kafka的配置均采用了spring.cloud.stream.kafka的前綴,比如:

屬性名說明默認(rèn)值
spring.cloud.stream.kafka.binder.brokersKafka的服務(wù)端列表localhost
spring.cloud.stream.kafka.binder.defaultBrokerPortKafka服務(wù)端的默認(rèn)端口,當(dāng)brokers屬性中沒有配置端口信息時,就會使用這個默認(rèn)端口9092
spring.cloud.stream.kafka.binder.zkNodesKafka服務(wù)端連接的ZooKeeper節(jié)點(diǎn)列表localhost
spring.cloud.stream.kafka.binder.defaultZkPortZooKeeper節(jié)點(diǎn)的默認(rèn)端口,當(dāng)zkNodes屬性中沒有配置端口信息時,就會使用這個默認(rèn)端口2181

更多配置參數(shù)請參考官方文檔

本文完整示例:

  • 開源中國:http://git.oschina.net/didispace/SpringCloud-Learning/tree/master/Chapter1-1-7
  • GitHub:https://github.com/dyc87112/SpringCloud-Learning/tree/master/Chapter1-1-7

總結(jié)

以上是生活随笔為你收集整理的Spring Cloud构建微服务架构(七)消息总线(续:Kafka)的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。