RxJS是ReactiveX在JavaScript上的一个派生。ReactiveX是一个应用的比较广泛的响应式编程框架,这个框架很好的应用了Observer Pattern(观察者模式),让异步编程变得简单且清晰。本文会带大家对RxJS有一个初步的入门。

Prerequisite(先导知识)

本文是RxJS的入门知识,在文章中会涉及到大量的JavaScript示例代码,想要更好的理解本文,你需要具备以下知识。

  1. 理解基本的ES6和TypeScript语法
  2. 了解过观察者模式

Observable? Observer?

在RxJSObservable和Observer。要想很好的理解两个概念,我需要借用一个生活中很常见的例子,买房。

在这个图中,我们的购房者一直在密切的关注我们的房价。房价随着时间波动,购房者可能会根据波动的房价而采取一系列的行动,比如购入或者继续观望。购房者与房价的这样一种关系其实就构成了一种观察者关系。套用到观察者模式中,

  • 房价 – Observable
  • 购房者 – Observer
  • 购房者观察房价 – Subscribe(订阅)

再结合买房的例子,我们可以很学术的描述Observable和Observer的行为。

  • Obserable 可被观察(房价被购房者关注),并且随时间变化发出(emit)不同值(房价波动)。
  • Observer 观察Observable(购房者关注房价),并在Observable(房价)发出不同值(房价波动)时做出响应(买房或者观望)。
  • Observable和Observer之间通过订阅(Subscription)来建立观察关系。
  • 当Observable没有Observer的时候,即使发出了值,也不会产生任何影响(无购房意愿者不会响应房价波动)

RxJS中的Observable和Observer

有了基本的Observable和Observer的概念,我们再来看看RxJS中的Observable和Observer。

创建一个Observable

我们可以调用 Observable.create 方法来创建一个Observable,这个方法接受一个函数作为参数,这个函数叫做 producer 函数, 用来生成 Observable的值。该函数的参数是observer,调用observer.next()就可以生成一个有一系列值的Observable。

1
2
3
4
5
6
import { Observable } from 'rxjs';

const observable = Observable.create(observer => {
  observer.next('foo');
  observer.next('bar');
})

但是运行这段代码后并不会发生任何事情,我们需要一个Observer去Subscribe这个Observable,然后Observer基于Observable发出的值做出响应。

Subscribe一个Observerable

我们通过下面的这段代码去Subscribe一个Observable

1
2
3
4
5
6
7
8
import { Observable } from 'rxjs';

const observable = Observable.create(observer => {
  observer.next('foo');
  observer.next('bar');
})

observable.subscribe(console.log)

运行代码,console中就会依次打印 foo / bar 了

Complete(完成)一个Observable

一个Observable可以被完成,一个Observable完成之后,即使之后还有值发出,任何的Observer也不会再进行响应。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
import { Observable } from 'rxjs';

const observable = Observable.create(observer => {
  observer.next('foo');
  observer.next('bar');
  observer.complete();
  observer.next('foobar');
})

observable.subscribe(console.log)

运行上面的例子会发现,console中只打印了 foo 和 bar,因为 foobar 是在observable被完成之后才发出的,所以observer并不会响应。

如果想要在Observable被完成时进行函数回调,可以使用subscribe方法的第三个参数。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
import { Observable } from 'rxjs';

const observable = Observable.create(observer => {
  observer.next('foo');
  observer.next('bar');
  observer.complete();
  observer.next('foobar');
})

observable.subscribe(console.log, ()=>(), ()=> console.log('complete'));

运行上面的例子,console会依次打印 foo, bar 和 complete。

Unsubscribe(取消订阅)

Observable与Observer之间的订阅关系既可以建立,又可以取消。订阅取消之后,observer不会再对任何observable发出的值产生响应。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
import { Observable } from 'rxjs';

const observable = Observable.create(observer => {
  setInterval(()=> {
    observer.next('I am alive')
  }, 1000)
})

const subscription = observable.subscribe(console.log);

setTimeout(() => subscription.unsubscribe(), 5001)

上面的例子中,我们的Observable每隔1s发出一个值,5秒后订阅取消。运行代码看到console中只会打印5个 I am alive 的字符串。

一个Observable可以被多个Observer订阅

当一个Observable被建立后,它能被多个Observer订阅,每个订阅关系相互独立,互不影响。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
import { Observable } from 'rxjs';

const observable = Observable.create(observer => {
  setInterval(()=> {
    observer.next('I am alive')
  }, 1000)
})


observable.subscribe(console.log);

setTimeout(()=> observable.subscribe((value) => console.log(`Subscriber2 ${value}`)), 2000);

在上面的例子中,我们建立了两个订阅关系,第二个订阅会在2秒后开始工作,console中在2秒后开始出现Subscriber2的console信息。

Child Subscription

当一个Observable有多个Observer时,如果需要取消所有的订阅关系,可以建立一个Child Subscription关系,然后一次Unsubscribe取消所有的订阅。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
import { Observable } from 'rxjs';

const observable = Observable.create(observer => {
  setInterval(()=> {
    observer.next('I am alive')
  }, 1000)
})


const subscription = observable.subscribe(console.log);
const subscription2 =  observable.subscribe((value) => console.log(`Subscriber2 ${value}`));

// 建立Child Subscription关系
subscription.add(subscription2);

// 一次取消所有Subscription
setTimeout(() => subscription.unsubscribe(), 3000)

Cold Observable 和 Hot Observable

Observable分为两种,Cold Observable和 Hot Observable。

Cold Observable

回到上面的多个Observer订阅的例子,我们在setInterval的之前多发出两个值。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
import { Observable } from 'rxjs';

const observable = Observable.create(observer => {
  observer.next('Start');
  observer.next('Yeah');
  setInterval(()=> {
    observer.next('I am alive')
  }, 1000)
})

observable.subscribe(console.log);
setTimeout(()=> observable.subscribe((value) => console.log(`Subscriber2 ${value}`)), 2000);

此时 console 中的打印结果依次为

需要注意的是打印结果中出现了 Subscribe2: Start 和 Subscribe2: Yeah,说明这个 Observable 仅会在有订阅关系时发出值,并且 Observable 中所有的值都会被发出。

这种 Observable 叫做 Cold Observable

Hot Observable

和 Cold Observable 相反, Hot Observable 的值不管有没有订阅关系,这个值都已经发出了,不会再次发出。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
import { Observable } from 'rxjs';
import { share } from 'rxjs/operators';

const observable = Observable.create(observer => {
  observer.next('Start');
  observer.next('Yeah');
  setInterval(()=> {
    observer.next('I am alive')
  }, 1000)
}).pipe(share()); //使用share让一个cold observable 变成一个 hot observable

observable.subscribe(console.log);
setTimeout(()=> observable.subscribe((value) => console.log(`Subscriber2 ${value}`)), 2000);

打印结果如下

可以发现Start和Yeah都不在Subscribe2的打印结果之中,因为这两个值已经在Observer2订阅之前发出过了。

另外一个 Hot Observable 的例子是浏览器的 mouseMove 事件,不管有没有 Observer, mouseMove 事件在鼠标移动时都会发出,当有新的 Observer 订阅时,之前的 mouseMove 事件也不会重新发出。

Subject

Subject 是一种特殊的 Hot Observable,与 Observable 需要 producer 函数不同,Subject 能调用自身的 next 方法来 发出值。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
import { Subject } from 'rxjs';
import { share } from 'rxjs/operators';

const subject = new Subject();

const subscription = subject.subscribe(console.log);

subject.next('I am a Subject');

const subscription2 = subject.subscribe(value => console.log(`Subscriber2: ${value}`));

subject.next('Heelo');

Console 中会依次打印出 “I am a Subject” / “Heelo” / “Subscriber2: Heelo” 。

Subject 在 RxJS 中还有三种变体,分别是 BehaviorSubject, ReplaySubject 和 AsyncSubject

BehaviorSubject

BehaviorSubject 是一种在有新的订阅时会额外发出最近一次发出的值的Subject。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
import { BehaviorSubject } from 'rxjs';
import { share } from 'rxjs/operators';

const subject = new BehaviorSubject('Start');

const subscription = subject.subscribe(console.log);

subject.next('I am a Subject');
subject.next('I am also a Observable');

const subscription2 = subject.subscribe(value => console.log(`Subscriber2: ${value}`));

subject.next('Heelo');

运行发现 Console 中打印了 Subscriber2: I am also a Observable

ReplaySubject

ReplaySubjectBehaviorSubject 相似,不同的是 ReplaySubject 接受一个 number 类型的值 x ,代表会重放先前 x 次的值。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
import { ReplaySubject } from 'rxjs';
import { share } from 'rxjs/operators';

const subject = new ReplaySubject(2);

const subscription = subject.subscribe(console.log);

subject.next('I am a Subject');
subject.next('I am also a Observable');
subject.next('And I am Hot');

const subscription2 = subject.subscribe(value => console.log(`Subscriber2: ${value}`));

subject.next('Heelo');

运行发现 Console 中打印了 Subscriber2: I am also a Observable 和 Subscriber2: And I am Hot。

AsyncSubject

AsyncSubject 和前面两种 Subject 略有不同。 AsyncSubject 仅仅会在 Observable 被完成的时候发出完成前的最后一个值。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
import { AsyncSubject } from 'rxjs';
import { share } from 'rxjs/operators';

const subject = new AsyncSubject();

const subscription = subject.subscribe(console.log);

subject.next('I am a Subject');
subject.next('I am also a Observable');
subject.next('And I am Hot');

const subscription2 = subject.subscribe(value => console.log(`Subscriber2: ${value}`));

subject.complete();

subject.next('Heelo');

运行代码发现, console 中仅打印了 And I am Hot 和 Subscriber2: And I am Hot。

operator(操作符)与marble diagram(弹珠图)

RxJS中最复杂的一块就是 Operator 了。 RxJS 使用 Operator 来对 Observable 发出的值进行操作从而完成复杂的逻辑。由于 Operator 实在太多,所以在这里我不会一一讲解。相反,我更想谈一谈 marble diagram。 在官网文档中, 大多数 Operator 都会附有 marble diagram。 只要会看 marble diagram,学起 operator 来会非常的快。

以 map 这个操作符为例, marble diagram 如下。

对应代码如下。

1
2
3
4
5
6
import { Observable, interval } from 'rxjs';
import { map } from 'rxjs/operators';

const observable = interval(1000).pipe(map(value => value * 10));

const subscription = observable.subscribe(console.log);

在 marble diagram 中间的方框就是我们的操作符以及其具体进行的操作。 操作符方框的上方是原始 Observable 数据, 操作符方框下方是经过操作符操作后的结果。

另外一个需要注意的是我们在这里使用了 interval 这个创建操作符去创建 Observable,RxJS还提供了很多的的创建操作符来很方便的取帮助我们创建 Observable。

小结

  1. Observable 可被观察,并且随时间发出不同值,Observer 观察 Observable,并且在 Observable 的值发生变化的时候做出响应。Observable 和 Observer 通过 Subscription 来建立订阅关系。
  2. RxJS 中可以通过 Observerble.create 来创建一个 Observable,通过调用 subscribe 方法来让 Observer 订阅这个 Observable,通过 unSubscribe来取消订阅。
  3. Observable 有 Cold 和 Hot 两种。区别这两种 Observable 的方法是 看起订阅时会不会发出所有的值。
  4. Subject 是一种特殊的 Hot Observable。RxJS 中有三种 Subject 变体,分别是 BehaviorSubject,ReplaySubject 和 AsyncSubject。
  5. RxJS 中有很多 Operator,为了更好的学习 Operator, 你需要做的首先是会读 marble diagram。

本文所有的代码实例均可在 stackblitz 上自己动手尝试

参考资料

Learn RxJS in 60 Minutes for Beginners - Free Crash Course

RxJS docs

learn-rxjs