RxJS 各种xxxMap方法
mergeMap, concatMap, exhaustMap, switchMap都会对源Observable进行映射处理,连接合并为一个单一Observable。
mergeMap
将一个源Observable通过内部Observable处理,映射为新的Observable,并依次发送。
注意:序列过程中会对源Observable与内部Observable生成高阶Observable,并对高阶Observable的指定数量值(默认是无限大)同时进行订阅。并将高阶Observable的值依次发送。先到先发送。
其原理对源Observable的每一个值生成一个Observable,然后采用
mergeAll
对高阶Observable进行连接合并为一个单一Observable。
import { fromEvent, mergeMap, interval, take } from "rxjs";
// 源Observable
const clicks = fromEvent(document, "click");
// 基于源Observable生成的新Observable, 最多同时处理源Observable的两个值
const result = clicks.pipe(mergeMap((ev) => interval(1000).pipe(take(4)), undefined, 2));
result.subscribe((x) => console.log(x));
// Results in the following:
// (results are not concurrent)
// For every click on the "document" it will emit values 0 to 3 spaced
// on a 1000ms interval
// one click = 1000ms-> 0 -1000ms-> 1 -1000ms-> 2 -1000ms-> 3
// Time 0s 1s 2s 3s 4s 5s 6s 7s
// Event 1C 2C 3C
// Log 0 1 2,0 3,1 2,0 3,1 2 3
concatMap
将一个源Observable通过内部Observable处理,映射为新的Observable,并依次发送。
注意:序列连接过程中只有对源Observable中的当前处理元素处理完毕,也就是内部Observable处于完成状态,才会对源Observable的下一个元素进行处理。
其原理对源Observable的每一个值生成一个Observable,然后采用
concatAll
对高阶Observable进行连接合并为一个单一Observable。
import { fromEvent, concatMap, interval, take } from "rxjs";
// 源Observable
const clicks = fromEvent(document, "click");
// 基于源Observable生成的新Observable
const result = clicks.pipe(concatMap((ev) => interval(1000).pipe(take(4))));
result.subscribe((x) => console.log(x));
// Results in the following:
// (results are not concurrent)
// For every click on the "document" it will emit values 0 to 3 spaced
// on a 1000ms interval
// one click = 1000ms-> 0 -1000ms-> 1 -1000ms-> 2 -1000ms-> 3
// Time 0s 1s 2s 3s 4s 5s 6s 7s 8s 9s 10s 11s 12s
// Event 1C 2C 3C
// Log 0 1 2 3 0 1 2 3 0 1 2 3
exhaustMap
将一个源Observable通过内部Observable处理,映射为新的Observable,并依次发送。
注意:链接过程中如果源Observable前一个值未处理完成,后面的值将会被忽略。
直到当前正在执行的源Observable的值处理完成,才会接收下一个新产生的值。
import { fromEvent, exhaustMap, interval, take } from "rxjs";
// 源Observable
const clicks = fromEvent(document, "click");
// 基于源Observable生成的新Observable
const result = clicks.pipe(exhaustMap(() => interval(1000).pipe(take(4))));
result.subscribe((x) => console.log(x));
// Results in the following:
// (results are not concurrent)
// For every click on the "document" it will emit values 0 to 3 spaced
// on a 1000ms interval
// one click = 1000ms-> 0 -1000ms-> 1 -1000ms-> 2 -1000ms-> 3
// Time 0s 1s 2s 3s 4s 5s 6s 7s 8s 9s 10s 11s 12s
// Event 1C 2C 3C
// Log 0 1 2 3 0 1 2 3
switchMap
将一个高阶Observable转为单一的Observable,并依次发送。
注意:每当下一个源Observable值到来,将会中止当前正在处理的值,并立即对新的值进行处理。
import { fromEvent, switchMap, interval, take } from "rxjs";
// 源Observable
const clicks = fromEvent(document, "click");
// 基于源Observable生成的新Observable
const result = clicks.pipe(switchMap(() => interval(1000).pipe(take(4))));
result.subscribe((x) => console.log(x));
// Results in the following:
// (results are not concurrent)
// For every click on the "document" it will emit values 0 to 3 spaced
// on a 1000ms interval
// one click = 1000ms-> 0 -1000ms-> 1 -1000ms-> 2 -1000ms-> 3
// Time 0s 1s 2s 3s 4s 5s 6s 7s 8s 9s 10s 11s 12s
// Event 1C 2C 3C
// Log 0 1 0 1 2 3 0 1 2 3