Понимание операторов RxJS map, mergeMap, switchMap и concatMap

Aug 7, 2019

map

Оператор map является наиболее распространенным из всех. Для каждого значения, которое эмитит Observable, вы можете применить функцию, в которой вы можете изменять данные. Возвращаемое значение будет, под капотом, заново эмитится как Observable, чтобы вы могли продолжать использовать его в своем потоке. Он работает почти также, как если бы вы использовали его с массивами. Разница в том, что массивы всегда будут просто массивами, и при отображении вы получите значение текущего индекса в массиве. С Observable тип данных может быть любого типа. Это означает, что вам может потребоваться выполнить некоторые дополнительные операции в дополнение к вашей функции Observable map, чтобы получить желаемый результат. Давайте посмотрим на некоторые примеры:

import { of } from 'rxjs'; 
import { map } from 'rxjs/operators';

// наши данные:
const data = of([
  {
    brand: 'porsche',
    model: '911'
  },
  {
    brand: 'porsche',
    model: 'macan'
  },
  {
    brand: 'ferarri',
    model: '458'
  },
  {
    brand: 'lamborghini',
    model: 'urus'
  }
]);

// получим данные как строку вида brand+model. Итог:
// ["porsche 911", "porsche macan", "ferarri 458", "lamborghini urus"]
data
  .pipe(
    map(cars => cars.map(car => `${car.brand} ${car.model}`))
).subscribe(cars => console.log(cars))

// отфильтруем данные, чтобы мы могли получить только porsche's. Итог:
// [
//   {
//     brand: 'porsche',
//     model: '911'
//   },
//   {
//     brand: 'porsche',
//     model: 'macan'
//   }
// ]
data
  .pipe(
    map(cars => cars.filter(car => car.brand === 'porsche'))
).subscribe(cars => console.log(cars))

Сначала мы создали Observable из массива автомобилей. Затем мы подписываемся на этот Observable 2 раза. В первый раз мы модифицируем наши данные таким образом, чтобы получить массив строк вида - 'бренд + модель'. Во второй раз мы модифицируем наши данные так, чтобы получить массив состоящий из автомобилей Porsche. В обоих примерах для изменения данных мы используем оператор map. Мы возвращаем измененные данные, а затем оператор map под капотом позаботится о том, чтобы снова обернуть их в Observable, чтобы впоследствии мы могли подписаться на него.

MergeMap

Скажем, у нас есть Observable, который эмитит массив, и для каждого элемента массива нам требуется получать данные с сервера.

Мы могли бы сделать это, подписавшись на массив, затем настроить map, который вызывает функцию, которая, в свою очередь, обрабатывает вызов API, а затем подписаться на результат. Это может выглядеть следующим образом:

import { of, from } from 'rxjs';
import { map, delay } from 'rxjs/operators';

const getData = (param) => {
  return of(`retrieved new data with param ${param}`).pipe(
    delay(1000)
  )
}

from([1,2,3,4]).pipe(
  map(param => getData(param))
).subscribe(val => console.log(val);


// Observable
// Observable
// Observable
// Observable

Наша функция map возвращает значение функции getData. В данном случае это Observable. Это, однако, создает проблему, потому что теперь мы имеем дело с дополнительным Observable.

Проясним следущие шаги: мы имеем from([1,2,3,4]) в качестве нашего «внешнего» Observable. А результатом getData() является «внутренний» Observable. Теоретически мы должны подписаться как на внешний, так и на внутренний Observable, чтобы получить данные. Это может быть так:

import { of, from } from 'rxjs';
import { map, delay } from 'rxjs/operators';

const getData = (param) => {
  return of(`retrieved new data with param ${param}`).pipe(
    delay(1000)
  )
}

from([1,2,3,4]).pipe(
  map(param => getData(param))
).subscribe(val => val.subscribe(data => console.log(data)));

// retrieved new data with param 1
// retrieved new data with param 2
// retrieved new data with param 3
// retrieved new data with param 4

Как вы догадываетесь, это далеко от идеала, так как нам приходится вызывать subscribe два раза. Здесь нам на помощь приходит mergeMap. mergeMap по сути является комбинацией mergeAll и map. mergeAll заботится о подписке на «внутренний» Observable, чтобы нам больше не требовалось делать subscribe два раза, так как mergeAll объединяет значение «внутреннего» Observable с «внешним» Observable, Это может выглядеть так:

import { of, from } from 'rxjs';
import { map, delay, mergeAll } from 'rxjs/operators';

const getData = (param) => {
  return of(`retrieved new data with param ${param}`).pipe(
    delay(1000)
  )
}

from([1,2,3,4]).pipe(
  map(param => getData(param)),
  mergeAll()
).subscribe(val => console.log(val));

// retrieved new data with param 1
// retrieved new data with param 2
// retrieved new data with param 3
// retrieved new data with param 4

Это уже намного лучше, но, как вы уже могли догадаться, mergeMap будет лучшим решением для этого. Вот полный пример:

import { of, from } from 'rxjs';
import { map, mergeMap, delay, mergeAll } from 'rxjs/operators';

const getData = (param) => {
  return of(`retrieved new data with param ${param}`).pipe(
    delay(1000)
  )
}

// используем  map
from([1,2,3,4]).pipe(
  map(param => getData(param))
).subscribe(val => val.subscribe(data => console.log(data)));

// используем map и mergeAll
from([1,2,3,4]).pipe(
  map(param => getData(param)),
  mergeAll()
).subscribe(val => console.log(val));

// используем mergeMap
from([1,2,3,4]).pipe(
  mergeMap(param => getData(param))
).subscribe(val => console.log(val));

Возможно, вы также слышали о flatMap. flatMap является псевдонимом mergeMap и ведет себя точно также. Не запутайтесь.

stackblitz

SwitchMap

switchMap имеет похожее поведение в том, что он также подписывается на внутренний Observable для нас. Однако switchMap представляет собой комбинацию switchAll и map. switchAll отменяет предыдущую подписку и подписывается на новую. В нашем сценарии, где мы хотим выполнить вызов API для каждого элемента в массиве «внешнего» Observable, switchMap не сработает должным образом, поскольку он отменяет первые 3 подписки и обрабатывает только последнюю. Это означает, что мы получим только один результат. Полный пример можно увидеть здесь:

import { of, from } from 'rxjs';
import { map, delay, switchAll, switchMap } from 'rxjs/operators';

const getData = (param) => {
  return of(`retrieved new data with param ${param}`).pipe(
    delay(1000)
  )
}

// using a regular map
from([1,2,3,4]).pipe(
  map(param => getData(param))
).subscribe(val => val.subscribe(data => console.log(data)));

// retrieved new data with param 1
// retrieved new data with param 2
// retrieved new data with param 3
// retrieved new data with param 4

// using map and switchAll
from([1,2,3,4]).pipe(
  map(param => getData(param)),
  switchAll()
).subscribe(val => console.log(val));

// retrieved new data with param 4

// using switchMap
from([1,2,3,4]).pipe(
  switchMap(param => getData(param))
).subscribe(val => console.log(val));

// retrieved new data with param 4

Хотя switchMap не будет работать для нашего текущего сценария, он будет работать для других сценариев. Например, switchMap может пригодиться, если вы скомпануете список фильтров с потоком данных и выполняете вызов к API при изменении фильтра. Если предыдущие изменения фильтра все еще обрабатываются, в то время как новое изменение уже сделано (фильтр обновлен), switchMap отменит предыдущую подписку и начнет новую подписку на последнем изменении. Пример можно увидеть здесь:

import { of, from, BehaviorSubject } from 'rxjs';
import { map, delay, switchAll, switchMap } from 'rxjs/operators';

const filters = ['brand=porsche', 'model=911', 'horsepower=389', 'color=red']
const activeFilters = new BehaviorSubject('');

const getData = (params) => {
  return of(`retrieved new data with params ${params}`).pipe(
    delay(1000)
  )
}

const applyFilters = () => {
  filters.forEach((filter, index) => {

    let newFilters = activeFilters.value;
    if (index === 0) {
      newFilters = `?${filter}`
    } else {
      newFilters = `${newFilters}&${filter}`
    }

    activeFilters.next(newFilters)
  })
}

// using switchMap
activeFilters.pipe(
  switchMap(param => getData(param))
).subscribe(val => console.log(val));

applyFilters()

// retrieved new data with params ?brand=porsche&model=911&horsepower=389&color=red

Как вы можете видеть в консоли, getData логируется только один раз со всеми параметрами ( retrieved new data with params ?brand=porsche&model=911&horsepower=389&color=red). Этим мы спасли три вызова к API.

ConcatMap

Последний пример - concatMap. Как и следовало ожидать, concatMap также подписывается на внутренний Observable. Но в отличие от switchMap, который отписывается от текущего Observable, если появляется новый Observable, concatMap не будет подписываться на следующий Observable, пока не завершится текущий. Преимущество этого подхда в том, что порядок, в котором эмитятся Observable, поддерживается. Чтобы продемонстрировать это:

import { of, from } from 'rxjs';
import { map, delay, mergeMap, concatMap } from 'rxjs/operators';

const getData = (param) => {
  const delayTime = Math.floor(Math.random() * 10000) + 1;
  return of(`retrieved new data with params: ${param} and delay: ${delayTime}`).pipe(
    delay(delayTime)
  )
}

// using a regular map
from([1,2,3,4]).pipe(
  map(param => getData(param))
).subscribe(val => val.subscribe(data => console.log('map:', data)));

// using mergeMap
from([1, 2, 3 ,4]).pipe(
  mergeMap(param => getData(param))
).subscribe(val => console.log('mergeMap:', val));

// using concatMap
from([1, 2, 3 ,4]).pipe(
  concatMap(param => getData(param))
).subscribe(val => console.log('concatMap:', val));


// map: retrieved new data with params: 2 and delay: 512
// mergeMap: retrieved new data with params: 2 and delay: 2477
// map: retrieved new data with params: 4 and delay: 3051
// mergeMap: retrieved new data with params: 4 and delay: 5182
// map: retrieved new data with params: 3 and delay: 6419
// mergeMap: retrieved new data with params: 3 and delay: 6674
// map: retrieved new data with params: 1 and delay: 7204
// mergeMap: retrieved new data with params: 1 and delay: 9203
// concatMap: retrieved new data with params: 1 and delay: 9380
// concatMap: retrieved new data with params: 2 and delay: 2464
// concatMap: retrieved new data with params: 3 and delay: 8916
// concatMap: retrieved new data with params: 4 and delay: 876

Источник

Добавить комментарий
Комментарии:
Stranger123
May 11, 2020
Пиzдатая инфа!
Олег
Mar 27, 2020
Очень круто расписал, респект!
Олег
Mar 27, 2020
Очень круто расписал, респект!
Олег
Mar 27, 2020
Очень круто расписал, респект!
Дима
Dec 2, 2019
Спасибо!
Ivan
Nov 17, 2019
switchMap >> Этим мы спасли три вызова к API. будет же все равно 3 запроса. в моем примере будет два: jsonplaceholder.typicode.com/photos?albumId=1 jsonplaceholder.typicode.com/photos?albumId=1&id=1 Пример: import { of, from, BehaviorSubject } from 'rxjs'; import { map, delay, switchAll, switchMap } from 'rxjs/operators'; const filters = [{albumId: 1}, {id: 1}]; const activeFilters = new BehaviorSubject({}); const getUrl = (params) => { const url = new URL('https://jsonplaceholder.typicode.com/photos'); Object.keys(params || {}).forEach((paramKey) => { url.searchParams.append(paramKey, params[paramKey]); }); return url.href; } const getData = (params) => { const apiCall = fetch(getUrl(params)) .then(response => response.json()) .then(responseJson => { return responseJson; }); return from(apiCall); } const applyFilters = () => { // just for quick example filters.forEach((item, index) => { let newFilters = {}; if (index === 0) { newFilters = { ...filters[0], }; } else if (index === 1) { newFilters = { ...filters[0], ...filters[1], }; } activeFilters.next(newFilters); }); } // using switchMap activeFilters.pipe( switchMap(param => getData(param)), ).subscribe(val => console.log(val)); applyFilters() // retrieved new data with params ?brand=porsche&model=911&horsepower=389&color=red
Диляра
Oct 1, 2019
Спасибо большое!! очень понятно написано !
Irina
Aug 2, 2019
Очень круто объяснено! Спасибо!