云服务器安装kafka及python连接测试
生活随笔
收集整理的這篇文章主要介紹了
云服务器安装kafka及python连接测试
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
云服務器環境:CentOS 7.6
環境準備(版本見解壓命令):
- java環境jdk
- zookeeper
- Kafka
傳送門:kafka安裝包,包括jdk,zookeeper和kafka
jdk環境安裝
將下載的包上傳到服務器/opt/software,解壓
tar -zxvf jdk-8u311-linux-x64.tar.gz tar -xzvf kafka_2.12-2.3.1.tgz tar -zxvf apache-zookeeper-3.5.6.tar.gz重命名,將三個文件夾移動到/usr/local目錄下并重新命名
mv kafka_2.12-2.3.1 ./kafka mv apache-zookeeper-3.5.6 ./zookeepercp -r jdk1.8.0_311/ /usr/local/ cp -r zookeeper/ /usr/local/ cp -r kafka /usr/local/打開文件環境變量的文件并配置
vim /etc/profilejdk和zookeeper配置如下
export JAVA_HOME=/usr/local/jdk1.8.0_311 export JRE_HOME=${JAVA_HOME}/jre export ZOOKEEPER_HOME=/usr/local/zookeeper export CLASSPATH=.:${JAVA_HOME}/lib:${JRE_HOME}/lib:$CLASSPATH export JAVA_PATH=${JAVA_HOME}/bin:${JRE_HOME}/bin:${ZOOKEEPER_HOME}/bin: export PATH=$PATH:${JAVA_PATH}使配置文件生效:
source /etc/profile查看是否安裝成功,成功如下圖所示
java -versionzookeeper環境配置
cd /usr/local/zookeeper/conf/cp ./zoo_sample.cfg zoo.cfg打開zoo.cfg配置文件
vim zoo.cfg修改dataDir和dataLogDir配置
dataDir=/usr/local/zookeeper/dataDir dataLogDir=/usr/local/zookeeper/dataLogDir在/usr/local/zookeeper/bin/下面啟動zookeeper
./zkServer.sh start./zkServer.sh status成功如下圖所示:
kafka安裝:
在/usr/local/kafka/config下修改server.properties配置,單機只需要配置logs目錄
# 用于本地代碼測試線上的kafka的配置 listeners=PLAINTEXT://內網ip:9092 advertised.listeners=PLAINTEXT://外網ip:9092# 單機只需配置,省略新建logs文件夾目錄 log.dirs=/usr/local/kafka/logs在/usr/local/kafka/bin目錄下啟動kafka
cd /usr/local/kafka/bin啟動kafka:
./kafka-server-start.sh -daemon ../config/server.properties查看進程狀態:
jps
創建topic:
查看topic列表:
./kafka-topics.sh --list --zookeeper localhost:2181創建生產者測試:
./kafka-console-producer.sh --broker-list localhost:9092 --topic csdn_test
創建消費者測試:
python本地代碼測試
生產者測試:
import json from kafka import KafkaProducerproducer = KafkaProducer(bootstrap_servers='外網ip:9092')msg = "Hello World".encode('utf-8') # 發送內容,必須是bytes類型 producer.send('test_2', msg) # 發送的topic為test producer.close()前往服務器/usr/local/kafka/bin查看topic列表,看列表中是否有test_2
./kafka-topics.sh --list --zookeeper localhost:2181
如果無法找到這個topic,請參考前面kafka的conf的配置文件server.properties修改如下配置
消費者測試:
from kafka import KafkaConsumerconsumer = KafkaConsumer('test_2', bootstrap_servers=['外網ip:9092']) for msg in consumer:print(msg)recv = "%s:%d:%d: key=%s value=%s" % (msg.topic, msg.partition, msg.offset, msg.key, msg.value)總結
以上是生活随笔為你收集整理的云服务器安装kafka及python连接测试的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: map 转换成vo_JAVAMap转换为
- 下一篇: 计算机一键休眠,win7系统设置一键休眠