日韩av黄I国产麻豆传媒I国产91av视频在线观看I日韩一区二区三区在线看I美女国产在线I麻豆视频国产在线观看I成人黄色短片

歡迎訪問(wèn) 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) >

CentOS7 搭建Kafka消息队列环境,以及Python3操作Kafka Demo

發(fā)布時(shí)間:2025/3/11 35 豆豆
生活随笔 收集整理的這篇文章主要介紹了 CentOS7 搭建Kafka消息队列环境,以及Python3操作Kafka Demo 小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

Kafka適合什么樣的場(chǎng)景?

它可以用于兩大類(lèi)別的應(yīng)用:

  • 構(gòu)造實(shí)時(shí)流數(shù)據(jù)管道,它可以在系統(tǒng)或應(yīng)用之間可靠地獲取數(shù)據(jù)。 (相當(dāng)于message queue)
  • 構(gòu)建實(shí)時(shí)流式應(yīng)用程序,對(duì)這些流數(shù)據(jù)進(jìn)行轉(zhuǎn)換或者影響。 (就是流處理,通過(guò)kafka stream topic和topic之間內(nèi)部進(jìn)行變化)
  • Kafka中文文檔:http://kafka.apachecn.org/

    1,系統(tǒng)環(huán)境

    a,操作系統(tǒng)? ?CentOS Linux release 7.6.1810 (Core)? 64位,必須確保你的內(nèi)存是4G以上,雙核CPU!否則將無(wú)法新建默認(rèn)命名空間。

    b,確保jdk環(huán)境已經(jīng)安裝,具體教程請(qǐng)看??CentOS7 shell腳本安裝jdk??

    c,確保Python3和對(duì)應(yīng)的pip已經(jīng)安裝,具體教程請(qǐng)看?CentOS7 源碼編譯安裝Python3.5

    2,執(zhí)行以下命令,安裝Kafka并啟動(dòng)

    wget https://archive.apache.org/dist/kafka/1.0.0/kafka_2.11-1.0.0.tgz # 下載kafka 1.0.0安裝包 tar -zxvf kafka_2.11-1.0.0.tgz # 解壓安裝包 cd kafka_2.11-1.0.0/ # 打開(kāi)kafka目錄 sh bin/zookeeper-server-start.sh -daemon config/zookeeper.properties # 后臺(tái)運(yùn)行zookeeper sh bin/kafka-server-start.sh config/server.properties # 運(yùn)行kafka服務(wù)

    出現(xiàn) “started” 則是啟動(dòng)成功

    3,執(zhí)行以下命令,創(chuàng)建test5的topic

    sh bin/kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --replication-factor 1 --partitions 1 --topic test5

    4,執(zhí)行以下命令,創(chuàng)建監(jiān)聽(tīng)test5的消息隊(duì)列程序

    sh bin/kafka-console-consumer.sh --zookeeper 127.0.0.1:2181 --topic test5 --from-beginning

    5,執(zhí)行以下命令,建發(fā)送test5消息隊(duì)列的生產(chǎn)者程序

    sh bin/kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic test5

    生產(chǎn)者發(fā)送一些消息,回車(chē)

    看到消費(fèi)者界面出現(xiàn)生產(chǎn)者的消息?

    6,Python3安裝kafka依賴(lài)包,執(zhí)行命令 “pip3 install kafka”

    依賴(lài)包安裝完成后,創(chuàng)建python3消費(fèi)者監(jiān)聽(tīng)程序,kafka_consumer.py

    from kafka import KafkaConsumer consumer = KafkaConsumer('test4', bootstrap_servers=['localhost:9092']) for msg in consumer:recv = "%s:%d:%d: key=%s value=%s" % (msg.topic, msg.partition, msg.offset, msg.key, msg.value)print(recv)

    運(yùn)行 python3?kafka_consumer.py

    ?

    創(chuàng)建python3生產(chǎn)消息隊(duì)列程序,kafka_producer.py

    import json from kafka import KafkaProducer producer = KafkaProducer(bootstrap_servers='localhost:9092') msg_dict = {"sleep_time": 10,"db_config": {"database": "test_1","host": "xxxx","user": "root","password": "root"},"table": "msg","msg": "Hello World" } msg = json.dumps(msg_dict) producer.send('test4', bytes(msg,'ascii'), partition=0) producer.close()

    運(yùn)行 “python3 kafka_producer.py”,轉(zhuǎn)到kafka_consumer.py運(yùn)行界面看到已接收到生產(chǎn)程序發(fā)送過(guò)來(lái)的信息

    ?

    總結(jié)

    以上是生活随笔為你收集整理的CentOS7 搭建Kafka消息队列环境,以及Python3操作Kafka Demo的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。

    如果覺(jué)得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。