日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

详述 Kafka 基本原理

發(fā)布時(shí)間:2023/12/16 编程问答 32 豆豆
生活随笔 收集整理的這篇文章主要介紹了 详述 Kafka 基本原理 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

文章目錄

  • 1 簡(jiǎn)介
  • 2 Kafka 架構(gòu)
  • 3 Kafka 存儲(chǔ)策略
  • 4 Kafka 刪除策略
  • 5 Kafka broker
  • 6 Kafka 官方文檔
  • 7 代碼示例

1 簡(jiǎn)介

Apache Kafka 是分布式發(fā)布-訂閱消息系統(tǒng)。它最初由 LinkedIn 公司開發(fā),之后成為 Apache 項(xiàng)目的一部分。Kafka 是一種快速、可擴(kuò)展的、設(shè)計(jì)內(nèi)在就是分布式的,分區(qū)的和可復(fù)制的提交日志服務(wù)。

2 Kafka 架構(gòu)

它的架構(gòu)包括以下組件:

  • 話題(Topic):是特定類型的消息流。消息是字節(jié)的有效負(fù)載(Payload),話題是消息的分類名或種子(Feed)名。
  • 生產(chǎn)者(Producer):是能夠發(fā)布消息到話題的任何對(duì)象。
  • 服務(wù)代理(Broker):已發(fā)布的消息保存在一組服務(wù)器中,它們被稱為代理(Broker)或 Kafka 集群。
  • 消費(fèi)者(Consumer):可以訂閱一個(gè)或多個(gè)話題,并從Broker拉數(shù)據(jù),從而消費(fèi)這些已發(fā)布的消息。

3 Kafka 存儲(chǔ)策略

  • Kafka 以topic來進(jìn)行消息管理,每個(gè)topic包含多個(gè)partition,每個(gè)partition對(duì)應(yīng)一個(gè)邏輯log,有多個(gè)segment組成。
  • 每個(gè)segment中存儲(chǔ)多條消息(見下圖),消息id由其邏輯位置決定,即從消息id可直接定位到消息的存儲(chǔ)位置,避免id到位置的額外映射。
  • 每個(gè)part在內(nèi)存中對(duì)應(yīng)一個(gè)index,記錄每個(gè)segment中的第一條消息偏移。
  • 發(fā)布者發(fā)到某topic的消息會(huì)被均勻的分布到多個(gè)partition上(或根據(jù)用戶指定的路由規(guī)則進(jìn)行分布),broker收到發(fā)布消息往對(duì)應(yīng)partition的最后一個(gè)segment上添加該消息,當(dāng)某個(gè)segment上的消息條數(shù)達(dá)到配置值或消息發(fā)布時(shí)間超過閾值時(shí),segment上的消息會(huì)被flush到磁盤,只有flush到磁盤上的消息訂閱者才能訂閱到,segment達(dá)到一定的大小后將不會(huì)再往該segment寫數(shù)據(jù),broker會(huì)創(chuàng)建新的segment。

4 Kafka 刪除策略

  • N天前的刪除。
  • 保留最近的MGB數(shù)據(jù)。

5 Kafka broker

與其它消息系統(tǒng)不同,Kafka broker是無狀態(tài)的。這意味著消費(fèi)者必須維護(hù)已消費(fèi)的狀態(tài)信息。這些信息由消費(fèi)者自己維護(hù),broker完全不管(由offset managerbroker管理)。

從代理刪除消息變得很棘手,因?yàn)榇聿⒉恢老M(fèi)者是否已經(jīng)使用了該消息。Kafka 創(chuàng)新性地解決了這個(gè)問題,它將一個(gè)簡(jiǎn)單的基于時(shí)間的 SLA 應(yīng)用于保留策略。當(dāng)消息在代理中超過一定時(shí)間后,將會(huì)被自動(dòng)刪除。

這種創(chuàng)新設(shè)計(jì)有很大的好處,消費(fèi)者可以故意倒回到老的偏移量再次消費(fèi)數(shù)據(jù)。這違反了隊(duì)列的常見約定,但被證明是許多消費(fèi)者的基本特征。

6 Kafka 官方文檔

Kafka Design

  • 目標(biāo)
    • 高吞吐量來支持高容量的事件流處理
    • 支持從離線系統(tǒng)加載數(shù)據(jù)
    • 低延遲的消息系統(tǒng)
  • 持久化
    • 依賴文件系統(tǒng),持久化到本地
    • 數(shù)據(jù)持久化到log
  • 效率
    • 解決small IO problem:
      • 使用message set組合消息。
      • server使用chunks of messages寫到log
      • consumer一次獲取大的消息塊。
    • 解決byte copying:
      • 在producer、broker和consumer之間使用統(tǒng)一的binary message format
      • 使用系統(tǒng)pagecache
      • 使用sendfile傳輸log,避免拷貝

端到端的批量壓縮(End-to-end Batch Compression),Kafka 支持 GZIP 和 Snappy 壓縮協(xié)議。

The Producer

  • 負(fù)載均衡
    • producer可以自定義發(fā)送到哪個(gè)partition的路由規(guī)則。默認(rèn)路由規(guī)則:hash(key) % numPartitions,如果key為null則隨機(jī)選擇一個(gè)partition。
    • 自定義路由:如果key是一個(gè)user id,可以把同一個(gè)user的消息發(fā)送到同一個(gè)partition,這時(shí)consume就可以從同一個(gè)partition讀取同一個(gè)user的消息。
  • 異步批量發(fā)送
    • 批量發(fā)送:配置不多于固定消息數(shù)目一起發(fā)送并且等待時(shí)間小于一個(gè)固定延遲的數(shù)據(jù)。

The Consumer

consumer控制消息的讀取。

Push vs Pull:

  • producer推(push)數(shù)據(jù)到broker,consumer從broker拉(pull)數(shù)據(jù)
  • consumer拉的優(yōu)點(diǎn):consumer自己控制消息的讀取速度和數(shù)量
  • consumer拉的缺點(diǎn):如果broker沒有數(shù)據(jù),則可能要pull多次忙等待,Kafka 可以配置consumer long pull一直等到有數(shù)據(jù)

Consumer Position:

  • 大部分消息系統(tǒng)由broker記錄哪些消息被消費(fèi)了,但 Kafka 不是
  • Kafka 由consumer控制消息的消費(fèi),consumer甚至可以回到一個(gè)old offset的位置再次消費(fèi)消息

Message Delivery Semantics:

  • 至多一次(At most once ),消息可能丟失,但不會(huì)重復(fù)
  • 至少一次(At least once),消息不會(huì)丟失,但可能重復(fù)
  • 恰好一次(Exactly once),這正是我們想要的,消息僅被發(fā)送一次

Producer:有個(gè)acks配置可以控制接收的leader的在什么情況下就回應(yīng)producer消息寫入成功。

Consumer:

  • 讀取消息,寫log,處理消息。如果處理消息失敗,log已經(jīng)寫入,則無法再次處理失敗的消息,對(duì)應(yīng)At most once。
  • 讀取消息,處理消息,寫log。如果消息處理成功,寫log失敗,則消息會(huì)被處理兩次,對(duì)應(yīng)At least once。
  • 讀取消息,同時(shí)處理消息并把result和log同時(shí)寫入,這樣保證result和log同時(shí)更新或同時(shí)失敗,對(duì)應(yīng)Exactly once。

Kafka 默認(rèn)保證at-least-once delivery,容許用戶實(shí)現(xiàn)at-most-once語義,exactly-once的實(shí)現(xiàn)取決于目的存儲(chǔ)系統(tǒng),Kafka 提供了讀取offset,實(shí)現(xiàn)也沒有問題。

復(fù)制(Replication)

  • 一個(gè)partition的復(fù)制個(gè)數(shù)(replication factor)包括這個(gè)partition的leader本身。
  • 所有對(duì)partition的讀和寫都通過leader。
  • Followers通過pull獲取leader上log(message和offset)
  • 如果一個(gè)follower掛掉、卡住或者同步太慢,leader會(huì)把這個(gè)follower從in sync replicas(ISR)列表中刪除。
  • 當(dāng)所有的in sync replicas的follower把一個(gè)消息寫入到自己的log中時(shí),這個(gè)消息才被認(rèn)為是committed的。
  • 如果針對(duì)某個(gè)partition的所有復(fù)制節(jié)點(diǎn)都掛了,Kafka 選擇最先復(fù)活的那個(gè)節(jié)點(diǎn)作為leader(這個(gè)節(jié)點(diǎn)不一定在ISR里)。

日志壓縮(Log Compaction)

  • 針對(duì)一個(gè)topic的partition,壓縮使得 Kafka 至少知道每個(gè)key對(duì)應(yīng)的最后一個(gè)值。
  • 壓縮不會(huì)重排序消息。
  • 消息的offset是不會(huì)變的。
  • 消息的offset是順序的。

Distribution

  • Consumer Offset Tracking

  • High-level consumer 記錄每個(gè) partition 所消費(fèi)的 maximum offset,并定期 commit 到 offset manager(broker)。

  • Simple consumer 需要手動(dòng)管理 offset。現(xiàn)在的 Simple consumer Java API 只支持 commit offset 到 zookeeper。

  • Consumers and Consumer Groups

  • consumer 注冊(cè)到 zookeeper

  • 屬于同一個(gè) group 的 consumer(group id 一樣)平均分配 partition,每個(gè) partition 只會(huì)被一個(gè) consumer 消費(fèi)。

  • 當(dāng) broker 或同一個(gè) group 的其他 consumer 的狀態(tài)發(fā)生變化的時(shí)候,consumer rebalance 就會(huì)發(fā)生。

Zookeeper 協(xié)調(diào)控制

  • 管理broker與consumer的動(dòng)態(tài)加入與離開。
  • 觸發(fā)負(fù)載均衡,當(dāng)broker或consumer加入或離開時(shí)會(huì)觸發(fā)負(fù)載均衡算法,使得一個(gè) consumer組內(nèi)的多個(gè)consumer的訂閱負(fù)載平衡。
  • 維護(hù)消費(fèi)關(guān)系及每個(gè)partition的消費(fèi)信息。

7 代碼示例

生產(chǎn)者代碼示例:

import java.util.*;import kafka.javaapi.producer.Producer; import kafka.producer.KeyedMessage; import kafka.producer.ProducerConfig;public class TestProducer {public static void main(String[] args) {long events = Long.parseLong(args[0]);Random rnd = new Random();Properties props = new Properties();props.put("metadata.broker.list", "broker1:9092,broker2:9092 ");props.put("serializer.class", "kafka.serializer.StringEncoder");props.put("partitioner.class", "example.producer.SimplePartitioner");props.put("request.required.acks", "1");ProducerConfig config = new ProducerConfig(props);Producer<String, String> producer = new Producer<String, String>(config);for (long nEvents = 0; nEvents < events; nEvents++) { long runtime = new Date().getTime(); String ip =192.168.2.+ rnd.nextInt(255); String msg = runtime +,www.example.com,+ ip; KeyedMessage<String, String> data = new KeyedMessage<String, String>("page_visits", ip, msg);producer.send(data);}producer.close();} }

Partitioning Code:

import kafka.producer.Partitioner; import kafka.utils.VerifiableProperties;public class SimplePartitioner implements Partitioner {public SimplePartitioner (VerifiableProperties props) {}public int partition(Object key, int a_numPartitions) {int partition = 0;String stringKey = (String) key;int offset = stringKey.lastIndexOf('.');if (offset > 0) {partition = Integer.parseInt( stringKey.substring(offset+1)) % a_numPartitions;}return partition;} }

消費(fèi)者代碼示例:

import kafka.consumer.ConsumerConfig; import kafka.consumer.KafkaStream; import kafka.javaapi.consumer.ConsumerConnector;import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors;public class ConsumerGroupExample {private final ConsumerConnector consumer;private final String topic;private ExecutorService executor;public ConsumerGroupExample(String a_zookeeper, String a_groupId, String a_topic) {consumer = kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig(a_zookeeper, a_groupId));this.topic = a_topic;}public void shutdown() {if (consumer != null) consumer.shutdown();if (executor != null) executor.shutdown();try {if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) {System.out.println("Timed out waiting for consumer threads to shut down, exiting uncleanly");}} catch (InterruptedException e) {System.out.println("Interrupted during shutdown, exiting uncleanly");}}public void run(int a_numThreads) {Map<String, Integer> topicCountMap = new HashMap<String, Integer>();topicCountMap.put(topic, new Integer(a_numThreads));Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);// now launch all the threads//executor = Executors.newFixedThreadPool(a_numThreads);// now create an object to consume the messages//int threadNumber = 0;for (final KafkaStream stream : streams) {executor.submit(new ConsumerTest(stream, threadNumber));threadNumber++;}}private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) {Properties props = new Properties();props.put("zookeeper.connect", a_zookeeper);props.put("group.id", a_groupId);props.put("zookeeper.session.timeout.ms", "400");props.put("zookeeper.sync.time.ms", "200");props.put("auto.commit.interval.ms", "1000");return new ConsumerConfig(props);}public static void main(String[] args) {String zooKeeper = args[0];String groupId = args[1];String topic = args[2];int threads = Integer.parseInt(args[3]);ConsumerGroupExample example = new ConsumerGroupExample(zooKeeper, groupId, topic);example.run(threads);try {Thread.sleep(10000);} catch (InterruptedException ie) {}example.shutdown();} }

ConsumerTest:

import kafka.consumer.ConsumerIterator; import kafka.consumer.KafkaStream;public class ConsumerTest implements Runnable {private KafkaStream m_stream;private int m_threadNumber;public ConsumerTest(KafkaStream a_stream, int a_threadNumber) {m_threadNumber = a_threadNumber;m_stream = a_stream;}public void run() {ConsumerIterator<byte[], byte[]> it = m_stream.iterator();while (it.hasNext())System.out.println("Thread " + m_threadNumber + ": " + new String(it.next().message()));System.out.println("Shutting down Thread: " + m_threadNumber);} }

轉(zhuǎn)載聲明:本文轉(zhuǎn)自博客園「阿凡盧」,Kafka基本原理。

總結(jié)

以上是生活随笔為你收集整理的详述 Kafka 基本原理的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。