Kafka dynamic authentication: SASL/SCRAM configuration, plus Spring Boot integration
Notes:
ZooKeeper start/stop commands:
[root@master-yzjgxh2571705819-1651919082731-99-0727183 bin]# ./zkServer.sh start
[root@master-yzjgxh2571705819-1651919082731-99-0727183 bin]# ./zkServer.sh stop
Kafka start command:
/data/program/kafka2.12/bin/kafka-server-start.sh /data/program/kafka2.12/config/server.properties
Create SCRAM credentials
1) Create the inter-broker communication user admin (this user must exist before SASL is enabled, otherwise the broker fails to start):
bin/kafka-configs.sh --zookeeper 127.0.0.1:2181 --alter --add-config 'SCRAM-SHA-256=[password=admin-sec],SCRAM-SHA-512=[password=admin-sec]' --entity-type users --entity-name admin
2) Create the producing user producer:
bin/kafka-configs.sh --zookeeper 127.0.0.1:2181 --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=prod-sec],SCRAM-SHA-512=[password=prod-sec]' --entity-type users --entity-name producer
3) Create the consuming user consumer:
bin/kafka-configs.sh --zookeeper 127.0.0.1:2181 --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=cons-sec],SCRAM-SHA-512=[password=cons-sec]' --entity-type users --entity-name consumer
SCRAM-SHA-256 and SCRAM-SHA-512 are the password-hashing mechanisms; configuring either one of them is enough.
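The iterations=8192 value controls how much key stretching SCRAM applies when the credential is stored: RFC 5802 defines the stored salted password as Hi(password, salt, i), which is PBKDF2 with HMAC-SHA-256/512. A minimal stdlib sketch of that derivation (the fixed salt here is illustrative only; Kafka generates a random salt per credential):

```java
import javax.crypto.SecretKeyFactory;
import javax.crypto.spec.PBEKeySpec;
import java.nio.charset.StandardCharsets;

public class ScramStretchDemo {
    // RFC 5802's Hi(password, salt, i) is PBKDF2 with HMAC-SHA-*;
    // the "iterations" field in the kafka-configs.sh command is the i parameter.
    static byte[] saltedPassword(String password, byte[] salt, int iterations, int keyBits)
            throws Exception {
        SecretKeyFactory f = SecretKeyFactory.getInstance("PBKDF2WithHmacSHA256");
        return f.generateSecret(new PBEKeySpec(password.toCharArray(), salt, iterations, keyBits))
                .getEncoded();
    }

    public static void main(String[] args) throws Exception {
        byte[] salt = "demo-salt".getBytes(StandardCharsets.UTF_8); // demo value only
        byte[] key = saltedPassword("prod-sec", salt, 8192, 256);
        System.out.println(key.length); // 32-byte salted password for SHA-256
    }
}
```

Raising iterations makes offline brute-forcing of a leaked credential proportionally more expensive, at the cost of slower authentication handshakes.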
View SCRAM credentials
bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-type users --entity-name consumer
bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-type users --entity-name producer
Broker-side configuration
1) Create the JAAS file:
vi config/kafka_server_jaas.conf
Contents:
KafkaServer {
org.apache.kafka.common.security.scram.ScramLoginModule required
username="admin"
password="admin-sec";
};
2) Pass the JAAS file location to each Kafka broker as a JVM parameter, by editing bin/kafka-server-start.sh so its exec line becomes:
exec $base_dir/kafka-run-class.sh $EXTRA_ARGS -Djava.security.auth.login.config=/home/test/kiki/kafka/ka/config/kafka_server_jaas.conf kafka.Kafka "$@"
In practice:
vi bin/kafka-server-start.sh
Change the last line:
#exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka "$@"
exec $base_dir/kafka-run-class.sh $EXTRA_ARGS -Djava.security.auth.login.config=/data/program/kafka2.12/config/kafka_server_jaas.conf kafka.Kafka "$@"
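As an alternative to editing the script, the same JVM flag can be supplied through the KAFKA_OPTS environment variable, which kafka-run-class.sh appends to the launch command (a sketch, using the same path as above):

```shell
export KAFKA_OPTS="-Djava.security.auth.login.config=/data/program/kafka2.12/config/kafka_server_jaas.conf"
bin/kafka-server-start.sh config/server.properties
```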
3) Configure config/server.properties:
# authentication settings
listeners=SASL_PLAINTEXT://:9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256
sasl.enabled.mechanisms=SCRAM-SHA-256
# ACL settings
allow.everyone.if.no.acl.found=false
super.users=User:admin
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
The Kafka port used in this test is 8100:
vi config/server.properties
#listeners=PLAINTEXT://:8100
listeners=SASL_PLAINTEXT://:8100
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-512
sasl.enabled.mechanisms=SCRAM-SHA-512
advertised.listeners=SASL_PLAINTEXT://183.56.218.28:8100
# ACL settings
allow.everyone.if.no.acl.found=false
super.users=User:admin
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
Personal notes:
log.dirs=/data/program/kafka2.12/data
zookeeper.connect=183.56.218.28:2181
SCRAM-SHA-512 and SCRAM-SHA-256 are interchangeable here; pick whichever you need. PLAINTEXT means no authentication at all.
4) Restart Kafka and ZooKeeper.
Client configuration
1) Create one JAAS file for each of the three users we created, named:
kafka_client_scram_admin_jaas.conf
kafka_client_scram_producer_jaas.conf
kafka_client_scram_consumer_jaas.conf
vi bin/kafka_client_scram_admin_jaas.conf
KafkaClient {
org.apache.kafka.common.security.scram.ScramLoginModule required
username="admin"
password="admin-sec";
};
vi bin/kafka_client_scram_producer_jaas.conf
KafkaClient {
org.apache.kafka.common.security.scram.ScramLoginModule required
username="producer"
password="prod-sec";
};
vi bin/kafka_client_scram_consumer_jaas.conf
KafkaClient {
org.apache.kafka.common.security.scram.ScramLoginModule required
username="consumer"
password="cons-sec";
};
2) Modify the console scripts to load the JAAS files.
Producer:
edit bin/kafka-console-producer.sh so its exec line becomes (the main class and arguments are unchanged from the stock script):
exec $(dirname $0)/kafka-run-class.sh -Djava.security.auth.login.config=/data/program/kafka2.12/config/kafka_client_scram_producer_jaas.conf kafka.tools.ConsoleProducer "$@"
Consumer:
edit bin/kafka-console-consumer.sh the same way:
exec $(dirname $0)/kafka-run-class.sh -Djava.security.auth.login.config=/data/program/kafka2.12/config/kafka_client_scram_consumer_jaas.conf kafka.tools.ConsoleConsumer "$@"
3) Add the following two lines to both consumer.properties and producer.properties:
security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-512
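Putting it together, the two client property files might look like this (group.id is shown for the consumer so that it matches the test-group consumer group used later; adjust if your group differs):

```properties
# config/producer.properties (excerpt)
security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-512

# config/consumer.properties (excerpt)
security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-512
group.id=test-group
```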
4) Create a topic:
[test@police ka]$ bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic test --partitions 2 --replication-factor 1
5) Start a producer (note: I never fully verified this console command before finishing; configuring it in code later works fine):
bin/kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic test --producer.config config/producer.properties
This fails with an authorization error.
6) Grant the producer write permission:
bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:producer --operation Write --topic test
7) Grant the consumer read permission:
bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:consumer --operation Read --topic test
Now start a consumer (note: likewise not fully verified from the console; configuring it in code later works fine):
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning --consumer.config config/consumer.properties
This still fails, complaining that the consumer group is not authorized. Grant read permission on the groupId:
bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:consumer --operation Read --group test-group
Start the consumer again and it now consumes the producer's messages normally.
8) View the ACLs:
bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2181 --list
Spring Boot integration
The security-related part of the client configuration looks like:
props.put("security.protocol", "SASL_PLAINTEXT");
props.put("sasl.mechanism", "SCRAM-SHA-512");
props.put("sasl.jaas.config",
        "org.apache.kafka.common.security.scram.ScramLoginModule required username='easy' password='easy1234';");
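Because the sasl.jaas.config value embeds quoted credentials inside a string, the escaping is easy to get wrong. A self-contained sketch of assembling the three settings (plain string keys, so it runs without the Kafka client on the classpath; the user/password values are the ones created earlier):

```java
import java.util.Properties;

public class ClientSecurityProps {
    // Builds the three SASL/SCRAM client settings. The inner quotes around
    // username/password are required by the JAAS syntax, as is the trailing ';'.
    static Properties scramProps(String mechanism, String user, String password) {
        Properties p = new Properties();
        p.setProperty("security.protocol", "SASL_PLAINTEXT");
        p.setProperty("sasl.mechanism", mechanism);
        p.setProperty("sasl.jaas.config",
                "org.apache.kafka.common.security.scram.ScramLoginModule required"
                        + " username=\"" + user + "\" password=\"" + password + "\";");
        return p;
    }

    public static void main(String[] args) {
        Properties p = scramProps("SCRAM-SHA-512", "producer", "prod-sec");
        System.out.println(p.getProperty("sasl.jaas.config"));
    }
}
```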
Producer:
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.config.SaslConfigs
import org.apache.kafka.common.serialization.StringSerializer
import java.time.LocalDateTime
import java.util.Properties

// asynchronous send
@Test
fun customProducer() {
    // configuration
    val properties = Properties()
    // connect to Kafka
    properties[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = "183.56.218.28:8100"
    // specify the key/value serializer types (either form works)
    // properties[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = "org.apache.kafka.common.serialization.StringSerializer"
    properties[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java.name
    properties[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java.name
    properties[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = "SASL_PLAINTEXT"
    properties[SaslConfigs.SASL_MECHANISM] = "SCRAM-SHA-512"
    properties[SaslConfigs.SASL_JAAS_CONFIG] =
        "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"producer\" password=\"prod-sec\";"
    val kafkaProducer = KafkaProducer<String, String>(properties)
    // send data
    for (i in 0 until 1) {
        // sticky partitioning: once the configured batch size/linger time is reached,
        // the producer switches to a different partition (not the current one)
        kafkaProducer.send(ProducerRecord("test", "success:::${LocalDateTime.now()}"))
    }
    // "type":"UPDATE/ADD/DELETE"
    // release resources
    kafkaProducer.close()
}
Consumer:
package com.umh.medicalbookingplatform.background.config

import com.umh.medicalbookingplatform.core.properties.ApplicationProperties
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.config.SaslConfigs
import org.apache.kafka.common.serialization.StringDeserializer
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.kafka.annotation.EnableKafka
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory
import org.springframework.kafka.core.ConsumerFactory
import org.springframework.kafka.core.DefaultKafkaConsumerFactory

/**
 * @Description :
 * @Author xiaomh
 * @date 2022/8/30 14:14
 */
@EnableKafka
@Configuration
class KafkaConsumerConfig {

    @Autowired
    private lateinit var appProperties: ApplicationProperties

    @Bean
    fun consumerFactory(): ConsumerFactory<String?, String?> {
        val props: MutableMap<String, Any> = HashMap()
        props[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = appProperties.kafkaBootstrapServersConfig.toString()
        props[ConsumerConfig.GROUP_ID_CONFIG] = appProperties.kafkaGroupId.toString()
        props[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
        props[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
        props[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = appProperties.kafkaSecurityProtocol.toString()
        props[SaslConfigs.SASL_MECHANISM] = appProperties.kafkaSaslMechanism.toString()
        props[SaslConfigs.SASL_JAAS_CONFIG] = appProperties.kafkaSaslJaasConfig.toString()
        return DefaultKafkaConsumerFactory(props)
    }

    @Bean
    fun kafkaListenerContainerFactory(): ConcurrentKafkaListenerContainerFactory<String, String>? {
        val factory = ConcurrentKafkaListenerContainerFactory<String, String>()
        factory.setConsumerFactory(consumerFactory())
        return factory
    }
}

yml:
kafkaBootstrapServersConfig: xxxxxx:8100
kafkaGroupId: test-group
kafkaSecurityProtocol: SASL_PLAINTEXT
kafkaSaslMechanism: SCRAM-SHA-512
kafkaSaslJaasConfig: org.apache.kafka.common.security.scram.ScramLoginModule required username="consumer" password="cons-sec";

References
Kafka動(dòng)態(tài)認(rèn)證SASL/SCRAM驗(yàn)證_慕木兮人可的博客-CSDN博客