基于事件驱动架构构建微服务第12部分:向Apache KAFKA生成事件
原文鏈接:https://logcorner.com/building-microservices-through-event-driven-architecture-part12-produce-events-to-apache-kafka/
在本教程中,我將展示如何將事件發(fā)布到apache KAFKA。
當(dāng)客戶端發(fā)生命令時(shí),它將產(chǎn)生一個(gè)事件(例如:PlaceOrderCommand => OrderCreatedEvent)。新事件由聚合根注冊(cè)為未提交的事件,并插入到僅附加表(事件存儲(chǔ))中。
現(xiàn)在我必須將這些事件生成到服務(wù)總線,以便訂閱服務(wù)總線的應(yīng)用程序可以選擇事件以處理它們。
在接下來的步驟中,我將有一個(gè)使用者來選擇事件,并將它們索引到一個(gè)高性能的no-sql數(shù)據(jù)庫,該數(shù)據(jù)庫將被我的應(yīng)用程序的查詢端用作后端數(shù)據(jù)庫。
Apache KAFKA簡介
Apache Kafka是一個(gè)社區(qū)分布式事件流平臺(tái),能夠每天處理數(shù)萬億個(gè)事件。最初設(shè)想為消息隊(duì)列,Kafka基于分布式提交日志的抽象。自2011年由LinkedIn創(chuàng)建并開源以來,Kafka已迅速從消息隊(duì)列演變?yōu)槌墒斓氖录髌脚_(tái)。
安裝
安裝Java SE開發(fā)工具包
打開以下網(wǎng)址,下載并安裝java:https://www.oracle.com/fr/java/technologies/javase/javase-jdk8-downloads.html
安裝Apache Kafka
到以下網(wǎng)址下載然后安裝Kafka:https://kafka.apache.org/downloads
選擇最新的穩(wěn)定版本,在我的例子中我選擇了scala 2.13 kafka_2.13-2.6.0版本
在打開的頁面中,我選擇建議的鏡像來下載二進(jìn)制文件。
下載.tgz存檔文件并將其解壓縮到安裝文件夾(我的工作站上的C:\KAFKADEMO文件夾)。
你應(yīng)該在Windows上有以下內(nèi)容
要驗(yàn)證安裝是否正常,請(qǐng)轉(zhuǎn)到C:\KAFKADEMO\kafka_2.13-2.6.0\bin\windows位置并運(yùn)行以下命令:
kafka-topics.bat
添加環(huán)境變量
這一步是可選的,你可以編輯你的環(huán)境變量并將你的kafka安裝文件夾添加到路徑中
添加一個(gè)文件夾working_dir和2個(gè)子文件夾zookeeper-data和kafka-data,如下圖所示
啟動(dòng)zookeeper
要配置zookeeper,請(qǐng)編輯zookeeper.properties文件并按如下方式更新dataDir目錄。
編輯C:\KAFKADEMO\kafka_2.13-2.6.0\config\zookeeper.properties
dataDir=C:/KAFKADEMO/kafka_2.13-2.6.0/working_dir/zookeeper-data
運(yùn)行以下命令啟動(dòng)zookeeper:
zookeeper-server-start.bat config\zookeeper.properties
啟動(dòng)Kafka
要配置Kafka,請(qǐng)編輯server.properties文件并更新log.dirs目錄,如下所示。
編輯C:\KAFKADEMO\kafka_2.13-2.6.0\config\server.properties
log.dirs=C:/KAFKADEMO/kafka_2.13-2.6.0/working_dir/kafka-data
運(yùn)行以下命令啟動(dòng)kafka:
kafka-server-start.bat config\server.properties
創(chuàng)建主題
運(yùn)行以下命令以創(chuàng)建主題:
kafka-topics –zookeeper 127.0.0.1:2181 –topic eventstream –create –partitions 3 –replication-factor 1
運(yùn)行以下命令以列出主題:
kafka-topics –zookeeper 127.0.0.1:2181 –list
運(yùn)行以下命令來描述主題:
kafka-topics –zookeeper 127.0.0.1:2181 –topic eventstream –describe
運(yùn)行以下命令刪除主題:
kafka-topics –zookeeper 127.0.0.1:2181 –topic eventstream –delete
生產(chǎn)者
要?jiǎng)?chuàng)建向apache kafka主題(事件流)生成事件的生產(chǎn)者,請(qǐng)運(yùn)行以下命令:
kafka-console-producer –broker-list 127.0.0.1:9092 –topic eventstream
消費(fèi)者
要開始使用在主題(事件流)上生成的事件,請(qǐng)運(yùn)行以下命令:
kafka-console-consumer –bootstrap-server 127.0.0.1:9092 –topic eventstream
要從第一個(gè)事件開始使用在主題(事件流)上生成的所有事件,請(qǐng)運(yùn)行以下命令:
kafka-console-consumer –bootstrap-server 127.0.0.1:9092 –topic eventstream –from-beginning
Asp.Net Core SignalR介紹
ASP.NET Core SignalR是一個(gè)開源庫,可簡化向應(yīng)用程序添加實(shí)時(shí)Web功能的過程。實(shí)時(shí)Web功能使服務(wù)器端代碼能夠立即將內(nèi)容推送到客戶端。
SignalR的使用場景:
需要從服務(wù)器進(jìn)行高頻更新的應(yīng)用程序。例如游戲、社交網(wǎng)絡(luò)、投票、拍賣、地圖和GPS應(yīng)用程序。
儀表板和監(jiān)控應(yīng)用程序。示例包括公司儀表板、即時(shí)銷售更新或旅行提醒。
協(xié)作應(yīng)用程序。白板應(yīng)用程序和團(tuán)隊(duì)會(huì)議軟件是協(xié)作應(yīng)用程序的示例。
需要通知的應(yīng)用程序。社交網(wǎng)絡(luò)、電子郵件、聊天、游戲、旅行提醒和許多其他應(yīng)用程序都使用通知。
創(chuàng)建一個(gè)SignalR HUB:
為了創(chuàng)建一個(gè) SignalR Hub,我定義了以下接口以便擁有一個(gè)強(qiáng)類型Hub
Task OnPublish(T payload); 在消息發(fā)布到中心時(shí)獲得通知
Task OnPublish(string topic, T payload); 在消息發(fā)布到特定主題時(shí)獲得通知
Task OnSubscribe(string connectionId, string topic); 在客戶加入特定主題時(shí)收到通知
Task OnUnSubscribe(string connectionId, string topic); 在客戶離開特定主題時(shí)收到通知
以下接口用于訂閱和發(fā)布事件
Hub定義如下
ISignalRNotifier是發(fā)布和接收消息的接口
將事件發(fā)布到SignalR Hub
當(dāng)命令發(fā)生時(shí),它作為事件存儲(chǔ)到事件存儲(chǔ)中,然后生產(chǎn)者可以從事件存儲(chǔ)中選擇事件并將其發(fā)布到服務(wù)總線。我不希望它像那樣工作,因?yàn)槲蚁胫滥男┦录形窗l(fā)布(isPublihed = true/false)并相應(yīng)地更新它。
因此,為了獲得更大的靈活性,我將介紹一個(gè)SignalR Hub。所以我將實(shí)現(xiàn)的場景是:
當(dāng)命令發(fā)生時(shí),它會(huì)作為事件存儲(chǔ)到事件存儲(chǔ)區(qū),然后發(fā)布到SignalR Hub主題。因此,對(duì)該主題感興趣的客戶將收到通知,然后可以處理該事件。客戶端可以是服務(wù)總線、移動(dòng)應(yīng)用程序、單頁應(yīng)用程序等……
讓我們繼續(xù)從系統(tǒng)的命令端將事件發(fā)布到SignalR Hub。
所以我必須更新LogCorner.EduSync.Speech.Application.UseCases.EventSourcingHandler.cs文件的Handle函數(shù)并添加以下內(nèi)容:
_publisher.PublishAsync(Topics.Speech, eventStore);
創(chuàng)建工作服務(wù)
讓我們創(chuàng)建一個(gè)工作服務(wù)并添加以下類
ProducerHostedService
ProducerHostedService是承載ProducerService的后臺(tái)服務(wù)
backgroundService是用于實(shí)現(xiàn)長時(shí)間運(yùn)行的IHostedService的基類
ProducerService
ProducerService訂閱一個(gè)signalR主題并處理在該主題上發(fā)布的事件。
它使用IServiceBus將接收到的事件發(fā)送到服務(wù)總線主題
ServiceBus
ServiceBus使用IServiceBusProvider接口向服務(wù)總線提供者發(fā)送消息。這樣我就可以在不改變實(shí)現(xiàn)的情況下切換到另一個(gè)服務(wù)總線提供者(例如:RabbitMq 等)。
KafkaClient
KafkaClient使用Confluent.Kafka向kafka發(fā)送消息
測試
啟動(dòng)zookeeper
zookeeper-server-start.bat config\zookeeper.properties
啟動(dòng)Kafka
kafka-server-start.bat config\server.properties
啟動(dòng)消費(fèi)者
kafka-topics –zookeeper 127.0.0.1:2181 –topic eventstream –create –partitions 3 –replication-factor 1
kafka-console-consumer –bootstrap-server 127.0.0.1:9092 –topic eventstream
啟動(dòng)以下工程:
LogCorner.EduSync.SignalR.Server
LogCorner.EduSync.Speech.Producer
啟動(dòng)以下工程:
LogCorner.EduSync.Speech.Presentation
啟動(dòng)postman并且post一個(gè)新的command
您應(yīng)該在消費(fèi)者控制臺(tái)上看到以下輸出,使用postman上發(fā)布的命令
代碼源可在此處獲得:
https://github.com/logcorner/LogCorner.EduSync.Speech.Command/tree/Feature/Task/AddSignalR https://github.com/logcorner/LogCorner.EduSync.Speech.ServiceBus/tree/ProduceMessagesTokafka
創(chuàng)作挑戰(zhàn)賽新人創(chuàng)作獎(jiǎng)勵(lì)來咯,堅(jiān)持創(chuàng)作打卡瓜分現(xiàn)金大獎(jiǎng)總結(jié)
以上是生活随笔為你收集整理的基于事件驱动架构构建微服务第12部分:向Apache KAFKA生成事件的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: asp.net core自定义依赖注入容
- 下一篇: 使用 Windbg 分析一个 异步操作