Spring Boot Integration with Kafka and ELK
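Contents
- 1. Dependencies
- 2. YAML configuration
- 3. Test class
- 4. AOP interception
- 5. Sending to MQ asynchronously via a concurrent queue
- 6. Wrapping the JSON message
- 7. Complete wrapped JSON message

1. Dependencies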
<dependencies>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.79</version>
    </dependency>
    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-lang3</artifactId>
        <version>3.12.0</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-aop</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-devtools</artifactId>
        <scope>runtime</scope>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>

2. YAML configuration
application.yml
server:
  port: 8080
spring:
  application:
    name: springboot-kafka
  kafka:
    bootstrap-servers: 192.168.92.137:9092
    producer:
      batch-size: 16384
      # 32 MiB
      buffer-memory: 33554432
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: default_consumer_group
      auto-commit-interval: 1000
      auto-offset-reset: earliest
      enable-auto-commit: true
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

3. Test class
package com.gblfy.elk.controller;

import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class KafkaController {

    @GetMapping("/healthAdvice")
    public String healthAdvice() {
        return "healthAdvice";
    }

    @GetMapping("/errorAdvice")
    public String errorAdvice(@RequestParam("userId") Integer userId) {
        // Throws ArithmeticException when userId is 0,
        // which gives the exception advice something to report
        Integer i = 1 / userId;
        return "success";
    }
}

4. AOP interception
Demo:
- AOP before advice
- After advice
- Exception advice
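The aspect code is not shown here, so the following is only a rough, framework-free sketch of what the three advice types do. It uses a JDK dynamic proxy (`java.lang.reflect.Proxy`) rather than Spring's `@Aspect` annotations, and the names `AdviceTarget`, `AdviceDemo`, `withAdvice` and the `events` list are hypothetical. Note that the "after" step in this sketch runs only on normal completion, which corresponds to after-returning advice.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical interface standing in for the controller's errorAdvice endpoint.
interface AdviceTarget {
    String errorAdvice(Integer userId);
}

class AdviceDemo {

    // Wraps the target in a proxy that records before / after / exception events.
    static AdviceTarget withAdvice(AdviceTarget target, List<String> events) {
        return (AdviceTarget) Proxy.newProxyInstance(
                AdviceTarget.class.getClassLoader(),
                new Class<?>[] {AdviceTarget.class},
                (proxy, method, args) -> {
                    // Before advice: runs ahead of the target method.
                    events.add("before " + method.getName() + " " + Arrays.toString(args));
                    try {
                        Object result = method.invoke(target, args);
                        // After-returning advice: runs only on normal completion.
                        events.add("after " + method.getName() + " -> " + result);
                        return result;
                    } catch (InvocationTargetException e) {
                        // Exception advice: runs when the target method throws.
                        Throwable cause = e.getCause();
                        events.add("error " + cause);
                        throw cause; // the caller still sees the original exception
                    }
                });
    }

    public static void main(String[] args) {
        List<String> events = new ArrayList<>();
        AdviceTarget api = withAdvice(userId -> {
            Integer i = 1 / userId; // same deliberate failure as the controller
            return "success";
        }, events);

        api.errorAdvice(1);
        try {
            api.errorAdvice(0);
        } catch (ArithmeticException expected) {
            // already recorded by the exception advice
        }
        events.forEach(System.out::println);
    }
}
```

In a Spring aspect the same three hooks would be the places where the request details are collected and handed to the queue described in the next section.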
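5. Sending to MQ asynchronously via a concurrent queue
Messages are sent to the MQ asynchronously here. A synchronous send would run on the same thread as the business logic, so every request would also pay the latency of the send.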
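The decoupling described above can be sketched with the JDK alone. In this hypothetical `AsyncSender`, the `Consumer<String>` stands in for the slow call (for example a Kafka send); the business thread only pays for an in-memory enqueue while a single worker thread drains the queue. The 200 ms sleep is an invented figure used purely to simulate a slow broker.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

// Hypothetical helper: callers enqueue, one background thread delivers.
class AsyncSender {
    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
    private final Thread worker;

    // 'delivery' stands in for the slow call, e.g. sending to Kafka.
    AsyncSender(Consumer<String> delivery) {
        worker = new Thread(() -> {
            try {
                while (true) {
                    delivery.accept(queue.take()); // blocks while the queue is empty
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // stop when interrupted
            }
        }, "async-sender");
        worker.setDaemon(true);
        worker.start();
    }

    // Called on the business thread; returns without waiting for delivery.
    void put(String message) {
        queue.offer(message);
    }

    public static void main(String[] args) throws InterruptedException {
        AsyncSender sender = new AsyncSender(msg -> {
            try {
                Thread.sleep(200); // simulated slow broker round trip (assumed value)
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            System.out.println("delivered: " + msg);
        });

        long start = System.nanoTime();
        sender.put("log-1");
        sender.put("log-2");
        System.out.println("enqueue took ms: " + (System.nanoTime() - start) / 1_000_000);

        Thread.sleep(600); // give the daemon worker time to drain before the JVM exits
    }
}
```

The trade-off of this design is that messages still in the queue are lost if the process stops, and an unbounded queue can grow without limit if delivery falls behind.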
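package com.gblfy.elk.queue;

import org.apache.commons.lang3.StringUtils;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;

@Component
public class AsynConcurrentQueue {

    // In-memory buffer shared by all request threads
    private static final BlockingDeque<String> logDeque = new LinkedBlockingDeque<>();

    private final KafkaTemplate<String, Object> kafkaTemplate;

    // Constructor injection: kafkaTemplate is set before the sender thread starts
    public AsynConcurrentQueue(KafkaTemplate<String, Object> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
        Thread sender = new LogThreadKafka();
        sender.setDaemon(true); // do not keep the JVM alive on shutdown
        sender.start();
    }

    /**
     * Enqueue a log message; returns immediately.
     *
     * @param log the log message
     */
    public void put(String log) {
        logDeque.offer(log);
    }

    class LogThreadKafka extends Thread {
        @Override
        public void run() {
            try {
                while (true) {
                    // take() blocks until a message arrives, so the thread does not spin
                    String log = logDeque.take();
                    if (StringUtils.isNotEmpty(log)) {
                        // Deliver the message to Kafka
                        kafkaTemplate.send("mayikt-log", log);
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // exit the loop on interrupt
            }
        }
    }
}

6. Wrapping the JSON message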
{
  "request_args": "[0]",
  "request_method": "GET",
  "error": "java.lang.ArithmeticException: / by zero",
  "ip_addres": "192.168.92.1:8080",
  "request_url": "http://localhost:8080/errorAdvice"
}

7. Complete wrapped JSON message
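The request-level fields of these messages (`request_time`, `request_args`, `request_method`, `error`, `ip_addres`, `request_url`) could be assembled along the following lines. This is only a sketch: `LogMessage`, `build` and `toJson` are hypothetical names, omitting `error` for successful calls is an assumption, and the tiny serializer handles just strings and nested maps. In a real project a JSON library, such as the fastjson dependency listed in section 1, would do the serialization.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper that builds the log message as nested maps.
class LogMessage {

    // Collects the per-request fields; key names follow the sample messages.
    static Map<String, Object> build(String time, String args, String method,
                                     String error, String ip, String url) {
        Map<String, Object> request = new LinkedHashMap<>();
        request.put("request_time", time);
        request.put("request_args", args);
        request.put("request_method", method);
        if (error != null) {
            request.put("error", error); // assumption: only present when the call failed
        }
        request.put("ip_addres", ip); // key spelled as in the sample output
        request.put("request_url", url);

        Map<String, Object> root = new LinkedHashMap<>();
        root.put("request", request);
        return root;
    }

    // Minimal serializer: supports String values and nested maps only.
    static String toJson(Map<String, Object> map) {
        StringBuilder sb = new StringBuilder("{");
        boolean first = true;
        for (Map.Entry<String, Object> e : map.entrySet()) {
            if (!first) {
                sb.append(',');
            }
            first = false;
            sb.append(quote(e.getKey())).append(':');
            Object v = e.getValue();
            if (v instanceof Map) {
                @SuppressWarnings("unchecked")
                Map<String, Object> nested = (Map<String, Object>) v;
                sb.append(toJson(nested));
            } else {
                sb.append(quote(String.valueOf(v)));
            }
        }
        return sb.append('}').toString();
    }

    // Escapes backslashes and double quotes only; control characters are not handled.
    private static String quote(String s) {
        return '"' + s.replace("\\", "\\\\").replace("\"", "\\\"") + '"';
    }

    public static void main(String[] args) {
        Map<String, Object> msg = build(
                "2022-03-11 20:45:50", "[0]", "GET",
                "java.lang.ArithmeticException: / by zero",
                "192.168.92.1:8080", "http://localhost:8080/errorAdvice");
        System.out.println(toJson(msg));
    }
}
```

The resulting string is what would be handed to `AsynConcurrentQueue.put(...)` for delivery to the `mayikt-log` topic.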
{
  "request": {
    "request_time": "2022-03-11 20:45:50",
    "signature": {
      "declaringType": "com.gblfy.elk.controller.KafkaController",
      "declaringTypeName": "com.gblfy.elk.controller.KafkaController",
      "exceptionTypes": [],
      "method": {
        "accessible": false,
        "annotatedExceptionTypes": [],
        "annotatedParameterTypes": [
          {
            "annotations": [],
            "declaredAnnotations": [],
            "type": "java.lang.Integer"
          }
        ],
        "annotatedReceiverType": {
          "annotations": [],
          "declaredAnnotations": [],
          "type": "com.gblfy.elk.controller.KafkaController"
        },
        "annotatedReturnType": {
          "annotations": [],
          "declaredAnnotations": [],
          "type": "java.lang.String"
        },
        "annotations": [
          {
            "path": [],
            "headers": [],
            "name": "",
            "produces": [],
            "params": [],
            "value": ["/errorAdvice"],
            "consumes": []
          }
        ],
        "bridge": false,
        "declaringClass": "com.gblfy.elk.controller.KafkaController",
        "default": false,
        "exceptionTypes": [],
        "genericExceptionTypes": [],
        "genericParameterTypes": ["java.lang.Integer"],
        "genericReturnType": "java.lang.String",
        "modifiers": 1,
        "name": "errorAdvice",
        "parameterAnnotations": [
          [
            {
              "name": "",
              "value": "userId",
              "defaultValue": "\n\t\t\n\t\t\n\n\t\t\t\t\n",
              "required": true
            }
          ]
        ],
        "parameterCount": 1,
        "parameterTypes": ["java.lang.Integer"],
        "returnType": "java.lang.String",
        "synthetic": false,
        "typeParameters": [],
        "varArgs": false
      },
      "modifiers": 1,
      "name": "errorAdvice",
      "parameterNames": ["userId"],
      "parameterTypes": ["java.lang.Integer"],
      "returnType": "java.lang.String"
    },
    "request_args": "[0]",
    "request_method": "GET",
    "error": "java.lang.ArithmeticException: / by zero",
    "ip_addres": "192.168.92.1:8080",
    "request_url": "http://localhost:8080/errorAdvice"
  }
}

Query the log index in Elasticsearch:

GET /mayikt_logs/_search