Kafka设计解析(七)- 流式计算的新贵 Kafka Stream
http://www.infoq.com/cn/articles/kafka-analysis-part-7
Kafka Stream背景
Kafka Stream是什么
Kafka Stream是Apache Kafka從0.10版本引入的一個(gè)新Feature。它是提供了對(duì)存儲(chǔ)于Kafka內(nèi)的數(shù)據(jù)進(jìn)行流式處理和分析的功能。
Kafka Stream的特點(diǎn)如下:
- Kafka Stream提供了一個(gè)非常簡(jiǎn)單而輕量的Library,它可以非常方便地嵌入任意Java應(yīng)用中,也可以任意方式打包和部署
- 除了Kafka外,無(wú)任何外部依賴
- 充分利用Kafka分區(qū)機(jī)制實(shí)現(xiàn)水平擴(kuò)展和順序性保證
- 通過(guò)可容錯(cuò)的state store實(shí)現(xiàn)高效的狀態(tài)操作(如windowed join和aggregation)
- 支持正好一次處理語(yǔ)義
- 提供記錄級(jí)的處理能力,從而實(shí)現(xiàn)毫秒級(jí)的低延遲
- 支持基于事件時(shí)間的窗口操作,并且可處理晚到的數(shù)據(jù)(late arrival of records)
- 同時(shí)提供底層的處理原語(yǔ)Processor(類似于Storm的spout和bolt),以及高層抽象的DSL(類似于Spark的map/group/reduce)
什么是流式計(jì)算
一般流式計(jì)算會(huì)與批量計(jì)算相比較。在流式計(jì)算模型中,輸入是持續(xù)的,可以認(rèn)為在時(shí)間上是無(wú)界的,也就意味著,永遠(yuǎn)拿不到全量數(shù)據(jù)去做計(jì)算。同時(shí),計(jì)算結(jié)果是持續(xù)輸出的,也即計(jì)算結(jié)果在時(shí)間上也是無(wú)界的。流式計(jì)算一般對(duì)實(shí)時(shí)性要求較高,同時(shí)一般是先定義目標(biāo)計(jì)算,然后數(shù)據(jù)到來(lái)之后將計(jì)算邏輯應(yīng)用于數(shù)據(jù)。同時(shí)為了提高計(jì)算效率,往往盡可能采用增量計(jì)算代替全量計(jì)算。
(點(diǎn)擊放大圖像)
?
批量處理模型中,一般先有全量數(shù)據(jù)集,然后定義計(jì)算邏輯,并將計(jì)算應(yīng)用于全量數(shù)據(jù)。特點(diǎn)是全量計(jì)算,并且計(jì)算結(jié)果一次性全量輸出。
(點(diǎn)擊放大圖像)
為什么要有Kafka Stream
當(dāng)前已經(jīng)有非常多的流式處理系統(tǒng),最知名且應(yīng)用最多的開(kāi)源流式處理系統(tǒng)有Spark Streaming和Apache Storm。Apache Storm發(fā)展多年,應(yīng)用廣泛,提供記錄級(jí)別的處理能力,當(dāng)前也支持SQL on Stream。而Spark Streaming基于Apache Spark,可以非常方便與圖計(jì)算,SQL處理等集成,功能強(qiáng)大,對(duì)于熟悉其它Spark應(yīng)用開(kāi)發(fā)的用戶而言使用門檻低。另外,目前主流的Hadoop發(fā)行版,如MapR,Cloudera和Hortonworks,都集成了Apache Storm和Apache Spark,使得部署更容易。
既然Apache Spark與Apache Storm擁用如此多的優(yōu)勢(shì),那為何還需要Kafka Stream呢?筆者認(rèn)為主要有如下原因。
第一,Spark和Storm都是流式處理框架,而Kafka Stream提供的是一個(gè)基于Kafka的流式處理類庫(kù)??蚣芤箝_(kāi)發(fā)者按照特定的方式去開(kāi)發(fā)邏輯部分,供框架調(diào)用。開(kāi)發(fā)者很難了解框架的具體運(yùn)行方式,從而使得調(diào)試成本高,并且使用受限。而Kafka Stream作為流式處理類庫(kù),直接提供具體的類給開(kāi)發(fā)者調(diào)用,整個(gè)應(yīng)用的運(yùn)行方式主要由開(kāi)發(fā)者控制,方便使用和調(diào)試。
(點(diǎn)擊放大圖像)
第二,雖然Cloudera與Hortonworks方便了Storm和Spark的部署,但是這些框架的部署仍然相對(duì)復(fù)雜。而Kafka Stream作為類庫(kù),可以非常方便的嵌入應(yīng)用程序中,它對(duì)應(yīng)用的打包和部署基本沒(méi)有任何要求。更為重要的是,Kafka Stream充分利用了Kafka的分區(qū)機(jī)制和Consumer的Rebalance機(jī)制,使得Kafka Stream可以非常方便的水平擴(kuò)展,并且各個(gè)實(shí)例可以使用不同的部署方式。具體來(lái)說(shuō),每個(gè)運(yùn)行Kafka Stream的應(yīng)用程序?qū)嵗及薑afka Consumer實(shí)例,多個(gè)同一應(yīng)用的實(shí)例之間并行處理數(shù)據(jù)集。而不同實(shí)例之間的部署方式并不要求一致,比如部分實(shí)例可以運(yùn)行在Web容器中,部分實(shí)例可運(yùn)行在Docker或Kubernetes中。
第三,就流式處理系統(tǒng)而言,基本都支持Kafka作為數(shù)據(jù)源。例如Storm具有專門的kafka-spout,而Spark也提供專門的spark-streaming-kafka模塊。事實(shí)上,Kafka基本上是主流的流式處理系統(tǒng)的標(biāo)準(zhǔn)數(shù)據(jù)源。換言之,大部分流式系統(tǒng)中都已部署了Kafka,此時(shí)使用Kafka Stream的成本非常低。
第四,使用Storm或Spark Streaming時(shí),需要為框架本身的進(jìn)程預(yù)留資源,如Storm的supervisor和Spark on YARN的node manager。即使對(duì)于應(yīng)用實(shí)例而言,框架本身也會(huì)占用部分資源,如Spark Streaming需要為shuffle和storage預(yù)留內(nèi)存。
第五,由于Kafka本身提供數(shù)據(jù)持久化,因此Kafka Stream提供滾動(dòng)部署和滾動(dòng)升級(jí)以及重新計(jì)算的能力。
第六,由于Kafka Consumer Rebalance機(jī)制,Kafka Stream可以在線動(dòng)態(tài)調(diào)整并行度。
Kafka Stream架構(gòu)
Kafka Stream整體架構(gòu)
Kafka Stream的整體架構(gòu)圖如下。
(點(diǎn)擊放大圖像)
目前(Kafka 0.11.0.0)Kafka Stream的數(shù)據(jù)源只能如上圖所示是Kafka。但是處理結(jié)果并不一定要如上圖所示輸出到Kafka。實(shí)際上KStream和Ktable的實(shí)例化都需要指定Topic。
KStream<String, String> stream = builder.stream("words-stream");KTable<String, String> table = builder.table("words-table", "words-store");另外,上圖中的Consumer和Producer并不需要開(kāi)發(fā)者在應(yīng)用中顯示實(shí)例化,而是由Kafka Stream根據(jù)參數(shù)隱式實(shí)例化和管理,從而降低了使用門檻。開(kāi)發(fā)者只需要專注于開(kāi)發(fā)核心業(yè)務(wù)邏輯,也即上圖中Task內(nèi)的部分。
Processor Topology
基于Kafka Stream的流式應(yīng)用的業(yè)務(wù)邏輯全部通過(guò)一個(gè)被稱為Processor Topology的地方執(zhí)行。它與Storm的Topology和Spark的DAG類似,都定義了數(shù)據(jù)在各個(gè)處理單元(在Kafka Stream中被稱作Processor)間的流動(dòng)方式,或者說(shuō)定義了數(shù)據(jù)的處理邏輯。
下面是一個(gè)Processor的示例,它實(shí)現(xiàn)了Word Count功能,并且每秒輸出一次結(jié)果。
public class WordCountProcessor implements Processor<String, String> {private ProcessorContext context;private KeyValueStore<String, Integer> kvStore;@SuppressWarnings("unchecked")@Overridepublic void init(ProcessorContext context) {this.context = context;this.context.schedule(1000);this.kvStore = (KeyValueStore<String, Integer>) context.getStateStore("Counts");}@Overridepublic void process(String key, String value) {Stream.of(value.toLowerCase().split(" ")).forEach((String word) -> {Optional<Integer> counts = Optional.ofNullable(kvStore.get(word));int count = counts.map(wordcount -> wordcount + 1).orElse(1);kvStore.put(word, count);});}@Overridepublic void punctuate(long timestamp) {KeyValueIterator<String, Integer> iterator = this.kvStore.all();iterator.forEachRemaining(entry -> {context.forward(entry.key, entry.value);this.kvStore.delete(entry.key);});context.commit();}@Overridepublic void close() {this.kvStore.close();}}從上述代碼中可見(jiàn)
- process定義了對(duì)每條記錄的處理邏輯,也印證了Kafka可具有記錄級(jí)的數(shù)據(jù)處理能力。
- context.scheduler定義了punctuate被執(zhí)行的周期,從而提供了實(shí)現(xiàn)窗口操作的能力。
- context.getStateStore提供的狀態(tài)存儲(chǔ)為有狀態(tài)計(jì)算(如窗口,聚合)提供了可能。
Kafka Stream并行模型
Kafka Stream的并行模型中,最小粒度為Task,而每個(gè)Task包含一個(gè)特定子Topology的所有Processor。因此每個(gè)Task所執(zhí)行的代碼完全一樣,唯一的不同在于所處理的數(shù)據(jù)集互補(bǔ)。這一點(diǎn)跟Storm的Topology完全不一樣。Storm的Topology的每一個(gè)Task只包含一個(gè)Spout或Bolt的實(shí)例。因此Storm的一個(gè)Topology內(nèi)的不同Task之間需要通過(guò)網(wǎng)絡(luò)通信傳遞數(shù)據(jù),而Kafka Stream的Task包含了完整的子Topology,所以Task之間不需要傳遞數(shù)據(jù),也就不需要網(wǎng)絡(luò)通信。這一點(diǎn)降低了系統(tǒng)復(fù)雜度,也提高了處理效率。
如果某個(gè)Stream的輸入Topic有多個(gè)(比如2個(gè)Topic,1個(gè)Partition數(shù)為4,另一個(gè)Partition數(shù)為3),則總的Task數(shù)等于Partition數(shù)最多的那個(gè)Topic的Partition數(shù)(max(4,3)=4)。這是因?yàn)镵afka Stream使用了Consumer的Rebalance機(jī)制,每個(gè)Partition對(duì)應(yīng)一個(gè)Task。
下圖展示了在一個(gè)進(jìn)程(Instance)中以2個(gè)Topic(Partition數(shù)均為4)為數(shù)據(jù)源的Kafka Stream應(yīng)用的并行模型。從圖中可以看到,由于Kafka Stream應(yīng)用的默認(rèn)線程數(shù)為1,所以4個(gè)Task全部在一個(gè)線程中運(yùn)行。
(點(diǎn)擊放大圖像)
為了充分利用多線程的優(yōu)勢(shì),可以設(shè)置Kafka Stream的線程數(shù)。下圖展示了線程數(shù)為2時(shí)的并行模型。
(點(diǎn)擊放大圖像)
前文有提到,Kafka Stream可被嵌入任意Java應(yīng)用(理論上基于JVM的應(yīng)用都可以)中,下圖展示了在同一臺(tái)機(jī)器的不同進(jìn)程中同時(shí)啟動(dòng)同一Kafka Stream應(yīng)用時(shí)的并行模型。注意,這里要保證兩個(gè)進(jìn)程的StreamsConfig.APPLICATION_ID_CONFIG完全一樣。因?yàn)镵afka Stream將APPLICATION_ID_CONFI作為隱式啟動(dòng)的Consumer的Group ID。只有保證APPLICATION_ID_CONFI相同,才能保證這兩個(gè)進(jìn)程的Consumer屬于同一個(gè)Group,從而可以通過(guò)Consumer Rebalance機(jī)制拿到互補(bǔ)的數(shù)據(jù)集。
(點(diǎn)擊放大圖像)
既然實(shí)現(xiàn)了多進(jìn)程部署,可以以同樣的方式實(shí)現(xiàn)多機(jī)器部署。該部署方式也要求所有進(jìn)程的APPLICATION_ID_CONFIG完全一樣。從圖上也可以看到,每個(gè)實(shí)例中的線程數(shù)并不要求一樣。但是無(wú)論如何部署,Task總數(shù)總會(huì)保證一致。
(點(diǎn)擊放大圖像)
注意:Kafka Stream的并行模型,非常依賴于《Kafka設(shè)計(jì)解析(一)- Kafka背景及架構(gòu)介紹》一文中介紹的Kafka分區(qū)機(jī)制和《Kafka設(shè)計(jì)解析(四)- Kafka Consumer設(shè)計(jì)解析》中介紹的Consumer的Rebalance機(jī)制。強(qiáng)烈建議不太熟悉這兩種機(jī)制的朋友,先行閱讀這兩篇文章。
這里對(duì)比一下Kafka Stream的Processor Topology與Storm的Topology。
- Storm的Topology由Spout和Bolt組成,Spout提供數(shù)據(jù)源,而B(niǎo)olt提供計(jì)算和數(shù)據(jù)導(dǎo)出。Kafka Stream的Processor Topology完全由Processor組成,因?yàn)樗臄?shù)據(jù)固定由Kafka的Topic提供。
- Storm的不同Bolt運(yùn)行在不同的Executor中,很可能位于不同的機(jī)器,需要通過(guò)網(wǎng)絡(luò)通信傳輸數(shù)據(jù)。而Kafka Stream的Processor Topology的不同Processor完全運(yùn)行于同一個(gè)Task中,也就完全處于同一個(gè)線程,無(wú)需網(wǎng)絡(luò)通信。
- Storm的Topology可以同時(shí)包含Shuffle部分和非Shuffle部分,并且往往一個(gè)Topology就是一個(gè)完整的應(yīng)用。而Kafka Stream的一個(gè)物理Topology只包含非Shuffle部分,而Shuffle部分需要通過(guò)through操作顯示完成,該操作將一個(gè)大的Topology分成了2個(gè)子Topology。
- Storm的Topology內(nèi),不同Bolt/Spout的并行度可以不一樣,而Kafka Stream的子Topology內(nèi),所有Processor的并行度完全一樣。
- Storm的一個(gè)Task只包含一個(gè)Spout或者Bolt的實(shí)例,而Kafka Stream的一個(gè)Task包含了一個(gè)子Topology的所有Processor。
KTable vs. KStream
KTable和KStream是Kafka Stream中非常重要的兩個(gè)概念,它們是Kafka實(shí)現(xiàn)各種語(yǔ)義的基礎(chǔ)。因此這里有必要分析下二者的區(qū)別。
KStream是一個(gè)數(shù)據(jù)流,可以認(rèn)為所有記錄都通過(guò)Insert only的方式插入進(jìn)這個(gè)數(shù)據(jù)流里。而KTable代表一個(gè)完整的數(shù)據(jù)集,可以理解為數(shù)據(jù)庫(kù)中的表。由于每條記錄都是Key-Value對(duì),這里可以將Key理解為數(shù)據(jù)庫(kù)中的Primary Key,而Value可以理解為一行記錄??梢哉J(rèn)為KTable中的數(shù)據(jù)都是通過(guò)Update only的方式進(jìn)入的。也就意味著,如果KTable對(duì)應(yīng)的Topic中新進(jìn)入的數(shù)據(jù)的Key已經(jīng)存在,那么從KTable只會(huì)取出同一Key對(duì)應(yīng)的最后一條數(shù)據(jù),相當(dāng)于新的數(shù)據(jù)更新了舊的數(shù)據(jù)。
以下圖為例,假設(shè)有一個(gè)KStream和KTable,基于同一個(gè)Topic創(chuàng)建,并且該Topic中包含如下圖所示5條數(shù)據(jù)。此時(shí)遍歷KStream將得到與Topic內(nèi)數(shù)據(jù)完全一樣的所有5條數(shù)據(jù),且順序不變。而此時(shí)遍歷KTable時(shí),因?yàn)檫@5條記錄中有3個(gè)不同的Key,所以將得到3條記錄,每個(gè)Key對(duì)應(yīng)最新的值,并且這三條數(shù)據(jù)之間的順序與原來(lái)在Topic中的順序保持一致。這一點(diǎn)與Kafka的日志compact相同。
(點(diǎn)擊放大圖像)
此時(shí)如果對(duì)該KStream和KTable分別基于key做Group,對(duì)Value進(jìn)行Sum,得到的結(jié)果將會(huì)不同。對(duì)KStream的計(jì)算結(jié)果是<Jack,4>,<Lily,7>,<Mike,4>。而對(duì)Ktable的計(jì)算結(jié)果是<Mike,4>,<Jack,3>,<Lily,5>。
State store
流式處理中,部分操作是無(wú)狀態(tài)的,例如過(guò)濾操作(Kafka Stream DSL中用filer方法實(shí)現(xiàn))。而部分操作是有狀態(tài)的,需要記錄中間狀態(tài),如Window操作和聚合計(jì)算。State store被用來(lái)存儲(chǔ)中間狀態(tài)。它可以是一個(gè)持久化的Key-Value存儲(chǔ),也可以是內(nèi)存中的HashMap,或者是數(shù)據(jù)庫(kù)。Kafka提供了基于Topic的狀態(tài)存儲(chǔ)。
Topic中存儲(chǔ)的數(shù)據(jù)記錄本身是Key-Value形式的,同時(shí)Kafka的log compaction機(jī)制可對(duì)歷史數(shù)據(jù)做compact操作,保留每個(gè)Key對(duì)應(yīng)的最后一個(gè)Value,從而在保證Key不丟失的前提下,減少總數(shù)據(jù)量,從而提高查詢效率。
構(gòu)造KTable時(shí),需要指定其state store name。默認(rèn)情況下,該名字也即用于存儲(chǔ)該KTable的狀態(tài)的Topic的名字,遍歷KTable的過(guò)程,實(shí)際就是遍歷它對(duì)應(yīng)的state store,或者說(shuō)遍歷Topic的所有key,并取每個(gè)Key最新值的過(guò)程。為了使得該過(guò)程更加高效,默認(rèn)情況下會(huì)對(duì)該Topic進(jìn)行compact操作。
另外,除了KTable,所有狀態(tài)計(jì)算,都需要指定state store name,從而記錄中間狀態(tài)。
Kafka Stream如何解決流式系統(tǒng)中關(guān)鍵問(wèn)題
時(shí)間
在流式數(shù)據(jù)處理中,時(shí)間是數(shù)據(jù)的一個(gè)非常重要的屬性。從Kafka 0.10開(kāi)始,每條記錄除了Key和Value外,還增加了timestamp屬性。目前Kafka Stream支持三種時(shí)間
- 事件發(fā)生時(shí)間。事件發(fā)生的時(shí)間,包含在數(shù)據(jù)記錄中。發(fā)生時(shí)間由Producer在構(gòu)造ProducerRecord時(shí)指定。并且需要Broker或者Topic將message.timestamp.type設(shè)置為CreateTime(默認(rèn)值)才能生效。
- 消息接收時(shí)間,也即消息存入Broker的時(shí)間。當(dāng)Broker或Topic將message.timestamp.type設(shè)置為L(zhǎng)ogAppendTime時(shí)生效。此時(shí)Broker會(huì)在接收到消息后,存入磁盤(pán)前,將其timestamp屬性值設(shè)置為當(dāng)前機(jī)器時(shí)間。一般消息接收時(shí)間比較接近于事件發(fā)生時(shí)間,部分場(chǎng)景下可代替事件發(fā)生時(shí)間。
- 消息處理時(shí)間,也即Kafka Stream處理消息時(shí)的時(shí)間。
注:Kafka Stream允許通過(guò)實(shí)現(xiàn)org.apache.kafka.streams.processor.TimestampExtractor接口自定義記錄時(shí)間。
窗口
前文提到,流式數(shù)據(jù)是在時(shí)間上無(wú)界的數(shù)據(jù)。而聚合操作只能作用在特定的數(shù)據(jù)集,也即有界的數(shù)據(jù)集上。因此需要通過(guò)某種方式從無(wú)界的數(shù)據(jù)集上按特定的語(yǔ)義選取出有界的數(shù)據(jù)。窗口是一種非常常用的設(shè)定計(jì)算邊界的方式。不同的流式處理系統(tǒng)支持的窗口類似,但不盡相同。
Kafka Stream支持的窗口如下。
(點(diǎn)擊放大圖像)
(點(diǎn)擊放大圖像)
Join
Kafka Stream由于包含KStream和Ktable兩種數(shù)據(jù)集,因此提供如下Join計(jì)算
- KTable Join KTable 結(jié)果仍為KTable。任意一邊有更新,結(jié)果KTable都會(huì)更新。
- KStream Join KStream 結(jié)果為KStream。必須帶窗口操作,否則會(huì)造成Join操作一直不結(jié)束。
- KStream Join KTable / GlobakKTable 結(jié)果為KStream。只有當(dāng)KStream中有新數(shù)據(jù)時(shí),才會(huì)觸發(fā)Join計(jì)算并輸出結(jié)果。KStream無(wú)新數(shù)據(jù)時(shí),KTable的更新并不會(huì)觸發(fā)Join計(jì)算,也不會(huì)輸出數(shù)據(jù)。并且該更新只對(duì)下次Join生效。一個(gè)典型的使用場(chǎng)景是,KStream中的訂單信息與KTable中的用戶信息做關(guān)聯(lián)計(jì)算。
對(duì)于Join操作,如果要得到正確的計(jì)算結(jié)果,需要保證參與Join的KTable或KStream中Key相同的數(shù)據(jù)被分配到同一個(gè)Task。具體方法是
- 參與Join的KTable或KStream的Key類型相同(實(shí)際上,業(yè)務(wù)含意也應(yīng)該相同)
- 參與Join的KTable或KStream對(duì)應(yīng)的Topic的Partition數(shù)相同
- Partitioner策略的最終結(jié)果等效(實(shí)現(xiàn)不需要完全一樣,只要效果一樣即可),也即Key相同的情況下,被分配到ID相同的Partition內(nèi)
如果上述條件不滿足,可通過(guò)調(diào)用如下方法使得它滿足上述條件。
KStream<K, V> through(Serde<K> keySerde, Serde<V> valSerde, StreamPartitioner<K, V> partitioner, String topic)聚合與亂序處理
聚合操作可應(yīng)用于KStream和KTable。當(dāng)聚合發(fā)生在KStream上時(shí)必須指定窗口,從而限定計(jì)算的目標(biāo)數(shù)據(jù)集。
需要說(shuō)明的是,聚合操作的結(jié)果肯定是KTable。因?yàn)镵Table是可更新的,可以在晚到的數(shù)據(jù)到來(lái)時(shí)(也即發(fā)生數(shù)據(jù)亂序時(shí))更新結(jié)果KTable。
這里舉例說(shuō)明。假設(shè)對(duì)KStream以5秒為窗口大小,進(jìn)行Tumbling Time Window上的Count操作。并且KStream先后出現(xiàn)時(shí)間為1秒, 3秒, 5秒的數(shù)據(jù),此時(shí)5秒的窗口已達(dá)上限,Kafka Stream關(guān)閉該窗口,觸發(fā)Count操作并將結(jié)果3輸出到KTable中(假設(shè)該結(jié)果表示為<1-5,3>)。若1秒后,又收到了時(shí)間為2秒的記錄,由于1-5秒的窗口已關(guān)閉,若直接拋棄該數(shù)據(jù),則可認(rèn)為之前的結(jié)果<1-5,3>不準(zhǔn)確。而如果直接將完整的結(jié)果<1-5,4>輸出到KStream中,則KStream中將會(huì)包含該窗口的2條記錄,<1-5,3>, <1-5,4>,也會(huì)存在骯數(shù)據(jù)。因此Kafka Stream選擇將聚合結(jié)果存于KTable中,此時(shí)新的結(jié)果<1-5,4>會(huì)替代舊的結(jié)果<1-5,3>。用戶可得到完整的正確的結(jié)果。
這種方式保證了數(shù)據(jù)準(zhǔn)確性,同時(shí)也提高了容錯(cuò)性。
但需要說(shuō)明的是,Kafka Stream并不會(huì)對(duì)所有晚到的數(shù)據(jù)都重新計(jì)算并更新結(jié)果集,而是讓用戶設(shè)置一個(gè)retention period,將每個(gè)窗口的結(jié)果集在內(nèi)存中保留一定時(shí)間,該窗口內(nèi)的數(shù)據(jù)晚到時(shí),直接合并計(jì)算,并更新結(jié)果KTable。超過(guò)retention period后,該窗口結(jié)果將從內(nèi)存中刪除,并且晚到的數(shù)據(jù)即使落入窗口,也會(huì)被直接丟棄。
容錯(cuò)
Kafka Stream從如下幾個(gè)方面進(jìn)行容錯(cuò)
- 高可用的Partition保證無(wú)數(shù)據(jù)丟失。每個(gè)Task計(jì)算一個(gè)Partition,而Kafka數(shù)據(jù)復(fù)制機(jī)制保證了Partition內(nèi)數(shù)據(jù)的高可用性,故無(wú)數(shù)據(jù)丟失風(fēng)險(xiǎn)。同時(shí)由于數(shù)據(jù)是持久化的,即使任務(wù)失敗,依然可以重新計(jì)算。
- 狀態(tài)存儲(chǔ)實(shí)現(xiàn)快速故障恢復(fù)和從故障點(diǎn)繼續(xù)處理。對(duì)于Join和聚合及窗口等有狀態(tài)計(jì)算,狀態(tài)存儲(chǔ)可保存中間狀態(tài)。即使發(fā)生Failover或Consumer Rebalance,仍然可以通過(guò)狀態(tài)存儲(chǔ)恢復(fù)中間狀態(tài),從而可以繼續(xù)從Failover或Consumer Rebalance前的點(diǎn)繼續(xù)計(jì)算。
- KTable與retention period提供了對(duì)亂序數(shù)據(jù)的處理能力。
Kafka Stream應(yīng)用示例
下面結(jié)合一個(gè)案例來(lái)講解如何開(kāi)發(fā)Kafka Stream應(yīng)用。本例完整代碼可從作者Github獲取。
訂單KStream(名為orderStream),底層Topic的Partition數(shù)為3,Key為用戶名,Value包含用戶名,商品名,訂單時(shí)間,數(shù)量。用戶KTable(名為userTable),底層Topic的Partition數(shù)為3,Key為用戶名,Value包含性別,地址和年齡。商品KTable(名為itemTable),底層Topic的Partition數(shù)為6,Key為商品名,價(jià)格,種類和產(chǎn)地?,F(xiàn)在希望計(jì)算每小時(shí)購(gòu)買產(chǎn)地與自己所在地相同的用戶總數(shù)。
首先由于希望使用訂單時(shí)間,而它包含在orderStream的Value中,需要通過(guò)提供一個(gè)實(shí)現(xiàn)TimestampExtractor接口的類從orderStream對(duì)應(yīng)的Topic中抽取出訂單時(shí)間。
public class OrderTimestampExtractor implements TimestampExtractor {@Overridepublic long extract(ConsumerRecord<Object, Object> record) {if(record instanceof Order) {return ((Order)record).getTS();} else {return 0;}} }接著通過(guò)將orderStream與userTable進(jìn)行Join,來(lái)獲取訂單用戶所在地。由于二者對(duì)應(yīng)的Topic的Partition數(shù)相同,且Key都為用戶名,再假設(shè)Producer往這兩個(gè)Topic寫(xiě)數(shù)據(jù)時(shí)所用的Partitioner實(shí)現(xiàn)相同,則此時(shí)上文所述Join條件滿足,可直接進(jìn)行Join。
orderUserStream = orderStream.leftJoin(userTable, // 該lamda表達(dá)式定義了如何從orderStream與userTable生成結(jié)果集的Value(Order order, User user) -> OrderUser.fromOrderUser(order, user), // 結(jié)果集Key序列化方式Serdes.String(),// 結(jié)果集Value序列化方式SerdesFactory.serdFrom(Order.class)).filter((String userName, OrderUser orderUser) -> orderUser.userAddress != null)從上述代碼中,可以看到,Join時(shí)需要指定如何從參與Join雙方的記錄生成結(jié)果記錄的Value。Key不需要指定,因?yàn)榻Y(jié)果記錄的Key與Join Key相同,故無(wú)須指定。Join結(jié)果存于名為orderUserStream的KStream中。
接下來(lái)需要將orderUserStream與itemTable進(jìn)行Join,從而獲取商品產(chǎn)地。此時(shí)orderUserStream的Key仍為用戶名,而itemTable對(duì)應(yīng)的Topic的Key為產(chǎn)品名,并且二者的Partition數(shù)不一樣,因此無(wú)法直接Join。此時(shí)需要通過(guò)through方法,對(duì)其中一方或雙方進(jìn)行重新分區(qū),使得二者滿足Join條件。這一過(guò)程相當(dāng)于Spark的Shuffle過(guò)程和Storm的FieldGrouping。
orderUserStrea.through(// Key的序列化方式Serdes.String(),// Value的序列化方式 SerdesFactory.serdFrom(OrderUser.class), // 重新按照商品名進(jìn)行分區(qū),具體取商品名的哈希值,然后對(duì)分區(qū)數(shù)取模(String key, OrderUser orderUser, int numPartitions) -> (orderUser.getItemName().hashCode() & 0x7FFFFFFF) % numPartitions, "orderuser-repartition-by-item").leftJoin(itemTable, (OrderUser orderUser, Item item) -> OrderUserItem.fromOrderUser(orderUser, item), Serdes.String(), SerdesFactory.serdFrom(OrderUser.class))從上述代碼可見(jiàn),through時(shí)需要指定Key的序列化器,Value的序列化器,以及分區(qū)方式和結(jié)果集所在的Topic。這里要注意,該Topic(orderuser-repartition-by-item)的Partition數(shù)必須與itemTable對(duì)應(yīng)Topic的Partition數(shù)相同,并且through使用的分區(qū)方法必須與iteamTable對(duì)應(yīng)Topic的分區(qū)方式一樣。經(jīng)過(guò)這種through操作,orderUserStream與itemTable滿足了Join條件,可直接進(jìn)行Join。
總結(jié)
- Kafka Stream的并行模型完全基于Kafka的分區(qū)機(jī)制和Rebalance機(jī)制,實(shí)現(xiàn)了在線動(dòng)態(tài)調(diào)整并行度
- 同一Task包含了一個(gè)子Topology的所有Processor,使得所有處理邏輯都在同一線程內(nèi)完成,避免了不必的網(wǎng)絡(luò)通信開(kāi)銷,從而提高了效率。
- through方法提供了類似Spark的Shuffle機(jī)制,為使用不同分區(qū)策略的數(shù)據(jù)提供了Join的可能
- log compact提高了基于Kafka的state store的加載效率
- state store為狀態(tài)計(jì)算提供了可能
- 基于offset的計(jì)算進(jìn)度管理以及基于state store的中間狀態(tài)管理為發(fā)生Consumer rebalance或Failover時(shí)從斷點(diǎn)處繼續(xù)處理提供了可能,并為系統(tǒng)容錯(cuò)性提供了保障
- KTable的引入,使得聚合計(jì)算擁用了處理亂序問(wèn)題的能力
轉(zhuǎn)載于:https://www.cnblogs.com/davidwang456/articles/7884696.html
總結(jié)
以上是生活随笔為你收集整理的Kafka设计解析(七)- 流式计算的新贵 Kafka Stream的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: Kafka设计解析(六)- Kafka高
- 下一篇: Kafka设计解析(八)- Kafka事