日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Rx2.0后台开发分享

發布時間:2023/12/4 编程问答 36 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Rx2.0后台开发分享 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

Rxjava2.x

  • 微軟的一個函數庫,Rx是一個編程模型,模板是提供一致的編程接口,幫助開發者更方便的處理異步數據流,現在Rx已經支持幾乎全部的流行編程語言。比較流行的有Rxjava,RxJs,Rx.NET,社區網站:http://reactivex.io/
  • Rx使用觀察者模式
    • 使用觀察者模式監聽:RX可以訂閱任何可觀察的數據流并且執行操作
    • 組合:RX使用查詢式的操作符和變換數據流
    • 創建:Rx可以方便的創建事件流和數據流
  • 簡化代碼
    • 函數式風格:對可觀察數據流使用無副作用的輸入輸出函數,避免程序中錯綜復雜的狀態
    • 簡化代碼:Rx的操作符通常可以將復雜難題簡單化為很少的代碼
    • 異步錯誤處理: 傳統的try/catch沒有辦法處理異步計算,Rx提供了合適的錯誤處理機制
    • 輕松使用并發:Rx的Observables和Schedulers讓開發者可以擺脫底層線程同步和各種并發問題。
  • jar包maven倉庫地址
<dependency><groupId>io.reactivex.rxjava2</groupId><artifactId>rxjava</artifactId><version>2.2.6</version> </dependency>
  • 一個簡單的例子:

    public void myTestObservable(){Observable.fromIterable(Lists.newArrayList(1,2,3,4,5)).filter(integer -> {return integer > 2;}).subscribe(integer -> {System.out.println(Thread.currentThread().getName() + " : "+ integer );});}
    • 在Rxjava中,一個實現了Observer接口的對象可以訂閱(subscribe)一個Observable類的實例。訂閱者(Subscriber)對Observable發射(emit)的任何數據或數據序列作出響應。這種模式簡化了并發的操作,因為他不需要阻塞等待Observable發射數據,而是創建一個處于待命狀態的觀察者哨兵,哨兵在未來某個時刻響應Observable的通知。

  • Observable

    • 如上圖是Observable發射數據的一個流程圖,
    • 時間線 左邊 ----右邊, 各個不同形狀標識Observable上的元素
    • 最后垂直線表示Observable成功執行
    • 向下虛線箭標識數據被發射出去
    • 盒子表示各種操作符來對對應數據進行處理
    • 第二條時間線也是一個Observable,不過是轉換之后的
    • 當轉換時候出現錯誤將會并不會終止,他會用一個error事件替代這個位置
  • Subscribe

    • 觀察者通過SubScribe操作關聯Observable
  • Observer

    • 觀察者,決定了事件觸發的時候將有怎么樣的行為
    void onSubscribe(@NonNull Disposable d); void onNext(@NonNull T t); void onError(@NonNull Throwable e); void onComplete();
    • onNext事件,被觀察者每發射一個數據都會調onNext事件,相當于異步任務中的回調接口,相當于Feature的get獲取,只不過onNext不是阻塞的就是一個哨兵模式,每次發射數據,獲取立即獲取對應結果,然后執行之后的邏輯
    • onCompleted事件,表示事件隊列完成,Rxjava不僅把每一個事件單獨處理,還會把他看做一個隊列,Rxjava規定,當不會再發射新的元素觸發onNext事件時候,需要觸發onCompleted事件作為結束標志。
    • onError事件,事件隊列處理中出現異常時候,onError會被觸發,可以在onError中統一處理異常情況
    • onSubScribe事件,表示元素開始發射,相當于所有元素執行之前的一個預處理位置。
  • Schedulers

    • 默認情況下Observable和Observer執行過程是在同一個線程執行如上面最簡單例子,如果想要切換線程在不同線程執行可以用SubscribeOn(),observeOn()。
    • Rxjava提供了幾種線調度器
調度器類型效果
Schedulers.newThread();為每一個任務創建一個新線程
Schedulers.computation();用于計算任務,如時間循環和回調處理,不要用于IO操作,默認線程數等于處理器數量
Schedulers.io();用于IO密集型任務,如異步阻塞IO操作,這個調度器的線程池會根據需要增長;對應普通計算任務,一般用上面這個,Schedulers.io默認是一個CachedThreadScheduler,很像一個有線程緩存的新線程調度器。
Schedulers.single();擁有一個線程單例的線程池,所有任務都在這一個線程中執行,當線程中有任務執行時候,他的任務將會按照先進先出的順序依次執行。
Schedulers.trampoline();Creates and returns a Scheduler that queues work on the current thread to be executed after the current work completes. 在當前線程立即執行任務,如果當前線程有任務在執行,則會將其暫停,等插入進來的任務執行完之后,再將未完成的任務接著執行。
  • 來一個完整的例子來解釋一下線程切換:
Observable.create(new ObservableOnSubscribe<Integer>() {@Overridepublic void subscribe(ObservableEmitter<Integer> emitter) throws Exception {for (Integer integer : intList) {System.out.println(Thread.currentThread().getName() + " : send");emitter.onNext(integer);}emitter.onComplete();}}).subscribeOn(Schedulers.computation()).observeOn(Schedulers.newThread()).flatMap(new Function<Integer, ObservableSource<?>>() {@Overridepublic ObservableSource<?> apply(Integer integer) throws Exception {return Observable.just(integer).subscribeOn(Schedulers.computation()).filter(new Predicate<Integer>() {@Overridepublic boolean test(Integer integer) throws Exception {System.out.println(Thread.currentThread().getName() + ": filter one integer: "+ integer);return integer > 2;}});}}).observeOn(Schedulers.io()).subscribe(new Consumer<Object>() {@Overridepublic void accept(Object o) throws Exception {System.out.println(Thread.currentThread().getName()+ " : onSubscribe");}});

這個代碼看起來比較長,也可以這么寫:

Observable.create(emitter -> {intList.forEach(intTemp -> emitter.onNext(intTemp));emitter.onComplete();}).subscribeOn(Schedulers.computation()).observeOn(Schedulers.computation()).flatMap(intStr -> Observable.just(intStr).subscribeOn(Schedulers.computation()).filter(filterInt -> Integer.valueOf(filterInt.toString()) > 2)).observeOn(Schedulers.computation()).subscribe(intTemp -> System.out.println(intTemp));
  • 第一個subscribeOn指定被觀察對象發射的線程,使用的computation模型
  • 第一個observeOn指定之后的flatMap操作符切換到另外線程中執行
  • 最后的observeOn指定觀察者哨兵消費數據的線程,會有如下結果

  • Observable的這種異步切換線程方式從整體流程來看還是同步的方式,他必須先Observable發射數據-----操作符change-----消費數據并不是每次發射一個數據的同時進行change接著消費的并行實現,因此Rxjava提供了另外一個并行的方式,如下案例
public static void flowableDemo() throws InterruptedException {Flowable.create(new FlowableOnSubscribe<Integer>() {@Overridepublic void subscribe(FlowableEmitter<Integer> emitter) throws Exception {for (int i = 1; i < 100; i++) {System.out.println(Thread.currentThread().getName() + " 發射數據");emitter.onNext(i);}emitter.onComplete();}}, BackpressureStrategy.ERROR).subscribeOn(Schedulers.io()).observeOn(Schedulers.newThread()).filter(new Predicate<Integer>() {@Overridepublic boolean test(Integer integer) throws Exception {System.out.println(Thread.currentThread().getName() + " 過濾發射數據");return integer > 0;}}).observeOn(Schedulers.newThread()).subscribe(new Subscriber<Object>() {public void onSubscribe(Subscription subscription) {System.out.println("取出n條數據");subscription.request(3);}public void onNext(Object o) {try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(Thread.currentThread().getName());System.out.println("消費數據:" + o);}public void onError(Throwable throwable) {System.out.println(throwable.getMessage());}public void onComplete() {System.out.println("onComplete");}});}

  • 用Flowable不僅僅是對每個模塊進行了線程的切換,在同時也是并行的執行了整個流程
  • 我覺得在異步編程方面Rxjava的確比原始的Thread ,Runnable這類操作要方便的多,通過幾個操作符就可以達到異步的目的,這也是Rxjava的一個優勢,而在我們工作中我們一般都是用架構組的異步框架也可以做到很優雅的異步編程,比起Rxjava而言只是會多創建一個異步類而已,那么我們來對比一下兩種異步操作,我用之前的批量關注接口做測試
  • 原來的異步方式:
public void batchFollowPush(UserInfo userInfo, List<UserInfo> objectUserInfos, Integer source) {List<Boolean> batchFollowResult = new ArrayList<>();Future<Boolean> pushFuture = null;for (UserInfo objectUserInfo : objectUserInfos) {try {pushFuture = (Future<Boolean>)pushAsync.asyncFollowAndStoreMoment(userInfo, objectUserInfo, source);batchFollowResult.add(pushFuture.get());} catch (Exception e) {if (pushFuture != null) {pushFuture.cancel(true);}throw new RuntimeException(e);}}log.info("batchFollow result:{}", JSON.toJSONString(batchFollowResult));}//異步類中方法public Object asyncFollowAndStoreMoment(UserInfo userInfo, UserInfo objectUserInfo, Integer source) {// 關注交互限制Optional<InteractError> interactErrorOptional = personFacade.interactRestrict(userInfo, objectUserInfo);if (interactErrorOptional.isPresent()) {return Boolean.FALSE;}int result = wooerFacade.addOrUpdatePush2(userInfo.getMemberID(), objectUserInfo.getMemberID(), 1, source, HeadUaUtils.getPlatform());Boolean isTrue = result > 0;if (isTrue == null || !isTrue) {return Boolean.FALSE;}limitedPushFacade.followPush(userInfo, objectUserInfo);MomentListStoreParam momentListStoreParam =new MomentListStoreParam(Arrays.asList(Long.valueOf(objectUserInfo.getMemberID())),userInfo.getMemberID().longValue());log.info("batchFollow afterFilter param:{}", JSON.toJSONString(momentListStoreParam));momentManagerFacade.storeMomentMongoByMomentId(momentListStoreParam);return Boolean.TRUE;}
  • RX方式
public IResponse batchFollowRx(BatchFollowForm batchFollowForm, UserInfo myUserInfo) {log.info("batchFollow param:{}", JSON.toJSONString(batchFollowForm));BusinessAssert.isNotNull(batchFollowForm, CommonError.ARGS_EMPTY);List<Long> objectIDs = batchFollowForm.getObjectIDs();List<UserInfo> objectUserInfo = coreUserService.getList(objectIDs, UserInfo.class);Flowable.create(new FlowableOnSubscribe<UserInfo>() {@Overridepublic void subscribe(FlowableEmitter<UserInfo> emitter) throws Exception {for (UserInfo info : objectUserInfo) {emitter.onNext(info);}}}, BackpressureStrategy.ERROR).parallel().runOn(Schedulers.io()).filter(userInfo -> {Optional<InteractError> interactErrorOptional = personFacade.interactRestrict(myUserInfo, userInfo);if (interactErrorOptional.isPresent()) {return Boolean.FALSE;}Boolean isTrue =wooerFacade.addOrUpdatePush2(myUserInfo.getMemberID(), userInfo.getMemberID(),1, batchFollowForm.getSource(), HeadUaUtils.getPlatform()) > 0;if (isTrue == null || !isTrue) {return Boolean.FALSE;}return Boolean.TRUE;}).runOn(Schedulers.computation()).sequential().subscribe(new Consumer<UserInfo>() {@Overridepublic void accept(UserInfo userInfo) throws Exception {limitedPushFacade.followPush(myUserInfo, userInfo);MomentListStoreParam momentListStoreParam =new MomentListStoreParam(Arrays.asList(Long.valueOf(userInfo.getMemberID())),userInfo.getMemberID().longValue());log.info("batchFollow afterFilter param:{}", JSON.toJSONString(momentListStoreParam));momentManagerFacade.storeMomentMongoByMomentId(momentListStoreParam);}}, new Consumer<Throwable>() {@Overridepublic void accept(Throwable throwable) throws Exception {log.error("batch follow error exception:{}", throwable.getMessage());}});return MessageResponse.success(FOLLOW_SUCCESS);}
  • 測試批量關注接口,相同容器環境在同一個namespace上,并且相同條件的男性賬號,關注同一批13個異性用戶:

    • 普通callback異步方式:

    • Rxjava方式

  • backPressure

    • 以上的例子中每次都用到了BackpressureStrategy操作符這個是Rxjava2.x后為了解決背壓問題的一個操作符,所謂背壓,即異步的情況下發射數據的速度大于消費數據的速度帶來的問題。
    • BackPressure現象說明:Flowable無限的生產事件,但是SubScribe消費的速度很慢,導致事件堆積,當堆積到一定程度將會造成OOM,我們模擬一下這種情況。
public static void oomDemo(){Observable.create(new ObservableOnSubscribe<Integer>() {@Overridepublic void subscribe(ObservableEmitter<Integer> emitter) throws Exception {for (int i=0;;i++){System.out.println(Thread.currentThread().getName() + " onNext : "+ i);emitter.onNext(i);}}}).subscribeOn(Schedulers.io()).observeOn(Schedulers.single()).subscribe(new Consumer<Integer>() {@Overridepublic void accept(Integer integer) throws Exception {Thread.sleep(2000);System.out.println(Thread.currentThread().getName() + " consumer : "+ integer);}});}
  • 讓發射數據在多個線程中執行,讓消費數據在一個線程中執行并且每兩秒才消費一個,這樣會導致發射的數據不斷的累積在內存中,最終可能會導致oom,我們通過內存信息來看他執行之后一段時間的堆內存信息

  • PSYoiungGen 年輕態區,總共1024k,使用了511k
  • eden區域是新對象區,已經被沾滿
  • from和to區域 大學是一樣,在gc時候會遍歷from或者to區域,將不能清除的拷貝到另外一個區,然后清除本區域留下的,然后循環
  • paroldGen 老年代區域也已經被占滿
  • 這種狀態下Observable因內存不夠已經oom,停止運行了,只有消費線程在消費數據。
io.reactivex.exceptions.UndeliverableException: The exception could not be delivered to the consumer because it has already canceled/disposed the flow or the exception has nowhere to go to begin with. Further reading: https://github.com/ReactiveX/RxJava/wiki/What's-different-in-2.0#error-handling | java.lang.OutOfMemoryError: GC overhead limit exceededat io.reactivex.plugins.RxJavaPlugins.onError(RxJavaPlugins.java:367)at io.reactivex.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:69)at io.reactivex.internal.schedulers.ScheduledRunnable.call(ScheduledRunnable.java:57)at java.util.concurrent.FutureTask.run(FutureTask.java:266)at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
  • 我們用一個Flowable的例子來看他如何解決這個oom的問題:
public static void oomDemoFix(){Flowable.create(new FlowableOnSubscribe<Integer>() {@Overridepublic void subscribe(FlowableEmitter<Integer> emitter) throws Exception {for (int i=0;;i++){System.out.println(Thread.currentThread().getName() + " onNext : "+ i);emitter.onNext(i);}}}, BackpressureStrategy.ERROR).subscribeOn(Schedulers.io()).observeOn(Schedulers.io()).subscribe(new Subscriber<Integer>() {@Overridepublic void onSubscribe(Subscription subscription) {subscription.request(50);}@Overridepublic void onNext(Integer integer) {System.out.println(Thread.currentThread().getName() + "消費數據: "+ integer);}@Overridepublic void onError(Throwable throwable) {}@Overridepublic void onComplete() {}});}
  • 我們在創建Flowable的時候增加了一個參數 BackpressureStrategy.ERROR,這個參數指定了在處理背壓問題時候執行的一個策略,當內存滿時候接下來繼續發射的數據將會拋出MissingBackpressureException 異常,其余的策略稍等介紹
  • 還有另外一個不同 onSubscribe中傳遞的不是Disposable變成了Subscription,而且還執行了這句代碼subscription.request(50)。因為在Flowable中采用了一個新的思路,響應獲取發射數據的方法來解決流量不均勻而造成的oom的問題,也就是我要消費多少我就取多少,這里我們從發射數據中取出了50條。其他的還是會存儲在內存中。

Flowable中主要有這幾個策略

  • BackpressureStrategy.ERROR:如果緩存池(默認128)溢出會立刻拋異常MissingBackpressureexception
  • BackpressureStrategy.BUFFER:RxJava中有一個默認能夠存儲128個事件的緩存池,可以調節大小,生產者生產事件,并且將處理不了的事件緩存。(謹慎使用,因為依賴對消費者消費能力,耗內存)
  • BackpressureStrategy.DROP:消費不了就丟棄,比如先生產200個,并沒有消費,而是在緩存,然后消費者request(200),但是緩存只有128個,所以其他的事件都丟棄了。
  • BackpressureStrategy.LATEST:和DROP功能基本一致,處理不了丟棄,區別在于LATEST能夠讓消費者接收到生產者產生的最后一個事件。
  • BackpressureStrategy.MISSING:生產的事件沒有進行緩存和丟棄,下游接收到的事件必須進行消費或者處理!

感覺這些都是緩兵之計,能否按照我的消費能力來發射數據呢,這樣才完美。

  • Rxjava2.x后有一個FlowableEmitter 這個接口:
public static void fix(Flowable flowable){flowable.observeOn(Schedulers.computation()).subscribe(new Subscriber<Integer>() {@Overridepublic void onSubscribe(Subscription subscription) {subscription.request(20);}@Overridepublic void onNext(Integer integer) {System.out.println("消費數據: "+ 100);}@Overridepublic void onError(Throwable throwable) {}@Overridepublic void onComplete() {}});}public static Flowable flowableEmitterDemo(){Flowable flowable = Flowable.create(new FlowableOnSubscribe<Integer>() {@Overridepublic void subscribe(FlowableEmitter<Integer> emitter) throws Exception {while (emitter.requested() > 0){System.out.println("下游處理能力:"+ emitter.requested());emitter.onNext(100);}}}, BackpressureStrategy.ERROR).subscribeOn(Schedulers.io());return flowable;}
  • 我可以在Flowable發射數據之前通過requested來獲取下游Subscriber的消費能力,依據這個來進行數據的發射,這樣既可以控制發射以及消費數據的速度,也能夠避免數據的丟失

現在我們看下開始時候從官網摘抄的Rx的幾個優點:

  • 首先函數式風格,這種編程模式和常規的方式比較的確簡化了不少代碼比如第一個案例,但是感覺我們用stream表達式加上lambda也可以達到這種效果,而且對于map,flatmap,filter等這些操作符對于沒有函數式編程的人來說并不好理解不覺得這是優勢

  • 簡化代碼,這點主要體現在異步編程模式時候,不管和我們java中的異步編程用的Thread和Runnable相比,還是我們框架中的異步編程框架比較的確代碼都更加簡單,只需要通過幾個異步線程切換的操作符便可以達到目的,但是缺點也很明顯,需要引入新的jar,新的技術對不熟悉這塊技術的同事并不友好有一定學習成本不利于維護。

  • 異步錯誤處理,輕松使用并發:通過onError捕獲異常信息,通過操作法切換線程,的確也是優勢所在。

  • 在之前的實踐中還有這種業務模型下使用Rxjava會更具優勢,當我們需要從多個網絡環境來獲取各自信息從中篩選出符合我們預期的并對其進行組合,我們可以通過Rxjava的豐富的操作符以及異步操作來完成。來一個簡單的例子

public static Flowable getIOData1(){return Flowable.create(new FlowableOnSubscribe<Integer>() {@Overridepublic void subscribe(@NonNull FlowableEmitter<Integer> flowableEmitter) throws Exception {for (int i = 0; i < 10; i++) {flowableEmitter.onNext(i);}System.out.println(Thread.currentThread().getName());}}, BackpressureStrategy.DROP).subscribeOn(Schedulers.io()).filter(temp -> temp > 2);}public static Flowable getIOData2(){return Flowable.create(new FlowableOnSubscribe<Integer>() {@Overridepublic void subscribe(@NonNull FlowableEmitter<Integer> flowableEmitter) throws Exception {for (int i = 10; i < 21; i++) {flowableEmitter.onNext(i);}System.out.println(Thread.currentThread().getName());}}, BackpressureStrategy.DROP).subscribeOn(Schedulers.io()).filter(temp -> temp > 12);}public static Flowable getIOData3(){return Flowable.create(new FlowableOnSubscribe<Integer>() {@Overridepublic void subscribe(@NonNull FlowableEmitter<Integer> flowableEmitter) throws Exception {for (int i = 20; i < 30; i++) {flowableEmitter.onNext(i);}System.out.println(Thread.currentThread().getName());}}, BackpressureStrategy.DROP).subscribeOn(Schedulers.io()).filter(temp -> temp > 22);}public static Flowable getIOData4(){return Flowable.create(new FlowableOnSubscribe<Integer>() {@Overridepublic void subscribe(@NonNull FlowableEmitter<Integer> flowableEmitter) throws Exception {for (int i = 30; i < 41; i++) {flowableEmitter.onNext(i);}System.out.println(Thread.currentThread().getName());}}, BackpressureStrategy.DROP).subscribeOn(Schedulers.io()).filter(temp -> temp > 32);}public static void mergeDemo(){Flowable.merge(getIOData1(), getIOData2(), getIOData3(), getIOData4()).map(temp -> "onNext"+ temp).flatMap(new Function() {@Overridepublic Object apply(@NonNull Object o) throws Exception {return Flowable.just(o).subscribeOn(Schedulers.io());}}).subscribeOn(Schedulers.newThread()).observeOn(Schedulers.computation()).subscribe(new Subscriber() {@Overridepublic void onSubscribe(Subscription subscription) {subscription.request(Long.MAX_VALUE);}@Overridepublic void onNext(Object o) {System.out.println("onNext: "+ o);}@Overridepublic void onError(Throwable throwable) {}@Overridepublic void onComplete() {}});}
  • 我們定義N個Flowable用異步方式分別請求各個第三方接口來獲取對應的數據并且用filter過濾出我們需要的信息,然后通過merge操作法將所有獲取到的數據組合到同一個Flowable中,進行統一的封裝處理以及之后的一些業務操作。
  • 如果我們用傳統的方式,我們不得不定義N個變量來獲取四個異步線程數據,然后等都獲取完畢之后,在分別對四個變量中保存的信息進行篩選,之后通過邏輯操作合并到一起,和rxjava相比顯然要遜色很多。
  • 這種方式就是Flowable通過內置操作符對自身發射的數據在空間維度上重新組織,或者和其他Flowable發射的數據一起在空間維度上進行重新組織,是的觀察者的邏輯變得更加的簡單直觀因為直接看操作符就能知道具體做了什么,不需要關心數據從哪里來這部分由Flowable屏蔽了,從而使得觀察者更加專注于下游邏輯。

RxJava的響應式優勢只有在異步邏輯占主導時才會體現出來.

  • wiki地址:https://github.com/ReactiveX/RxJava/wiki
  • reactivex官網:http://reactivex.io/
創作挑戰賽新人創作獎勵來咯,堅持創作打卡瓜分現金大獎

總結

以上是生活随笔為你收集整理的Rx2.0后台开发分享的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。

主站蜘蛛池模板: 亚洲品质自拍 | 日韩一区二区三区四区 | 日本一区二区观看 | 亚洲天堂美女视频 | 免费毛片看 | 国产老熟女伦老熟妇露脸 | 日韩免费成人 | 日本黄色免费网站 | 午夜影院久久久 | 免费污污视频在线观看 | 日韩免费视频观看 | 午夜国产视频 | 午夜性色 | 国产色诱视频 | 91精品视频在线免费观看 | 夜夜嗨老熟女av一区二区三区 | 亚洲国产成人精品激情在线 | 在线爱情大片免费观看大全 | 91久精品 | 黄色国产一区 | 成人一区av | 综合视频一区二区 | 午夜国产一区二区三区 | 亚洲成a | 黄页网站免费观看 | 国产婷婷色一区二区三区在线 | 久草天堂 | 成人免费网视频 | 熟女少妇内射日韩亚洲 | 樱花电影最新免费观看国语版 | 亚洲精品国产精品乱码不99 | 日日夜夜撸撸 | 狠狠五月 | 国产哺乳奶水91在线播放 | 日本电影一区 | 久精品在线观看 | 狠狠地日 | 日韩福利视频网 | 欧美aaa级片| 超碰蜜桃| 热99在线 | 五月激情在线观看 | 欧美久久精品一级黑人c片 1000部多毛熟女毛茸茸 | 亚洲免费观看高清完整版在线 | 东北女人av| 99久久婷婷国产综合精品电影 | 久久久国产精品免费 | 人妻无码一区二区三区 | 欧美日韩中文字幕 | 国产黄色在线免费观看 | 在线成人影视 | 都市激情 亚洲 | 成人在线国产精品 | 欧美成人国产 | 97国产免费 | 91一区二区视频 | 国产啊v在线观看 | 午夜影院 | 中文字幕色图 | 亚洲一道本 | 午夜色图 | 日韩成人在线看 | 成人黄色激情视频 | 野花视频在线观看免费 | 青青青草视频在线 | 男朋友是消防员第一季 | 成人免费看高清电影在线观看 | 国产精品伦理一区 | 欧美日韩在线视频一区二区 | 一区二区三区日韩精品 | 一区二区三区免费看 | 日本wwwxxx | 一区二区三区精品免费视频 | 四月婷婷 | 婷婷丁香视频 | 亚洲精品传媒 | 国产区123| 丰满少妇在线观看bd | 成人91在线观看 | 韩国精品视频在线观看 | 亚洲深夜福利视频 | 日韩精品无码一本二本三本色 | 国产精品久久久久久久久久 | 爱看av在线| 国语对白精彩对话 | 国产jizz18女人高潮 | 国产真人无码作爱视频免费 | 国产小视频网址 | 日本熟妇毛茸茸丰满 | 国产高清不卡 | 成人一区二区三区在线观看 | 国产一级大片在线观看 | 亚洲精选一区二区 | 午夜免费在线观看 | 新呦u视频一区二区 | 有码中文| 欧美草比视频 | 成人综合婷婷国产精品久久 | 亚洲综合婷婷久久 |