xstream
- Version 11.14.0
- Published
- 715 kB
- 2 dependencies
- MIT license
Install
npm i xstream
yarn add xstream
pnpm add xstream
Overview
An extremely intuitive, small, and fast functional reactive stream library for JavaScript
Index
Variables
Classes
Stream
- addListener()
- combine
- compose()
- create()
- createWithMemory()
- debug()
- drop()
- empty()
- endWhen()
- filter()
- flatten()
- fold()
- from()
- fromArray()
- fromObservable()
- fromPromise()
- imitate()
- last()
- map()
- mapTo()
- merge
- never()
- of()
- periodic()
- remember()
- removeListener()
- replaceError()
- setDebugListener()
- shamefullySendComplete()
- shamefullySendError()
- shamefullySendNext()
- startWith()
- subscribe()
- take()
- throw()
Interfaces
Type Aliases
Variables
Classes
class MemoryStream
class MemoryStream<T> extends Stream<T> {}
constructor
constructor(producer: InternalProducer<T>);
method debug
debug: { (): MemoryStream<T>; (labelOrSpy: string): MemoryStream<T>; (labelOrSpy: (t: T) => any): MemoryStream<T>;};
method endWhen
endWhen: (other: Stream<any>) => MemoryStream<T>;
method map
map: <U>(project: (t: T) => U) => MemoryStream<U>;
method mapTo
mapTo: <U>(projectedValue: U) => MemoryStream<U>;
method remember
remember: () => MemoryStream<T>;
method replaceError
replaceError: (replace: (err: any) => Stream<T>) => MemoryStream<T>;
method take
take: (amount: number) => MemoryStream<T>;
class Stream
class Stream<T> implements InternalListener<T> {}
constructor
constructor(producer?: InternalProducer<T>);
property combine
static combine: CombineSignature;
Combines multiple input streams together to return a stream whose events are arrays that collect the latest events from each input stream.
*combine* internally remembers the most recent event from each of the input streams. When any of the input streams emits an event, that event together with all the other saved events are combined into an array. That array will be emitted on the output stream. It's essentially a way of joining together the events from multiple streams.
Marble diagram:
--1----2-----3--------4---
----a-----b-----c--d------
          combine
----1a-2a-2b-3b-3c-3d-4d--
Parameter stream1
A stream to combine together with other streams.
Parameter stream2
A stream to combine together with other streams. Multiple streams, not just two, may be given as arguments. {Stream}
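A minimal sketch of *combine* with two finite sources, assuming the default export is imported as `xs` (the variable names are illustrative). Because `xs.of()` emits synchronously, the first source has already delivered both of its events by the time the second one starts, so only its most recent value is paired with each event of the second source.

```typescript
import xs from 'xstream';

// Collected output of the combined stream, plus a completion marker.
const combined: Array<[number, string]> = [];
const combineLog: string[] = [];

xs.combine(xs.of(1, 2), xs.of('a', 'b')).addListener({
  next: pair => combined.push(pair),
  complete: () => combineLog.push('complete'),
});

console.log(combined); // [[2, 'a'], [2, 'b']]
```

With asynchronous sources the pairs interleave as in the marble diagram above; the synchronous case is used here only so the result can be inspected immediately.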
property merge
static merge: MergeSignature;
Blends multiple streams together, emitting events from all of them concurrently.
*merge* takes multiple streams as arguments, and creates a stream that behaves like each of the argument streams, in parallel.
Marble diagram:
--1----2-----3--------4---
----a-----b----c---d------
           merge
--1-a--2--b--3-c---d--4---
Parameter stream1
A stream to merge together with other streams.
Parameter stream2
A stream to merge together with other streams. Two or more streams may be given as arguments. {Stream}
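As a rough illustration, merging two streams of different types yields a stream of their union type. This sketch assumes the default export is imported as `xs`. The sources here are synchronous, so their events arrive source by source rather than interleaved as in the diagram.

```typescript
import xs from 'xstream';

// Events from both inputs end up on one output stream.
const merged: Array<number | string> = [];
const mergeLog: string[] = [];

xs.merge(xs.of(1, 2), xs.of('a', 'b')).addListener({
  next: x => merged.push(x),
  complete: () => mergeLog.push('complete'),
});

console.log(merged); // [1, 2, 'a', 'b']
```

The output completes only after every input stream has completed.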
method addListener
addListener: (listener: Partial<Listener<T>>) => void;
Adds a Listener to the Stream.
Parameter listener
method compose
compose: <U>(operator: (stream: Stream<T>) => U) => U;
Passes the input stream to a custom operator, to produce an output stream.
*compose* is a handy way of using an existing function in a chained style. Instead of writing
outStream = f(inStream)
you can write
outStream = inStream.compose(f)
Parameter operator
A function that takes a stream as input and returns a stream as well. {Stream}
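A small sketch of the chained style described above. The operator `double` is a hypothetical helper written for this example, not part of xstream.

```typescript
import xs, {Stream} from 'xstream';

// Hypothetical custom operator: takes a stream of numbers, returns a stream
// that emits each number doubled.
const double = (in$: Stream<number>): Stream<number> => in$.map(x => x * 2);

const doubled: number[] = [];
xs.of(1, 2, 3)
  .compose(double) // equivalent to double(xs.of(1, 2, 3))
  .addListener({ next: x => doubled.push(x) });

console.log(doubled); // [2, 4, 6]
```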
method create
static create: <T>(producer?: Producer<T>) => Stream<T>;
Creates a new Stream given a Producer.
Parameter producer
An optional Producer that dictates how to start, generate events, and stop the Stream. {Stream}
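The following sketch shows one way to write a Producer, assuming the `Producer` and `Listener` types are imported from the package alongside `xs`. The producer below emits synchronously to keep the example short; a real producer would typically wrap a timer or an event source and release it in `stop`.

```typescript
import xs, {Listener, Producer} from 'xstream';

// A producer emits through the listener it receives in start().
const producer: Producer<number> = {
  start(listener: Listener<number>) {
    listener.next(1);
    listener.next(2);
    listener.complete();
  },
  stop() {
    // Nothing to clean up in this sketch.
  },
};

const createLog: string[] = [];
xs.create(producer).addListener({
  next: x => createLog.push(`next ${x}`),
  error: () => createLog.push('error'),
  complete: () => createLog.push('complete'),
});

console.log(createLog); // ['next 1', 'next 2', 'complete']
```

The producer is started only when the first listener is added to the stream.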
method createWithMemory
static createWithMemory: <T>(producer?: Producer<T>) => MemoryStream<T>;
Creates a new MemoryStream given a Producer.
Parameter producer
An optional Producer that dictates how to start, generate events, and stop the Stream. {MemoryStream}
method debug
debug: { (): Stream<T>; (labelOrSpy: string): Stream<T>; (labelOrSpy: (t: T) => any): Stream<T>;};
method drop
drop: (amount: number) => Stream<T>;
Ignores the first `amount` many events from the input stream, and then after that starts forwarding events from the input stream to the output stream.
Marble diagram:
--a---b--c----d---e--
       drop(3)
--------------d---e--
Parameter amount
How many events to ignore from the input stream before forwarding all events from the input stream to the output stream. {Stream}
method empty
static empty: <T = any>() => Stream<T>;
Creates a Stream that immediately emits the "complete" notification when started, and that's it.
Marble diagram:
empty
-|
{Stream}
method endWhen
endWhen: (other: Stream<any>) => Stream<T>;
Uses another stream to determine when to complete the current stream.
When the given `other` stream emits an event or completes, the output stream will complete. Before that happens, the output stream will behave like the input stream.
Marble diagram:
---1---2-----3--4----5----6---
  endWhen( --------a--b--| )
---1---2-----3--4--|
Parameter other
Some other stream that is used to determine when the output stream of this operator should complete. {Stream}
method filter
filter: { <S extends T>(passes: (t: T) => t is S): Stream<S>; (passes: (t: T) => boolean): Stream<T>;};
method flatten
flatten: <R>(this: Stream<Stream<R> | MemoryStream<R>>) => Stream<R>;
Flattens a "stream of streams", handling only one nested stream at a time (no concurrency).
If the input stream is a stream that emits streams, then this operator will return an output stream which is a flat stream: emits regular events. The flattening happens without concurrency. It works like this: when the input stream emits a nested stream, *flatten* will start imitating that nested one. However, as soon as the next nested stream is emitted on the input stream, *flatten* will forget the previous nested one it was imitating, and will start imitating the new nested one.
Marble diagram:
--+--------+---------------
  \        \
   \       ----1----2---3--
   --a--b----c----d--------
           flatten
-----a--b------1----2---3--
{Stream}
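A sketch of the "forget the previous nested stream" behavior described above. To control timing by hand, this example drives producer-less streams with `shamefullySendNext()`; that is done for demonstration only and is not how application code should feed streams. All variable names are illustrative.

```typescript
import xs, {Stream} from 'xstream';

// Two nested streams and an outer stream that emits them.
const a$ = xs.create<string>();
const b$ = xs.create<string>();
const outer$ = xs.create<Stream<string>>();

const flat: string[] = [];
outer$.flatten().addListener({ next: x => flat.push(x) });

outer$.shamefullySendNext(a$); // flatten starts imitating a$
a$.shamefullySendNext('a1');   // forwarded
outer$.shamefullySendNext(b$); // flatten switches to b$ and forgets a$
a$.shamefullySendNext('a2');   // dropped: a$ is no longer imitated
b$.shamefullySendNext('b1');   // forwarded

console.log(flat); // ['a1', 'b1']
```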
method fold
fold: <R>(accumulate: (acc: R, t: T) => R, seed: R) => MemoryStream<R>;
"Folds" the stream onto itself.
Combines events from the past throughout the entire execution of the input stream, allowing you to accumulate them together. It's essentially like `Array.prototype.reduce`. The returned stream is a MemoryStream, which means it is already `remember()`'d.
The output stream starts by emitting the `seed` which you give as argument. Then, when an event happens on the input stream, it is combined with that seed value through the `accumulate` function, and the output value is emitted on the output stream. `fold` remembers that output value as `acc` ("accumulator"), and then when a new input event `t` happens, `acc` will be combined with that to produce the new `acc` and so forth.
Marble diagram:
------1-----1--2----1----1------
  fold((acc, x) => acc + x, 3)
3-----4-----5--7----8----9------
Parameter accumulate
A function of type `(acc: R, t: T) => R` that takes the previous accumulated value `acc` and the incoming event from the input stream and produces the new accumulated value.
Parameter seed
The initial accumulated value, of type `R`. {MemoryStream}
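The marble diagram above can be reproduced with a short sketch, assuming the default export is imported as `xs`. Note that the seed itself is the first event on the output stream.

```typescript
import xs from 'xstream';

// Running sum, seeded with 3, over the same inputs as the marble diagram.
const sums: number[] = [];
xs.of(1, 1, 2, 1, 1)
  .fold((acc, x) => acc + x, 3)
  .addListener({ next: x => sums.push(x) });

console.log(sums); // [3, 4, 5, 7, 8, 9]
```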
method from
static from: <T>( input: PromiseLike<T> | Stream<T> | Array<T> | Observable<T>) => Stream<T>;
Creates a stream from an Array, Promise, or an Observable.
Parameter input
The input to make a stream from. {Stream}
method fromArray
static fromArray: <T>(array: Array<T>) => Stream<T>;
Converts an array to a stream. The returned stream will emit synchronously all the items in the array, and then complete.
Marble diagram:
fromArray([1,2,3])
123|
Parameter array
The array to be converted as a stream. {Stream}
method fromObservable
static fromObservable: <T>(obs: { subscribe: any }) => Stream<T>;
Converts an Observable into a Stream.
Parameter observable
The observable to be converted as a stream. {Stream}
method fromPromise
static fromPromise: <T>(promise: PromiseLike<T>) => Stream<T>;
Converts a promise to a stream. The returned stream will emit the resolved value of the promise, and then complete. However, if the promise is rejected, the stream will emit the corresponding error.
Marble diagram:
fromPromise( ----42 )
-----------------42|
Parameter promise
The promise to be converted as a stream. {Stream}
method imitate
imitate: (target: Stream<T>) => void;
*imitate* changes this current Stream to emit the same events that the `other` given Stream does. This method returns nothing.
This method exists to allow one thing: **circular dependency of streams**. For instance, let's imagine that for some reason you need to create a circular dependency where stream `first$` depends on stream `second$` which in turn depends on `first$`:
import delay from 'xstream/extra/delay'
var first$ = second$.map(x => x * 10).take(3);
var second$ = first$.map(x => x + 1).startWith(1).compose(delay(100));
However, that is invalid JavaScript, because `second$` is undefined on the first line. This is how *imitate* can help solve it:
import xs from 'xstream'
import delay from 'xstream/extra/delay'
var secondProxy$ = xs.create();
var first$ = secondProxy$.map(x => x * 10).take(3);
var second$ = first$.map(x => x + 1).startWith(1).compose(delay(100));
secondProxy$.imitate(second$);
We create `secondProxy$` before the others, so it can be used in the declaration of `first$`. Then, after both `first$` and `second$` are defined, we hook `secondProxy$` to `second$` with `imitate()` to tell that they are "the same". `imitate` will not trigger the start of any stream, it just binds `secondProxy$` and `second$` together.
The following is an example where `imitate()` is important in Cycle.js applications. A parent component contains some child components. A child has an action stream which is given to the parent to define its state:
const childActionProxy$ = xs.create();
const parent = Parent({...sources, childAction$: childActionProxy$});
const childAction$ = parent.state$.map(s => s.child.action$).flatten();
childActionProxy$.imitate(childAction$);
Note, though, that **`imitate()` does not support MemoryStreams**. If we would attempt to imitate a MemoryStream in a circular dependency, we would either get a race condition (where the symptom would be "nothing happens") or an infinite cyclic emission of values. It's useful to think about MemoryStreams as cells in a spreadsheet. It doesn't make any sense to define a spreadsheet cell `A1` with a formula that depends on `B1` and cell `B1` defined with a formula that depends on `A1`.
If you find yourself wanting to use `imitate()` with a MemoryStream, you should rework your code around `imitate()` to use a Stream instead. Look for the stream in the circular dependency that represents an event stream, and that would be a candidate for creating a proxy Stream which then imitates the target Stream.
Parameter target
The other stream to imitate on the current one. Must not be a MemoryStream.
method last
last: () => Stream<T>;
When the input stream completes, the output stream will emit the last event emitted by the input stream, and then will also complete.
Marble diagram:
--a---b--c--d----|
      last()
-----------------d|
{Stream}
method map
map: <U>(project: (t: T) => U) => Stream<U>;
Transforms each event from the input Stream through a `project` function, to get a Stream that emits those transformed events.
Marble diagram:
--1---3--5-----7------
   map(i => i * 10)
--10--30-50----70-----
Parameter project
A function of type `(t: T) => U` that takes event `t` of type `T` from the input Stream and produces an event of type `U`, to be emitted on the output Stream. {Stream}
method mapTo
mapTo: <U>(projectedValue: U) => Stream<U>;
It's like `map`, but transforms each input event to always the same constant value on the output Stream.
Marble diagram:
--1---3--5-----7-----
      mapTo(10)
--10--10-10----10----
Parameter projectedValue
A value to emit on the output Stream whenever the input Stream emits any value. {Stream}
method never
static never: <T = any>() => Stream<T>;
Creates a Stream that does nothing when started. It never emits any event.
Marble diagram:
never
-----------------------
{Stream}
method of
static of: <T>(...items: Array<T>) => Stream<T>;
Creates a Stream that immediately emits the arguments that you give to *of*, then completes.
Marble diagram:
of(1,2,3)
123|
Parameter a
The first value you want to emit as an event on the stream.
Parameter b
The second value you want to emit as an event on the stream. One or more of these values may be given as arguments. {Stream}
method periodic
static periodic: (period: number) => Stream<number>;
Creates a stream that periodically emits incremental numbers, every `period` milliseconds.
Marble diagram:
    periodic(1000)
---0---1---2---3---4---...
Parameter period
The interval in milliseconds to use as a rate of emission. {Stream}
method remember
remember: () => MemoryStream<T>;
Returns an output stream that behaves like the input stream, but also remembers the most recent event that happens on the input stream, so that a newly added listener will immediately receive that memorised event.
{MemoryStream}
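A sketch of the difference a late listener sees on a remembered stream. The source is a producer-less stream driven with `shamefullySendNext()` purely so the timing can be controlled in a short example; the names `early` and `late` are illustrative.

```typescript
import xs from 'xstream';

const source$ = xs.create<number>();
const remembered$ = source$.remember();

const early: number[] = [];
const late: number[] = [];

remembered$.addListener({ next: x => early.push(x) });
source$.shamefullySendNext(1);
source$.shamefullySendNext(2);

// Added after two events: immediately receives the memorised event (2).
remembered$.addListener({ next: x => late.push(x) });
source$.shamefullySendNext(3);

console.log(early); // [1, 2, 3]
console.log(late);  // [2, 3]
```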
method removeListener
removeListener: (listener: Partial<Listener<T>>) => void;
Removes a Listener from the Stream, assuming the Listener was added to it.
Parameter listener
method replaceError
replaceError: (replace: (err: any) => Stream<T>) => Stream<T>;
Replaces an error with another stream.
When (and if) an error happens on the input stream, instead of forwarding that error to the output stream, *replaceError* will call the `replace` function which returns the stream that the output stream will replicate. And, in case that new stream also emits an error, `replace` will be called again to get another stream to start replicating.
Marble diagram:
--1---2-----3--4-----X
  replaceError( () => --10--| )
--1---2-----3--4--------10--|
Parameter replace
A function of type `(err) => Stream` that takes the error that occurred on the input stream or on the previous replacement stream and returns a new stream. The output stream will behave like the stream that this function returns. {Stream}
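A minimal sketch of recovering from an error, assuming the default export is imported as `xs`. The failing source is a hand-written producer made up for this example.

```typescript
import xs from 'xstream';

// Illustrative source: emits two values, then fails.
const failing$ = xs.create<number>({
  start(listener) {
    listener.next(1);
    listener.next(2);
    listener.error(new Error('boom'));
  },
  stop() {},
});

const recoveryLog: string[] = [];
failing$
  .replaceError(err => {
    recoveryLog.push(`replaced: ${err.message}`);
    return xs.of(10); // the output continues with this stream
  })
  .addListener({
    next: x => recoveryLog.push(`next ${x}`),
    error: () => recoveryLog.push('error'),
    complete: () => recoveryLog.push('complete'),
  });

console.log(recoveryLog);
// ['next 1', 'next 2', 'replaced: boom', 'next 10', 'complete']
```

The listener's `error` callback is never invoked here, because the error is swallowed by the replacement.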
method setDebugListener
setDebugListener: (listener: Partial<Listener<T>> | null | undefined) => void;
Adds a "debug" listener to the stream. There can only be one debug listener, that's why this is 'setDebugListener'. To remove the debug listener, just call setDebugListener(null).
A debug listener is like any other listener. The only difference is that a debug listener is "stealthy": its presence/absence does not trigger the start/stop of the stream (or the producer inside the stream). This is useful so you can inspect what is going on without changing the behavior of the program. If you have an idle stream and you add a normal listener to it, the stream will start executing. But if you set a debug listener on an idle stream, it won't start executing (not until the first normal listener is added).
As the name indicates, we don't recommend using this method to build app logic. In fact, in most cases the debug operator works just fine. Only use this one if you know what you're doing.
Parameter listener
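The "stealthy" behavior described above can be sketched as follows. The producer and the counters are illustrative; the point is that setting the debug listener alone does not start the producer, while adding a normal listener does.

```typescript
import xs from 'xstream';

const startLog: string[] = [];
const seenByDebug: number[] = [];

const idle$ = xs.create<number>({
  start(listener) {
    startLog.push('started');
    listener.next(1);
  },
  stop() {},
});

idle$.setDebugListener({ next: x => seenByDebug.push(x) });
const startsAfterDebug = startLog.length; // 0: the stream is still idle

idle$.addListener({ next: () => {} });
const startsAfterNormal = startLog.length; // 1: a normal listener starts it

console.log(startsAfterDebug, startsAfterNormal, seenByDebug); // 0 1 [1]
```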
method shamefullySendComplete
shamefullySendComplete: () => void;
Forces the Stream to emit the "completed" event to its listeners.
As the name indicates, if you use this, you are most likely doing something The Wrong Way. Please try to understand the reactive way before using this method. Use it only when you know what you are doing.
method shamefullySendError
shamefullySendError: (error: any) => void;
Forces the Stream to emit the given error to its listeners.
As the name indicates, if you use this, you are most likely doing something The Wrong Way. Please try to understand the reactive way before using this method. Use it only when you know what you are doing.
Parameter error
The error you want to broadcast to all the listeners of this Stream.
method shamefullySendNext
shamefullySendNext: (value: T) => void;
Forces the Stream to emit the given value to its listeners.
As the name indicates, if you use this, you are most likely doing something The Wrong Way. Please try to understand the reactive way before using this method. Use it only when you know what you are doing.
Parameter value
The "next" value you want to broadcast to all listeners of this Stream.
method startWith
startWith: (initial: T) => MemoryStream<T>;
Prepends the given `initial` value to the sequence of events emitted by the input stream. The returned stream is a MemoryStream, which means it is already `remember()`'d.
Marble diagram:
---1---2-----3---
  startWith(0)
0--1---2-----3---
Parameter initial
The value or event to prepend. {MemoryStream}
method subscribe
subscribe: (listener: Partial<Listener<T>>) => Subscription;
Adds a Listener to the Stream returning a Subscription to remove that listener.
Parameter listener
Returns
{Subscription}
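A short sketch of using the returned Subscription to stop listening. As in other hand-timed examples, the source is driven with `shamefullySendNext()` only to make the ordering explicit.

```typescript
import xs from 'xstream';

const clicks$ = xs.create<number>();
const received: number[] = [];

const subscription = clicks$.subscribe({ next: x => received.push(x) });
clicks$.shamefullySendNext(1); // delivered
subscription.unsubscribe();    // removes the listener added by subscribe()
clicks$.shamefullySendNext(2); // no listener left, so this is not delivered

console.log(received); // [1]
```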
method take
take: (amount: number) => Stream<T>;
Lets the first `amount` many events from the input stream pass to the output stream, then makes the output stream complete.
Marble diagram:
--a---b--c----d---e--
    take(3)
--a---b--c|
Parameter amount
How many events to allow from the input stream before completing the output stream. {Stream}
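As a quick illustration, the sketch below runs *take* on the same five events as the marble diagram, with *drop* shown alongside for contrast. The helper `letters` is a hypothetical factory used so each operator gets a fresh source.

```typescript
import xs from 'xstream';

// Hypothetical helper: a fresh stream of five letters per call.
const letters = () => xs.of('a', 'b', 'c', 'd', 'e');

const taken: string[] = [];
letters().take(3).addListener({
  next: x => taken.push(x),
  complete: () => taken.push('|'), // mark completion like the diagram does
});

const dropped: string[] = [];
letters().drop(3).addListener({ next: x => dropped.push(x) });

console.log(taken);   // ['a', 'b', 'c', '|']
console.log(dropped); // ['d', 'e']
```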
method throw
static throw: (error: any) => Stream<any>;
Creates a Stream that immediately emits an "error" notification with the value you passed as the `error` argument when the stream starts, and that's it.
Marble diagram:
throw(X)
-X
Parameter error
The error event to emit on the created stream. {Stream}
Interfaces
interface Aggregator
interface Aggregator<T, U> extends InternalProducer<U>, OutSender<U> {}
interface CombineSignature
interface CombineSignature {}
call signature
(): Stream<Array<any>>;
call signature
<T1>(s1: Stream<T1>): Stream<[T1]>;
call signature
<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>( s1: Stream<T1>, s2: Stream<T2>, s3: Stream<T3>, s4: Stream<T4>, s5: Stream<T5>, s6: Stream<T6>, s7: Stream<T7>, s8: Stream<T8>, s9: Stream<T9>, s10: Stream<T10>): Stream<[T1, T2, T3, T4, T5, T6, T7, T8, T9, T10]>;
call signature
<T>(...stream: Array<Stream<T>>): Stream<Array<T>>;
call signature
(...stream: Array<Stream<any>>): Stream<Array<any>>;
call signature
<T1, T2>(s1: Stream<T1>, s2: Stream<T2>): Stream<[T1, T2]>;
call signature
<T1, T2, T3>(s1: Stream<T1>, s2: Stream<T2>, s3: Stream<T3>): Stream< [T1, T2, T3]>;
call signature
<T1, T2, T3, T4>( s1: Stream<T1>, s2: Stream<T2>, s3: Stream<T3>, s4: Stream<T4>): Stream<[T1, T2, T3, T4]>;
call signature
<T1, T2, T3, T4, T5>( s1: Stream<T1>, s2: Stream<T2>, s3: Stream<T3>, s4: Stream<T4>, s5: Stream<T5>): Stream<[T1, T2, T3, T4, T5]>;
call signature
<T1, T2, T3, T4, T5, T6>( s1: Stream<T1>, s2: Stream<T2>, s3: Stream<T3>, s4: Stream<T4>, s5: Stream<T5>, s6: Stream<T6>): Stream<[T1, T2, T3, T4, T5, T6]>;
call signature
<T1, T2, T3, T4, T5, T6, T7>( s1: Stream<T1>, s2: Stream<T2>, s3: Stream<T3>, s4: Stream<T4>, s5: Stream<T5>, s6: Stream<T6>, s7: Stream<T7>): Stream<[T1, T2, T3, T4, T5, T6, T7]>;
call signature
<T1, T2, T3, T4, T5, T6, T7, T8>( s1: Stream<T1>, s2: Stream<T2>, s3: Stream<T3>, s4: Stream<T4>, s5: Stream<T5>, s6: Stream<T6>, s7: Stream<T7>, s8: Stream<T8>): Stream<[T1, T2, T3, T4, T5, T6, T7, T8]>;
call signature
<T1, T2, T3, T4, T5, T6, T7, T8, T9>( s1: Stream<T1>, s2: Stream<T2>, s3: Stream<T3>, s4: Stream<T4>, s5: Stream<T5>, s6: Stream<T6>, s7: Stream<T7>, s8: Stream<T8>, s9: Stream<T9>): Stream<[T1, T2, T3, T4, T5, T6, T7, T8, T9]>;
interface InternalListener
interface InternalListener<T> {}
interface InternalProducer
interface InternalProducer<T> {}
interface Listener
interface Listener<T> {}
interface MergeSignature
interface MergeSignature {}
call signature
(): Stream<any>;
call signature
<T1>(s1: Stream<T1>): Stream<T1>;
call signature
<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>( s1: Stream<T1>, s2: Stream<T2>, s3: Stream<T3>, s4: Stream<T4>, s5: Stream<T5>, s6: Stream<T6>, s7: Stream<T7>, s8: Stream<T8>, s9: Stream<T9>, s10: Stream<T10>): Stream<T1 | T2 | T3 | T4 | T5 | T6 | T7 | T8 | T9 | T10>;
call signature
<T>(...stream: Array<Stream<T>>): Stream<T>;
call signature
<T1, T2>(s1: Stream<T1>, s2: Stream<T2>): Stream<T1 | T2>;
call signature
<T1, T2, T3>(s1: Stream<T1>, s2: Stream<T2>, s3: Stream<T3>): Stream< T1 | T2 | T3>;
call signature
<T1, T2, T3, T4>( s1: Stream<T1>, s2: Stream<T2>, s3: Stream<T3>, s4: Stream<T4>): Stream<T1 | T2 | T3 | T4>;
call signature
<T1, T2, T3, T4, T5>( s1: Stream<T1>, s2: Stream<T2>, s3: Stream<T3>, s4: Stream<T4>, s5: Stream<T5>): Stream<T1 | T2 | T3 | T4 | T5>;
call signature
<T1, T2, T3, T4, T5, T6>( s1: Stream<T1>, s2: Stream<T2>, s3: Stream<T3>, s4: Stream<T4>, s5: Stream<T5>, s6: Stream<T6>): Stream<T1 | T2 | T3 | T4 | T5 | T6>;
call signature
<T1, T2, T3, T4, T5, T6, T7>( s1: Stream<T1>, s2: Stream<T2>, s3: Stream<T3>, s4: Stream<T4>, s5: Stream<T5>, s6: Stream<T6>, s7: Stream<T7>): Stream<T1 | T2 | T3 | T4 | T5 | T6 | T7>;
call signature
<T1, T2, T3, T4, T5, T6, T7, T8>( s1: Stream<T1>, s2: Stream<T2>, s3: Stream<T3>, s4: Stream<T4>, s5: Stream<T5>, s6: Stream<T6>, s7: Stream<T7>, s8: Stream<T8>): Stream<T1 | T2 | T3 | T4 | T5 | T6 | T7 | T8>;
call signature
<T1, T2, T3, T4, T5, T6, T7, T8, T9>( s1: Stream<T1>, s2: Stream<T2>, s3: Stream<T3>, s4: Stream<T4>, s5: Stream<T5>, s6: Stream<T6>, s7: Stream<T7>, s8: Stream<T8>, s9: Stream<T9>): Stream<T1 | T2 | T3 | T4 | T5 | T6 | T7 | T8 | T9>;
interface Observable
interface Observable<T> {}
method subscribe
subscribe: (listener: Listener<T>) => Subscription;
interface Operator
interface Operator<T, R> extends InternalProducer<R>, InternalListener<T>, OutSender<R> {}
interface Producer
interface Producer<T> {}
interface Subscription
interface Subscription {}
method unsubscribe
unsubscribe: () => void;
Type Aliases
type xs
type xs<T> = Stream<T>;
Package Files (1)
Dependencies (2)
Dev Dependencies (24)
Peer Dependencies (0)
No peer dependencies.