akka kafka java_当使用websocket连接时,使用akka-stream-kafka从kafka主题获取最后一条消息...
使用David van Geest非常巧妙地描述的方法,我能夠避免在客戶(hù)端連接時(shí)獲取任何上游數(shù)據(jù)here
歸結(jié)為在Consumer上有一個(gè)BroadcastHub:
val liveSource = Consumer.plainSource(consumerSettings, Subscriptions.topics(topic1, topic2))
.map(kafkaObject => utils.WebSockets.kafkaWrapper(kafkaObject.topic(), kafkaObject.value()))
.toMat(BroadcastHub.sink)(Keep.right)
.run()
并連接靜態(tài)消費(fèi)者吃掉所有上游數(shù)據(jù)
liveSource.to(Sink.ignore).run()
從此,我可以讓W(xué)ebSocket客戶(hù)端訂閱消費(fèi)者收到的所有數(shù)據(jù):
def source(): Flow[Any, String, NotUsed] = {Flow.fromSinkAndSource(Sink.ignore, liveSource)}
或者根據(jù)KafkaTopic(或其他任何你想要的)過(guò)濾
def KafkaSpecificSource(kafkaTopic: String): Flow[Any, String, NotUsed] = {
Flow.fromSinkAndSource(Sink.ignore, liveSource.filter({
x =>
(Json.parse(x) \ "topic").asOpt[String] match {
case Some(str) => str.equals(kafkaTopic)
case None => false
}
}))
}
這并不能解決在第一次連接時(shí)向用戶(hù)提供x數(shù)據(jù)量的問(wèn)題,但我預(yù)見(jiàn)到我們會(huì)為任何歷史數(shù)據(jù)添加簡(jiǎn)單的數(shù)據(jù)庫(kù)查詢(xún),并且讓W(xué)ebSocket連接只關(guān)注直播數(shù)據(jù) .
總結(jié)
以上是生活随笔為你收集整理的akka kafka java_当使用websocket连接时,使用akka-stream-kafka从kafka主题获取最后一条消息...的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: xlwings 合并单元格 读取_xlw
- 下一篇: POSIX多线程API函数