public void connected(Channel ch) throws RemotingException {// If the server has entered the shutdown process, reject any new connectionif (this.isClosing() || this.isClosed()) {logger.warn("Close new channel " + ch + ", cause: server is closing or has been closed. For example, receive a new connect request while in shutdown process.");ch.close();return;}Collection<Channel> channels = getChannels();//大于accepts的tcp連接直接關閉if (accepts > 0 && channels.size() > accepts) { logger.error("Close channel " + ch + ", cause: The server " + ch.getLocalAddress() + " connections greater than max config " + accepts);ch.close();return;}super.connected(ch);}
復制代碼
public class LimitedThreadPool implements ThreadPool {@Overridepublic Executor getExecutor(URL url) {String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);// 缺省核心線程數量為0int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS);// 缺省最大線程數量200int threads = url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS);// 任務隊列缺省0int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);return new ThreadPoolExecutor(cores, threads, Long.MAX_VALUE, TimeUnit.MILLISECONDS,queues == 0 ? new SynchronousQueue<Runnable>() :(queues < 0 ? new LinkedBlockingQueue<Runnable>(): new LinkedBlockingQueue<Runnable>(queues)),new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));}}
復制代碼
使用缺省配置時(不另行配置線程數、隊列等參數),LimitedThreadPool 的行為和 FixedThreadPool 沒有區別。
EagerThreadPool
public class EagerThreadPool implements ThreadPool {@Overridepublic Executor getExecutor(URL url) {String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);// 0int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS);// Integer.MAX_VALUEint threads = url.getParameter(Constants.THREADS_KEY, Integer.MAX_VALUE);// 0int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);// 60sint alive = url.getParameter(Constants.ALIVE_KEY, Constants.DEFAULT_ALIVE);// init queue and executor// 初始任務隊列為1TaskQueue<Runnable> taskQueue = new TaskQueue<Runnable>(queues <= 0 ? 1 : queues);EagerThreadPoolExecutor executor = new EagerThreadPoolExecutor(cores,threads,alive,TimeUnit.MILLISECONDS,taskQueue,new NamedInternalThreadFactory(name, true),new AbortPolicyWithReport(name, url));taskQueue.setExecutor(executor);return executor;}
}
復制代碼
EagerThreadPoolExecutor
public void execute(Runnable command) {if (command == null) {throw new NullPointerException();}// do not increment in method beforeExecute!//已提交任務數量submittedTaskCount.incrementAndGet();try {super.execute(command);} catch (RejectedExecutionException rx) { //大于最大線程數被拒絕任務 重新添加到任務隊列// retry to offer the task into queue.final TaskQueue queue = (TaskQueue) super.getQueue();try {if (!queue.retryOffer(command, 0, TimeUnit.MILLISECONDS)) {submittedTaskCount.decrementAndGet();throw new RejectedExecutionException("Queue capacity is full.", rx);}} catch (InterruptedException x) {submittedTaskCount.decrementAndGet();throw new RejectedExecutionException(x);}} catch (Throwable t) {// decrease any waysubmittedTaskCount.decrementAndGet();throw t;}}
復制代碼
TaskQueue
public boolean offer(Runnable runnable) {if (executor == null) {throw new RejectedExecutionException("The task queue does not have executor!");}// 獲取當前線程池中的線程數量int currentPoolThreadSize = executor.getPoolSize();// have free worker. put task into queue to let the worker deal with task.// 如果已經提交的任務數量小于當前線程池中線程數量(不是很理解這里的操作)if (executor.getSubmittedTaskCount() < currentPoolThreadSize) {return super.offer(runnable);}// returnfalse to let executor create new worker.//當前線程數小于最大線程程數直接創建新workerif (currentPoolThreadSize < executor.getMaximumPoolSize()) {returnfalse;}// currentPoolThreadSize >= maxreturn super.offer(runnable);}
復制代碼