日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

kafka 消息队列

發(fā)布時間:2023/12/31 编程问答 28 豆豆
生活随笔 收集整理的這篇文章主要介紹了 kafka 消息队列 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

kafka 消息隊列

kafka 架構原理

大數(shù)據(jù)時代來臨,如果你還不知道Kafka那就真的out了!據(jù)統(tǒng)計,有三分之一的世界財富500強企業(yè)正在使用Kafka,包括所有TOP10旅游公司,7家TOP10銀行,8家TOP10保險公司,9家TOP10電信公司等等。LinkedIn、Microsoft和Netflix每天都用Kafka處理萬億級的信息。本文就讓我們一起來大白話kafka的架構原理。

kafka官網(wǎng):http://kafka.apache.org/


01 kafka簡介

Kafka最初由Linkedin公司開發(fā),是一個分布式的、分區(qū)的、多副本的、多訂閱者,基于zookeeper協(xié)調的分布式日志系統(tǒng)(也可以當做MQ系統(tǒng)),常用于web/nginx日志、訪問日志、消息服務等等,Linkedin于2010年貢獻給了Apache基金會并成為頂級開源項目。


02 kafka的特性

  • 高吞吐量、低延遲:kafka每秒可以處理幾十萬條消息,它的延遲最低只有幾毫秒;

  • 可擴展性:kafka集群支持熱擴展;

  • 持久性、可靠性:消息被持久化到本地磁盤,并且支持數(shù)據(jù)備份防止丟失;

  • 容錯性:允許集群中的節(jié)點失敗(若分區(qū)副本數(shù)量為n,則允許n-1個節(jié)點失敗);

  • 高并發(fā):單機可支持數(shù)千個客戶端同時讀寫;


03 kafka的應用場景

  • 日志收集:一個公司可以用Kafka收集各種服務的log,通過kafka以統(tǒng)一接口開放給各種消費端,例如hadoop、Hbase、Solr等。

  • 消息系統(tǒng):解耦生產者和消費者、緩存消息等。

  • 用戶活動跟蹤:Kafka經(jīng)常被用來記錄web用戶或者app用戶的各種活動,如瀏覽網(wǎng)頁、搜索記錄、點擊等活動,這些活動信息被各個服務器發(fā)布到kafka的topic中,然后訂閱者通過訂閱這些topic來做實時的監(jiān)控分析,或者裝載到hadoop、數(shù)據(jù)倉庫中做離線分析和挖掘。

  • 運營指標:Kafka也經(jīng)常用來記錄運營監(jiān)控數(shù)據(jù)。

  • 流式處理


04 kafka架構(重頭戲!)

下面是一個kafka的架構圖,

整體來看,kafka架構中包含四大組件:生產者、消費者、kafka集群、zookeeper集群。對照上面的結構圖,我們先來搞清楚幾個很重要的術語,(看圖!對照圖理解~)

1、broker

kafka 集群包含一個或多個服務器,每個服務器節(jié)點稱為一個broker。

2、topic

每條發(fā)布到kafka集群的消息都有一個類別,這個類別稱為topic,其實就是將消息按照topic來分類,topic就是邏輯上的分類,同一個topic的數(shù)據(jù)既可以在同一個broker上也可以在不同的broker結點上。

3、partition

分區(qū),每個topic被物理劃分為一個或多個分區(qū),每個分區(qū)在物理上對應一個文件夾,該文件夾里面存儲了這個分區(qū)的所有消息和索引文件。在創(chuàng)建topic時可指定parition數(shù)量,生產者將消息發(fā)送到topic時,消息會根據(jù) 分區(qū)策略 追加到分區(qū)文件的末尾,屬于順序寫磁盤,因此效率非常高(經(jīng)驗證,順序寫磁盤效率比隨機寫內存還要高,這是Kafka高吞吐率的一個很重要的保證)。

上面提到了分區(qū)策略,所謂分區(qū)策略就是決定生產者將消息發(fā)送到哪個分區(qū)的算法。Kafka 為我們提供了默認的分區(qū)策略,同時它也支持自定義分區(qū)策略。kafka允許為每條消息設置一個key,一旦消息被定義了 Key,那么就可以保證同一個 Key 的所有消息都進入到相同的分區(qū),這種策略屬于自定義策略的一種,被稱作"按消息key保存策略",或Key-ordering 策略。

同一主題的多個分區(qū)可以部署在多個機器上,以此來實現(xiàn) kafka 的伸縮性。同一partition中的數(shù)據(jù)是有序的,但topic下的多個partition之間在消費數(shù)據(jù)時不能保證有序性,在需要嚴格保證消息順序消費的場景下,可以將partition數(shù)設為1,但這種做法的缺點是降低了吞吐,一般來說,只需要保證每個分區(qū)的有序性,再對消息設置key來保證相同key的消息落入同一分區(qū),就可以滿足絕大多數(shù)的應用。

4、offset

partition中的每條消息都被標記了一個序號,這個序號表示消息在partition中的偏移量,稱為offset,每一條消息在partition都有唯一的offset,消息者通過指定offset來指定要消費的消息。

正常情況下,消費者在消費完一條消息后會遞增offset,準備去消費下一條消息,但也可以將offset設成一個較小的值,重新消費一些消費過的消息,可見offset是由consumer控制的,consumer想消費哪一條消息就消費哪一條消息,所以kafka broker是無狀態(tài)的,它不需要標記哪些消息被消費過。

5、producer

生產者,生產者發(fā)送消息到指定的topic下,消息再根據(jù)分配規(guī)則append到某個partition的末尾。

6、consumer

消費者,消費者從topic中消費數(shù)據(jù)。

7、consumer group

消費者組,每個consumer屬于一個特定的consumer group,可為每個consumer指定consumer group,若不指定則屬于默認的group。

同一topic的一條消息只能被同一個consumer group內的一個consumer消費,但多個consumer group可同時消費這一消息。這也是kafka用來實現(xiàn)一個topic消息的廣播和單播的手段,如果需要實現(xiàn)廣播,一個consumer group內只放一個消費者即可,要實現(xiàn)單播,將所有的消費者放到同一個consumer group即可。

用consumer group還可以將consumer進行自由的分組而不需要多次發(fā)送消息到不同的topic。

8、leader

每個partition有多個副本,其中有且僅有一個作為leader,leader會負責所有的客戶端讀寫操作。

9、follower

follower不對外提供服務,只與leader保持數(shù)據(jù)同步,如果leader失效,則選舉一個follower來充當新的leader。當follower與leader掛掉、卡住或者同步太慢,leader會把這個follower從ISR列表中刪除,重新創(chuàng)建一個follower。

10、rebalance

同一個consumer group下的多個消費者互相協(xié)調消費工作,我們這樣想,一個topic分為多個分區(qū),一個consumer group里面的所有消費者合作,一起去消費所訂閱的某個topic下的所有分區(qū)(每個消費者消費部分分區(qū)),kafka會將該topic下的所有分區(qū)均勻的分配給consumer group下的每個消費者,如下圖,

rebalance表示"重平衡",consumer group內某個消費者掛掉后,其他消費者自動重新分配訂閱主題分區(qū)的過程,是 Kafka 消費者端實現(xiàn)高可用的重要手段。如下圖Consumer Group A中的C2掛掉,C1會接收P1和P2,以達到重新平衡。同樣的,當有新消費者加入consumer group,也會觸發(fā)重平衡操作。

05 對kafka架構的幾點解釋

  • 一個典型的kafka集群中包含若干producer,若干broker(Kafka支持水平擴展,一般broker數(shù)量越多,集群吞吐率越高),若干consumer group,以及一個zookeeper集群。kafka通過zookeeper協(xié)調管理kafka集群,選舉分區(qū)leader,以及在consumer group發(fā)生變化時進行rebalance。
  • kafka的topic被劃分為一個或多個分區(qū),多個分區(qū)可以分布在一個或多個broker節(jié)點上,同時為了故障容錯,每個分區(qū)都會復制多個副本,分別位于不同的broker節(jié)點,這些分區(qū)副本中(不管是leader還是follower都稱為分區(qū)副本),一個分區(qū)副本會作為leader,其余的分區(qū)副本作為follower。其中l(wèi)eader負責所有的客戶端讀寫操作,follower不對外提供服務,僅僅從leader上同步數(shù)據(jù),當leader出現(xiàn)故障時,其中的一個follower會頂替成為leader,繼續(xù)對外提供服務。
  • 對于傳統(tǒng)的MQ而言,已經(jīng)被消費的消息會從隊列中刪除,但在Kafka中被消費的消息也不會立馬刪除,在kafka的server.propertise配置文件中定義了數(shù)據(jù)的保存時間,當文件到設定的保存時間時才會刪除,

# 數(shù)據(jù)的保存時間(單位:小時,默認為7天)

log.retention.hours=168

因為Kafka讀取消息的時間復雜度為O(1),與文件大小無關,所以這里刪除過期文件與提高Kafka性能并沒有關系,所以選擇怎樣的刪除策略應該考慮磁盤以及具體的需求。

  • 點對點模式 VS 發(fā)布訂閱模式

傳統(tǒng)的消息系統(tǒng)中,有兩種主要的消息傳遞模式:點對點模式、發(fā)布訂閱模式。

①點對點模式

生產者發(fā)送消息到queue中,queue支持存在多個消費者,但是對一個消息而言,只可以被一個消費者消費,并且在點對點模式中,已經(jīng)消費過的消息會從queue中刪除不再存儲。

②發(fā)布訂閱模式

生產者將消息發(fā)布到topic中,topic可以被多個消費者訂閱,且發(fā)布到topic的消息會被所有訂閱者消費。而kafka就是一種發(fā)布訂閱模式。

  • 消費端 pull 和 push

① push方式:由消息中間件主動地將消息推送給消費者;

優(yōu)點:優(yōu)點是不需要消費者額外開啟線程監(jiān)控中間件,節(jié)省開銷。

缺點:無法適應消費速率不相同的消費者。因為消息的發(fā)送速率是broker決定的,而消

費者的處理速度又不盡相同,所以容易造成部分消費者空閑,部分消費者堆積,造成緩

沖區(qū)溢出。

② pull方式:由消費者主動向消息中間件拉取消息;

優(yōu)點:消費端可以按處理能力進行拉取;

缺點:消費端需要另開線程監(jiān)控中間件,有性能開銷;

對于Kafka而言,pull模式更合適。pull模式可簡化broker的設計,Consumer可自主控制消費消息的速率,同時Consumer可以自己控制消費方式,既可批量消費也可逐條消費,同時還能選擇不同的提交方式從而實現(xiàn)不同的傳輸語義。


06 開啟zookeeper


07 開啟 kafka

如果服務掛了,就刪掉 kafka-logs 的緩存,重新啟動


08 開生產者producer 發(fā)送消息

  • 使用kafka-topics.bat來創(chuàng)建一個主題。–zookeeper (表示在哪個服務器創(chuàng)建),–replication-factor(表示創(chuàng)建多少個副本),–partitions 1(表示創(chuàng)建一個分區(qū)),創(chuàng)建主題名稱 test
  • kafka-topics.bat --create --zookeeper localhost:9092 --replication-factor 1 --partitions 1 --topic test
  • 查看
  • kafka-topics.bat --list ——zookeeper localhost:2181
  • 刪除
  • [外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-A3thYPDq-1663024484450)(C:\Users\lyz123\AppData\Roaming\Typora\typora-user-images\image-20220912200532032.png)]

    09 開生產者producer 發(fā)送消息

  • 調用工具kafka-console-producer.bat 生產者 發(fā)送信息,–broker-1ist(服務器列表),–topic test(往test這個主題發(fā)送消息)
  • kafka-console-producer.bat --broker-1ist 1ocalhost:9092 --topic test

    10 開生產者consumer發(fā)送消息

    bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning

    11 簡單代碼實現(xiàn)

    通過@KafkaKistener 注解來實現(xiàn)的,就是一個kafka的監(jiān)聽器,topics 是復數(shù),監(jiān)聽一個或多個主題。一但監(jiān)聽到有消息,就會調用handleMessage 來處理這個 tipic(主題),會把消息包裝成一個ConsumerRecord 傳進來,通過record對象來處理。

    • 引入依賴
    <!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka --> <dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>2.9.0</version> </dependency>
    • yaml配置
    # KafkaProperties spring.kafka.bootstrap-servers=localhost:9092 spring.kafka.consumer.group-id=community-consumer-group //kafka的包consumer.properties配置 spring.kafka.consumer.enable-auto-commit=true //是否自動提交消費者的偏移量 spring.kafka.consumer.auto-commit-interval=3000 //自動提交的頻率,三千毫秒,3秒
    • 代碼實現(xiàn)

    總之,生產者發(fā)消息,是我們主動去調用的,

    @RunWith(SpringRunner.class) @SpringBootTest @ContextConfiguration(classes = CommunityApplication.class) public class KafkaTests {@Autowiredprivate KafkaProducer kafkaProducer;@Testpublic void testKafka() { //*****阻塞方法,就是等一會才發(fā)消息給你,而不是立即發(fā)私信kafkaProducer.sendMessage("test", "你好");kafkaProducer.sendMessage("test", "在嗎");try {Thread.sleep(1000 * 10); //時間一過,consumer會自動收到這個消息,調用她修飾的方法} catch (InterruptedException e) {e.printStackTrace();}} }@Component class KafkaProducer { //******生產者@Autowiredprivate KafkaTemplate kafkaTemplate; //生產者被spring 整合了public void sendMessage(String topic, String content) {kafkaTemplate.send(topic, content);} }@Component //一旦服務啟動了,spring就會去監(jiān)聽 topic ,有一個線程會阻塞,一直會嘗試讀取消息,但是 阻塞的狀態(tài),如果沒有消息,沒有主題她就會阻塞在這里。一旦有了topic(主題),她就會交給她所修飾的這個方法去讀取。 class KafkaConsumer { //******消費者@KafkaListener(topics = {"test"}) //監(jiān)聽的 topicpublic void handleMessage(ConsumerRecord record) { //在調取這個方法的時候會對這個消息進行封裝,通過record對象,就能讀到原始的消息。System.out.println(record.value());} }

    12 項目實戰(zhàn)

    public class Event {private String topic;private int userId;private int entityType;private int entityId;private int entityUserId;private Map<String, Object> data = new HashMap<>();public String getTopic() {return topic;}public Event setTopic(String topic) {this.topic = topic;return this;}public int getUserId() {return userId;}public Event setUserId(int userId) {this.userId = userId;return this;}public int getEntityType() {return entityType;}public Event setEntityType(int entityType) {this.entityType = entityType;return this;}public int getEntityId() {return entityId;}public Event setEntityId(int entityId) {this.entityId = entityId;return this;}public int getEntityUserId() {return entityUserId;}public Event setEntityUserId(int entityUserId) {this.entityUserId = entityUserId;return this;}public Map<String, Object> getData() {return data;}public Event setData(String key, Object value) {this.data.put(key, value);return this;} } @Component public class EventProducer {@Autowiredprivate KafkaTemplate kafkaTemplate;// 處理事件public void fireEvent(Event event) {// 將事件發(fā)布到指定的主題kafkaTemplate.send(event.getTopic(), JSONObject.toJSONString(event));} }

    總結

    以上是生活随笔為你收集整理的kafka 消息队列的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網(wǎng)站內容還不錯,歡迎將生活随笔推薦給好友。