meetup_使用RxNetty访问Meetup的流API
meetup
本文將涉及多個主題:響應式編程,HTTP,解析JSON以及與社交API集成。 完全在一個用例中:我們將通過非夸張的RxNetty庫實時加載和處理新的metup.com事件,結合Netty框架的強大功能和RxJava庫的靈活性。 Meetup提供了公開可用的流API ,可實時推送世界各地注冊的每個Meetup。 只需瀏覽至stream.meetup.com/2/open_events并觀察JSON塊如何緩慢地出現在屏幕上。 每當有人創建新事件時,自包含的JSON就會從服務器推送到您的瀏覽器。 這意味著這樣的請求永無止境,相反,只要需要,我們就會不斷接收部分數據。 我們已經在將Twitter4J變成RxJava的Observable中研究了類似的情況。 每個新的Meetup事件都會發布一個獨立的JSON文檔,與此類似(省略許多細節):
{ "id" : "219088449","name" : "Silver Wings Brunch","time" : 1421609400000,"mtime" : 1417814004321,"duration" : 900000,"rsvp_limit" : 0,"status" : "upcoming","event_url" : "http://www.meetup.com/Laguna-Niguel-Social-Networking-Meetup/events/219088449/","group" : { "name" : "Former Flight Attendants South Orange and North San Diego Co","state" : "CA"...},"venue" : { "address_1" : "26860 Ortega Highway","city" : "San Juan Capistrano","country" : "US"...},"venue_visibility" : "public","visibility" : "public","yes_rsvp_count" : 1... }每當我們長時間輪詢的HTTP連接(帶有Transfer-Encoding: chunked響應標頭)推送此類JSON時,我們都希望對其進行解析并以某種方式進一步傳遞。 我們討厭回調,因此RxJava似乎是一個合理的選擇(認為: Observable<Event> )。
步驟1:使用RxNetty接收原始數據
我們不能使用普通的HTTP客戶端,因為它們專注于請求-響應語義。 這里沒有任何響應,我們只是永遠保持打開的連接,并在數據到達時使用它們。 RxJava具有開箱即用的RxApacheHttp庫,但它假定為text/event-stream內容類型 。 相反,我們將使用底層的通用RxNetty庫。 它是Netty(duh!)的包裝,并且能夠實現任意的 TCP / IP(包括HTTP)以及UDP客戶端和服務器。 如果您不了解Netty,則它是基于數據包的,而不是面向流的,因此我們可以預期每次Meetup推送都會有一個Netty事件。 該API當然不是簡單明了的,但是一旦您使用它,它就會變得有意義:
HttpClient<ByteBuf, ByteBuf> httpClient = RxNetty.<ByteBuf, ByteBuf>newHttpClientBuilder("stream.meetup.com", 443).pipelineConfigurator(new HttpClientPipelineConfigurator<>()).withSslEngineFactory(DefaultFactories.trustAll()).build();final Observable<HttpClientResponse> responses = httpClient.submit(HttpClientRequest.createGet("/2/open_events")); final Observable byteBufs = responses.flatMap(AbstractHttpContentHolder::getContent); final Observable chunks = byteBufs.map(content -> content.toString(StandardCharsets.UTF_8));首先,我們創建HttpClient并設置SSL(請注意,關于服務器證書的trustAll()可能不是最佳的生產設置)。 稍后我們submit() GET請求,并返回Observable<HttpClientResponse<ByteBuf>> 。 ByteBuf是Netty對通過網絡發送或接收的一堆字節的抽象。 此觀察結果將立即告訴我們從Meetup收到的每條數據。 從響應中提取ByteBuf ,我們將其轉換為包含上述JSON的String 。 到目前為止,一切正常。
步驟2:將數據包與JSON文檔對齊
Netty非常強大,因為它不會掩蓋泄漏抽象所固有的復雜性。 每次通過TCP / IP線路接收到某些內容時,都會通知我們。 您可能會相信,當服務器發送100字節時,客戶端的Netty會將收到的這100字節通知我們。 但是,TCP / IP堆棧可以自由地拆分和合并您通過有線發送的數據,尤其是因為它假定是流,因此如何將其拆分為數據包應該是無關緊要的。 Netty的文檔中對此警告做了很大的解釋。 對我們意味著什么? 當Meetup發送單個事件時,我們可能僅收到一個可觀察到的chunks String 。 但是同樣可以將其劃分為任意數量的數據包,因此chunks將發出多個String 。 更糟糕的是,如果Meetup接連發送兩個事件,則它們可能適合一個數據包。 在這種情況下, chunks將發出一個帶有兩個獨立JSON文檔的String 。 事實上,我們不能假設JSON字符串和收到的網絡數據包之間有任何對齊。 我們所知道的是,代表事件的各個JSON文檔由換行符分隔。 令人驚訝的是, RxJavaString官方附加組件RxJavaString提供了一種精確的方法:
Observable jsonChunks = StringObservable.split(chunks, "\n");實際上,甚至還有更簡單的StringObservable.byLine(chunks) ,但它使用的是平臺相關的行尾。 最好在官方文檔中解釋split()作用:
現在我們可以安全地解析jsonChunks發出的每個String了:
步驟3:解析JSON
有趣的是,這一步驟并不是那么簡單。 我承認,我排序的享受WSDL時間,因為我很容易,可預見生成如下web服務的合同Java模型。 JSON,特別是在JSON模式的邊緣市場滲透方面,基本上是集成的“狂野西部”。 通常,您會得到非正式的文檔或請求和響應的樣本。 沒有類型信息或格式,無論字段是否為必填項,等等。此外,由于我不情愿使用地圖映射 (在那里,Clojure程序員),為了使用基于JSON的REST服務,我必須自己編寫映射POJO。 好吧,有解決方法。 首先,我舉了一個由Meetup流API生成的JSON的典型示例,并將其放在src/main/json/meetup/event.json 。 然后,我使用jsonschema2pojo-maven-plugin ( 也存在Gradle和Ant版本)。 插件的名稱令人困惑,它還可以與JSON示例(不僅是架構)一起使用以生成Java模型:
<plugin><groupId>org.jsonschema2pojo</groupId><artifactId>jsonschema2pojo-maven-plugin</artifactId><version>0.4.7</version><configuration><sourceDirectory>${basedir}/src/main/json/meetup</sourceDirectory><targetPackage>com.nurkiewicz.meetup.generated</targetPackage><includeHashcodeAndEquals>true</includeHashcodeAndEquals><includeToString>true</includeToString><initializeCollections>true</initializeCollections><sourceType>JSON</sourceType><useCommonsLang3>true</useCommonsLang3><useJodaDates>true</useJodaDates><useLongIntegers>true</useLongIntegers><outputDirectory>target/generated-sources</outputDirectory></configuration><executions><execution><id>generate-sources</id><phase>generate-sources</phase><goals><goal>generate</goal></goals></execution></executions> </plugin>此時,Maven將創建與Jackson兼容的Event.java , Venue.java , Group.java等:
private Event parseEventJson(String jsonStr) {try {return objectMapper.readValue(jsonStr, Event.class);} catch (IOException e) {throw new UncheckedIOException(e);} }很好,它很好:
final Observableevents = jsonChunks.map(this::parseEventJson);步驟5:獲利!!!
有了Observable<Event>我們可以實現一些非常有趣的用例。 是否要查找剛剛創建的波蘭所有聚會的名稱? 當然!
events.filter(event -> event.getVenue() != null).filter(event -> event.getVenue().getCountry().equals("pl")).map(Event::getName).forEach(System.out::println);尋找統計信息每分鐘創建多少個事件? 沒問題!
events.buffer(1, TimeUnit.MINUTES).map(List::size).forEach(count -> log.info("Count: {}", count));或者,您是否想繼續搜索將來最遠的聚會,而跳過比已發現的聚會更近的聚會?
events.filter(event -> event.getTime() != null).scan(this::laterEventFrom).distinct().map(Event::getTime).map(Instant::ofEpochMilli).forEach(System.out::println);//...private Event laterEventFrom(Event first, Event second) {return first.getTime() > second.getTime() ?first :second; }此代碼過濾掉沒有已知時間的事件,發出當前事件或前一個事件( scan() ),具體取決于后面的事件,過濾出重復事件并顯示時間。 這個運行了幾分鐘的微型程序已經發現一個計劃于2015年11月創建的聚會,而在撰寫本文時它是2014年12月。 可能性是無止境的。
希望我能對如何輕松地將各種技術融合在一起有一個很好的了解:React式編程以編寫超快速的網絡代碼,無樣板代碼的類型安全的JSON解析和RxJava來快速處理事件流。 請享用!
翻譯自: https://www.javacodegeeks.com/2014/12/accessing-meetups-streaming-api-with-rxnetty.html
meetup
總結
以上是生活随笔為你收集整理的meetup_使用RxNetty访问Meetup的流API的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 你买新电脑了么买新的电脑
- 下一篇: 异步http 超时_具有Completa