日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Kafka生产者详解

發布時間:2025/3/20 编程问答 23 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Kafka生产者详解 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

一.消息發送

1.java客戶端數據生產流程解析

?

① 首先要構造一個 ProducerRecord 對象,該對象可以聲明主題Topic、分區Partition、鍵 Key以及值 Value,主題和值是必須要聲明的,分區和鍵可以不用指定。

② 調用send() 方法進行消息發送。

③ 因為消息要到網絡上進行傳輸,所以必須進行序列化,序列化器的作用就是把消息的 key 和value對象序列化成字節數組后,生產者就知道該往哪個主題和分區發送記錄了。

④ 接著這條記錄會被添加到一個記錄批次里面,這個批次里所有的消息會被發送到相同的主題和分區。會有一個獨立的線程來把這些記錄批次發送到相應的 Broker 上。

⑤ Broker成功接收到消息,表示發送成功,返回消息的元數據(包括主題和分區信息以及記錄在分區里的偏移量)。發送失敗,可以選擇重試或者直接拋出異常。

依賴的maven包

? ? ? ?<kafka.version>2.0.0</kafka.version><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka_${scala.version}</artifactId><version>${kafka.version}</version><exclusions><exclusion><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId></exclusion><exclusion><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId></exclusion><exclusion><groupId>log4j</groupId><artifactId>log4j</artifactId></exclusion></exclusions></dependency>

2.必要參數配置

? ?private static final String brokerList = "192.168.37.129:9092";private static final String topic = "test"; ?public static Properties initConfig() {Properties props = new Properties();// 該屬性指定 brokers 的地址清單,格式為 host:port。清單里不需要包含所有的 broker 地址,// 生產者會從給定的 broker 里查找到其它 broker 的信息?!ㄗh至少提供兩個 broker 的信息,因為一旦其中一個宕機,生產者仍然能夠連接到集群上。props.put("bootstrap.servers", brokerList);// 將 key 轉換為字節數組的配置,必須設定為一個實現了 org.apache.kafka.common.serialization.Serializer 接口的類,// 生產者會用這個類把鍵對象序列化為字節數組。// ——kafka 默認提供了 StringSerializer和 IntegerSerializer、ByteArraySerializer。當然也可以自定義序列化器。props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");// 和 key.serializer 一樣,用于 value 的序列化props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");// 用來設定KafkaProducer對應的客戶端ID,默認為空,如果不設置KafkaProducer會自動生成一個非空字符串。// 內容形式如:"producer-1"props.put("client.id", "producer.client.id.demo");return props;} ?public static void main(String[] args) throws InterruptedException {Properties props = initConfig();KafkaProducer<String, String> producer = new KafkaProducer<>(props);ProducerRecord<String, String> record = new ProducerRecord<>(topic, ?"hello, Kafka!");try {// 1、發送消息producer.send(record);} catch (Exception e) {e.printStackTrace();}producer.close();}

3.發送類型

發送即忘記

producer.send(record);

同步發送

//通過send()發送完消息后返回一個Future對象,然后調用Future對象的get()方法等待kafka響應 //如果kafka正常響應,返回一個RecordMetadata對象,該對象存儲消息的偏移量 // 如果kafka發生錯誤,無法正常響應,就會拋出異常,我們便可以進行異常處理 producer.send(record).get();

異步發送

? ? ? ? ? ?producer.send(record, new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {if (exception == null) {System.out.println(metadata.partition() + ":" + metadata.offset());}}});

4.序列化器

消息要到網絡上進行傳輸,必須進行序列化,而序列化器的作用就是如此。

Kafka 提供了默認的字符串序列化器(org.apache.kafka.common.serialization.StringSerializer),還有整型(IntegerSerializer)和字節數組(BytesSerializer)序列化器,這些序列化器都實現了接口 (org.apache.kafka.common.serialization.Serializer)基本上能夠滿足大部分場景的需求。

5.自定義序列化器

/*** 自定義序列化器*/ public class CompanySerializer implements Serializer<Company> {@Overridepublic void configure(Map configs, boolean isKey) {} ?@Overridepublic byte[] serialize(String topic, Company data) {if (data == null) {return null;}byte[] name, address;try {if (data.getName() != null) {name = data.getName().getBytes("UTF-8");} else {name = new byte[0];}if (data.getAddress() != null) {address = data.getAddress().getBytes("UTF-8");} else {address = new byte[0];}ByteBuffer buffer = ByteBuffer.allocate(4 + 4 + name.length + address.length);buffer.putInt(name.length);buffer.put(name);buffer.putInt(address.length);buffer.put(address);return buffer.array();} catch (UnsupportedEncodingException e) {e.printStackTrace();}return new byte[0];} ?@Overridepublic void close() {} }

使用自定義的序列化器

public class ProducerDefineSerializer {public static final String brokerList = "192.168.37.129:9092";public static final String topic = "test"; ?public static void main(String[] args)throws ExecutionException, InterruptedException {Properties properties = new Properties();properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,CompanySerializer.class.getName());properties.put("bootstrap.servers", brokerList); ?KafkaProducer<String, Company> producer =new KafkaProducer<>(properties);Company company = Company.builder().name("kafka").address("北京").build();ProducerRecord<String, Company> record =new ProducerRecord<>(topic, company);producer.send(record).get();} }

6.分區器

本身kafka有自己的分區策略的,如果未指定,就會使用默認的分區策略:

Kafka根據傳遞消息的key來進行分區的分配,即hash(key) % numPartitions。如果Key相同的話,那么就會分配到統一分區。

源碼如下:

?

我們可以自定義一個分區器

/*** 自定義分區器*/ public class DefinePartitioner implements Partitioner {private final AtomicInteger counter = new AtomicInteger(0); ?@Overridepublic int partition(String topic, Object key, byte[] keyBytes,Object value, byte[] valueBytes, Cluster cluster) {List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);int numPartitions = partitions.size();if (null == keyBytes) {return counter.getAndIncrement() % numPartitions;} else {return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;}} ?@Overridepublic void close() {} ?@Overridepublic void configure(Map<String, ?> configs) {} }

實現自定義分區器需要通過配置參數ProducerConfig.PARTITIONER_CLASS_CONFIG來實現

props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,DefinePartitioner.class.getName());

7.攔截器

Producer攔截器(interceptor)是個相當新的功能,它和consumer端interceptor是在Kafka 0.10版本被引入的,主要用于實現clients端的定制化控制邏輯。

生產者攔截器可以用在消息發送前做一些準備工作。

使用場景

1)按照某個規則過濾掉不符合要求的消息

2)修改消息的內容

3)統計類需求

自定義一個攔截器

/*** 自定義攔截器*/ public class ProducerInterceptorPrefix implementsProducerInterceptor<String, String> {private volatile long sendSuccess = 0;private volatile long sendFailure = 0; ?@Overridepublic ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {String modifiedValue = "prefix1-" + record.value();return new ProducerRecord<>(record.topic(),record.partition(), record.timestamp(),record.key(), modifiedValue, record.headers()); // ? ? ? if (record.value().length() < 5) { // ? ? ? ? ? throw new RuntimeException(); // ? ? ? } // ? ? ? return record;} ?@Overridepublic void onAcknowledgement(RecordMetadata recordMetadata,Exception e) {if (e == null) {sendSuccess++;} else {sendFailure++;}} ?@Overridepublic void close() {double successRatio = (double) sendSuccess / (sendFailure + sendSuccess);System.out.println("[INFO] 發送成功率="+ String.format("%f", successRatio * 100) + "%");} ?@Overridepublic void configure(Map<String, ?> map) {} }

實現自定義攔截器之后需要在配置參數中指定這個攔截器,此參數的默認值為空,如下:

props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,ProducerInterceptorPrefix.class.getName());

功能演示:

生產者:

?

消費者:

?

二.其它生產者參數

之前提及的默認三個客戶端參數,大部分參數都有合理的默認值,一般情況下不需要修改它們, 參考官網:http://kafka.apache.org/documentation/#producerconfigs

1.acks

這個參數用來指定分區中必須有多少個副本收到這條消息,之后生產者才會認為這條消息時寫入成功的。acks是生產者客戶端中非常重要的一個參數,它涉及到消息的可靠性和吞吐量之間的權衡。

  • ack=0, 生產者在成功寫入消息之前不會等待任何來自服務器的相應。如果出現問題生產者是感知不到的,消息就丟失了。不過因為生產者不需要等待服務器響應,所以它可以以網絡能夠支持的最大速度發送消息,從而達到很高的吞吐量。
  • ack=1,默認值為1,只要集群的首領節點收到消息,生產這就會收到一個來自服務器的成功響應。如果消息無法達到首領節點(比如首領節點崩潰,新的首領還沒有被選舉出來),生產者會收到一個錯誤響應,為了避免數據丟失,生產者會重發消息。但是,這樣還有可能會導致數據丟失,如果收到寫成功通知,此時首領節點還沒來的及同步數據到follower節點,首領節點崩潰,就會導致數據丟失。
  • ack=-1, 只有當所有參與復制的節點都收到消息時,生產者才會收到一個來自服務器的成功響應,這種模式是最安全的,它可以保證不止一個服務器收到消息,就算有服務器發生崩潰,整個集群仍然可以運行。

注意:acks參數配置的是一個字符串類型,而不是整數類型,如果配置為整數類型會拋出以下異常

props.put(ProducerConfig.ACKS_CONFIG,"1");

2.retries

生產者從服務器收到的錯誤有可能是臨時性的錯誤(比如分區找不到首領)。在這種情況下,如果達到了 retires 設置的次數,生產者會放棄重試并返回錯誤。默認情況下,生產者會在每次重試之間等待100ms,可以通過 retry.backoff.ms 參數來修改這個時間間隔。

3.batch.size

當有多個消息要被發送到同一個分區時,生產者會把它們放在同一個批次里。該參數指定了一個批次可以使用的內存大小,按照字節數計算,而不是消息個數。當批次被填滿,批次里的所有消息會被發送出去。不過生產者并不一定都會等到批次被填滿才發送,半滿的批次,甚至只包含一個消息的批次也可能被發送。所以就算把 batch.size 設置的很大,也不會造成延遲,只會占用更多的內存而已,如果設置的太小,生產者會因為頻繁發送消息而增加一些額外的開銷。

4.max.request.size

該參數用于控制生產者發送的請求大小,它可以指定能發送的單個消息的最大值,也可以指單個請求里所有消息的總大小。 broker 對可接收的消息最大值也有自己的限制( message.max.size ),所以兩邊的配置最好匹配,避免生產者發送的消息被 broker 拒絕。

5.buffer.memory

該參數用來設置生產者內存緩沖區的大小,生產者用它緩沖要發送到服務器的消息。如果應用程序發送消息的速度超過發送到服務器的速度,會導致生產者空間不足。這個時候,send()方法調用要么被阻塞,要么拋出異常。取決于如何設置 block.on.buffer.full 參數。

6.linger.ms

該參數指定了生產者在發送批次之前等待更多消息加入批次的時間。KafkaProduce會在批次填滿或linger.ms達到上限時把批次發送出去。默認情況下,只要有用的線程,生產者就會把消息發送出去,就算批次里只有一個消息。把linger.ms設置成比0大的數,讓生產者在發送批次之前等待一會兒,使更多的消息加入到這個批次。雖然這樣會增加延遲,但會提升吞吐量(因為一次性發送更多的消息,每個消息的開銷就變小了)

7.max.in.flight.requests.per.connection

該參數指定了生產者在 到服務器晌應之前可以發送多少個消息。它的值越高,就會占用越多的內存,不過也會提升吞吐量。 它設為1可以保證消息是按照發送的順序寫入服務器的,即使發生了重試。

思考:如何保證消息的有序性

Kafka 可以保證同一個分區里的消息是有序的。也就是說,如果生產者按照一定的順序發送消息, broker 就會按照這個順序把它們寫入分區,消費者也會按照同樣的順序讀取它們。在某些情況下,順序是非常重要的。例如,往一個賬戶存入 100元再取出來,這個與先取錢再存錢是截然不同的!不過,有些場景對順序不是很敏感。

如果把retries設為非零整數,同時把max.in.flight.requests.per.connection設為比1大的數,那么,如果第一個批次消息寫入失敗,而第二個批次寫入成功, broker 會重試寫入第一個批次。如果此時第一個批次也寫入成功,那么兩個批次的順序就反過來了。

一般來說,如果某些場景要求消息是有序的,那么消息是否寫入成功很關鍵的,所以不建議把retries設為0??梢园裮ax.in.flight.requests.per.connection設為1,這樣在生產者嘗試發送第一批消息時,就不會有其他的消息發送給 broker 。不過這樣會嚴重影響生產者的吞吐量 ,所以只有在對消息的順序有嚴格要求的情況下才能這么做。

總結

以上是生活随笔為你收集整理的Kafka生产者详解的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。

主站蜘蛛池模板: av成人免费 | 成年人91视频 | 日韩一级性生活片 | 假日游船 | 欧美性生话 | 色伊人影院 | 日韩福利影院 | 秋霞影院一区二区 | 亚洲无码精品国产 | 亚洲性免费 | 最好看的mv中文字幕国语电影 | 99久久99久久精品免费看蜜桃 | 日韩av在线看免费观看 | 健身教练巨大粗爽gay视频 | 久久精品专区 | 天堂在线观看 | 国产精品一区二区三区四 | 黄色av电影在线 | 久草99| 操操久久 | 成年人视频在线免费观看 | 久久6视频 | 国产伦精品一区二区三区高清 | 美女在线一区 | 国产亚洲系列 | 久久久久久天堂 | 亚洲成人77777 | 日韩天堂av | 可以看的av网址 | 人人射人人 | av在线手机版 | 欧美亚洲国产另类 | 亚洲精品乱码久久久久久自慰 | 91视频福利| 99久久精品国产一区二区成人 | 伊人色综合久久久 | 欧美在线资源 | 性生活一级大片 | 亚洲色图图| 婷婷干| 青草草在线 | 最新国产露脸在线观看 | 国产一级自拍视频 | 大地资源影视在线播放观看高清视频 | 草莓巧克力香氛动漫的观看方法 | www在线看片 | 日本精品一区二区三区视频 | 精品黑人一区二区三区在线观看 | 色综合区| 精品久久久久久久久久久久久久久久久久 | 国产经典毛片 | 在线香蕉| 亚洲 欧美 国产 另类 | 男女床上拍拍拍 | 亚洲欧美黄色片 | 亚洲国产综合视频 | 亚洲国产精品免费 | 成人av免费看 | 日韩精品亚洲一区 | 精品人妻无码一区二区色欲产成人 | 久久久亚洲精品无码 | いいなり北条麻妃av101 | 性一交一乱一区二区洋洋av | av一片 | 久久精品国产99国产 | 高清一区二区三区视频 | 欧美精品欧美精品系列 | 成人黄色视屏 | 日本毛片在线 | av地址在线| 日韩和的一区二区 | 秋霞影院一区二区 | 香蕉视频性 | 色淫湿视频| 蜜桃视频导航 | 91丨九色丨黑人外教 | 少妇又紧又爽视频 | 在线观看天堂av | 青青草免费看 | 不卡视频免费在线观看 | 青青草精品视频 | 天天噜日日噜 | 爱乃なみ加勒比在线播放 | 国产欧美一区二区三区沐欲 | 一级片播放| 牛牛视频在线观看 | 日韩人妻一区 | 自拍偷拍一区 | 亚洲五码av | 色一情一乱一伦 | 东京热毛片 | 黄色永久视频 | 欧美日韩大片 | 国产高清视频免费观看 | 黄色网址在线免费 | 500部大龄熟乱视频 亚洲乱码精品 | 中文字幕一区二区三区四区欧美 | 中文天堂| 久久密桃 |