RxJS 各种xxxAll方法


concatAll, exhaustAll, mergeAll, switchAll, zipAll, combineLatestAll都会对高阶Observable进行连接合并为一个单一Observable。

concatAll

将一个高阶Observable序列连接成一个单一的Observable,并依次发送。

注意:序列连接过程中只有前一个Observable完成之后,后一个Observable才会被添加到序列中。

concatAll RxJS官方文档 | concatAll

import { fromEvent, map, interval, take, concatAll } from "rxjs";

// 外部Observable,每次点击都会触发一次next以发布一个事件
const clicks = fromEvent(document, "click");

// 基于事件Observable衍生的高阶Observable
const higherOrder = clicks.pipe(map(() => interval(1000).pipe(take(4))));

// 采用concatAll打平高阶Observable
const firstOrder = higherOrder.pipe(concatAll());

// 通过exhuastAll打平后,一旦我们触发点击事件,高阶Observable中将会存在一个元素,订阅并触发定时器并且在4秒后完成
// 此时如果我们在4秒内再次点击,高阶Observable中将会存在两个元素
// 但result元素仍然在等待第一个内部Observable完成,将会在第一个内部Observable完成后执行订阅下一个内部Observable(触发定时器)
// 以此类推,每一次点击都会让高阶Observable新增元素,但result元素只有等上一个完成,才会订阅下一个内部Observable
firstOrder.subscribe((x) => console.log(x));

// Results in the following:
// (results are not concurrent)
// For every click on the "document" it will emit values 0 to 3 spaced
// on a 1000ms interval
// one click = 1000ms-> 0 -1000ms-> 1 -1000ms-> 2 -1000ms-> 3
// Time     0s  1s  2s  3s  4s  5s  6s  7s  8s  9s  10s 11s 12s
// Event    1C      2C                          3C
// Log      0   1   2   3   0   1   2   3       0   1   2   3

exhaustAll

将一个高阶Observable单一的Observable,并依次发送。

注意:链接过程中如果前一个内部Observable未完成,后面的Observable将会被忽略。 直到当前正在执行的内部Observable完成,才会接收下一个新产生的内部Observable。

exhaustAll RxJS官方文档 | exhaustAll

import { fromEvent, map, interval, take, exhaustAll } from "rxjs";

// 外部Observable,每次点击都会触发一次next以发布一个事件
const clicks = fromEvent(document, "click");

// 基于事件Observable衍生的高阶Observable
const higherOrder = clicks.pipe(
    map(() => interval(1000).pipe(take(4))), // 返回一个每秒触发一次,触发4次结束的内部Observable
);

// 采用exhaustAll打平高阶Observable
const result = higherOrder.pipe(exhaustAll());

// 通过exhuastAll打平后,一旦我们触发点击事件,高阶Observable中将会存在一个元素,订阅并触发定时器并且在4秒后完成
// 此时如果我们在4秒内再次点击,高阶Observable中将会存在两个元素
// 但result元素仍然在等待第一个内部Observable完成,将会忽略第二个内部Observable,并不会订阅
// 4秒后正在被监听的内部元素完成,此时如果再次点击,高阶Observable产生的新的内部Observable才会被result处理
result.subscribe((x) => console.log(x));

// Results in the following:
// (results are not concurrent)
// For every click on the "document" it will emit values 0 to 3 spaced
// on a 1000ms interval
// one click = 1000ms-> 0 -1000ms-> 1 -1000ms-> 2 -1000ms-> 3
// Time     0s  1s  2s  3s  4s  5s  6s  7s  8s  9s  10s 11s 12s
// Event    1C      2C                          3C
// Log      0   1   2   3                       0   1   2   3

mergeAll

将一个高阶Observable单一的Observable,并依次发送。

注意:链接过程中如果前一个内部Observable未完成,后面的内部Observable同样会被订阅。 如果同时订阅的内部Observable大于max设定值,则新加入的Observable并不会被订阅。 直到正在被订阅的内部Observable完成,才会递补内部Observable。 任何一个内部Observable发出新广播,都将立即插入到输出的的单一Observable。

mergeAll RxJS官方文档 | mergeAll

import { fromEvent, map, interval, take, mergeAll } from "rxjs";

const clicks = fromEvent(document, "click");
const higherOrder = clicks.pipe(map(() => interval(1000).pipe(take(4))));

const firstOrder = higherOrder.pipe(mergeAll(2)); // 最多融合两个子Observable

// 通过mergeAll打平后,一旦我们触发点击事件,高阶Observable中将会存在一个元素,订阅并触发定时器并且在4秒后完成
// 此时如果我们在4秒内再次点击,高阶Observable中将会存在两个元素
// 若同时存在的未完成内部Observable不大于设定max值,对新的内部元素立即进行订阅
// 如大于max,对新元素不进行订阅
// 一旦正在进行订阅的内部Observable完成,递补后进入的未被订阅的内部Observable
// 所有正在被订阅的内部元素发出的广播将立即触发打平后Observable的广播事件
firstOrder.subscribe((x) => console.log(x));

// Results in the following:
// (results are not concurrent)
// For every click on the "document" it will emit values 0 to 3 spaced
// on a 1000ms interval
// one click = 1000ms-> 0 -1000ms-> 1 -1000ms-> 2 -1000ms-> 3
// Time     0s  1s  2s  3s  4s  5s  6s  7s
// Event    1C      2C  3C
// Log      0   1   0,2 1,3 2,0 3,1 2   3

switchAll

将一个高阶Observable转为单一的Observable,并依次发送。

注意:每当下一个内部Observable到来,将会取消之前的订阅,并立即订阅新的Observable。

switchAll RxJS官方文档 | switchAll

import { fromEvent, tap, map, interval, switchAll } from "rxjs";

const clicks = fromEvent(document, "click").pipe(tap(() => console.log("click")));
const source = clicks.pipe(map(() => interval(1000)));

source.pipe(switchAll()).subscribe((x) => console.log(x));

// Output
// click
// 0
// 1
// 2
// 3
// ...
// click
// 0
// 1
// 2
// ...
// click
// ...

zipAll

将一个高阶Observable转为单一的Observable,一旦外部Observable完成,便根据内部Observable的元素顺序合并输出子元素数组。

注意:内部子元素的订阅开始于外部依赖完成之后,此时同时订阅内部Observable。

有且只有在index索引上所有内部元素都存在值的情况下,才会输出该索引上的元素数组。如某个内部元素在该元素上缺失,则会等待。

如某个内部Observable已经完成,且在其最新的索引上所有内部Observable有值,即该completed的Observable所有元素已经输出。取消对其余Observable的订阅。将合并Observable置为完成。

如某个内部Observable已完成,但其他内部Observable在该索引上缺少值,则等待。

zipAll RxJS官方文档 | zipAll

combineLatestAll

将一个高阶Observable转为单一的Observable,一旦外部Observable完成,便根据内部Observable的最新值合并输出子元素数组。

注意:内部子元素的订阅开始于外部依赖完成之后,此时同时订阅内部Observable。

有且只有在所有内部元素都存在值的情况下,才会输出元素数组。如某个内部元素不存在任何有效元素,则会等待。

任意内部Observable发出事件,都会重新触发合并Observable发出事件。

RxJS官方文档 | combineLatestAll

© 2024 Hogan Hu