kafaka生产者消费者demo(简易上手demo)
kafaka生產者消費者demo(簡易上手demo)
文章目錄
- kafaka生產者消費者demo(簡易上手demo)
- 導包
- kafka官方client
- spring官方template
- spring官方springcloud stream starter
- kafka官方client使用
- 生產者Demo
- 消費者Demo
- 簡易的多線程生產者
- 生產
- 消費
- 使用線程池優化生產者
- ProducerThreadPool
- 測試使用
- 測試結果
- spring官方template使用
- 配置
- 生產者Demo
- 消費者Demo
- spring官方springcloud stream starter使用
- 配置
- 啟動類
- 生產者Demo
- 消費者Demo
- 測試類
- 結果
導包
kafka官方client
kafka官方提供的Java client jar包
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients --><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.1.0</version></dependency>spring官方template
也可以使用spring官方提供的kafaka template
<!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka --> <dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>2.8.4</version> </dependency>spring官方springcloud stream starter
使用spring-cloud-starter-stream-kafka可以整合kafka進入到spring項目中
<!-- https://mvnrepository.com/artifact/org.springframework.cloud/spring-cloud-starter-stream-kafka --> <dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-kafka</artifactId><version>3.2.2</version> </dependency>kafka官方client使用
生產者Demo
使用KafkaProducer做生產者,可以使用多線程模擬多個生產者,這里提供簡單的test來供以參考。
- bootstrap.servers: kafka服務器的地址。
- acks:消息的確認機制,默認值是0。
- acks=0:如果設置為0,生產者不會等待kafka的響應。
- acks=1:這個配置意味著kafka會把這條消息寫到本地日志文件中,但是不會等待集群中其他機器的成功響應。
- acks=all:這個配置意味著leader會等待所有的follower同步完成。這個確保消息不會丟失,除非kafka集群中所有機器掛掉。這是最強的可用性保證。
- retries:配置為大于0的值的話,客戶端會在消息發送失敗時重新發送。(允許重發的情況)
- batch.size:當多條消息需要發送到同一個分區時,生產者會嘗試合并網絡請求。這會提高client和生產者的效率。
- key.serializer: 鍵序列化,默認org.apache.kafka.common.serialization.StringDeserializer。
- value.deserializer:值序列化,默認org.apache.kafka.common.serialization.StringDeserializer。
消費者Demo
使用KafkaConsumer做消費者client API,可以通過多線程模擬生產訂閱關系。這里給一個簡單的消費者demo。
- bootstrap.servers: kafka的地址。
- group.id:組名,不同組名可以重復消費。(同組重復消費會拋異常)
- enable.auto.commit:是否自動提交,默認為true。
- auto.commit.interval.ms: 從poll(拉)的回話處理時長。
- session.timeout.ms:超時時間。
- max.poll.records:一次最大拉取的數據條數。
- auto.offset.reset:消費規則,默認earliest 。
- earliest: 當各分區下有已提交的offset時,從提交的offset開始消費;無提交的offset時,從頭開始消費 。
- latest: 當各分區下有已提交的offset時,從提交的offset開始消費;無提交的offset時,消費新產生的該分區下的數據 。
- none: topic各分區都存在已提交的offset時,從offset后開始消費;只要有一個分區不存在已提交的offset,則拋出異常。
- key.deserializer: 鍵反序列化器,默認org.apache.kafka.common.serialization.StringDeserializer
- value.deserializer:值反序列化器,默認org.apache.kafka.common.serialization.StringDeserializer
簡易的多線程生產者
生產
實現Runnable接口可以實現簡易的多線程生產者,模擬多個生產者生產
@Getter public class MyselfProducer implements Runnable{//主題(當主題不存在,自動創建主題)private final String topic;//配置private final Properties properties;//主題和配置的多線程共享public MyselfProducer(String topic,Properties properties){this.topic = topic;this.properties = properties;}@Overridepublic void run() {//每個線程單獨的生產者KafkaProducer<String,String> kafkaProducer = new KafkaProducer(properties);//生產信息for (int i = 0; i < 100; i++) {String msg = String.format("hello,線程%s發送第%d條信息",Thread.currentThread().getName() , i);//消息(key可以為null,key值影響消息發往哪個分區)ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(topic, String.valueOf(i), msg);//發送kafkaProducer.send(producerRecord);//控制臺顯示System.out.println(msg);}//關閉kafkaProducer.close();} }消費
使用多線程進行生產,然后使用消費者Demo進行消費,獲得以下結果
使用線程池優化生產者
ProducerThreadPool
public class ProducerThreadPool{//主題(當主題不存在,自動創建主題)private final String topic;//配置private final Properties properties;//要產生的生產者線程類private final Class<? extends Runnable> producerClass;//線程池private final ThreadPoolExecutor executor;public ProducerThreadPool(String topic,Properties properties,Class<? extends Runnable> c){//初始化線程池this.executor = new ThreadPoolExecutor(5,10,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());//主題this.topic = topic;//配置this.properties = properties;//線程類this.producerClass = c;}public Future<?> createAndsubmit(){try {//反射出構造器Constructor<? extends Runnable> constructor = producerClass.getConstructor(String.class, Properties.class);//實例化生產者線程Runnable runnable = constructor.newInstance(topic, properties);System.out.println("提交線程池");//提交到線程池return executor.submit(runnable);} catch (NoSuchMethodException e) {e.printStackTrace();} catch (IllegalAccessException e) {e.printStackTrace();} catch (InstantiationException e) {e.printStackTrace();} catch (InvocationTargetException e) {e.printStackTrace();}return null;}}測試使用
寫一個Test使用自己寫的ProducerThreadPool生產者線程池
@Testpublic void testProducerThreadPool() throws InterruptedException {//主題(當主題不存在,自動創建主題)String topic = "threadPool_topic";//配置Properties properties = new Properties();//kafka服務器地址properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");//序列化器properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class);ProducerThreadPool producerThreadPool = new ProducerThreadPool(topic, properties, MyselfProducer.class);//生產并提交Future<?> futureA = producerThreadPool.createAndsubmit();Future<?> futureB = producerThreadPool.createAndsubmit();Future<?> futureC = producerThreadPool.createAndsubmit();Thread.sleep(5000);System.out.println(String.format("線程A狀態%s",futureA.isDone()));System.out.println(String.format("線程B狀態%s",futureB.isDone()));System.out.println(String.format("線程C狀態%s",futureC.isDone()));}測試結果
生產過程結果
消費結果
spring官方template使用
配置
使用spring官方提供的kafka template就需要配置Bean,講bean注入到上下文中。
@Configuration @EnableKafka public class KafkaConfiguration {//ConcurrentKafkaListenerContainerFactory為創建Kafka監聽器的工廠類@Beanpublic ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory(@Qualifier("consumerFactory") ConsumerFactory<Integer, String> consumerFactory) {ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory);return factory;}//kafkaTemplate實現了Kafka 生產者等功能@Beanpublic KafkaTemplate<Integer, String> kafkaTemplate(@Qualifier("producerFactory") ProducerFactory<Integer, String> producerFactory) {KafkaTemplate template = new KafkaTemplate<Integer, String>(producerFactory);return template;}//根據consumerProps填寫的參數創建消費者工廠@Beanpublic ConsumerFactory<Integer, String> consumerFactory() {return new DefaultKafkaConsumerFactory<>(consumerProps());}//根據senderProps填寫的參數創建生產者工廠@Beanpublic ProducerFactory<Integer, String> producerFactory() {return new DefaultKafkaProducerFactory<>(ProducerProps());}//消費者配置參數private Map<String, Object> consumerProps() {Map<String, Object> props = new HashMap<>();//連接地址props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");//GroupIDprops.put(ConsumerConfig.GROUP_ID_CONFIG, "Consumer-Kafka-1");//是否自動提交props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);//自動提交的頻率props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");//Session超時設置props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");//鍵的反序列化器props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);//值的反序列化器props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);return props;}//生產者配置private Map<String, Object> ProducerProps (){Map<String, Object> props = new HashMap<>();//Kafka服務器連接地址props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");//重試機制,0為不啟用重試機制props.put(ProducerConfig.RETRIES_CONFIG, 1);//控制批處理大小,單位為字節props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);//批量發送,延遲為1毫秒,啟用該功能能有效減少生產者發送消息次數,減少網絡IO次數props.put(ProducerConfig.LINGER_MS_CONFIG, 1);//生產者可以使用的總內存字節來緩沖等待發送到服務器的記錄props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 1024000);//鍵的序列化器props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);//值的序列化器props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);return props;}}生產者Demo
可以通過kafkaTemplate發送消息,也可以通過spring提供的工廠生產produce并進行消息的發送。
@Component public class MsgProducer {//主題static final String topic = "spring-kafka";//spring提供的模板類(生產)@Autowiredprivate KafkaTemplate kafkaTemplate;//spring提供的生產者工廠@Autowiredprivate ProducerFactory producerFactory;//使用template發送消息public void sendMsg(Integer key, String msg){kafkaTemplate.send(topic,key,msg);}public void sendMsg(String msg){kafkaTemplate.send(topic,msg);}//使用原生Producer client API發送消息public void sendMsgByProducer(Integer key, String msg){Producer producer = producerFactory.createProducer();producer.send(new ProducerRecord(topic,key,msg));producer.close();}public void sendMsgByProducer(String msg){Producer producer = producerFactory.createProducer();producer.send(new ProducerRecord(topic,msg));producer.close();} }消費者Demo
更具上面的配置,這些Consumer在組Consumer-Kafka-1,組里面有兩個不同的Consumer,分別是Consumer-1,Consumer-2。
@Slf4j @Component public class MsgConsumer {static final String topicA = "spring-kafka";static final String topicB = "spring-kafka-B";//訂閱一個主題@KafkaListener(id = "Consumer-1",topics = {topicA})public String getMsg(String msg){return msg;} //訂閱多個主題@KafkaListener(id = "Consumer-2",topics = {topicA,topicB})public String getMsgBytwo(String msg){return msg;}//指定主題分區,并指定讀取的分區offset位置@KafkaListener(id = "Consumer-3",topicPartitions = {@TopicPartition(topic = topicA,partitions = {"0","1"}),@TopicPartition(topic = topicB,partitionOffsets = @PartitionOffset(partition = "1",initialOffset = "100"))})public String getMsgByPartition(String msg){return msg;}//通過原生Consumer獲取消息public ConsumerRecords getMsgByConsumer(){Consumer consumer = consumerFactory.createConsumer();consumer.subscribe(Collections.singleton(topicA));ConsumerRecords poll = consumer.poll(Duration.ofMillis(500));consumer.close();return poll;}}spring官方springcloud stream starter使用
spring官方提供了一套統一的消息中間件的編程框架,對外提供統一的編程方式,隱藏底層消息中間件編程的差異。
關于springcloud stream 的概念可以查看:Spring Cloud Stream 體系及原理介紹-阿里云開發者社區 (aliyun.com)
配置
spring:cloud:stream:kafka:binder:brokers: localhost:9092bindings:input: #channelName,官方提供的默認輸入通道名(消費者)destination: topicA #消費者訂閱的topicgroup: consumer-group-1 #消費者分組content-type: text/plainoutput:destination: topicA #生產者將數據發送的topiccontentType: text/plain啟動類
因為測試需要,本人同時bind輸入和輸出channel(Source,Sink)。
@SpringBootApplication @EnableBinding({Source.class, Sink.class}) @ComponentScan("org.example.**") public class WebApplication {public static void main(String[] args) {SpringApplication.run(WebApplication.class,args);} }生產者Demo
@Component public class SourceProducer {@Autowiredprivate Source source;//默認有一個叫output的MessageChannel@Autowiredprivate MessageChannel output;//通過source發送public void send(String msg){//source.output獲得是MessageChannelMessageChannel output = source.output();System.out.println("發送消息:"+msg);output.send(MessageBuilder.withPayload(msg).build());}//通過MessageChannel直接發送public void sendByChannel(String msg){System.out.println("發送消息:"+msg);output.send(MessageBuilder.withPayload(msg).build());} }消費者Demo
@Component public class SinkConsumer {@StreamListener(Sink.INPUT)public void getMsg(Message<String> msg){System.out.println("收到消息:"+msg.getPayload());} }測試類
因為SpringRunner會啟動spring容器,而容器里面有StreamListener監聽著Stream,
@SpringBootTest @RunWith(SpringRunner.class) public class ProductorPostTest {@Autowiredprivate SourceProducer sourceProducer;@Testpublic void testSource() throws InterruptedException {String msg = "消息A";while (true){sourceProducer.send(msg);Thread.sleep(1000);}} }結果
發送消息:消息A 收到消息:消息A 發送消息:消息A 收到消息:消息A 發送消息:消息A 收到消息:消息A總結
以上是生活随笔為你收集整理的kafaka生产者消费者demo(简易上手demo)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 刘强东回应采销喊话:水龙头已换新 还买了
- 下一篇: 解决Could not load dyn