日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

MINA2 源代码学习--源代码结构梳理

發布時間:2023/12/19 编程问答 28 豆豆
生活随笔 收集整理的這篇文章主要介紹了 MINA2 源代码学习--源代码结构梳理 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

一、mina總體框架與案例:

1.總體結構圖:

簡述:以上是一張來自網上比較經典的圖,總體上揭示了mina的結構,當中IoService包括clientIoConnector和服務端IoAcceptor兩部分。即不管是client還是服務端都是這個結構。IoService封裝了網絡傳輸層(TCP和UDP),而IoFilterChain中mina自帶的filter做了一些主要的操作之外,支持擴展。經過FilterChain之后終于調用IoHandler,IoHandler是詳細實現業務邏輯的處理接口,詳細的業務實現可擴展。


2.一個可執行的案例(案例來自網上,轉載后試驗):
Client.java:

import java.net.InetSocketAddress; import java.nio.charset.Charset; import java.util.Random;import org.apache.mina.core.future.ConnectFuture; import org.apache.mina.core.future.IoFutureListener; import org.apache.mina.core.service.IoConnector; import org.apache.mina.core.service.IoHandlerAdapter; import org.apache.mina.core.session.IoSession; import org.apache.mina.filter.codec.ProtocolCodecFilter; import org.apache.mina.filter.codec.textline.TextLineCodecFactory; import org.apache.mina.transport.socket.nio.NioSocketConnector;public class Client extends IoHandlerAdapter {private Random random = new Random(System.currentTimeMillis());public Client() {IoConnector connector = new NioSocketConnector();connector.getFilterChain().addLast("text",new ProtocolCodecFilter(new TextLineCodecFactory(Charset.forName(Server.ENCODE))));connector.setHandler(this);ConnectFuture future = connector.connect(new InetSocketAddress("127.0.0.1", Server.PORT));future.awaitUninterruptibly();future.addListener(new IoFutureListener<ConnectFuture>() {@Overridepublic void operationComplete(ConnectFuture future) {IoSession session = future.getSession();while (!session.isClosing()) {try {Thread.sleep(100);} catch (InterruptedException e) {e.printStackTrace();}String message = "你好。我roll了" + random.nextInt(100) + "點.";session.write(message);}}});connector.dispose();}@Overridepublic void messageReceived(IoSession session, Object message)throws Exception {System.out.println("批復:" + message.toString());}@Overridepublic void messageSent(IoSession session, Object message) throws Exception {System.out.println("報告:" + message.toString());}@Overridepublic void exceptionCaught(IoSession session, Throwable cause)throws Exception {cause.printStackTrace();session.close(true);}public static void main(String[] args) {for (int i = 0; i < 10; i++) {new Client();}} }

ServerHandler.java:

import java.net.InetSocketAddress; import java.util.regex.Matcher; import java.util.regex.Pattern; import org.apache.mina.core.service.IoHandlerAdapter; import org.apache.mina.core.session.IdleStatus; import org.apache.mina.core.session.IoSession;public class ServerHandler extends IoHandlerAdapter {@Overridepublic void exceptionCaught(IoSession session, Throwable cause)throws Exception {cause.printStackTrace();session.close(false);}public void messageReceived(IoSession session, Object message)throws Exception {String s = message.toString();System.out.println("收到請求:" + s);if (s != null) {int i = getPoint(s);if (session.isConnected()) {if (i >= 95) {session.write("運氣不錯,你能夠出去了.");session.close(false);return;}Integer count = (Integer) session.getAttribute(Server.KEY);count++;session.setAttribute(Server.KEY, count);session.write("抱歉。你運氣太差了,第" + count + "次請求未被通過。繼續在小黑屋呆著吧.");} else {session.close(true);}}}@Overridepublic void messageSent(IoSession session, Object message) throws Exception {System.out.println("發給client:" + message.toString());}@Overridepublic void sessionClosed(IoSession session) throws Exception {long l = session.getCreationTime();System.out.println("來自" + getInfo(session) + "的會話已經關閉,它已經存活了"+ (System.currentTimeMillis() - 1) + "毫秒");}@Overridepublic void sessionCreated(IoSession session) throws Exception {System.out.println("給" + getInfo(session) + "創建了一個會話");}@Overridepublic void sessionIdle(IoSession session, IdleStatus status)throws Exception {System.out.println("來自" + getInfo(session) + "的會話閑置,狀態為"+ status.toString());}public void sessionOpened(IoSession session) throws Exception {session.setAttribute(Server.KEY, 0);System.out.println("和" + getInfo(session) + "的會話已經打開.");}public String getInfo(IoSession session) {if (session == null) {return null;}InetSocketAddress address = (InetSocketAddress) session.getRemoteAddress();int port = address.getPort();String ip = address.getAddress().getHostAddress();return ip + ":" + port;}public int getPoint(String s) {if (s == null) {return -1;}Pattern p = Pattern.compile("^[\u0041-\uFFFF,]*(\\d+).*$");Matcher m = p.matcher(s);if (m.matches()) {return Integer.valueOf(m.group(1));}return 0;} }

Server.java:

import java.io.IOException; import java.net.InetSocketAddress; import java.nio.charset.Charset;import org.apache.mina.filter.codec.ProtocolCodecFilter; import org.apache.mina.filter.codec.textline.TextLineCodecFactory; import org.apache.mina.transport.socket.SocketAcceptor; import org.apache.mina.transport.socket.nio.NioSocketAcceptor;public class Server {public static final int PORT = 2534;public static String ENCODE = "UTF-8";public static final String KEY = "roll";public static void main(String[] args){ SocketAcceptor acceptor = new NioSocketAcceptor();acceptor.getFilterChain().addLast("text",new ProtocolCodecFilter(new TextLineCodecFactory(Charset.forName(ENCODE))));acceptor.setHandler(new ServerHandler());try {acceptor.bind(new InetSocketAddress(PORT));System.out.println("游戲開始,你想出去嗎,來,碰碰運氣吧!");} catch (IOException e) {e.printStackTrace();acceptor.dispose();}} }

本案例依賴的jar例如以下圖:


簡述:以上是依賴mina實現的一個可執行的案例,就不多說了,結合總體的結構圖和案例實現能夠看出mina框架還是非常輕量級的。以下分析一下mina的源代碼結構和一些時序流程。

二、mina 核心源代碼分析:

1.mina的啟動時序(結合上面的案例):


簡述:SocketAcceptor作為服務端對外啟動接口類,在bind網絡地址的時候,會觸發服務端一系列服務的啟動,從調用鏈能夠清晰找到相應的源代碼閱讀。

當中AbstractPollingIoAcceptor是一個核心類,它會調用自身的startupAcceptor方法,來啟動一個存放Acceptor的線程池用來處理client傳輸過來的請求。
AbstractPollingIoAcceptor 類的 startupAcceptor 方法例如以下:

/*** This method is called by the doBind() and doUnbind()* methods. If the acceptor is null, the acceptor object will* be created and kicked off by the executor. If the acceptor* object is null, probably already created and this class* is now working, then nothing will happen and the method* will just return.*/ private void startupAcceptor() throws InterruptedException {// If the acceptor is not ready, clear the queues// TODO : they should already be clean : do we have to do that ?

if (!selectable) { registerQueue.clear(); cancelQueue.clear(); } // start the acceptor if not already started Acceptor acceptor = acceptorRef.get(); //這里僅僅會啟動一個worker if (acceptor == null) { lock.acquire(); acceptor = new Acceptor(); if (acceptorRef.compareAndSet(null, acceptor)) { executeWorker(acceptor); } else { lock.release(); } } }

上面調用到 AbstractIoService 的 executeWorker方法例如以下:

protected final void executeWorker(Runnable worker) {executeWorker(worker, null); }protected final void executeWorker(Runnable worker, String suffix) {String actualThreadName = threadName;if (suffix != null) {actualThreadName = actualThreadName + '-' + suffix;}executor.execute(new NamePreservingRunnable(worker, actualThreadName)); }

簡述:有類AbstractPollingIoAcceptor 的 startupAcceptor方法(上文)能夠看到,一個SocketAcceptor僅僅啟動了一個Worker線程(即代碼中的Acceptor對象)而且把他加到線程池中。反過來講,也能夠看出AbstractIoService維護了Worker的線程池。(ps:這個Worker就是服務端處理請求的線程)。


2.Mina處理client鏈接的過程(啟動后):

概述:從1中的啟動時序能夠看到,啟動過程通過創建SocketAcceptor將有類AbstractPollingIoAcceptor的內部類Acceptor放到了 AbstractIoService的線程池里面,而這個Acceptor就是處理client網絡請求的worker。而以下這個時序就是線程池中每一個worker處理client網絡請求的時序流程。

處理請求時序:?

簡述:worker線程Acceptor的run方法中會調用NioSocketAcceptor或者AprSocketAccetpor的select方法。
ps:APR(Apache Protable Runtime Library,Apache可移植執行庫)是能夠提供非常好的可拓展性、性能以及對底層操作系統一致性操作的技術,說白了就是apache實現的一套標準的通訊接口。

AprSocketAcceptor先不做深入了解,主要了解下NioSocketAcceptor,NioSocketAcceptor顧名思義,它調用了java NIO的API實現了NIO的網絡連接處理過程。

AbstractPolling$Acceptor 的run方法的核心代碼例如以下:

private class Acceptor implements Runnable {public void run() {assert (acceptorRef.get() == this);int nHandles = 0;// Release the locklock.release();while (selectable) {try {// Detect if we have some keys ready to be processed// The select() will be woke up if some new connection// have occurred, or if the selector has been explicitly// woke up//調用了NioSocketAcceptor的select方法,獲取了selectKeyint selected = select();// this actually sets the selector to OP_ACCEPT,// and binds to the port on which this class will// listen onnHandles += registerHandles();// Now, if the number of registred handles is 0, we can// quit the loop: we don't have any socket listening// for incoming connection.if (nHandles == 0) {acceptorRef.set(null);if (registerQueue.isEmpty() && cancelQueue.isEmpty()) {assert (acceptorRef.get() != this);break;}if (!acceptorRef.compareAndSet(null, this)) {assert (acceptorRef.get() != this);break;}assert (acceptorRef.get() == this);}if (selected > 0) {// We have some connection request, let's process// them here.processHandles(selectedHandles());}// check to see if any cancellation request has been made.nHandles -= unregisterHandles();} catch (ClosedSelectorException cse) {// If the selector has been closed, we can exit the loopbreak;} catch (Throwable e) {ExceptionMonitor.getInstance().exceptionCaught(e);try {Thread.sleep(1000);} catch (InterruptedException e1) {ExceptionMonitor.getInstance().exceptionCaught(e1);}}}// Cleanup all the processors, and shutdown the acceptor.if (selectable && isDisposing()) {selectable = false;try {if (createdProcessor) {processor.dispose();}} finally {try {synchronized (disposalLock) {if (isDisposing()) {destroy();}}} catch (Exception e) {ExceptionMonitor.getInstance().exceptionCaught(e);} finally {disposalFuture.setDone();}}}}

簡述:從上面的代碼中能夠看出一個典型的網絡請求處理的程序,在循環中拿到處理的請求后就調用AbstractPollingIoAcceptor的processHandles()對網絡請求做處理。

代碼例如以下:

/*** This method will process new sessions for the Worker class. All* keys that have had their status updates as per the Selector.selectedKeys()* method will be processed here. Only keys that are ready to accept* connections are handled here.* <p/>* Session objects are created by making new instances of SocketSessionImpl* and passing the session object to the SocketIoProcessor class.*/@SuppressWarnings("unchecked")private void processHandles(Iterator<H> handles) throws Exception {while (handles.hasNext()) {H handle = handles.next();handles.remove();// Associates a new created connection to a processor,// and get back a session//這里調用了NioSocketAcceptor的accept方法S session = accept(processor, handle);if (session == null) {continue;}initSession(session, null, null);// add the session to the SocketIoProcessor// 這步處理add操作,會觸發對client請求的異步處理。

session.getProcessor().add(session); } }

NioSocketAcceptor的accept方法new了一個包裝Process處理線程的session實例:而且在調用session.getProcessor().add(session)的操作的時候觸發了對client請求的異步處理。

/*** {@inheritDoc}*/ @Override protected NioSession accept(IoProcessor<NioSession> processor, ServerSocketChannel handle) throws Exception {SelectionKey key = handle.keyFor(selector);if ((key == null) || (!key.isValid()) || (!key.isAcceptable())) {return null;}// accept the connection from the clientSocketChannel ch = handle.accept();if (ch == null) {return null;}return new NioSocketSession(this, processor, ch); }

再看上面時序圖:有一步是AbstractPollingIoProcessor調用了startupProcessor方法。代碼例如以下:

/*** Starts the inner Processor, asking the executor to pick a thread in its* pool. The Runnable will be renamed*/ private void startupProcessor() {Processor processor = processorRef.get();if (processor == null) {processor = new Processor();if (processorRef.compareAndSet(null, processor)) {executor.execute(new NamePreservingRunnable(processor, threadName));}}// Just stop the select() and start it again, so that the processor// can be activated immediately.wakeup(); }

簡述:這個startupProcessor方法在調用 session里包裝的processor的add方法是,觸發了將處理client請求的processor放入異步處理的線程池中。興許詳細Processor怎么處理client請求的流程,涉及到FilterChain的過濾。以及Adapter的調用。用來處理業務邏輯。詳細的異步處理時序看以下的時序圖:


簡述:這個時序就是將待處理的client鏈接,通過NIO的形式接受請求,并將請求包裝成Processor的形式放到處理的線程池中異步的處理。

在異步的處理過程中則調用了Processor的run方法,詳細的filterchain的調用和業務Adapter的調用也是在這一步得到處理。

值得注意的是。Handler的調用是封裝在DefaultFilterchain的內部類誒TairFilter中觸發調用的。Processor的run方法代碼例如以下:

private class Processor implements Runnable {public void run() {assert (processorRef.get() == this);int nSessions = 0;lastIdleCheckTime = System.currentTimeMillis();for (;;) {try {// This select has a timeout so that we can manage// idle session when we get out of the select every// second. (note : this is a hack to avoid creating// a dedicated thread).long t0 = System.currentTimeMillis();//調用了NioProcessorint selected = select(SELECT_TIMEOUT);long t1 = System.currentTimeMillis();long delta = (t1 - t0);if ((selected == 0) && !wakeupCalled.get() && (delta < 100)) {// Last chance : the select() may have been// interrupted because we have had an closed channel.if (isBrokenConnection()) {LOG.warn("Broken connection");// we can reselect immediately// set back the flag to falsewakeupCalled.getAndSet(false);continue;} else {LOG.warn("Create a new selector. Selected is 0, delta = " + (t1 - t0));// Ok, we are hit by the nasty(討厭的) epoll// spinning.// Basically, there is a race condition// which causes a closing file descriptor not to be// considered as available as a selected channel, but// it stopped the select. The next time we will// call select(), it will exit immediately for the same// reason, and do so forever, consuming 100%// CPU.// We have to destroy the selector, and// register all the socket on a new one.registerNewSelector();}// Set back the flag to falsewakeupCalled.getAndSet(false);// and continue the loopcontinue;}// Manage newly created session firstnSessions += handleNewSessions();updateTrafficMask();// Now, if we have had some incoming or outgoing events,// deal with themif (selected > 0) {//LOG.debug("Processing ..."); // This log hurts one of the MDCFilter test...//觸發了詳細的調用邏輯process();}// Write the pending requestslong currentTime = System.currentTimeMillis();flush(currentTime);// And manage removed sessionsnSessions -= removeSessions();// Last, not least, send Idle events to the idle sessionsnotifyIdleSessions(currentTime);// Get a chance to exit the infinite loop if there are no// more sessions on this Processorif (nSessions == 0) {processorRef.set(null);if (newSessions.isEmpty() && isSelectorEmpty()) {// newSessions.add() precedes startupProcessorassert (processorRef.get() != this);break;}assert (processorRef.get() != this);if (!processorRef.compareAndSet(null, this)) {// startupProcessor won race, so must exit processorassert (processorRef.get() != this);break;}assert (processorRef.get() == this);}// Disconnect all sessions immediately if disposal has been// requested so that we exit this loop eventually.if (isDisposing()) {for (Iterator<S> i = allSessions(); i.hasNext();) {scheduleRemove(i.next());}wakeup();}} catch (ClosedSelectorException cse) {// If the selector has been closed, we can exit the loopbreak;} catch (Throwable t) {ExceptionMonitor.getInstance().exceptionCaught(t);try {Thread.sleep(1000);} catch (InterruptedException e1) {ExceptionMonitor.getInstance().exceptionCaught(e1);}}}try {synchronized (disposalLock) {if (disposing) {doDispose();}}} catch (Throwable t) {ExceptionMonitor.getInstance().exceptionCaught(t);} finally {disposalFuture.setValue(true);}} }

簡述:這么一坨代碼能夠看出,這個處理器也調用了java的Nio API是一個NIO模型。當中select和process方法各自是從session拿到要處理的請求,并進行處理。而詳細的Processor實例是NioProcessor。從加入凝視的代碼中有一步調用了自身的process方法,這步調用觸發了詳細業務邏輯的調用。能夠結合代碼和時序圖看下。在Process方法中會調用reader(session)或wirte(session)方法,然后調用fireMessageReceived方法,這種方法又調用了callNextMessageReceived方法致使觸發了整個FilterChain和Adapter的調用。read方法的核心代碼例如以下:

private void read(S session) {IoSessionConfig config = session.getConfig();int bufferSize = config.getReadBufferSize();IoBuffer buf = IoBuffer.allocate(bufferSize);final boolean hasFragmentation = session.getTransportMetadata().hasFragmentation();try {int readBytes = 0;int ret;try {if (hasFragmentation) {while ((ret = read(session, buf)) > 0) {readBytes += ret;if (!buf.hasRemaining()) {break;}}} else {ret = read(session, buf);if (ret > 0) {readBytes = ret;}}} finally {buf.flip();}if (readBytes > 0) {IoFilterChain filterChain = session.getFilterChain();filterChain.fireMessageReceived(buf);buf = null;if (hasFragmentation) {if (readBytes << 1 < config.getReadBufferSize()) {session.decreaseReadBufferSize();} else if (readBytes == config.getReadBufferSize()) {session.increaseReadBufferSize();}}}if (ret < 0) {scheduleRemove(session);}} catch (Throwable e) {if (e instanceof IOException) {if (!(e instanceof PortUnreachableException)|| !AbstractDatagramSessionConfig.class.isAssignableFrom(config.getClass())|| ((AbstractDatagramSessionConfig) config).isCloseOnPortUnreachable()) {scheduleRemove(session);}}IoFilterChain filterChain = session.getFilterChain();filterChain.fireExceptionCaught(e);} }

從這段代碼并結合上面的時序圖能夠看出來觸發整個FilterChain的調用以及IoHandler的調用。

三、類結構分析

參考第一部分的總體結構圖,畫一下每一個部分大致的類結構圖:

簡述: 從類繼承結構圖來看,能夠看到在IOService體系下,存在IoConnector和IoAcceptor兩個大的分支體系。IoConnector是做為client的時候使用,IoAcceptor是作為服務端的時候使用。實際上在Mina中,有三種worker線程各自是:Acceptor、Connector 和 I/O processor。


(1) Acceptor Thread 作為server端的鏈接線程,實現了IoService接口。線程的數量就是創建SocketAcceptor的數量。


(2) Connector Thread 作為client請求建立的鏈接線程,實現了IoService接口,維持了一個和服務端Acceptor的一個鏈接,線程的數量就是創建SocketConnector的數量。


(3) I/O processorThread 作為I/O真正處理的線程,存在于server端和client。線程的數量是能夠配置的,默認是CPU個數+1。

上面那個圖僅僅是表述了IoService類體系,而I/O Processor的類體系并不在當中,見下圖:


簡述:IOProcessor主要分為兩種。各自是AprIOProcessor和NioProcessor,Apr的解釋見上文:ps:APR(Apache Protable Runtime Library,Apache可移植執行庫)。

NioProcessor也是Nio的一種實現,用來處理client連接過來的請求。在Processor中會調用到 FilterChain 和 Handler,見上文代碼。先看下FilterChain的類結構圖例如以下:


Filter 和 Handler的類結構例如以下:


Handler的類結構例如以下:


Mina的session類結構圖例如以下:

Mina的Buffer的類結構圖例如以下:

版權聲明:本文博主原創文章,博客,未經同意不得轉載。

總結

以上是生活随笔為你收集整理的MINA2 源代码学习--源代码结构梳理的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。