Skip to content

Commit

Permalink
Put initial acquisition in pending connection state
Browse files Browse the repository at this point in the history
  • Loading branch information
pfumagalli committed Sep 27, 2024
1 parent 91ba5e4 commit bc8e444
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 19 deletions.
20 changes: 9 additions & 11 deletions workspaces/pool/src/pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -553,14 +553,7 @@ export class ConnectionPool extends Emitter<ConnectionPoolEvents> {

/** Acquire a {@link Connection} from this {@link ConnectionPool} */
acquire(): Promise<Connection> {
/* Defer this promise when the connection pool is still starting */
if (this._starting) {
return new Promise<Connection>((resolve, reject) => {
this.once('started', () => process.nextTick(() => this.acquire().then(resolve)))
this.once('error', (error) => reject(error))
})
}
assert(this._started, 'Connection pool not started')
assert(this._started || this._starting, 'Connection pool not started')

/* Add a new entry to our pending connection requests and run the loop */
const deferred = new ConnectionRequest(this._acquireTimeoutMs)
Expand Down Expand Up @@ -651,10 +644,15 @@ export class ConnectionPool extends Emitter<ConnectionPoolEvents> {
this._evict(connection)
}

/* Run our create loop to create all needed (minimum) connections */
this._runCreateLoop()
/* If we have pending connections (acquire while starting) then run our
* boorrow loop to fulfill all request (this in turn will call our create
* loop to fill up the pool), otherwise simply run the create loop to
* create all needed (minimum) connections */
if (this._pending.length) this._runBorrowLoop()
else this._runCreateLoop()
return this
} catch (error) {
} catch (error: any) {
for (const pending of this._pending) pending.reject(error)
this._emit('error', error)
throw error
} finally {
Expand Down
27 changes: 19 additions & 8 deletions workspaces/pool/test/11-pool.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ describe('Connection Pool', () => {
database: databaseName,
minimumPoolSize: 0,
maximumPoolSize: 10,
maximumIdleConnections: 0,
maximumIdleConnections: 10,
})

const events = captureEvents(pool)
Expand Down Expand Up @@ -275,18 +275,29 @@ describe('Connection Pool', () => {
pool.stop()
}

expect(events()).toMatchContents([
// The order here can be a bit messy, so we check manually...
expect(events().slice(0, 2)).toEqual([
[ 'started' ],
[ 'connection_created', expect.toBeA('string') ],
[ 'connection_destroyed', expect.toBeA('string') ],
[ 'connection_created', id1 ],
[ 'connection_created', id2 ],
]) // first is start and initial connection creation

expect(events().slice(2, 5)).toMatchContents([
[ 'connection_acquired', id1 ],
[ 'connection_created', id2 ],
[ 'connection_acquired', id2 ],
]) // then is acquire conn 1 and create + acquire conn 2

expect(events().slice(5, 7)).toMatchContents([
[ 'connection_released', id1 ],
[ 'connection_released', id2 ],
]) // then connections get released (order might vary)

expect(events()[7]).toEqual([ 'stopped' ]) // then stop

expect(events().slice(8)).toMatchContents([
[ 'connection_destroyed', id1 ],
[ 'connection_destroyed', id2 ],
[ 'stopped' ],
])
]) // finally destroy both connections
})

it('should not start in case the first connection fails', async () => {
Expand Down Expand Up @@ -335,7 +346,7 @@ describe('Connection Pool', () => {
await expect(promise1).toBeRejectedWith(error)
await expect(promise2).toBeRejectedWith(error)

expect(events()).toMatchContents([
expect(events()).toEqual([
[ 'error', error ],
])
})
Expand Down

0 comments on commit bc8e444

Please sign in to comment.