Reactor by Example (repost)

Original article: https://www.infoq.com/articles/reactor-by-example

Key takeaways

  • Reactor is a reactive streams library targeting Java 8 and providing an Rx-conforming API
  • It uses the same approach and philosophy as RxJava despite some API differences
  • It is a 4th generation reactive library that allows operator fusion, like RxJava 2
  • Reactor is a core dependency in the reactive programming model support of Spring Framework 5.

RxJava recap

Reactor, like RxJava 2, is a fourth generation reactive library. It was launched by Spring custodian Pivotal, and builds on the Reactive Streams specification, Java 8, and the ReactiveX vocabulary. Its design is a skillful mix of designs and core contributors from Reactor 2 (the previous major version) and RxJava.

In previous articles in this series, "RxJava by Example" and "Testing RxJava", you learned about the basics of reactive programming: how data is conceptualized as a stream, the Observable class and its various operators, and the factory methods that create Observables from static and dynamic sources.

Observable is the push source and Observer is the simple interface for consuming this source via the act of subscribing. Keep in mind that the contract of an Observable is to notify its Observer of 0 or more data items through onNext, optionally followed by either an onError or onComplete terminating event.

To test an Observable, RxJava provides a TestSubscriber, which is a special flavor of Observer that allows you to assert events in your stream.

In this article we'll draw a parallel between Reactor and what you already learned about RxJava, and showcase the common elements as well as the differences.

Reactor's types

Reactor's two main types are Flux<T> and Mono<T>. A Flux is the equivalent of an RxJava Observable, capable of emitting 0 or more items, and then optionally either completing or erroring.

A Mono on the other hand can emit at most once. It corresponds to both the Single and Maybe types on the RxJava side. Thus an asynchronous task that just wants to signal completion can use a Mono<Void>.

This simple distinction between two types makes things easy to grasp while providing meaningful semantics in a reactive API: by just looking at the returned reactive type, one can know if a method is more of a "fire-and-forget" or "request-response" (Mono) kind of thing or is really dealing with multiple data items as a stream (Flux).

Both Flux and Mono make use of this semantic by coercing to the relevant type when using some operators. For instance, calling single() on a Flux<T> will return a Mono<T>, whereas concatenating two monos together using concatWith will produce a Flux. Similarly, some operators make no sense on a Mono (for example take(n), which produces n > 1 results), whereas other operators only make sense on a Mono (e.g. or(otherMono)).

One aspect of the Reactor design philosophy is to keep the API lean, and this separation into two reactive types is a good middle ground between expressiveness and API surface.

"Build on Rx, with Reactive Streams at every stage"

As expressed in "RxJava by Example", RxJava bears some superficial resemblance to Java 8 Streams API, in terms of concepts. Reactor on the other hand looks a lot like RxJava, but this is of course in no way a coincidence. The intention is to provide a Reactive Streams native library that exposes an Rx-conforming operator API for asynchronous logic composition. So while Reactor is rooted in Reactive Streams, it seeks general API alignment with RxJava where possible.

Reactive Libraries and Reactive Streams adoption

Reactive Streams (abbreviated RS in the remainder of this article) is "an initiative to provide a standard for asynchronous stream processing with non-blocking back pressure". It is a set of textual specifications along with a TCK and four simple interfaces (Publisher, Subscriber, Subscription and Processor), which will be integrated in Java 9.

It mainly deals with the concept of reactive-pull back-pressure (more on that later) and how to interoperate between several implementing reactive sources. It doesn't cover operators at all, focusing instead exclusively on the stream's lifecycle.

A key differentiator for Reactor is its RS-first approach. Both Flux and Mono are RS Publisher implementations and conform to reactive-pull back-pressure.

In RxJava 1 only a subset of operators support back-pressure, and even though RxJava 1 has adapters to RS types, its Observable doesn't implement these types directly. That is easily explained by the fact that RxJava 1 predates the RS specification and served as one of the foundational works during the specification's design.

That means that each time you use these adapters you are left with a Publisher, which again doesn't have any operators. In order to do anything useful from there, you'll probably want to go back to an Observable, which means using yet another adapter. This visual clutter can be detrimental to readability, especially when an entire framework like Spring 5 builds directly on top of Publisher.

Another difference from RxJava 1 to keep in mind when migrating to Reactor or RxJava 2 is that the RS specification does not allow null values. This can matter if your code base uses null to signal special cases.

RxJava 2 was developed after the Reactive Streams specification, and thus has a direct implementation of Publisher in its new Flowable type. But instead of focusing exclusively on RS types, RxJava 2 also keeps the "legacy" RxJava 1 types (Observable, Completable, and Single) and introduces the "RxJava Optional", Maybe. Although they still provide the semantic differentiation we talked about earlier, these types have the drawback of not implementing RS interfaces. Note that unlike in RxJava 1, Observable in RxJava 2 does not support the backpressure protocol (a feature now exclusively reserved to Flowable). It has been kept to provide a rich and fluent API for cases, such as user interface eventing, where backpressure is impractical or impossible. Completable, Single and Maybe have by design no need for backpressure support; they offer a rich API as well and defer any workload until subscribed.

Reactor is once again leaner in this area, sporting its Mono and Flux types, both implementing Publisher and both backpressure-ready. There's a relatively small overhead for Mono to behave as a Publisher, but it is mostly offset by other Mono optimizations. We'll see in a later section what backpressure means for Mono.

An API similar but not equal to RxJava's

The ReactiveX and RxJava vocabulary of operators can be overwhelming at times, and some operators can have confusing names for historical reasons. Reactor aims to have a more compact API and to deviate in some cases, e.g. in order to choose better names, but overall the two APIs look a lot alike. In fact the latest iterations in RxJava 2 actually borrow some vocabulary from Reactor as well, a hint of the ongoing close collaboration between the two projects. Some operators and concepts first appear in one library or the other, but often end up in both.

For instance, Flux has the same familiar just factory method (albeit with only two just variants: one element and a vararg). But from has been replaced by several explicit variants, the most notable being fromIterable. Flux also has all the usual suspects in terms of operators: map, merge, concat, flatMap, take, etc.

One example of an RxJava operator name that Reactor eschewed is the puzzling amb operator, which has been replaced with the more appropriately named firstEmitting. Additionally, to introduce greater consistency in the API, toList has been renamed collectList. In fact all collectXXX operators now aggregate values into a specific type of collection but still produce a Mono of said collection, while toXXX methods are reserved for type conversions that take you out of the reactive world, e.g. toFuture().

One more means by which Reactor can be leaner, this time in terms of class instantiation and resource usage, is fusion: Reactor is capable of merging multiple sequential uses of certain operators (e.g. calling concatWith twice) into a single use, only instantiating the operator's inner classes once (macro-fusion). That includes some data-source-based optimizations which greatly help Mono offset the cost of implementing Publisher. It is also capable of sharing resources like inner queues between several compatible operators (micro-fusion). These capabilities make Reactor a fourth-generation reactive library. But that is a topic for a future article.

Let's take a closer look at a few Reactor operators. (You will notice the contrast with some of the examples in the earlier articles in our series.)

A few operator examples

(This section contains snippets of code, and we encourage you to try them and experiment further with Reactor. To that effect, you should open your IDE of choice and create a test project with Reactor as a dependency.)

To do so in Maven, add the following to the dependencies section of your pom.xml:

<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-core</artifactId>
    <version>3.0.3.RELEASE</version>
</dependency>

To do the same in Gradle, edit the dependencies section to add reactor, similarly to this:

dependencies {
    compile "io.projectreactor:reactor-core:3.0.3.RELEASE"
}

Let's play with examples used in the previous articles in this series!

Very similarly to how you would create your first Observable in RxJava, you can create a Flux using the just(T…) and fromIterable(Iterable<T>) Reactor factory methods. Remember that given a List, just would just emit the list as one whole, single emission, while fromIterable will emit each element from the iterable list:

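import java.util.Arrays;
import java.util.List;

import org.junit.Test; // assuming JUnit 4, which the bare @Test annotation suggests

import reactor.core.publisher.Flux;

public class ReactorSnippets {

    private static List<String> words = Arrays.asList(
        "the", "quick", "brown", "fox", "jumped", "over", "the", "lazy", "dog");

    @Test
    public void simpleCreation() {
        // just emits exactly the values it is given
        Flux<String> fewWords = Flux.just("Hello", "World");
        // fromIterable emits each element of the list in turn
        Flux<String> manyWords = Flux.fromIterable(words);

        fewWords.subscribe(System.out::println);
        System.out.println();
        manyWords.subscribe(System.out::println);
    }
}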

Like in the corresponding RxJava examples, this prints
Hello
World

the
quick
brown
fox
jumped
over
the
lazy
dog

In order to output the individual letters in the fox sentence we'll also need flatMap (as we did in RxJava by Example), but in Reactor we use fromArray instead of from. We then want to filter out duplicate letters and sort them using distinct and sort. Finally, we want to output an index for each distinct letter, which can be done using zipWith and range:

@Test
public void findingMissingLetter() {
    Flux<String> manyLetters = Flux
        .fromIterable(words)
        .flatMap(word -> Flux.fromArray(word.split("")))
        .distinct()
        .sort()
        .zipWith(Flux.range(1, Integer.MAX_VALUE),
            (string, count) -> String.format("%2d. %s", count, string));

    manyLetters.subscribe(System.out::println);
}

This helps us notice the s is missing as expected:

1. a
2. b
...
18. r
19. t
20. u
...
25. z

One way of fixing that is to correct the original words array, but we could also manually add the "s" value to the Flux of letters using concat/concatWith and a Mono:

@Test
public void restoringMissingLetter() {
    Mono<String> missing = Mono.just("s");
    Flux<String> allLetters = Flux
        .fromIterable(words)
        .flatMap(word -> Flux.fromArray(word.split("")))
        .concatWith(missing)
        .distinct()
        .sort()
        .zipWith(Flux.range(1, Integer.MAX_VALUE),
            (string, count) -> String.format("%2d. %s", count, string));

    allLetters.subscribe(System.out::println);
}

This adds the missing s just before we filter out duplicates and sort/count the letters:

1. a
2. b
...
18. r
19. s
20. t
...
26. z

The previous article noted the resemblance between the Rx vocabulary and the Streams API, and in fact when the data is readily available from memory, Reactor, like Java Streams, acts in simple push mode (see the backpressure section below to understand why). More complex and truly asynchronous snippets wouldn't work with this pattern of just subscribing in the main thread, primarily because control would return to the main thread and then exit the application as soon as the subscription is done. For instance:

@Test
public void shortCircuit() {
    Flux<String> helloPauseWorld = Mono.just("Hello")
        .concatWith(Mono.just("world")
            .delaySubscriptionMillis(500));

    helloPauseWorld.subscribe(System.out::println);
}

This snippet prints "Hello", but fails to print the delayed "world" because the test terminates too early. In snippets and tests like this, where you effectively write a main class, you'll usually want to revert back to blocking behavior. To do that you could create a CountDownLatch and call countDown in your subscriber (both in onError and onComplete). But then that's not very reactive, is it? (And what if you forget to count down, in case of error for instance?)
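The latch approach can be sketched with plain JDK classes. This is only an illustration, not Reactor code: a CompletableFuture with a delayed executor (Java 9+) stands in for the delayed Mono, and LatchDemo and helloThenWorld are hypothetical names. Note how the latch has to be counted down on both the success and the failure path.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: a delayed CompletableFuture stands in for the delayed Mono.
final class LatchDemo {

    static List<String> helloThenWorld(long delayMillis) {
        List<String> received = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);

        received.add("Hello");
        CompletableFuture
            .supplyAsync(() -> "world",
                CompletableFuture.delayedExecutor(delayMillis, TimeUnit.MILLISECONDS))
            .whenComplete((value, error) -> {
                if (error == null) {
                    received.add(value);
                }
                // Count down on success AND on failure, or the caller waits for the timeout.
                done.countDown();
            });

        try {
            // Block the calling thread until the asynchronous part has terminated.
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out waiting for completion");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return received;
    }

    public static void main(String[] args) {
        helloThenWorld(500).forEach(System.out::println);
    }
}
```

Removing the await call reproduces the problem described above: the method would return before the delayed value arrives.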

The second way you could solve that issue is by using one of the operators that revert back to the non-reactive world. Specifically, toIterable and toStream will both produce a blocking instance. So let's use toStream for our example:

@Test
public void blocks() {
    Flux<String> helloPauseWorld = Mono.just("Hello")
        .concatWith(Mono.just("world")
            .delaySubscriptionMillis(500));

    helloPauseWorld.toStream()
        .forEach(System.out::println);
}

As you would expect, this prints "Hello" followed by a short pause, then prints "world" and terminates.

As we mentioned above, RxJava's amb() operator has been renamed firstEmitting (which more clearly hints at the operator's purpose: selecting the first Flux to emit). In the following example, we create a Mono whose start is delayed by 450ms and a Flux that emits its values with a 400ms pause before each value. When we combine them with firstEmitting(), since the first value from the Flux comes in before the Mono's value, it is the Flux that ends up being played:

@Test
public void firstEmitting() {
    Mono<String> a = Mono.just("oops I'm late")
        .delaySubscriptionMillis(450);
    Flux<String> b = Flux.just("let's get", "the party", "started")
        .delayMillis(400);

    Flux.firstEmitting(a, b)
        .toIterable()
        .forEach(System.out::println);
}

This prints each part of the sentence with a short 400ms pause between each section.

At this point you might wonder, what if you're writing a test for a Flux that introduces delays of 4000ms instead of 400? You don't want to wait 4s in a unit test! Fortunately, we'll see in a later section that Reactor comes with powerful testing facilities that nicely cover this case.

But for now, we have sampled how Reactor compares for a few common operators, so let's zoom back and have a look at other differentiating aspects of the library.

A Java 8 foundation

Reactor targets Java 8 rather than previous Java versions. This once again aligns with the goal of reducing the API surface: RxJava targets Java 6, where there is no java.util.function package, so classes like Function or Consumer can't be leveraged. Instead, it had to add specific classes like Func1, Func2, Action0, Action1, etc. In RxJava 2 these classes mirror java.util.function the way Reactor 2 used to do when it still had to support Java 7.

The Reactor API also embraces types introduced in Java 8. Most of the time-related operators will be about a duration (e.g. timeout, interval, delay, etc.), so using the Java 8 Duration class is appropriate.

The Java 8 Stream API and CompletableFuture can also both be easily converted to a Flux/Mono, and vice-versa. Should we usually convert a Stream to a Flux though? Not really. The level of indirection added by Flux or Mono is a negligible cost when they decorate more costly operations like IO or memory-bound operations, but most of the time a Stream doesn't imply that kind of latency and it is perfectly ok to use the Stream API directly. Note that for these use cases in RxJava 2 we'd use the Observable, as it is not backpressured and thus becomes a simple push use case once you've subscribed. But Reactor is based on Java 8, and the Stream API is expressive enough for most use cases. Note also that even though you can find Flux and Mono factories for literal or simple Objects, they mostly serve the purpose of being combined in higher level flows. So typically you wouldn't want to transform an accessor like "long getCount()" into a "Mono<Long> getCount()" when migrating an existing codebase to reactive patterns.

The Backpressure story

One of the main focuses (if not the main focus) of the RS specification and of Reactor itself is backpressure. The idea of backpressure is that in a push scenario where the producer is quicker than the consumer, there's value in letting the consumer signal back to the producer and say "Hey! Slow down a little, I'm overwhelmed". This gives the producer a chance to control its pace rather than having to resort to discarding data (sampling) or worse, risking a cascading failure.

You may wonder at this point where backpressure comes into the picture with Mono: what kind of consumer could possibly be overwhelmed by a single emission? The short answer is "probably none". However, there's still a key difference between how a Mono works and how a CompletableFuture works. The latter is push only: if you have a reference to the Future, it means the task processing an asynchronous result is already executing. On the other hand, what a backpressured Flux or Mono enables is a deferred pull-push interaction:

  1. Deferred because nothing happens before the call to subscribe()
  2. Pull because at the subscription and request steps, the Subscriber will send a signal upstream to the source and essentially pull the next chunk of data
  3. Push from producer to consumer from there on, within the boundary of the number of requested elements

For Mono, subscribe() is the button that you press to say "I'm ready to receive my data". For Flux, this button is request(n), which is kind of a generalization of the former.
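The deferred pull-push interaction can be sketched with the java.util.concurrent.Flow interfaces (the JDK 9+ copy of the RS interfaces). This is a single-threaded illustration under simplifying assumptions, not how Reactor implements its sources: PullPushDemo, RangePublisher and run are hypothetical names, and the publisher skips most of the rules a real RS implementation must honor.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

// Hypothetical single-threaded sketch of a cold source that honors request(n).
final class PullPushDemo {

    static final class RangePublisher implements Flow.Publisher<Integer> {
        private final int start;
        private final int count;

        RangePublisher(int start, int count) {
            this.start = start;
            this.count = count;
        }

        @Override
        public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
            // Deferred: emission state only exists once somebody subscribes.
            subscriber.onSubscribe(new Flow.Subscription() {
                private int next = start;
                private long demand;
                private boolean draining;
                private boolean done;

                @Override
                public void request(long n) {
                    if (done) {
                        return;
                    }
                    if (n <= 0) {
                        done = true;
                        subscriber.onError(new IllegalArgumentException("request must be positive"));
                        return;
                    }
                    // Pull: record the demand, capping at Long.MAX_VALUE on overflow.
                    demand = (demand + n < 0) ? Long.MAX_VALUE : demand + n;
                    if (draining) {
                        return; // re-entrant request from onNext; the outer loop will see it
                    }
                    draining = true;
                    // Push: emit, but never more than what was requested.
                    while (demand > 0 && !done && next < start + count) {
                        demand--;
                        subscriber.onNext(next++);
                    }
                    draining = false;
                    if (!done && next == start + count) {
                        done = true;
                        subscriber.onComplete();
                    }
                }

                @Override
                public void cancel() {
                    done = true;
                }
            });
        }
    }

    /** Subscribes to the range 1..5, requesting `batch` items at a time, and logs every signal. */
    static List<String> run(int batch) {
        List<String> log = new ArrayList<>();
        Flow.Publisher<Integer> source = new RangePublisher(1, 5);
        log.add("assembled"); // nothing has been emitted at this point
        source.subscribe(new Flow.Subscriber<Integer>() {
            private Flow.Subscription subscription;
            private int receivedInBatch;

            @Override
            public void onSubscribe(Flow.Subscription s) {
                subscription = s;
                log.add("request(" + batch + ")");
                s.request(batch);
            }

            @Override
            public void onNext(Integer item) {
                log.add("onNext(" + item + ")");
                if (++receivedInBatch == batch) {
                    receivedInBatch = 0;
                    log.add("request(" + batch + ")");
                    subscription.request(batch);
                }
            }

            @Override
            public void onError(Throwable t) {
                log.add("onError");
            }

            @Override
            public void onComplete() {
                log.add("onComplete");
            }
        });
        return log;
    }

    public static void main(String[] args) {
        run(2).forEach(System.out::println);
    }
}
```

In the resulting log, every onNext is preceded by a request that covers it, and nothing is emitted between assembling the publisher and subscribing to it.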

Realizing that Mono is a Publisher that will usually represent a costly task (in terms of IO, latency, etc.) is critical to understanding the value of backpressure here: if you don't subscribe, you don't pay the cost of that task. Since Mono will often be orchestrated in a reactive chain with regular backpressured Flux, possibly combining results from multiple asynchronous sources, the availability of this on-demand subscribe triggering is key in order to avoid blocking.
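The "you don't pay if you don't subscribe" point can be illustrated without Reactor by contrasting an eagerly started CompletableFuture with one wrapped in a Supplier. This is a rough analogy only: the Supplier plays the role of the deferred Mono, get() plays the role of subscribe(), and all names (DeferredVsEagerDemo, costlyTask, and so on) are hypothetical. A same-thread executor is used so the call counts are deterministic.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical sketch: eager future versus a deferred (Supplier-wrapped) one.
final class DeferredVsEagerDemo {

    static final AtomicInteger costlyCalls = new AtomicInteger();

    /** Stands in for an expensive IO-bound task. */
    static String costlyTask() {
        costlyCalls.incrementAndGet();
        return "result";
    }

    private static CompletableFuture<String> startTask() {
        // Runnable::run is a same-thread executor, which keeps the sketch deterministic.
        return CompletableFuture.supplyAsync(DeferredVsEagerDemo::costlyTask, Runnable::run);
    }

    /** Creating the future is enough to run the task, even if nobody reads the result. */
    static int callsAfterCreatingEager() {
        costlyCalls.set(0);
        CompletableFuture<String> eager = startTask();
        return costlyCalls.get();
    }

    /** Wrapping the creation defers the task: nothing has run yet. */
    static int callsAfterCreatingDeferred() {
        costlyCalls.set(0);
        Supplier<CompletableFuture<String>> deferred = DeferredVsEagerDemo::startTask;
        return costlyCalls.get();
    }

    /** Only asking for the value triggers the work. */
    static int callsAfterTriggeringDeferred() {
        costlyCalls.set(0);
        Supplier<CompletableFuture<String>> deferred = DeferredVsEagerDemo::startTask;
        deferred.get().join();
        return costlyCalls.get();
    }

    public static void main(String[] args) {
        System.out.println("eager, never consumed: " + callsAfterCreatingEager());
        System.out.println("deferred, never triggered: " + callsAfterCreatingDeferred());
        System.out.println("deferred, triggered once: " + callsAfterTriggeringDeferred());
    }
}
```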

Having backpressure helps us differentiate that last use case from another broad Mono use case: asynchronously aggregating data from a Flux into a Mono. Operators like reduce and hasElement are capable of consuming each item in the Flux, aggregating some form of data about it (respectively the result of a reduce function and a boolean) and exposing that data as a Mono. In that case, the demand signalled upstream is Long.MAX_VALUE, which lets the upstream work in a fully push fashion.

Another interesting aspect of backpressure is how it naturally limits the number of objects held in memory by the stream. As a Publisher, the source of data is most probably slow (at least slowish) at producing items, so the request from downstream can very well start beyond the number of readily available items. In this case, the whole stream naturally falls into a push pattern where new items are notified to the consumer. But when there is a production peak and the pace of production accelerates, things fall nicely back into a pull model. In both cases, at most N data items (the request() amount) are kept in memory.

You can reason about the memory used by your asynchronous processing by correlating that demand for N with the number of kilobytes an item consumes, W: you can then infer that at most W*N kilobytes of memory will be consumed. In fact, Reactor will most of the time take advantage of knowing N to apply optimizations: creating queues bounded accordingly and applying prefetching strategies where it can automatically request 75% of N every time that same amount (75% of N) has been received.
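The arithmetic can be made concrete with a small sketch. It only models the two ideas from the paragraph above (the W*N bound and replenishing by 75% of N); the class and method names are hypothetical and the rounding of the 75% threshold is an assumption, not a statement about Reactor's internals.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the W*N bound and of a 75% replenishing strategy.
final class PrefetchDemo {

    /** Upper bound on buffered kilobytes: at most N items of at most W kilobytes each. */
    static long maxBufferedKilobytes(long itemKilobytes, long requested) {
        return Math.multiplyExact(itemKilobytes, requested);
    }

    /**
     * Lists the request(n) amounts issued while consuming totalItems items:
     * an initial request of N, then 75% of N each time that many items were received.
     */
    static List<Integer> requestsFor(int prefetch, int totalItems) {
        int limit = prefetch - prefetch / 4; // 75% of N, rounded up (assumption)
        List<Integer> requests = new ArrayList<>();
        requests.add(prefetch);
        int sinceLastRequest = 0;
        for (int i = 0; i < totalItems; i++) {
            if (++sinceLastRequest == limit) {
                sinceLastRequest = 0;
                requests.add(limit);
            }
        }
        return requests;
    }

    public static void main(String[] args) {
        System.out.println(maxBufferedKilobytes(4, 256) + " KB at most");
        System.out.println(requestsFor(32, 100));
    }
}
```

With N = 32, the outstanding demand goes back up to 32 after each replenishment (8 items still expected plus 24 newly requested), so the W*N bound keeps holding.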

Finally, Reactor operators will sometimes change the backpressure signal to correlate it with the expectations and semantics they represent. One prime example of this behavior would be buffer(10): for every request of N from downstream, that operator would request 10N from upstream, which represents enough data to fill the number of buffers the subscriber is ready to consume. This is called "active backpressure", and it can be put to good use by developers in order to explicitly tell Reactor how to switch from an input volume to a different output volume, in micro-batching scenarios for instance.
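The demand translation performed by a buffering operator can be sketched as follows. This is a pull-based model over an Iterator rather than a reactive operator, and BufferDemandDemo, upstreamDemand and pullBuffers are hypothetical names: a downstream request for n buffers of size 10 becomes an upstream demand of 10n items.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical pull-based model of how buffer(size) rescales demand.
final class BufferDemandDemo {

    /** Upstream demand for a downstream request of n buffers, capped to avoid overflow. */
    static long upstreamDemand(int bufferSize, long requestedBuffers) {
        if (bufferSize <= 0 || requestedBuffers <= 0) {
            throw new IllegalArgumentException("sizes and requests must be positive");
        }
        if (requestedBuffers > Long.MAX_VALUE / bufferSize) {
            return Long.MAX_VALUE; // effectively unbounded
        }
        return requestedBuffers * bufferSize;
    }

    /** Pulls just enough items from upstream to fill the requested number of buffers. */
    static <T> List<List<T>> pullBuffers(Iterator<T> upstream, int bufferSize, long requestedBuffers) {
        long toPull = upstreamDemand(bufferSize, requestedBuffers);
        List<List<T>> buffers = new ArrayList<>();
        List<T> current = new ArrayList<>(bufferSize);
        while (toPull-- > 0 && upstream.hasNext()) {
            current.add(upstream.next());
            if (current.size() == bufferSize) {
                buffers.add(current);
                current = new ArrayList<>(bufferSize);
            }
        }
        if (!current.isEmpty()) {
            buffers.add(current); // partial last buffer when upstream ran dry
        }
        return buffers;
    }

    public static void main(String[] args) {
        Iterator<Integer> source = List.of(1, 2, 3, 4, 5, 6, 7).iterator();
        // Two buffers of three items each: six items are pulled, the seventh stays upstream.
        System.out.println(pullBuffers(source, 3, 2));
    }
}
```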

Relation to Spring

Reactor is the reactive foundation for the whole Spring ecosystem, and most notably Spring 5 (through Spring Web Reactive) and Spring Data "Kay" (which corresponds to spring-data-commons 2.0).

Having a reactive version for both of these projects is essential, in the sense that this enables us to write a web application that is reactive from start to finish: a request comes in, is asynchronously processed all the way down to and including the database, and results come back asynchronously as well. This allows a Spring application to be very efficient with resources, avoiding the usual pattern of dedicating a thread to a request and blocking it for I/O.

So Reactor is going to be used for the internal reactive plumbing of future Spring applications, as well as in the APIs these various Spring components expose. More generally, they'll be able to deal with RS Publishers, but most of the time these will happen to be Flux/Mono, bringing in the rich feature set of Reactor. Of course, you will be able to use your reactive library of choice, as the framework provides hooks for adapting between Reactor types and RxJava types or even simpler RS types.

At the time of writing of this article, you can already experiment with Spring Web Reactive in Spring Boot by using Spring Boot 2.0.0.BUILD-SNAPSHOT and the spring-boot-starter-web-reactive dependency (e.g. by generating such a project on start.spring.io):

<dependency>
    <groupId>org.springframework.boot.experimental</groupId>
    <artifactId>spring-boot-starter-web-reactive</artifactId>
</dependency>

This lets you write your @Controller mostly as usual, but replaces the traditional Spring MVC layer underneath with a reactive one, swapping many of the Spring MVC contracts for reactive non-blocking ones. By default, this reactive layer is based on top of Tomcat 8.5, but you can also elect to use Undertow or Netty.

Additionally, although Spring APIs are based on Reactor types, the Spring Web Reactive module lets you use various reactive types for both the request and response:

  • Mono<T>: as the @RequestBody, the request entity T is asynchronously deserialized and you can chain your processing to the resulting mono afterward. As the return type, once the Mono emits a value, the T is serialized asynchronously and sent back to the client. You can combine both approaches by augmenting the request Mono and returning that augmented chain as the resulting Mono.
  • Flux<T>: Used in streaming scenarios (including input streaming when used as @RequestBody and Server Sent Events with a Flux<ServerSentEvent> return type)
  • Single/Observable: Same as Mono and Flux respectively, but switching to an RxJava implementation.
  • Mono<Void> as a return type: Request handling completes when the Mono completes.
  • Non-reactive return types (void and T): This now implies that your controller method is synchronous, but should be non-blocking (short-lived processing). The request handling finishes once the method is executed. The returned T is serialized back to the client asynchronously.

Here is a quick example of a plain text @Controller using the experimental web reactive module:

@RestController
public class ExampleController {

    private final MyReactiveLibrary reactiveLibrary;

    // Note: Spring Framework 4.3+ autowires single constructors
    public ExampleController(MyReactiveLibrary reactiveLibrary) {
        this.reactiveLibrary = reactiveLibrary;
    }

    @GetMapping("hello/{who}")
    public Mono<String> hello(@PathVariable String who) {
        return Mono.just(who)
            .map(w -> "Hello " + w + "!");
    }

    @GetMapping("helloDelay/{who}")
    public Mono<String> helloDelay(@PathVariable String who) {
        return reactiveLibrary.withDelay("Hello " + who + "!!", 2);
    }

    @PostMapping("heyMister")
    public Flux<String> hey(@RequestBody Mono<Sir> body) {
        return Mono.just("Hey mister ")
            .concatWith(body
                .flatMap(sir -> Flux.fromArray(sir.getLastName().split("")))
                .map(String::toUpperCase)
                .take(1)
            ).concatWith(Mono.just(". how are you?"));
    }
}

The first endpoint takes a path variable, transforms it into a Mono<String> and maps that name to a greeting sentence that is returned to the client.

By doing a GET on /hello/Simon we get "Hello Simon!" as a text/plain response.

The last endpoint is a bit more complicated: it asynchronously receives a serialized Sir instance (a class simply made up of firstName and lastName attributes) and flatMaps it into a stream of the last name's letters. It then maps these letters to upper case, takes the first one and concatenates it into a greeting sentence.

So POSTing the following JSON object to /heyMister

{
    "firstName": "Paul",
    "lastName": "tEsT"
}

returns the string "Hey mister T. how are you?".

The reactive aspect of Spring Data is also currently being developed in the Kay release train, which for spring-data-commons is the 2.0.x branch. There is a first Milestone out that you can get by adding the Spring Data Kay-M1 bom to your pom:

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.data</groupId>
            <artifactId>spring-data-releasetrain</artifactId>
            <version>Kay-M1</version>
            <scope>import</scope>
            <type>pom</type>
        </dependency>
    </dependencies>
</dependencyManagement>

Then for this simplistic example just add the Spring Data Commons dependency in your pom (it will take the version from the BOM above):

<dependency>
    <groupId>org.springframework.data</groupId>
    <artifactId>spring-data-commons</artifactId>
</dependency>

Reactive support in Spring Data revolves around the new ReactiveCrudRepository<T, ID> interface, which extends Repository<T, ID>. This interface exposes CRUD methods, using Reactor input and return types. There is also an RxJava 1 based version called RxJava1CrudRepository. For instance, in the classical blocking CrudRepository, retrieving one entity by its id would be done using "T findOne(ID id)". It becomes "Mono<T> findOne(ID id)" and "Observable<T> findOne(ID id)" in ReactiveCrudRepository and RxJava1CrudRepository respectively. There are even variants that take a Mono/Single as argument, to asynchronously provide the key and compose on that.

Assuming a reactive backing store (or a mock ReactiveCrudRepository bean), the following (very naive) controller would be reactive from start to finish:

@RestController
public class DataExampleController {

    private final ReactiveCrudRepository<Sir, String> reactiveRepository;

    // Note: Spring Framework 4.3+ autowires single constructors
    public DataExampleController(ReactiveCrudRepository<Sir, String> repo) {
        this.reactiveRepository = repo;
    }

    @GetMapping("data/{who}")
    public Mono<ResponseEntity<Sir>> hello(@PathVariable String who) {
        return reactiveRepository.findOne(who)
            .map(ResponseEntity::ok)
            .defaultIfEmpty(ResponseEntity.status(404).body(null));
    }
}

Notice how the data repository usage naturally flows into the response path: we asynchronously fetch the entity and wrap it as a ResponseEntity using map, obtaining a Mono we can return right away. If the Spring Data repository cannot find data for this key, it will return an empty Mono. We make that explicit by using defaultIfEmpty and returning a 404.

Testing Reactor

The article "Testing RxJava" covered techniques for testing an Observable. As we saw, RxJava comes with a TestScheduler that you can use with operators that accept a Scheduler as a parameter, to manipulate a virtual clock on these operators. It also features a TestSubscriber class that can be leveraged to wait for the completion of an Observable and to make assertions about every event (number and values for onNext, whether onError was triggered, etc.). In RxJava 2, the TestSubscriber is an RS Subscriber, so you can test Reactor's Flux and Mono with it!

In Reactor, these two broad features are combined into the StepVerifier class. It can be found in the addon module reactor-test from the reactor-addons repository. The StepVerifier can be initialized by creating an instance from any Publisher, using the StepVerifier.create builder. If you want to use virtual time, you can use the StepVerifier.withVirtualTime builder, which takes a Supplier<Publisher>. The reason for this is that it will first ensure that a VirtualTimeScheduler is created and enabled as the default Scheduler implementation to use, so you no longer need to explicitly pass the scheduler to operators. The StepVerifier will then configure if necessary the Flux/Mono created within the Supplier, turning timed operators into "virtually timed operators". You can then script stream expectations and time progress: what the next elements should be, should there be an error, should it move forward in time, etc. Other methods include verifying that data matches a given Predicate or even consuming onNext events, allowing you to do more advanced interactions with the value (like using an assertion library). Any AssertionError thrown by one of these will be reflected back in the final verification result. Finally, call verify() to check your expectations; this will truly subscribe to the source defined via StepVerifier.create or StepVerifier.withVirtualTime.

Let's take a few simple examples and demonstrate how StepVerifier works. For these snippets, you'll want to add the following test dependencies to your pom:

<dependency>
    <groupId>io.projectreactor.addons</groupId>
    <artifactId>reactor-test</artifactId>
    <version>3.0.3.RELEASE</version>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.assertj</groupId>
    <artifactId>assertj-core</artifactId>
    <version>3.5.2</version>
    <scope>test</scope>
</dependency>

First, imagine you have a reactive class called MyReactiveLibrary that produces a few Flux that you want to test:

@Component
public class MyReactiveLibrary {

    public Flux<String> alphabet5(char from) {
        return Flux.range((int) from, 5)
            .map(i -> "" + (char) i.intValue());
    }

    public Mono<String> withDelay(String value, int delaySeconds) {
        return Mono.just(value)
            .delaySubscription(Duration.ofSeconds(delaySeconds));
    }
}

The first method is intended to return the 5 letters of the alphabet following (and including) the given starting letter. The second method returns a Mono that emits a given value after a given delay, in seconds.

The first test we'd like to write ensures that calling alphabet5 from x limits the output to x, y, z. With StepVerifier it would go like this:

@Test
public void testAlphabet5LimitsToZ() {
    MyReactiveLibrary library = new MyReactiveLibrary();

    StepVerifier.create(library.alphabet5('x'))
        .expectNext("x", "y", "z")
        .expectComplete()
        .verify();
}

The second test we'd like to run on alphabet5 is that every returned value is an alphabetical character. For that we'd like to use a rich assertion library like AssertJ:

@Test
public void testAlphabet5LastItemIsAlphabeticalChar() {
    MyReactiveLibrary library = new MyReactiveLibrary();

    StepVerifier.create(library.alphabet5('x'))
        .consumeNextWith(c -> assertThat(c)
            .as("first is alphabetic").matches("[a-z]"))
        .consumeNextWith(c -> assertThat(c)
            .as("second is alphabetic").matches("[a-z]"))
        .consumeNextWith(c -> assertThat(c)
            .as("third is alphabetic").matches("[a-z]"))
        .consumeNextWith(c -> assertThat(c)
            .as("fourth is alphabetic").matches("[a-z]"))
        .expectComplete()
        .verify();
}

Turns out both of these tests fail :(. Let's have a look at the output the StepVerifier gives us in each case to see if we can spot the bug:

java.lang.AssertionError: expected: onComplete(); actual: onNext({)

and

java.lang.AssertionError: [fourth is alphabetic] 
Expecting:"{"
to match pattern:"[a-z]"

So it looks like our method doesn't stop at z but continues emitting characters from the ASCII range. We could fix that by adding a .take(Math.min(5, 'z' - from + 1)) for instance, or using the same Math.min as the second argument to range.
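The bound in that fix is plain arithmetic, so it can be checked without Reactor on the classpath. Below is a minimal sketch under stated assumptions: AlphabetBounds and boundedCount are hypothetical names, java.util.stream.IntStream stands in for Flux.range only to keep the snippet JDK-only, and the Math.max(0, ...) guard for inputs beyond 'z' is an addition of this sketch rather than part of the original fix.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Hypothetical helper illustrating the Math.min bound suggested above.
class AlphabetBounds {

    // Number of letters to emit: at most 5, and never past 'z'.
    // The Math.max(0, ...) guard is an assumption of this sketch, added so
    // that a starting character beyond 'z' yields an empty result.
    static int boundedCount(char from) {
        return Math.max(0, Math.min(5, 'z' - from + 1));
    }

    // JDK-only stand-in for Flux.range(from, count).map(...).
    // Note: IntStream.range takes (start, endExclusive),
    // whereas Flux.range takes (start, count).
    static List<String> alphabet5(char from) {
        int count = boundedCount(from);
        return IntStream.range(from, from + count)
                        .mapToObj(i -> String.valueOf((char) i))
                        .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(alphabet5('a')); // [a, b, c, d, e]
        System.out.println(alphabet5('x')); // [x, y, z]
    }
}
```

In the Reactor version, the same boundedCount value would presumably be passed as the second argument of Flux.range, which is the count of items rather than an end index.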

The last test we want to make involves virtual time manipulation: we'll test the delaying method but without actually waiting for the given amount of seconds, by using the withVirtualTime builder:

@Test
public void testWithDelay() {
    MyReactiveLibrary library = new MyReactiveLibrary();
    Duration testDuration =
        StepVerifier.withVirtualTime(() -> library.withDelay("foo", 30))
                    .expectSubscription()
                    .thenAwait(Duration.ofSeconds(10))
                    .expectNoEvent(Duration.ofSeconds(10))
                    .thenAwait(Duration.ofSeconds(10))
                    .expectNext("foo")
                    .expectComplete()
                    .verify();
    System.out.println(testDuration.toMillis() + "ms");
}

This tests a Mono that would be delayed by 30 seconds, for the following scenario: an immediate subscription, followed by 3 x 10s where nothing happens, then an onNext("foo") and completion.

The System.out output prints the actual duration the verification took, which in my latest run was 8ms :)

Note that when using the create builder instead, the thenAwait and expectNoEvent methods would still be available but would actually block for the provided duration.

StepVerifier comes with many more methods for describing expectations and asserting the state of a Publisher (and if you think of new ones, contributions and feedback are always welcome in the GitHub repository).

Custom Hot Source

Note that the concept of hot and cold observables discussed at the end of "RxJava by Example" also applies to Reactor.

If you want to create a custom Flux, instead of the RxJava AsyncEmitter class, you'd use Reactor's FluxSink. This will cover all the asynchronous corner cases for you and let you focus on emitting your values.

Use Flux.create and get a FluxSink in the callback that you can use to emit data via next. This custom Flux can be cold, so in order to make it hot you can use publish() and connect(). Building on the example from the previous article with a feed of price ticks, we get an almost verbatim translation in Reactor:

SomeFeed<PriceTick> feed = new SomeFeed<>();
Flux<PriceTick> flux = Flux.create(emitter -> {
    SomeListener listener = new SomeListener() {
        @Override
        public void priceTick(PriceTick event) {
            emitter.next(event);
            if (event.isLast()) {
                emitter.complete();
            }
        }

        @Override
        public void error(Throwable e) {
            emitter.error(e);
        }
    };
    feed.register(listener);
}, FluxSink.OverflowStrategy.BUFFER);

ConnectableFlux<PriceTick> hot = flux.publish();

Before connecting to the hot Flux, why not subscribe twice? One subscription will print the detail of each tick while the other will only print the instrument:

hot.subscribe(priceTick -> System.out.printf("%s %4s %6.2f%n",
        priceTick.getDate(), priceTick.getInstrument(), priceTick.getPrice()));

hot.subscribe(priceTick -> System.out.println(priceTick.getInstrument()));

We then connect to the hot flux and let it run for 5 seconds before our test snippet terminates:

hot.connect();
Thread.sleep(5000);

(Note that in the example repository, the feed would also terminate on its own if the isLast() method of PriceTick is changed.)

FluxSink also lets you check if downstream has cancelled its subscription via isCancelled(). You can also get feedback on the outstanding requested amount via requestedFromDownstream(), which is useful if you want to simply comply with backpressure. Finally, you can make sure any specific resources your source uses are released upon cancellation via setCancellation.

Note that there's a backpressure implication of using FluxSink: you must provide an OverflowStrategy explicitly to let the operator deal with backpressure. This is equivalent to using onBackpressureXXX operators (e.g. FluxSink.OverflowStrategy.BUFFER is equivalent to using .onBackpressureBuffer()), which kind of overrides any backpressure instructions from downstream.
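To make the buffering idea concrete, here is a minimal, single-threaded sketch of what a BUFFER-style overflow strategy does conceptually: the push source is never slowed down, and items are queued until downstream requests them. This is not Reactor's implementation. BufferingSink and its buffered() method are hypothetical names invented for the illustration, and the other method names only echo the FluxSink vocabulary used above.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

// Hypothetical, single-threaded illustration of a BUFFER overflow strategy.
// NOT Reactor's FluxSink: no threading, no completion or error signals.
class BufferingSink<T> {
    private final Queue<T> buffer = new ArrayDeque<>();
    private final Consumer<T> downstream;
    private long requested = 0;

    BufferingSink(Consumer<T> downstream) {
        this.downstream = downstream;
    }

    // Called by the push source: never blocks and never drops an item.
    void next(T value) {
        buffer.add(value);
        drain();
    }

    // Called by the consumer to signal demand for n more items.
    void request(long n) {
        requested += n;
        drain();
    }

    // Outstanding demand that has not yet been satisfied.
    long requestedFromDownstream() {
        return requested;
    }

    // Number of items waiting for demand (unbounded in this sketch).
    int buffered() {
        return buffer.size();
    }

    // Deliver queued items while there is outstanding demand.
    private void drain() {
        while (requested > 0 && !buffer.isEmpty()) {
            requested--;
            downstream.accept(buffer.poll());
        }
    }

    public static void main(String[] args) {
        List<Integer> received = new ArrayList<>();
        BufferingSink<Integer> sink = new BufferingSink<>(received::add);
        sink.next(1);
        sink.next(2);
        sink.next(3);
        System.out.println(received); // [] (no demand yet, 3 items buffered)
        sink.request(2);
        System.out.println(received); // [1, 2]
    }
}
```

The trade-off shown here is that the queue is unbounded: a source that outpaces its consumer for long enough will keep growing the buffer, which is why the choice of strategy has to be explicit.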

Conclusion

In this article, you have learned about Reactor, a fourth-generation reactive library that builds on the Rx language but targets Java 8 and the Reactive Streams specification. We've shown how the concepts you might have learned in RxJava also apply to Reactor, despite a few API differences. We've also shown how Reactor serves as the foundation for Spring 5, and that it offers resources for testing a Publisher/Flux/Mono.

If you want to dig deeper into using Reactor, the snippets presented in this article are available in our GitHub repository. There is also a workshop, the "Lite Rx API hands-on", that covers more operators and use cases.

Finally, you can reach the Reactor team on Gitter and provide feedback there or through GitHub issues (and of course, pull requests are welcome as well).
