import { Observable } from 'rxjs';

// Demo: an observable that emits 'I am alive' once per second, and a
// subscriber that cancels its subscription shortly after the fifth emission.
const observable = new Observable((observer) => {
  const timerId = setInterval(() => {
    observer.next('I am alive');
  }, 1000);

  // Teardown logic: invoked on unsubscribe so the interval stops firing
  // instead of running forever in the background.
  return () => clearInterval(timerId);
});

const subscription = observable.subscribe(console.log);

// 5001 ms (not 5000) so the fifth tick is delivered before the cancellation.
setTimeout(() => subscription.unsubscribe(), 5001);
上面的例子中,我们的Observable每隔1s发出一个值,5秒后订阅取消。运行代码看到console中只会打印5个 I am alive 的字符串。
import { Observable } from 'rxjs';

// Demo: a cold observable with two subscribers. The producer function below
// is executed once per subscription, so each subscriber gets its own interval.
const observable = new Observable((observer) => {
  const timerId = setInterval(() => {
    observer.next('I am alive');
  }, 1000);

  // Teardown logic: stop this subscription's interval when it is unsubscribed.
  return () => clearInterval(timerId);
});

// First subscriber: subscribes immediately.
observable.subscribe(console.log);

// Second subscriber: subscribes two seconds later and prefixes its output.
setTimeout(() => {
  observable.subscribe((value) => console.log(`Subscriber2 ${value}`));
}, 2000);
import { Observable } from 'rxjs';

// Demo: a cold observable that emits two values synchronously on subscribe
// and then one value per second. Because the producer runs for every
// subscription, a late subscriber still receives 'Start' and 'Yeah'.
const observable = new Observable((observer) => {
  observer.next('Start');
  observer.next('Yeah');

  const timerId = setInterval(() => {
    observer.next('I am alive');
  }, 1000);

  // Teardown logic: stop this subscription's interval when it is unsubscribed.
  return () => clearInterval(timerId);
});

// First subscriber: subscribes immediately.
observable.subscribe(console.log);

// Second subscriber: subscribes two seconds later and prefixes its output.
setTimeout(() => {
  observable.subscribe((value) => console.log(`Subscriber2 ${value}`));
}, 2000);
和 Cold Observable 相反,Hot Observable 不管有没有订阅者都会发出值;在订阅之前已经发出的值,不会为新的订阅者再次发出。
1
2
3
4
5
6
7
8
9
10
11
12
13
import { Observable } from 'rxjs';
import { share } from 'rxjs/operators';

// Demo: the same producer as a cold observable, but piped through share().
const source = new Observable((observer) => {
  observer.next('Start');
  observer.next('Yeah');

  const timerId = setInterval(() => {
    observer.next('I am alive');
  }, 1000);

  // Teardown logic: stop the interval once the shared source is unsubscribed.
  return () => clearInterval(timerId);
});

// share() turns the cold observable into a hot one: subscribers share a
// single execution of the producer instead of each triggering their own.
const observable = source.pipe(share());

// First subscriber: starts the shared execution.
observable.subscribe(console.log);

// Second subscriber: joins two seconds later and only receives values that
// are emitted after it subscribes.
setTimeout(() => {
  observable.subscribe((value) => console.log(`Subscriber2 ${value}`));
}, 2000);
Subject 是一种特殊的 Hot Observable。普通的 Observable 需要一个 producer 函数来产生值,而 Subject 可以直接调用自身的 next 方法来发出值。
1
2
3
4
5
6
7
8
9
10
11
12
import { Subject } from 'rxjs';

// Demo: a Subject pushes values by calling its own next() method; a
// subscriber only receives values emitted after it has subscribed.
const subject = new Subject();

const subscription = subject.subscribe(console.log);

// Only the first subscriber exists at this point.
subject.next('I am a Subject');

const subscription2 = subject.subscribe((value) => {
  console.log(`Subscriber2: ${value}`);
});

// Both subscribers receive this value.
subject.next('Heelo');
Console 中会依次打印出 “I am a Subject” / “Heelo” / “Subscriber2: Heelo” 。
import { BehaviorSubject } from 'rxjs';

// Demo: a BehaviorSubject holds a current value (initially 'Start') and
// delivers it to every new subscriber at the moment of subscription.
const subject = new BehaviorSubject('Start');

// Receives the initial value 'Start' right away.
const subscription = subject.subscribe(console.log);

subject.next('I am a Subject');
subject.next('I am also a Observable');

// Subscribes late, but still receives the current (most recent) value.
const subscription2 = subject.subscribe((value) => {
  console.log(`Subscriber2: ${value}`);
});

// Both subscribers receive this value.
subject.next('Heelo');
运行发现 Console 中打印了 Subscriber2: I am also a Observable:BehaviorSubject 会保存最近一次发出的值(还没有发出过值时就是构造时传入的初始值),并在新的订阅者订阅时立即把它发给该订阅者。
ReplaySubject 和 BehaviorSubject 相似,不同的是 ReplaySubject 接受一个 number 类型的参数 x,表示会向新的订阅者重放最近发出的 x 个值。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
import { ReplaySubject } from 'rxjs';

// Demo: a ReplaySubject with a buffer size of 2 replays the two most recent
// values to every new subscriber.
const subject = new ReplaySubject(2);

const subscription = subject.subscribe(console.log);

subject.next('I am a Subject');
subject.next('I am also a Observable');
subject.next('And I am Hot');

// Subscribes late: the last two buffered values are replayed to it.
const subscription2 = subject.subscribe((value) => {
  console.log(`Subscriber2: ${value}`);
});

// Both subscribers receive this value.
subject.next('Heelo');
运行发现 Console 中打印了 Subscriber2: I am also a Observable 和 Subscriber2: And I am Hot。
import { AsyncSubject } from 'rxjs';

// Demo: an AsyncSubject emits nothing until complete() is called, and then
// delivers only the last value it was given to its subscribers.
const subject = new AsyncSubject();

const subscription = subject.subscribe(console.log);

// None of these are delivered yet; only the last one is kept.
subject.next('I am a Subject');
subject.next('I am also a Observable');
subject.next('And I am Hot');

const subscription2 = subject.subscribe((value) => {
  console.log(`Subscriber2: ${value}`);
});

// Completion triggers delivery of the last value to both subscribers.
subject.complete();

// The subject has already completed, so this value is ignored.
subject.next('Heelo');
运行代码发现,console 中仅打印了 And I am Hot 和 Subscriber2: And I am Hot:AsyncSubject 只会在调用 complete() 时把最后一个值发给所有订阅者,而 complete() 之后再调用 next('Heelo') 不会产生任何输出。