日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

从零开始学习Kafka

發布時間:2025/7/14 编程问答 41 豆豆
生活随笔 收集整理的這篇文章主要介紹了 从零开始学习Kafka 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

簡介

kafka是一個分布式消息隊列。具有高性能、持久化、多副本備份、橫向擴展能力。生產者往隊列里寫消息,消費者從隊列里取消息進行業務邏輯。一般在架構設計中起到解耦、削峰、異步處理的作用。

Kafka核心組件-intsmaze

  •   Topic:消息根據Topic進行歸類,可以理解為一個隊里。
  •   Producer:消息生產者,就是向kafka broker發消息的客戶端。
  •   Consumer:消息消費者,向kafka broker取消息的客戶端。
  •   broker:每個kafka實例(server),一臺kafka服務器就是一個broker,一個集群由多個broker組成,一個broker可以容納多個topic。
  •   Zookeeper:依賴集群保存meta信息。
      
    大家先看kafka的介紹或者教程啊,上來都顯示一堆長篇大論,各自文字圖片,看著很懵逼,頭暈。搞程序的,要讓ta跑起來,再針對可運行的成果,慢慢了解ta。所以本文會由淺入深,先實踐后理論,結合實踐講理論。

    Kafka安裝配置

    下載

wget http://mirror.bit.edu.cn/apache/kafka/2.2.0/kafka_2.11-2.2.0.tgz

解壓

tar -zxvf kafka_2.11-2.2.0.tgz

修改 kafka-server 的配置文件

cd kafka_2.11-2.2.0vim config/server.properties

修改其中的:

# The id of the broker. This must be set to a unique integer for each broker. broker.id=1 # A comma separated list of directories under which to store log files log.dirs=/data/kafka-logs

啟動zk【默認端口2181】

bin/zookeeper-server-start.sh config/zookeeper.properties

啟動Kafka

使用?kafka-server-start.sh?啟動 kafka 服務:

bin/kafka-server-start.sh config/server.properties

測試使用

創建 topic

使用 kafka-topics.sh 創建單分區單副本的 topic demo

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic demo

查看 topic 列表:

bin/kafka-topics.sh --list --zookeeper localhost:2181

發送消息【生產者】

使用?kafka-console-producer.sh?發送消息:

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic demo

讀取消息【消費者】

使用?kafka-console-consumer.sh 接收消息并在終端打印:

?bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic demo --from-beginning


注意不要使用
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning,高版本已經不支持

查看描述 topics 信息

bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic demo [root@localhost kafka_2.11-2.2.0]# bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic demo Topic:demo PartitionCount:1 ReplicationFactor:1 Configs:Topic: demo Partition: 0 Leader: 1 Replicas: 1 Isr: 1

第一行給出了所有分區的摘要,每個附加行給出了關于一個分區的信息。 由于我們只有一個分區,所以只有一行。

  • “Leader”: 是負責給定分區的所有讀取和寫入的節點。 每個節點將成為分區隨機選擇部分的領導者。
  • “Replicas”: 是復制此分區日志的節點列表,無論它們是否是領導者,或者即使他們當前處于活動狀態。
  • “Isr”: 是一組“同步”副本。這是復制品列表的子集,當前活著并被引導到領導者

擴展-集群配置

Kafka 支持兩種模式的集群搭建:可以在單機上運行多個 broker 實例來實現集群,也可在多臺機器上搭建集群,下面介紹下如何實現單機多 broker 實例集群,其實很簡單,只需要如下配置即可。

單機多broker 集群配置

利用單節點部署多個 broker。 不同的 broker 設置不同的 id,監聽端口及日志目錄。 例如:

cp config/server.properties config/server-2.properties vi config/server-2.properties

修改內容:

broker.id=2listeners = PLAINTEXT://127.0.0.1:9093log.dirs=/data/kafka-logs2

同樣,配置第三個broker:

cp config/server-2.properties config/server-3.properties vi config/server-3.properties

修改內容:

broker.id=2listeners = PLAINTEXT://127.0.0.1:9093log.dirs=/data/kafka-logs2

listeners 申明此kafka服務器需要監聽的端口號,默認會使用localhost的地址,如果是在遠程服務器上運行則必須配置,例如:         
listeners=PLAINTEXT:// 192.168.180.128:9092
并確保服務器的9092端口能夠訪問

啟動2/3 borker

bin/kafka-server-start.sh config/server-2.properties & bin/kafka-server-start.sh config/server-3.properties &

至此,單機多broker實例的集群配置完畢。

擴展-多機多borker集群

分別在多個節點按上述方式安裝 Kafka,配置啟動多個 Zookeeper 實例。

假設三臺機器 IP 地址是 : 192.168.153.135, 192.168.153.136, 192.168.153.137

分別配置多個機器上的 Kafka 服務,設置不同的 broker id,zookeeper.connect 設置如下:

config/server.properties里面的?zookeeper.connect

zookeeper.connect=192.168.153.135:2181,192.168.153.136:2181,192.168.153.137:2181

使用 Kafka Connect 來導入/導出數據

從控制臺寫入數據并將其寫回控制臺是一個方便的起點,但您可能想要使用其他來源的數據或將數據從 Kafka 導出到其他系統。對于許多系統,您可以使用 Kafka Connect 來導入或導出數據,而不必編寫自定義集成代碼。

Kafka Connect 是 Kafka 包含的一個工具,可以將數據導入和導出到 Kafka。它是一個可擴展的工具,運行 連接器,實現與外部系統交互的自定義邏輯。在這個快速入門中,我們將看到如何使用簡單的連接器運行 Kafka Connect,這些連接器將數據從文件導入到 Kafka topic,并將數據從 Kafka topic 導出到文件。

參考:

  • http://www.54tianzhisheng.cn/2018/01/04/Kafka/
  • http://kafka.apache.org/10/documentation/streams/quickstart
  • http://kafka.apache.org/20/documentation.html#quickstart

代碼測試

準備測試kafka

cp config/server.properties config/server-idea.properties vi config/server-idea.propertiesbroker.id=999listeners = PLAINTEXT://192.168.1.177:9999log.dirs=/data/kafka-logs-999

192.168.1.177為kafka所在機器的ip地址,9999端口號是對外提供的端口,下文會使用到

Springboot 發送消息、接受消息源碼

很簡單的一個小demo,可以直接拷貝使用。

KafkaApplication.java:

import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.producer.KafkaProducer; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.ConfigurableApplicationContext; import org.springframework.kafka.annotation.EnableKafka; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.core.KafkaTemplate;@SpringBootApplication public class KafkaApplication {public static void main(String[] args) {ConfigurableApplicationContext context = SpringApplication.run(KafkaApplication.class, args);KafkaTemplate kafkaTemplate = context.getBean(KafkaTemplate.class);for (int i = 0; i < 10; i++) {//調用消息發送類中的消息發送方法kafkaTemplate.send("mytopic", System.currentTimeMillis() + "發送" + i);try {Thread.sleep(3000);} catch (InterruptedException e) {e.printStackTrace();}}}@KafkaListener(topics = {"mytopic"},groupId = "halburt-demo2")public void consumer1(String message) {System.out.println("consumer1收到消息:" + message);}@KafkaListener(topics = {"mytopic"} ,groupId = "halburt-demo")public void consumer2(ConsumerRecord<?, ?> record) {System.out.println("consumer2收到消息");System.out.println(" topic" + record.topic());System.out.println(" key:" + record.key());System.out.println(" value:"+record.value());} }

application.yml:

server:port: 8090 spring:kafka:consumer:auto-commit-interval: 100bootstrap-servers: 192.168.1.177:9999enable-auto-commit: truegroup-id: halburt-demokey-deserializer: org.apache.kafka.common.serialization.StringDeserializermax-poll-records: 1value-deserializer: org.apache.kafka.common.serialization.StringDeserializerlistener:concurrency: 5producer:bootstrap-servers: 192.168.1.177:9999key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializer

192.168.1.177:9999即為kafka的配置文件中配置

pom.xml依賴:

依賴版本:

spring-boot.version:2.1.3.RELEASE
spring-kafka.version:2.2.0.RELEASE

【此處有坑】此處依賴版本可以不用這2個版本,但是一定要注意springboot和kafka的版本對應

<dependencies><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>2.2.0.RELEASE</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId><version>2.1.3.RELEASE</version></dependency></dependencies>

啟動kafka并run Application.java

bin/kafka-server-start.sh config/server-idea.properties &


已經啟動了zk,此處不用再啟動,如果未啟動,需要啟動zk。

cd /home/hd/kafka_2.11-2.2.0/ bin/zookeeper-server-start.sh config/zookeeper.properties&

kafka啟動成功之后,run Application,會看到日志如下:

已經接收到消息了。

如果你是跟著本文從頭開始的,一定注意此處有個坑

如果你是從頭開始跟這個本文學習的,那么你直接啟動的話,會發現消息發出去了,但是沒有接收到。
我也是查了好久,看了很多教程,別人都行我就不行。
如果你的zk有其他的topic節點的話,會收不到消息,直接上解決方案:刪除所有的zk節點。怎么刪除?

上碼:

/*** zookeeper znode遞歸刪除節點* @author Halburt**/ public class DeleteZkNode {//zookeeper的地址 private static final String connectString = "192.168.1.177:2181";private static final int sessionTimeout = 2000;private static ZooKeeper zookeeper = null;/*** main函數* @param args* @throws Exception*/public static void main(String[] args) throws Exception {//調用rmr,刪除所有目錄rmr("/");}/*** 遞歸刪除 因為zookeeper只允許刪除葉子節點,如果要刪除非葉子節點,只能使用遞歸* @param path* @throws IOException*/public static void rmr(String path) throws Exception {ZooKeeper zk = getZookeeper();//獲取路徑下的節點List<String> children = zk.getChildren(path, false);for (String pathCd : children) {//獲取父節點下面的子節點路徑String newPath = "";//遞歸調用,判斷是否是根節點if (path.equals("/")) {newPath = "/" + pathCd;} else {newPath = path + "/" + pathCd;}rmr(newPath);}//刪除節點,并過濾zookeeper節點和 /節點if (path != null && !path.trim().startsWith("/zookeeper") && !path.trim().equals("/")) {zk.delete(path, -1);//打印刪除的節點路徑System.out.println("被刪除的節點為:" + path);}}/*** 獲取Zookeeper實例* @return* @throws IOException*/public static ZooKeeper getZookeeper() throws IOException {zookeeper = new ZooKeeper(connectString, sessionTimeout, new Watcher() {@Overridepublic void process(WatchedEvent event) {}});return zookeeper;}}

終端命令查看消息

bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.177:9999 --topic mytopic --from-beginning

安利一下可視化工具Kafka Tool 2

下載地址

Kafka Tool 2是一款Kafka的可視化客戶端工具,可以非常方便的查看Topic的隊列信息以及消費者信息以及kafka節點信息。下載地址:http://www.kafkatool.com/download.html

使用

先創建連接

下載安裝之后會彈出一個配置連接的窗口,我們可以看到這個窗口左上角為Add Cluster(添加集群),但沒關系,對應單節點的Kafka實例來說也是可以的,因為這個軟件監控的是Zookeeper而不是Kafka,Kafka的集群搭建也是依賴Zookeeper來實現的,所以默認情況下我們都是直接通過Zookeeper去完成大部分操作。

創建完成之后,連接

我們可以看到已經創建好的Topic。這個軟件默認顯示數據的類型為Byte,可以在設置里面找到對應的修改選項

接下來就自己探索吧

理論學習

kafka單節點的結構如下:


單節點broker包含多個topic主題,而每個topic則包含多個partition副本,每個partition會有序的存儲消息。

kafka的總體數據流

kafka對外使用topic的概念,生產者往topic里寫消息,消費者從topic讀消息。為了做到水平擴展,一個topic實際是由多個partition組成的,遇到瓶頸時,可以通過增加partition的數量來進行橫向擴容。單個parition內是保證消息有序。每新寫一條消息,kafka就是在對應的文件append寫,所以性能非常高。kafka的總體數據流是這樣的:

Producers往Brokers里面的指定Topic中寫消息,Consumers從Brokers里面拉去指定Topic的消息,然后進行業務處理。

名詞解析

Producer

消費者: Producer將消息發布到指定的Topic中,同時Producer也能決定將此消息歸屬于哪個partition;比如基于"round-robin"方式或者通過其他的一些算法等.

Consumer

每個consumer屬于一個consumer group;反過來說,每個group中可以有多個consumer.發送到Topic的消息,只會被訂閱此Topic的每個group中的一個consumer消費(對于一條消息來說,同一組的消費者只會有一個消費者去消費).

?如果所有的consumer都具有相同的group,這種情況和queue模式很像;消息將會在consumers之間負載均衡.
?如果所有的consumer都具有不同的group,那這就是"發布-訂閱";消息將會廣播給所有的消費者.

在kafka中,一個partition中的消息只會被group中的一個consumer消費;每個group中consumer消息消費互相獨立;我們可以認為一個group是一個"訂閱"者,一個Topic中的每個partions,只會被一個"訂閱者"中的一個consumer消費,不過一個consumer可以消費多個partitions中的消息.kafka只能保證一個partition中的消息被某個consumer消費時,消息是順序的。事實上,從Topic角度來說,消息仍不是有序的。

Topics

一個Topic可以認為是一類消息,每個topic將被分成多個partition(區),每個partition在存儲層面是append log文件。任何發布到此partition的消息都會被直接追加到log文件的尾部,每條消息在文件中的位置稱為offset(偏移量),offset為一個long型數字,它是唯一標記一條消息。它唯一的標記一條消息。kafka并沒有提供其他額外的索引機制來存儲offset,因為在kafka中幾乎不允許對消息進行“隨機讀寫”。

Partition

topic物理上的分組,一個topic可以分為多個partition,每個partition是一個有序的隊列

以下是單個生產者和消費者從兩個分區主題讀取和寫入的簡單示例。

此圖顯示了一個producer向2個partition分區寫入日志,以及消費者從相同日志中讀取的內容。日志中的每條記錄都有一個相關的條目號,稱之為偏移量offset。消費者使用此偏移來記錄其在partitiond讀取日志的位置。

當然如果存在多個消費者的話,根據groupId分組,同一組的消費者不會重復讀取日志。

換句話說: 訂閱topic是以一個消費組來訂閱的,一個消費組里面可以有多個消費者。同一個消費組中的兩個消費者,不會同時消費一個partition。換句話來說,就是一個partition,只能被消費組里的一個消費者消費,但是可以同時被多個消費組消費。因此,如果消費組內的消費者如果比partition多的話,那么就會有個別消費者一直空閑。

其實consumer可以使用任意順序消費日志消息,它只需要將offset重置為任意值.(offset將會保存在zookeeper中,kafka集群幾乎不需要維護任何consumer和producer狀態信息,這些信息有zookeeper保存)

?partition有多個.最根本原因是kafka基于文件存儲.通過分區,可以將日志內容分散到多個partition上,來避免文件大小達到單機磁盤的上限,每個partiton都會被當前server(kafka實例)保存;可以將一個topic切分多任意多個partitions,來消息保存/消費的效率.此外越多的partitions意味著可以容納更多的consumer,有效提升并發消費的能力.

使用場景

消息系統、消息隊列

? 對于一些常規的消息系統,kafka是個不錯的選擇;partitons/replication和容錯,可以使kafka具有良好的擴展性和性能優勢.不過到目前為止,我們應該很清楚認識到,kafka并沒有提供JMS中的"事務性""消息傳輸擔保(消息確認機制)""消息分組"等企業級特性;kafka只能使用作為"常規"的消息系統,在一定程度上,尚未確保消息的發送與接收絕對可靠(比如,消息重發,消息發送丟失等)

日志聚合

?kafka的特性決定它非常適合作為"日志收集中心";application可以將操作日志"批量""異步"的發送到kafka集群中,而不是保存在本地或者DB中;kafka可以批量提交消息/壓縮消息等,這對producer端而言,幾乎感覺不到性能的開支.此時consumer端可以使hadoop等其他系統化的存儲和分析系統.

網站活動追蹤、調用鏈系統、事件采集

可以將網頁/用戶操作等信息發送到kafka中.并實時監控,或者離線統計分析等

等等其他場景

server.properties配置文件解讀

############################# Server Basics ############################# # 節點的ID,必須與其它節點不同 broker.id=0 # 選擇啟用刪除主題功能,默認false #delete.topic.enable=true ############################# Socket Server Settings ############################## 套接字服務器堅挺的地址。如果沒有配置,就使用java.net.InetAddress.getCanonicalHostName()的返回值 # FORMAT: # listeners = listener_name://host_name:port # EXAMPLE: # listeners = PLAINTEXT://your.host.name:9092 #listeners=PLAINTEXT://:9092# 節點的主機名會通知給生產者和消費者。如果沒有設置,如果配置了"listeners"就使用"listeners"的值。 # 否則就使用java.net.InetAddress.getCanonicalHostName()的返回值 #advertised.listeners=PLAINTEXT://your.host.name:9092# 將偵聽器的名稱映射到安全協議,默認情況下它們是相同的。有關詳細信息,請參閱配置文檔 #listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL# 服務器用來接受請求或者發送響應的線程數 num.network.threads=3# 服務器用來處理請求的線程數,可能包括磁盤IO num.io.threads=8# 套接字服務器使用的發送緩沖區大小 socket.send.buffer.bytes=102400# 套接字服務器使用的接收緩沖區大小 socket.receive.buffer.bytes=102400# 單個請求最大能接收的數據量 socket.request.max.bytes=104857600############################# Log Basics ############################## 一個逗號分隔的目錄列表,用來存儲日志文件 log.dirs=/tmp/kafka-logs# 每個主題的日志分區的默認數量。更多的分區允許更大的并行操作,但是它會導致節點產生更多的文件 num.partitions=1# 每個數據目錄中的線程數,用于在啟動時日志恢復,并在關閉時刷新。 num.recovery.threads.per.data.dir=1############################# Internal Topic Settings ############################# # 內部主題設置 # 對于除了開發測試之外的其他任何東西,group元數據內部主題的復制因子“__consumer_offsets”和“__transaction_state”,建議值大于1,以確保可用性(如3)。 offsets.topic.replication.factor=1 transaction.state.log.replication.factor=1 transaction.state.log.min.isr=1############################# Log Flush Policy ############################## 在強制刷新數據到磁盤之前允許接收消息的數量 #log.flush.interval.messages=10000# 在強制刷新之前,消息可以在日志中停留的最長時間 #log.flush.interval.ms=1000############################# Log Retention Policy ############################## 以下的配置控制了日志段的處理。策略可以配置為每隔一段時間刪除片段或者到達一定大小之后。 # 當滿足這些條件時,將會刪除一個片段。刪除總是發生在日志的末尾。# 一個日志的最小存活時間,可以被刪除 log.retention.hours=168# 一個基于大小的日志保留策略。段將被從日志中刪除只要剩下的部分段不低于log.retention.bytes。 #log.retention.bytes=1073741824# 每一個日志段大小的最大值。當到達這個大小時,會生成一個新的片段。 log.segment.bytes=1073741824# 檢查日志段的時間間隔,看是否可以根據保留策略刪除它們 log.retention.check.interval.ms=300000############################# Zookeeper #############################zookeeper.connect=localhost:2181# 連接到Zookeeper的超時時間 zookeeper.connection.timeout.ms=6000############################# Group Coordinator Settings #############################group.initial.rebalance.delay.ms=0

參考文章

https://www.cnblogs.com/likehua/p/3999538.html

https://www.jianshu.com/p/d3e963ff8b70

如有表述不當之處,敬請指正。

轉載于:https://www.cnblogs.com/Halburt/p/10842597.html

總結

以上是生活随笔為你收集整理的从零开始学习Kafka的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。