Reactive Extensions (RxJS library)

References

http://reactivex.io/rxjs/manual/overview.html

Rx.Observable.fromEvent, scan, throttleTime, map

http://reactivex.io/rxjs/manual/overview.html#first-examples

Here's how you can add the current mouse x position for every click, in plain JavaScript:

var count = 0;
var rate = 1000;
var lastClick = Date.now() - rate;
var button = document.querySelector('button');
button.addEventListener('click', (event) => {
  if (Date.now() - lastClick >= rate) {
    count += event.clientX;
    console.log(count);
    lastClick = Date.now();
  }
});

With RxJS:

var button = document.querySelector('button');
Rx.Observable.fromEvent(button, 'click')
  .throttleTime(1000)
  .map(event => event.clientX)
  .scan((count, clientX) => count + clientX, 0)
  .subscribe(count => console.log(count)); 
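
To make the three operator steps easier to follow, here is a minimal sketch in plain JavaScript that replays a list of timestamped clicks through the same throttle, map, and scan logic. The helper name runningClickTotals and the sample clicks are assumptions made for illustration. The throttle check mirrors the plain JavaScript version above (elapsed time >= rate), not the internals of throttleTime.

```javascript
// Hypothetical helper: replays timestamped clicks through the same three steps.
// Each click is { time: <ms>, clientX: <number> }.
function runningClickTotals(clicks, rate) {
  let lastEmit = -Infinity; // time of the last click that passed the throttle
  let count = 0;            // accumulator, like the seed 0 passed to scan
  const totals = [];

  for (const click of clicks) {
    // throttle step: drop clicks that arrive too soon after the last accepted one
    if (click.time - lastEmit < rate) continue;
    lastEmit = click.time;

    // map step: keep only the x position
    const clientX = click.clientX;

    // scan step: fold into the running total and emit each intermediate result
    count += clientX;
    totals.push(count);
  }
  return totals;
}

const clicks = [
  { time: 0, clientX: 10 },
  { time: 400, clientX: 99 }, // dropped: only 400 ms after the last accepted click
  { time: 1200, clientX: 20 },
  { time: 2500, clientX: 5 },
];
console.log(runningClickTotals(clicks, 1000)); // [ 10, 30, 35 ]
```

The point of the RxJS version is that lastEmit and count no longer live in outer variables; each operator keeps its own state.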

Observable, Observer, Subscription

http://reactivex.io/rxjs/manual/overview.html#observable

  • Subscribing to an Observable is analogous to calling a Function.
  • Observables can "return" multiple values over time, synchronously or asynchronously.

Example:

var foo = Rx.Observable.create(function (observer) {
  console.log('Hello');
  observer.next(42);
  observer.next(100);
  observer.next(200);
  setTimeout(() => {
    observer.next(300); // happens asynchronously
  }, 1000);
});

console.log('before');
foo.subscribe(function (x) {
  console.log(x);
});
console.log('after');

With output:

"before"
"Hello"
42
100
200
"after"
300

Another Example:

var observable = Rx.Observable.create(function (observer) {
  observer.next(1);
  observer.next(2);
  observer.next(3);
  setTimeout(() => {
    observer.next(4);
    observer.complete();
  }, 1000);
});

console.log('just before subscribe');
observable.subscribe({
  next: x => console.log('got value ' + x),
  error: err => console.error('something wrong occurred: ' + err),
  complete: () => console.log('done'),
});
console.log('just after subscribe');

There are three types of values an Observable Execution can deliver:

  • "Next" notification: sends a value such as a Number, a String, an Object, etc.
  • "Error" notification: sends a JavaScript Error or exception.
  • "Complete" notification: does not send a value.

In an Observable Execution, zero to infinite Next notifications may be delivered. If either an Error or Complete notification is delivered, then nothing else can be delivered afterwards.
The following is an example of an Observable execution that delivers three Next notifications, then completes:

var observable = Rx.Observable.create(function subscribe(observer) {
  observer.next(1);
  observer.next(2);
  observer.next(3);
  observer.complete();
});

Observables strictly adhere to the Observable Contract, so the following code would not deliver the Next notification 4:

var observable = Rx.Observable.create(function subscribe(observer) {
  observer.next(1);
  observer.next(2);
  observer.next(3);
  observer.complete();
  observer.next(4); // Is not delivered because it would violate the contract
});
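
One way to picture how the contract could be enforced is a wrapper that stops forwarding notifications after the first Error or Complete. The sketch below is a hand-rolled illustration in plain JavaScript, not the RxJS implementation; makeSafeObserver is a hypothetical name.

```javascript
// Hypothetical wrapper, not the RxJS implementation: it forwards notifications
// to `observer` until the first error or complete, then ignores everything else.
function makeSafeObserver(observer) {
  let stopped = false;
  return {
    next(value) {
      if (!stopped && observer.next) observer.next(value);
    },
    error(err) {
      if (stopped) return;
      stopped = true;
      if (observer.error) observer.error(err);
    },
    complete() {
      if (stopped) return;
      stopped = true;
      if (observer.complete) observer.complete();
    },
  };
}

const received = [];
const safe = makeSafeObserver({
  next: x => received.push(x),
  complete: () => received.push('done'),
});

safe.next(1);
safe.next(2);
safe.next(3);
safe.complete();
safe.next(4); // ignored: the execution has already completed

console.log(received); // [ 1, 2, 3, 'done' ]
```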

It is a good idea to wrap any code in subscribe in a try/catch block that delivers an Error notification if it catches an exception:

var observable = Rx.Observable.create(function subscribe(observer) {
  try {
    observer.next(1);
    observer.next(2);
    observer.next(3);
    observer.complete();
  } catch (err) {
    observer.error(err); // delivers an error if it caught one
  }
});

Observables are lazy Push collections of multiple values.

What is an Observer? An Observer is a consumer of values delivered by an Observable. Observers are simply a set of callbacks, one for each type of notification delivered by the Observable: next, error, and complete.

What is a Subscription? A Subscription is an object that represents a disposable resource, usually the execution of an Observable. A Subscription has one important method, unsubscribe, that takes no argument and just disposes the resource held by the subscription.

unsubscribe

When you subscribe, you get back a Subscription, which represents the ongoing execution. Just call unsubscribe() to cancel the execution.

var observable = Rx.Observable.from([10, 20, 30]);
var subscription = observable.subscribe(x => console.log(x));
// Later:
subscription.unsubscribe();

When we create an Observable using create(), it must define how to dispose of the resources of its execution. You can do that by returning a custom unsubscribe function from within function subscribe().

For instance, this is how we clear an interval execution set with setInterval:

var observable = Rx.Observable.create(function subscribe(observer) {
  // Keep track of the interval resource
  var intervalID = setInterval(() => {
    observer.next('hi');
  }, 1000);

  // Provide a way of canceling and disposing the interval resource
  return function unsubscribe() {
    clearInterval(intervalID);
  };
});
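
To show how the returned function could be wired to subscription.unsubscribe(), here is a minimal sketch in plain JavaScript. createObservable is a hypothetical stand-in for Rx.Observable.create and handles only next; values are pushed by hand through emit instead of a timer so the order of events is easy to follow.

```javascript
// Hypothetical stand-in for Rx.Observable.create (handles only `next`).
function createObservable(subscribeFn) {
  return {
    subscribe(next) {
      let closed = false;
      // Run the producer and remember the teardown function it returns
      const teardown = subscribeFn({
        next(value) { if (!closed) next(value); },
      });
      return {
        unsubscribe() {
          if (closed) return; // in this sketch, repeated calls do nothing
          closed = true;
          if (typeof teardown === 'function') teardown();
        },
      };
    },
  };
}

const log = [];
let emit; // lets the example push values by hand instead of waiting on a timer
const observable = createObservable(observer => {
  emit = value => observer.next(value);
  log.push('resource acquired');
  return function unsubscribe() {
    log.push('resource released');
  };
});

const subscription = observable.subscribe(x => log.push('got ' + x));
emit('hi');
subscription.unsubscribe();
emit('ignored');            // no longer delivered
subscription.unsubscribe(); // second call is a no-op

console.log(log); // [ 'resource acquired', 'got hi', 'resource released' ]
```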

Subject

What is a Subject? An RxJS Subject is a special type of Observable that allows values to be multicasted to many Observers. While plain Observables are unicast (each subscribed Observer owns an independent execution of the Observable), Subjects are multicast.

In the example below, we have two Observers attached to a Subject, and we feed some values to the Subject:

var subject = new Rx.Subject();

subject.subscribe({
  next: (v) => console.log('observerA: ' + v)
});
subject.subscribe({
  next: (v) => console.log('observerB: ' + v)
});

subject.next(1);
subject.next(2);

With the following output on the console:

observerA: 1
observerB: 1
observerA: 2
observerB: 2

Since a Subject is an Observer, this also means you may provide a Subject as the argument to the subscribe of any Observable, like the example below shows:

var subject = new Rx.Subject();

subject.subscribe({
  next: (v) => console.log('observerA: ' + v)
});
subject.subscribe({
  next: (v) => console.log('observerB: ' + v)
});

var observable = Rx.Observable.from([1, 2, 3]);

observable.subscribe(subject); // You can subscribe providing a Subject

Which executes as:

observerA: 1
observerB: 1
observerA: 2
observerB: 2
observerA: 3
observerB: 3

With the approach above, we essentially converted a unicast Observable execution to multicast through the Subject. This demonstrates how Subjects are the only way of sharing an Observable execution among multiple Observers.
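
The multicast behavior can be sketched without the library: a Subject is, at its core, a list of Observers plus a next method that forwards to each of them. The code below is an illustrative assumption in plain JavaScript (createSubject and subscribeToSource are hypothetical names, and error/complete are left out), not the RxJS implementation.

```javascript
// Hypothetical minimal Subject: keeps a list of observers and forwards to each.
function createSubject() {
  const observers = [];
  return {
    subscribe(observer) {
      observers.push(observer);
    },
    next(value) {
      for (const observer of observers) observer.next(value);
    },
  };
}

// A unicast source: every call to `subscribeToSource` runs the producer again.
let executions = 0;
function subscribeToSource(observer) {
  executions += 1;
  [1, 2, 3].forEach(v => observer.next(v));
}

const lines = [];
const subject = createSubject();
subject.subscribe({ next: v => lines.push('observerA: ' + v) });
subject.subscribe({ next: v => lines.push('observerB: ' + v) });

subscribeToSource(subject); // the subject is the only direct observer

console.log(lines.join('\n'));
console.log('source executions:', executions); // source executions: 1
```

The source runs once, yet both observers see every value, which is the unicast-to-multicast conversion described above.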

There are also a few specializations of the Subject type: BehaviorSubject, ReplaySubject, and AsyncSubject.

Broadcasting events in Angular 2

Angular 1 has utilities for broadcasting events: $broadcast and $on. Angular 2 dropped these features. However, we can create a similar utility, as described in the following post:

https://blog.lacolaco.net/post/event-broadcasting-in-angular-2/
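
As a rough idea of what such a utility might look like, here is a sketch in plain JavaScript built on the same "list of observers" idea as a Subject. It is not the code from the linked post; createBroadcaster, on, and broadcast are hypothetical names chosen to echo $on and $broadcast.

```javascript
// Hypothetical Broadcaster: names and shape are illustrative assumptions.
function createBroadcaster() {
  const listeners = []; // { key, handler } pairs, like observers on a Subject
  return {
    // Counterpart of $on: register a handler for one event key
    on(key, handler) {
      const entry = { key, handler };
      listeners.push(entry);
      // Return a subscription-like object so the listener can be removed
      return {
        unsubscribe() {
          const index = listeners.indexOf(entry);
          if (index !== -1) listeners.splice(index, 1);
        },
      };
    },
    // Counterpart of $broadcast: deliver data to every handler for the key
    broadcast(key, data) {
      // Copy first so handlers that unsubscribe during delivery do not skip others
      for (const entry of listeners.slice()) {
        if (entry.key === key) entry.handler(data);
      }
    },
  };
}

const seen = [];
const broadcaster = createBroadcaster();
const sub = broadcaster.on('greet', msg => seen.push('first: ' + msg));
broadcaster.on('greet', msg => seen.push('second: ' + msg));
broadcaster.on('other', msg => seen.push('other: ' + msg));

broadcaster.broadcast('greet', 'hello');
sub.unsubscribe();
broadcaster.broadcast('greet', 'again');

console.log(seen); // [ 'first: hello', 'second: hello', 'second: again' ]
```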

Multicasted Observables

Operators

Scheduler

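What is a Scheduler? A scheduler controls when a subscription starts and when notifications are delivered. It consists of three components: a data structure that stores and queues tasks, an execution context that determines where and when a task runs, and a (virtual) clock that provides the notion of time.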

In the example below, we take the usual simple Observable that emits values 1, 2, 3 synchronously, and use the operator observeOn to specify the async scheduler to use for delivering those values.

var observable = Rx.Observable.create(function (observer) {
  observer.next(1);
  observer.next(2);
  observer.next(3);
  observer.complete();
})
.observeOn(Rx.Scheduler.async);

console.log('just before subscribe');
observable.subscribe({
  next: x => console.log('got value ' + x),
  error: err => console.error('something wrong occurred: ' + err),
  complete: () => console.log('done'),
});
console.log('just after subscribe');

Which executes with the output:

just before subscribe
just after subscribe
got value 1
got value 2
got value 3
done

Notice how the "got value ..." notifications were delivered after "just after subscribe", which differs from the default behavior we have seen so far.
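
The reordering can be reproduced without the library by queueing each notification instead of delivering it immediately. The sketch below is plain JavaScript with hypothetical names (createManualScheduler, observeOn as a standalone function); it uses a manual flush() in place of the event loop so the order is deterministic, and it is not how Rx.Scheduler.async is implemented.

```javascript
// Hypothetical manual scheduler: `schedule` stores tasks, `flush` runs them later.
function createManualScheduler() {
  const queue = [];
  return {
    schedule(task) { queue.push(task); },
    flush() {
      while (queue.length > 0) queue.shift()();
    },
  };
}

// Hypothetical observeOn-like wrapper: every notification is handed to the
// scheduler instead of being delivered synchronously.
function observeOn(subscribeFn, scheduler) {
  return function subscribe(observer) {
    subscribeFn({
      next: v => scheduler.schedule(() => observer.next(v)),
      complete: () => scheduler.schedule(() => observer.complete()),
    });
  };
}

const output = [];
const scheduler = createManualScheduler();
const subscribe = observeOn(observer => {
  observer.next(1);
  observer.next(2);
  observer.next(3);
  observer.complete();
}, scheduler);

output.push('just before subscribe');
subscribe({
  next: x => output.push('got value ' + x),
  complete: () => output.push('done'),
});
output.push('just after subscribe');

scheduler.flush(); // stands in for the event loop running the queued tasks
console.log(output.join('\n'));
```

Replacing schedule with task => setTimeout(task, 0) would hand the tasks to the real event loop instead of the manual queue.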