WebFlux基础之响应式编程
上篇文章,我們簡單的了解了WebFlux的一些基礎(chǔ)與背景,并通過示例來寫了一個demo。我們知道WebFlux是響應(yīng)式的web框架,其特點之一就是可以通過函數(shù)式編程方式配置route。另外究竟什么是響應(yīng)式編程呢?這篇文章我們就簡單探討一下
一、Java8中的函數(shù)式編程
百科中這樣定義函數(shù)式編程:
函數(shù)式編程是種編程方式,它將電腦運算視為函數(shù)的計算。函數(shù)編程語言最重要的基礎(chǔ)是λ演算(lambda calculus),而且λ演算的函數(shù)可以接受函數(shù)當(dāng)作輸入(參數(shù))和輸出(返回值)。那么在Java8里怎么樣來實現(xiàn)它呢?
示例一
在這里我先自己寫一個例子
定義接口:
package com.bdqn.lyrk.basic.java;/*** 函數(shù)式接口** @author chen.nie* @date 2018/7/18**/ @FunctionalInterface public interface OperateNumberFunctions {void operate(Integer number);default void print() {} }?
在定義的接口上添加@FunctionalInterface表明其是函數(shù)式接口,這個注解用于檢測函數(shù)式接口規(guī)范,定義函數(shù)式接口時該接口內(nèi)必須有且只有一個抽象的方法。
定義類:
package com.bdqn.lyrk.basic.java;import java.util.Optional; import java.util.function.Predicate;/*** 定義函數(shù)式編程類*/ public class NumberFunctions {private Integer number;private NumberFunctions() {}private static NumberFunctions numberFunctions = new NumberFunctions();public static NumberFunctions of(Integer number) {numberFunctions.number = number;return numberFunctions;}public NumberFunctions add(Integer number) {numberFunctions.number += number;return numberFunctions;}public NumberFunctions subtraction(Integer number) {numberFunctions.number -= number;return numberFunctions;}public Optional<NumberFunctions> filter(Predicate<Integer> predicate) {if (predicate.test(this.number)) return Optional.of(numberFunctions);return Optional.ofNullable(new NumberFunctions());}public void operate(OperateNumberFunctions functions) {functions.operate(this.number);} }?
在這里定義類進行簡單的運算與過濾條件。那么在Main方法里可以這么寫:
package com.bdqn.lyrk.basic.java;public class Main {public static void main(String[] args) {NumberFunctions.of(10).add(30).subtraction(2).filter(number -> number>20).get().operate(System.out::println);} }那么輸出結(jié)果為38
示例二
在Java8里有一個類叫Stream。Stream是數(shù)據(jù)流的意思,這個類略微有點像Reactor中Flux,它提供了類似于操作符的功能,我們來看一個例子:
Main方法
package com.bdqn.lyrk.basic.java;import java.util.stream.Stream;import static java.util.stream.Collectors.toList;public class Main {public static void main(String[] args) {/*在這里先將Stream里的內(nèi)容做乘2的操作然后在進行倒序排序緊接著過濾出是4的倍數(shù)的數(shù)字然后轉(zhuǎn)換成集合在打印*/Stream.of(15, 26, 34, 455, 5, 6).map(number -> number * 2).sorted((num1, num2) -> num2 - num1).filter(integer -> integer % 4 == 0).collect(toList()).forEach(System.out::println);} }運行得到的結(jié)果:
68 52 12關(guān)于::操作符
該操作符是lambda表達式的更特殊寫法,使用此操作符可以簡化函數(shù)式接口的實現(xiàn),這個方法至少滿足以下特定條件:
1)方法返回值與函數(shù)式接口相同
2)方法參數(shù)與函數(shù)式接口相同
舉例說明
package java.util.function;/*** Represents a supplier of results.** <p>There is no requirement that a new or distinct result be returned each* time the supplier is invoked.** <p>This is a <a href="package-summary.html">functional interface</a>* whose functional method is {@link #get()}.** @param <T> the type of results supplied by this supplier** @since 1.8*/ @FunctionalInterface public interface Supplier<T> {/*** Gets a result.** @return a result*/T get(); }java中Runnable接口:
@FunctionalInterface public interface Runnable {/*** When an object implementing interface <code>Runnable</code> is used* to create a thread, starting the thread causes the object's* <code>run</code> method to be called in that separately executing* thread.* <p>* The general contract of the method <code>run</code> is that it may* take any action whatsoever.** @see java.lang.Thread#run()*/public abstract void run(); }java中的Predicate接口:
package java.util.function;import java.util.Objects;/*** Represents a predicate (boolean-valued function) of one argument.** <p>This is a <a href="package-summary.html">functional interface</a>* whose functional method is {@link #test(Object)}.** @param <T> the type of the input to the predicate** @since 1.8*/ @FunctionalInterface public interface Predicate<T> {/*** Evaluates this predicate on the given argument.** @param t the input argument* @return {@code true} if the input argument matches the predicate,* otherwise {@code false}*/boolean test(T t);/*** Returns a composed predicate that represents a short-circuiting logical* AND of this predicate and another. When evaluating the composed* predicate, if this predicate is {@code false}, then the {@code other}* predicate is not evaluated.** <p>Any exceptions thrown during evaluation of either predicate are relayed* to the caller; if evaluation of this predicate throws an exception, the* {@code other} predicate will not be evaluated.** @param other a predicate that will be logically-ANDed with this* predicate* @return a composed predicate that represents the short-circuiting logical* AND of this predicate and the {@code other} predicate* @throws NullPointerException if other is null*/default Predicate<T> and(Predicate<? super T> other) {Objects.requireNonNull(other);return (t) -> test(t) && other.test(t);}/*** Returns a predicate that represents the logical negation of this* predicate.** @return a predicate that represents the logical negation of this* predicate*/default Predicate<T> negate() {return (t) -> !test(t);}/*** Returns a composed predicate that represents a short-circuiting logical* OR of this predicate and another. When evaluating the composed* predicate, if this predicate is {@code true}, then the {@code other}* predicate is not evaluated.** <p>Any exceptions thrown during evaluation of either predicate are relayed* to the caller; if evaluation of this predicate throws an exception, the* {@code other} predicate will not be evaluated.** @param other a predicate that will be logically-ORed with this* predicate* @return a composed predicate that represents the short-circuiting logical* OR of this predicate and the {@code other} predicate* @throws NullPointerException if other is null*/default Predicate<T> or(Predicate<? super T> other) {Objects.requireNonNull(other);return (t) -> test(t) || other.test(t);}/*** Returns a predicate that tests if two arguments are equal according* to {@link Objects#equals(Object, Object)}.** @param <T> the type of arguments to the predicate* @param targetRef the object reference with which to compare for equality,* which may be {@code null}* @return a predicate that tests if two arguments are equal according* to {@link Objects#equals(Object, Object)}*/static <T> Predicate<T> isEqual(Object targetRef) {return (null == targetRef)? Objects::isNull: object -> targetRef.equals(object);} }那么上述的接口分別可以使用如下寫法,注意實現(xiàn)該接口的方法特點
package com.bdqn.lyrk.basic.java;import java.util.function.Predicate; import java.util.function.Supplier;public class Main {private static int i;public static void main(String[] args) {/*創(chuàng)建對象的方式*/Supplier<Object> supplier = Object::new;/*調(diào)用方法的方式(無參數(shù))*/Runnable runnable = Main::add;/*調(diào)用方法的方式(有參數(shù))*/Predicate<String> predicate = Main::filter;}public static void add() {i++;System.out.println("test" + i);}public static boolean filter(String test) {return test != null;} }我們可以看到使用函數(shù)式編程借助于lambda表達式,使得代碼更簡潔清爽?
?
二、Java中的響應(yīng)式編程
關(guān)于響應(yīng)式編程,百度百科是這么定義的:
簡稱RP(Reactive Programming)
響應(yīng)式編程是一種面向數(shù)據(jù)流和變化傳播的編程范式。這意味著可以在編程語言中很方便地表達靜態(tài)或動態(tài)的數(shù)據(jù)流,而相關(guān)的計算模型會自動將變化的值通過數(shù)據(jù)流進行傳播。 在這里有兩個關(guān)鍵詞:數(shù)據(jù)流與變化傳播。下面我們來通過代碼來演示下響應(yīng)式編程是怎么回事Java8及以前版本
最典型的示例就是,JDK提供的觀察者模式類Observer與Observalbe:
package com.hzgj.lyrk.demo;import java.util.Observable;public class ObserverDemo extends Observable {public static void main(String[] args) {ObserverDemo observable = new ObserverDemo();observable.addObserver((o, arg) -> {System.out.println("發(fā)生變化");});observable.addObserver((o, arg) -> {System.out.println("收到被觀察者通知,準備改變");});observable.setChanged();observable.notifyObservers();} }?
在上述代碼示例中觀察者并沒有及時執(zhí)行,而是在接受到被觀察者發(fā)送信號的時候才有了“響應(yīng)”。其中setChanged()與notifyObservers方法就對應(yīng)響應(yīng)式編程中定義的關(guān)鍵詞--變化與傳播。還有一個典型的示例就是Swing中的事件機制,有興趣的朋友可以下去查閱相關(guān)資料,在這里就不再進行闡述。
Java9及其后版本
從java9開始,Observer與Observable已經(jīng)被標記為過時的類了,取而代之的是Flow類。Flow才是真正意義上的響應(yīng)式編程類,因為觀察者Observer與Observable雖然能夠響應(yīng),但是在數(shù)據(jù)流的體現(xiàn)并不是特別突出。Flow這個類,我們可以先看一下:
public final class Flow {private Flow() {} // uninstantiable/*** A producer of items (and related control messages) received by* Subscribers. Each current {@link Subscriber} receives the same* items (via method {@code onNext}) in the same order, unless* drops or errors are encountered. If a Publisher encounters an* error that does not allow items to be issued to a Subscriber,* that Subscriber receives {@code onError}, and then receives no* further messages. Otherwise, when it is known that no further* messages will be issued to it, a subscriber receives {@code* onComplete}. Publishers ensure that Subscriber method* invocations for each subscription are strictly ordered in <a* href="package-summary.html#MemoryVisibility"><i>happens-before</i></a>* order.** <p>Publishers may vary in policy about whether drops (failures* to issue an item because of resource limitations) are treated* as unrecoverable errors. Publishers may also vary about* whether Subscribers receive items that were produced or* available before they subscribed.** @param <T> the published item type*/@FunctionalInterfacepublic static interface Publisher<T> {/*** Adds the given Subscriber if possible. If already* subscribed, or the attempt to subscribe fails due to policy* violations or errors, the Subscriber's {@code onError}* method is invoked with an {@link IllegalStateException}.* Otherwise, the Subscriber's {@code onSubscribe} method is* invoked with a new {@link Subscription}. Subscribers may* enable receiving items by invoking the {@code request}* method of this Subscription, and may unsubscribe by* invoking its {@code cancel} method.** @param subscriber the subscriber* @throws NullPointerException if subscriber is null*/public void subscribe(Subscriber<? super T> subscriber);}/*** A receiver of messages. The methods in this interface are* invoked in strict sequential order for each {@link* Subscription}.** @param <T> the subscribed item type*/public static interface Subscriber<T> {/*** Method invoked prior to invoking any other Subscriber* methods for the given Subscription. If this method throws* an exception, resulting behavior is not guaranteed, but may* cause the Subscription not to be established or to be cancelled.** <p>Typically, implementations of this method invoke {@code* subscription.request} to enable receiving items.** @param subscription a new subscription*/public void onSubscribe(Subscription subscription);/*** Method invoked with a Subscription's next item. If this* method throws an exception, resulting behavior is not* guaranteed, but may cause the Subscription to be cancelled.** @param item the item*/public void onNext(T item);/*** Method invoked upon an unrecoverable error encountered by a* Publisher or Subscription, after which no other Subscriber* methods are invoked by the Subscription. If this method* itself throws an exception, resulting behavior is* undefined.** @param throwable the exception*/public void onError(Throwable throwable);/*** Method invoked when it is known that no additional* Subscriber method invocations will occur for a Subscription* that is not already terminated by error, after which no* other Subscriber methods are invoked by the Subscription.* If this method throws an exception, resulting behavior is* undefined.*/public void onComplete();}/*** Message control linking a {@link Publisher} and {@link* Subscriber}. Subscribers receive items only when requested,* and may cancel at any time. The methods in this interface are* intended to be invoked only by their Subscribers; usages in* other contexts have undefined effects.*/public static interface Subscription {/*** Adds the given number {@code n} of items to the current* unfulfilled demand for this subscription. If {@code n} is* less than or equal to zero, the Subscriber will receive an* {@code onError} signal with an {@link* IllegalArgumentException} argument. Otherwise, the* Subscriber will receive up to {@code n} additional {@code* onNext} invocations (or fewer if terminated).** @param n the increment of demand; a value of {@code* Long.MAX_VALUE} may be considered as effectively unbounded*/public void request(long n);/*** Causes the Subscriber to (eventually) stop receiving* messages. Implementation is best-effort -- additional* messages may be received after invoking this method.* A cancelled subscription need not ever receive an* {@code onComplete} or {@code onError} signal.*/public void cancel();}/*** A component that acts as both a Subscriber and Publisher.** @param <T> the subscribed item type* @param <R> the published item type*/public static interface Processor<T,R> extends Subscriber<T>, Publisher<R> {}static final int DEFAULT_BUFFER_SIZE = 256;/*** Returns a default value for Publisher or Subscriber buffering,* that may be used in the absence of other constraints.** @implNote* The current value returned is 256.** @return the buffer size value*/public static int defaultBufferSize() {return DEFAULT_BUFFER_SIZE;}}
Flow這個類里定義最基本的Publisher與Subscribe,該模式就是發(fā)布訂閱模式。我們來看一下代碼示例:
package com.hzgj.lyrk.demo;import java.util.concurrent.Flow;public class Main {public static void main(String[] args) {Flow.Publisher<String> publisher = subscriber -> {subscriber.onNext("1"); // 1subscriber.onNext("2");subscriber.onError(new RuntimeException("出錯")); // 2// subscriber.onComplete(); };publisher.subscribe(new Flow.Subscriber<>() {@Overridepublic void onSubscribe(Flow.Subscription subscription) {subscription.cancel();}@Overridepublic void onNext(String item) {System.out.println(item);}@Overridepublic void onError(Throwable throwable) {System.out.println("出錯了");}@Overridepublic void onComplete() {System.out.println("publish complete");}});} }? 代碼1 是一種數(shù)據(jù)流的體現(xiàn),在Publisher中每次調(diào)用onNext的時候,在中都會在Subscribe的onNext方法進行消費
代碼2 同樣是發(fā)送錯誤信號,等待訂閱者進行消費
運行結(jié)果:
1 2 出錯了在上述代碼中我們可以發(fā)現(xiàn):Publisher在沒有被訂閱的時候,是不會觸發(fā)任何行為的。每次調(diào)用Publisher的onNext方法的時候都像是在發(fā)信號,訂閱者收到信號時執(zhí)行相關(guān)內(nèi)容,這就是典型的響應(yīng)式編程的案例。不過java9提供的這個功能對異步的支持不太好,也不夠強大。因此才會出現(xiàn)Reactor與RxJava等響應(yīng)式框架
轉(zhuǎn)載于:https://www.cnblogs.com/niechen/p/9329191.html
總結(jié)
以上是生活随笔為你收集整理的WebFlux基础之响应式编程的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 微信小程序日期选择器
- 下一篇: 二、1、怎么做都好做,没flag就抓包