akka kafka java_当使用websocket连接时,使用akka-stream-kafka从kafka主题获取最后一条消息...
使用David van Geest非常巧妙地描述的方法,我能夠避免在客戶端連接時獲取任何上游數據here
歸結為在Consumer上有一個BroadcastHub:
val liveSource = Consumer.plainSource(consumerSettings, Subscriptions.topics(topic1, topic2))
.map(kafkaObject => utils.WebSockets.kafkaWrapper(kafkaObject.topic(), kafkaObject.value()))
.toMat(BroadcastHub.sink)(Keep.right)
.run()
并連接靜態消費者吃掉所有上游數據
liveSource.to(Sink.ignore).run()
從此,我可以讓WebSocket客戶端訂閱消費者收到的所有數據:
def source(): Flow[Any, String, NotUsed] = {Flow.fromSinkAndSource(Sink.ignore, liveSource)}
或者根據KafkaTopic(或其他任何你想要的)過濾
def KafkaSpecificSource(kafkaTopic: String): Flow[Any, String, NotUsed] = {
Flow.fromSinkAndSource(Sink.ignore, liveSource.filter({
x =>
(Json.parse(x) \ "topic").asOpt[String] match {
case Some(str) => str.equals(kafkaTopic)
case None => false
}
}))
}
這并不能解決在第一次連接時向用戶提供x數據量的問題,但我預見到我們會為任何歷史數據添加簡單的數據庫查詢,并且讓WebSocket連接只關注直播數據 .
總結
以上是生活随笔為你收集整理的akka kafka java_当使用websocket连接时,使用akka-stream-kafka从kafka主题获取最后一条消息...的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: xlwings 合并单元格 读取_xlw
- 下一篇: POSIX多线程API函数