Kafka深入浅出(一)
最近閑來無事,搜集一些消息架構(gòu)方面的資料進(jìn)行學(xué)習(xí),偶然在一個站點發(fā)現(xiàn)Kafka還是不錯的,故想按著自己研究、學(xué)習(xí)的經(jīng)歷,寫下一篇日記,以便后續(xù)在工作中可以 Ctrl + C & Ctrl + V , 嘿嘿
?
Kafak是誰?
Kafka源自LinkIn,主要支撐分布式日志服務(wù),主要開發(fā)語言是Scala+少量的Java,設(shè)計目標(biāo)是一種基于集群可處理流式數(shù)據(jù)的消息分布式系統(tǒng),后開源交付給Apache基金會進(jìn)行維護(hù),目前是該基金會的頂級項目之一,當(dāng)然還有一款大名鼎鼎消息系統(tǒng)ActiveMQ目前也是Apache基金會進(jìn)行維護(hù),后續(xù)在學(xué)習(xí)中會逐漸對兩款消息機(jī)制進(jìn)行對比
?
Apache對Kafka的定位
1. Publish & Subscribe (發(fā)布&訂閱)
Kafka是一種基于發(fā)布&訂閱(也有稱為生產(chǎn)/消費模型)架構(gòu)的一種消息機(jī)制,目前需要大吞吐量的架構(gòu)大多基于消息機(jī)制。
2. Process (數(shù)據(jù)處理)
Kafka可高效的處理流式數(shù)據(jù),并且可以進(jìn)行實時處理
3. Store (存儲)
Kafka的消息數(shù)據(jù)存儲是基于自身一種叫 Isr 的東西來進(jìn)行安全可靠的分布式存儲
快速開始使用Kafka
1. 下載Kafka服務(wù)器端
目前Kafka最新版本是0.10.1.0,但坊間流傳0.8.x系列才是最為流行的,原因是當(dāng)時由于storm的流行,造就了目前的Kafka(當(dāng)然你需要Linux系統(tǒng)來玩Kafka,我用的是Oracle VM VirtualBox)
http://kafka.apache.org/downloads
有多個版本可以(自行)進(jìn)行選擇
wget http://mirror.bit.edu.cn/apache/kafka/0.10.1.0/kafka_2.10-0.10.1.0.tgz如果你這里沒有wget工具的話,可以通過yum進(jìn)行安裝
yum install wget解壓
tar -zxvf kafka_2.10-0.10.1.0.tgz將Kafka移動到本地用戶目錄下面
mv kafka_2.10-0.10.1.0 /usr/local/看一下Kafka里面有什么東西
[root@localhost kafka_2.10-0.10.1.0]# ls bin config libs LICENSE NOTICE site-docs通過長期對Apache的東西使用,看名字也應(yīng)該知道目錄大概的用途(其實懂計算機(jī)的都應(yīng)該知道)
bin:提供一些寫好的腳本文件,用于管理、測試服務(wù)器
config:肯定是一堆相關(guān)的配置文件
libs:一對libs(依賴)
LICENSE:許可文件說明(開源不等于隨便免費使用,各位開發(fā)者請注意一下版權(quán))
NOTICE:沒什么用的一些內(nèi)容
site-docs:里面有一個?kafka_2.10-0.10.1.0-site-docs.tgz 壓縮包,應(yīng)該是使用手冊的本地版本(API說明之類的)
2. 啟動服務(wù)器(你沒看錯、就是這么簡單..)
當(dāng)然Kafka是依賴于zookeeper的(當(dāng)年面試只知道是動物園管理員 :D),所以需要先啟動zookeeper,由于kafka高度依賴zookeeper(目前基本算一個分布式JDNI實現(xiàn)),所以kafka壓縮包內(nèi)自帶了一個簡易版的zokeeper,使用下便命令啟動(前提條件是服務(wù)器需要jvm支持,可以安裝JDK解決)
bin/zookeeper-server-start.sh -daemon config/zookeeper.propertieszookeeper-server-start.sh 腳本的參數(shù)如下:
[root@localhost kafka_2.10-0.10.1.0]# bin/zookeeper-server-start.sh USAGE: bin/zookeeper-server-start.sh [-daemon] zookeeper.properties-daemon:后臺值守模式?(靜默模式)反正就是后臺啟動,需要看到輸出的同學(xué)可以不加這個參數(shù)
zookeeper.properties:properties當(dāng)然是配置文件,通常會放在config目錄下面
驗證一下是否啟動成功(lsof 也可以直接 yum install lsof)
[root@localhost kafka_2.10-0.10.1.0]# lsof -i:2181 COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME java 2236 root 83u IPv6 19048 0t0 TCP *:eforward (LISTEN)如果2181端口處于 LISTEN 狀態(tài)的話,基本說明zookeeper已經(jīng)啟動完畢了,接下來就可以啟動kafka服務(wù)器了,腳本看名字就知道是這個,看一下參數(shù),大體和zookeeper一樣
[root@localhost kafka_2.10-0.10.1.0]# bin/kafka-server-start.sh USAGE: bin/kafka-server-start.sh [-daemon] server.properties [--override property=value]*啟動之
bin/kafka-server-start.sh -daemon config/server.properties查看一下啟動情況,(kafka默認(rèn)服務(wù)端口是9092,可以通過看server.properties知道-當(dāng)然也可以修改)
[root@localhost kafka_2.10-0.10.1.0]# lsof -i:9092 COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME java 2468 root 97u IPv6 19365 0t0 TCP *:XmlIpcRegSvc (LISTEN) java 2468 root 101u IPv6 19367 0t0 TCP localhost:47108->localhost:XmlIpcRegSvc (ESTABLISHED) java 2468 root 102u IPv6 19368 0t0 TCP localhost:XmlIpcRegSvc->localhost:47108 (ESTABLISHED) [root@localhost kafka_2.10-0.10.1.0]# lsof -i:2181 COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME java 2236 root 83u IPv6 19048 0t0 TCP *:eforward (LISTEN) java 2236 root 84u IPv6 19360 0t0 TCP localhost:eforward->localhost:59873 (ESTABLISHED) java 2468 root 82u IPv6 19357 0t0 TCP localhost:59873->localhost:eforward (ESTABLISHED)如果9092處于LISTEN,說明kafka(單機(jī)版)已經(jīng)可以工作了
3. 創(chuàng)建一個 topic
topic 是一個話題,如果從rpc架構(gòu)理解有點像接口名,通過kafka-topics.sh這個腳本就可以創(chuàng)建topic了,直接運行可以看到幫助
[root@localhost kafka_2.10-0.10.1.0]# bin/kafka-topics.sh Create, delete, describe, or change a topic. Option Description ------ ----------- --alter Alter the number of partitions, replica assignment, and/or configuration for the topic. --config <name=value> A topic configuration override for the topic being created or altered.The following is a list of valid configurations: cleanup.policy compression.type delete.retention.ms file.delete.delay.ms flush.messages flush.ms follower.replication.throttled. replicas index.interval.bytes leader.replication.throttled.replicas max.message.bytes message.format.version message.timestamp.difference.max.ms message.timestamp.type min.cleanable.dirty.ratio min.compaction.lag.ms min.insync.replicas preallocate retention.bytes retention.ms segment.bytes segment.index.bytes segment.jitter.ms segment.ms unclean.leader.election.enable See the Kafka documentation for full details on the topic configs. --create Create a new topic. --delete Delete a topic --delete-config <name> A topic configuration override to be removed for an existing topic (see the list of configurations under the --config option). --describe List details for the given topics. --disable-rack-aware Disable rack aware replica assignment --force Suppress console prompts --help Print usage information. --if-exists if set when altering or deleting topics, the action will only execute if the topic exists --if-not-exists if set when creating topics, the action will only execute if the topic does not already exist --list List all available topics. --partitions <Integer: # of partitions> The number of partitions for the topic being created or altered (WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected --replica-assignment A list of manual partition-to-broker <broker_id_for_part1_replica1 : assignments for the topic being broker_id_for_part1_replica2 , created or altered. broker_id_for_part2_replica1 : broker_id_for_part2_replica2 , ...> --replication-factor <Integer: The replication factor for each replication factor> partition in the topic being created. --topic <topic> The topic to be create, alter or describe. Can also accept a regular expression except for --create option --topics-with-overrides if set when describing topics, only show topics that have overridden configs --unavailable-partitions if set when describing topics, only show partitions whose leader is not available --under-replicated-partitions if set when describing topics, only show under replicated partitions --zookeeper <urls> REQUIRED: The connection string for the zookeeper connection in the form host:port. Multiple URLS can be given to allow fail-over.咱們使用如下命令創(chuàng)建一個測試的topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test創(chuàng)建的時候需要指定 zookeeper 集群的地址,--topice是指定名字test
如果沒錯會顯示?Created topic "test". 說明創(chuàng)建成功
可以通過如下命令驗證
[root@localhost kafka_2.10-0.10.1.0]# bin/kafka-topics.sh --zookeeper localhost:2181 --describe Topic:test PartitionCount:1 ReplicationFactor:1 Configs:Topic: test Partition: 0 Leader: 0 Replicas: 0 Isr: 0可以看到test topic已經(jīng)創(chuàng)建成功
4. 通過Kafka發(fā)送消息
kafka提供命令行輸入腳本來測試服務(wù)器是否工作正常,可以使用如下命令進(jìn)行發(fā)送(同樣方式查看幫助)
[root@localhost kafka_2.10-0.10.1.0]# bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test kafka is cool--broker-list:指定kafka服務(wù)器的信息
5. 接收信息
可以啟動一個consumer(消費者,消息接收、處理者)來接收剛才producer(生產(chǎn)者,消息發(fā)送者)的消息
[root@localhost kafka_2.10-0.10.1.0]# bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning kafka is cool--bootstrap-server:指定kafka服務(wù)器配置
--from-beginning:說明接收所有消息(從開始起)
終于可以開心的玩耍了
?
為什么要用消息中間件?
1. 解耦
其實很多架構(gòu)及編程思想都是解決解耦的問題的,其實這個思想我自認(rèn)為應(yīng)該來自與工業(yè)制造,假設(shè)一臺汽車如果沒有合理的對發(fā)動機(jī)、地盤等結(jié)構(gòu)進(jìn)行解耦,估計是沒有辦法進(jìn)行獨立生產(chǎn),然后進(jìn)行組裝的,把這種思想用于軟件開發(fā)、架構(gòu)設(shè)計里其實是一樣的,目的就是讓各個零部件(組件)可以單獨開發(fā)(生產(chǎn)),然后進(jìn)行拼轉(zhuǎn),當(dāng)然如果符合某種制式(標(biāo)準(zhǔn))的組件是可以隨意更換的(其實就是接口的用途)
2. 冗余
電子電路的發(fā)明,以及替代了很多機(jī)械結(jié)構(gòu)的好處之一就是電子電路很容易做到冗余,傳說F22戰(zhàn)斗機(jī)一般的電子系統(tǒng)都是四路冗余,被擊干壞了一部分,并不影響整體的性能(服務(wù)),當(dāng)然消息系統(tǒng)通過制定標(biāo)準(zhǔn)的接口,也可以簡單的實現(xiàn)冗余(所有的分布式系統(tǒng)都可以實現(xiàn)冗余好吧)
3. 擴(kuò)展
說到擴(kuò)展一般要求是水平擴(kuò)展能力(加十臺服務(wù)器即可獲得十臺服務(wù)器的性能),并非垂直擴(kuò)展(加十臺服務(wù)器可能只獲得兩臺服務(wù)器的能力提升),當(dāng)然這種水平擴(kuò)展能力一般都需要無狀態(tài)的組件設(shè)計(通常理解沒有SESSION,或者自己封裝SESSION),如果沒個幾百上千萬的用戶,還是不要勞民傷財了
4. 峰值處理能力
說道峰值處理能力(消峰),也是消息中間件在部分架構(gòu)中優(yōu)于RPC(遠(yuǎn)程方法調(diào)用)的原因之一,因為消息中間件可以用一個列隊(內(nèi)存Queue)之類的東西,把所有的業(yè)務(wù)請求緩存其中,等待業(yè)務(wù)服務(wù)器在列隊中自助取用,這個列隊(內(nèi)存)就好比一個請求的緩沖墊,當(dāng)然只要良好的監(jiān)控這個內(nèi)存緩沖墊(不要溢出),并且有良好的機(jī)制防止這塊內(nèi)存crash(崩潰),這種方法還是挺安全的,當(dāng)然還是有一個另外的缺點,就是由于有這個內(nèi)存的列隊,很多請求處理都是異步的,是的如果你用過AJAX,你就會明白所有的業(yè)務(wù)請求都需要一個回調(diào),同時還要處理這個回調(diào),但JS是在瀏覽器運行與用戶及時交互,但后臺架構(gòu)如果是異步+回調(diào),同時還需要把響應(yīng)反饋給前端(oh omg),還是不要勞民傷財了
5. 異步通訊
由于有這個內(nèi)存,當(dāng)然就可以異步通訊,如果業(yè)務(wù)沒必要需要處理這么大的數(shù)據(jù),還是不要勞民傷財了????
?
常用的消息中間件有哪些?
1. RabbitMQ
Erlang語言,支持AMQP、XMPP、SMTP、STOMP,支持負(fù)載均衡、數(shù)據(jù)持久,支持P2P和發(fā)布/訂閱
2. Redis
基于鍵值對的NoSql數(shù)據(jù)庫(Map集合能干的事太多了,當(dāng)然Map就是鍵值對),同時支持列隊服務(wù)(據(jù)說是輕量級),消息小于10k時性能比RabbitMQ好,大于的時候會比RabbitMQ差(據(jù)傳說)
3. ZeroMQ
沒有服務(wù)器中間件,應(yīng)用程序集成需要通過庫(SDK)來集成服務(wù)器功能,很明顯開發(fā)工作量亞歷山大
4. ActiveMQ
Apache的又一產(chǎn)品線,據(jù)說是最為流行的中間件(為什么沒研究它呢?見后面),實現(xiàn)了JMS of J2EE,支持持久化、P2P,傳說支持XA事務(wù)(二段事務(wù)),這個有點牛,就是不知道性能怎么樣
5. Kafka
Apache維護(hù)的產(chǎn)品線(頂級項目),主要是為高性能而生,據(jù)說可以做到O1的性能,宣傳是下一代消息中間件(我們要研究下一代,嘿嘿),支持?jǐn)?shù)據(jù)持久化、同時支持?jǐn)?shù)據(jù)在線、離線處理,不支持XA事務(wù)(由于側(cè)重點不同)
?
Kafka架構(gòu)簡介
1. Producer 生產(chǎn)者
一般理解生產(chǎn)數(shù)據(jù)的一方,有的時候也可以理解為發(fā)起請求需要服務(wù)的一方,Kafka中Producer直接訪問Kafka集群,并且是數(shù)據(jù)通過push(推送)的方式,壓入到Kafka的列隊中,這里采用push的方式可以確保數(shù)據(jù)及時到達(dá)列隊中
2. Broker 代理
topic是數(shù)據(jù)保存的名字(文件名),每一個topic對應(yīng)一個Broker(Broker負(fù)責(zé)保存Topic的內(nèi)容),一個Broker由多個Partition組成,每個Partition均勻分布在kafka集群之中(Partition分布于多臺服務(wù)器,致使Kafka具備冗余)
3. Consumer 消費者
一般消費者指的是提供服務(wù)的一方,記錄日志、處理業(yè)務(wù)邏輯之類的,Kafka的Consumer設(shè)計通過pull(拉)的方式,到Kafka集群中拉數(shù)據(jù),這樣的設(shè)計可以降低Consumer的壓力(吃多少拿多少,當(dāng)然降低壓力)
4. Kafka 與 Zookeeper
Kafka 經(jīng)歷數(shù)次改版,老的版本消費者并非從Kafka集群,而是zookeeper集群中直接拿數(shù)據(jù),新版本已經(jīng)修復(fù)這個設(shè)計問題
?
結(jié)后語
通過對kafka的簡單了解,后面就準(zhǔn)備開始開發(fā)Producer和Consumer拉,動手寫代碼的感覺總是很好 :D
同時感謝 郭俊 先生的指導(dǎo),他的博客?http://www.jasongj.com/
?
參考資料
1.?http://kafka.apache.org/
轉(zhuǎn)載于:https://my.oschina.net/u/2279119/blog/774700
總結(jié)
以上是生活随笔為你收集整理的Kafka深入浅出(一)的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Docker-核心篇(1)-CentOS
- 下一篇: jquery选择器可以利用后代和直系后代