日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程语言 > java >内容正文

java

rxjava 被观察者_RxJava:从未来到可观察

發布時間:2023/12/3 java 43 豆豆
生活随笔 收集整理的這篇文章主要介紹了 rxjava 被观察者_RxJava:从未来到可观察 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

rxjava 被觀察者

大約4年前,我第一次在Matthew Podwysocki的博客上遇到了Reactive Extensions ,但是直到我幾周前看到Matthew在Code Mesh上發表演講后,我才對它有所了解。

最近它似乎越來越流行,我注意到Netflix編寫了一個Java版本RxJava 。

我以為可以嘗試通過更改在探索cypher的MERGE函數時暴露的Observable而不是Future的代碼來嘗試一下。

回顧一下,我們有50個線程,我們進行了100次迭代,在這些迭代中我們創建了隨機(用戶,事件)對。 我們最多創建10個用戶和50個事件,目標是同時發送相同對的請求。

在另一篇文章的示例中,我丟棄了每個查詢的結果,而在這里我返回了結果,因此我有一些要訂閱的內容。

代碼的輪廓如下所示:

public class MergeTimeRx {public static void main( final String[] args ) throws InterruptedException, IOException{String pathToDb = "/tmp/foo";FileUtils.deleteRecursively( new File( pathToDb ) );GraphDatabaseService db = new GraphDatabaseFactory().newEmbeddedDatabase( pathToDb );final ExecutionEngine engine = new ExecutionEngine( db );int numberOfThreads = 50;int numberOfUsers = 10;int numberOfEvents = 50;int iterations = 100;Observable<ExecutionResult> events = processEvents( engine, numberOfUsers, numberOfEvents, numberOfThreads, iterations );events.subscribe( new Action1<ExecutionResult>(){@Overridepublic void call( ExecutionResult result ){for ( Map<String, Object> row : result ){}}} );....}}

使用RxJava的好處是,沒有提到我們如何獲取ExecutionResult的集合,這并不重要。 我們只有它們的流,并且通過在Observable上調用訂閱函數,只要有另一個函數可用,我們就會得到通知。

我發現的大多數示例都顯示了如何從單個線程生成事件,但是我想使用線程池,以便可以同時觸發許多請求。 processEvents方法最終看起來像這樣:

private static Observable<ExecutionResult> processEvents( final ExecutionEngine engine, final int numberOfUsers, final int numberOfEvents, final int numberOfThreads, final int iterations ){final Random random = new Random();final List<Integer> userIds = generateIds( numberOfUsers );final List<Integer> eventIds = generateIds( numberOfEvents );return Observable.create( new Observable.OnSubscribeFunc<ExecutionResult>(){@Overridepublic Subscription onSubscribe( final Observer<? super ExecutionResult> observer ){final ExecutorService executor = Executors.newFixedThreadPool( numberOfThreads );List<Future<ExecutionResult>> jobs = new ArrayList<>();for ( int i = 0; i < iterations; i++ ){Future<ExecutionResult> job = executor.submit( new Callable<ExecutionResult>(){@Overridepublic ExecutionResult call(){Integer userId = userIds.get( random.nextInt( numberOfUsers ) );Integer eventId = eventIds.get( random.nextInt( numberOfEvents ) );return engine.execute("MERGE (u:User {id: {userId}})\n" +"MERGE (e:Event {id: {eventId}})\n" +"MERGE (u)-[:HAS_EVENT]->(e)\n" +"RETURN u, e",MapUtil.map( "userId", userId, "eventId", eventId ) );}} );jobs.add( job );}for ( Future<ExecutionResult> future : jobs ){try{observer.onNext( future.get() );}catch ( InterruptedException | ExecutionException ignored ){}}observer.onCompleted();executor.shutdown();return Subscriptions.empty();}} );}

我不確定這是否是使用Observable的正確方法,因此如果我記錯了,請在評論中讓我知道。

我不確定處理錯誤的正確方法是什么。 我最初在catch塊中調用了observer#onError ,但這意味著不會再產生不是我想要的事件。

如果您想使用它,該代碼可以作為要點 。 我添加了以下依賴關系以獲取RxJava庫:

<dependency><groupId>com.netflix.rxjava</groupId><artifactId>rxjava-core</artifactId><version>0.15.1</version></dependency>

參考: RxJava : 從未來到我們的JCG合作伙伴 Mark Needham在Mark Needham Blog博客上均可觀察到。

翻譯自: https://www.javacodegeeks.com/2014/01/rxjava-from-future-to-observable.html

rxjava 被觀察者

總結

以上是生活随笔為你收集整理的rxjava 被观察者_RxJava:从未来到可观察的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。