RxJS 各种xxxMap方法


mergeMap, concatMap, exhaustMap, switchMap都会对源Observable进行映射处理,连接合并为一个单一Observable。

mergeMap

将一个源Observable通过内部Observable处理,映射为新的Observable,并依次发送。

注意:序列过程中会对源Observable与内部Observable生成高阶Observable,并对高阶Observable的指定数量值(默认是无限大)同时进行订阅。并将高阶Observable的值依次发送。先到先发送。

其原理对源Observable的每一个值生成一个Observable,然后采用mergeAll对高阶Observable进行连接合并为一个单一Observable。

mergeMap RxJS官方文档 | mergeMap

import { fromEvent, mergeMap, interval, take } from "rxjs";

// 源Observable
const clicks = fromEvent(document, "click");

// 基于源Observable生成的新Observable, 最多同时处理源Observable的两个值
const result = clicks.pipe(mergeMap((ev) => interval(1000).pipe(take(4)), undefined, 2));
result.subscribe((x) => console.log(x));

// Results in the following:
// (results are not concurrent)
// For every click on the "document" it will emit values 0 to 3 spaced
// on a 1000ms interval
// one click = 1000ms-> 0 -1000ms-> 1 -1000ms-> 2 -1000ms-> 3
// Time     0s  1s  2s  3s  4s  5s  6s  7s
// Event    1C      2C  3C
// Log      0   1   2,0 3,1 2,0 3,1 2   3

concatMap

将一个源Observable通过内部Observable处理,映射为新的Observable,并依次发送。

注意:序列连接过程中只有对源Observable中的当前处理元素处理完毕,也就是内部Observable处于完成状态,才会对源Observable的下一个元素进行处理。

其原理对源Observable的每一个值生成一个Observable,然后采用concatAll对高阶Observable进行连接合并为一个单一Observable。

concatMap RxJS官方文档 | concatMap

import { fromEvent, concatMap, interval, take } from "rxjs";

// 源Observable
const clicks = fromEvent(document, "click");

// 基于源Observable生成的新Observable
const result = clicks.pipe(concatMap((ev) => interval(1000).pipe(take(4))));
result.subscribe((x) => console.log(x));

// Results in the following:
// (results are not concurrent)
// For every click on the "document" it will emit values 0 to 3 spaced
// on a 1000ms interval
// one click = 1000ms-> 0 -1000ms-> 1 -1000ms-> 2 -1000ms-> 3
// Time     0s  1s  2s  3s  4s  5s  6s  7s  8s  9s  10s 11s 12s
// Event    1C      2C                          3C
// Log      0   1   2   3   0   1   2   3       0   1   2   3

exhaustMap

将一个源Observable通过内部Observable处理,映射为新的Observable,并依次发送。

注意:链接过程中如果源Observable前一个值未处理完成,后面的值将会被忽略。

直到当前正在执行的源Observable的值处理完成,才会接收下一个新产生的值。

exhaustMap RxJS官方文档 | exhaustMap

import { fromEvent, exhaustMap, interval, take } from "rxjs";

// 源Observable
const clicks = fromEvent(document, "click");

// 基于源Observable生成的新Observable
const result = clicks.pipe(exhaustMap(() => interval(1000).pipe(take(4))));
result.subscribe((x) => console.log(x));

// Results in the following:
// (results are not concurrent)
// For every click on the "document" it will emit values 0 to 3 spaced
// on a 1000ms interval
// one click = 1000ms-> 0 -1000ms-> 1 -1000ms-> 2 -1000ms-> 3
// Time     0s  1s  2s  3s  4s  5s  6s  7s  8s  9s  10s 11s 12s
// Event    1C      2C                          3C
// Log      0   1   2   3                       0   1   2   3

switchMap

将一个高阶Observable转为单一的Observable,并依次发送。

注意:每当下一个源Observable值到来,将会中止当前正在处理的值,并立即对新的值进行处理。

switchMap RxJS官方文档 | switchMap

import { fromEvent, switchMap, interval, take } from "rxjs";

// 源Observable
const clicks = fromEvent(document, "click");

// 基于源Observable生成的新Observable
const result = clicks.pipe(switchMap(() => interval(1000).pipe(take(4))));
result.subscribe((x) => console.log(x));

// Results in the following:
// (results are not concurrent)
// For every click on the "document" it will emit values 0 to 3 spaced
// on a 1000ms interval
// one click = 1000ms-> 0 -1000ms-> 1 -1000ms-> 2 -1000ms-> 3
// Time     0s  1s  2s  3s  4s  5s  6s  7s  8s  9s  10s 11s 12s
// Event    1C      2C                          3C
// Log      0   1   0   1   2   3               0   1   2   3
© 2024 Hogan Hu