日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Kafka消费者APi

發布時間:2024/4/14 编程问答 39 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Kafka消费者APi 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

Kafka客戶端從集群中消費消息,并透明地處理kafka集群中出現故障服務器,透明地調節適應集群中變化的數據分區。也和服務器交互,平衡均衡消費者。

public class KafkaConsumer<K,V> extends Object implements Consumer<K,V>

消費者TCP長連接到broker來拉取消息。故障導致的消費者關閉失敗,將會泄露這些連接,消費者不是線程安全的,可以查看更多關于Multi-threaded(多線程)處理的細節。

跨版本兼容性

該客戶端可以與0.10.0或更新版本的broker集群進行通信。較早的版本可能不支持某些功能。例如,0.10.0broker不支持offsetsForTimes,因為此功能是在版本0.10.1中添加的。 如果你調用broker版本不可用的API時,將報 UnsupportedVersionException 異常。

偏移量和消費者的位置

kafka為分區中的每條消息保存一個偏移量(offset),這個偏移量是該分區中一條消息的唯一標示符。也表示消費者在分區的位置。例如,一個位置是5的消費者(說明已經消費了0到4的消息),下一個接收消息的偏移量為5的消息。實際上有兩個與消費者相關的“位置”概念:

消費者的位置給出了下一條記錄的偏移量。它比消費者在該分區中看到的最大偏移量要大一個。 它在每次消費者在調用poll(long)中接收消息時自動增長。

“已提交”的位置是已安全保存的最后偏移量,如果進程失敗或重新啟動時,消費者將恢復到這個偏移量。消費者可以選擇定期自動提交偏移量,也可以選擇通過調用commit API來手動的控制(如:commitSync 和 commitAsync)。

這個區別是消費者來控制一條消息什么時候才被認為是已被消費的,控制權在消費者,下面我們進一步更詳細地討論。

消費者組和主題訂閱

Kafka的消費者組概念,通過進程池瓜分消息并處理消息。這些進程可以在同一臺機器運行,也可分布到多臺機器上,以增加可擴展性和容錯性,相同group.id的消費者將視為同一個消費者組。

分組中的每個消費者都通過subscribe API動態的訂閱一個topic列表。kafka將已訂閱topic的消息發送到每個消費者組中。并通過平衡分區在消費者分組中所有成員之間來達到平均。因此每個分區恰好地分配1個消費者(一個消費者組中)。所有如果一個topic有4個分區,并且一個消費者分組有只有2個消費者。那么每個消費者將消費2個分區。

消費者組的成員是動態維護的:如果一個消費者故障。分配給它的分區將重新分配給同一個分組中其他的消費者。同樣的,如果一個新的消費者加入到分組,將從現有消費者中移一個給它。這被稱為重新平衡分組,并在下面更詳細地討論。當新分區添加到訂閱的topic時,或者當創建與訂閱的正則表達式匹配的新topic時,也將重新平衡。將通過定時刷新自動發現新的分區,并將其分配給分組的成員。

從概念上講,你可以將消費者分組看作是由多個進程組成的單一邏輯訂閱者。作為一個多訂閱系統,Kafka支持對于給定topic任何數量的消費者組,而不重復。

這是在消息系統中常見的功能的略微概括。所有進程都將是單個消費者分組的一部分(類似傳統消息傳遞系統中的隊列的語義),因此消息傳遞就像隊列一樣,在組中平衡。與傳統的消息系統不同的是,雖然,你可以有多個這樣的組。但每個進程都有自己的消費者組(類似于傳統消息系統中pub-sub的語義),因此每個進程都會訂閱到該主題的所有消息。

此外,當分組重新分配自動發生時,可以通過ConsumerRebalanceListener通知消費者,這允許他們完成必要的應用程序級邏輯,例如狀態清除,手動偏移提交等。有關更多詳細信息,請參閱Kafka存儲的偏移。

它也允許消費者通過使用assign(Collection)手動分配指定分區,如果使用手動指定分配分區,那么動態分區分配和協調消費者組將失效。

發現消費者故障

訂閱一組topic后,當調用poll(long)時,消費者將自動加入到組中。只要持續的調用poll,消費者將一直保持可用,并繼續從分配的分區中接收消息。此外,消費者向服務器定時發送心跳。 如果消費者崩潰或無法在session.timeout.ms配置的時間內發送心跳,則消費者將被視為死亡,并且其分區將被重新分配。

還有一種可能,消費可能遇到“活鎖”的情況,它持續的發送心跳,但是沒有處理。為了預防消費者在這種情況下一直持有分區,我們使用max.poll.interval.ms活躍檢測機制。 在此基礎上,如果你調用的poll的頻率大于最大間隔,則客戶端將主動地離開組,以便其他消費者接管該分區。 發生這種情況時,你會看到offset提交失敗(調用commitSync()引發的CommitFailedException)。這是一種安全機制,保障只有活動成員能夠提交offset。所以要留在組中,你必須持續調用poll。

消費者提供兩個配置設置來控制poll循環:

  • max.poll.interval.ms:增大poll的間隔,可以為消費者提供更多的時間去處理返回的消息(調用poll(long)返回的消息,通常返回的消息都是一批)。缺點是此值越大將會延遲組重新平衡。

  • max.poll.records:此設置限制每次調用poll返回的消息數,這樣可以更容易的預測每次poll間隔要處理的最大值。通過調整此值,可以減少poll間隔,減少重新平衡分組的

  • 對于消息處理時間不可預測地的情況,這些選項是不夠的。 處理這種情況的推薦方法是將消息處理移到另一個線程中,讓消費者繼續調用poll。 但是必須注意確保已提交的offset不超過實際位置。另外,你必須禁用自動提交,并只有在線程完成處理后才為記錄手動提交偏移量(取決于你)。 還要注意,你需要pause暫停分區,不會從poll接收到新消息,讓線程處理完之前返回的消息(如果你的處理能力比拉取消息的慢,那創建新線程將導致你機器內存溢出)。

    示例

    這個消費者API提供了靈活性,以涵蓋各種消費場景,下面是一些例子來演示如何使用它們。

    自動提交偏移量

    這是個【自動提交偏移量】的簡單的kafka消費者API。

    Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("group.id", "test"); props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList("foo", "bar")); while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); }

    設置enable.auto.commit,偏移量由auto.commit.interval.ms控制自動提交的頻率。

    集群是通過配置bootstrap.servers指定一個或多個broker。不用指定全部的broker,它將自動發現集群中的其余的borker(最好指定多個,萬一有服務器故障)。

    在這個例子中,客戶端訂閱了主題foo和bar。消費者組叫test。

    broker通過心跳機器自動檢測test組中失敗的進程,消費者會自動ping集群,告訴進群它還活著。只要消費者能夠做到這一點,它就被認為是活著的,并保留分配給它分區的權利,如果它停止心跳的時間超過session.timeout.ms,那么就會認為是故障的,它的分區將被分配到別的進程。

    這個deserializer設置如何把byte轉成object類型,例子中,通過指定string解析器,我們告訴獲取到的消息的key和value只是簡單個string類型。

    手動控制偏移量

    不需要定時的提交offset,可以自己控制offset,當消息認為已消費過了,這個時候再去提交它們的偏移量。這個很有用的,當消費的消息結合了一些處理邏輯,這個消息就不應該認為是已經消費的,直到它完成了整個處理。

    Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("group.id", "test"); props.put("enable.auto.commit", "false"); props.put("auto.commit.interval.ms", "1000"); props.put("session.timeout.ms", "30000"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList("foo", "bar")); final int minBatchSize = 200; List<ConsumerRecord<String, String>> buffer = new ArrayList<>(); while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) { buffer.add(record); } if (buffer.size() >= minBatchSize) { insertIntoDb(buffer); consumer.commitSync(); buffer.clear(); } }

    在這個例子中,我們將消費一批消息并將它們存儲在內存中。當我們積累足夠多的消息后,我們再將它們批量插入到數據庫中。如果我們設置offset自動提交(之前說的例子),消費將被認為是已消費的。這樣會出現問題,我們的進程可能在批處理記錄之后,但在它們被插入到數據庫之前失敗了。

    為了避免這種情況,我們將在相應的記錄插入數據庫之后再手動提交偏移量。這樣我們可以準確控制消息是成功消費的。提出一個相反的可能性:在插入數據庫之后,但是在提交之前,這個過程可能會失敗(即使這可能只是幾毫秒,這是一種可能性)。在這種情況下,進程將獲取到已提交的偏移量,并會重復插入的最后一批數據。這種方式就是所謂的“至少一次”保證,在故障情況下,可以重復。

    如果您無法執行這些操作,可能會使已提交的偏移超過消耗的位置,從而導致缺少記錄。 使用手動偏移控制的優點是,您可以直接控制記錄何時被視為“已消耗”。

    注意:使用自動提交也可以“至少一次”。但是要求你必須下次調用poll(long)之前或關閉消費者之前,處理完所有返回的數據。如果操作失敗,這將會導致已提交的offset超過消費的位置,從而導致丟失消息。使用手動控制offset的有點是,你可以直接控制消息何時提交。、

    上面的例子使用commitSync表示所有收到的消息為”已提交",在某些情況下,你可以希望更精細的控制,通過指定一個明確消息的偏移量為“已提交”。在下面,我們的例子中,我們處理完每個分區中的消息后,提交偏移量。

    try {while(running) {ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);for (TopicPartition partition : records.partitions()) { List<ConsumerRecord<String, String>> partitionRecords = records.records(partition); for (ConsumerRecord<String, String> record : partitionRecords) { System.out.println(record.offset() + ": " + record.value()); } long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset(); consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1))); } } } finally { consumer.close(); }

    注意:已提交的offset應始終是你的程序將讀取的下一條消息的offset。因此,調用commitSync(offsets)時,你應該加1個到最后處理的消息的offset。

    訂閱指定的分區

    在前面的例子中,我們訂閱我們感興趣的topic,讓kafka提供給我們平分后的topic分區。但是,在有些情況下,你可能需要自己來控制分配指定分區,例如:

    • 如果這個消費者進程與該分區保存了某種本地狀態(如本地磁盤的鍵值存儲),則它應該只能獲取這個分區的消息。

    • 如果消費者進程本身具有高可用性,并且如果它失敗,會自動重新啟動(可能使用集群管理框架如YARN,Mesos,或者AWS設施,或作為一個流處理框架的一部分)。 在這種情況下,不需要Kafka檢測故障,重新分配分區,因為消費者進程將在另一臺機器上重新啟動。

    要使用此模式,,你只需調用assign(Collection)消費指定的分區即可:

    String topic = "foo";TopicPartition partition0 = new TopicPartition(topic, 0);TopicPartition partition1 = new TopicPartition(topic, 1); consumer.assign(Arrays.asList(partition0, partition1));

    一旦手動分配分區,你可以在循環中調用poll(跟前面的例子一樣)。消費者分組仍需要提交offset,只是現在分區的設置只能通過調用assign修改,因為手動分配不會進行分組協調,因此消費者故障不會引發分區重新平衡。每一個消費者是獨立工作的(即使和其他的消費者共享GroupId)。為了避免offset提交沖突,通常你需要確認每一個consumer實例的gorupId都是唯一的。

    注意,手動分配分區(即,assgin)和動態分區分配的訂閱topic模式(即,subcribe)不能混合使用。

    offset存儲在其他地方

    消費者可以不使用kafka內置的offset倉庫。可以選擇自己來存儲offset。要注意的是,將消費的offset和結果存儲在同一個的系統中,用原子的方式存儲結果和offset,但這不能保證原子,要想消費是完全原子的,并提供的“正好一次”的消費保證比kafka默認的“至少一次”的語義要更高。你需要使用kafka的offset提交功能。

    這有結合的例子。

    • 如果消費的結果存儲在關系數據庫中,存儲在數據庫的offset,讓提交結果和offset在單個事務中。這樣,事物成功,則offset存儲和更新。如果offset沒有存儲,那么偏移量也不會被更新。

    • 如果offset和消費結果存儲在本地倉庫。例如,可以通過訂閱一個指定的分區并將offset和索引數據一起存儲來構建一個搜索索引。如果這是以原子的方式做的,常見的可能是,即使崩潰引起未同步的數據丟失。索引程序從它確保沒有更新丟失的地方恢復,而僅僅丟失最近更新的消息。

    每個消息都有自己的offset,所以要管理自己的偏移,你只需要做到以下幾點:

    • 配置 enable.auto.commit=false

    • 使用提供的 ConsumerRecord 來保存你的位置。

    • 在重啟時用 seek(TopicPartition, long) 恢復消費者的位置。

    當分區分配也是手動完成的(像上文搜索索引的情況),這種類型的使用是最簡單的。 如果分區分配是自動完成的,需要特別小心處理分區分配變更的情況。可以通過調用subscribe(Collection,ConsumerRebalanceListener)和subscribe(Pattern,ConsumerRebalanceListener)中提供的ConsumerRebalanceListener實例來完成的。例如,當分區向消費者獲取時,消費者將通過實現ConsumerRebalanceListener.onPartitionsRevoked(Collection)來給這些分區提交它們offset。當分區分配給消費者時,消費者通過ConsumerRebalanceListener.onPartitionsAssigned(Collection)為新的分區正確地將消費者初始化到該位置。

    ConsumerRebalanceListener的另一個常見用法是清除應用已移動到其他位置的分區的緩存。

    控制消費的位置

    大多數情況下,消費者只是簡單的從頭到尾的消費消息,周期性的提交位置(自動或手動)。kafka也支持消費者去手動的控制消費的位置,可以消費之前的消息也可以跳過最近的消息。

    有幾種情況,手動控制消費者的位置可能是有用的。

    一種場景是對于時間敏感的消費者處理程序,對足夠落后的消費者,直接跳過,從最近的消費開始消費。

    另一個使用場景是本地狀態存儲系統(上一節說的)。在這樣的系統中,消費者將要在啟動時初始化它的位置(無論本地存儲是否包含)。同樣,如果本地狀態已被破壞(假設因為磁盤丟失),則可以通過重新消費所有數據并重新創建狀態(假設kafka保留了足夠的歷史)在新的機器上重新創建。

    kafka使用seek(TopicPartition, long)指定新的消費位置。用于查找服務器保留的最早和最新的offset的特殊的方法也可用(seekToBeginning(Collection) 和 seekToEnd(Collection))。

    消費者流量控制

    如果消費者分配了多個分區,并同時消費所有的分區,這些分區具有相同的優先級。在一些情況下,消費者需要首先消費一些指定的分區,當指定的分區有少量或者已經沒有可消費的數據時,則開始消費其他分區。

    例如流處理,當處理器從2個topic獲取消息并把這兩個topic的消息合并,當其中一個topic長時間落后另一個,則暫停消費,以便落后的趕上來。

    kafka支持動態控制消費流量,分別在future的poll(long)中使用pause(Collection) 和 resume(Collection) 來暫停消費指定分配的分區,重新開始消費指定暫停的分區。

    多線程處理

    Kafka消費者不是線程安全的。所有網絡I/O都發生在進行調用應用程序的線程中。用戶的責任是確保多線程訪問正確同步的。非同步訪問將導致ConcurrentModificationException。

    此規則唯一的例外是wakeup(),它可以安全地從外部線程來中斷活動操作。在這種情況下,將從操作的線程阻塞并拋出一個WakeupException。這可用于從其他線程來關閉消費者。 以下代碼段顯示了典型模式:

    public class KafkaConsumerRunner implements Runnable { private final AtomicBoolean closed = new AtomicBoolean(false); private final KafkaConsumer consumer; public void run() { try { consumer.subscribe(Arrays.asList("topic")); while (!closed.get()) { ConsumerRecords records = consumer.poll(10000); // Handle new records } } catch (WakeupException e) { // Ignore exception if closing if (!closed.get()) throw e; } finally { consumer.close(); } } // Shutdown hook which can be called from a separate thread public void shutdown() { closed.set(true); consumer.wakeup(); } }

    在單獨的線程中,可以通過設置關閉標志和喚醒消費者來關閉消費者。

    closed.set(true);consumer.wakeup();

    我們沒有多線程模型的例子。但留下幾個操作可用來實現多線程處理消息。

  • 每個線程一個消費者

    每個線程自己的消費者實例。這里是這種方法的優點和缺點:

    • PRO: 這是最容易實現的
    • PRO: 因為它不需要在線程之間協調,所以通常它是最快的。
    • PRO: 它按順序處理每個分區(每個線程只處理它接受的消息)。
    • CON: 更多的消費者意味著更多的TCP連接到集群(每個線程一個)。一般kafka處理連接非常的快,所以這是一個小成本。
    • CON: 更多的消費者意味著更多的請求被發送到服務器,但稍微較少的數據批次可能導致I/O吞吐量的一些下降。
    • CON: 所有進程中的線程總數受到分區總數的限制。
  • 解耦消費和處理

    另一個替代方式是一個或多個消費者線程,它來消費所有數據,其消費所有數據并將ConsumerRecords實例切換到由實際處理記錄處理的處理器線程池來消費的阻塞隊列。這個選項同樣有利弊:

    • PRO: 可擴展消費者和處理進程的數量。這樣單個消費者的數據可分給多個處理器線程來執行,避免對分區的任何限制。
    • CON: 跨多個處理器的順序保證需要特別注意,因為線程是獨立的執行,后來的消息可能比遭到的消息先處理,這僅僅是因為線程執行的運氣。如果對排序沒有問題,這就不是個問題。
    • CON: 手動提交變得更困難,因為它需要協調所有的線程以確保處理對該分區的處理完成。
  • 這種方法有多種玩法,例如,每個處理線程可以有自己的隊列,消費者線程可以使用TopicPartitionhash到這些隊列中,以確保按順序消費,并且提交也將簡化。



    作者:半獸人
    鏈接:http://orchome.com/451
    來源:OrcHome
    著作權歸作者所有。商業轉載請聯系作者獲得授權,非商業轉載請注明出處。

    轉載于:https://www.cnblogs.com/Llh-Forerer2015/p/9668060.html

    總結

    以上是生活随笔為你收集整理的Kafka消费者APi的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。