学习Apache Camel –实时索引推文
在大多數(shù)軟件開發(fā)項(xiàng)目中,有一點(diǎn)需要使應(yīng)用程序開始與其他應(yīng)用程序或第三方組件通信。
無(wú)論是發(fā)送電子郵件通知,調(diào)用外部api,寫入文件還是將數(shù)據(jù)從一個(gè)地方遷移到另一個(gè)地方,您都可以推出自己的解決方案或利用現(xiàn)有框架。
對(duì)于Java生態(tài)系統(tǒng)中的現(xiàn)有框架,我們可以發(fā)現(xiàn)Tibco BusinessWorks和Mule ESB ,另一方面是Spring Integration和Apache Camel 。
在本教程中,我將通過一個(gè)示例應(yīng)用程序向您介紹Apache Camel ,該示例應(yīng)用程序從Twitter的示例提要中讀取推文,并使用Elastic Search實(shí)時(shí)索引這些推文。
什么是Apache Camel?
將應(yīng)用程序與生態(tài)系統(tǒng)中的內(nèi)部或外部組件集成是軟件開發(fā)中最復(fù)雜的任務(wù)之一,如果操作不正確,則可能導(dǎo)致混亂不堪,并且長(zhǎng)期維護(hù)很麻煩。
幸運(yùn)的是,Camel是Apache托管的開放源代碼集成框架,它基于企業(yè)集成模式 ,這些模式可以幫助編寫更具可讀性和可維護(hù)性的代碼。 與Lego相似,這些模式可以用作構(gòu)建可靠軟件設(shè)計(jì)的基礎(chǔ)。
Apache Camel還支持各種各樣的連接器,以將您的應(yīng)用程序與不同的框架和技術(shù)集成在一起。 順便說一下,它也可以與Spring很好地配合使用。
如果您不熟悉Spring,那么您可能會(huì)發(fā)現(xiàn)這篇文章很有幫助: 使用Spring Boot處理Twitter feed 。
在以下各節(jié)中,我們將介紹一個(gè)示例應(yīng)用程序,其中Camel與Twitter示例提要和ElasticSearch集成在一起。
什么是ElasticSearch?
類似于Apache Solr的 ElasticSearch是一個(gè)高度可擴(kuò)展的開源,基于Java的全文本搜索引擎,構(gòu)建在Apache Lucene之上。
在此示例應(yīng)用程序中,我們將使用ElasticSearch實(shí)時(shí)索引推文,并在這些推文上提供全文本搜索功能。
其他使用的技術(shù)
除了Apache Camel和ElasticSearch,我還在此應(yīng)用程序中包括其他框架: Gradle作為構(gòu)建工具, Spring Boot作為Web應(yīng)用程序框架,以及Twitter4j,用于從Twitter示例提要中讀取推文。
入門
該項(xiàng)目的框架是在http://start.spring.io生成的,在那里我檢查了Web依賴項(xiàng)選項(xiàng),填寫了“項(xiàng)目元數(shù)據(jù)”部分,然后選擇“ Gradle Project”作為項(xiàng)目類型。
生成項(xiàng)目后,您可以下載并將其導(dǎo)入您喜歡的IDE。 我現(xiàn)在不打算在Gradle上做更多的細(xì)節(jié),但是這是build.gradle文件中所有依賴項(xiàng)的列表:
def camelVersion = '2.15.2' dependencies {compile("org.springframework.boot:spring-boot-starter-web")compile("org.apache.camel:camel-core:${camelVersion}")compile("org.apache.camel:camel-spring-boot:${camelVersion}")compile("org.apache.camel:camel-twitter:${camelVersion}")compile("org.apache.camel:camel-elasticsearch:${camelVersion}")compile("org.apache.camel:camel-jackson:${camelVersion}")compile("joda-time:joda-time:2.8.2")testCompile("org.springframework.boot:spring-boot-starter-test") }使用駱駝路線進(jìn)行整合
駱駝實(shí)現(xiàn)了面向消息的體系結(jié)構(gòu),它的主要構(gòu)建模塊是描述消息流的路由 。
可以使用XML(舊方式)或其Java DSL(新方式)描述路由。 我們將在本文中僅討論Java DSL,因?yàn)檫@是首選且更優(yōu)雅的選擇。
好吧,讓我們看一個(gè)簡(jiǎn)單的Route:
from("file://orders").convertBodyTo(String.class).to("log:com.mycompany.order?level=DEBUG").to("jms:topic:OrdersTopic");這里有幾件事要注意:
- 消息在由URI表示并使用URI配置的端點(diǎn)之間流動(dòng)
- 路由只能有一個(gè)消息生產(chǎn)者端點(diǎn)(在本例中為“ file:// orders”,它從orders文件夾中讀取文件)和多個(gè)消息消費(fèi)者端點(diǎn):
- “ log:com.mycompany.order?level = DEBUG”,它將文件的內(nèi)容記錄在com.mycompany.order日志記錄類別下的調(diào)試消息中,
- 在端點(diǎn)之間可以更改消息,即:convertBodyTo(String.class)將消息正文轉(zhuǎn)換為String。
另請(qǐng)注意,相同的URI可以在一個(gè)路由中用于消費(fèi)者端點(diǎn),而在另一路由中用于生產(chǎn)者端點(diǎn):
from("file://orders").convertBodyTo(String.class).to("direct:orders");from("direct:orders).to("log:com.mycompany.order?level=DEBUG").to("jms:topic:OrdersTopic");Direct端點(diǎn)是通用端點(diǎn)之一,它允許將消息從一條路由同步傳遞到另一條路由。
這有助于創(chuàng)建可讀代碼并在代碼的多個(gè)位置重用路由。
索引推文
現(xiàn)在,讓我們看一下代碼中的一些路由。 讓我們從簡(jiǎn)單的事情開始:
private String ES_TWEET_INDEXER_ENDPOINT = "direct:tweet-indexer-ES";...from("twitter://streaming/sample?type=EVENT&consumerKey={{twitter4j.oauth.consumerKey}}&consumerSecret={{twitter4j.oauth.consumerSecret}}?cessToken={{twitter4j.oauth.accessToken}}?cessTokenSecret={{twitter4j.oauth.accessTokenSecret}}").to(ES_TWEET_INDEXER_ENDPOINT);這是如此簡(jiǎn)單,對(duì)吧? 到現(xiàn)在為止,您可能已經(jīng)知道該路由從Twitter示例提要中讀取了推文,并將它們傳遞到“ direct:tweet-indexer-ES”端點(diǎn)。 請(qǐng)注意,consumerKey,consumerSecret等已配置并作為系統(tǒng)屬性傳遞(請(qǐng)參見http://twitter4j.org/en/configuration.html )。
現(xiàn)在,讓我們看一下一個(gè)稍微復(fù)雜的Route,該路由從“ direct:tweet-indexer-ES”端點(diǎn)讀取,并將Tweets批量插入到Elasticsearch中(有關(guān)每個(gè)步驟的詳細(xì)說明,請(qǐng)參見注釋):
@Value("${elasticsearch.tweet.uri}")private String elasticsearchTweetUri;...from(ES_TWEET_INDEXER_ENDPOINT)// groups tweets into separate indexes on a weekly basis to make it easier clean up old tweets:.process(new WeeklyIndexNameHeaderUpdater(ES_TWEET_INDEX_TYPE))// converts Twitter4j Tweet object into an elasticsearch document represented by a Map:.process(new ElasticSearchTweetConverter())// collects tweets into weekly batches based on index name:.aggregate(header("indexName"), new ListAggregationStrategy())// creates new batches every 2 seconds.completionInterval(2000)// makes sure the last batch will be processed before the application shuts down:.forceCompletionOnStop()// inserts a batch of tweets to elasticsearch: .to(elasticsearchTweetUri).log("Uploaded documents to ElasticSearch index ${headers.indexName}: ${body.size()}");關(guān)于此路線的注意事項(xiàng):
- elasticsearchTweetUri是一個(gè)字段,其值由Spring從application.properties文件(elasticsearch.tweet.uri = elasticsearch:// tweet-indexer?operation = BULK_INDEX&ip = 127.0.0.1&port = 9300)中獲取并注入到該字段中
- 為了在Route中實(shí)現(xiàn)自定義處理邏輯,我們可以創(chuàng)建實(shí)現(xiàn)Processor接口的類。 參見WeeklyIndexNameHeaderUpdater和ElasticSearchTweetConverter
- 使用自定義ListAggregationStrategy策略聚合推文,該策略將消息聚合到ArrayList中,稍后每2秒(或在應(yīng)用程序停止時(shí))傳遞給下一個(gè)終結(jié)點(diǎn)
- Camel實(shí)現(xiàn)了一種表達(dá)語(yǔ)言 ,我們正在使用它來(lái)記錄批處理的大小(“ $ {body.size()}”)和插入消息的索引的名稱($ {headers.indexName})。
在Elasticsearch中搜索推文
現(xiàn)在我們已經(jīng)在Elasticsearch中索引了推文,是時(shí)候?qū)ζ溥M(jìn)行一些搜索了。
首先,讓我們看一下接收搜索查詢的Route和限制搜索結(jié)果數(shù)量的maxSize參數(shù):
public static final String TWEET_SEARCH_URI = "vm:tweetSearch";...from(TWEET_SEARCH_URI).setHeader("CamelFileName", simple("tweet-${body}-${header.maxSize}-${date:now:yyyyMMddHHmmss}.txt"))// calls the search() method of the esTweetService which returns an iterator// to process search result - better than keeping the whole resultset in memory:.split(method(esTweetService, "search"))// converts Elasticsearch doucment to Map object:.process(new ElasticSearchSearchHitConverter())// serializes the Map object to JSON:.marshal(new JacksonDataFormat())// appends new line at the end of every tweet.setBody(simple("${body}\n"))// write search results as json into a file under /tmp folder:.to("file:/tmp?fileExist=Append").end().log("Wrote search results to /tmp/${headers.CamelFileName}");當(dāng)消息傳遞到“ vm:tweetSearch”端點(diǎn)(該端點(diǎn)使用內(nèi)存隊(duì)列異步處理消息)時(shí),將觸發(fā)此路由。
SearchController類實(shí)現(xiàn)了REST api,允許用戶通過使用Camel的ProducerTemplate類將消息發(fā)送到“ vm:tweetSearch”端點(diǎn)來(lái)運(yùn)行tweet搜索:
@Autowiredprivate ProducerTemplate producerTemplate;@RequestMapping(value = "/tweet/search", method = { RequestMethod.GET, RequestMethod.POST },produces = MediaType.TEXT_PLAIN_VALUE)@ResponseBodypublic String tweetSearch(@RequestParam("q") String query,@RequestParam(value = "max") int maxSize) {LOG.info("Tweet search request received with query: {} and max: {}", query, maxSize);Map<String, Object> headers = new HashMap<String, Object>();// "content" is the field in the Elasticsearch index that we'll be querying:headers.put("queryField", "content");headers.put("maxSize", maxSize);producerTemplate.asyncRequestBodyAndHeaders(CamelRouter.TWEET_SEARCH_URI, query, headers);return "Request is queued";}這將觸發(fā)Elasticsearch的執(zhí)行,但是結(jié)果不會(huì)在響應(yīng)中返回,而是寫入/ tmp文件夾中的文件(如前所述)。
此路由使用ElasticSearchService類在ElasticSearch中搜索推文。 當(dāng)執(zhí)行此Route時(shí),Camel調(diào)用search()方法并傳遞搜索查詢和maxSize作為輸入?yún)?shù):
public SearchHitIterator search(@Body String query, @Header(value = "queryField") String queryField, @Header(value = "maxSize") int maxSize) {boolean scroll = maxSize > batchSize;LOG.info("Executing {} on index type: '{}' with query: '{}' and max: {}", scroll ? "scan & scroll" : "search", indexType, query, maxSize);QueryBuilder qb = termQuery(queryField, query);long startTime = System.currentTimeMillis();SearchResponse response = scroll ? prepareSearchForScroll(maxSize, qb) : prepareSearchForRegular(maxSize, qb);return new SearchHitIterator(client, response, scroll, maxSize, KEEP_ALIVE_MILLIS, startTime);}請(qǐng)注意,根據(jù)maxSize和batchSize,代碼將執(zhí)行常規(guī)搜索以返回單頁(yè)結(jié)果,或者執(zhí)行滾動(dòng)請(qǐng)求以使我們能夠檢索大量結(jié)果。 在滾動(dòng)的情況下, SearchHitIterator將隨后對(duì)Elasticsearch進(jìn)行調(diào)用,以分批檢索結(jié)果。
安裝ElasticSearch
cluster.name:tweet-indexer
這些步驟將允許您以最少的配置運(yùn)行獨(dú)立的Elasticsearch實(shí)例,但請(qǐng)記住,它們并非供生產(chǎn)使用。
運(yùn)行
這是應(yīng)用程序的入口點(diǎn),可以從命令行運(yùn)行。
package com.kaviddiss.twittercamel;import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication public class Application {public static void main(String[] args) {SpringApplication.run(Application.class, args);} }要運(yùn)行該應(yīng)用程序,請(qǐng)從您最喜歡的IDE運(yùn)行Application.main()方法,或者從命令行執(zhí)行以下代碼:
$GRADLE_HOME/bin/gradlew build && java -jar build/libs/twitter-camel-ingester-0.0.1-SNAPSHOT.jar一旦應(yīng)用程序啟動(dòng),它將自動(dòng)開始索引推文。 轉(zhuǎn)到http:// localhost:9200 / _plugin / bigdesk /#cluster可視化索引:
要搜索推文,請(qǐng)?jiān)跒g覽器中輸入與此類似的URL: http:// localhost:8080 / tweet / search?q = toronto&max = 100 。
使用BigDesk插件,我們可以監(jiān)視Elasticsearch如何索引推文:
結(jié)論
在Apache Camel的簡(jiǎn)介中,我們介紹了如何使用此集成框架與Twitter提要feed和Elasticsearch之類的外部組件進(jìn)行通信,以實(shí)時(shí)索引和搜索推文。
- 示例應(yīng)用程序的源代碼可從https://github.com/davidkiss/twitter-camel-ingester獲得 。
翻譯自: https://www.javacodegeeks.com/2015/09/learn-apache-camel-indexing-tweets-in-real-time.html
總結(jié)
以上是生活随笔為你收集整理的学习Apache Camel –实时索引推文的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 02096699是什么银行?
- 下一篇: 62303611开头是什么银行?