日韩av黄I国产麻豆传媒I国产91av视频在线观看I日韩一区二区三区在线看I美女国产在线I麻豆视频国产在线观看I成人黄色短片

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 >

akka kafka java_当使用websocket连接时,使用akka-stream-kafka从kafka主题获取最后一条消息...

發布時間:2024/7/19 43 豆豆
生活随笔 收集整理的這篇文章主要介紹了 akka kafka java_当使用websocket连接时,使用akka-stream-kafka从kafka主题获取最后一条消息... 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

使用David van Geest非常巧妙地描述的方法,我能夠避免在客戶端連接時獲取任何上游數據here

歸結為在Consumer上有一個BroadcastHub:

val liveSource = Consumer.plainSource(consumerSettings, Subscriptions.topics(topic1, topic2))

.map(kafkaObject => utils.WebSockets.kafkaWrapper(kafkaObject.topic(), kafkaObject.value()))

.toMat(BroadcastHub.sink)(Keep.right)

.run()

并連接靜態消費者吃掉所有上游數據

liveSource.to(Sink.ignore).run()

從此,我可以讓WebSocket客戶端訂閱消費者收到的所有數據:

def source(): Flow[Any, String, NotUsed] = {Flow.fromSinkAndSource(Sink.ignore, liveSource)}

或者根據KafkaTopic(或其他任何你想要的)過濾

def KafkaSpecificSource(kafkaTopic: String): Flow[Any, String, NotUsed] = {

Flow.fromSinkAndSource(Sink.ignore, liveSource.filter({

x =>

(Json.parse(x) \ "topic").asOpt[String] match {

case Some(str) => str.equals(kafkaTopic)

case None => false

}

}))

}

這并不能解決在第一次連接時向用戶提供x數據量的問題,但我預見到我們會為任何歷史數據添加簡單的數據庫查詢,并且讓WebSocket連接只關注直播數據 .

總結

以上是生活随笔為你收集整理的akka kafka java_当使用websocket连接时,使用akka-stream-kafka从kafka主题获取最后一条消息...的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。