Apache Kafka-初体验Kafka(04)-Java客户端操作Kafka
生活随笔
收集整理的這篇文章主要介紹了
Apache Kafka-初体验Kafka(04)-Java客户端操作Kafka
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
文章目錄
- 操作步驟
- Maven依賴
- 生產者
- 消費者
操作步驟
Maven依賴
核心依賴 kafka-clients
<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>1.1.0</version></dependency>生產者
package com.artisan.kafkademo.producer;import com.alibaba.fastjson.JSON; import org.apache.kafka.clients.producer.*; import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit;/*** @author 小工匠* @version v1.0* @create 2019-11-18 0:17* @motto show me the code ,change the word* @blog https://artisan.blog.csdn.net/* @description**/public class MsgProducer {public static void main(String[] args) throws InterruptedException {// ---------------參數設置---------------BEGINProperties properties = new Properties();// broker 信息properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.18.130:9092,192.168.18.131:9092,192.168.18.132:9092");/*發出消息持久化機制參數(1)acks=0: 表示producer不需要等待任何broker確認收到消息的回復,就可以繼續發送下一條消息。性能最高,但是最容易丟消息。(2)acks=1: 至少要等待leader已經成功將數據寫入本地log,但是不需要等待所有follower是否成功寫入。就可以繼續發送下一條消息。這種情況下,如果follower沒有成功備份數據,而此時leader又掛掉,則消息會丟失。(3)acks=‐1或all: 這意味著leader需要等待所有備份(min.insync.replicas配置的備份個數)都成功寫入日志,這種策略會保證只要有一個備份存活就不會丟失數據。這是最強的數據保證。一般除非是金融級別,或跟錢打交道的場景才會使用這種配置。*/properties.put(ProducerConfig.ACKS_CONFIG, "1");// 發送失敗會重試,默認重試間隔100ms,重試能保證消息發送的可靠性,// 但是也可能造成消息重復發送,比如網絡抖動,所以需要在接收者那邊做好消息接收的冪等性處理properties.put(ProducerConfig.RETRIES_CONFIG,3);// 重試間隔設置properties.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG,300);// /設置發送消息的本地緩沖區,// 如果設置了該緩沖區,消息會先發送到本地緩沖區,可以提高消息發送性能,默認值是33554432,即32MBproperties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,33554432);// kafka本地線程會從緩沖區取數據,批量發送到broker,// 設置批量發送消息的大小,默認值是16384,即16kb,就是說一個batch滿了16kb就發送出去properties.put(ProducerConfig.BATCH_SIZE_CONFIG,16384);//默認值是0,意思就是消息必須立即被發送,但這樣會影響性能//一般設置100毫秒左右,就是說這個消息發送完后會進入本地的一個batch,// 如果100毫秒內,這個batch滿了16kb就會隨batch一起被發送出去//如果100毫秒內,batch沒滿,那么也必須把消息發送出去,不能讓消息的發送延遲時間太長properties.put(ProducerConfig.LINGER_MS_CONFIG,100);// 把發送的key從字符串序列化為字節數組properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());//把發送消息value從字符串序列化為字節數組properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());// ---------------參數設置---------------END// 使用properties實例化KafkaProducerProducer producer = new KafkaProducer(properties);final int messageNum = 5 ;final CountDownLatch countDownLatch = new CountDownLatch(messageNum);// 發送5條消息for (int i = 1; i <= messageNum; i++) {Order order = new Order(i, 100,66,987.99 + i);// 指定發送分區ProducerRecord<String,String> record = new ProducerRecord<String, String>("artisan-replicated-topic",0,String.valueOf(order.getOrderId()), JSON.toJSONString(order));// 異步方式發送消息producer.send(record, new Callback() {public void onCompletion(RecordMetadata metadata, Exception exception) {if (exception != null){countDownLatch.countDown();System.err.println("發送消息失敗:" + exception.getStackTrace());}if (metadata != null){countDownLatch.countDown();System.out.println("異步方式發送消息結果: topic=" + metadata.topic()+ " , partition=" + metadata.partition()+ " , offset=" + metadata.offset());}}});}// 等5秒鐘,5秒鐘后,執行后續的代碼countDownLatch.await(5, TimeUnit.SECONDS);producer.close();}static class Order {private int orderId ;private int productId ;private int productNum;private double orderAmount ;public Order() {}public Order(int orderId, int productId, int productNum, double orderAmount) {this.orderId = orderId;this.productId = productId;this.productNum = productNum;this.orderAmount = orderAmount;}public int getOrderId() {return orderId;}public void setOrderId(int orderId) {this.orderId = orderId;}public int getProductId() {return productId;}public void setProductId(int productId) {this.productId = productId;}public int getProductNum() {return productNum;}public void setProductNum(int productNum) {this.productNum = productNum;}public double getOrderAmount() {return orderAmount;}public void setOrderAmount(double orderAmount) {this.orderAmount = orderAmount;}} }消費者
package com.artisan.kafkademo.consumer;import org.apache.kafka.clients.consumer.*; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.StringDeserializer;import java.util.*;/*** @author 小工匠* @version v1.0* @create 2019-11-18 23:51* @motto show me the code ,change the word* @blog https://artisan.blog.csdn.net/* @description**/public class MsgConsumer {public static void main(String[] args) {// ---------------參數設置---------------BEGINProperties props = new Properties();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.18.130:9092,192.168.18.131:9092,192.168.18.132:9092");// 消費分組名props.put(ConsumerConfig.GROUP_ID_CONFIG,"testGroup");// 是否自動提交offset/*props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");// 自動提交offset的間隔時間props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG , "1000");*///props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");/*心跳時間,服務端broker通過心跳確認consumer是否故障,如果發現故障,就會通過心跳下發rebalance的指令給其他的consumer通知他們進行rebalance操作,這個時間可以稍微短一點*/props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 1000);//服務端broker多久感知不到一個consumer心跳就認為他故障了,默認是10秒props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10 * 1000);/*如果兩次poll操作間隔超過了這個時間,broker就會認為這個consumer處理能力太弱,會將其踢出消費組,將分區分配給別的consumer消費*/props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 30 * 1000);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);// 消費主題String topicName = "artisan-replicated-topic";consumer.subscribe(Arrays.asList(topicName));// 消費指定分區//consumer.assign(Arrays.asList(new TopicPartition(topicName, 0)));//消息回溯消費/*consumer.assign(Arrays.asList(new TopicPartition(topicName, 0)));*//*consumer.seekToBeginning(Arrays.asList(new TopicPartition(topicName, 0)));*//*//指定offset消費consumer.seek(new TopicPartition(topicName, 0), 10);//從指定時間點開始消費Map<TopicPartition, Long> map = new HashMap<TopicPartition, Long>();List<PartitionInfo> topicPartitions = consumer.partitionsFor(topicName);//從半小時前開始消費long fetchDataTime = new Date().getTime() - 1000 * 60 * 60;for (PartitionInfo par : topicPartitions) {map.put(new TopicPartition(topicName, par.partition()), fetchDataTime);}Map<TopicPartition, OffsetAndTimestamp> parMap = consumer.offsetsForTimes(map);for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : parMap.entrySet()) {TopicPartition key = entry.getKey();OffsetAndTimestamp value = entry.getValue();if (key == null || value == null) continue;Long offset = value.offset();System.out.println("partition-" + key.partition() + "|offset-" + offset);System.out.println();//根據消費里的timestamp確定offsetif (value != null) {//沒有這行代碼會導致下面的報錯信息consumer.assign(Arrays.asList(key));consumer.seek(key, offset);}} */while (true) {/** poll() API 是拉取消息的長輪詢,主要是判斷consumer是否還活著,只要我們持續調用poll(),* 消費者就會存活在自己所在的group中,并且持續的消費指定partition的消息。* 底層是這么做的:消費者向server持續發送心跳,如果一個時間段(session.* timeout.ms)consumer掛掉或是不能發送心跳,這個消費者會被認為是掛掉了,* 這個Partition也會被重新分配給其他consumer*/ConsumerRecords<String, String> records = consumer.poll(1000);for (ConsumerRecord<String, String> record : records) {System.out.printf("收到消息:offset = %d, key = %s, value = %s%n", record.offset(), record.key(),record.value());}if (records.count() > 0) {// 提交offsetconsumer.commitSync();}}} }總結
以上是生活随笔為你收集整理的Apache Kafka-初体验Kafka(04)-Java客户端操作Kafka的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Apache Kafka-初体验Kafk
- 下一篇: Algorithms_入门基础_如何使用