Observable под капотом

Apr 18, 2018

Библиотека Reactive Extensions - реализует механизм асинхронного распространения событий, который используется в Angular для обнаружения изменений и распространения событий (система по отправке и получении уведомлений).

Домашняя страница проекта Reactive Extensions

Как работает: объект observer собирает события и распространяет их среди подписчиков через Observable. Для управления потоком событий к подписчику используются специальные операторы.

В Angular Reactive Extensions необходима для следующий функций Angular: обновление потомков, получение информации о потомках, выдача асинхронных запросов HTTP.

Зачем использовать Reactive Extensions

  • Единный интерфейс для любых источников данных
  • Возможность объединять источники или подключать их по очереди
  • 120+ операторов
  • Отмена подписки, когда данные больше не нужны
  • Lazy - не будет работать пока нет subscribe

Какие задачи легко решать с RxJs

RxJs работает с любыми источниками данных: события, запросы, клики, массивы, интервалы, 'live search' и т.д.

Drag & Drop - пример на CodePen

  • Объединение несколько событий вместе
  • Фильтрация события (delay, debounce, throttle)
  • Управление асинхронными действиями
  • Когда нужна отмена запросов

Операторы RxJS

Позволяют создавать Observable:

  • from() - создает Observable, принимает любые данные
  • fromEvent() - создает Observable из event
  • fromPromise() - создает Observable из Promise
  • scan() - позволяет накапливать данные
  • delay() - позволяет откладывать emit
  • debounceTime() - предотвращает прокидывание значения, если оно приходит, например, чаще чем 200ms: debounceTime(200)
  • map - функция вызывается для каждого события, сгенерированного Observable, и возвращает Observable, который emit результат применения функции (map)
  • filter() - функция вызывается для каждого события, сгенерированного Observable, и отбрасывает те из них, для которых функция возвращает false
  • take() - позволяет взять только несколько значений
  • mergeMap() - оставляет в силе предыдущий Observable
  • switchMap() - делает complete для предыдущего Observable
  • distinctUntilChanged - метод подавляет события до изменения объекта события
  • skipWhile - метод отбрасывает события испускаемые Observable до тех пор пока не выполнится условие (skipwhile)
  • takeWhile - передает события подписчику до тех пор пока условие не станет ложным
  • merge - берет один Observable и замешивает в него значение из другого Observable
  • pluck - передаем имя поля, которое мы хотим получить

Marbles диаграммы

Marbles диаграммы

Observable

Класс Observable (наблюдаемый объект) - предоставляет последовательность наблюдаемых программой событий/источник событий. Мы можем подписаться на Observable и получать уведомления при каждом наступлении события.

Метод subscribe

Метод subscribe в качестве аргументов получает 3 функции:

  • next - функция вызывается при инициализации нового события
  • error - функция вызывается при инициализации ошибки
  • complete - функция вызывается при завершении последовательности событий

А как же Promise

Объекты Observable обладают большей гибкостью и большей функциональность, но все же библиотека Reactive Extensions предоставляет метод Observable.fromPromise - создает Observable, который использует Promise как источник событий.

Также есть метод Observable.toPromise - для того чтобы Observable превратить в Promise.

Observer

Observer (наблюдатель) - механизм создания обновлений.

Методы Observer

  • next(data) - создает новое событие с использованием нового значения (data)
  • error(errObj) - сигнализирует об ошибке
  • complete() - сигнализирует об окончании последовательности событий

Объекты типа Subject

Библиотека Reactive Extensions предоставляет класс Subject, реализующий функциональность как Observable так и Observer. Следовательно можно создавать службы, которые и генерируют и потребляют события в одном объекте.

Subject - очень важно изучить, так как позволяют легко организовать взаимодействие между компонентами: Общение между компонентами через Observable и Subject, Angular (оригинал)

Observable под капотом, пример на функциях

По материалам «RxJS — мифы и реальность», Александр Грибанов

http://jsfiddle.net/dnzl/3d44umnz/16/embed/js/dark/

Что происходит, когда мы начинаем выполнять эту цепочку:

arrayObservable - это объект (с методами filter, map, subscribe), созданный при помощи createObservable; функция startReceivingData идет как параметр в createObservable и присваивается методу subscribe

var observer = {
    next: function(data) {
        console.log(data)
    },
    complete: function() {
        console.log('Done')
    },
    error: function(err) {
        console.log('error: ', err)
    }
};   
    
    
// createObservable - функция helper, которая создает Observable
function createObservable(subscribeFN) {
    return {
        filter: filterFn,
        map: mapFn,
        subscribe: subscribeFN
    }
}

const arrayObservable = createObservable(function startReceivingData(observer) {
        [2, 5, 68, 9, 4].forEach(observer.next);
        observer.complete();
    }
);

на объекте arrayObservable применяем метод filter;
метод filter указывает на filterFn.
filterFn - принимает inputObservable (через this, т.к. он указывает на arrayObservable);
filterFn - возвращает outputObservable (новый Observable), созданный при помощи createObservable, но только с другой ф-ей подписки (subscribe)

arrayObservable
    .filter(x => x % 2)
    .map(x => x * 10)
    .subscribe(observer);

function createObservable(subscribeFN) {
    return {
        filter: filterFn,
        map: mapFn,
        subscribe: subscribeFN
    }
}

// реализуем функцию filter (по аналогии с map):
function filterFn(conditionFn) {
    const inputObservable = this;
    const outputObservable = createObservable(function subscribe(observer) {
        inputObservable.subscribe({
            // условие выполняется, то передаем данные дальше
            next: (x) => {
            	console.log('x in filter: ', x);
                conditionFn(x) && observer.next(x);
            },
            complete:   () => observer.complete(),
            error:      () => observer.error(),
        })
    });

    return outputObservable;
}

далее вызываем метод map;
работает аналогично filter;
возвращает объект (новый Observable), который отличается от того что отдает filter лишь ф-ей подписки (subscribe)

function createObservable(subscribeFN) {
    return {
        filter: filterFn,
        map: mapFn,
        subscribe: subscribeFN
    }
}

// реализуем функцию map:
// ф-я map должна взять предыдущий Observable, создать новый Observable
// mapFn имеет достпуп к текущему контексту (this) = текущему Observable

function mapFn(transformationFn) {
    const inputObservable = this;
    const outputObservable = createObservable(function subscribe(observer) {
        // subscribe, чтобы получить данные
        inputObservable.subscribe({
            // next принимает данные (chunks), мы должны дальше прокинуть эти данные и
            // преобразовать (transformationFn)
            next: (x) => {
            	console.log('x in map: ', x);
                return  observer.next(transformationFn(x))
            },
            complete:   () => observer.complete(),
            error:      () => observer.error(),
        })
    });

    return outputObservable;
}

На объект (Observable), который вернул map мы вызываем ф-ю subscribe и передаем ей observer (объект, который будет получать данные).

когда мы вызываем:

.map(x => x * 10)
.subscribe(observer);

мы по факту вызываем (из map (subscribe)):

function subscribe(observer) {
    // subscribe, чтобы получить данные
    inputObservable.subscribe({
        //  next принимает данные (chunks), мы должны дальше прокинуть эти данные и преобразовать (transformationFn)
        next:       (x) => observer.next(transformationFn(x)),
        complete:   () => observer.complete(),
        error:      () => observer.error(),
    })
}

передавая в данную ф-ю тот observer, который хочет на нас подписаться. В этой функции мы берем inputObservable, то в нашем случае это ,назовем для понимания, 'filterObservable', это тот Observable, который вернулся из функции filter и делаем ему subscribe, передавая новый observer, таким образом мы попадаем в:

function subscribe(observer) {
    inputObservable.subscribe({
        // условие выполняется, то передаем данные дальше
        next:       (x) => conditionFn(x) && observer.next(x),
        complete:   () => observer.complete(),
        error:      () => observer.error(),
    })
}

данная ф-я берет inputObservable, а для filter это arrayObservable и говорит subscribe ('я хочу начать получать данные') и передает ей (subscribe) свой observer. Этот observer попадает вот сюда:

[2, 5, 68, 9, 4].forEach(observer.next);
observer.complete();

То есть здесь observer, который передал filter. И именно здесь начинается emit данных. Повторюсь, [2, 5, 68, 9, 4].forEach(observer.next) Здесь observer - это observer прокинутый через функцию filter:

{
    // условие выполняется, то передаем данные дальше
    next:       (x) => conditionFn(x) && observer.next(x),
    complete:   () => observer.complete(),
    error:      () => observer.error(),
}

вызывая observer.next(x), здесь observer'ом является 'mapObserver' (замыкания наше все: в момент создания функции next мы замкнули ранее переданный observer):

{
    //  next принимает данные (chunks), мы должны дальше прокинуть эти данные и преобразовать (transformationFn)
    next:       (x) => observer.next(transformationFn(x)),
    complete:   () => observer.complete(),
    error:      () => observer.error(),
}

Здесь мы вызываем observer.next(transformationFn(x)), здесь же observer является:

var observer = {
    next: function(data) {
        console.log(data)
    },
    complete: function() {
        console.log('Done')
    },
    error: function(err) {
        console.log('error: ', err)
    }
};

Итого:

arrayObservable
    .filter(x => x % 2)
    .map(x => x * 10)
    .subscribe(observer);

Без конечного .subscribe(observer); данные никогда не начнут emit'ся

Observables под капотом, пример на классах

По материалам Observables под капотом

Шаблон Observer:

Шаблон observer представляет собой шаблон разработки программного обеспечения, в котором объект, называемый subject, обслуживает ряд своих иждивенцев, называемыми observers, и автоматически уведомляет их о любых изменениях состояния, обычно вызывая один из их методов.

Observable:

Мне нравится думать об Observable как о функции, которая пробрасывает ("throws") значения. Он может пробрасывать ("throws") значения синхронным или асинхронным путем. Если у вас есть интерес к этим значениям, вы можете зарегистрировать observer.

Observer:

Observer - это объект с тремя функциями.

  1. next() => Observable, вызови эту функцию, когда у тебя есть новое значение для меня.
  2. error() => Observable, вызови эту функцию, когда у тебя есть новая error для меня.
  3. complete() => Observable, вызови эту функцию, когда закончишь свою работу.

Когда Observable (т.е. функция) выбрасывает ("throws") новое значение, error или completes, он вызовет соответствующую функцию на вашем observer.

Push vs. Pull:

Если вы знакомы с шаблоном Iterator, вы знаете свои обязанности: когды вы хотите новое значение, вам достаточно вызвать метод next, чтобы стянуть (pull) значение.

var it = makeIterator(['yo', 'ya']);
console.log(it.next().value); // 'yo'

С Observabe это как: не звоните нам, мы сами вам позвоним.

Observable это босс. Когда у него будет новое значение, он протолкнет (push) значение вам. Ваша работа просто "слушать".

Метафора из реального мира -  новостная рассылка (NewsLetters):

Как письмо появляется в вашей почте (email)? Вы подписываетесь (subscribing ) на новостную рассылку; когда появляется новая новость менеджер просто толкает (pushes) его на ваш email.

Достаточно речей, давайте начнем разбирать и понимать Observable, путем создания нашей собственной мини Rx.

Нам нужно заставить этот код работать:

let fakeAsyncData$ = new Observable(observer => {
    setTimeout(() => {
        observer.next('New data is coming');
        observer.complete();
    }, 2000);
});

fakeAsyncData$.subscribe({
    next(val) { console.log(val) } ,
    error(e) { console.log(e) } ,
    complete() { console.log('complete') }
});

Начнем.

class Observable {
    constructor(functionThatThrowsValues) {
        this._functionThatThrowsValues = functionThatThrowsValues;
    }
}

Начнем с создания класса Observable и сохранении ссылки на функцию, которая будет пробрасывать значения. Теперь вы можете понять почему Observable называют lazy; вы не вызывает функцию, а просто сохраняете ссылку.

Функция подписки (subscribe):

subscribe(observer) {
    return this._functionThatThrowsValues(observer);
}

Только когда вы вызовите метод subscribe, вы вызовите функцию, которая будет пробрасывать ("throws") значения с observer.

Только что вы создали новый Observable.

http://jsfiddle.net/dnzl/5Lmapcd8/embed/js/dark/

Давайте рассмотрим как создать метод map

fakeAsyncData$.map(val => `New value ${val}`).subscribe({
    next(val) { console.log(val) } ,
    error(e) { console.log(e) } ,
    complete() { console.log(‘complete’) }
});

Когда мы вызываем метод map возвращается новый Observable, который подписан на наш источник, в нашем случае это fakeAsyncData$. Когда источник (fakeAsyncData$) "выбрасывает" новое значение оно доходит до метода map, где к значению применяется функция, далее 'mapObservable' "выбрасывает" значение к нам. (помните мы подписаны на 'mapObservable').

http://jsfiddle.net/dnzl/n6r364am/embed/js/dark/

Давайте посмотрим как создать метод fromEvent

var button = document.getElementById('button');

let clicks$ = Observable.fromEvent(button, 'click')
              .map(e => `${e.pageX}px`);

let unsubscribe = clicks$.subscribe({
    next(val) { console.log(val) },
    error(e) { console.log(e) },
    complete() { console.log('complete') }
});
static fromEvent(element, event) {
    return new Observable(observer => {
        const handler = (e) => observer.next(e);
        element.addEventListener(event, handler);

        return () => {
            element.removeEventListener(event, handler);
        };

    });
}

Метод fromEvent просто возвращает новый Observable, который будет выбрасывать нам объект event, когда событие случится. Нам не нужны утечки памяти, поэтому мы возвращаем функцию, которая даст нам возможность отписаться от подписки, когда нам это потребуется.

setTimeout(() => unsubscribe(), 1000);
http://jsfiddle.net/dnzl/d6rpj7fr/11/embed/js/dark/

Давайте реализуем fromArray метод.

let array$ = Observable.fromArray([1,2,3]);

array$.subscribe({
    next(val) { console.log(val) },
    error(e) { console.log(e) },
    complete() { console.log('complete') }
});
static fromArray(array) {
    return new Observable(observer => {
        array.forEach(val => observer.next(val));
        observer.complete();
    });
}

Мы можем увидеть, что Observable могут быть синхронными.

Используем RxJs, чтобы обработать HTTP-запросы

Использованы материалы rxjs чтобы обработать http-requests

  • Создадим Observable потоки из http-запросов
  • Обработаем ошибки
  • Обработаем завершение http-запроса
  • Реализуем debounce.

Создадим Observables из события 'keyup'

Создадим Observables из события 'keyup' (лучше на input) для текстового поля:

http://jsfiddle.net/dnzl/fpv8ha6s/embed/js,html,result/dark/

p.s. проще, вероятно, брать значение непосредственно из объекта события event.

Обрабатываем http-запрос

Оператор switchMap - complete перыдущий Observable, emit значение. Таким образом switchMap позволяет нам отменить избыточные http-запросы, если это необходимо.

const clicks$ = Rx.Observable.fromEvent(document, 'click');
const innerObservable$ = Rx.Observable.interval(1000);

clicks$.switchMap(event => innerObservable$)
    .subscribe(val => console.log(val));

Оператор switchMap на reactivex.io.

https://jsfiddle.net/dnzl/mkt7fjav/embed/js,html,result/dark/

Реализуем debounce на пользовательский ввод

Пользователю неинтересен результат, пока они не набрали хотя бы несколько символов. Для этого воспользуемся оператором debounceTime - предотвращает прокидывание значения, если оно приходит чаще чем время указанное в параметре. debounceTime проигнорирует значения попавшие в промежуток, когда происходит 'ожидание' значения.

https://jsfiddle.net/dnzl/ymfecevz/embed/js,html,result/dark/

Есть также оператор debounce, но он в качестве параметра принимает функцию.

Полезные приемы

Операторы filter, take - см. раздел операторы

userTypesInSearchBox
    .debounce(250)
    .filter((searchTerm) => {
        return searchTerm.length >= 5;
    })

merge

merge - берет один Observable и замешивает в него значение из другого Observable.

https://jsfiddle.net/dnzl/aw5yw69z/embed/js,html,result/dark/

Оператор combineLatest

Оператор combineLatest объединят несколько Observables, чтобы создать Observable чьи значения вычислены на основе последних значений каждого входящего в него Observables.

combineLatest

В нашем примере есть два поля поиска и они позволяют пользователю сравнить результаты аватарок, как только успешны два запроса.

https://jsfiddle.net/dnzl/nhL2ngjy/5/embed/js,html,result/dark/

Разница между flatmap и map:

Заметка на medium

Rxjs: 6 операторов, которые вы должны знать

Concat, forkJoin, mergeMap, pairwise, switchMap, combineLatest: stepansuvorov.com

Обрабатываем ошибки

Метод catch

Метод catch - ловит ошибки Observable, чтобы их можно было обработать, вернув новый Observable или throwing error. catch на reactivex.io

Observable.of(1, 2, 3, 4, 5)
    .map(n => {
        if (n == 4) {
            throw 'four!';
        }
        return n;
    })
    .catch(err => Observable.of('I', 'II', 'III', 'IV', 'V'))
    .subscribe(x => console.log(x));
    // 1, 2, 3, I, II, III, IV, V

Статичный метод throw

Статичный метод throw - создает новый Observable, который просто содержит ошибку. public static throw на reactivex.io

return this.http.request(new Request({
    url: url,
    body: body,
    headers: headers
})).map(response => response.json())
    .catch((error: Response) => Observable.throw(`Network Error:
        ${error.statusText} (${error.status})`));
Добавить комментарий