戴尔集群监控与管理系统_监控与管理
戴爾集群監(jiān)控與管理系統(tǒng)
本文是我們名為“ EAI的Spring集成 ”的學(xué)院課程的一部分。
在本課程中,向您介紹了企業(yè)應(yīng)用程序集成模式以及Spring Integration如何解決它們。 接下來,您將深入研究Spring Integration的基礎(chǔ)知識(shí),例如通道,轉(zhuǎn)換器和適配器。 在這里查看 !
目錄
1.簡介 2.發(fā)布和接收J(rèn)MX通知1.簡介
在嘗試了Spring Integration提供的主要組件并了解了它如何與JMS隊(duì)列或Web服務(wù)之類的其他系統(tǒng)很好地集成之后,本章通過展示不同的監(jiān)視或收集有關(guān)消息傳遞中發(fā)生的情況的機(jī)制來結(jié)束本課程。系統(tǒng)。
其中一些機(jī)制包括通過MBean管理或監(jiān)視應(yīng)用程序,MBean是JMX規(guī)范的一部分。 我們還將學(xué)習(xí)如何監(jiān)視消息以查看消息傳遞流程中涉及哪些組件,以及如何為具有緩沖消息能力的組件保留消息。
本章討論的另一種機(jī)制是我們將如何使用元數(shù)據(jù)存儲(chǔ)實(shí)現(xiàn)EIP冪等接收器模式。
最后,描述的最后一種機(jī)制是控制總線。 這將使我們發(fā)送消息,這些消息將調(diào)用應(yīng)用程序上下文中的組件上的操作。
2.發(fā)布和接收J(rèn)MX通知
JMX規(guī)范定義了一種機(jī)制,該機(jī)制允許MBean發(fā)布將發(fā)送到其他MBean或管理應(yīng)用程序的通知。 Oracle 文檔解釋了如何實(shí)現(xiàn)此機(jī)制。
Spring Integration通過提供能夠發(fā)布和接收J(rèn)MX通知的通道適配器來支持此功能。 我們將看到一個(gè)使用兩個(gè)通道適配器的示例:
- 通知監(jiān)聽通道適配器
- 通知發(fā)布通道適配器
發(fā)布JMX通知
在示例的第一部分中,消息傳遞系統(tǒng)通過其入口網(wǎng)關(guān)接收String消息(具有有效載荷類型為String的消息)。 然后,它使用服務(wù)激活器(通知處理程序)來構(gòu)建javax.management.Notification并將其發(fā)送到通知發(fā)布通道適配器,該適配器將發(fā)布JMX通知。
第一部分的流程如下所示:
圖1
xml配置等同于上圖:
<context:component-scan base-package="xpadro.spring.integration.jmx.notification"/><context:mbean-export/> <context:mbean-server/><!-- Sending Notifications --> <int:gateway service-interface="xpadro.spring.integration.jmx.notification.JmxNotificationGateway" default-request-channel="entryChannel"/><int:channel id="entryChannel"/><int:service-activator input-channel="entryChannel" output-channel="sendNotificationChannel" ref="notificationHandler" method="buildNotification"/><int:channel id="sendNotificationChannel"/><int-jmx:notification-publishing-channel-adapter channel="sendNotificationChannel" object-name="xpadro.spring.integration.jmx.adapter:type=integrationMBean,name=integrationMbean"/>網(wǎng)關(guān)與前面的示例一樣簡單。 請(qǐng)記住,如果只有一種方法,則@Gateway注釋不是必需的:
public interface JmxNotificationGateway {public void send(String type); }Message將到達(dá)服務(wù)激活器,該服務(wù)激活器將使用JMX通知構(gòu)建消息:
@Component("notificationHandler") public class NotificationHandler {private Logger logger = LoggerFactory.getLogger(this.getClass());private static final String NOTIFICATION_TYPE_HEADER = "jmx_notificationType";public void receive(Message<Notification> msg) {logger.info("Notification received: {}", msg.getPayload().getType());}public Message<Notification> buildNotification(Message<String> msg) {Notification notification = new Notification(msg.getPayload(), this, 0);return MessageBuilder.withPayload(notification).copyHeadersIfAbsent(msg.getHeaders()).setHeader(NOTIFICATION_TYPE_HEADER, "myJmxNotification").build();} }注意,我們已經(jīng)設(shè)置了一個(gè)新的標(biāo)題。 這對(duì)于提供通知類型是必需的,否則JMX適配器將拋出IllegalArgumentException并顯示消息“沒有可用的通知類型頭,并且沒有提供默認(rèn)值”。
最后,我們只需要返回消息即可發(fā)送到發(fā)布適配器。 其余的由Spring Integration處理。
接收J(rèn)MX通知
該流程的第二部分包含一個(gè)通知偵聽通道適配器,它將接收我們先前發(fā)布的通知。
圖2
xml配置:
<!-- Receiving Notifications --> <int-jmx:notification-listening-channel-adapter channel="receiveNotificationChannel" object-name="xpadro.spring.integration.jmx.adapter:type=integrationMBean,name=integrationMbean"/><int:channel id="receiveNotificationChannel"/><int:service-activator input-channel="receiveNotificationChannel" ref="notificationHandler" method="receive"/>我們將只收到通知并記錄它:
public void receive(Message<Notification> msg) {logger.info("Notification received: {}", msg.getPayload().getType()); }運(yùn)行示例的應(yīng)用程序:
public class NotificationApp {public static void main(String[] args) throws InterruptedException {AbstractApplicationContext context = new ClassPathXmlApplicationContext("classpath:xpadro/spring/integration/jmx/config/int-notification-config.xml");JmxNotificationGateway gateway = context.getBean(JmxNotificationGateway.class);gateway.send("gatewayNotification");Thread.sleep(1000);context.close();} }3.從MBean輪詢托管屬性
假設(shè)我們有一個(gè)正在監(jiān)視某些功能的MBean 。 使用屬性輪詢通道適配器,您的應(yīng)用程序?qū)⒛軌蜉喸僊Bean并接收更新的數(shù)據(jù)。
我實(shí)現(xiàn)了一個(gè)MBean ,每次詢問時(shí)都會(huì)生成一個(gè)隨機(jī)數(shù)。 這不是最重要的功能,但可以為我們提供示例:
@Component("pollingMbean") @ManagedResource public class JmxPollingMBean {@ManagedAttributepublic int getNumber() {Random rnd = new Random();int randomNum = rnd.nextInt(100);return randomNum;} }流程再簡單不過了; 我們需要一個(gè)屬性輪詢通道適配器,用于指定MBean的類型和名稱。 適配器將輪詢MBean并將結(jié)果放置在結(jié)果通道中。 輪詢的每個(gè)結(jié)果將通過流標(biāo)準(zhǔn)輸出通道適配器顯示在控制臺(tái)上:
<context:component-scan base-package="xpadro.spring.integration.jmx.polling"/><context:mbean-export/> <context:mbean-server/><!-- Polling --> <int-jmx:attribute-polling-channel-adapter channel="resultChannel"object-name="xpadro.spring.integration.jmx.polling:type=JmxPollingMBean,name=pollingMbean"attribute-name="Number"><int:poller max-messages-per-poll="1" fixed-delay="1000"/> </int-jmx:attribute-polling-channel-adapter><int:channel id="resultChannel"/><int-stream:stdout-channel-adapter channel="resultChannel" append-newline="true"/>運(yùn)行示例的應(yīng)用程序:
public class PollingApp {public static void main(String[] args) throws InterruptedException {AbstractApplicationContext context = new ClassPathXmlApplicationContext("classpath:xpadro/spring/integration/jmx/config/int-polling-config.xml");context.registerShutdownHook();Thread.sleep(5000);context.close();} }和控制臺(tái)輸出:
2014-04-16 16:23:43,867|AbstractEndpoint|started org.springframework.integration.config.ConsumerEndpointFactoryBean#0 82 72 20 47 21 2014-04-16 16:23:48,878|AbstractApplicationContext|Closing org.springframework.context.support.ClassPathXmlApplicationContext@72839224.調(diào)用MBean操作
下一個(gè)機(jī)制允許我們調(diào)用MBean的操作。 我們將實(shí)現(xiàn)另一個(gè)包含單個(gè)操作的bean,我們的老朋友hello world:
@Component("operationMbean") @ManagedResource public class JmxOperationMBean {@ManagedOperationpublic String hello(String name) {return "Hello " + name;} }現(xiàn)在,如果操作未返回結(jié)果,則可以使用通道適配器,或者如果網(wǎng)關(guān)不返回結(jié)果,則可以使用網(wǎng)關(guān)。 通過以下xml配置,我們導(dǎo)出MBean并使用網(wǎng)關(guān)來調(diào)用操作并等待結(jié)果:
<context:component-scan base-package="xpadro.spring.integration.jmx.operation"/><context:mbean-export/> <context:mbean-server/><int:gateway service-interface="xpadro.spring.integration.jmx.operation.JmxOperationGateway" default-request-channel="entryChannel"/><int-jmx:operation-invoking-outbound-gateway request-channel="entryChannel" reply-channel="replyChannel"object-name="xpadro.spring.integration.jmx.operation:type=JmxOperationMBean,name=operationMbean" operation-name="hello"/><int:channel id="replyChannel"/><int-stream:stdout-channel-adapter channel="replyChannel" append-newline="true"/>為了工作,我們必須指定MBean的類型和名稱以及我們要調(diào)用的操作。 結(jié)果將被發(fā)送到流通道適配器,以便在控制臺(tái)上顯示。
運(yùn)行示例的應(yīng)用程序:
public class OperationApp {public static void main(String[] args) throws InterruptedException {AbstractApplicationContext context = new ClassPathXmlApplicationContext("classpath:xpadro/spring/integration/jmx/config/int-operation-config.xml");JmxOperationGateway gateway = context.getBean(JmxOperationGateway.class);gateway.hello("World");Thread.sleep(1000);context.close();} }5.將組件導(dǎo)出為MBean
此組件用于將消息通道,消息處理程序和消息端點(diǎn)導(dǎo)出為MBean,以便您可以對(duì)其進(jìn)行監(jiān)視。
您需要將以下配置放入您的應(yīng)用程序:
<int-jmx:mbean-export id="integrationMBeanExporter"default-domain="xpadro.integration.exporter" server="mbeanServer"/><bean id="mbeanServer" class="org.springframework.jmx.support.MBeanServerFactoryBean"><property name="locateExistingServerIfPossible" value="true"/> </bean>并按照Spring 文檔中的說明設(shè)置以下VM參數(shù):
-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=6969 -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.authenticate=false運(yùn)行該示例的應(yīng)用程序?qū)l(fā)送三則消息:
public class ExporterApp {public static void main(String[] args) throws InterruptedException {AbstractApplicationContext context = new ClassPathXmlApplicationContext("classpath:xpadro/spring/integration/jmx/config/int-exporter-config.xml");context.registerShutdownHook();JmxExporterGateway gateway = context.getBean(JmxExporterGateway.class);gateway.sendMessage("message 1");Thread.sleep(500);gateway.sendMessage("message 2");Thread.sleep(500);gateway.sendMessage("message 3");} }應(yīng)用程序運(yùn)行后,您可以查看有關(guān)組件的信息。 以下屏幕快照是在JConsole上制作的:
圖3
您會(huì)注意到,輸入通道的sendCount屬性值為3,因?yàn)樵诒臼纠?#xff0c;我們已發(fā)送了3條消息。
6.跟蹤消息路徑
在消息傳遞系統(tǒng)中,組件是松散耦合的。 這意味著發(fā)送組件不需要知道誰將接收消息。 反之,接收者只對(duì)接收到的消息感興趣,而不是誰發(fā)送消息。 當(dāng)我們需要調(diào)試應(yīng)用程序時(shí),這種好處可能不是很好。
消息歷史記錄包括在消息上附加消息傳遞的所有組件的列表。
以下應(yīng)用程序?qū)⑼ㄟ^多個(gè)組件發(fā)送消息來測(cè)試此功能:
圖4
該配置的關(guān)鍵元素在上圖中不可見: message-history元素:
<context:component-scan base-package="xpadro.spring.integration.msg.history"/><int:message-history/><int:gateway id="historyGateway" service-interface="xpadro.spring.integration.msg.history.HistoryGateway" default-request-channel="entryChannel"/><int:channel id="entryChannel"/><int:transformer id="msgTransformer" input-channel="entryChannel" expression="payload + 'transformed'" output-channel="transformedChannel"/><int:channel id="transformedChannel"/><int:service-activator input-channel="transformedChannel" ref="historyActivator"/>使用此配置集,消息流結(jié)尾的服務(wù)激活器將能夠通過查看消息的標(biāo)頭來檢索已訪問組件的列表:
@Component("historyActivator") public class HistoryActivator {private Logger logger = LoggerFactory.getLogger(this.getClass());public void handle(Message<String> msg) {MessageHistory msgHistory = msg.getHeaders().get(MessageHistory.HEADER_NAME, MessageHistory.class);if (msgHistory != null) {logger.info("Components visited: {}", msgHistory.toString());}} }運(yùn)行此示例的應(yīng)用程序:
public class MsgHistoryApp {public static void main(String[] args) throws InterruptedException {AbstractApplicationContext context = new ClassPathXmlApplicationContext("classpath:xpadro/spring/integration/msg/history/config/int-msg-history-config.xml");HistoryGateway gateway = context.getBean(HistoryGateway.class);gateway.send("myTest");Thread.sleep(1000);context.close();} }結(jié)果將顯示在控制臺(tái)上:
2014-04-16 17:34:52,551|HistoryActivator|Components visited: historyGateway,entryChannel,msgTransformer,transformedChannel7.保留緩沖的消息
Spring Integration中的某些組件可以緩沖消息。 例如,隊(duì)列通道將緩沖消息,直到使用者從中檢索消息為止。 另一個(gè)示例是聚合器端點(diǎn); 如第二個(gè)教程中所見,此終結(jié)點(diǎn)將根據(jù)組的發(fā)布策略來收集消息,直到組完成為止。
這些集成模式意味著,如果發(fā)生故障,則緩沖的消息可能會(huì)丟失。 為了防止這種情況,我們可以保留這些消息,例如將它們存儲(chǔ)到數(shù)據(jù)庫中。 默認(rèn)情況下,Spring Integration將這些消息存儲(chǔ)在內(nèi)存中。 我們將使用消息存儲(chǔ)來更改此設(shè)置。
對(duì)于我們的示例,我們將這些消息存儲(chǔ)到MongoDB數(shù)據(jù)庫中。 為此,我們只需要以下配置:
<bean id="mongoDbFactory" class="org.springframework.data.mongodb.core.SimpleMongoDbFactory"><constructor-arg><bean class="com.mongodb.Mongo"/></constructor-arg><constructor-arg value="jcgdb"/> </bean><bean id="mongoDbMessageStore" class="org.springframework.integration.mongodb.store.ConfigurableMongoDbMessageStore"><constructor-arg ref="mongoDbFactory"/> </bean>現(xiàn)在,我們將創(chuàng)建一個(gè)應(yīng)用程序來測(cè)試此功能。 我實(shí)現(xiàn)了一個(gè)通過網(wǎng)關(guān)接收帶有String有效負(fù)載的消息的流。 網(wǎng)關(guān)將該消息發(fā)送到隊(duì)列通道,該通道將緩沖消息,直到服務(wù)激活程序msgStoreActivator從隊(duì)列中檢索到該消息為止。 服務(wù)激活器將每五秒鐘輪詢一次消息:
<context:component-scan base-package="xpadro.spring.integration.msg.store"/><import resource="mongodb-config.xml"/><int:gateway id="storeGateway" service-interface="xpadro.spring.integration.msg.store.MsgStoreGateway" default-request-channel="entryChannel"/><int:channel id="entryChannel"><int:queue message-store="myMessageStore"/> </int:channel><int:service-activator input-channel="entryChannel" ref="msgStoreActivator"><int:poller fixed-rate="5000"/> </int:service-activator>也許您已經(jīng)注意到myMessageStore bean。 為了查看持久消息機(jī)制的工作方式,我擴(kuò)展了ConfigurableMongoDBMessageStore類以將日志放入其中并調(diào)試結(jié)果。 如果要嘗試此操作,可以刪除mongodb-config.xml的MongoDB messageStore bean,因?yàn)槲覀儾辉偈褂盟?
我已經(jīng)覆蓋了兩種方法:
@Component("myMessageStore") public class MyMessageStore extends ConfigurableMongoDbMessageStore {private Logger logger = LoggerFactory.getLogger(this.getClass());private static final String STORE_COLLECTION_NAME = "messageStoreCollection";@Autowiredpublic MyMessageStore(MongoDbFactory mongoDbFactory) {super(mongoDbFactory, STORE_COLLECTION_NAME);logger.info("Creating message store '{}'", STORE_COLLECTION_NAME);}@Overridepublic MessageGroup addMessageToGroup(Object groupId, Message<?> message) {logger.info("Adding message '{}' to group '{}'", message.getPayload(), groupId);return super.addMessageToGroup(groupId, message);}@Overridepublic Message<?> pollMessageFromGroup(Object groupId) {Message<?> msg = super.pollMessageFromGroup(groupId);if (msg != null) {logger.info("polling message '{}' from group '{}'", msg.getPayload(), groupId);}else {logger.info("Polling null message from group {}", groupId);}return msg;} }該機(jī)制的工作原理如下:
讓我們通過調(diào)試代碼來看看它是如何工作的。 我們將在輪詢消息以查看其在數(shù)據(jù)庫中的存儲(chǔ)方式之前停止:
圖5
現(xiàn)在,我們可以看一下數(shù)據(jù)庫:
圖6
恢復(fù)后,將從集合中輪詢消息:
圖7
運(yùn)行示例的應(yīng)用程序:
public class MsgStoreApp {public static void main(String[] args) throws InterruptedException {AbstractApplicationContext context = new ClassPathXmlApplicationContext("classpath:xpadro/spring/integration/msg/store/config/int-msg-store-config.xml");MsgStoreGateway gateway = context.getBean(MsgStoreGateway.class);gateway.send("myMessage");Thread.sleep(30000);context.close();} }8.實(shí)現(xiàn)冪等組件
如果我們的應(yīng)用程序需要避免重復(fù)消息,Spring Integration通過實(shí)現(xiàn)冪等接收器模式來提供此機(jī)制。 負(fù)責(zé)檢測(cè)重復(fù)消息的是元數(shù)據(jù)存儲(chǔ)組件。 該組件包括存儲(chǔ)鍵值對(duì)。 該框架提供了接口MetadataStore兩種實(shí)現(xiàn):
- SimpleMetadataStore :默認(rèn)實(shí)現(xiàn)。 它使用內(nèi)存映射來存儲(chǔ)信息。
- PropertiesPersistingMetadataStore :如果需要持久化數(shù)據(jù),則很有用。 它使用屬性文件。 我們將在示例中使用此實(shí)現(xiàn)。
好的,讓我們從配置文件開始:
<context:component-scan base-package="xpadro.spring.integration.msg.metadata"/><bean id="metadataStore" class="org.springframework.integration.metadata.PropertiesPersistingMetadataStore"/><int:gateway id="metadataGateway" service-interface="xpadro.spring.integration.msg.metadata.MetadataGateway"default-request-channel="entryChannel"/><int:channel id="entryChannel"/><int:filter input-channel="entryChannel" output-channel="processChannel"discard-channel="discardChannel" expression="@metadataStore.get(headers.messageId) == null"/><!-- Process message --> <int:publish-subscribe-channel id="processChannel"/><int:outbound-channel-adapter channel="processChannel" expression="@metadataStore.put(headers.messageId, '')"/><int:service-activator input-channel="processChannel" ref="metadataActivator" method="process"/><!-- Duplicated message - discard it --> <int:channel id="discardChannel"/><int:service-activator input-channel="discardChannel" ref="metadataActivator" method="discard"/>我們定義了一個(gè)“ metadataStore”,以便使用我們的屬性元數(shù)據(jù)存儲(chǔ)來代替默認(rèn)的內(nèi)存實(shí)現(xiàn)。
流程說明如下:
元數(shù)據(jù)存儲(chǔ)在文件系統(tǒng)中創(chuàng)建屬性文件。 如果您使用Windows,則會(huì)在“ C:\ Users \用戶名\ AppData \ Local \ Temp \ spring-integration”文件夾中看到一個(gè)meta-store.properties文件
該示例使用服務(wù)激活器來記錄是否已處理消息:
@Component("metadataActivator") public class MetadataActivator {private Logger logger = LoggerFactory.getLogger(this.getClass());public void process(Message<String> msg) {logger.info("Message processed: {}", msg.getPayload());}public void discard(Message<String> msg) {logger.info("Message discarded: {}", msg.getPayload());} }該應(yīng)用程序?qū)⑦\(yùn)行示例:
public class MetadataApp {private static final String MESSAGE_STORE_HEADER = "messageId";public static void main(String[] args) throws InterruptedException {AbstractApplicationContext context = new ClassPathXmlApplicationContext("classpath:xpadro/spring/integration/msg/metadata/config/int-msg-metadata-config.xml");MetadataGateway gateway = context.getBean(MetadataGateway.class);Map<String,String> headers = new HashMap<>();headers.put(MESSAGE_STORE_HEADER, "msg1");Message<String> msg1 = MessageBuilder.withPayload("msg1").copyHeaders(headers).build();headers = new HashMap<>();headers.put(MESSAGE_STORE_HEADER, "msg2");Message<String> msg2 = MessageBuilder.withPayload("msg2").copyHeaders(headers).build();gateway.sendMessage(msg1);Thread.sleep(500);gateway.sendMessage(msg1);Thread.sleep(500);gateway.sendMessage(msg2);Thread.sleep(3000);context.close();} }第一次調(diào)用將在控制臺(tái)上產(chǎn)生以下輸出:
2014-04-17 13:00:08,223|MetadataActivator|Message processed: msg1 2014-04-17 13:00:08,726|MetadataActivator|Message discarded: msg1 2014-04-17 13:00:09,229|MetadataActivator|Message processed: msg2現(xiàn)在請(qǐng)記住,PropertiesPersistingMetadataStore將數(shù)據(jù)存儲(chǔ)在屬性文件中。 這意味著該數(shù)據(jù)將在ApplicationContext重新啟動(dòng)后繼續(xù)存在。 因此,如果我們不刪除屬性文件,而是再次運(yùn)行示例,結(jié)果將有所不同:
2014-04-17 13:02:27,117|MetadataActivator|Message discarded: msg1 2014-04-17 13:02:27,620|MetadataActivator|Message discarded: msg1 2014-04-17 13:02:28,123|MetadataActivator|Message discarded: msg29.發(fā)送操作調(diào)用請(qǐng)求
本教程討論的最后一種機(jī)制是控制總線 。 控制總線將使您以與應(yīng)用程序相同的方式管理系統(tǒng)。 該消息將作為一種Spring Expression Language執(zhí)行。 若要從控制總線執(zhí)行,該方法需要使用@ManagedAttribute或@ManagedOperation批注。
本節(jié)的示例使用控制總線來調(diào)用Bean上的方法:
<context:component-scan base-package="xpadro.spring.integration.control.bus"/><int:channel id="entryChannel"/><int:control-bus input-channel="entryChannel" output-channel="resultChannel"/><int:channel id="resultChannel"/><int:service-activator input-channel="resultChannel" ref="controlbusActivator"/>將被調(diào)用的操作如下:
@Component("controlbusBean") public class ControlBusBean {@ManagedOperationpublic String greet(String name) {return "Hello " + name;} }運(yùn)行該示例的應(yīng)用程序發(fā)送一條消息,其中包含要執(zhí)行的表達(dá)式:
public class ControlBusApp {public static void main(String[] args) throws InterruptedException {AbstractApplicationContext context = new ClassPathXmlApplicationContext("classpath:xpadro/spring/integration/control/bus/config/int-control-bus-config.xml");MessageChannel channel = context.getBean("entryChannel", MessageChannel.class);Message<String> msg = MessageBuilder.withPayload("@controlbusBean.greet('World!')").build();channel.send(msg);Thread.sleep(3000);context.close();} }結(jié)果顯示在控制臺(tái)上:
2014-04-17 13:21:42,910|ControlBusActivator|Message received: Hello World!翻譯自: https://www.javacodegeeks.com/2015/09/monitoring-and-management.html
戴爾集群監(jiān)控與管理系統(tǒng)
總結(jié)
以上是生活随笔為你收集整理的戴尔集群监控与管理系统_监控与管理的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: amber 口译_口译员设计模式示例
- 下一篇: eclipse clean_Clean