Kafka 指南
提到消息系統(tǒng),目前最火熱的非 Kafka 莫屬,公司也打算利用 Kafka 進行各業(yè)務日志統(tǒng)一收集,這里結合自己的實踐來分享一下具體的配置及使用。Kafka 版本 0.10.0.1
更新記錄
- 2016.08.15: 初稿
介紹
作為云計算大數據的套件,Kafka 是一個分布式的、可分區(qū)的、可復制的消息系統(tǒng)。該有的功能基本都有,而且有自己的特色:
- 以 topic 為單位進行消息歸納
- 向 topic 發(fā)布消息的是 producer
- 從 topic 獲取消息的是 consumer
- 集群方式運行,每個服務叫 broker
- 客戶端和服務器通過 TCP 進行通信
在Kafka集群中,沒有“中心主節(jié)點”的概念,集群中所有的服務器都是對等的,因此,可以在不做任何配置的更改的情況下實現(xiàn)服務器的的添加與刪除,同樣的消息的生產者和消費者也能夠做到隨意重啟和機器的上下線。
對每個 topic 來說,Kafka 會對其進行分區(qū),每個分區(qū)都由一系列有序的、不可變的消息組成,這些消息被連續(xù)的追加到分區(qū)中。分區(qū)中的每個消息都有一個連續(xù)的序列號叫做 offset,用來在分區(qū)中唯一的標識這個消息。
發(fā)布消息通常有兩種模式:隊列模式(queuing)和發(fā)布-訂閱模式(publish-subscribe)。隊列模式中,consumers 可以同時從服務端讀取消息,每個消息只被其中一個 consumer 讀到;發(fā)布-訂閱模式中消息被廣播到所有的 consumer 中。更常見的是,每個 topic 都有若干數量的 consumer 組,每個組都是一個邏輯上的『訂閱者』,為了容錯和更好的穩(wěn)定性,每個組由若干 consumer 組成。這其實就是一個發(fā)布-訂閱模式,只不過訂閱者是個組而不是單個 consumer。
通過分區(qū)的概念,Kafka可以在多個consumer組并發(fā)的情況下提供較好的有序性和負載均衡。將每個分區(qū)分只分發(fā)給一個consumer組,這樣一個分區(qū)就只被這個組的一個consumer消費,就可以順序的消費這個分區(qū)的消息。因為有多個分區(qū),依然可以在多個consumer組之間進行負載均衡。注意consumer組的數量不能多于分區(qū)的數量,也就是有多少分區(qū)就允許多少并發(fā)消費。
Kafka 只能保證一個分區(qū)之內消息的有序性,在不同的分區(qū)之間是不可以的,這已經可以滿足大部分應用的需求。如果需要 topic 中所有消息的有序性,那就只能讓這個 topic 只有一個分區(qū),當然也就只有一個 consumer 組消費它。
單機配置
按照下列步驟即可(來自官網教程)
1. 下載 Kafka
- 下載?wget http://apache.01link.hk/kafka/0.10.0.0/kafka_2.11-0.10.0.0.tgz?或者?wget http://ftp.cuhk.edu.hk/pub/packages/apache.org/kafka/0.10.0.0/kafka_2.11-0.10.0.0.tgz(看哪個源比較快)
- 解壓?tar -xzf kafka_2.11-0.10.0.0.tgz
- 進入文件夾?cd kafka_2.11-0.10.0.0/
2. 啟動服務
- 啟動 ZooKeeper?bin/zookeeper-server-start.sh config/zookeeper.properties &(利用?&放到后臺方便繼續(xù)操作)
- 啟動 Kafka?bin/kafka-server-start.sh config/server.properties &
3. 創(chuàng)建一個叫做 dawang 的 topic,它只有一個分區(qū),一個副本
- 創(chuàng)建?bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic dawang
- 查看?bin/kafka-topics.sh --list --zookeeper localhost:2181
- 還可以配置 broker 讓它自動創(chuàng)建 topic
4. 發(fā)送消息。Kafka 使用一個簡單的命令行producer,從文件中或者從標準輸入中讀取消息并發(fā)送到服務端。默認的每條命令將發(fā)送一條消息。
- 發(fā)送消息?bin/kafka-console-producer.sh --broker-list localhost:9092 --topic dawang(然后可以隨意輸入內容,回車可以發(fā)送,ctrl+c 退出)
5. 啟動 consumer。可以讀取消息并輸出到標準輸出:
- 接收消息?bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic dawang --from-beginning
- 在一個終端中運行 consumer 命令行,另一個終端中運行 producer 命令行,就可以在一個終端輸入消息,另一個終端讀取消息。這兩個命令都有自己的可選參數,可以在運行的時候不加任何參數可以看到幫助信息。
6. 搭建一個多個 broker 的集群,啟動有 3 個 broker 組成的集群,這些 broker 節(jié)點也都在本機
首先復制一下配置文件:cp config/server.properties config/server-1.properties?和?cp config/server.properties config/server-2.properties
兩個文件需要改動的內容為:
| config/server-1.properties:broker.id=1listeners=PLAINTEXT://:9093log.dir=/tmp/kafka-logs-1config/server-2.properties:broker.id=2listeners=PLAINTEXT://:9094log.dir=/tmp/kafka-logs-2 |
這里我們把 broker id, 端口號和日志地址配置成和之前不一樣,然后我們啟動這兩個 broker:
| bin/kafka-server-start.sh config/server-1.properties &bin/kafka-server-start.sh config/server-2.properties & |
然后創(chuàng)建一個復制因子為 3 的 topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic oh3topic
可以使用?describe?命令來顯示 topic 詳情
| > bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic oh3topicTopic:oh3topic PartitionCount:1 ReplicationFactor:3 Configs:Topic: oh3topic Partition: 0 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2 |
這里簡單解釋一下
- Leader 是給定分區(qū)的節(jié)點編號,每個分區(qū)的部分數據會隨機指定不同的節(jié)點
- Replicas 是該日志會保存的復制
- Isr 表示正在同步的復制
我們也可以來看看之前的另一個 topic 的情況
| > bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic dawangTopic:dawang PartitionCount:1 ReplicationFactor:1 Configs:Topic: dawang Partition: 0 Leader: 0 Replicas: 0 Isr: 0 |
最后我們可以按照同樣的方法來生產和消費消息,例如
| # 生產bin/kafka-console-producer.sh --broker-list localhost:9092 --topic oh3topic# 消費bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic oh3topic |
開倆終端就可以一邊生產消息,一邊消費消息了。
注意事項
如果要配置自定義端口,server.properties?中 listeners 一定要配置成為 IP 地址;如果配置為 localhost 或服務器的 hostname,在使用 java 發(fā)送數據時就會拋出異
| # 創(chuàng)建 topicbin/kafka-topics.sh --create --zookeeper bi03:2181 --replication-factor 1 --partitions 1 --topic logs# 生產消息bin/kafka-console-producer.sh --broker-list localhost:13647 --topic logs# 消費消息# bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic logs |
如果 Zookeeper 出現(xiàn)?fsync-ing the write ahead log in SyncThread:1 took 2243ms which will adversely effect operation latency. See the ZooKeeper troubleshooting guide,是因為 FOLLOWER 在跟 LEADER 同步時,fsync 操作時間過長,導致超時。增加?tickTime?或者?initLimit?和?syncLimit?的值即可
集群配置
kafka 使用 ZooKeeper 用于管理、協(xié)調代理。每個 Kafka 代理通過 Zookeeper 協(xié)調其他 Kafka 代理。當 Kafka 系統(tǒng)中新增了代理或某個代理失效時,Zookeeper 服務將通知生產者和消費者。生產者與消費者據此開始與其他代理協(xié)調工作。
安裝 Java
先給兩臺機子安裝 Java
| sudo add-apt-repository -y ppa:webupd8team/javasudo apt-get updatesudo apt-get -y install oracle-java8-installer |
更新 Hosts
這里用兩臺機器做例子(理論上最好是 3 臺起步,偶數個不是不可以的,但是zookeeper集群是以宕機個數過半才會讓整個集群宕機的,所以奇數個集群更佳),分別配置?/etc/hosts?文件為
| 127.0.0.1 localhost10.1.1.164 bi0310.1.1.44 bi02 |
修改 Zookeeper 配置文件
修改?config/zookeeper.properties?為
| dataDir=/data/home/logger/kafka_2.11-0.10.0.0/zookeeper-logs/clientPort=2181# maxClientCnxns=0tickTime=2000initLimit=5syncLimit=2server.1=bi03:13645:13646server.2=bi02:13645:13646 |
參數的意義為:
- initLimit: zookeeper集群中的包含多臺 server,其中一臺為 leader,集群中其余的 server 為 follower。initLimit 參數配置初始化連接時,follower 和 leader 之間的最長心跳時間。此時該參數設置為 5,說明時間限制為 5 倍 tickTime,即?5*2000=10000ms=10s
- syncLimit: 該參數配置 leader 和 follower 之間發(fā)送消息,請求和應答的最大時間長度。此時該參數設置為 2,說明時間限制為 2 倍 tickTime,即 4000ms
- server.X=A:B:C 其中 X 是一個數字, 表示這是第幾號 server。A 是該 server 所在的 IP 地址。B 配置該 server 和集群中的 leader 交換消息所使用的端口。C 配置選舉 leader 時所使用的端口。
給服務器編號
在 dataDir 目錄下建立一個 myid 文件,分別為
| # server.1echo 1 > myid# server.2echo 2 > myid |
啟動 Zookeeper
然后在每臺機子上啟動 zookeeper 服務
bin/zookeeper-server-start.sh config/zookeeper.properties &
所有機子的 zookeeper 都啟動之前會報錯,這都是正常的
如果不想要任何輸出
nohup bin/zookeeper-server-start.sh config/zookeeper.properties &
修改 Kafka 配置文件
修改?config/server.properties,幾個要改的部分是
| # 允許刪除 topicdelete.topic.enable=truebroker.id=0 # 這里不能重復listeners=PLAINTEXT://bi03:13647 # 這里要配置成本機的 host name# 這里需要配置成外網能夠訪問的地址及端口advertised.listeners=PLAINTEXT://external.ip:8080log.dirs=/data/home/logger/kafka_2.11-0.10.0.0/kafka-logsnum.partitions=2zookeeper.connect=bi03:2181,bi02:2181 |
啟動 Kafka
在每個節(jié)點上執(zhí)行
bin/kafka-server-start.sh config/server.properties &
如果不想要任何輸出
nohup bin/kafka-server-start.sh config/server.properties &
驗證安裝
創(chuàng)建一個 topic
bin/kafka-topics.sh --create --zookeeper bi03:2181,bi02:2181 --replication-factor 2 --partitions 1 --topic test
查看集群狀態(tài)
bin/kafka-topics.sh --describe --zookeeper bi03:2181,bi02:2181 --topic test
生產消息,這里注意要生產到前面設置的監(jiān)聽端口,而不是 zookeeper 的端口
bin/kafka-console-producer.sh --broker-list bi03:13647,bi02:13647 --topic test
消費消息,這里注意是 zookeeper 的端口,而不是 kafka 的端口
bin/kafka-console-consumer.sh --zookeeper bi03:2181,bi02:2181 --from-beginning --topic test
顯示 topic 列表
bin/kafka-topics.sh --zookeeper bi03:2181,bi02:2181 --list
刪除 topic
bin/kafka-topics.sh --zookeeper bi03:2181,bi02:2181 --delete --topic hello
其他配置
Kafka 使用鍵值對的屬性文件格式來進行配置,比如?config/server.properties,具體的值可以從文件中讀取,或者在代碼中進行指定。最重要的三個屬性是:
- broker.id: broker 的編號,不能相同
- log.dirs: 日志保存的文件夾,默認為?/tmp/kafka-logs
- zookeeper.connect: zookeeper 的 host
其他一些我覺得比較有用的屬性為
- auto.create.topics.enable?是否允許自動創(chuàng)建 topic,boolean 值,默認為?true
- auto.leader.rebalance.enable?是否允許 leader 進行自動平衡,boolean 值,默認為?true
- background.threads?后臺進程數目,int 值,默認為 10 個
- compression.type?指定 topic 的壓縮方式,string 值,可選有
- gzip,?snappy,?lz4?壓縮方法
- uncompressed?不壓縮
- producer?跟隨 producer 的壓縮方式
- delete.topic.enable?是否允許刪除 topic,boolean 值,默認為 false(主要用于控制 admin 界面中的控制)
- leader.imbalance.check.interval.seconds?檢查是否平衡的時間間隔,long 值,默認為 300
- leader.imbalance.per.broker.percentage?允許的不平衡的百分比,超出則會進行重平衡,int 值,默認為 10
- log.flush.interval.messages?攢了多少條消息之后會把數據刷入磁盤,long 值,默認是 9223372036854775807
- log.flush.interval.ms?每條消息在保存到磁盤中前會在內存中待多久,單位毫秒,long 值,如果不設定,默認使用?log.flush.scheduler.interval.ms,也就是 9223372036854775807
更多的配置可以參考這里,以上的配置均針對 broker,因為目前我只用 broker 的部分
基本操作
所有的工具都可以在?bin/?文件夾下查看,如果不帶任何參數,就會給出所有命令的列表說明,這里只簡要說明一些常用的命令
創(chuàng)建和移除 topic
可以手動創(chuàng)建 topic,或在數據進來時自動創(chuàng)建不存在的 topic,如果是自動創(chuàng)建的話,可能需要根據這里來進行對應調整。
創(chuàng)建 topic
bin/kafka-topics.sh --zookeeper zk_host:port/chroot --create --topic my_topic_name --partitions 20 --replication-factor 3 --config x=y
replication-factor 控制復制的份數,建議 2-3 份來兼顧容錯和效率。partitions 控制該 topic 將被分區(qū)的數目,partitions 的數目最好不要超過服務器的個數(因為分區(qū)的意義是增加并行效率,而服務器數量決定了并行的數量,假設只有 2 臺服務器,分 4 個區(qū)和 2 個區(qū)其實差別不大)。另外,topic 的名稱不能超過 249 個字符
修改 topic
bin/kafka-topics.sh --zookeeper zk_host:port/chroot --alter --topic my_topic_name --partitions 40
這里需要注意,即使修改了分區(qū)的個數,已有的數據也不會進行變動,Kafka 不會做任何自動重分布
增加配置
bin/kafka-topics.sh --zookeeper zk_host:port/chroot --alter --topic my_topic_name --config x=y
移除配置
bin/kafka-topics.sh --zookeeper zk_host:port/chroot --alter --topic my_topic_name --delete-config x
刪除 topic
bin/kafka-topics.sh --zookeeper zk_host:port/chroot --delete --topic my_topic_name
這個需要?delete.topic.enable=true,目前 Kafka 不支持減少 topic 的分區(qū)數目
優(yōu)雅關閉
Kafka 會自動檢測 broker 的狀態(tài)并根據機器狀態(tài)選舉出新的 leader。但是如果需要進行配置更改停機的時候,我們就需要使用優(yōu)雅關閉了,好處在于:
但是這個需要開啟?controlled.shutdown.enable=true。
剛重啟之后的節(jié)點不是任何分區(qū)的 leader,所以這時候需要進行重新分配:
bin/kafka-preferred-replica-election.sh --zookeeper zk_host:port/chroot
這里需要開啟?auto.leader.rebalance.enable=true
然后可以使用腳本?bin/kafka-server-stop.sh
注意,如果配置文件中沒有?auto.leader.rebalance.enable=true,就還需要重新平衡
深入理解
這里只是一部分摘錄,更多內容可查閱參考鏈接(尤其是美團技術博客的那篇)
文件系統(tǒng)
Kafka 大量依賴文件系統(tǒng)去存儲和緩存消息。而文件系統(tǒng)最終會放在硬盤上,不過不用擔心,很多時候硬盤的快慢完全取決于使用它的方式。設計良好的硬盤架構可以和內存一樣快。
所以與傳統(tǒng)的將數據緩存在內存中然后刷到硬盤的設計不同,Kafka直接將數據寫到了文件系統(tǒng)的日志中,因此也避開了 JVM 的劣勢——Java 對象占用空間巨大,數據量增大后垃圾回收有困難。使用文件系統(tǒng),即使系統(tǒng)重啟了,也不需要刷新數據,也簡化了維護數據一致性的邏輯。
對于主要用于日志處理的消息系統(tǒng),數據的持久化可以簡單的通過將數據追加到文件中實現(xiàn),讀的時候從文件中讀就好了。這樣做的好處是讀和寫都是 O(1) 的,并且讀操作不會阻塞寫操作和其他操作。這樣帶來的性能優(yōu)勢是很明顯的,因為性能和數據的大小沒有關系了。
既然可以使用幾乎沒有容量限制(相對于內存來說)的硬盤空間建立消息系統(tǒng),就可以在沒有性能損失的情況下提供一些一般消息系統(tǒng)不具備的特性。比如,一般的消息系統(tǒng)都是在消息被消費后立即刪除,Kafka卻可以將消息保存一段時間(比如一星期),這給consumer提供了很好的機動性和靈活性。
事務定義
數據傳輸的事務定義通常有以下三種級別:
- 最多一次: 消息不會被重復發(fā)送,最多被傳輸一次,但也有可能一次不傳輸。
- 最少一次: 消息不會被漏發(fā)送,最少被傳輸一次,但也有可能被重復傳輸.
- 精確的一次(Exactly once): 不會漏傳輸也不會重復傳輸,每個消息都傳輸被一次而且僅僅被傳輸一次,這是大家所期望的。
Kafka 的機制和 git 有點類似,有一個 commit 的概念,一旦提交且 broker 在工作,那么數據就不會丟失。如果 producer 發(fā)布消息時發(fā)生了網絡錯誤,但又不確定實在提交之前發(fā)生的還是提交之后發(fā)生的,這種情況雖然不常見,但是必須考慮進去,現(xiàn)在Kafka版本還沒有解決這個問題,將來的版本正在努力嘗試解決。
并不是所有的情況都需要“精確的一次”這樣高的級別,Kafka 允許 producer 靈活的指定級別。比如 producer 可以指定必須等待消息被提交的通知,或者完全的異步發(fā)送消息而不等待任何通知,或者僅僅等待 leader 聲明它拿到了消息(followers沒有必要)。
現(xiàn)在從 consumer 的方面考慮這個問題,所有的副本都有相同的日志文件和相同的offset,consumer 維護自己消費的消息的 offset。如果 consumer 崩潰了,會有另外一個 consumer 接著消費消息,它需要從一個合適的 offset 繼續(xù)處理。這種情況下可以有以下選擇:
- consumer 可以先讀取消息,然后將 offset 寫入日志文件中,然后再處理消息。這存在一種可能就是在存儲 offset 后還沒處理消息就 crash 了,新的 consumer 繼續(xù)從這個 offset 處理,那么就會有些消息永遠不會被處理,這就是上面說的『最多一次』
- consumer 可以先讀取消息,處理消息,最后記錄o ffset,當然如果在記錄 offset 之前就 crash 了,新的 consumer 會重復的消費一些消息,這就是上面說的『最少一次』
- 『精確一次』可以通過將提交分為兩個階段來解決:保存了 offset 后提交一次,消息處理成功之后再提交一次。但是還有個更簡單的做法:將消息的 offset 和消息被處理后的結果保存在一起。比如用 Hadoop ETL 處理消息時,將處理后的結果和 offset 同時保存在 HDFS 中,這樣就能保證消息和 offser 同時被處理了
性能優(yōu)化
Kafka 在提高效率方面做了很大努力。Kafka 的一個主要使用場景是處理網站活動日志,吞吐量是非常大的,每個頁面都會產生好多次寫操作。讀方面,假設每個消息只被消費一次,讀的量的也是很大的,Kafka 也盡量使讀的操作更輕量化。
線性讀寫的情況下影響磁盤性能問題大約有兩個方面:太多的瑣碎的 I/O 操作和太多的字節(jié)拷貝。I/O 問題發(fā)生在客戶端和服務端之間,也發(fā)生在服務端內部的持久化的操作中。
消息集(message set)
為了避免這些問題,Kafka 建立了消息集(message set)的概念,將消息組織到一起,作為處理的單位。以消息集為單位處理消息,比以單個的消息為單位處理,會提升不少性能。Producer 把消息集一塊發(fā)送給服務端,而不是一條條的發(fā)送;服務端把消息集一次性的追加到日志文件中,這樣減少了瑣碎的 I/O 操作。consumer 也可以一次性的請求一個消息集。
另外一個性能優(yōu)化是在字節(jié)拷貝方面。在低負載的情況下這不是問題,但是在高負載的情況下它的影響還是很大的。為了避免這個問題,Kafka 使用了標準的二進制消息格式,這個格式可以在 producer, broker 和 producer 之間共享而無需做任何改動。
zero copy
Broker 維護的消息日志僅僅是一些目錄文件,消息集以固定隊的格式寫入到日志文件中,這個格式 producer 和 consumer 是共享的,這使得 Kafka 可以一個很重要的點進行優(yōu)化:消息在網絡上的傳遞。現(xiàn)代的 unix 操作系統(tǒng)提供了高性能的將數據從頁面緩存發(fā)送到 socket 的系統(tǒng)函數,在 linux 中,這個函數是?sendfile
為了更好的理解?sendfile?的好處,我們先來看下一般將數據從文件發(fā)送到 socket 的數據流向:
- 操作系統(tǒng)把數據從文件拷貝內核中的頁緩存中
- 應用程序從頁緩存從把數據拷貝自己的內存緩存中
- 應用程序將數據寫入到內核中 socket 緩存中
- 操作系統(tǒng)把數據從 socket 緩存中拷貝到網卡接口緩存,從這里發(fā)送到網絡上。
這顯然是低效率的,有 4 次拷貝和 2 次系統(tǒng)調用。sendfile?通過直接將數據從頁面緩存發(fā)送網卡接口緩存,避免了重復拷貝,大大的優(yōu)化了性能。
在一個多consumers的場景里,數據僅僅被拷貝到頁面緩存一次而不是每次消費消息的時候都重復的進行拷貝。這使得消息以近乎網絡帶寬的速率發(fā)送出去。這樣在磁盤層面你幾乎看不到任何的讀操作,因為數據都是從頁面緩存中直接發(fā)送到網絡上去了。
數據壓縮
很多時候,性能的瓶頸并非CPU或者硬盤而是網絡帶寬,對于需要在數據中心之間傳送大量數據的應用更是如此。當然用戶可以在沒有 Kafka 支持的情況下各自壓縮自己的消息,但是這將導致較低的壓縮率,因為相比于將消息單獨壓縮,將大量文件壓縮在一起才能起到最好的壓縮效果。
Kafka 采用了端到端的壓縮:因為有『消息集』的概念,客戶端的消息可以一起被壓縮后送到服務端,并以壓縮后的格式寫入日志文件,以壓縮的格式發(fā)送到 consumer,消息從 producer 發(fā)出到 consumer 拿到都被是壓縮的,只有在 consumer 使用的時候才被解壓縮,所以叫做『端到端的壓縮』。Kafka支持GZIP和Snappy壓縮協(xié)議。
參考鏈接
- Kafka學習整理六(server.properties配置實踐)
- Apache Kafka
- Quick Start
- Kafka入門經典教程
- Apache kafka 工作原理介紹
- 事無巨細 Apache Kafka 0.9.0.1 集群環(huán)境搭建
- kafka集群搭建
- Kafka文件存儲機制那些事
- kafka原理以及設計實現(xiàn)思想
- kafka設計原理介紹
- Kafka集群操作指南
- What is the actual role of ZooKeeper in Kafka?
總結
- 上一篇: Kafka文件存储机制那些事
- 下一篇: 聊聊并发(一)——深入分析Volatil