redis-x-stream

calebboyd | 104.1k | MIT | 3.2.3 | TypeScript support: included

An async iterable interface for redis streams

queue, stream, redis, tasks, redis-queue, job-queue, jobs, redis-stream, consumer, consumer-group

Create async iterables that emit Redis stream entries. Requires Redis 5 or greater.

Getting Started

import { RedisStream } from 'redis-x-stream'
import Redis from 'ioredis'

const myStream = 'my-stream'
await populate(myStream, 1e5)

let i = 0
for await (const [streamName, [id, keyvals]] of new RedisStream(myStream)) {
  i++;
}
console.log(`read ${i} stream entries from ${myStream}`)

async function populate(stream, count) {
  const writer = new Redis({ enableAutoPipelining: true })
  await Promise.all(
    Array.from(Array(count), (_, j) => writer.xadd(stream, '*', 'index', j))
  )
  writer.quit()
  await new Promise(resolve => writer.once('close', resolve))
  console.log(`wrote ${count} stream entries to ${stream}`)
}
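
Each iteration above yields a `[streamName, [id, keyvals]]` tuple. Assuming `keyvals` arrives as the flat `[field, value, field, value, ...]` array that ioredis returns for stream entries (an assumption worth checking against the API Docs), a small helper can turn it into a plain object. `entryToObject` is a hypothetical name used for illustration, not part of redis-x-stream:

```typescript
// Hypothetical helper, not part of redis-x-stream.
// Assumes keyvals is a flat array: [field1, value1, field2, value2, ...].
function entryToObject(keyvals: string[]): Record<string, string> {
  const result: Record<string, string> = {}
  // Walk the array two items at a time: even index = field, odd index = value.
  // A trailing field without a value is ignored.
  for (let i = 0; i + 1 < keyvals.length; i += 2) {
    result[keyvals[i]] = keyvals[i + 1]
  }
  return result
}

const fields = entryToObject(['index', '42', 'kind', 'demo'])
console.log(fields.index, fields.kind)
```

Inside the `for await` loop this could be called as `entryToObject(keyvals)` before handing the entry to application code.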

Usage

See the API Docs for available options.

Advanced Usage

Task Processing

If you have a cluster of processes reading Redis stream entries, you likely want to use Redis consumer groups.

A task processing application may look like the following:

const control = {
  /* some control event emitter */
}
const stream = new RedisStream({
  streams: ['my-stream'],
  group: 'my-group',
  // e.g. a k8s StatefulSet hostname or a Cloud Foundry instance index
  consumer: 'tpc_' + process.env.SOME_ORDINAL_IDENTIFIER,
  block: Infinity,
  count: 10,
  deleteOnAck: true,
})
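// Semaphore is not defined in this snippet: any counting semaphore with acquire() and release() works
const lock = new Semaphore(11)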
const release = lock.release.bind(lock)

control.on('new-source', (streamName) => {
  //Add an additional source stream to a blocked stream.
  stream.addStream(streamName)
})
control.on('shutdown', async () => {
  //drain will process all claimed entries (the PEL) and stop iteration
  await stream.drain()
})

async function tryTask(stream, streamName, id, entry) {
  //...process entry...
  stream.ack(streamName, id)
}

for await (const [streamName, [id, keyvals]] of stream) {
  await lock.acquire()
  void tryTask(stream, streamName, id, keyvals).finally(release)
}
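
The task-processing loop relies on a `Semaphore` with `acquire()` and `release()`, but the source does not say where it comes from. The following is a minimal sketch of a counting semaphore with that shape. It is an illustrative stand-in, not the implementation the author used and not part of redis-x-stream:

```typescript
// Minimal counting semaphore (illustrative; not part of redis-x-stream).
class Semaphore {
  private available: number
  private waiters: Array<() => void> = []

  constructor(permits: number) {
    this.available = permits
  }

  // Resolves once a permit is free; callers queue in FIFO order.
  acquire(): Promise<void> {
    if (this.available > 0) {
      this.available--
      return Promise.resolve()
    }
    return new Promise<void>((resolve) => this.waiters.push(resolve))
  }

  // Hands the permit straight to the next waiter, or returns it to the pool.
  release(): void {
    const next = this.waiters.shift()
    if (next) next()
    else this.available++
  }
}

// Usage: five tasks share two permits, so at most two run at once.
async function demo(): Promise<number> {
  const lock = new Semaphore(2)
  let running = 0
  let peak = 0
  const task = async () => {
    await lock.acquire()
    running++
    peak = Math.max(peak, running)
    await new Promise((resolve) => setTimeout(resolve, 5))
    running--
    lock.release()
  }
  await Promise.all([task(), task(), task(), task(), task()])
  return peak
}

demo().then((peak) => console.log(`peak concurrency: ${peak}`))
```

In the loop above, awaiting `lock.acquire()` before starting each task applies backpressure: iteration pauses once all permits are taken and resumes when a task calls `release`.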

Changelog

All notable changes to this project will be documented in this file. See Conventional Commits for commit guidelines.

3.2.3 (2023-01-09)

Bug Fixes

  • broader ioredis version range (cc2a18c)

3.2.2 (2023-01-09)

Bug Fixes

  • make ioredis a peer dep (a0d2ed2)

3.2.1 (2023-01-09)

Bug Fixes

  • remove node module uri prefix (4c809d2)

3.2.0 (2023-01-06)

Features

  • add flush for consuming stream outside of blocked mode (a89c7f2)

3.1.5 (2023-01-06)

Bug Fixes

  • handle ending acks on quit (20cacbc)

3.1.4 (2023-01-06)

Bug Fixes

  • remove node condition from exports (e9eea3b)

3.1.3 (2023-01-06)

Bug Fixes

3.1.2 (2022-11-29)

Bug Fixes

  • handle added stream group creation (0fac64a)

3.1.1 (2022-11-26)

Bug Fixes

  • docs: correct badge links (92326e5)

3.1.0 (2022-11-26)

Features

  • support addStream and drain during iteration (e9d3c18)

3.0.0 (2022-11-20)

Features

  • add control client for adjusting blocking behavior (fa88e5d)

BREAKING CHANGES

  • only permitted options are accepted

2.1.0 (2022-11-19)

Features

2.0.3 (2022-11-15)

Bug Fixes

2.0.2 (2022-11-12)

Bug Fixes

  • deps: remove ioredis 4 types and update package lock (ae693dc)

2.0.1 (2022-10-30)

Bug Fixes

2.0.0 (2022-10-30)

Features

BREAKING CHANGES

  • update ioredis to 5x, and drop node < 14

1.4.1 (2022-10-30)

Bug Fixes

  • revert "feat: update ioredis to 5x" (9fae44c)

1.4.0 (2022-10-30)

Features

1.3.1 (2022-10-30)

Bug Fixes

1.3.0 (2022-10-30)

Features

1.2.1 (2021-02-15)

Bug Fixes

  • more flexible option types (b9f7e8e)

1.2.0 (2021-02-11)

Features

  • stream factory default export (08864fe)

1.1.1 (2021-02-08)

Bug Fixes

  • cursor + block for xreadgroup (28f91b8)

1.1.0 (2021-01-03)

Features

1.0.0 (2021-01-03)

Bug Fixes

Features