

Configuring Dynamic SASL_SCRAM Authentication for Kafka

Published: 2023/12/14

Configuring Dynamic SASL_SCRAM Authentication for Kafka

  • 1. Start Zookeeper and Kafka
  • 2. Create SCRAM credentials
  • 3. Maintain SCRAM credentials
    • 3.1 View SCRAM credentials
    • 3.2 Delete SCRAM credentials
  • 4. Broker configuration
  • 5. Client configuration
  • 6. Java client test
    • 6.1 Producer
    • 6.2 Consumer


Kafka needs authentication with the ability to add new users dynamically, which SASL/SCRAM supports.
This article reorganizes and re-records the content from https://blog.csdn.net/qq_38616503/article/details/117529690

1. Start Zookeeper and Kafka

First, start Kafka and Zookeeper with no security configured. If you need to install Kafka from scratch, see the Kafka single-node and cluster installation guides.

2. Create SCRAM Credentials

(1) Create the broker inter-communication user admin (it must exist before SASL is enabled, otherwise the brokers fail to start):

bin/kafka-configs.sh --zookeeper 192.168.56.101:2181,192.168.56.102:2181,192.168.56.103:2181 --alter --add-config 'SCRAM-SHA-256=[password=admin-sec],SCRAM-SHA-512=[password=admin-sec]' --entity-type users --entity-name admin

(2) Create the producer user: producer

bin/kafka-configs.sh --zookeeper 192.168.56.101:2181,192.168.56.102:2181,192.168.56.103:2181 --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=prod-sec],SCRAM-SHA-512=[password=prod-sec]' --entity-type users --entity-name producer

(3) Create the consumer user: consumer

bin/kafka-configs.sh --zookeeper 192.168.56.101:2181,192.168.56.102:2181,192.168.56.103:2181 --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=cons-sec],SCRAM-SHA-512=[password=cons-sec]' --entity-type users --entity-name consumer

SCRAM-SHA-256 and SCRAM-SHA-512 are the password-hashing mechanisms; configuring either one is sufficient.
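Under the hood, SCRAM never stores the password itself: what kafka-configs.sh writes to Zookeeper is a salted, iterated hash derived with PBKDF2 (per RFC 5802). A minimal, illustrative sketch of that derivation using only the JDK; the salt here is made up (Kafka generates a random one), and the iteration count mirrors the iterations=8192 used above:

```java
import javax.crypto.SecretKeyFactory;
import javax.crypto.spec.PBEKeySpec;
import java.nio.charset.StandardCharsets;
import java.util.Base64;

public class ScramSketch {
    public static void main(String[] args) throws Exception {
        // SCRAM-SHA-256: SaltedPassword = PBKDF2-HMAC-SHA-256(password, salt, iterations)
        // The salt is illustrative only; Kafka generates a random salt per user.
        byte[] salt = "example-salt".getBytes(StandardCharsets.UTF_8);
        PBEKeySpec spec = new PBEKeySpec("prod-sec".toCharArray(), salt, 8192, 256);
        byte[] saltedPassword = SecretKeyFactory
                .getInstance("PBKDF2WithHmacSHA256")
                .generateSecret(spec)
                .getEncoded();
        System.out.println(saltedPassword.length); // 32 bytes for SHA-256
        System.out.println(Base64.getEncoder().encodeToString(saltedPassword));
    }
}
```

Because only this salted hash is stored, deleting and re-adding a user's credential (section 3) is cheap and can be done at runtime, which is what makes the SASL/SCRAM setup "dynamic".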

3. Maintain SCRAM Credentials

3.1 View SCRAM credentials

bin/kafka-configs.sh --zookeeper 192.168.56.101:2181,192.168.56.102:2181,192.168.56.103:2181 --describe --entity-type users --entity-name consumer
bin/kafka-configs.sh --zookeeper 192.168.56.101:2181,192.168.56.102:2181,192.168.56.103:2181 --describe --entity-type users --entity-name producer

3.2 Delete SCRAM credentials

bin/kafka-configs.sh --zookeeper localhost:2181 --alter --delete-config 'SCRAM-SHA-512' --delete-config 'SCRAM-SHA-256' --entity-type users --entity-name producer

4. Broker Configuration

With the user credentials created, configure the Kafka brokers.
(1) Create the JAAS file:

cat > kafka_server_jaas.conf << EOF
KafkaServer {
    org.apache.kafka.common.security.scram.ScramLoginModule required
    username="admin"
    password="admin-sec";
};
EOF

(2) Pass the JAAS file location to each Kafka broker as a JVM parameter: in bin/kafka-server-start.sh, add -Djava.security.auth.login.config=/opt/kafka_2.12-2.4.1/config/kafka_server_jaas.conf to the launch command:

exec $base_dir/kafka-run-class.sh $EXTRA_ARGS -Djava.security.auth.login.config=/opt/kafka_2.12-2.4.1/config/kafka_server_jaas.conf kafka.Kafka "$@"

(3) Configure config/server.properties:

# Authentication
listeners=SASL_PLAINTEXT://IP:9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256
sasl.enabled.mechanisms=SCRAM-SHA-256
advertised.listeners=SASL_PLAINTEXT://IP:9092

# ACL
allow.everyone.if.no.acl.found=false
super.users=User:admin
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer

Choose SASL_SSL or SASL_PLAINTEXT according to your needs; PLAINTEXT sends data unencrypted over the wire and performs somewhat better. Restart Kafka and Zookeeper after the changes.
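If you do opt for SASL_SSL, the listener additionally needs TLS keystore/truststore settings on each broker. A rough sketch of the extra properties; the paths and passwords below are placeholders, not part of the original setup:

```properties
listeners=SASL_SSL://IP:9093
advertised.listeners=SASL_SSL://IP:9093
security.inter.broker.protocol=SASL_SSL
ssl.keystore.location=/opt/kafka/config/server.keystore.jks
ssl.keystore.password=changeit
ssl.key.password=changeit
ssl.truststore.location=/opt/kafka/config/server.truststore.jks
ssl.truststore.password=changeit
```

Clients would then set security.protocol=SASL_SSL and a matching truststore instead of SASL_PLAINTEXT.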

5. Client Configuration

(1) Create one JAAS file for each of the three users:
kafka_client_scram_admin_jaas.conf
kafka_client_scram_producer_jaas.conf
kafka_client_scram_consumer_jaas.conf

cat > kafka_client_scram_admin_jaas.conf << EOF
KafkaClient {
    org.apache.kafka.common.security.scram.ScramLoginModule required
    username="admin"
    password="admin-sec";
};
EOF

cat > kafka_client_scram_producer_jaas.conf << EOF
KafkaClient {
    org.apache.kafka.common.security.scram.ScramLoginModule required
    username="producer"
    password="prod-sec";
};
EOF

cat > kafka_client_scram_consumer_jaas.conf << EOF
KafkaClient {
    org.apache.kafka.common.security.scram.ScramLoginModule required
    username="consumer"
    password="cons-sec";
};
EOF

(2) Modify the console client startup scripts to reference the JAAS files

### Producer: bin/kafka-console-producer.sh
exec $(dirname $0)/kafka-run-class.sh -Djava.security.auth.login.config=/opt/kafka_2.12-2.4.1/config/kafka_client_scram_producer_jaas.conf kafka.tools.ConsoleProducer "$@"

### Consumer: bin/kafka-console-consumer.sh
exec $(dirname $0)/kafka-run-class.sh -Djava.security.auth.login.config=/opt/kafka_2.12-2.4.1/config/kafka_client_scram_consumer_jaas.conf kafka.tools.ConsoleConsumer "$@"

(3) Add the following to both config/consumer.properties and config/producer.properties:

security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-256
bootstrap.servers=192.168.56.101:9092,192.168.56.102:9092,192.168.56.103:9092
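As an alternative to pointing a JVM system property at a JAAS file, Kafka clients from 0.10.2 onward accept the login module inline via the sasl.jaas.config property, which keeps everything in one file. A sketch for the producer user:

```properties
security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-256
bootstrap.servers=192.168.56.101:9092,192.168.56.102:9092,192.168.56.103:9092
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="producer" password="prod-sec";
```

With this in place, the script changes from step (2) and the java.security.auth.login.config property are unnecessary for that client.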

(4) Create a topic

bin/kafka-topics.sh --zookeeper 192.168.56.101:2181,192.168.56.102:2181,192.168.56.103:2181 --create --topic topictest --partitions 3 --replication-factor 1

(5) Start a console producer

bin/kafka-console-producer.sh --broker-list 192.168.56.101:9092,192.168.56.102:9092,192.168.56.103:9092 --topic topictest --producer.config config/producer.properties

(6) Grant the producer write permission

bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=192.168.56.101:2181,192.168.56.102:2181,192.168.56.103:2181 --add --allow-principal User:producer --operation Write --topic topictest

(7) List the ACLs

bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=192.168.56.101:2181,192.168.56.102:2181,192.168.56.103:2181 --list

(8) Grant the consumer read permission on the topic

bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=192.168.56.101:2181,192.168.56.102:2181,192.168.56.103:2181 --add --allow-principal User:consumer --operation Read --topic topictest

(9) Grant the consumer read permission on its consumer group

bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=192.168.56.101:2181,192.168.56.102:2181,192.168.56.103:2181 --add --allow-principal User:consumer --operation Read --group test-consumer-group

(10) Start a console consumer

bin/kafka-console-consumer.sh --bootstrap-server 192.168.56.101:9092,192.168.56.102:9092,192.168.56.103:9092 --topic topictest --from-beginning --consumer.config config/consumer.properties

6. Java Client Test

6.1 Producer

Maven pom.xml dependency:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.3.0</version>
    <!-- <version>0.10.2.0</version> -->
</dependency>

kafka_client_scram_producer_jaas.conf

KafkaClient {
    org.apache.kafka.common.security.scram.ScramLoginModule required
    username="producer"
    password="prod-sec";
};

Code:

import java.util.Properties;
import java.util.concurrent.Future;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class MySaslScramProducer {

    public static MySaslScramProducer ins;
    private Producer<String, String> producer;

    private MySaslScramProducer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.56.101:9092,192.168.56.102:9092,192.168.56.103:9092");
        props.put("acks", "1");
        props.put("retries", 3);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        //props.put("compression.type", "gzip");
        //props.put("max.request.size", "5242880");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // The JAAS config file is resolved relative to the classpath root
        System.out.println(MySaslScramProducer.class.getResource("/").getPath() + "kafka_client_scram_producer_jaas.conf");
        System.setProperty("java.security.auth.login.config",
                MySaslScramProducer.class.getResource("/").getPath() + "kafka_client_scram_producer_jaas.conf");
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "SCRAM-SHA-256");
        producer = new KafkaProducer<>(props);
    }

    public static MySaslScramProducer getIns() {
        if (ins == null) {
            synchronized (MySaslScramProducer.class) {
                if (ins == null) {
                    ins = new MySaslScramProducer();
                }
            }
        }
        return ins;
    }

    public Future<RecordMetadata> send(String topic, String valueStr) {
        // Send asynchronously; failed sends are logged for later inspection
        Future<RecordMetadata> meta = producer.send(new ProducerRecord<String, String>(topic, valueStr), new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (exception != null) {
                    // Print failed sends so they can be traced
                    System.out.println("send failed --->>> " + valueStr);
                } else {
                    System.out.println("topic:" + metadata.topic() + " ,partition:" + metadata.partition()
                            + " , offset:" + metadata.offset() + " -> " + valueStr);
                }
            }
        });
        return meta;
    }

    public void close() {
        if (producer != null) producer.close();
    }

    public static void main(String[] args) throws InterruptedException {
        String valueStr = "{\"metric\":\"host.mem.pused\",\"value\":\"97.781098\",\"tags\":{\"resCi\":\"TA_RES_PHYSICALHOST\",\"dataType\":0,\"ip\":\"132.121.93.69\",\"cmd\":\"\",\"resId\":\"auto217A77657DDC70403B949090D3EA5543\",\"itemKey\":\"vm.memory.size[pavailable]\"},\"timestamp\":\"1617673320000\"}";
        MySaslScramProducer.getIns().send("topictest", valueStr);
        MySaslScramProducer.getIns().close();
    }
}

6.2 Consumer

kafka_client_scram_consumer_jaas.conf

KafkaClient {
    org.apache.kafka.common.security.scram.ScramLoginModule required
    username="consumer"
    password="cons-sec";
};

Code:

package cn.gzsendi;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.Set;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SaslScramTopicTest {

    public static boolean stop = false;
    private static Logger logger = LoggerFactory.getLogger(SaslScramTopicTest.class);

    public static void main(String[] args) {
        KafkaConsumer<String, String> consumer = null;
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.56.101:9092,192.168.56.102:9092,192.168.56.103:9092");
        props.put("group.id", "liutest");
        props.put("enable.auto.commit", "true"); // auto commit
        props.put("auto.offset.reset", "latest");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "300000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        System.setProperty("java.security.auth.login.config",
                SaslScramTopicTest.class.getResource("/").getPath() + "kafka_client_scram_consumer_jaas.conf");
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "SCRAM-SHA-256");
        consumer = new KafkaConsumer<>(props);

        String topicName = "topictest";
        consumer.subscribe(Arrays.asList(topicName));

        while (!stop) {
            try {
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    String valueStr = record.value();
                    try {
                        logger.info(valueStr);
                        logger.info("topic:" + record.topic() + " ,partition:" + record.partition()
                                + " ,offset:" + record.offset() + " -> " + record.value());
                    } catch (Exception e) {
                        System.out.println("error------->>> " + valueStr);
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        if (consumer != null) consumer.close();
    }

    /**
     * Skip historical data and start consuming from the latest offsets.
     *
     * @param consumer the consumer whose assignment should be moved to the end
     */
    public static void assignOffset(KafkaConsumer<String, String> consumer) {
        if (consumer == null) {
            return;
        }
        Map<TopicPartition, OffsetAndMetadata> offsetMap = new HashMap<TopicPartition, OffsetAndMetadata>();
        consumer.poll(100);
        Set<TopicPartition> assignment = consumer.assignment();
        consumer.seekToEnd(assignment);
        //consumer.seekToBeginning(assignment);
        for (TopicPartition topicPartition : assignment) {
            long position = consumer.position(topicPartition);
            offsetMap.put(topicPartition, new OffsetAndMetadata(position));
            consumer.commitSync(offsetMap);
        }
    }
}
