flux mono 转_自语之Reactor中FluxMono的粗略使用
最近,需要快速使用Reactor的的兩個(gè)類Flux和Mono中的的方法進(jìn)行開(kāi)發(fā)。在搜索半天之后,發(fā)現(xiàn)大部分都是一些轉(zhuǎn)載的文章,筆者點(diǎn)了好幾個(gè)不同的網(wǎng)站,看到的卻是同一篇文章。
在此,筆者不講過(guò)多的原理,以實(shí)踐為主。
比如,講一些筆者使用過(guò)的Flux&Mono中方法,以及這些方法的使用場(chǎng)景…..。
Flux&Mono的生產(chǎn):
Flux是Reactor中的多元流,一次可以產(chǎn)生(發(fā)射)多個(gè)元素(對(duì)象)。類似于Java中的集合類(List)。
Mono是Reactor單個(gè)元素,一次只能產(chǎn)生0或者一個(gè)元素(對(duì)象)。類似于Java中的pojo普通對(duì)象。
接下來(lái)看看, 如何創(chuàng)建Flux&和Mono:
關(guān)于Flux的創(chuàng)建方式:just + generate + empty
Flux flux1 = Flux.just(Maps.newHashMap()); // 集合創(chuàng)建
Flux flux2 = Flux.just(1, 2, 3, 4); // 包裝類創(chuàng)建
Flux flux3 = Flux.just("1", "2", "3"); // 字符串創(chuàng)建
Flux flux4 = Flux.empty(); // 返回一個(gè)空數(shù)據(jù)
// 使用generate方法創(chuàng)建
Flux.generate(create -> {
create.next("1"); // 每次只能接受一個(gè)值
create.next("2");
create.complete(); // 創(chuàng)建完成
});
復(fù)制代碼關(guān)于Mono的創(chuàng)建:just + justOrEmpty + empty
Mono mono1 = Mono.just(Maps.newHashMap()); // 集合創(chuàng)建
Mono mono2 = Mono.just(1); // 包裝類創(chuàng)建
Mono mono3 = Mono.just("1"); // 字符串創(chuàng)建
Mono.justOrEmpty(null); // 參數(shù)一個(gè)null或者非空的數(shù)據(jù)
Mono.justOrEmpty(1);
Mono.empty(); // 返回空數(shù)據(jù)
復(fù)制代碼
Flux&Mono的消費(fèi):
不管是Flux還是Mono,創(chuàng)建之后需要觸發(fā)消費(fèi)邏輯,才能對(duì)產(chǎn)生的元素進(jìn)行處理。
所以, 這里需要調(diào)用subscribe方法,才可以對(duì)產(chǎn)生的元素進(jìn)行消費(fèi)(處理)。
Flux flux2 = Flux.just(1, 2, 3, 4); // 包裝類創(chuàng)建
flux2.subscribe(System.out::println);
// 打印結(jié)果:1 2 3 4
Mono Mono1 = Mono.just(1); // 包裝類創(chuàng)建
Mono1.subscribe(System.out::println);
// 打印結(jié)果: 1
復(fù)制代碼通過(guò)查看消費(fèi)方法(subscribe)的的方法的參數(shù)列表,可以看出還存在多種的消費(fèi)方式:
Consumer:消費(fèi)邏輯, 可以傳入系統(tǒng)處理函數(shù), 可以傳遞自定義處理的Lambda表達(dá)式
errorConsumer:異常處理邏輯, 當(dāng)產(chǎn)生異常的時(shí)候,需要傳入的處理邏輯。
completeConsumer:完成處理邏輯,也叫完成信號(hào)。
思維類比:整個(gè)Flux&Mono生產(chǎn)和消費(fèi)過(guò)程, 有點(diǎn)類似Java中的try-catch-finally
try -----Flux&Mono中的組合操作(filter、filterMap、zipWith…)
catch -----errorConsumer異常處理邏輯。
finally ----completeConsumer, 完成信號(hào)。
消費(fèi)函數(shù)subscribe的使用:
下面是reactor中subscribe方法的兩個(gè)重載。(當(dāng)然還有些重載的subscribe方法,筆者也沒(méi)怎么用過(guò),所以在此就不羅列出來(lái)了)
// 消費(fèi)一
public final Disposable subscribe(Consumer super T> consumer);
// 消費(fèi)二:
public final Disposable subscribe(
@Nullable Consumer super T> consumer,
@Nullable Consumer super Throwable> errorConsumer,
@Nullable Runnable completeConsumer);
復(fù)制代碼對(duì)于消費(fèi)一的consumer, 我們可以傳入一個(gè)函數(shù)(System.out::println),也可以傳入一個(gè)Lamdba表達(dá)式。
消費(fèi)一(上文)的consumer,傳入item -> {}進(jìn)行消費(fèi), 效果如下:
Flux.range(1, 10) // 產(chǎn)生 [1, 10]的整數(shù)
// 過(guò)濾
.map(item -> {
// 偶數(shù)翻倍
if(item % 2 == 0) {
return item * 2;
}
return item;
})
// 消費(fèi)
.subscribe(item -> {
System.out.print(item + " ");
});
//打印結(jié)果:1 4 3 8 5 12 7 16 9 20
復(fù)制代碼消費(fèi)二多出errorConsumer, completeConsumer兩個(gè)參數(shù), 都可以傳入landba表達(dá)式進(jìn)行使用:
info -> log.error("這是一個(gè)異常, 信息為:", info):一般使用該方式進(jìn)行記錄異常
() -> { doSomething….}:作為完成信號(hào)。
Flux.range(1, 10)
// 過(guò)濾
.map(item -> {
// 偶數(shù)翻倍
if(item % 2 == 0) {
return item * 2;
}
return item;
})
// 消費(fèi)
.subscribe(item -> {
System.out.print(item + " ");
}, info -> {
System.out.print("異常處理");
}, () -> {
System.out.println("完成處理");
});
// 打印結(jié)果:
// 1 4 3 8 5 12 7 16 9 20 完成處理
復(fù)制代碼在上面的代碼中,如果發(fā)生異常怎么辦?在Flux&Mono中,異常處理&完成處理都屬于終止信號(hào), 所以程序一旦出現(xiàn)異常,那么就執(zhí)行異常處理, 并不會(huì)執(zhí)行完成處理。如果未出現(xiàn)異常, 則只有執(zhí)行完成處理。
Flux.range(1, 10)
// 過(guò)濾
.map(item -> {
// 偶數(shù)翻倍
if(item == 7) { // 到7進(jìn)行異常拋出
throw new RuntimeException();
}
return item;
})
// 消費(fèi)
.subscribe(item -> {
System.out.print(item + " ");
}, info -> {
System.out.print("異常");
}, () -> {
System.out.println("完成");
});
// 1 2 3 4 5 6 異常
復(fù)制代碼
組合處理:map&flatMap
兩者都可以對(duì)Flux&Mono中的元素進(jìn)行映射。
區(qū)別:
map:傳入的表達(dá)式,可以返回一個(gè)普通的pojo對(duì)象。
flatMap:傳入的表達(dá)式,只能返回一個(gè)Mono&Flux對(duì)象。
String[] arr = {"張三", "22"};
List arr2 = Arrays.asList(arr);
Flux.just(arr2)
.flatMap(obj -> {
P person = new P();
// 返回mono對(duì)象
return Mono.just(person);
}).map(obj -> {
P person = new P();
// 返回普通對(duì)象
return person;
}).subscribe();
復(fù)制代碼
背壓的概念:
流水線的概念:
在Flux&Mono中, 組合處理如果存在返回值, 那么該返回值是往下游(下一個(gè)組合處理)傳遞。
Flux.range(1, 10)
.flatMap(A -> {
return B;
}).map(B -> {
return C;
}).subscribe(C -> {
dosomething...
});
// 上游flatMap 返回B
// map接受的參數(shù), 為B, 并返回C
// subscribe接受的參數(shù)為C
// 從上往下, 可以看做是一個(gè)流水線, flatMap&map可以看做是流水線上的工人,subscribe則是生產(chǎn)出來(lái)的產(chǎn)品。
復(fù)制代碼
總結(jié)
以上是生活随笔為你收集整理的flux mono 转_自语之Reactor中FluxMono的粗略使用的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: UG基础知识学习视频目录整理(制图篇)
- 下一篇: 方向键按键转发,模仿笔记本Fn按键