Important: This documentation covers Yarn 1 (Classic).
For Yarn 2+ docs and migration guide, see yarnpkg.com.

Package detail

amqp-connection-manager

jwalton1.9mMIT4.1.14TypeScript support: included

Auto-reconnect and round robin support for amqplib.

amqp, rabbitmq, cluster, amqplib

readme

amqp-connection-manager

NPM version Build Status semantic-release

Connection management for amqplib. This is a wrapper around amqplib which provides automatic reconnects.

Features

  • Automatically reconnect when your amqplib broker dies in a fire.
  • Round-robin connections between multiple brokers in a cluster.
  • If messages are sent while the broker is unavailable, queues messages in memory until we reconnect.
  • Supports both promises and callbacks (using promise-breaker)
  • Very un-opinionated library - a thin wrapper around amqplib.

Installation

npm install --save amqplib amqp-connection-manager

Basics

The basic idea here is that, usually, when you create a new channel, you do some setup work at the beginning (like asserting that various queues or exchanges exist, or binding to queues), and then you send and receive messages and you never touch that stuff again.

amqp-connection-manager will reconnect to a new broker whenever the broker it is currently connected to dies. When you ask amqp-connection-manager for a channel, you specify one or more setup functions to run; the setup functions will be run every time amqp-connection-manager reconnects, to make sure your channel and broker are in a sane state.

Before we get into an example, note this example is written using Promises, however much like amqplib, any function which returns a Promise will also accept a callback as an optional parameter.

Here's the example:

var amqp = require('amqp-connection-manager');

// Create a new connection manager
var connection = amqp.connect(['amqp://localhost']);

// Ask the connection manager for a ChannelWrapper.  Specify a setup function to
// run every time we reconnect to the broker.
var channelWrapper = connection.createChannel({
  json: true,
  setup: function (channel) {
    // `channel` here is a regular amqplib `ConfirmChannel`.
    // Note that `this` here is the channelWrapper instance.
    return channel.assertQueue('rxQueueName', { durable: true });
  },
});

// Send some messages to the queue.  If we're not currently connected, these will be queued up in memory
// until we connect.  Note that `sendToQueue()` and `publish()` return a Promise which is fulfilled or rejected
// when the message is actually sent (or not sent.)
channelWrapper
  .sendToQueue('rxQueueName', { hello: 'world' })
  .then(function () {
    return console.log('Message was sent!  Hooray!');
  })
  .catch(function (err) {
    return console.log('Message was rejected...  Boo!');
  });

Sometimes it's handy to modify a channel at run time. For example, suppose you have a channel that's listening to one kind of message, and you decide you now also want to listen to some other kind of message. This can be done by adding a new setup function to an existing ChannelWrapper:

channelWrapper.addSetup(function (channel) {
  return Promise.all([
    channel.assertQueue('my-queue', { exclusive: true, autoDelete: true }),
    channel.bindQueue('my-queue', 'my-exchange', 'create'),
    channel.consume('my-queue', handleMessage),
  ]);
});

addSetup() returns a Promise which resolves when the setup function is finished (or immediately, if the underlying connection is not currently connected to a broker.) There is also a removeSetup(setup, teardown) which will run teardown(channel) if the channel is currently connected to a broker (and will not run teardown at all otherwise.) Note that setup and teardown must either accept a callback or return a Promise.

See a complete example in the examples folder.

API

connect(urls, options)

Creates a new AmqpConnectionManager, which will connect to one of the URLs provided in urls. If a broker is unreachable or dies, then AmqpConnectionManager will try the next available broker, round-robin.

Options:

  • options.heartbeatIntervalInSeconds - Interval to send heartbeats to broker. Defaults to 5 seconds.
  • options.reconnectTimeInSeconds - The time to wait before trying to reconnect. If not specified, defaults to heartbeatIntervalInSeconds.
  • options.findServers(callback) is a function which returns one or more servers to connect to. This should return either a single URL or an array of URLs. This is handy when you're using a service discovery mechanism. such as Consul or etcd. Instead of taking a callback, this can also return a Promise. Note that if this is supplied, then urls is ignored.
  • options.connectionOptions is passed as options to the amqplib connect method.

AmqpConnectionManager events

  • connect({connection, url}) - Emitted whenever we successfully connect to a broker.
  • connectFailed({err, url}) - Emitted whenever we attempt to connect to a broker, but fail.
  • disconnect({err}) - Emitted whenever we disconnect from a broker.
  • blocked({reason}) - Emitted whenever a connection is blocked by a broker
  • unblocked - Emitted whenever a connection is unblocked by a broker

AmqpConnectionManager#createChannel(options)

Create a new ChannelWrapper. This is a proxy for the actual channel (which may or may not exist at any moment, depending on whether or not we are currently connected.)

Options:

  • options.name - Name for this channel. Used for debugging.
  • options.setup(channel, [cb]) - A function to call whenever we reconnect to the broker (and therefore create a new underlying channel.) This function should either accept a callback, or return a Promise. See addSetup below. Note that this inside the setup function will the returned ChannelWrapper. The ChannelWrapper has a special context member you can use to store arbitrary data in.
  • options.json - if true, then ChannelWrapper assumes all messages passed to publish() and sendToQueue() are plain JSON objects. These will be encoded automatically before being sent.
  • options.confirm - if true (default), the created channel will be a ConfirmChannel
  • options.publishTimeout - a default timeout for messages published to this channel.

AmqpConnectionManager#isConnected()

Returns true if the AmqpConnectionManager is connected to a broker, false otherwise.

AmqpConnectionManager#close()

Close this AmqpConnectionManager and free all associated resources.

ChannelWrapper events

  • connect - emitted every time this channel connects or reconnects.
  • error(err, {name}) - emitted if an error occurs setting up the channel.
  • close - emitted when this channel closes via a call to close()

ChannelWrapper#addSetup(setup)

Adds a new 'setup handler'.

setup(channel, [cb]) is a function to call when a new underlying channel is created - handy for asserting exchanges and queues exists, and whatnot. The channel object here is a ConfirmChannel from amqplib. The setup function should return a Promise (or optionally take a callback) - no messages will be sent until this Promise resolves.

If there is a connection, setup() will be run immediately, and the addSetup Promise/callback won't resolve until setup is complete. Note that in this case, if the setup throws an error, no 'error' event will be emitted, since you can just handle the error here (although the setup will still be added for future reconnects, even if it throws an error.)

Setup functions should, ideally, not throw errors, but if they do then the ChannelWrapper will emit an 'error' event.

ChannelWrapper#removeSetup(setup, teardown)

Removes a setup handler. If the channel is currently connected, will call teardown(channel), passing in the underlying amqplib ConfirmChannel. teardown should either take a callback or return a Promise.

ChannelWrapper#publish and ChannelWrapper#sendToQueue

These work exactly like their counterparts in amqplib's Channel, except that they return a Promise (or accept a callback) which resolves when the message is confirmed to have been delivered to the broker. The promise rejects if either the broker refuses the message, or if close() is called on the ChannelWrapper before the message can be delivered.

Both of these functions take an additional option when passing options:

  • timeout - If specified, if a messages is not acked by the amqp broker within the specified number of milliseconds, the message will be rejected. Note that the message may still end up getting delivered after the timeout, as we have no way to cancel the in-flight request.

ChannelWrapper#ack and ChannelWrapper#nack

These are just aliases for calling ack() and nack() on the underlying channel. They do nothing if the underlying channel is not connected.

ChannelWrapper#queueLength()

Returns a count of messages currently waiting to be sent to the underlying channel.

ChannelWrapper#close()

Close a channel, clean up resources associated with it.

changelog

4.1.14 (2023-07-27)

Bug Fixes

  • added type build step (0cc9859)
  • export types in separate directory (5d6cdbf)

4.1.13 (2023-05-02)

Bug Fixes

  • types: move types condition to the front (a1eb206)

4.1.12 (2023-04-03)

Bug Fixes

4.1.11 (2023-02-24)

Bug Fixes

  • Add unbindQueue to ChannelWrapper. (55ce8d3)

4.1.10 (2022-12-31)

Bug Fixes

  • exporting ChannelWrapper as a type without it getting emitted as metadata (a6f7b5c)

4.1.9 (2022-10-24)

Bug Fixes

  • Fail immediately for a bad password on latest amqplib. (412ed92)

4.1.8 (2022-10-24)

Bug Fixes

  • error thrown when queue deleted in amqplib 0.10.0 (60700ee), closes #301

4.1.7 (2022-09-30)

Bug Fixes

  • consumer registered twice during setup (1ca216a), closes #297

4.1.6 (2022-08-11)

Bug Fixes

  • Upgrade promise-breaker to 6.0.0 to fix typescript imports. (c9aff08), closes #234

4.1.5 (2022-08-09)

Reverts

  • Revert "fix: import of promise breaker" (aaeae1e)

4.1.4 (2022-08-05)

Bug Fixes

4.1.3 (2022-05-04)

Bug Fixes

  • accept 0 for heartbeatIntervalInSeconds (208af68)

4.1.2 (2022-04-13)

Bug Fixes

  • types: Export PublishOptions type. (6d20252)

4.1.1 (2022-02-05)

Bug Fixes

  • process unable to exit after connect (8d572b1)

4.1.0 (2022-02-01)

Features

  • cancel specific consumer (5f3b2eb)

4.0.1 (2022-01-21)

Bug Fixes

  • accept type of amqplib.credentials.external() (1db3b2d)

4.0.0 (2022-01-07)

Bug Fixes

  • Emit connectFailed on connection failure. (0f05987), closes #222

Continuous Integration

  • Stop testing on node 10 and 12. (5da9cb0)

BREAKING CHANGES

  • No longer running unit tests on node 10 and 12, although this package may continue to work on these.
  • We will no longer emit a disconnect event on an initial connection failure - instead we now emit connectFailed on each connection failure, and only emit disconnect when we transition from connected to disconnected.

3.9.0 (2022-01-04)

Features

  • proxying every exchange function of amqplib (bca347c)

3.8.1 (2021-12-29)

Bug Fixes

3.8.0 (2021-12-29)

Features

3.7.0 (2021-09-21)

Bug Fixes

  • AmqpConnectionManager: IAmqpConnectionManager interface definition (dedec7e)

Features

  • add default publish timeout (6826be2)
  • expose AmqpConnectionManagerClass (835a81f)
  • timeout option for publish (dee380d)

3.6.0 (2021-08-27)

Features

  • reconnect and cancelAll consumers (fb0c00b)

3.5.2 (2021-08-26)

Bug Fixes

  • Fix handling of resending messages during a disconnect. (e1457a5), closes #152

Performance Improvements

  • Send messages to underlying channel in synchronous batches. (b866ef2)

3.5.1 (2021-08-26)

Bug Fixes

  • types: Make private things private. (8b1338b)

3.5.0 (2021-08-26)

Features

3.4.5 (2021-08-26)

Performance Improvements

  • resolve sent messages immediately (2349da2)

3.4.4 (2021-08-26)

Bug Fixes

  • types: Allow passing object to connect() in addition to strings. (516fd9f)

3.4.3 (2021-08-25)

Bug Fixes

  • types: 'options' should be optional in connect(). (4619149)
  • Fix bluebird warning. (cb2f124), closes #171

3.4.3 (2021-08-25)

Bug Fixes

3.4.2 (2021-08-25)

Bug Fixes

  • types: Minor type fixes. (6865613)

3.4.1 (2021-08-25)

Bug Fixes

  • Only send disconnect event on first error. (efde3b9), closes #145

3.4.0 (2021-08-25)

Features

  • Convert to typescript, add module exports. (5f442b1)

3.3.0 (2021-08-24)

Bug Fixes

  • emit setup errors not caused by closed channel (7c5fe10), closes #95
  • setup on channel/connection closing/closed (b21bd01)

Features

  • immediately reconnect on amqplib connect timeout (ad06108)

3.2.4 (2021-08-23)

Bug Fixes

  • connection close not awaited (8955fe7)

3.2.3 (2021-08-21)

Bug Fixes

  • fixed issue with publish ignoring 'drain' event (e195d9b), closes #129

3.2.2 (2021-02-09)

Bug Fixes

  • When messages are acked/nacked, make sure we remove the correct message from the sent messages queue. (c662026), closes #142

3.2.1 (2020-09-12)

Bug Fixes

  • Push never resolves if error occured (courtesy @SSANSH). (48a78f8)
  • package: resolve hanging retry connection timeout by introducing cancelable timeout (e37dd1a)

3.2.0 (2020-01-20)

Features

  • add bindQueue and assertExchange on ChannelWrapper (879e522)

3.1.1 (2020-01-06)

Bug Fixes

3.1.0 (2019-12-06)

Features

  • Allow using URL object to connect, same format as amqplib accepts. (f046680)

3.0.0 (2019-07-04)

Continuous Integration

  • Stop running tests for node 6 and node 8. (164b882)

BREAKING CHANGES

  • Officially drop support for node 6 and node 8 (although they will probably still work).

2.3.3 (2019-06-25)

Bug Fixes

  • package: update promise-breaker to version 5.0.0 (ed91042)

2.3.2 (2019-05-21)

Bug Fixes

  • Null delta to get semantic-release to pick up #65. Fix #84. (9737135)

2.3.1 (2019-04-01)

Bug Fixes

  • prevent too many connection attempts on error (2760ce5), closes #77

2.3.0 (2018-11-20)

Features

  • Add ChannelWrapper.ackAll() and ChannelWrapper.nackAll(). (0246695), closes #60

2.2.0 (2018-09-25)

Features

  • Set 'this' to be the channel wrapper in the setup function. (551200f)

2.1.2 (2018-09-13)

Bug Fixes

  • Export a default object from root module. (78893c9), closes #51

2.1.1 (2018-09-05)

Bug Fixes

  • Remove reconnection listener when closing the connection manager. (eeb6e2b)

2.1.0 (2018-08-09)

Features

2.0.0 (2018-05-05)

Code Refactoring

  • Rewrite all source in javascript. (377d01d)

BREAKING CHANGES

  • Officially dropping support for node v4.x.x.

1.4.0

  • Add 'blocked' and 'unblocked' events (#25).

1.3.7

  • Fix bug where we would stop sending messages if remote gracefully closes connection.

1.3.6

  • Fix bug where ChannelWrapper would expect setup function to return a Promise and not accept a callback if channel was already connected.