Mule的消息路由
當談論整合應用時,消息路由備受關注。當我們確定了各個應用,并選擇Mule作為整合平臺,也知道在Mule的服務中可以使用哪些Java類和web services處理消息,那么為了讓消息正確地在服務間流轉(zhuǎn),該如何將所有的事情整合在一起,從而確保獲得我們所需要的的結果呢?
Mule為您的Mule應用中的服務間的路由消息提供了強大而靈活的可選項。本文描述了Mule的常用消息類型和幾種可用的特殊路由器。
下面介紹消息路由器的相關核心概念:
相關廠商內(nèi)容
InfoQ金秋招聘編輯/銷售等重要崗位,高成長高福利 ???????????????????????????
Top100summit案例分析:有道云筆記云端架構設計與重構 ???????????????????????????
??????????????????????????????? 白皮書下載:開發(fā)多平臺、多設備的原生應用 ???????????????????????????
Top100summit案例分析:增長Facebook Location-Based Service 經(jīng)驗談 ???????????????????????????
白皮書下載:Polarion Requirements創(chuàng)新基于Web的需求管理解決方案 ???????????????????????????
- 端點(Endpoints)定義了發(fā)送和接收消息的通道(channel)。例如,一個購物組件可能會接收到一個HTTP訂單請求。一旦該組件處理完訂單請求,它可能會發(fā)送一個 JMS消息到一個主題(topic)上,以通知審計系統(tǒng),并返回一個HTTP響應??梢酝ㄟ^端點監(jiān)聽JMS消息、發(fā)送email、調(diào)用web services等。
- 入站路由器(Inbound router)控制服務如何處理入站(incoming)消息,比如,有選擇地只消費符合特定條件的消息,或者在將消息轉(zhuǎn)發(fā)給服務處理之前,將擁有同一group ID的消息聚合(group)在一起。
- 出站路由器(Outbound router)控制如何分發(fā)經(jīng)服務處理過的消息,比如,將消息發(fā)送到一個接受者列表,或者將消息分解成多個部分,并將它們分別發(fā)送至不同的端點。
- 異步回復路由器(Asynchronous reply router)常用于request/response場景。在這些場景中,發(fā)送一個請求會觸發(fā)一個或者更多的請求,并且在返回響應之前,需要考慮這些請求的結果。典型的例子是請求發(fā)送后,會并行執(zhí)行任務(task)。在返回響應之前,必須執(zhí)行完每個任務,處理完結果。
- Catch-all策略在當前消息找不到路由路徑時才被調(diào)用。入站和出站端點都可以配置catch-all策略,因此可以捕獲到任何孤立的消息,并將這些消息路由到一個共同的位置。
- 過濾器提供用于調(diào)用特定路由器的邏輯。通過邏輯過濾器AndFilter、OrFilter和NotFilter可以將過濾器組合在一起使用。并非所有的路由器都需要使用過濾器,但是所有的路由器都支持過濾器。
選擇消息類型
當將Mule的服務結合在一起時,初用Mule的人有時會感到困惑,他們不知何時該使用出站路由器,何時可以最大程度地簡化獲得回復信息。下面介紹 Mule中可使用的消息類型,可以通過一個列表查看各個傳輸(transport)所支持的消息類型,詳細內(nèi)容可以查看Mule用戶指南中的傳輸特征矩陣(Transports Feature Matrix)(查看前需要先登錄,但注冊是免費的,只需花費一點時間就可以完成注冊。)
異步
如果只想將消息以“即發(fā)即棄(fire and forget)”的方式發(fā)送給一個服務,(并不需要給調(diào)用者返回響應),那么可使用異步消息類型。如果將入站端點的synchronous屬性設置為false,它就不會給調(diào)用者返回響應。
例如:
<model name="Asynchronous_Message_Pattern"><service name="AsynchronousService"><inbound><jms:inbound-endpoint queue="test.in" synchronous="false"/></inbound><component class="org.myorg.WidgetHandler"/><outbound><pass-through-router><jms:outbound-endpoint queue="test.out"></pass-through-router></outbound></service> </model>Request-Response
在簡單的Request-Response場景中,服務在一個同步的入口端點上接收請求,并處理該請求,然后將它作為回復發(fā)送給調(diào)用者。例如,如果用戶在 HTML表單中輸入一個值,想轉(zhuǎn)換該值并將其結果顯示在同一個頁面上,那么可以在該服務上簡單地配置一個同步入站端點,由該服務完成數(shù)據(jù)轉(zhuǎn)換。這種場景并不需要使用出站端點。這就是request-response消息類型。
例如:
<model name="Request-Response_Message_Pattern"><service name="SynchronousService"><!-- 為了返回response將synchronous的值設置為“true”--><inbound><http:inbound-endpoint host="localhost" port="8080"path="/mule/services" synchronous="true"/></inbound><!-- 指定處理該請求的組件 --><component class="org.myorg.WidgetHandler"/></service> </model>同步
如果為了進一步處理消息,需要將消息傳遞給第二個服務,那么需要在第一個服務上配置一個出站路由器將該消息傳遞給第二個服務。在第二個服務處理完消息后,第一個服務將它作為回復發(fā)送給調(diào)用者。值得注意的是將第一個服務設置為同步入口端點就意味著之后的所有服務都會以同步的方式處理該消息,所以無需在第二個服務上設置synchronous屬性的值。這就是同步消息類型。
例如:
<model name="Synchronous_Message_Pattern"><service name="SynchronousService"><inbound><!-- 為了返回response將synchronous的值設置為“true” --><jms:inbound-endpoint queue="test.in" synchronous="true"/></inbound><component class="org.myorg.WidgetHandler"/><outbound><!-- 使用pass-through路由器時,如果想返回response必須將synchronous的值設置為“true”--><pass-through-router><!-- 設置出站端點 --><jms:outbound-endpoint queue="test.out" synchronous="true"/></pass-through-router></outbound></service><!-- 配置第二個服務,并將它的入站端點設置為上一個服務的出站端點的路徑。值得注意的是無需設置synchronous的值,因為在第一個服務中已經(jīng)將消息設置為synchronous了。--><service><inbound><jms:inbound-endpoint queue="test.out"/></inbound><component class="org.myorg.WidgetProcesser"/></service> </model>注意:在Mule的以往版本中,遠程服務需要設置remoteSync屬性的值為true用于配置同步處理。從Mule 2.2開始,remoteSync屬性已經(jīng)被刪除,只需要設置synchronous屬性的值為true就可以創(chuàng)建同步流。
異步Request-Response
在大多數(shù)復雜的場景中,可以使用request-response消息,并使用后端(back-end)流程調(diào)用其它的服務,并基于多個服務調(diào)用的結果異步地返回一個回復。你可以將入站端點的synchronous屬性設置為false,因為異步回復路由器會處理該回復,除非你想給調(diào)用者發(fā)送響應。這就是異步request-response消息類型。
在下面的例子中,HTTP端點接收一個請求,并使用Multicast路由器將該請求廣播到兩個端點,再將這些結果以異步的方式發(fā)送到一個JMS端點。
<model name="Async_Request-Response_Message_Pattern"><service name="AsyncRequestResponseService"><inbound><!-- 將synchronous設置為“false”,因為response將由異步回復路由器處理 --><http:inbound-endpoint host="localhost" port="8080"path="/mule/services" synchronoussynchronous="false"/></inbound><component class="org.myorg.WidgetHandler"/><!-- 配置異步回復的設置。這個例子使用了收集異步回復路由器,在發(fā)送回復信息之前,它將所有的響應信息收集在一起。 --><async-reply timeout="5000><collection-async-reply-router/><jms:inbound-endpoint queue="reply.queue"/></async-reply><!--設置負責接收和處理消息的端點以及回復消息的端點 --><outbound><multicasting-router><reply-to address="jms://reply.queue"/><jms:outbound-endpoint queue="service1" synchronous="false"/><jms:outbound-endpoint queue="service2" synchronous="false"/></multicasting-router></outbound></service> </model>關于消息類型的全部內(nèi)容可以參看Mule用戶指南中的Mule的消息類型。
現(xiàn)在我們已經(jīng)理解了在不同的場景中可以使用哪種消息類型路由消息,下面讓我們看看哪些路由器可以很好地控制消息路由。更多消息路由的信息可以參看Mule用戶指南中的使用消息路由器。
將消息傳遞到另一個端點
pass-through路由器是為簡化端點間的消息傳遞而設計的。比如,它對分發(fā)消息給一個隊列非常有用。
也可以使用pass-through路由器將協(xié)議橋接到其它的出站端點。例如:
<service name="HttpProxyService"><inbound><inbound-endpoint address="http://localhost:8888" synchronous="true"/></inbound><outbound><pass-through-router><outbound-endpointaddress="http://www.webservicex.net#[header:http.request]"synchronous="true"/></pass-through-router></outbound> </service>當使用pass-through路由器時,如果想返回一個響應,必須將出站端點的synchronous屬性設置為true。其它的路由器,比如 chaining路由器并不需將出站端點的synchronous屬性設置為true,該路由器總會在同步的場景中返回一個響應。因此,如果將消費發(fā)送給多個服務,可能會用chaining路由器代替pass-through路由器,因為chaining路由器中不需要將每個端點的synchronous 設置為true。
過濾消息
使用過濾器可以控制服務處理哪些消息。選擇性消費者路由器(Selective Consumer Router)用于入站端點,它可以控制服務處理哪些消息。過濾路由器(Filtering Router)用于出站端點,可以控制哪些消息發(fā)送到下一個服務上??梢越M合使用這些過濾器來控制消息流。
例如,如果只想處理不包含錯誤的消息,那么可以使用選擇性消費者以確保只處理結果代碼為success的消息。并使用Catch-all策略將其它的消息轉(zhuǎn)發(fā)到另外端點上作為錯誤處理:
<inbound><selective-consumer-router><mulexml:jxpath-filter expression="msg/header/resultcode = 'success'"/></selective-consumer-router><forwarding-catch-all-strategy><jms:endpoint topic="error.topic"/></forwarding-catch-all-strategy> </inbound>在服務處理消息時,如果想通過指定的標準決定將消息發(fā)送到哪個端點,那么可以在出站端點上使用過濾路由器。在下面的示例中,將包含異常信息的消息發(fā)送到系統(tǒng)管理員的email郵箱,將包含特定字符串的消息發(fā)送到名為string.queue的隊列,并使用forwarding catch-all路由器接收余下的所有消息,并將它們發(fā)送到名為error.queue的死信隊列:
<outbound><filtering-router><smtp:outbound-endpoint to="ross@muleumo.org"/><payload-type-filter expectedTypeexpectedType="java.lang.Exception"/></filtering-router><filtering-router><jms:outbound-endpoint to="string.queue"/><and-filter><payload-type-filter expectedType="java.lang.String"/><regex-filter pattern="the quick brown (.*)"/></and-filter></filtering-router><forwarding-catch-all-strategy><jms:outbound-endpoint queue="error.queue"/></forwarding-catch-all-strategy></outbound>與過濾路由器(filtering router)相似的路由器有轉(zhuǎn)發(fā)路由器(forwarding router),它可以處理一些消息并可以選擇性地將消息轉(zhuǎn)發(fā)到其它路由器,還有wiretap router,這種路由器可以處理所有的消息,并將它們發(fā)送到端點上,同時也將消息的副本發(fā)送到另外一個端點。更多信息可以參看Mule用戶指南中的入站路由器(Inbound Routers)。
將多個出站端點鏈接在一起
假設我們有一個驗證服務,當消息沒有通過驗證時,想將該消息以及驗證異常轉(zhuǎn)發(fā)到另一個服務,并將消息和驗證異常返回給調(diào)用者。那么可以使用鏈接路由器(chaining router),它是一個高速的、輕量級的可配置路由器,可用于將消息發(fā)送到端點,然后將該端點的輸出結果發(fā)送到另一個端點。例如:
<chaining-router><!-- 首先,將消息發(fā)送到這個端點,用于驗證。 --><vm:outbound-endpoint path="ValidationService" synchronous="true"/><!-- 接著將包含表達式的消息發(fā)送到這個端點上 --><vm:outbound-endpoint path="ValidationError" synchronous="true"><exception-type-filter expectedType="java.lang.Exception"/></vm:outbound-endpoint></chaining-router>消息分解
消息分解器(message splitter)可用于將輸出消息(outgoing message)分解成多個部分,再將他們分發(fā)到配置在路由器(router)上的不同端點。例如,在訂單處理應用中,如果想將經(jīng)消息分解后的不同部分分發(fā)給不同的服務去處理,那么可以使用下面的路由器:
列表消息分解器(List Message Splitter):接收一個對象列表,這些對象將被路由到不同的端點。例如:
<outbound><list-message-splitter-router"><!-- 將order路由到隊列order.queue --><jms:outbound-endpoint queue="order.queue"><payload-type-filter expectedType="com.foo.Order"/></jms:outbound-endpoint><!-- 將items路由到隊列item.queue --><jms:outbound-endpoint queue="item.queue"><payload-type-filter expectedType="com.foo.Item"/></jms:outbound-endpoint></list-message-splitter-router> </outbound>表達式分解路由器(Expression Splitter Router):它與列表消息分解器相似,只是它是基于表達式分解消息,將消息分解成一個或者多個部分。例如:
<outbound><expression-splitter-routerevaluator="xpath"expression="/mule:mule/mule:model/mule:service"disableRoundRobin="true"failIfNoMatch="false"><outbound-endpoint ref="service1"><expression-filterevaluator="xpath"expression="/mule:service/@name = 'service splitter'"/></outbound-endpoint><outbound-endpoint ref="service2"><expression-filterevaluator="xpath"expression="/mule:service/@name = 'round robin deterministic'"/></outbound-endpoint></expression-splitter-router> </outbound>關于如何配置表達式分解路由器的更多信息,可以參看Mule的用戶手冊中的表達式配置參考(Expressions Configuration Reference)。
為了提高性能也可以將消息分解成多個部分。輪叫(Round Robin)消息分解器將消息分解成多個部分,并以輪叫(round-robin)的方式將它們發(fā)送到端點。Message Chunking Router將消息按固定長度分解成多個部分,并將它們路由到同一個端點。
消息分解之后,可以使用Message Chunking Aggregator重新將消息塊聚合在一起。該聚合器(aggregator)通過關聯(lián)ID(correlation ID)來識別哪些消息塊屬于同一個消息,關聯(lián)ID(correlation ID)在出站路由器(outbound router)上設置。
<inbound><message-chunking-aggregator-router><expression-message-info-mappingcorrelationIdExpression="#[header:correlation]"/><payload-type-filter expectedType="org.foo.some.Object"/></message-chunking-aggregator-router> </inbound>處理消息僅有一次
冪等接收器(Idempotent Receiver)通過核對輸入消息的唯一消息ID來保證只有擁有唯一ID的消息才能被服務所接收。消息ID可以通過使用一個表達式從消息中產(chǎn)生,該表達式在 idExpression屬性中定義。#[message:id]是默認的表達式,也就是說如果要實現(xiàn)該功能,端點必須支持唯一性消息ID。在下面的例子中,唯一性ID是由消息ID和消息標頭中標簽的內(nèi)容組合而成。所有的消息ID都被記錄到一個簡單的文本文件中,用于追蹤哪些消息已經(jīng)處理過。
<inbound><idempotent-receiver-router idExpression="#[message:id]-#[header:label]"><simple-text-file-store directory="./idempotent"/></idempotent-receiver-router> </inbound>通過組件綁定調(diào)用外部服務
除了使用消息路由器控制服務間的消息流之外,也可以通過組件綁定(Component Bindings)調(diào)用處理消息的外部服務(External Service)。
在這個方法中,可以將Mule的端點綁定到Java接口方法。該方法的優(yōu)勢在于,在組件仍在處理消息時,你可以使用外部服務,而無需使用Mule的API 或者修改組件的代碼。相反,只需要在XML配置文件中配置組件綁定,從而指定外部服務的端點。例如,在下面的綁定例子中,當sayHello方法被調(diào)用時,HelloInterface中的sayHello方法會調(diào)用外部的HelloWeb服務。
<component class="org.mule.examples.bindings.InvokerComponent"><binding interface="org.mule.examples.bindings.HelloInterface"method="sayHello"><cxf:outbound-endpointaddress="http://myhost.com:81/services/HelloWeb?method=helloMethod"synchronous="true"/></binding> </component>更多信息,可以參看Mule用戶指南中的組件綁定(Component Bindings)。
總結
Mule為控制應用中的消息如何交換提供了多種方法。本文總體上介紹了Mule的消息路由,以及它所支持的消息類型,一些常用的路由器和組件綁定。關于消息路由的完整信息,可以參看Mule 的用戶手冊。
查看英文原文:Routing Messages in Mule
譯者簡介:
陳義,計算機應用技術專業(yè)碩士研究生,一直專注于SOA、BPM、ESB、EAI和MOM的研究及應用。熱衷于開源SOA項目,有志致力于 Mule、ServiceMix、ODE、ActiveBPEL、ActiveMQ、OpenJMS、Camel、CXF、XFire以及Tuscany 在中文社區(qū)的研究和推廣工作。您可以通過honnom (at) 163.com聯(lián)系到他。
總結
- 上一篇: 一个基于Mule的企业服务总线的案例(关
- 下一篇: Mule ESB 学习笔记