日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 综合教程 >内容正文

综合教程

skywalking后端处理业务逻辑的梳理

發布時間:2023/12/19 综合教程 19 生活家
生活随笔 收集整理的這篇文章主要介紹了 skywalking后端处理业务逻辑的梳理 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

接下來會進入下面的代理處理TraceSegment

接下來會調用到TraceAnalyzer的doAnalysis方法,代碼如下

public void doAnalysis(SegmentObject segmentObject) {
        if (segmentObject.getSpansList().size() == 0) {
            return;
        }

        createSpanListeners();

        try {
            notifySegmentListener(segmentObject);

            segmentObject.getSpansList().forEach(spanObject -> {
                if (spanObject.getSpanId() == 0) {
                    notifyFirstListener(spanObject, segmentObject);
                }

                if (SpanType.Exit.equals(spanObject.getSpanType())) {
                    notifyExitListener(spanObject, segmentObject);
                } else if (SpanType.Entry.equals(spanObject.getSpanType())) {
                    notifyEntryListener(spanObject, segmentObject);
                } else if (SpanType.Local.equals(spanObject.getSpanType())) {
                    notifyLocalListener(spanObject, segmentObject);
                } else {
                    log.error("span type value was unexpected, span type name: {}", spanObject.getSpanType()
                                                                                              .name());
                }
            });

            notifyListenerToBuild();
        } catch (Throwable e) {
            log.error(e.getMessage(), e);
        }
    }

我們首先來看方法的第一個業務邏輯createSpanListeners(),改方法主要產生處于TraceSegment。以及各類span的listener,默認情況下產生的listener情況如下

默認情況下產生上面三個listener

1、SegmentAnalysisListener主要是處理TraceSegment,將整個TraceSegment轉行成segement對象存儲到數據庫中

2、MultiScopesAnalysisListener主要用來處理TraceSegment中的span,依據span的不同類型進行單獨處理,然后依據不同的span類型產生不同的指標信息

這是收到的skywalking的出

skywalking后端oap集群收到TraceSegment的處理信息之后,會進行處理,這里處理會對TraceSegment中保存的span進行分類,span分為下面的幾種類型

enum Point {
        Entry, Exit, Local, First, Segment
    }

每一種類型會對應一個listen處理器與之對應,經過處理器處理之后,在延伸到后續的流程和操作

五個listen的類型如下面所示

代碼中創建了三個listener情況如下

接下來我們分析完成了createSpanListeners();接下來是執行

notifySegmentListener(segmentObject);我們來看下改方法是如何進行處理的

 private void notifySegmentListener(SegmentObject segmentObject) {
        analysisListeners.forEach(listener -> {
            if (listener.containsPoint(AnalysisListener.Point.Segment)) {
                ((SegmentListener) listener).parseSegment(segmentObject);
            }能夠
        });
    }

在上面創建的三個listener中只有SegmentAnalysisListener能夠處理TraceSegment,我們來看下后面的代碼

 @Override
    public void parseSegment(SegmentObject segmentObject) {
        segment.setTraceId(segmentObject.getTraceId());
        segmentObject.getSpansList().forEach(span -> {
            if (startTimestamp == 0 || startTimestamp > span.getStartTime()) {
                startTimestamp = span.getStartTime();
            }
            if (span.getEndTime() > endTimestamp) {
                endTimestamp = span.getEndTime();
            }
            isError = isError || segmentStatusAnalyzer.isError(span);
            appendSearchableTags(span);
        });
        final long accurateDuration = endTimestamp - startTimestamp;
        duration = accurateDuration > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) accurateDuration;

        if (sampleStatus.equals(SAMPLE_STATUS.UNKNOWN) || sampleStatus.equals(SAMPLE_STATUS.IGNORE)) {
            if (sampler.shouldSample(segmentObject.getTraceId())) {
                sampleStatus = SAMPLE_STATUS.SAMPLED;
            } else if (isError && forceSampleErrorSegment) {
                sampleStatus = SAMPLE_STATUS.SAMPLED;
            } else if (traceLatencyThresholdsAndWatcher.shouldSample(duration)) {
                sampleStatus = SAMPLE_STATUS.SAMPLED;
            } else {
                sampleStatus = SAMPLE_STATUS.IGNORE;
            }
        }
    }

我們來重點看下SegmentAnalysisListener下的parseSegment的方法的流程,方法傳入的參數segmentObject的結構體如下

traceId: "f5411eeefb3b403f8cafa89d0b82a82f.48.16092665850380005"
traceSegmentId: "ec5de79a4a8848e88146ebf696d157dc.73.16092665874590000"
spans {
  parentSpanId: -1
  startTime: 1609266587459
  endTime: 1609266589471
  refs {
    traceId: "f5411eeefb3b403f8cafa89d0b82a82f.48.16092665850380005"
    parentTraceSegmentId: "f5411eeefb3b403f8cafa89d0b82a82f.48.16092665850380004"
    parentSpanId: 1
    parentService: "DEMO-webapp"
    parentServiceInstance: "e171fdf7a1b4436b8289159e6d3f90b1@192.168.43.80"
    parentEndpoint: "{GET}/hello/{words}"
    networkAddressUsedAtPeer: "169.254.122.166:20880"
  }
  operationName: "com.xxx.HelloService.say(String)"
  spanLayer: RPCFramework
  componentId: 3
  tags {
    key: "url"
    value: "dubbo://169.254.122.166:20880/com.xxx.HelloService.say(String)"
  }
}
service: "DEMO-provider"
serviceInstance: "c5ec906d3dd24d0b8907fa8549c8306b@192.168.43.80"

這里首先處理的是dubbo-provider上傳的TraceSegment,產生的是一個EntrySpan,改EntrySpan中管理了dubbo-webapp的TraceSegment

這里在處理的時候如果當前的TraceSegment中存在5個span,那么當前TraceSegment的開始時間等于5個span中最小的starttime時間,最大時間為5個span中最大的endTime的時間,如果當前的TraceSegment中只要存在一個span的熟悉中為iserror,則改TraceSegment就為false

這個代碼是判斷當前的TraceSegment是否被采用處理了,如果被采樣了就可以存儲到數據庫中,如果沒有采用改TraceSegment就丟棄不用做任何處理

notifySegmentListener(segmentObject);本質上就是將后端上傳的TraceSegment轉化成一個segment對象

接下來繼續執行

取出當前TraceSegment中的span列表,然后對沒有span進行判斷,判斷當前的span是EntrySpan、localspan、exitspan,依據不同的span傳遞給不同的listener進行處理

這里當前的TraceSegment中只有一個span

span的對象為

parentSpanId: -1
startTime: 1609266587459
endTime: 1609266589471
refs {
  traceId: "f5411eeefb3b403f8cafa89d0b82a82f.48.16092665850380005"
  parentTraceSegmentId: "f5411eeefb3b403f8cafa89d0b82a82f.48.16092665850380004"
  parentSpanId: 1
  parentService: "DEMO-webapp"
  parentServiceInstance: "e171fdf7a1b4436b8289159e6d3f90b1@192.168.43.80"
  parentEndpoint: "{GET}/hello/{words}"
  networkAddressUsedAtPeer: "169.254.122.166:20880"
}
operationName: "com.xxx.HelloService.say(String)"
spanLayer: RPCFramework
componentId: 3
tags {
  key: "url"
  value: "dubbo://169.254.122.166:20880/com.xxx.HelloService.say(String)"
}

因為當前的span為EntrySpan,對應的spanID值就是0,會被下面的listener進行處理

notifyFirstListener(spanObject, segmentObject);底層會調用SegmentAnalysisListener的parseFirst方法,我們來看下改方法的業務邏輯

@Override
    public void parseFirst(SpanObject span, SegmentObject segmentObject) {
        if (sampleStatus.equals(SAMPLE_STATUS.IGNORE)) {
            return;
        }

        if (StringUtil.isEmpty(serviceId)) {
            serviceName = namingControl.formatServiceName(segmentObject.getService());
            serviceId = IDManager.ServiceID.buildId(
                serviceName,
                NodeType.Normal
            );
        }

        long timeBucket = TimeBucket.getRecordTimeBucket(startTimestamp);

        segment.setSegmentId(segmentObject.getTraceSegmentId());
        segment.setServiceId(serviceId);
        segment.setServiceInstanceId(IDManager.ServiceInstanceID.buildId(
            serviceId,
            namingControl.formatInstanceName(segmentObject.getServiceInstance())
        ));
        segment.setLatency(duration);
        segment.setStartTime(startTimestamp);
        segment.setTimeBucket(timeBucket);
        segment.setEndTime(endTimestamp);
        segment.setIsError(BooleanUtils.booleanToValue(isError));
        segment.setDataBinary(segmentObject.toByteArray());
        segment.setVersion(3);

        endpointName = namingControl.formatEndpointName(serviceName, span.getOperationName());
        endpointId = IDManager.EndpointID.buildId(
            serviceId,
            endpointName
        );
    }

1、當前service: "DEMO-provider"我們需要將"DEMO-provider"轉行成對應應用id,存儲到segment對象中

2、同理應用實例serviceInstance: "c5ec906d3dd24d0b8907fa8549c8306b@192.168.43.80"我們也要轉化成對應的應用實例id存儲到segement對象中

segement中的timeBucket字段就是當前TraceSegment中span集合中,比較各個span的開始時間,取出最小的時間,最小的時間作為segement的timeBucket

接下來設置segemnt的開始時間,開始時間就是比較各個span的開始時間,取出最小的開始時間作為segemnt的開始時間

segemnt的結束時間結束始時間就是比較各個span的結束時間,取出最大的結束時間作為segemnt的結束時間

整個segemnt的持續時間就是開始時間減去結束時間,此外還將整個segemeOBj對象轉化成二進制文件存儲在字段setDataBinary中

經過整個parseFirst方法之后,整個segemnt對象的結構為

接下來我們繼續執行下面的方法

當前的span為EntrySpan,接下來會執行notifyEntryListener(spanObject, segmentObject);方法

這里MultiScopesAnalysisListener和SegmentAnalysisListener都會對整個EntrySpan進行處理,我們來分析下下面的代碼

首先來看下MultiScopesAnalysisListener對parseEntry的方法對EntrySpan的處理

改方法的主要作用是獲得當前EntrySpan關聯的上游的TraceSegmentRef對象,然后將TraceSegmentRef對象封裝到SourceBuilder對象中

我們先來看下當前EntrySpan中獲得的TraceSegmentRef對象的信息如下

traceId: "f5411eeefb3b403f8cafa89d0b82a82f.48.16092665850380005"
parentTraceSegmentId: "f5411eeefb3b403f8cafa89d0b82a82f.48.16092665850380004"
parentSpanId: 1
parentService: "DEMO-webapp"
parentServiceInstance: "e171fdf7a1b4436b8289159e6d3f90b1@192.168.43.80"
parentEndpoint: "{GET}/hello/{words}"
networkAddressUsedAtPeer: "169.254.122.166:20880"

在 SourceBuilder 中記錄了上下游兩個系統的基礎信息,核心字段如下所示:

// 上游系統的 service 信息、serviceInstance 信息以及 Endpoint 信息

@Getter @Setter private int sourceServiceId;

@Getter @Setter private String sourceServiceName;

@Getter @Setter private int sourceServiceInstanceId;

@Getter @Setter private String sourceServiceInstanceName;

@Getter @Setter private int sourceEndpointId;

@Getter @Setter private String sourceEndpointName;

// 下游系統的 service 信息、serviceInstance 信息以及 Endpoint 信息

@Getter @Setter private int destServiceId;

@Getter @Setter private String destServiceName;

@Getter @Setter private int destServiceInstanceId;

@Getter @Setter private String destServiceInstanceName;

@Getter @Setter private int destEndpointId;

@Getter @Setter private String destEndpointName;

// 當前系統的組件類型

@Getter @Setter private int componentId;

// 在當前系統中的耗時

@Getter @Setter private int latency;

// 當前系統是否發生Error

@Getter @Setter private boolean status;

@Getter @Setter private int responseCode; // 默認為0

@Getter @Setter private RequestType type; // 請求類型

// 調用關系中的角色,是調用方(Client)、被調用方(Server)還是代理(Proxy)

@Getter @Setter private DetectPoint detectPoint; 

// TraceSegment 起始時間所在分鐘級時間窗口

@Getter @Setter private long timeBucket;

這里有一個關鍵點

就是如果當前的EntrySpan中沒有TraceSegmentRef沒有父親入口,比如dubbo-webapp入口的EntrySpan,在http請求頭信息是沒有任何信息的,獲得的TraceSegmentRef就是為null,這里代碼中設置當前的span的sourceBuilder中的 sourceBuilder.setSourceEndpointName(Const.USER_ENDPOINT_NAME);,public static final String USER_ENDPOINT_NAME = "User";的函數表示就是瀏覽器或者postman等入口調用了dubbo-webapp,作為dubbo-webapp調用的入口

這里dubbo-provider的執行的是else條件,入口的方法就是dubbo-webapp的方法

這里設置sourceBuilder的上游節點內容的時候,需要判斷上面有MQ類型還是一般應用,這里是一般應用執行下面的else的方法

上游節點的信息設置完成之后,接下來是設置下游的節點信息

通過這個sourcebuild對象就將當前的EntrySpan和dubb-webapp中產線的exitspan關聯起來了,

接下來將創建好的sourceBuilder添加到集合entrySourceBuilders中,后續會進行處理

這里TraceSegment中的span集合中如果存在5個EntrySpan,就會產生5個sourceBuilder對象

接下來繼續執行parseLogicEndpoints(span, segmentObject);

我們來看看parseLogicEndpoints(span, segmentObject);具體的業務代碼,改方法的具體業務邏輯后面再進行分析

MultiScopesAnalysisListener的parseEntry執行完成之后,接下來執行NetworkAddressAliasMappingListener類的parseEntry方法

我們來看下改方法的具體業務邏輯,就是將當前EntrySpan所在的網絡、應用名稱、應用實例封裝成一個networkAddressAliasSetup對象,然后將這個對象

傳遞給sourceReceiver進行處理

整個networkAddressAliasSetup的結構如下

SegmentAnalysisListener的parseEntry方法就是將應用名稱應用實例名稱轉化成對于的id

經過上面的轉化之后執行notifyListenerToBuild()方法

private void notifyListenerToBuild() {
analysisListeners.forEach(AnalysisListener::build);
}執行各個listener的build方法,主要是執行SegmentAnalysisListener的build方法和MultiScopesAnalysisListener的build方法

我們先介紹SegmentAnalysisListener的build方法,然后在介紹MultiScopesAnalysisListener的build方法

收到判斷當前的segment是否是被采樣忽略的,如果是被采樣忽略的

skywalking后端采樣率要設置為一樣的原因
一個全局的請求,在dubbo-webapp產生了一個TraceSegmentA,在dubbo-dubbo產生了一個TraceSegmentB,需要有這兩個TraceSegment才能還原這一次Trace
TraceSegmentA傳遞給后端的oap集群的A節點處理,TraceSegmentB傳遞給后端的oap集群的B節點處理,
如果A節點和B節點在后端的采樣率設置不一致,就可能導致TraceSegmentA被處理了存儲在es中,TraceSegmentB被忽略直接丟棄,導致無法還原當前請求的trace情況
所以oap集群后端每個節點的采樣率必須設置為一致

收到判斷當前的segment是否是被采樣忽略的,如果是被采樣忽略的就直接忽略,如果不是繼續執行后續的代碼

設置segemnt中endpointId的值,這里endpointID是這樣計算出來的

        endpointId = IDManager.EndpointID.buildId(
            serviceId,
            endpointName
        )

當前的應用為dubbo-provider對于的service_id的值為REVNTy13ZWJhcHA=.1

endpointName為當前span對應的 operationName: "com.xxx.HelloService.say(String)"

然后通過上面的這兩個參數獲得對應的值

         */
        public static String buildId(String serviceId, String endpointName) {
            return serviceId
                + Const.ID_CONNECTOR
                + encode(endpointName);
        }

算出來了之后

經過處理之后整個TraceSegment的結構如下

2、預構建 :替換url peer端口、servicenam等信息

1、接下來將TraceSegment中各個span的類型分配給不同的spanlistener進行處理

接下來對每一個span進行解析

parentSpanId: -1
startTime: 1609266200857
endTime: 1609266202857
refs {
traceId: "f5411eeefb3b403f8cafa89d0b82a82f.57.16092661992120001"
parentTraceSegmentId: "f5411eeefb3b403f8cafa89d0b82a82f.57.16092661992120000"
parentSpanId: 1
parentService: "DEMO-webapp"
parentServiceInstance: "e171fdf7a1b4436b8289159e6d3f90b1@192.168.43.80"
parentEndpoint: "{GET}/hello/{words}"
networkAddressUsedAtPeer: "169.254.122.166:20880"
}
operationName: "com.xxx.HelloService.say(String)"
spanLayer: RPCFramework
componentId: 3
tags {
key: "url"
value: "dubbo://169.254.122.166:20880/com.xxx.HelloService.say(String)"
}

依據不同的span類型進行解析,傳遞給不同的listen對象,然后產生不同的數據指標

總結

以上是生活随笔為你收集整理的skywalking后端处理业务逻辑的梳理的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。