搭建高吞吐量 Kafka 分布式发布订阅消息 集群
搭建高吞吐量 Kafka 分布式發布訂閱消息 集群
簡介
Kafka 是一種高吞吐的分布式發布訂閱消息系統,能夠替代傳統的消息隊列用于解耦合數據處理,緩存未處理消息等,同時具有更高的吞吐率,支持分區、多副本、冗余,因此被廣泛用于大規模消息數據處理應用。Kafka 支持Java 及多種其它語言客戶端,可與Hadoop、Storm、Spark等其它大數據工具結合使用。
環境
Zookeeper集群: 192.168.252.121:2181,192.168.252.122:2181,192.168.252.123:2181
kafka 集群: 192.168.252.124 , 192.168.252.125 , 192.168.252.126
kafka-manager: 192.168.252.127
主機名修改
CentOs7.3 修改主機名
ssh 免密登錄
CentOs7.3 ssh 免密登錄
安裝 JDK1.8
CentOs7.3 安裝 JDK1.8
搭建 Zookeeper 集群
CentOs7.3 搭建 ZooKeeper-3.4.9 Cluster 集群服務
Zookeeper集群: 192.168.252.121:2181,192.168.252.122:2181,192.168.252.123:2181
主機名依次被我修改成: node1,node2,node3
搭建 kafka 集群
kafka 集群: 192.168.252.124 , 192.168.252.125 , 192.168.252.126
主機名依次被我修改成: node4,node5,node6
1.下載代碼
kafka 官網下載 http://kafka.apache.org/downloads
下載最新版本的kafka ,我在北京我就選擇,清華鏡像比較快
清華鏡像:https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/
阿里鏡像:https://mirrors.aliyun.com/apache/kafka/
$ cd /opt $ wget https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/0.11.0.0/kafka_2.12-0.11.0.0.tgz $ tar -zxvf kafka_2.12-0.11.0.0.tgz $ cd kafka_2.12-0.11.0.02.修改配置
在 node4 操作
$ vi /opt/kafka_2.12-0.11.0.0/config/server.properties broker.id=0 每臺服務器不能重復#設置zookeeper的集群地址 zookeeper.connect=192.168.252.121:2181,192.168.252.122:2181,192.168.252.123:2181把配置復制到 node5,node6 集群
$ for a in {5..6} ; do scp -r /opt/kafka_2.12-0.11.0.0/ node$a:/opt/kafka_2.12-0.11.0.0 ; done修改 node5,node6 集群的broker.id
$ vi /opt/kafka_2.12-0.11.0.0/config/server.properties3.啟動kafka
在 node1,啟動 Kafka使用的 ZooKeeper,所以先啟動ZooKeeper服務器
$ for a in {1..3} ; do ssh node$a "source /etc/profile; /opt/zookeeper-3.4.9/bin/zkServer.sh start" ; done現在 node4 啟動Kafka服務器
$ for a in {4..6} ; do ssh node$a "source /etc/profile; /opt/kafka_2.12-0.11.0.0/bin/kafka-server-start.sh /opt/kafka_2.12-0.11.0.0/config/server.properties" ; done或者后臺啟動運行,日志查看去Kafka解壓目錄有個log 文件夾查看
$ for a in {4..6} ; do ssh node$a "source /etc/profile; nohup /opt/kafka_2.12-0.11.0.0/bin/kafka-server-start.sh /opt/kafka_2.12-0.11.0.0/config/server.properties > /dev/null 2>&1 &" ; done查看進程,Kafka 是否啟動成功
$ jps 3825 Kafka 6360 Jps如果報錯刪除
kafka.common.KafkaException: Failed to acquire lock on file .lock in /tmp/kafka-logs. A Kafka instance in another process or thread is using this directory.$ rm -rf /tmp/kafka-logs4.創建主題
$ /opt/kafka_2.12-0.11.0.0/bin/kafka-topics.sh --create --zookeeper 192.168.252.121:2181,192.168.252.122:2181,192.168.252.123:2181 --replication-factor 2 --partitions 1 --topic ymq--replication-factor 2 #復制兩份
--partitions 1 #創建1個分區
--topic #主題為ymq
運行list topic命令,可以看到該主題:
$ /opt/kafka_2.12-0.11.0.0/bin/kafka-topics.sh --list --zookeeper 192.168.252.121:2181,192.168.252.122:2181,192.168.252.123:21815.生產消息
Kafka附帶一個命令行客戶端,它將從文件或標準輸入中輸入,并將其作為消息發送到Kafka群集。默認情況下,每行將作為單獨的消息發送。
在 node5 運行生產者,然后在控制臺中輸入一些消息以發送到服務器。
$ /opt/kafka_2.12-0.11.0.0/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic ymq >www.ymq.io6.消費消息
在node6 運行消費者,將把消息轉儲到標準輸出。
$ /opt/kafka_2.12-0.11.0.0/bin/kafka-console-consumer.sh --zookeeper 192.168.252.121:2181,192.168.252.122:2181,192.168.252.123:2181 --topic ymq --from-beginning Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper]. www.ymq.io7.topic詳情
用describe 查看集群中topic每個節點情況
$ /opt/kafka_2.12-0.11.0.0/bin/kafka-topics.sh --describe --zookeeper 192.168.252.121:2181,192.168.252.122:2181,192.168.252.123:2181 --topic ymq Topic:ymq PartitionCount:1 ReplicationFactor:2 Configs:Topic: ymq Partition: 0 Leader: 1 Replicas: 1,3 Isr: 3,1以下是輸出的說明。第一行給出了所有分區的摘要,每個附加行提供有關一個分區的信息。由于我們這個主題只有一個分區,只有一行。
leader負責給定分區的讀取和寫入分配節點編號,每個分區的部分數據會隨機指定不同的節點
replicas是復制此分區的日志的節點列表
isr一組正在同步的副本列表
8.刪除topic
$ /opt/kafka_2.12-0.11.0.0/bin/kafka-topics.sh --delete --zookeeper 192.168.252.121:2181,192.168.252.122:2181,192.168.252.123:2181 --topic ymq Topic ymq is marked for deletion. Note: This will have no impact if delete.topic.enable is not set to true.9.停止kafka
$ for a in {4..6} ; do ssh node$a "source /etc/profile; /opt/kafka_2.12-0.11.0.0/bin/kafka-server-stop.sh /opt/kafka_2.12-0.11.0.0/config/server.properties " ; done部署 Kafka Manager
Yahoo開源Kafka集群管理器Kafka Manager
作為一個分布式的消息發布-訂閱系統,Apache Kafka在Yahoo內部已經被很多團隊所使用,例如媒體分析團隊就將其應用到了實時分析流水線中,同時,Yahoo整個Kafka集群處理的峰值帶寬超過了20Gbps(壓縮數據)。為了讓開發者和服務工程師能夠更加簡單地維護Kafka集群,Yahoo構建了一個基于Web的管理工具,稱為Kafka Manager,日前該項目已經在GitHub上開源。
通過Kafka Manager用戶能夠更容易地發現集群中哪些主題或者分區分布不均勻,同時能夠管理多個集群,能夠更容易地檢查集群的狀態,能夠創建主題,執行首選的副本選擇,能夠基于集群當前的狀態生成分區分配,并基于生成的分配執行分區的重分配,此外,Kafka Manager還是一個非常好的可以快速查看集群狀態的工具。
Kafka Manager使用Scala語言編寫,其Web控制臺基于Play Framework實現,除此之外,Yahoo還遷移了一些Apache Kafka的幫助程序以便能夠與Apache Curator框架一起工作。
一、它支持以下內容:
- 管理多個群集
- 容易檢查集群狀態(主題,消費者,偏移量,經紀人,副本分發,分區分配)
- 運行首選副本選舉
- 使用選項生成分區分配,以選擇要使用的代理
- 運行分區的重新分配(基于生成的分配)
- 創建可選主題配置的主題(0.8.1.1具有不同于0.8.2+的配置)
- 刪除主題(僅支持0.8.2+,并記住在代理配??置中設置delete.topic.enable = true)
- 主題列表現在表示標記為刪除的主題(僅支持0.8.2+)
- 批量生成多個主題的分區分配,并選擇要使用的代理
- 批量運行多個主題的分區重新分配
- 將分區添加到現有主題
- 更新現有主題的配置
- 可選地,啟用JMX輪詢代理級和主題級度量。
- 可選地篩選出在zookeeper中沒有ids / owner /&offset /目錄的消費者。
源碼,并編譯打包
在 kafka-manager: 192.168.252.127 node7 部署
編譯超級慢
$ yum install git $ cd /opt/ $ git clone https://github.com/yahoo/kafka-manager $ cd kafka-manager/ $ ./sbt clean dist下載編譯好的包
反正我是沒編譯成功,從網上找了一個編譯好的
鏈接: 百度網盤下載 密碼: kha2
$ yum install unzip $ unzip kafka-manager-1.3.2.1.zip $ vi /opt/kafka-manager-1.3.2.1/conf/application.conf修改這個 zk 地址
kafka-manager.zkhosts="192.168.252.121:2181,192.168.252.122:2181,192.168.252.123:2181"啟動 kafka-manager
默認端口 NettyServer - Listening for HTTP on /0:0:0:0:0:0:0:0:9000
$ /opt/kafka-manager-1.3.2.1/bin/kafka-manager -Dconfig.file=conf/application.conf或者后臺運行 并且配置端口
$ nohup bin/kafka-manager -Dconfig.file=/home/hadoop/app/kafka-manager-1.3.2.1/conf/application.conf -Dhttp.port=9000 &訪問: http://ip:9000
圖片描述
圖片描述
Contact
- 作者:鵬磊
- 出處:http://www.ymq.io
- Email:admin@souyunku.com
- 版權歸作者所有,轉載請注明出處
- Wechat:關注公眾號,搜云庫,專注于開發技術的研究與知識分享
總結
以上是生活随笔為你收集整理的搭建高吞吐量 Kafka 分布式发布订阅消息 集群的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: JavaScript动态加载js文件
- 下一篇: 关于正则验证中文名字2-5位的时候