springcloud gateway 自定义 accesslog elk
大家好,我是烤鴨:
? 最近用 springcloud gateway 時(shí),想使用類似 logback-access的功能,用來做數(shù)據(jù)統(tǒng)計(jì)和圖表繪制等等,發(fā)現(xiàn)沒有類似的功能,只能自己開發(fā)了。
環(huán)境:
<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter</artifactId></dependency><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-gateway</artifactId></dependency>整體思路
logback-access.jar 只需要在logback.xml 配置 LogstashAccessTcpSocketAppender 即可完成異步的日志上報(bào)。
如果采用相同的方式,考慮到同一個(gè)進(jìn)程里異步上報(bào)占性能。(其實(shí)是開發(fā)太麻煩了)
這里采用的本地日志文件 + elk。
仿照 logback-access ,定義要收集的字段,開發(fā)過濾器收集字段,自定義 logstash.yml。
收集到的字段:
“User-Agent” : 請(qǐng)求頭字段
“server_ip” :服務(wù)器ip
“Content-Length” : 請(qǐng)求參數(shù)長度
“request_uri” :請(qǐng)求路徑(網(wǎng)關(guān)轉(zhuǎn)發(fā)的路徑)
“host” :本機(jī)ip
“client_ip” :請(qǐng)求ip
“method” :get/post
“Host” : 請(qǐng)求頭字段
“params” :請(qǐng)求參數(shù)
“request_url” :請(qǐng)求全路徑
“thread_name” :當(dāng)前線程
“l(fā)evel” :日志級(jí)別
“cost_time” : 請(qǐng)求耗時(shí)
“l(fā)ogger_name” :日志類
“Protocol” : 請(qǐng)求頭字段
代碼實(shí)現(xiàn)
LoggingFilter
package com.xxx.gateway.filter;import com.alibaba.fastjson.JSONObject; import com.google.common.collect.Maps; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.springframework.cloud.gateway.filter.GatewayFilterChain; import org.springframework.cloud.gateway.filter.GlobalFilter; import org.springframework.core.Ordered; import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DataBufferUtils; import org.springframework.http.HttpHeaders; import org.springframework.http.HttpMethod; import org.springframework.http.server.reactive.ServerHttpRequest; import org.springframework.http.server.reactive.ServerHttpRequestDecorator; import org.springframework.stereotype.Component; import org.springframework.util.CollectionUtils; import org.springframework.util.MultiValueMap; import org.springframework.web.server.ServerWebExchange; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono;import java.nio.charset.StandardCharsets; import java.util.Map;@Slf4j @Component public class LoggingFilter implements GlobalFilter, Ordered {private static final String UNKNOWN = "unknown";private static final String METHOD = "method";private static final String PARAMS = "params";private static final String REQUEST_URI = "request_uri";private static final String REQUEST_URL = "request_url";private static final String CLIENT_IP = "client_ip";private static final String SERVER_IP = "server_ip";private static final String HOST = "Host";private static final String COST_TIME = "cost_time";private static final String CID = "cid";private static final String CONTENT_LENGTH = "Content-Length";private static final String PROTOCOL = "Protocol";private static final String REQID = "reqid";private static final String USER_AGENT = "User-Agent";private static final String START_TIME = "gw_start_time";private static final String LOGINFOCOLLECTOR = "logInfoCollector";/*** Process the Web request and (optionally) delegate to the next {@code WebFilter}* through the given {@link GatewayFilterChain}.** @param exchange the current server exchange* @param chain provides a way to delegate to the next filter* @return {@code Mono<Void>} to indicate when request processing is complete*/@Overridepublic Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {ServerHttpRequest request = exchange.getRequest();String requestUrl = request.getPath().toString();Map<String, Object> logInfoCollector = Maps.newLinkedHashMap();logInfoCollector.put(CLIENT_IP, getIpAddress(request));logInfoCollector.put(SERVER_IP, request.getURI().getHost());logInfoCollector.put(HOST, getHeaderValue(request, HOST));logInfoCollector.put(METHOD, request.getMethodValue());logInfoCollector.put(REQUEST_URI, request.getURI().getPath());logInfoCollector.put(REQUEST_URL, getRequestUrl(request));logInfoCollector.put(PARAMS, request.getURI().getQuery());logInfoCollector.put(CID, getHeaderValue(request, CID));logInfoCollector.put(CONTENT_LENGTH, request.getHeaders().getContentLength());logInfoCollector.put(PROTOCOL, getHeaderValue(request, PROTOCOL));logInfoCollector.put(REQID, getHeaderValue(request, REQID));logInfoCollector.put(USER_AGENT, getHeaderValue(request, USER_AGENT));exchange.getAttributes().put(START_TIME, System.currentTimeMillis());exchange.getAttributes().put(LOGINFOCOLLECTOR, logInfoCollector);String requestMethod = request.getMethodValue();String contentType = exchange.getRequest().getHeaders().getFirst(HttpHeaders.CONTENT_TYPE);String contentLength = exchange.getRequest().getHeaders().getFirst(HttpHeaders.CONTENT_LENGTH);if (HttpMethod.POST.toString().equals(requestMethod) || HttpMethod.PUT.toString().equals(requestMethod)) {// 根據(jù)請(qǐng)求頭,用不同的方式解析Bodyif ((Character.DIRECTIONALITY_LEFT_TO_RIGHT + "").equals(contentLength) || StringUtils.isEmpty(contentType)) {MultiValueMap<String, String> getRequestParams = request.getQueryParams();log.info("\n 請(qǐng)求url:`{}` \n 請(qǐng)求類型:{} \n 請(qǐng)求參數(shù):{}", requestUrl, requestMethod, getRequestParams);return chain.filter(exchange);}Mono<DataBuffer> bufferMono = DataBufferUtils.join(exchange.getRequest().getBody());return bufferMono.flatMap(dataBuffer -> {byte[] bytes = new byte[dataBuffer.readableByteCount()];dataBuffer.read(bytes);String postRequestBodyStr = new String(bytes, StandardCharsets.UTF_8);if (contentType.startsWith("multipart/form-data")) {log.info("\n 請(qǐng)求url:`{}` \n 請(qǐng)求類型:{} \n 文件上傳", requestMethod);} else {log.info("\n 請(qǐng)求url:`{}` \n 請(qǐng)求類型:{} \n 請(qǐng)求參數(shù):{}", requestMethod, postRequestBodyStr);}// 后續(xù)需要用到參數(shù)的可以從這個(gè)地方獲取exchange.getAttributes().put("POST_BODY", postRequestBodyStr);logInfoCollector.put(PARAMS, postRequestBodyStr);DataBufferUtils.release(dataBuffer);Flux<DataBuffer> cachedFlux = Flux.defer(() -> {DataBuffer buffer = exchange.getResponse().bufferFactory().wrap(bytes);return Mono.just(buffer);});// 下面的將請(qǐng)求體再次封裝寫回到request里,傳到下一級(jí),否則,由于請(qǐng)求體已被消費(fèi),后續(xù)的服務(wù)將取不到值ServerHttpRequest mutatedRequest = new ServerHttpRequestDecorator(exchange.getRequest()) {@Overridepublic Flux<DataBuffer> getBody() {return cachedFlux;}};// 封裝request,傳給下一級(jí)return chain.filter(exchange.mutate().request(mutatedRequest).build()).then(Mono.fromRunnable(() -> {Long startTime = exchange.getAttribute(START_TIME);Map<String, Object> logInfo = exchange.getAttribute(LOGINFOCOLLECTOR);if (startTime != null && !CollectionUtils.isEmpty(logInfo)) {Long executeTime = (System.currentTimeMillis() - startTime);logInfo.put(COST_TIME, executeTime);log.info(JSONObject.toJSONString(logInfo));}}));});} else if (HttpMethod.GET.toString().equals(requestMethod)|| HttpMethod.DELETE.toString().equals(requestMethod)) {return chain.filter(exchange).then(Mono.fromRunnable(() -> {Long startTime = exchange.getAttribute(START_TIME);Map<String, Object> logInfo = exchange.getAttribute(LOGINFOCOLLECTOR);if (startTime != null && !CollectionUtils.isEmpty(logInfo)) {Long executeTime = (System.currentTimeMillis() - startTime);logInfo.put(COST_TIME, executeTime);log.info(JSONObject.toJSONString(logInfo));}}));}return chain.filter(exchange);}public String getIpAddress(ServerHttpRequest request) {HttpHeaders headers = request.getHeaders();String ip = headers.getFirst("x-forwarded-for");if (StringUtils.isNotBlank(ip) && !UNKNOWN.equalsIgnoreCase(ip)) {// 多次反向代理后會(huì)有多個(gè)ip值,第一個(gè)ip才是真實(shí)ipif (ip.indexOf(",") != -1) {ip = ip.split(",")[0];}}if (StringUtils.isBlank(ip) || UNKNOWN.equalsIgnoreCase(ip)) {ip = headers.getFirst("Proxy-Client-IP");}if (StringUtils.isBlank(ip) || UNKNOWN.equalsIgnoreCase(ip)) {ip = headers.getFirst("WL-Proxy-Client-IP");}if (StringUtils.isBlank(ip) || UNKNOWN.equalsIgnoreCase(ip)) {ip = headers.getFirst("HTTP_CLIENT_IP");}if (StringUtils.isBlank(ip) || UNKNOWN.equalsIgnoreCase(ip)) {ip = headers.getFirst("HTTP_X_FORWARDED_FOR");}if (StringUtils.isBlank(ip) || UNKNOWN.equalsIgnoreCase(ip)) {ip = headers.getFirst("X-Real-IP");}if (StringUtils.isBlank(ip) || UNKNOWN.equalsIgnoreCase(ip)) {ip = request.getRemoteAddress().getAddress().getHostAddress();}return ip;}private String getRequestUrl(ServerHttpRequest request) {String url = request.getURI().toString();if (url.contains("?")) {url = url.substring(0, url.indexOf("?"));}return url;}private String getHeaderValue(ServerHttpRequest request, String key) {if (StringUtils.isEmpty(key)) {return "";}HttpHeaders headers = request.getHeaders();if (headers.containsKey(key)) {return headers.get(key).get(0);}return "";}/*** Get the order value of this object.* <p>Higher values are interpreted as lower priority. As a consequence,* the object with the lowest value has the highest priority (somewhat* analogous to Servlet {@code load-on-startup} values).* <p>Same order values will result in arbitrary sort positions for the* affected objects.** @return the order value* @see #HIGHEST_PRECEDENCE* @see #LOWEST_PRECEDENCE*/@Overridepublic int getOrder() {return HIGHEST_PRECEDENCE;} }logstash.yml
input {file {path => "D:/data/logs/ccc-gateway/*.log"type => "ccc-gateway"codec => json {charset => "UTF-8"}} }filter {json {source => "message"skip_on_invalid_json => trueadd_field => { "@accessmes" => "%{message}" } remove_field => [ "@accessmes" ]} }output {elasticsearch {hosts => "localhost:9200" index => "ccc-gateway_%{+YYYY.MM.dd}"} }上面的 logstash.yml 兼容 json和非json格式,loggingFilter 會(huì)保證數(shù)據(jù)打印為json格式,其他的地方log也可以是非json的。
效果如圖
accesslog:
其他的log:
圖表繪制
其實(shí)netty 作為容器本身也是有 acesslog的,可以開啟。
-Dreactor.netty.http.server.accessLogEnabled=trueAccessLog的log方法直接通過logger輸出日志,其日志格式為COMMON_LOG_FORMAT({} - {} [{}] "{} {} {}" {} {} {} {} ms),分別是address, user, zonedDateTime, method, uri, protocol, status, contentLength, port, duration
沒有請(qǐng)求參數(shù)和自定義參數(shù)(一般鏈路id放在請(qǐng)求頭里的)和響應(yīng)參數(shù)(這次也沒加),所以算是對(duì)accesslog做了改進(jìn)。下圖是訪問量和平均耗時(shí),后續(xù)還可以加tp99,請(qǐng)求路徑等等
訪問量:
平均耗時(shí):
總結(jié)
以上是生活随笔為你收集整理的springcloud gateway 自定义 accesslog elk的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Linux基础-目录与路径
- 下一篇: 左右值