rxjs使用小贴士
from(Promise)
没有你想象中的灵活
由于rxjs提供的订阅者模式封装了Observable
类型,使得我们在很多情况下会有将原生类型对象转换为Observable
的需求。
在这种情况中Promise
对象也会时常成为被转换的对象。但需要注意的是,retry
是无法触发Promise
的重新执行的。
这会导致当你想要捕获到错误进行重试的时候将一直得到错误的结果。
let runTimes = 0;
const promise = new Promise((resolve, reject) => {
console.log('run promise times', ++runTimes);
if (runTimes > 2) {
resolve('success');
} else {
reject('failed');
}
});
from(promise)
.pipe(
retry({
count: 5,
delay: (_, retryCount) => {
console.log('retryCount', retryCount);
return timer(100);
},
})
)
.subscribe({
next: (r) => console.log(r),
error: (err) => console.log(err),
});
// run promise times 1
// retryCount 1
// retryCount 1
// retryCount 1
// retryCount 1
// retryCount 1
// failed
由以上的例子可知,from(Promise)
的结果实际上是只对结果进行转换,不会重新执行运行过程。
对于绝大多数情况我们只期望执行一次,但是如果出现需要重试的情况,对于Promise不要进行直接转换。
可以采用一下方式:
let runTimes = 0;
new Observable(subscriber => {
console.log('run observable times', ++runTimes);
if (runTimes > 2) {
subscriber.next('success');
subscriber.complete();
} else {
subscriber.error('failed');
}
}).pipe(
retry({
count: 5,
delay: (_, retryCount) => {
console.log('retryCount', retryCount);
return timer(100);
},
})
)
.subscribe({
next: (r) => console.log(r),
error: (err) => console.log(err),
});
// run observable times 1
// retryCount 1
// run observable times 2
// retryCount 2
// run observable times 3
// success
时刻记得取消订阅
经常使用Promise
的开发者可能很少有这种取消订阅的意识。这是因为Promise
只有三种状态,而其中成功
与失败
两种状态也同时代表了完成。
但在rxjs中,订阅数据是流式的,发布者在存在第一个订阅者时,将会按照既有逻辑分发数据。后续的订阅者是无法获得已经分发的数据的,只能获取后续分发的数据。由于这种特性,如果发布者不主动结束,订阅者将一直能获取数据,除非订阅者主动取消。
总结来说,Promise
下只能发布者结束订阅流程,且只有一个数据并持续保存。但在rxjs
中发布者可以主动结束,订阅者也可以主动结束。
所以当我们在rxjs
中使用流式数据时,每个订阅者应该主动管理自己的生命周期,在合适的时机结束订阅。否则将会因闭包造成内存泄漏,并可能触发不在期望中的执行逻辑。
以下是几种自动取消订阅的例子:
first
Operator
对于很多异步行为,我们其实只期待得到第一个满足条件的。first
方法提供了条件选项,在接下来的获取到的流中第一个满足条件的数据将会自动取消订阅。
适用于远程请求数据等场景。
const clicks = fromEvent(document, 'click');
const result = clicks.pipe(first(() => {
// some condition return boolean, default true
}));
result.subscribe(x => console.log(x));
take
Operator
take(x)
虽然也是只获取到X个数据后取消订阅,但其是无条件的。
observable.pipe(
take(1)
).subscribe( ... );
takeUntil
Operator
以上两个虽然都可以自动取消订阅,但都需要在有至少一个数据分发后才可。如果在没有数据分发的情况下,组件被销毁,我们依然期望订阅会被自动取消,此时可以使用takeUntil
。
export class ExampleComponent implements OnInit, OnDestroy {
constructor(private someService: SomeService) { }
readonly unsub$ = new Subject<void>();
ngOnInit(): void {
this.someService.getData().pipe(
takeUntil( unsub$ )
).subscribe( ... );
}
ngOnDestroy(): void {
this.unsub$.next();
this.unsub$.complete();
}
}
我们构建一个组件内的信号量 unsub$
,它只有在销毁时才有信号发出,此时采用takeUntil( unsub$ )
将都会被取消订阅。
慎用getter
以得到Observable
export class ExampleComponent {
test$ = new Subject();
get filtered$() {
return this.test$.pipe(filter( ... ), take(5))
}
}
要知道getter每一次做读访问的时候都会重新运行一次,其实也就创建了新的observable,这无疑是有存储空间开销的。
另外如果某处使用getter的地方订阅早,并在某种时机下已经触发了complete条件。再此之后的getter使用者由于得到的是新的实例,并没有处于completed状态。
所以,若某个衍生observable并不依赖于某个状态可变的原生变量,不适用于getter。可使用以下方式:
export class ExampleComponent {
test$ = new Subject();
filtered$ = this.test$.pipe(filter( ... ), take(5));
}