Понимание операторов RxJS map, mergeMap, switchMap и concatMap

Aug 7, 2019

map

Оператор map является наиболее распространенным из всех. Для каждого значения, которое эмитит Observable, вы можете применить функцию, в которой вы можете изменять данные. Возвращаемое значение будет, под капотом, заново эмитится как Observable, чтобы вы могли продолжать использовать его в своем потоке. Он работает почти также, как если бы вы использовали его с массивами. Разница в том, что массивы всегда будут просто массивами, и при отображении вы получите значение текущего индекса в массиве. С Observable тип данных может быть любого типа. Это означает, что вам может потребоваться выполнить некоторые дополнительные операции в дополнение к вашей функции Observable map, чтобы получить желаемый результат. Давайте посмотрим на некоторые примеры:

import { of } from 'rxjs'; 
import { map } from 'rxjs/operators';

// наши данные:
const data = of([
  {
    brand: 'porsche',
    model: '911'
  },
  {
    brand: 'porsche',
    model: 'macan'
  },
  {
    brand: 'ferarri',
    model: '458'
  },
  {
    brand: 'lamborghini',
    model: 'urus'
  }
]);

// получим данные как строку вида brand+model. Итог:
// ["porsche 911", "porsche macan", "ferarri 458", "lamborghini urus"]
data
  .pipe(
    map(cars => cars.map(car => `${car.brand} ${car.model}`))
).subscribe(cars => console.log(cars))

// отфильтруем данные, чтобы мы могли получить только porsche's. Итог:
// [
//   {
//     brand: 'porsche',
//     model: '911'
//   },
//   {
//     brand: 'porsche',
//     model: 'macan'
//   }
// ]
data
  .pipe(
    map(cars => cars.filter(car => car.brand === 'porsche'))
).subscribe(cars => console.log(cars))

Сначала мы создали Observable из массива автомобилей. Затем мы подписываемся на этот Observable 2 раза. В первый раз мы модифицируем наши данные таким образом, чтобы получить массив строк вида - 'бренд + модель'. Во второй раз мы модифицируем наши данные так, чтобы получить массив состоящий из автомобилей Porsche. В обоих примерах для изменения данных мы используем оператор map. Мы возвращаем измененные данные, а затем оператор map под капотом позаботится о том, чтобы снова обернуть их в Observable, чтобы впоследствии мы могли подписаться на него.

MergeMap

Скажем, у нас есть Observable, который эмитит массив, и для каждого элемента массива нам требуется получать данные с сервера.

Мы могли бы сделать это, подписавшись на массив, затем настроить map, который вызывает функцию, которая, в свою очередь, обрабатывает вызов API, а затем подписаться на результат. Это может выглядеть следующим образом:

import { of, from } from 'rxjs';
import { map, delay } from 'rxjs/operators';

const getData = (param) => {
  return of(`retrieved new data with param ${param}`).pipe(
    delay(1000)
  )
}

from([1,2,3,4]).pipe(
  map(param => getData(param))
).subscribe(val => console.log(val);


// Observable
// Observable
// Observable
// Observable

Наша функция map возвращает значение функции getData. В данном случае это Observable. Это, однако, создает проблему, потому что теперь мы имеем дело с дополнительным Observable.

Проясним следущие шаги: мы имеем from([1,2,3,4]) в качестве нашего «внешнего» Observable. А результатом getData() является «внутренний» Observable. Теоретически мы должны подписаться как на внешний, так и на внутренний Observable, чтобы получить данные. Это может быть так:

import { of, from } from 'rxjs';
import { map, delay } from 'rxjs/operators';

const getData = (param) => {
  return of(`retrieved new data with param ${param}`).pipe(
    delay(1000)
  )
}

from([1,2,3,4]).pipe(
  map(param => getData(param))
).subscribe(val => val.subscribe(data => console.log(data)));

// retrieved new data with param 1
// retrieved new data with param 2
// retrieved new data with param 3
// retrieved new data with param 4

Как вы догадываетесь, это далеко от идеала, так как нам приходится вызывать subscribe два раза. Здесь нам на помощь приходит mergeMap. mergeMap по сути является комбинацией mergeAll и map. mergeAll заботится о подписке на «внутренний» Observable, чтобы нам больше не требовалось делать subscribe два раза, так как mergeAll объединяет значение «внутреннего» Observable с «внешним» Observable, Это может выглядеть так:

import { of, from } from 'rxjs';
import { map, delay, mergeAll } from 'rxjs/operators';

const getData = (param) => {
  return of(`retrieved new data with param ${param}`).pipe(
    delay(1000)
  )
}

from([1,2,3,4]).pipe(
  map(param => getData(param)),
  mergeAll()
).subscribe(val => console.log(val));

// retrieved new data with param 1
// retrieved new data with param 2
// retrieved new data with param 3
// retrieved new data with param 4

Это уже намного лучше, но, как вы уже могли догадаться, mergeMap будет лучшим решением для этого. Вот полный пример:

import { of, from } from 'rxjs';
import { map, mergeMap, delay, mergeAll } from 'rxjs/operators';

const getData = (param) => {
  return of(`retrieved new data with param ${param}`).pipe(
    delay(1000)
  )
}

// используем  map
from([1,2,3,4]).pipe(
  map(param => getData(param))
).subscribe(val => val.subscribe(data => console.log(data)));

// используем map и mergeAll
from([1,2,3,4]).pipe(
  map(param => getData(param)),
  mergeAll()
).subscribe(val => console.log(val));

// используем mergeMap
from([1,2,3,4]).pipe(
  mergeMap(param => getData(param))
).subscribe(val => console.log(val));

Возможно, вы также слышали о flatMap. flatMap является псевдонимом mergeMap и ведет себя точно также. Не запутайтесь.

stackblitz

SwitchMap

switchMap имеет похожее поведение в том, что он также подписывается на внутренний Observable для нас. Однако switchMap представляет собой комбинацию switchAll и map. switchAll отменяет предыдущую подписку и подписывается на новую. В нашем сценарии, где мы хотим выполнить вызов API для каждого элемента в массиве «внешнего» Observable, switchMap не сработает должным образом, поскольку он отменяет первые 3 подписки и обрабатывает только последнюю. Это означает, что мы получим только один результат. Полный пример можно увидеть здесь:

import { of, from } from 'rxjs';
import { map, delay, switchAll, switchMap } from 'rxjs/operators';

const getData = (param) => {
  return of(`retrieved new data with param ${param}`).pipe(
    delay(1000)
  )
}

// using a regular map
from([1,2,3,4]).pipe(
  map(param => getData(param))
).subscribe(val => val.subscribe(data => console.log(data)));

// retrieved new data with param 1
// retrieved new data with param 2
// retrieved new data with param 3
// retrieved new data with param 4

// using map and switchAll
from([1,2,3,4]).pipe(
  map(param => getData(param)),
  switchAll()
).subscribe(val => console.log(val));

// retrieved new data with param 4

// using switchMap
from([1,2,3,4]).pipe(
  switchMap(param => getData(param))
).subscribe(val => console.log(val));

// retrieved new data with param 4

Хотя switchMap не будет работать для нашего текущего сценария, он будет работать для других сценариев. Например, switchMap может пригодиться, если вы скомпануете список фильтров с потоком данных и выполняете вызов к API при изменении фильтра. Если предыдущие изменения фильтра все еще обрабатываются, в то время как новое изменение уже сделано (фильтр обновлен), switchMap отменит предыдущую подписку и начнет новую подписку на последнем изменении. Пример можно увидеть здесь:

import { of, from, BehaviorSubject } from 'rxjs';
import { map, delay, switchAll, switchMap } from 'rxjs/operators';

const filters = ['brand=porsche', 'model=911', 'horsepower=389', 'color=red']
const activeFilters = new BehaviorSubject('');

const getData = (params) => {
  return of(`retrieved new data with params ${params}`).pipe(
    delay(1000)
  )
}

const applyFilters = () => {
  filters.forEach((filter, index) => {

    let newFilters = activeFilters.value;
    if (index === 0) {
      newFilters = `?${filter}`
    } else {
      newFilters = `${newFilters}&${filter}`
    }

    activeFilters.next(newFilters)
  })
}

// using switchMap
activeFilters.pipe(
  switchMap(param => getData(param))
).subscribe(val => console.log(val));

applyFilters()

// retrieved new data with params ?brand=porsche&model=911&horsepower=389&color=red

Как вы можете видеть в консоли, getData логируется только один раз со всеми параметрами ( retrieved new data with params ?brand=porsche&model=911&horsepower=389&color=red). Этим мы спасли три вызова к API.

ConcatMap

Последний пример - concatMap. Как и следовало ожидать, concatMap также подписывается на внутренний Observable. Но в отличие от switchMap, который отписывается от текущего Observable, если появляется новый Observable, concatMap не будет подписываться на следующий Observable, пока не завершится текущий. Преимущество этого подхда в том, что порядок, в котором эмитятся Observable, поддерживается. Чтобы продемонстрировать это:

import { of, from } from 'rxjs';
import { map, delay, mergeMap, concatMap } from 'rxjs/operators';

const getData = (param) => {
  const delayTime = Math.floor(Math.random() * 10000) + 1;
  return of(`retrieved new data with params: ${param} and delay: ${delayTime}`).pipe(
    delay(delayTime)
  )
}

// using a regular map
from([1,2,3,4]).pipe(
  map(param => getData(param))
).subscribe(val => val.subscribe(data => console.log('map:', data)));

// using mergeMap
from([1, 2, 3 ,4]).pipe(
  mergeMap(param => getData(param))
).subscribe(val => console.log('mergeMap:', val));

// using concatMap
from([1, 2, 3 ,4]).pipe(
  concatMap(param => getData(param))
).subscribe(val => console.log('concatMap:', val));


// map: retrieved new data with params: 2 and delay: 512
// mergeMap: retrieved new data with params: 2 and delay: 2477
// map: retrieved new data with params: 4 and delay: 3051
// mergeMap: retrieved new data with params: 4 and delay: 5182
// map: retrieved new data with params: 3 and delay: 6419
// mergeMap: retrieved new data with params: 3 and delay: 6674
// map: retrieved new data with params: 1 and delay: 7204
// mergeMap: retrieved new data with params: 1 and delay: 9203
// concatMap: retrieved new data with params: 1 and delay: 9380
// concatMap: retrieved new data with params: 2 and delay: 2464
// concatMap: retrieved new data with params: 3 and delay: 8916
// concatMap: retrieved new data with params: 4 and delay: 876

Источник

Добавить комментарий
Комментарии:
Диляра
Oct 1, 2019
Спасибо большое!! очень понятно написано !
Irina
Aug 2, 2019
Очень круто объяснено! Спасибо!