@n1ru4l/push-pull-async-iterable-iterator

n1ru4l | 2m | MIT | 3.2.0 | TypeScript support: included

readme

Create an AsyncIterableIterator from anything (on any modern platform) while handling back-pressure!

yarn add -E @n1ru4l/push-pull-async-iterable-iterator

Standalone Usage

import { makePushPullAsyncIterableIterator } from "@n1ru4l/push-pull-async-iterable-iterator";

const {
  pushValue,
  asyncIterableIterator
} = makePushPullAsyncIterableIterator();
pushValue(1);
pushValue(2);
pushValue(3);

// prints 1, 2, 3
for await (const value of asyncIterableIterator) {
  console.log(value);
}
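
To make the back-pressure idea concrete, here is a minimal sketch of how a push-pull iterator could be built: pushed values are buffered until the consumer pulls them, and a pull with an empty buffer waits for the next push. The name makePushPullSketch and its internals are assumptions made for illustration; this is not the library's actual implementation.

```typescript
// Hypothetical, simplified sketch; not the library's actual source.
// Assumes a single consumer with at most one pending next() call.
function makePushPullSketch<T>(): {
  pushValue: (value: T) => void;
  asyncIterableIterator: AsyncIterableIterator<T>;
} {
  const queue: T[] = []; // values pushed but not yet pulled
  let isDone = false;
  // Resolver of a consumer that is waiting for the next value, if any.
  let pending: ((result: IteratorResult<T>) => void) | null = null;

  const pushValue = (value: T): void => {
    if (isDone) return; // values pushed after completion are dropped
    if (pending) {
      const resolve = pending;
      pending = null;
      resolve({ value, done: false });
    } else {
      queue.push(value);
    }
  };

  const asyncIterableIterator: AsyncIterableIterator<T> = {
    async next(): Promise<IteratorResult<T>> {
      if (queue.length > 0) {
        return { value: queue.shift() as T, done: false };
      }
      if (isDone) {
        return { value: undefined, done: true };
      }
      // Nothing buffered: park until pushValue or return is called.
      return new Promise<IteratorResult<T>>(resolve => {
        pending = resolve;
      });
    },
    async return(): Promise<IteratorResult<T>> {
      isDone = true;
      queue.length = 0;
      pending?.({ value: undefined, done: true });
      pending = null;
      return { value: undefined, done: true };
    },
    [Symbol.asyncIterator]() {
      return asyncIterableIterator;
    }
  };

  return { pushValue, asyncIterableIterator };
}
```

The buffer is what absorbs a producer that is faster than the consumer; a production implementation would also need to handle several concurrent next() calls.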

Check if something is an AsyncIterable

import { isAsyncIterable } from "@n1ru4l/push-pull-async-iterable-iterator";

if (isAsyncIterable(something)) {
  for await (const value of something) {
    console.log(value);
  }
}

Note: On Safari iOS, Symbol.asyncIterator is not available, so all async iterators used there must be built using AsyncGenerators. If an AsyncIterable that is not an AsyncGenerator is passed to isAsyncIterable in the Safari iOS environment, it returns false.
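
The note above suggests a check with two branches: one that recognizes objects produced by an async generator function, and one that looks for Symbol.asyncIterator where it exists. The following is a sketch under that assumption; isAsyncIterableSketch is a hypothetical name and the library's real check may differ.

```typescript
// Hypothetical type guard; the library's real check may differ.
function isAsyncIterableSketch(
  input: unknown
): input is AsyncIterable<unknown> {
  if (typeof input !== "object" || input === null) {
    return false;
  }
  // Objects created by `async function*` carry this tag, so they can be
  // recognized even where Symbol.asyncIterator is missing.
  if (Object.prototype.toString.call(input) === "[object AsyncGenerator]") {
    return true;
  }
  // Everywhere else, look for the well-known symbol.
  return (
    typeof Symbol.asyncIterator === "symbol" && Symbol.asyncIterator in input
  );
}
```

With a check of this shape, a hand-written AsyncIterable is only recognized when Symbol.asyncIterator is available, which matches the behavior described for Safari iOS.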

Wrap a Sink

import { makeAsyncIterableIteratorFromSink } from "@n1ru4l/push-pull-async-iterable-iterator";
// let's use some GraphQL client :)
import { createClient } from "graphql-ws/lib/use/ws";

const client = createClient({
  url: "ws://localhost:3000/graphql"
});

const asyncIterableIterator = makeAsyncIterableIteratorFromSink(sink => {
  const dispose = client.subscribe(
    {
      query: "{ hello }"
    },
    {
      next: sink.next,
      error: sink.error,
      complete: sink.complete
    }
  );
  return () => dispose();
});

for await (const value of asyncIterableIterator) {
  console.log(value);
}

Apply an AsyncIterableIterator to a sink

import Observable from "zen-observable";
import {
  makePushPullAsyncIterableIterator,
  applyAsyncIterableIteratorToSink
} from "@n1ru4l/push-pull-async-iterable-iterator";

const { pushValue, asyncIterableIterator } = makePushPullAsyncIterableIterator();

const observable = new Observable(sink => {
  const dispose = applyAsyncIterableIteratorToSink(asyncIterableIterator, sink);
  // dispose is called when the observable subscription is destroyed;
  // the dispose call ensures that the async iterator is completed.
  return () => dispose();
});

const subscription = observable.subscribe({
  next: console.log,
  complete: () => console.log("done."),
  error: () => console.log("error.")
});

const interval = setInterval(() => {
  pushValue("hi");
}, 1000);

setTimeout(() => {
  subscription.unsubscribe();
  clearInterval(interval);
}, 5000);
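
As a rough illustration of what applying an iterator to a sink involves, the sketch below drains the iterator in the background, forwards each value to the sink, and returns a dispose function that asks the iterator to finish. SinkSketch and applyToSinkSketch are hypothetical names, and the behavior shown is an assumption rather than the library's confirmed implementation.

```typescript
// Hypothetical sink shape, mirroring the next/error/complete callbacks above.
type SinkSketch<T> = {
  next: (value: T) => void;
  error: (error: unknown) => void;
  complete: () => void;
};

// Simplified sketch; not the library's actual source.
function applyToSinkSketch<T>(
  iterator: AsyncIterableIterator<T>,
  sink: SinkSketch<T>
): () => void {
  const run = async (): Promise<void> => {
    try {
      for await (const value of iterator) {
        sink.next(value);
      }
      sink.complete();
    } catch (error) {
      sink.error(error);
    }
  };
  // Start draining without blocking the caller.
  void run();

  // Dispose: ask the source to finish so the loop above ends.
  return () => {
    void iterator.return?.();
  };
}
```

Returning a dispose function fits the Observable contract used above, where the subscriber function returns its own cleanup callback.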

Put it all together

import {
  GraphQLResponse,
  Observable,
  RequestParameters,
  Variables
} from "relay-runtime";
import { createClient } from "graphql-ws/lib/use/ws";
import {
  makeAsyncIterableIteratorFromSink,
  applyAsyncIterableIteratorToSink
} from "@n1ru4l/push-pull-async-iterable-iterator";
import { createApplyLiveQueryPatch } from "@n1ru4l/graphql-live-query-patch";

const client = createClient({
  url: "ws://localhost:3000/graphql"
});

export const execute = (request: RequestParameters, variables: Variables) => {
  if (!request.text) {
    throw new Error("Missing document.");
  }
  const query = request.text;

  return Observable.create<GraphQLResponse>(sink => {
    // Create our asyncIterator from a Sink
    const executionResultIterator = makeAsyncIterableIteratorFromSink(wsSink => {
      const dispose = client.subscribe({ query }, wsSink);
      return () => dispose();
    });

    const applyLiveQueryPatch = createApplyLiveQueryPatch();

    // apply some middleware to our asyncIterator
    const compositeIterator = applyLiveQueryPatch(executionResultIterator);

    // Apply our async iterable to the relay sink
    // unfortunately relay cannot consume an async iterable right now.
    const dispose = applyAsyncIterableIteratorToSink(compositeIterator, sink);
    // dispose will be called by relay when the observable is disposed
    // the dispose call will ensure that the async iterator is completed.
    return () => dispose();
  });
};

Operators

This package also ships a few utilities that make your life easier!

map

Map a source

import { map } from "@n1ru4l/push-pull-async-iterable-iterator";

async function* source() {
  yield 1;
  yield 2;
  yield 3;
}

const square = map((value: number): number => value * value);

for await (const value of square(source())) {
  console.log(value);
}
// logs 1, 4, 9

filter

Filter a source

import { filter } from "@n1ru4l/push-pull-async-iterable-iterator";

async function* source() {
  yield 1;
  yield 2;
  yield 3;
}

const biggerThan1 = filter((value: number): boolean => value > 1);

for await (const value of biggerThan1(source())) {
  console.log(value);
}
// logs 2, 3
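
Operators such as map and filter can be thought of as curried async generator functions: each takes a callback and returns a function that transforms one source into another, so they compose by nesting. The sketch below shows that idea with hypothetical local helpers (mapSketch, filterSketch) that accept synchronous callbacks only; it is not the library's implementation.

```typescript
// Hypothetical operator sketches; not the library's actual source.
const mapSketch = <T, O>(fn: (value: T) => O) =>
  async function* (source: AsyncIterable<T>) {
    for await (const value of source) {
      yield fn(value);
    }
  };

const filterSketch = <T>(predicate: (value: T) => boolean) =>
  async function* (source: AsyncIterable<T>) {
    for await (const value of source) {
      if (predicate(value)) {
        yield value;
      }
    }
  };

async function* numbers() {
  yield 1;
  yield 2;
  yield 3;
}

const squareSketch = mapSketch((value: number) => value * value);
const biggerThan1Sketch = filterSketch((value: number) => value > 1);

// Squares each number, then keeps only results greater than 1.
async function collectSquaresBiggerThan1(): Promise<number[]> {
  const result: number[] = [];
  for await (const value of biggerThan1Sketch(squareSketch(numbers()))) {
    result.push(value);
  }
  return result; // resolves to [4, 9]
}
```

Because each operator pulls from its source only when its own consumer pulls, a composed chain stays lazy end to end.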

Other helpers

withHandlers

Attach a return and throw handler to a source.

import { withHandlers } from "@n1ru4l/push-pull-async-iterable-iterator";

async function* source() {
  yield 1;
  yield 2;
  yield 3;
}

const sourceInstance = source();

const newSourceWithHandlers = withHandlers(
  sourceInstance,
  () => sourceInstance.return(),
  err => sourceInstance.throw(err)
);

for await (const value of newSourceWithHandlers) {
  // ...
}
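
One way such a helper could work is to delegate to the source with yield* and then wrap the resulting generator's return and throw methods so the handlers run first. The sketch below assumes exactly that; withHandlersSketch is a hypothetical name and the library's withHandlers may be implemented differently.

```typescript
// Hypothetical sketch; the library's withHandlers may differ.
function withHandlersSketch<T>(
  source: AsyncIterable<T>,
  onReturn?: () => void,
  onThrow?: (error: unknown) => void
): AsyncGenerator<T, void> {
  // Delegate to the source so values pass through unchanged.
  const stream = (async function* (): AsyncGenerator<T, void> {
    yield* source;
  })();

  const originalReturn = stream.return.bind(stream);
  const originalThrow = stream.throw.bind(stream);

  // Run the handler first, then fall through to the default behavior.
  stream.return = value => {
    onReturn?.();
    return originalReturn(value);
  };
  stream.throw = error => {
    onThrow?.(error);
    return originalThrow(error);
  };

  return stream;
}
```

A for await loop that exits early with break calls return on the iterator, so the return handler is a natural place for cleanup such as closing a connection.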

changelog

3.2.0

Minor Changes

  • 2d1d87d: Add operators filter and map
  • 2d1d87d: Add helpers withHandlers and withHandlersFrom

3.1.0

Minor Changes

  • c1d143c: Change usage of type AsyncIterableIterator to AsyncGenerator.

    This library and other libraries such as graphql-js typed what should be AsyncGenerator as AsyncIterableIterator.

    The main difference between the two types is that AsyncGenerator declares the return method as required, while AsyncIterableIterator declares it as optional. This caused confusion when using TypeScript, because the return method is in fact always present.

    Here are the TypeScript type definitions for comparison.

    interface AsyncGenerator<T = unknown, TReturn = any, TNext = unknown>
      extends AsyncIterator<T, TReturn, TNext> {
      // NOTE: 'next' is defined using a tuple to ensure we report the correct assignability errors in all places.
      next(...args: [] | [TNext]): Promise<IteratorResult<T, TReturn>>;
      return(
        value: TReturn | PromiseLike<TReturn>
      ): Promise<IteratorResult<T, TReturn>>;
      throw(e: any): Promise<IteratorResult<T, TReturn>>;
      [Symbol.asyncIterator](): AsyncGenerator<T, TReturn, TNext>;
    }
    interface AsyncIterator<T, TReturn = any, TNext = undefined> {
      // NOTE: 'next' is defined using a tuple to ensure we report the correct assignability errors in all places.
      next(...args: [] | [TNext]): Promise<IteratorResult<T, TReturn>>;
      return?(
        value?: TReturn | PromiseLike<TReturn>
      ): Promise<IteratorResult<T, TReturn>>;
      throw?(e?: any): Promise<IteratorResult<T, TReturn>>;
    }
    
    interface AsyncIterableIterator<T> extends AsyncIterator<T> {
      [Symbol.asyncIterator](): AsyncIterableIterator<T>;
    }

    Unfortunately, the name of this library is now a bit misleading. @n1ru4l/push-pull-async-generator might be the better pick. For now I will not deprecate and rename it.
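
To illustrate the typing difference described in the 3.1.0 notes, the sketch below views the same runtime object through both types. The names counter and stopEarly are made up for this example; only the standard TypeScript lib types are used.

```typescript
async function* counter(): AsyncGenerator<number, void> {
  yield 1;
  yield 2;
}

async function stopEarly(): Promise<boolean> {
  const generator: AsyncGenerator<number, void> = counter();
  // The same object, viewed through the looser type.
  const iterator: AsyncIterableIterator<number> = generator;

  await generator.next();

  // `return` is required on AsyncGenerator, so it can be called directly.
  const viaGenerator = await generator.return(undefined);

  // On AsyncIterableIterator the method is typed as optional, so strict
  // TypeScript requires a guard even though it exists at runtime.
  const viaIterator = await iterator.return?.();

  return viaGenerator.done === true && viaIterator?.done === true;
}
```

Typing the value as AsyncGenerator removes the need for the optional-chaining guard at every call site.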

3.0.0

Major Changes

  • 21a2470: drop support for Node 12; support ESM; use bob-the-bundler for bundling instead of tsdx