日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

kafka安装使用

發布時間:2025/4/14 编程问答 37 豆豆
生活随笔 收集整理的這篇文章主要介紹了 kafka安装使用 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

版本:kafka_2.11-0.10.1.0 ?(之前安裝2.10-0.10.0.0,一直出問題)

?

  • 安裝
  • Springboot結合Kafka的使用

?

安裝

  • 下載并解壓代碼 wget http://mirrors.cnnic.cn/apache/kafka/0.10.0.0/kafka_2.10-0.10.0.0.tgz #http://kafka.apache.org/downloadstar -zxvf kafka_2.10-0.10.0.0.tgz cd kafka_2.10-0.10.0.0

    ?

  • 修改每個broker安裝目錄下的配置文件 # $targetID默認是0,每個broker的broker.id必須要唯一 broker.id=$targetID#默認是注釋的,$IP改成當前節點的IP即可。如果不改該配置項,在節點通過命令行可以收發消息,而在其他機器是無法通過IP去訪問隊列的 #在之前的版本不叫listeners,而是advertised.host.name和host.name listeners=PLAINTEXT://$IP:9092

    ?

  • 啟動服務 #kafka依賴于zookeeper #如果沒有的話,可以通過kafka提供的腳本快速創建一個單節點zookeeper實例: #bin/zookeeper-server-start.sh config/zookeeper.properties#確認zookeeper服務已經啟動后,啟動kafka服務 nohup bin/kafka-server-start.sh config/server.properties &

    ?

  • 創建一個名為test,有一份備份,一個分區的topic bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test #查看所有topicbin/kafka-topics.sh --list --zookeeper localhost:2181

    ?

  • 發送消息 bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

    ?

  • 開啟一個消費者接收消息 bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

    ?

  • 查看topic信息 bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test

    ?

  • Springboot結合Kafka的使用?

    1.在pom文件添加依賴

    <dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId> </dependency>

    ?

    2.在application.properties中添加配置

    # APACHE KAFKA (KafkaProperties) spring.kafka.bootstrap-servers=192.168.0.155:9092,192.168.0.156:9092 spring.kafka.client-id=K1
    spring.kafka.consumer.auto-offset-reset= earliest spring.kafka.consumer.enable-auto-commit= true spring.kafka.consumer.group-id= test-consumer-group
    spring.kafka.producer.batch-size=2 spring.kafka.producer.bootstrap-servers= 192.168.0.155:9092,192.168.0.156:9092 spring.kafka.producer.client-id= P1 spring.kafka.producer.retries=3 spring.kafka.template.default-topic= test

    ?

    3.創建消費者類(訂閱消息的對象)

    import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component;@Component public class ListenerBean {@KafkaListener(topics = "myTopic")public void processMessage(String content) {System.out.println("you have a new message:" + content);// ... } }

    ?

    4.創建生產者類(發布消息的對象)

    import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.EnableAutoConfiguration; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Component; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMethod; import org.springframework.web.bind.annotation.RestController;@Component @RestController @RequestMapping("/send") @EnableAutoConfiguration public class SendMsgBean {private final KafkaTemplate<String,String> kafkaTemplate;@Autowiredpublic SendMsgBean(KafkaTemplate<String,String> kafkaTemplate) {this.kafkaTemplate = kafkaTemplate;}@RequestMapping(path="/{msg}",method=RequestMethod.GET)public String send(@PathVariable("msg") String msg) {System.out.println("==sending msg:" + msg);kafkaTemplate.send("test","test-"+msg);return "message has been sent!";} }

    ?

    ?

    只需這4步,就可以在springboot中使用kafka了,現在我們訪問 http://localhost:8080/send/mymessage ?就可以在控制臺看到信息了。

    源碼下載

    ?

    參考:

    • Kafka producer程序本地運行時發送信息失敗解決方案

    轉載于:https://www.cnblogs.com/TiestoRay/p/6394602.html

    總結

    以上是生活随笔為你收集整理的kafka安装使用的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。