日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 编程资源 > 编程问答 >内容正文

编程问答

学习Apache Camel –实时索引推文

發(fā)布時(shí)間:2023/12/3 编程问答 31 豆豆
生活随笔 收集整理的這篇文章主要介紹了 学习Apache Camel –实时索引推文 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

在大多數(shù)軟件開發(fā)項(xiàng)目中,有一點(diǎn)需要使應(yīng)用程序開始與其他應(yīng)用程序或第三方組件通信。

無(wú)論是發(fā)送電子郵件通知,調(diào)用外部api,寫入文件還是將數(shù)據(jù)從一個(gè)地方遷移到另一個(gè)地方,您都可以推出自己的解決方案或利用現(xiàn)有框架。

對(duì)于Java生態(tài)系統(tǒng)中的現(xiàn)有框架,我們可以發(fā)現(xiàn)Tibco BusinessWorks和Mule ESB ,另一方面是Spring Integration和Apache Camel 。

在本教程中,我將通過一個(gè)示例應(yīng)用程序向您介紹Apache Camel ,該示例應(yīng)用程序從Twitter的示例提要中讀取推文,并使用Elastic Search實(shí)時(shí)索引這些推文。

什么是Apache Camel?

將應(yīng)用程序與生態(tài)系統(tǒng)中的內(nèi)部或外部組件集成是軟件開發(fā)中最復(fù)雜的任務(wù)之一,如果操作不正確,則可能導(dǎo)致混亂不堪,并且長(zhǎng)期維護(hù)很麻煩。

幸運(yùn)的是,Camel是Apache托管的開放源代碼集成框架,它基于企業(yè)集成模式 ,這些模式可以幫助編寫更具可讀性和可維護(hù)性的代碼。 與Lego相似,這些模式可以用作構(gòu)建可靠軟件設(shè)計(jì)的基礎(chǔ)。

Apache Camel還支持各種各樣的連接器,以將您的應(yīng)用程序與不同的框架和技術(shù)集成在一起。 順便說一下,它也可以與Spring很好地配合使用。

如果您不熟悉Spring,那么您可能會(huì)發(fā)現(xiàn)這篇文章很有幫助: 使用Spring Boot處理Twitter feed 。

在以下各節(jié)中,我們將介紹一個(gè)示例應(yīng)用程序,其中Camel與Twitter示例提要和ElasticSearch集成在一起。

什么是ElasticSearch?

類似于Apache Solr的 ElasticSearch是一個(gè)高度可擴(kuò)展的開源,基于Java的全文本搜索引擎,構(gòu)建在Apache Lucene之上。

在此示例應(yīng)用程序中,我們將使用ElasticSearch實(shí)時(shí)索引推文,并在這些推文上提供全文本搜索功能。

其他使用的技術(shù)

除了Apache Camel和ElasticSearch,我還在此應(yīng)用程序中包括其他框架: Gradle作為構(gòu)建工具, Spring Boot作為Web應(yīng)用程序框架,以及Twitter4j,用于從Twitter示例提要中讀取推文。

入門

該項(xiàng)目的框架是在http://start.spring.io生成的,在那里我檢查了Web依賴項(xiàng)選項(xiàng),填寫了“項(xiàng)目元數(shù)據(jù)”部分,然后選擇“ Gradle Project”作為項(xiàng)目類型。

生成項(xiàng)目后,您可以下載并將其導(dǎo)入您喜歡的IDE。 我現(xiàn)在不打算在Gradle上做更多的細(xì)節(jié),但是這是build.gradle文件中所有依賴項(xiàng)的列表:

def camelVersion = '2.15.2' dependencies {compile("org.springframework.boot:spring-boot-starter-web")compile("org.apache.camel:camel-core:${camelVersion}")compile("org.apache.camel:camel-spring-boot:${camelVersion}")compile("org.apache.camel:camel-twitter:${camelVersion}")compile("org.apache.camel:camel-elasticsearch:${camelVersion}")compile("org.apache.camel:camel-jackson:${camelVersion}")compile("joda-time:joda-time:2.8.2")testCompile("org.springframework.boot:spring-boot-starter-test") }

使用駱駝路線進(jìn)行整合

駱駝實(shí)現(xiàn)了面向消息的體系結(jié)構(gòu),它的主要構(gòu)建模塊是描述消息流的路由

可以使用XML(舊方式)或其Java DSL(新方式)描述路由。 我們將在本文中僅討論Java DSL,因?yàn)檫@是首選且更優(yōu)雅的選擇。

好吧,讓我們看一個(gè)簡(jiǎn)單的Route:

from("file://orders").convertBodyTo(String.class).to("log:com.mycompany.order?level=DEBUG").to("jms:topic:OrdersTopic");

這里有幾件事要注意:

  • 消息在由URI表示并使用URI配置的端點(diǎn)之間流動(dòng)
  • 路由只能有一個(gè)消息生產(chǎn)者端點(diǎn)(在本例中為“ file:// orders”,它從orders文件夾中讀取文件)和多個(gè)消息消費(fèi)者端點(diǎn):
    • “ log:com.mycompany.order?level = DEBUG”,它將文件的內(nèi)容記錄在com.mycompany.order日志記錄類別下的調(diào)試消息中,
  • 在端點(diǎn)之間可以更改消息,即:convertBodyTo(String.class)將消息正文轉(zhuǎn)換為String。

另請(qǐng)注意,相同的URI可以在一個(gè)路由中用于消費(fèi)者端點(diǎn),而在另一路由中用于生產(chǎn)者端點(diǎn):

from("file://orders").convertBodyTo(String.class).to("direct:orders");from("direct:orders).to("log:com.mycompany.order?level=DEBUG").to("jms:topic:OrdersTopic");

Direct端點(diǎn)是通用端點(diǎn)之一,它允許將消息從一條路由同步傳遞到另一條路由。

這有助于創(chuàng)建可讀代碼并在代碼的多個(gè)位置重用路由。

索引推文

現(xiàn)在,讓我們看一下代碼中的一些路由。 讓我們從簡(jiǎn)單的事情開始:

private String ES_TWEET_INDEXER_ENDPOINT = "direct:tweet-indexer-ES";...from("twitter://streaming/sample?type=EVENT&consumerKey={{twitter4j.oauth.consumerKey}}&consumerSecret={{twitter4j.oauth.consumerSecret}}?cessToken={{twitter4j.oauth.accessToken}}?cessTokenSecret={{twitter4j.oauth.accessTokenSecret}}").to(ES_TWEET_INDEXER_ENDPOINT);

這是如此簡(jiǎn)單,對(duì)吧? 到現(xiàn)在為止,您可能已經(jīng)知道該路由從Twitter示例提要中讀取了推文,并將它們傳遞到“ direct:tweet-indexer-ES”端點(diǎn)。 請(qǐng)注意,consumerKey,consumerSecret等已配置并作為系統(tǒng)屬性傳遞(請(qǐng)參見http://twitter4j.org/en/configuration.html )。

現(xiàn)在,讓我們看一下一個(gè)稍微復(fù)雜的Route,該路由從“ direct:tweet-indexer-ES”端點(diǎn)讀取,并將Tweets批量插入到Elasticsearch中(有關(guān)每個(gè)步驟的詳細(xì)說明,請(qǐng)參見注釋):

@Value("${elasticsearch.tweet.uri}")private String elasticsearchTweetUri;...from(ES_TWEET_INDEXER_ENDPOINT)// groups tweets into separate indexes on a weekly basis to make it easier clean up old tweets:.process(new WeeklyIndexNameHeaderUpdater(ES_TWEET_INDEX_TYPE))// converts Twitter4j Tweet object into an elasticsearch document represented by a Map:.process(new ElasticSearchTweetConverter())// collects tweets into weekly batches based on index name:.aggregate(header("indexName"), new ListAggregationStrategy())// creates new batches every 2 seconds.completionInterval(2000)// makes sure the last batch will be processed before the application shuts down:.forceCompletionOnStop()// inserts a batch of tweets to elasticsearch: .to(elasticsearchTweetUri).log("Uploaded documents to ElasticSearch index ${headers.indexName}: ${body.size()}");

關(guān)于此路線的注意事項(xiàng):

  • elasticsearchTweetUri是一個(gè)字段,其值由Spring從application.properties文件(elasticsearch.tweet.uri = elasticsearch:// tweet-indexer?operation = BULK_INDEX&ip = 127.0.0.1&port = 9300)中獲取并注入到該字段中
  • 為了在Route中實(shí)現(xiàn)自定義處理邏輯,我們可以創(chuàng)建實(shí)現(xiàn)Processor接口的類。 參見WeeklyIndexNameHeaderUpdater和ElasticSearchTweetConverter
  • 使用自定義ListAggregationStrategy策略聚合推文,該策略將消息聚合到ArrayList中,稍后每2秒(或在應(yīng)用程序停止時(shí))傳遞給下一個(gè)終結(jié)點(diǎn)
  • Camel實(shí)現(xiàn)了一種表達(dá)語(yǔ)言 ,我們正在使用它來(lái)記錄批處理的大小(“ $ {body.size()}”)和插入消息的索引的名稱($ {headers.indexName})。

在Elasticsearch中搜索推文

現(xiàn)在我們已經(jīng)在Elasticsearch中索引了推文,是時(shí)候?qū)ζ溥M(jìn)行一些搜索了。

首先,讓我們看一下接收搜索查詢的Route和限制搜索結(jié)果數(shù)量的maxSize參數(shù):

public static final String TWEET_SEARCH_URI = "vm:tweetSearch";...from(TWEET_SEARCH_URI).setHeader("CamelFileName", simple("tweet-${body}-${header.maxSize}-${date:now:yyyyMMddHHmmss}.txt"))// calls the search() method of the esTweetService which returns an iterator// to process search result - better than keeping the whole resultset in memory:.split(method(esTweetService, "search"))// converts Elasticsearch doucment to Map object:.process(new ElasticSearchSearchHitConverter())// serializes the Map object to JSON:.marshal(new JacksonDataFormat())// appends new line at the end of every tweet.setBody(simple("${body}\n"))// write search results as json into a file under /tmp folder:.to("file:/tmp?fileExist=Append").end().log("Wrote search results to /tmp/${headers.CamelFileName}");

當(dāng)消息傳遞到“ vm:tweetSearch”端點(diǎn)(該端點(diǎn)使用內(nèi)存隊(duì)列異步處理消息)時(shí),將觸發(fā)此路由。

SearchController類實(shí)現(xiàn)了REST api,允許用戶通過使用Camel的ProducerTemplate類將消息發(fā)送到“ vm:tweetSearch”端點(diǎn)來(lái)運(yùn)行tweet搜索:

@Autowiredprivate ProducerTemplate producerTemplate;@RequestMapping(value = "/tweet/search", method = { RequestMethod.GET, RequestMethod.POST },produces = MediaType.TEXT_PLAIN_VALUE)@ResponseBodypublic String tweetSearch(@RequestParam("q") String query,@RequestParam(value = "max") int maxSize) {LOG.info("Tweet search request received with query: {} and max: {}", query, maxSize);Map<String, Object> headers = new HashMap<String, Object>();// "content" is the field in the Elasticsearch index that we'll be querying:headers.put("queryField", "content");headers.put("maxSize", maxSize);producerTemplate.asyncRequestBodyAndHeaders(CamelRouter.TWEET_SEARCH_URI, query, headers);return "Request is queued";}

這將觸發(fā)Elasticsearch的執(zhí)行,但是結(jié)果不會(huì)在響應(yīng)中返回,而是寫入/ tmp文件夾中的文件(如前所述)。

此路由使用ElasticSearchService類在ElasticSearch中搜索推文。 當(dāng)執(zhí)行此Route時(shí),Camel調(diào)用search()方法并傳遞搜索查詢和maxSize作為輸入?yún)?shù):

public SearchHitIterator search(@Body String query, @Header(value = "queryField") String queryField, @Header(value = "maxSize") int maxSize) {boolean scroll = maxSize > batchSize;LOG.info("Executing {} on index type: '{}' with query: '{}' and max: {}", scroll ? "scan & scroll" : "search", indexType, query, maxSize);QueryBuilder qb = termQuery(queryField, query);long startTime = System.currentTimeMillis();SearchResponse response = scroll ? prepareSearchForScroll(maxSize, qb) : prepareSearchForRegular(maxSize, qb);return new SearchHitIterator(client, response, scroll, maxSize, KEEP_ALIVE_MILLIS, startTime);}

請(qǐng)注意,根據(jù)maxSize和batchSize,代碼將執(zhí)行常規(guī)搜索以返回單頁(yè)結(jié)果,或者執(zhí)行滾動(dòng)請(qǐng)求以使我們能夠檢索大量結(jié)果。 在滾動(dòng)的情況下, SearchHitIterator將隨后對(duì)Elasticsearch進(jìn)行調(diào)用,以分批檢索結(jié)果。

安裝ElasticSearch

  • 從https://www.elastic.co/downloads/elasticsearch下載Elasticsearch。
  • 將其安裝到本地文件夾($ ES_HOME)
  • 編輯$ ES_HOME / config / elasticsearch.yml并添加以下行:
    cluster.name:tweet-indexer
  • 安裝BigDesk插件以監(jiān)視Elasticsearch:$ ES_HOME / bin / plugin -install lukas-vlcek / bigdesk
  • 運(yùn)行Elasticsearch:$ ES_HOME / bin / elasticsearch.sh或$ ES_HOME / bin / elasticsearch.bat
  • 這些步驟將允許您以最少的配置運(yùn)行獨(dú)立的Elasticsearch實(shí)例,但請(qǐng)記住,它們并非供生產(chǎn)使用。

    運(yùn)行

    這是應(yīng)用程序的入口點(diǎn),可以從命令行運(yùn)行。

    package com.kaviddiss.twittercamel;import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication public class Application {public static void main(String[] args) {SpringApplication.run(Application.class, args);} }

    要運(yùn)行該應(yīng)用程序,請(qǐng)從您最喜歡的IDE運(yùn)行Application.main()方法,或者從命令行執(zhí)行以下代碼:

    $GRADLE_HOME/bin/gradlew build && java -jar build/libs/twitter-camel-ingester-0.0.1-SNAPSHOT.jar

    一旦應(yīng)用程序啟動(dòng),它將自動(dòng)開始索引推文。 轉(zhuǎn)到http:// localhost:9200 / _plugin / bigdesk /#cluster可視化索引:

    要搜索推文,請(qǐng)?jiān)跒g覽器中輸入與此類似的URL: http:// localhost:8080 / tweet / search?q = toronto&max = 100 。

    使用BigDesk插件,我們可以監(jiān)視Elasticsearch如何索引推文:

    結(jié)論

    在Apache Camel的簡(jiǎn)介中,我們介紹了如何使用此集成框架與Twitter提要feed和Elasticsearch之類的外部組件進(jìn)行通信,以實(shí)時(shí)索引和搜索推文。

    • 示例應(yīng)用程序的源代碼可從https://github.com/davidkiss/twitter-camel-ingester獲得 。

    翻譯自: https://www.javacodegeeks.com/2015/09/learn-apache-camel-indexing-tweets-in-real-time.html

    總結(jié)

    以上是生活随笔為你收集整理的学习Apache Camel –实时索引推文的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。