camel mq_Camel:构建基于消息的应用程序
camel mq
這是一篇長文章,包含三個單獨的主題:- Java的Apache Camel入門
- 使用CamelRunner改善路線的啟動
- 使用Camel構建基于消息的應用程序
但是,由于我準備了包含所有這些材料的camel-demo-1.0.0-SNAPSHOT-project.zip ,因此我認為將它們組合并整體呈現會更容易。
Java的Apache Camel入門
用很少的Groovy行嘗試Camel是一回事,但是用Java進行全面的項目則是另一回事。 今天,我將向您展示如何通過基于Maven的項目在Apache Camel上開始工作。 您還可以使用提供的camel-demo作為項目模板來啟動您自己的Apache Camel項目。 您只需要重命名Java包,并重命名pom的組和工件ID即可滿足您的需要。
準備具有Camel依賴關系的基于Maven的項目
解壓縮camel-demo項目源代碼,您將看到基本的目錄布局。
camel-demo+- bin+- config+- data+- src+- pom.xml+- README.txt使此演示成為基于Camel的項目的原因只是pom.xml的聲明。 讓我們看一下文件及其依賴項。
<?xml version='1.0' encoding='UTF-8'?> <project xmlns='http://maven.apache.org/POM/4.0.0' xmlns:xsi='http://www.w3.org/2001/XMLSchema-instance'xsi:schemaLocation='http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd'><modelVersion>4.0.0</modelVersion><groupId>deng.cameldemo</groupId><artifactId>camel-demo</artifactId><version>1.0.0-SNAPSHOT</version><packaging>jar</packaging><properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><slf4j.version>1.6.6</slf4j.version><camel.version>2.10.1</camel.version></properties><build><plugins><plugin><artifactId>maven-compiler-plugin</artifactId><version>2.3.2</version><configuration><source>1.6</source><target>1.6</target></configuration></plugin><plugin><artifactId>maven-assembly-plugin</artifactId><version>2.3</version><configuration><descriptorRefs><descriptorRef>project</descriptorRef><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs></configuration><executions><execution><id>make-assembly</id><phase>package</phase><goals><goal>single</goal></goals></execution></executions></plugin></plugins></build><dependencies><!-- Unit testing lib --><dependency><groupId>junit</groupId><artifactId>junit-dep</artifactId><version>4.10</version><scope>test</scope></dependency><dependency><groupId>org.hamcrest</groupId><artifactId>hamcrest-library</artifactId><version>1.2.1</version><scope>test</scope></dependency><!-- Logging lib --><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId><version>${slf4j.version}</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>${slf4j.version}</version><scope>runtime</scope><optional>true</optional></dependency><!-- Apache Commons lib --><dependency><groupId>commons-lang</groupId><artifactId>commons-lang</artifactId><version>2.6</version></dependency><dependency><groupId>commons-io</groupId><artifactId>commons-io</artifactId><version>2.0.1</version></dependency><!-- Apache Camel --><dependency><groupId>org.apache.camel</groupId><artifactId>camel-core</artifactId><version>${camel.version}</version></dependency><dependency><groupId>org.apache.camel</groupId><artifactId>camel-spring</artifactId><version>${camel.version}</version></dependency><dependency><groupId>org.apache.camel</groupId><artifactId>camel-groovy</artifactId><version>${camel.version}</version></dependency><dependency><groupId>org.apache.camel</groupId><artifactId>camel-jackson</artifactId><version>${camel.version}</version></dependency><dependency><groupId>org.apache.camel</groupId><artifactId>camel-mina</artifactId><version>${camel.version}</version></dependency></dependencies></project>此pom.xml對基于Java的應用程序進行貼花處理,它將生成jar 。 它需要最少的JDK 6或更高版本。 除了用于單元測試的典型junit和hamcrest之外,我還添加了slf4j進行日志記錄。 我也將Apache的commons-lang/io夫婦添加到了項目中。 我認為這些是任何基于Java的應用程序都應使用的基本設置。
我聲明的maven-assembly-plugin僅用于此演示打包目的,您可以更改或刪除以適合您自己的項目需求。
對于駱駝依賴,您將需要最少的camel-core來構建路線。 然后,您可以添加計劃在項目中使用的任何其他組件。 我添加了以下內容以構建典型的基于消息的應用程序開發:
請注意,由于我們使用了多個駱駝組件依賴關系,因此我選擇設置Maven屬性${camel.version}以便在升級Camel時,將pom.xml文件維護在一個位置更容易。
您應該能夠進入項目目錄并運行mvn compile以驗證該項目。 它應該編譯沒有錯誤。
使用CamelRunner改善路線的啟動
準備好項目pom.xml文件后,就可以開始創建駱駝路線來處理您自己的業務邏輯。 在我們太興奮之前,讓我們嘗試一個簡單的HelloRoute ,看看它如何工作以及如何首先運行它。 這是src/main/java/deng/cameldemo/HelloRoute.java的路由定義代碼。
package deng.cameldemo;import org.apache.camel.builder.RouteBuilder;public class HelloRoute extends RouteBuilder {@Overridepublic void configure() throws Exception {from('timer://helloTimer?period=3000').to('log:' + getClass().getName());} }
體驗駱駝之旅
要查看上面的內容,我們需要將其添加到CamelContext并啟動上下文。 對于Java獨立程序,我們將在Main類中編寫此安裝代碼。 Camel實際上帶有org.apache.camel.main.MainSupport抽象類,您可以用來擴展自己的Main 。 但是,我認為如果Camel提供一個可以像這樣運行的CamelRunner會更好。
$ java CamelRunner deng.cameldemo.HelloRoute這樣的CamelRunner將非常易于使用并且可重復使用,所以我就是這樣做的。 我這樣寫:
package deng.cameldemo;import org.apache.camel.CamelContext; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.impl.DefaultCamelContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.context.ConfigurableApplicationContext; import org.springframework.context.support.FileSystemXmlApplicationContext;/** * A main program to start Camel and run as a server using RouteBuilder class names or * Spring config files.* * <p>Usage:* * java deng.cameldemo.CamelRunner deng.cameldemo.HelloRoute* * or* * java -Dspring=true deng.cameldemo.CamelRunner /path/to/camel-spring.xml* * @author Zemian Deng*/ public class CamelRunner {public static void main(String[] args) throws Exception {CamelRunner runner = new CamelRunner();runner.run(args);}private static Logger logger = LoggerFactory.getLogger(CamelRunner.class);public void run(String[] args) throws Exception {if (Boolean.parseBoolean(System.getProperty('spring', 'false')))runWithSpringConfig(args);elserunWithCamelRoutes(args);// Wait for user to hit CRTL+C to stop the servicesynchronized(this) {this.wait();}}private void runWithSpringConfig(String[] args) {final ConfigurableApplicationContext springContext = new FileSystemXmlApplicationContext(args);// Register proper shutdown.Runtime.getRuntime().addShutdownHook(new Thread() { @Overridepublic void run() {try {springContext.close();logger.info('Spring stopped.');} catch (Exception e) {logger.error('Failed to stop Spring.', e);}}});// Start springlogger.info('Spring started.');}private void runWithCamelRoutes(String[] args) throws Exception {final CamelContext camelContext = new DefaultCamelContext(); // Register proper shutdown.Runtime.getRuntime().addShutdownHook(new Thread() { @Overridepublic void run() {try {camelContext.stop();logger.info('Camel stopped for {}', camelContext);} catch (Exception e) {logger.error('Failed to stop Camel.', e);}}});// Added RouteBuilder from argsfor (String className : args) {Class<?> cls = Class.forName(className);if (RouteBuilder.class.isAssignableFrom(cls)) {Object obj = cls.newInstance();RouteBuilder routeBuilder = (RouteBuilder)obj;camelContext.addRoutes(routeBuilder);} else {throw new RuntimeException('Unable to add Camel RouteBuilder ' + className);}}// Start camelcamelContext.start();logger.info('Camel started for {}', camelContext);} }為了幫助您運行主類,我在項目的bin目錄下提供了一個run-java包裝程序腳本,以便您無需設置類路徑即可快速對其進行測試。
$ mvn package $ bin/run-java deng.cameldemo.CamelRunner deng.cameldemo.HelloRoute您將看到該程序將在DefaultCamelContext加載HelloRoute并將其作為服務器啟動。 HelloRoute本身將生成3秒鐘的計時器消息,并將其發送到記錄器,該記錄器應打印在控制臺屏幕上。 這將一直持續下去,直到您按CTRL+C結束它為止。
注意:您只需調用一次mvn package命令,這樣它將打包所有依賴項jar,以便run-java自動檢測到它們。 如果在package階段不打算使用maven-assembly-plugin ,那么使用mvn dependency:copy-dependencies命令顯式也可以正常工作。
進行Camel測試,第2部分:使用Spring xml配置運行Camel
上面的HelloRoute示例將僅提供通過使用組件URI形成的路由定義。 如果我們可以以聲明的方式配置路由,這樣我們就可以更改路由而無需重新編譯類文件,那就太好了。 這將非常方便,特別是如果您不熟悉每個組件的選項并且想探索并嘗試的話。 嗯,這就是camel-spring用途。 除了為您提供在xml配置文件中加載路由的選項之外,它還提供了一種非常靈活的方式來在Spring IoC容器中注冊自定義服務/處理器Bean。
如果您是一位敏銳的讀者,您會在上面的CamelRunner代碼中注意到它還有一個額外的runWithSpringConfig部分。 因此, CamelRunner實際上可以引導任何Spring xml文件并作為服務器啟動上下文。 您可以這樣使用它:
$ bin/run-java deng.cameldemo.CamelRunner -Dspring=true config/hellocamel-spring.xmlconfig/hellocamel-spring.xml等效于我們的HelloRoute代碼,但形式為Spring xml:
<beans xmlns='http://www.springframework.org/schema/beans'xmlns:xsi='http://www.w3.org/2001/XMLSchema-instance'xsi:schemaLocation='http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsdhttp://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd'><camelContext id='helloCamel' xmlns='http://camel.apache.org/schema/spring'><route><from uri='timer://jdkTimer?period=3000'/><to uri='log://deng.cameldemo.HelloCamel'/></route></camelContext></beans> 這樣就無需編譯/重新編譯HelloRoute來定義要運行的Camel路由。
使用Camel構建基于消息的應用程序
為了向您展示更實際的演示,我將進一步向您展示如何設置Camel來處理基于消息的應用程序。 在許多IT商店中,通常都有一臺服務器將消息數據作為輸入并進行處理。 一個實際的用例是獲取任何JSON格式的消息并將其轉換為對象并進行處理。 要在Camel中做到這一點,您想要構建的是一條路由,該路由將從TCP端口獲取輸入消息,然后使用可能具有的任何業務邏輯在管道流中對其進行處理。 您將把路由作為服務器運行,然后客戶端可以使用任何方式將消息提交到TCP端口。 客戶端甚至可能是另一個瘦的Camel客戶端應用程序,也可以提交數據。 讓我告訴您如何開始。
用駱駝路線寫服務器端代碼
服務器端需要一條路由來偵聽TCP端口,而這是由camel-mina組件提供的。 第一步是您需要一條路線。
package deng.cameldemo;import org.apache.camel.builder.RouteBuilder;public class TcpMsgRoute extends RouteBuilder {@Overridepublic void configure() throws Exception {String port = System.getProperty('port', '12345');from('mina:tcp://localhost:' + port + '?sync=false').to('log:' + getClass().getName());} }然后,下一步完成了! 沒辦法,您的意思是服務器就這些了嗎? 難以置信? 好吧,讓我們嘗試一下
$ bin/run-java deng.cameldemo.CamelRunner deng.cameldemo.TcpMsgRoute -Dport=12345 15:21:41 main INFO org.apache.camel.impl.DefaultCamelContext:1391 | Apache Camel 2.10.1 (CamelContext: camel-1) is starting 15:21:41 main INFO org.apache.camel.management.ManagementStrategyFactory:43 | JMX enabled. 15:21:42 main INFO org.apache.camel.impl.converter.DefaultTypeConverter:45 | Loaded 172 type converters 15:21:42 main INFO org.apache.camel.component.mina.MinaConsumer:59 | Binding to server address: localhost/127.0.0.1:12345 using acceptor: org.apache.mina.transport.socket.nio.SocketAcceptor@2ffad8fe 15:21:42 main INFO org.apache.camel.impl.DefaultCamelContext:2045 | Route: route1 started and consuming from: Endpoint[mina://tcp://localhost:12345?sync=true] 15:21:42 main INFO org.apache.camel.management.DefaultManagementLifecycleStrategy:859 | StatisticsLevel at All so enabling load performance statistics 15:21:42 main INFO org.apache.camel.impl.DefaultCamelContext:1426 | Total 1 routes, of which 1 is started. 15:21:42 main INFO org.apache.camel.impl.DefaultCamelContext:1427 | Apache Camel 2.10.1 (CamelContext: camel-1) started in 0.505 seconds 15:21:42 main INFO deng.cameldemo.CamelRunner:93 | Camel started for CamelContext(camel-1) 瞧! 服務器已啟動,正在等待用戶通過端口12345發送消息。 幾行代碼還不錯。
用Camel ProducerTemplate編寫客戶端代碼
由于我們的服務器公開了TCP端口并接收任何文本內容消息,因此您可以創建任何能夠寫入TCP套接字的客戶端。 在這里,我將向您展示如何使用Camel編寫瘦客戶端。
package deng.cameldemo.client;import java.io.FileReader; import org.apache.camel.CamelContext; import org.apache.camel.ProducerTemplate; import org.apache.camel.impl.DefaultCamelContext; import org.apache.commons.io.IOUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory;public class TcpMsgSender {public static void main(String[] args) throws Exception {TcpMsgSender runner = new TcpMsgSender();runner.run(args);}private static Logger logger = LoggerFactory.getLogger(TcpMsgSender.class);public void run(String[] args) throws Exception {String fileName = args.length > 0 ? args[0] : 'data/msg.txt';String[] hostPort = (args.length > 1 ? args[1] : 'localhost:12345').split(':');String host = hostPort[0];String port = hostPort.length > 1 ? hostPort[1] : '12345';logger.info('Sending tcp message {} to host={}, port={}', new Object[]{ fileName, host, port});String text = IOUtils.toString(new FileReader(fileName));logger.debug('File size={}', text.length());CamelContext camelContext = new DefaultCamelContext();ProducerTemplate producer = camelContext.createProducerTemplate();producer.sendBody('mina:tcp://' + host + ':' + port + '?sync=false', text);logger.info('Message sent.');} }該TcpMsgSender可以將任何文本文件發送到您的服務器端點。 在服務器運行時嘗試以下操作:
$ bin/run-java deng.cameldemo.client.TcpMsgSender data/test-msg.json localhost:12345 15:22:35 main INFO deng.cameldemo.client.TcpMsgSender:24 | Sending tcp message data/test-msg.json to host=localhost, port=12345 15:22:35 main DEBUG deng.cameldemo.client.TcpMsgSender:27 | File size=47 15:22:35 main INFO org.apache.camel.impl.converter.DefaultTypeConverter:45 | Loaded 172 type converters 15:22:35 main INFO org.apache.camel.management.ManagementStrategyFactory:43 | JMX enabled. 15:22:35 main INFO deng.cameldemo.client.TcpMsgSender:32 | Message sent.您應該能夠從服務器控制臺輸出中驗證它是否收到了消息。 我發送的味精在data/test-msg.json ,其中包含以下簡單文本:
{ 'firstName' : 'Zemian', 'lastName' : 'Deng' } 請注意,我們的服務器僅接收純文本并將其記錄。 接下來,我們將討論如何處理消息。
使用Camel和Spring xml配置以JSON格式處理消息數據
您認為服務器代碼從上面很容易,請再猜一次。 實際上,您可以僅用一些簡單的xml行替換TcpMsgRoute !
<beans xmlns='http://www.springframework.org/schema/beans'xmlns:xsi='http://www.w3.org/2001/XMLSchema-instance'xsi:schemaLocation='http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsdhttp://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd'><camelContext id='tcpMsgServer' xmlns='http://camel.apache.org/schema/spring'><route><from uri='mina:tcp://localhost:12345?sync=false'/><to uri='log://deng.cameldemo.TcpMsgServer'/></route></camelContext></beans>將其另存為config/tcpmsgserver-spring.xml 。 然后重新運行服務器,您應該獲得與上面相同的結果。
$ bin/run-java deng.cameldemo.CamelRunner -Dspring=true config/tcpmsgserver-spring.xml現在讓我們改進上面的xml,以進一步處理JSON消息數據。 我們希望將純文本轉換為Java對象,然后由自定義bean處理。 為此,我們首先需要在路線中添加解組組件。 這就是camel-jackson發揮作用的地方。 在我們的演示中,解組步驟會將JSON文本轉換為java.util.Map ,然后將其傳遞給名為myMsgProcessor的處理器bean。 讓我們創建一個名為config/tcpmsgserver-json-spring.xml的新xml文件,如下所示。
<beans xmlns='http://www.springframework.org/schema/beans'xmlns:xsi='http://www.w3.org/2001/XMLSchema-instance'xsi:schemaLocation='http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsdhttp://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd'><camelContext id='tcpMsgServer' xmlns='http://camel.apache.org/schema/spring'><route><from uri='mina:tcp://localhost:12345?sync=false'/><to uri='log://deng.cameldemo.TcpMsgServer'/><unmarshal><json library='Jackson'/></unmarshal><to uri='bean:myMsgProcessor?method=process'/></route></camelContext><bean id='myMsgProcessor' class='deng.cameldemo.MyMsgProcessor'></bean></beans>myMsgProcessor是一個Spring bean,我們提供了自定義邏輯代碼來處理數據。 至此,我們有一個完整的Java對象要操作。 處理器的內容可以是具有URI中指定的方法名稱的任何POJO。 這是一個示例:
package deng.cameldemo;import org.apache.camel.builder.RouteBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Map;public class MyMsgProcessor {private static Logger logger = LoggerFactory.getLogger(MyMsgProcessor.class);public void process(Map<String, String> data) {logger.info('We should slice and dice the data: ' + data);} }嘗試使用上面的新xml文件重新運行服務器,您應該能夠重新調用相同的客戶端命令進行測試。 這是服務器的示例輸出:
$ bin/run-java deng.cameldemo.CamelRunner -Dspring=true config/tcpmsgserver-json-spring.xml 17:05:25 main INFO org.springframework.context.support.FileSystemXmlApplicationContext:456 | Refreshing org.springframework.context.support.FileSystemXmlApplicationContext@4200309: startup date [Sat Sep 15 17:05:25 EDT 2012]; root of context hierarchy 17:05:25 main INFO org.springframework.beans.factory.xml.XmlBeanDefinitionReader:315 | Loading XML bean definitions from file [/Users/zemian/projects/sandbox/camel-demo/config/tcpmsgserver-json-spring.xml] 17:05:27 main INFO org.springframework.beans.factory.support.DefaultListableBeanFactory:557 | Pre-instantiating singletons in org.springframework.beans.factory.support.DefaultListableBeanFactory@27b75165: defining beans [template,consumerTemplate,tcpMsgServer:beanPostProcessor,tcpMsgServer,myMsgProcessor]; root of factory hierarchy 17:05:27 main INFO org.apache.camel.spring.SpringCamelContext:1391 | Apache Camel 2.10.1 (CamelContext: tcpMsgServer) is starting 17:05:27 main INFO org.apache.camel.management.ManagementStrategyFactory:43 | JMX enabled. 17:05:27 main INFO org.apache.camel.impl.converter.DefaultTypeConverter:45 | Loaded 172 type converters 17:05:28 main INFO org.apache.camel.component.mina.MinaConsumer:59 | Binding to server address: localhost/127.0.0.1:12345 using acceptor: org.apache.mina.transport.socket.nio.SocketAcceptor@5a3cae4a 17:05:28 main INFO org.apache.camel.spring.SpringCamelContext:2045 | Route: route1 started and consuming from: Endpoint[mina://tcp://localhost:12345?sync=false] 17:05:28 main INFO org.apache.camel.management.DefaultManagementLifecycleStrategy:859 | StatisticsLevel at All so enabling load performance statistics 17:05:28 main INFO org.apache.camel.spring.SpringCamelContext:1426 | Total 1 routes, of which 1 is started. 17:05:28 main INFO org.apache.camel.spring.SpringCamelContext:1427 | Apache Camel 2.10.1 (CamelContext: tcpMsgServer) started in 0.695 seconds 17:05:28 main INFO deng.cameldemo.CamelRunner:61 | Spring started. 17:05:35 Camel (tcpMsgServer) thread #3 - MinaThreadPool INFO deng.cameldemo.TcpMsgServer:96 | Exchange[ExchangePattern:InOnly, BodyType:String, Body:{ 'firstName' : 'Zemian', 'lastName' : 'Deng' }] 17:05:35 Camel (tcpMsgServer) thread #3 - MinaThreadPool INFO deng.cameldemo.MyMsgProcessor:11 | We should slice and dice the data: {lastName=Deng, firstName=Zemian}請注意,駱駝會自動轉換您的路線中的數據格式! 我們的客戶端僅以JSON格式發送純文本,但是當服務器收到純文本時,它將使用Jackson庫將其解組,然后將其轉換為Java Map對象。 然后,它將map對象傳遞到我們的處理器bean中。 另外,在此演示中,我選擇使用通用的java.util.Map作為處理器方法參數(它是JSON unmarshal的輸出),但是您可以輕松定義自己的業務數據類型,例如MyCustomerData 。 這顯示了Camel的強大功能,因為您無需在流程中推送消息,而只需擔心將“處理器”編寫為POJO。 駱駝將組件“粘合”在一起以形成一條路線,并通過管道流攜帶消息數據。
同樣,當您在一個或多個處理器中編寫業務邏輯時,最好將POJO邏輯限制為盡可能小的單位。 當您這樣做時,則可以使處理器的可重用性最大化。 您制作的POJO較大,并且混合了許多業務邏輯,因此也很難進行測試。 因此,我建議您在開發這些處理器bean時,嘗試將它們視為樂高積木-小POJO。 您想讓駱駝定義路線并將LEGO塊粘合在一起。 一旦習慣了這種thicking的習慣,便可以更有效地使用Camel來解決您的許多域問題。
好吧,今天的人就這些了。 我希望您喜歡騎駱駝。
祝您編程愉快,別忘了分享!
參考:來自A程序員雜志博客的JCG合作伙伴 Zemian Deng 使用Camel構建基于消息的應用程序 。
翻譯自: https://www.javacodegeeks.com/2012/09/camel-build-message-based-application.html
camel mq
總結
以上是生活随笔為你收集整理的camel mq_Camel:构建基于消息的应用程序的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 渊字取名寓意 渊字取名寓意是什么
- 下一篇: exo文件_您在eXo平台上的第一个Ju