[三]RabbitMQ-客户端源码之ChannelManager
歡迎支持筆者新作:《深入理解Kafka:核心設計與實踐原理》和《RabbitMQ實戰指南》,同時歡迎關注筆者的微信公眾號:朱小廝的博客。
歡迎跳轉到本文的原文鏈接:https://honeypps.com/mq/rabbitmq-client-source-code-of-channelmanager/
關于ChannelManager,官方注解:Manages a set of channels, indexed by channel number (1… _channelMax)。
ChannelManager類的代碼量不是很多,主要用來管理Channel的,channelNumber=0的除外,應為channelNumber=0是留給Connection的特殊的channelNumber。
下面是ChannelManager的成員變量:
/** Monitor for <code>_channelMap</code> and <code>channelNumberAllocator</code> */ private final Object monitor = new Object();/** Mapping from <code><b>1.._channelMax</b></code> to {@link ChannelN} instance */private final Map<Integer, ChannelN> _channelMap = new HashMap<Integer, ChannelN>();private final IntAllocator channelNumberAllocator;private final ConsumerWorkService workService;private final Set<CountDownLatch> shutdownSet = new HashSet<CountDownLatch>();/** Maximum channel number available on this connection. */ private final int _channelMax; private final ThreadFactory threadFactory;這上面的成員變量下面會有涉及。
對于ChannelManager的使用,是AMQConnection中的成員變量:
/** Object that manages a set of channels */ private volatile ChannelManager _channelManager;AMQConnection中start()的_channelManager中對其初始化:
protected ChannelManager instantiateChannelManager(int channelMax, ThreadFactory threadFactory) {return new ChannelManager(this._workService, channelMax, threadFactory); }再調用其構造函數:
public ChannelManager(ConsumerWorkService workService, int channelMax, ThreadFactory threadFactory) {if (channelMax == 0) {// The framing encoding only allows for unsigned 16-bit integers// for the channel numberchannelMax = (1 << 16) - 1;}_channelMax = channelMax;channelNumberAllocator = new IntAllocator(1, channelMax);this.workService = workService;this.threadFactory = threadFactory; }這里的ConsumerWorkService也在AMQConnection的start()方法中初始化——initializeConsumerWorkService():
private void initializeConsumerWorkService() {this._workService = new ConsumerWorkService(executor, threadFactory, shutdownTimeout); }再回到構造函數。
channelMax參數是在client接收到broker的Connection.Tune幀中的“Channel-Max”參數之后設置的,如果為0則表示沒有限制,這里就會設置為默認的最大值:2的16次方-1。
threadFactory參數是指:Executors.defaultThreadFactory();
關于ConsumerWorkService請參考文章末尾處。
使用過RabbitMQ的同學知道要生產或者消費消息之前必須要初始化Channel,如下:
Channel channel = connection.createChannel();這個createChannel()是AMQConnection中的方法:
public Channel createChannel(int channelNumber) throws IOException {ensureIsOpen();ChannelManager cm = _channelManager;if (cm == null) return null;return cm.createChannel(this, channelNumber); } public Channel createChannel() throws IOException {ensureIsOpen();ChannelManager cm = _channelManager;if (cm == null) return null;return cm.createChannel(this); }這里就是調用了ChannelManager的createChannel方法。
下面是ChannelManager中關于創建Channel的代碼:
public ChannelN createChannel(AMQConnection connection) throws IOException {ChannelN ch;synchronized (this.monitor) {int channelNumber = channelNumberAllocator.allocate();if (channelNumber == -1) {return null;} else {ch = addNewChannel(connection, channelNumber);}}ch.open(); // now that it's been safely addedreturn ch; }public ChannelN createChannel(AMQConnection connection, int channelNumber) throws IOException {ChannelN ch;synchronized (this.monitor) {if (channelNumberAllocator.reserve(channelNumber)) {ch = addNewChannel(connection, channelNumber);} else {return null;}}ch.open(); // now that it's been safely addedreturn ch; }private ChannelN addNewChannel(AMQConnection connection, int channelNumber) throws IOException {if (_channelMap.containsKey(channelNumber)) {// That number's already allocated! Can't do it// This should never happen unless something has gone// badly wrong with our implementation.throw new IllegalStateException("We have attempted to "+ "create a channel with a number that is already in "+ "use. This should never happen. "+ "Please report this as a bug.");}ChannelN ch = instantiateChannel(connection, channelNumber, this.workService);_channelMap.put(ch.getChannelNumber(), ch);return ch; }protected ChannelN instantiateChannel(AMQConnection connection, int channelNumber, ConsumerWorkService workService) {return new ChannelN(connection, channelNumber, workService); }上面有兩個createChannel方法,一個是帶了channelNumber的,一個是自動分片channelNumber的,分別對應AMQConnection中的兩個方法。最后都調用addNewChannel方法。
注意兩個createChannel方法中都有這樣一句代碼:
ch.open();這個是什么呢?其實是調用ChannelN的open方法:
/*** Package method: open the channel.* This is only called from {@link ChannelManager}.* @throws IOException if any problem is encountered*/ public void open() throws IOException {// wait for the Channel.OpenOk response, and ignore itexnWrappingRpc(new Channel.Open(UNSPECIFIED_OUT_OF_BAND)); }這樣就調用了AMQChannel的rpc方法,向broker發送了一個Channel.Open幀。
addNewChannel方法實際上是創建了一個ChannelN對象,然后置其于ChannelManager中的_channelMap中,方便管理。
channelNumberAllocator是channelNumber的分配器,其原理是采用BitSet來實現channelNumber的分配,有興趣的同學可以深究進去看看。
關于ChannelN類會有專門一篇博文來講述,其實整個RabbitMQ-client的代碼最關鍵的就是ChannelN這個類,需要著重講述。
細心的朋友可能會發現關于ConsumerWorkService這個,我并沒有做什么闡述。這個主要牽涉到Channel層面的處理,涉及到的類有AMQConnection, ChannelN, ConsumerDispatcher等。ConsumerWorkService是在AMQConnection中初始化,在ChannelManager中引用。至于這里怎么理解,在ChannelN中這么解釋:
service for managing this channel’s consumer callbacks。意思是管理消費回調的服務。
綜述,ChannelManager主要用來管理Channel, 包括channelNumber與Channel之間的映射關系。
附:本系列全集
歡迎跳轉到本文的原文鏈接:https://honeypps.com/mq/rabbitmq-client-source-code-of-channelmanager/
歡迎支持筆者新作:《深入理解Kafka:核心設計與實踐原理》和《RabbitMQ實戰指南》,同時歡迎關注筆者的微信公眾號:朱小廝的博客。
總結
以上是生活随笔為你收集整理的[三]RabbitMQ-客户端源码之ChannelManager的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: [一]RabbitMQ-客户端源码之Co
- 下一篇: [四]RabbitMQ-客户端源码之Fr