日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 人文社科 > 生活经验 >内容正文

生活经验

hbase RPCServer源码分析

發布時間:2023/11/27 生活经验 35 豆豆
生活随笔 收集整理的這篇文章主要介紹了 hbase RPCServer源码分析 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

前置知識: java,nio,多線程

看了幾天的源碼,寫一些自己心得,若有錯誤請指出。

RPCServer的作用:負責創建listener,reader,responser,handler來處理client端的請求。

RPCServer中重要的子類有:Listener,Reader,Call,Connection,Responser

? ? 其中Reader是Listener的子類

listener負責監聽client端的請求,主要做nio操作中的accept操作。

while (iter.hasNext()) {key = iter.next();iter.remove();try {if (key.isValid()) {if (key.isAcceptable())doAccept(key);}} catch (IOException ignored) {}key = null;
}

?

與client創建連接,生成新的channel,并將新的channel注冊在reader上。

void doAccept(SelectionKey key) throws IOException, OutOfMemoryError {……SocketChannel channel;……Reader reader = getReader();try {reader.startAdd();SelectionKey readKey = reader.registerChannel(channel);  //listener接受的連接注冊在reader上c = getConnection(channel, System.currentTimeMillis());readKey.attach(c);……
}

?

 

reader負責處理listener傳過來的channel,依次讀取數據,

void doRead(SelectionKey key) throws InterruptedException {int count = 0;Connection c = (Connection)key.attachment();……try {count = c.readAndProcess();} catch (InterruptedException ieo) {……
}

?

這里調用Connection里面的readAndProcess()方法,這個方法的做用是讀取客戶端的數據,存入一個buffer字節數組中,給processRequest()方法進行處理,

processRequest方法:

protected void processRequest(byte[] buf) throws IOException, InterruptedException {……//這里的call構造方法中的參數都是由buf中的數據解析出來的,前面省略的代碼做了這部分工作Call call = new Call(id, this.service, md, header, param, cellScanner, this, responder,totalRequestSize,traceInfo);//這里的scheduler是一個調度器,可以簡單理解為一個線程池的控制器,它初始化時會生成默認大小的線程池,參數可由REGION_SERVER_HANDLER_COUNT來指定//也就是jstack文件中的handler線程,默認是30//dispatch方法會獲取線程池中的一個線程,執行callRunner中的run()方法。run()方法的功能有:查詢結果,并調用sendResponseIfReady()方法來返回數據。scheduler.dispatch(new CallRunner(RpcServer.this, call, userProvider));
}

?

call的run()方法:

public void run() {……//查詢數據,存在resultPair中resultPair = this.rpcServer.call(call.service, call.md, call.param, call.cellScanner,……if (!call.isDelayed() || !call.isReturnValueDelayed()) {Message param = resultPair != null ? resultPair.getFirst() : null;CellScanner cells = resultPair != null ? resultPair.getSecond() : null;call.setResponse(param, cells, errorThrowable, error);}//調用Responser
    call.sendResponseIfReady();……
}

?

其中rpcServer的call方法為:

public Pair<Message, CellScanner> call(BlockingService service, MethodDescriptor md,Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status)
throws IOException {……//此句進行查詢Message result = service.callBlockingMethod(md, controller, param);……//返回給Call對象return new Pair<Message, CellScanner>(result,controller != null? controller.cellScanner(): null);
……
}

?

再詳細點的還沒看。看了這些主要解決了以下幾個疑惑:

?

?

reader的線程數在哪指定生成,handler的線程池在哪維護,監聽連接請求的線程有幾個?responser的線程又有幾個?

listener只有一個,

listener中有一個Reader數組,默認是10,也就是說讀取請求數據的連接池大小為10。

private class Listener extends Thread {
……private Reader[] readers = null;

?

handler的線程池由RPCServer中的scheduler維護,默認是30,
listener監聽到一個請求后,生成對應的channel發送給Reader,然后Reader會為每一個channel創建一個connection,

connection中保存了連接的信息。然后調用connection的方法來讀取請求參數,并生成call對象,這時將調用scheduler,

使用handler線程池(默認30)來查詢數據,(這里就開始并行了),結果存在call對象用,call對象最后再調用responser類的方法,將結果返回給client。

responser只有一個線程,它維護了一個call鏈表,采用非阻塞的方式(這里要說也是并行),依次將call對象送出。

大致過程就是這樣

?




轉載于:https://www.cnblogs.com/quan-qunar/p/4942972.html

總結

以上是生活随笔為你收集整理的hbase RPCServer源码分析的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。