您当前的位置: 首页 > 

Reactive(反应式)编程导论

蔚1 发布时间:2019-08-25 23:30:59 ,浏览量:2

反应式编程是一种涉及数据流和变化传播的异步编程范例。这意味着可以通过所采用的编程语言轻松地表达静态(例如阵列)或动态(例如事件发射器)数据流。

本文已参加 GitChat「我的技术实践」有奖征文活动,活动链接: GitChat「我的技术实践」有奖征文活动

Reactor 和Rxjava是 Reactive Programming 范例的一个具体实现,可以概括为:

反应式编程是一种涉及数据流和变化传播的异步编程范例。这意味着可以通过所采用的编程语言轻松地表达静态(例如阵列)或动态(例如事件发射器)数据流。

作为反应式编程方向的第一步,Microsoft 在.NET 生态系统中创建了 Reactive Extensions(Rx)库。然后 RxJava 在 JVM 上实现了响应式编程。随着时间的推移,通过 Reactive Streams 工作出现了 Java 的标准化,这一规范定义了 JVM 上的反应库的一组接口和交互规则。它的接口已经在父类 Flow 下集成到 Java 9 中。

另外 Java 8 还引入了 Stream,它旨在有效地处理数据流(包括原始类型),这些数据流可以在没有延迟或很少延迟的情况下访问。它是基于拉的,只能使用一次,缺少与时间相关的操作,并且可以执行并行计算,但无法指定要使用的线程池。但是它还没有设计用于处理延迟操作,例如 I / O 操作。其所不支持的特性就是 Reactor 或 RxJava 等 Reactive API 的用武之地。

Reactor 或 Rxjava 等反应性 API 也提供 Java 8 Stream 等运算符,但它们更适用于任何流序列(不仅仅是集合),并允许定义一个转换操作的管道,该管道将应用于通过它的数据,这要归功于方便的流畅 API 和使用 lambdas。它们旨在处理同步或异步操作,并允许您缓冲,合并,连接或对数据应用各种转换。

首先考虑一下,为什么需要这样的异步反应式编程库?现代应用程序可以支持大量并发用户,即使现代硬件的功能不断提高,现代软件的性能仍然是一个关键问题。

人们可以通过两种方式来提高系统的能力:

  • 并行化:使用更多线程和更多硬件资源。
  • 在现有资源的使用方式上寻求更高的效率。

通常,Java 开发人员使用阻塞代码编写程序。这种做法很好,直到出现性能瓶颈,此时需要引入额外的线程。但是,资源利用率的这种扩展会很快引入争用和并发问题。

更糟糕的是,会导致浪费资源。一旦程序涉及一些延迟(特别是 I / O,例如数据库请求或网络调用),资源就会被浪费,因为线程(或许多线程)现在处于空闲状态,等待数据。

所以并行化方法不是灵丹妙药,获得硬件的全部功能是必要的。

第二种方法,寻求现有资源的更高的使用率,可以解决资源浪费问题。通过编写异步,非阻塞代码,您可以使用相同的底层资源将执行切换到另一个活动任务,然后在异步处理完成后返回到当前线程进行继续处理。

但是如何在 JVM 上生成异步代码? Java 提供了两种异步编程模型:

  • CallBacks:异步方法没有返回值,但需要额外的回调参数(lambda 或匿名类),在结果可用时调用它们。
  • Futures:异步方法立即返回 Future 。异步线程计算任务结果,但 Future 对象包装对它的访问。该值不会立即可用,并且可以轮询对象,直到该值可用。例如,运行 Callable 任务的 ExecutorService 使用 Future 对象。

但是上面两种方法都有局限性。首先多个 callback 难以组合在一起,很快导致代码难以阅读以及难以维护(称为“Callback Hell”):

考虑下面一个例子:在用户的 UI 上展示用户喜欢的 top 5 个商品的详细信息,如果不存在的话则调用推荐服务获取 5 个;这个功能的实现需要三个服务支持:一个是获取用户喜欢的商品的 ID 的接口(userService.getFavorites),第二个是获取商品详情信息接口(favoriteService.getDetails),第三个是推荐商品与商品详情的服务(suggestionService.getSuggestions),基于 callback 模式实现上面功能代码如下:

userService.getFavorites(userId, new Callback() { //1  public void onSuccess(List list) { //2    if (list.isEmpty()) { //3      suggestionService.getSuggestions(new Callback() {//4        public void onSuccess(List list) {           UiUtils.submitOnUiThread(() -> { //5            list.stream()                .limit(5)                .forEach(uiList::show); //6            });        }        public void onError(Throwable error) { //7          UiUtils.errorPopup(error);        }      });    } else {      list.stream() //8          .limit(5)          .forEach(favId -> favoriteService.getDetails(favId, //9            new Callback() {              public void onSuccess(Favorite details) {//10                UiUtils.submitOnUiThread(() -> uiList.show(details));              }              public void onError(Throwable error) {//11                UiUtils.errorPopup(error);              }            }          ));    }  }  public void onError(Throwable error) {    UiUtils.errorPopup(error);  }});
  • 我们的三个服务接口都是基于 callback 的,当异步任务执行完毕后,如果结果正常则会调用 callback 的 onSuccess 方法,如果结果异常则会调用 onError 方法。
  • 代码 1 中我们调用了 userService.getFavorites 接口来获取用户 userId 的推荐商品 id 列表,如果获取结果正常则会调用代码 2,如果失败则会调用代码 7,通知用户 UI 错误信息。
  • 如果正常则会执行代码 3 判断推荐商品 id 列表是否为空,如果是的话则执行代码 4 调用推荐商品与商品详情的服务(suggestionService.getSuggestions),如果获取商品详情失败则执行代码 7callback 的 OnError 把错误信息显示到用户 UI,否则如果成功则执行代码 5 切换线程到 UI 线程,在获取的商品详情列表上施加 jdk8 stream 运算使用 limit 获取 5 个元素,然后显示到 UI 上。
  • 代码 3 如果判断用户推荐商品 id 列表不为空则执行代码 8,在商品 id 列表上使用 JDK8 stream 获取流,然后使用 limit 获取 5 个元素,然后执行代码 9 调用 favoriteService.getDetails 服务获取具体商品的详情,这里多个 id 获取详情是并发进行的,当获取到详情成功后会执行代码 10 在 UI 线程上绘制出商品详情信息,如果失败则执行代码 11 显示错误。

如上为了实现该功能,我们写了很多代码,使用了大量 callback,这些代码比较晦涩难懂,并且存在代码重复,下面我们使用 Reactor 来实现等价的功能:

userService.getFavorites(userId) //1           .flatMap(favoriteService::getDetails) //2           .switchIfEmpty(suggestionService.getSuggestions()) //3           .take(5) //4           .publishOn(UiUtils.uiThreadScheduler()) //5           .subscribe(uiList::show, UiUtils::errorPopup); //6
  • 代码 1 调用 getFavorites 服务获取 userId 对应的商品列表,该方法会马上返回一个流对象,然后代码 2 在流上施加 flatMap 运算把每个商品 id 转换为商品 Id 对应的商品详情信息(通过调用服务 favoriteService::getDetails),由于方法 getDetails 是异步的,所以 flatmap 实际上实现了同步转异步,然后把所有商品详情信息组成新的流返回。

  • 代码 3 判断如果返回的流中没有元素则调用 suggestionService.getSuggestions()服务获取推荐的商品详情列表,代码 4 则从代码 2 或者代码 3 返回的流中获取 5 个元素(5 个商品详细信息),然后执行代码 5publishOn 把当前线程切换到 UI 调度器来执行,代码 6 则通过 subscribe 方法激活整个流处理链,然后在 UI 线程上绘制商品详情列表或者显示错误。

如上代码可知基于 reactor 编写的代码逻辑属于声明式编程,比较通俗易懂,代码量也比较少,不含有重复的代码。

future 相比 callback 要好一些,但尽管 CompletableFuture 在 Java 8 上进行了改进,但它们仍然表现不佳。一起编排多个 future 是可行但是不容易的,它们不支持延迟计算(比如 rxjava 中的 defer 操作)和高级错误处理,例如下面例子。考虑另外一个例子:首先我们获取一个 id 列表,然后根据 id 分别获取对应的 name 和统计数据,然后组合每个 id 对应的 name 和统计数据为一个新的数据,最后输出所有组合对的值,下面我们使用 CompletableFuture 来实现这个功能,以便保证整个过程是异步的,并且每个 id 对应的处理是并发的:

CompletableFuture ids = ifhIds(); //1CompletableFuture result = ids.thenComposeAsync(l -> { //2  Stream zip =      l.stream().map(i -> { //3        CompletableFuture nameTask = ifhName(i); //3.1        CompletableFuture statTask = ifhStat(i); //3.2        return nameTask.thenCombineAsync(statTask, (name, stat) -> "Name " + name + " has stats " + stat); //3.3      });  List combinationList = zip.collect(Collectors.toList()); //4  CompletableFuture[] combinationArray = combinationList.toArray(new CompletableFuture[combinationList.size()]);//5  CompletableFuture allDone = CompletableFuture.allOf(combinationArray); //6  return allDone.thenApply(v -> combinationList.stream()//7      .map(CompletableFuture::join)       .collect(Collectors.toList()));});List results = result.join(); //8
  • 如上代码 1 我们调用 ifhIds 方法异步返回了一个 CompletableFuture 对象,其内部保存了 id 列表。

  • 代码 2 调用 ids 的 thenComposeAsync 方法返回一个新的 CompletableFuture 对象,新 CompletableFuture 对象的数据是代码 2 中的 lambda 表达式执行结果,表达式内代码 3 获取 id 列表的流对象,然后使用 map 操作把 id 元素转换为 name 与统计信息拼接的字符串,这里是通过代码 3.1 根据 id 获取 name 对应的 CompletableFuture 对象,代码 3.2 获取统计信息对应的 CompletableFuture,然后使用代码 3.3 把两个 CompletableFuture 对象进行合并做到的。

  • 代码 3 会返回一个流对象,其中元素是所有 id 对应的 name 与统计信息组合后的结果,然后代码 4 把流中元素收集保存到了 combinationList 列表里面。代码 5 把列表转换为了数组,这是因为代码 2 的 allOf 操作符的参数必须为数组。

  • 代码 6 把 combinationList 列表中的所有 CompletableFuture 对象转换为了一个 allDone(等所有 CompletableFuture 对象的任务执行完毕),到这里我们调用 allDone 的 get()方法就可以等待所有异步处理执行完毕,但是我们目的是想获取到所有异步任务的执行结果,所以代码 7 在 allDone 上施加了 thenApply 运算,意在等所有任务处理完毕后调用所有 CompletableFuture 的 join 方法获取每个任务的执行结果,然后收集为列表后返回一个新的 CompletableFuture 对象,然后代码 8 在新的 CompletableFuture 上调用 join 方法获取所有执行结果列表。

Reactor 本身提供了更多的开箱即用的操作符,使用 Reactor 来实现上面功能代码如下:

Flux ids = ifhrIds(); //1Flux combinations =    ids.flatMap(id -> { //2      Mono nameTask = ifhrName(id); //2.1      Mono statTask = ifhrStat(id); //2.2      return nameTask.zipWith(statTask, //2.3          (name, stat) -> "Name " + name + " has stats " + stat);    });Mono result = combinations.collectList(); //3List results = result.block(); //4
  • 如上代码 1 我们调用 ifhIds 方法异步返回了一个 Flux 对象,其内部保存了 id 列表。

  • 代码 2 调用 ids 的 flatMap 方法对其中元素进行转换,代码 2.1 根据 id 获取 name 信息(返回流对象 Mono),代码 2.2 根据 id 获取统计信息(返回流对象 Mono),代码 3 结合两个流为新的流元素。

  • 代码 3 调用新流的 collectList 方法把所有的流对象转换为列表,然后返回一个新的 Mono 流对象。

  • 代码 4 则调用新的 Mono 流对象的 block 方法阻塞获取所有执行结果。

如上代码使用 reactor 方式编写的代码相比使用 CompletableFuture 实现相同功能来说,更简洁,更通俗易懂。

Callback 和 Future 的这些弊病是相似的,而响应式编程正是使用发布者 - 订阅者方式来解决这些问题的。

诸如 Reactor 之类的反应库旨在解决 JVM 上“经典”异步方法的这些缺点,同时还关注一些其他方面:

  • 可组合性和可读性
  • 数据作为一个用丰富的运算符词汇表操纵的流程
  • 在您订阅之前没有任何事情发生
  • 背压或消费者向生产者发出信号反馈发出信号过快的能力
  • 高级但高价值的抽象,与并发无关

可组合性,指的是编排多个异步任务的能力,使用先前任务的结果作为后续任务的输入或以 fork-join 方式执行多个任务。

编排任务的能力与代码的可读性和可维护性紧密相关。随着异步过程层数量和复杂性的增加,能够编写和读取代码变得越来越困难。正如我们所看到的,callback 模型很简单,但其主要缺点之一是,对于复杂的处理,您需要从回调执行回调,本身嵌套在另一个回调中,依此类推。那个混乱被称为 Callback Hell,正如你可以猜到的(或者从经验中得知),这样的代码很难回归并推理。

Reactor 提供了丰富的组合选项,其中代码反映了抽象过程的组织,并且所有内容通常都保持在同一级别(嵌套最小化)。

您可以将响应式应用程序处理的数据视为在装配线中移动。Reactor 既是传送带又是工作站。原材料从源(原始发布者)注入,最终作为成品准备推送给消费者(或订阅者)。

原材料可以经历各种转换和其他中间步骤,或者是将中间元素聚集在一起形成较大装配线的一部分。如果在装配线中某一点出现堵塞,受影响的工作站可向上游发出信号以限制原材料的向下流动。

在 Reactor 中,运算符是我们装配线中类比的工作站。每个运算符都会向发布者添加行为,并将上一步的发布者包装到新实例中。因此链接整个链,使得数据源自第一个发布者并沿着链向下移动,由每个链点进行转换。最终,订阅者订阅该流,然后激活完成该过程。需要注意,在订阅者订阅发布者之前没有任何事情发生。

虽然 Reactive Streams 规范根本没有指定运算符,但 Reactor 或者 rxjava 等反应库的最佳附加值之一是它们提供的丰富的运算符。这些涉及很多方面,从简单的转换和过滤到复杂的编排和错误处理。

在 Reactor 中,当您编写 Publisher 链时,默认情况下数据不会启动。相反,您可以创建异步过程的抽象描述(这可以帮助重用和组合)。

通过订阅操作,您可以将发布者绑定到订阅者,从而触发整个链中的数据流。这是通过订阅者发出的单个请求信号在内部实现的,该请求信号在上游传播,一直返回到源发布者。

上游传播信号也用于实现背压,我们在装配线中将其描述为当工作站比上游工作站处理速度慢时向上游线路发送的反馈信号。

Reactive Streams 规范定义的真正机制非常接近于下面的类比:订阅者可以在无限制模式下工作,让生产者以最快的速度推送所有数据,或者它可以使用请求机制向生产者发送信号通知它准备处理最多 n 个元素。

施加到源的中间操作符也可以在途中更改请求。想象一个缓冲区运算符,它以 10 个批次对元素进行分组。如果订阅者请求 1 个缓冲区,则源可以生成 10 个元素。一些生产者还实施预取策略,这避免了往返的request(1),并且如果在请求之前生成元素并不太昂贵,则预取是很有益的。

这将推模型转换为推拉式混合模式,如果上游生产了很多元素,则下游可以从上游拉出 n 个元素。但是如果元素没有准备好,就会在上游生产出元素后推数据到下游。

更多技术分享,请关注微信公众号:技术原始积累;另外《Java 并发编程之美》已经出版,对并发感兴趣的同学可以参考下。

阅读全文: http://gitbook.cn/gitchat/activity/5d61f61174e7cf7224748221

您还可以下载 CSDN 旗下精品原创内容社区 GitChat App ,阅读更多 GitChat 专享技术内容哦。

FtooAtPSkEJwnW-9xkCLqSTRpBKX

关注
打赏
1688896170
查看更多评论

蔚1

暂无认证

  • 2浏览

    0关注

    4645博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文
立即登录/注册

微信扫码登录

0.0699s