IBM WebSphere MQ概述
網上關于IBM WebSphere MQ的資料挺少的,畢竟是一項老技術,整理一下從零開始對于IBM WebSphere MQ的理解
IBM WebSphere MQ是一種消息中間件技術,可用于多個系統間通信,對于消息中間件的作用,這里就不多做展開。首先要了解IBM WebSphere MQ中的幾個名詞定義
- 隊列管理器:構件獨立的MQ運行環境,類似于RabbitMQ中的vhost,主要作用是維護和管理消息隊列
- 隊列:存放消息的容器,可分為本地隊列,遠程隊列等
- 消息:MQ中的最小對象,默認情況下,消息缺省可以達到 4MB。消息可以分成持久消息和非持久消息。所 謂“持久”的 意思,就是在MQ 隊列管理器重啟動后,消息是否仍然能保持。持久的消息寫入或讀出隊列的同時會在 Log 中記錄,所以性能上比非持久消息差不少。
- 通道:通道是兩個隊列管理器之間的一種單向的點對點的通信連接。隊列管理器之間的通信是通過配置通道來實現 的,通道兩側的隊列管理器對這個通道的相關參數應該能對應起來。在通道上可以配置不同的通信協議,這樣就使得編程接口與通信協議無關。通道兩端的配置必須匹配, 且名字相同,否則無法連通。消息在通道中只能單向的流動。如果需要雙向的流動,可以創建一對通道,一來一去。
其基本服務架構如圖
圖中顯示了 IBM WebSphere MQ 編程的原理。第一步是讓應用程序與隊列管理器連接。它通過 MQConnect 調用來進行此連接。下一步使用 MQOpen 調用為輸出打開一個隊列。然后應用程序使用 MQPut 調用將其數據放到隊列上。要接收數據,應用程序調用 MQOpen 調用打開輸入隊列。應用程序使用 MQGet 調用從隊列上接收數據。
還展示了消息通道代理(MCA)、通道出口和對象權限管理器(OAM)。MCA 是 IBM WebSphere MQ 程序,它使用現有傳輸服務諸如 TCP/IP 與 SNA 將消息從本地傳輸隊列移到目標隊列管理器。這些傳輸服務即通道。通道出口是用戶寫入庫,可以在通道運作期間,從已定義位置號之一進入這些庫。OAM 是命令和對象管理的缺省授權服務(針對操作系統)。這三個組件對 IBM WebSphere MQ 的現有安全性解決方案非常重要。
IBM WebSphere MQ服務的安裝
服務端的安裝這里就不做描述,請查閱其他博文。
服務安裝后為了使客戶端能連接使用,需做如下準備:
創建隊列管理器新建本地隊列新建服務連接通道,并將其屬性-MCA用戶標識值設置為“MUSR_MQADMIN”
注意:此處博主并未設置客戶端的登錄賬號與密碼,因為未找到在哪里設置,請知曉的朋友告知一下。
springboot配置多個隊列管理器連接
1.引入依賴
<dependency><groupId>org.springframework
</groupId><artifactId>spring-jms
</artifactId></dependency><dependency><groupId>javax.jms
</groupId><artifactId>javax.jms-api
</artifactId><version>2.0.1
</version></dependency><dependency><groupId>com.ibm.mq
</groupId><artifactId>com.ibm.mq.allclient
</artifactId><version>9.1.1.0
</version></dependency>
yml配置文件信息
spring:ibmmq:zsam:host: 10.1.16.156port: 1415 #此處配置的是隊列管理器監聽端口queue-manager: qm_ntfm_zsamchannel: ch_serviceccsid: 1381 #CCSID要與連接到的隊列管理器一致,Windows下默認為1381,Linux下默認為1208。1208表示UTF-8字符集,建議把隊列管理器的CCSID改為1208username: MUSR_MQADMINpassword:receive-timeout: 2000pub-queue: ZSAM.TO.NTFM #推送給NTFM的對列名FCTI-queue: FCTI.TO.ZSAMzsfz:host: 10.1.16.103port: 1416queue-manager: qm_ntfm_zsfzchannel: ch_serviceccsid: 1381username: 1password: 1receive-timeout: 2000pub-queue: ZSFZ.TO.NTFMFCTI-queue: FCTI.TO.ZSFZ
IBM MQ配置類
package com
.pantech
.ntfmadapter
.config
;import com
.ibm
.mq
.jms
.MQQueueConnectionFactory
;
import com
.ibm
.msg
.client
.wmq
.common
.CommonConstants
;
import lombok
.extern
.slf4j
.Slf4j
;
import org
.springframework
.beans
.factory
.annotation
.Qualifier
;
import org
.springframework
.beans
.factory
.annotation
.Value
;
import org
.springframework
.context
.annotation
.Bean
;
import org
.springframework
.context
.annotation
.Configuration
;
import org
.springframework
.context
.annotation
.Primary
;
import org
.springframework
.jms
.config
.DefaultJmsListenerContainerFactory
;
import org
.springframework
.jms
.connection
.CachingConnectionFactory
;
import org
.springframework
.jms
.connection
.UserCredentialsConnectionFactoryAdapter
;
import org
.springframework
.jms
.core
.JmsOperations
;
import org
.springframework
.jms
.core
.JmsTemplate
;
@Configuration
@Slf4j
public class IbmMqConfig {@Bean(name
= "zsamMqQueueConnectionFactory")public MQQueueConnectionFactory
zsamMqQueueConnectionFactory(@Value("${spring.ibmmq.zsam.host}") String host
,@Value("${spring.ibmmq.zsam.ccsid}") Integer ccsid
,@Value("${spring.ibmmq.zsam.channel}") String channel
,@Value("${spring.ibmmq.zsam.port}") Integer port
,@Value("${spring.ibmmq.zsam.queue-manager}") String queueManager
) {MQQueueConnectionFactory mqQueueConnectionFactory
= new MQQueueConnectionFactory();mqQueueConnectionFactory
.setHostName(host
);try {mqQueueConnectionFactory
.setTransportType(CommonConstants
.WMQ_CM_CLIENT
);mqQueueConnectionFactory
.setCCSID(ccsid
);mqQueueConnectionFactory
.setChannel(channel
);mqQueueConnectionFactory
.setPort(port
);mqQueueConnectionFactory
.setQueueManager(queueManager
);} catch (Exception e
) {log
.error("創建ZSAM機場IBM MQ連接工廠異常,原因:", e
);}return mqQueueConnectionFactory
;}@Bean(name
= "zsamUserCredentialsConnectionFactoryAdapter")UserCredentialsConnectionFactoryAdapter
zsamUserCredentialsConnectionFactoryAdapter(@Value("${spring.ibmmq.zsam.username}") String username
,@Value("${spring.ibmmq.zsam.password}") String password
,@Qualifier("zsamMqQueueConnectionFactory") MQQueueConnectionFactory mqQueueConnectionFactory
) {UserCredentialsConnectionFactoryAdapter userCredentialsConnectionFactoryAdapter
= new UserCredentialsConnectionFactoryAdapter();userCredentialsConnectionFactoryAdapter
.setUsername(username
);userCredentialsConnectionFactoryAdapter
.setPassword(password
);userCredentialsConnectionFactoryAdapter
.setTargetConnectionFactory(mqQueueConnectionFactory
);return userCredentialsConnectionFactoryAdapter
;}@Bean(name
= "zsamCachingConnectionFactory")@Primarypublic CachingConnectionFactory
zsamCachingConnectionFactory(@Qualifier("zsamUserCredentialsConnectionFactoryAdapter") UserCredentialsConnectionFactoryAdapter userCredentialsConnectionFactoryAdapter
) {CachingConnectionFactory cachingConnectionFactory
= new CachingConnectionFactory();cachingConnectionFactory
.setTargetConnectionFactory(userCredentialsConnectionFactoryAdapter
);cachingConnectionFactory
.setSessionCacheSize(500);cachingConnectionFactory
.setReconnectOnException(true);return cachingConnectionFactory
;}@Bean(name
= "zsamJmsQueueListenerContainerFactory")@Primarypublic DefaultJmsListenerContainerFactory
zsamJmsQueueListenerContainerFactory(@Qualifier("zsamCachingConnectionFactory") CachingConnectionFactory cachingConnectionFactory
) {DefaultJmsListenerContainerFactory factory
= new DefaultJmsListenerContainerFactory();factory
.setConnectionFactory(cachingConnectionFactory
);factory
.setConcurrency("3-10");factory
.setRecoveryInterval(1000L
);return factory
;}@Bean(name
= "zsamJmsOperations")public JmsOperations
zsamJmsOperations(@Value("${spring.ibmmq.zsam.receive-timeout}") Integer receiveTimeout
,@Qualifier("zsamCachingConnectionFactory") CachingConnectionFactory cachingConnectionFactory
) {JmsTemplate jmsTemplate
= new JmsTemplate(cachingConnectionFactory
);jmsTemplate
.setReceiveTimeout(receiveTimeout
);return jmsTemplate
;}/*** 配置連接工廠
*/@Bean(name
= "zsfzMqQueueConnectionFactory")public MQQueueConnectionFactory
zsfzMqQueueConnectionFactory(@Value("${spring.ibmmq.zsfz.host}") String host
,@Value("${spring.ibmmq.zsfz.ccsid}") Integer ccsid
,@Value("${spring.ibmmq.zsfz.channel}") String channel
,@Value("${spring.ibmmq.zsfz.port}") Integer port
,@Value("${spring.ibmmq.zsfz.queue-manager}") String queueManager
) {MQQueueConnectionFactory mqQueueConnectionFactory
= new MQQueueConnectionFactory();mqQueueConnectionFactory
.setHostName(host
);try {mqQueueConnectionFactory
.setTransportType(CommonConstants
.WMQ_CM_CLIENT
);mqQueueConnectionFactory
.setCCSID(ccsid
);mqQueueConnectionFactory
.setChannel(channel
);mqQueueConnectionFactory
.setPort(port
);mqQueueConnectionFactory
.setQueueManager(queueManager
);} catch (Exception e
) {log
.error("創建ZSFZ機場IBM MQ連接工廠異常,原因:", e
);}return mqQueueConnectionFactory
;}@Bean(name
= "zsfzUserCredentialsConnectionFactoryAdapter")UserCredentialsConnectionFactoryAdapter
zsfzUserCredentialsConnectionFactoryAdapter(@Value("${spring.ibmmq.zsfz.username}") String username
,@Value("${spring.ibmmq.zsfz.password}") String password
,@Qualifier("zsfzMqQueueConnectionFactory") MQQueueConnectionFactory mqQueueConnectionFactory
) {UserCredentialsConnectionFactoryAdapter userCredentialsConnectionFactoryAdapter
= new UserCredentialsConnectionFactoryAdapter();userCredentialsConnectionFactoryAdapter
.setUsername(username
);userCredentialsConnectionFactoryAdapter
.setPassword(password
);userCredentialsConnectionFactoryAdapter
.setTargetConnectionFactory(mqQueueConnectionFactory
);return userCredentialsConnectionFactoryAdapter
;}@Bean(name
= "zsfzCachingConnectionFactory")public CachingConnectionFactory
zsfzCachingConnectionFactory(@Qualifier("zsfzUserCredentialsConnectionFactoryAdapter") UserCredentialsConnectionFactoryAdapter userCredentialsConnectionFactoryAdapter
) {CachingConnectionFactory cachingConnectionFactory
= new CachingConnectionFactory();cachingConnectionFactory
.setTargetConnectionFactory(userCredentialsConnectionFactoryAdapter
);cachingConnectionFactory
.setSessionCacheSize(500);cachingConnectionFactory
.setReconnectOnException(true);return cachingConnectionFactory
;}@Bean(name
= "zsfzJmsQueueListenerContainerFactory")public DefaultJmsListenerContainerFactory
zsfzJmsQueueListenerContainerFactory(@Qualifier("zsfzCachingConnectionFactory") CachingConnectionFactory cachingConnectionFactory
) {DefaultJmsListenerContainerFactory factory
= new DefaultJmsListenerContainerFactory();factory
.setConnectionFactory(cachingConnectionFactory
);factory
.setConcurrency("3-10");factory
.setRecoveryInterval(1000L
);return factory
;}@Bean(name
= "zsfzJmsOperations")public JmsOperations
zsfzJmsOperations(@Value("${spring.ibmmq.zsfz.receive-timeout}") Integer receiveTimeout
,@Qualifier("zsfzCachingConnectionFactory") CachingConnectionFactory cachingConnectionFactory
) {JmsTemplate jmsTemplate
= new JmsTemplate(cachingConnectionFactory
);jmsTemplate
.setReceiveTimeout(receiveTimeout
);return jmsTemplate
;}
}
監聽IBM MQ上的隊列消息
package com
.pantech
.ntfmadapter
.consumer
;import com
.pantech
.ntfmadapter
.service
.HandlerService
;
import lombok
.extern
.slf4j
.Slf4j
;
import org
.springframework
.beans
.factory
.annotation
.Autowired
;
import org
.springframework
.jms
.annotation
.JmsListener
;
import org
.springframework
.jms
.listener
.adapter
.MessageListenerAdapter
;
import org
.springframework
.stereotype
.Component
;import javax
.jms
.Message
;
import javax
.jms
.TextMessage
;
@Component
@Slf4j
public class ZsamIbmMqConsumer extends MessageListenerAdapter {private final HandlerService handlerService
;@Autowiredpublic ZsamIbmMqConsumer(HandlerService handlerService
) {this.handlerService
= handlerService
;}@Override@JmsListener(destination
= "${spring.ibmmq.zsam.FCTI-queue}")@JmsListener(destination
= "${spring.ibmmq.zsfz.FCTI-queue}", containerFactory
= "zsfzJmsQueueListenerContainerFactory")public void onMessage(Message message
) {try {TextMessage textMessage
= (TextMessage
) message
;String messagebody
= textMessage
.getText();log
.info("監聽到NTFM系統廈門機場消息:{}", messagebody
);handlerService
.ntfmHandler(messagebody
);} catch (Exception e
) {log
.error("{}消息獲取發生異常,原因:", message
, e
);}}
}
發送消息到IBM MQ
package com
.pantech
.ntfmadapter
.provider
;import lombok
.extern
.slf4j
.Slf4j
;
import org
.springframework
.amqp
.core
.Message
;
import org
.springframework
.amqp
.rabbit
.core
.RabbitTemplate
;
import org
.springframework
.beans
.factory
.annotation
.Autowired
;
import org
.springframework
.beans
.factory
.annotation
.Value
;
import org
.springframework
.jms
.core
.JmsOperations
;
import org
.springframework
.stereotype
.Component
;
@Component
@Slf4j
public class Provider {private final RabbitTemplate rabbitTemplate
;private final JmsOperations zsamJmsOperations
;private final JmsOperations zsfzJmsOperations
;@Value("${spring.ibmmq.zsam.pub-queue}")private String pubQueueName
;@Autowiredpublic Provider(RabbitTemplate rabbitTemplate
, JmsOperations zsamJmsOperations
, JmsOperations zsfzJmsOperations
) {this.rabbitTemplate
= rabbitTemplate
;this.zsamJmsOperations
= zsamJmsOperations
;this.zsfzJmsOperations
= zsfzJmsOperations
;}public void sendToImbMq(String messageBody
){zsamJmsOperations
.convertAndSend(pubQueueName
,messageBody
);log
.info("推送至IBM MQ成功,原消息為:{}",messageBody
);}public void sendToRabbitMq(String messageBody
){Message message
= new Message(messageBody
.getBytes());rabbitTemplate
.convertAndSend("",message
);log
.info("轉發至RabbitMQ成功,原消息為:{}",messageBody
);}
}
springboot配置監聽單個隊列管理器
大體與上面配置多個隊列管理器類似,這里貼一下配置類代碼
@Configuration
public class IbmMqConfig {@Value("${spring.ibmmq.host}")private String host
;@Value("${spring.ibmmq.port}")private Integer port
;@Value("${spring.ibmmq.queue-manager}")private String queueManager
;@Value("${spring.ibmmq.channel}")private String channel
;@Value("${spring.ibmmq.username}")private String username
;@Value("${spring.ibmmq.password}")private String password
;@Value("${spring.ibmmq.receive-timeout}")private long receiveTimeout
;@Beanpublic MQQueueConnectionFactory
mqQueueConnectionFactory() {MQQueueConnectionFactory mqQueueConnectionFactory
= new MQQueueConnectionFactory();mqQueueConnectionFactory
.setHostName(host
);try {mqQueueConnectionFactory
.setTransportType(CommonConstants
.WMQ_CM_CLIENT
);mqQueueConnectionFactory
.setCCSID(1381);mqQueueConnectionFactory
.setChannel(channel
);mqQueueConnectionFactory
.setPort(port
);mqQueueConnectionFactory
.setQueueManager(queueManager
);} catch (Exception e
) {e
.printStackTrace();}return mqQueueConnectionFactory
;}
@Bean@Primarypublic CachingConnectionFactory
cachingConnectionFactory(MQQueueConnectionFactory mqQueueConnectionFactory
) {CachingConnectionFactory cachingConnectionFactory
= new CachingConnectionFactory();cachingConnectionFactory
.setTargetConnectionFactory(mqQueueConnectionFactory
);cachingConnectionFactory
.setSessionCacheSize(500);cachingConnectionFactory
.setReconnectOnException(true);return cachingConnectionFactory
;}@Beanpublic PlatformTransactionManager
jmsTransactionManager(CachingConnectionFactory cachingConnectionFactory
) {JmsTransactionManager jmsTransactionManager
= new JmsTransactionManager();jmsTransactionManager
.setConnectionFactory(cachingConnectionFactory
);return jmsTransactionManager
;}@Beanpublic JmsOperations
jmsOperations(CachingConnectionFactory cachingConnectionFactory
) {JmsTemplate jmsTemplate
= new JmsTemplate(cachingConnectionFactory
);jmsTemplate
.setReceiveTimeout(receiveTimeout
);return jmsTemplate
;}
我待你好,不要罵我。
總結
以上是生活随笔為你收集整理的spring boot整合IBM WebSphere MQ,并配置多个队列管理器的全部內容,希望文章能夠幫你解決所遇到的問題。
如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。