日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程语言 > c/c++ >内容正文

c/c++

Kafka C++客户端库librdkafka笔记

發布時間:2024/9/21 c/c++ 36 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Kafka C++客户端库librdkafka笔记 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

目錄

目錄 1

1.?前言 2

2.?縮略語 2

3.?配置和主題 3

3.1.?配置和主題結構 3

3.1.1.?Conf 3

3.1.2.?ConfImpl 3

3.1.3.?Topic 3

3.1.4.?TopicImpl 3

4.?線程 4

5.?消費者 5

5.1.?消費者結構 5

5.1.1.?Handle 5

5.1.2.?HandleImpl 5

5.1.3.?ConsumeCb 6

5.1.4.?EventCb 6

5.1.5.?Consumer 7

5.1.6.?KafkaConsumer 7

5.1.7.?KafkaConsumerImpl 7

5.1.8.?rd_kafka_message_t 7

5.1.9.?rd_kafka_msg_s 7

5.1.10.?rd_kafka_msgq_t 8

5.1.11.?rd_kafka_toppar_t 8

6.?生產者 10

6.1.?生產者結構 10

6.1.1.?DeliveryReportCb 11

6.1.2.?PartitionerCb 11

6.1.3.?Producer 11

6.1.4.?ProduceImpl 11

6.2.?生產者啟動過程1 11

6.3.?生產者啟動過程2 12

6.4.?生產者生產過程 14

7.?poll過程 15

1.?前言

librdkafka提供的異步的生產接口,異步的消費接口和同步的消息接口,沒有同步的生產接口。

2.?縮略語

縮略語

縮略語全稱

示例或說明

rd

Rapid?Development

rd.h

rk

RdKafka

?

toppar

Topic?Partition

struct?rd_kafka_toppar_t

{

};

rep

Reply,

struct?rd_kafka_t?{

??rd_kafka_q_t?*rk_rep

};

msgq

Message?Queue

struct?rd_kafka_msgq_t?{

};

rkb

RdKafka?Broker

Kafka代理

rko

RdKafka?Operation

Kafka操作

rkm

RdKafka?Message

Kafka消息

payload

?

存在Kafka上的消息(或叫Log)

3.?配置和主題

3.1.?配置和主題結構

?

3.1.1.?Conf

配置接口,配置分兩種:全局的和主題的。

3.1.2.?ConfImpl

配置的實現。

3.1.3.?Topic

主題接口。

3.1.4.?TopicImpl

主題的實現。

4.?線程

RdKafka編程涉及到三類線程:

1)?應用線程,業務代碼的實現

2)?Kafka?Broker線程rd_kafka_broker_thread_main,負責與Broker通訊,多個

3)?Kafka?Handler線程rd_kafka_thread_main,每創建一個consumerproducer即會創建一個Handler線程。

?

5.?消費者

5.1.?消費者結構

?

5.1.1.?Handle

定義了poll等接口,它的實現者為HandleImpl

5.1.2.?HandleImpl

實現了消費者和生產者均使用的poll等,其中poll的作用為:

1)?為生產者回調消息發送結果;

2)?為生產者和消費者回調事件。

class?Handle?{

??/**

???*?@brief?Polls?the?provided?kafka?handle?for?events.

???*

???*?Events?will?trigger?application?provided?callbacks?to?be?called.

???*

???*?The?\p?timeout_ms?argument?specifies?the?maximum?amount?of?time

???*?(in?milliseconds)?that?the?call?will?block?waiting?for?events.

???*?For?non-blocking?calls,?provide?0?as?\p?timeout_ms.

???*?To?wait?indefinately?for?events,?provide?-1.

???*

???*?Events:

???*???-?delivery?report?callbacks?(if?an?RdKafka::DeliveryCb?is?configured)?[producer]

???*???-?event?callbacks?(if?an?RdKafka::EventCb?is?configured)?[producer?&?consumer]

???*

???*?@remark??An?application?should?make?sure?to?call?poll()?at?regular

???*??????????intervals?to?serve?any?queued?callbacks?waiting?to?be?called.

???*

???*?@warning?This?method?MUST?NOT?be?used?with?the?RdKafka::KafkaConsumer,

???*??????????use?its?RdKafka::KafkaConsumer::consume()?instead.

???*

???*?@returns?the?number?of?events?served.

???*/

??virtual?int?poll(int?timeout_ms)?=?0;

};

5.1.3.?ConsumeCb

只針對消費者的Callback

5.1.4.?RebalanceCb

只針對消費者的Callback

5.1.5.?EventCb

消費者和生產者均可設置EventCb,如:_global_conf->set("event_cb",?&_event_cb,?errmsg);

/**

?*?@brief?Event?callback?class

?*

?*?Events?are?a?generic?interface?for?propagating?errors,?statistics,?logs,?etc

?*?from?librdkafka?to?the?application.

?*

?*?@sa?RdKafka::Event

?*/

class?RD_EXPORT?EventCb?{

?public:

??/**

???*?@brief?Event?callback

???*

???*?@sa?RdKafka::Event

???*/

??virtual?void?event_cb?(Event?&event)?=?0;

?

??virtual?~EventCb()?{?}

};

?

/**

?*?@brief?Event?object?class?as?passed?to?the?EventCb?callback.

?*/

class?RD_EXPORT?Event?{

?public:

??/**?@brief?Event?type?*/

??enum?Type?{

????EVENT_ERROR,?????/**<?Event?is?an?error?condition?*/

????EVENT_STATS,?????/**<?Event?is?a?statistics?JSON?document?*/

????EVENT_LOG,???????/**<?Event?is?a?log?message?*/

????EVENT_THROTTLE???/**<?Event?is?a?throttle?level?signaling?from?the?broker?*/

??};

};

5.1.6.?Consumer

簡單消息者,一般不使用,而是使用KafkaConsumer

5.1.7.?KafkaConsumer

消費者和生產者均采用多重繼承方式,其中KafkaConsumer為消費者接口,KafkaConsumerImpl為消費者實現。

5.1.8.?KafkaConsumerImpl

KafkaConsumerImpl為消費者實現。

5.1.9.?rd_kafka_message_t

消息結構。

5.1.10.?rd_kafka_msg_s

消息結構,但消息數據實際存儲在rd_kafka_message_t,結構大致如下:

struct?rd_kafka_msg_s

{

??rd_kafka_message_t?rkm_rkmessage;

??struct

??{

????rd_kafka_msg_s*?tqe_next;

????rd_kafka_msg_s**?tqe_prev;

????int64_t?rkm_timestamp;

????rd_kafka_timestamp_type_t?rkm_tstype;

??}rkm_link;

};

5.1.11.?rd_kafka_msgq_t

存儲消息的消息隊列,生產者生產的消息并不直接socket發送到brokers,而是放入了這個隊列,結構大致如下:

struct?rd_kafka_msgq_t

{

??struct

??{

????rd_kafka_msg_s*?tqh_first;?//?隊首

????rd_kafka_msg_s*?tqh_last;??//?隊尾

??};

??

??//?消息個數

??rd_atomic32_t?rkmq_msg_cnt;

??//?所有消息加起來的字節數

??rd_atomic64_t?rkmq_msg_bytes;

};

5.1.12.?rd_kafka_toppar_t

Topic-Partition隊列,很復雜的一個結構,部分內容如下:

//?Topic?+?Partition?combination

typedef?struct?rd_kafka_toppar_s

{

??struct

??{

????rd_kafka_toppar_s*?tqe_next;

????rd_kafka_toppar_s**?tqe_prev;

??}rktp_rklink;

?

??struct

??{

????rd_kafka_toppar_s*?tqe_next;

????rd_kafka_toppar_s**?tqe_prev;

??}rktp_rkblink;

??

??struct

??{

????rd_kafka_toppar_s*?cqe_next;

????rd_kafka_toppar_s*?cqe_prev;

??}rktp_fetchlink;

??

??struct

??{

????rd_kafka_toppar_s*?tqe_next;

????rd_kafka_toppar_s**?tqe_prev;

??}rktp_rktlink;

??

??struct

??{

????rd_kafka_toppar_s*?tqe_next;

????rd_kafka_toppar_s**?tqe_prev;

??}rktp_cgrplink;

??

??rd_kafka_itopic_t*?rktp_rkt;

??int32_t?rktp_partition;

??int32_t?rktp_leader_id;

??rd_kafka_broker_t*?rktp_leader;

??rd_kafka_broker_t*?rktp_next_leader;

??rd_refcnt_t?rktp_refcnt;

??rd_kafka_msgq_t?rktp_msgq;?//?application->rdkafka?queue

}rd_kafka_toppar_t;

6.?生產者

6.1.?生產者結構

?

6.1.1.?DeliveryReportCb

消息已經成功遞送到Broker時回調,只針對生產者有效。

6.1.2.?PartitionerCb

計算分區號回調函數,只針對生產者有效。

6.1.3.?Producer

Producer為生產者接口,它的實現者為ProducerImpl

6.1.4.?ProduceImpl

ProducerImpl為生產者的實現。

6.2.?生產者啟動過程1

啟動時會創建兩組線程:一組Broker線程(rd_kafka_broker_thread_main,多個),實為與Broker間的網絡IO線程;一組Handler線程(rd_kafka_thread_main,單個),每調用一次RdKafka::Producer::createrd_kafka_new即創建一Handler線程


Handler線程調用棧:

(gdb)?t?17

[Switching?to?thread?17?(Thread?0x7ff7059d3700?(LWP?16765))]

#0??0x00007ff7091e6cf2?in?pthread_cond_timedwait@@GLIBC_2.3.2?()?from?/lib64/libpthread.so.0

(gdb)?bt

#0??0x00007ff7091e6cf2?in?pthread_cond_timedwait@@GLIBC_2.3.2?()?from?/lib64/libpthread.so.0

#1??0x00000000005b4d2f?in?cnd_timedwait_ms?(cnd=0x1517748,?mtx=0x1517720,?timeout_ms=898)?at?tinycthread.c:501

#2??0x0000000000580e16?in?rd_kafka_q_serve?(rkq=0x1517720,?timeout_ms=898,?max_cnt=0,?cb_type=RD_KAFKA_Q_CB_CALLBACK,?callback=0x0,?opaque=0x0)?at?rdkafka_queue.c:440

#3??0x000000000054ee9b?in?rd_kafka_thread_main?(arg=0x1516df0)?at?rdkafka.c:1227

#4??0x00000000005b4e0f?in?_thrd_wrapper_function?(aArg=0x15179d0)?at?tinycthread.c:624

#5??0x00007ff7091e2e25?in?start_thread?()?from?/lib64/libpthread.so.0

#6??0x00007ff7082d135d?in?clone?()?from?/lib64/libc.so.6

6.3.?生產者啟動過程2

創建網絡IO線程,消費者啟動過程類似,只是一個調用rd_kafka_broker_producer_serve(rkb),另一個調用rd_kafka_broker_consumer_serve(rkb)

IO線程負責消息的收和發,發送底層調用的是sendmsg,收調用的是recvmsg(但MSVC平臺調用sendrecv)。

?

6.4.?生產者生產過程

?

生產者生產的消息并不直接socket發送到brokers,而是放入隊列rd_kafka_msgq_t中。Broker線程(rd_kafka_broker_thread_main)消費這個隊列。

Broker線程同時監控與Broker間的網絡連接,又要監控隊列中是否有數據,如何實現的?這個隊列和管道綁定在一起的,綁定的是管道寫端(rktp->rktp_msgq_wakeup_fd?=?rkb->rkb_toppar_wakeup_fd;?rkb->rkb_toppar_wakeup_fd=rkb->rkb_wakeup_fd[1])。

這樣Broker線程即可同時監聽網絡數據和管道數據。

//?int?rd_kafka_msg_partitioner(rd_kafka_itopic_t?*rkt,?rd_kafka_msg_t?*rkm,int?do_lock)

(gdb)?p?*rkm

$7?=?{rkm_rkmessage?=?{err?=?RD_KAFKA_RESP_ERR_NO_ERROR,?rkt?=?0x1590c10,?partition?=?1,?payload?=?0x7f48c4001260,?len?=?203,?key?=?0x7f48c400132b,?key_len?=?14,?offset?=?0,?

????_private?=?0x0},?rkm_link?=?{tqe_next?=?0x5b5d47554245445b,?tqe_prev?=?0x6361667265746e69},?rkm_flags?=?196610,?rkm_timestamp?=?1524829399009,?

??rkm_tstype?=?RD_KAFKA_TIMESTAMP_CREATE_TIME,?rkm_u?=?{producer?=?{ts_timeout?=?16074575505526,?ts_enq?=?16074275505526}}}

(gdb)?p?rkm->rkm_rkmessage

$8?=?{err?=?RD_KAFKA_RESP_ERR_NO_ERROR,?rkt?=?0x1590c10,?partition?=?1,?payload?=?0x7f48c4001260,?len?=?203,?key?=?0x7f48c400132b,?key_len?=?14,?offset?=?0,?_private?=?0x0}

(gdb)?p?rkm->rkm_rkmessage->payload

$9?=?(void?*)?0x7f48c4001260

(gdb)?p?(char*)rkm->rkm_rkmessage->payload

$10?=?0x7f48c4001260?"{\"p\":\"f\",\"o\":1,\"d\":\"m\",\"d\":\"m\",\"i\":\"f2\",\"ip\":\"127.0.0.1\",\"pt\":2018,\"sc\":0,\"fc\":1,\"tc\":0,\"acc\":395,\"mcc\":395,\"cd\":\"test\",\"cmd\":\"tester\",\"cf\":\"main\",\"cp\":\"1.49.16.9"...

7.?poll過程

poll的作用是觸發回調,生產者即使不調用poll,消息也會發送出去,但是如果不通過poll觸發回調,則不能確定消息發送狀態(成功或失敗等)。

消費隊列rd_kafka_t->rk_reprk_rep為響應隊列,類型為rd_kafka_q_trd_kafka_q_s

?

?

?

轉載于:https://www.cnblogs.com/aquester/p/9891483.html

總結

以上是生活随笔為你收集整理的Kafka C++客户端库librdkafka笔记的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。