日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 前端技术 > javascript >内容正文

javascript

通过Spring Integration消费Twitter Streaming API

發布時間:2023/12/3 javascript 30 豆豆
生活随笔 收集整理的這篇文章主要介紹了 通过Spring Integration消费Twitter Streaming API 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

1.概述

眾所周知, Spring Integration具有用于與外部系統交互的大量連接器。 Twitter也不例外,而且很長一段時間以來,因為Spring Social一直是一個開箱即用的解決方案,Spring Integration利用該解決方案來連接到社交網絡。

1.1Spring社交EOL

不幸的是, Spring Social已經到了使用壽命 ,該項目現在處于維護模式。 Spring團隊決定不進一步開發Spring Social的原因是,使API綁定與社交網絡的API保持同步變得很繁瑣。

除此之外,Spring Framework 5發布后,開發人員希望利用其響應式編程模型,這將要求團隊在現有的響應式社交綁定旁邊重新實現一個響應式Spring Social綁定。

現在建議開發人員實現自己的綁定或使用專用庫之一連接社交網絡。

1.2 Spring Integration的Twitter模塊已移至擴展

Spring Social現在處于維護模式,這迫使Spring Integration團隊將Twitter支持模塊從主項目移至擴展。 由于Spring Social不會接收更新,因此它將基于早期的Spring Framework版本構建。 這將導致類路徑沖突,也將阻礙Spring Integration的開發。

因此, 從Spring Integration 5.1開始,Twitter模塊可作為擴展使用 。

1.3有哪些替代方案?

Twitter4J是Yamas Yusuke開發和維護的Twitter API的非官方Java庫。 官方的HBC庫(由Twitter構建)是一個Java HTTP Client,用于使用Twitter的Streaming API。 自2016年以來,后者從未見過重大更新,而Twitter4J正在定期更新。

也可以選擇實現自己的API綁定。 在使用RestTemplate的基于Spring的項目中,絕對是一個選擇,并且這是進行REST調用的簡便方法。

本指南以流模式使用Twitter4J,可以將其集成到Spring Integration消息流中。

1.4 Twitter流如何工作?

簡而言之, 您的應用打開了一個與Twitter API的單一連接,只要發生新匹配,就會通過該連接發送新結果 。 相反,另一種方法是通過向REST API重復發送請求來批量傳送數據。

流提供了一種低延遲的傳遞機制 ,該機制可以支持非常高的吞吐量,而不必處理速率限制。

2.示例項目

該示例項目展示了Twitter的Streaming API到Spring Integration消息流的集成,可在GitHub找到 : https : //github.com/springuni/springuni-examples/tree/master/spring-integration/twitter-streaming 。

Maven依賴

由于Spring Social現在是EOL,因此我們不會在此基礎上繼續發展。 我們引入的只是spring-integration-core和twitter4j-stream 。

<dependencies><dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-core</artifactId></dependency><dependency><groupId>org.twitter4j</groupId><artifactId>twitter4j-stream</artifactId><version>4.0.1</version></dependency></dependencies>

該項目還使用了Lombok和Spring Boot測試支持,但是這些是可選的。

Spring Integration的可聽消息源

Spring Integration提供了對實現入站消息組件的支持。 它們分為輪詢和監聽行為

最初依賴于Inbound Twitter Channel Adapter建立在Spring Social之上,現在已移至擴展, 它是輪詢用戶 。 也就是說,您必須提供一個輪詢器配置才能使用它。 另一方面,Twitter實施速率限制,以管理應用程序獲取更新的頻率。 使用舊的Twitter Channel適配器時,您應該考慮速率限制,以便您配置的輪詢間隔符合Twitter策略。

另一方面, 偵聽入站組件更簡單,通常只需要實現MessageProducerSupport 。 這樣的偵聽組件看起來像這樣。

public class MyMessageProducer extends MessageProducerSupport {public MyMessageProducer(MessageChannel outputChannel) {// Defining an output channel is requiredsetOutputChannel(outputChannel);}@Overrideprotected void onInit() {super.onInit();// Custom initialization - if applicable - comes here}@Overridepublic void doStart() {// Lifecycle method for starting receiving messages}@Overridepublic void doStop() {// Lifecycle method for stopping receiving messages}private void receiveMessage() {// Receive data from upstream serviceSomeData data = ...;// Convert it to a message as appropriate and send it outthis.sendMessage(MessageBuilder.withPayload(data).build());}}

只有兩個必需的元素:

  • 必須定義輸出消息通道
  • 每當組件收到消息時,都必須調用sendMessage

(可選)您可能希望控制組件的初始化并管理其生命周期。

由于Twitter的Streaming API本質上是消息驅動的,因此監聽行為自然很合適。 讓我們看看如何在這樣的上下文中合并Twitter4J。

使用Twitter4J連接到Twitter Streaming API

Twitter4J管理連接處理的細微差別,并從Twitter的Streaming API接收更新。 我們需要做的就是獲取一個TwitterStream實例,附加一個偵聽器并定義過濾。

實例化

Twitter4J網站上的流示例表明,應通過TwitterStreamFactory創建一個TwitterStream實例。 這完全有道理,但是在Spring應用程序上下文中,我們希望它成為托管bean。

Spring的FactoryBean工具是包含創建單例TwitterStream實例的詳細信息的簡單FactoryBean方法。

public class TwitterStreamFactory extends AbstractFactoryBean<TwitterStream> {@Overridepublic Class<?> getObjectType() {return TwitterStream.class;}@Overrideprotected TwitterStream createInstance() {return new twitter4j.TwitterStreamFactory().getInstance();}@Overrideprotected void destroyInstance(TwitterStream twitterStream) {twitterStream.shutdown();}}

盡管我們也可以將其公開為普通的bean,而不用由FactoryBean創建FactoryBean ,但這不會適當地將其關閉。

附加偵聽器并定義過濾

這將是我們自定義MessageProducer實現的責任。

@Slf4j public class TwitterMessageProducer extends MessageProducerSupport {private final TwitterStream twitterStream;private List<Long> follows;private List<String> terms;private StatusListener statusListener;private FilterQuery filterQuery;public TwitterMessageProducer(TwitterStream twitterStream, MessageChannel outputChannel) {this.twitterStream = twitterStream;setOutputChannel(outputChannel);}@Overrideprotected void onInit() {super.onInit();statusListener = new StatusListener();long[] followsArray = null;if (!CollectionUtils.isEmpty(follows)) {followsArray = new long[follows.size()];for (int i = 0; i < follows.size(); i++) {followsArray[i] = follows.get(i);}}String[] termsArray = null;if (!CollectionUtils.isEmpty(terms)) {termsArray = terms.toArray(new String[0]);}filterQuery = new FilterQuery(0, followsArray, termsArray);}@Overridepublic void doStart() {twitterStream.addListener(statusListener);twitterStream.filter(filterQuery);}@Overridepublic void doStop() {twitterStream.cleanUp();twitterStream.clearListeners();}public void setFollows(List<Long> follows) {this.follows = follows;}public void setTerms(List<String> terms) {this.terms = terms;}StatusListener getStatusListener() {return statusListener;}FilterQuery getFilterQuery() {return filterQuery;}class StatusListener extends StatusAdapter {@Overridepublic void onStatus(Status status) {sendMessage(MessageBuilder.withPayload(status).build());}@Overridepublic void onException(Exception ex) {log.error(ex.getMessage(), ex);}@Overridepublic void onStallWarning(StallWarning warning) {log.warn(warning.toString());}} }

MessageProducerSupport和TwitterStream的管理界面提供的生命周期方法可以很好地配合使用。 這也將使我們能夠在需要時在運行時停止和啟動組件。

Java配置

盡管Spring可以自動裝配組件,但我還是更喜歡通過手動配置來控制依賴關系。

@Slf4j @Configuration public class TwitterConfig {@BeanTwitterStreamFactory twitterStreamFactory() {return new TwitterStreamFactory();}@BeanTwitterStream twitterStream(TwitterStreamFactory twitterStreamFactory) {return twitterStreamFactory.getInstance();}@BeanMessageChannel outputChannel() {return MessageChannels.direct().get();}@BeanTwitterMessageProducer twitterMessageProducer(TwitterStream twitterStream, MessageChannel outputChannel) {TwitterMessageProducer twitterMessageProducer =new TwitterMessageProducer(twitterStream, outputChannel);twitterMessageProducer.setTerms(Arrays.asList("java", "microservices", "spring"));return twitterMessageProducer;}@BeanIntegrationFlow twitterFlow(MessageChannel outputChannel) {return IntegrationFlows.from(outputChannel).transform(Status::getText).handle(m -> log.info(m.getPayload().toString())).get();}}

這里的重要部分是我們的自定義消息生成器如何與消息流集成。 基本上,除了在生產者的輸出通道中列出消息之外,我們不需要執行任何其他操作。

測試中

只有Chuck Norris在生產中測試代碼。 但是,像您和我這樣的普通凡人,我們確實會編寫測試用例。

@RunWith(SpringRunner.class) @ContextConfiguration(classes = TestConfig.class) public class TwitterMessageProducerTest {@MockBeanprivate TwitterStream twitterStream;@Autowiredprivate PollableChannel outputChannel;@Autowiredprivate TwitterMessageProducer twitterMessageProducer;@Testpublic void shouldBeInitialized() {StatusListener statusListener = twitterMessageProducer.getStatusListener();verify(twitterStream).addListener(statusListener);FilterQuery filterQuery = twitterMessageProducer.getFilterQuery();verify(twitterStream).filter(filterQuery);}@Testpublic void shouldReceiveStatus() {StatusListener statusListener = twitterMessageProducer.getStatusListener();Status status = mock(Status.class);statusListener.onStatus(status);Message<?> statusMessage = outputChannel.receive();assertSame(status, statusMessage.getPayload());}@Import(TwitterConfig.class)static class TestConfig {@BeanMessageChannel outputChannel() {return MessageChannels.queue(1).get();}}}

我喜歡Twitter4J的設計,因為它利用了界面。 該庫的大多數重要部分都作為普通接口公開。 TwitterStream也不例外。 也就是說,在測試用例中可以輕松地將其嘲笑。

六,結論

  • Spring Social現在已經停產了 -它不會收到新功能
  • Spring Integration的Twitter模塊可作為擴展使用 -已從主項目中移出。
  • Twitter入站通道適配器是一個輪詢用戶 –選擇輪詢間隔時必須處理速率限制
  • Twitter的Streaming API符合入站通道適配器的監聽行為

翻譯自: https://www.javacodegeeks.com/2018/12/streaming-api-spring-integration.html

總結

以上是生活随笔為你收集整理的通过Spring Integration消费Twitter Streaming API的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。