Camel中的几个重要概念之 Components
http://jnn.iteye.com/blog/320094
?
Components
Component 是一個(gè)容易混淆的名詞,可能使用EndpointFactory會(huì)更合適,因?yàn)镃omponent是創(chuàng)建Endpoint實(shí)力的工廠類。例如如果一個(gè)Camel應(yīng)用使用了幾個(gè)JMS 隊(duì)列,那么這個(gè)應(yīng)用首先需要?jiǎng)?chuàng)建一個(gè)叫JmsComponent(實(shí)現(xiàn)了Component接口)的實(shí)例,然后應(yīng)用會(huì)調(diào)用這個(gè)JMSComponent對(duì)象的createEndpoint()方法來創(chuàng)建一個(gè)JmsEndpoint對(duì)象(這個(gè)對(duì)象實(shí)現(xiàn)了Endpoint接口)。事實(shí)上,應(yīng)用代碼并沒直接調(diào)用Component.createEndoint() 方法,而是CamelContext對(duì)象通過找到對(duì)應(yīng)的Component對(duì)象(我馬上會(huì)在后續(xù)的文章中介紹),并調(diào)用createEndpoint() 方法來實(shí)現(xiàn)的。
myCamelContext.getEndpoint("pop3://john.smith@mailserv.example.com?password=myPassword");
在getEndpoint()中使用的參數(shù)就是URI。這個(gè)URI的前綴(: 之前的那部分內(nèi)容)描述了一個(gè)組件的名字,CamelContext對(duì)象內(nèi)部維護(hù)著一個(gè)組件名字與Component對(duì)象的映射表。對(duì)于上面給定的URI例子來說,CamelContext對(duì)象會(huì)根據(jù)pop3前綴找到MailComponent類,然后CamelContext對(duì)會(huì)調(diào)用MailComponent的createEndpoint("pop3://john.smith@mailserv.example.com?password=myPassword") 方法。在createEndpoint()方法中, 將把URI分割成一段段小的參數(shù),這些小參數(shù)將被用來設(shè)置生成的Endpoint對(duì)象。
在上一段中, 我提到的CamelContext對(duì)象維護(hù)了一個(gè)組件名到Component對(duì)象的映射表。但這個(gè)映射表是如何產(chǎn)生的呢?這里可以在通過代碼調(diào)用CamelContext.addComponent(String componentName, Component component)來實(shí)現(xiàn)。 下面的例子就是展示了如何給一個(gè)MailComponent對(duì)象注冊(cè)上三個(gè)不同的名字。
Component mailComponent = new org.apache.camel.component.mail.MailComponent();
myCamelContext.addComponent("pop3", mailComponent);
myCamelContext.addComponent("imap", mailComponent);
myCamelContext.addComponent("smtp", mailComponent);
第二個(gè)方法也是最常用的方法,就是通過CamelContext對(duì)象來實(shí)現(xiàn)一個(gè)懶初始化。這個(gè)方法依賴于一套Camel內(nèi)部的定義Component發(fā)現(xiàn)規(guī)則, 開發(fā)者只要在實(shí)現(xiàn)Component接口的時(shí)候按照這一規(guī)則設(shè)置,就可以保證CamelContext能夠正常發(fā)現(xiàn)這一Component。這里我們假設(shè)你所寫的Class名字為 com.example.myproject.FooComponent, 并且你想讓Camel自動(dòng)將這個(gè)component和"foo”這個(gè)名字相對(duì)應(yīng)。為了做到這一點(diǎn),你需要先寫一個(gè)叫做"META-INF/services/org/apache/camel/component/foo" 屬性文件, 注意這個(gè)文件沒有".properties"作為后綴名,在這個(gè)屬性文件中只有一個(gè)class的條目,而這個(gè)條目的只就是你所寫的類的全名。如下所示
META-INF/services/org/apache/camel/component/foo
class=com.example.myproject.FooComponent
如果你還想讓Camel將上面的類和”bar” 這個(gè)名字聯(lián)系起來,那你需要在同樣的目錄下在創(chuàng)建一個(gè)相同內(nèi)容叫bar的文件。一旦完成了這些配置, 你可以把 com.example.myproject.FooComponent class和這些配置文件一同打成一個(gè)jar 包,然后把這個(gè)jar包放你的CLASSPATH中。這樣Camel就會(huì)通過分析這些屬性文件的class 項(xiàng)目,通過使用reflectionAPI創(chuàng)建這個(gè)指定的類的實(shí)例。
正如我在Endpoint中說描述的, Camel提供了對(duì)多種通信協(xié)議一個(gè)開箱即用的支持。這種支持是建立在實(shí)現(xiàn)了Component接口的類以及讓CamelContext對(duì)象自動(dòng)建立映射關(guān)系的配置文件基礎(chǔ)之上的。
在這一節(jié)的開始, 我使用的這個(gè)例子來調(diào)用CamelContext.getEndpoint()。
myCamelContext.getEndpoint("pop3://john.smith@mailserv.example.com?password=myPassword");
在最開始舉這個(gè)例子的時(shí)候,我說這個(gè)getEndpoint()方法的參數(shù)是一個(gè)URI。我這么說是因?yàn)镃amel的在線問答以及Camel的源代碼就把這個(gè)參數(shù)聲明為一個(gè)URI。在現(xiàn)實(shí)生活中,這個(gè)參數(shù)是按照URL來定義的。這是因?yàn)镃amel會(huì)從參數(shù)中通過一個(gè)簡(jiǎn)單的算法查找第一:來分析出組件名。為了了解其中的奧妙,大家可以回想一下我在前面 URL,URI,URN和IRI是什么中談到的 一個(gè)URI可以是URL或者URN。 現(xiàn)在讓我們來看一下下面的getEndpoint()調(diào)用。
myCamelContext.getEndpoint("pop3:...");
myCamelContext.getEndpoint("jms:...");
myCamelContext.getEndpoint("urn:foo:...");
myCamelContext.getEndpoint("urn:bar:...");
Camel會(huì)先找出這些component的標(biāo)識(shí),例如 "pop3", "jms", "urn" 和 "urn"。如果"urn:foo" 和"urn:bar" 能夠別用來識(shí)別component,或者是使用"foo" 和"bar" (這一可以跳過這個(gè)"urn:"前綴)。所以在實(shí)際的編程中,大家更喜歡使用URL來制定一個(gè)Endpoint(使用":..."來描述的字符串)而不是用一個(gè)URN( 使用"urn::..."來描述的字符串)。正因?yàn)槲覀儧]有安全按照URN的規(guī)定的參數(shù)來調(diào)用getEndpoint() 方法, 所以這個(gè)方法的參數(shù)更像一個(gè)URL而不是一個(gè)URI。
?
===========================
1.component.createEndpoint()
2.if it's in from(...) clause, then call endpoint.createConsumer()
? 2.1 consumer will call doStart(),
? 2.2 create message and exchange, set exchange.in,then call processor to process this exchange.
??you can create several exchange, each exchange mean one process.
?3.if it's in to(...), then call endpoint.createProducer()
? 3.1producer call process(), then you can process this exchange, set this exchange.out,?then it will be processed
?
Component
public class MemoryComponent extends DefaultComponent{
?@Override
?//memory:myMemory?param=value
?//remaining = myMemory
?protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception {
??System.out.println("createEndpoint(String uri, String remaining, Map<String, Object> parameters)");
??MemoryEndpoint endpoint = new MemoryEndpoint(uri, this);
??return endpoint;
?}
}
Endpoint
public class MemoryEndpoint extends DefaultEndpoint{
?public MemoryEndpoint() {
??super();
??System.out.println("MemoryEndpoint()");
?}
?public MemoryEndpoint(String endpointUri, Component component) {
??super(endpointUri, component);
??System.out.println("MemoryEndpoint(String endpointUri, Component component)");
?}
?@Override
?public Consumer createConsumer(Processor processor) throws Exception {
??System.out.println("createConsumer(Processor processor)");
??Consumer memConsumer = new MemoryConsumer(this,processor);
??return memConsumer;
?}
?@Override
?public Producer createProducer() throws Exception {
??System.out.println("createProducer()");
??Producer memProducer = new MemoryProducer(this);
??return memProducer;
?}
?@Override
?public boolean isSingleton() {
??System.out.println("isSingleton()");
??return false;
?}
}
Producer
public class MemoryProducer extends DefaultProducer{
?public MemoryProducer(Endpoint endpoint) {
??super(endpoint);
??System.out.println("MemoryProducer(Endpoint endpoint)");
?}
?@Override
?public void process(Exchange exchange) throws Exception {
??System.out.println("process(Exchange exchange)");
??Object body = exchange.getIn().getBody();
??System.out.println("add to queue:" + (String)body);
??exchange.getOut().setBody("producer-" + (String)body);
??StaticQueue.producerQueue.add("producer-" + (String)body);
?}
}
?
Consumer
public class MemoryConsumer extends DefaultConsumer {
?public MemoryConsumer(Endpoint endpoint, Processor processor) {
??super(endpoint, processor);
??System.out.println("MemoryConsumer(Endpoint endpoint, Processor processor)");
?}
?@Override
?protected void doStart() throws Exception {
??super.doStart();
??
??System.out.println("doStart()");
??for (String str : StaticQueue.consumerQueue) {
???Exchange ex = this.getEndpoint().createExchange();
???Message msg = new DefaultMessage();
???msg.setBody(str);
???ex.setIn(msg);
???getProcessor().process(ex);
??}
?}
?@Override
?protected void doStop() throws Exception {
??super.doStop();
??System.out.println("doStop()");
??
?}
}
?
Processor
public class MemoryProcessor implements Processor {
?@Override
?public void process(Exchange exchange) throws Exception {
??System.out.println("MemoryProcessor-process(Exchange exchange)");
??Message in = exchange.getIn();
??System.out.println("in:" +in.getBody());
??Message out = exchange.getOut();
??System.out.println("out:" +out.getBody());
??
??//self defined processor need to transmit the msg from in to out,?
//to("memory:mymemory").process(processor)
??out.setBody(in.getBody() + " out processed");
??
??//in.setBody(in.getBody() + " in processed");
??
?}
}
?
?
Test
public class Test {
?/**
? * @param args
? * @throws Exception
? */
?public static void main(String[] args) throws Exception {
??if ( System.getProperty("log4j.configuration") != null )
???PropertyConfigurator.configure(System.getProperty("log4j.configuration"));
??else
???BasicConfigurator.configure();
??
??// TODO Auto-generated method stub
??Component memoryCom = new MemoryComponent();
??CamelContext context = new DefaultCamelContext();
??context.addComponent("memory", memoryCom);
??
??
??Component streamCom = new StreamComponent();
??context.addComponent("in", streamCom);
??context.addComponent("out", streamCom);
??
??context.addRoutes(new MemoryRouteBuilder());
??context.start();
??
??Runtime.getRuntime().addShutdownHook(new Thread(){
???@Override
???public void run() {
????System.out.println("queues:");
????for (String str : StaticQueue.producerQueue) {
?????System.out.println("queue:"+str);
????}
???}
???
??});
?}
}
class MemoryRouteBuilder extends RouteBuilder {
?/** * A main() so we can easily run these routing rules in our IDE */
?
?/** * Lets configure the Camel routing rules using Java code... */
?public void configure() {
??//producer
//??from("timer://myTimer?period=2000").setBody().simple("myTimertest1").
//??to("memory:mymemory");
??
??//consumer
??MemoryProcessor processor = new MemoryProcessor();
??//from("memory:mymemory").to("file://C://test");
??//from("memory:mymemory").to("memory:mymemory").to("file://C://test");
??from("memory:mymemory").to("memory:mymemory").process(processor).to("file://test");
????//to("log:out");
??//??to("stream:out");
??
??//from("stream:in?promptMessage=Enter&promptDelay=5000").to("stream:out");
?}
}
class StaticQueue? {
?public static Queue<String> consumerQueue = new LinkedBlockingQueue<String>();
?public static Queue<String> producerQueue = new LinkedBlockingQueue<String>();
?static {
??consumerQueue.add("consumer-1");
??consumerQueue.add("consumer-2");
?}
?
}
?
總結(jié)
以上是生活随笔為你收集整理的Camel中的几个重要概念之 Components的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Maven2 的常用命令
- 下一篇: Camel 组件之 Timer