JS Journey into outer space - Day 4
This article was written as part of a knowledge sharing program I created for front-end developers. It builds up to creating reusable abstractions by studying async transducers.
Day 4
Now we have modified transduce
to handle async it's time to figure out how it can handle observables.
🏈
Kickoff First we need to pass the onNext
function which kicks off the callback chain. In RxJs
this is called subscribe
instead, so let's rename the parameter and the callbacks to be more in line with RxJs
:
function transduce(input, fn, subscribe, onError, onComplete, init) {
subscribe(input, {
next: (cur) => {
init = fn(init, cur);
},
error: (err) => {
onError(init, err);
},
complete: () => {
onComplete(init);
}
});
return init;
};
🦉 Observables are considered a push-style of dealing with many values, while generators are pull-based. InRxJS
there are ways to handle this "stream" of values in a more sophisticated way, as they're always coming in. Even in async generators the code needs to get and await the next value, evaluating it when it's resolved. On the flipside, it means that the consumer is in charge: you don't need to think about functionally transforming streams, you can get a single value on demand and operate on it, just like with any imperative construct. What is called "backpressure" is handled in a natural way, where with observables you would need to buffer or even drop values altogether. You can convert between generators and observables, but they generally exist in different paradigms, so the choice is up to you I guess🙃
The subscribe
function for Observable
is simply a way to dispatch the subscribe
method on the object (perhaps it would have been nicer if RxJs
would already expose this function, but it doesn't seem so). Note that this function returns void
. At some point we might need to come back to the return value, because we may have to unsubscribe from the observable.
function observableSubscribe<T>(o: Observable<T>, s: Observer<T>) {
o.subscribe(s);
}
Subject
Just like the Promise
case needed a Deferred
, the Observable
case needs an initial value that allows for "updates from the outside". In RxJS
this typically is a Subject
, which exposes methods next
, error
, parallel to resolve
and reject
. It also has a complete
method, because since an observable "resolve" many values it can complete at any time. When a Subject
is passed as initial value, the handlers can simply dispatch on it:
function observableConcat<T>(a: Observable<T>, c: T) {
a.next(c);
return a;
}
function observableOnError<T>(a: Observable<T>, error: any) {
a.error(error);
}
function observableOnComplete<T>(a: Observable<T>) {
a.complete();
}
Finally we can transduce observables.
const xform = doubleEvens(observableConcat);
const result = transduceObservable(
from([1, 2, 3, 4, 5, 6]),
xform,
observableSubscribe,
observableOnError,
observableOnComplete,
new Subject()
);
result.subscribe(console.log); // 4 8 12
🧑🚀
Ready for takeoff Now that most of our usecases are covered (though not yet all operations) the time has come to make the transducer function handle them all. It's possible to get rid of all the specific arguments related to different types, but before that let's look at the common characteristics.
First it seems that all these type-specific functions are really different, but just like the Iterable
interface is available JS (or, rather, TS), the fp-ts
library has a collection of type classes ready for use. At first the names look seriously alien: Monad, Functor, Semigroup... These names are coming from category theory, a field in math, and not from any practical application (also, not all modules in fp-ts
expose type classes).
However, this road was created as a gentle introduction into higher and higher levels of abstraction, by focusing on what is concretely usable. So let's dive in and look at the Monoid
type class.
🦉 Semigroup
defines the behaviour of types that allow for concatenation, in whatever form that takes. JS types like arrays, strings and numbers all display some form of concatenation, so these types can be considered to belong to this category.Monoid
extends this behaviour with an "empty" or initial value of a type. The empty array, the empty string, and the number zero were chosen respectively. See the relevant modules in thefp-ts
documentation.
Not all implementations in fp-ts
are similar (e.g. there is no Monoid
for Array<unknown>
), so this repo re-exposes Monoid
to be readily used in the transducer functions. We already implemented Monoid
behaviour for promises and observables, but we'll expose it using correct type.
function transduceArray<T>(input: T[], fn) {
return transduce(
input,
fn(array.getMonoid<T>().concat),
iteratorSubscribe,
noop,
noop,
array.getMonoid<T>().empty
);
}
function transduceString(input: string, fn) {
return transduce(
input,
fn(string.getMonoid().concat),
iteratorSubscribe,
noop,
noop,
string.getMonoid().empty
);
}
function transducePromise<T>(input: Promise<T>, fn) {
return transduce(
input,
fn(promise.getMonoid<T>().concat),
promiseSubscribe,
promiseOnError,
noop,
promise.getMonoid<T>().empty
);
}
function transduceObservable<T>(input: Observable<T>, fn) {
return transduce(
input,
fn(observable.getMonoid<T>().concat),
observableSubscribe,
observableOnError,
observableOnComplete,
observable.getMonoid<T>().empty
);
}
All that is needed now is a type class that expresses the subscribe
, onError
and onComplete
behaviour (if any) and we can have a stab at creating the input type for transduce
:
// required typeclasses for the types implementing `transduce`
interface Semigroup<A> {
readonly concat: (x: A, y: A) => A
}
interface Monoid<A> extends Semigroup<A> {
readonly empty: A
}
interface Observer<T> {
next: (value: T) => void;
error: (err: any) => void;
complete: () => void;
}
interface Subscribable<A> {
subscribe: <T>(a: A, o: Observer<T>) => void;
}
interface Failable<A> {
onError: (a: A, err: unknown) => void;
}
interface Completable<A> {
onComplete: (a: A) => void;
}
interface Transducable<A, B = A, T = unknown> {
transduce(
input: A,
fn: (a: A, c: T) => A,
subscribe: (a: A, o: Observer<T>) => void,
onError: (a: A, err: unknown) => void,
onComplete: (a: A) => void,
init: B
);
}
There are some things to iron out, but Rome wasn't built in four days.
Comments
Post a Comment