Java反应式框架Reactor中的Mono和Flux

阅读数:224 评论数:0

跳转到新版页面

分类

python/Java

正文

一、简介

1、背压

可以理解为,生产者可以感受到消费者反馈的消费压力,并根据压力进行动态调整生产速率。

2、响应流

(1)必须是无阻塞的。

(2)必须是一个数据流。

(3)可以异步执行。

(4)能够处理背压。

二、Publisher

由于响应流的特点,我们必须返回一个类似于Java中的Future的概念,在有结果可用时通知消费者进行消费响应。

Reactive Stream规范中这种定义为Publisher<T>,它可以根据订阅者Subscriber<? super T>的需求推送元素。

而Flux和Mono都是Publisher<T>在Reactor 3实现。

三、Flux

Flux是一个发出0-N个元素组成的异步序列的Publisher<T>,可以被onComplete信号或者onError信号所终止。在响应流规范中存在三种给下游消费者调用的方法onNext、onComplete和onError。

通过下面的对比可能更易于理解这个概念。

1、传统数据处理

public List<ClientUser> allUsers() {
    return Arrays.asList(new ClientUser("felord.cn", "reactive"),
            new ClientUser("Felordcn", "Reactor"));
}

2、流式数据处理

public Stream<ClientUser> allUsers() {
    return  Stream.of(new ClientUser("felord.cn", "reactive"),
            new ClientUser("Felordcn", "Reactor"));
}

3、反应式数据处理

public Flux<ClientUser> allUsers(){
    return Flux.just(new ClientUser("felord.cn", "reactive"),
            new ClientUser("Felordcn", "Reactor"));
}

四、Mono

Mono是一个发出0-1个元素的Publisher<T>,可以被onComplete信号或者onError信号所终止。

1、传统数据处理

public ClientUser currentUser () {
    return isAuthenticated ? new ClientUser("felord.cn", "reactive") : null;
}

2、Optional的处理方式

public Optional<ClientUser> currentUser () {
    return isAuthenticated ? Optional.of(new ClientUser("felord.cn", "reactive"))
            : Optional.empty();
}

3、反应式数据处理

public Mono<ClientUser> currentUser () {
    return isAuthenticated ? Mono.just(new ClientUser("felord.cn", "reactive"))
            : Mono.empty();
}

五、构建响应式数据流

1、Flux的创建

大体上分成两大类:一类是基于各种工厂模式的静态创建,而另一类则采用编程的试动态创建.

(1)just()

public static void test_01(){
	Flux.just(1,2,3).subscribe(System.out::println);
}

接收一个可变长参数。

(2)fromXxx()

(3)from(Publisher source)

    public static void test_02(){

        Flux.from(subscriber->{
            subscriber.onNext(1);
            subscriber.onNext(2);
            subscriber.onNext(3);
            subscriber.onComplete();
        }).subscribe(System.out::println);
    }

(4)fromIterable()

    public static void test_03(){

        Flux.fromIterable(Arrays.asList(1,2,3)).subscribe(System.out::println);

    }

(5)range()生成整数数据流

    public static void test_04(){

        Flux.range(0, 10).subscribe(System.out::println);

    }

(6)interval()周期性的生成连续的元素序列

    public static void test_05(){
        Flux.interval(Duration.ofMillis(2000), Duration.ofMillis(1000)).subscribe(System.out::println);
    }

(7)generate()编程方式创建

public static <T> Flux<T> generate(Consumer<SynchronousSink<T>> generator);

 public static void test_06(){
     Flux.generate(sink->{
         sink.next(1);
         // 停止
         sink.complete();
     }).subscribe(System.out::println);

     Flux.generate(()->1, (i, sink)->{
         sink.next(i++);
         return i;
     }).subscribe(System.out::println);
 }

(8)create()

类似generate不过使用的FluxSink生成元素。

public static <T> Flux<T> create(Consumer<? super FluxSink<T>> emitter);

    public static void test_07(){
        Flux.create(emitter->{
            emitter.next(1);
            emitter.next(2);
            emitter.complete();
        }).subscribe(System.out::println);
    }

 

六、Mono&Flux操作

1、buffer

buffer方法相关于将序列中的一部分数据收集到一个集合中,并作为一个新的流元素发送到下游。

public static void test_01(){
        Flux.just(1,2,3,4,5,6).buffer(2).subscribe(System.out::println);
    }

//结果输出
 [1, 2]
 [3, 4]
 [5, 6]

2、window

功能跟buffer类似,区别是window将元素分成一个个Flux对象而不是一个Flux中的集合元素。

public static void test_04(){
   Flux.just(1,2,3,4,5,6).window(2).toIterable().forEach(f->{
       f.subscribe(System.out::println);
       System.out.println("--------------------");
   });
}
// 结果 
 1
 2
 --------------------
 3
 4
 --------------------
 5
 6
 --------------------

3、map

转换方法,将每一个元素按map中定义的Function函数进行转换。

public static void test_41(){
    Flux.range(1,5)
            .map(i -> i*i)
            .subscribe(System.out::println);
}

4、filter

过滤方法,对元素序列中的所有元素进行过滤。

 public static void test_08(){
        Flux.just(1,2,3,4,5,6).filter(i->i%2==0).subscribe(System.out::println);
    }

5、skip

跳过指定个数、时间或断言的元素。

 public static void test_26(){
	Flux.range(1, 100)
	          .skipUntil(i->i > 10)
	          .subscribe(System.out::println);
	}
// 结果 
11
12
13
14
15
16
...

6、then()

重新生成新的数据流。

public static void test_11(){
	Flux.just(1, 2, 3, 4, 5, 6)
        // 返回 Flux<Void>
        .then()
        .subscribe(System.out::println);
}

7、when

这是一个聚合操作,将等待一个或者多个Publisher执行完成之后再进行接下来的操作。

    public static void test_12(){
        Flux.just(1, 2, 3, 4, 5, 6)
                .flatMap(i->{
                    System.out.println(i);
                    return Mono.when(s->{
                        System.out.println("all done");
                        s.onComplete();
                    },
                    s->{
                        System.out.println("all done-2");
                        s.onComplete();
                    });
                }).subscribe();
    }

// 执行结果 
1
all done
all done-2
2
all done
all done-2
3
all done
all done-2

8、merge合并多个流

public static void test_13(){
    Flux.merge(Flux.interval(Duration.ofMillis(0), Duration.ofMillis(100)).take(2),
            Flux.interval(Duration.ofMillis(50), Duration.ofMillis(100)).take(2))
            .subscribe(System.out::println);
}

9、zip

这也是一个合并方法,不过它是按一对一匹配的,如果序列中有对应不上的元素会被丢弃。

public static void test_15(){
        Flux.zip(Flux.just(1, 2, 5), Flux.just(3, 4)).subscribe(System.out::println);
    }
// 结果 
[1,3]
[2,4]

10、onErrorReturn

    public static void test_18(){

        Flux.range(1, 6)
                .map(i -> 10/(i-3))
                .onErrorReturn(5)   // 出错后给一个默认值 然后序列就终端了 元素序列只剩出错前的一段
                .map(i -> i*i)
                .subscribe(System.out::println, System.err::println);
    }
// 结果 
25
100
25

onErrorReturn异常发生时返回一个给定的默认值。

11、any 流中是否有有符合条件的任意一个元素。

12、all 流中所有元素是否都满足条件。

13、subscribe

订阅流,也可以说消费流,如果选择空参方法就是丢弃流。

 public static void test_33(){

        Flux.just(1,2,3,4)
                .subscribe(new Subscriber<Integer>() {
                    Subscription sub;
                    @Override
                    public void onSubscribe(Subscription s) {
                        System.out.println("onSubscribe");;
                        sub = s;
                        // 每次消费一个元素
                        sub.request(1);
                    }
                    @Override
                    public void onNext(Integer integer) {
                        System.out.println("onNext");
                        System.out.println(integer);
                        sub.request(1);
                    }
                    @Override
                    public void onError(Throwable t) {
                        System.out.println("onError");;
                        t.printStackTrace();
                    }
                    @Override
                    public void onComplete() {
                        System.out.println("onComplete");;
                    }
                });
    }

 

 




相关推荐

一、spring5之前的springMVC异常处理机制 1、使用@ExceptionHandler 用于局部方法捕获,与抛出异常的方法处于同一个Controller类。 @Controller pub

一、简介 Spring Cache 提供了 @Cacheable 、@CachePut 、@CacheEvict 、@Caching 等注解,在方法上使用。 核心

一、概述 webflux是spring5中引进的非阻塞编程模型,而与之相对应提SpringMVC是一种阻塞模式,比如Spring Cloud Gateway是用了SpringWebflux开发。 为什