原創作品,允許轉載,轉載時請務必以超鏈接形式標明文章?原始出處?、作者信息和本聲明。否則將追究法律責任。http://tchuairen.blog.51cto.com/3848118/1855090
一、基礎理論
這塊是整個kafka的核心無論你是先操作在來看還是先看在操作都需要多看幾遍。
首先來了解一下Kafka所使用的基本術語
Topic
Kafka將消息種子(Feed)分門別類 每一類的消息稱之為話題(Topic).
Producer
發布消息的對象稱之為話題生產者(Kafka topic producer)
Consumer
訂閱消息并處理發布的消息的種子的對象稱之為話題消費者(consumers)
Broker
已發布的消息保存在一組服務器中稱之為Kafka集群。集群中的每一個服務器都是一個代理(Broker). 消費者可以訂閱一個或多個話題并從Broker拉數據從而消費這些已發布的消息。
讓我們站的高一點從高的角度來看Kafka集群的業務處理就像這樣子
Client和Server之間的通訊是通過一條簡單、高性能并且和開發語言無關的TCP協議。除了Java Client外還有非常多的其它編程語言的Client。
話題和日志 ?(Topic和Log)
讓我們更深入的了解Kafka中的Topic。
Topic是發布的消息的類別或者種子Feed名。對于每一個TopicKafka集群維護這一個分區的log就像下圖中的示例
每一個分區都是一個順序的、不可變的消息隊列 并且可以持續的添加。分區中的消息都被分配了一個序列號稱之為偏移量(offset)在每個分區中此偏移量都是唯一的。 Kafka集群保持所有的消息直到它們過期 無論消息是否被消費了。 實際上消費者所持有的僅有的元數據就是這個偏移量也就是消費者在這個log中的位置。 這個偏移量由消費者控制正常情況當消費者消費消息的時候偏移量也線性的的增加。但是實際偏移量由消費者控制消費者可以將偏移量重置為更老的一個偏移量重新讀取消息。 可以看到這種設計對消費者來說操作自如 一個消費者的操作不會影響其它消費者對此log的處理。 再說說分區。Kafka中采用分區的設計有幾個目的。一是可以處理更多的消息不受單臺服務器的限制。Topic擁有多個分區意味著它可以不受限的處理更多的數據。第二分區可以作為并行處理的單元。
分布式(Distribution)
Log的分區被分布到集群中的多個服務器上。每個服務器處理它分到的分區。 根據配置每個分區還可以復制到其它服務器作為備份容錯。 每個分區有一個leader零或多個follower。Leader處理此分區的所有的讀寫請求而follower被動的復制數據。如果leader宕機其它的一個follower會被推舉為新的leader。 一臺服務器可能同時是一個分區的leader另一個分區的follower。 這樣可以平衡負載避免所有的請求都只讓一臺或者某幾臺服務器處理。
生產者(Producers)
生產者往某個Topic上發布消息。生產者也負責選擇發布到Topic上的哪一個分區。最簡單的方式從分區列表中輪流選擇。也可以根據某種算法依照權重選擇分區。開發者負責如何選擇分區的算法。
消費者(Consumers)
通常來講消息模型可以分為兩種 隊列和發布-訂閱式。 隊列的處理方式是 一組消費者從服務器讀取消息一條消息只有其中的一個消費者來處理。在發布-訂閱模型中消息被廣播給所有的消費者接收到消息的消費者都可以處理此消息。Kafka為這兩種模型提供了單一的消費者抽象模型 消費者組 consumer group。 消費者用一個消費者組名標記自己。 一個發布在Topic上消息被分發給此消費者組中的一個消費者。 假如所有的消費者都在一個組中那么這就變成了queue模型。 假如所有的消費者都在不同的組中那么就完全變成了發布-訂閱模型。 更通用的 我們可以創建一些消費者組作為邏輯上的訂閱者。每個組包含數目不等的消費者 一個組內多個消費者可以用來擴展性能和容錯。正如下圖所示
??2個kafka集群托管4個分區P0-P32個消費者組消費組A有2個消費者實例消費組B有4個。
正像傳統的消息系統一樣Kafka保證消息的順序不變。 再詳細扯幾句。傳統的隊列模型保持消息并且保證它們的先后順序不變。但是 盡管服務器保證了消息的順序消息還是異步的發送給各個消費者消費者收到消息的先后順序不能保證了。這也意味著并行消費將不能保證消息的先后順序。用過傳統的消息系統的同學肯定清楚消息的順序處理很讓人頭痛。如果只讓一個消費者處理消息又違背了并行處理的初衷。 在這一點上Kafka做的更好盡管并沒有完全解決上述問題。 Kafka采用了一種分而治之的策略分區。 因為Topic分區中消息只能由消費者組中的唯一一個消費者處理所以消息肯定是按照先后順序進行處理的。但是它也僅僅是保證Topic的一個分區順序處理不能保證跨分區的消息先后處理順序。 所以如果你想要順序的處理Topic的所有消息那就只提供一個分區。
Kafka的保證(Guarantees)
生產者發送到一個特定的Topic的分區上的消息將會按照它們發送的順序依次加入
消費者收到的消息也是此順序
如果一個Topic配置了復制因子( replication facto)為N 那么可以允許N-1服務器宕機而不丟失任何已經增加的消息
Kafka官網
http://kafka.apache.org/
作者半獸人
鏈接http://orchome.com/5
來源OrcHome
著作權歸作者所有。商業轉載請聯系作者獲得授權非商業轉載請注明出處。
二、安裝和啟動
1、下載二進制安裝包直接解壓
| 1 2 | tar?xf?kafka_2.11-0.10.0.1.tgz cd?kafka_2.11-0.10.0.1 |
2、啟動服務
Kafka需要用到ZooKeepr所以需要先啟動一個ZooKeepr服務端如果沒有單獨的ZooKeeper服務端可以使用Kafka自帶的腳本快速啟動一個單節點ZooKeepr實例
| 1 2 3 | bin/zookeeper-server-start.sh?config/zookeeper.properties??#?啟動zookeeper服務端實例 bin/kafka-server-start.sh?config/server.properties??#?啟動kafka服務端實例 |
三、基本操作指令
1、新建一個主題topic
創建一個名為“test”的Topic只有一個分區和一個備份
| 1 | bin/kafka-topics.sh?--create?--zookeeper?localhost:2181?--replication-factor?1?--partitions?1?--topic?test |
2、創建好之后可以通過運行以下命令查看已創建的topic信息
| 1 | bin/kafka-topics.sh?--list??--zookeeper?localhost:2181 |
3、發送消息
Kafka提供了一個命令行的工具可以從輸入文件或者命令行中讀取消息并發送給Kafka集群。每一行是一條消息。
運行producer生產者,然后在控制臺輸入幾條消息到服務器。
| 1 2 3 | bin/kafka-console-producer.sh?--broker-list?localhost:9092?--topic?test? This?is?a?message This?is?another?message |
4、消費消息
Kafka也提供了一個消費消息的命令行工具,將存儲的信息輸出出來。
| 1 2 3 | bin/kafka-console-consumer.sh?--zookeeper?localhost:2181?--topic?test?--from-beginning This?is?a?message This?is?another?message |
5、查看topic詳細情況
| 1 | bin/kafka-topics.sh?--describe?--zookeeper?localhost:2181??--topic?peiyinlog |
Topic: 主題名稱
Partition: 分片編號
Leader: 該分區的leader節點
Replicas: 該副本存在于哪個broker節點
Isr: 活躍狀態的broker
6、給Topic添加分區
| 1 | bin/kafka-topics.sh?--zookeeper?192.168.90.201:2181?--alter?--topic?test2?--partitions?20 |
7、刪除Topic
| 1 | bin/kafka-topics.sh?--zookeeper?zk_host:port/chroot?--delete?--topic?my_topic_name |
主題(Topic)刪除選項默認是關閉的,需要服務器配置開啟它。
| 1 | delete.topic.enable=true |
注:如果需要在其他節點作為客戶端使用指令連接kafka broker,則需要注意以下兩點(二選一即可)
另 : ( 使用logstash input 連接kafka也需要注意 )
1、設置kafka broker 配置文件中 host.name 參數為監聽的IP地址
2、給broker設置一個唯一的主機名,然后在本機/etc/hosts文件配置解析到自己的IP(當然如果有內網的DNS服務器也行),同時還需要在zk server 和 客戶端的 /etc/hosts 添加broker主機名的解析。?
原因詳解:
場景假設
| broker_server ip | 主機名 | zookeeper ip | 客戶端 ip |
| 192.168.1.2? | 默認 localhost | 192.168.1.4 | 192.168.1.5 |
| 1 2 3 | #?此時客戶端向broker發起一些消費: bin/kafka-console-consumer.sh?--zookeeper?192.168.1.4:2181?--topic?test2?--from-beginning |
這時客戶端連接到zookeeper要求消費數據,zk則返回broker的ip地址和端口給客戶端,但是如果broker沒有設置host.name 和 advertised.host.name ?broker默認返回的是自己的主機名,默認就是localhost和端口9092,這時客戶端拿到這個主機名解析到自己,操作失敗。
所以,需要配置broker 的host.name參數為監聽的IP,這時broker就會返回IP。 客戶端就能正常連接了。
或者也可以設置好broker的主機名,然后分別給雙方配置好解析。
四、broker基本配置
| 1 2 3 4 5 6 7 8 | #??server.properties broker.id=0??#?broker節點的唯一標識?ID?不能重復。 host.name=10.10.4.1??#?監聽的地址,如果不設置默認返回主機名給zk_server log.dirs=/u01/kafka/kafka_2.11-0.10.0.1/data??#?消息數據存放路徑 num.partitions=6??#?默認主題(Topic)分片數 log.retention.hours=24??#?消息數據的最大保留時長 zookeeper.connect=10.160.4.225:2181??#?zookeeper?server?連接地址和端口 |
五、Logstash + Kafka 實戰應用
Logstash-1.51才開始內置Kafka插件,也就是說用之前的logstash版本是需要手動編譯Kafka插件的,相信也很少人用了。建議使用2.3以上的logstash版本。
1、使用logstash向kafka寫入一些數據
軟件版本:
logstash 2.3.2?
kafka_2.11-0.10.0.1
logstash output 部分配置
| 1 2 3 4 5 6 7 8 9 | output?{ ??kafka?{ ????workers?=>?2 ????bootstrap_servers?=>?"10.160.4.25:9092,10.160.4.26:9092,10.160.4.27:9092" ????topic_id?=>?"xuexilog" } } |
參數解釋 :?
workers:用于寫入時的工作線程
bootstrap_servers:指定可用的kafka broker實例列表
topic_id:指定topic名稱,可以在寫入前手動在broker創建定義好分片數和副本數,也可以不提前創建,那么在logstash寫入時會自動創建topic,分片數和副本數則默認為broker配置文件中設置的。
2、使用logstash消費一些數據,并寫入到elasticsearch
軟件版本:
logstash 2.3.2?
elasticsearch-2.3.4
logstash 配置文件
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 | input{ ????kafka?{ ????????zk_connect?=>?"112.100.6.1:2181,112.100.6.2:2181,112.100.6.3:2181" ????????group_id?=>?"logstash" ????????topic_id?=>?"xuexilog" ????????reset_beginning?=>?false ????????consumer_threads?=>?5 ????????decorate_events?=>?true } } #?這里group_id?需要解釋一下,在Kafka中,相同group的Consumer可以同時消費一個topic,不同group的Consumer工作則互不干擾。 #?補充:?在同一個topic中的同一個partition同時只能由一個Consumer消費,當同一個topic同時需要有多個Consumer消費時,則可以創建更多的partition。 output?{ ????if?[type]?==?"nginxacclog"?{ ????????elasticsearch?{ ????????????hosts?=>?["10.10.1.90:9200"] ????????????index?=>?"logstash-nginxacclog-%{+YYYY.MM.dd}" ????????????manage_template?=>?true ????????????flush_size?=>?50000 ????????????idle_flush_time?=>?10 ????????????workers?=>?2 } } } |
3、通過group_id 查看當前詳細的消費情況
| 1 | bin/kafka-consumer-groups.sh?--group?logstash?--describe?--zookeeper?127.0.0.1:2181 |
輸出解釋:
| GROUP | TOPIC | PARTITION | CURRENT-OFFSET | LOG-END-OFFSET | LAG |
| 消費者組 | 話題id | 分區id | 當前已消費的條數 | 總條數 | 未消費的條數 |
本文出自 “突破舒適區” 博客,請務必保留此出處http://tchuairen.blog.51cto.com/3848118/1855090
來源:http://tchuairen.blog.51cto.com/3848118/1855090
總結
以上是生活随笔為你收集整理的Kafka 入门 and kafka+logstash 实战应用的全部內容,希望文章能夠幫你解決所遇到的問題。
如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。