Kafka SASL SCRAM授权java开发demo
生活随笔
收集整理的這篇文章主要介紹了
Kafka SASL SCRAM授权java开发demo
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
有道筆記鏈接:點擊這里
一.消費者(Consumer)測試demo
①安全協議(security.protocol)
②SASL mechanism配置(sasl.mechanism)
③授權信息(sasl.jaas.config)
public static void main(String[] args) {Properties properties = new Properties();properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.20.110:9092");properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");//安全協議properties.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-512");//SASL mechanism配置properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);properties.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"test\" password=\"test\";");//授權信息properties.put(ConsumerConfig.GROUP_ID_CONFIG, "testgroupId");//2. 創建消費者KafkaConsumer<Integer, String> consumer = new KafkaConsumer<Integer, String>(properties);//3. 訂閱主題consumer.subscribe(Arrays.asList("topiccccc"));//4. 拉取主題內新增的數據while (true) {ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofSeconds(5));// 拉取的超時時間for (ConsumerRecord<Integer, String> record : records) {// 1 HelloWorld xxxx 0System.out.println(record.key() + " | " + record.value() + " | " + record.timestamp() + " | " + record.offset() + " | " + record.partition());}}}二.生產者(Producer)測試demo
①安全協議(security.protocol)
②SASL mechanism配置(sasl.mechanism)
③授權信息(sasl.jaas.config)
?
public static void main(String[] args) throws JSONException {SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");String date = format.format(new Date());//1. 創建配置對象 指定Producer的信息Properties properties = new Properties();properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.20.110:9092");properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");properties.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-512");properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); // 對record的key進行序列化properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);properties.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"test\" password=\"test\";");//2. 創建Producer對象KafkaProducer<String, String> producer = new KafkaProducer<>(properties);//3. 發布消息ProducerRecord<String, String> record = new ProducerRecord<String, String>("topiccccc","","{\n" +" \"data\": [\n" +" {\n" +" \"name\": null,\n" +" \"age\": 22,\n" +" \"sex\": \"ere\"\n" +" }\n" +" ]\n" +"}");producer.send(record);//4. 提交producer.flush();producer.close();}?
總結
以上是生活随笔為你收集整理的Kafka SASL SCRAM授权java开发demo的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 计算机操作系统期末考试试题及答案,计算机
- 下一篇: 计算机操作系统张尧学第四章课后答案,计算