Skip to content

Commit

Permalink
Merge pull request #175 from luddd3/feat/reconnect
Browse files Browse the repository at this point in the history
feat: manual reconnect
  • Loading branch information
jwalton authored Aug 26, 2021
2 parents 104fc45 + 798b45f commit a53c6f2
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 0 deletions.
25 changes: 25 additions & 0 deletions src/AmqpConnectionManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,31 @@ export default class AmqpConnectionManager extends EventEmitter implements IAmqp
return !!this._currentConnection;
}

/** Force reconnect - noop unless connected */
reconnect(): void {
if (this._closed) {
throw new Error('cannot reconnect after close');
}

// If we have a connection, close it and immediately connect again.
// Wait for ordinary reconnect otherwise.
if (this._currentConnection) {
this._currentConnection.removeAllListeners();
this._currentConnection
.close()
.catch(() => {
// noop
})
.then(() => {
this._currentConnection = undefined;
this._disconnectSent = true;
this.emit('disconnect', { err: new Error('forced reconnect') });
return this._connect();
})
.catch(neverThrows);
}
}

/** The current connection. */
get connection(): Connection | undefined {
return this._currentConnection;
Expand Down
16 changes: 16 additions & 0 deletions test/AmqpConnectionManagerTest.ts
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,22 @@ describe('AmqpConnectionManager', function () {
expect(amqp?.isConnected()).to.be.true;
});

it('should be able to manually reconnect', async () => {
amqp = new AmqpConnectionManager('amqp://localhost');
await once(amqp, 'connect');

amqp.reconnect();
await once(amqp, 'disconnect');
await once(amqp, 'connect');
});

it('should throw on manual reconnect after close', async () => {
amqp = new AmqpConnectionManager('amqp://localhost');
await once(amqp, 'connect');
await amqp.close()
expect(amqp.reconnect).to.throw()
})

it('should create and clean up channel wrappers', async function () {
amqp = new AmqpConnectionManager('amqp://localhost');
const channel = amqp.createChannel({ name: 'test-chan' });
Expand Down

0 comments on commit a53c6f2

Please sign in to comment.