日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

大数据统计分析平台之一、Kafka单机搭建

發布時間:2024/4/17 编程问答 52 豆豆
生活随笔 收集整理的這篇文章主要介紹了 大数据统计分析平台之一、Kafka单机搭建 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

1、zookeeper搭建

  Kafka集群依賴zookeeper,需要提前搭建好zookeeper

單機模式(7步)(集群模式進階請移步:http://blog.51cto.com/nileader/795230)

?Step1:

cd /usr/local/software

jdk-8u161-linux-x64.rpm
鏈接:https://pan.baidu.com/s/1i6iHIDJ 密碼:bgcc

rpm -ivh jdk-8u161-linux-x64.rpm

vi /etc/profile

JAVA_HOME=/usr/java/jdk1.8.0_161
JRE_HOME=/usr/java/jdk1.8.0_161/jre
PATH=$PATH:$JAVA_HOME/bin:$JRE_HOME/bin
CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar:$JRE_HOME/lib
export JAVA_HOME JRE_HOME PATH CLASSPATH

source /etc/profile

echo $PATH

?

Step2:

# 下載zookeeper

wget http://mirror.bit.edu.cn/apache/zookeeper/zookeeper-3.4.11/zookeeper-3.4.11.tar.gz

# 如果下載不到,可以使用迅雷,或者使用百度云盤

鏈接:https://pan.baidu.com/s/1MXYd4UlKWvqB6EcVLyF8cg 密碼:an6t

?

# 解壓

tar -zxvf zookeeper-3.4.11.tar.gz

# 移動一下

mv zookeeper-3.4.11 /usr/local/zookeeper-3.4.11

?

Step3:重命名 zoo_sample.cfg文件

?mv /usr/local/zookeeper-3.4.11/conf/zoo_sample.cfg??/usr/local/zookeeper-3.4.11/conf/zoo.cfg

?Step4:vi /usr/local/zookeeper-3.4.11/conf/zoo.cfg,修改

dataDir=/usr/local/zookeeper-3.4.11/data

Step5:創建數據目錄

mkdir? /usr/local/zookeeper-3.4.11/data


Step6:啟動zookeeper:執行

/usr/local/zookeeper-3.4.11/bin/zkServer.sh?start

Step7:檢測是否成功啟動:執行

/usr/local/zookeeper-3.4.11/bin/zkCli.sh
或者
yum install nc -y
echo?stat| nc?localhost?2181

================================================================================================================

2、下載Kafka

下載地址:http://kafka.apache.org/downloads.html # mkdir -p /usr/local/software
# cd /usr/local/software
# wget http://mirror.bit.edu.cn/apache/kafka/1.0.0/kafka_2.12-1.0.0.tgz

# 百度云下載地址:
鏈接:https://pan.baidu.com/s/1Kp0uD_5YjGKOLkbW_igm2g 密碼:v1q7
kafka_2.12-1.0.0.tgz ?? //其中2.12-1.0.0為Scala的版本,kafka-1.0.0-src.tgz為kafka版本 3、解壓 # tar zxf kafka_2.12-1.0.0.tgz -C /usr/local/ # cd /usr/local/ # mv kafka_2.12-1.0.0/ kafka/ 4、配置 mkdir -p /usr/local/kafka/kafkaLogs # vi /usr/local/kafka/config/server.properties

# broker的ID,集群中每個broker ID不可相同
broker.id=0
# 監聽器,端口號和port一致即可
listeners=PLAINTEXT:/10.10.6.225/:9092
# Broker的監聽端口
port=9092

# 必須填寫當前服務器IP地址
host.name=10.10.6.225

# 必須填寫當前服務器IP地址
advertised.host.name=10.10.6.225
# 暫未配置集群
zookeeper.connect=10.10.6.225:2181

# 消息持久化目錄
log.dirs=/usr/local/kafka/kafkaLogs

# 可以刪除主題
delete.topic.enable=true

# 關閉自動創建topic
auto.create.topics.enable=false

?

5、配置Kafka的環境變量 # vi /etc/profileexport KAFKA_HOME=/usr/local/kafkaexport PATH=$PATH:$KAFKA_HOME/bin # source /etc/profile


# vi /etc/hosts

# es為主機名 ,這里一定要注意,是主機名!!!!重要的話說三次!!!!!!!!
127.0.0.1 es
10.10.6.225 es
6、啟動與停止Kafka # kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties 官方推薦啟動方式: # /usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties &

但這種方式退出shell后會自動斷開

停止:

kafka-server-stop.sh 7、驗證 # jps2608 Kafka
2236 QuorumPeerMain
2687 Jps
看到Kafka的進程,說明Kafka已經啟動 8、創建topic 創建名為test,partitions為3,replication為3的topic # kafka-topics.sh --create --zookeeper 10.10.6.225:2181 --partitions 1 --replication-factor 1 --topic test 查看topic狀態 # kafka-topics.sh --describe --zookeeper 10.10.6.225:2181 --topic test
  Topic:test????? PartitionCount:1??????? ReplicationFactor:1???? Configs:
?? Topic: test???? Partition: 0??? Leader: 0?????? Replicas: 0???? Isr: 0
刪除topic 執行如下命令 # kafka-topics.sh --delete --zookeeper 10.10.6.225:2181 --topic test 9、測試使用Kafka 發送消息 # kafka-console-producer.sh --broker-list 10.10.6.225:9092 --topic test 輸入以下信息:   This is a message   This is another message 接收消息 # kafka-console-consumer.sh --bootstrap-server 10.10.6.225:9092 --topic test --from-beginning
若看到上輸入的信息說明已經搭建成功。 更復雜配置參考: https://www.cnblogs.com/wangxiaoqiangs/p/7831990.html 黃海添加于2018-02-11?夜 鏈接:https://pan.baidu.com/s/1i6HnIzr 密碼:1soq KafkaProducer.py # http://kafka-python.readthedocs.io/en/master/ # 安裝辦法: # C:\Users\Administrator>pip install kafka-python # Collecting kafka-python # Downloading kafka_python-1.4.1-py2.py3-none-any.whl (235kB) # 100% |████████████████████████████████| 235kB 150kB/s # Installing collected packages: kafka-python # Successfully installed kafka-python-1.4.1 # http://blog.csdn.net/evankaka/article/details/52421314 from kafka import KafkaProducer
from Util.MySQLHelper import *
import json

producer = KafkaProducer(bootstrap_servers='10.10.6.225:9092', value_serializer=lambda v: json.dumps(v).encode('utf-8'))
db = MySQLHelper()
sql = "select ID,RESOURCE_ID_INT,RESOURCE_ID_CHAR,RESOURCE_TITLE,RESOURCE_TYPE_NAME,RESOURCE_FORMAT,RESOURCE_PAGE,CAST(CREATE_TIME AS CHAR) AS CREATE_TIME,DOWN_COUNT,FILE_ID,RESOURCE_TYPE,STRUCTURE_ID,PERSON_ID,PERSON_NAME,IDENTITY_ID from t_resource_info limit 100"
dt = db.query(sql)

print(len(dt))

for row in dt:
producer.send('t_resource_info', row)

producer.flush()

print('恭喜,完成!')

?

不依賴于MYSQL的數據提交:

import json from kafka import KafkaProducer import datetime# kafka的服務器位置 kafka_servers = '10.10.6.194:9092'# 日期的轉換器 class DateEncoder(json.JSONEncoder):def default(self, obj):if isinstance(obj, datetime.datetime):return obj.strftime('%Y-%m-%d %H:%M:%S')elif isinstance(obj, datetime.date):return obj.strftime("%Y-%m-%d")else:return json.JSONEncoder.default(self, obj)# 黃海定義的輸出信息的辦法,帶當前時間 def logInfo(msg):i = datetime.datetime.now()print(" %s %s" % (i, msg))# 統一的topic名稱 topicName = 'test'dt=[{"id":1,"name":"劉備"},{"id":2,"name":"關羽"},{"id":3,"name":"張飛"}]# kafka的生產者 producer = KafkaProducer(bootstrap_servers=kafka_servers)# # 將字段大寫轉為小寫 for row in dt:new_dics = {}for k, v in row.items():new_dics[k.lower()] = vjstr = json.dumps(new_dics, cls=DateEncoder)producer.send(topic=topicName, partition=0, value=jstr.encode('utf-8')) # 提交一下 producer.flush() print('恭喜,完成!')

?

?

KafkaConsumer.py

from kafka import KafkaConsumer import timedef log(str):t = time.strftime(r"%Y-%m-%d_%H-%M-%S", time.localtime())print("[%s]%s" % (t, str))log('start consumer') # 消費192.168.120.11:9092上的world 這個Topic,指定consumer group是consumer-20171017 consumer = KafkaConsumer('foobar', bootstrap_servers=['localhost:9092']) for msg in consumer:recv = "%s:%d:%d: key=%s value=%s" % (msg.topic, msg.partition, msg.offset, msg.key, msg.value)log(recv)

?如果是想讀取kafka記得的所有消費記錄:

from kafka import KafkaConsumer import time# kafka的服務器位置 kafka_servers = '10.10.6.194:9092' # 統一的topic名稱 topicName = 'test'def log(str):t = time.strftime(r"%Y-%m-%d_%H-%M-%S", time.localtime())print("[%s]%s" % (t, str))log('啟動消費者...') # auto_offset_reset='earliest' 這個參數很重要,如果加上了,就是kafka記錄的最后一條位置,如果不加,就是以后要插入的數據了。 #consumer = KafkaConsumer(topicName, auto_offset_reset='earliest', bootstrap_servers=kafka_servers) consumer = KafkaConsumer(topicName, bootstrap_servers=kafka_servers) for msg in consumer:recv = "%s:%d:%d: key=%s value=%s" % (msg.topic, msg.partition, msg.offset, msg.key, msg.value)log(recv)

?

轉載于:https://www.cnblogs.com/littlehb/p/8438401.html

與50位技術專家面對面20年技術見證,附贈技術全景圖

總結

以上是生活随笔為你收集整理的大数据统计分析平台之一、Kafka单机搭建的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。