javascript
错误处理在Spring Integration中如何工作
1.引言
這篇文章的目標是向您展示將消息傳遞系統與Spring Integration結合使用時如何處理錯誤。 您將看到同步和異步消息傳遞之間的錯誤處理有所不同。 和往常一樣,我將跳過聊天并繼續進行一些示例。
- 您可以在github上獲取源代碼。
2,樣品申請
我將使用一個基本示例,因為我想專注于異常處理。 該應用程序包含一個訂單服務,該服務接收訂單,處理訂單并返回確認。
下面我們可以看到消息傳遞系統的配置方式:
int-config.xml
<context:component-scan base-package="xpadro.spring.integration"/><int:gateway default-request-channel="requestChannel" service-interface="xpadro.spring.integration.service.OrderService"/><int:channel id="requestChannel"/><int:router input-channel="requestChannel" ref="orderRouter" method="redirectOrder"/><int:channel id="syncChannel"/><int:channel id="asyncChannel"><int:queue capacity="5"/> </int:channel><int:service-activator method="processOrder" input-channel="syncChannel" ref="orderProcessor"/><int:service-activator method="processOrder" input-channel="asyncChannel" ref="orderProcessor"><int:poller fixed-delay="2000"/> </int:service-activator>網關是消息傳遞系統的入口點。 它將接收訂單并將其發送到直接通道“ requestChannel”,路由器將根據訂單ID將其重定向到適當的通道:
- syncChannel:一個直接通道 ,它將訂單發送到訂閱該通道的訂單處理器。
- asyncChannel:一個隊列通道 ,訂單處理器將從中主動檢索訂單。
處理訂單后,訂單確認將發送回網關。 這是代表此的圖形:
好的,讓我們從最簡單的情況開始,使用直接通道進行同步發送。
3.與直接通道同步發送
訂單處理器已訂閱“ syncChannel”直接渠道。 “ processOrder”方法將在發送者的線程中調用。
public OrderConfirmation processOrder(Order order) {logger.info("Processing order {}", order.getId());if (isInvalidOrder(order)) {logger.info("Error while processing order [{}]", ERROR_INVALID_ID);throw new InvalidOrderException(ERROR_INVALID_ID);}return new OrderConfirmation("confirmed"); }現在,我們將執行一個測試,該測試將通過發送無效訂單來引發異常。 此測試將向網關發送訂單:
public interface OrderService {@Gatewaypublic OrderConfirmation sendOrder(Order order); }考試:
TestSyncErrorHandling.java
@ContextConfiguration(locations = {"/xpadro/spring/integration/config/int-config.xml"}) @RunWith(SpringJUnit4ClassRunner.class) public class TestSyncErrorHandling {@Autowiredprivate OrderService service;@Testpublic void testCorrectOrder() {OrderConfirmation confirmation = service.sendOrder(new Order(3, "a correct order"));Assert.assertNotNull(confirmation);Assert.assertEquals("confirmed", confirmation.getId());}@Testpublic void testSyncErrorHandling() {OrderConfirmation confirmation = null;try {confirmation = service.sendOrder(new Order(1, "an invalid order"));Assert.fail("Should throw a MessageHandlingException");} catch (MessageHandlingException e) {Assert.assertEquals(InvalidOrderException.class, e.getCause().getClass());Assert.assertNull(confirmation);}} }我們運行測試,看看在訂單處理器中如何引發異常并到達測試。 沒關系; 我們想驗證發送無效訂單是否引發了異常。 發生這種情況是因為測試發送了訂單,并阻止等待在同一線程中處理訂單。 但是,當我們使用異步通道時會發生什么? 讓我們繼續下一節。
4,與隊列通道異步發送
此部分的測試發送一個命令,該命令將由路由器重定向到隊列通道。 網關如下所示:
public interface OrderService {@Gatewaypublic Future<OrderConfirmation> sendFutureOrder(Order order); }請注意,這次網關正在返回Future 。 如果我們不返回此值,則網關將阻止測試線程。 通過返回Future,網關將變為異步狀態,并且不會阻塞發送方的線程。
考試:
TestKoAsyncErrorHandling.java
@ContextConfiguration(locations = {"/xpadro/spring/integration/config/int-config.xml"}) @RunWith(SpringJUnit4ClassRunner.class) public class TestKoAsyncErrorHandling {@Autowiredprivate OrderService service;@Test(expected=MessageHandlingException.class)public void testAsyncErrorHandling() throws InterruptedException, ExecutionException {Future<OrderConfirmation> confirmation = service.sendFutureOrder(new Order(6, "another order"));} }好的,現在我們將啟動測試并看到引發異常的信息…
java.lang.AssertionError: Expected exception: org.springframework.integration.MessageHandlingException糟糕,測試失敗,因為沒有異常到達測試! 發生了什么? 好吧,解釋如下:
<int:channel id="asyncChannel"><int:queue capacity="5"/> </int:channel><int:service-activator method="processOrder" input-channel="asyncChannel" ref="orderProcessor"><int:poller fixed-delay="2000"/> </int:service-activator>由于我們使用的是異步通道(隊列),因此發送者發送訂單并繼續前進。 然后,接收方從另一個線程輪詢訂單。 因此,不可能將Exception拋回到發送方。 讓我們表現得好像什么都沒發生嗎? 好吧,您最好不要,還有其他選擇。
5,異步錯誤處理
當使用異步消息傳遞時,Spring Integration通過將異常發布到消息通道來處理它們。 引發的異常將包裝到MessagingException中,并成為消息的有效負載。
錯誤消息發送到哪個通道? 首先,它將檢查請求消息是否包含名為“ errorChannel”的標頭。 如果找到,錯誤消息將被發送到那里。 否則,該消息將被發送到所謂的全局錯誤通道。
5.1全局錯誤通道
默認情況下,Spring Integration創建一個名為“ errorChannel”的全局錯誤通道。 該頻道是發布-訂閱頻道。 這意味著我們可以為該頻道訂閱多個端點。 實際上,已經有一個端點訂閱了它:一個日志記錄處理程序 。該處理程序將記錄到達通道的消息的有效負載,盡管可以將其配置為不同的行為。
現在,我們將向該全局通道訂閱一個新的處理程序,并通過將其存儲到數據庫中來測試它是否接收到異常消息。
首先,我們需要在配置中進行一些更改。 我創建了一個新文件,因此它不會干擾我們之前的測試:
int-async-config.xml
<context:component-scan base-package="xpadro.spring.integration"/><int:gateway default-request-channel="asyncChannel" service-interface="xpadro.spring.integration.service.OrderService" error-channel="errorChannel"/><int:channel id="asyncChannel"><int:queue capacity="5"/> </int:channel><int:service-activator method="processOrder" input-channel="asyncChannel" ref="orderProcessor"><int:poller fixed-delay="2000"/> </int:service-activator><int:service-activator input-channel="errorChannel" ref="orderErrorHandler" method="handleFailedOrder"/><bean id="orderErrorHandler" class="xpadro.spring.integration.activator.OrderErrorHandler"/>網關 :我添加了一個錯誤通道。 如果調用失敗,錯誤消息將發送到該通道。 如果我沒有定義錯誤通道,則網關會將異常傳播給調用者,但是在這種情況下,由于這是一個異步網關,因此無法正常工作。
錯誤處理程序 :我已經定義了一個新的端點,該端點已訂閱了全局錯誤通道。 現在,任何發送到全局錯誤通道的錯誤消息都將傳遞給我們的處理程序。
我還添加了一個配置文件以配置數據庫。 我們的錯誤處理程序會將收到的錯誤插入此數據庫:
db-config.xml
<bean id="jdbcTemplate" class="org.springframework.jdbc.core.JdbcTemplate"><constructor-arg ref="dataSource"/> </bean><!-- in-memory database --> <jdbc:embedded-database id="dataSource"><jdbc:script location="classpath:db/schemas/schema.sql" /> </jdbc:embedded-database>錯誤處理程序非常簡單。 它收到錯誤消息,并將其信息插入數據庫:
public class OrderErrorHandler {@Autowiredprivate JdbcTemplate jdbcTemplate;@ServiceActivatorpublic void handleFailedOrder(Message<MessageHandlingException> message) {Order requestedOrder = (Order) message.getPayload().getFailedMessage().getPayload();saveToBD(requestedOrder.getId(), message.getPayload().getMessage());}private void saveToBD(int orderId, String errorMessage) {String query = "insert into errors(orderid, message) values (?,?)";jdbcTemplate.update(query, orderId, errorMessage);} }好的,現在一切就緒。 讓我們實施一個新的測試:
TestOkAsyncErrorHandlingTest.java
@ContextConfiguration(locations = {"/xpadro/spring/integration/config/int-async-config.xml","/xpadro/spring/integration/config/db-config.xml"}) @RunWith(SpringJUnit4ClassRunner.class) public class TestOkAsyncErrorHandling {@Autowiredprivate JdbcTemplate jdbcTemplate;@Autowiredprivate OrderService service;@Beforepublic void prepareTest() {jdbcTemplate.update("delete from errors");}@Testpublic void testCorrectOrder() throws InterruptedException, ExecutionException {Future<OrderConfirmation> confirmation = service.sendFutureOrder(new Order(7, "another correct order"));OrderConfirmation orderConfirmation = confirmation.get();Assert.assertNotNull(orderConfirmation);Assert.assertEquals("confirmed", orderConfirmation.getId());}@Testpublic void testAsyncErrorHandling() throws InterruptedException, ExecutionException {Future<OrderConfirmation> confirmation = service.sendFutureOrder(new Order(6, "another order"));Thread.sleep(2000);Assert.assertEquals(1, getSavedErrors());validateSavedError(6);}private int getSavedErrors() {return jdbcTemplate.queryForObject("select count(*) from errors", Integer.class);}private void validateSavedError(int orderId) {String query = "select * from errors where orderid=?";Map<String, Object> result = jdbcTemplate.queryForMap(query, orderId);Assert.assertEquals(6, result.get("orderid"));assertThat((String)result.get("message"), containsString("Order ID is invalid"));} }這次測試成功,錯誤消息已存儲到數據庫。
5.2其他機制
自定義錯誤通道 :您可以定義錯誤通道并將其定義為隊列通道,而不是默認的發布-訂閱通道:
<int:poller id="defaultPoller" default="true" fixed-delay="5000" /><int:channel id="errorChannel"><int:queue capacity="10"/> </int:channel>ErrorMessageExceptionTypeRouter :這個Spring Integration專用路由器將解析將錯誤消息發送到的通道。 它基于錯誤的最具體原因做出決定:
<int:exception-type-router input-channel="errorChannel" default-output-channel="genericErrorChannel"><int:mapping exception-type="xpadro.spring.integration.exception.InvalidOrderException" channel="invalidChannel" /><int:mapping exception-type="xpadro.spring.integration.exception.FooException" channel="fooChannel" /> </int:exception-type-router>六,結論
我們已經了解了使用Spring Integration時錯誤處理的不同機制是什么。 有了這個基礎,您將能夠通過實現轉換器從錯誤消息中提取信息,使用標頭擴展器設置錯誤通道或實現自己的路由器等來擴展它并配置錯誤處理。
翻譯自: https://www.javacodegeeks.com/2014/02/how-error-handling-works-in-spring-integration.html
總結
以上是生活随笔為你收集整理的错误处理在Spring Integration中如何工作的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Java 8 Friday Goodie
- 下一篇: 使用Spring跟踪应用程序异常