日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問(wèn) 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 编程资源 > 编程问答 >内容正文

编程问答

akka kafka java_当使用websocket连接时,使用akka-stream-kafka从kafka主题获取最后一条消息...

發(fā)布時(shí)間:2024/7/19 编程问答 37 豆豆
生活随笔 收集整理的這篇文章主要介紹了 akka kafka java_当使用websocket连接时,使用akka-stream-kafka从kafka主题获取最后一条消息... 小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

使用David van Geest非常巧妙地描述的方法,我能夠避免在客戶(hù)端連接時(shí)獲取任何上游數(shù)據(jù)here

歸結(jié)為在Consumer上有一個(gè)BroadcastHub:

val liveSource = Consumer.plainSource(consumerSettings, Subscriptions.topics(topic1, topic2))

.map(kafkaObject => utils.WebSockets.kafkaWrapper(kafkaObject.topic(), kafkaObject.value()))

.toMat(BroadcastHub.sink)(Keep.right)

.run()

并連接靜態(tài)消費(fèi)者吃掉所有上游數(shù)據(jù)

liveSource.to(Sink.ignore).run()

從此,我可以讓W(xué)ebSocket客戶(hù)端訂閱消費(fèi)者收到的所有數(shù)據(jù):

def source(): Flow[Any, String, NotUsed] = {Flow.fromSinkAndSource(Sink.ignore, liveSource)}

或者根據(jù)KafkaTopic(或其他任何你想要的)過(guò)濾

def KafkaSpecificSource(kafkaTopic: String): Flow[Any, String, NotUsed] = {

Flow.fromSinkAndSource(Sink.ignore, liveSource.filter({

x =>

(Json.parse(x) \ "topic").asOpt[String] match {

case Some(str) => str.equals(kafkaTopic)

case None => false

}

}))

}

這并不能解決在第一次連接時(shí)向用戶(hù)提供x數(shù)據(jù)量的問(wèn)題,但我預(yù)見(jiàn)到我們會(huì)為任何歷史數(shù)據(jù)添加簡(jiǎn)單的數(shù)據(jù)庫(kù)查詢(xún),并且讓W(xué)ebSocket連接只關(guān)注直播數(shù)據(jù) .

總結(jié)

以上是生活随笔為你收集整理的akka kafka java_当使用websocket连接时,使用akka-stream-kafka从kafka主题获取最后一条消息...的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。

如果覺(jué)得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。