基于事件驱动架构构建微服务第12部分:向Apache KAFKA生成事件
原文鏈接:https://logcorner.com/building-microservices-through-event-driven-architecture-part12-produce-events-to-apache-kafka/
在本教程中,我將展示如何將事件發布到apache KAFKA。
當客戶端發生命令時,它將產生一個事件(例如:PlaceOrderCommand => OrderCreatedEvent)。新事件由聚合根注冊為未提交的事件,并插入到僅附加表(事件存儲)中。
現在我必須將這些事件生成到服務總線,以便訂閱服務總線的應用程序可以選擇事件以處理它們。
在接下來的步驟中,我將有一個使用者來選擇事件,并將它們索引到一個高性能的no-sql數據庫,該數據庫將被我的應用程序的查詢端用作后端數據庫。
Apache KAFKA簡介
Apache Kafka是一個社區分布式事件流平臺,能夠每天處理數萬億個事件。最初設想為消息隊列,Kafka基于分布式提交日志的抽象。自2011年由LinkedIn創建并開源以來,Kafka已迅速從消息隊列演變為成熟的事件流平臺。
安裝
安裝Java SE開發工具包
打開以下網址,下載并安裝java:https://www.oracle.com/fr/java/technologies/javase/javase-jdk8-downloads.html
安裝Apache Kafka
到以下網址下載然后安裝Kafka:https://kafka.apache.org/downloads
選擇最新的穩定版本,在我的例子中我選擇了scala 2.13 kafka_2.13-2.6.0版本
在打開的頁面中,我選擇建議的鏡像來下載二進制文件。
下載.tgz存檔文件并將其解壓縮到安裝文件夾(我的工作站上的C:\KAFKADEMO文件夾)。
你應該在Windows上有以下內容
要驗證安裝是否正常,請轉到C:\KAFKADEMO\kafka_2.13-2.6.0\bin\windows位置并運行以下命令:
kafka-topics.bat
添加環境變量
這一步是可選的,你可以編輯你的環境變量并將你的kafka安裝文件夾添加到路徑中
添加一個文件夾working_dir和2個子文件夾zookeeper-data和kafka-data,如下圖所示
啟動zookeeper
要配置zookeeper,請編輯zookeeper.properties文件并按如下方式更新dataDir目錄。
編輯C:\KAFKADEMO\kafka_2.13-2.6.0\config\zookeeper.properties
dataDir=C:/KAFKADEMO/kafka_2.13-2.6.0/working_dir/zookeeper-data
運行以下命令啟動zookeeper:
zookeeper-server-start.bat config\zookeeper.properties
啟動Kafka
要配置Kafka,請編輯server.properties文件并更新log.dirs目錄,如下所示。
編輯C:\KAFKADEMO\kafka_2.13-2.6.0\config\server.properties
log.dirs=C:/KAFKADEMO/kafka_2.13-2.6.0/working_dir/kafka-data
運行以下命令啟動kafka:
kafka-server-start.bat config\server.properties
創建主題
運行以下命令以創建主題:
kafka-topics –zookeeper 127.0.0.1:2181 –topic eventstream –create –partitions 3 –replication-factor 1
運行以下命令以列出主題:
kafka-topics –zookeeper 127.0.0.1:2181 –list
運行以下命令來描述主題:
kafka-topics –zookeeper 127.0.0.1:2181 –topic eventstream –describe
運行以下命令刪除主題:
kafka-topics –zookeeper 127.0.0.1:2181 –topic eventstream –delete
生產者
要創建向apache kafka主題(事件流)生成事件的生產者,請運行以下命令:
kafka-console-producer –broker-list 127.0.0.1:9092 –topic eventstream
消費者
要開始使用在主題(事件流)上生成的事件,請運行以下命令:
kafka-console-consumer –bootstrap-server 127.0.0.1:9092 –topic eventstream
要從第一個事件開始使用在主題(事件流)上生成的所有事件,請運行以下命令:
kafka-console-consumer –bootstrap-server 127.0.0.1:9092 –topic eventstream –from-beginning
Asp.Net Core SignalR介紹
ASP.NET Core SignalR是一個開源庫,可簡化向應用程序添加實時Web功能的過程。實時Web功能使服務器端代碼能夠立即將內容推送到客戶端。
SignalR的使用場景:
需要從服務器進行高頻更新的應用程序。例如游戲、社交網絡、投票、拍賣、地圖和GPS應用程序。
儀表板和監控應用程序。示例包括公司儀表板、即時銷售更新或旅行提醒。
協作應用程序。白板應用程序和團隊會議軟件是協作應用程序的示例。
需要通知的應用程序。社交網絡、電子郵件、聊天、游戲、旅行提醒和許多其他應用程序都使用通知。
創建一個SignalR HUB:
為了創建一個 SignalR Hub,我定義了以下接口以便擁有一個強類型Hub
Task OnPublish(T payload); 在消息發布到中心時獲得通知
Task OnPublish(string topic, T payload); 在消息發布到特定主題時獲得通知
Task OnSubscribe(string connectionId, string topic); 在客戶加入特定主題時收到通知
Task OnUnSubscribe(string connectionId, string topic); 在客戶離開特定主題時收到通知
以下接口用于訂閱和發布事件
Hub定義如下
ISignalRNotifier是發布和接收消息的接口
將事件發布到SignalR Hub
當命令發生時,它作為事件存儲到事件存儲中,然后生產者可以從事件存儲中選擇事件并將其發布到服務總線。我不希望它像那樣工作,因為我想知道哪些事件尚未發布(isPublihed = true/false)并相應地更新它。
因此,為了獲得更大的靈活性,我將介紹一個SignalR Hub。所以我將實現的場景是:
當命令發生時,它會作為事件存儲到事件存儲區,然后發布到SignalR Hub主題。因此,對該主題感興趣的客戶將收到通知,然后可以處理該事件??蛻舳丝梢允欠湛偩€、移動應用程序、單頁應用程序等……
讓我們繼續從系統的命令端將事件發布到SignalR Hub。
所以我必須更新LogCorner.EduSync.Speech.Application.UseCases.EventSourcingHandler.cs文件的Handle函數并添加以下內容:
_publisher.PublishAsync(Topics.Speech, eventStore);
創建工作服務
讓我們創建一個工作服務并添加以下類
ProducerHostedService
ProducerHostedService是承載ProducerService的后臺服務
backgroundService是用于實現長時間運行的IHostedService的基類
ProducerService
ProducerService訂閱一個signalR主題并處理在該主題上發布的事件。
它使用IServiceBus將接收到的事件發送到服務總線主題
ServiceBus
ServiceBus使用IServiceBusProvider接口向服務總線提供者發送消息。這樣我就可以在不改變實現的情況下切換到另一個服務總線提供者(例如:RabbitMq 等)。
KafkaClient
KafkaClient使用Confluent.Kafka向kafka發送消息
測試
啟動zookeeper
zookeeper-server-start.bat config\zookeeper.properties
啟動Kafka
kafka-server-start.bat config\server.properties
啟動消費者
kafka-topics –zookeeper 127.0.0.1:2181 –topic eventstream –create –partitions 3 –replication-factor 1
kafka-console-consumer –bootstrap-server 127.0.0.1:9092 –topic eventstream
啟動以下工程:
LogCorner.EduSync.SignalR.Server
LogCorner.EduSync.Speech.Producer
啟動以下工程:
LogCorner.EduSync.Speech.Presentation
啟動postman并且post一個新的command
您應該在消費者控制臺上看到以下輸出,使用postman上發布的命令
代碼源可在此處獲得:
https://github.com/logcorner/LogCorner.EduSync.Speech.Command/tree/Feature/Task/AddSignalR https://github.com/logcorner/LogCorner.EduSync.Speech.ServiceBus/tree/ProduceMessagesTokafka
創作挑戰賽新人創作獎勵來咯,堅持創作打卡瓜分現金大獎總結
以上是生活随笔為你收集整理的基于事件驱动架构构建微服务第12部分:向Apache KAFKA生成事件的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: asp.net core自定义依赖注入容
- 下一篇: 使用 Windbg 分析一个 异步操作