日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 综合教程 >内容正文

综合教程

kafka的c/c++高性能客户端librdkafka简介

發布時間:2023/12/15 综合教程 31 生活家
生活随笔 收集整理的這篇文章主要介紹了 kafka的c/c++高性能客户端librdkafka简介 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

Librdkafka是c語言實現的apachekafka的高性能客戶端,為生產和使用kafka提供高效可靠的客戶端,并且提供了c++接口

性能:

Librdkafka 是一款專為現代硬件使用而設計的高性能庫,它嘗試將內存復制保持在最小,可以讓用戶決定是需要高吞吐量還是低延遲的服務,性能調優的兩個最重要的配置是:

*batch.num.messages:在發送消息之前累積在本地隊列中等待的消息的最小數量。

*queue.buffering.max.ms:等待batch.num.messages多長時間來填寫到本地隊列中。

使用:

源碼中的rdkafka.h、CONFIGURATION.md有Librdkafka的API的說明

初始化:

應用程序需要實例化一個頂級對象rd_kafka_t作為基礎容器,提供全局配置和共享狀態,調用rd_kafka_new()創建。

還需要實例化一個或多個topics(`rd_kafka_topic_t`)來提供生產或消費,topic對象保存topic特定的配置,并在內部填充所有可用分區和leader brokers,通過調用`rd_kafka_topic_new()`創建。

`rd_kafka_t``rd_kafka_topic_t`自帶一個可選的配置API,如果沒有調用API,Librdkafka將會使用CONFIGURATION.md中的默認配置。

注意

1.應用程序可能會創建多個`rd_kafka_t`對象,并且它們不共享任何狀態

2.一個`rd_kafka_topic_t`對象僅可以用于創建它的`rd_kafka_t`對象

配置

為了簡化與Apache Kafka官方軟件的集成,降低學習曲線,librdkafka實現了與Apache Kafka官方客戶端相同的配置屬性。

使用`rd_kafka_conf_set()``rd_kafka_topic_conf_set()`在創建對象之前應用配置。

注意:

`rd_kafka.._conf_t`對象在傳遞給rd_kafka.._new()`之后不可重復使用,調用`rd_kafka.._new()`后,應用程序不需要free任何配置資源。

例子

[cpp]view plaincopy

rd_kafka_conf_t*conf;
charerrstr[512];

conf=rd_kafka_conf_new();
rd_kafka_conf_set(conf,"compression.codec","snappy",errstr,sizeof(errstr));
rd_kafka_conf_set(conf,"batch.num.messages","100",errstr,sizeof(errstr));

rd_kafka_new(RD_KAFKA_PRODUCER,conf);

線程和回調函數

librdkafka內部使用多個線程來充分利用硬件資源.

API是線程安全的,應用程序可以在任意時間調用其線程內的任意api函數.

poll-based的API用于向應用程序提供信號,應用程序定期調用` rd_kafka_poll() `,poll API將會調用如下的API:

*消息傳遞報告回調函數:消息傳遞成功或失敗的信號,允許應用程序釋放消息中使用的任何應用程序資源。

*錯誤回調函數:發出錯誤信號,這些錯誤通常具有信息性質,例如連接broker失敗,應用程序通常不需要做任何處理,錯誤的類型通過` rd_kafka_resp_err_t `枚舉值傳遞,包括遠程的broke錯誤和本地錯誤。

可選回調不是通過poll觸發的,可以通過任意線程調用:

*Logging callback :允許應用程序輸出librdkafka生成的日志消息

*partitioner callback:應用提供的消息分區器,可在任意時刻、任意線程中調用,對于相同的鍵,可以調用多次

Brokers

Librdkafka需要至少一個brokers的初始化list,稱作` bootstrap brokers `,通過"metadata.broker.list"配置屬性或`rd_kafka_brokers_add()`來指定,用來連接所有bootstrapbrokers,并查詢每個元數據的信息,其中包含brokers、topic、partitions和它們在kafka cluster中的leaders的完整列表,

Brokers的名字被指定為"host[:port]",端口可選(默認9092),host是主機名或ip地址,如果主機解析到多個地址,librdkafka將輪詢每個嘗試連接的地址,因此,可以使用包含所有brokers地址的DNS記錄來提供可靠的bootstrap broker。

Producer API

使用`RD_KAFKA_PRODUCER`設置了`rd_kafka_t`對象,并設置了一個或多個`rd_kafka_topic_t`對象后,librdkafka已經準備好接收要發送給brokers的消息。

`rd_kafka_produce()`函數有如下參數:

*`rkt` -需要produce的topic,之前通過`rd_kafka_topic_new()`函數創

*`partition` -生產到的partition,如果設置為`RD_KAFKA_PARTITION_UA`(UnAssigned),那么配置的分區函數將會用來選擇目標分區。

*`msgflags`- 0,或者是:

*`RD_KAFKA_MSG_F_COPY`- librdkafka會立刻生成payload的一份拷貝,當payload在非持久化內存中(例如堆)時使用。

* `RD_KAFKA_MSG_F_FREE`- librdkafka使用完payload后,會使用`free(3)`將其釋放。

這兩個指標是互斥的,如果既不需要copy也不需要free,那么這兩個指標都不需要設置。

如果`RD_KAFKA_MSG_F_COPY`沒有設置,將不會執行數據的復制,librdkafka將會hold住payload的指針直到消息成功傳輸或傳輸失敗。

當librdkafka完成消息的傳遞,使應用程序重新獲得payload內存的所有權后,傳遞報告回調函數將會被調用

如果設置了`RD_KAFKA_MSG_F_FREE`,傳遞報告回調函數不能對payload進行free

*`payload`,`len` -消息的payload

*`key`,`keylen` -可以用來進行消息分區的消息鍵

它將被傳遞到topic分區回調函數(如果存在的話),并在發送給broker的時候附加在消息上

*`msg_opaque`- 應用程序提供的一個可選的每條消息的不透明指針,在消息回調函數中提供,讓應用程序引用一個特定的指針。

`rd_kafka_produce()`是一個非阻塞API,它會在內部隊列中排列消息并立即返回。如果已排列的消息個數超過了"queue.buffering.max.messages"配置項,`rd_kafka_produce()`返回-1并將errno設置為`ENOBUFS`,從而提供了一種背壓機制

Simple Consumer API

NOTE: 對于高級KafkaConsumer接口,查看rd_kafka_subscribe(rdkafka.h) 或者 KafkaConsumer (rdkafkacpp.h)。

使用`RD_KAFKA_CONSUMER`和`rd_kafka_topic_t`實例創建`rd_kafka_t`后,應用程序還必須通過調用`rd_kafka_consume_start()`來為給定的分區啟動consumer。

`rd_kafka_consume_start()` 參數:

* `rkt` -需要消費的topic,之前通過`rd_kafka_topic_new()`創建。

*`partition` -從哪個分區消費

*`offset` -開始消費的消息offset,這可能是絕對消息偏移或兩個特殊偏移之一:

`RD_KAFKA_OFFSET_BEGINNING`:從partition隊列的起始位置開始消費(最老的message)

`RD_KAFKA_OFFSET_END`:在下一個要生產到該partition上的消息處開始消費

`RD_KAFKA_OFFSET_STORED`:使用存儲的offset

一個topic+partition的consumer啟動后,librdkafka將會嘗試通過反復從broker獲取批次消息以保持本地隊列中保存"queued.min.messages"條消息,然后這個本地消息隊列將會通過三個不同的consume API傳遞給應用程序:

*`rd_kafka_consume()`- consume單條消息

*`rd_kafka_consume_batch()`- consume單條或多條消息

*`rd_kafka_consume_callback()`- consume本地隊列中的所有消息,并給每條消息調用一個回調函數

這三個API按照性能升序排列,`rd_kafka_consume()`最慢,`rd_kafka_consume_callback()`最快。

使用`rd_kafka_message_t`類型標識一條已消費的消息,其成員為:

*`err` -發回到應用程序的錯誤信號,如果不為0,那么`payload`成員將被認為是一條錯誤消息,`err`是錯誤碼(`rd_kafka_resp_err_t`),如果為0,`payload`則包含消息數據。

*`rkt`,`partition`- 該消息的topic和partition

*`payload`,`len`- payload消息,或者是錯誤信息(err!=0)

*`key`,`key_len`- 生產者指定的可選消息key

*`offset`- Message offset

`payload`和`key`以及整個消息的內存,屬于librdkafka,調用`rd_kafka_message_destroy()`后不可再次使用,librdkafka將為該消息集的所有消息payloads共享相同的消息集接收緩沖存儲器,以避免過度復制,這意味著如果應用程序決定hang on單個rd_kafka_message_t,它將阻止從相同消息集中釋放所有其他消息的備份內存。

當應用程序完成從topic+partition的消息消費后,需要調用`rd_kafka_consume_stop()`來停止這個consumer,這也將清除本地隊列中的當前的消息。

Offset management

broker version >= 0.9.0結合使用高版本的KafkaConsumer接口,可實現基于Broker的offset管理(查看rdkafka.h或 rdkafkacpp.h)

還可以通過本地文件存儲來實現Offset管理,通過如下的topic配置參數,offset被永久寫在本地文件中:

* `auto.commit.enable`

* `auto.commit.interval.ms`

* `offset.store.path`

* `offset.store.sync.interval.ms`

目前還沒有對ZooKeeper的偏移量管理的支持。

Consumer groups

當kafka broker 版本>= 0.9 ,librdkafka支持基于broker的consumer groups

Topics

Librdkafka支持自動創建topic,broker需要配置"auto.create.topics.enable=true"

總結

以上是生活随笔為你收集整理的kafka的c/c++高性能客户端librdkafka简介的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。