Kafka生产者详解
一.消息發(fā)送
1.java客戶端數(shù)據(jù)生產(chǎn)流程解析
?
① 首先要構(gòu)造一個(gè) ProducerRecord 對(duì)象,該對(duì)象可以聲明主題Topic、分區(qū)Partition、鍵 Key以及值 Value,主題和值是必須要聲明的,分區(qū)和鍵可以不用指定。
② 調(diào)用send() 方法進(jìn)行消息發(fā)送。
③ 因?yàn)橄⒁骄W(wǎng)絡(luò)上進(jìn)行傳輸,所以必須進(jìn)行序列化,序列化器的作用就是把消息的 key 和value對(duì)象序列化成字節(jié)數(shù)組后,生產(chǎn)者就知道該往哪個(gè)主題和分區(qū)發(fā)送記錄了。
④ 接著這條記錄會(huì)被添加到一個(gè)記錄批次里面,這個(gè)批次里所有的消息會(huì)被發(fā)送到相同的主題和分區(qū)。會(huì)有一個(gè)獨(dú)立的線程來(lái)把這些記錄批次發(fā)送到相應(yīng)的 Broker 上。
⑤ Broker成功接收到消息,表示發(fā)送成功,返回消息的元數(shù)據(jù)(包括主題和分區(qū)信息以及記錄在分區(qū)里的偏移量)。發(fā)送失敗,可以選擇重試或者直接拋出異常。
依賴的maven包
? ? ? ?<kafka.version>2.0.0</kafka.version><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka_${scala.version}</artifactId><version>${kafka.version}</version><exclusions><exclusion><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId></exclusion><exclusion><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId></exclusion><exclusion><groupId>log4j</groupId><artifactId>log4j</artifactId></exclusion></exclusions></dependency>2.必要參數(shù)配置
? ?private static final String brokerList = "192.168.37.129:9092";private static final String topic = "test"; ?public static Properties initConfig() {Properties props = new Properties();// 該屬性指定 brokers 的地址清單,格式為 host:port。清單里不需要包含所有的 broker 地址,// 生產(chǎn)者會(huì)從給定的 broker 里查找到其它 broker 的信息。——建議至少提供兩個(gè) broker 的信息,因?yàn)橐坏┢渲幸粋€(gè)宕機(jī),生產(chǎn)者仍然能夠連接到集群上。props.put("bootstrap.servers", brokerList);// 將 key 轉(zhuǎn)換為字節(jié)數(shù)組的配置,必須設(shè)定為一個(gè)實(shí)現(xiàn)了 org.apache.kafka.common.serialization.Serializer 接口的類,// 生產(chǎn)者會(huì)用這個(gè)類把鍵對(duì)象序列化為字節(jié)數(shù)組。// ——kafka 默認(rèn)提供了 StringSerializer和 IntegerSerializer、ByteArraySerializer。當(dāng)然也可以自定義序列化器。props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");// 和 key.serializer 一樣,用于 value 的序列化props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");// 用來(lái)設(shè)定KafkaProducer對(duì)應(yīng)的客戶端ID,默認(rèn)為空,如果不設(shè)置KafkaProducer會(huì)自動(dòng)生成一個(gè)非空字符串。// 內(nèi)容形式如:"producer-1"props.put("client.id", "producer.client.id.demo");return props;} ?public static void main(String[] args) throws InterruptedException {Properties props = initConfig();KafkaProducer<String, String> producer = new KafkaProducer<>(props);ProducerRecord<String, String> record = new ProducerRecord<>(topic, ?"hello, Kafka!");try {// 1、發(fā)送消息producer.send(record);} catch (Exception e) {e.printStackTrace();}producer.close();}3.發(fā)送類型
發(fā)送即忘記
producer.send(record);同步發(fā)送
//通過(guò)send()發(fā)送完消息后返回一個(gè)Future對(duì)象,然后調(diào)用Future對(duì)象的get()方法等待kafka響應(yīng) //如果kafka正常響應(yīng),返回一個(gè)RecordMetadata對(duì)象,該對(duì)象存儲(chǔ)消息的偏移量 // 如果kafka發(fā)生錯(cuò)誤,無(wú)法正常響應(yīng),就會(huì)拋出異常,我們便可以進(jìn)行異常處理 producer.send(record).get();異步發(fā)送
? ? ? ? ? ?producer.send(record, new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {if (exception == null) {System.out.println(metadata.partition() + ":" + metadata.offset());}}});4.序列化器
消息要到網(wǎng)絡(luò)上進(jìn)行傳輸,必須進(jìn)行序列化,而序列化器的作用就是如此。
Kafka 提供了默認(rèn)的字符串序列化器(org.apache.kafka.common.serialization.StringSerializer),還有整型(IntegerSerializer)和字節(jié)數(shù)組(BytesSerializer)序列化器,這些序列化器都實(shí)現(xiàn)了接口 (org.apache.kafka.common.serialization.Serializer)基本上能夠滿足大部分場(chǎng)景的需求。
5.自定義序列化器
/*** 自定義序列化器*/ public class CompanySerializer implements Serializer<Company> {@Overridepublic void configure(Map configs, boolean isKey) {} ?@Overridepublic byte[] serialize(String topic, Company data) {if (data == null) {return null;}byte[] name, address;try {if (data.getName() != null) {name = data.getName().getBytes("UTF-8");} else {name = new byte[0];}if (data.getAddress() != null) {address = data.getAddress().getBytes("UTF-8");} else {address = new byte[0];}ByteBuffer buffer = ByteBuffer.allocate(4 + 4 + name.length + address.length);buffer.putInt(name.length);buffer.put(name);buffer.putInt(address.length);buffer.put(address);return buffer.array();} catch (UnsupportedEncodingException e) {e.printStackTrace();}return new byte[0];} ?@Overridepublic void close() {} }使用自定義的序列化器
public class ProducerDefineSerializer {public static final String brokerList = "192.168.37.129:9092";public static final String topic = "test"; ?public static void main(String[] args)throws ExecutionException, InterruptedException {Properties properties = new Properties();properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,CompanySerializer.class.getName());properties.put("bootstrap.servers", brokerList); ?KafkaProducer<String, Company> producer =new KafkaProducer<>(properties);Company company = Company.builder().name("kafka").address("北京").build();ProducerRecord<String, Company> record =new ProducerRecord<>(topic, company);producer.send(record).get();} }6.分區(qū)器
本身kafka有自己的分區(qū)策略的,如果未指定,就會(huì)使用默認(rèn)的分區(qū)策略:
Kafka根據(jù)傳遞消息的key來(lái)進(jìn)行分區(qū)的分配,即hash(key) % numPartitions。如果Key相同的話,那么就會(huì)分配到統(tǒng)一分區(qū)。
源碼如下:
?
我們可以自定義一個(gè)分區(qū)器
/*** 自定義分區(qū)器*/ public class DefinePartitioner implements Partitioner {private final AtomicInteger counter = new AtomicInteger(0); ?@Overridepublic int partition(String topic, Object key, byte[] keyBytes,Object value, byte[] valueBytes, Cluster cluster) {List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);int numPartitions = partitions.size();if (null == keyBytes) {return counter.getAndIncrement() % numPartitions;} else {return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;}} ?@Overridepublic void close() {} ?@Overridepublic void configure(Map<String, ?> configs) {} }實(shí)現(xiàn)自定義分區(qū)器需要通過(guò)配置參數(shù)ProducerConfig.PARTITIONER_CLASS_CONFIG來(lái)實(shí)現(xiàn)
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,DefinePartitioner.class.getName());7.攔截器
Producer攔截器(interceptor)是個(gè)相當(dāng)新的功能,它和consumer端interceptor是在Kafka 0.10版本被引入的,主要用于實(shí)現(xiàn)clients端的定制化控制邏輯。
生產(chǎn)者攔截器可以用在消息發(fā)送前做一些準(zhǔn)備工作。
使用場(chǎng)景
1)按照某個(gè)規(guī)則過(guò)濾掉不符合要求的消息
2)修改消息的內(nèi)容
3)統(tǒng)計(jì)類需求
自定義一個(gè)攔截器
/*** 自定義攔截器*/ public class ProducerInterceptorPrefix implementsProducerInterceptor<String, String> {private volatile long sendSuccess = 0;private volatile long sendFailure = 0; ?@Overridepublic ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {String modifiedValue = "prefix1-" + record.value();return new ProducerRecord<>(record.topic(),record.partition(), record.timestamp(),record.key(), modifiedValue, record.headers()); // ? ? ? if (record.value().length() < 5) { // ? ? ? ? ? throw new RuntimeException(); // ? ? ? } // ? ? ? return record;} ?@Overridepublic void onAcknowledgement(RecordMetadata recordMetadata,Exception e) {if (e == null) {sendSuccess++;} else {sendFailure++;}} ?@Overridepublic void close() {double successRatio = (double) sendSuccess / (sendFailure + sendSuccess);System.out.println("[INFO] 發(fā)送成功率="+ String.format("%f", successRatio * 100) + "%");} ?@Overridepublic void configure(Map<String, ?> map) {} }實(shí)現(xiàn)自定義攔截器之后需要在配置參數(shù)中指定這個(gè)攔截器,此參數(shù)的默認(rèn)值為空,如下:
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,ProducerInterceptorPrefix.class.getName());功能演示:
生產(chǎn)者:
?
消費(fèi)者:
?
二.其它生產(chǎn)者參數(shù)
之前提及的默認(rèn)三個(gè)客戶端參數(shù),大部分參數(shù)都有合理的默認(rèn)值,一般情況下不需要修改它們, 參考官網(wǎng):http://kafka.apache.org/documentation/#producerconfigs
1.acks
這個(gè)參數(shù)用來(lái)指定分區(qū)中必須有多少個(gè)副本收到這條消息,之后生產(chǎn)者才會(huì)認(rèn)為這條消息時(shí)寫入成功的。acks是生產(chǎn)者客戶端中非常重要的一個(gè)參數(shù),它涉及到消息的可靠性和吞吐量之間的權(quán)衡。
- ack=0, 生產(chǎn)者在成功寫入消息之前不會(huì)等待任何來(lái)自服務(wù)器的相應(yīng)。如果出現(xiàn)問(wèn)題生產(chǎn)者是感知不到的,消息就丟失了。不過(guò)因?yàn)樯a(chǎn)者不需要等待服務(wù)器響應(yīng),所以它可以以網(wǎng)絡(luò)能夠支持的最大速度發(fā)送消息,從而達(dá)到很高的吞吐量。
- ack=1,默認(rèn)值為1,只要集群的首領(lǐng)節(jié)點(diǎn)收到消息,生產(chǎn)這就會(huì)收到一個(gè)來(lái)自服務(wù)器的成功響應(yīng)。如果消息無(wú)法達(dá)到首領(lǐng)節(jié)點(diǎn)(比如首領(lǐng)節(jié)點(diǎn)崩潰,新的首領(lǐng)還沒(méi)有被選舉出來(lái)),生產(chǎn)者會(huì)收到一個(gè)錯(cuò)誤響應(yīng),為了避免數(shù)據(jù)丟失,生產(chǎn)者會(huì)重發(fā)消息。但是,這樣還有可能會(huì)導(dǎo)致數(shù)據(jù)丟失,如果收到寫成功通知,此時(shí)首領(lǐng)節(jié)點(diǎn)還沒(méi)來(lái)的及同步數(shù)據(jù)到follower節(jié)點(diǎn),首領(lǐng)節(jié)點(diǎn)崩潰,就會(huì)導(dǎo)致數(shù)據(jù)丟失。
- ack=-1, 只有當(dāng)所有參與復(fù)制的節(jié)點(diǎn)都收到消息時(shí),生產(chǎn)者才會(huì)收到一個(gè)來(lái)自服務(wù)器的成功響應(yīng),這種模式是最安全的,它可以保證不止一個(gè)服務(wù)器收到消息,就算有服務(wù)器發(fā)生崩潰,整個(gè)集群仍然可以運(yùn)行。
注意:acks參數(shù)配置的是一個(gè)字符串類型,而不是整數(shù)類型,如果配置為整數(shù)類型會(huì)拋出以下異常
props.put(ProducerConfig.ACKS_CONFIG,"1");2.retries
生產(chǎn)者從服務(wù)器收到的錯(cuò)誤有可能是臨時(shí)性的錯(cuò)誤(比如分區(qū)找不到首領(lǐng))。在這種情況下,如果達(dá)到了 retires 設(shè)置的次數(shù),生產(chǎn)者會(huì)放棄重試并返回錯(cuò)誤。默認(rèn)情況下,生產(chǎn)者會(huì)在每次重試之間等待100ms,可以通過(guò) retry.backoff.ms 參數(shù)來(lái)修改這個(gè)時(shí)間間隔。
3.batch.size
當(dāng)有多個(gè)消息要被發(fā)送到同一個(gè)分區(qū)時(shí),生產(chǎn)者會(huì)把它們放在同一個(gè)批次里。該參數(shù)指定了一個(gè)批次可以使用的內(nèi)存大小,按照字節(jié)數(shù)計(jì)算,而不是消息個(gè)數(shù)。當(dāng)批次被填滿,批次里的所有消息會(huì)被發(fā)送出去。不過(guò)生產(chǎn)者并不一定都會(huì)等到批次被填滿才發(fā)送,半滿的批次,甚至只包含一個(gè)消息的批次也可能被發(fā)送。所以就算把 batch.size 設(shè)置的很大,也不會(huì)造成延遲,只會(huì)占用更多的內(nèi)存而已,如果設(shè)置的太小,生產(chǎn)者會(huì)因?yàn)轭l繁發(fā)送消息而增加一些額外的開(kāi)銷。
4.max.request.size
該參數(shù)用于控制生產(chǎn)者發(fā)送的請(qǐng)求大小,它可以指定能發(fā)送的單個(gè)消息的最大值,也可以指單個(gè)請(qǐng)求里所有消息的總大小。 broker 對(duì)可接收的消息最大值也有自己的限制( message.max.size ),所以兩邊的配置最好匹配,避免生產(chǎn)者發(fā)送的消息被 broker 拒絕。
5.buffer.memory
該參數(shù)用來(lái)設(shè)置生產(chǎn)者內(nèi)存緩沖區(qū)的大小,生產(chǎn)者用它緩沖要發(fā)送到服務(wù)器的消息。如果應(yīng)用程序發(fā)送消息的速度超過(guò)發(fā)送到服務(wù)器的速度,會(huì)導(dǎo)致生產(chǎn)者空間不足。這個(gè)時(shí)候,send()方法調(diào)用要么被阻塞,要么拋出異常。取決于如何設(shè)置 block.on.buffer.full 參數(shù)。
6.linger.ms
該參數(shù)指定了生產(chǎn)者在發(fā)送批次之前等待更多消息加入批次的時(shí)間。KafkaProduce會(huì)在批次填滿或linger.ms達(dá)到上限時(shí)把批次發(fā)送出去。默認(rèn)情況下,只要有用的線程,生產(chǎn)者就會(huì)把消息發(fā)送出去,就算批次里只有一個(gè)消息。把linger.ms設(shè)置成比0大的數(shù),讓生產(chǎn)者在發(fā)送批次之前等待一會(huì)兒,使更多的消息加入到這個(gè)批次。雖然這樣會(huì)增加延遲,但會(huì)提升吞吐量(因?yàn)橐淮涡园l(fā)送更多的消息,每個(gè)消息的開(kāi)銷就變小了)
7.max.in.flight.requests.per.connection
該參數(shù)指定了生產(chǎn)者在 到服務(wù)器晌應(yīng)之前可以發(fā)送多少個(gè)消息。它的值越高,就會(huì)占用越多的內(nèi)存,不過(guò)也會(huì)提升吞吐量。 它設(shè)為1可以保證消息是按照發(fā)送的順序?qū)懭敕?wù)器的,即使發(fā)生了重試。
思考:如何保證消息的有序性
Kafka 可以保證同一個(gè)分區(qū)里的消息是有序的。也就是說(shuō),如果生產(chǎn)者按照一定的順序發(fā)送消息, broker 就會(huì)按照這個(gè)順序把它們寫入分區(qū),消費(fèi)者也會(huì)按照同樣的順序讀取它們。在某些情況下,順序是非常重要的。例如,往一個(gè)賬戶存入 100元再取出來(lái),這個(gè)與先取錢再存錢是截然不同的!不過(guò),有些場(chǎng)景對(duì)順序不是很敏感。
如果把retries設(shè)為非零整數(shù),同時(shí)把max.in.flight.requests.per.connection設(shè)為比1大的數(shù),那么,如果第一個(gè)批次消息寫入失敗,而第二個(gè)批次寫入成功, broker 會(huì)重試寫入第一個(gè)批次。如果此時(shí)第一個(gè)批次也寫入成功,那么兩個(gè)批次的順序就反過(guò)來(lái)了。
一般來(lái)說(shuō),如果某些場(chǎng)景要求消息是有序的,那么消息是否寫入成功很關(guān)鍵的,所以不建議把retries設(shè)為0。可以把max.in.flight.requests.per.connection設(shè)為1,這樣在生產(chǎn)者嘗試發(fā)送第一批消息時(shí),就不會(huì)有其他的消息發(fā)送給 broker 。不過(guò)這樣會(huì)嚴(yán)重影響生產(chǎn)者的吞吐量 ,所以只有在對(duì)消息的順序有嚴(yán)格要求的情況下才能這么做。
總結(jié)
以上是生活随笔為你收集整理的Kafka生产者详解的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: Kafka的基本介绍和在linux的安装
- 下一篇: Kafka消费者详解