CentOS7 搭建Kafka消息队列环境,以及Python3操作Kafka Demo
Kafka適合什么樣的場(chǎng)景?
它可以用于兩大類(lèi)別的應(yīng)用:
1,系統(tǒng)環(huán)境
a,操作系統(tǒng)? ?CentOS Linux release 7.6.1810 (Core)? 64位,必須確保你的內(nèi)存是4G以上,雙核CPU!否則將無(wú)法新建默認(rèn)命名空間。
b,確保jdk環(huán)境已經(jīng)安裝,具體教程請(qǐng)看??CentOS7 shell腳本安裝jdk??
c,確保Python3和對(duì)應(yīng)的pip已經(jīng)安裝,具體教程請(qǐng)看?CentOS7 源碼編譯安裝Python3.5
2,執(zhí)行以下命令,安裝Kafka并啟動(dòng)
wget https://archive.apache.org/dist/kafka/1.0.0/kafka_2.11-1.0.0.tgz # 下載kafka 1.0.0安裝包 tar -zxvf kafka_2.11-1.0.0.tgz # 解壓安裝包 cd kafka_2.11-1.0.0/ # 打開(kāi)kafka目錄 sh bin/zookeeper-server-start.sh -daemon config/zookeeper.properties # 后臺(tái)運(yùn)行zookeeper sh bin/kafka-server-start.sh config/server.properties # 運(yùn)行kafka服務(wù)出現(xiàn) “started” 則是啟動(dòng)成功
3,執(zhí)行以下命令,創(chuàng)建test5的topic
sh bin/kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --replication-factor 1 --partitions 1 --topic test54,執(zhí)行以下命令,創(chuàng)建監(jiān)聽(tīng)test5的消息隊(duì)列程序
sh bin/kafka-console-consumer.sh --zookeeper 127.0.0.1:2181 --topic test5 --from-beginning5,執(zhí)行以下命令,建發(fā)送test5消息隊(duì)列的生產(chǎn)者程序
sh bin/kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic test5生產(chǎn)者發(fā)送一些消息,回車(chē)
看到消費(fèi)者界面出現(xiàn)生產(chǎn)者的消息?
6,Python3安裝kafka依賴(lài)包,執(zhí)行命令 “pip3 install kafka”
依賴(lài)包安裝完成后,創(chuàng)建python3消費(fèi)者監(jiān)聽(tīng)程序,kafka_consumer.py
from kafka import KafkaConsumer consumer = KafkaConsumer('test4', bootstrap_servers=['localhost:9092']) for msg in consumer:recv = "%s:%d:%d: key=%s value=%s" % (msg.topic, msg.partition, msg.offset, msg.key, msg.value)print(recv)運(yùn)行 python3?kafka_consumer.py
?
創(chuàng)建python3生產(chǎn)消息隊(duì)列程序,kafka_producer.py
import json from kafka import KafkaProducer producer = KafkaProducer(bootstrap_servers='localhost:9092') msg_dict = {"sleep_time": 10,"db_config": {"database": "test_1","host": "xxxx","user": "root","password": "root"},"table": "msg","msg": "Hello World" } msg = json.dumps(msg_dict) producer.send('test4', bytes(msg,'ascii'), partition=0) producer.close()運(yùn)行 “python3 kafka_producer.py”,轉(zhuǎn)到kafka_consumer.py運(yùn)行界面看到已接收到生產(chǎn)程序發(fā)送過(guò)來(lái)的信息
?
總結(jié)
以上是生活随笔為你收集整理的CentOS7 搭建Kafka消息队列环境,以及Python3操作Kafka Demo的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: CAS实现原子操作的三大问题
- 下一篇: python新建txt文件,并逐行写入数