Java反应式框架Reactor中的Mono和Flux
阅读数:224 评论数:0
跳转到新版页面分类
python/Java
正文
一、简介
1、背压
可以理解为,生产者可以感受到消费者反馈的消费压力,并根据压力进行动态调整生产速率。
2、响应流
(1)必须是无阻塞的。
(2)必须是一个数据流。
(3)可以异步执行。
(4)能够处理背压。
二、Publisher
由于响应流的特点,我们必须返回一个类似于Java中的Future的概念,在有结果可用时通知消费者进行消费响应。
Reactive Stream规范中这种定义为Publisher<T>,它可以根据订阅者Subscriber<? super T>的需求推送元素。
而Flux和Mono都是Publisher<T>在Reactor 3实现。
三、Flux
Flux是一个发出0-N个元素组成的异步序列的Publisher<T>,可以被onComplete信号或者onError信号所终止。在响应流规范中存在三种给下游消费者调用的方法onNext、onComplete和onError。
通过下面的对比可能更易于理解这个概念。
1、传统数据处理
public List<ClientUser> allUsers() {
return Arrays.asList(new ClientUser("felord.cn", "reactive"),
new ClientUser("Felordcn", "Reactor"));
}
2、流式数据处理
public Stream<ClientUser> allUsers() {
return Stream.of(new ClientUser("felord.cn", "reactive"),
new ClientUser("Felordcn", "Reactor"));
}
3、反应式数据处理
public Flux<ClientUser> allUsers(){
return Flux.just(new ClientUser("felord.cn", "reactive"),
new ClientUser("Felordcn", "Reactor"));
}
四、Mono
Mono是一个发出0-1个元素的Publisher<T>,可以被onComplete信号或者onError信号所终止。
1、传统数据处理
public ClientUser currentUser () {
return isAuthenticated ? new ClientUser("felord.cn", "reactive") : null;
}
2、Optional的处理方式
public Optional<ClientUser> currentUser () {
return isAuthenticated ? Optional.of(new ClientUser("felord.cn", "reactive"))
: Optional.empty();
}
3、反应式数据处理
public Mono<ClientUser> currentUser () {
return isAuthenticated ? Mono.just(new ClientUser("felord.cn", "reactive"))
: Mono.empty();
}
五、构建响应式数据流
1、Flux的创建
大体上分成两大类:一类是基于各种工厂模式的静态创建,而另一类则采用编程的试动态创建.
(1)just()
public static void test_01(){
Flux.just(1,2,3).subscribe(System.out::println);
}
接收一个可变长参数。
(2)fromXxx()
(3)from(Publisher source)
public static void test_02(){
Flux.from(subscriber->{
subscriber.onNext(1);
subscriber.onNext(2);
subscriber.onNext(3);
subscriber.onComplete();
}).subscribe(System.out::println);
}
(4)fromIterable()
public static void test_03(){
Flux.fromIterable(Arrays.asList(1,2,3)).subscribe(System.out::println);
}
(5)range()生成整数数据流
public static void test_04(){
Flux.range(0, 10).subscribe(System.out::println);
}
(6)interval()周期性的生成连续的元素序列
public static void test_05(){
Flux.interval(Duration.ofMillis(2000), Duration.ofMillis(1000)).subscribe(System.out::println);
}
(7)generate()编程方式创建
public static <T> Flux<T> generate(Consumer<SynchronousSink<T>> generator);
public static void test_06(){
Flux.generate(sink->{
sink.next(1);
// 停止
sink.complete();
}).subscribe(System.out::println);
Flux.generate(()->1, (i, sink)->{
sink.next(i++);
return i;
}).subscribe(System.out::println);
}
(8)create()
类似generate不过使用的FluxSink生成元素。
public static <T> Flux<T> create(Consumer<? super FluxSink<T>> emitter);
public static void test_07(){
Flux.create(emitter->{
emitter.next(1);
emitter.next(2);
emitter.complete();
}).subscribe(System.out::println);
}
六、Mono&Flux操作
1、buffer
buffer方法相关于将序列中的一部分数据收集到一个集合中,并作为一个新的流元素发送到下游。
public static void test_01(){
Flux.just(1,2,3,4,5,6).buffer(2).subscribe(System.out::println);
}
//结果输出
[1, 2]
[3, 4]
[5, 6]
2、window
功能跟buffer类似,区别是window将元素分成一个个Flux对象而不是一个Flux中的集合元素。
public static void test_04(){
Flux.just(1,2,3,4,5,6).window(2).toIterable().forEach(f->{
f.subscribe(System.out::println);
System.out.println("--------------------");
});
}
// 结果
1
2
--------------------
3
4
--------------------
5
6
--------------------
3、map
转换方法,将每一个元素按map中定义的Function函数进行转换。
public static void test_41(){
Flux.range(1,5)
.map(i -> i*i)
.subscribe(System.out::println);
}
4、filter
过滤方法,对元素序列中的所有元素进行过滤。
public static void test_08(){
Flux.just(1,2,3,4,5,6).filter(i->i%2==0).subscribe(System.out::println);
}
5、skip
跳过指定个数、时间或断言的元素。
public static void test_26(){
Flux.range(1, 100)
.skipUntil(i->i > 10)
.subscribe(System.out::println);
}
// 结果
11
12
13
14
15
16
...
6、then()
重新生成新的数据流。
public static void test_11(){
Flux.just(1, 2, 3, 4, 5, 6)
// 返回 Flux<Void>
.then()
.subscribe(System.out::println);
}
7、when
这是一个聚合操作,将等待一个或者多个Publisher执行完成之后再进行接下来的操作。
public static void test_12(){
Flux.just(1, 2, 3, 4, 5, 6)
.flatMap(i->{
System.out.println(i);
return Mono.when(s->{
System.out.println("all done");
s.onComplete();
},
s->{
System.out.println("all done-2");
s.onComplete();
});
}).subscribe();
}
// 执行结果
1
all done
all done-2
2
all done
all done-2
3
all done
all done-2
8、merge合并多个流
public static void test_13(){
Flux.merge(Flux.interval(Duration.ofMillis(0), Duration.ofMillis(100)).take(2),
Flux.interval(Duration.ofMillis(50), Duration.ofMillis(100)).take(2))
.subscribe(System.out::println);
}
9、zip
这也是一个合并方法,不过它是按一对一匹配的,如果序列中有对应不上的元素会被丢弃。
public static void test_15(){
Flux.zip(Flux.just(1, 2, 5), Flux.just(3, 4)).subscribe(System.out::println);
}
// 结果
[1,3]
[2,4]
10、onErrorReturn
public static void test_18(){
Flux.range(1, 6)
.map(i -> 10/(i-3))
.onErrorReturn(5) // 出错后给一个默认值 然后序列就终端了 元素序列只剩出错前的一段
.map(i -> i*i)
.subscribe(System.out::println, System.err::println);
}
// 结果
25
100
25
onErrorReturn异常发生时返回一个给定的默认值。
11、any 流中是否有有符合条件的任意一个元素。
12、all 流中所有元素是否都满足条件。
13、subscribe
订阅流,也可以说消费流,如果选择空参方法就是丢弃流。
public static void test_33(){
Flux.just(1,2,3,4)
.subscribe(new Subscriber<Integer>() {
Subscription sub;
@Override
public void onSubscribe(Subscription s) {
System.out.println("onSubscribe");;
sub = s;
// 每次消费一个元素
sub.request(1);
}
@Override
public void onNext(Integer integer) {
System.out.println("onNext");
System.out.println(integer);
sub.request(1);
}
@Override
public void onError(Throwable t) {
System.out.println("onError");;
t.printStackTrace();
}
@Override
public void onComplete() {
System.out.println("onComplete");;
}
});
}