mac安装kafka
1.使用brew工具來安裝,只需要一條命令就完成了下載和安裝的過程,包含zookeeper默認(rèn)安裝
????????brew install kafka
????????安裝完成后可以看到Kafka安裝的路徑和配置文件路徑
????????/usr/local/Cellar/zookeeper
????????/usr/local/Cellar/kafka
????????配置文件位置
????????/usr/local/etc/kafka/server.properties
????????/usr/local/etc/kafka/zookeeper.properties
2 啟動Kafka依賴zookeeper,而Kafka中默認(rèn)有一個單機版的zookeeper。實際生產(chǎn)部署中不推薦使用默認(rèn)的zookeeper2.1 啟動zookeeper在kafka的安裝目錄下,執(zhí)行腳本啟動zookeepercd /usr/local/Cellar/kafka/3.1.0mac最好用這個命令: 執(zhí)行腳本啟動zookeeper: bin/zookeeper-server-start? /usr/local/etc/kafka/zookeeper.propertiesliunx 在zk的bin路徑下執(zhí)行:zkServer.sh start 1、啟動zkServer start2、停止zkServer stop3、查看狀態(tài)zkServer status2.2 啟動Kafka 新開一個控制窗口,在Kafka安裝目錄下,執(zhí)行腳本啟動kafka cd /usr/local/Cellar/kafka/3.1.0mac最好用這個命令: bin/kafka-server-start /usr/local/etc/kafka/server.properties liunx 在kafka路徑下啟動執(zhí)行:/usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties & liunx 在kafka路徑下關(guān)閉執(zhí)行:/usr/local/kafka/bin/kafka-server-stop.sh /usr/local/kafka/config/server.properties &3.測試新建兩個窗口發(fā)送和接收?
? ? ? ? 3.1新建topic
????????#創(chuàng)建topic?
????????bin/kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic wanggf-test
? ? ? ? 3.2查詢創(chuàng)建topic
????????#查看創(chuàng)建的topic
????????bin/kafka-topics --list --bootstrap-server localhost:9092
? ? ? ? ?3.3 終端1發(fā)送消息
????????bin/kafka-console-producer --broker-list localhost:9092 --topic wanggf-test
?
? ? ? ? 3.4終端2接受消息
????????kafka-console-consumer --bootstrap-server localhost:9092 --topic wanggf-test --from-beginning
7.擴展
#指定分區(qū)消費消息
kafka-console-consumer --bootstrap-server localhost:9092 --topic test --partition 1 --from-beginning
#添加分區(qū)
kafka-topics --alter --zookeeper localhost:2181 --partitions 10 ?--topic test ? ?//添加10個
#啟動kafka,指定配置文件,后臺啟動并打印日志到 /usr/local/etc/kafka/kafka.log
?nohup kafka-server-start /usr/local/etc/kafka/server.properties > /usr/local/etc/kafka/kafka.log 2>&1 &
8.資料小擴展,本段為本人學(xué)習(xí)kafka時學(xué)到的小知識,不是很深但是很實用,經(jīng)驗不多,小弟也是剛?cè)腴T,先分享為快
8.1 topic
# topic:
1.kafka集群會將每個topic進(jìn)行分區(qū),每個分區(qū)都是一個排序且不可改變的隊列,新的消息會進(jìn)入隊尾并分配一個唯一ID,官方稱之為偏移量(offset)
2.無論消息是否被消費,集群都會保留消息,有一個配置的時間(過期時間),超過這個時間后,消息會被清除
3.消費端唯一記錄的元信息就是自己在topic中的位置(offset),
4.分布式的原因:第一集群可以容納大量的數(shù)據(jù) 第二:可以并行的處理
8.2 消費語義的理解
?at last once:至少消費一次(對一條消息有可能多次消費,有可能會造成重復(fù)消費數(shù)據(jù))
?????原因:Proudcer產(chǎn)生數(shù)據(jù)的時候,已經(jīng)寫入在broker中,但是由于broker的網(wǎng)絡(luò)異常,沒有返回ACK,這時Producer,認(rèn)為數(shù)據(jù)沒有寫入成功,此時producer會再次寫入,相當(dāng)于一條數(shù)據(jù),被寫入了多次。
?????
at most once:最多消費一次,對于消息,有可能消費一次,有可能一次也消費不了
????原因:producer在產(chǎn)生數(shù)據(jù)的時候,有可能寫數(shù)據(jù)的時候不成功,此時broker就跳過這個消息,那么這條數(shù)據(jù)就會丟失,導(dǎo)致consumer無法消費。
????
exactly once:有且僅有一次。這種情況是我們所需要的,也就是精準(zhǔn)消費一次。
?kafka中消費語義的場景
???at last once:可以先讀取數(shù)據(jù),處理數(shù)據(jù),最后記錄offset,當(dāng)然如果在記錄offset之前就crash,新的consumer會重復(fù)的來消費這條數(shù)據(jù),導(dǎo)致了”最少一次“
???at most once:可以先讀取數(shù)據(jù),然后記錄offset,最后在處理數(shù)據(jù),這個方式,就有可能在offset后,還沒有及時的處理數(shù)據(jù),就crash了,導(dǎo)致了新的consumer繼續(xù)從這個offset處理,那么剛剛還沒來得及處理的數(shù)據(jù),就永遠(yuǎn)不會被處理,導(dǎo)致了”最多消費一次“
???exactly once:可以通過將提交分成兩個階段來解決:保存了offset后提交一次,消息處理成功后,再提交一次。
8.3 消息一致性
kafka中如何實現(xiàn)精準(zhǔn)寫入數(shù)據(jù)?
A:Producer 端寫入數(shù)據(jù)的時候保證冪等性操作:
冪等性:對于同一個數(shù)據(jù)無論操作多少次都只寫入一條數(shù)據(jù),如果重復(fù)寫入,則執(zhí)行不成功
B:broker寫入數(shù)據(jù)的時候,保證原子性操作, 要么寫入成功,要么寫入失敗。(不成功不斷進(jìn)行重試)
8.4 AckMode
先放上源碼
/**
? ? ?* The offset commit behavior enumeration.
? ? ?*/
? ? public enum AckMode {
? ? ? ? /**
? ? ? ? ?* Commit after each record is processed by the listener.
? ? ? ? ?*/
? ? ? ? RECORD,
? ? ? ? /**
? ? ? ? ?* Commit whatever has already been processed before the next poll.
? ? ? ? ?*/
? ? ? ? BATCH,
? ? ? ? /**
? ? ? ? ?* Commit pending updates after
? ? ? ? ?* {@link ContainerProperties#setAckTime(long) ackTime} has elapsed.
? ? ? ? ?*/
? ? ? ? TIME,
? ? ? ? /**
? ? ? ? ?* Commit pending updates after
? ? ? ? ?* {@link ContainerProperties#setAckCount(int) ackCount} has been
? ? ? ? ?* exceeded.
? ? ? ? ?*/
? ? ? ? COUNT,
? ? ? ? /**
? ? ? ? ?* Commit pending updates after
? ? ? ? ?* {@link ContainerProperties#setAckCount(int) ackCount} has been
? ? ? ? ?* exceeded or after {@link ContainerProperties#setAckTime(long)
? ? ? ? ?* ackTime} has elapsed.
? ? ? ? ?*/
? ? ? ? COUNT_TIME,
? ? ? ? /**
? ? ? ? ?* User takes responsibility for acks using an
? ? ? ? ?* {@link AcknowledgingMessageListener}.
? ? ? ? ?*/
? ? ? ? MANUAL,
? ? ? ? /**
? ? ? ? ?* User takes responsibility for acks using an
? ? ? ? ?* {@link AcknowledgingMessageListener}. The consumer is woken to
? ? ? ? ?* immediately process the commit.
? ? ? ? ?*/
? ? ? ? MANUAL_IMMEDIATE,
? ? }
源碼解讀
RECORD
每處理一條commit一次
BATCH(默認(rèn))
每次poll的時候批量提交一次,頻率取決于每次poll的調(diào)用頻率
TIME?
每次間隔ackTime的時間去commit(跟auto commit interval有什么區(qū)別呢?)
COUNT?
累積達(dá)到ackCount次的ack去commit
COUNT_TIME
ackTime或ackCount哪個條件先滿足,就commit
MANUAL
listener負(fù)責(zé)ack,但是背后也是批量上去
MANUAL_IMMEDIATE
listner負(fù)責(zé)ack,每調(diào)用一次,就立即commit
總結(jié)
以上是生活随笔為你收集整理的mac安装kafka的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: ESP32学习笔记(35)——蓝牙MAC
- 下一篇: 基于STM32的uc/OS系统移植及用S