大数据 -- zookeeper和kafka集群环境搭建
一 運行環(huán)境
從阿里云申請三臺云服務(wù)器,這里我使用了兩個不同的阿里云賬號去申請云服務(wù)器。我們配置三臺主機名分別為zy1,zy2,zy3。
我們通過阿里云可以獲取主機的公網(wǎng)ip地址,如下:
?通過secureRCT連接主機106.15.74.155,運行ifconfig,可以查看其內(nèi)網(wǎng)ip地址:
1、賬號1申請了兩臺云服務(wù)器:
主機zy1的公網(wǎng)ip為:106.15.74.155,內(nèi)網(wǎng)ip為172.19.182.67。
主機zy2的公網(wǎng)ip為:47.103.134.70,內(nèi)網(wǎng)ip為172.19.14.178。
2、賬號2申請了一臺云服務(wù)器:
主機zy3的公網(wǎng)ip為:47.97.10.51,內(nèi)網(wǎng)ip為172.16.229.255。
3、阿里云入規(guī)則配置
由于主機位于不同的局域網(wǎng)下,因此需要進行一個公網(wǎng)端口到內(nèi)網(wǎng)端口的映射。在搭建zookeeper和kafka需要使用到2181,2888 ,3888,9092端口。需要在阿里云中配置入規(guī)則,具體可以參考阿里云官方收藏:同一個地域、不同賬號下的實例實現(xiàn)內(nèi)網(wǎng)互通?。
注意:如果7.103.134.70配置一個入端口3888,那么對該47.103.134.70:3888的訪問會實際映射到172.19.14.178:3888下。如果是同一局域網(wǎng)下的兩個主機,是不需要配置這個的,可以直接互4通。
如果想了解更多,可以參考以下博客:
通過SSH訪問阿里云服務(wù)器的原理可以參考-用SSH訪問內(nèi)網(wǎng)主機的方法
云服務(wù)器主機內(nèi)網(wǎng)ip和外網(wǎng)ip的區(qū)別
一臺阿里云2臺騰訊云服務(wù)器搭建Hadoop集群
4、配置/etc/hosts
以主機zy1為例:配置如下:
注意zy1對應(yīng)的ip需要配置為內(nèi)網(wǎng)ip,也就是本機ip:172.19.182.67。而zy2、zy3配置的都是公網(wǎng)ip。
二 JDK安裝
在每個主機下執(zhí)行以下操作:
1、安裝之前先查看一下有無系統(tǒng)自帶jdk
rpm -qa |grep javarpm -qa |grep jdkrpm -qa |grep gcj如果有就使用批量卸載命令
rpm -qa | grep java | xargs rpm -e --nodeps2、直接yum安裝1.8.0版本openjdk
yum install java-1.8.0-openjdk* -y默認jre jdk 安裝路徑是/usr/lib/jvm 下面:
3、配置環(huán)境變量
vim /etc/profile #set java environment , appendexport JAVA_HOME=/usr/lib/jvm/javaexport CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar:$JAVA_HOME/jre/lib/rt.jarexport PATH=$PATH:$JAVA_HOME/bin使得配置生效
. /etc/profile4、查看版本
echo $JAVA_HOME echo $CLASSPATH java -version三 安裝zookeeper
在主機zy1下面執(zhí)行以下操作:
1、下載并解壓
wget http://mirrors.hust.edu.cn/apache/zookeeper/zookeeper-3.4.13/zookeeper-3.4.13.tar.gz創(chuàng)建目錄/opt/bigdata:
mkdir /opt/bigdata解壓文件到/opt/bigdata:
tar -zxvf ?zookeeper-3.4.13.tar.gz -C /opt/bigdata跳轉(zhuǎn)目錄:
cd /opt/bigdata/zookeeper-3.4.13/2、復(fù)制配置文件
cp conf/zoo_sample.cfg conf/zoo.cfg修改配置文件如下:
vim conf/zoo.cfg其中部分參數(shù)意義如下:
- server.1=zy1:2888:3888:server.1 這個1是服務(wù)器的標識也可以是其他的數(shù)字, 表示這個是第幾號服務(wù)器,用來標識服務(wù)器,這個標識要寫到快照目錄下面myid文件里。第一個端口是master和slave之間的通信端口,默認是2888,第二個端口是leader選舉的端口,集群剛啟動的時候選舉或者leader掛掉之后進行新的選舉的端口默認是3888
- dataDir:快照日志的存儲路徑。
- dataLogDir:事物日志的存儲路徑,如果不配置這個那么事物日志會默認存儲到dataDir制定的目錄,這樣會嚴重影響zk的性能,當zk吞吐量較大的時候,產(chǎn)生的事物日志、快照日志太多。
- clientPort:這個端口就是客戶端連接 zookeeper 服務(wù)器的端口,zookeeper 會監(jiān)聽這個端口,接受客戶端的訪問請求。
3、創(chuàng)建myid文件
創(chuàng)建/opt/bigdata/data/zookeeper/zkdata:
mkdir -vp /opt/bigdata/data/zookeeper/zkdata創(chuàng)建myid文件:
echo 1 > /opt/bigdata/data/zookeeper/zkdata/myid4、拷貝zookeeper到主機zy2、zy3
scp -r /opt/bigdata/zookeeper-3.4.13/ zy2:/opt/bigdata/ scp -r /opt/bigdata/zookeeper-3.4.13/ zy3:/opt/bigdata/5、創(chuàng)建主機zy2、zy3的myid文件
zyx主機:
創(chuàng)建/opt/bigdata/data/zookeeper/zkdata:
mkdir -vp /opt/bigdata/data/zookeeper/zkdata創(chuàng)建myid文件:
echo x > /opt/bigdata/data/zookeeper/zkdata/myid注意:x表示主機的編號。
6、配置環(huán)境變量(每個主機都需要配置)
vim /etc/profile #set java environment , appendexport ZOOKEEPER_HOME=/opt/bigdata/zookeeper-3.4.13export PATH=$ZOOKEEPER_HOME/bin:$PATH使得配置生效
. /etc/profile7、啟動服務(wù)并查看
進入到zookeeper目錄下,在每個主機下分別執(zhí)行
cd /opt/bigdata/zookeeper-3.4.13 bin/zkServer.sh start檢查服務(wù)狀態(tài)
bin/zkServer.sh status可以用“jps”查看zk的進程,這個是zk的整個工程的main
jps注意:zk集群一般只有一個leader,多個follower,主一般是相應(yīng)客戶端的讀寫請求,而從主同步數(shù)據(jù),當主掛掉之后就會從follower里投票選舉一個leader出來。
8、客戶端連接
zookeeper服務(wù)開啟后,進入客戶端的命令:
zkCli.sh更多常用命令參考博客:Kafka在zookeeper中存儲結(jié)構(gòu)和查看方式。
9、出現(xiàn)錯誤常用排錯手段
1、防火墻
防火墻沒有關(guān)閉問題。解決方式參考:https://blog.csdn.net/weiyongle1996/article/details/73733228
2、端口沒有開啟
如果/etc/hosts全部配置為公網(wǎng):在zy1運行zkServer.sh start,查看端口開啟狀態(tài):
netstat -an | grep 3888則會發(fā)現(xiàn)無法開啟公網(wǎng)3888端口,我們應(yīng)該打開的是內(nèi)網(wǎng)機器對應(yīng)的端口。
如果端口已經(jīng)開啟,可以通過telnet ip por判斷該端口是否可以從外部訪問。
四 安裝kafka
在主機zy1下執(zhí)行:
1、下載并解壓
wget https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.1.1/kafka_2.12-2.1.1.tgz tar -zxvf kafka_2.12-2.1.1.tgz -C /opt/bigdata重命名:
cd /opt/bigdata/ mv kafka_2.12-2.1.1 kafka2、修改kafka配置文件
?在/opt/bigdata/kafka下:
vim config/server.properties各個參數(shù)意義:
- broker.id=1? #當前機器在集群中的唯一標識,和zookeeper的myid性質(zhì)一樣;
- listeners=PLAINTEXT://主機:9092? #當前kafka對外提供服務(wù)的主機:端口(默認是9092);
- num.network.threads=3? #這個是borker進行網(wǎng)絡(luò)處理的線程數(shù);
- num.io.threads=8 #這個是borker進行I/O處理的線程數(shù);
- log.dirs=/opt/kafka/kafkalogs/ #消息存放的目錄,這個目錄可以配置為“,”逗號分割的表達式,上面的num.io.threads要大于這個目錄的個數(shù)這個目錄,如果配置多個目錄,新創(chuàng)建的topic他把消息持久化的地方是,當前以逗號分割的目錄中,那個分區(qū)數(shù)最少就放那一個;
- socket.send.buffer.bytes=102400 #發(fā)送緩沖區(qū)buffer大小,數(shù)據(jù)不是一下子就發(fā)送的,先回存儲到緩沖區(qū)了到達一定的大小后在發(fā)送,能提高性能;
- socket.receive.buffer.bytes=102400 #kafka接收緩沖區(qū)大小,當數(shù)據(jù)到達一定大小后在序列化到磁盤;
- socket.request.max.bytes=104857600 #這個參數(shù)是向kafka請求消息或者向kafka發(fā)送消息的請請求的最大數(shù),這個值不能超過java的堆棧大小;
- num.partitions=1 #默認的分區(qū)數(shù),一個topic默認1個分區(qū)數(shù);
- log.retention.hours=168 #默認消息的最大持久化時間,168小時,7天;
- message.max.byte=5242880? #消息保存的最大值5M;
- default.replication.factor=2? #kafka保存消息的副本數(shù),如果一個副本失效了,另一個還可以繼續(xù)提供服務(wù);
- replica.fetch.max.bytes=5242880? #取消息的最大直接數(shù);
- log.segment.bytes=1073741824 #這個參數(shù)是:因為kafka的消息是以追加的形式落地到文件,當超過這個值的時候,kafka會新起一個文件;
- log.retention.check.interval.ms=300000 #每隔300000毫秒去檢查上面配置的log失效時間(log.retention.hours=168 ),到目錄查看是否有過期的消息如果有,刪除;
- log.cleaner.enable=false #是否啟用log壓縮,一般不用啟用,啟用的話可以提高性能;
- zookeeper.connect=xx:12181,xx:12181,xx:12181#設(shè)置zookeeper的連接端口;
?注意,這里如果希望在java中創(chuàng)建topic也是多個備份,需要添加一下屬性
#default replication factors for automatically created topics,默認值1;
default.replication.factor=3
#When a producer sets acks to "all" (or "-1"), this configuration specifies the minimum number of replicas that must acknowledge a write for the write to be considered successful.
#min.insync.replicas and acks allow you to enforce greater durability guarantees,默認值1;
min.insync.replicas=3
上面是參數(shù)的解釋,實際的修改項為:
broker.id=1
listeners=PLAINTEXT://zy1:9092? ? ? ? ? ?#內(nèi)網(wǎng)地址
advertised.listeners=PLAINTEXT://106.15.74.155:9092? ?#公網(wǎng)地址(不然遠程客戶端無法訪問)
log.dirs=/opt/bigdata/kafka/kafka-logs
#此外,可以在log.retention.hours=168 下面新增下面三項:
message.max.byte=5242880
default.replication.factor=2
replica.fetch.max.bytes=5242880
#設(shè)置zookeeper的連接端口
zookeeper.connect=zy1:2181,zy2:2181,zy3:2181?
如果我們需要刪除topic,還需要配置一下內(nèi)容:
delete.topic.enable=true具體參考博客:kafka安裝及刪除Topic,Kafka0.8.2.1刪除topic邏輯。
3、復(fù)制kafka到zy2、zy3
scp -r /opt/bigdata/kafka zy2:/opt/bigdata/scp -r /opt/bigdata/kafka zy3:/opt/bigdata/
4、修改zy2、zy3的配置文件server.properties
拷貝文件過去的其他兩個節(jié)點需要更改broker.id和listeners,以zy2為例:
5、啟動kafka
我們可以根據(jù)Kafka內(nèi)帶的zk集群來啟動,但是建議使用獨立的zk集群:
zkServer.sh start 在/opt/bigdata/kafka下 ,三個節(jié)點分別執(zhí)行如下命令,啟動kafka集群:bin/kafka-server-start.sh config/server.properties &
運行命令后服務(wù)確實后臺啟動了,但日志會打印在控制臺,而且關(guān)掉命令行窗口,服務(wù)就會隨之停止,這個讓我挺困惑的。后來,參考了其他的啟動腳本,通過測試和調(diào)試最終找到了完全滿足要求的命令。
bin/kafka-server-start.sh config/server.properties 1>/dev/null 2>&1 &其中1>/dev/null 2>&1 是將命令產(chǎn)生的輸入和錯誤都輸入到空設(shè)備,也就是不輸出的意思。/dev/null代表空設(shè)備。
注意:如果內(nèi)存不足:打開kafka安裝位置,在bin目錄下找到kafka-server-start.sh文件,將export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"修改為export KAFKA_HEAP_OPTS="-Xmx256M -Xms128M"。
6、驗證
思路:以下給出幾條kafka指令。創(chuàng)建一個topic,一個節(jié)點作為生產(chǎn)者,兩個節(jié)點作為消費者分別看看能否接收數(shù)據(jù),進行驗證:
創(chuàng)建及查看topic:
cd /opt/big/data/kafka bin/kafka-topics.sh -list -zookeeper zy1:2181 bin/kafka-topics.sh --create --zookeeper zy1:2181 --replication-factor 3 --partitions 3 --topic zy-test開啟生產(chǎn)者:
bin/kafka-console-producer.sh --broker-list zy1:9092 --topic zy-test開啟消費者:
bin/kafka-console-consumer.sh --bootstrap-server zy2:9092 --topic zy-test --from-beginning?
節(jié)點zy1產(chǎn)生消息,如果消息沒有清理,在節(jié)點zy2、zy3都可以接收到消息。
7、更多kafka命令
以下是kafka常用命令行總結(jié):??
查看topic的詳細信息??
bin/kafka-topics.sh -zookeeper zy1:2181 --describe --topic zy-test可以看到topic包含3個復(fù)本,每個副本又分為三個partition。以zy-test:partition0為例,其leader保存在broker.id=1的主機上,副本保存在2、3節(jié)點上。其消息保存在配置參數(shù)log.dirs所指定的路徑下:
為topic增加副本??
bin/kafka-reassign-partitions.sh --zookeeper zy1:2181 --reassignment-json-file json/partitions-to-move.json -execute創(chuàng)建topic?
bin/kafka-topics.sh --create --zookeeper zy1:2181 --replication-factor 3 --partitions 3 --topic zy-test為topic增加partition??
bin/kafka-topics.sh –-zookeeper zy1:2181 –-alter –-partitions 3 –-topic zy-testkafka生產(chǎn)者客戶端命令??
bin/kafka-console-producer.sh --broker-list zy1:9092 --topic zy-testkafka消費者客戶端命令??
bin/kafka-console-consumer.sh --bootstrap-server zy2:9092 --topic zy-test --from-beginningkafka服務(wù)啟動??
bin/kafka-server-start.sh config/server.properties 1>/dev/null 2>&1 &刪除topic??
bin/kafka-topics.sh --zookeeper zy1:2181 --delete --topic zy-test8、關(guān)閉kafka
bin/kafka-server-stop.sh五 consumer offsets
由于Zookeeper并不適合大批量的頻繁寫入操作,新版Kafka已推薦將consumer的位移信息保存在kafka內(nèi)部的topic中,即__consumer_offsets topic,并且默認提供了kafka_consumer_groups.sh腳本供用戶查看consumer信息。
1、獲取消息在topic中的記錄信息
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list zy1:9092 --topic zy-test輸出結(jié)果每個字段分別表示topic、partition、untilOffset(當前partition的最大偏移);
上面的輸出結(jié)果表明kafka隊列總有產(chǎn)生過4條消息(這并不代表kafka隊列現(xiàn)在一定有4條消息,因為kafka有兩種策略可以刪除舊數(shù)據(jù):基于時間、基于大小)。
由于我使用kafka-console-producer.sh生成了四條消息:zy、19941108、?liuyan、1。因此kafka消息隊列中存在4條消息。
2、創(chuàng)建一個console consumer group
bin/kafka-console-consumer.sh --bootstrap-server zy1:9092 --consumer.config config/consumer.properties --topic zy-test --from-beginning?
再次建立消費者:
會發(fā)現(xiàn)獲取不到數(shù)據(jù)。這是因為我們指定了消費組,第一次消費時從offset為0開始消費,把4條消息全部讀出,此時offset移動到最后,當再次使用同一消費組讀取數(shù)據(jù),則會從上次的offset開始獲取數(shù)據(jù)。
而使用:
bin/kafka-console-consumer.sh --bootstrap-server zy1:9092 --topic zy-test --from-beginning每次都會獲取四條數(shù)據(jù),這是因為每次都會創(chuàng)建一個新的消費者,這些消費者會被隨機分配到一個不同的組,因此每次都是從offset為0開始消費。
參數(shù)解釋:
3、?獲取該consumer group的group id
bin/kafka-consumer-groups.sh --bootstrap-server zy1:9092 --list可以看到有三個消費組,前兩個消費者沒有指定消費組,隨機產(chǎn)生一個console-consumer-***的group.ig。
第三個是我們剛剛在config/consumer.properties 中指定的消費組。
4、查看消費者組的offset
bin/kafka-consumer-groups.sh --bootstrap-server zy1:9092 --describe --group test-consumer-group如果此時再使用生產(chǎn)者客戶端生成兩條消息:
再次查看消費組test-consumer-group的消費情況:
5、KAFKA API指定位移消費
由于我們還沒有介紹KAFKA的API,這塊內(nèi)容就不先介紹,具體參考博客:Kafka消費者 之 指定位移消費。
六 各個端口作用
- 2888:zookeeper集群三臺主機心跳端口;
- 3888:zookeeper集群三臺主機選取leader端口,防止其中一個宕機了;
- 2181:zookeeper服務(wù)器監(jiān)聽端口,等待消費者連接,消費者可以從中獲取topic分區(qū)以及消費offset等信息(高版本消費者offset已經(jīng)保存在kafka內(nèi)部的topic中了);
- 9092:kafka服務(wù)器綁定端口,等待生產(chǎn)者和消費者連接;
因此上面介紹的kafka命令,與topic相關(guān)的使用--zookeeper zy1:2181,與生產(chǎn)者、消費者相關(guān)的使用?--bootstrap-server zy1:9092。
參考博客:
[1]kafka和zookeeper集群搭建詳細步驟
[2]yum安裝jdk環(huán)境變量配置
[3]使用命令讀取kafka的內(nèi)部topic:__consumer_offsets
[4]kafka學(xué)習(xí)筆記:知識點整理 - cyfonly - 博客園(推薦)
轉(zhuǎn)載于:https://www.cnblogs.com/zyly/p/11327605.html
總結(jié)
以上是生活随笔為你收集整理的大数据 -- zookeeper和kafka集群环境搭建的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 嘗試著去做
- 下一篇: 三星WP7手机MANGO一分钟完美越狱