日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

在Kafka中发布订阅模型

發布時間:2023/12/3 编程问答 37 豆豆
生活随笔 收集整理的這篇文章主要介紹了 在Kafka中发布订阅模型 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

這是第四個柱中的一系列關于同步客戶端集成與異步系統( 1, 2, 3 )。 在這里,我們將嘗試了解Kafka的工作方式,以便正確利用其發布-訂閱實現。

卡夫卡概念

根據官方文件 :

Kafka是一種分布式的,分區的,復制的提交日志服務。 它提供消息傳遞系統的功能,但具有獨特的設計。

Kafka作為集群運行,這些節點稱為代理。 代理可以是領導者或副本,以提供高可用性和容錯能力。 代理負責分區,分區是存儲消息的分發單元。 這些消息是有序的,可以通過名為offset的索引進行訪問。 一組分區構成一個主題,是消息的提要。 一個分區可以有不同的使用者,它們使用自己的偏移量訪問消息。 生產者將消息發布到Kafka主題中。 Kafka文檔中的以下圖表可以幫助您理解以下內容:

排隊與發布-訂閱

消費者群體是另一個關鍵概念,有助于解釋為什么Kafka比RabbitMQ等其他消息傳遞解決方案更靈活,功能更強大。 消費者與消費者群體相關聯。 如果每個使用者都屬于同一個使用者組,則主題的消息將在各個使用者之間平均負載均衡; 這就是所謂的“排隊模型”。 相反,如果每個使用者都屬于不同的使用者組,則所有消息都將在每個客戶端中使用。 這就是所謂的“發布-訂閱”模型。

您可以混合使用這兩種方法,分別針對不同的需求使用不同的邏輯使用者組,并在每個組中有多個使用者以通過并行提高吞吐量。 同樣, Kafka文檔中的另一個圖表:

了解我們的需求

正如我們在以前的文章(見1, 2, 3 )該項目服務發布消息到卡夫卡的話題叫item_deleted 。 此消息將位于該主題的一個分區中。 為了定義消息將駐留在哪個分區中,Kafka提供了三種選擇 :

  • 如果記錄中指定了分區,請使用它
  • 如果未指定分區但存在鍵,則根據鍵的哈希值選擇一個分區
  • 如果沒有分區或密鑰,則以循環方式選擇一個分區

我們將使用item_id作為密鑰。 執法服務的不同實例中包含的消費者僅對特定分區感興趣,因為他們保留某些商品的內部狀態。 讓我們檢查不同的Kafka使用者實現,以了解哪種使用最方便。

卡夫卡消費者

卡夫卡共有三個消費者: 高級消費者 , 簡單消費者和新消費者

在這三個消費者中, 簡單消費者在最低級別上運行。 它滿足我們的要求,因為它允許消費者“在流程中僅使用主題中分區的子集”。 但是,正如文檔所述:

SimpleConsumer確實需要使用者組中不需要的大量工作:

  • 您必須跟蹤應用程序中的偏移量,才能知道從何處停止消費
  • 您必須確定哪個Broker是主題和分區的主要Broker。
  • 您必須處理經紀人負責人變更

如果您閱讀了建議的用于處理這些問題的代碼,則將不鼓勵您使用此使用者。

新使用者提供正確的抽象級別,并允許我們訂閱特定的分區。 他們在文檔中建議以下用例:

第一種情況是,如果進程正在維護與該分區關聯的某種本地狀態(例如本地磁盤上的鍵值存儲),因此該進程應僅獲取其在磁盤上維護的分區的記錄。

不幸的是,我們的系統使用的是Kafka 0.8,而該使用者僅從0.9開始可用。 我們沒有足夠的資源來遷移到該版本,因此我們需要堅持使用高級消費者

該使用者提供了一個不錯的API,但不允許我們訂閱特定的分區。 這意味著,執法服務的每個實例都將使用每條消息,即使那些無關的消息也是如此。 我們可以通過為每個實例定義不同的消費者組來實現。

利用Akka Event Bus

在上一篇文章中,我們定義了一些等待ItemDeleted消息的有限狀態機ItemDeleted 。

when(Active) {case Event(ItemDeleted(item), currentItemsToBeDeleted@ItemsToBeDeleted(items)) =>val newItemsToBeDeleted = items.filterNot(_ == item)newItemsToBeDeleted.size match {case 0 => finishWorkWith(CensorResult(Right()))case _ => stay using currentItemsToBeDeleted.copy(items = newItemsToBeDeleted)}}

我們的卡夫卡消費者可以將所有消息轉發給那些演員,并讓他們丟棄/過濾不相關的物品。 但是,我們不想讓參與者浪費大量的冗余和低效的工作,因此我們將添加一層抽象,使他們能夠以一種非常有效的方式丟棄適當的消息。

final case class MsgEnvelope(partitionKey: String, payload: ItemDeleted)class ItemDeletedBus extends EventBus with LookupClassification {override type Event = MsgEnvelopeoverride type Classifier = Stringoverride type Subscriber = ActorRefoverride protected def mapSize(): Int = 128override protected def publish(event: Event, subscriber: Subscriber): Unit = subscriber ! event.payloadoverride protected def classify(event: Event): Classifier = event.partitionKeyoverride protected def compareSubscribers(a: Subscriber, b: Subscriber): Int = a.compareTo(b) }

Akka Event Bus按分區為我們提供訂閱,而我們的Kafka高級消費者中缺少該分區。 我們將從卡夫卡消費者處發布每條消息到公交車上:

itemDeletedBus.publish(MsgEnvelope(item.partitionKey, ItemDeleted(item)))

在上一篇文章中,我們展示了如何使用該分區鍵訂閱消息:

itemDeletedBus.subscribe(self, item.partitionKey)

LookupClassification將過濾不需要的消息,因此我們的參與者不會過載。

摘要

由于Kafka提供的靈活性,我們能夠設計我們的系統以了解不同的折衷方案。 在接下來的文章中,我們將看到如何協調這些FSM的結果以向客戶端提供同步響應。

第一部分 | 第2部分 | 第三部分

翻譯自: https://www.javacodegeeks.com/2016/05/publish-subscribe-model-kafka.html

總結

以上是生活随笔為你收集整理的在Kafka中发布订阅模型的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。