日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Alibaba Dubbo框架同步调用原理分析-2

發布時間:2025/7/14 编程问答 33 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Alibaba Dubbo框架同步调用原理分析-2 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

2019獨角獸企業重金招聘Python工程師標準>>>

接上一篇,看一下Dubbo的相關代碼

關鍵代碼:

com.taobao.remoting.impl.DefaultClient.java

//同步調用遠程接口

public Object?invokeWithSync(Object appRequest, RequestControl control) throws RemotingException, InterruptedException {

??????? byte protocol = getProtocol(control);

??????? if (!TRConstants.isValidProtocol(protocol)) {

??????????? throw new RemotingException("Invalid serialization protocol [" + protocol + "] on invokeWithSync.");

??????? }

??????? ResponseFuture future =?invokeWithFuture(appRequest, control);

??????? return future.get();??//獲取結果時讓當前線程等待,ResponseFuture其實就是前面說的callback

}

public ResponseFuture?invokeWithFuture(Object appRequest, RequestControl control) {

???????? byte protocol = getProtocol(control);

???????? long timeout = getTimeout(control);

???????? ConnectionRequest request = new ConnectionRequest(appRequest);

???????? request.setSerializeProtocol(protocol);

???????? Callback2FutureAdapter adapter = new Callback2FutureAdapter(request);

???????? connection.sendRequestWithCallback(request, adapter, timeout);

???????? return adapter;

}

?

Callback2FutureAdapter implements ResponseFuture

public Object?get()?throws RemotingException, InterruptedException {

synchronized (this) { ?// 旋鎖

?? ?while (!isDone) { ?// 是否有結果了

wait();?//沒結果是釋放鎖,讓當前線程處于等待狀態

?? ?}

}

if (errorCode == TRConstants.RESULT_TIMEOUT) {

?? ?throw new TimeoutException("Wait response timeout, request["

?? ?+ connectionRequest.getAppRequest() + "].");

}

else if (errorCode > 0) {

?? ?throw new RemotingException(errorMsg);

}

else {

?? ?return appResp;

}

}

客戶端收到服務端結果后,回調時相關方法,即設置isDone = truenotifyAll()

public void?handleResponse(Object _appResponse) {

???????? appResp = _appResponse;?//將遠程調用結果設置到callback中來

???????? setDone();

}

public void?onRemotingException(int _errorType, String _errorMsg) {

???????? errorCode = _errorType;

???????? errorMsg = _errorMsg;

???????? setDone();

}

private void?setDone() {

???????? isDone = true;

???????? synchronized (this) {?//獲取鎖,因為前面wait()已經釋放了callback的鎖了

???????? ??? notifyAll();?//?喚醒處于等待的線程

???????? }

}

?

com.taobao.remoting.impl.DefaultConnection.java

?

//?用來存放請求和回調的MAP

private final ConcurrentHashMap<Long, Object[]>?requestResidents;

?

//發送消息出去

void?sendRequestWithCallback(ConnectionRequest connRequest, ResponseCallback callback, long timeoutMs) {

???????? long requestId = connRequest.getId();

???????? long waitBegin = System.currentTimeMillis();

???????? long waitEnd = waitBegin + timeoutMs;

???????? Object[] queue = new Object[4];

???????? int idx = 0;

???????? queue[idx++] = waitEnd;

???????? queue[idx++] = waitBegin;?? //用于記錄日志

???????? queue[idx++] = connRequest; //用于記錄日志

???????? queue[idx++] = callback;

?????????requestResidents.put(requestId, queue);?//?記錄響應隊列

?????????write(connRequest);

?

???????? //?埋點記錄等待響應的Map的大小

???????? StatLog.addStat("TBRemoting-ResponseQueues", "size", requestResidents.size(),

?????????????????? 1L);

}

public void?write(final Object connectionMsg) {

//mina里的IoSession.write()發送消息

???????? WriteFuture writeFuture = ioSession.write(connectionMsg);

???????? //?注冊FutureListener,當請求發送失敗后,能夠立即做出響應

???????? writeFuture.addListener(new MsgWrittenListener(this, connectionMsg));

}

?

/**

*?在得到響應后,刪除對應的請求隊列,并執行回調

*?調用者:MINA線程

*/

public void?putResponse(final ConnectionResponse connResp) {

???????? final long?requestId?= connResp.getRequestId();

???????? Object[] queue =?requestResidents.remove(requestId);

???????? if (null == queue) {

???????? ??? Object appResp = connResp.getAppResponse();

???????? ??? String appRespClazz = (null == appResp) ? "null" : appResp.getClass().getName();

???????? ??? StringBuilder sb = new StringBuilder();

???????? ??? sb.append("Not found response receiver for requestId=[").append(requestId).append("],");

???????? ??? sb.append("from [").append(connResp.getHost()).append("],");

???????? ??? sb.append("response type [").append(appRespClazz).append("].");

???????? ??? LOGGER.warn(sb.toString());

???????? ??? return;

???????? }

???????? int idx = 0;

???????? idx++;

???????? long waitBegin = (Long) queue[idx++];

???????? ConnectionRequest connRequest = (ConnectionRequest) queue[idx++];

???????? ResponseCallback?callback?= (ResponseCallback) queue[idx++];

???????? // **?把回調任務交給業務提供的線程池執行?**

???????? Executor callbackExecutor = callback.getExecutor();

???????? callbackExecutor.execute(new?CallbackExecutorTask(connResp, callback));

?

???????? long duration = System.currentTimeMillis() - waitBegin; //?實際讀響應時間

???????? logIfResponseError(connResp, duration, connRequest.getAppRequest());

}

?

CallbackExecutorTask

static private class CallbackExecutorTask implements Runnable {

???????? final ConnectionResponse resp;

???????? final ResponseCallback callback;

???????? final Thread createThread;

?

???????? CallbackExecutorTask(ConnectionResponse _resp, ResponseCallback _cb) {

???????? ??? resp = _resp;

???????? ??? callback = _cb;

???????? ??? createThread = Thread.currentThread();

???????? }

?

???????? public void run() {

???????? ??? //?預防這種情況:業務提供的Executor,讓調用者線程來執行任務

???????? ??? if (createThread == Thread.currentThread()

?????????????????? ??? && callback.getExecutor() != DIYExecutor.getInstance()) {

?????????????????? StringBuilder sb = new StringBuilder();

?????????????????? sb.append("The network callback task [" + resp.getRequestId() + "] cancelled, cause:");

?????????????????? sb.append("Can not callback task on the network io thhread.");

?????????????????? LOGGER.warn(sb.toString());

?????????????????? return;

???????? ??? }

?

???????? ??? if (TRConstants.RESULT_SUCCESS == resp.getResult()) {

???????????????????callback.handleResponse(resp.getAppResponse());?//設置調用結果

???????? ??? }

???????? ??? else {

???????????????????callback.onRemotingException(resp.getResult(), resp

??????????????????????????? .getErrorMsg());??//處理調用異常

???????? ??? }

???????? }

}

?

另外:

1,?服務端在處理客戶端的消息,然后再處理時,使用了線程池來并行處理,不用一個一個消息的處理

同樣,客戶端接收到服務端的消息,也是使用線程池來處理消息,再回調

轉載于:https://my.oschina.net/91jason/blog/374170

總結

以上是生活随笔為你收集整理的Alibaba Dubbo框架同步调用原理分析-2的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。