日韩av黄I国产麻豆传媒I国产91av视频在线观看I日韩一区二区三区在线看I美女国产在线I麻豆视频国产在线观看I成人黄色短片

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Reactor by Example--转

發布時間:2025/4/5 编程问答 39 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Reactor by Example--转 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

原文地址:https://www.infoq.com/articles/reactor-by-example

Key takeaways

  • Reactor is a reactive streams library targeting Java 8 and providing an Rx-conforming API
  • It uses the same approach and philosophy as RxJava despite some API differences
  • It is a 4th generation reactive library that allows operator fusion, like RxJava 2
  • Reactor is a core dependency in the reactive programming model support of Spring Framework 5.

RxJava recap

Reactor, like RxJava 2, is a?fourth generation?reactive library. It has been launched by Spring custodian Pivotal, and builds on the Reactive Streams specification, Java 8, and the ReactiveX vocabulary. Its design is the result of a savant mix fueled by designs and core contributors from Reactor 2 (the previous major version) and RxJava.

In previous articles in this series, "RxJava by Example"?and "Testing RxJava", you learned about the basics of reactive programming: how data is conceptualized as a stream, the Observable class and its various operators, the factory methods that create Observables from static and dynamic sources.

Observable is the push source and Observer is the simple interface for consuming this source via the act of subscribing. Keep in mind that the contract of an Observable is to notify its Observer of 0 or more data items through onNext, optionally followed by either an onError or onComplete terminating event.

To test an Observable, RxJava provides aTestSubscriber, which is a special flavor of Observer that allows you to assert events in your stream.

In this article we'll draw a parallel between Reactor and what you already learned about RxJava, and showcase the common elements as well as the differences.

Reactor's types

Reactor's two main types are the?Flux<T>?and?Mono<T>. A Flux is the equivalent of an RxJavaObservable, capable of emitting 0 or more items, and then optionally either completing or erroring.

A Mono on the other hand can emit?at most?once. It corresponds to both?Single?and?Maybetypes on the RxJava side. Thus an asynchronous task that just wants to signal completion can use a?Mono<Void>.

This simple distinction between two types makes things easy to grasp while providing meaningful semantics in a reactive API: by just looking at the returned reactive type, one can know if a method is more of a "fire-and-forget" or "request-response" (Mono) kind of thing or is really dealing with multiple data items as a stream (Flux).

Both Flux and Mono make use of this semantic by coercing to the relevant type when using some operators. For instance, calling?single()?on a?Flux<T>?will return a?Mono<T>, whereas concatenating two monos together using?concatWith?will produce a?Flux. Similarly, some operators will make no sense on a?Mono?(for example?take(n), which produces n > 1 results), whereas other operators will?only?make sense on a?Mono?(e.g.?or(otherMono)).

One aspect of the Reactor design philosophy is to keep the API lean, and this separation into two reactive types is a good middle ground between expressiveness and API surface.

"Build on Rx, with Reactive Streams at every stage"

As expressed in "RxJava by Example", RxJava bears some superficial resemblance to Java 8 Streams API, in terms of concepts. Reactor on the other hand looks a lot like RxJava, but this is of course in no way a coincidence. The intention is to provide a Reactive Streams native library that exposes an Rx-conforming operator API for asynchronous logic composition. So while Reactor is rooted in Reactive Streams, it seeks general API alignment with RxJava where possible.

Reactive Libraries and Reactive Streams adoption

Reactive Streams?(abbreviated RS in the remainder of this article) is "an initiative to provide a standard for asynchronous stream processing with non-blocking back pressure". It is a set of textual specifications along with a TCK and four simple interfaces (Publisher,?Subscriber,Subscription?and?Processor), which will be integrated in Java 9.

It mainly deals with the concept of reactive-pull back-pressure (more on that later) and how to interoperate between several implementing reactive sources. It doesn't cover operators at all, focusing instead exclusively on the stream's lifecycle.

A key differentiator for Reactor is its RS first approach.Both?Flux?and?Mono?are?RS Publisherimplementations and conform to reactive-pull back-pressure.

In RxJava 1 only a subset of operators support back-pressure, and even though RxJava 1 has adapters to RS types, its?Observable?doesn't implement these types directly. That is easily explained by the fact that RxJava 1 predates the RS specification and served as one of the foundational works during the specification's design.

That means that each time you use these adapters you are left with a?Publisher, which again doesn't have any operator. In order to do anything useful from there, you'll probably want to go back to an?Observable, which means using yet another adapter. This visual clutter can be detrimental to readability, especially when an entire framework like Spring 5 directly builds on top of?Publisher.

Another difference with RxJava 1 to keep in mind when migrating to Reactor or RxJava 2 is that in the RS specification,?null?values are not authorized. It might turn out important if your code base uses?null?to signal some special cases.

RxJava 2 was developed after the Reactive Streams specification, and thus has a direct implementation of?Publisher?in its new?Flowable?type. But instead of focusing exclusively on RS types, RxJava 2 also keeps the "legacy" RxJava 1 types (Observable,?Completable, and?Single)? and introduces the "RxJava Optional",?Maybe. Although they still provide the semantic differentiation we talked about earlier, these types have the drawback of not implementing RS interfaces. Note that unlike in RxJava 1,?Observable?in RxJava 2 does not support the backpressure protocol in RxJava 2 (a feature now exclusively reserved to?Flowable). It has been kept for the purpose of providing a rich and fluent API for cases, such as user interface eventing, where backpressure is impractical or impossible.?Completable,?Single?and?Maybe?have by design no-need for backpressure support, they will offer a rich API as well and defer any workload until subscribed.

Reactor is once again leaner in this area, sporting its?Mono?and?Flux?types, both implementingPublisher?and both backpressure-ready. There's a relatively small overhead for?Mono?to behave as a?Publisher, but it is mostly offsetted by other?Mono?optimizations. We'll see in a later section what backpressure means for?Mono.

An API similar but not equal to RxJava's

The ReactiveX and RxJava vocabulary of operators can be overwhelming at times, and some operators can have confusing names for historical reasons. Reactor aims to have a more compact API and to deviate in some cases, e.g. in order to choose better names, but overall the two APIs look a lot alike. In fact the latest iterations in RxJava 2 actually borrow some vocabulary from Reactor as well, a hint of the ongoing close collaboration between the two projects. Some operators and concepts first appear in one library or the other, but often end up in both.

For instance,?Flux?has the same familiar?just?factory method (albeit having only two?justvariants: one element and a vararg). But?from, has been replaced by several explicit variants, most notable being?fromIterable. Flux also has all the usual suspects in term of operators:?map,merge,?concat,?flatMap,?take…, etc.

One example of an RxJava operator name that Reactor eschewed was the puzzling?amboperator, which has been replaced with the more appropriately named?firstEmitting. Additionally, to introduce greater consistency in the API,?toList?has been renamed?collectList. In fact all?collectXXX?operators now aggregate values into a specific type of collection but still produce a?Mono?of said collection, while?toXXX?methods are reserved for type conversions that take you out of the reactive world, eg.?toFuture().

One more mean by which Reactor can be leaner, this time in terms of class instantiation and resource usage, is?fusion: Reactor is capable of merging multiple sequential uses of certain operators (eg. calling?concatWith?twice) into a single use, only instantiating the operator's inner classes once (macro-fusion). That includes some data source based optimization which greatly helps?Mono?offset the cost of implementing?Publisher. It is also capable of sharing resources like inner queues between several compatible operators (micro-fusion). These capabilities make Reactor a fourth-generation reactive library. But that is a topic for a future article.

Let's take a closer look at a few Reactor operators. (You will notice the contrast with some of the examples in the earlier articles in our series.)

A few operator examples

(This section contains snippets of code, and we encourage you to try them and experiment further with Reactor. To that effect, you should open your IDE of choice and create a test project with Reactor as a dependency.)

To do so in Maven, add the following to the dependencies section of your pom.xml:

<dependency><groupId>io.projectreactor</groupId> <artifactId>reactor-core</artifactId><version>3.0.3.RELEASE</version> </dependency>

To do the same in Gradle, edit the dependencies section to add reactor, similarly to this:

dependencies {compile "io.projectreactor:reactor-core:3.0.3.RELEASE" }

Let's play with examples used in the previous articles in this series!

Very similarly to how you would create your first?Observable?in RxJava, you can create a?Fluxusing the?just(T…)?and?fromIterable(Iterable<T>)?Reactor factory methods. Remember that given a?List,?just?would?just?emit the list as one whole, single emission, while?fromIterable?will emit each element?from?the?iterable?list:

public class ReactorSnippets {private static List<String> words = Arrays.asList("the","quick","brown","fox","jumped","over","the","lazy","dog");@Testpublic void simpleCreation() {Flux<String> fewWords = Flux.just("Hello", "World");Flux<String> manyWords = Flux.fromIterable(words);fewWords.subscribe(System.out::println);System.out.println();manyWords.subscribe(System.out::println);} }

Like in the corresponding RxJava examples, this prints
Hello
World

the
quick
brown
fox
jumped
over
the
lazy
dog

In order to output the individual letters in the fox sentence we'll also need?flatMap?(as we did in RxJava by Example), but in Reactor we use?fromArray?instead of?from. We then want to filter out duplicate letters and sort them using?distinct?and?sort. Finally, we want to output an index for each distinct letter, which can be done using?zipWith?and?range:

@Test public void findingMissingLetter() {Flux<String> manyLetters = Flux.fromIterable(words).flatMap(word -> Flux.fromArray(word.split(""))).distinct().sort().zipWith(Flux.range(1, Integer.MAX_VALUE),(string, count) -> String.format("%2d. %s", count, string));manyLetters.subscribe(System.out::println); }

This helps us notice the?s?is missing as expected:

1. a
2. b
...
18. r
19. t
20. u
...
25. z

One way of fixing that is to correct the original words array, but we could also manually add the "s" value to the?Flux?of letters using?concat/concatWith?and a?Mono:

@Test public void restoringMissingLetter() {Mono<String> missing = Mono.just("s");Flux<String> allLetters = Flux.fromIterable(words).flatMap(word -> Flux.fromArray(word.split(""))).concatWith(missing).distinct().sort().zipWith(Flux.range(1, Integer.MAX_VALUE),(string, count) -> String.format("%2d. %s", count, string));allLetters.subscribe(System.out::println); }

This adds the missing?s?just before we filter out duplicates and sort/count the letters:

1. a
2. b
...
18. r
19. s
20. t
...
26. z

The previous article noted the resemblance between the Rx vocabulary and the Streams API, and in fact when the data is readily available from memory, Reactor, like Java Streams, acts in simple push mode (see the backpressure section below to understand why). More complex and truly asynchronous snippets wouldn't work with this pattern of just subscribing in the main thread, primarily because control would return to the main thread and then exit the application as soon as the subscription is done. For instance:

@Test public void shortCircuit() {Flux<String> helloPauseWorld = Mono.just("Hello").concatWith(Mono.just("world").delaySubscriptionMillis(500));helloPauseWorld.subscribe(System.out::println); }

This snippet prints "Hello", but fails to print the delayed "world" because the test terminates too early. In snippets and tests where you only sort of write a main class like this, you'll usually want to revert back to blocking behavior. To do that you could create a?CountDownLatch?and callcountDown?in your subscriber (both in?onError?and?onComplete). But then that's not very reactive, is it? (and what if you forget to count down, in case of error for instance?)

The second way you could solve that issue is by using one of the operators that revert back to the non-reactive world. Specifically,?toIterable?and?toStream?will both produce a blocking instance. So let's use?toStream?for our example:

@Test public void blocks() {Flux<String> helloPauseWorld = Mono.just("Hello").concatWith(Mono.just("world").delaySubscriptionMillis(500));helloPauseWorld.toStream().forEach(System.out::println); }

As you would expect, this prints "Hello" followed by a short pause, then prints "world" and terminates.

As we mentioned above, RxJava?amb()?operator has been renamed?firstEmitting?(which more clearly hints at the operator's purpose: selecting the first?Flux?to emit). In the following example, we create a?Mono?whose start is delayed by 450ms and a?Flux?that emits its values with a 400ms pause?before?each value. When?firstEmitting()?them together, since the first value from theFlux?comes in before the?Mono's value, it is the?Flux?that ends up being played:

@Test public void firstEmitting() {Mono<String> a = Mono.just("oops I'm late").delaySubscriptionMillis(450);Flux<String> b = Flux.just("let's get", "the party", "started").delayMillis(400);Flux.firstEmitting(a, b).toIterable().forEach(System.out::println); }

This prints each part of the sentence with a short 400ms pause between each section.

At this point you might wonder, what if you're writing a test for a Flux that introduces delays of 4000ms instead of 400? You don't want to wait 4s in a unit test! Fortunately, we'll see in a later section that Reactor comes with powerful testing facilities that nicely cover this case.

But for now, we have sampled how Reactor compares for a few common operators, so let's zoom back and have a look at other differentiating aspects of the library.

A Java 8 foundation

Reactor targets Java 8 rather than previous Java versions. This is once again aligning with the goal of reducing the API surface: RxJava targets Java 6 where there is no?java.util.functionpackage so classes like?Function?or?Consumer?can't be leveraged. Instead they had to add specific classes like?Func1,?Func2,?Action0,?Action1, etc. In RxJava 2 these classes mirrorjava.util.function?the way Reactor 2 used to do when it still had to support Java 7.

The Reactor API also embraces types introduced in Java 8. Most of the time-related operators will be about a duration (eg.?timeout,?interval,?delay, etc.), so using the Java 8?Duration classis appropriate.

The Java 8?Stream?API and?CompletableFuture?can also both be easily converted to a?Flux/Mono, and vice-versa. Should we usually convert a?Stream?to a?Flux?though? Not really. The level of indirection added by?Flux?or?Mono?is a negligible cost when they decorate more costly operations like IO or memory-bound operations, but most of the time a?Stream?doesn't imply that kind of latency and it is is perfectly ok to use the?Stream?API directly. Note that for these use cases in RxJava 2 we'd use the?Observable, as it is not backpressured and thus becomes a simple?pushuse case once you've subscribed. But Reactor is based on Java 8, and the Stream API is expressive enough for most use cases. Note also that even though you can find?Flux?and?Monofactories for literal or simple Objects, they mostly serve the purpose of being combined in higher level flows. So typically you wouldn't want to transform an accessor like "long getCount()" into a "Mono<Long> getCount()" when migrating an existing codebase to reactive patterns.

The Backpressure story

One of the main focuses (if not?the?main focus) of the RS specification and of Reactor itself isbackpressure. The idea of backpressure is that in a push scenario where the producer is quicker than the consumer, there's value in letting the consumer signal back to the producer and say "Hey! Slow down a little, I'm overwhelmed". This gives the producer a chance to control its pace rather than having to resort to discarding data (sampling) or worse, risking a cascading failure.

You may wonder at this point where backpressure comes into the picture with?Mono: what kind of consumer could possibly be overwhelmed by a single emission? Short answer is "probably none". However, there's still a key difference between how a?Mono?works and how aCompletableFuture?works. The latter is?push?only: if you have a reference to the?Future, it means the task processing an asynchronous result is already executing. On the other hand, what a backpressured?Flux?or?Mono?enables is a?deferred pull-push?interaction:

  • Deferred?because nothing happens before the call to?subscribe()
  • Pull?because at the subscription and request steps, the?Subscriber?will send a signal upstream to the source and essentially?pull?the next chunk of data
  • Push?from producer to consumer from there on, within the boundary of the number of requested elements
  • For?Mono,?subscribe()?is the button that you press to say "I'm ready to receive my data". For Flux, this button is?request(n), which is kind of a generalization of the former.

    Realizing that?Mono?is a?Publisher?that will usually represent a costly task (in terms of IO, latency, etc.) is critical to understanding the value of backpressure here: if you don't subscribe, you don't pay the cost of that task. Since?Mono?will often be orchestrated in a reactive chain with regular backpressured?Flux, possibly combining results from multiple asynchronous sources, the availability of this on-demand subscribe triggering is key in order to avoid blocking.

    Having backpressure helps us differentiate that last use case from another?Mono?broad use case: asynchronously aggregating data from a?Flux?into a?Mono. Operators like?reduce?and?hasElementare capable of consuming each item in the?Flux, aggregating some form of data about it (respectively the result of a reduce function and a boolean) and exposing that data as a?Mono. In that case, the backpressure signalled upstream is?Long.MAX_VALUE, which lets the upstream work in a fully?push?fashion.

    Another interesting aspect of backpressure is how it naturally limits the amount of objects held in memory by the stream. As a?Publisher, the source of data is most probably slow (at least slowish) at producing items, so the request from downstream can very well start beyond the number of readily available items. In this case, the whole stream naturally falls into a push pattern where new items are notified to the consumer. But when there is a production peak and the pace of production accelerates, things fall nicely back into a pull model. In both cases, at most?N?data (the request()?amount) is kept in memory.

    You can reason about the memory used by your asynchronous processing by correlating that demand for?N?with the number of kilobytes an item consumes,?W: you can then infer that at mostW*N?memory will be consumed. In fact, Reactor will most of the time take advantage of knowing?Nto apply optimizations: creating queues bounded accordingly and applying prefetching strategies where it can automatically request 75% of N every time that same ? amount has been received.

    Finally, Reactor operators will sometimes change the backpressure signal to correlate it with the expectations and semantics they represent. One prime example of this behavior would bebuffer(10): for every request of?N?from downstream, that operator would request?10N?from upstream, which represents enough data to fill the number of buffers the subscriber is ready to consume. This is called "active backpressure", and it can be put to good use by developers in order to explicitly tell Reactor how to switch from an input volume to a different output volume, in micro-batching scenarios for instance.

    Relation to Spring

    Reactor is the reactive foundation for the whole Spring ecosystem, and most notably Spring 5 (through Spring Web Reactive) and Spring Data "Kay" (which corresponds to spring-data-commons 2.0).

    Having a reactive version for both of these projects is essential, in the sense that this enables us to write a web application that is reactive from start to finish: a request comes in, is asynchronously processed all the way down to and including the database, and results come back asynchronously as well. This allows a Spring application to be very efficient with resources, avoiding the usual pattern of dedicating a thread to a request and blocking it for I/O.

    So Reactor is going to be used for the internal reactive plumbing of future Spring applications, as well as in the APIs these various Spring components expose. More generally, they'll be able to deal with?RS Publishers, but most of the time these will happen to be?Flux/Mono, bringing in the rich feature set of Reactor. Of course, you will be able to use your reactive library of choice, as the framework provides hooks for? adapting between Reactor types and RxJava types or even simpler RS types.

    At the time of writing of this article, you can already experiment with Spring Web Reactive in Spring Boot by using Spring Boot?2.0.0.BUILD-SNAPSHOT?and the?spring-boot-starter-web-reactive?dependency (eg. by generating such a project on?start.spring.io):

    <dependency><groupId>org.springframework.boot.experimental</groupId><artifactId>spring-boot-starter-web-reactive</artifactId> </dependency>

    This lets you write your?@Controller?mostly as usual, but replaces the underlying Spring MVC traditional layer with a reactive one, replacing many of the Spring MVC contracts by reactive non-blocking ones. By default, this reactive layer is based on top of Tomcat 8.5, but you can also elect to use Undertow or Netty.

    Additionally, although Spring APIs are based on Reactor types, the Spring Web Reactive module lets you use various reactive types for both the request and response:

    • Mono<T>: as the?@RequestBody, the request entity?T?is asynchronously deserialized and you can chain your processing to the resulting mono afterward. As the return type, once the?Monoemits a value, the T is serialized asynchronously and sent back to the client. You can combine both approaches by augmenting the request Mono and returning that augmented chain as the resulting Mono.
    • Flux<T>: Used in streaming scenarios (including input streaming when used as?@RequestBodyand?Server Sent Events?with a?Flux<ServerSentEvent>?return type)
    • Single/Observable: Same as?Mono?and?Flux?respectively, but switching to an RxJava implementation.
    • Mono<Void>?as a return type: Request handling completes when the Mono completes.
    • Non-reactive return types (void?and?T): This now implies that your controller method is synchronous,?but should be non-blocking?(short-lived processing). The request handling finishes once the method is executed. The returned?T?is serialized back to the client asynchronously.

    Here is a quick example of a plain text @Controller using the experimental web reactive module:

    @RestController public class ExampleController {private final MyReactiveLibrary reactiveLibrary;//Note Spring Boot 4.3+ autowires single constructors nowpublic ExampleController(MyReactiveLibrary reactiveLibrary) {this.reactiveLibrary = reactiveLibrary;}@GetMapping("hello/{who}")public Mono<String> hello(@PathVariable String who) {return Mono.just(who).map(w -> "Hello " + w + "!");}@GetMapping("helloDelay/{who}")public Mono<String> helloDelay(@PathVariable String who) {return reactiveLibrary.withDelay("Hello " + who + "!!", 2);}@PostMapping("heyMister")public Flux<String> hey(@RequestBody Mono<Sir> body) {return Mono.just("Hey mister ").concatWith(body.flatMap(sir -> Flux.fromArray(sir.getLastName().split(""))).map(String::toUpperCase).take(1)).concatWith(Mono.just(". how are you?"));} }

    The first endpoint takes a path variable, transforms it into a?Mono<String>?and?maps?that name to a greeting sentence that is returned to the client.

    By doing a GET on?/hello/Simon?we get "Hello Simon!"?as a text/plain response.

    The second endpoint is a bit more complicated: it asynchronously receives a serialized?Sirinstance (a class simply made up of a?firstName?and?lastName?attributes) and?flatMaps?it into a stream of the last name's letters. It then takes the first of these letters,?maps?it to upper case andconcatenates it into a greeting sentence.

    So POSTing the following JSON object to?/heyMister

    {"firstName": "Paul","lastName": "tEsT" }

    Returns the string "Hello mister T. How are you?".

    The reactive aspect of Spring Data is also currently being developed in the Kay release train, which for?spring-data-commons?is the?2.0.x?branch. There is a?first Milestone out?that you can get by adding the Spring Data Kay-M1 bom to your pom:

    <dependencyManagement><dependencies><dependency><groupId>org.springframework.data</groupId><artifactId>spring-data-releasetrain</artifactId><version>Kay-M1</version><scope>import</scope><type>pom</type></dependency></dependencies> </dependencyManagement>

    Then for this simplistic example just add the Spring Data Commons dependency in your pom (it will take the version from the BOM above):

    <dependency><groupId>org.springframework.data</groupId><artifactId>spring-data-commons</artifactId> </dependency>

    Reactive support in Spring Data revolves around the new?ReactiveCrudRepository<T, ID>interface, which extends?Repository<T, ID>. This interface exposes CRUD methods, using Reactor input and return types. There is also an RxJava 1 based version calledRxJava1CrudRepository. For instance, in the classical blocking?CrudRepository, retrieving one entity by its id would be done using "T findOne(ID id)". It becomes "Mono<T> findOne(ID id)" and "Observable<T> findOne(ID id)" in?ReactiveCrudRepository?and?RxJava1CrudRepositoryrespectively. There are even variants that take a Mono/Single as argument, to asynchronously provide the key and compose on that.

    Assuming a reactive backing store (or a mock?ReactiveCrudRepository?bean), the following (very naive) controller would be reactive from start to finish:

    @RestController public class DataExampleController {private final ReactiveCrudRepository<Sir, String> reactiveRepository;//Note Spring Boot 4.3+ autowires single constructors nowpublic DataExampleController(ReactiveCrudRepository<Sir, String> repo) {this.reactiveRepository = repo;}@GetMapping("data/{who}")public Mono<ResponseEntity<Sir>> hello(@PathVariable String who) {return reactiveRepository.findOne(who).map(ResponseEntity::ok).defaultIfEmpty(ResponseEntity.status(404).body(null));} }

    Notice how the data repository usage naturally flows into the response path: we asynchronously fetch the entity and wrap it as a?ResponseEntity?using?map, obtaining a?Mono?we can return right away. If the Spring Data repository cannot find data for this key, it will return an empty?Mono. We make that explicit by using?defaultIfEmpty?and returning a 404.

    Testing Reactor

    The article "Testing RxJava" covered techniques for testing an?Observable. As we saw, RxJava comes with a?TestScheduler?that you can use with operators that accept a?Scheduler?as a parameter, to manipulate a virtual clock on these operators. It also features a?TestSubscriberclass that can be leveraged to wait for the completion of an?Observable?and to make assertions about every event (number and values for?onNext, has?onError?triggered, etc.) In RxJava 2, theTestSubscriber?is an RS?Subscriber, so you can test Reactor's?Flux?and?Mono?with it!

    In Reactor, these two broad features are combined into the?StepVerifier?class. It can be found in the addon module?reactor-test?from the?reactor-addons?repository. The?StepVerifier?can be initialized by creating an instance from any?Publisher, using the?StepVerifier.create?builder. If you want to use virtual time, you can use the?StepVerifier.withVirtualTime?builder, which takes a?Supplier<Publisher>. The reason for this is that it will first ensure that aVirtualTimeScheduler?is created and enabled as the default Scheduler implementation to use, making the need to explicitly pass the scheduler to operators obsolete. The StepVerifier will then configure if necessary the?Flux/Mono?created within the Supplier, turning timed operators into "virtually timed operator". You can then script stream expectations and time progress: what the next elements should be, should there be an error, should it move forward in time, etc. Other methods include verifying that data matches a given?Predicate?or even consume onNext events, allowing you to do more advanced interactions with the value (like using an assertion library). Any?AssertionError?thrown by one of these will be reflected back in the final verification result. Finally, call?verify()?to check your expectations, this will truly subscribe to the defined source via?StepVerifier.create?or?StepVerifier.withVirtualTime.

    Let's take a few simple examples and demonstrate how?StepVerifier?works. For these snippets, you'll want to add the following test dependencies to your pom:

    <dependency><groupId>io.projectreactor.addons</groupId><artifactId>reactor-test</artifactId><version>3.0.3.RELEASE</version><scope>test</scope> </dependency><dependency><groupId>org.assertj</groupId><artifactId>assertj-core</artifactId><version>3.5.2</version><scope>test</scope> </dependency>

    First, imagine you have reactive class called?MyReactiveLibrary?that produces a few?Flux?that you want to test:

    @Component public class MyReactiveLibrary {public Flux<String> alphabet5(char from) {return Flux.range((int) from, 5).map(i -> "" + (char) i.intValue());}public Mono<String> withDelay(String value, int delaySeconds) {return Mono.just(value).delaySubscription(Duration.ofSeconds(delaySeconds));} }

    The first method is intended to return the 5 letters of the alphabet following (and including) the given starting letter. The second method returns a flux that emits a given value after a given delay, in seconds.

    The first test we'd like to write ensures that calling?alphabet5?from x limits the output to x, y, z. With?StepVerifier?it would go like this:

    @Test public void testAlphabet5LimitsToZ() {MyReactiveLibrary library = new MyReactiveLibrary();StepVerifier.create(library.alphabet5('x')).expectNext("x", "y", "z").expectComplete().verify(); }

    The second test we'd like to run on?alphabet5?is that every returned value is an alphabetical character. For that we'd like to use a rich assertion library like?AssertJ:

    @Test public void testAlphabet5LastItemIsAlphabeticalChar() {MyReactiveLibrary library = new MyReactiveLibrary();StepVerifier.create(library.alphabet5('x')).consumeNextWith(c -> assertThat(c).as("first is alphabetic").matches("[a-z]")).consumeNextWith(c -> assertThat(c).as("second is alphabetic").matches("[a-z]")).consumeNextWith(c -> assertThat(c).as("third is alphabetic").matches("[a-z]")).consumeNextWith(c -> assertThat(c).as("fourth is alphabetic").matches("[a-z]")).expectComplete().verify(); }

    Turns out both of these tests fail :(. Let's have a look at the output the?StepVerifier?gives us in each case to see if we can spot the bug:

    java.lang.AssertionError: expected: onComplete(); actual: onNext({)

    and

    java.lang.AssertionError: [fourth is alphabetic] Expecting:"{" to match pattern:"[a-z]"

    So it looks like our method doesn't stop at z but continues emitting characters from the ASCII range. We could fix that by adding a?.take(Math.min(5, 'z' - from + 1))?for instance, or using the same?Math.min?as the second argument to range.

    The last test we want to make involves virtual time manipulation: we'll test the delaying method but without actually waiting for the given amount of seconds, by using the?withVirtualTimebuilder:

    @Test public void testWithDelay() {MyReactiveLibrary library = new MyReactiveLibrary();Duration testDuration =StepVerifier.withVirtualTime(() -> library.withDelay("foo", 30)).expectSubscription().thenAwait(Duration.ofSeconds(10)).expectNoEvent(Duration.ofSeconds(10)).thenAwait(Duration.ofSeconds(10)).expectNext("foo").expectComplete().verify();System.out.println(testDuration.toMillis() + "ms"); }

    This tests a flux that would be delayed by 30 seconds for the following scenario: an immediate subscription, followed by 3x10s where nothing happens, then an onNext("foo") and completion.

    The?System.out?output prints the actual duration the verification took, which in my latest run was 8ms :)

    Note that when using the?create?builder instead, the?thenAwait?and?expectNoEvent?methods would still be available but would actually block for the provided duration.

    StepVerifier?comes with many more methods for describing expectations and asserting state of a?Publisher?(and if you think about new ones, contributions and feedback are always welcome in the?github repository).

    Custom Hot Source

    Note that the concept of hot and cold observables discussed at the end of "RxJava by Example" also applies to Reactor.

    If you want to create a custom Flux, instead of the RxJava?AsyncEmitter?class, you'd use Reactor's?FluxSink. This will cover all the asynchronous corner cases for you and let you focus on emitting your values.

    Use?Flux.create?and get a?FluxSink?in the callback that you can use to emit data via?next. This custom Flux can be cold, so in order to make it hot you can use publish() and connect(). Building on the example from the previous article with a feed of price ticks, we get an almost verbatim translation in Reactor:

    SomeFeed<PriceTick> feed = new SomeFeed<>(); Flux<PriceTick> flux =Flux.create(emitter ->{SomeListener listener = new SomeListener() {@Overridepublic void priceTick(PriceTick event) {emitter.next(event);if (event.isLast()) {emitter.complete();}}@Overridepublic void error(Throwable e) {emitter.error(e);}};feed.register(listener);}, FluxSink.OverflowStrategy.BUFFER);ConnectableFlux<PriceTick> hot = flux.publish();

    Before connecting to the hot Flux, why not subscribe?twice?? One subscription will print the detail of each tick while the other will only print the instrument:

    hot.subscribe(priceTick -> System.out.printf("%s %4s %6.2f%n", priceTick.getDate(), priceTick.getInstrument(), priceTick.getPrice()));hot.subscribe(priceTick -> System.out.println(priceTick.getInstrument()));

    We then connect to the hot flux and let it run for 5 seconds before our test snippet terminates:

    hot.connect(); Thread.sleep(5000);

    (note that in the example repository, the feed would also terminate on its own if the?isLast()method of?PriceTick?is changed).

    FluxSink?also lets you check if downstream has cancelled its subscription via?isCancelled(). You can also get feedback on the outstanding requested amount viarequestedFromDownstream(), which is useful if you want to simply comply with backpressure. Finally, you can make sure any specific resources your source uses are released uponCancellation?via?setCancellation.

    Note that there's a backpressure implication of using FluxSink: you must provide anOverflowStrategy?explicitly to let the operator deal with backpressure. This is equivalent to usingonBackpressureXXX?operators (eg.?FluxSink.OverflowStrategy.BUFFER?is equivalent to using.onBackpressureBuffer()), which kind of overrides any backpressure instructions from downstream.

    Conclusion

    In this article, you have learned about Reactor, a fourth-generation reactive library that builds on the Rx language but targets Java 8 and the Reactive Streams specification. We've shown how the concepts you might have learned in RxJava also apply to Reactor, despite a few API differences. We've also shown how Reactor serves as the foundation for Spring 5, and that it offers resources for testing a?Publisher/Flux/Mono.

    If you want to dig deeper into using Reactor, the snippets presented in this article are available in our?github repository. There is also a workshop, the "Lite Rx API hands-on", that covers more operators and use cases.

    Finally, you can reach the Reactor team on?Gitter?and provide feedback there or through?github issues?(and of course, pull-requests are welcomed as well).

    ?

    轉載于:https://www.cnblogs.com/davidwang456/p/6179079.html

    總結

    以上是生活随笔為你收集整理的Reactor by Example--转的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。

    欧美成人区| 国产精品九九久久99视频 | 91亚洲综合 | 国产午夜三级一二三区 | 日本在线观看中文字幕 | 韩国av一区二区三区在线观看 | 手机看片午夜 | 国产精品免费观看视频 | 免费在线看成人av | 91人人澡| 成年人免费观看在线视频 | 美女搞黄国产视频网站 | 91综合视频在线观看 | 五月网婷婷 | 一级黄色片在线播放 | 久久久免费毛片 | 伊人五月 | 四虎视频 | 国产一级在线免费观看 | 国产一区二区精品 | 日日操网站 | 中文乱码视频在线观看 | 超碰在线亚洲 | 国产免费观看av | 在线有码中文 | 日本精品中文字幕 | 日本精品在线 | 日韩视频中文字幕 | 五月婷婷中文网 | 丁香六月婷婷 | 日韩在线视频免费播放 | 婷婷亚洲综合五月天小说 | 丁香婷婷色月天 | 婷婷综合久久 | 伊人色播| 国产精品久久久久久一二三四五 | 国产a网站 | 在线黄色观看 | 中文字幕免费一区二区 | 日韩精品一区二区三区在线视频 | 国产一区二区中文字幕 | 久久无码av一区二区三区电影网 | 国产精品久久久久久麻豆一区 | 国产精品毛片一区二区在线 | 国产亚洲精品女人久久久久久 | 97成人在线免费视频 | 国产精品毛片一区二区 | 日韩在线观看中文字幕 | 日本在线观看一区二区三区 | 日韩二级毛片 | av电影免费| 日韩av免费在线看 | 狠狠色噜噜狠狠狠合久 | 永久免费精品视频 | 在线看v片 | 欧美日韩精品免费观看视频 | 亚洲天堂在线观看完整版 | 亚洲日韩中文字幕 | 美女中文字幕 | 午夜精品久久久99热福利 | 啪啪动态视频 | 亚洲激情六月 | 最新中文字幕在线观看视频 | 久久人人爽人人爽人人 | 国产高清av | 永久免费毛片 | 久久久精品二区 | 九九热.com| 国产a级片免费观看 | 欧美一级专区免费大片 | 欧美日韩国产综合一区二区 | 日本三级中文字幕在线观看 | www.黄色 | 亚洲精品国精品久久99热 | 国产成人黄色在线 | 乱子伦av| 99久久精品久久久久久清纯 | 91九色视频导航 | 色老板在线 | 午夜影院在线观看18 | 久久国产免费看 | 久久亚洲免费 | 在线色吧 | 欧美日韩精品在线观看 | 免费高清男女打扑克视频 | 中文字幕第一页在线视频 | 国产麻豆剧果冻传媒视频播放量 | 免费av小说 | 欧美永久视频 | 国产精品中文在线 | 天天曰天天爽 | 午夜久久福利影院 | 啪嗒啪嗒免费观看完整版 | 久久午夜电影院 | 午夜视频在线观看一区二区三区 | 久久综合婷婷国产二区高清 | 国产日韩在线播放 | 亚洲国产成人精品在线观看 | 福利电影久久 | 国产h片在线观看 | 久草电影免费在线观看 | 国产精品永久 | 一级黄色大片在线观看 | 奇米网网址 | 欧美日韩视频在线观看免费 | 精品中文字幕在线观看 | 国产精品福利视频 | 午夜视频在线观看欧美 | 日韩精品中文字幕在线不卡尤物 | 五月开心六月伊人色婷婷 | 午夜91视频 | 亚洲国产美女精品久久久久∴ | 高清中文字幕 | 欧美在线视频免费 | 亚洲精品国产第一综合99久久 | 欧美一区二区三区在线看 | 亚洲va欧美va国产va黑人 | 日本精品久久久久中文字幕 | 欧美 日韩 性 | 国产精品亚洲精品 | 免费视频97 | 亚洲成av人片在线观看 | 视频 天天草 | 五月情婷婷| av亚洲产国偷v产偷v自拍小说 | 99久久精品免费一区 | a在线视频v视频 | 国产美女主播精品一区二区三区 | 91激情视频在线播放 | 91久久久久久久一区二区 | 午夜在线免费观看视频 | 99视频国产精品免费观看 | 九九热精品视频在线播放 | 在线91av | a在线播放 | 国产视频97 | av高清在线观看 | 国产精品免费视频观看 | 在线亚洲午夜片av大片 | 这里只有精品视频在线 | 香蕉精品视频在线观看 | 日韩网站在线免费观看 | 国产亚洲小视频 | 国产美女精品人人做人人爽 | 免费一级毛毛片 | 久久久视屏 | 黄色av在 | 国产一级电影免费观看 | 国产精品欧美久久久久天天影视 | 在线观看免费黄色 | 中文字幕亚洲综合久久五月天色无吗'' | 精品久久综合 | 91精品国产麻豆国产自产影视 | 又色又爽又黄高潮的免费视频 | 黄色的网站免费看 | 一区二区三区免费在线观看视频 | 丁香九月婷婷综合 | 视频在线99 | 天堂久久电影网 | 久久视频一区二区 | 91久久国产露脸精品国产闺蜜 | 久久热亚洲 | 久久人人爽人人片 | 一区二区影院 | 中文视频在线 | 日韩高清成人 | 91自拍91 | 亚洲资源在线观看 | 午夜.dj高清免费观看视频 | 亚洲午夜精品久久久久久久久久久久 | 国产高清视频 | 精品亚洲欧美无人区乱码 | zzijzzij日本成熟少妇 | 久久精品之 | 久久国色夜色精品国产 | 国产精品大片在线观看 | 久久情爱| 久草新在线 | 欧美嫩草影院 | 91成人在线看 | 婷婷成人综合 | 久久中文欧美 | 在线观看精品黄av片免费 | 日韩在线观看一区二区 | 亚洲精品国产精品久久99热 | 亚洲最新av | 在线亚洲观看 | 国产99在线 | 少妇视频在线播放 | 久久久久久久久久国产精品 | 麻豆国产视频下载 | 午夜久久网站 | 国内精品久久久久影院男同志 | 成人国产一区二区 | 国产日韩高清在线 | 中文字幕在线观看第二页 | 免费看片网站91 | 国产91九色蝌蚪 | 怡红院av久久久久久久 | 黄视频网站大全 | 亚洲黄a | 中文字幕有码在线播放 | 国产日本在线观看 | 欧美日一级片 | 99久久www免费 | 久久久久久久久久久久久久电影 | 国产麻豆视频免费观看 | 久久综合色天天久久综合图片 | 在线观看精品黄av片免费 | 在线亚洲午夜片av大片 | 日韩午夜精品福利 | 最新av网址在线观看 | 国产成人福利在线观看 | 中文字幕中文字幕在线一区 | 一区二区三区三区在线 | 欧美性色综合网站 | 久久久福利 | 国产精品白丝av | 免费观看一级视频 | 精品一区二区影视 | 久久在线免费 | 99久在线精品99re8热视频 | 久久久久综合精品福利啪啪 | 成人资源网 | 日韩成人精品一区二区 | 国产99精品 | 在线a人片免费观看视频 | 一级a毛片高清视频 | 激情欧美一区二区三区 | 亚洲专区路线二 | 亚洲精品影院在线观看 | 日日夜夜国产 | 99在线看| 99精品一区二区三区 | avove黑丝| 在线观看www91| 久热免费在线观看 | 免费看v片网站 | 91av原创 | 日韩午夜精品福利 | 丁香网五月天 | 中文字幕在线观看第三页 | 国产免费观看久久黄 | 免费日韩一区二区三区 | 色99之美女主播在线视频 | 色 中文字幕 | 国产视频在线观看一区 | 中文字幕亚洲综合久久五月天色无吗'' | 成人久久18免费网站麻豆 | 亚洲香蕉视频 | 久久久久国产免费免费 | 日韩女同一区二区三区在线观看 | 五月天丁香视频 | 亚洲精品久久久蜜臀下载官网 | 国产精品女人久久久 | 成人一级片在线观看 | 九九热精品视频在线观看 | 黄色小视频在线观看免费 | 免费看精品久久片 | 欧美资源 | 女人高潮一级片 | 超碰免费97| 91在线观看高清 | 狠狠干在线| 婷婷色六月天 | 欧美精品999 | 九九九热精品免费视频观看 | 日本99热 | 天天干天天怕 | 成人av片在线观看 | 免费日韩 精品中文字幕视频在线 | 在线小视频 | 夜夜高潮夜夜爽国产伦精品 | 欧美日本一二三 | 色橹橹欧美在线观看视频高清 | 91在线视频播放 | 国产97在线观看 | 亚洲精品黄网站 | 国产精品久久电影网 | 欧美日韩国产色综合一二三四 | 欧美日韩裸体免费视频 | 亚洲视频资源在线 | 午夜精品福利一区二区三区蜜桃 | 99久久精品国产欧美主题曲 | 天天干天天干天天射 | 91丨九色丨国产在线 | 色婷婷激婷婷情综天天 | 久久另类视频 | a级成人毛片 | 成人黄色大片在线观看 | 免费在线黄色av | 日本在线h | 欧美国产日韩激情 | 国产精品成人一区二区 | 黄色在线视频网址 | 欧美精品乱码久久久久久按摩 | 在线观看av大片 | 久久最新视频 | 国产精品美女久久久 | 开心色插 | 亚洲爱爱视频 | 91试看| 国产一级电影在线 | 在线三级播放 | 韩国一区二区在线观看 | 激情综合色综合久久 | 西西444www | 国产精品短视频 | 精品黄色在线观看 | 婷婷六月丁| 午夜精品一区二区三区可下载 | 四虎亚洲精品 | 亚洲欧洲精品一区二区 | 一区在线观看 | 麻豆一区二区三区视频 | www在线免费观看 | 亚洲高清91 | 免费激情网 | 成人国产一区 | 97成人在线视频 | 色香蕉网| 国产精品入口66mio女同 | 91视频久久久久 | av在线播放不卡 | 国产小视频免费在线观看 | 99久久综合国产精品二区 | 久草久草在线 | 中文字幕a∨在线乱码免费看 | 中文字幕在线不卡国产视频 | 狠狠gao| 日韩av网站在线播放 | 精品9999| 免费97视频 | 五月天六月丁香 | 国产精品免费观看国产网曝瓜 | 三级av中文字幕 | 97精品国产91久久久久久 | 日韩特级黄色片 | 日韩欧美一区二区三区黑寡妇 | 六月婷操| 国产一区视频免费在线观看 | 日韩三级视频在线看 | 日日夜夜中文字幕 | 久久精彩| 精品a在线 | 久久高清国产视频 | 国产精品毛片一区视频 | 九九视频免费在线观看 | 不卡视频在线 | 国产糖心vlog在线观看 | 视频国产在线观看18 | 久久精品亚洲精品国产欧美 | 手机av永久免费 | 日韩成人精品在线观看 | 日韩在线观看小视频 | 国产又黄又硬又爽 | 最近中文字幕免费av | 欧美整片sss | 国产午夜精品免费一区二区三区视频 | 久久久久久久国产精品影院 | 99999精品 | 99免费观看视频 | 六月婷色 | 在线免费av网站 | 激情综合网色播五月 | 久久精品国产一区 | 日韩av二区 | 97免费视频在线 | 国产91小视频 | 亚洲在线色 | 婷婷 中文字幕 | 日韩在线在线 | 欧美性一级观看 | 国产精品久久久久久久久久久久冷 | 久久久久久久国产精品视频 | 欧美亚洲精品在线观看 | 免费国产在线视频 | 久久香蕉国产精品麻豆粉嫩av | 成人免费在线观看电影 | 999在线观看视频 | 亚洲免费永久精品国产 | 五月婷婷黄色网 | 欧美另类巨大 | 亚洲国产偷| 在线视频中文字幕一区 | 国产又粗又猛又黄又爽的视频 | 97在线视频免费观看 | 天堂网中文在线 | 最近日韩免费视频 | 91高清免费在线观看 | 久久久精品国产一区二区 | 91视频com| 久久99视频免费观看 | 中文字幕av网站 | 国产精品99免视看9 国产精品毛片一区视频 | 夜夜夜夜爽 | 久久99婷婷| 亚洲综合黄色 | 久久精品国产免费看久久精品 | 亚洲人视频在线 | 黄色一级大片在线观看 | 国内精品久久久久久久 | 国产精品18久久久久vr手机版特色 | 蜜臀久久99精品久久久无需会员 | 久久精品—区二区三区 | 国产精品毛片一区二区 | 色六月婷婷 | 天天射成人| 五月天国产 | 久久福利剧场 | 久久久色| 97精产国品一二三产区在线 | 国产精品免费观看在线 | 91视频中文字幕 | 国产精品一区在线 | 99精品视频在线播放免费 | 激情视频国产 | 亚洲国产中文在线观看 | 99r精品视频在线观看 | 色婷久久 | 久久99在线观看 | 国产五月婷婷 | 精品国产视频一区 | 日本特黄特色aaa大片免费 | 日韩一区正在播放 | 亚洲国产一二三 | 久久激情视频免费观看 | av在线免费在线 | 国产黄色视 | 久久久久久国产精品久久 | 欧美日韩伦理在线 | 欧美成年人在线观看 | 中文字幕人成一区 | 久久夜色精品国产欧美一区麻豆 | 国内精品久久久久久久久久清纯 | 97超碰人人网 | 久草在线手机视频 | 久久九九影院 | 国产大片免费久久 | 亚州国产精品 | 2021国产在线视频 | 国产精品区一区 | 欧美 日韩精品 | 九色免费视频 | 精品久久久久一区二区国产 | 色www.| 国产成人精品综合 | 一区二区三区四区不卡 | 人人干人人添 | 免费合欢视频成人app | 成人在线免费视频观看 | av中文字幕免费在线观看 | 精品一区二区免费视频 | 欧美在线视频免费 | 最近免费观看的电影完整版 | 国产免费黄色 | 欧美日韩电影在线播放 | 久草免费新视频 | 又污又黄网站 | 456成人精品影院 | 麻豆精品传媒视频 | a视频免费 | 国产视频精品久久 | 成人免费观看网站 | 在线播放视频一区 | 国产精品一区二区在线看 | 精品久久久久一区二区国产 | 免费成人av | 久久久亚洲电影 | 中文字幕在线看视频国产中文版 | 在线亚洲午夜片av大片 | 91污污 | 亚洲精品中文字幕在线观看 | 夜添久久精品亚洲国产精品 | 日韩av一区在线观看 | 久久人人97超碰com | 美女搞黄国产视频网站 | 不卡的一区二区三区 | 精品久久免费 | 久久久久久久久久久久亚洲 | 婷婷色亚洲 | 高清av中文字幕 | 中文字幕欧美日韩va免费视频 | 成人免费视频视频在线观看 免费 | 国产综合在线观看视频 | 欧美精品三级在线观看 | 丁香亚洲| 黄色1级大片 | 六月色婷婷 | 91自拍视频在线观看 | 午夜18视频在线观看 | 国产精品免费久久久久影院仙踪林 | 亚洲久草在线 | 日韩在线精品一区 | 一区在线观看 | 麻豆免费观看视频 | 激情丁香综合五月 | 国产手机在线观看视频 | 久久久久国产免费免费 | 欧美孕交vivoestv另类 | 四虎在线永久免费观看 | 少妇视频一区 | 在线观看成人国产 | 日韩最新理论电影 | www.狠狠插.com | 麻豆传媒电影在线观看 | 成人毛片久久 | 婷婷丁香久久五月婷婷 | 亚洲国产精品女人久久久 | 久久69精品| 在线观看黄色av | 91丨九色丨国产丨porny精品 | 中文字幕在线有码 | 国产在线欧美在线 | 成人午夜黄色 | 免费网站色 | 日韩午夜电影院 | 337p日本欧洲亚洲大胆裸体艺术 | 欧美激情综合五月色丁香 | h网站免费在线观看 | 国产精品网在线观看 | 婷婷丁香在线 | 黄色www在线观看 | 成x99人av在线www | 色综合网| 亚洲电影图片小说 | 999久久精品 | 成人免费一区二区三区在线观看 | 中文字幕第一页在线播放 | 国产麻豆视频免费观看 | 黄色免费观看 | 天天爽天天做 | 日韩精品一区二区久久 | 免费视频97 | 久草视频网 | 天天躁日日| 高清av在线 | 看全黄大色黄大片 | 亚洲精品中文字幕视频 | 黄网站污 | www视频免费在线观看 | 成人91在线 | 人人干,人人爽 | 免费看黄网站在线 | 99人久久精品视频最新地址 | 精品一区电影国产 | 免费看成年人 | 国产亚洲精品久 | 五月天婷亚洲天综合网鲁鲁鲁 | 国内精品久久久久久久影视麻豆 | 国产电影黄色av | 欧美狠狠色 | 欧美aaa大片 | 一区二区视频播放 | 色开心| 51久久成人国产精品麻豆 | 99视频+国产日韩欧美 | 亚洲激情 在线 | 91香蕉视频污在线 | 免费a视频在线观看 | 久久久久黄 | 成年人视频免费在线 | 特级xxxxx欧美 | 国产小视频在线观看 | 久久国产精品久久精品国产演员表 | 中文字幕色在线 | 91精品一区国产高清在线gif | 日韩av一区二区在线播放 | 亚洲人成人在线 | 成人宗合网 | 国内精品久久久久影院优 | 欧美在线久久 | 91精品一区国产高清在线gif | 国内精品视频一区二区三区八戒 | 人人看人人做人人澡 | 偷拍福利视频一区二区三区 | 国产日韩欧美视频在线观看 | 久久综合九色欧美综合狠狠 | 伊人国产在线观看 | 视频成人免费 | 最近高清中文字幕 | 欧美日韩视频免费 | 亚洲国产资源 | 中文字幕资源站 | 美女国产免费 | 国产中文字幕视频 | 一区二区精品 | 日韩欧美精品在线观看 | 国产精品久久久久久久久蜜臀 | 超碰在线日韩 | 超碰97久久 | www.久久久.com | 在线观看视频97 | 亚洲一二区视频 | 天天操天天射天天舔 | 色婷婷福利视频 | 国产精品久久久久一区二区国产 | 精品自拍网 | 亚洲毛片在线观看. | 美女网站色| 久久公开视频 | 成人黄色电影免费观看 | 99热这里只有精品1 av中文字幕日韩 | 一区视频在线 | 亚洲视频1 | 国产视频日韩视频欧美视频 | 黄色com | 国产精品毛片网 | 91重口视频 | 免费一级片在线观看 | 国产视频 亚洲视频 | 波多野结衣日韩 | 国产黄色大全 | 天天插日日操 | 国内丰满少妇猛烈精品播 | av电影一区 | 国产91探花 | 久久精品高清 | 国产丝袜在线 | 免费在线观看成人小视频 | 97碰碰视频| 国产精品私人影院 | 天天干天天操天天操 | 久久a级片| 亚洲天堂网在线观看视频 | 日韩精品视频在线观看免费 | 亚洲综合在线观看视频 | 亚州视频在线 | 天天看天天干 | 久久首页 | 成人av在线直播 | 中文字幕中文字幕在线中文字幕三区 | 成人高清在线观看 | 91社区国产高清 | 国产精品99久久免费黑人 | 波多野结衣一区二区 | 国产精品av电影 | 在线免费观看av网站 | 96视频在线 | 欧美一级淫片videoshd | 天天综合网久久 | 91精品在线麻豆 | 少妇bbw搡bbbb搡bbbb | 精品你懂的| 在线观看视频精品 | 最近免费中文视频 | 天天骚夜夜操 | 亚洲天天看| 久久嗨| 国产自偷自拍 | 亚洲乱码国产乱码精品天美传媒 | 91精品免费视频 | 国产精品久久99综合免费观看尤物 | 天天操天天艹 | 国产免费观看久久 | 久久理论影院 | 久草在线资源免费 | 久久久久久国产精品999 | 中文字幕丝袜制服 | 天天爱av导航 | 国产视频久久久 | 亚州日韩中文字幕 | 天天操天天操天天操天天操天天操天天操 | 中文字幕频道 | 97福利| 日韩免费观看一区二区 | 国产精品福利午夜在线观看 | 成人毛片a| 91精品国产一区 | 国产精品永久在线 | 久久免费成人精品视频 | 国产美女在线免费观看 | 成人黄大片 | 国产精品女人网站 | 国产精品不卡在线播放 | 国产91全国探花系列在线播放 | 欧美嫩草影院 | 久久香蕉国产精品麻豆粉嫩av | www免费黄色 | 久久精品国产一区二区 | 久久亚洲精品电影 | 丁香六月综合网 | 又黄又爽的视频在线观看网站 | 超碰97免费观看 | 日韩精品一区二区在线观看视频 | 一区 二区电影免费在线观看 | 欧美日韩国产在线观看 | 国产精品久久三 | 国产又粗又猛又色又黄网站 | 日日精品 | 成人久久影院 | 国产亚洲在线观看 | 国产精品一区二区三区在线 | 玖玖玖精品 | 欧洲亚洲女同hd | 视频国产精品 | 激情开心色 | 2019av在线视频 | 国产精品视频永久免费播放 | 欧美日韩视频观看 | 中文字幕在线播放av | 91av片| 欧美成人在线免费观看 | 色九九影院 | 91福利在线观看 | 欧美analxxxx | 在线视频 一区二区 | 久久久久久久久久影院 | 国产精品久久久久免费a∨ 欧美一级性生活片 | 亚洲综合激情 | 欧美日韩一区二区在线观看 | 亚洲影院国产 | 少妇bbb好爽 | 一区免费在线 | 日韩一区二区久久 | 中文字幕丝袜制服 | 菠萝菠萝在线精品视频 | 亚洲黄色免费在线看 | 国产精品成人久久 | 亚洲人成影院在线 | 91免费网站在线观看 | 久久激情小视频 | 国产啊v在线观看 | 黄色aa久久 | 亚洲艳情| av在线永久免费观看 | 日韩免费高清 | 97在线影院 | 成人免费网站在线观看 | 久久与婷婷| 日日夜夜精品免费视频 | 日日干天天操 | 很黄很污的视频网站 | 国产精品久久婷婷六月丁香 | 亚洲一区网 | 天天干天天干天天干 | 探花视频在线观看免费 | 色大片免费看 | 国产精品综合久久久久久 | 国内视频在线观看 | 国产中文字幕视频在线 | 亚洲精品乱码白浆高清久久久久久 | 中文字幕在线观看91 | 亚洲三级黄色 | 九七人人干 | 国产成人av福利 | 国产精品99久久久精品免费观看 | 久久久电影网站 | 日韩精品欧美一区 | 国产精品欧美日韩在线观看 | 国产亚洲aⅴaaaaaa毛片 | 欧美日韩国产色综合一二三四 | 在线a视频 | 国产成人精品一区在线 | 91视频久久久久久 | 国产视频一区二区三区在线 | 久久成人精品电影 | 91传媒在线 | 欧美 亚洲 另类 激情 另类 | 一区二区毛片 | 男女拍拍免费视频 | 国产精品爽爽久久久久久蜜臀 | 日韩欧美在线中文字幕 | 国产精品女主播一区二区三区 | 亚洲高清久久久 | 久草视频观看 | 免费高清男女打扑克视频 | 一级一片免费看 | 99久久久国产精品免费99 | 五月亚洲婷婷 | 婷婷在线观看视频 | 日韩免费二区 | 一区中文字幕在线观看 | 国产精品区一区 | 国产无吗一区二区三区在线欢 | 日韩欧美在线观看 | 色婷久久 | 天天干夜夜想 | 一本到在线 | 国产日韩一区在线 | 香蕉视频在线免费看 | 日韩小视频 | 视频国产一区二区三区 | 亚洲精选久久 | 国产成人精品福利 | 99免费视频 | 99免费在线播放99久久免费 | 97高清视频 | 国产一区欧美日韩 | av丁香花| 97香蕉久久超级碰碰高清版 | 狠狠操在线 | 精品一区中文字幕 | 8090yy亚洲精品久久 | 91色影院| 人人干人人超 | 欧美人交a欧美精品 | 日本久久综合网 | 日韩美一区二区三区 | 91精品免费视频 | 国产一区在线视频播放 | 深夜国产福利 | 免费网站黄色 | 欧美一级免费黄色片 | 亚洲一区精品二人人爽久久 | 五月婷婷综合在线观看 | 亚洲欧美999 | 日韩高清不卡一区二区三区 | 免费视频97 | 久久久久久免费网 | 日韩影片在线观看 | 国产成人区 | 久久久国产精品麻豆 | 最新高清无码专区 | av免费网| 欧美极品少妇xbxb性爽爽视频 | 日韩精品专区 | 中文乱码视频在线观看 | 久久99精品久久久久久三级 | 在线精品亚洲 | 久久欧美视频 | 狠狠色丁香| 黄色免费高清视频 | 久久这里有精品 | 成人黄在线观看 | 日韩免费二区 | 亚洲永久精品在线 | 高清免费在线视频 | 97在线精品视频 | 日韩精品视频网站 | 国产精品入口传媒 | 在线看一区 | 99精品视频在线免费观看 | 精品亚洲欧美一区 | av短片在线 | 久久久99精品免费观看app | 在线观看免费一级片 | 夜夜操天天操 | 伊人av综合| 国产一线二线三线性视频 | 久久免费资源 | 伊人久久精品久久亚洲一区 | 伊人网综合在线观看 | 欧美在线观看小视频 | 97偷拍在线视频 | 少妇高潮流白浆在线观看 | 国产高清小视频 | av在线8| av在线直接看 | 国产精品网站一区二区三区 | 欧美日韩在线观看不卡 | 国产精品免费在线视频 | 顶级bbw搡bbbb搡bbbb | 91成人在线网站 | 久久午夜电影院 | 亚洲 欧美 变态 国产 另类 | 四虎在线观看 | 精品国精品自拍自在线 | 亚洲女同ⅹxx女同tv | 999超碰| 日韩在线观看免费 | 久久天天躁 | 国产二区视频在线观看 | 丁香六月婷婷综合 | 在线观看免费黄视频 | 久久人人添人人爽添人人88v | 五月婷婷深开心 | 激情综合网五月激情 | 99热999| 久久99精品久久久久久 | 欧美大片aaa | 一区中文字幕在线观看 | 五月天综合色 | 亚洲国产精品小视频 | 国产午夜精品福利视频 | 欧美精品成人在线 | 亚洲欧洲xxxx | 久久久精品国产一区二区三区 | 伊人天天狠天天添日日拍 | 天天操天天操天天操天天操天天操天天操 | 丁香激情五月 | 久久精品精品电影网 | 青春草视频在线播放 | 中文字幕在线观看免费观看 | 最近中文字幕 | 欧美一区二区三区免费看 | 欧美另类交在线观看 | 91亚色在线观看 | 96视频在线| 嫩嫩影院理论片 | 丁香久久激情 | av字幕在线 | 丝袜美腿亚洲综合 | 成人全视频免费观看在线看 | 一二区av | 国产精品免费在线视频 | 国产99久久九九精品免费 | 久久精品国产第一区二区三区 | 欧美日韩国产一二三区 | 久久免费在线视频 | 人人狠狠综合久久亚洲婷 | 免费成人在线网站 | 一本—道久久a久久精品蜜桃 | 人人舔人人插 | 97超视频免费观看 | 欧美一级黄大片 | 一 级 黄 色 片免费看的 | 91看片在线观看 | 在线观看免费国产小视频 | 91视频观看免费 | 久久手机免费视频 | 国产精品美女在线 | 国产成人免费精品 | 99久久精品国产免费看不卡 | 久久视精品 | 九九九在线 | 欧美成人基地 | 成人h视频在线播放 | 日韩av视屏在线观看 | 丁香花在线视频观看免费 | 日本天天色| 丁香狠狠 | 国产黄色在线网站 | 欧美日在线观看 | 欧美日韩午夜 | 福利区在线观看 | 人人射| 国产高清永久免费 | 中文字幕电影高清在线观看 | 伊人亚洲精品 | 日韩av电影网站在线观看 | 欧美一级裸体视频 | 又黄又刺激的视频 | 精品国产福利在线 | 日韩免费成人av | 99在线视频免费观看 | 日日干精品 | 亚洲一区精品人人爽人人躁 | 久久草| 亚洲成人黄色av | 99视频99 | 日韩aa视频 | 日韩精品中文字幕在线观看 | 国产视频 久久久 | 成年人免费在线观看网站 | 国产精品第二十页 | 伊人久久精品久久亚洲一区 | 精品国产a | 国产最新视频在线 | 中文字幕在线观看一区二区三区 | 蜜桃视频成人在线观看 | 丝袜护士aⅴ在线白丝护士 天天综合精品 | 操操操操网 | 狠狠色丁香婷婷综合视频 | 日本黄色免费观看 | 日韩精品免费一区二区三区 | 国产区在线视频 | 18网站在线观看 | 日韩免费一区 | 久久久久一区二区三区四区 | 精品成人久久 | 国产成本人视频在线观看 | 日本中文字幕高清 | 91插插视频 | 精品国产免费一区二区三区五区 | 日韩欧美一区二区三区在线 | 永久免费的啪啪网站免费观看浪潮 | av在线网站大全 | 在线视频福利 | 亚洲资源视频 | 日韩欧美精品在线 | 久99久精品视频免费观看 | 中文字幕专区高清在线观看 | 亚洲区另类春色综合小说校园片 | 免费黄色网址大全 | 国产精品一区二区av影院萌芽 | 久草手机视频 | 免费a视频 | 日韩视频免费看 | 亚洲精品中文在线资源 | 99久久精品费精品 | 久久久久久高潮国产精品视 | 五月天婷婷在线视频 | 国产中文字幕91 | 午夜视频播放 | 欧美a级成人淫片免费看 | 欧美精品第一 | 日日夜夜天天射 | 国产高清久久久 | 日韩免费一区二区在线观看 | 成人a免费看 | 99久久久国产精品免费99 | 丝袜一区在线 | 欧美孕交vivoestv另类 | 久草免费电影 | 亚洲成人黄色网址 | 久久69精品 | 色综久久 |