日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

kafaka生产者消费者demo(简易上手demo)

發布時間:2024/9/19 编程问答 35 豆豆
生活随笔 收集整理的這篇文章主要介紹了 kafaka生产者消费者demo(简易上手demo) 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

kafaka生產者消費者demo(簡易上手demo)

文章目錄

  • kafaka生產者消費者demo(簡易上手demo)
      • 導包
        • kafka官方client
        • spring官方template
        • spring官方springcloud stream starter
    • kafka官方client使用
        • 生產者Demo
        • 消費者Demo
        • 簡易的多線程生產者
          • 生產
          • 消費
        • 使用線程池優化生產者
          • ProducerThreadPool
          • 測試使用
          • 測試結果
    • spring官方template使用
        • 配置
        • 生產者Demo
        • 消費者Demo
    • spring官方springcloud stream starter使用
        • 配置
        • 啟動類
        • 生產者Demo
        • 消費者Demo
        • 測試類
        • 結果

導包

kafka官方client

kafka官方提供的Java client jar包

<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients --><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.1.0</version></dependency>

spring官方template

也可以使用spring官方提供的kafaka template

<!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka --> <dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>2.8.4</version> </dependency>

spring官方springcloud stream starter

使用spring-cloud-starter-stream-kafka可以整合kafka進入到spring項目中

<!-- https://mvnrepository.com/artifact/org.springframework.cloud/spring-cloud-starter-stream-kafka --> <dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-kafka</artifactId><version>3.2.2</version> </dependency>

kafka官方client使用

生產者Demo

使用KafkaProducer做生產者,可以使用多線程模擬多個生產者,這里提供簡單的test來供以參考。

  • bootstrap.servers: kafka服務器的地址。
    • acks:消息的確認機制,默認值是0。
    • acks=0:如果設置為0,生產者不會等待kafka的響應。
    • acks=1:這個配置意味著kafka會把這條消息寫到本地日志文件中,但是不會等待集群中其他機器的成功響應。
    • acks=all:這個配置意味著leader會等待所有的follower同步完成。這個確保消息不會丟失,除非kafka集群中所有機器掛掉。這是最強的可用性保證。
  • retries:配置為大于0的值的話,客戶端會在消息發送失敗時重新發送。(允許重發的情況)
  • batch.size:當多條消息需要發送到同一個分區時,生產者會嘗試合并網絡請求。這會提高client和生產者的效率。
  • key.serializer: 鍵序列化,默認org.apache.kafka.common.serialization.StringDeserializer。
  • value.deserializer:值序列化,默認org.apache.kafka.common.serialization.StringDeserializer。
@Testpublic void testPost(){//主題(當主題不存在,自動創建主題)String topic = "product_post";//配置Properties properties = new Properties();//kafka服務器地址properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");//反序列化器properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class);//生產者KafkaProducer<String,String> kafkaProducer = new KafkaProducer(properties);//生產信息for (int i = 0; i < 100; i++) {String msg = String.format("hello,第%d條信息", i);//消息(key可以為null,key值影響消息發往哪個分區)ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(topic, String.valueOf(i), msg);//發送kafkaProducer.send(producerRecord);System.out.println("發送第"+i+"條信息");}//關閉kafkaProducer.close();}

消費者Demo

使用KafkaConsumer做消費者client API,可以通過多線程模擬生產訂閱關系。這里給一個簡單的消費者demo。

  • bootstrap.servers: kafka的地址。
  • group.id:組名,不同組名可以重復消費。(同組重復消費會拋異常)
  • enable.auto.commit:是否自動提交,默認為true。
  • auto.commit.interval.ms: 從poll(拉)的回話處理時長。
  • session.timeout.ms:超時時間。
  • max.poll.records:一次最大拉取的數據條數。
  • auto.offset.reset:消費規則,默認earliest 。
    • earliest: 當各分區下有已提交的offset時,從提交的offset開始消費;無提交的offset時,從頭開始消費 。
    • latest: 當各分區下有已提交的offset時,從提交的offset開始消費;無提交的offset時,消費新產生的該分區下的數據 。
    • none: topic各分區都存在已提交的offset時,從offset后開始消費;只要有一個分區不存在已提交的offset,則拋出異常。
  • key.deserializer: 鍵反序列化器,默認org.apache.kafka.common.serialization.StringDeserializer
  • value.deserializer:值反序列化器,默認org.apache.kafka.common.serialization.StringDeserializer
@Testpublic void testGet() throws InterruptedException {//主題String topic = "product_post";//配置Properties properties = new Properties();//kafka服務器地址properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");//k,v的序列化器properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);//消費者分組properties.put(ConsumerConfig.GROUP_ID_CONFIG,"Consumer-Group-1");//offset重置模式properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");//消費者KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer(properties);//訂閱(可以訂閱多個主題)kafkaConsumer.subscribe(Collections.singletonList(topic));//消費while (true){//獲取信息ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(1000));//遍歷records.forEach(o->{System.out.println(String.format("topic==%s,offset==%s,key==%s,value==%s",o.topic(),o.offset(),o.key(),o.value()));});//睡眠Thread.sleep(500);}}

簡易的多線程生產者

生產

實現Runnable接口可以實現簡易的多線程生產者,模擬多個生產者生產

@Getter public class MyselfProducer implements Runnable{//主題(當主題不存在,自動創建主題)private final String topic;//配置private final Properties properties;//主題和配置的多線程共享public MyselfProducer(String topic,Properties properties){this.topic = topic;this.properties = properties;}@Overridepublic void run() {//每個線程單獨的生產者KafkaProducer<String,String> kafkaProducer = new KafkaProducer(properties);//生產信息for (int i = 0; i < 100; i++) {String msg = String.format("hello,線程%s發送第%d條信息",Thread.currentThread().getName() , i);//消息(key可以為null,key值影響消息發往哪個分區)ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(topic, String.valueOf(i), msg);//發送kafkaProducer.send(producerRecord);//控制臺顯示System.out.println(msg);}//關閉kafkaProducer.close();} }
消費

使用多線程進行生產,然后使用消費者Demo進行消費,獲得以下結果

使用線程池優化生產者

ProducerThreadPool
public class ProducerThreadPool{//主題(當主題不存在,自動創建主題)private final String topic;//配置private final Properties properties;//要產生的生產者線程類private final Class<? extends Runnable> producerClass;//線程池private final ThreadPoolExecutor executor;public ProducerThreadPool(String topic,Properties properties,Class<? extends Runnable> c){//初始化線程池this.executor = new ThreadPoolExecutor(5,10,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());//主題this.topic = topic;//配置this.properties = properties;//線程類this.producerClass = c;}public Future<?> createAndsubmit(){try {//反射出構造器Constructor<? extends Runnable> constructor = producerClass.getConstructor(String.class, Properties.class);//實例化生產者線程Runnable runnable = constructor.newInstance(topic, properties);System.out.println("提交線程池");//提交到線程池return executor.submit(runnable);} catch (NoSuchMethodException e) {e.printStackTrace();} catch (IllegalAccessException e) {e.printStackTrace();} catch (InstantiationException e) {e.printStackTrace();} catch (InvocationTargetException e) {e.printStackTrace();}return null;}}
測試使用

寫一個Test使用自己寫的ProducerThreadPool生產者線程池

@Testpublic void testProducerThreadPool() throws InterruptedException {//主題(當主題不存在,自動創建主題)String topic = "threadPool_topic";//配置Properties properties = new Properties();//kafka服務器地址properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");//序列化器properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class);ProducerThreadPool producerThreadPool = new ProducerThreadPool(topic, properties, MyselfProducer.class);//生產并提交Future<?> futureA = producerThreadPool.createAndsubmit();Future<?> futureB = producerThreadPool.createAndsubmit();Future<?> futureC = producerThreadPool.createAndsubmit();Thread.sleep(5000);System.out.println(String.format("線程A狀態%s",futureA.isDone()));System.out.println(String.format("線程B狀態%s",futureB.isDone()));System.out.println(String.format("線程C狀態%s",futureC.isDone()));}
測試結果

生產過程結果

消費結果

spring官方template使用

配置

使用spring官方提供的kafka template就需要配置Bean,講bean注入到上下文中。

@Configuration @EnableKafka public class KafkaConfiguration {//ConcurrentKafkaListenerContainerFactory為創建Kafka監聽器的工廠類@Beanpublic ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory(@Qualifier("consumerFactory") ConsumerFactory<Integer, String> consumerFactory) {ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory);return factory;}//kafkaTemplate實現了Kafka 生產者等功能@Beanpublic KafkaTemplate<Integer, String> kafkaTemplate(@Qualifier("producerFactory") ProducerFactory<Integer, String> producerFactory) {KafkaTemplate template = new KafkaTemplate<Integer, String>(producerFactory);return template;}//根據consumerProps填寫的參數創建消費者工廠@Beanpublic ConsumerFactory<Integer, String> consumerFactory() {return new DefaultKafkaConsumerFactory<>(consumerProps());}//根據senderProps填寫的參數創建生產者工廠@Beanpublic ProducerFactory<Integer, String> producerFactory() {return new DefaultKafkaProducerFactory<>(ProducerProps());}//消費者配置參數private Map<String, Object> consumerProps() {Map<String, Object> props = new HashMap<>();//連接地址props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");//GroupIDprops.put(ConsumerConfig.GROUP_ID_CONFIG, "Consumer-Kafka-1");//是否自動提交props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);//自動提交的頻率props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");//Session超時設置props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");//鍵的反序列化器props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);//值的反序列化器props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);return props;}//生產者配置private Map<String, Object> ProducerProps (){Map<String, Object> props = new HashMap<>();//Kafka服務器連接地址props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");//重試機制,0為不啟用重試機制props.put(ProducerConfig.RETRIES_CONFIG, 1);//控制批處理大小,單位為字節props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);//批量發送,延遲為1毫秒,啟用該功能能有效減少生產者發送消息次數,減少網絡IO次數props.put(ProducerConfig.LINGER_MS_CONFIG, 1);//生產者可以使用的總內存字節來緩沖等待發送到服務器的記錄props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 1024000);//鍵的序列化器props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);//值的序列化器props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);return props;}}

生產者Demo

可以通過kafkaTemplate發送消息,也可以通過spring提供的工廠生產produce并進行消息的發送。

@Component public class MsgProducer {//主題static final String topic = "spring-kafka";//spring提供的模板類(生產)@Autowiredprivate KafkaTemplate kafkaTemplate;//spring提供的生產者工廠@Autowiredprivate ProducerFactory producerFactory;//使用template發送消息public void sendMsg(Integer key, String msg){kafkaTemplate.send(topic,key,msg);}public void sendMsg(String msg){kafkaTemplate.send(topic,msg);}//使用原生Producer client API發送消息public void sendMsgByProducer(Integer key, String msg){Producer producer = producerFactory.createProducer();producer.send(new ProducerRecord(topic,key,msg));producer.close();}public void sendMsgByProducer(String msg){Producer producer = producerFactory.createProducer();producer.send(new ProducerRecord(topic,msg));producer.close();} }

消費者Demo

更具上面的配置,這些Consumer在組Consumer-Kafka-1,組里面有兩個不同的Consumer,分別是Consumer-1,Consumer-2。

@Slf4j @Component public class MsgConsumer {static final String topicA = "spring-kafka";static final String topicB = "spring-kafka-B";//訂閱一個主題@KafkaListener(id = "Consumer-1",topics = {topicA})public String getMsg(String msg){return msg;} //訂閱多個主題@KafkaListener(id = "Consumer-2",topics = {topicA,topicB})public String getMsgBytwo(String msg){return msg;}//指定主題分區,并指定讀取的分區offset位置@KafkaListener(id = "Consumer-3",topicPartitions = {@TopicPartition(topic = topicA,partitions = {"0","1"}),@TopicPartition(topic = topicB,partitionOffsets = @PartitionOffset(partition = "1",initialOffset = "100"))})public String getMsgByPartition(String msg){return msg;}//通過原生Consumer獲取消息public ConsumerRecords getMsgByConsumer(){Consumer consumer = consumerFactory.createConsumer();consumer.subscribe(Collections.singleton(topicA));ConsumerRecords poll = consumer.poll(Duration.ofMillis(500));consumer.close();return poll;}}

spring官方springcloud stream starter使用

spring官方提供了一套統一的消息中間件的編程框架,對外提供統一的編程方式,隱藏底層消息中間件編程的差異。

關于springcloud stream 的概念可以查看:Spring Cloud Stream 體系及原理介紹-阿里云開發者社區 (aliyun.com)

配置

spring:cloud:stream:kafka:binder:brokers: localhost:9092bindings:input: #channelName,官方提供的默認輸入通道名(消費者)destination: topicA #消費者訂閱的topicgroup: consumer-group-1 #消費者分組content-type: text/plainoutput:destination: topicA #生產者將數據發送的topiccontentType: text/plain

啟動類

因為測試需要,本人同時bind輸入和輸出channel(Source,Sink)。

@SpringBootApplication @EnableBinding({Source.class, Sink.class}) @ComponentScan("org.example.**") public class WebApplication {public static void main(String[] args) {SpringApplication.run(WebApplication.class,args);} }

生產者Demo

@Component public class SourceProducer {@Autowiredprivate Source source;//默認有一個叫output的MessageChannel@Autowiredprivate MessageChannel output;//通過source發送public void send(String msg){//source.output獲得是MessageChannelMessageChannel output = source.output();System.out.println("發送消息:"+msg);output.send(MessageBuilder.withPayload(msg).build());}//通過MessageChannel直接發送public void sendByChannel(String msg){System.out.println("發送消息:"+msg);output.send(MessageBuilder.withPayload(msg).build());} }

消費者Demo

@Component public class SinkConsumer {@StreamListener(Sink.INPUT)public void getMsg(Message<String> msg){System.out.println("收到消息:"+msg.getPayload());} }

測試類

因為SpringRunner會啟動spring容器,而容器里面有StreamListener監聽著Stream,

@SpringBootTest @RunWith(SpringRunner.class) public class ProductorPostTest {@Autowiredprivate SourceProducer sourceProducer;@Testpublic void testSource() throws InterruptedException {String msg = "消息A";while (true){sourceProducer.send(msg);Thread.sleep(1000);}} }

結果

發送消息:消息A 收到消息:消息A 發送消息:消息A 收到消息:消息A 發送消息:消息A 收到消息:消息A

總結

以上是生活随笔為你收集整理的kafaka生产者消费者demo(简易上手demo)的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。