日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

基于事件驱动架构构建微服务第12部分:向Apache KAFKA生成事件

發布時間:2023/12/4 编程问答 28 豆豆
生活随笔 收集整理的這篇文章主要介紹了 基于事件驱动架构构建微服务第12部分:向Apache KAFKA生成事件 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

原文鏈接:https://logcorner.com/building-microservices-through-event-driven-architecture-part12-produce-events-to-apache-kafka/

在本教程中,我將展示如何將事件發布到apache KAFKA。

當客戶端發生命令時,它將產生一個事件(例如:PlaceOrderCommand => OrderCreatedEvent)。新事件由聚合根注冊為未提交的事件,并插入到僅附加表(事件存儲)中。

現在我必須將這些事件生成到服務總線,以便訂閱服務總線的應用程序可以選擇事件以處理它們。

在接下來的步驟中,我將有一個使用者來選擇事件,并將它們索引到一個高性能的no-sql數據庫,該數據庫將被我的應用程序的查詢端用作后端數據庫。

Apache KAFKA簡介

Apache Kafka是一個社區分布式事件流平臺,能夠每天處理數萬億個事件。最初設想為消息隊列,Kafka基于分布式提交日志的抽象。自2011年由LinkedIn創建并開源以來,Kafka已迅速從消息隊列演變為成熟的事件流平臺。

安裝

安裝Java SE開發工具包

打開以下網址,下載并安裝java:https://www.oracle.com/fr/java/technologies/javase/javase-jdk8-downloads.html

安裝Apache Kafka

到以下網址下載然后安裝Kafka:https://kafka.apache.org/downloads

選擇最新的穩定版本,在我的例子中我選擇了scala 2.13 kafka_2.13-2.6.0版本

在打開的頁面中,我選擇建議的鏡像來下載二進制文件。

下載.tgz存檔文件并將其解壓縮到安裝文件夾(我的工作站上的C:\KAFKADEMO文件夾)。

你應該在Windows上有以下內容

要驗證安裝是否正常,請轉到C:\KAFKADEMO\kafka_2.13-2.6.0\bin\windows位置并運行以下命令:

kafka-topics.bat

添加環境變量

這一步是可選的,你可以編輯你的環境變量并將你的kafka安裝文件夾添加到路徑中

添加一個文件夾working_dir和2個子文件夾zookeeper-data和kafka-data,如下圖所示

啟動zookeeper

要配置zookeeper,請編輯zookeeper.properties文件并按如下方式更新dataDir目錄。

編輯C:\KAFKADEMO\kafka_2.13-2.6.0\config\zookeeper.properties

dataDir=C:/KAFKADEMO/kafka_2.13-2.6.0/working_dir/zookeeper-data

運行以下命令啟動zookeeper:

zookeeper-server-start.bat config\zookeeper.properties

啟動Kafka

要配置Kafka,請編輯server.properties文件并更新log.dirs目錄,如下所示。

編輯C:\KAFKADEMO\kafka_2.13-2.6.0\config\server.properties

log.dirs=C:/KAFKADEMO/kafka_2.13-2.6.0/working_dir/kafka-data

運行以下命令啟動kafka:

kafka-server-start.bat config\server.properties

創建主題

運行以下命令以創建主題:

kafka-topics –zookeeper 127.0.0.1:2181 –topic eventstream –create –partitions 3 –replication-factor 1

運行以下命令以列出主題:

kafka-topics –zookeeper 127.0.0.1:2181 –list

運行以下命令來描述主題:

kafka-topics –zookeeper 127.0.0.1:2181 –topic eventstream –describe

運行以下命令刪除主題:

kafka-topics –zookeeper 127.0.0.1:2181 –topic eventstream –delete

生產者

要創建向apache kafka主題(事件流)生成事件的生產者,請運行以下命令:

kafka-console-producer –broker-list 127.0.0.1:9092 –topic eventstream

消費者

要開始使用在主題(事件流)上生成的事件,請運行以下命令:

kafka-console-consumer –bootstrap-server 127.0.0.1:9092 –topic eventstream

要從第一個事件開始使用在主題(事件流)上生成的所有事件,請運行以下命令:

kafka-console-consumer –bootstrap-server 127.0.0.1:9092 –topic eventstream –from-beginning

Asp.Net Core SignalR介紹

ASP.NET Core SignalR是一個開源庫,可簡化向應用程序添加實時Web功能的過程。實時Web功能使服務器端代碼能夠立即將內容推送到客戶端。

SignalR的使用場景:

  • 需要從服務器進行高頻更新的應用程序。例如游戲、社交網絡、投票、拍賣、地圖和GPS應用程序。

  • 儀表板和監控應用程序。示例包括公司儀表板、即時銷售更新或旅行提醒。

  • 協作應用程序。白板應用程序和團隊會議軟件是協作應用程序的示例。

  • 需要通知的應用程序。社交網絡、電子郵件、聊天、游戲、旅行提醒和許多其他應用程序都使用通知。

創建一個SignalR HUB:

為了創建一個 SignalR Hub,我定義了以下接口以便擁有一個強類型Hub

  • Task OnPublish(T payload); 在消息發布到中心時獲得通知

  • Task OnPublish(string topic, T payload); 在消息發布到特定主題時獲得通知

  • Task OnSubscribe(string connectionId, string topic); 在客戶加入特定主題時收到通知

  • Task OnUnSubscribe(string connectionId, string topic); 在客戶離開特定主題時收到通知

以下接口用于訂閱和發布事件

Hub定義如下

ISignalRNotifier是發布和接收消息的接口

將事件發布到SignalR Hub

當命令發生時,它作為事件存儲到事件存儲中,然后生產者可以從事件存儲中選擇事件并將其發布到服務總線。我不希望它像那樣工作,因為我想知道哪些事件尚未發布(isPublihed = true/false)并相應地更新它。

因此,為了獲得更大的靈活性,我將介紹一個SignalR Hub。所以我將實現的場景是:

當命令發生時,它會作為事件存儲到事件存儲區,然后發布到SignalR Hub主題。因此,對該主題感興趣的客戶將收到通知,然后可以處理該事件??蛻舳丝梢允欠湛偩€、移動應用程序、單頁應用程序等……

讓我們繼續從系統的命令端將事件發布到SignalR Hub。

所以我必須更新LogCorner.EduSync.Speech.Application.UseCases.EventSourcingHandler.cs文件的Handle函數并添加以下內容:

_publisher.PublishAsync(Topics.Speech, eventStore);

創建工作服務

讓我們創建一個工作服務并添加以下類

ProducerHostedService

ProducerHostedService是承載ProducerService的后臺服務

backgroundService是用于實現長時間運行的IHostedService的基類

ProducerService

ProducerService訂閱一個signalR主題并處理在該主題上發布的事件。

它使用IServiceBus將接收到的事件發送到服務總線主題

ServiceBus

ServiceBus使用IServiceBusProvider接口向服務總線提供者發送消息。這樣我就可以在不改變實現的情況下切換到另一個服務總線提供者(例如:RabbitMq 等)。

KafkaClient

KafkaClient使用Confluent.Kafka向kafka發送消息

測試

啟動zookeeper

zookeeper-server-start.bat config\zookeeper.properties

啟動Kafka

kafka-server-start.bat config\server.properties

啟動消費者

kafka-topics –zookeeper 127.0.0.1:2181 –topic eventstream –create –partitions 3 –replication-factor 1

kafka-console-consumer –bootstrap-server 127.0.0.1:9092 –topic eventstream

啟動以下工程:

  • LogCorner.EduSync.SignalR.Server

  • LogCorner.EduSync.Speech.Producer

啟動以下工程:

  • LogCorner.EduSync.Speech.Presentation

啟動postman并且post一個新的command

您應該在消費者控制臺上看到以下輸出,使用postman上發布的命令

代碼源可在此處獲得:

https://github.com/logcorner/LogCorner.EduSync.Speech.Command/tree/Feature/Task/AddSignalR https://github.com/logcorner/LogCorner.EduSync.Speech.ServiceBus/tree/ProduceMessagesTokafka

創作挑戰賽新人創作獎勵來咯,堅持創作打卡瓜分現金大獎

總結

以上是生活随笔為你收集整理的基于事件驱动架构构建微服务第12部分:向Apache KAFKA生成事件的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。