skip to content
OnionTalk

RxJS入门指南

RxJS是ReactiveX在JavaScript上的一个派生。ReactiveX是一个应用的比较广泛的响应式编程框架,这个框架很好的应用了Observer Pattern(观察者模式),让异步编程变得简单且清晰。本文会带大家对RxJS有一个初步的入门。

Prerequisite(先导知识)

本文是 RxJS 的入门知识,在文章中会涉及到大量的 JavaScript 示例代码,想要更好的理解本文,你需要具备以下知识。

  1. 理解基本的 ES6 和 TypeScript 语法
  2. 了解过观察者模式

Observable? Observer?

在 RxJSObservable 和 Observer。要想很好的理解两个概念,我需要借用一个生活中很常见的例子,买房。

在这个图中,我们的购房者一直在密切的关注我们的房价。房价随着时间波动,购房者可能会根据波动的房价而采取一系列的行动,比如购入或者继续观望。购房者与房价的这样一种关系其实就构成了一种观察者关系。套用到观察者模式中,

  • 房价 — Observable
  • 购房者 — Observer
  • 购房者观察房价 — Subscribe(订阅)

再结合买房的例子,我们可以很学术的描述 Observable 和 Observer 的行为。

  • Obserable 可被观察(房价被购房者关注),并且随时间变化发出(emit)不同值(房价波动)。
  • Observer 观察 Observable(购房者关注房价),并在 Observable(房价)发出不同值(房价波动)时做出响应(买房或者观望)。
  • Observable 和 Observer 之间通过订阅(Subscription)来建立观察关系。
  • 当 Observable 没有 Observer 的时候,即使发出了值,也不会产生任何影响(无购房意愿者不会响应房价波动)

RxJS 中的 Observable 和 Observer

有了基本的 Observable 和 Observer 的概念,我们再来看看 RxJS 中的 Observable 和 Observer。

创建一个 Observable

我们可以调用 Observable.create 方法来创建一个 Observable,这个方法接受一个函数作为参数,这个函数叫做 producer 函数, 用来生成 Observable 的值。该函数的参数是 observer,调用 observer.next()就可以生成一个有一系列值的 Observable。

import { Observable } from 'rxjs';

const observable = Observable.create((observer) => {
  observer.next('foo');
  observer.next('bar');
});

但是运行这段代码后并不会发生任何事情,我们需要一个 Observer 去 Subscribe 这个 Observable,然后 Observer 基于 Observable 发出的值做出响应。

Subscribe 一个 Observerable

我们通过下面的这段代码去 Subscribe 一个 Observable

import { Observable } from 'rxjs';

const observable = Observable.create((observer) => {
  observer.next('foo');
  observer.next('bar');
});

observable.subscribe(console.log);

运行代码,console 中就会依次打印 foo / bar 了

Complete(完成)一个 Observable

一个 Observable 可以被完成,一个 Observable 完成之后,即使之后还有值发出,任何的 Observer 也不会再进行响应。

import { Observable } from 'rxjs';

const observable = Observable.create((observer) => {
  observer.next('foo');
  observer.next('bar');
  observer.complete();
  observer.next('foobar');
});

observable.subscribe(console.log);

运行上面的例子会发现,console 中只打印了 foo 和 bar,因为 foobar 是在 observable 被完成之后才发出的,所以 observer 并不会响应。

如果想要在 Observable 被完成时进行函数回调,可以使用 subscribe 方法的第三个参数。

import { Observable } from 'rxjs';

const observable = Observable.create(observer => {
 observer.next('foo');
 observer.next('bar');
 observer.complete();
 observer.next('foobar');
})

observable.subscribe(console.log, ()=>(), ()=> console.log('complete'));

运行上面的例子,console 会依次打印 foo, bar 和 complete。

Unsubscribe(取消订阅)

Observable 与 Observer 之间的订阅关系既可以建立,又可以取消。订阅取消之后,observer 不会再对任何 observable 发出的值产生响应。

import { Observable } from 'rxjs';

const observable = Observable.create((observer) => {
  setInterval(() => {
    observer.next('I am alive');
  }, 1000);
});

const subscription = observable.subscribe(console.log);

setTimeout(() => subscription.unsubscribe(), 5001);

上面的例子中,我们的 Observable 每隔 1s 发出一个值,5 秒后订阅取消。运行代码看到 console 中只会打印 5 个 I am alive 的字符串。

一个 Observable 可以被多个 Observer 订阅

当一个 Observable 被建立后,它能被多个 Observer 订阅,每个订阅关系相互独立,互不影响。

import { Observable } from 'rxjs';

const observable = Observable.create((observer) => {
  setInterval(() => {
    observer.next('I am alive');
  }, 1000);
});

observable.subscribe(console.log);

setTimeout(() => observable.subscribe((value) => console.log(`Subscriber2 ${value}`)), 2000);

在上面的例子中,我们建立了两个订阅关系,第二个订阅会在 2 秒后开始工作,console 中在 2 秒后开始出现 Subscriber2 的 console 信息。

Child Subscription

当一个 Observable 有多个 Observer 时,如果需要取消所有的订阅关系,可以建立一个 Child Subscription 关系,然后一次 Unsubscribe 取消所有的订阅。

import { Observable } from 'rxjs';

const observable = Observable.create((observer) => {
  setInterval(() => {
    observer.next('I am alive');
  }, 1000);
});

const subscription = observable.subscribe(console.log);
const subscription2 = observable.subscribe((value) => console.log(`Subscriber2 ${value}`));

// 建立Child Subscription关系
subscription.add(subscription2);

// 一次取消所有Subscription
setTimeout(() => subscription.unsubscribe(), 3000);

Cold Observable 和 Hot Observable

Observable 分为两种,Cold Observable 和 Hot Observable。

Cold Observable

回到上面的多个 Observer 订阅的例子,我们在 setInterval 的之前多发出两个值。

import { Observable } from 'rxjs';

const observable = Observable.create((observer) => {
  observer.next('Start');
  observer.next('Yeah');
  setInterval(() => {
    observer.next('I am alive');
  }, 1000);
});

observable.subscribe(console.log);
setTimeout(() => observable.subscribe((value) => console.log(`Subscriber2 ${value}`)), 2000);

此时 console 中的打印结果依次为

需要注意的是打印结果中出现了 Subscribe2: Start 和 Subscribe2: Yeah,说明这个 Observable 仅会在有订阅关系时发出值,并且 Observable 中所有的值都会被发出。

这种 Observable 叫做 Cold Observable

Hot Observable

和 Cold Observable 相反, Hot Observable 的值不管有没有订阅关系,这个值都已经发出了,不会再次发出。

import { Observable } from 'rxjs';
import { share } from 'rxjs/operators';

const observable = Observable.create((observer) => {
  observer.next('Start');
  observer.next('Yeah');
  setInterval(() => {
    observer.next('I am alive');
  }, 1000);
}).pipe(share()); //使用share让一个cold observable 变成一个 hot observable

observable.subscribe(console.log);
setTimeout(() => observable.subscribe((value) => console.log(`Subscriber2 ${value}`)), 2000);

打印结果如下

可以发现 Start 和 Yeah 都不在 Subscribe2 的打印结果之中,因为这两个值已经在 Observer2 订阅之前发出过了。

另外一个 Hot Observable 的例子是浏览器的 mouseMove 事件,不管有没有 Observer, mouseMove 事件在鼠标移动时都会发出,当有新的 Observer 订阅时,之前的 mouseMove 事件也不会重新发出。

Subject

Subject 是一种特殊的 Hot Observable,与 Observable 需要 producer 函数不同,Subject 能调用自身的 next 方法来 发出值。

import { Subject } from 'rxjs';
import { share } from 'rxjs/operators';

const subject = new Subject();

const subscription = subject.subscribe(console.log);

subject.next('I am a Subject');

const subscription2 = subject.subscribe((value) => console.log(`Subscriber2: ${value}`));

subject.next('Heelo');

Console 中会依次打印出 “I am a Subject” / “Heelo” / “Subscriber2: Heelo” 。

Subject 在 RxJS 中还有三种变体,分别是 BehaviorSubject, ReplaySubject 和 AsyncSubject

BehaviorSubject

BehaviorSubject 是一种在有新的订阅时会额外发出最近一次发出的值的 Subject。

import { BehaviorSubject } from 'rxjs';
import { share } from 'rxjs/operators';

const subject = new BehaviorSubject('Start');

const subscription = subject.subscribe(console.log);

subject.next('I am a Subject');
subject.next('I am also a Observable');

const subscription2 = subject.subscribe((value) => console.log(`Subscriber2: ${value}`));

subject.next('Heelo');

运行发现 Console 中打印了 Subscriber2: I am also a Observable

ReplaySubject

ReplaySubjectBehaviorSubject 相似,不同的是 ReplaySubject 接受一个 number 类型的值 x ,代表会重放先前 x 次的值。

import { ReplaySubject } from 'rxjs';
import { share } from 'rxjs/operators';

const subject = new ReplaySubject(2);

const subscription = subject.subscribe(console.log);

subject.next('I am a Subject');
subject.next('I am also a Observable');
subject.next('And I am Hot');

const subscription2 = subject.subscribe((value) => console.log(`Subscriber2: ${value}`));

subject.next('Heelo');

运行发现 Console 中打印了 Subscriber2: I am also a Observable 和 Subscriber2: And I am Hot。

AsyncSubject

AsyncSubject 和前面两种 Subject 略有不同。 AsyncSubject 仅仅会在 Observable 被完成的时候发出完成前的最后一个值。

import { AsyncSubject } from 'rxjs';
import { share } from 'rxjs/operators';

const subject = new AsyncSubject();

const subscription = subject.subscribe(console.log);

subject.next('I am a Subject');
subject.next('I am also a Observable');
subject.next('And I am Hot');

const subscription2 = subject.subscribe((value) => console.log(`Subscriber2: ${value}`));

subject.complete();

subject.next('Heelo');

运行代码发现, console 中仅打印了 And I am Hot 和 Subscriber2: And I am Hot。

operator(操作符)与 marble diagram(弹珠图)

RxJS 中最复杂的一块就是 Operator 了。 RxJS 使用 Operator 来对 Observable 发出的值进行操作从而完成复杂的逻辑。由于 Operator 实在太多,所以在这里我不会一一讲解。相反,我更想谈一谈 marble diagram。 在官网文档中, 大多数 Operator 都会附有 marble diagram。 只要会看 marble diagram,学起 operator 来会非常的快。

以 map 这个操作符为例, marble diagram 如下。

对应代码如下。

import { Observable, interval } from 'rxjs';
import { map } from 'rxjs/operators';

const observable = interval(1000).pipe(map((value) => value * 10));

const subscription = observable.subscribe(console.log);

在 marble diagram 中间的方框就是我们的操作符以及其具体进行的操作。 操作符方框的上方是原始 Observable 数据, 操作符方框下方是经过操作符操作后的结果。

另外一个需要注意的是我们在这里使用了 interval 这个创建操作符去创建 Observable,RxJS 还提供了很多的的创建操作符来很方便的取帮助我们创建 Observable。

小结

  1. Observable 可被观察,并且随时间发出不同值,Observer 观察 Observable,并且在 Observable 的值发生变化的时候做出响应。Observable 和 Observer 通过 Subscription 来建立订阅关系。
  2. RxJS 中可以通过 Observerble.create 来创建一个 Observable,通过调用 subscribe 方法来让 Observer 订阅这个 Observable,通过 unSubscribe 来取消订阅。
  3. Observable 有 Cold 和 Hot 两种。区别这两种 Observable 的方法是 看起订阅时会不会发出所有的值。
  4. Subject 是一种特殊的 Hot Observable。RxJS 中有三种 Subject 变体,分别是 BehaviorSubject,ReplaySubject 和 AsyncSubject。
  5. RxJS 中有很多 Operator,为了更好的学习 Operator, 你需要做的首先是会读 marble diagram。

本文所有的代码实例均可在 stackblitz 上自己动手尝试

参考资料

Learn RxJS in 60 Minutes for Beginners - Free Crash Course

RxJS docs

learn-rxjs