欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 汽车 > 维修 > webflux响应式编程

webflux响应式编程

2025/8/13 14:03:28 来源:https://blog.csdn.net/qq_16992475/article/details/146120508  浏览:    关键词:webflux响应式编程

webflux&webclient

尚硅谷SpringBoot响应式编程教程,最新springboot3入门到实战

响应式编程设计实战及SpringWebFlux源码剖析 - 拉勾

文章目录

  • 前置知识
    • 1、Lambda
    • 2、Function
    • 3、StreamAPI
      • 中间操作:Intermediate Operations
      • 终止操作:Terminal Operation
    • 4、Reactive-Stream
      • 为什么有Reactive-Stream规范
      • 消息传递是响应式核心
      • Reactive-Stream规范核心接口
        • API Components
        • 发布订阅写法
        • 响应式编程理解
  • Reactor
    • 1、快速上手
      • 介绍
      • 依赖
    • 2、响应式编程
      • 2.1. 阻塞是对资源的浪费
      • 2.2. 异步可以解决问题吗?
      • 2.3. 从命令式编程到响应式编程
        • 2.3.1. 可编排性与可读性
        • 2.3.2. 就像装配流水线
        • 2.3.3. 操作符(Operators)
        • 2.3.4. subscribe() 之前什么都不会发生
        • 2.3.5. 背压
        • 2.3.6. 热(Hot) vs 冷(Cold)
    • 3、核心特性
      • 1、Mono和Flux
        • ==响应式流:元素(内容) + 信号(完成/异常)==
        • 基本操作
      • 2、subscribe()
        • 自定义流的信号感知回调
        • 自定义消费者
      • 3、流的取消
        • **Disposable**
      • 4、BaseSubscriber与生命周期钩子
      • 5、背压和请求重塑
        • 1、buffer:缓冲
        • 2、limit:限流
      • 6、以编程方式创建序列-Sink
        • 1、同步环境-generate
        • 2、多线程-create
      • 7、 handle()
      • 8、自定义线程调度
      • 9、错误处理
        • 1. Catch and return a static default value.
        • 2. Catch and execute an alternative path with a fallback method.
        • 3. Catch and dynamically compute a fallback value.
        • 4. Catch, wrap to a BusinessException, and re-throw.
        • 5. Catch, log an error-specific message, and re-throw.
        • 6. Use the finally block to clean up resources or a Java 7 “try-with-resource” construct.
        • 7. 忽略当前异常,仅通知记录,继续推进
        • 8.其它
      • 10、常用操作
        • filter
        • filterMap
        • concatMap
        • concat
        • concatWith
        • transform
        • defaultIfEmpty
        • merge
        • zip
      • 11、超时与重试
      • 12、Sinks工具类
        • 单播/多播/重放/背压
        • 缓存
      • 13、阻塞式api
        • block
      • 14、Context api
  • WebFlux
    • 0、组件对比
    • 1、WebFlux
      • 1、引入
      • 2、Reactor Core
        • 1、HttpHandler、HttpServer
      • 3、DispatcherHandler
        • 1、请求处理流程
      • 4、注解开发
        • 1、目标方法传参
        • 2、返回值写法
      • 5、文件上传
      • 6、错误处理
      • 7、RequestContext
      • 8、自定义Flux配置
        • WebFluxConfigurer
      • 9、Filter

image-20250222161307934

image-20250222161501414

前置知识

1、Lambda

Java8语法糖

package com.atguiggu.lambda;import java.util.*;
import java.util.function.*;
import java.util.stream.Collectors;/*** @author lfy* @Description* @create 2023-11-16 20:07*///函数式接口;只要是函数式接口就可以用Lambda表达式简化
//函数式接口: 接口中有且只有一个未实现的方法,这个接口就叫函数式接口interface MyInterface {int sum(int i, int j);
}interface MyHaha {int haha();default int heihei() {return 2;}; //默认实现
}interface My666 {void aaa(int i,int j,int k);
}@FunctionalInterface //检查注解,帮我们快速检查我们写的接口是否函数式接口
interface MyHehe {int hehe(int i);}//1、自己写实现类
class MyInterfaceImpl implements MyInterface {@Overridepublic int sum(int i, int j) {return i + j;}
}public class Lambda {public static void main(String[] args) {//声明一个函数BiConsumer<String,String> consumer = (a,b)->{System.out.println("哈哈:"+a+";呵呵:"+b);};consumer.accept("1","2");//声明一个函数Function<String,Integer> function = (String x) -> Integer.parseInt(x);System.out.println(function.apply("2"));Supplier<String> supplier = ()-> UUID.randomUUID().toString();String s = supplier.get();System.out.println(s);BiFunction<String,Integer,Long> biFunction = (a,b)-> 888L;Predicate<Integer> even = (t)-> t%2 ==0;//        even.test()//正向判断
//        even.negate().test(2) //反向判断System.out.println(even.negate().test(2));}public static void bbbbb(String[] args) {var names = new ArrayList<String>();names.add("Alice");names.add("Bob");names.add("Charlie");names.add("David");//比较器
//        Collections.sort(names, new Comparator<String>() {
//            @Override
//            public int compare(String o1, String o2) {
//                return o2.compareTo(o1);
//            }
//        });//直接写函数式接口就方便   (o1,o2)->o1.compareTo(o2)
//        Collections.sort(names,(o1,o2)->o1.compareTo(o2));System.out.println(names);// 类::方法; 引用类中的实例方法; 忽略lambda的完整写法Collections.sort(names,String::compareTo);System.out.println(names);new  Thread(new Runnable() {@Overridepublic void run() {System.out.println("哈哈啊");}}).start();Runnable runnable = () -> System.out.println("aaa");new Thread(runnable).start();//最佳实战://1、以后调用某个方法传入参数,这个参数实例是一个接口对象,且只定义了一个方法,就直接用lambda简化写法}/*** lambda简化函数式接口实例创建** @param args*/public static void aaaa(String[] args) {//1、自己创建实现类对象MyInterface myInterface = new MyInterfaceImpl();System.out.println(myInterface.sum(1, 2));//2、创建匿名实现类MyInterface myInterface1 = new MyInterface() {@Overridepublic int sum(int i, int j) {return i * i + j * j;}};
//        System.out.println(myInterface1.sum(2, 3));//冗余写法//3、lambda表达式:语法糖  参数列表  + 箭头 + 方法体MyInterface myInterface2 = (x, y) -> {return x * x + y * y;};System.out.println(myInterface2.sum(2, 3));//参数位置最少情况MyHaha myHaha = () -> {return 1;};MyHehe myHehe = y -> {return y * y;};MyHehe hehe2 = y -> y - 1;//完整写法如上://简化写法://1)、参数类型可以不写,只写(参数名),参数变量名随意定义;//    参数表最少可以只有一个 (),或者只有一个参数名;//2、方法体如果只有一句话,{} 可以省略MyHehe hehe3 = y -> y + 1;System.out.println(hehe3.hehe(7));//以上Lambda表达式简化了实例的创建。//总结:// 1、Lambda表达式: (参数表) -> {方法体}// 2、分辨出你的接口是否函数式接口。 函数式接口就可以lambda简化}}

2、Function

在Java中,函数式接口是只包含一个抽象方法的接口。它们是支持Lambda表达式的基础,因为Lambda表达式需要一个目标类型,这个目标类型必须是一个函数式接口。

函数式接口的出入参定义:

1、有入参,无出参【消费者】: function.accept

BiConsumer<String,String> function = (a,b)->{ //能接受两个入参System.out.println("哈哈:"+a+";呵呵:"+b);
};
function.accept("1","2");

2、有入参,有出参【多功能函数】: function.apply

Function<String,Integer> function = (String x) -> Integer.parseInt(x);
System.out.println(function.apply("2"));

3、无入参,无出参【普通函数】:

Runnable runnable = () -> System.out.println("aaa");new Thread(runnable).start();

4、无入参 ,有出参【提供者】: supplier.get()

Supplier<String> supplier = ()-> UUID.randomUUID().toString();
String s = supplier.get();
System.out.println(s);

java.util.function包下的所有function定义:

  • Consumer: 消费者
  • Supplier: 提供者
  • Predicate: 断言

get/test/apply/accept调用的函数方法;

位于java.util.function包下

image-20250222163309270

3、StreamAPI

最佳实战:以后凡是你写for循环处理数据的统一全部用StreamAPI进行替换;

Stream所有数据和操作被组合成流管道流管道组成:

  • 一个数据源(可以是一个数组、集合、生成器函数、I/O管道)
  • 零或多个中间操作(将一个流变形成另一个流)
  • 一个终止操作(产生最终结果)

中间操作:Intermediate Operations

  • filter:过滤; 挑出我们用的元素

  • map: 映射: 一一映射,a 变成 b

    • mapToInt、mapToLong、mapToDouble
  • flatMap:打散、散列、展开、扩维:一对多映射

filter、
map、mapToInt、mapToLong、mapToDouble
flatMap、flatMapToInt、flatMapToLong、flatMapToDouble
mapMulti、mapMultiToInt、mapMultiToLong、mapMultiToDouble、
parallel、unordered、onClose、sequential
distinct、sorted、peek、limit、skip、takeWhile、dropWhile、

终止操作:Terminal Operation

forEach、forEachOrdered、toArray、reduce、collect、toList、min、
max、count、anyMatch、allMatch、noneMatch、findFirst、findAny、iterator

image-20250222165413603

Stream.of(1, 2, 3).filter(e -> {System.out.println("e1 = " + e);return true;
}).filter(e -> {System.out.println("e2 = " + e);return true;
}).collect(Collectors.toList());输出是:
e1 = 1
e2 = 1
e1 = 2
e2 = 2
e1 = 3
e2 = 3

4、Reactive-Stream

Reactive Streams是JVM面向流的库的标准和规范(jdk9开始,有java.util.concurrent.Flow类)

1、处理可能无限数量的元素

2、有序

3、在组件之间异步传递元素

4、强制性非阻塞背压模式

推荐阅读:

  • jdk9 reactive响应式规范:https://www.reactive-streams.org/

  • 响应式宣言:https://www.reactivemanifesto.org/zh-CN

  • ReactiveStream: https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.4/README.md

2、响应式编程-Reactor核心.pptx

img

为什么有Reactive-Stream规范

目的:通过全异步的方式、加缓存区构建一个实时的数据流系统,

Kafka、MQ能构建出大型分布式的响应式系统。

缺本地化的消息系统解决方案:

  • 让所有的异步线程能互相监听消息,处理消息,构建实时消息处理流

消息传递是响应式核心

之前a调用b,必须b做完了事情,a才能接着做事情。现在响应式就是a先将b要做的事情放到缓冲区中,b监听这个缓冲区,从缓冲区中拿数据,去做事情,这样a就不用等待了。

引入一个缓存区,引入消息队列,就能实现全系统、全异步、不阻塞、不等待、实时响应

image-20250222203555264

Reactive-Stream规范核心接口

API Components

查看jdk9的java.util.concurrent.Flow类

image-20250222205430843

image-20250222205700217

image-20250222205718529

发布订阅写法
package com.atguigu.flow;import lombok.SneakyThrows;import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;/*** @author lfy* @Description* @create 2023-11-17 20:59*/
public class FlowDemo {//定义流中间操作处理器; 只用写订阅者的接口static class MyProcessor extends SubmissionPublisher<String>  implements Flow.Processor<String,String> {private Flow.Subscription subscription; //保存绑定关系@Overridepublic void onSubscribe(Flow.Subscription subscription) {System.out.println("processor订阅绑定完成");this.subscription = subscription;subscription.request(1); //找上游要一个数据}@Override //数据到达,触发这个回调public void onNext(String item) {System.out.println("processor拿到数据:"+item);//再加工item += ":哈哈";submit(item);//把我加工后的数据发出去subscription.request(1); //再要新数据}@Overridepublic void onError(Throwable throwable) {}@Overridepublic void onComplete() {}}/*** 1、Publisher:发布者* 2、Subscriber:订阅者* 3、Subscription: 订阅关系* 4、Processor: 处理器* @param args*///发布订阅模型:观察者模式,public static void main(String[] args) throws InterruptedException {//1、定义一个发布者; 发布数据;SubmissionPublisher<String> publisher = new SubmissionPublisher<>();//2、定一个中间操作:  给每个元素加个 哈哈 前缀MyProcessor myProcessor1 = new MyProcessor();MyProcessor myProcessor2 = new MyProcessor();MyProcessor myProcessor3 = new MyProcessor();//3、定义一个订阅者; 订阅者感兴趣发布者的数据;Flow.Subscriber<String> subscriber = new Flow.Subscriber<String>() {private Flow.Subscription subscription;@Override //在订阅时  onXxxx:在xxx事件发生时,执行这个回调public void onSubscribe(Flow.Subscription subscription) {System.out.println(Thread.currentThread()+"订阅开始了:"+subscription);this.subscription = subscription;//从上游请求一个数据subscription.request(1);}@Override //在下一个元素到达时; 执行这个回调;   接受到新数据public void onNext(String item) {System.out.println(Thread.currentThread()+"订阅者,接受到数据:"+item);if(item.equals("p-7")){subscription.cancel(); //取消订阅}else {subscription.request(1);}}@Override //在错误发生时,public void onError(Throwable throwable) {System.out.println(Thread.currentThread()+"订阅者,接受到错误信号:"+throwable);}@Override //在完成时public void onComplete() {System.out.println(Thread.currentThread()+"订阅者,接受到完成信号:");}};//4、绑定发布者和订阅者publisher.subscribe(myProcessor1); //此时处理器相当于订阅者myProcessor1.subscribe(myProcessor2); //此时处理器相当于发布者myProcessor2.subscribe(myProcessor3);myProcessor3.subscribe(subscriber);  //链表关系绑定出责任链。//绑定操作;就是发布者,记住了所有订阅者都有谁,有数据后,给所有订阅者把数据推送过去。//        publisher.subscribe(subscriber);for (int i = 0; i < 10; i++) {//发布10条数据if(i == 5){
//                publisher.closeExceptionally(new RuntimeException("5555"));}else {publisher.submit("p-"+i);}//publisher发布的所有数据在它的buffer区;//中断
//            publisher.closeExceptionally();}//ReactiveStream//jvm底层对于整个发布订阅关系做好了 异步+缓存区处理 = 响应式系统;// 我们只需要编排流处理环节,其它的交给jvm完成(比如:异步就又jvm自己干,我们不用关心)//发布者通道关闭publisher.close();//        publisher.subscribe(subscriber2);//发布者有数据,订阅者就会拿到Thread.sleep(20000);}
}
响应式编程理解

image-20250222215133725

使用少量资源处理大量并发的一种解决方案。

Reactor

projectreactor官网

image-20250222220814257

image-20250222224054001

image-20250222223757272

1、快速上手

介绍

Reactor 是一个用于JVM的完全非阻塞的响应式编程框架,具备高效的需求管理(即对 “背压(backpressure)”的控制)能力。它与 Java 8 函数式 API 直接集成,比如 CompletableFuture, Stream, 以及 Duration。它提供了异步序列 API Flux(用于[N]个元素)Mono(用于 [0|1]个元素),并完全遵循和实现了“响应式扩展规范”(Reactive Extensions Specification)。

Reactor 的 reactor-ipc 组件还支持非阻塞的进程间通信(inter-process communication, IPC)。 Reactor IPC 为 HTTP(包括 Websockets)、TCP 和 UDP 提供了支持背压的网络引擎,从而适合 应用于微服务架构。并且完整支持响应式编解码(reactive encoding and decoding)。

依赖

<dependencyManagement> <dependencies><dependency><groupId>io.projectreactor</groupId><artifactId>reactor-bom</artifactId><version>2023.0.0</version><type>pom</type><scope>import</scope></dependency></dependencies>
</dependencyManagement>
<dependencies><dependency><groupId>io.projectreactor</groupId><artifactId>reactor-core</artifactId> </dependency><dependency><groupId>io.projectreactor</groupId><artifactId>reactor-test</artifactId> <scope>test</scope></dependency>
</dependencies>

2、响应式编程

响应式编程是一种关注于数据流(data streams)变化传递(propagation of change)异步编程方式。 这意味着它可以用既有的编程语言表达静态(如数组)或动态(如事件源)的数据流。

了解历史:

  • 在响应式编程方面,微软跨出了第一步,它在 .NET 生态中创建了响应式扩展库(Reactive Extensions library, Rx)。接着 RxJava 在JVM上实现了响应式编程。后来,在 JVM 平台出现了一套标准的响应式 编程规范,它定义了一系列标准接口和交互规范。并整合到 Java 9 中(使用 Flow 类)。
  • 响应式编程通常作为面向对象编程中的“观察者模式”(Observer design pattern)的一种扩展。 响应式流(reactive streams)与“迭代子模式”(Iterator design pattern)也有相通之处, 因为其中也有 Iterable-Iterator 这样的对应关系。主要的区别在于,Iterator 是基于 “拉取”(pull)方式的,而响应式流是基于“推送”(push)方式的。
  • 使用 iterator 是一种“命令式”(imperative)编程范式,即使访问元素的方法是 Iterable 的唯一职责。关键在于,什么时候执行 next() 获取元素取决于开发者。在响应式流中,相对应的 角色是 Publisher-Subscriber,但是 当有新的值到来的时候 ,却反过来由发布者(Publisher) 通知订阅者(Subscriber),这种“推送”模式是响应式的关键。此外,对推送来的数据的操作 是通过一种声明式(declaratively)而不是命令式(imperatively)的方式表达的:开发者通过 描述“控制流程”来定义对数据流的处理逻辑。
  • 除了数据推送,对错误处理(error handling)和完成(completion)信号的定义也很完善。 一个 Publisher 可以推送新的值到它的 Subscriber(调用 onNext 方法), 同样也可以推送错误(调用 onError 方法)和完成(调用 onComplete 方法)信号。 错误和完成信号都可以终止响应式流。可以用下边的表达式描述:
onNext x 0..N [onError | onComplete]

2.1. 阻塞是对资源的浪费

现代应用需要应对大量的并发用户,而且即使现代硬件的处理能力飞速发展,软件性能仍然是关键因素

广义来说我们有两种思路来提升程序性能:

  1. 并行化(parallelize) :使用更多的线程和硬件资源。[异步]
  2. 基于现有的资源来 提高执行效率

通常,Java开发者使用阻塞式(blocking)编写代码。这没有问题,在出现性能瓶颈后, 我们可以增加处理线程,线程中同样是阻塞的代码。但是这种使用资源的方式会迅速面临 资源竞争和并发问题。

更糟糕的是,阻塞会浪费资源。具体来说,比如当一个程序面临延迟(通常是I/O方面, 比如数据库读写请求或网络调用),所在线程需要进入 idle 状态等待数据,从而浪费资源。

所以,并行化方式并非银弹。这是挖掘硬件潜力的方式,但是却带来了复杂性,而且容易造成浪费。

2.2. 异步可以解决问题吗?

第二种思路——提高执行效率——可以解决资源浪费问题。通过编写 异步非阻塞 的代码, (任务发起异步调用后)执行过程会切换到另一个 使用同样底层资源 的活跃任务,然后等 异步调用返回结果再去处理。

但是在 JVM 上如何编写异步代码呢?Java 提供了两种异步编程方式:

  • 回调(Callbacks) :异步方法没有返回值,而是采用一个 callback 作为参数(lambda 或匿名类),当结果出来后回调这个 callback。常见的例子比如 Swings 的 EventListener。
  • Futures :异步方法 立即 返回一个 Future<T>,该异步方法要返回结果的是 T 类型,通过 Future封装。这个结果并不是 立刻 可以拿到,而是等实际处理结束才可用。比如, ExecutorService 执行 Callable 任务时会返回 Future 对象。

这些技术够用吗?并非对于每个用例都是如此,两种方式都有局限性。

回调很难组合起来,因为很快就会导致代码难以理解和维护(即所谓的“回调地狱(callback hell)”)。

考虑这样一种情景:

  • 在用户界面上显示用户的5个收藏,或者如果没有任何收藏提供5个建议。
  • 这需要3个 服务(一个提供收藏的ID列表,第二个服务获取收藏内容,第三个提供建议内容):

回调地狱(Callback Hell)的例子:

userService.getFavorites(userId, new Callback<List<String>>() { public void onSuccess(List<String> list) { if (list.isEmpty()) { suggestionService.getSuggestions(new Callback<List<Favorite>>() {public void onSuccess(List<Favorite> list) { UiUtils.submitOnUiThread(() -> { list.stream().limit(5).forEach(uiList::show); });}public void onError(Throwable error) { UiUtils.errorPopup(error);}});} else {list.stream() .limit(5).forEach(favId -> favoriteService.getDetails(favId, new Callback<Favorite>() {public void onSuccess(Favorite details) {UiUtils.submitOnUiThread(() -> uiList.show(details));}public void onError(Throwable error) {UiUtils.errorPopup(error);}}));}}public void onError(Throwable error) {UiUtils.errorPopup(error);}
});

Reactor改造后为:

userService.getFavorites(userId) .flatMap(favoriteService::getDetails) .switchIfEmpty(suggestionService.getSuggestions()) .take(5) .publishOn(UiUtils.uiThreadScheduler()) .subscribe(uiList::show, UiUtils::errorPopup); 

如果你想确保“收藏的ID”的数据在800ms内获得(如果超时,从缓存中获取)呢?在基于回调的代码中, 会比较复杂。但 Reactor 中就很简单,在处理链中增加一个 timeout 的操作符即可。

userService.getFavorites(userId).timeout(Duration.ofMillis(800)) .onErrorResume(cacheService.cachedFavoritesFor(userId)) .flatMap(favoriteService::getDetails) .switchIfEmpty(suggestionService.getSuggestions()).take(5).publishOn(UiUtils.uiThreadScheduler()).subscribe(uiList::show, UiUtils::errorPopup);

额外扩展:

Futures 比回调要好一点,但即使在 Java 8 引入了 CompletableFuture,它对于多个处理的组合仍不够好用。 编排多个 Futures 是可行的,但却不易。此外,Future 还有一个问题:当对 Future 对象最终调用 get() 方法时,仍然会导致阻塞,并且缺乏对多个值以及更进一步对错误的处理。

考虑另外一个例子,我们首先得到 ID 的列表,然后通过它进一步获取到“对应的 name 和 statistics” 为元素的列表,整个过程用异步方式来实现。

CompletableFuture 处理组合的例子

CompletableFuture<List<String>> ids = ifhIds(); CompletableFuture<List<String>> result = ids.thenComposeAsync(l -> { Stream<CompletableFuture<String>> zip =l.stream().map(i -> { CompletableFuture<String> nameTask = ifhName(i); CompletableFuture<Integer> statTask = ifhStat(i); return nameTask.thenCombineAsync(statTask, (name, stat) -> "Name " + name + " has stats " + stat); });List<CompletableFuture<String>> combinationList = zip.collect(Collectors.toList()); CompletableFuture<String>[] combinationArray = combinationList.toArray(new CompletableFuture[combinationList.size()]);CompletableFuture<Void> allDone = CompletableFuture.allOf(combinationArray); return allDone.thenApply(v -> combinationList.stream().map(CompletableFuture::join) .collect(Collectors.toList()));
});List<String> results = result.join(); 
assertThat(results).contains("Name NameJoe has stats 103","Name NameBart has stats 104","Name NameHenry has stats 105","Name NameNicole has stats 106","Name NameABSLAJNFOAJNFOANFANSF has stats 121");

2.3. 从命令式编程到响应式编程

类似 Reactor 这样的响应式库的目标就是要弥补上述“经典”的 JVM 异步方式所带来的不足, 此外还会关注一下几个方面:

  • 可编排性(Composability) 以及 可读性(Readability)
  • 使用丰富的 操作符 来处理形如 的数据
  • 订阅(subscribe) 之前什么都不会发生
  • 背压(backpressure) 具体来说即 消费者能够反向告知生产者生产内容的速度的能力
  • 高层次 (同时也是有高价值的)的抽象,从而达到 并发无关 的效果
2.3.1. 可编排性与可读性

可编排性,指的是编排多个异步任务的能力。比如我们将前一个任务的结果传递给后一个任务作为输入, 或者将多个任务以分解再汇总(fork-join)的形式执行,或者将异步的任务作为离散的组件在系统中 进行重用。

这种编排任务的能力与代码的可读性和可维护性是紧密相关的。随着异步处理任务数量和复杂度 的提高,编写和阅读代码都变得越来越困难。就像我们刚才看到的,回调模式是简单的,但是缺点 是在复杂的处理逻辑中,回调中会层层嵌入回调,导致 回调地狱(Callback Hell) 。你能猜到 (或有过这种痛苦经历),这样的代码是难以阅读和分析的。

Reactor 提供了丰富的编排操作,从而代码直观反映了处理流程,并且所有的操作保持在同一层次 (尽量避免了嵌套)。

2.3.2. 就像装配流水线

你可以想象数据在响应式应用中的处理,就像流过一条装配流水线。Reactor 既是传送带, 又是一个个的装配工或机器人。原材料从源头(最初的 Publisher)流出,最终被加工为成品, 等待被推送到消费者(或者说 Subscriber)。

原材料会经过不同的中间处理过程,或者作为半成品与其他半成品进行组装。如果某处有齿轮卡住, 或者某件产品的包装过程花费了太久时间,相应的工位就可以向上游发出信号来限制或停止发出原材料。

2.3.3. 操作符(Operators)

在 Reactor 中,操作符(operator)就像装配线中的工位(操作员或装配机器人)。**每一个操作符 对 Publisher 进行相应的处理,然后将 Publisher 包装为一个新的 Publisher。**就像一个链条, 数据源自第一个 Publisher,然后顺链条而下,在每个环节进行相应的处理。最终,一个订阅者 (Subscriber)终结这个过程。请记住,在订阅者(Subscriber)订阅(subscribe)到一个 发布者(Publisher)之前,什么都不会发生。

理解了操作符会创建新的 Publisher 实例这一点,能够帮助你避免一个常见的问题, 这种问题会让你觉得处理链上的某个操作符没有起作用。

虽然响应式流规范(Reactive Streams specification)没有规定任何操作符, 类似 Reactor 这样的响应式库所带来的最大附加价值之一就是提供丰富的操作符。包括基础的转换操作, 到过滤操作,甚至复杂的编排和错误处理操作。

2.3.4. subscribe() 之前什么都不会发生

在 Reactor 中,当你创建了一条 Publisher 处理链,数据还不会开始生成。事实上,你是创建了 一种抽象的对于异步处理流程的描述(从而方便重用和组装)。

当真正“订阅(subscrib)”的时候,你需要将 Publisher 关联到一个 Subscriber 上,然后 才会触发整个链的流动。这时候,Subscriber 会向上游发送一个 request 信号,一直到达源头 的 Publisher。

2.3.5. 背压

向上游传递信号这一点也被用于实现 背压 ,就像在装配线上,某个工位的处理速度如果慢于流水线 速度,会对上游发送反馈信号一样。

在响应式流规范中实际定义的机制同刚才的类比非常接近:订阅者可以无限接受数据并让它的源头 “满负荷”推送所有的数据,也可以通过使用 request 机制来告知源头它一次最多能够处理 n 个元素。

中间环节的操作也可以影响 request。想象一个能够将每10个元素分批打包的缓存(buffer)操作。 如果订阅者请求一个元素,那么对于源头来说可以生成10个元素。此外预取策略也可以使用了, 比如在订阅前预先生成元素。

这样能够将“推送”模式转换为“推送+拉取”混合的模式,如果下游准备好了,可以从上游拉取 n 个元素;但是如果上游元素还没有准备好,下游还是要等待上游的推送。

2.3.6. 热(Hot) vs 冷(Cold)

在 Rx 家族的响应式库中,响应式流分为“热”和“冷”两种类型,区别主要在于响应式流如何 对订阅者进行响应:

  • 一个“冷”的序列,指对于每一个 Subscriber,都会收到从头开始所有的数据。如果源头 生成了一个 HTTP 请求,对于每一个订阅都会创建一个新的 HTTP 请求。
  • 一个“热”的序列,指对于一个 Subscriber,只能获取从它开始 订阅 之后 发出的数据。不过注意,有些“热”的响应式流可以缓存部分或全部历史数据。 通常意义上来说,一个“热”的响应式流,甚至在即使没有订阅者接收数据的情况下,也可以 发出数据(这一点同 “Subscribe() 之前什么都不会发生”的规则有冲突)。

3、核心特性

1、Mono和Flux

Mono: 0|1 数据流

Flux: N数据流

响应式流:元素(内容) + 信号(完成/异常)

image-20250222225143286

基本操作

类比Stream流操作,中间操作对应流转为另1个新流,终止操作对应订阅。只有开始订阅了,流才会开始,流中的 每个元素 和 信号 都是按顺序进入流处理,然后交给订阅者处理。

package com.atguigu.reactor;import org.reactivestreams.Subscription;
import reactor.core.Disposable;
import reactor.core.publisher.*;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;/*** @author lfy* @Description* @create 2023-11-23 20:58*/
public class FluxDemo {public static void main(String[] args) {//        Flux.concat(Flux.just(1,2,3),Flux.just(7,8,9))//                .subscribe(System.out::println);Flux.range(1, 7)//                .log() //日志   onNext(1~7).filter(i -> i > 3) //挑出>3的元素//                .log() //onNext(4~7).map(i -> "haha-" + i).log()  // onNext(haha-4 ~ 7).subscribe(System.out::println);//今天: Flux、Mono、弹珠图、事件感知API、每个操作都是操作的上个流的东西}/*** 响应式编程核心:看懂文档弹珠图;* 信号: 正常/异常(取消)* SignalType:*      SUBSCRIBE: 被订阅*      REQUEST:  请求了N个元素*      CANCEL: 流被取消*      ON_SUBSCRIBE:在订阅时候*      ON_NEXT: 在元素到达*      ON_ERROR: 在流错误*      ON_COMPLETE:在流正常完成时*      AFTER_TERMINATE:中断以后*      CURRENT_CONTEXT:当前上下文*      ON_CONTEXT:感知上下文* <p>* doOnXxx API触发时机*      1、doOnNext:每个数据(流的数据)到达的时候触发*      2、doOnEach:每个元素(流的数据和信号)到达的时候触发*      3、doOnRequest: 消费者请求流元素的时候*      4、doOnError:流发生错误*      5、doOnSubscribe: 流被订阅的时候*      6、doOnTerminate: 发送取消/异常信号中断了流*      7、doOnCancle: 流被取消*      8、doOnDiscard:流中元素被忽略的时候** @param args*/public void doOnXxxx(String[] args) {// 关键:doOnNext:表示流中某个元素到达以后触发我一个回调// doOnXxx要感知某个流的事件,写在这个流的后面,新流的前面Flux.just(1, 2, 3, 4, 5, 6, 7, 0, 5, 6).doOnNext(integer -> System.out.println("元素到达:" + integer)) //元素到达得到时候触发.doOnEach(integerSignal -> { //each封装的详细System.out.println("doOnEach.." + integerSignal);})//1,2,3,4,5,6,7,0.map(integer -> 10 / integer) //10,5,3,.doOnError(throwable -> {System.out.println("数据库已经保存了异常:" + throwable.getMessage());}).map(integer -> 100 / integer).doOnNext(integer -> System.out.println("元素到哈:" + integer)).subscribe(System.out::println);}//Mono<Integer>: 只有一个Integer//Flux<Integer>: 有很多Integerpublic void fluxDoOn(String[] args) throws IOException, InterruptedException {//        Mono<Integer> just = Mono.just(1);////        just.subscribe(System.out::println);//空流:  链式API中,下面的操作符,操作的是上面的流。// 事件感知API:当流发生什么事的时候,触发一个回调,系统调用提前定义好的钩子函数(Hook【钩子函数】);doOnXxx;Flux<Integer> flux = Flux.range(1, 7).delayElements(Duration.ofSeconds(1)).doOnComplete(() -> {System.out.println("流正常结束...");}).doOnCancel(() -> {System.out.println("流已被取消...");}).doOnError(throwable -> {System.out.println("流出错..." + throwable);}).doOnNext(integer -> {System.out.println("doOnNext..." + integer);}); //有一个信号:此时代表完成信号flux.subscribe(new BaseSubscriber<Integer>() {@Overrideprotected void hookOnSubscribe(Subscription subscription) {System.out.println("订阅者和发布者绑定好了:" + subscription);request(1); //背压}@Overrideprotected void hookOnNext(Integer value) {System.out.println("元素到达:" + value);if (value < 5) {request(1);if (value == 3) {int i = 10 / 0;}} else {cancel();//取消订阅}; //继续要元素}@Overrideprotected void hookOnComplete() {System.out.println("数据流结束");}@Overrideprotected void hookOnError(Throwable throwable) {System.out.println("数据流异常");}@Overrideprotected void hookOnCancel() {System.out.println("数据流被取消");}@Overrideprotected void hookFinally(SignalType type) {System.out.println("结束信号:" + type);// 正常、异常//                try {//                    //业务//                }catch (Exception e){////                }finally {//                    //结束//                }}});Thread.sleep(2000);//        Flux<Integer> range = Flux.range(1, 7);System.in.read();}//测试Fluxpublic void flux() throws IOException {//        Mono: 0|1个元素的流//        Flux: N个元素的流;  N>1//发布者发布数据流:源头//1、多元素的流Flux<Integer> just = Flux.just(1, 2, 3, 4, 5); ////流不消费就没用; 消费:订阅just.subscribe(e -> System.out.println("e1 = " + e));//一个数据流可以有很多消费者just.subscribe(e -> System.out.println("e2 = " + e));//对于每个消费者来说流都是一样的;  广播模式;System.out.println("==========");Flux<Long> flux = Flux.interval(Duration.ofSeconds(1));//每秒产生一个从0开始的递增数字flux.subscribe(System.out::println);System.in.read();}
}

2、subscribe()

flxu.subscribe() // 流被订阅,默认订阅
flux.subscribe(e->System.out.println(e)) // 指定订阅规则:正常消费者:只消费正常元素
自定义流的信号感知回调
flux.subscribe(v-> System.out.println("v = " + v), //流元素消费throwable -> System.out.println("throwable = " + throwable), //感知异常结束()-> System.out.println("流结束了...") //感知正常结束
);
自定义消费者
flux.subscribe(new BaseSubscriber<String>() {// 生命周期钩子1: 订阅关系绑定的时候触发@Overrideprotected void hookOnSubscribe(Subscription subscription) {// 流被订阅的时候触发System.out.println("绑定了..."+subscription);//找发布者要数据request(1); //要1个数据// requestUnbounded(); //要无限数据}@Overrideprotected void hookOnNext(String value) {System.out.println("数据到达,正在处理:"+value);request(1); //要1个数据}//  hookOnComplete、hookOnError 二选一执行@Overrideprotected void hookOnComplete() {System.out.println("流正常结束...");}@Overrideprotected void hookOnError(Throwable throwable) {System.out.println("流异常..."+throwable);}@Overrideprotected void hookOnCancel() {System.out.println("流被取消...");}@Overrideprotected void hookFinally(SignalType type) {System.out.println("最终回调...一定会被执行");}
});

3、流的取消

消费者调用 cancle() 取消流的订阅;

Disposable
Flux<String> flux = Flux.range(1, 10).map(i -> {System.out.println("map..."+i);if(i==9) {i = 10/(9-i); //数学运算异常;  doOnXxx}return "哈哈:" + i;}); //流错误的时候,把错误吃掉,转为正常信号//        flux.subscribe(); //流被订阅; 默认订阅;
//        flux.subscribe(v-> System.out.println("v = " + v));//指定订阅规则: 正常消费者:只消费正常元素//        flux.subscribe(
//                v-> System.out.println("v = " + v), //流元素消费
//                throwable -> System.out.println("throwable = " + throwable), //感知异常结束
//                ()-> System.out.println("流结束了...") //感知正常结束
//        );// 流的生命周期钩子可以传播给订阅者。
//  a() {
//      data = b();
//  }
flux.subscribe(new BaseSubscriber<String>() {// 生命周期钩子1: 订阅关系绑定的时候触发@Overrideprotected void hookOnSubscribe(Subscription subscription) {// 流被订阅的时候触发System.out.println("绑定了..."+subscription);//找发布者要数据request(1); //要1个数据// requestUnbounded(); //要无限数据}@Overrideprotected void hookOnNext(String value) {System.out.println("数据到达,正在处理:"+value);if(value.equals("哈哈:5")){cancel(); //取消流}request(1); //要1个数据}//  hookOnComplete、hookOnError 二选一执行@Overrideprotected void hookOnComplete() {System.out.println("流正常结束...");}@Overrideprotected void hookOnError(Throwable throwable) {System.out.println("流异常..."+throwable);}@Overrideprotected void hookOnCancel() {System.out.println("流被取消...");}@Overrideprotected void hookFinally(SignalType type) {System.out.println("最终回调...一定会被执行");}
});

4、BaseSubscriber与生命周期钩子

自定义消费者,推荐直接编写 BaseSubscriber 的逻辑;

Flux.range(1, 10).map(e -> String.valueOf(e)).map(e -> e + "-zzhua").doOnCancel(()->{System.out.println("doOnCancel");}).subscribe(new BaseSubscriber<String>() {@Overrideprotected void hookOnSubscribe(Subscription subscription) {System.out.println("hookOnSubscribe");subscription.request(1);}@Overrideprotected void hookOnNext(String value) {System.out.println("hookOnNext:" + value);request(1);if (value.equals("5-zzhua")) {cancel();}}@Overrideprotected void hookOnComplete() {System.out.println("hookOnComplete");super.hookOnComplete(); // no-op}@Overrideprotected void hookOnError(Throwable throwable) {System.out.println("hookOnError" + throwable);super.hookOnError(throwable); // throw ...}@Overrideprotected void hookOnCancel() {System.out.println("hookOnCancel");super.hookOnCancel(); // no-op}@Overrideprotected void hookFinally(SignalType type) {System.out.println("hookFinally");super.hookFinally(type);// no-op}});

5、背压和请求重塑

背压(Backpressure ):由订阅者请求发布者传送指定数量请求,而不是由发布者任意发布

请求重塑(Reshape Requests)

1、buffer:缓冲
Flux<List<Integer>> flux = Flux.range(1, 10)  //原始流10个.buffer(3).log();//缓冲区:缓冲3个元素: 消费一次最多可以拿到三个元素; 凑满数批量发给消费者
//
//        //一次发一个,一个一个发;
// 10元素,buffer(3);消费者请求4次,数据消费完成flux.subscribe(v-> system.out.println("类型:"+v.getclass()+" 值为:"+V));// 消费者每次 request(1)拿到的是几个真正的数据:buffer数据
Flux<List<Integer>> flux = Flux.range(0, 10).buffer(3).log();
flux.subscribe(e -> {System.out.println("subscribe" + e);
});
2、limit:限流
Flux.range(1, 1000).log()//限流触发,看上游是怎么限流获取数据的.limitRate(100) .subscribe();
// 75 % 预取策路:limitRate(100)
// 第一次抓取100个数据,如果 75 % 的元素已经处理了,继续抓取 新的 75 % 元素;

6、以编程方式创建序列-Sink

Sink.next

Sink.complete

1、同步环境-generate
// 创建1个1~10的序列
Flux<Object> flux = Flux.generate(() -> 0,                 // 初始值(state, sink) -> {if (state <= 10) {sink.next(state);// 发送数据(每次执行,只能调用1次next)} else {sink.complete(); // 完成信号}if (state == 7) {sink.error(new RuntimeException("i dislike 7"));}return state + 1;}
);flux.log().doOnError(e -> System.out.println("doOnError:" + e)).subscribe();
System.out.println("main start...");Disposable disposable = Flux.range(1, 10).delayElements(Duration.ofSeconds(1)).log().map(t -> t * 10).doOnCancel(()-> System.out.println("doOnCancel")).subscribe(t -> System.out.println("subscribe:" + t));System.out.println("start a thread...");// 5s之后,取消这个流
new Thread(() -> {try {TimeUnit.SECONDS.sleep(5);} catch (InterruptedException e) {e.printStackTrace();}disposable.dispose();
}).start();System.out.println("main end...");
/*
main start...
[ INFO] (main) onSubscribe(FluxConcatMapNoPrefetch.FluxConcatMapNoPrefetchSubscriber)
[ INFO] (main) request(unbounded)
start a thread...
main end...
[ INFO] (parallel-1) onNext(1)
subscribe:10
[ INFO] (parallel-2) onNext(2)
subscribe:20
[ INFO] (parallel-3) onNext(3)
subscribe:30
[ INFO] (parallel-4) onNext(4)
subscribe:40
doOnCancel
[ INFO] (Thread-0) cancel()
*/
2、多线程-create
static class MyListener {private FluxSink<Object> fluxSink;public MyListener(FluxSink<Object> fluxSink) {this.fluxSink = fluxSink;}public void online(String name) {System.out.println("用户登录了: " + name);fluxSink.next(name); // 传入用户}}public static void main(String[] args) {Flux<Object> flux = Flux.create(fluxSink -> {MyListener myListener = new MyListener(fluxSink);for (int i = 1; i <= 100; i++) {myListener.online("张" + i);}});flux.subscribe();
}

7、 handle()

自定义流中元素处理规则

Flux.range(1, 10).log()// 自定义元素处理规则, 比如替代map(..)等操作,由于handle可以自定义,所以更加强大.handle((value, sink)->{sink.next("张~" + value);}).log().subscribe()

8、自定义线程调度

响应式:响应式编程: 全异步、消息、事件回调

默认还是用当前线程,生成整个流、流的发布、流的中间操作

// 流的发布、中间操作,默认使用当前线程
Flux.range(1, 10).publishOn(Schedulers.single()) // 在哪个线程池把这个流的数据和操作执行.log().map(t -> t * 10).log().subscribeOn(Schedulers.single()) // 指定订阅者订阅操作线程.subscribe((e) -> System.out.println(Thread.currentThread().getName() + ":" + e));// 调度器: 线程池
// Schedulers.immediate(); // 无执行上下文,当前线程运行所有操作
// Schedulers.single();    // 使用固定的1个单线程
// 线程池中有 10 * cpu核心个线程, 队列默认10w, keepAliveTime 60s
// Schedulers.boundedElastic(); // 有界、弹性调度; 不是无限扩充的的线程池;
Schedulers.parallel();
// 自定义线程池
Schedulers.fromExecutor(new ThreadPoolExecutor(4, 8, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(1000)));LockSupport.park();
Scheduler s = Schedulers.newParallel("parallel-scheduler", 4);final Flux<String> flux = Flux.range(1, 2).map(i -> 10 + i).log().publishOn(s).map(i -> "value " + i).log();// 只要不指定线程池,默认发布者用的线程就是订阅者的线程;
new Thread(() -> flux.subscribe(System.out::println)).start();/*
[ INFO] (Thread-0) | onSubscribe([Fuseable] FluxMapFuseable.MapFuseableSubscriber)
[ INFO] (Thread-0) | onSubscribe([Fuseable] FluxMapFuseable.MapFuseableSubscriber)
[ INFO] (Thread-0) | request(unbounded)
[ INFO] (Thread-0) | request(256)
[ INFO] (Thread-0) | onNext(11)
[ INFO] (Thread-0) | onNext(12)
[ INFO] (Thread-0) | onComplete()
[ INFO] (parallel-scheduler-1) | onNext(value 11)
value 11
[ INFO] (parallel-scheduler-1) | onNext(value 12)
value 12
[ INFO] (parallel-scheduler-1) | onComplete()
*/

9、错误处理

命令式编程:常见的错误处理方式

1. Catch and return a static default value.

捕获异常返回一个静态默认值

try {return doSomethingDangerous(10);
}
catch (Throwable error) {return "RECOVERED";
}

onErrorReturn: 实现上面效果,错误的时候返回一个值

  • 1、吃掉异常,消费者无异常感知
  • 2、返回一个兜底默认值
  • 3、流正常完成(不再处理后续元素);
Flux.just(1, 2, 0, 4).map(i -> "100 / " + i + " = " + (100 / i)).subscribe(v-> System.out.println("v = " + v),err -> System.out.println("err = " + err),()-> System.out.println("流结束")  // error handling example);/*
v = 100 / 1 = 100
v = 100 / 2 = 50
err = java.lang.ArithmeticException: / by zero
*/
Flux.just(1, 2, 0, 4).map(i -> "100 / " + i + " = " + (100 / i))// 只要发生异常,就使用默认值.onErrorReturn("哈哈-6666").subscribe(v-> System.out.println("v = " + v),err -> System.out.println("err = " + err),()-> System.out.println("流结束")  // error handling example);/*
v = 100 / 1 = 100
v = 100 / 2 = 50
v = 哈哈-6666
流结束
*/
Flux.just(1, 2, 0, 4).map(i -> "100 / " + i + " = " + (100 / i))// 是指定的异常,才使用默认值// .onErrorReturn(ArithmeticException.class,"哈哈-6666").onErrorReturn(NullPointerException.class,"哈哈-6666").subscribe(v -> System.out.println("v = " + v),err -> System.out.println("err = " + err),() -> System.out.println("流结束")  // error handling example);
/*
v = 100 / 1 = 100
v = 100 / 2 = 50
err = java.lang.ArithmeticException: / by zero
*/
2. Catch and execute an alternative path with a fallback method.

吃掉异常,执行一个兜底方法;

try {return doSomethingDangerous(10);
}
catch (Throwable error) {return doOtherthing(10);
}

onErrorResume

  • 1、吃掉异常,消费者无异常感知
  • 2、调用一个兜底方法
  • 3、流正常完成
Flux.just(1, 2, 0, 4).map(i -> "100 / " + i + " = " + (100 / i)).onErrorResume(err -> Mono.just("哈哈-777")).subscribe(v -> System.out.println("v = " + v),err -> System.out.println("err = " + err),() -> System.out.println("流结束"));
/*
v = 100 / 1 = 100
v = 100 / 2 = 50
v = 哈哈-777
流结束
*/
3. Catch and dynamically compute a fallback value.

捕获并动态计算一个返回值,即根据错误返回一个新值

try {Value v = erroringMethod();return MyWrapper.fromValue(v);
}
catch (Throwable error) {return MyWrapper.fromError(error);
}
.onErrorResume(err -> Flux.error(new BusinessException(err.getMessage()+":炸了")))
  • 1、吃掉异常,消费者有感知
  • 2、调用一个自定义方法
  • 3、流异常完成
Flux.just(1, 2, 0, 4).map(i -> "100 / " + i + " = " + (100 / i)).onErrorResume(err -> {if (err instanceof NullPointerException) {return Mono.just("哈哈-777");}return Mono.just("其它");}).subscribe(v -> System.out.println("v = " + v),err -> System.out.println("err = " + err),() -> System.out.println("流结束"));
/*
v = 100 / 1 = 100
v = 100 / 2 = 50
v = 哈哈-777
流结束
*/
4. Catch, wrap to a BusinessException, and re-throw.

捕获并包装成一个业务异常,并重新抛出

try {return callExternalService(k);
}
catch (Throwable error) {throw new BusinessException("oops, SLA exceeded", error);
}

包装重新抛出异常: 推荐用 .onErrorMap

  • 1、吃掉异常,消费者有感知
  • 2、抛新异常
  • 3、流异常完成
Flux.just(1, 2, 0, 4).map(i -> "100 / " + i + " = " + (100 / i)).onErrorResume(err -> Flux.error(new BusinessException(err.getMessage()))).subscribe(v -> System.out.println("v = " + v),err -> System.out.println("err = " + err),() -> System.out.println("流结束"));
/*
v = 100 / 1 = 100
v = 100 / 2 = 50
err = com.zzhua.test02.BusinessException
*/
Flux.just(1, 2, 0, 4).map(i -> "100 / " + i + " = " + (100 / i)).onErrorMap(err -> {return new BusinessException("除数不能为0" + err.getMessage());}).subscribe(v -> System.out.println("v = " + v),err -> System.out.println("err = " + err),() -> System.out.println("流结束"));
/*
v = 100 / 1 = 100
v = 100 / 2 = 50
err = com.zzhua.test02.BusinessException
*/
5. Catch, log an error-specific message, and re-throw.

捕获异常,记录特殊的错误日志,重新抛出

try {return callExternalService(k);
}
catch (RuntimeException error) {//make a record of the errorlog("uh oh, falling back, service failed for key " + k);throw error;
}
  • 异常被捕获、做自己的事情
  • 不影响异常继续顺着流水线传播
  • 不吃掉异常,只在异常发生的时候做一件事,订阅者有感知
Flux.just(1, 2, 0, 4).map(i -> "100 / " + i + " = " + (100 / i)).doOnError(err -> {System.out.println("err已被记录 = " + err);}).subscribe(v -> System.out.println("v = " + v),err -> System.out.println("err = " + err),() -> System.out.println("流结束"));/*
v = 100 / 1 = 100
v = 100 / 2 = 50
err已被记录 = java.lang.ArithmeticException: / by zero
err = java.lang.ArithmeticException: / by zero
*/
6. Use the finally block to clean up resources or a Java 7 “try-with-resource” construct.
Flux.just(1, 2, 0, 4).map(i -> "100 / " + i + " = " + (100 / i)).doOnError(err -> {System.out.println("err已被记录 = " + err);}).doFinally(signalType -> {System.out.println("流信号:" + signalType);}).subscribe(v -> System.out.println("v = " + v),err -> System.out.println("err = " + err),() -> System.out.println("流结束"));
/*
v = 100 / 1 = 100
v = 100 / 2 = 50
err已被记录 = java.lang.ArithmeticException: / by zero
err = java.lang.ArithmeticException: / by zero
流信号:onError
*/
7. 忽略当前异常,仅通知记录,继续推进
Flux.just(1, 2, 3, 0, 5).map(i -> 10 / i).onErrorContinue((err, val) -> {System.out.println("err = " + err);System.out.println("val = " + val);System.out.println("发现" + val + "有问题了,继续执行其他的,我会记录这个问题");}) //发生.subscribe(v -> System.out.println("v = " + v),err -> System.out.println("err = " + err),() -> System.out.println("流结束"));
/*
v = 10
v = 5
v = 3
err = java.lang.ArithmeticException: / by zero
val = 0
发现0有问题了,继续执行其他的,我会记录这个问题
v = 2
流结束
*/
8.其它
Flux.just(1, 2, 3, 0, 5).map(i -> 10 / i).onErrorStop() // 错误后,停止流,源头中断,所有订阅者全部结束,错误结束.subscribe(v -> System.out.println("v = " + v),err -> System.out.println("err = " + err),() -> System.out.println("流结束"));
/*
v = 10
v = 5
v = 3
err = java.lang.ArithmeticException: / by zero
*/
Flux.just(1, 2, 3, 0, 5).map(i -> 10 / i).onErrorComplete() //发生错误后,停止流.subscribe(v -> System.out.println("v = " + v),err -> System.out.println("err = " + err),() -> System.out.println("流结束"));
/*
v = 10
v = 5
v = 3
流结束
*/

10、常用操作

filter、flatMap、concatMap、flatMapMany、transform、defaultIfEmpty、switchIfEmpty、concat、concatWith、merge、mergeWith、mergeSequential、zip、zipWith…

filter
Flux.just(1, 2, 3, 4).log().filter(e -> e % 2 == 0).subscribe();/*
[ INFO] (main) | onSubscribe([Synchronous Fuseable] FluxArray.ArrayConditionalSubscription)
[ INFO] (main) | request(unbounded)
[ INFO] (main) | onNext(1)
[ INFO] (main) | request(1)
[ INFO] (main) | onNext(2)
[ INFO] (main) | onNext(3)
[ INFO] (main) | request(1)
[ INFO] (main) | onNext(4)
[ INFO] (main) | onComplete()
*/
filterMap
Flux.just("zhang san", "li si").log().flatMap((t)->{String[] arr = t.split("\\s");return Flux.fromArray(arr);}).log().subscribe();/*
[ INFO] (main) | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
[ INFO] (main) onSubscribe(FluxFlatMap.FlatMapMain)
[ INFO] (main) request(unbounded)
[ INFO] (main) | request(256)
[ INFO] (main) | onNext(zhang san)
[ INFO] (main) onNext(zhang)
[ INFO] (main) onNext(san)
[ INFO] (main) | request(1)
[ INFO] (main) | onNext(li si)
[ INFO] (main) onNext(li)
[ INFO] (main) onNext(si)
[ INFO] (main) | request(1)
[ INFO] (main) | onComplete()
[ INFO] (main) onComplete()
*/
concatMap
Flux.just(1, 2).concatMap(s -> Flux.just(s * 10, s * 100)).log().subscribe((e)-> System.out.println("e = " + e));/*
[ INFO] (main) onSubscribe(FluxConcatMapNoPrefetch.FluxConcatMapNoPrefetchSubscriber)
[ INFO] (main) request(unbounded)
[ INFO] (main) onNext(10)
e = 10
[ INFO] (main) onNext(100)
e = 100
[ INFO] (main) onNext(20)
e = 20
[ INFO] (main) onNext(200)
e = 200
[ INFO] (main) onComplete()
*/
concat
Flux.concat(Flux.just(1, 2), Flux.just("a", "b")).log().subscribe();/*
[ INFO] (main) onSubscribe(FluxConcatArray.ConcatArraySubscriber)
[ INFO] (main) request(unbounded)
[ INFO] (main) onNext(1)
[ INFO] (main) onNext(2)
[ INFO] (main) onNext(a)
[ INFO] (main) onNext(b)
[ INFO] (main) onComplete()
*/
concatWith
Flux.just(1, 2).concatWith(Flux.just(3, 4)).log().subscribe/*
[ INFO] (main) onSubscribe(FluxConcatArray.ConcatArraySubscriber)
[ INFO] (main) request(unbounded)
[ INFO] (main) onNext(1)
[ INFO] (main) onNext(2)
[ INFO] (main) onNext(3)
[ INFO] (main) onNext(4)
[ INFO] (main) onComplete()
*/
transform
AtomicInteger atomic = new AtomicInteger(0);Flux<String> flux = Flux.just("a", "b", "c").transform(values -> {if (atomic.incrementAndGet() == 1) {//如果是:第一次调用,老流中的所有元素转成大写return values.map(String::toUpperCase);} else {//如果不是第一次调用,原封不动返回return values;}});//transform 无defer,不会共享外部变量的值。 无状态转换; 原理,无论多少个订阅者,transform只执行一次
//transform 有defer,会共享外部变量的值。   有状态转换; 原理,无论多少个订阅者,每个订阅者transform都只执行一次
flux.subscribe(v -> System.out.println("订阅者1:v = " + v));
flux.subscribe(v -> System.out.println("订阅者2:v = " + v));/*
订阅者1:v = A
订阅者1:v = B
订阅者1:v = C
订阅者2:v = A
订阅者2:v = B
订阅者2:v = C
*/
改成transformDeferred/*
订阅者1:v = A
订阅者1:v = B
订阅者1:v = C
订阅者2:v = a
订阅者2:v = b
订阅者2:v = c
*/   
defaultIfEmpty
/*** defaultIfEmpty:  静态兜底数据* switchIfEmpty:  空转换; 调用动态兜底方法;  返回新流数据*/
void empty() {//Mono.just(null);//流里面有一个null值元素//Mono.empty();//流里面没有元素,只有完成信号/结束信号haha().defaultIfEmpty(hehe())//如果发布者元素为null,指定默认值,否则用发布者的值;.subscribe(v -> System.out.println("v = " + v));haha().switchIfEmpty(hehe())//如果发布者元素为null,指定默认值,否则用发布者的值;.subscribe(v -> System.out.println("v = " + v));}Mono<String> hehe() {return Mono.just("兜底数据...");
}Mono<String> haha() {return Mono.empty();
}
merge
/*** concat: 连接; A流 所有元素和 B流所有元素拼接* merge:合并; A流 所有元素和 B流所有元素 按照时间序列合并* mergeWith:* mergeSequential: 按照哪个流先发元素排队*/
@Test
void merge() throws IOException {Flux.mergeSequential();Flux.merge(Flux.just(1, 2, 3).delayElements(Duration.ofSeconds(1)),Flux.just("a", "b").delayElements(Duration.ofMillis(1500)),Flux.just("haha", "hehe", "heihei", "xixi").delayElements(Duration.ofMillis(500))).log().subscribe();Flux.just(1, 2, 3).mergeWith(Flux.just(4, 5, 6));System.in.read();
}
zip
/*** zip: 无法结对的元素会被忽略;* 最多支持8流压缩;*/
void zip (){//Tuple:元组;// Flux< Tuple2:<Integer,String> >Flux.just(1,2,3).zipWith(Flux.just("a","b","c","d")).map(tuple -> {Integer t1 = tuple.getT1(); // 元组中的第一个元素String t2 = tuple.getT2();  // 元组中的第二个元素return t1 + "==>" + t2;}).log().subscribe(v-> System.out.println("v = " + v));}

11、超时与重试

Flux.just(1).delayElements(Duration.ofSeconds(3)).log().timeout(Duration.ofSeconds(2)).retry(3)  // 把流从头到尾重新请求1次.map(i -> "haha-" + i).subscribe(e-> System.out.println("e = " + e));LockSupport.park();/*
[ INFO] (main) onSubscribe(MonoDelayUntil.DelayUntilCoordinator)
[ INFO] (main) request(unbounded)
[ INFO] (parallel-1) cancel()
[ INFO] (parallel-1) onSubscribe(MonoDelayUntil.DelayUntilCoordinator)
[ INFO] (parallel-1) request(unbounded)
[ INFO] (parallel-3) cancel()
[ INFO] (parallel-3) onSubscribe(MonoDelayUntil.DelayUntilCoordinator)
[ INFO] (parallel-3) request(unbounded)
[ INFO] (parallel-5) cancel()
[ INFO] (parallel-5) onSubscribe(MonoDelayUntil.DelayUntilCoordinator)
[ INFO] (parallel-5) request(unbounded)
[ INFO] (parallel-7) cancel()
[ERROR] (parallel-7) Operator called default onErrorDropped - reactor.core.Exceptions$ErrorCallbackNotImplemented: java.util.concurrent.TimeoutException: Did not observe any item or terminal signal within 2000ms in 'log' (and no fallback has been configured)
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.util.concurrent.TimeoutException: Did not observe any item or terminal signal within 2000ms in 'log' (and no fallback has been configured)
Caused by: java.util.concurrent.TimeoutException: Did not observe any item or terminal signal within 2000ms in 'log' (and no fallback has been configured)at reactor.core.publisher.FluxTimeout$TimeoutMainSubscriber.handleTimeout(FluxTimeout.java:296)
...
*/
Flux.just(1).delayElements(Duration.ofSeconds(3)).log().timeout(Duration.ofSeconds(2)).retry(3)  // 把流从头到尾重新请求1次.onErrorReturn(2) // 上面重试失败后, 会抛出异常, 这里在抛出异常的情况下返回2.map(i -> "haha-" + i).subscribe(e-> System.out.println("e = " + e));LockSupport.park();/*
[ INFO] (main) onSubscribe(MonoDelayUntil.DelayUntilCoordinator)
[ INFO] (main) request(unbounded)
[ INFO] (parallel-1) cancel()
[ INFO] (parallel-1) onSubscribe(MonoDelayUntil.DelayUntilCoordinator)
[ INFO] (parallel-1) request(unbounded)
[ INFO] (parallel-3) cancel()
[ INFO] (parallel-3) onSubscribe(MonoDelayUntil.DelayUntilCoordinator)
[ INFO] (parallel-3) request(unbounded)
[ INFO] (parallel-5) cancel()
[ INFO] (parallel-5) onSubscribe(MonoDelayUntil.DelayUntilCoordinator)
[ INFO] (parallel-5) request(unbounded)
[ INFO] (parallel-7) cancel()
e = haha-2
*/

12、Sinks工具类

// Sinks.many(); // 发送Flux数据
// Sinks.one();  // 发送Mono数据// Sinks: 接受器,数据管道,所有数据顺着这个管道往下走的Sinks.many().unicast();   // 单播    这个管道只能绑定单个订阅者(消费者)
Sinks.many().multicast(); // 多播    这个管道能绑定多个订阅者
Sinks.many().replay();    // 重放    这个管道能重放元素。是否给后来的订阅者把之前的元素依然发给它;
单播/多播/重放/背压
// Sinks.Many<Object> many = Sinks.many()
//         .multicast() //多播
//         .onBackpressureBuffer(); //背压队列//默认订阅者,从订阅的那一刻开始接元素//发布者数据重放; 底层利用队列进行缓存之前数据
Sinks.Many<Object> many = Sinks.many().replay().limit(3);new Thread(() -> {for (int i = 0; i < 10; i++) {many.tryEmitNext("a-" + i);try {Thread.sleep(1000);} catch (InterruptedException e) {throw new RuntimeException(e);}}
}).start();//订阅
many.asFlux().subscribe(v -> System.out.println("v1 = " + v));new Thread(() -> {try {Thread.sleep(5000);} catch (InterruptedException e) {throw new RuntimeException(e);}many.asFlux().subscribe(v -> System.out.println("v2 = " + v));
}).start();
缓存
Flux<Integer> cache = Flux.range(1, 10).delayElements(Duration.ofSeconds(1)) // 不调缓存默认就是缓存所有.cache(2);                            // 缓存两个元素; 默认全部缓存// 立即订阅
cache.subscribe();new Thread(()->{// 5s后再去订阅try {Thread.sleep(5000);} catch (InterruptedException e) {throw new RuntimeException(e);}cache.subscribe(v-> System.out.println("v = " + v));
}).start();LockSupport.park();/*
v = 3
v = 4
v = 5
v = 6
v = 7
v = 8
v = 9
v = 10
*/

13、阻塞式api

block
Integer integer = Flux.just(1, 2, 4).map(i -> i + 10).blockLast();
System.out.println(integer);List<Integer> integers = Flux.just(1, 2, 4).map(i -> i + 10).collectList().block(); // 也是一种订阅者; BlockingMonoSubscriber
System.out.println("integers = " + integers);/*
14
integers = [11, 12, 14]
*/
// 百万数据,8个线程,每个线程处理100,进行分批处理一直处理结束
Flux.range(1,1000000).buffer(100).parallel(8).runOn(Schedulers.newParallel("yy")).log().flatMap(list->Flux.fromIterable(list)).collectSortedList(Integer::compareTo).subscribe(v-> System.out.println("v = " + v));LockSupport.park();

14、Context api

//Context-API: https://projectreactor.io/docs/core/release/reference/#context
//ThreadLocal在响应式编程中无法使用。
//响应式中,数据流期间共享数据,Context API: Context:读写 ContextView:只读;
static void threadlocal() {//支持Context的中间操作Flux.just(1, 2, 3).transformDeferredContextual((flux, context) -> {System.out.println("flux = " + flux);System.out.println("context = " + context);return flux.map(i -> i + "==>" + context.get("prefix"));})//上游能拿到下游的最近一次数据.contextWrite(Context.of("prefix", "哈哈"))//ThreadLocal共享了数据,上游的所有人能看到; Context由下游传播给上游.subscribe(v -> System.out.println("v = " + v));// 以前 命令式编程// controller -- service -- dao// 响应式编程 dao(10:数据源) --> service(10) --> controller(10); 从下游反向传播}

WebFlux

  • Reactor核心HttpHandler 原生API;

  • DispatcherHandler 原理;

    • DispatcherHandler 组件分析
    • DispatcherHandler 请求处理流程
    • 返回结果处理
    • 异常处理
    • 视图解析
      • 重定向
      • Rendering
  • 注解式 - Controller

    • 兼容老版本方式
    • 新版本变化
      • SSE
      • 文件上传
  • 错误响应

    • @ExceptionHandler
      • ErrorResponse: 自定义 错误响应
      • ProblemDetail:自定义PD返回
  • WebFlux配置

    • @EnableWebFlux
    • WebFluxConfigurer

WebFlux:底层完全基于netty+reactor+springweb 完成一个全异步非阻塞的web响应式框架

底层:异步 + 消息队列(内存) + 事件回调机制 = 整套系统

优点:能使用少量资源处理大量请求;

0、组件对比

API功能Servlet-阻塞式WebWebFlux-响应式Web
前端控制器DispatcherServletDispatcherHandler
处理器ControllerWebHandler/Controller
请求、响应ServletRequestServletResponseServerWebExchange: ServerHttpRequest、ServerHttpResponse
过滤器Filter(HttpFilter)WebFilter
异常处理器HandlerExceptionResolverDispatchExceptionHandler
Web配置@EnableWebMvc@EnableWebFlux
自定义配置WebMvcConfigurerWebFluxConfigurer
返回结果任意Mono、Flux、任意
发送REST请求RestTemplateWebClient

Mono: 返回[ 0 |1 ]数据流

Flux:返回[ N ]数据流

1、WebFlux

底层基于Netty实现的Web容器与请求/响应处理机制

参照:https://docs.spring.io/spring-framework/reference/6.0/web/webflux.html

1、引入

<parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.1.6</version>
</parent><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-webflux</artifactId></dependency>
</dependencies>

Context 响应式上下文数据传递; 由下游传播给上游;

以前: 浏览器 --> Controller --> Service --> Dao: 阻塞式编程

现在: Dao(数据源查询对象【数据发布者】) --> Service --> Controller --> 浏览器: 响应式

大数据流程: 从一个数据源拿到大量数据进行分析计算;

ProductVistorDao.loadData()

.distinct()

.map()

.filter()

.handle()

.subscribe();

;//加载最新的商品浏览数据

img

2、Reactor Core

1、HttpHandler、HttpServer
public static void main(String[] args) throws IOException {//快速自己编写一个能处理请求的服务器//1、创建一个能处理Http请求的处理器。 参数:请求、响应; 返回值:Mono<Void>:代表处理完成的信号HttpHandler handler = (ServerHttpRequest request,ServerHttpResponse response)->{URI uri = request.getURI();System.out.println(Thread.currentThread()+"请求进来:"+uri);//编写请求处理的业务,给浏览器写一个内容 URL + "Hello~!"//            response.getHeaders();    // 获取响应头//            response.getCookies();    // 获取Cookie//            response.getStatusCode(); // 获取响应状态码;//            response.bufferFactory(); // buffer工厂//            response.writeWith()      // 把xxx写出去//            response.setComplete();   // 响应结束, 该方法返回Mono<Void>//创建 响应数据的 DataBufferDataBufferFactory factory = response.bufferFactory();//数据BufferDataBuffer buffer = factory.wrap(new String(uri + " => Hello!").getBytes());// 数据的发布者:Mono<DataBuffer>、Flux<DataBuffer>// 需要一个 DataBuffer 的发布者return response.writeWith(Mono.just(buffer));};//2、启动一个服务器,监听8080端口,接受数据,拿到数据交给 HttpHandler 进行请求处理ReactorHttpHandlerAdapter adapter = new ReactorHttpHandlerAdapter(handler);//3、启动Netty服务器HttpServer.create().host("localhost").port(8080).handle(adapter) //用指定的处理器处理请求.bindNow(); //现在就绑定System.out.println("服务器启动完成....监听8080,接受请求");System.in.read();System.out.println("服务器停止....");}

3、DispatcherHandler

SpringMVC: DispatcherServlet;

SpringWebFlux: DispatcherHandler

1、请求处理流程
  • HandlerMapping:请求映射处理器; 保存每个请求由哪个方法进行处理
  • HandlerAdapter:处理器适配器;反射执行目标方法
  • HandlerResultHandler:处理器结果处理器;

SpringMVC: DispatcherServlet 有一个 doDispatch() 方法,来处理所有请求;

WebFlux: DispatcherHandler 有一个 handle() 方法,来处理所有请求;

public Mono<Void> handle(ServerWebExchange exchange) { if (this.handlerMappings == null) {return createNotFoundError();}if (CorsUtils.isPreFlightRequest(exchange.getRequest())) {return handlePreFlight(exchange);}return Flux.fromIterable(this.handlerMappings) //拿到所有的 handlerMappings// 找每一个mapping看谁能处理请求.concatMap(mapping -> mapping.getHandler(exchange)) .next() // 直接触发获取元素; 拿到流的第一个元素; 找到第一个能处理这个请求的handlerAdapter.switchIfEmpty(createNotFoundError()) // 如果没拿到这个元素,则响应404错误;// 异常处理,一旦前面发生异常,调用处理异常.onErrorResume(ex -> handleDispatchError(exchange, ex)) // 调用方法处理请求,得到响应结果.flatMap(handler -> handleRequestWith(exchange, handler)); 
}
  • 1、请求和响应都封装在 ServerWebExchange 对象中,由handle方法进行处理
  • 2、如果没有任何的请求映射器; 直接返回一个: 创建一个未找到的错误; 404; 返回Mono.error;终结流
  • 3、跨域工具,是否跨域请求,跨域请求检查是否复杂跨域,需要预检请求;
  • 4、Flux流式操作,先找到HandlerMapping,再获取handlerAdapter,再用Adapter处理请求,期间的错误由onErrorResume触发回调进行处理;

源码中的核心两个:

  • handleRequestWith: 编写了handlerAdapter怎么处理请求
  • handleResult: String、User、ServerSendEvent、Mono、Flux …

concatMap: 先挨个元素变,然后把变的结果按照之前元素的顺序拼接成一个完整流

private <R> Mono<R> createNotFoundError() {Exception ex = new ResponseStatusException(HttpStatus.NOT_FOUND);return Mono.error(ex);
}
Mono.defer(() -> {Exception ex = new ResponseStatusException(HttpStatus.NOT_FOUND);return Mono.error(ex);
}); //有订阅者,且流被激活后就动态调用这个方法; 延迟加载;

4、注解开发

1、目标方法传参

https://docs.spring.io/spring-framework/reference/6.0/web/webflux/controller/ann-methods/arguments.html

Controller method argumentDescription
ServerWebExchange封装了请求和响应对象的对象; 自定义获取数据、自定义响应
ServerHttpRequest, ServerHttpResponse请求、响应
WebSession访问Session对象
java.security.Principal
org.springframework.http.HttpMethod请求方式
java.util.Locale国际化
java.util.TimeZone + java.time.ZoneId时区
@PathVariable路径变量
@MatrixVariable矩阵变量
@RequestParam请求参数
@RequestHeader请求头;
@CookieValue获取Cookie
@RequestBody获取请求体,Post、文件上传
HttpEntity封装后的请求对象
@RequestPart获取文件上传的数据 multipart/form-data.
java.util.Map, org.springframework.ui.Model, and org.springframework.ui.ModelMap.Map、Model、ModelMap
@ModelAttribute
Errors, BindingResult数据校验,封装错误
SessionStatus + class-level @SessionAttributes
UriComponentsBuilderFor preparing a URL relative to the current request’s host, port, scheme, and context path. See URI Links.
@SessionAttribute
@RequestAttribute转发请求的请求域数据
Any other argument所有对象都能作为参数:1、基本类型 ,等于标注@RequestParam 2、对象类型,等于标注 @ModelAttribute
2、返回值写法

sse和websocket区别:

  • SSE:单工;请求过去以后,等待服务端源源不断的数据
  • websocket:双工: 连接建立后,可以任何交互;
Controller method return valueDescription
@ResponseBody把响应数据写出去,如果是对象,可以自动转为json
HttpEntity, ResponseEntityResponseEntity:支持快捷自定义响应内容
HttpHeaders没有响应内容,只有响应头
ErrorResponse快速构建错误响应
ProblemDetailSpringBoot3;
String就是和以前的使用规则一样;forward: 转发到一个地址redirect: 重定向到一个地址配合模板引擎
View直接返回视图对象
java.util.Map, org.springframework.ui.Model以前一样
@ModelAttribute以前一样
Rendering新版的页面跳转API; 不能标注 @ResponseBody 注解
void仅代表响应完成信号
Flux, Observable, or other reactive type使用 text/event-stream 完成SSE效果
Other return values未在上述列表的其他返回值,都会当成给页面的数据;

5、文件上传

https://docs.spring.io/spring-framework/reference/6.0/web/webflux/controller/ann-methods/multipart-forms.html

class MyForm {private String name;private MultipartFile file;// ...}@Controller
public class FileUploadController {@PostMapping("/form")public String handleFormUpload(MyForm form, BindingResult errors) {// ...}}

现在

@PostMapping("/")
public String handle(@RequestPart("meta-data") Part metadata, @RequestPart("file-data") FilePart file) { // ...
}

6、错误处理

@ExceptionHandler(ArithmeticException.class)
public String error(ArithmeticException exception){System.out.println("发生了数学运算异常"+exception);//返回这些进行错误处理;//        ProblemDetail:  建造者:声明式编程、链式调用//        ErrorResponse : return "炸了,哈哈...";
}

7、RequestContext

8、自定义Flux配置

WebFluxConfigurer

容器中注入这个类型的组件,重写底层逻辑

@Configuration
public class MyWebConfiguration {//配置底层@Beanpublic WebFluxConfigurer webFluxConfigurer(){return new WebFluxConfigurer() {@Overridepublic void addCorsMappings(CorsRegistry registry) {registry.addMapping("/**").allowedHeaders("*").allowedMethods("*").allowedOrigins("localhost");}};}
}

9、Filter

@Component
public class MyWebFilter implements WebFilter {@Overridepublic Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {ServerHttpRequest request = exchange.getRequest();ServerHttpResponse response = exchange.getResponse();System.out.println("请求处理放行到目标方法之前...");Mono<Void> filter = chain.filter(exchange); //放行//流一旦经过某个操作就会变成新流Mono<Void> voidMono = filter.doOnError(err -> {System.out.println("目标方法异常以后...");}) // 目标方法发生异常后做事.doFinally(signalType -> {System.out.println("目标方法执行以后...");});// 目标方法执行之后//上面执行不花时间。return voidMono; //看清楚返回的是谁!!!}
}

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com