日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

meetup_使用RxNetty访问Meetup的流API

發布時間:2023/12/3 编程问答 30 豆豆
生活随笔 收集整理的這篇文章主要介紹了 meetup_使用RxNetty访问Meetup的流API 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

meetup

本文將涉及多個主題:響應式編程,HTTP,解析JSON以及與社交API集成。 完全在一個用例中:我們將通過非夸張的RxNetty庫實時加載和處理新的metup.com事件,結合Netty框架的強大功能和RxJava庫的靈活性。 Meetup提供了公開可用的流API ,可實時推送世界各地注冊的每個Meetup。 只需瀏覽至stream.meetup.com/2/open_events并觀察JSON塊如何緩慢地出現在屏幕上。 每當有人創建新事件時,自包含的JSON就會從服務器推送到您的瀏覽器。 這意味著這樣的請求永無止境,相反,只要需要,我們就會不斷接收部分數據。 我們已經在將Twitter4J變成RxJava的Observable中研究了類似的情況。 每個新的Meetup事件都會發布一個獨立的JSON文檔,與此類似(省略許多細節):

{ "id" : "219088449","name" : "Silver Wings Brunch","time" : 1421609400000,"mtime" : 1417814004321,"duration" : 900000,"rsvp_limit" : 0,"status" : "upcoming","event_url" : "http://www.meetup.com/Laguna-Niguel-Social-Networking-Meetup/events/219088449/","group" : { "name" : "Former Flight Attendants South Orange and North San Diego Co","state" : "CA"...},"venue" : { "address_1" : "26860 Ortega Highway","city" : "San Juan Capistrano","country" : "US"...},"venue_visibility" : "public","visibility" : "public","yes_rsvp_count" : 1... }

每當我們長時間輪詢的HTTP連接(帶有Transfer-Encoding: chunked響應標頭)推送此類JSON時,我們都希望對其進行解析并以某種方式進一步傳遞。 我們討厭回調,因此RxJava似乎是一個合理的選擇(認為: Observable<Event> )。

步驟1:使用RxNetty接收原始數據

我們不能使用普通的HTTP客戶端,因為它們專注于請求-響應語義。 這里沒有任何響應,我們只是永遠保持打開的連接,并在數據到達時使用它們。 RxJava具有開箱即用的RxApacheHttp庫,但它假定為text/event-stream內容類型 。 相反,我們將使用底層的通用RxNetty庫。 它是Netty(duh!)的包裝,并且能夠實現任意的 TCP / IP(包括HTTP)以及UDP客戶端和服務器。 如果您不了解Netty,則它是基于數據包的,而不是面向流的,因此我們可以預期每次Meetup推送都會有一個Netty事件。 該API當然不是簡單明了的,但是一旦您使用它,它就會變得有意義:

HttpClient<ByteBuf, ByteBuf> httpClient = RxNetty.<ByteBuf, ByteBuf>newHttpClientBuilder("stream.meetup.com", 443).pipelineConfigurator(new HttpClientPipelineConfigurator<>()).withSslEngineFactory(DefaultFactories.trustAll()).build();final Observable<HttpClientResponse> responses = httpClient.submit(HttpClientRequest.createGet("/2/open_events")); final Observable byteBufs = responses.flatMap(AbstractHttpContentHolder::getContent); final Observable chunks = byteBufs.map(content -> content.toString(StandardCharsets.UTF_8));

首先,我們創建HttpClient并設置SSL(請注意,關于服務器證書的trustAll()可能不是最佳的生產設置)。 稍后我們submit() GET請求,并返回Observable<HttpClientResponse<ByteBuf>> 。 ByteBuf是Netty對通過網絡發送或接收的一堆字節的抽象。 此觀察結果將立即告訴我們從Meetup收到的每條數據。 從響應中提取ByteBuf ,我們將其轉換為包含上述JSON的String 。 到目前為止,一切正常。

步驟2:將數據包與JSON文檔對齊

Netty非常強大,因為它不會掩蓋泄漏抽象所固有的復雜性。 每次通過TCP / IP線路接收到某些內容時,都會通知我們。 您可能會相信,當服務器發送100字節時,客戶端的Netty會將收到的這100字節通知我們。 但是,TCP / IP堆棧可以自由地拆分和合并您通過有線發送的數據,尤其是因為它假定是流,因此如何將其拆分為數據包應該是無關緊要的。 Netty的文檔中對此警告做了很大的解釋。 對我們意味著什么? 當Meetup發送單個事件時,我們可能僅收到一個可觀察到的chunks String 。 但是同樣可以將其劃分為任意數量的數據包,因此chunks將發出多個String 。 更糟糕的是,如果Meetup接連發送兩個事件,則它們可能適合一個數據包。 在這種情況下, chunks將發出一個帶有兩個獨立JSON文檔的String 。 事實上,我們不能假設JSON字符串和收到的網絡數據包之間有任何對齊。 我們所知道的是,代表事件的各個JSON文檔由換行符分隔。 令人驚訝的是, RxJavaString官方附加組件RxJavaString提供了一種精確的方法:

Observable jsonChunks = StringObservable.split(chunks, "\n");

實際上,甚至還有更簡單的StringObservable.byLine(chunks) ,但它使用的是平臺相關的行尾。 最好在官方文檔中解釋split()作用:

現在我們可以安全地解析jsonChunks發出的每個String了:

步驟3:解析JSON

有趣的是,這一步驟并不是那么簡單。 我承認,我排序的享受WSDL時間,因為我很容易,可預見生成如下web服務的合同Java模型。 JSON,特別是在JSON模式的邊緣市場滲透方面,基本上是集成的“狂野西部”。 通常,您會得到非正式的文檔或請求和響應的樣本。 沒有類型信息或格式,無論字段是否為必填項,等等。此外,由于我不情愿使用地圖映射 (在那里,Clojure程序員),為了使用基于JSON的REST服務,我必須自己編寫映射POJO。 好吧,有解決方法。 首先,我舉了一個由Meetup流API生成的JSON的典型示例,并將其放在src/main/json/meetup/event.json 。 然后,我使用jsonschema2pojo-maven-plugin ( 也存在Gradle和Ant版本)。 插件的名稱令人困惑,它還可以與JSON示例(不僅是架構)一起使用以生成Java模型:

<plugin><groupId>org.jsonschema2pojo</groupId><artifactId>jsonschema2pojo-maven-plugin</artifactId><version>0.4.7</version><configuration><sourceDirectory>${basedir}/src/main/json/meetup</sourceDirectory><targetPackage>com.nurkiewicz.meetup.generated</targetPackage><includeHashcodeAndEquals>true</includeHashcodeAndEquals><includeToString>true</includeToString><initializeCollections>true</initializeCollections><sourceType>JSON</sourceType><useCommonsLang3>true</useCommonsLang3><useJodaDates>true</useJodaDates><useLongIntegers>true</useLongIntegers><outputDirectory>target/generated-sources</outputDirectory></configuration><executions><execution><id>generate-sources</id><phase>generate-sources</phase><goals><goal>generate</goal></goals></execution></executions> </plugin>

此時,Maven將創建與Jackson兼容的Event.java , Venue.java , Group.java等:

private Event parseEventJson(String jsonStr) {try {return objectMapper.readValue(jsonStr, Event.class);} catch (IOException e) {throw new UncheckedIOException(e);} }

很好,它很好:

final Observableevents = jsonChunks.map(this::parseEventJson);

步驟5:獲利!!!

有了Observable<Event>我們可以實現一些非常有趣的用例。 是否要查找剛剛創建的波蘭所有聚會的名稱? 當然!

events.filter(event -> event.getVenue() != null).filter(event -> event.getVenue().getCountry().equals("pl")).map(Event::getName).forEach(System.out::println);

尋找統計信息每分鐘創建多少個事件? 沒問題!

events.buffer(1, TimeUnit.MINUTES).map(List::size).forEach(count -> log.info("Count: {}", count));

或者,您是否想繼續搜索將來最遠的聚會,而跳過比已發現的聚會更近的聚會?

events.filter(event -> event.getTime() != null).scan(this::laterEventFrom).distinct().map(Event::getTime).map(Instant::ofEpochMilli).forEach(System.out::println);//...private Event laterEventFrom(Event first, Event second) {return first.getTime() > second.getTime() ?first :second; }

此代碼過濾掉沒有已知時間的事件,發出當前事件或前一個事件( scan() ),具體取決于后面的事件,過濾出重復事件并顯示時間。 這個運行了幾分鐘的微型程序已經發現一個計劃于2015年11月創建的聚會,而在撰寫本文時它是2014年12月。 可能性是無止境的。

希望我能對如何輕松地將各種技術融合在一起有一個很好的了解:React式編程以編寫超快速的網絡代碼,無樣板代碼的類型安全的JSON解析和RxJava來快速處理事件流。 請享用!

翻譯自: https://www.javacodegeeks.com/2014/12/accessing-meetups-streaming-api-with-rxnetty.html

meetup

總結

以上是生活随笔為你收集整理的meetup_使用RxNetty访问Meetup的流API的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。