Kafka生产者详解
一.消息發送
1.java客戶端數據生產流程解析
?
① 首先要構造一個 ProducerRecord 對象,該對象可以聲明主題Topic、分區Partition、鍵 Key以及值 Value,主題和值是必須要聲明的,分區和鍵可以不用指定。
② 調用send() 方法進行消息發送。
③ 因為消息要到網絡上進行傳輸,所以必須進行序列化,序列化器的作用就是把消息的 key 和value對象序列化成字節數組后,生產者就知道該往哪個主題和分區發送記錄了。
④ 接著這條記錄會被添加到一個記錄批次里面,這個批次里所有的消息會被發送到相同的主題和分區。會有一個獨立的線程來把這些記錄批次發送到相應的 Broker 上。
⑤ Broker成功接收到消息,表示發送成功,返回消息的元數據(包括主題和分區信息以及記錄在分區里的偏移量)。發送失敗,可以選擇重試或者直接拋出異常。
依賴的maven包
? ? ? ?<kafka.version>2.0.0</kafka.version><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka_${scala.version}</artifactId><version>${kafka.version}</version><exclusions><exclusion><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId></exclusion><exclusion><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId></exclusion><exclusion><groupId>log4j</groupId><artifactId>log4j</artifactId></exclusion></exclusions></dependency>2.必要參數配置
? ?private static final String brokerList = "192.168.37.129:9092";private static final String topic = "test"; ?public static Properties initConfig() {Properties props = new Properties();// 該屬性指定 brokers 的地址清單,格式為 host:port。清單里不需要包含所有的 broker 地址,// 生產者會從給定的 broker 里查找到其它 broker 的信息?!ㄗh至少提供兩個 broker 的信息,因為一旦其中一個宕機,生產者仍然能夠連接到集群上。props.put("bootstrap.servers", brokerList);// 將 key 轉換為字節數組的配置,必須設定為一個實現了 org.apache.kafka.common.serialization.Serializer 接口的類,// 生產者會用這個類把鍵對象序列化為字節數組。// ——kafka 默認提供了 StringSerializer和 IntegerSerializer、ByteArraySerializer。當然也可以自定義序列化器。props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");// 和 key.serializer 一樣,用于 value 的序列化props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");// 用來設定KafkaProducer對應的客戶端ID,默認為空,如果不設置KafkaProducer會自動生成一個非空字符串。// 內容形式如:"producer-1"props.put("client.id", "producer.client.id.demo");return props;} ?public static void main(String[] args) throws InterruptedException {Properties props = initConfig();KafkaProducer<String, String> producer = new KafkaProducer<>(props);ProducerRecord<String, String> record = new ProducerRecord<>(topic, ?"hello, Kafka!");try {// 1、發送消息producer.send(record);} catch (Exception e) {e.printStackTrace();}producer.close();}3.發送類型
發送即忘記
producer.send(record);同步發送
//通過send()發送完消息后返回一個Future對象,然后調用Future對象的get()方法等待kafka響應 //如果kafka正常響應,返回一個RecordMetadata對象,該對象存儲消息的偏移量 // 如果kafka發生錯誤,無法正常響應,就會拋出異常,我們便可以進行異常處理 producer.send(record).get();異步發送
? ? ? ? ? ?producer.send(record, new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {if (exception == null) {System.out.println(metadata.partition() + ":" + metadata.offset());}}});4.序列化器
消息要到網絡上進行傳輸,必須進行序列化,而序列化器的作用就是如此。
Kafka 提供了默認的字符串序列化器(org.apache.kafka.common.serialization.StringSerializer),還有整型(IntegerSerializer)和字節數組(BytesSerializer)序列化器,這些序列化器都實現了接口 (org.apache.kafka.common.serialization.Serializer)基本上能夠滿足大部分場景的需求。
5.自定義序列化器
/*** 自定義序列化器*/ public class CompanySerializer implements Serializer<Company> {@Overridepublic void configure(Map configs, boolean isKey) {} ?@Overridepublic byte[] serialize(String topic, Company data) {if (data == null) {return null;}byte[] name, address;try {if (data.getName() != null) {name = data.getName().getBytes("UTF-8");} else {name = new byte[0];}if (data.getAddress() != null) {address = data.getAddress().getBytes("UTF-8");} else {address = new byte[0];}ByteBuffer buffer = ByteBuffer.allocate(4 + 4 + name.length + address.length);buffer.putInt(name.length);buffer.put(name);buffer.putInt(address.length);buffer.put(address);return buffer.array();} catch (UnsupportedEncodingException e) {e.printStackTrace();}return new byte[0];} ?@Overridepublic void close() {} }使用自定義的序列化器
public class ProducerDefineSerializer {public static final String brokerList = "192.168.37.129:9092";public static final String topic = "test"; ?public static void main(String[] args)throws ExecutionException, InterruptedException {Properties properties = new Properties();properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,CompanySerializer.class.getName());properties.put("bootstrap.servers", brokerList); ?KafkaProducer<String, Company> producer =new KafkaProducer<>(properties);Company company = Company.builder().name("kafka").address("北京").build();ProducerRecord<String, Company> record =new ProducerRecord<>(topic, company);producer.send(record).get();} }6.分區器
本身kafka有自己的分區策略的,如果未指定,就會使用默認的分區策略:
Kafka根據傳遞消息的key來進行分區的分配,即hash(key) % numPartitions。如果Key相同的話,那么就會分配到統一分區。
源碼如下:
?
我們可以自定義一個分區器
/*** 自定義分區器*/ public class DefinePartitioner implements Partitioner {private final AtomicInteger counter = new AtomicInteger(0); ?@Overridepublic int partition(String topic, Object key, byte[] keyBytes,Object value, byte[] valueBytes, Cluster cluster) {List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);int numPartitions = partitions.size();if (null == keyBytes) {return counter.getAndIncrement() % numPartitions;} else {return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;}} ?@Overridepublic void close() {} ?@Overridepublic void configure(Map<String, ?> configs) {} }實現自定義分區器需要通過配置參數ProducerConfig.PARTITIONER_CLASS_CONFIG來實現
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,DefinePartitioner.class.getName());7.攔截器
Producer攔截器(interceptor)是個相當新的功能,它和consumer端interceptor是在Kafka 0.10版本被引入的,主要用于實現clients端的定制化控制邏輯。
生產者攔截器可以用在消息發送前做一些準備工作。
使用場景
1)按照某個規則過濾掉不符合要求的消息
2)修改消息的內容
3)統計類需求
自定義一個攔截器
/*** 自定義攔截器*/ public class ProducerInterceptorPrefix implementsProducerInterceptor<String, String> {private volatile long sendSuccess = 0;private volatile long sendFailure = 0; ?@Overridepublic ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {String modifiedValue = "prefix1-" + record.value();return new ProducerRecord<>(record.topic(),record.partition(), record.timestamp(),record.key(), modifiedValue, record.headers()); // ? ? ? if (record.value().length() < 5) { // ? ? ? ? ? throw new RuntimeException(); // ? ? ? } // ? ? ? return record;} ?@Overridepublic void onAcknowledgement(RecordMetadata recordMetadata,Exception e) {if (e == null) {sendSuccess++;} else {sendFailure++;}} ?@Overridepublic void close() {double successRatio = (double) sendSuccess / (sendFailure + sendSuccess);System.out.println("[INFO] 發送成功率="+ String.format("%f", successRatio * 100) + "%");} ?@Overridepublic void configure(Map<String, ?> map) {} }實現自定義攔截器之后需要在配置參數中指定這個攔截器,此參數的默認值為空,如下:
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,ProducerInterceptorPrefix.class.getName());功能演示:
生產者:
?
消費者:
?
二.其它生產者參數
之前提及的默認三個客戶端參數,大部分參數都有合理的默認值,一般情況下不需要修改它們, 參考官網:http://kafka.apache.org/documentation/#producerconfigs
1.acks
這個參數用來指定分區中必須有多少個副本收到這條消息,之后生產者才會認為這條消息時寫入成功的。acks是生產者客戶端中非常重要的一個參數,它涉及到消息的可靠性和吞吐量之間的權衡。
- ack=0, 生產者在成功寫入消息之前不會等待任何來自服務器的相應。如果出現問題生產者是感知不到的,消息就丟失了。不過因為生產者不需要等待服務器響應,所以它可以以網絡能夠支持的最大速度發送消息,從而達到很高的吞吐量。
- ack=1,默認值為1,只要集群的首領節點收到消息,生產這就會收到一個來自服務器的成功響應。如果消息無法達到首領節點(比如首領節點崩潰,新的首領還沒有被選舉出來),生產者會收到一個錯誤響應,為了避免數據丟失,生產者會重發消息。但是,這樣還有可能會導致數據丟失,如果收到寫成功通知,此時首領節點還沒來的及同步數據到follower節點,首領節點崩潰,就會導致數據丟失。
- ack=-1, 只有當所有參與復制的節點都收到消息時,生產者才會收到一個來自服務器的成功響應,這種模式是最安全的,它可以保證不止一個服務器收到消息,就算有服務器發生崩潰,整個集群仍然可以運行。
注意:acks參數配置的是一個字符串類型,而不是整數類型,如果配置為整數類型會拋出以下異常
props.put(ProducerConfig.ACKS_CONFIG,"1");2.retries
生產者從服務器收到的錯誤有可能是臨時性的錯誤(比如分區找不到首領)。在這種情況下,如果達到了 retires 設置的次數,生產者會放棄重試并返回錯誤。默認情況下,生產者會在每次重試之間等待100ms,可以通過 retry.backoff.ms 參數來修改這個時間間隔。
3.batch.size
當有多個消息要被發送到同一個分區時,生產者會把它們放在同一個批次里。該參數指定了一個批次可以使用的內存大小,按照字節數計算,而不是消息個數。當批次被填滿,批次里的所有消息會被發送出去。不過生產者并不一定都會等到批次被填滿才發送,半滿的批次,甚至只包含一個消息的批次也可能被發送。所以就算把 batch.size 設置的很大,也不會造成延遲,只會占用更多的內存而已,如果設置的太小,生產者會因為頻繁發送消息而增加一些額外的開銷。
4.max.request.size
該參數用于控制生產者發送的請求大小,它可以指定能發送的單個消息的最大值,也可以指單個請求里所有消息的總大小。 broker 對可接收的消息最大值也有自己的限制( message.max.size ),所以兩邊的配置最好匹配,避免生產者發送的消息被 broker 拒絕。
5.buffer.memory
該參數用來設置生產者內存緩沖區的大小,生產者用它緩沖要發送到服務器的消息。如果應用程序發送消息的速度超過發送到服務器的速度,會導致生產者空間不足。這個時候,send()方法調用要么被阻塞,要么拋出異常。取決于如何設置 block.on.buffer.full 參數。
6.linger.ms
該參數指定了生產者在發送批次之前等待更多消息加入批次的時間。KafkaProduce會在批次填滿或linger.ms達到上限時把批次發送出去。默認情況下,只要有用的線程,生產者就會把消息發送出去,就算批次里只有一個消息。把linger.ms設置成比0大的數,讓生產者在發送批次之前等待一會兒,使更多的消息加入到這個批次。雖然這樣會增加延遲,但會提升吞吐量(因為一次性發送更多的消息,每個消息的開銷就變小了)
7.max.in.flight.requests.per.connection
該參數指定了生產者在 到服務器晌應之前可以發送多少個消息。它的值越高,就會占用越多的內存,不過也會提升吞吐量。 它設為1可以保證消息是按照發送的順序寫入服務器的,即使發生了重試。
思考:如何保證消息的有序性
Kafka 可以保證同一個分區里的消息是有序的。也就是說,如果生產者按照一定的順序發送消息, broker 就會按照這個順序把它們寫入分區,消費者也會按照同樣的順序讀取它們。在某些情況下,順序是非常重要的。例如,往一個賬戶存入 100元再取出來,這個與先取錢再存錢是截然不同的!不過,有些場景對順序不是很敏感。
如果把retries設為非零整數,同時把max.in.flight.requests.per.connection設為比1大的數,那么,如果第一個批次消息寫入失敗,而第二個批次寫入成功, broker 會重試寫入第一個批次。如果此時第一個批次也寫入成功,那么兩個批次的順序就反過來了。
一般來說,如果某些場景要求消息是有序的,那么消息是否寫入成功很關鍵的,所以不建議把retries設為0??梢园裮ax.in.flight.requests.per.connection設為1,這樣在生產者嘗試發送第一批消息時,就不會有其他的消息發送給 broker 。不過這樣會嚴重影響生產者的吞吐量 ,所以只有在對消息的順序有嚴格要求的情況下才能這么做。
總結
以上是生活随笔為你收集整理的Kafka生产者详解的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Kafka的基本介绍和在linux的安装
- 下一篇: Kafka消费者详解