日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪(fǎng)問(wèn) 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 编程资源 > 编程问答 >内容正文

编程问答

post发送byte数组_KAFKA消息发送

發(fā)布時(shí)間:2024/7/19 编程问答 34 豆豆
生活随笔 收集整理的這篇文章主要介紹了 post发送byte数组_KAFKA消息发送 小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

消息發(fā)送的整體架構(gòu)

RecordAccumulator 主要用來(lái)緩存消息以便 Sender 線(xiàn)程可以批量發(fā)送,進(jìn)而減少網(wǎng)絡(luò)傳輸?shù)馁Y源消耗以提升性能。RecordAccumulator 緩存的大小可以通過(guò)生產(chǎn)者客戶(hù)端參數(shù) buffer.memory 配置,默認(rèn)值為 33554432B,即32MB。如果生產(chǎn)者發(fā)送消息的速度超過(guò)發(fā)送到服務(wù)器的速度,則會(huì)導(dǎo)致生產(chǎn)者空間不足,這個(gè)時(shí)候 KafkaProducer 的 send() 方法調(diào)用要么被阻塞,要么拋出異常,這個(gè)取決于參數(shù) max.block.ms 的配置,此參數(shù)的默認(rèn)值為60000,即60秒。

Kafka是通過(guò)broker中未確認(rèn)的消息數(shù)來(lái)判斷broker的負(fù)載的.未確認(rèn)的消息數(shù)越多則負(fù)載越高.Sender線(xiàn)程通過(guò)InFlightRequests來(lái)緩存已經(jīng)發(fā)出去但還沒(méi)有收到響應(yīng)的請(qǐng)求,具體形式為Map.

消息有序性

Kafka 可以保證同一個(gè)分區(qū)中的消息是有序的。如果生產(chǎn)者按照一定的順序發(fā)送消息,那么這些消息也會(huì)順序地寫(xiě)入分區(qū),進(jìn)而消費(fèi)者也可以按照同樣的順序消費(fèi)它們。

如果將acks參數(shù)配置為非零值,并且max.in.flight.requests.per.connection 參數(shù)配置為大于1的值,那么就會(huì)出現(xiàn)錯(cuò)序的現(xiàn)象:如果第一批次消息寫(xiě)入失敗,而第二批次消息寫(xiě)入成功,那么生產(chǎn)者會(huì)重試發(fā)送第一批次的消息,此時(shí)如果第一批次的消息寫(xiě)入成功,那么這兩個(gè)批次的消息就出現(xiàn)了錯(cuò)序。一般而言,在需要保證消息順序的場(chǎng)合建議把參數(shù) max.in.flight.requests.per.connection配置為1,而不是把 acks 配置為0,不過(guò)這樣也會(huì)影響整體的吞吐。

max.in.flight.requests.per.connection = 1 限制客戶(hù)端在單個(gè)連接上能夠發(fā)送的未響應(yīng)請(qǐng)求的個(gè)數(shù)(也就是客戶(hù)端與 Node 之間的連接)。設(shè)置此值是1表示kafka broker在響應(yīng)請(qǐng)求之前client不能再向同一個(gè)broker發(fā)送請(qǐng)求。注意:設(shè)置此參數(shù)是為了避免消息亂序

消息發(fā)送的三種模式

  • 發(fā)后即忘(fire-and-forget,不保證消息到達(dá)broker,會(huì)丟消息)

  • 同步(sync,同步發(fā)送,一條發(fā)完才發(fā)送下一條,每次都會(huì)返回Future值或拋異常,如果是可重試的異常,那么如果配置了retries參數(shù)則可自動(dòng)重試)

  • 異步(async,會(huì)有一個(gè)回調(diào)函數(shù)來(lái)通知消息的處理結(jié)果是成功還是異常)

同步代碼

try { Future<RecordMetadata> future = producer.send(record); //阻塞獲取結(jié)果,然后才能下一條發(fā)送 RecordMetadata metadata = future.get(); System.out.println(metadata.topic() + "-" +metadata.partition() + ":" + metadata.offset());} catch (ExecutionException | InterruptedException e) { //常見(jiàn)的可重試異常有:NetworkException、LeaderNotAvailableException、 //UnknownTopicOrPartitionException、NotEnoughReplicasException、NotCoordinatorException 等。 //對(duì)于可重試的異常,如果配置了 retries 參數(shù),那么只要在規(guī)定的重試次數(shù)內(nèi)自行恢復(fù)了,就不會(huì)拋出異常。 //不可重試異常如LeaderNotAvailableException ,RecordTooLargeException則是直接拋異常}

異步代碼

public class KafkaAsyncSender{ private static final Logger logger = LoggerFactory.getLogger(KafkaAsyncSender.class); //KafkaProducer 而言,它是線(xiàn)程安全的 private Producer producer; @Autowired private UdpSerializer udpSerializer; @Value("${kafka_connect_string}") private String kafkaConnectString; private Cache<String, Integer> cache; private KafkaTopicPartitionMapper mapper; @PostConstruct public void init() { Properties props = new Properties(); props.put("metadata.broker.list", kafkaConnectString.trim()); props.put("bootstrap.servers", kafkaConnectString.trim()); props.put("producer.type", "async");//消息發(fā)送類(lèi)型同步(sync)還是異步(async將本地buffer) props.put("compression.codec", "none");//消息的壓縮格式,默認(rèn)為none不壓縮,gzip, snappy, lz4 生產(chǎn)者發(fā)送消息之后,只要分區(qū)的 leader 副本成功寫(xiě)入消息,那么它就會(huì)收到來(lái)自服務(wù)端的成功響應(yīng) props.put("request.required.acks", "1"); //發(fā)送失敗后重試的次數(shù),允許重試 //如果 max.in.flight.requests.per.connection 設(shè)置不為1,可能會(huì)導(dǎo)致亂序 props.put("message.send.max.retries", 3);//失敗重試次數(shù) props.put("retry.backoff.ms", 100);//重試間隔 props.put("queue.buffering.max.ms", 10);//緩存數(shù)據(jù)的最大時(shí)間間隔 props.put("batch.num.messages", 1000);//緩存數(shù)據(jù)的最大條數(shù) //限制生產(chǎn)者客戶(hù)端能發(fā)送的消息的最大值,默認(rèn)值為1048576B,即1MB,需注意broker端的message.max.bytes props.put("max.request.size", 1024 * 1024); props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer"); this.producer = new KafkaProducer<String, String>(props); this.mapper = new KafkaTopicPartitionMapper(this.producer); this.cache = CacheBuilder.newBuilder().refreshAfterWrite(1, TimeUnit.SECONDS) .build(this.mapper); } //將被下面的handle類(lèi)調(diào)用 public boolean sendMsg(final String topic, Object body, Callback callback) { //直接發(fā)送bytes數(shù)組 if(body instanceof byte[]){ ProducerRecord <String,byte[]>record = new ProducerRecord<String,byte[]>(topic,(byte[])body); producer.send(record); }else if(body instanceof BinlogEventInfo){ //對(duì)象類(lèi)型的消息 final BinlogEventInfo binlogEventInfo = (BinlogEventInfo)body; Integer num = null; //獲取發(fā)送到kafka的key,這里是使用guava緩存了key String cacheKey = genCacheKey(topic, binlogEventInfo); num = this.cache.getIfPresent(cacheKey); if(num == null){ try { num = this.mapper.load(cacheKey); } catch (Exception e) { logger.error("load kafka partition cache exception :", e); } if(num == null){ this.cache.put(cacheKey, Integer.MIN_VALUE); } else { this.cache.put(cacheKey, num); } } //構(gòu)造發(fā)送的消息體,注意序列化是使用Byte序列化,沒(méi)使用默認(rèn)的String ProducerRecord <String,byte[]>record = null; if(num == Integer.MIN_VALUE || num == null){ //獲取key失敗,不使用key的構(gòu)造發(fā)送發(fā)送數(shù)據(jù) if(logger.isDebugEnabled()){ logger.debug("get partition fail , send to {}, info {}" , topic, JsonUtils.toJson((binlogEventInfo))); } record = new ProducerRecord<String,byte[]>(topic,udpSerializer.serialize(binlogEventInfo)); } else { //根據(jù)key指定到哪一個(gè)分區(qū)的發(fā)送 if(logger.isDebugEnabled()){ logger.debug("send to {}, partition {}, info {}" , topic, num, JsonUtils.toJson((binlogEventInfo))); } //這里有三個(gè)可能影響到分區(qū)數(shù)的因素 : 1.直接指定分區(qū)數(shù) 2,直接指定key 3.無(wú)任何指定 //在直接指定了分區(qū)數(shù)的情況下,那么將直接發(fā)送往此分區(qū) //若分區(qū)數(shù)未指定,但key指定了,那么因?yàn)橛?jì)算的hash值一樣,那么相同的key也會(huì)發(fā)送到一樣的分區(qū) //若都未指定,則直接輪詢(xún)分區(qū)來(lái)發(fā)送消息 record = new ProducerRecord<String,byte[]>(topic, num , null ,udpSerializer.serialize(binlogEventInfo)); } producer.send(record, callback); } return true; } private String genCacheKey(String topic, BinlogEventInfo binlogEventInfo) { return topic + "-" + binlogEventInfo.getHost() + "-" + binlogEventInfo.getSchemaName() + "-" + binlogEventInfo.getTableName(); }}

Mapper的查詢(xún)分區(qū)

public class KafkaTopicPartitionMapper extends CacheLoader<String, Integer>{ private Producer producer; public KafkaTopicPartitionMapper(Producer producer){ this.producer = producer; } @Override //格式 topic-host-database-table public Integer load(String key) throws Exception { try{ String[] arr = key.split("-"); int hash = this.hash(key); List list = this.producer.partitionsFor(arr[0]); int psize = list.size(); if(psize == 0){ return null; } else { //根據(jù)哈希值 % 分區(qū)數(shù) return parlist.get(Math.abs(hash % psize)); } } catch(Exception e){ return null; } } //計(jì)算hash值 private int hash(Object key) { int h; return (key == null) ? 0 : (h = key.hashCode()) ^ (h >>> 16); } }

異步的消息回調(diào)

//kafka消息管理類(lèi),發(fā)送消息以及回調(diào)處理public class KafkaQueueChannelHandler extends AsyncQueueChannelHandler{ private KafkaAsyncSender sender; public KafkaQueueChannelHandler(){ super("kafka"); this.sender = SpringContextUtil.getBean(KafkaAsyncSender.class); } public KafkaQueueChannelHandler(String identity) { super(identity); this.sender = SpringContextUtil.getBean(KafkaAsyncSender.class); } @Override public void sendMessage(BinlogEventInfo info, DeliverInfo deliverInfo) { //發(fā)送主題消息的時(shí)候設(shè)置回調(diào) if(!sender.sendMsg(deliverInfo.getSendTopic(), info, new KafkaCallback(info, deliverInfo))) { this.stopDeliverAndNotify(info,Constants.SENDTYPE_KAFKA,"",deliverInfo.getSendTopic()); } } @Override public void sendMessageInner(BinlogEventInfo info, DeliverInfo deliverInfo) { this.sender.sendMsg(deliverInfo.getSendTopic(), info, new KafkaCallback(info, deliverInfo)); } private class KafkaCallback implements Callback{ private BinlogEventInfo info; private DeliverInfo deliverInfo; public KafkaCallback(BinlogEventInfo info, DeliverInfo deliverInfo) { this.info = info; this.deliverInfo = deliverInfo; } //異步回調(diào)方法 @Override public void onCompletion(RecordMetadata metadata, Exception exception) { //onCompletion() 方法的兩個(gè)參數(shù)是互斥的,消息發(fā)送成功時(shí),metadata 不為 null 而 exception 為 null; //消息發(fā)送異常時(shí),metadata 為 null 而 exception 不為 null。 try{ if(metadata!=null){ logger.info("kafka回調(diào)信息:topic=【{}】,partition=【{}】,offset=【{}】,發(fā)送內(nèi)容=【{}】,exception=【{}】,",metadata.topic(),metadata.partition(), metadata.offset(),JSON.toJSONString(info),exception); }else{ logger.info("kafka回調(diào)信息:發(fā)送內(nèi)容=【{}】,exception=【{}】,",JSON.toJSONString(info),exception); } if(exception!=null){ stopDeliverAndNotify( info,Constants.SENDTYPE_KAFKA,"", this.deliverInfo.getSendTopic()); } else { //將位置信息保存到內(nèi)存,異步更新到數(shù)據(jù)庫(kù) updatePosition(info, this.deliverInfo); } }catch(Exception e){ logger.error("KafkaQueueChannelHandler kafka回調(diào)處理失敗:發(fā)送內(nèi)容=【{}】,exception=【{}】",JSON.toJSONString(info),e); stopDeliverAndNotify( info,Constants.SENDTYPE_KAFKA,"", this.deliverInfo.getSendTopic()); } } } }

生產(chǎn)者攔截器

生產(chǎn)者攔截器既可以用來(lái)在消息發(fā)送前做一些準(zhǔn)備工作,比如按照某個(gè)規(guī)則過(guò)濾不符合要求的消息、修改消息的內(nèi)容等,也可以用來(lái)在發(fā)送回調(diào)邏輯前做一些定制化的需求,比如統(tǒng)計(jì)類(lèi)工作。

生產(chǎn)者攔截器的使用也很方便,主要是自定義實(shí)現(xiàn) org.apache.kafka.clients.producer. ProducerInterceptor 接口。ProducerInterceptor 接口中包含3個(gè)方法:

//KafkaProducer在將消息序列化和計(jì)算分區(qū)之前會(huì)調(diào)用生產(chǎn)者攔截器的 onSend() 方法來(lái)對(duì)消息進(jìn)行相應(yīng)的定制化操作。public ProducerRecordonSend(ProducerRecord record);//KafkaProducer會(huì)在消息被應(yīng)答(Acknowledgement)之前或消息發(fā)送失敗時(shí)調(diào)用生產(chǎn)者攔截器的onAcknowledgement() 方法,//優(yōu)先于用戶(hù)設(shè)定的 Callback 之前執(zhí)行。public void onAcknowledgement(RecordMetadata metadata, Exception exception);public void close();

添加生產(chǎn)者攔截器

//此參數(shù)默認(rèn)值為""properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, ProducerInterceptorPrefix.class.getName());

實(shí)現(xiàn)

//接口的這3個(gè)方法中拋出的異常都會(huì)被捕獲并記錄到日志中,但并不會(huì)再向上傳遞。public class ProducerInterceptorPrefix implements ProducerInterceptor<String,String>{ @Override public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) { //更改消息內(nèi)容 String modifiedValue = "prefix1-" + record.value(); return new ProducerRecord<>(record.topic(), record.partition(), record.timestamp(), record.key(), modifiedValue, record.headers()); } @Override public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {} @Override public void close() {} @Override public void configure(Map<String, ?> map) {}}

參考鏈接

https://juejin.im/book/5c7d467e5188251b9156fdc0/section/5c7d5391f265da2db7183fe5

總結(jié)

以上是生活随笔為你收集整理的post发送byte数组_KAFKA消息发送的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。

如果覺(jué)得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。