日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 前端技术 > HTML >内容正文

HTML

springboot 使用webflux响应式开发教程(一)

發布時間:2025/3/20 HTML 52 豆豆
生活随笔 收集整理的這篇文章主要介紹了 springboot 使用webflux响应式开发教程(一) 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

什么是webFlux

左側是傳統的基于Servlet的Spring Web MVC框架,右側是5.0版本新引入的基于Reactive Streams的Spring WebFlux框架,從上到下依次是Router Functions,WebFlux,Reactive Streams三個新組件。

Router Functions: 對標@Controller,@RequestMapping等標準的Spring MVC注解,提供一套函數式風格的API,用于創建Router,Handler和Filter。
WebFlux: 核心組件,協調上下游各個組件提供響應式編程支持。
Reactive Streams: 一種支持背壓(Backpressure)的異步數據流處理標準,主流實現有RxJava和Reactor,Spring WebFlux默認集成的是Reactor。
在Web容器的選擇上,Spring WebFlux既支持像Tomcat,Jetty這樣的的傳統容器(前提是支持Servlet 3.1 Non-Blocking IO API),又支持像Netty,Undertow那樣的異步容器。不管是何種容器,Spring WebFlux都會將其輸入輸出流適配成Flux<DataBuffer>格式,以便進行統一處理。

值得一提的是,除了新的Router Functions接口,Spring WebFlux同時支持使用老的Spring MVC注解聲明Reactive Controller。和傳統的MVC Controller不同,Reactive Controller操作的是非阻塞的ServerHttpRequest和ServerHttpResponse,而不再是Spring MVC里的HttpServletRequest和HttpServletResponse。

@GetMapping("/reactive/restaurants")public Flux<Restaurant> findAll() {return restaurantRepository.findAll();}

可以看到主要變化就是在 返回的類型上Flux<Restaurant>

Flux和Mono 是?Reactor?中的流數據類型,其中Flux會發送多次,Mono會發送0次或一次

使用webflux需要具備的基礎是Reactive programming 的理解。 Reactor 的基礎 和 熟練的java8 lambda使用

創建springboot應用
下面通過創建股票報價的demo來演示。

通過?https://start.spring.io?或idea自帶功能創建springboot項目,groupId為io.spring.workshop,artifactId為 stock-quotes。

?

勾選 ReactiveWeb

?

修改 application.properties 配置文件,指定接口 8081

server.port=8081

啟動應用,成功后控制臺輸出日志

?

日志顯示使用Netty而不是tomcat,后續會使用Tomcat

股票報價生成
定義實體

@Data public class Quote {private static final MathContext MATH_CONTEXT = new MathContext(2);private String ticker;private BigDecimal price;private Instant instant;public Quote() {}public Quote(String ticker, BigDecimal price) {this.ticker = ticker;this.price = price;}public Quote(String ticker, Double price) {this(ticker, new BigDecimal(price, MATH_CONTEXT));}@Overridepublic String toString() {return "Quote{" +"ticker='" + ticker + '\'' +", price=" + price +", instant=" + instant +'}';} }

定義生成器

@Component public class QuoteGenerator {private final MathContext mathContext = new MathContext(2);private final Random random = new Random();private final List<Quote> prices = new ArrayList<>();/*** 生成行情數據*/public QuoteGenerator() {this.prices.add(new Quote("CTXS", 82.26));this.prices.add(new Quote("DELL", 63.74));this.prices.add(new Quote("GOOG", 847.24));this.prices.add(new Quote("MSFT", 65.11));this.prices.add(new Quote("ORCL", 45.71));this.prices.add(new Quote("RHT", 84.29));this.prices.add(new Quote("VMW", 92.21));}public Flux<Quote> fetchQuoteStream(Duration period) {// 需要周期生成值并返回,使用 Flux.intervalreturn Flux.interval(period)// In case of back-pressure, drop events .onBackpressureDrop()// For each tick, generate a list of quotes.map(this::generateQuotes)// "flatten" that List<Quote> into a Flux<Quote>.flatMapIterable(quotes -> quotes).log("io.spring.workshop.stockquotes");}/*** Create quotes for all tickers at a single instant.*/private List<Quote> generateQuotes(long interval) {final Instant instant = Instant.now();return prices.stream().map(baseQuote -> {BigDecimal priceChange = baseQuote.getPrice().multiply(new BigDecimal(0.05 * this.random.nextDouble()), this.mathContext);Quote result = new Quote(baseQuote.getTicker(), baseQuote.getPrice().add(priceChange));result.setInstant(instant);return result;}).collect(Collectors.toList());} }

使用webflux創建web應用

webflux的使用有兩種方式,基于注解和函數式編程。這里使用函數式編程,先貼代碼:

創建QuoteHandler

@Component public class QuoteHandler {private final Flux<Quote> quoteStream;public QuoteHandler(QuoteGenerator quoteGenerator) {this.quoteStream = quoteGenerator.fetchQuoteStream(ofMillis(1000)).share();}public Mono<ServerResponse> hello(ServerRequest request) {return ok().contentType(TEXT_PLAIN).body(BodyInserters.fromObject("Hello Spring!"));}public Mono<ServerResponse> echo(ServerRequest request) {return ok().contentType(TEXT_PLAIN).body(request.bodyToMono(String.class), String.class);}public Mono<ServerResponse> streamQuotes(ServerRequest request) {return ok().contentType(APPLICATION_STREAM_JSON).body(this.quoteStream, Quote.class);}public Mono<ServerResponse> fetchQuotes(ServerRequest request) {int size = Integer.parseInt(request.queryParam("size").orElse("10"));return ok().contentType(APPLICATION_JSON).body(this.quoteStream.take(size), Quote.class);} }

創建Router

@Configuration public class QuoteRouter {@Beanpublic RouterFunction<ServerResponse> route(QuoteHandler quoteHandler) {return RouterFunctions.route(GET("/hello").and(accept(TEXT_PLAIN)), quoteHandler::hello).andRoute(POST("/echo").and(accept(TEXT_PLAIN).and(contentType(TEXT_PLAIN))), quoteHandler::echo).andRoute(GET("/quotes").and(accept(APPLICATION_JSON)), quoteHandler::fetchQuotes).andRoute(GET("/quotes").and(accept(APPLICATION_STREAM_JSON)), quoteHandler::streamQuotes);} }

需要注意的是在springboot中Handler和Router都需要打上@Configuration。

HTTP請求交由Router轉發給對應的Handler,Handler處理請求,并返回Mono<ServerResponse>,這里的Router類似@RequestMapping,Handler類似Controller。這么理解非常容易。

運行項目,瀏覽器輸入 http://localhost:8081/hello 或者 使用curl,即可收到 "Hello Spring!"的文本信息。

到目前為止,一個簡單的webflux示例已經完成,但是還沒有體現出它與傳統模式有何不同。

下面我們來做一下測試:

$ curl http://localhost:8081/echo -i -d "WebFlux workshop" -H "Content-Type: text/plain" HTTP/1.1 200 OK transfer-encoding: chunked Content-Type: text/plainWebFlux workshop

還是沒有區別T.T,看下一步。

$ curl http://localhost:8081/quotes -i -H "Accept: application/stream+json" HTTP/1.1 200 OK transfer-encoding: chunked Content-Type: application/stream+json{"ticker":"CTXS","price":82.77,"instant":"2018-05-15T06:45:51.261Z"} {"ticker":"DELL","price":64.83,"instant":"2018-05-15T06:45:51.261Z"} {"ticker":"GOOG","price":881,"instant":"2018-05-15T06:45:51.261Z"} {"ticker":"MSFT","price":67.3,"instant":"2018-05-15T06:45:51.261Z"} {"ticker":"ORCL","price":48.1,"instant":"2018-05-15T06:45:51.261Z"} {"ticker":"RHT","price":85.1,"instant":"2018-05-15T06:45:51.261Z"} {"ticker":"VMW","price":92.24,"instant":"2018-05-15T06:45:51.261Z"} -------------------------------無敵分割線------------------------------------- {"ticker":"CTXS","price":85.7,"instant":"2018-05-15T06:45:52.260Z"} {"ticker":"DELL","price":64.12,"instant":"2018-05-15T06:45:52.260Z"} {"ticker":"GOOG","price":879,"instant":"2018-05-15T06:45:52.260Z"} {"ticker":"MSFT","price":67.9,"instant":"2018-05-15T06:45:52.260Z"} {"ticker":"ORCL","price":46.43,"instant":"2018-05-15T06:45:52.260Z"} {"ticker":"RHT","price":86.8,"instant":"2018-05-15T06:45:52.260Z"} ...

上面的分割線是為了易于分辨人為加上去的,我們看到返回結果每隔一秒刷新一次,不終止的話會一直返回數據,傳統的Request/Response是一次請求,一次返回。

注意是設置了Header?Accept: application/stream+json?,

如果將Header設置為?Accept: application/json?,只會得到一次Response。

寫測試
springboot的test模塊包含WebTestClient,可以用來對webflux服務端進行測試。

@RunWith(SpringRunner.class) // We create a `@SpringBootTest`, starting an actual server on a `RANDOM_PORT` @SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT) public class StockQuotesApplicationTests {// Spring Boot will create a `WebTestClient` for you,// already configure and ready to issue requests against "localhost:RANDOM_PORT" @Autowiredprivate WebTestClient webTestClient;@Testpublic void fetchQuotes() {webTestClient// We then create a GET request to test an endpoint.get().uri("/quotes?size=20").accept(MediaType.APPLICATION_JSON).exchange()// and use the dedicated DSL to test assertions against the response .expectStatus().isOk().expectHeader().contentType(MediaType.APPLICATION_JSON).expectBodyList(Quote.class).hasSize(20)// Here we check that all Quotes have a positive price value.consumeWith(allQuotes ->assertThat(allQuotes.getResponseBody()).allSatisfy(quote -> assertThat(quote.getPrice()).isPositive()));}@Testpublic void fetchQuotesAsStream() {List<Quote> result = webTestClient// We then create a GET request to test an endpoint.get().uri("/quotes")// this time, accepting "application/stream+json" .accept(MediaType.APPLICATION_STREAM_JSON).exchange()// and use the dedicated DSL to test assertions against the response .expectStatus().isOk().expectHeader().contentType(MediaType.APPLICATION_STREAM_JSON).returnResult(Quote.class).getResponseBody().take(30).collectList().block();assertThat(result).allSatisfy(quote -> assertThat(quote.getPrice()).isPositive());} }

參考文章:

https://docs.spring.io/spring-framework/docs/5.0.3.RELEASE/spring-framework-reference/web.html#web-reactive-server-functional
http://projectreactor.io/docs
https://www.ibm.com/developerworks/cn/java/spring5-webflux-reactive/index.html
https://blog.csdn.net/qq_34438958/article/details/78539234

總結

以上是生活随笔為你收集整理的springboot 使用webflux响应式开发教程(一)的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。