RxJS 各种xxxAll方法
concatAll, exhaustAll, mergeAll, switchAll, zipAll, combineLatestAll都会对高阶Observable进行连接合并为一个单一Observable。
concatAll
将一个高阶Observable序列连接成一个单一的Observable,并依次发送。
注意:序列连接过程中只有前一个Observable完成之后,后一个Observable才会被添加到序列中。
import { fromEvent, map, interval, take, concatAll } from "rxjs";
// 外部Observable,每次点击都会触发一次next以发布一个事件
const clicks = fromEvent(document, "click");
// 基于事件Observable衍生的高阶Observable
const higherOrder = clicks.pipe(map(() => interval(1000).pipe(take(4))));
// 采用concatAll打平高阶Observable
const firstOrder = higherOrder.pipe(concatAll());
// 通过exhuastAll打平后,一旦我们触发点击事件,高阶Observable中将会存在一个元素,订阅并触发定时器并且在4秒后完成
// 此时如果我们在4秒内再次点击,高阶Observable中将会存在两个元素
// 但result元素仍然在等待第一个内部Observable完成,将会在第一个内部Observable完成后执行订阅下一个内部Observable(触发定时器)
// 以此类推,每一次点击都会让高阶Observable新增元素,但result元素只有等上一个完成,才会订阅下一个内部Observable
firstOrder.subscribe((x) => console.log(x));
// Results in the following:
// (results are not concurrent)
// For every click on the "document" it will emit values 0 to 3 spaced
// on a 1000ms interval
// one click = 1000ms-> 0 -1000ms-> 1 -1000ms-> 2 -1000ms-> 3
// Time 0s 1s 2s 3s 4s 5s 6s 7s 8s 9s 10s 11s 12s
// Event 1C 2C 3C
// Log 0 1 2 3 0 1 2 3 0 1 2 3
exhaustAll
将一个高阶Observable单一的Observable,并依次发送。
注意:链接过程中如果前一个内部Observable未完成,后面的Observable将会被忽略。 直到当前正在执行的内部Observable完成,才会接收下一个新产生的内部Observable。
import { fromEvent, map, interval, take, exhaustAll } from "rxjs";
// 外部Observable,每次点击都会触发一次next以发布一个事件
const clicks = fromEvent(document, "click");
// 基于事件Observable衍生的高阶Observable
const higherOrder = clicks.pipe(
map(() => interval(1000).pipe(take(4))), // 返回一个每秒触发一次,触发4次结束的内部Observable
);
// 采用exhaustAll打平高阶Observable
const result = higherOrder.pipe(exhaustAll());
// 通过exhuastAll打平后,一旦我们触发点击事件,高阶Observable中将会存在一个元素,订阅并触发定时器并且在4秒后完成
// 此时如果我们在4秒内再次点击,高阶Observable中将会存在两个元素
// 但result元素仍然在等待第一个内部Observable完成,将会忽略第二个内部Observable,并不会订阅
// 4秒后正在被监听的内部元素完成,此时如果再次点击,高阶Observable产生的新的内部Observable才会被result处理
result.subscribe((x) => console.log(x));
// Results in the following:
// (results are not concurrent)
// For every click on the "document" it will emit values 0 to 3 spaced
// on a 1000ms interval
// one click = 1000ms-> 0 -1000ms-> 1 -1000ms-> 2 -1000ms-> 3
// Time 0s 1s 2s 3s 4s 5s 6s 7s 8s 9s 10s 11s 12s
// Event 1C 2C 3C
// Log 0 1 2 3 0 1 2 3
mergeAll
将一个高阶Observable单一的Observable,并依次发送。
注意:链接过程中如果前一个内部Observable未完成,后面的内部Observable同样会被订阅。 如果同时订阅的内部Observable大于max设定值,则新加入的Observable并不会被订阅。 直到正在被订阅的内部Observable完成,才会递补内部Observable。 任何一个内部Observable发出新广播,都将立即插入到输出的的单一Observable。
import { fromEvent, map, interval, take, mergeAll } from "rxjs";
const clicks = fromEvent(document, "click");
const higherOrder = clicks.pipe(map(() => interval(1000).pipe(take(4))));
const firstOrder = higherOrder.pipe(mergeAll(2)); // 最多融合两个子Observable
// 通过mergeAll打平后,一旦我们触发点击事件,高阶Observable中将会存在一个元素,订阅并触发定时器并且在4秒后完成
// 此时如果我们在4秒内再次点击,高阶Observable中将会存在两个元素
// 若同时存在的未完成内部Observable不大于设定max值,对新的内部元素立即进行订阅
// 如大于max,对新元素不进行订阅
// 一旦正在进行订阅的内部Observable完成,递补后进入的未被订阅的内部Observable
// 所有正在被订阅的内部元素发出的广播将立即触发打平后Observable的广播事件
firstOrder.subscribe((x) => console.log(x));
// Results in the following:
// (results are not concurrent)
// For every click on the "document" it will emit values 0 to 3 spaced
// on a 1000ms interval
// one click = 1000ms-> 0 -1000ms-> 1 -1000ms-> 2 -1000ms-> 3
// Time 0s 1s 2s 3s 4s 5s 6s 7s
// Event 1C 2C 3C
// Log 0 1 0,2 1,3 2,0 3,1 2 3
switchAll
将一个高阶Observable转为单一的Observable,并依次发送。
注意:每当下一个内部Observable到来,将会取消之前的订阅,并立即订阅新的Observable。
import { fromEvent, tap, map, interval, switchAll } from "rxjs";
const clicks = fromEvent(document, "click").pipe(tap(() => console.log("click")));
const source = clicks.pipe(map(() => interval(1000)));
source.pipe(switchAll()).subscribe((x) => console.log(x));
// Output
// click
// 0
// 1
// 2
// 3
// ...
// click
// 0
// 1
// 2
// ...
// click
// ...
zipAll
将一个高阶Observable转为单一的Observable,一旦外部Observable完成,便根据内部Observable的元素顺序合并输出子元素数组。
注意:内部子元素的订阅开始于外部依赖完成之后,此时同时订阅内部Observable。
有且只有在index索引上所有内部元素都存在值的情况下,才会输出该索引上的元素数组。如某个内部元素在该元素上缺失,则会等待。
如某个内部Observable已经完成,且在其最新的索引上所有内部Observable有值,即该completed的Observable所有元素已经输出。取消对其余Observable的订阅。将合并Observable置为完成。
如某个内部Observable已完成,但其他内部Observable在该索引上缺少值,则等待。
combineLatestAll
将一个高阶Observable转为单一的Observable,一旦外部Observable完成,便根据内部Observable的最新值合并输出子元素数组。
注意:内部子元素的订阅开始于外部依赖完成之后,此时同时订阅内部Observable。
有且只有在所有内部元素都存在值的情况下,才会输出元素数组。如某个内部元素不存在任何有效元素,则会等待。
任意内部Observable发出事件,都会重新触发合并Observable发出事件。