RxJS практика

Nov 16, 2018

Классы, функции для создания Observable. Операторы.

/* Классы/функции для создания Observable */

import {
    Observable, Observer, fromEvent, of, from,
    timer, interval,
    range, empty, throwError, combineLatest, zip,
    forkJoin
} from 'rxjs';


/* Операторы */

import { switchMap } from 'rxjs/operators';     // переключить наш поток
import { debounceTime } from 'rxjs/operators';  // ставим задержку
import { filter, first, debounce, distinctUntilChanged, throttle,
    throttleTime, audit, auditTime, concat, merge, startWith,
    withLatestFrom, pairwise, race, catchError, retry, retryWhen,
    delay, tap, timeout, finalize } from "rxjs/operators";
import { skip, skipLast, skipUntil, skipWhile } from "rxjs/operators";
import { take, takeLast, takeUntil, takeWhile } from "rxjs/operators";
// методы трансформации:
import { pluck, reduce, scan, map, mapTo } from "rxjs/operators";
// операторы, которые работают с Observable высшего порядка:
import { flatMap, exhaustMap, concatMap, mergeMap } from "rxjs/operators";

Пример использования Observable №1

По событию input в текстовом поле отправляем запрос на сервер, выводим ответ в консоли:

const myInput = document.querySelector('#myInput');

// делаем Observable на событие input
const obs = fromEvent(myInput, 'input');

obs.pipe(
    // ставим задержку (перед тем как делать запрос, чтобы не перегружать сервер)
    debounceTime(500),
    // переключаемся на запрос к серверу
    switchMap((event: KeyboardEvent) => {
        return fetch(`https://api.github.com/search/repositories?q=
                        ${(event.target as HTMLInputElement).value}`)
            .then(res => res.json());
    })
)
.subscribe(res => {
    console.log('res: ', res);
})

Пример использования Observable. Создаем Observable.

Метод create позволяет создать Observable.

Внутри метода create мы описываем функцию-конструктор - в нее передается Observer. У Observer есть 4 метода: complete, error, next.

Результат Observable нам покажет subscribe.

const o = Observable.create((observer: Observer<string>) => {
    observer.next('next');
    observer.next('next1');
    observer.next('next2');

    setInterval(() => {
        observer.next('next');
    }, 1000);

    setTimeout(() => {
        observer.complete();
    }, 10000)
});
o.subscribe({
    next: (value: string) => console.log('next: ', value),
    complete: () => console.log('complete'),
    error: (error) => console.log('error: ', error)
});

Вспомогательные методы для создания Observable

of

of - создаем Observable на основе простого значения.

const o = of(4); // что аналогично Promise.resolve(4)

// const o = of(4, 5, 6, 7); // каждый попадет в next

o.subscribe({
    next: (value: any) => console.log('next: ', value),
    complete: () => console.log('complete'),
    error: (error) => console.log('error: ', error)
});

from

from - мы можем передавать массив значений

const o = of([4, 5, 6, 7]);

// также в from мы можем передавать другие сущности, например, Promise
// const o = from(Promise.resolve(10));

o.subscribe({
    next: (value: any) => console.log('next: ', value),
    complete: () => console.log('complete'),
    error: (error) => console.log('error: ', error)
});

fromEvent

fromEvent - первым параметром указываем объект, на котором слушаем событие; 2-м само событие.

const o = fromEvent(document.body, 'mousemove');
o.subscribe({
    next: (value: any) => console.log('next: ', value),
    complete: () => console.log('complete'),
    error: (error) => console.log('error: ', error)
});

timer

Создадим Observable на основе таймеров timer.

timer на learnrxjs.io
1-й параметр - старт,
2-й - периодичность.

const o = timer(0, 300);
o.subscribe({
    next: (value: any) => console.log('next: ', value),
    complete: () => console.log('complete'),
    error: (error) => console.log('error: ', error)
});
http://jsfiddle.net/dnzl/4bt5zwgx/embed/js,html/dark/

interval

Создадим Observable на основе таймеров interval.

interval эмитит числа на основе временного интервала.
1-й параметр - временный интервал

const o = interval(600);
o.subscribe({
    next: (value: any) => console.log('next: ', value),
    complete: () => console.log('complete'),
    error: (error) => console.log('error: ', error)
});
http://jsfiddle.net/dnzl/1eL3fprx/embed/js,html/dark/

range

range - позволяет задать последовательность в диапозоне.

range на learnrxjs.io.
1-й параметр - старт,
2-й - количество итераций.

const o = range(0, 10);
o.subscribe({
    next: (value: any) => console.log('next: ', value),
    complete: () => console.log('complete'),
    error: (error) => console.log('error: ', error)
});
http://jsfiddle.net/dnzl/cgm5Lujk/embed/js,html/dark/

empty

empty сразу сделает complete

const o = empty(); // сразу сделает complete
o.subscribe({
    next: (value: any) => console.log('next: ', value),
    complete: () => console.log('complete'),
    error: (error) => console.log('error: ', error)
});

throwError

throwError сразу сделает error: вызовет error на подписке.

throwError на learnrxjs.io.

rxjs.of("A", "B", "C", "D", "E").pipe(
  rxjs.operators.map(el => {
    if (el === "C") {
      throw new Error("Error occurred.");
    }
    return el;
  }),
  rxjs.operators.catchError(err => {
    console.error(err.message);
    console.log("Error is handled");
    return rxjs.throwError("Error thrown from catchError");
    //return rxjs.of("good");
  })
).subscribe(el => console.log(el),
            err => console.error(err),
            () => console.log("Processing Complete."));
    
/*
A 
B 
Error occurred.
Error is handled 
Error thrown from catchError
*/
const o = throwError('err'); // сразу сделает error
o.subscribe({
    next: (value: any) => console.log('next: ', value),
    complete: () => console.log('complete'),
    error: (error) => console.log('error: ', error)
});
http://jsfiddle.net/dnzl/z79L2xcy/1/embed/js,html/dark/

Оператор filter и другие операторы для фильтрации

filter

const o = range(0, 100)
    .pipe(
        // примени filter и выведем значения > 40
        filter(value => value > 50)
    );

first и last

const o = range(0, 100)
    .pipe(
        // выведем первое значение (last - последнее)
        first(value => value > 50)
    );

debounce

debounce - отбрасывает испускаемые значения в заданном интервале, но по истечении заданного интервала будет эмитить значение.

debounce - возвращает Observable, поэтому необходимо прописать, например, timer (статич. метод по созданию Observable).

const o = range(0, 100)
    .pipe(
        debounce(value => timer(500 * value))
    );

Обратите внимание: когда debounce time станет выше, чем interval time, все значения будут отброшены.

http://jsfiddle.net/dnzl/ofz14b7L/5/embed/js,html/dark/

debounceTime

debounceTime - отбрасывает испускаемые значения, которые попадают в указанный промежуток времени между выходными данными - по прошествии временного интервала эмитит последнее значение.

debounceTime - ставим задержку, но в отличие от debounce не нужно прописывать статический метод для создания Observable.

const o = range(0, 100)
    .pipe(
        debounceTime(1000)
    );
http://jsfiddle.net/dnzl/vkp4937b/embed/js,html/dark/

distinctUntilChanged

distinctUntilChanged (на learnrxjs.io) - выдает только те значения, которые отличаются от предыдущих.

const o = from([1,1,1,3,4,5,5,5])
    .pipe(
        distinctUntilChanged()
    );
http://jsfiddle.net/dnzl/5opL8vca/embed/js,html/dark/

throttle

throttle (на learnrxjs.io) - эмитит первое значение и подавляет новые значения до тех пор пока не будет завершен durationSelector.

http://jsfiddle.net/dnzl/y95fu2od/embed/js,html/dark/

В стиле RxJS5:

// эмитим значение каждую секунду
const source = Rx.Observable.interval(1000);
// 'тормозим ' 2 секунды, эмитим последнее значение
const example = source.throttle(val => Rx.Observable.interval(2000));
// вывод: 0...3...6...9
const subscribe = example.subscribe(val => console.log(val));

throttleTime

throttleTime (на learnrxjs.io) - в отличие от debounceTime throttleTime берет первое значение, а остальные, например, в течение 1 сек игнорирует.

const o = timer(0, 200)
    .pipe(
        throttleTime(1000)
    );
http://jsfiddle.net/dnzl/Lehr7k6d/2/embed/js,html/dark/

skip

skip - задаем кол-во элементов, которые мы хотим пропустить.

const o = range(0, 100)
    .pipe(
        skip(10)
    );
http://jsfiddle.net/dnzl/o4v8qwcm/embed/js,html/dark/

skipLast

skipLast - задаем кол-во элементов, которые мы хотим пропустить с конца.

const o = range(0, 100)
    .pipe(
        skipLast(10)
    );

skipUntil

skipUntil (на learnrxjs.io) - получает Observable, пропускает все элементы, например, которые попадают в timer(1000).

const o = range(0, 100)
    .pipe(
        skipUntil(timer(1000))
    );
http://jsfiddle.net/dnzl/4zfeLm1y/embed/js,html/dark/

skipWhile

skipWhile (на learnrxjs.io) - отличие от skipUntil в том, что в skipUntil вы передаете Observable, а в skipWhile callback.

Пропустит все элементы удовлетворяющие условию:

const o = range(0, 100)
    .pipe(
        skipWhile(value => value < 33)
    );
http://jsfiddle.net/dnzl/qon9d2Lp/1/embed/js,html/dark/

take

take (на learnrxjs.io) - задаем кол-во элементов, которые мы хотим взять.
1-й параметр - число значений, которые необходимо взять со старта потока.

const o = range(0, 100)
    .pipe(
        take(10)
    );
http://jsfiddle.net/dnzl/Lsrnyptx/embed/js,html/dark/

takeLast

takeLast (на reactivex.io) - задаем кол-во элементов, которые мы хотим взять с конца.

takeUntil

takeUntil (на learnrxjs.io) - эмитит значения пока предоставленный Observable эмитит 'работает'.

Часто используется в Angular, когда требуется отписаться от Observable:

export class TakeUntilCardComponent implements OnInit, OnDestroy {
    message: string;
    private unsubscribe$ = new Subject();
    constructor(private upperCaseService: UpperCaseService) {}

    ngOnInit() {
        this.upperCaseService.getUpperCaseMessage().pipe(
            takeUntil(this.unsubscribe$)
        )
        .subscribe((message: string) => this.message = message);
    }

    ngOnDestroy(): void {
        this.unsubscribe$.next();
        this.unsubscribe$.complete();
    }
}

takeUntil - получает Observable, берем все элементы, которые, например, попадают в timer(1000).

const o = range(0, 100)
    .pipe(
        takeUntil(timer(1000))
    );
http://jsfiddle.net/dnzl/ehL3xak5/embed/js,html/dark/

takeWhile

takeWhile (на learnrxjs.io) - отличие от takeUntil в том, что в takeUntil вы передаете Observable, а в takeWhile callback. Возьмет все элементы удовлетворяющие условию (прекратит на false в условии):

//emit 1,2,3,4,5
const source = of(1, 2, 3, 4, 5);
//allow values until value from source is greater than 4, then complete
const example = source.pipe(takeWhile(val => val <= 4));
//output: 1,2,3,4
const subscribe = example.subscribe(val => console.log(val));

Общий subscribe

o.subscribe({
    next: (value: any) => console.log('next: ', value),
    complete: () => console.log('complete'),
    error: (error) => console.log('error: ', error)
});

 

Комбинирование (combine) Observable

 

const timerOne = timer(1000, 4000).pipe(
    take(3)
);
const timerTwo = timer(2000, 4000).pipe(
    take(3)
);
const timerThree = timer(3000, 4000).pipe(
    take(3)
);

Мы хотим скомбинировать три приведенных выше Observable в один поток.

combineLatest

combineLatest (на learnrxjs.io) - когда любой Observable эмитит значение, эмитит последние значения от каждого (в виде массива).

const timerOne = timer(1000, 4000).pipe(
    take(3)
);
const timerTwo = timer(2000, 4000).pipe(
    take(3)
);
const timerThree = timer(3000, 4000).pipe(
    take(3)
);

const o = combineLatest(timerOne, timerTwo, timerThree);
// output
next:Array(3) [ 0, 0, 0 ]
next:Array(3) [ 1, 0, 0 ]
next:Array(3) [ 1, 1, 0 ]
next:Array(3) [ 1, 1, 1 ]
next:Array(3) [ 2, 1, 1 ]
next:Array(3) [ 2, 2, 1 ]
next:Array(3) [ 2, 2, 2 ]
complete
http://jsfiddle.net/dnzl/nj5340hm/embed/js,html/dark/

zip

zip (на learnrxjs.io) - ждет значение из каждого потока и формирует группу (массив) на основе этих значений; если значение не придет из какого-либо потока, то группа не будет сформирована.

const timerOne = timer(1000, 4000).pipe(
    take(3)
);
const timerTwo = timer(2000, 4000).pipe(
    take(3)
);
const timerThree = timer(3000, 4000).pipe(
    take(3)
);

const o = zip(timerOne, timerTwo, timerThree);
next:Array(3) [ 0, 0, 0 ]
next:Array(3) [ 1, 1, 1 ]
next:Array(3) [ 2, 2, 2 ]
complete
http://jsfiddle.net/dnzl/sL2raeh4/embed/js,html/dark/

forkJoin

forkJoin - только когда все последовательности входящие в forkJoin завершены (complete), тогда forkJoin 'выстрелит' последним значением.

Отличный вариант: когда вам требуется выполнить несколько http-запросов и вам нужно дождаться, пока ответят все http-запросы, чтобы, например, отрендерить view. forkjoin на medium

const timerOne = timer(1000, 4000).pipe(
    take(3)
);
const timerTwo = timer(2000, 4000).pipe(
    take(3)
);
const timerThree = timer(3000, 4000).pipe(
    take(3)
);

const o = forkJoin(timerOne, timerTwo, timerThree);
// output
next:Array(3) [ 2, 2, 2 ]
complete

Общий subscribe

o.subscribe({
    next: (value: any) => console.log('next: ', value),
    complete: () => console.log('complete'),
    error: (error) => console.log('error: ', error)
});

 

Комбинирование (combine) Observable, операторы concat и merge

 

concat

concat (на reactivex.io, на jiodev.com ) Эмитит значения от двух или более Observables не чередуя их (лишь после того, как complete предыдущий Observable, стартует следующий Observable). Сначала будут выведены значения из 1-го Observable, затем из 2-го. В независимости от того, что 2-й генерировал значения намного быстрее.

// emits 1,2,3
const sourceOne = of(1, 2, 3);
// emits 4,5,6
const sourceTwo = of(4, 5, 6);
// эмитит значение из sourceOne, когда complete, подписывается на sourceTwo
const example = sourceOne.pipe(concat(sourceTwo));
// output: 1,2,3,4,5,6
const subscribe = example.subscribe(val =>
  console.log('Example: Basic concat:', val)
);

 

const o1 = timer(0, 1000).pipe(take(3)); // быстро
const o2 = timer(0, 100).pipe(take(3));  // медленно

const o = o1.pipe(concat(o2));

 

http://jsfiddle.net/dnzl/j7vskg3t/embed/js,html/dark/

merge

merge (на learnrxjs.io) - превращает несколько Observables в один Observable. Выдает значения по мере поступления в независимости от того в каком порядке они были.

const o1 = timer(0, 1000).pipe(take(3)); // быстро
const o2 = timer(0, 100).pipe(take(3));  // медленно

const o = o1.pipe(merge(o2));
http://jsfiddle.net/dnzl/ux38whjL/embed/js,html/dark/

startWith

startWith (на learnrxjs.io) - эмитит переданное значение первым. Можно использовать, когда мы просто хотим дополнить какой-либо из потоков, то есть в нашем примере - начать с 5.

const o1 = timer(0, 1000).pipe(take(3));

const o = o1.pipe(startWith(5));
http://jsfiddle.net/dnzl/az6soxp3/embed/js,html/dark/

withLatestFrom

withLatestFrom (на learnrxjs.io) - получаем последнее значение из другого Observable - в нашем случае мы получим группу (массив) сформированный относительно времени:

const o1 = timer(0, 1000).pipe(take(3));
const o2 = timer(0, 100).pipe(take(3));

const o = o1.pipe(withLatestFrom(o2));
//jsfiddle.net/dnzl/eLk3wr4p/embed/js,html/dark/
// output
next:Array [ 0, 0 ]
next:Array [ 1, 2 ]
next:Array [ 2, 2 ]

pairwise

pairwise (на learnrxjs.io) - эмитит предыдущее и текущее значения как массив.

const o1 = rxjs.timer(0, 1000).pipe(rxjs.operators.take(3));
const o = o1.pipe(rxjs.operators.pairwise());
// output:
next:Array [ 0, 1 ]
next:Array [ 1, 2 ]
complete
//jsfiddle.net/dnzl/0e4uytgL/embed/js,html/dark/

race

race - выявляет победителей между Observable - используется тот Observable, который эмитит первым.

// Throws an error and ignores the other observables.
const first = rxjs.of('first').pipe(
  rxjs.operators.delay(100),
  rxjs.operators.map(_ => {
    throw 'error';
  })
);
const second = rxjs.of('second').pipe(rxjs.operators.delay(200));
const third = rxjs.of('third').pipe(rxjs.operators.delay(300));
// nothing logged
const o = rxjs.race(first, second, third);

o.subscribe({
    next: (value: any) => console.log('next: ', value),
    complete: () => console.log('complete'),
    error: (error) => console.log('error: ', error)
});
//jsfiddle.net/dnzl/wo781gpt/7/embed/js,html/dark/
// output:
error: error

Общий subscribe

o.subscribe({
    next: (value: any) => console.log('next: ', value),
    complete: () => console.log('complete'),
    error: (error) => console.log('error: ', error)
});

 

Операторы Трансформации pluck, reduce, scan

 

pluck

pluck - когда в последовательность передаются сложные составные объекты и вам не нужны все свойства данного объекта, то при помощи оператора pluck можно вычленить (ощипать) одно из них: в операторе pluck указываем свойство, которые мы хотим получить

const o = of({
    name: 'John',
    age: 21
})
.pipe(
    pluck('name')
)

reduce

reduce - практически аналогично функции reduce из нативного js, см. пример ниже:

const o = of(1, 2, 3, 4, 5, 6, 7)
    .pipe(
        reduce((accumulator, current) => {
            // получаем сумму всех значений
            return accumulator + current;
        })
    );

scan

scan - данный оператор отличается от оператора reduce тем, что он выстреливает значения на каждом шаге.

const o = of(1, 2, 3, 4, 5, 6, 7)
    .pipe(
        scan((accumulator, current) => {
            // получаем сумму всех значений
            return accumulator + current;
        })
    );

Общий subscribe

o.subscribe({
    next: (value: any) => console.log('next: ', value),
    complete: () => console.log('complete'),
    error: (error) => console.log('error: ', error)
});

 

Операторы Трансформации типа map, mapTo, switchMap, exhaustMap

 

map

map - работает аналогично нативному методу map: пробегается по массиву и формируем новый массиив, преобразуя значения.

 
const o = range(0, 10)
        .pipe(map((val) => {
            return val * 2;
        }));

mapTo

mapTo - применяется в том случае, когда нам не нужны значения, а нужно преобразовать все к одному значения для каждого элемента.

const o = range(0, 10)
    .pipe(mapTo('cool'));

flatMap

flatMap - работает с Observable высших порядков (это означает, что у Observable элементом внутри является также Observable).

flatMap === mergeMap (на learnrxjs.io): все Observable сливаются в одну последовательность (значения выстреливают по мере поступления).

const clicks = fromEvent(document,  'click');
const o = clicks.pipe(flatMap((_) => {
    return interval(500);
}))
//jsfiddle.net/dnzl/p5mth1y6/embed/js,html/dark/

switchMap

switchMap (на learnrxjs.io) - предыдущая внутренняя последовательность останавливается (complete) и стартует новая последовательность.

const clicks = fromEvent(document,  'click');
const o = clicks.pipe(switchMap((_) => {
    return interval(500);
}))
//jsfiddle.net/dnzl/sa2zdfvp/embed/js,html/dark/

switchmap на stackblitz.com

exhaustMap

exhaustMap - (на learnrxjs.io) переключает последовательность, но после переключения exhaustMap уже все равно (он не реагирует) и идет все та же последовательность; то есть в нашем случае сработает переключение на 1-й клик, все остальные клики будут проигнорированы и будет идти последовательность от интервала.

const clicks = fromEvent(document,  'click');
const o = clicks.pipe(exhaustMap((_) => {
    return interval(500);
}))
//jsfiddle.net/dnzl/g0h6e3yo/embed/js,html/dark/

concatMap

concatMap (на reactivex.io/rxjs, на learnrxjs.io) - работает также как и flatMap, но в отличие от flatMap сохраняет последовательность в работе внутренних Observable (то есть внутренний Observable получает значение от внешнего Observable и выполняется до тех пор пока не совершит complete и далее ожидает следующее значение от внешнего Observable).

var clicks = rxjs.fromEvent(document, 'click');
var o = clicks.pipe(
	rxjs.operators.concatMap(ev => rxjs.interval(1000).pipe(
  	rxjs.operators.take(4
  )))
);
//jsfiddle.net/dnzl/zp739Lmg/embed/js,html/dark/

Общий subscribe

o.subscribe({
    next: (value: any) => console.log('next: ', value),
    complete: () => console.log('complete'),
    error: (error) => console.log('error: ', error)
});

 

Операторы обработки ошибок

 

catchError

catchError (на learnrxjs.io) - обрабатывает ошибки в последовательности.

// catchError
const o = interval(500)
    .pipe(
        mergeMap((val) => {
            if (val > 3) {
                // генерируем Observable ошибки
                return throwError('error > 3');
            }
            return of(val);
        }),
        // поймать ошибку мы можем с помощью оператора catchError (возвращает Observable)
        catchError((err) => {
            console.log('err on catchError: ', err);
            // попадет в next (поток)
            return of(false);
        })
    );
//jsfiddle.net/dnzl/5d2zgtmu/3/embed/js,html/dark/

retry

retry (на learnrxjs.io) - повторяет последовательность Observable n раз, пока не будет ошибок. Параметром принимает кол-во попыток на перезапуск (пока не будет ошибок).

const o = interval(500)
    .pipe(
        mergeMap((val) => {
            if (val > 3) {
                // генерируем Observable ошибки
                return throwError('error > 3');
            }
            return of(val);
        }),
        retry(2)
    );
//jsfiddle.net/dnzl/by6at1dr/embed/js,html/dark/

retryWhen

retryWhen - параметром принимает Observable ошибки, на котором мы можем поставить какие-либо условия, например, в нашем случае это задержка в 2 сек на перезапуск потока.

const o = interval(500)
    .pipe(
        mergeMap((val) => {
            if (val > 3) {
                // генерируем Observable ошибки
                return throwError('error > 3');
            }
            return of(val);
        }),
        retryWhen((errorObservable) => {
            return errorObservable.pipe(
                delay(2000)
            )
        })
    );

Общий subscribe

o.subscribe({
    next: (value: any) => console.log('next: ', value),
    complete: () => console.log('complete'),
    error: (error) => console.log('error: ', error)
});

Операторы-утилиты: tap, delay, delayWhen

delay и delayWhen

delay - выводит последовательность через определенное время.

delayWhen - в delayWhen передаем Observable, в котором задаем то, как мы должны реализовать delay.

tap

tap - позволяет делать side эффекты, то есть какие-либо действия, которые не влияют на последовательность.

const o = range(0,100)
    .pipe(
        tap((n) => {
            console.log(n+'ho');
        })
    )

timeOut

timeOut - в timeout мы указываем время, и если в это время не было сгенерировано значение, то значит произошла ошибка.

const o = interval(1000)
    .pipe(
        delay(5000),
        tap((n) => {
            console.log(n+'ho');
        }),
        timeout(3000)
    )

finalize

finalize - работает аналогично секции finally блока try/catch нативного js - то есть выполнится в любом случае.

const o = interval(1000)
    .pipe(
        delay(5000),
        tap((n) => {
            console.log(n+'ho');
        }),
        timeout(3000),
        finalize(() => {
            console.log('finalize');
        })
    )

toPromise

toPromise (на learnrxjs.io) - встроенный метод, который есть у всех Observable; позволяет превратить любой Observable в Promise и далее работать с ник как с Promise.

//jsfiddle.net/dnzl/e72z39oL/embed/js,html/dark/

Общий subscribe

o.subscribe({
    next: (value: any) => console.log('next: ', value),
    complete: () => console.log('complete'),
    error: (error) => console.log('error: ', error)
});

Интерактивные marbles диаграммы: rxmarbles.com

Примеры использования RxJS в Angular

debounceTime, distinctUntilChanged, startWith, combineLatest + async pipe

@Component({
    selector: 'app-search-tickets',
    templateUr: ' ./searh-tickets.component.html',
    styteUrls: [' ./seach-tickets.component.scss'],
})
export class SearchTicketsComponent implements OnInit {

    searchTerm = new FormControl();
    assignedToUser = new FormControl();

    searchResults$: Observable<searchResult[]>;
    users$: Observable<string[]>;

    constructor(private ticketService:TicketService, private userService:UserService) {
    }

    ngOnInit():void {
        const users$ = throttle(this.assignedToUser.valueChanges);
        const searchBy$ = throttle(this.searchTerm.valueChanges);

        this.searchResults$ = combineLatest(searchBy$, users$).pipe(
            switchMap(([ticket, user]) => {
                const hasCriteria = ticket.Length || user.length;
                return !hasCriteria ? of([]) : this.ticketService.searchTickets(ticket, user);
            })
        );

        this.users$ = users$.pipe(
            switchMap(searchTerm => {
                const extractFullNames = users => users.map(it => it.fullName);
                const pending$ = this.userService.users(searchTerm);
                return !searchTerm ? of([]) : pending$.pipe(map(extractFullNames));
            })
        );
    }
}

function throttle(source$: Observable<strtng>) {
    return source$.pipe(debounceTime(350), distinctUntilChanged(), startWith(''));
}
<mat-autocomplete #users="matAutocomplete">
    <mat-option
            *ngFor="let user of (users$ | async)"
            [value]="user">{{user}}</mat-option>
</mat-autocomplete>


<a routerLtnk="/ticket/{{result.id}}"
   *ngFor="let result of searchResults$ | async">
    <div>
        <span>{{ result.message}}</span>
        <span>Status: {{result.status}}</span>
    </div>
</a>

switchMap, debounceTime

searchBook() {
  this.bookId.valueChanges.pipe(
     debounceTime(500),
     switchMap(id => {
       console.log(id);
       return this.bookService.getBook(id);
     })
  ).subscribe(res => this.book = res);
} 

Полезные ссылки

jiodev.com,
reactivex.io,
learnrxjs.io,
живые примеры на stackblitz.com, например, switchmap, concatMap, mergeMap

Добавить комментарий
Комментарии:
Евгенй
Jul 14, 2020
Присоединяюсь. Спасибо!!!
Denys
Apr 17, 2020
Большое спасибо за труды!ОЧень круто и доходчиво все изложено!)
Юрий
Jan 20, 2020
Реально хорошее описание опреторов. Спасибо, чтиво великолепное