Kafka安全认证授权配置
Kafka安全認(rèn)證授權(quán)配置
- 一 概述
- 1.1 Kafka的權(quán)限分類
- 1.2 實(shí)現(xiàn)方式
- 二 安全認(rèn)證授權(quán)配置
- 2.1 zookeeper配置
- 2.1.1 引入kafka依賴包
- 2.1.2修改ZK配置文件
- 2.1.3 創(chuàng)建jaas文件
- 2.1.4 修改zkEnv.sh
- 2.1.5 啟動(dòng)zk
- 2.2 Kafka配置
- 2.2.1 創(chuàng)建超級(jí)用戶
- 2.2.2 創(chuàng)建jaas文件
- 2.2.3 修改kafka配置文件
- 2.2.4啟動(dòng)kafka
- 三 測(cè)試連接
- 3.1 生產(chǎn)者測(cè)試
- 3.2 消費(fèi)者測(cè)試
- 3.3 beats工具連接
- 3.4 logstash消費(fèi)
一 概述
1.1 Kafka的權(quán)限分類
1)身份認(rèn)證(Authentication):對(duì)client 與服務(wù)器的連接進(jìn)行身份認(rèn)證,brokers和zookeeper之間的連接進(jìn)行Authentication(producer 和 consumer)、其他 brokers、tools與 brokers 之間連接的認(rèn)證。
2)權(quán)限控制(Authorization):實(shí)現(xiàn)對(duì)于消息級(jí)別的權(quán)限控制,clients的讀寫操作進(jìn)行Authorization:(生產(chǎn)/消費(fèi)/group)數(shù)據(jù)權(quán)限。
1.2 實(shí)現(xiàn)方式
自0.9.0.0版本開始Kafka社區(qū)添加了許多功能用于提高Kafka集群的安全性,Kafka提供SSL或者SASL兩種安全策略。SSL方式主要是通過CA令牌實(shí)現(xiàn),此方案主要介紹SASL方式。
1)SASL驗(yàn)證:
| SASL/PLAIN | 0.10.0.0 | 不能動(dòng)態(tài)添加用戶 |
| SASL/SCRAM | 0.10.2.0 | 可以動(dòng)態(tài)添加用戶 |
| SASL/Kerberos | 0.9.0.0 | 需要獨(dú)立部署驗(yàn)證服務(wù) |
| SASL/OAUTHBEARER | 2.0.0 | 需自己實(shí)現(xiàn)接口實(shí)現(xiàn)token的創(chuàng)建和驗(yàn)證,需要額外的oauth服務(wù) |
2)SSL加密: 使用SSL加密在代理和客戶端之間,代理之間或代理和工具之間傳輸?shù)臄?shù)據(jù)。
二 安全認(rèn)證授權(quán)配置
本文檔主要演示SASL/SCRAM和SASL/PLAIN 搭配使用的方案。版本:kafka_2.12-2.5.1.tgz、apache-zookeeper-3.5.8-bin.tar.gz
2.1 zookeeper配置
2.1.1 引入kafka依賴包
將kafka包下面的相關(guān)依賴包復(fù)制到zookeeper的目錄下,不同的kafka
版本jar包版本會(huì)不一樣。
2.1.2修改ZK配置文件
Zookeeper的配置文件zoo.cfg中添加如下內(nèi)容:
authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider requireClientAuthScheme=sasl jaasLoginRenew=36000002.1.3 創(chuàng)建jaas文件
在zookeeper的conf目錄下添加安全認(rèn)證的jaas文件,該文件定義需要鏈接到Zookeeper服務(wù)器的用戶名和密碼。這里定義文件名為zk-server-jaas.conf,文件內(nèi)容為:
Server { org.apache.kafka.common.security.plain.PlainLoginModule required username="zoo_admin" password="zoo_admin" user_kafka="kafka"; };該文件中username和password是定義zookeeper集群節(jié)點(diǎn)之間認(rèn)證的用戶名和密碼;user_開頭配置的用戶kafka和密碼kafka是提供給外部應(yīng)用連接zookeeper使用的,后面kafka使用此用戶連接zookeeper。
2.1.4 修改zkEnv.sh
將上一步添加的jaas配置文件添加到zookeeper的環(huán)境變量中,zkEnv.sh文件最后添加一行:
export SERVER_JVMFLAGS="-Djava.security.auth.login.config=/mnt/datadisk/zookeeper/conf/zk_server_jaas.conf"2.1.5 啟動(dòng)zk
nohup bin/zkServer.sh start &2.2 Kafka配置
Kafka和生產(chǎn)者/消費(fèi)者之間采用SASL/PLAIN和SASL/SCRAM兩種方式共同完成認(rèn)證,授權(quán)使用ACL方式。PLAIN方式的用戶是在jaas文件中寫死的,不能動(dòng)態(tài)的添加;SCRAM支持動(dòng)態(tài)的添加用戶。
2.2.1 創(chuàng)建超級(jí)用戶
配置SASL/SCRAM認(rèn)證的第一步,是配置可以連接到kafka集群的用戶。本方案演示創(chuàng)建3個(gè)用戶:kafka_server_admin,writer,reader。kafka_server_admin用戶用于broker之間的認(rèn)證通信,writer用戶用于生產(chǎn)者連接kafka,reader用戶用于消費(fèi)者連接kafka。
bin/kafka-configs.sh --zookeeper 192.168.13.202:2181 --alter --add-config 'SCRAM-SHA-256=[password=admin],SCRAM-SHA-512=[password=admin]' --entity-type users --entity-name kafka_server_admin正常情況打印信息:Completed updating config for entity: user-principal ‘a(chǎn)dmin’.
bin/kafka-configs.sh --zookeeper 192.168.13.202:2181 --alter --add-config 'SCRAM-SHA-256=[password=writer],SCRAM-SHA-512=[password=writer]' --entity-type users --entity-name writer bin/kafka-configs.sh --zookeeper 192.168.13.202:2181 --alter --add-config 'SCRAM-SHA-256=[password=reader],SCRAM-SHA-512=[password=reader]' --entity-type users --entity-name reader在zk客戶端中可以查看創(chuàng)建成功的用戶:
bin/zkCli.sh # 客戶端打開后執(zhí)行下面命令,查看用戶節(jié)點(diǎn),可以看到創(chuàng)建成功的用戶 ls /config/userskafka-configs 腳本是用來設(shè)置主題級(jí)別參數(shù)的。其實(shí),它的功能還有很多。比如在這個(gè)例子中,我們使用它來創(chuàng)建 SASL/SCRAM 認(rèn)證中的用戶信息。可以使用下列命令來查看剛才創(chuàng)建的用戶數(shù)據(jù)。
bin/kafka-configs.sh --zookeeper 192.168.13.202:2181 --describe --entity-type users --entity-name writer這段命令包含了 writer 用戶加密算法 SCRAM-SHA-256 以及 SCRAM-SHA-512 對(duì)應(yīng)的鹽值 (Salt)、ServerKey 和 StoreKey,這些都是 SCRAM 機(jī)制的術(shù)語。
2.2.2 創(chuàng)建jaas文件
配置了用戶之后,我們需要為 Broker 創(chuàng)建一個(gè)對(duì)應(yīng)的 JAAS 文件。在實(shí)際場(chǎng)景中,需要為每臺(tái)單獨(dú)的物理 Broker 機(jī)器都創(chuàng)建一份 JAAS 文件。
# 在kafka目錄下創(chuàng)建文件 vi config/kafka_server_jaas.confkafka_server_jaas.conf文件的內(nèi)容為:
KafkaServer { org.apache.kafka.common.security.scram.ScramLoginModule required username="kafka_server_admin" password="admin";org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin-secret" user_filebeat="123456"; };Client { org.apache.kafka.common.security.plain.PlainLoginModule required username="kafka" password="kafka"; };KafkaServer配置的是kafka的賬號(hào)和密碼。Client配置了broker到Zookeeper的連接用戶名密碼,這里要和前面2.1.3節(jié)zookeeper配置中的zk_server_jaas.conf中user_kafka的賬號(hào)和密碼相同。中間部分配置的是PLAIN認(rèn)證方式的賬戶和密碼,其中filebeat是賬戶名,123456是密碼。
關(guān)于這個(gè)文件內(nèi)容,需要注意以下兩點(diǎn):
2.2.3 修改kafka配置文件
server.properties文件中添加如下內(nèi)容:
# 啟用ACL allow.everyone.if.no.acl.found=false authorizer.class.name=kafka.security.authorizer.AclAuthorizer # 設(shè)置本例中admin為超級(jí)用戶;在Zookeeper的“/kafka/config/users”下存在用戶 super.users=User:kafka_server_admin # 同時(shí)啟用SCRAM和PLAIN機(jī)制 sasl.enabled.mechanisms=SCRAM-SHA-256,PLAIN # 為broker間通訊開啟SCRAM機(jī)制,采用SCRAM-SHA-512算法 sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256 # broker間通訊使用PLAINTEXT,本例中不演示SSL配置 security.inter.broker.protocol=SASL_PLAINTEXT # 配置listeners使用SASL_PLAINTEXT listeners=SASL_PLAINTEXT://:9092 # 配置advertised.listeners advertised.listeners=SASL_PLAINTEXT://192.168.13.202:9092啟動(dòng)腳本kafka-server-start.sh文件最后一行注釋掉,修改為:
#exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka "$@" exec $base_dir/kafka-run-class.sh $EXTRA_ARGS -Djava.security.auth.login.config=/mnt/datadisk/kafka/config/kafka_server_jaas.conf kafka.Kafka "$@"kafka配置文件server.properties完整配置如下:
# kafka不同節(jié)點(diǎn)的唯一標(biāo)識(shí) broker.id=0#認(rèn)證配置 listeners=SASL_PLAINTEXT://:9092 security.inter.broker.protocol=SASL_PLAINTEXT sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256 sasl.enabled.mechanisms=SCRAM-SHA-256,PLAIN #ACL配置 allow.everyone.if.no.acl.found=false super.users=User:kafka_server_admin authorizer.class.name=kafka.security.authorizer.AclAuthorizer# ip為本機(jī)ip advertised.listeners=SASL_PLAINTEXT://192.168.13.202:9092 num.network.threads=3 num.io.threads=8 socket.send.buffer.bytes=102400 socket.receive.buffer.bytes=102400 socket.request.max.bytes=104857600 # 必改項(xiàng) 數(shù)據(jù)和日志分離, kafka中數(shù)據(jù)的存放目錄 log.dirs=/mnt/datadisk/kafka/data # 默認(rèn)的分區(qū)數(shù)量 num.partitions=3 # 默認(rèn)的副本數(shù)量 default.replication.factor=1 num.recovery.threads.per.data.dir=1 offsets.topic.replication.factor=1 transaction.state.log.replication.factor=1 transaction.state.log.min.isr=1 log.retention.hours=168 log.segment.bytes=1073741824 log.retention.check.interval.ms=300000 # zookeeper集群地址 zookeeper.connect=192.168.13.202:2181 zookeeper.connection.timeout.ms=90000 # 可以刪除topic delete.topic.enable=true group.initial.rebalance.delay.ms=0 # 消息體大小 message.max.bytes=10485760 socket.request.max.bytes=524288000 replica.lag.time.max.ms=1200002.2.4啟動(dòng)kafka
nohup bin/kafka-server-start.sh config/server.properties &三 測(cè)試連接
3.1 生產(chǎn)者測(cè)試
使用kafka-console-producer.sh腳本測(cè)試生產(chǎn)者,由于開啟安全認(rèn)證和授權(quán),因此客戶端需要做相應(yīng)的配置。config目錄下創(chuàng)建一個(gè)producer.conf的文件,文件內(nèi)容如下:
security.protocol=SASL_PLAINTEXT sasl.mechanism=SCRAM-SHA-256 sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="writer" password="writer";創(chuàng)建一個(gè)測(cè)試主題test_topic:
bin/kafka-topics.sh --zookeeper 192.168.13.202:2181 --create --replication-factor 1 --partitions 1 --topic test_topic這里使用writer用戶認(rèn)證授權(quán),通過ACL為writer用戶分配操作test_topic權(quán)限(下面兩個(gè)命令二選一即可):
1.分配寫的權(quán)限
2.分配producer權(quán)限
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:writer --producer --topic 'test_topic'啟動(dòng)生產(chǎn)者發(fā)送消息:
bin/kafka-console-producer.sh --broker-list 192.168.13.202:9092 --topic test_topic --producer.config /mnt/datadisk/kafka/config/producer.conf3.2 消費(fèi)者測(cè)試
使用kafka-console-consumer.sh腳本測(cè)試生產(chǎn)者,由于開啟安全認(rèn)證和授權(quán),因此客戶端需要做相應(yīng)的配置。config目錄下創(chuàng)建一個(gè)consumer.conf的文件,文件內(nèi)容如下:
security.protocol=SASL_PLAINTEXT sasl.mechanism=SCRAM-SHA-256 sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="reader" password="reader";這里使用reader用戶認(rèn)證授權(quán),通過ACL為reader用戶分配操作test_topic權(quán)限(下面兩個(gè)命令二選一即可):
1.分配讀的權(quán)限
2.分配consumer的權(quán)限
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:reader --consumer --topic test_topic --group test_group啟動(dòng)消費(fèi)者消費(fèi)消息:
bin/kafka-console-consumer.sh --bootstrap-server 192.168.13.202:9092 --topic test_topic --group 'test_group' --from-beginning --consumer.config /mnt/datadisk/kafka/config/consumer.conf3.3 beats工具連接
kafka官網(wǎng)提供了許多beats采集工具,用于不同場(chǎng)景下采集數(shù)據(jù),感興趣的可以去官網(wǎng)了解下,這里就不介紹了。筆者這里使用的是beats工具是7.9版本的,采集的數(shù)據(jù)如果要發(fā)送到kafka,只支持SASL/PLAIN認(rèn)證方式,據(jù)說高版本的beats工具支持SCRAM方式的。這里介紹下filebeat-7.9.1的配置方式。PLAIN類型的用戶在2.2.2章節(jié)已經(jīng)創(chuàng)建,在filebeat中用此用戶名和密碼即可完成認(rèn)證,授權(quán)參考3.1章節(jié)。
filebeat的配置文件filebeat.yml中輸出kafka配置中加入用戶名和密碼,如下:
可以啟動(dòng)一個(gè)消費(fèi)者監(jiān)控filebeat采集的數(shù)據(jù),啟動(dòng)filebeat,并在filebeat采集的文件中輸入內(nèi)容,可以看到消費(fèi)者可以獲取對(duì)應(yīng)的消息:
3.4 logstash消費(fèi)
logstash可以使用SCRAM方式認(rèn)證,logstash的config目錄下新增kafka-client-jaas.conf文件,文件內(nèi)容如下:
KafkaClient { org.apache.kafka.common.security.scram.ScramLoginModule required username="reader" password="reader"; };修改congfig目錄下對(duì)應(yīng)的conf處理文件,在input模塊中引入SCRAM認(rèn)證:
security_protocol => "SASL_PLAINTEXT" sasl_mechanism => "SCRAM-SHA-256" jaas_path => "/mnt/datadisk/logstash/config/kafka-client-jaas.conf"
啟動(dòng)生產(chǎn)者往logstash監(jiān)控的topic中發(fā)送數(shù)據(jù),測(cè)試如下:
下一篇:【Java版 Kafka ACL使用實(shí)戰(zhàn)】
總結(jié)
以上是生活随笔為你收集整理的Kafka安全认证授权配置的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Python之多张图片拼接
- 下一篇: 模拟集成电路学习