flux mono 转_自语之Reactor中FluxMono的粗略使用
最近,需要快速使用Reactor的的兩個類Flux和Mono中的的方法進行開發。在搜索半天之后,發現大部分都是一些轉載的文章,筆者點了好幾個不同的網站,看到的卻是同一篇文章。
在此,筆者不講過多的原理,以實踐為主。
比如,講一些筆者使用過的Flux&Mono中方法,以及這些方法的使用場景…..。
Flux&Mono的生產:
Flux是Reactor中的多元流,一次可以產生(發射)多個元素(對象)。類似于Java中的集合類(List)。
Mono是Reactor單個元素,一次只能產生0或者一個元素(對象)。類似于Java中的pojo普通對象。
接下來看看, 如何創建Flux&和Mono:
關于Flux的創建方式:just + generate + empty
Flux flux1 = Flux.just(Maps.newHashMap()); // 集合創建
Flux flux2 = Flux.just(1, 2, 3, 4); // 包裝類創建
Flux flux3 = Flux.just("1", "2", "3"); // 字符串創建
Flux flux4 = Flux.empty(); // 返回一個空數據
// 使用generate方法創建
Flux.generate(create -> {
create.next("1"); // 每次只能接受一個值
create.next("2");
create.complete(); // 創建完成
});
復制代碼關于Mono的創建:just + justOrEmpty + empty
Mono mono1 = Mono.just(Maps.newHashMap()); // 集合創建
Mono mono2 = Mono.just(1); // 包裝類創建
Mono mono3 = Mono.just("1"); // 字符串創建
Mono.justOrEmpty(null); // 參數一個null或者非空的數據
Mono.justOrEmpty(1);
Mono.empty(); // 返回空數據
復制代碼
Flux&Mono的消費:
不管是Flux還是Mono,創建之后需要觸發消費邏輯,才能對產生的元素進行處理。
所以, 這里需要調用subscribe方法,才可以對產生的元素進行消費(處理)。
Flux flux2 = Flux.just(1, 2, 3, 4); // 包裝類創建
flux2.subscribe(System.out::println);
// 打印結果:1 2 3 4
Mono Mono1 = Mono.just(1); // 包裝類創建
Mono1.subscribe(System.out::println);
// 打印結果: 1
復制代碼通過查看消費方法(subscribe)的的方法的參數列表,可以看出還存在多種的消費方式:
Consumer:消費邏輯, 可以傳入系統處理函數, 可以傳遞自定義處理的Lambda表達式
errorConsumer:異常處理邏輯, 當產生異常的時候,需要傳入的處理邏輯。
completeConsumer:完成處理邏輯,也叫完成信號。
思維類比:整個Flux&Mono生產和消費過程, 有點類似Java中的try-catch-finally
try -----Flux&Mono中的組合操作(filter、filterMap、zipWith…)
catch -----errorConsumer異常處理邏輯。
finally ----completeConsumer, 完成信號。
消費函數subscribe的使用:
下面是reactor中subscribe方法的兩個重載。(當然還有些重載的subscribe方法,筆者也沒怎么用過,所以在此就不羅列出來了)
// 消費一
public final Disposable subscribe(Consumer super T> consumer);
// 消費二:
public final Disposable subscribe(
@Nullable Consumer super T> consumer,
@Nullable Consumer super Throwable> errorConsumer,
@Nullable Runnable completeConsumer);
復制代碼對于消費一的consumer, 我們可以傳入一個函數(System.out::println),也可以傳入一個Lamdba表達式。
消費一(上文)的consumer,傳入item -> {}進行消費, 效果如下:
Flux.range(1, 10) // 產生 [1, 10]的整數
// 過濾
.map(item -> {
// 偶數翻倍
if(item % 2 == 0) {
return item * 2;
}
return item;
})
// 消費
.subscribe(item -> {
System.out.print(item + " ");
});
//打印結果:1 4 3 8 5 12 7 16 9 20
復制代碼消費二多出errorConsumer, completeConsumer兩個參數, 都可以傳入landba表達式進行使用:
info -> log.error("這是一個異常, 信息為:", info):一般使用該方式進行記錄異常
() -> { doSomething….}:作為完成信號。
Flux.range(1, 10)
// 過濾
.map(item -> {
// 偶數翻倍
if(item % 2 == 0) {
return item * 2;
}
return item;
})
// 消費
.subscribe(item -> {
System.out.print(item + " ");
}, info -> {
System.out.print("異常處理");
}, () -> {
System.out.println("完成處理");
});
// 打印結果:
// 1 4 3 8 5 12 7 16 9 20 完成處理
復制代碼在上面的代碼中,如果發生異常怎么辦?在Flux&Mono中,異常處理&完成處理都屬于終止信號, 所以程序一旦出現異常,那么就執行異常處理, 并不會執行完成處理。如果未出現異常, 則只有執行完成處理。
Flux.range(1, 10)
// 過濾
.map(item -> {
// 偶數翻倍
if(item == 7) { // 到7進行異常拋出
throw new RuntimeException();
}
return item;
})
// 消費
.subscribe(item -> {
System.out.print(item + " ");
}, info -> {
System.out.print("異常");
}, () -> {
System.out.println("完成");
});
// 1 2 3 4 5 6 異常
復制代碼
組合處理:map&flatMap
兩者都可以對Flux&Mono中的元素進行映射。
區別:
map:傳入的表達式,可以返回一個普通的pojo對象。
flatMap:傳入的表達式,只能返回一個Mono&Flux對象。
String[] arr = {"張三", "22"};
List arr2 = Arrays.asList(arr);
Flux.just(arr2)
.flatMap(obj -> {
P person = new P();
// 返回mono對象
return Mono.just(person);
}).map(obj -> {
P person = new P();
// 返回普通對象
return person;
}).subscribe();
復制代碼
背壓的概念:
流水線的概念:
在Flux&Mono中, 組合處理如果存在返回值, 那么該返回值是往下游(下一個組合處理)傳遞。
Flux.range(1, 10)
.flatMap(A -> {
return B;
}).map(B -> {
return C;
}).subscribe(C -> {
dosomething...
});
// 上游flatMap 返回B
// map接受的參數, 為B, 并返回C
// subscribe接受的參數為C
// 從上往下, 可以看做是一個流水線, flatMap&map可以看做是流水線上的工人,subscribe則是生產出來的產品。
復制代碼
總結
以上是生活随笔為你收集整理的flux mono 转_自语之Reactor中FluxMono的粗略使用的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: UG基础知识学习视频目录整理(制图篇)
- 下一篇: 方向键按键转发,模仿笔记本Fn按键