webflux&webclient
尚硅谷SpringBoot响应式编程教程,最新springboot3入门到实战
响应式编程设计实战及SpringWebFlux源码剖析 - 拉勾
文章目录
- 前置知识
- 1、Lambda
- 2、Function
- 3、StreamAPI
- 中间操作:Intermediate Operations
- 终止操作:Terminal Operation
- 4、Reactive-Stream
- 为什么有Reactive-Stream规范
- 消息传递是响应式核心
- Reactive-Stream规范核心接口
- API Components
- 发布订阅写法
- 响应式编程理解
- Reactor
- 1、快速上手
- 介绍
- 依赖
- 2、响应式编程
- 2.1. 阻塞是对资源的浪费
- 2.2. 异步可以解决问题吗?
- 2.3. 从命令式编程到响应式编程
- 2.3.1. 可编排性与可读性
- 2.3.2. 就像装配流水线
- 2.3.3. 操作符(Operators)
- 2.3.4. subscribe() 之前什么都不会发生
- 2.3.5. 背压
- 2.3.6. 热(Hot) vs 冷(Cold)
- 3、核心特性
- 1、Mono和Flux
- ==响应式流:元素(内容) + 信号(完成/异常)==
- 基本操作
- 2、subscribe()
- 自定义流的信号感知回调
- 自定义消费者
- 3、流的取消
- **Disposable**
- 4、BaseSubscriber与生命周期钩子
- 5、背压和请求重塑
- 1、buffer:缓冲
- 2、limit:限流
- 6、以编程方式创建序列-Sink
- 1、同步环境-generate
- 2、多线程-create
- 7、 handle()
- 8、自定义线程调度
- 9、错误处理
- 1. Catch and return a static default value.
- 2. Catch and execute an alternative path with a fallback method.
- 3. Catch and dynamically compute a fallback value.
- 4. Catch, wrap to a BusinessException, and re-throw.
- 5. Catch, log an error-specific message, and re-throw.
- 6. Use the finally block to clean up resources or a Java 7 “try-with-resource” construct.
- 7. 忽略当前异常,仅通知记录,继续推进
- 8.其它
- 10、常用操作
- filter
- filterMap
- concatMap
- concat
- concatWith
- transform
- defaultIfEmpty
- merge
- zip
- 11、超时与重试
- 12、Sinks工具类
- 单播/多播/重放/背压
- 缓存
- 13、阻塞式api
- block
- 14、Context api
- WebFlux
- 0、组件对比
- 1、WebFlux
- 1、引入
- 2、Reactor Core
- 1、HttpHandler、HttpServer
- 3、DispatcherHandler
- 1、请求处理流程
- 4、注解开发
- 1、目标方法传参
- 2、返回值写法
- 5、文件上传
- 6、错误处理
- 7、RequestContext
- 8、自定义Flux配置
- WebFluxConfigurer
- 9、Filter

前置知识
1、Lambda
Java8语法糖:
package com.atguiggu.lambda;import java.util.*;
import java.util.function.*;
import java.util.stream.Collectors;/*** @author lfy* @Description* @create 2023-11-16 20:07*///函数式接口;只要是函数式接口就可以用Lambda表达式简化
//函数式接口: 接口中有且只有一个未实现的方法,这个接口就叫函数式接口interface MyInterface {int sum(int i, int j);
}interface MyHaha {int haha();default int heihei() {return 2;}; //默认实现
}interface My666 {void aaa(int i,int j,int k);
}@FunctionalInterface //检查注解,帮我们快速检查我们写的接口是否函数式接口
interface MyHehe {int hehe(int i);}//1、自己写实现类
class MyInterfaceImpl implements MyInterface {@Overridepublic int sum(int i, int j) {return i + j;}
}public class Lambda {public static void main(String[] args) {//声明一个函数BiConsumer<String,String> consumer = (a,b)->{System.out.println("哈哈:"+a+";呵呵:"+b);};consumer.accept("1","2");//声明一个函数Function<String,Integer> function = (String x) -> Integer.parseInt(x);System.out.println(function.apply("2"));Supplier<String> supplier = ()-> UUID.randomUUID().toString();String s = supplier.get();System.out.println(s);BiFunction<String,Integer,Long> biFunction = (a,b)-> 888L;Predicate<Integer> even = (t)-> t%2 ==0;// even.test()//正向判断
// even.negate().test(2) //反向判断System.out.println(even.negate().test(2));}public static void bbbbb(String[] args) {var names = new ArrayList<String>();names.add("Alice");names.add("Bob");names.add("Charlie");names.add("David");//比较器
// Collections.sort(names, new Comparator<String>() {
// @Override
// public int compare(String o1, String o2) {
// return o2.compareTo(o1);
// }
// });//直接写函数式接口就方便 (o1,o2)->o1.compareTo(o2)
// Collections.sort(names,(o1,o2)->o1.compareTo(o2));System.out.println(names);// 类::方法; 引用类中的实例方法; 忽略lambda的完整写法Collections.sort(names,String::compareTo);System.out.println(names);new Thread(new Runnable() {@Overridepublic void run() {System.out.println("哈哈啊");}}).start();Runnable runnable = () -> System.out.println("aaa");new Thread(runnable).start();//最佳实战://1、以后调用某个方法传入参数,这个参数实例是一个接口对象,且只定义了一个方法,就直接用lambda简化写法}/*** lambda简化函数式接口实例创建** @param args*/public static void aaaa(String[] args) {//1、自己创建实现类对象MyInterface myInterface = new MyInterfaceImpl();System.out.println(myInterface.sum(1, 2));//2、创建匿名实现类MyInterface myInterface1 = new MyInterface() {@Overridepublic int sum(int i, int j) {return i * i + j * j;}};
// System.out.println(myInterface1.sum(2, 3));//冗余写法//3、lambda表达式:语法糖 参数列表 + 箭头 + 方法体MyInterface myInterface2 = (x, y) -> {return x * x + y * y;};System.out.println(myInterface2.sum(2, 3));//参数位置最少情况MyHaha myHaha = () -> {return 1;};MyHehe myHehe = y -> {return y * y;};MyHehe hehe2 = y -> y - 1;//完整写法如上://简化写法://1)、参数类型可以不写,只写(参数名),参数变量名随意定义;// 参数表最少可以只有一个 (),或者只有一个参数名;//2、方法体如果只有一句话,{} 可以省略MyHehe hehe3 = y -> y + 1;System.out.println(hehe3.hehe(7));//以上Lambda表达式简化了实例的创建。//总结:// 1、Lambda表达式: (参数表) -> {方法体}// 2、分辨出你的接口是否函数式接口。 函数式接口就可以lambda简化}}
2、Function
在Java中,函数式接口是只包含一个抽象方法的接口。它们是支持Lambda表达式的基础,因为Lambda表达式需要一个目标类型,这个目标类型必须是一个函数式接口。
函数式接口的出入参定义:
1、有入参,无出参【消费者】: function.accept
BiConsumer<String,String> function = (a,b)->{ //能接受两个入参System.out.println("哈哈:"+a+";呵呵:"+b);
};
function.accept("1","2");
2、有入参,有出参【多功能函数】: function.apply
Function<String,Integer> function = (String x) -> Integer.parseInt(x);
System.out.println(function.apply("2"));
3、无入参,无出参【普通函数】:
Runnable runnable = () -> System.out.println("aaa");new Thread(runnable).start();
4、无入参 ,有出参【提供者】: supplier.get()
Supplier<String> supplier = ()-> UUID.randomUUID().toString();
String s = supplier.get();
System.out.println(s);
java.util.function包下的所有function定义:
- Consumer: 消费者
- Supplier: 提供者
- Predicate: 断言
get/test/apply/accept调用的函数方法;
位于java.util.function包下
3、StreamAPI
最佳实战:以后凡是你写for循环处理数据的统一全部用StreamAPI进行替换;
Stream所有数据和操作被组合成流管道流管道组成:
- 一个数据源(可以是一个数组、集合、生成器函数、I/O管道)
- 零或多个中间操作(将一个流变形成另一个流)
- 一个终止操作(产生最终结果)
中间操作:Intermediate Operations
-
filter:过滤; 挑出我们用的元素
-
map: 映射: 一一映射,a 变成 b
-
- mapToInt、mapToLong、mapToDouble
-
flatMap:打散、散列、展开、扩维:一对多映射
filter、
map、mapToInt、mapToLong、mapToDouble
flatMap、flatMapToInt、flatMapToLong、flatMapToDouble
mapMulti、mapMultiToInt、mapMultiToLong、mapMultiToDouble、
parallel、unordered、onClose、sequential
distinct、sorted、peek、limit、skip、takeWhile、dropWhile、
终止操作:Terminal Operation
forEach、forEachOrdered、toArray、reduce、collect、toList、min、
max、count、anyMatch、allMatch、noneMatch、findFirst、findAny、iterator
Stream.of(1, 2, 3).filter(e -> {System.out.println("e1 = " + e);return true;
}).filter(e -> {System.out.println("e2 = " + e);return true;
}).collect(Collectors.toList());输出是:
e1 = 1
e2 = 1
e1 = 2
e2 = 2
e1 = 3
e2 = 3
4、Reactive-Stream
Reactive Streams是JVM面向流的库的标准和规范(jdk9开始,有java.util.concurrent.Flow类)
1、处理可能无限数量的元素
2、有序
3、在组件之间异步传递元素
4、强制性非阻塞,背压模式
推荐阅读:
-
jdk9 reactive响应式规范:https://www.reactive-streams.org/
-
响应式宣言:https://www.reactivemanifesto.org/zh-CN
-
ReactiveStream: https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.4/README.md
2、响应式编程-Reactor核心.pptx
为什么有Reactive-Stream规范
目的:通过全异步的方式、加缓存区构建一个实时的数据流系统,
Kafka、MQ能构建出大型分布式的响应式系统。
缺本地化的消息系统解决方案:
- 让所有的异步线程能互相监听消息,处理消息,构建实时消息处理流
消息传递是响应式核心
之前a调用b,必须b做完了事情,a才能接着做事情。现在响应式就是a先将b要做的事情放到缓冲区中,b监听这个缓冲区,从缓冲区中拿数据,去做事情,这样a就不用等待了。
引入一个缓存区,引入消息队列,就能实现全系统、全异步、不阻塞、不等待、实时响应
Reactive-Stream规范核心接口
API Components
查看jdk9的java.util.concurrent.Flow类
发布订阅写法
package com.atguigu.flow;import lombok.SneakyThrows;import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;/*** @author lfy* @Description* @create 2023-11-17 20:59*/
public class FlowDemo {//定义流中间操作处理器; 只用写订阅者的接口static class MyProcessor extends SubmissionPublisher<String> implements Flow.Processor<String,String> {private Flow.Subscription subscription; //保存绑定关系@Overridepublic void onSubscribe(Flow.Subscription subscription) {System.out.println("processor订阅绑定完成");this.subscription = subscription;subscription.request(1); //找上游要一个数据}@Override //数据到达,触发这个回调public void onNext(String item) {System.out.println("processor拿到数据:"+item);//再加工item += ":哈哈";submit(item);//把我加工后的数据发出去subscription.request(1); //再要新数据}@Overridepublic void onError(Throwable throwable) {}@Overridepublic void onComplete() {}}/*** 1、Publisher:发布者* 2、Subscriber:订阅者* 3、Subscription: 订阅关系* 4、Processor: 处理器* @param args*///发布订阅模型:观察者模式,public static void main(String[] args) throws InterruptedException {//1、定义一个发布者; 发布数据;SubmissionPublisher<String> publisher = new SubmissionPublisher<>();//2、定一个中间操作: 给每个元素加个 哈哈 前缀MyProcessor myProcessor1 = new MyProcessor();MyProcessor myProcessor2 = new MyProcessor();MyProcessor myProcessor3 = new MyProcessor();//3、定义一个订阅者; 订阅者感兴趣发布者的数据;Flow.Subscriber<String> subscriber = new Flow.Subscriber<String>() {private Flow.Subscription subscription;@Override //在订阅时 onXxxx:在xxx事件发生时,执行这个回调public void onSubscribe(Flow.Subscription subscription) {System.out.println(Thread.currentThread()+"订阅开始了:"+subscription);this.subscription = subscription;//从上游请求一个数据subscription.request(1);}@Override //在下一个元素到达时; 执行这个回调; 接受到新数据public void onNext(String item) {System.out.println(Thread.currentThread()+"订阅者,接受到数据:"+item);if(item.equals("p-7")){subscription.cancel(); //取消订阅}else {subscription.request(1);}}@Override //在错误发生时,public void onError(Throwable throwable) {System.out.println(Thread.currentThread()+"订阅者,接受到错误信号:"+throwable);}@Override //在完成时public void onComplete() {System.out.println(Thread.currentThread()+"订阅者,接受到完成信号:");}};//4、绑定发布者和订阅者publisher.subscribe(myProcessor1); //此时处理器相当于订阅者myProcessor1.subscribe(myProcessor2); //此时处理器相当于发布者myProcessor2.subscribe(myProcessor3);myProcessor3.subscribe(subscriber); //链表关系绑定出责任链。//绑定操作;就是发布者,记住了所有订阅者都有谁,有数据后,给所有订阅者把数据推送过去。// publisher.subscribe(subscriber);for (int i = 0; i < 10; i++) {//发布10条数据if(i == 5){
// publisher.closeExceptionally(new RuntimeException("5555"));}else {publisher.submit("p-"+i);}//publisher发布的所有数据在它的buffer区;//中断
// publisher.closeExceptionally();}//ReactiveStream//jvm底层对于整个发布订阅关系做好了 异步+缓存区处理 = 响应式系统;// 我们只需要编排流处理环节,其它的交给jvm完成(比如:异步就又jvm自己干,我们不用关心)//发布者通道关闭publisher.close();// publisher.subscribe(subscriber2);//发布者有数据,订阅者就会拿到Thread.sleep(20000);}
}
响应式编程理解
使用少量资源处理大量并发的一种解决方案。
Reactor
projectreactor官网
1、快速上手
介绍
Reactor 是一个用于JVM的完全非阻塞的响应式编程框架,具备高效的需求管理(即对 “背压(backpressure)”的控制)能力。它与 Java 8 函数式 API 直接集成,比如 CompletableFuture, Stream, 以及 Duration。它提供了异步序列 API Flux(用于[N]个元素)
和 Mono(用于 [0|1]个元素)
,并完全遵循和实现了“响应式扩展规范”(Reactive Extensions Specification)。
Reactor 的 reactor-ipc 组件还支持非阻塞的进程间通信(inter-process communication, IPC)。 Reactor IPC 为 HTTP(包括 Websockets)、TCP 和 UDP 提供了支持背压的网络引擎,从而适合 应用于微服务架构。并且完整支持响应式编解码(reactive encoding and decoding)。
依赖
<dependencyManagement> <dependencies><dependency><groupId>io.projectreactor</groupId><artifactId>reactor-bom</artifactId><version>2023.0.0</version><type>pom</type><scope>import</scope></dependency></dependencies>
</dependencyManagement>
<dependencies><dependency><groupId>io.projectreactor</groupId><artifactId>reactor-core</artifactId> </dependency><dependency><groupId>io.projectreactor</groupId><artifactId>reactor-test</artifactId> <scope>test</scope></dependency>
</dependencies>
2、响应式编程
响应式编程是一种关注于数据流(data streams)和变化传递(propagation of change)的异步编程方式。 这意味着它可以用既有的编程语言表达静态(如数组)或动态(如事件源)的数据流。
了解历史:
- 在响应式编程方面,微软跨出了第一步,它在 .NET 生态中创建了响应式扩展库(Reactive Extensions library, Rx)。接着 RxJava 在JVM上实现了响应式编程。后来,在 JVM 平台出现了一套标准的响应式 编程规范,它定义了一系列标准接口和交互规范。并整合到 Java 9 中(使用 Flow 类)。
- 响应式编程通常作为面向对象编程中的“观察者模式”(Observer design pattern)的一种扩展。 响应式流(reactive streams)与“迭代子模式”(Iterator design pattern)也有相通之处, 因为其中也有 Iterable-Iterator 这样的对应关系。主要的区别在于,Iterator 是基于 “拉取”(pull)方式的,而响应式流是基于“推送”(push)方式的。
- 使用 iterator 是一种“命令式”(imperative)编程范式,即使访问元素的方法是 Iterable 的唯一职责。关键在于,什么时候执行 next() 获取元素取决于开发者。在响应式流中,相对应的 角色是 Publisher-Subscriber,但是 当有新的值到来的时候 ,却反过来由发布者(Publisher) 通知订阅者(Subscriber),这种“推送”模式是响应式的关键。此外,对推送来的数据的操作 是通过一种声明式(declaratively)而不是命令式(imperatively)的方式表达的:开发者通过 描述“控制流程”来定义对数据流的处理逻辑。
- 除了数据推送,对错误处理(error handling)和完成(completion)信号的定义也很完善。 一个 Publisher 可以推送新的值到它的 Subscriber(调用 onNext 方法), 同样也可以推送错误(调用 onError 方法)和完成(调用 onComplete 方法)信号。 错误和完成信号都可以终止响应式流。可以用下边的表达式描述:
onNext x 0..N [onError | onComplete]
2.1. 阻塞是对资源的浪费
现代应用需要应对大量的并发用户,而且即使现代硬件的处理能力飞速发展,软件性能仍然是关键因素。
广义来说我们有两种思路来提升程序性能:
- 并行化(parallelize) :使用更多的线程和硬件资源。[异步]
- 基于现有的资源来 提高执行效率 。
通常,Java开发者使用阻塞式(blocking)编写代码。这没有问题,在出现性能瓶颈后, 我们可以增加处理线程,线程中同样是阻塞的代码。但是这种使用资源的方式会迅速面临 资源竞争和并发问题。
更糟糕的是,阻塞会浪费资源。具体来说,比如当一个程序面临延迟(通常是I/O方面, 比如数据库读写请求或网络调用),所在线程需要进入 idle 状态等待数据,从而浪费资源。
所以,并行化方式并非银弹。这是挖掘硬件潜力的方式,但是却带来了复杂性,而且容易造成浪费。
2.2. 异步可以解决问题吗?
第二种思路——提高执行效率——可以解决资源浪费问题。通过编写 异步非阻塞 的代码, (任务发起异步调用后)执行过程会切换到另一个 使用同样底层资源 的活跃任务,然后等 异步调用返回结果再去处理。
但是在 JVM 上如何编写异步代码呢?Java 提供了两种异步编程方式:
- 回调(Callbacks) :异步方法没有返回值,而是采用一个 callback 作为参数(lambda 或匿名类),当结果出来后回调这个 callback。常见的例子比如 Swings 的 EventListener。
- Futures :异步方法 立即 返回一个 Future<T>,该异步方法要返回结果的是 T 类型,通过 Future封装。这个结果并不是 立刻 可以拿到,而是等实际处理结束才可用。比如, ExecutorService 执行 Callable 任务时会返回 Future 对象。
这些技术够用吗?并非对于每个用例都是如此,两种方式都有局限性。
回调很难组合起来,因为很快就会导致代码难以理解和维护(即所谓的“回调地狱(callback hell)”)。
考虑这样一种情景:
- 在用户界面上显示用户的5个收藏,或者如果没有任何收藏提供5个建议。
- 这需要3个 服务(一个提供收藏的ID列表,第二个服务获取收藏内容,第三个提供建议内容):
回调地狱(Callback Hell)的例子:
userService.getFavorites(userId, new Callback<List<String>>() { public void onSuccess(List<String> list) { if (list.isEmpty()) { suggestionService.getSuggestions(new Callback<List<Favorite>>() {public void onSuccess(List<Favorite> list) { UiUtils.submitOnUiThread(() -> { list.stream().limit(5).forEach(uiList::show); });}public void onError(Throwable error) { UiUtils.errorPopup(error);}});} else {list.stream() .limit(5).forEach(favId -> favoriteService.getDetails(favId, new Callback<Favorite>() {public void onSuccess(Favorite details) {UiUtils.submitOnUiThread(() -> uiList.show(details));}public void onError(Throwable error) {UiUtils.errorPopup(error);}}));}}public void onError(Throwable error) {UiUtils.errorPopup(error);}
});
Reactor改造后为:
userService.getFavorites(userId) .flatMap(favoriteService::getDetails) .switchIfEmpty(suggestionService.getSuggestions()) .take(5) .publishOn(UiUtils.uiThreadScheduler()) .subscribe(uiList::show, UiUtils::errorPopup);
如果你想确保“收藏的ID”的数据在800ms内获得(如果超时,从缓存中获取)呢?在基于回调的代码中, 会比较复杂。但 Reactor 中就很简单,在处理链中增加一个 timeout 的操作符即可。
userService.getFavorites(userId).timeout(Duration.ofMillis(800)) .onErrorResume(cacheService.cachedFavoritesFor(userId)) .flatMap(favoriteService::getDetails) .switchIfEmpty(suggestionService.getSuggestions()).take(5).publishOn(UiUtils.uiThreadScheduler()).subscribe(uiList::show, UiUtils::errorPopup);
额外扩展:
Futures 比回调要好一点,但即使在 Java 8 引入了 CompletableFuture,它对于多个处理的组合仍不够好用。 编排多个 Futures 是可行的,但却不易。此外,Future 还有一个问题:当对 Future 对象最终调用 get() 方法时,仍然会导致阻塞,并且缺乏对多个值以及更进一步对错误的处理。
考虑另外一个例子,我们首先得到 ID 的列表,然后通过它进一步获取到“对应的 name 和 statistics” 为元素的列表,整个过程用异步方式来实现。
CompletableFuture 处理组合的例子
CompletableFuture<List<String>> ids = ifhIds(); CompletableFuture<List<String>> result = ids.thenComposeAsync(l -> { Stream<CompletableFuture<String>> zip =l.stream().map(i -> { CompletableFuture<String> nameTask = ifhName(i); CompletableFuture<Integer> statTask = ifhStat(i); return nameTask.thenCombineAsync(statTask, (name, stat) -> "Name " + name + " has stats " + stat); });List<CompletableFuture<String>> combinationList = zip.collect(Collectors.toList()); CompletableFuture<String>[] combinationArray = combinationList.toArray(new CompletableFuture[combinationList.size()]);CompletableFuture<Void> allDone = CompletableFuture.allOf(combinationArray); return allDone.thenApply(v -> combinationList.stream().map(CompletableFuture::join) .collect(Collectors.toList()));
});List<String> results = result.join();
assertThat(results).contains("Name NameJoe has stats 103","Name NameBart has stats 104","Name NameHenry has stats 105","Name NameNicole has stats 106","Name NameABSLAJNFOAJNFOANFANSF has stats 121");
2.3. 从命令式编程到响应式编程
类似 Reactor 这样的响应式库的目标就是要弥补上述“经典”的 JVM 异步方式所带来的不足, 此外还会关注一下几个方面:
- 可编排性(Composability) 以及 可读性(Readability)
- 使用丰富的 操作符 来处理形如 流 的数据
- 在 订阅(subscribe) 之前什么都不会发生
- 背压(backpressure) 具体来说即 消费者能够反向告知生产者生产内容的速度的能力
- 高层次 (同时也是有高价值的)的抽象,从而达到 并发无关 的效果
2.3.1. 可编排性与可读性
可编排性,指的是编排多个异步任务的能力。比如我们将前一个任务的结果传递给后一个任务作为输入, 或者将多个任务以分解再汇总(fork-join)的形式执行,或者将异步的任务作为离散的组件在系统中 进行重用。
这种编排任务的能力与代码的可读性和可维护性是紧密相关的。随着异步处理任务数量和复杂度 的提高,编写和阅读代码都变得越来越困难。就像我们刚才看到的,回调模式是简单的,但是缺点 是在复杂的处理逻辑中,回调中会层层嵌入回调,导致 回调地狱(Callback Hell) 。你能猜到 (或有过这种痛苦经历),这样的代码是难以阅读和分析的。
Reactor 提供了丰富的编排操作,从而代码直观反映了处理流程,并且所有的操作保持在同一层次 (尽量避免了嵌套)。
2.3.2. 就像装配流水线
你可以想象数据在响应式应用中的处理,就像流过一条装配流水线。Reactor 既是传送带, 又是一个个的装配工或机器人。原材料从源头(最初的 Publisher)流出,最终被加工为成品, 等待被推送到消费者(或者说 Subscriber)。
原材料会经过不同的中间处理过程,或者作为半成品与其他半成品进行组装。如果某处有齿轮卡住, 或者某件产品的包装过程花费了太久时间,相应的工位就可以向上游发出信号来限制或停止发出原材料。
2.3.3. 操作符(Operators)
在 Reactor 中,操作符(operator)就像装配线中的工位(操作员或装配机器人)。**每一个操作符 对 Publisher 进行相应的处理,然后将 Publisher 包装为一个新的 Publisher。**就像一个链条, 数据源自第一个 Publisher,然后顺链条而下,在每个环节进行相应的处理。最终,一个订阅者 (Subscriber)终结这个过程。请记住,在订阅者(Subscriber)订阅(subscribe)到一个 发布者(Publisher)之前,什么都不会发生。
理解了操作符会创建新的 Publisher 实例这一点,能够帮助你避免一个常见的问题, 这种问题会让你觉得处理链上的某个操作符没有起作用。
虽然响应式流规范(Reactive Streams specification)没有规定任何操作符, 类似 Reactor 这样的响应式库所带来的最大附加价值之一就是提供丰富的操作符。包括基础的转换操作, 到过滤操作,甚至复杂的编排和错误处理操作。
2.3.4. subscribe() 之前什么都不会发生
在 Reactor 中,当你创建了一条 Publisher 处理链,数据还不会开始生成。事实上,你是创建了 一种抽象的对于异步处理流程的描述(从而方便重用和组装)。
当真正“订阅(subscrib)”的时候,你需要将 Publisher 关联到一个 Subscriber 上,然后 才会触发整个链的流动。这时候,Subscriber 会向上游发送一个 request 信号,一直到达源头 的 Publisher。
2.3.5. 背压
向上游传递信号这一点也被用于实现 背压 ,就像在装配线上,某个工位的处理速度如果慢于流水线 速度,会对上游发送反馈信号一样。
在响应式流规范中实际定义的机制同刚才的类比非常接近:订阅者可以无限接受数据并让它的源头 “满负荷”推送所有的数据,也可以通过使用 request 机制来告知源头它一次最多能够处理 n 个元素。
中间环节的操作也可以影响 request。想象一个能够将每10个元素分批打包的缓存(buffer)操作。 如果订阅者请求一个元素,那么对于源头来说可以生成10个元素。此外预取策略也可以使用了, 比如在订阅前预先生成元素。
这样能够将“推送”模式转换为“推送+拉取”混合的模式,如果下游准备好了,可以从上游拉取 n 个元素;但是如果上游元素还没有准备好,下游还是要等待上游的推送。
2.3.6. 热(Hot) vs 冷(Cold)
在 Rx 家族的响应式库中,响应式流分为“热”和“冷”两种类型,区别主要在于响应式流如何 对订阅者进行响应:
- 一个“冷”的序列,指对于每一个 Subscriber,都会收到从头开始所有的数据。如果源头 生成了一个 HTTP 请求,对于每一个订阅都会创建一个新的 HTTP 请求。
- 一个“热”的序列,指对于一个 Subscriber,只能获取从它开始 订阅 之后 发出的数据。不过注意,有些“热”的响应式流可以缓存部分或全部历史数据。 通常意义上来说,一个“热”的响应式流,甚至在即使没有订阅者接收数据的情况下,也可以 发出数据(这一点同 “Subscribe() 之前什么都不会发生”的规则有冲突)。
3、核心特性
1、Mono和Flux
Mono: 0|1 数据流
Flux: N数据流
响应式流:元素(内容) + 信号(完成/异常)
基本操作
类比Stream流操作,中间操作对应流转为另1个新流,终止操作对应订阅。只有开始订阅了,流才会开始,流中的 每个元素 和 信号 都是按顺序进入流处理,然后交给订阅者处理。
package com.atguigu.reactor;import org.reactivestreams.Subscription;
import reactor.core.Disposable;
import reactor.core.publisher.*;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;/*** @author lfy* @Description* @create 2023-11-23 20:58*/
public class FluxDemo {public static void main(String[] args) {// Flux.concat(Flux.just(1,2,3),Flux.just(7,8,9))// .subscribe(System.out::println);Flux.range(1, 7)// .log() //日志 onNext(1~7).filter(i -> i > 3) //挑出>3的元素// .log() //onNext(4~7).map(i -> "haha-" + i).log() // onNext(haha-4 ~ 7).subscribe(System.out::println);//今天: Flux、Mono、弹珠图、事件感知API、每个操作都是操作的上个流的东西}/*** 响应式编程核心:看懂文档弹珠图;* 信号: 正常/异常(取消)* SignalType:* SUBSCRIBE: 被订阅* REQUEST: 请求了N个元素* CANCEL: 流被取消* ON_SUBSCRIBE:在订阅时候* ON_NEXT: 在元素到达* ON_ERROR: 在流错误* ON_COMPLETE:在流正常完成时* AFTER_TERMINATE:中断以后* CURRENT_CONTEXT:当前上下文* ON_CONTEXT:感知上下文* <p>* doOnXxx API触发时机* 1、doOnNext:每个数据(流的数据)到达的时候触发* 2、doOnEach:每个元素(流的数据和信号)到达的时候触发* 3、doOnRequest: 消费者请求流元素的时候* 4、doOnError:流发生错误* 5、doOnSubscribe: 流被订阅的时候* 6、doOnTerminate: 发送取消/异常信号中断了流* 7、doOnCancle: 流被取消* 8、doOnDiscard:流中元素被忽略的时候** @param args*/public void doOnXxxx(String[] args) {// 关键:doOnNext:表示流中某个元素到达以后触发我一个回调// doOnXxx要感知某个流的事件,写在这个流的后面,新流的前面Flux.just(1, 2, 3, 4, 5, 6, 7, 0, 5, 6).doOnNext(integer -> System.out.println("元素到达:" + integer)) //元素到达得到时候触发.doOnEach(integerSignal -> { //each封装的详细System.out.println("doOnEach.." + integerSignal);})//1,2,3,4,5,6,7,0.map(integer -> 10 / integer) //10,5,3,.doOnError(throwable -> {System.out.println("数据库已经保存了异常:" + throwable.getMessage());}).map(integer -> 100 / integer).doOnNext(integer -> System.out.println("元素到哈:" + integer)).subscribe(System.out::println);}//Mono<Integer>: 只有一个Integer//Flux<Integer>: 有很多Integerpublic void fluxDoOn(String[] args) throws IOException, InterruptedException {// Mono<Integer> just = Mono.just(1);//// just.subscribe(System.out::println);//空流: 链式API中,下面的操作符,操作的是上面的流。// 事件感知API:当流发生什么事的时候,触发一个回调,系统调用提前定义好的钩子函数(Hook【钩子函数】);doOnXxx;Flux<Integer> flux = Flux.range(1, 7).delayElements(Duration.ofSeconds(1)).doOnComplete(() -> {System.out.println("流正常结束...");}).doOnCancel(() -> {System.out.println("流已被取消...");}).doOnError(throwable -> {System.out.println("流出错..." + throwable);}).doOnNext(integer -> {System.out.println("doOnNext..." + integer);}); //有一个信号:此时代表完成信号flux.subscribe(new BaseSubscriber<Integer>() {@Overrideprotected void hookOnSubscribe(Subscription subscription) {System.out.println("订阅者和发布者绑定好了:" + subscription);request(1); //背压}@Overrideprotected void hookOnNext(Integer value) {System.out.println("元素到达:" + value);if (value < 5) {request(1);if (value == 3) {int i = 10 / 0;}} else {cancel();//取消订阅}; //继续要元素}@Overrideprotected void hookOnComplete() {System.out.println("数据流结束");}@Overrideprotected void hookOnError(Throwable throwable) {System.out.println("数据流异常");}@Overrideprotected void hookOnCancel() {System.out.println("数据流被取消");}@Overrideprotected void hookFinally(SignalType type) {System.out.println("结束信号:" + type);// 正常、异常// try {// //业务// }catch (Exception e){//// }finally {// //结束// }}});Thread.sleep(2000);// Flux<Integer> range = Flux.range(1, 7);System.in.read();}//测试Fluxpublic void flux() throws IOException {// Mono: 0|1个元素的流// Flux: N个元素的流; N>1//发布者发布数据流:源头//1、多元素的流Flux<Integer> just = Flux.just(1, 2, 3, 4, 5); ////流不消费就没用; 消费:订阅just.subscribe(e -> System.out.println("e1 = " + e));//一个数据流可以有很多消费者just.subscribe(e -> System.out.println("e2 = " + e));//对于每个消费者来说流都是一样的; 广播模式;System.out.println("==========");Flux<Long> flux = Flux.interval(Duration.ofSeconds(1));//每秒产生一个从0开始的递增数字flux.subscribe(System.out::println);System.in.read();}
}
2、subscribe()
flxu.subscribe() // 流被订阅,默认订阅
flux.subscribe(e->System.out.println(e)) // 指定订阅规则:正常消费者:只消费正常元素
自定义流的信号感知回调
flux.subscribe(v-> System.out.println("v = " + v), //流元素消费throwable -> System.out.println("throwable = " + throwable), //感知异常结束()-> System.out.println("流结束了...") //感知正常结束
);
自定义消费者
flux.subscribe(new BaseSubscriber<String>() {// 生命周期钩子1: 订阅关系绑定的时候触发@Overrideprotected void hookOnSubscribe(Subscription subscription) {// 流被订阅的时候触发System.out.println("绑定了..."+subscription);//找发布者要数据request(1); //要1个数据// requestUnbounded(); //要无限数据}@Overrideprotected void hookOnNext(String value) {System.out.println("数据到达,正在处理:"+value);request(1); //要1个数据}// hookOnComplete、hookOnError 二选一执行@Overrideprotected void hookOnComplete() {System.out.println("流正常结束...");}@Overrideprotected void hookOnError(Throwable throwable) {System.out.println("流异常..."+throwable);}@Overrideprotected void hookOnCancel() {System.out.println("流被取消...");}@Overrideprotected void hookFinally(SignalType type) {System.out.println("最终回调...一定会被执行");}
});
3、流的取消
消费者调用 cancle() 取消流的订阅;
Disposable
Flux<String> flux = Flux.range(1, 10).map(i -> {System.out.println("map..."+i);if(i==9) {i = 10/(9-i); //数学运算异常; doOnXxx}return "哈哈:" + i;}); //流错误的时候,把错误吃掉,转为正常信号// flux.subscribe(); //流被订阅; 默认订阅;
// flux.subscribe(v-> System.out.println("v = " + v));//指定订阅规则: 正常消费者:只消费正常元素// flux.subscribe(
// v-> System.out.println("v = " + v), //流元素消费
// throwable -> System.out.println("throwable = " + throwable), //感知异常结束
// ()-> System.out.println("流结束了...") //感知正常结束
// );// 流的生命周期钩子可以传播给订阅者。
// a() {
// data = b();
// }
flux.subscribe(new BaseSubscriber<String>() {// 生命周期钩子1: 订阅关系绑定的时候触发@Overrideprotected void hookOnSubscribe(Subscription subscription) {// 流被订阅的时候触发System.out.println("绑定了..."+subscription);//找发布者要数据request(1); //要1个数据// requestUnbounded(); //要无限数据}@Overrideprotected void hookOnNext(String value) {System.out.println("数据到达,正在处理:"+value);if(value.equals("哈哈:5")){cancel(); //取消流}request(1); //要1个数据}// hookOnComplete、hookOnError 二选一执行@Overrideprotected void hookOnComplete() {System.out.println("流正常结束...");}@Overrideprotected void hookOnError(Throwable throwable) {System.out.println("流异常..."+throwable);}@Overrideprotected void hookOnCancel() {System.out.println("流被取消...");}@Overrideprotected void hookFinally(SignalType type) {System.out.println("最终回调...一定会被执行");}
});
4、BaseSubscriber与生命周期钩子
自定义消费者,推荐直接编写 BaseSubscriber 的逻辑;
Flux.range(1, 10).map(e -> String.valueOf(e)).map(e -> e + "-zzhua").doOnCancel(()->{System.out.println("doOnCancel");}).subscribe(new BaseSubscriber<String>() {@Overrideprotected void hookOnSubscribe(Subscription subscription) {System.out.println("hookOnSubscribe");subscription.request(1);}@Overrideprotected void hookOnNext(String value) {System.out.println("hookOnNext:" + value);request(1);if (value.equals("5-zzhua")) {cancel();}}@Overrideprotected void hookOnComplete() {System.out.println("hookOnComplete");super.hookOnComplete(); // no-op}@Overrideprotected void hookOnError(Throwable throwable) {System.out.println("hookOnError" + throwable);super.hookOnError(throwable); // throw ...}@Overrideprotected void hookOnCancel() {System.out.println("hookOnCancel");super.hookOnCancel(); // no-op}@Overrideprotected void hookFinally(SignalType type) {System.out.println("hookFinally");super.hookFinally(type);// no-op}});
5、背压和请求重塑
背压(Backpressure ):由订阅者请求发布者传送指定数量请求,而不是由发布者任意发布
请求重塑(Reshape Requests)
1、buffer:缓冲
Flux<List<Integer>> flux = Flux.range(1, 10) //原始流10个.buffer(3).log();//缓冲区:缓冲3个元素: 消费一次最多可以拿到三个元素; 凑满数批量发给消费者
//
// //一次发一个,一个一个发;
// 10元素,buffer(3);消费者请求4次,数据消费完成flux.subscribe(v-> system.out.println("类型:"+v.getclass()+" 值为:"+V));// 消费者每次 request(1)拿到的是几个真正的数据:buffer数据
Flux<List<Integer>> flux = Flux.range(0, 10).buffer(3).log();
flux.subscribe(e -> {System.out.println("subscribe" + e);
});
2、limit:限流
Flux.range(1, 1000).log()//限流触发,看上游是怎么限流获取数据的.limitRate(100) .subscribe();
// 75 % 预取策路:limitRate(100)
// 第一次抓取100个数据,如果 75 % 的元素已经处理了,继续抓取 新的 75 % 元素;
6、以编程方式创建序列-Sink
Sink.next
Sink.complete
1、同步环境-generate
// 创建1个1~10的序列
Flux<Object> flux = Flux.generate(() -> 0, // 初始值(state, sink) -> {if (state <= 10) {sink.next(state);// 发送数据(每次执行,只能调用1次next)} else {sink.complete(); // 完成信号}if (state == 7) {sink.error(new RuntimeException("i dislike 7"));}return state + 1;}
);flux.log().doOnError(e -> System.out.println("doOnError:" + e)).subscribe();
System.out.println("main start...");Disposable disposable = Flux.range(1, 10).delayElements(Duration.ofSeconds(1)).log().map(t -> t * 10).doOnCancel(()-> System.out.println("doOnCancel")).subscribe(t -> System.out.println("subscribe:" + t));System.out.println("start a thread...");// 5s之后,取消这个流
new Thread(() -> {try {TimeUnit.SECONDS.sleep(5);} catch (InterruptedException e) {e.printStackTrace();}disposable.dispose();
}).start();System.out.println("main end...");
/*
main start...
[ INFO] (main) onSubscribe(FluxConcatMapNoPrefetch.FluxConcatMapNoPrefetchSubscriber)
[ INFO] (main) request(unbounded)
start a thread...
main end...
[ INFO] (parallel-1) onNext(1)
subscribe:10
[ INFO] (parallel-2) onNext(2)
subscribe:20
[ INFO] (parallel-3) onNext(3)
subscribe:30
[ INFO] (parallel-4) onNext(4)
subscribe:40
doOnCancel
[ INFO] (Thread-0) cancel()
*/
2、多线程-create
static class MyListener {private FluxSink<Object> fluxSink;public MyListener(FluxSink<Object> fluxSink) {this.fluxSink = fluxSink;}public void online(String name) {System.out.println("用户登录了: " + name);fluxSink.next(name); // 传入用户}}public static void main(String[] args) {Flux<Object> flux = Flux.create(fluxSink -> {MyListener myListener = new MyListener(fluxSink);for (int i = 1; i <= 100; i++) {myListener.online("张" + i);}});flux.subscribe();
}
7、 handle()
自定义流中元素处理规则
Flux.range(1, 10).log()// 自定义元素处理规则, 比如替代map(..)等操作,由于handle可以自定义,所以更加强大.handle((value, sink)->{sink.next("张~" + value);}).log().subscribe()
8、自定义线程调度
响应式:响应式编程: 全异步、消息、事件回调
默认还是用当前线程,生成整个流、流的发布、流的中间操作
// 流的发布、中间操作,默认使用当前线程
Flux.range(1, 10).publishOn(Schedulers.single()) // 在哪个线程池把这个流的数据和操作执行.log().map(t -> t * 10).log().subscribeOn(Schedulers.single()) // 指定订阅者订阅操作线程.subscribe((e) -> System.out.println(Thread.currentThread().getName() + ":" + e));// 调度器: 线程池
// Schedulers.immediate(); // 无执行上下文,当前线程运行所有操作
// Schedulers.single(); // 使用固定的1个单线程
// 线程池中有 10 * cpu核心个线程, 队列默认10w, keepAliveTime 60s
// Schedulers.boundedElastic(); // 有界、弹性调度; 不是无限扩充的的线程池;
Schedulers.parallel();
// 自定义线程池
Schedulers.fromExecutor(new ThreadPoolExecutor(4, 8, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(1000)));LockSupport.park();
Scheduler s = Schedulers.newParallel("parallel-scheduler", 4);final Flux<String> flux = Flux.range(1, 2).map(i -> 10 + i).log().publishOn(s).map(i -> "value " + i).log();// 只要不指定线程池,默认发布者用的线程就是订阅者的线程;
new Thread(() -> flux.subscribe(System.out::println)).start();/*
[ INFO] (Thread-0) | onSubscribe([Fuseable] FluxMapFuseable.MapFuseableSubscriber)
[ INFO] (Thread-0) | onSubscribe([Fuseable] FluxMapFuseable.MapFuseableSubscriber)
[ INFO] (Thread-0) | request(unbounded)
[ INFO] (Thread-0) | request(256)
[ INFO] (Thread-0) | onNext(11)
[ INFO] (Thread-0) | onNext(12)
[ INFO] (Thread-0) | onComplete()
[ INFO] (parallel-scheduler-1) | onNext(value 11)
value 11
[ INFO] (parallel-scheduler-1) | onNext(value 12)
value 12
[ INFO] (parallel-scheduler-1) | onComplete()
*/
9、错误处理
命令式编程:常见的错误处理方式
1. Catch and return a static default value.
捕获异常返回一个静态默认值
try {return doSomethingDangerous(10);
}
catch (Throwable error) {return "RECOVERED";
}
onErrorReturn: 实现上面效果,错误的时候返回一个值
- 1、吃掉异常,消费者无异常感知
- 2、返回一个兜底默认值
- 3、流正常完成(不再处理后续元素);
Flux.just(1, 2, 0, 4).map(i -> "100 / " + i + " = " + (100 / i)).subscribe(v-> System.out.println("v = " + v),err -> System.out.println("err = " + err),()-> System.out.println("流结束") // error handling example);/*
v = 100 / 1 = 100
v = 100 / 2 = 50
err = java.lang.ArithmeticException: / by zero
*/
Flux.just(1, 2, 0, 4).map(i -> "100 / " + i + " = " + (100 / i))// 只要发生异常,就使用默认值.onErrorReturn("哈哈-6666").subscribe(v-> System.out.println("v = " + v),err -> System.out.println("err = " + err),()-> System.out.println("流结束") // error handling example);/*
v = 100 / 1 = 100
v = 100 / 2 = 50
v = 哈哈-6666
流结束
*/
Flux.just(1, 2, 0, 4).map(i -> "100 / " + i + " = " + (100 / i))// 是指定的异常,才使用默认值// .onErrorReturn(ArithmeticException.class,"哈哈-6666").onErrorReturn(NullPointerException.class,"哈哈-6666").subscribe(v -> System.out.println("v = " + v),err -> System.out.println("err = " + err),() -> System.out.println("流结束") // error handling example);
/*
v = 100 / 1 = 100
v = 100 / 2 = 50
err = java.lang.ArithmeticException: / by zero
*/
2. Catch and execute an alternative path with a fallback method.
吃掉异常,执行一个兜底方法;
try {return doSomethingDangerous(10);
}
catch (Throwable error) {return doOtherthing(10);
}
onErrorResume
- 1、吃掉异常,消费者无异常感知
- 2、调用一个兜底方法
- 3、流正常完成
Flux.just(1, 2, 0, 4).map(i -> "100 / " + i + " = " + (100 / i)).onErrorResume(err -> Mono.just("哈哈-777")).subscribe(v -> System.out.println("v = " + v),err -> System.out.println("err = " + err),() -> System.out.println("流结束"));
/*
v = 100 / 1 = 100
v = 100 / 2 = 50
v = 哈哈-777
流结束
*/
3. Catch and dynamically compute a fallback value.
捕获并动态计算一个返回值,即根据错误返回一个新值
try {Value v = erroringMethod();return MyWrapper.fromValue(v);
}
catch (Throwable error) {return MyWrapper.fromError(error);
}
.onErrorResume(err -> Flux.error(new BusinessException(err.getMessage()+":炸了")))
- 1、吃掉异常,消费者有感知
- 2、调用一个自定义方法
- 3、流异常完成
Flux.just(1, 2, 0, 4).map(i -> "100 / " + i + " = " + (100 / i)).onErrorResume(err -> {if (err instanceof NullPointerException) {return Mono.just("哈哈-777");}return Mono.just("其它");}).subscribe(v -> System.out.println("v = " + v),err -> System.out.println("err = " + err),() -> System.out.println("流结束"));
/*
v = 100 / 1 = 100
v = 100 / 2 = 50
v = 哈哈-777
流结束
*/
4. Catch, wrap to a BusinessException, and re-throw.
捕获并包装成一个业务异常,并重新抛出
try {return callExternalService(k);
}
catch (Throwable error) {throw new BusinessException("oops, SLA exceeded", error);
}
包装重新抛出异常: 推荐用 .onErrorMap
- 1、吃掉异常,消费者有感知
- 2、抛新异常
- 3、流异常完成
Flux.just(1, 2, 0, 4).map(i -> "100 / " + i + " = " + (100 / i)).onErrorResume(err -> Flux.error(new BusinessException(err.getMessage()))).subscribe(v -> System.out.println("v = " + v),err -> System.out.println("err = " + err),() -> System.out.println("流结束"));
/*
v = 100 / 1 = 100
v = 100 / 2 = 50
err = com.zzhua.test02.BusinessException
*/
Flux.just(1, 2, 0, 4).map(i -> "100 / " + i + " = " + (100 / i)).onErrorMap(err -> {return new BusinessException("除数不能为0" + err.getMessage());}).subscribe(v -> System.out.println("v = " + v),err -> System.out.println("err = " + err),() -> System.out.println("流结束"));
/*
v = 100 / 1 = 100
v = 100 / 2 = 50
err = com.zzhua.test02.BusinessException
*/
5. Catch, log an error-specific message, and re-throw.
捕获异常,记录特殊的错误日志,重新抛出
try {return callExternalService(k);
}
catch (RuntimeException error) {//make a record of the errorlog("uh oh, falling back, service failed for key " + k);throw error;
}
- 异常被捕获、做自己的事情
- 不影响异常继续顺着流水线传播
- 不吃掉异常,只在异常发生的时候做一件事,订阅者有感知
Flux.just(1, 2, 0, 4).map(i -> "100 / " + i + " = " + (100 / i)).doOnError(err -> {System.out.println("err已被记录 = " + err);}).subscribe(v -> System.out.println("v = " + v),err -> System.out.println("err = " + err),() -> System.out.println("流结束"));/*
v = 100 / 1 = 100
v = 100 / 2 = 50
err已被记录 = java.lang.ArithmeticException: / by zero
err = java.lang.ArithmeticException: / by zero
*/
6. Use the finally block to clean up resources or a Java 7 “try-with-resource” construct.
Flux.just(1, 2, 0, 4).map(i -> "100 / " + i + " = " + (100 / i)).doOnError(err -> {System.out.println("err已被记录 = " + err);}).doFinally(signalType -> {System.out.println("流信号:" + signalType);}).subscribe(v -> System.out.println("v = " + v),err -> System.out.println("err = " + err),() -> System.out.println("流结束"));
/*
v = 100 / 1 = 100
v = 100 / 2 = 50
err已被记录 = java.lang.ArithmeticException: / by zero
err = java.lang.ArithmeticException: / by zero
流信号:onError
*/
7. 忽略当前异常,仅通知记录,继续推进
Flux.just(1, 2, 3, 0, 5).map(i -> 10 / i).onErrorContinue((err, val) -> {System.out.println("err = " + err);System.out.println("val = " + val);System.out.println("发现" + val + "有问题了,继续执行其他的,我会记录这个问题");}) //发生.subscribe(v -> System.out.println("v = " + v),err -> System.out.println("err = " + err),() -> System.out.println("流结束"));
/*
v = 10
v = 5
v = 3
err = java.lang.ArithmeticException: / by zero
val = 0
发现0有问题了,继续执行其他的,我会记录这个问题
v = 2
流结束
*/
8.其它
Flux.just(1, 2, 3, 0, 5).map(i -> 10 / i).onErrorStop() // 错误后,停止流,源头中断,所有订阅者全部结束,错误结束.subscribe(v -> System.out.println("v = " + v),err -> System.out.println("err = " + err),() -> System.out.println("流结束"));
/*
v = 10
v = 5
v = 3
err = java.lang.ArithmeticException: / by zero
*/
Flux.just(1, 2, 3, 0, 5).map(i -> 10 / i).onErrorComplete() //发生错误后,停止流.subscribe(v -> System.out.println("v = " + v),err -> System.out.println("err = " + err),() -> System.out.println("流结束"));
/*
v = 10
v = 5
v = 3
流结束
*/
10、常用操作
filter、flatMap、concatMap、flatMapMany、transform、defaultIfEmpty、switchIfEmpty、concat、concatWith、merge、mergeWith、mergeSequential、zip、zipWith…
filter
Flux.just(1, 2, 3, 4).log().filter(e -> e % 2 == 0).subscribe();/*
[ INFO] (main) | onSubscribe([Synchronous Fuseable] FluxArray.ArrayConditionalSubscription)
[ INFO] (main) | request(unbounded)
[ INFO] (main) | onNext(1)
[ INFO] (main) | request(1)
[ INFO] (main) | onNext(2)
[ INFO] (main) | onNext(3)
[ INFO] (main) | request(1)
[ INFO] (main) | onNext(4)
[ INFO] (main) | onComplete()
*/
filterMap
Flux.just("zhang san", "li si").log().flatMap((t)->{String[] arr = t.split("\\s");return Flux.fromArray(arr);}).log().subscribe();/*
[ INFO] (main) | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
[ INFO] (main) onSubscribe(FluxFlatMap.FlatMapMain)
[ INFO] (main) request(unbounded)
[ INFO] (main) | request(256)
[ INFO] (main) | onNext(zhang san)
[ INFO] (main) onNext(zhang)
[ INFO] (main) onNext(san)
[ INFO] (main) | request(1)
[ INFO] (main) | onNext(li si)
[ INFO] (main) onNext(li)
[ INFO] (main) onNext(si)
[ INFO] (main) | request(1)
[ INFO] (main) | onComplete()
[ INFO] (main) onComplete()
*/
concatMap
Flux.just(1, 2).concatMap(s -> Flux.just(s * 10, s * 100)).log().subscribe((e)-> System.out.println("e = " + e));/*
[ INFO] (main) onSubscribe(FluxConcatMapNoPrefetch.FluxConcatMapNoPrefetchSubscriber)
[ INFO] (main) request(unbounded)
[ INFO] (main) onNext(10)
e = 10
[ INFO] (main) onNext(100)
e = 100
[ INFO] (main) onNext(20)
e = 20
[ INFO] (main) onNext(200)
e = 200
[ INFO] (main) onComplete()
*/
concat
Flux.concat(Flux.just(1, 2), Flux.just("a", "b")).log().subscribe();/*
[ INFO] (main) onSubscribe(FluxConcatArray.ConcatArraySubscriber)
[ INFO] (main) request(unbounded)
[ INFO] (main) onNext(1)
[ INFO] (main) onNext(2)
[ INFO] (main) onNext(a)
[ INFO] (main) onNext(b)
[ INFO] (main) onComplete()
*/
concatWith
Flux.just(1, 2).concatWith(Flux.just(3, 4)).log().subscribe/*
[ INFO] (main) onSubscribe(FluxConcatArray.ConcatArraySubscriber)
[ INFO] (main) request(unbounded)
[ INFO] (main) onNext(1)
[ INFO] (main) onNext(2)
[ INFO] (main) onNext(3)
[ INFO] (main) onNext(4)
[ INFO] (main) onComplete()
*/
transform
AtomicInteger atomic = new AtomicInteger(0);Flux<String> flux = Flux.just("a", "b", "c").transform(values -> {if (atomic.incrementAndGet() == 1) {//如果是:第一次调用,老流中的所有元素转成大写return values.map(String::toUpperCase);} else {//如果不是第一次调用,原封不动返回return values;}});//transform 无defer,不会共享外部变量的值。 无状态转换; 原理,无论多少个订阅者,transform只执行一次
//transform 有defer,会共享外部变量的值。 有状态转换; 原理,无论多少个订阅者,每个订阅者transform都只执行一次
flux.subscribe(v -> System.out.println("订阅者1:v = " + v));
flux.subscribe(v -> System.out.println("订阅者2:v = " + v));/*
订阅者1:v = A
订阅者1:v = B
订阅者1:v = C
订阅者2:v = A
订阅者2:v = B
订阅者2:v = C
*/
改成transformDeferred/*
订阅者1:v = A
订阅者1:v = B
订阅者1:v = C
订阅者2:v = a
订阅者2:v = b
订阅者2:v = c
*/
defaultIfEmpty
/*** defaultIfEmpty: 静态兜底数据* switchIfEmpty: 空转换; 调用动态兜底方法; 返回新流数据*/
void empty() {//Mono.just(null);//流里面有一个null值元素//Mono.empty();//流里面没有元素,只有完成信号/结束信号haha().defaultIfEmpty(hehe())//如果发布者元素为null,指定默认值,否则用发布者的值;.subscribe(v -> System.out.println("v = " + v));haha().switchIfEmpty(hehe())//如果发布者元素为null,指定默认值,否则用发布者的值;.subscribe(v -> System.out.println("v = " + v));}Mono<String> hehe() {return Mono.just("兜底数据...");
}Mono<String> haha() {return Mono.empty();
}
merge
/*** concat: 连接; A流 所有元素和 B流所有元素拼接* merge:合并; A流 所有元素和 B流所有元素 按照时间序列合并* mergeWith:* mergeSequential: 按照哪个流先发元素排队*/
@Test
void merge() throws IOException {Flux.mergeSequential();Flux.merge(Flux.just(1, 2, 3).delayElements(Duration.ofSeconds(1)),Flux.just("a", "b").delayElements(Duration.ofMillis(1500)),Flux.just("haha", "hehe", "heihei", "xixi").delayElements(Duration.ofMillis(500))).log().subscribe();Flux.just(1, 2, 3).mergeWith(Flux.just(4, 5, 6));System.in.read();
}
zip
/*** zip: 无法结对的元素会被忽略;* 最多支持8流压缩;*/
void zip (){//Tuple:元组;// Flux< Tuple2:<Integer,String> >Flux.just(1,2,3).zipWith(Flux.just("a","b","c","d")).map(tuple -> {Integer t1 = tuple.getT1(); // 元组中的第一个元素String t2 = tuple.getT2(); // 元组中的第二个元素return t1 + "==>" + t2;}).log().subscribe(v-> System.out.println("v = " + v));}
11、超时与重试
Flux.just(1).delayElements(Duration.ofSeconds(3)).log().timeout(Duration.ofSeconds(2)).retry(3) // 把流从头到尾重新请求1次.map(i -> "haha-" + i).subscribe(e-> System.out.println("e = " + e));LockSupport.park();/*
[ INFO] (main) onSubscribe(MonoDelayUntil.DelayUntilCoordinator)
[ INFO] (main) request(unbounded)
[ INFO] (parallel-1) cancel()
[ INFO] (parallel-1) onSubscribe(MonoDelayUntil.DelayUntilCoordinator)
[ INFO] (parallel-1) request(unbounded)
[ INFO] (parallel-3) cancel()
[ INFO] (parallel-3) onSubscribe(MonoDelayUntil.DelayUntilCoordinator)
[ INFO] (parallel-3) request(unbounded)
[ INFO] (parallel-5) cancel()
[ INFO] (parallel-5) onSubscribe(MonoDelayUntil.DelayUntilCoordinator)
[ INFO] (parallel-5) request(unbounded)
[ INFO] (parallel-7) cancel()
[ERROR] (parallel-7) Operator called default onErrorDropped - reactor.core.Exceptions$ErrorCallbackNotImplemented: java.util.concurrent.TimeoutException: Did not observe any item or terminal signal within 2000ms in 'log' (and no fallback has been configured)
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.util.concurrent.TimeoutException: Did not observe any item or terminal signal within 2000ms in 'log' (and no fallback has been configured)
Caused by: java.util.concurrent.TimeoutException: Did not observe any item or terminal signal within 2000ms in 'log' (and no fallback has been configured)at reactor.core.publisher.FluxTimeout$TimeoutMainSubscriber.handleTimeout(FluxTimeout.java:296)
...
*/
Flux.just(1).delayElements(Duration.ofSeconds(3)).log().timeout(Duration.ofSeconds(2)).retry(3) // 把流从头到尾重新请求1次.onErrorReturn(2) // 上面重试失败后, 会抛出异常, 这里在抛出异常的情况下返回2.map(i -> "haha-" + i).subscribe(e-> System.out.println("e = " + e));LockSupport.park();/*
[ INFO] (main) onSubscribe(MonoDelayUntil.DelayUntilCoordinator)
[ INFO] (main) request(unbounded)
[ INFO] (parallel-1) cancel()
[ INFO] (parallel-1) onSubscribe(MonoDelayUntil.DelayUntilCoordinator)
[ INFO] (parallel-1) request(unbounded)
[ INFO] (parallel-3) cancel()
[ INFO] (parallel-3) onSubscribe(MonoDelayUntil.DelayUntilCoordinator)
[ INFO] (parallel-3) request(unbounded)
[ INFO] (parallel-5) cancel()
[ INFO] (parallel-5) onSubscribe(MonoDelayUntil.DelayUntilCoordinator)
[ INFO] (parallel-5) request(unbounded)
[ INFO] (parallel-7) cancel()
e = haha-2
*/
12、Sinks工具类
// Sinks.many(); // 发送Flux数据
// Sinks.one(); // 发送Mono数据// Sinks: 接受器,数据管道,所有数据顺着这个管道往下走的Sinks.many().unicast(); // 单播 这个管道只能绑定单个订阅者(消费者)
Sinks.many().multicast(); // 多播 这个管道能绑定多个订阅者
Sinks.many().replay(); // 重放 这个管道能重放元素。是否给后来的订阅者把之前的元素依然发给它;
单播/多播/重放/背压
// Sinks.Many<Object> many = Sinks.many()
// .multicast() //多播
// .onBackpressureBuffer(); //背压队列//默认订阅者,从订阅的那一刻开始接元素//发布者数据重放; 底层利用队列进行缓存之前数据
Sinks.Many<Object> many = Sinks.many().replay().limit(3);new Thread(() -> {for (int i = 0; i < 10; i++) {many.tryEmitNext("a-" + i);try {Thread.sleep(1000);} catch (InterruptedException e) {throw new RuntimeException(e);}}
}).start();//订阅
many.asFlux().subscribe(v -> System.out.println("v1 = " + v));new Thread(() -> {try {Thread.sleep(5000);} catch (InterruptedException e) {throw new RuntimeException(e);}many.asFlux().subscribe(v -> System.out.println("v2 = " + v));
}).start();
缓存
Flux<Integer> cache = Flux.range(1, 10).delayElements(Duration.ofSeconds(1)) // 不调缓存默认就是缓存所有.cache(2); // 缓存两个元素; 默认全部缓存// 立即订阅
cache.subscribe();new Thread(()->{// 5s后再去订阅try {Thread.sleep(5000);} catch (InterruptedException e) {throw new RuntimeException(e);}cache.subscribe(v-> System.out.println("v = " + v));
}).start();LockSupport.park();/*
v = 3
v = 4
v = 5
v = 6
v = 7
v = 8
v = 9
v = 10
*/
13、阻塞式api
block
Integer integer = Flux.just(1, 2, 4).map(i -> i + 10).blockLast();
System.out.println(integer);List<Integer> integers = Flux.just(1, 2, 4).map(i -> i + 10).collectList().block(); // 也是一种订阅者; BlockingMonoSubscriber
System.out.println("integers = " + integers);/*
14
integers = [11, 12, 14]
*/
// 百万数据,8个线程,每个线程处理100,进行分批处理一直处理结束
Flux.range(1,1000000).buffer(100).parallel(8).runOn(Schedulers.newParallel("yy")).log().flatMap(list->Flux.fromIterable(list)).collectSortedList(Integer::compareTo).subscribe(v-> System.out.println("v = " + v));LockSupport.park();
14、Context api
//Context-API: https://projectreactor.io/docs/core/release/reference/#context
//ThreadLocal在响应式编程中无法使用。
//响应式中,数据流期间共享数据,Context API: Context:读写 ContextView:只读;
static void threadlocal() {//支持Context的中间操作Flux.just(1, 2, 3).transformDeferredContextual((flux, context) -> {System.out.println("flux = " + flux);System.out.println("context = " + context);return flux.map(i -> i + "==>" + context.get("prefix"));})//上游能拿到下游的最近一次数据.contextWrite(Context.of("prefix", "哈哈"))//ThreadLocal共享了数据,上游的所有人能看到; Context由下游传播给上游.subscribe(v -> System.out.println("v = " + v));// 以前 命令式编程// controller -- service -- dao// 响应式编程 dao(10:数据源) --> service(10) --> controller(10); 从下游反向传播}
WebFlux
-
Reactor核心:HttpHandler 原生API;
-
DispatcherHandler 原理;
-
- DispatcherHandler 组件分析
- DispatcherHandler 请求处理流程
- 返回结果处理
- 异常处理
- 视图解析
-
-
- 重定向
- Rendering
-
-
注解式 - Controller
-
- 兼容老版本方式
- 新版本变化
-
-
- SSE
- 文件上传
-
-
错误响应
-
- @ExceptionHandler
-
-
- ErrorResponse: 自定义 错误响应
- ProblemDetail:自定义PD返回
-
-
WebFlux配置
-
- @EnableWebFlux
- WebFluxConfigurer
WebFlux:底层完全基于netty+reactor+springweb 完成一个全异步
、非阻塞
的web响应式框架
底层:异步 + 消息队列(内存) + 事件回调机制 = 整套系统
优点:能使用少量资源处理大量请求;
0、组件对比
API功能 | Servlet-阻塞式Web | WebFlux-响应式Web |
---|---|---|
前端控制器 | DispatcherServlet | DispatcherHandler |
处理器 | Controller | WebHandler/Controller |
请求、响应 | ServletRequest、ServletResponse | ServerWebExchange: ServerHttpRequest、ServerHttpResponse |
过滤器 | Filter(HttpFilter) | WebFilter |
异常处理器 | HandlerExceptionResolver | DispatchExceptionHandler |
Web配置 | @EnableWebMvc | @EnableWebFlux |
自定义配置 | WebMvcConfigurer | WebFluxConfigurer |
返回结果 | 任意 | Mono、Flux、任意 |
发送REST请求 | RestTemplate | WebClient |
Mono: 返回[ 0 |1 ]数据流
Flux:返回[ N ]数据流
1、WebFlux
底层基于Netty实现的Web容器与请求/响应处理机制
参照:https://docs.spring.io/spring-framework/reference/6.0/web/webflux.html
1、引入
<parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.1.6</version>
</parent><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-webflux</artifactId></dependency>
</dependencies>
Context 响应式上下文数据传递; 由下游传播给上游;
以前: 浏览器 --> Controller --> Service --> Dao: 阻塞式编程
现在: Dao(数据源查询对象【数据发布者】) --> Service --> Controller --> 浏览器: 响应式
大数据流程: 从一个数据源拿到大量数据进行分析计算;
ProductVistorDao.loadData()
.distinct()
.map()
.filter()
.handle()
.subscribe();
;//加载最新的商品浏览数据
2、Reactor Core
1、HttpHandler、HttpServer
public static void main(String[] args) throws IOException {//快速自己编写一个能处理请求的服务器//1、创建一个能处理Http请求的处理器。 参数:请求、响应; 返回值:Mono<Void>:代表处理完成的信号HttpHandler handler = (ServerHttpRequest request,ServerHttpResponse response)->{URI uri = request.getURI();System.out.println(Thread.currentThread()+"请求进来:"+uri);//编写请求处理的业务,给浏览器写一个内容 URL + "Hello~!"// response.getHeaders(); // 获取响应头// response.getCookies(); // 获取Cookie// response.getStatusCode(); // 获取响应状态码;// response.bufferFactory(); // buffer工厂// response.writeWith() // 把xxx写出去// response.setComplete(); // 响应结束, 该方法返回Mono<Void>//创建 响应数据的 DataBufferDataBufferFactory factory = response.bufferFactory();//数据BufferDataBuffer buffer = factory.wrap(new String(uri + " => Hello!").getBytes());// 数据的发布者:Mono<DataBuffer>、Flux<DataBuffer>// 需要一个 DataBuffer 的发布者return response.writeWith(Mono.just(buffer));};//2、启动一个服务器,监听8080端口,接受数据,拿到数据交给 HttpHandler 进行请求处理ReactorHttpHandlerAdapter adapter = new ReactorHttpHandlerAdapter(handler);//3、启动Netty服务器HttpServer.create().host("localhost").port(8080).handle(adapter) //用指定的处理器处理请求.bindNow(); //现在就绑定System.out.println("服务器启动完成....监听8080,接受请求");System.in.read();System.out.println("服务器停止....");}
3、DispatcherHandler
SpringMVC: DispatcherServlet;
SpringWebFlux: DispatcherHandler
1、请求处理流程
- HandlerMapping:请求映射处理器; 保存每个请求由哪个方法进行处理
- HandlerAdapter:处理器适配器;反射执行目标方法
- HandlerResultHandler:处理器结果处理器;
SpringMVC: DispatcherServlet 有一个 doDispatch() 方法,来处理所有请求;
WebFlux: DispatcherHandler 有一个 handle() 方法,来处理所有请求;
public Mono<Void> handle(ServerWebExchange exchange) { if (this.handlerMappings == null) {return createNotFoundError();}if (CorsUtils.isPreFlightRequest(exchange.getRequest())) {return handlePreFlight(exchange);}return Flux.fromIterable(this.handlerMappings) //拿到所有的 handlerMappings// 找每一个mapping看谁能处理请求.concatMap(mapping -> mapping.getHandler(exchange)) .next() // 直接触发获取元素; 拿到流的第一个元素; 找到第一个能处理这个请求的handlerAdapter.switchIfEmpty(createNotFoundError()) // 如果没拿到这个元素,则响应404错误;// 异常处理,一旦前面发生异常,调用处理异常.onErrorResume(ex -> handleDispatchError(exchange, ex)) // 调用方法处理请求,得到响应结果.flatMap(handler -> handleRequestWith(exchange, handler));
}
- 1、请求和响应都封装在 ServerWebExchange 对象中,由handle方法进行处理
- 2、如果没有任何的请求映射器; 直接返回一个: 创建一个未找到的错误; 404; 返回Mono.error;终结流
- 3、跨域工具,是否跨域请求,跨域请求检查是否复杂跨域,需要预检请求;
- 4、Flux流式操作,先找到HandlerMapping,再获取handlerAdapter,再用Adapter处理请求,期间的错误由onErrorResume触发回调进行处理;
源码中的核心两个:
- handleRequestWith: 编写了handlerAdapter怎么处理请求
- handleResult: String、User、ServerSendEvent、Mono、Flux …
concatMap: 先挨个元素变,然后把变的结果按照之前元素的顺序拼接成一个完整流
private <R> Mono<R> createNotFoundError() {Exception ex = new ResponseStatusException(HttpStatus.NOT_FOUND);return Mono.error(ex);
}
Mono.defer(() -> {Exception ex = new ResponseStatusException(HttpStatus.NOT_FOUND);return Mono.error(ex);
}); //有订阅者,且流被激活后就动态调用这个方法; 延迟加载;
4、注解开发
1、目标方法传参
https://docs.spring.io/spring-framework/reference/6.0/web/webflux/controller/ann-methods/arguments.html
Controller method argument | Description |
---|---|
ServerWebExchange | 封装了请求和响应对象的对象; 自定义获取数据、自定义响应 |
ServerHttpRequest, ServerHttpResponse | 请求、响应 |
WebSession | 访问Session对象 |
java.security.Principal | |
org.springframework.http.HttpMethod | 请求方式 |
java.util.Locale | 国际化 |
java.util.TimeZone + java.time.ZoneId | 时区 |
@PathVariable | 路径变量 |
@MatrixVariable | 矩阵变量 |
@RequestParam | 请求参数 |
@RequestHeader | 请求头; |
@CookieValue | 获取Cookie |
@RequestBody | 获取请求体,Post、文件上传 |
HttpEntity | 封装后的请求对象 |
@RequestPart | 获取文件上传的数据 multipart/form-data. |
java.util.Map, org.springframework.ui.Model, and org.springframework.ui.ModelMap. | Map、Model、ModelMap |
@ModelAttribute | |
Errors, BindingResult | 数据校验,封装错误 |
SessionStatus + class-level @SessionAttributes | |
UriComponentsBuilder | For preparing a URL relative to the current request’s host, port, scheme, and context path. See URI Links. |
@SessionAttribute | |
@RequestAttribute | 转发请求的请求域数据 |
Any other argument | 所有对象都能作为参数:1、基本类型 ,等于标注@RequestParam 2、对象类型,等于标注 @ModelAttribute |
2、返回值写法
sse和websocket区别:
- SSE:单工;请求过去以后,等待服务端源源不断的数据
- websocket:双工: 连接建立后,可以任何交互;
Controller method return value | Description |
---|---|
@ResponseBody | 把响应数据写出去,如果是对象,可以自动转为json |
HttpEntity, ResponseEntity | ResponseEntity:支持快捷自定义响应内容 |
HttpHeaders | 没有响应内容,只有响应头 |
ErrorResponse | 快速构建错误响应 |
ProblemDetail | SpringBoot3; |
String | 就是和以前的使用规则一样;forward: 转发到一个地址redirect: 重定向到一个地址配合模板引擎 |
View | 直接返回视图对象 |
java.util.Map, org.springframework.ui.Model | 以前一样 |
@ModelAttribute | 以前一样 |
Rendering | 新版的页面跳转API; 不能标注 @ResponseBody 注解 |
void | 仅代表响应完成信号 |
Flux, Observable, or other reactive type | 使用 text/event-stream 完成SSE效果 |
Other return values | 未在上述列表的其他返回值,都会当成给页面的数据; |
5、文件上传
https://docs.spring.io/spring-framework/reference/6.0/web/webflux/controller/ann-methods/multipart-forms.html
class MyForm {private String name;private MultipartFile file;// ...}@Controller
public class FileUploadController {@PostMapping("/form")public String handleFormUpload(MyForm form, BindingResult errors) {// ...}}
现在
@PostMapping("/")
public String handle(@RequestPart("meta-data") Part metadata, @RequestPart("file-data") FilePart file) { // ...
}
6、错误处理
@ExceptionHandler(ArithmeticException.class)
public String error(ArithmeticException exception){System.out.println("发生了数学运算异常"+exception);//返回这些进行错误处理;// ProblemDetail: 建造者:声明式编程、链式调用// ErrorResponse : return "炸了,哈哈...";
}
7、RequestContext
8、自定义Flux配置
WebFluxConfigurer
容器中注入这个类型的组件,重写底层逻辑
@Configuration
public class MyWebConfiguration {//配置底层@Beanpublic WebFluxConfigurer webFluxConfigurer(){return new WebFluxConfigurer() {@Overridepublic void addCorsMappings(CorsRegistry registry) {registry.addMapping("/**").allowedHeaders("*").allowedMethods("*").allowedOrigins("localhost");}};}
}
9、Filter
@Component
public class MyWebFilter implements WebFilter {@Overridepublic Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {ServerHttpRequest request = exchange.getRequest();ServerHttpResponse response = exchange.getResponse();System.out.println("请求处理放行到目标方法之前...");Mono<Void> filter = chain.filter(exchange); //放行//流一旦经过某个操作就会变成新流Mono<Void> voidMono = filter.doOnError(err -> {System.out.println("目标方法异常以后...");}) // 目标方法发生异常后做事.doFinally(signalType -> {System.out.println("目标方法执行以后...");});// 目标方法执行之后//上面执行不花时间。return voidMono; //看清楚返回的是谁!!!}
}