
Spring Boot Integration with Kafka and ELK

Published: 2024/9/27 · by 豆豆
This article, collected and organized by 生活随笔, walks through integrating Spring Boot with Kafka and ELK, and is shared here for reference.

Contents

            • 1. Dependencies
            • 2. YAML configuration
            • 3. Test controller
            • 4. AOP interception
            • 5. Sending to MQ asynchronously via a concurrent queue
            • 6. JSON log message
            • 7. Full JSON log message

1. Dependencies
<dependencies>
  <dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
  </dependency>
  <dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.79</version>
  </dependency>
  <dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-lang3</artifactId>
    <version>3.12.0</version>
  </dependency>
  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
  </dependency>
  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-aop</artifactId>
  </dependency>
  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-devtools</artifactId>
    <scope>runtime</scope>
    <optional>true</optional>
  </dependency>
  <dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <optional>true</optional>
  </dependency>
  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <scope>test</scope>
  </dependency>
</dependencies>
2. YAML configuration

application.yml

server:
  port: 8080
spring:
  application:
    name: springboot-kafka
  kafka:
    bootstrap-servers: 192.168.92.137:9092
    producer:
      batch-size: 16384
      buffer-memory: 33554432   # 32 MB; the original text had the likely typo 33544432
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: default_consumer_group
      auto-commit-interval: 1000
      auto-offset-reset: earliest
      enable-auto-commit: true
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
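The article shows only the application side of the pipeline; the ELK side needs a Logstash pipeline that consumes the Kafka topic and indexes into Elasticsearch. The following is a rough sketch, not a configuration from the original article: the topic name `mayikt-log` comes from the queue class below, the index name `mayikt_logs` from the search query at the end, and the bootstrap server from the YAML above, but the Elasticsearch host is an assumption.

```conf
# Hypothetical Logstash pipeline (logstash.conf): consume the log topic and
# index each JSON document into Elasticsearch. The ES host is an assumption.
input {
  kafka {
    bootstrap_servers => "192.168.92.137:9092"
    topics => ["mayikt-log"]
    codec => "json"
  }
}
output {
  elasticsearch {
    hosts => ["127.0.0.1:9200"]
    index => "mayikt_logs"
  }
}
```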
3. Test controller
package com.gblfy.elk.controller;

import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class KafkaController {

    @GetMapping("/healthAdvice")
    public String healthAdvice() {
        return "healthAdvice";
    }

    @GetMapping("/errorAdvice")
    public String errorAdvice(@RequestParam("userId") Integer userId) {
        // Calling this with userId=0 throws ArithmeticException,
        // which exercises the @AfterThrowing advice in the aspect.
        Integer i = 1 / userId;
        return "success";
    }
}
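The `errorAdvice` endpoint exists purely to trigger an exception: integer division by zero throws `ArithmeticException` with the message `/ by zero`, which is exactly the error string that appears in the JSON log message later in the article. A standalone sketch of that failure mode (the class and method names here are made up for the demo):

```java
// Standalone demo of the failure behind /errorAdvice?userId=0:
// integer division by zero throws ArithmeticException("/ by zero").
public class DivByZeroDemo {

    static String describe(int userId) {
        try {
            int i = 1 / userId; // same expression as in errorAdvice
            return "success";
        } catch (ArithmeticException e) {
            // This is the string the @AfterThrowing advice puts into "error"
            return e.getClass().getName() + ": " + e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(describe(0)); // java.lang.ArithmeticException: / by zero
        System.out.println(describe(1)); // success
    }
}
```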
4. AOP interception

Demonstrated here:
AOP before advice
after-returning advice
after-throwing (exception) advice

package com.gblfy.elk.aop;

import com.alibaba.fastjson.JSONObject;
import com.gblfy.elk.queue.AsynConcurrentQueue;
import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.annotation.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.springframework.web.context.request.RequestContextHolder;
import org.springframework.web.context.request.ServletRequestAttributes;

import javax.servlet.http.HttpServletRequest;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Date;

@Aspect
@Component
public class AopLogAspect {

    @Value("${server.port}")
    private String serverPort;

    // Pointcut: every method in the controller package
    @Pointcut("execution(* com.gblfy.elk.controller.*.*(..))")
    private void serviceAspect() {
    }

    @Autowired
    private AsynConcurrentQueue asynConcurrentQueue;

    // Before advice: capture the request before the controller method runs
    @Before(value = "serviceAspect()")
    public void methodBefore(JoinPoint joinPoint) {
        ServletRequestAttributes requestAttributes =
                (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();
        HttpServletRequest request = requestAttributes.getRequest();

        JSONObject jsonObject = new JSONObject();
        SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); // date format
        jsonObject.put("request_time", df.format(new Date()));
        jsonObject.put("request_url", request.getRequestURL().toString());
        jsonObject.put("request_method", request.getMethod());
        jsonObject.put("signature", joinPoint.getSignature());
        jsonObject.put("request_args", Arrays.toString(joinPoint.getArgs()));
        // IP address information
        jsonObject.put("ip_addres", getIpAddr(request) + ":" + serverPort);

        JSONObject requestJsonObject = new JSONObject();
        requestJsonObject.put("request", jsonObject);
        // Hand the log line to the async queue, which delivers it to Kafka
        String log = requestJsonObject.toJSONString();
        asynConcurrentQueue.put(log);
    }

    // After-returning advice: log the response once the method completes
    @AfterReturning(returning = "o", pointcut = "serviceAspect()")
    public void methodAfterReturning(Object o) {
        ServletRequestAttributes requestAttributes =
                (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();
        HttpServletRequest request = requestAttributes.getRequest();

        JSONObject respJSONObject = new JSONObject();
        JSONObject jsonObject = new JSONObject();
        SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); // date format
        jsonObject.put("response_time", df.format(new Date()));
        jsonObject.put("response_content", JSONObject.toJSONString(o));
        // IP address information
        jsonObject.put("ip_addres", getIpAddr(request) + ":" + serverPort);
        respJSONObject.put("response", jsonObject);
        asynConcurrentQueue.put(respJSONObject.toJSONString());
    }

    /**
     * After-throwing (exception) advice.
     *
     * @param point the intercepted join point
     * @param e     the exception thrown by the controller method
     */
    @AfterThrowing(pointcut = "serviceAspect()", throwing = "e")
    public void serviceAspect(JoinPoint point, Exception e) {
        ServletRequestAttributes requestAttributes =
                (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();
        HttpServletRequest request = requestAttributes.getRequest();

        JSONObject jsonObject = new JSONObject();
        SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); // date format
        jsonObject.put("request_time", df.format(new Date()));
        jsonObject.put("request_url", request.getRequestURL().toString());
        jsonObject.put("request_method", request.getMethod());
        jsonObject.put("signature", point.getSignature());
        jsonObject.put("request_args", Arrays.toString(point.getArgs()));
        jsonObject.put("error", e.toString());
        // IP address information
        jsonObject.put("ip_addres", getIpAddr(request) + ":" + serverPort);

        JSONObject requestJsonObject = new JSONObject();
        requestJsonObject.put("request", jsonObject);
        // Hand the log line to the async queue, which delivers it to Kafka
        String log = requestJsonObject.toJSONString();
        asynConcurrentQueue.put(log);
    }

    public static String getIpAddr(HttpServletRequest request) {
        // X-Forwarded-For (XFF) is the HTTP request header that carries the
        // original client IP of a request that reached the web server through
        // an HTTP proxy or load balancer.
        String ipAddress = request.getHeader("x-forwarded-for");
        if (ipAddress == null || ipAddress.length() == 0 || "unknown".equalsIgnoreCase(ipAddress)) {
            ipAddress = request.getHeader("Proxy-Client-IP");
        }
        if (ipAddress == null || ipAddress.length() == 0 || "unknown".equalsIgnoreCase(ipAddress)) {
            ipAddress = request.getHeader("WL-Proxy-Client-IP");
        }
        if (ipAddress == null || ipAddress.length() == 0 || "unknown".equalsIgnoreCase(ipAddress)) {
            ipAddress = request.getRemoteAddr();
            if (ipAddress.equals("127.0.0.1") || ipAddress.equals("0:0:0:0:0:0:0:1")) {
                // Fall back to the IP configured on the local network interface
                InetAddress inet = null;
                try {
                    inet = InetAddress.getLocalHost();
                } catch (UnknownHostException ex) {
                    ex.printStackTrace();
                }
                ipAddress = inet.getHostAddress();
            }
        }
        // When the request passed through multiple proxies, the first IP in the
        // comma-separated list is the real client IP.
        if (ipAddress != null && ipAddress.length() > 15) { // "***.***.***.***".length() == 15
            if (ipAddress.indexOf(",") > 0) {
                ipAddress = ipAddress.substring(0, ipAddress.indexOf(","));
            }
        }
        return ipAddress;
    }
}
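The multi-proxy branch of `getIpAddr` can be exercised in isolation: when `X-Forwarded-For` contains a comma-separated chain, the first entry is the client IP. A minimal sketch of just that rule (the class and the helper name `firstClientIp` are made up for the demo):

```java
// Isolated sketch of getIpAddr's X-Forwarded-For handling: reject empty or
// "unknown" values, and for a comma-separated proxy chain keep the first entry.
public class XffDemo {

    static String firstClientIp(String xff) {
        if (xff == null || xff.isEmpty() || "unknown".equalsIgnoreCase(xff)) {
            return null; // the real helper would fall through to the next header
        }
        int comma = xff.indexOf(',');
        return comma > 0 ? xff.substring(0, comma).trim() : xff;
    }

    public static void main(String[] args) {
        System.out.println(firstClientIp("203.0.113.7, 10.0.0.2, 10.0.0.3")); // 203.0.113.7
        System.out.println(firstClientIp("203.0.113.7"));                     // 203.0.113.7
        System.out.println(firstClientIp("unknown"));                         // null
    }
}
```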
5. Sending to MQ asynchronously via a concurrent queue

Messages are sent to MQ asynchronously here, so that a synchronous Kafka send does not add latency to the business thread handling the request.

package com.gblfy.elk.queue;

import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;

@Component
public class AsynConcurrentQueue {

    private static final BlockingDeque<String> logDeque = new LinkedBlockingDeque<>();

    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    // Start the consumer thread only after dependency injection has finished.
    // (The original started it in the constructor, where kafkaTemplate is
    // still null.)
    @PostConstruct
    public void init() {
        new LogThreadKafka().start();
    }

    /**
     * Enqueue a log line.
     *
     * @param log the JSON log message
     */
    public void put(String log) {
        logDeque.offer(log);
    }

    class LogThreadKafka extends Thread {
        @Override
        public void run() {
            while (true) {
                try {
                    // take() blocks until a log line is available; the
                    // original's poll() in a tight loop busy-waits on the CPU.
                    String log = logDeque.take();
                    if (StringUtils.isNotEmpty(log)) {
                        // Deliver the message to the Kafka topic
                        kafkaTemplate.send("mayikt-log", log);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
    }
}
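The queue's behavior (a non-blocking `offer` on the caller side, a single blocked consumer thread on the other) can be demonstrated without Spring or Kafka. In this sketch the `kafkaTemplate.send("mayikt-log", log)` call is replaced by an in-memory list, which is an assumption of the demo, not part of the original code:

```java
import java.util.List;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingDeque;

// In-memory sketch of AsynConcurrentQueue: producers offer() without blocking,
// one worker thread take()s and forwards. The sink list stands in for
// kafkaTemplate.send("mayikt-log", log).
public class AsyncQueueDemo {

    private final BlockingDeque<String> deque = new LinkedBlockingDeque<>();
    final List<String> sink = new CopyOnWriteArrayList<>();

    public AsyncQueueDemo() {
        Thread worker = new Thread(() -> {
            try {
                while (true) {
                    sink.add(deque.take()); // blocks instead of busy-polling
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // clean shutdown on interrupt
            }
        });
        worker.setDaemon(true); // do not keep the JVM alive
        worker.start();
    }

    public void put(String log) {
        deque.offer(log); // never blocks the business thread
    }

    public static void main(String[] args) throws InterruptedException {
        AsyncQueueDemo q = new AsyncQueueDemo();
        q.put("{\"request\":{\"request_method\":\"GET\"}}");
        q.put("{\"response\":{\"response_content\":\"ok\"}}");
        Thread.sleep(300); // give the worker time to drain the queue
        System.out.println(q.sink.size()); // 2
    }
}
```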
6. JSON log message
{
  "request_args": "[0]",
  "request_method": "GET",
  "error": "java.lang.ArithmeticException: / by zero",
  "ip_addres": "192.168.92.1:8080",
  "request_url": "http://localhost:8080/errorAdvice"
}
7. Full JSON log message
{
  "request": {
    "request_time": "2022-03-11 20:45:50",
    "signature": {
      "declaringType": "com.gblfy.elk.controller.KafkaController",
      "declaringTypeName": "com.gblfy.elk.controller.KafkaController",
      "exceptionTypes": [],
      "method": {
        "accessible": false,
        "annotatedExceptionTypes": [],
        "annotatedParameterTypes": [
          { "annotations": [], "declaredAnnotations": [], "type": "java.lang.Integer" }
        ],
        "annotatedReceiverType": {
          "annotations": [],
          "declaredAnnotations": [],
          "type": "com.gblfy.elk.controller.KafkaController"
        },
        "annotatedReturnType": {
          "annotations": [],
          "declaredAnnotations": [],
          "type": "java.lang.String"
        },
        "annotations": [
          {
            "path": [],
            "headers": [],
            "name": "",
            "produces": [],
            "params": [],
            "value": ["/errorAdvice"],
            "consumes": []
          }
        ],
        "bridge": false,
        "declaringClass": "com.gblfy.elk.controller.KafkaController",
        "default": false,
        "exceptionTypes": [],
        "genericExceptionTypes": [],
        "genericParameterTypes": ["java.lang.Integer"],
        "genericReturnType": "java.lang.String",
        "modifiers": 1,
        "name": "errorAdvice",
        "parameterAnnotations": [
          [
            {
              "name": "",
              "value": "userId",
              "defaultValue": "\n\t\t\n\t\t\n\n\t\t\t\t\n",
              "required": true
            }
          ]
        ],
        "parameterCount": 1,
        "parameterTypes": ["java.lang.Integer"],
        "returnType": "java.lang.String",
        "synthetic": false,
        "typeParameters": [],
        "varArgs": false
      },
      "modifiers": 1,
      "name": "errorAdvice",
      "parameterNames": ["userId"],
      "parameterTypes": ["java.lang.Integer"],
      "returnType": "java.lang.String"
    },
    "request_args": "[0]",
    "request_method": "GET",
    "error": "java.lang.ArithmeticException: / by zero",
    "ip_addres": "192.168.92.1:8080",
    "request_url": "http://localhost:8080/errorAdvice"
  }
}

The indexed log documents can then be queried in Elasticsearch:

GET /mayikt_logs/_search

Summary

The above is the full content of "Spring Boot Integration with Kafka and ELK" as collected and organized by 生活随笔; hopefully it helps you solve the problem you ran into.
