概述
学习响应式编程最难部分在于用响应式思维。
什么是响应式编程
响应式编程就是用异步数据流进行编程
这不是新理念。即使是最典型的点击事件也是一个异步事件流,从而可以对其进行侦测(observe)并进行相应操作。
可以基于任何东西创建数据流。流非常轻便,并且无处不在,任何东西都可以是一个流:变量,用户输入,属性,缓存,数据结构等等。例如,想象一下微博推文也可以是一个数据流,和点击事件一样。你可以对其进行侦听,并作相应反应。
除些之外,我们有很多功能强大的函数,可以对这些流进行合并、过滤、转变等。这就是 “函数式编程” 的强大之处。流可以作为另一个流的输入。甚至多个流也可以作为另一个流的输入。你也可以合并流,从流中过滤出你感兴趣的事件。你也可以将流中的数据值映射转换成另一个流。
流是响应式的核心,下面是 “在按钮上点击” 事件流。
流就是一个按时间顺序正在进行的事件序列(A stream is a sequence of ongoing events ordered in time)。
它可以发送 3 种不同的事物:
- 一个值(类型不限)
- 一个错误
- 或一个已完成(completed) 信号
例如,当包含该按钮的视图或窗口关闭时,流会发送 “completed” 信号。
我们只能异步捕获这些发送的事件,即定义:
- 一个函数,用于当一个值发送出来时再执行
- 定义另一个函数,用于当错误发送出来时执行
- 再定义一个函数,用于当 ‘completed’ 信号发送出来时执行
有时可只定义第一个函数,而忽略定义后两个函数。
对流的 “侦听” 又称为 订阅(subscribing),而定义的函数即为 观察者(observer),流就是 主题(subject, observable)。这是一个典型的观察者模式。
也可以用 ASCII 来画示意图:
--a---b-c---d---X---|->
a, b, c, d 都是发送出的值
X 是错误
| 是 'completed' 信号
---> 是时间线
下面演示将原始的点击事件流转变成一个新的流。
首先,创建一个计数流来指示一个按钮的点击次数。在常见的响应式库中,每个流都会绑定很多的函数,如 map, filter, scan 等。当你调用这些函数时,如 clickStream.map(f)
,会基于 clickStream 返回一个新流。但它不会修改原来的 clickStream 流。这是响应式流的一个核心特性: 不变性(immutability)。因而它能让我们进行函数串联,如 clickStream.map(f).scan(g)
:
clickStream: ---c----c--c----c------c-->
vvvvv map(c becomes 1) vvvv
---1----1--1----1------1-->
vvvvvvvvv scan(+) vvvvvvvvv
counterStream: ---1----2--3----4------5-->
map(f) 函数根据提供的 f 函数将发送出的值转换到另一个新流中,这里是将每次点击映射成数据 1。scan(g) 函数聚合流中所有之前的值,产生值 x = g(accumulated, current),这里 g 是一个简单的加函数。最后,counterStream 每当点击事件发生时就发送出一个代表总点击数的值。
要显示响应式的真正强大之处,假设现想创建一个 “双击” 事件流,为使更有趣,该流可以将多击(多于 2 次)都认为是双击。
在响应式中,这非常简单。画示意图进行思考是理解和创建流的最好方式,无论你是新手还是专家。
灰框表示将流转换成另一个流的函数。首先将点击积累成事件列表,这里 throttle(250ms) 将 250ms 内的点击都合并在一个列表中。现得到了一个列表流,再应用 map() 将每个列表映射成其对应的列表长度。最后,使用 filter(x>=2) 将长度 1 过滤掉。我们用 3 个操作来产生我们需要的流。然后我们可以侦听(订阅)它,进而进行相应的响应。
为什么要采用 RP?
响应式编程提高了代码的抽象水平,因此能专注于那些定义业务逻辑的事件的依存关系,而无需摆弄大量的实现细节。用 RP 写的代码会更加简洁。
现在的 Webapp 和移动 App,都会和与数据事件相关的 UI 事件进行大量交互,因此使用 RP 的优点会更明显。App 已经进化成了更加实时:修改一个表单项会自动触发保存到后台的操作,“点赞” 会实时反应到其他连接的用户,等等。现在的应用含有大量的各种类型的实时事件,以向用户提供高度交互体验。我们需要能处理这些情况的工具,而响应式编程就是答案。
用 RP 思维
现使用一个实例来一步步引导如何使用 RP 思维。本例使用 JavaScript 和 RxJS。
实现一个 “关注谁” 推荐框
界面类似 Twitter 的账号关注推荐 UI:
本例实现以下核心功能:
- 启动时,根据 Github User API 获取账号数据,并显示 3 个推荐
- 点击 “刷新” 后,将另外 3 个推荐导入推荐框中的 3 行中
- 当在账号行上点击 ‘x’ 按钮时,清除该账号并显示另一个
- 每行显示账号的头像及页面链接
请求与应答
如何用 Rx 解决这个问题? (几乎)任何事务都可以是一个流,这是 Rx 的口头禅。
先实现 “启动时,根据 API 获取账号数据,并显示 3 个推荐” 的功能。这里没有特殊的,只需 :
- 发送一个请求
- 获取一个应答
- 显示应答
先将请求表示为一个流。启动时,只需进行一次请求,因此将它建模成数据流时,该流将只会发送出一个值。
--a------|->
这里 a 是字符串 'https://api.github.com/users'
这是我们需要进行请求的 URL 流。每当一个请求事件发生时,它都会告诉我们两件事:何时和什么。事件发送出值的时间就是 “何时” 应用执行请求的时间,而发送出来的值(URL 字符串)就是应该请求的 “什么”。
在 Rx* 中创建这样的一个单值流很简单。流的官方术语是 “Observable”,这是基于它是可被观测的事件命名。
var requestStream = Rx.Observable.just('https://api.github.com/users');
但是现在,它只是一个字符串流,没有做其它任何操作,因此我们需要为当值发送出来时定义一些操作。这通过 订阅 该流完成。
requestStream.subscribe(function(requestUrl) {
// 执行请求操作
jQuery.getJSON(requestUrl, function(reponseData){
//...
});
})
这里使用 jQuery Ajax 回调函数来处理异步请求操作,但 Rx 就是用来处理异步数据流的。该请求在以后某个时间返回的应答可以用流表示吗?当然可以:
requestStream.subscribe(function(requestUrl) {
// 执行请求操作
var responseStream = Rx.Observable.create(function (observer) {
jQuery.getJSON(requestUrl)
.done(function(response) {
observer.onNext(response);
})
.fail(function(jqXHR, status, error){
observer.onError(error);
})
.always(function() {
observer.onCompleted();
});
});
responseStream.subscribe(function(response){
// ...
});
});
Rx.Observable.create
创建了一个自定义流,该流显式地通知每个观察者(或者说订阅者)有关数据事件 (onNext()) 或错误 (onError())。这里只封装了 jQuery Ajax 的 Promise。因为 Promise 也是一个可观察对象(Observable)。
Observable 是 Promise 的超集。在 Rx 中可以通过 var stream = Rx.Observable.fromPromise(promise)
将一个 Promise 转成 Observable。但是 Observable 不能转成 Promise。一个 Promise 简单来说就是一个只发送单个值的 Observable,而 Rx 流却可以返回多个值。
上面的例子中,使用了回调函数。但在 Rx 中有一些简单机制,能基于流转换创建出新的流。
map(f) 函数,能从流 A 中抽取出每个值,用 f() 进行处理后,将新值插入到流 B。如果在请求和应答流上使用,就可以将请求 URL 映射成应答 Promise(类似流):
var responseMetaStream = requestStream
.map(function(requestUrl) {
return Rx.Observable.fromPromise(jQuery.getJSON(requestUrl));
});
responseMetaStream 是一个流的流,即发送出的值还是一个流。
我们需要的是发送出一个 JSON 对象的流。因此可以用 flatMap 将流的流扁平化成流:
var responseMetaStream = requestStream
.flatMap(function(requestUrl) {
return Rx.Observable.fromPromise(jQuery.getJSON(requestUrl));
});
因为应答流是对应请求流的,如果之后在请求流上发生更多事件,那么在应答流上也会出现对应的应答事件:
requestStream: --a-----b--c------------|->
responseStream: -----A--------B-----C---|->
(小写是请求,大写是应答)
现在有了应答流,故可以呈现接收到的数据了:
responseStream.subscribe(function(response){
// 将应答呈现在 DOM 中
});
合并目前所有代码:
var requestStream = Rx.Observable.just('https://api.github.com/users');
var responseStream = requestStream
.flatMap(function(requestUrl) {
return Rx.Observable.fromPromise(jQuery.getJSON(requestUrl));
});
responseStream.subscribe(function(response){
// 将应答呈现在 DOM 中
});
刷新按钮
每个 JSON 应答中都含 100 个用户信息。该 API 只允许指定页偏移,不能指定页大小,故只使用了 3 个数据对象并浪费了 97 个。
每次当点击刷新按钮时,请求流应该发送出一个新 URL,从而能获得一个新应答。需要 2 样东西:刷新按钮上的点击事件流,并将请求流修改成依赖于刷新点击流。RxJS 有将事件转成流的工具:
var refreshButton = document.querySelector('.refresh');
var refreshClickStream = Rx.Observable.fromEvent(refresh, 'click');
由于刷新点击事件自己不带 API URL,我们需要将每次点击映射成一个实际的 URL。现将请求流修改成由刷新点击流通过将事件映射成一个 API URL 得到。
var requestStream = refreshClickStream
.map(function(){
var randomOffset = Math.floor(Math.random()*500);
return 'https://api.github.com/users?since=' + randomOffset;
});
但是这样破坏了原来的功能,现在在启动时不会有请求发出,只有当点击刷新时才会请求。需要在点击刷新和打开页面时都要进行请求。
我们知道这两种情况分开时的流:
var requestStream = refreshClickStream
.map(function(){
var randomOffset = Math.floor(Math.random()*500);
return 'https://api.github.com/users?since=' + randomOffset;
});
var startupRequestStream = Rx.Observable.just('https://api.github.com/users');
可以用 merge()
函数将两者合并。
stream A: ---a--------e-----o----->
stream B: -----B---C-----D-------->
vvvvvvvvv merge vvvvvvvvv
---a-B---C--e--D--o----->
合并后的代码为:
var requestOnRefreshStream = refreshClickStream
.map(function(){
var randomOffset = Math.floor(Math.random()*500);
return 'https://api.github.com/users?since=' + randomOffset;
});
var startupRequestStream = Rx.Observable.just('https://api.github.com/users');
var requestStream = Rx.Observable.merge(
requestOnRefreshStream, startupRequestStream
);
也可以简写为:
var requestStream = refreshClickStream
.map(function(){
var randomOffset = Math.floor(Math.random()*500);
return 'https://api.github.com/users?since=' + randomOffset;
})
.merge(RequestStream = Rx.Observable.just('https://api.github.com/users'));
);
再进一步简写为:
var requestStream = refreshClickStream
.map(function(){
var randomOffset = Math.floor(Math.random()*500);
return 'https://api.github.com/users?since=' + randomOffset;
})
.startWith('https://api.github.com/users');
startWith()
和你想像的功能一样。无论将 startWith 放在代码的哪个位置,startWith(x) 中的 x 总会在结果流的最前面。
为达到更好的直观效果,这里是要实现在启动时模拟点击刷新按钮,故可以改代码改成:
var requestStream = refreshClickStream.startWith('startup click')
.map(function(){
var randomOffset = Math.floor(Math.random()*500);
return 'https://api.github.com/users?since=' + randomOffset;
})
用流来建模 3 个推荐
现在有个问题:当你点击刷新时,当前的 3 个推荐不能先清空掉。新推荐只有当应答到达后才会显示,但要使 UI 更好看,需要在点击刷新时就清空掉当前的推荐。
refreshClickStream.subscribe(function()
// clear the 3 suggestion DOM elements
});
但是这样也不好,因为现在有 两个 订阅者可以影响推荐的 DOM 元素(另一个是 responseStream.subscribe()),而这不太符合 责任分离原则。
响应式的口头禅是任何事务都可以是一个流。
因此可以将推荐也建模成流,流中发送的每个值都是包含推荐数据的 JSON 对象。对这 3 个中的每个都分开处理。下面是推荐 #1:
var suggestion1Stream = responseStream
.map(function(listUsers){
// 从列表中抽取一个随机用户
return listUsers[Math.floor(Math.random()*listUsers.length)];
});
suggestion1Stream.subscribe(function(suggestion){
// 将推荐 1 显示到 DOM 上
});
// 推荐 #2 和 #3 也类似
回到 “刷新时清除全部推荐”,可以简单地将刷新点击映射成一个数据为 null 的推荐:
var suggestion1Stream = responseStream
.map(function(listUsers){
// 从列表中抽取一个随机用户
return listUsers[Math.floor(Math.random()*listUsers.length)];
})
.merge(
refreshClickStream.map(function(){ return null; });
);
// 呈现,将 null 解析为 “无数据”,因而隐藏 UI
suggestion1Stream.subscribe(function(suggestion){
if (suggestion === null){
// 隐藏推荐的 DOM 元素
}
else {
// 显示推荐的 DOM 元素,并显示数据
}
});
// 推荐 #2 和 #3 也类似
现在的示意图如下:
refreshClickStream: ----------o--------o---->
requestStream: -r--------r--------r---->
responseStream: ----R---------R------R-->
suggestion1Stream: ----s-----N---s----N-s-->
suggestion2Stream: ----q-----N---q----N-q-->
suggestion3Stream: ----t-----N---t----N-t-->
N 表示 null
进一步,还可以在启动时显示 “空” 推荐。只需在推荐流中添加 startWith(null) 即可:
var suggestion1Stream = responseStream
.map(function(listUsers){
// 从列表中抽取一个随机用户
return listUsers[Math.floor(Math.random()*listUsers.length)];
})
.merge(
refreshClickStream.map(function(){ return null; });
)
.startWith(null);
从而示意图变为:
refreshClickStream: ----------o---------o---->
requestStream: -r--------r---------r---->
responseStream: ----R----------R------R-->
suggestion1Stream: -N--s-----N----s----N-s-->
suggestion2Stream: -N--q-----N----q----N-q-->
suggestion3Stream: -N--t-----N----t----N-t-->
关闭一个推荐并使用缓存应答
在每个推荐上点击 ‘x’ 按钮时,就关闭该推荐并重新加载一个。如果简单地当在点击 ‘x’ 按钮时发送一个请求,如下:
var close1Button = document.querySelector('.close1');
var close1ClickStream = Rx.Observable.fromEvent(close1Button, 'click');
var requestStream = refreshClickStream.startWith('startup click')
.merge(close1ClickStream)
.map(function(){
var randomOffset = Math.floor(Math.random()*500);
return 'https://api.github.com/users?since=' + randomOffset;
});
但是上面的效果会是关闭当前推荐并重新加载所有推荐,而不仅仅加载点击的推荐。我们复用之前的应答来解决这个问题,因为之前的每个应答页共有 100 个用户,而只使用了 3 个,因此无需再次请求。
再次使用流思维,当点击 ‘x’ 时,想在 responseStream 流上最近发送出的应答用户列表中获取一个随机用户:
requestStream: --r--------------->
responseStream: ------R----------->
close1ClickStream: ------------c----->
suggestion1Stream: ------s-----s----->
Rx* 的 combineLatest 函数以两个流 A 和 B 作为输入,当其中任一个流发送出一个值时,combineLatest 将从两个流中抽取最近发送的值 a 和 b,组合并输出一个值 c=f(a, b)。示意图如下:
stream A: --a-----------e--------i-------->
stream B: -----b----c--------d-------q---->
vvvvvvvv combineLatest(f) vvvvvvv
----AB---AC--EC---ED--ID--IQ---->
这里 f 是一个大写转换函数
因此使用 combineLatest 将 close1ClickStream 和 responseStream 组合,从而当点击某个 ‘x’ 按钮时,就能获取最近的应答,并在 suggestion1Stream 上发送出一个新值。同时,combineLatest 还是对称的:当 responseStream 上有新应答时,应答会与最近的 ‘x’ 点击事件组件来产生一个新推荐:
var suggestion1Stream = close1ClickStream
.combineLatest(responseStream,
function(click, listUsers) {
// 从列表中抽取一个随机用户
return listUsers[Math.floor(Math.random()*listUsers.length)];
})
.merge(
refreshClickStream.map(function(){ return null; });
)
combineLatest 需要使用两个数据源,因此如果某个源一直没有发送数据,combineLatest 不会产生数据。我们可以在启动时模拟一个 ‘close 1’ 点击来解决该问题:
var suggestion1Stream = close1ClickStream.startWith('startup click')
.combineLatest(responseStream,
function(click, listUsers) {
// 从列表中抽取一个随机用户
return listUsers[Math.floor(Math.random()*listUsers.length)];
})
.merge(
refreshClickStream.map(function(){ return null; });
)
.startWith(null);
全部代码
var refreshButton = document.querySelector('.refresh');
var refreshClickStream = Rx.Observable.fromEvent(refreshButton, 'click');
var closeButton1 = document.querySelector('.close1');
var close1ClickStream = Rx.Observable.fromEvent(closeButton1, 'click');
// and the same logic for close2 and close3
var requestStream = refreshClickStream.startWith('startup click')
.map(function() {
var randomOffset = Math.floor(Math.random()*500);
return 'https://api.github.com/users?since=' + randomOffset;
});
var responseStream = requestStream
.flatMap(function (requestUrl) {
return Rx.Observable.fromPromise($.ajax({url: requestUrl}));
});
var suggestion1Stream = close1ClickStream.startWith('startup click')
.combineLatest(responseStream,
function(click, listUsers) {
return listUsers[Math.floor(Math.random()*listUsers.length)];
}
)
.merge(
refreshClickStream.map(function(){ return null; })
)
.startWith(null);
// and the same logic for suggestion2Stream and suggestion3Stream
suggestion1Stream.subscribe(function(suggestion) {
if (suggestion === null) {
// hide the first suggestion DOM element
}
else {
// show the first suggestion DOM element
// and render the data
}
});
运行效果见 http://jsfiddle.net/staltz/8jFJH/48/。
参考
原文来自 @andrestaltz 的 The introduction to Reactive Programming you’ve been missing,可能需要翻墙阅读。