Kafka消费者APi
Kafka客戶端從集群中消費消息,并透明地處理kafka集群中出現故障服務器,透明地調節適應集群中變化的數據分區。也和服務器交互,平衡均衡消費者。
public class KafkaConsumer<K,V> extends Object implements Consumer<K,V>消費者TCP長連接到broker來拉取消息。故障導致的消費者關閉失敗,將會泄露這些連接,消費者不是線程安全的,可以查看更多關于Multi-threaded(多線程)處理的細節。
跨版本兼容性
該客戶端可以與0.10.0或更新版本的broker集群進行通信。較早的版本可能不支持某些功能。例如,0.10.0broker不支持offsetsForTimes,因為此功能是在版本0.10.1中添加的。 如果你調用broker版本不可用的API時,將報 UnsupportedVersionException 異常。
偏移量和消費者的位置
kafka為分區中的每條消息保存一個偏移量(offset),這個偏移量是該分區中一條消息的唯一標示符。也表示消費者在分區的位置。例如,一個位置是5的消費者(說明已經消費了0到4的消息),下一個接收消息的偏移量為5的消息。實際上有兩個與消費者相關的“位置”概念:
消費者的位置給出了下一條記錄的偏移量。它比消費者在該分區中看到的最大偏移量要大一個。 它在每次消費者在調用poll(long)中接收消息時自動增長。
“已提交”的位置是已安全保存的最后偏移量,如果進程失敗或重新啟動時,消費者將恢復到這個偏移量。消費者可以選擇定期自動提交偏移量,也可以選擇通過調用commit API來手動的控制(如:commitSync 和 commitAsync)。
這個區別是消費者來控制一條消息什么時候才被認為是已被消費的,控制權在消費者,下面我們進一步更詳細地討論。
消費者組和主題訂閱
Kafka的消費者組概念,通過進程池瓜分消息并處理消息。這些進程可以在同一臺機器運行,也可分布到多臺機器上,以增加可擴展性和容錯性,相同group.id的消費者將視為同一個消費者組。
分組中的每個消費者都通過subscribe API動態的訂閱一個topic列表。kafka將已訂閱topic的消息發送到每個消費者組中。并通過平衡分區在消費者分組中所有成員之間來達到平均。因此每個分區恰好地分配1個消費者(一個消費者組中)。所有如果一個topic有4個分區,并且一個消費者分組有只有2個消費者。那么每個消費者將消費2個分區。
消費者組的成員是動態維護的:如果一個消費者故障。分配給它的分區將重新分配給同一個分組中其他的消費者。同樣的,如果一個新的消費者加入到分組,將從現有消費者中移一個給它。這被稱為重新平衡分組,并在下面更詳細地討論。當新分區添加到訂閱的topic時,或者當創建與訂閱的正則表達式匹配的新topic時,也將重新平衡。將通過定時刷新自動發現新的分區,并將其分配給分組的成員。
從概念上講,你可以將消費者分組看作是由多個進程組成的單一邏輯訂閱者。作為一個多訂閱系統,Kafka支持對于給定topic任何數量的消費者組,而不重復。
這是在消息系統中常見的功能的略微概括。所有進程都將是單個消費者分組的一部分(類似傳統消息傳遞系統中的隊列的語義),因此消息傳遞就像隊列一樣,在組中平衡。與傳統的消息系統不同的是,雖然,你可以有多個這樣的組。但每個進程都有自己的消費者組(類似于傳統消息系統中pub-sub的語義),因此每個進程都會訂閱到該主題的所有消息。
此外,當分組重新分配自動發生時,可以通過ConsumerRebalanceListener通知消費者,這允許他們完成必要的應用程序級邏輯,例如狀態清除,手動偏移提交等。有關更多詳細信息,請參閱Kafka存儲的偏移。
它也允許消費者通過使用assign(Collection)手動分配指定分區,如果使用手動指定分配分區,那么動態分區分配和協調消費者組將失效。
發現消費者故障
訂閱一組topic后,當調用poll(long)時,消費者將自動加入到組中。只要持續的調用poll,消費者將一直保持可用,并繼續從分配的分區中接收消息。此外,消費者向服務器定時發送心跳。 如果消費者崩潰或無法在session.timeout.ms配置的時間內發送心跳,則消費者將被視為死亡,并且其分區將被重新分配。
還有一種可能,消費可能遇到“活鎖”的情況,它持續的發送心跳,但是沒有處理。為了預防消費者在這種情況下一直持有分區,我們使用max.poll.interval.ms活躍檢測機制。 在此基礎上,如果你調用的poll的頻率大于最大間隔,則客戶端將主動地離開組,以便其他消費者接管該分區。 發生這種情況時,你會看到offset提交失敗(調用commitSync()引發的CommitFailedException)。這是一種安全機制,保障只有活動成員能夠提交offset。所以要留在組中,你必須持續調用poll。
消費者提供兩個配置設置來控制poll循環:
max.poll.interval.ms:增大poll的間隔,可以為消費者提供更多的時間去處理返回的消息(調用poll(long)返回的消息,通常返回的消息都是一批)。缺點是此值越大將會延遲組重新平衡。
max.poll.records:此設置限制每次調用poll返回的消息數,這樣可以更容易的預測每次poll間隔要處理的最大值。通過調整此值,可以減少poll間隔,減少重新平衡分組的
對于消息處理時間不可預測地的情況,這些選項是不夠的。 處理這種情況的推薦方法是將消息處理移到另一個線程中,讓消費者繼續調用poll。 但是必須注意確保已提交的offset不超過實際位置。另外,你必須禁用自動提交,并只有在線程完成處理后才為記錄手動提交偏移量(取決于你)。 還要注意,你需要pause暫停分區,不會從poll接收到新消息,讓線程處理完之前返回的消息(如果你的處理能力比拉取消息的慢,那創建新線程將導致你機器內存溢出)。
示例
這個消費者API提供了靈活性,以涵蓋各種消費場景,下面是一些例子來演示如何使用它們。
自動提交偏移量
這是個【自動提交偏移量】的簡單的kafka消費者API。
Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("group.id", "test"); props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList("foo", "bar")); while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); }設置enable.auto.commit,偏移量由auto.commit.interval.ms控制自動提交的頻率。
集群是通過配置bootstrap.servers指定一個或多個broker。不用指定全部的broker,它將自動發現集群中的其余的borker(最好指定多個,萬一有服務器故障)。
在這個例子中,客戶端訂閱了主題foo和bar。消費者組叫test。
broker通過心跳機器自動檢測test組中失敗的進程,消費者會自動ping集群,告訴進群它還活著。只要消費者能夠做到這一點,它就被認為是活著的,并保留分配給它分區的權利,如果它停止心跳的時間超過session.timeout.ms,那么就會認為是故障的,它的分區將被分配到別的進程。
這個deserializer設置如何把byte轉成object類型,例子中,通過指定string解析器,我們告訴獲取到的消息的key和value只是簡單個string類型。
手動控制偏移量
不需要定時的提交offset,可以自己控制offset,當消息認為已消費過了,這個時候再去提交它們的偏移量。這個很有用的,當消費的消息結合了一些處理邏輯,這個消息就不應該認為是已經消費的,直到它完成了整個處理。
Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("group.id", "test"); props.put("enable.auto.commit", "false"); props.put("auto.commit.interval.ms", "1000"); props.put("session.timeout.ms", "30000"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList("foo", "bar")); final int minBatchSize = 200; List<ConsumerRecord<String, String>> buffer = new ArrayList<>(); while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) { buffer.add(record); } if (buffer.size() >= minBatchSize) { insertIntoDb(buffer); consumer.commitSync(); buffer.clear(); } }在這個例子中,我們將消費一批消息并將它們存儲在內存中。當我們積累足夠多的消息后,我們再將它們批量插入到數據庫中。如果我們設置offset自動提交(之前說的例子),消費將被認為是已消費的。這樣會出現問題,我們的進程可能在批處理記錄之后,但在它們被插入到數據庫之前失敗了。
為了避免這種情況,我們將在相應的記錄插入數據庫之后再手動提交偏移量。這樣我們可以準確控制消息是成功消費的。提出一個相反的可能性:在插入數據庫之后,但是在提交之前,這個過程可能會失敗(即使這可能只是幾毫秒,這是一種可能性)。在這種情況下,進程將獲取到已提交的偏移量,并會重復插入的最后一批數據。這種方式就是所謂的“至少一次”保證,在故障情況下,可以重復。
如果您無法執行這些操作,可能會使已提交的偏移超過消耗的位置,從而導致缺少記錄。 使用手動偏移控制的優點是,您可以直接控制記錄何時被視為“已消耗”。
注意:使用自動提交也可以“至少一次”。但是要求你必須下次調用poll(long)之前或關閉消費者之前,處理完所有返回的數據。如果操作失敗,這將會導致已提交的offset超過消費的位置,從而導致丟失消息。使用手動控制offset的有點是,你可以直接控制消息何時提交。、
上面的例子使用commitSync表示所有收到的消息為”已提交",在某些情況下,你可以希望更精細的控制,通過指定一個明確消息的偏移量為“已提交”。在下面,我們的例子中,我們處理完每個分區中的消息后,提交偏移量。
try {while(running) {ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);for (TopicPartition partition : records.partitions()) { List<ConsumerRecord<String, String>> partitionRecords = records.records(partition); for (ConsumerRecord<String, String> record : partitionRecords) { System.out.println(record.offset() + ": " + record.value()); } long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset(); consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1))); } } } finally { consumer.close(); }注意:已提交的offset應始終是你的程序將讀取的下一條消息的offset。因此,調用commitSync(offsets)時,你應該加1個到最后處理的消息的offset。
訂閱指定的分區
在前面的例子中,我們訂閱我們感興趣的topic,讓kafka提供給我們平分后的topic分區。但是,在有些情況下,你可能需要自己來控制分配指定分區,例如:
-
如果這個消費者進程與該分區保存了某種本地狀態(如本地磁盤的鍵值存儲),則它應該只能獲取這個分區的消息。
-
如果消費者進程本身具有高可用性,并且如果它失敗,會自動重新啟動(可能使用集群管理框架如YARN,Mesos,或者AWS設施,或作為一個流處理框架的一部分)。 在這種情況下,不需要Kafka檢測故障,重新分配分區,因為消費者進程將在另一臺機器上重新啟動。
要使用此模式,,你只需調用assign(Collection)消費指定的分區即可:
String topic = "foo";TopicPartition partition0 = new TopicPartition(topic, 0);TopicPartition partition1 = new TopicPartition(topic, 1); consumer.assign(Arrays.asList(partition0, partition1));一旦手動分配分區,你可以在循環中調用poll(跟前面的例子一樣)。消費者分組仍需要提交offset,只是現在分區的設置只能通過調用assign修改,因為手動分配不會進行分組協調,因此消費者故障不會引發分區重新平衡。每一個消費者是獨立工作的(即使和其他的消費者共享GroupId)。為了避免offset提交沖突,通常你需要確認每一個consumer實例的gorupId都是唯一的。
注意,手動分配分區(即,assgin)和動態分區分配的訂閱topic模式(即,subcribe)不能混合使用。
offset存儲在其他地方
消費者可以不使用kafka內置的offset倉庫。可以選擇自己來存儲offset。要注意的是,將消費的offset和結果存儲在同一個的系統中,用原子的方式存儲結果和offset,但這不能保證原子,要想消費是完全原子的,并提供的“正好一次”的消費保證比kafka默認的“至少一次”的語義要更高。你需要使用kafka的offset提交功能。
這有結合的例子。
-
如果消費的結果存儲在關系數據庫中,存儲在數據庫的offset,讓提交結果和offset在單個事務中。這樣,事物成功,則offset存儲和更新。如果offset沒有存儲,那么偏移量也不會被更新。
-
如果offset和消費結果存儲在本地倉庫。例如,可以通過訂閱一個指定的分區并將offset和索引數據一起存儲來構建一個搜索索引。如果這是以原子的方式做的,常見的可能是,即使崩潰引起未同步的數據丟失。索引程序從它確保沒有更新丟失的地方恢復,而僅僅丟失最近更新的消息。
每個消息都有自己的offset,所以要管理自己的偏移,你只需要做到以下幾點:
-
配置 enable.auto.commit=false
-
使用提供的 ConsumerRecord 來保存你的位置。
-
在重啟時用 seek(TopicPartition, long) 恢復消費者的位置。
當分區分配也是手動完成的(像上文搜索索引的情況),這種類型的使用是最簡單的。 如果分區分配是自動完成的,需要特別小心處理分區分配變更的情況。可以通過調用subscribe(Collection,ConsumerRebalanceListener)和subscribe(Pattern,ConsumerRebalanceListener)中提供的ConsumerRebalanceListener實例來完成的。例如,當分區向消費者獲取時,消費者將通過實現ConsumerRebalanceListener.onPartitionsRevoked(Collection)來給這些分區提交它們offset。當分區分配給消費者時,消費者通過ConsumerRebalanceListener.onPartitionsAssigned(Collection)為新的分區正確地將消費者初始化到該位置。
ConsumerRebalanceListener的另一個常見用法是清除應用已移動到其他位置的分區的緩存。
控制消費的位置
大多數情況下,消費者只是簡單的從頭到尾的消費消息,周期性的提交位置(自動或手動)。kafka也支持消費者去手動的控制消費的位置,可以消費之前的消息也可以跳過最近的消息。
有幾種情況,手動控制消費者的位置可能是有用的。
一種場景是對于時間敏感的消費者處理程序,對足夠落后的消費者,直接跳過,從最近的消費開始消費。
另一個使用場景是本地狀態存儲系統(上一節說的)。在這樣的系統中,消費者將要在啟動時初始化它的位置(無論本地存儲是否包含)。同樣,如果本地狀態已被破壞(假設因為磁盤丟失),則可以通過重新消費所有數據并重新創建狀態(假設kafka保留了足夠的歷史)在新的機器上重新創建。
kafka使用seek(TopicPartition, long)指定新的消費位置。用于查找服務器保留的最早和最新的offset的特殊的方法也可用(seekToBeginning(Collection) 和 seekToEnd(Collection))。
消費者流量控制
如果消費者分配了多個分區,并同時消費所有的分區,這些分區具有相同的優先級。在一些情況下,消費者需要首先消費一些指定的分區,當指定的分區有少量或者已經沒有可消費的數據時,則開始消費其他分區。
例如流處理,當處理器從2個topic獲取消息并把這兩個topic的消息合并,當其中一個topic長時間落后另一個,則暫停消費,以便落后的趕上來。
kafka支持動態控制消費流量,分別在future的poll(long)中使用pause(Collection) 和 resume(Collection) 來暫停消費指定分配的分區,重新開始消費指定暫停的分區。
多線程處理
Kafka消費者不是線程安全的。所有網絡I/O都發生在進行調用應用程序的線程中。用戶的責任是確保多線程訪問正確同步的。非同步訪問將導致ConcurrentModificationException。
此規則唯一的例外是wakeup(),它可以安全地從外部線程來中斷活動操作。在這種情況下,將從操作的線程阻塞并拋出一個WakeupException。這可用于從其他線程來關閉消費者。 以下代碼段顯示了典型模式:
public class KafkaConsumerRunner implements Runnable { private final AtomicBoolean closed = new AtomicBoolean(false); private final KafkaConsumer consumer; public void run() { try { consumer.subscribe(Arrays.asList("topic")); while (!closed.get()) { ConsumerRecords records = consumer.poll(10000); // Handle new records } } catch (WakeupException e) { // Ignore exception if closing if (!closed.get()) throw e; } finally { consumer.close(); } } // Shutdown hook which can be called from a separate thread public void shutdown() { closed.set(true); consumer.wakeup(); } }在單獨的線程中,可以通過設置關閉標志和喚醒消費者來關閉消費者。
closed.set(true);consumer.wakeup();我們沒有多線程模型的例子。但留下幾個操作可用來實現多線程處理消息。
每個線程一個消費者
每個線程自己的消費者實例。這里是這種方法的優點和缺點:
- PRO: 這是最容易實現的
- PRO: 因為它不需要在線程之間協調,所以通常它是最快的。
- PRO: 它按順序處理每個分區(每個線程只處理它接受的消息)。
- CON: 更多的消費者意味著更多的TCP連接到集群(每個線程一個)。一般kafka處理連接非常的快,所以這是一個小成本。
- CON: 更多的消費者意味著更多的請求被發送到服務器,但稍微較少的數據批次可能導致I/O吞吐量的一些下降。
- CON: 所有進程中的線程總數受到分區總數的限制。
解耦消費和處理
另一個替代方式是一個或多個消費者線程,它來消費所有數據,其消費所有數據并將ConsumerRecords實例切換到由實際處理記錄處理的處理器線程池來消費的阻塞隊列。這個選項同樣有利弊:
- PRO: 可擴展消費者和處理進程的數量。這樣單個消費者的數據可分給多個處理器線程來執行,避免對分區的任何限制。
- CON: 跨多個處理器的順序保證需要特別注意,因為線程是獨立的執行,后來的消息可能比遭到的消息先處理,這僅僅是因為線程執行的運氣。如果對排序沒有問題,這就不是個問題。
- CON: 手動提交變得更困難,因為它需要協調所有的線程以確保處理對該分區的處理完成。
這種方法有多種玩法,例如,每個處理線程可以有自己的隊列,消費者線程可以使用TopicPartitionhash到這些隊列中,以確保按順序消費,并且提交也將簡化。
作者:半獸人
鏈接:http://orchome.com/451
來源:OrcHome
著作權歸作者所有。商業轉載請聯系作者獲得授權,非商業轉載請注明出處。
轉載于:https://www.cnblogs.com/Llh-Forerer2015/p/9668060.html
總結
以上是生活随笔為你收集整理的Kafka消费者APi的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 浅析Block的内部结构 , 及分析其是
- 下一篇: 对比let、const、var的异同