日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

kafka读写 java_java-Kafka流:从应用程序的每个实例中的所有分区读取

發布時間:2024/7/19 编程问答 25 豆豆
生活随笔 收集整理的這篇文章主要介紹了 kafka读写 java_java-Kafka流:从应用程序的每个实例中的所有分区读取 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

當使用KTable時,當實例/使用者數等于分區數時,Kafka流不允許實例從特定主題的多個分區中讀取.我嘗試使用GlobalKTable來實現這一點,但問題是數據將被覆蓋,并且聚合也無法應用于其上.

假設我有一個名為“ data_in”的主題,具有3個分區(P1,P2,P3).當我運行Kafka流應用程序的3個實例(I1,I2,I3)時,我希望每個實例都從“ data_in”的所有分區中讀取數據.我的意思是,I1可以從P1,P2和P3讀取,I2可以從P1,P2和P3,I2以及其他方式讀取.

編輯:請記住,生產者可以將兩個相似的ID發布到“ data_in”中的兩個不同分區中.因此,當運行兩個不同的實例時,GlobalKtable將被覆蓋.

拜托,如何實現這一目標?這是我代碼的一部分

private KTable globalStream() {

// KStream of records from data-in topic using String and theDataSerde deserializers

KStream trashStream = getBuilder().stream("data_in",Consumed.with(Serdes.String(), SerDes.theDataSerde));

// Apply an aggregation operation on the original KStream records using an intermediate representation of a KStream (KGroupedStream)

KGroupedStream KGS = trashStream.groupByKey();

Materialized> materialized = Materialized.as("agg-stream-store");

materialized = materialized.withValueSerde(SerDes.theDataDataListSerde);

// Return a KTable

return KGS.aggregate(() -> new theDataList(), (key, value, aggregate) -> {

if (!value.getValideData())

aggregate.getList().removeIf((t) -> t.getTimestamp() <= value.getTimestamp());

else

aggregate.getList().add(value);

return aggregate;

}, materialized);

}

解決方法:

將輸入主題“ data_in”的分區數更改為1個分區,或者使用GlobalKtable從主題中所有分區獲取數據,然后可以將其加入流.這樣一來,您的應用實例將不再需要位于不同的使用者組中.

該代碼將如下所示:

private GlobalKTable globalStream() {

// KStream of records from data-in topic using String and theDataSerde deserializers

KStream trashStream = getBuilder().stream("data_in", Consumed.with(Serdes.String(), SerDes.theDataSerde));

thrashStream.to("new_data_in"); // by sending to an other topic you're forcing a repartition on that topic

KStream newTrashStream = getBuilder().stream("new_data_in", Consumed.with(Serdes.String(), SerDes.theDataSerde));

// Apply an aggregation operation on the original KStream records using an intermediate representation of a KStream (KGroupedStream)

KGroupedStream KGS = newTrashStream.groupByKey();

Materialized> materialized = Materialized.as("agg-stream-store");

materialized = materialized.withValueSerde(SerDes.theDataDataListSerde);

// Return a KTable

KGS.aggregate(() -> new theDataList(), (key, value, aggregate) -> {

if (!value.getValideData())

aggregate.getList().removeIf((t) -> t.getTimestamp() <= value.getTimestamp());

else

aggregate.getList().add(value);

return aggregate;

}, materialized)

.to("agg_data_in");

return getBuilder().globalTable("agg_data_in");

}

編輯:我編輯了上面的代碼,以強制對名為“ new_data_in”的主題進行重新分區.

標簽:apache-kafka-streams,java,apache-kafka,partitioning

來源: https://codeday.me/bug/20191009/1877602.html

總結

以上是生活随笔為你收集整理的kafka读写 java_java-Kafka流:从应用程序的每个实例中的所有分区读取的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。