Kafka Design
Documentation: https://kafka.apache.org/0100/documentation.html
Chinese translation of the design document: http://www.oschina.net/translate/kafka-design
Kafka: a Distributed Messaging System for Log Processing
1. Introduction
We have built a novel messaging system for log processing called Kafka [18] that combines the benefits of traditional log aggregators and messaging systems.
On the one hand, Kafka is distributed and scalable, and offers high throughput.
On the other hand, Kafka provides an API similar to a messaging system and allows applications to consume log events in real time.
Kafka can be understood as a distributed producer-consumer architecture.
2. Related Work
Given that so many log aggregators and messaging systems already existed, why build Kafka?
Compared with traditional messaging systems:
1. MQ and JMS systems provide strong delivery guarantees, which a log aggregator does not need: losing the occasional log entry is acceptable, while those guarantees greatly increase system complexity.
2. They predate big data, so throughput was never their primary focus; for example, they do not support batching of messages.
3. They lack support for distributed operation.
4. They assume near-immediate consumption: the consumer must keep up, otherwise a long queue of unconsumed messages causes performance problems.
Traditional messaging systems tend not to be a good fit for log processing.
First, there is a mismatch in features offered by enterprise systems.
For example, IBM Websphere MQ [7] has transactional supports that allow an application to insert messages into multiple queues atomically. The JMS [14] specification allows each individual message to be acknowledged after consumption, potentially out of order.
Second, many systems do not focus as strongly on throughput as their primary design constraint.
Third, those systems are weak in distributed support.
Finally, many messaging systems assume near immediate consumption of messages, so the queue of unconsumed messages is always fairly small.
Compared with existing log aggregators: the pull model
A number of specialized log aggregators have been built over the last few years.
Facebook uses a system called Scribe. Each frontend machine can send log data to a set of Scribe machines over sockets. Each Scribe machine aggregates the log entries and periodically dumps them to HDFS [9] or an NFS device.
Yahoo’s data highway project has a similar dataflow. A set of machines aggregate events from the clients and roll out “minute” files, which are then added to HDFS.
Flume is a relatively new log aggregator developed by Cloudera. It supports extensible “pipes” and “sinks”, and makes streaming log data very flexible. It also has more integrated distributed support. However, most of those systems are built for consuming the log data offline, and often expose implementation details unnecessarily (e.g. “minute files”) to the consumer.
Most of them use a “pull” model in which the broker forwards data to consumers. At LinkedIn, we find the “pull” model more suitable for our applications since each consumer can retrieve the messages at the maximum rate it can sustain and avoid being flooded by messages pushed faster than it can handle.
Why pull rather than push? Only the consumer knows how full or starved it is, so it is more reasonable for the consumer to fetch at its own pace than for the broker to force-push.
Could earlier systems not have thought of this? They could; it is not hard to see. The point is that earlier systems were built around offline consumers that write data straight into HDFS rather than analyzing it online, so in practice the consumer was in no danger of being flooded. Under that assumption, push is simply the easier design.
3. Kafka Architecture and Design Principles
We first introduce the basic concepts in Kafka.
A stream of messages of a particular type is defined by a topic.
A producer can publish messages to a topic.
The published messages are then stored at a set of servers called brokers.
A consumer can subscribe to one or more topics from the brokers, and consume the subscribed messages by pulling data from the brokers.
To balance load, a topic is divided into multiple?partitions?and each broker stores one or more of those partitions.
Load balancing is achieved by splitting each topic into partitions.
Partitioning is more reasonable than placing whole topics on different brokers: topics differ in popularity, so a topic-per-broker placement easily leads to load imbalance.
By default the producer picks a partition at random; a custom, more deliberate partitioning strategy can be plugged in.
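The choice between random and key-based partitioning can be sketched as follows (an illustrative Python sketch, not Kafka's actual API; the function name and the CRC32 hash choice are assumptions):

```python
import random
import zlib

def choose_partition(key, num_partitions):
    """Pick a partition for a message: random when no key is supplied,
    otherwise a deterministic hash of the key, so that all messages
    with the same key land on the same partition."""
    if key is None:
        return random.randrange(num_partitions)
    return zlib.crc32(key.encode("utf-8")) % num_partitions
```

A key-based strategy keeps related messages (e.g. all events for one user) ordered within a single partition, at the risk of skew when some keys are much hotter than others.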
3.1 Efficiency on a Single Partition
Simple storage
Kafka has a very simple storage structure:
1. The unit of storage is the partition, and each partition is in fact a set of segment files; a set of files is used so that no single file grows too large.
Logically, then, a partition can be viewed as a single log file, with new messages appended at the end.
As in any file system, a message becomes visible to consumers only after it has been flushed.
2. Logical offsets are used in place of message ids, reducing storage overhead.
Kafka has a very simple storage layout.
1. Each partition of a topic corresponds to a logical log.
Physically, a log is implemented as a set of segment files of approximately the same size (e.g., 1GB).
Every time a producer publishes a message to a partition, the broker simply appends the message to the last segment file.
A message is only exposed to the consumers after it is flushed.
2. A message stored in Kafka doesn’t have an explicit message id. Instead, each message is addressed by its logical offset in the log. This avoids the overhead of maintaining auxiliary, seek-intensive random-access index structures that map the message ids to the actual message locations.
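Offset addressing keeps lookup cheap: each segment file can be identified by the offset of its first message, so finding the segment that holds a given offset is a binary search over the sorted base offsets. A minimal sketch (the data layout here is an assumption for illustration, not Kafka's actual on-disk format):

```python
import bisect

def find_segment(base_offsets, target):
    """Return the index of the segment containing `target`, given the
    sorted first-offsets of the segment files.
    E.g. with segments starting at [0, 1000, 2000], offset 1500
    lives in segment 1."""
    i = bisect.bisect_right(base_offsets, target) - 1
    if i < 0:
        raise KeyError("offset precedes the earliest retained segment")
    return i
```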
Efficient transfer
1. Messages are sent in batches, improving throughput.
2. The file system page cache is used rather than an application-level memory cache.
3. sendfile is used to bypass application-level buffers and move data directly from file to socket (which works because the broker genuinely does not need to look at the bytes it sends).
We are very careful about transferring data in and out of Kafka.
1. A producer can submit a set of messages in a single send request. A consumer also retrieves multiple messages up to a certain size, typically hundreds of kilobytes.
2. Another unconventional choice that we made is to avoid explicitly caching messages in memory at the Kafka layer. Instead, we rely on the underlying file system page cache.
Why rely on the file system page cache? (See the Kafka design docs.)
First, it is simple and efficient: the page cache works with no special effort, and no dedicated memory buffer has to be built.
Second, the page cache survives as long as the machine stays up, so a broker process restart or crash does not lose it.
Finally, and most importantly, it fits this workload: Kafka reads and writes sequentially.
This has the main benefit of avoiding double buffering---messages are only cached in the page cache.
This has the additional benefit of retaining warm cache even when a broker process is restarted.
Since both the producer and the consumer access the segment files sequentially, with the consumer often lagging the producer by a small amount, normal operating system caching heuristics are very effective.
3. We optimize the network access for consumers.
On Linux and other Unix operating systems, there exists a sendfile API [5] that can directly transfer bytes from a file channel to a socket channel.
sendfile eliminates steps (2), copying data from the page cache to an application buffer, and (3), copying the application buffer to another kernel buffer.
Since Kafka never wanted an application-level memory buffer in the first place, this is a natural fit: data goes straight from the page cache to the kernel socket buffer.
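The zero-copy path can be illustrated with `os.sendfile` in Python (Kafka's broker is written for the JVM and uses the equivalent `FileChannel.transferTo`; this sketch only demonstrates the idea on Linux):

```python
import os

def send_file_range(sock, path, offset, count):
    """Ship `count` bytes of `path`, starting at `offset`, from the kernel
    page cache straight into the socket: no read() into a userspace
    buffer, no write() back out."""
    with open(path, "rb") as f:
        sent = 0
        while sent < count:
            n = os.sendfile(sock.fileno(), f.fileno(), offset + sent,
                            count - sent)
            if n == 0:  # reached end of file early
                break
            sent += n
    return sent
```

This is exactly the shape of a broker serving a consumer fetch: the requested byte range of a segment file flows to the network without ever being copied into the application.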
Stateless broker
Unlike most other messaging systems, in Kafka, the information about how much each consumer has consumed is not maintained by the broker, but by the consumer itself.
Because the consumer pulls, the broker has no need to know how far each consumer has read; with push it would have to.
The resulting question: since the broker cannot know when a consumer will come to pull, when may it delete a message? Kafka uses a very simple time-based SLA: messages are deleted after a retention period, e.g. seven days.
There is an important side benefit of this design. A consumer can deliberately rewind back to an old offset and re-consume data.
This feature is very convenient, for example in testing. With a typical queue a message is gone once read, which makes repeated tests over the same data painful; with Kafka you simply reset the offset.
It also helps when a consumer crashes before its output was written successfully: re-reading from the previous offset recovers the data. Genuinely useful.
The catch is that Kafka itself exposes no convenient interface for manipulating offsets, so what looks great on paper is not always convenient in practice.
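The consumer-side bookkeeping, including the rewind trick, can be sketched with a toy model; the class and method names are invented for illustration, and the "broker" here is just a list:

```python
class OffsetTrackingConsumer:
    """Toy single-partition consumer: only the consumer remembers how far
    it has read; the broker-side log keeps no per-consumer state."""

    def __init__(self, log):
        self.log = log      # stand-in for the broker's partition log
        self.offset = 0     # consumption position, owned by the consumer

    def poll(self, max_messages):
        batch = self.log[self.offset:self.offset + max_messages]
        self.offset += len(batch)  # advance after (pretend) processing
        return batch

    def rewind(self, offset):
        """Deliberately jump back to re-consume old data, e.g. to rerun
        a test or to recover after a failed downstream write."""
        self.offset = offset
```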
3.2 Distributed Coordination
We now describe how the producers and the consumers behave in a distributed setting.
Each producer can publish a message to either a randomly selected partition or a partition semantically determined by a partitioning key and a partitioning function. We will focus on how the consumers interact with the brokers.
For the producer this is simple: a message goes to a partition chosen either at random or by some hash function.
The consumer side is more involved: a topic has many partitions, and for efficiency several consumers must consume them in parallel, so the question is how those consumers coordinate.
Kafka has the concept of consumer groups. Each consumer group consists of one or more consumers that jointly consume a set of subscribed topics, i.e., each message is delivered to only one of the consumers within the group.
A consumer group can be abstracted as a single consumer: each message needs to be consumed only once, and the group exists purely for parallelism.
Different groups are completely independent; every group may consume each message once, so no coordination is needed across groups.
Within a group, however, the consumers must coordinate to ensure each message is consumed only once, and the goal is to keep that coordination overhead as small as possible.
1. To simplify the design, there is no parallelism within a partition; only parallelism across partitions is supported.
Our first decision?is to make a partition within a topic the smallest unit of parallelism. This means that at any given time, all messages from one partition are consumed only by a single consumer within each consumer group.
A partition has at most one consumer within a group, which avoids the locking and state-maintenance overhead of concurrent readers.
Could every partition be given its own dedicated consumer? It could, but that would be wasteful: there are usually far more partitions than consumers.
So one consumer must cover several partitions, which creates a problem: whenever the number of partitions or consumers changes, a rebalance must redistribute the consumption relationships. Only at that moment do the consumers need to coordinate, so the coordination overhead is fairly low.
The biggest weakness of this design is that a single slow partition (or a few) drags down overall progress: a partition can have only one consumer, so the other consumers cannot help even when idle.
You therefore have to keep the production and consumption rates of each partition roughly matched, or problems follow.
For example, the number of partitions must be chosen carefully: if it does not divide evenly by the number of consumers, the assignment comes out unbalanced.
Personally I do not consider this a design worth imitating; there should be better options.
2. Use Zookeeper instead of a central master
The second decision?that we made is to not have a central “master” node, but instead let consumers coordinate among themselves in a decentralized fashion.
Kafka uses Zookeeper for the following tasks:
(1) detecting the addition and the removal of brokers and consumers,
(2) triggering a rebalance process in each consumer when the above events happen, and
(3) maintaining the consumption relationship and keeping track of the consumed offset of each partition.
Specifically, when each broker or consumer starts up, it stores its information in a broker or consumer registry in Zookeeper.
The broker registry (ephemeral) contains the broker’s host name and port, and the set of topics and partitions stored on it.
The consumer registry (ephemeral) includes the consumer group to which a consumer belongs and the set of topics that it subscribes to.
The ownership registry (ephemeral) has one path for every subscribed partition and the path value is the id of the consumer currently consuming from this partition (we use the terminology that the consumer owns this partition).
The offset registry (persistent) stores, for each subscribed partition, the offset of the last consumed message in the partition (per consumer group).
When brokers or consumers join or leave, the corresponding ephemeral registries track the change automatically; that part is simple.
At the same time, the change triggers a rebalance event on the consumers, whose outcome is used to update or adjust the ownership registry.
Only the offset registry is persistent: however the consumers change, as long as each group’s per-partition offset is recorded, coordination within the group is preserved.
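The four registries can be pictured as ZooKeeper paths. The toy snapshot below (path layout loosely modeled on Kafka 0.x, values simplified) shows what disappears with its owner's session and what survives:

```python
EPHEMERAL, PERSISTENT = "ephemeral", "persistent"

# Toy snapshot of Zookeeper state for group "g1" consuming topic "clicks".
registries = {
    "/brokers/ids/1":                 ("host=b1,port=9092", EPHEMERAL),
    "/consumers/g1/ids/c1":           ("topics=clicks", EPHEMERAL),
    "/consumers/g1/owners/clicks/0":  ("c1", EPHEMERAL),    # ownership
    "/consumers/g1/offsets/clicks/0": ("1042", PERSISTENT), # offset
}

def after_all_sessions_die(registry):
    """If every broker and consumer session drops, the ephemeral nodes
    vanish with them; only the persistent offset registry survives,
    which is all a new generation of consumers needs to resume."""
    return {p: v for p, (v, kind) in registry.items() if kind == PERSISTENT}
```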
3. Consumer Rebalance
Algorithm 1: rebalance process for consumer Ci in group G    # run by each consumer Ci in the group
For each topic T that Ci subscribes to {                     # done topic by topic, since partition counts differ per topic
    remove partitions owned by Ci from the ownership registry    # first clear Ci's ownership entries
    read the broker and the consumer registries from Zookeeper
    compute PT = partitions available in all brokers under topic T   # the partition list for T
    compute CT = all consumers in G that subscribe to topic T        # the consumer list for T
    sort PT and CT
    let j be the index position of Ci in CT and let N = |PT|/|CT|    # j = Ci's rank in the sorted consumer list
    assign partitions from j*N to (j+1)*N - 1 in PT to consumer Ci
    for each assigned partition p {
        set the owner of p to Ci in the ownership registry           # update ownership
        let Op = the offset of partition p stored in the offset registry   # read the committed offset
        invoke a thread to pull data in partition p from offset Op   # one thread per partition, pulling concurrently
    }
}
The key of the algorithm is the range formula: partitions j*N through (j+1)*N - 1.
It is simple: with 10 partitions and 2 consumers, each consumer should handle N = 10/2 = 5 partitions.
Which 5 each gets is decided by j, the consumer’s position in the sorted consumer list.
This is how Kafka implements automatic load balancing: partitions are always spread evenly over the live consumers, and if a consumer dies, a rebalance makes the others take over its partitions.
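The range formula can be written out directly (a sketch of Algorithm 1's assignment step; the function name is made up):

```python
def assigned_partitions(consumer_id, consumers, partitions):
    """Range assignment: sort both lists, find this consumer's rank j,
    and take the contiguous slice of N = |PT| / |CT| partitions,
    i.e. partitions j*N .. (j+1)*N - 1."""
    pt = sorted(partitions)
    ct = sorted(consumers)
    j = ct.index(consumer_id)
    n = len(pt) // len(ct)
    # When |PT| does not divide evenly by |CT|, this simple formula
    # leaves a remainder; real assignors hand the first consumers one
    # extra partition each.
    return pt[j * n:(j + 1) * n]
```

Because every consumer runs the same deterministic computation over the same sorted inputs from Zookeeper, they all arrive at compatible assignments without talking to each other.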
However, while the "make a partition within a topic the smallest unit of parallelism" policy simplifies things, it also coarsens the balancing granularity: it cannot handle the case where one partition receives far more data than the rest, because a partition can have at most one consumer. The producer therefore has to keep the partitions themselves balanced when publishing.
The key enabler of the design is that each group’s offset is recorded per partition, so the consumer reading a partition can be switched at any time; a rebalance is simply a reassignment and nothing else needs to be considered.
A rebalance can, however, cause data to be read twice.
The reason is that, to tolerate consumer failures, the offset is committed only after the data has been fully processed, so a consumer crash loses no data;
but during a rebalance, another consumer may then re-read that same uncommitted data.
Contention over partition ownership, caused by notification timing
When there are multiple consumers within a group, each of them will be notified of a broker or a consumer change.
However, the notification may come at slightly different times at the consumers.
So, it is possible that one consumer tries to take ownership of a partition still owned by another consumer. When this happens, the first consumer simply releases all the partitions that it currently owns, waits a bit and retries the rebalance process. In practice, the rebalance process often stabilizes after only a few retries.
3.3 Delivery Guarantees
In general, Kafka only guarantees at-least-once delivery. Exactly once delivery typically requires two-phase commits and is not necessary for our applications.
This is not a focus for Kafka, so a commit mechanism such as two-phase commit is unnecessary. In the normal case delivery is effectively exactly-once, but if a consumer crashes after reading data and before updating Zookeeper, the consumer that takes over may read duplicate data.
Kafka guarantees that messages from a single partition are delivered to a consumer in order. However, there is no guarantee on the ordering of messages coming from different partitions.
To avoid log corruption, Kafka stores a CRC for each message in the log. The CRC guards against corruption from network errors or tampering.
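Per-message CRC checking can be illustrated with a minimal framing scheme (an assumption for illustration; Kafka's real record format also carries length, offset, and other fields):

```python
import zlib

def frame(payload: bytes) -> bytes:
    """Prepend a CRC32 of the payload so corruption is detectable on read."""
    crc = zlib.crc32(payload) & 0xFFFFFFFF
    return crc.to_bytes(4, "big") + payload

def unframe(record: bytes) -> bytes:
    """Verify and strip the CRC; raise if the record was corrupted."""
    stored, payload = int.from_bytes(record[:4], "big"), record[4:]
    if zlib.crc32(payload) & 0xFFFFFFFF != stored:
        raise ValueError("CRC mismatch: corrupted log record")
    return payload
```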
If a broker goes down, any message stored on it not yet consumed becomes unavailable. If the storage system on a broker is permanently damaged, any unconsumed message is lost forever.
So a broker crash can mean lost data.
In the future, we plan to add built-in?replication?in Kafka to redundantly store each message on multiple brokers.
4. Kafka Usage at LinkedIn
We have one Kafka cluster co-located with each datacenter where our user-facing services run.
That is, a Kafka cluster runs in each datacenter that hosts the services, collecting their data.
The frontend services generate various kinds of log data and publish it to the local Kafka brokers in batches.
We rely on a hardware load-balancer to distribute the publish requests to the set of Kafka brokers evenly.
This is important: the publish requests must be balanced across brokers here, because nothing downstream can compensate for an imbalance introduced at this stage.
The online consumers of Kafka run in services within the same datacenter.
This cluster is consumed by online consumers, for real-time analysis.
We also deploy a cluster of Kafka in a separate datacenter for offline analysis, located geographically close to our Hadoop cluster and other data warehouse infrastructure.
That is, a Kafka cluster dedicated to offline analysis is built close to the Hadoop cluster and the data warehouse.
This instance of Kafka runs a set of embedded consumers to pull data from the Kafka instances in the live datacenters.
So a consumer can itself be another Kafka cluster, which is quite a creative usage.
We then run data load jobs to pull data from this replica cluster of Kafka into Hadoop and our data warehouse, where we run various reporting jobs and analytical process on the data.