kafka读写 java_java-Kafka流:从应用程序的每个实例中的所有分区读取
當使用KTable時,當實例/使用者數等于分區數時,Kafka流不允許實例從特定主題的多個分區中讀取.我嘗試使用GlobalKTable來實現這一點,但問題是數據將被覆蓋,并且聚合也無法應用于其上.
假設我有一個名為“ data_in”的主題,具有3個分區(P1,P2,P3).當我運行Kafka流應用程序的3個實例(I1,I2,I3)時,我希望每個實例都從“ data_in”的所有分區中讀取數據.我的意思是,I1可以從P1,P2和P3讀取,I2可以從P1,P2和P3,I2以及其他方式讀取.
編輯:請記住,生產者可以將兩個相似的ID發布到“ data_in”中的兩個不同分區中.因此,當運行兩個不同的實例時,GlobalKtable將被覆蓋.
拜托,如何實現這一目標?這是我代碼的一部分
private KTable globalStream() {
// KStream of records from data-in topic using String and theDataSerde deserializers
KStream trashStream = getBuilder().stream("data_in",Consumed.with(Serdes.String(), SerDes.theDataSerde));
// Apply an aggregation operation on the original KStream records using an intermediate representation of a KStream (KGroupedStream)
KGroupedStream KGS = trashStream.groupByKey();
Materialized> materialized = Materialized.as("agg-stream-store");
materialized = materialized.withValueSerde(SerDes.theDataDataListSerde);
// Return a KTable
return KGS.aggregate(() -> new theDataList(), (key, value, aggregate) -> {
if (!value.getValideData())
aggregate.getList().removeIf((t) -> t.getTimestamp() <= value.getTimestamp());
else
aggregate.getList().add(value);
return aggregate;
}, materialized);
}
解決方法:
將輸入主題“ data_in”的分區數更改為1個分區,或者使用GlobalKtable從主題中所有分區獲取數據,然后可以將其加入流.這樣一來,您的應用實例將不再需要位于不同的使用者組中.
該代碼將如下所示:
private GlobalKTable globalStream() {
// KStream of records from data-in topic using String and theDataSerde deserializers
KStream trashStream = getBuilder().stream("data_in", Consumed.with(Serdes.String(), SerDes.theDataSerde));
thrashStream.to("new_data_in"); // by sending to an other topic you're forcing a repartition on that topic
KStream newTrashStream = getBuilder().stream("new_data_in", Consumed.with(Serdes.String(), SerDes.theDataSerde));
// Apply an aggregation operation on the original KStream records using an intermediate representation of a KStream (KGroupedStream)
KGroupedStream KGS = newTrashStream.groupByKey();
Materialized> materialized = Materialized.as("agg-stream-store");
materialized = materialized.withValueSerde(SerDes.theDataDataListSerde);
// Return a KTable
KGS.aggregate(() -> new theDataList(), (key, value, aggregate) -> {
if (!value.getValideData())
aggregate.getList().removeIf((t) -> t.getTimestamp() <= value.getTimestamp());
else
aggregate.getList().add(value);
return aggregate;
}, materialized)
.to("agg_data_in");
return getBuilder().globalTable("agg_data_in");
}
編輯:我編輯了上面的代碼,以強制對名為“ new_data_in”的主題進行重新分區.
標簽:apache-kafka-streams,java,apache-kafka,partitioning
來源: https://codeday.me/bug/20191009/1877602.html
總結
以上是生活随笔為你收集整理的kafka读写 java_java-Kafka流:从应用程序的每个实例中的所有分区读取的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: java后ping不是内部_ping不是
- 下一篇: java基础题集