Java / Kafka Streams: read from ALL partitions in every instance of an application
When using a KTable, Kafka Streams does not allow an instance to read from more than one partition of a given topic once the number of instances/consumers equals the number of partitions. I tried to achieve this with a GlobalKTable, but the problem is that the data gets overwritten, and aggregations cannot be applied on top of it.

Suppose I have a topic named "data_in" with 3 partitions (P1, P2, P3). When I run 3 instances (I1, I2, I3) of my Kafka Streams application, I want each instance to read from all partitions of "data_in": I1 reads from P1, P2 and P3, I2 reads from P1, P2 and P3, and so on.

EDIT: keep in mind that the producer can publish two records with the same key into two different partitions of "data_in". So when two different instances are running, the GlobalKTable gets overwritten.

How can I achieve this? Here is part of my code:
private KTable<String, theDataList> globalStream() {
    // KStream of records from the "data_in" topic, using the String and theDataSerde deserializers
    KStream<String, theData> trashStream = getBuilder().stream("data_in", Consumed.with(Serdes.String(), SerDes.theDataSerde));

    // Apply an aggregation operation on the original KStream records using an intermediate representation of a KStream (KGroupedStream)
    KGroupedStream<String, theData> KGS = trashStream.groupByKey();

    Materialized<String, theDataList, KeyValueStore<Bytes, byte[]>> materialized = Materialized.as("agg-stream-store");
    materialized = materialized.withValueSerde(SerDes.theDataDataListSerde);

    // Return a KTable
    return KGS.aggregate(() -> new theDataList(), (key, value, aggregate) -> {
        if (!value.getValideData())
            aggregate.getList().removeIf((t) -> t.getTimestamp() <= value.getTimestamp());
        else
            aggregate.getList().add(value);
        return aggregate;
    }, materialized);
}
Solution:
Either change the number of partitions of the input topic "data_in" to 1, or use a GlobalKTable to get the data from all partitions of the topic; you can then join it against a stream. With this approach, your application instances no longer need to be in different consumer groups.

The code would look like this:
private GlobalKTable<String, theDataList> globalStream() {
    // KStream of records from the "data_in" topic, using the String and theDataSerde deserializers
    KStream<String, theData> trashStream = getBuilder().stream("data_in", Consumed.with(Serdes.String(), SerDes.theDataSerde));

    // By sending the records to another topic you force a repartition on that topic
    trashStream.to("new_data_in");
    KStream<String, theData> newTrashStream = getBuilder().stream("new_data_in", Consumed.with(Serdes.String(), SerDes.theDataSerde));

    // Apply an aggregation operation on the repartitioned KStream records using an intermediate representation of a KStream (KGroupedStream)
    KGroupedStream<String, theData> KGS = newTrashStream.groupByKey();

    Materialized<String, theDataList, KeyValueStore<Bytes, byte[]>> materialized = Materialized.as("agg-stream-store");
    materialized = materialized.withValueSerde(SerDes.theDataDataListSerde);

    // Write the aggregated KTable out as a changelog stream, then read it back as a GlobalKTable
    KGS.aggregate(() -> new theDataList(), (key, value, aggregate) -> {
        if (!value.getValideData())
            aggregate.getList().removeIf((t) -> t.getTimestamp() <= value.getTimestamp());
        else
            aggregate.getList().add(value);
        return aggregate;
    }, materialized)
    .toStream()
    .to("agg_data_in");

    return getBuilder().globalTable("agg_data_in");
}
EDIT: I edited the code above to force a repartition through the topic named "new_data_in".
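Since the final state is read back as a GlobalKTable, every instance also holds a full local copy of it, so any instance can answer a lookup for any key. A hedged sketch of reading it via interactive queries; this assumes the global table was materialized with a store name, e.g. getBuilder().globalTable("agg_data_in", Materialized.as("agg-global-store")), and that `streams` is an already-started KafkaStreams instance for this topology:

```java
// Sketch only: "agg-global-store" and 'streams' are assumptions, see above.
ReadOnlyKeyValueStore<String, theDataList> store =
        streams.store(StoreQueryParameters.fromNameAndType(
                "agg-global-store", QueryableStoreTypes.keyValueStore()));

// Any instance can look up any key: the global store holds the
// aggregated data from all partitions of "agg_data_in".
theDataList aggregateForKey = store.get("some-key");
```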
Tags: apache-kafka-streams, java, apache-kafka, partitioning
Source: https://codeday.me/bug/20191009/1877602.html