Kafka的基本介绍和在linux的安装配置
1.介紹
Kafka最初是由LinkedIn公司采用Scala語言開發的一個多分區、多副本并且基于ZooKeeper協調的分布式消息系統,現在已經捐獻給了Apache基金會。目前Kafka已經定位為一個分布式流式處理平臺,它以高吞吐、可持久化、可水平擴展、支持流處理等多種特性而被廣泛應用。
Apache Kafka是一個分布式的發布-訂閱消息系統,能夠支撐海量數據的數據傳遞。在離線和實時的消息處理業務系統中,Kafka都有廣泛的應用。Kafka將消息持久化到磁盤中,并對消息創建了備份保證了數據的安全。Kafka在保證了較高的處理速度的同時,又能保證數據處理的低延遲和數據的零丟失。
2.特性
(1)高吞吐量、低延遲:kafka每秒可以處理幾十萬條消息,它的延遲最低只有幾毫秒,每個主題可以分多個分區, 消費組對分區進行消費操作;
(2)可擴展性:kafka集群支持熱擴展;
(3)持久性、可靠性:消息被持久化到本地磁盤,并且支持數據備份防止數據丟失;
(4)容錯性:允許集群中節點失敗(若副本數量為n,則允許n-1個節點失敗);
(5)高并發:支持數千個客戶端同時讀寫;
(6)日志收集:一個公司可以用Kafka可以收集各種服務的log,通過kafka以統一接口服務的方式開放給各種consumer,例如Hadoop、Hbase、Solr等;
(7)消息系統:解耦和生產者和消費者、緩存消息等;
(8)用戶活動跟蹤:Kafka經常被用來記錄web用戶或者app用戶的各種活動,如瀏覽網頁、搜索、點擊等活動,這些活動信息被各個服務器發布到kafka的topic中,然后訂閱者通過訂閱這些topic來做實時的監控分析,或者裝載到Hadoop、數據倉庫中做離線分析和挖掘;
(9)運營指標:Kafka也經常用來記錄運營監控數據。包括收集各種分布式應用的數據,生產各種操作的集中反饋,比如報警和報告;
(10)流式處理:比如spark streaming和storm;
3.技術優勢
-
可伸縮性:Kafka 的兩個重要特性造就了它的可伸縮性。
-
Kafka 集群在運行期間可以輕松地擴展或收縮(可以添加或刪除代理),而不會宕機
-
可以擴展一個 Kafka 主題來包含更多的分區。由于一個分區無法擴展到多個代理,所以它的容量受到代理磁盤空間的限制。能夠增加分區和代理的數量意味著單個主題可以存儲的數據量是沒有限制的
-
-
容錯性和可靠性:Kafka 的設計方式使某個代理的故障能夠被集群中的其他代理檢測到。由于每個主題都可以在多個代理上復制,所以集群可以在不中斷服務的情況下從此類故障中恢復并繼續運行。
-
吞吐量:代理能夠以超快的速度有效地存儲和檢索數據。
4.概念詳解
?
1)Producer
生產者即數據的發布者,該角色將消息發布到Kafka的topic中。broker接收到生產者發送的消息后,broker將該消息追加到當前用于追加數據的segment文件中。生產者發送的消息,存儲到一個partition中,生產者也可以指定數據存儲的partition。
2)Consumer
消費者可以從broker中讀取數據。消費者可以消費多個topic中的數據。
3)Topic
在Kafka中,使用一個類別屬性來劃分數據的所屬類,劃分數據的這個類稱為topic。如果把Kafka看做一個數據庫,topic可以理解為數據庫中的一張表,topic的名字即為表名。
4)Partition
topic中的數據分割為一個或多個partition。每個topic至少有一個partition。每個partition中的數據使用多個segment文件存儲。partition中的數據是有序的,partition間的數據丟失了數據的順序。如果topic有多個partition,消費數據時就不能保證數據的順序。在需要嚴格保證消息的消費順序的場景下,需要將partition數目設為1。
5)Partition offset
每條消息都有一個當前Partition下唯一的64字節的offset,它指明了這條消息的起始位置。
6)Replicas of partition
副本是一個分區的備份。副本不會被消費者消費,副本只用于防止數據丟失,即消費者不從為follower的partition中消費數據,而是從為leader的partition中讀取數據。副本之間是一主多從的關系。
7)Broker
Kafka 集群包含一個或多個服務器,服務器節點稱為broker。broker存儲topic的數據。如果某topic有N個partition,集群有N個broker,那么每個broker存儲該topic的一個partition。如果某topic有N個partition,集群有(N+M)個broker,那么其中有N個broker存儲該topic的一個partition,剩下的M個broker不存儲該topic的partition數據。如果某topic有N個partition,集群中broker數目少于N個,那么一個broker存儲該topic的一個或多個partition。在實際生產環境中,盡量避免這種情況的發生,這種情況容易導致Kafka集群數據不均衡。
8)Leader
9)Follower
跟隨Leader,所有寫請求都通過Leader路由,數據變更會廣播給所有Follower,Follower與Leader保持數據同步。如果Leader失效,則從Follower中選舉出一個新的Leader。當Follower與Leader掛掉、卡住或者同步太慢,leader會把這個follower從“in sync replicas”(ISR)列表中刪除,重新創建一個Follower。
10)Zookeeper
Zookeeper負責維護和協調broker。當Kafka系統中新增了broker或者某個broker發生故障失效時,由ZooKeeper通知生產者和消費者。生產者和消費者依據Zookeeper的broker狀態信息與broker協調數據的發布和訂閱任務。
11)AR(Assigned Replicas)
分區中所有的副本統稱為AR。
12)ISR(In-Sync Replicas)
所有與Leader部分保持一定程度的副本(包括Leader副本在內)組成ISR。
13)OSR(Out-of-Sync-Replicas)
與Leader副本同步滯后過多的副本。
14)HW(High Watermark)
高水位,標識了一個特定的offset,消費者只能拉取到這個offset之前的消息。
15)LEO(Log End Offset)
即日志末端位移(log end offset),記錄了該副本底層日志(log)中下一條消息的位移值。注意是下一條消 息!也就是說,如果LEO=10,那么表示該副本保存了10條消息,位移值范圍是[0, 9]。
5.安裝與配置
1)首先需要安裝Java環境
yum install java-1.8.0-openjdk* -y2)安裝zookeeper
Zookeeper是安裝Kafka集群的必要組件,Kafka通過Zookeeper來實施對元數據信息的管理,包括集群、主題、分區等內容。 同樣在官網下載安裝包到指定目錄解壓縮,步驟如下: ZooKeeper官網:http://zookeeper.apache.org/
tar -zxvf zookeeper-3.4.10.tar.gz mv zookeeper-3.4.10 zookeeper修改Zookeeper的配置文件,首先進入安裝路徑conf目錄,并將zoo_sample.cfg文件修改為 zoo.cfg,并對核心參數進行配置。
dataDir=/tmp/zookeeper/data dataLogDir=/tmp/zookeeper/log啟動Zookeeper命令:
bin/zkServer.sh start ? ? ZooKeeper JMX enabled by default Using config: /data/soft/zookeeper/zookeeper/bin/../conf/zoo.cfg Starting zookeeper ... STARTED3)安裝Kafka
官網下載安裝解壓縮:http://kafka.apache.org/downloads
tar -zxvf kafka_2.12-2.2.1 mv kafka_2.12-2.2.1 kafka下載解壓啟動
啟動命令
bin/kafka-server-start.sh config/server.propertiesserver.properties配置中需要關注以下幾個參數:
broker.id=0 表示broker的編號,如果集群中有多個broker,則每個broker的編號需要設置的不同
listeners=PLAINTEXT://:9092 brokder對外提供的服務入口地址
log.dirs=/tmp/kafka/log 設置存放消息日志文件的地址
zookeeper.connect=localhost:2181 Kafka所需Zookeeper集群地址
啟動成功如下顯示:
?
啟動成功之后重新打開一個終端,驗證啟動進程
?
4)Kafka測試消息生產和消費
-
首先創建一個主題
-
展示所有主題
-
查看主題詳情
-
生產端發送消息
-
消費端消費消息
5)常用參數配置
- zookeeper.connect
指明Zookeeper主機地址,如果zookeeper是集群則以逗號隔開,
如:172.6.14.61:2181,172.6.14.62:2181,172.6.14.63:2181
- listeners
監聽列表,broker對外提供服務時綁定的IP和端口。多個以逗號隔開,如果監聽器名稱不是一個安全的協議, listener.security.protocol.map也必須設置。主機名稱設置0.0.0.0綁定所有的接口,主機名稱為空則綁定默認的接口。
如:PLAINTEXT://myhost:9092,SSL://:9091 CLIENT://0.0.0.0:9092,REPLICATION://localhost:9093
- broker.id
broker的唯一標識符,如果不配置則自動生成,建議配置且一定要保證集群中必須唯一,默認-1
- log.dirs
日志數據存放的目錄,如果沒有配置則使用log.dir
- message.max.bytes
服務器接受單個消息的最大大小,默認1000012 約等于976.6KB。
-
log.retention.{hours|minutes|ms}
Kafka 通常根據時間來決定數據可以被保留多久。默認使用 log.retention.hour 參數來配置時間 ,默認值為 168 小時,也就是一周。除此以外,還有其他兩個參數 minutes|ms。
雖然 ms 設置有最高的優先級,但是通常情況下我們還是設置 hours 級別的多一些,比如log.retention.hours=168表示默認保存 7 天的數據,自動刪除 7 天前的數據。很多公司把 Kafka 當做存儲來使用,那么這個值就要相應地調大。
-
log.retention.bytes
這是指定 Broker 為消息保存的總磁盤容量大小。這個值默認是 -1,表明你想在這臺 Broker 上保存多少數據都可以,至少在容量方面 Broker 絕對為你開綠燈,不會做任何阻攔。這個參數真正發揮作用的場景其實是在云上構建多租戶的 Kafka 集群:設想你要做一個云上的 Kafka 服務,每個租戶只能使用 100GB 的磁盤空間,為了避免有個“惡意”租戶使用過多的磁盤空間,設置這個參數就顯得至關重要了。
如果同時指定了log.retention.xx和log.retention.bytes,只要任意一個條件得到滿足,消息就會被刪除。例如log.retention.hour=24(也就是1天),log.retention.bytes=1 000 000 000(也就是1G),那么消息總數在1天內超過1G的部分將會被刪除。相反,如果消息總數小于1G,1天之后也會被刪除,盡管分區的數據總量小于1GB。
-
log.segment.bytes
以上的設置都作用在日志片段上,而不是作用在單個消息上。當消息到達 broker 時,它們被迫加到分區的當前日志片段上。當日志片段大小達到 log.segment bytes 定的上限(默認是 lGB )時,當前日志片段就會被關閉,一個新的日志片段被打開。如果一個日志片段被關閉,就開始等待過期。這個參數的值越小,就會越頻繁地關閉和分配新文件,從而降低磁盤 入的整體效率。
-
log.segment.ms
它指定了 多長時間之后日志片段會被關閉。就像 log.retention.ms和log.retention.bytes這兩個參數 樣, log.segment.ms和log.segment.bytes 這兩個參數之間也不存在互斥問題。日志片段會在大小或時間達到上限時被關閉,就看哪個條件先得到滿足。
-
message.max.bytes
控制 Broker 能夠接收的單個消息的最大大小。這個參數也是一樣,默認的 1000 000 太少了,還不到 1MB。實際場景中突破 1MB 的消息都是屢見不鮮的,因此在線上環境中設置一個比較大的值還是比較保險的做法。畢竟它只是一個標尺而已,僅僅衡量 Broker 能夠處理的最大消息大小,即使設置大一點也不會耗費什么磁盤空間的。
《新程序員》:云原生和全面數字化實踐50位技術專家共同創作,文字、視頻、音頻交互閱讀總結
以上是生活随笔為你收集整理的Kafka的基本介绍和在linux的安装配置的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Jenkins构建之常用的触发器和Git
- 下一篇: Kafka生产者详解