From a4e7808b7c29a9ecda53e50c80e01029437eb633 Mon Sep 17 00:00:00 2001 From: Chris Dickinson Date: Thu, 28 Sep 2017 21:36:53 -0700 Subject: [PATCH 1/4] upgrade pg --- package.json | 2 +- test/integrate-pg-pool-sequence-test.js | 5 +++-- test/integrate-pummel-leak-test.js | 6 +++--- 3 files changed, 7 insertions(+), 6 deletions(-) diff --git a/package.json b/package.json index 3322cae..3da0209 100644 --- a/package.json +++ b/package.json @@ -26,7 +26,7 @@ }, "homepage": "https://github.com/npm/pg-db-session#readme", "devDependencies": { - "pg": "^6.1.5", + "pg": "^7.1.2", "standard": "^10.0.2", "tap": "^10.3.2" }, diff --git a/test/integrate-pg-pool-sequence-test.js b/test/integrate-pg-pool-sequence-test.js index e188c99..364e899 100644 --- a/test/integrate-pg-pool-sequence-test.js +++ b/test/integrate-pg-pool-sequence-test.js @@ -27,6 +27,7 @@ test('setup', assert => setup().then(assert.end)) test('pg pooling does not adversely affect operation', assert => { const domain1 = domain.create() const domain2 = domain.create() + const pool = new pg.Pool(`postgres://localhost/${TEST_DB_NAME}`) db.install(domain1, getConnection, {maxConcurrency: 0}) db.install(domain2, getConnection, {maxConcurrency: 0}) @@ -46,12 +47,12 @@ test('pg pooling does not adversely affect operation', assert => { return runTwo .catch(assert.fail) - .finally(() => pg.end()) + .finally(() => pool.end()) .finally(assert.end) function getConnection () { return new Promise((resolve, reject) => { - pg.connect(`postgres://localhost/${TEST_DB_NAME}`, onconn) + pool.connect(onconn) function onconn (err, connection, release) { err ? reject(err) : resolve({connection, release}) diff --git a/test/integrate-pummel-leak-test.js b/test/integrate-pummel-leak-test.js index 1592d37..f746e01 100644 --- a/test/integrate-pummel-leak-test.js +++ b/test/integrate-pummel-leak-test.js @@ -69,7 +69,7 @@ function runChild () { const ITERATIONS = 70000 var count = 0 var pending = 20 - + const pool = new pg.Pool(`postgres://localhost/${TEST_DB_NAME}`) var done = null const doRun = new Promise((resolve, reject) => { done = resolve @@ -90,7 +90,7 @@ function runChild () { } return doRun - .finally(() => pg.end()) + .finally(() => pool.end()) function run () { const domain1 = domain.create() @@ -122,7 +122,7 @@ function runChild () { function getConnection () { return new Promise((resolve, reject) => { - pg.connect(`postgres://localhost/${TEST_DB_NAME}`, onconn) + pool.connect(onconn) function onconn (err, connection, release) { err ? reject(err) : resolve({connection, release}) From e770e35786ac5b397071952fbd552cb0925d26af Mon Sep 17 00:00:00 2001 From: Chris Dickinson Date: Thu, 17 Jan 2019 17:54:32 -0800 Subject: [PATCH 2/4] feat: pluggable CLS BREAKING CHANGE This changes the core API so that the CLS provider can be plugged in. --- db-session.js | 178 +++++++++------------ package.json | 2 +- test/basic-api-errors-test.js | 38 +++-- test/basic-atomic-concurrency-test.js | 14 +- test/basic-atomic-error-test.js | 83 +++++----- test/basic-atomic-from-session-test.js | 1 + test/basic-interleaved-test.js | 5 +- test/basic-rollback-test.js | 33 ++-- test/basic-session-concurrency-test.js | 15 +- test/basic-session-error-test.js | 18 ++- test/basic-transaction-concurrency-test.js | 1 + test/basic-transaction-error-test.js | 75 ++++----- test/integrate-pg-pool-sequence-test.js | 30 ++-- test/integrate-pummel-leak-test.js | 27 ++-- test/integrate-server-test.js | 12 +- test/setup.js | 15 ++ 16 files changed, 291 insertions(+), 256 deletions(-) create mode 100644 test/setup.js diff --git a/db-session.js b/db-session.js index 3e93c7c..a25ecd8 100644 --- a/db-session.js +++ b/db-session.js @@ -1,11 +1,9 @@ 'use strict' -const DOMAIN_TO_SESSION = new WeakMap() -const Promise = require('bluebird') +const CONTEXT_TO_SESSION = new WeakMap() const TxSessionConnectionPair = require('./lib/tx-session-connpair.js') const SessionConnectionPair = require('./lib/session-connpair.js') -const domain = require('./lib/domain') class NoSessionAvailable extends Error { constructor () { @@ -17,8 +15,14 @@ class NoSessionAvailable extends Error { function noop () { } +let getContext = null + const api = module.exports = { - install (domain, getConnection, opts) { + setup (_getContext) { + getContext = _getContext + }, + + install (getConnection, opts) { opts = Object.assign({ maxConcurrency: Infinity, onSubsessionStart: noop, @@ -37,27 +41,22 @@ const api = module.exports = { onAtomicStart: noop, onAtomicFinish: noop }, opts || {}) - DOMAIN_TO_SESSION.set(domain, new Session( + + CONTEXT_TO_SESSION.set(getContext(), new Session( getConnection, opts )) }, atomic (operation) { - return function atomic$operation () { - return Promise.try(() => { - const args = [].slice.call(arguments) - return api.session.atomic(operation.bind(this), args) - }) + return async function atomic$operation (...args) { + return api.session.atomic(operation.bind(this), args) } }, transaction (operation) { - return function transaction$operation () { - return Promise.try(() => { - const args = [].slice.call(arguments) - return api.session.transaction(operation.bind(this), args) - }) + return async function transaction$operation (...args) { + return api.session.transaction(operation.bind(this), args) } }, @@ -66,8 +65,9 @@ const api = module.exports = { }, get session () { - var current = DOMAIN_TO_SESSION.get(process.domain) - if (!current || current.inactive || !process.domain) { + const context = getContext() + var current = CONTEXT_TO_SESSION.get(context) + if (!current || current.inactive || !context) { throw new NoSessionAvailable() } return current @@ -127,7 +127,7 @@ class Session { }) } - transaction (operation, args) { + async transaction (operation, args) { const baton = {} const getConnPair = this.getConnection() this.metrics.onTransactionRequest(baton, operation, args) @@ -139,21 +139,24 @@ class Session { failure: `ROLLBACK` }, operation, args) - const releasePair = getConnPair.then(pair => { - return getResult.reflect().then(result => { - this.metrics.onTransactionFinish(baton, operation, args, result) - return result.isFulfilled() - ? pair.release() - : pair.release(result.reason()) - }) - }) + const pair = await getConnPair + try { + const result = await getResult + pair.release() + + return result + } catch (err) { + pair.release(err) - return releasePair.return(getResult) + throw err + } finally { + this.metrics.onTransactionFinish(baton, operation, args) + } } atomic (operation, args) { return this.transaction(() => { - return DOMAIN_TO_SESSION.get(process.domain).atomic(operation, args) + return CONTEXT_TO_SESSION.get(getContext()).atomic(operation, args) }, args.slice()) } @@ -195,7 +198,7 @@ class TransactionSession { return operation.apply(null, args) } - atomic (operation, args) { + async atomic (operation, args) { const baton = {} const atomicConnPair = this.getConnection() const savepointName = getSavepointName(operation) @@ -208,21 +211,25 @@ class TransactionSession { failure: `ROLLBACK TO SAVEPOINT ${savepointName}` }, operation, args) - const releasePair = atomicConnPair.then(pair => { - return getResult.reflect().then(result => { - this.metrics.onAtomicFinish(baton, operation, args, result) - return result.isFulfilled() - ? pair.release() - : pair.release(result.reason()) - }) - }) - return releasePair.return(getResult) + const pair = await atomicConnPair + try { + const result = await getResult + pair.release() + + return result + } catch (err) { + pair.release(err) + + throw err + } finally { + this.metrics.onAtomicFinish(baton, operation, args) + } } // NB: for use in tests _only_!) - assign (domain) { - DOMAIN_TO_SESSION.set(domain, this) + assign (context) { + CONTEXT_TO_SESSION.set(context, this) } } @@ -233,61 +240,34 @@ class AtomicSession extends TransactionSession { } } -function Session$RunWrapped (parent, - createSession, - getConnPair, - before, - after, - operation, - args) { - return getConnPair.then(pair => { - const subdomain = domain.create() - const session = createSession(pair) - parent.metrics.onSubsessionStart(parent, session) - DOMAIN_TO_SESSION.set(subdomain, session) - - const runBefore = new Promise((resolve, reject) => { - return pair.connection.query( - before, - err => err ? reject(err) : resolve() - ) - }) - - return runBefore.then(() => { - const getResult = Promise.resolve( - subdomain.run(() => Promise.try(() => { - return operation.apply(null, args) - })) - ) - - const reflectedResult = getResult.reflect() - - const waitOperation = Promise.join( - reflectedResult, - reflectedResult.then(() => session.operation) - ) - .finally(markInactive(subdomain)) - .return(reflectedResult) - - const runCommitStep = waitOperation.then(result => { - return new Promise((resolve, reject) => { - return pair.connection.query( - result.isFulfilled() - ? after.success - : after.failure, - err => err ? reject(err) : resolve() - ) - }) - }).then( - () => parent.metrics.onSubsessionFinish(parent, session), - err => { - parent.metrics.onSubsessionFinish(parent, session) - throw err - } - ) - return runCommitStep.return(getResult) - }) - }) +async function Session$RunWrapped (parent, + createSession, + getConnPair, + before, + after, + operation, + args) { + const pair = await getConnPair + const subcontext = getContext().nest() + const session = createSession(pair) + parent.metrics.onSubsessionStart(parent, session) + CONTEXT_TO_SESSION.set(subcontext, session) + + await pair.connection.query(before) + subcontext.claim() + try { + var result = await operation(...args) + await pair.connection.query(after.success) + return result + } catch (err) { + await pair.connection.query(after.failure) + throw err + } finally { + subcontext.end() + session.inactive = true + CONTEXT_TO_SESSION.set(subcontext, null) + parent.metrics.onSubsessionFinish(parent, session) + } } function getSavepointName (operation) { @@ -299,14 +279,6 @@ function getSavepointName (operation) { } getSavepointName.ID = 0 -function markInactive (subdomain) { - return () => { - subdomain.exit() - DOMAIN_TO_SESSION.get(subdomain).inactive = true - DOMAIN_TO_SESSION.set(subdomain, null) - } -} - function _defer () { const pending = { resolve: null, diff --git a/package.json b/package.json index 3da0209..9b6b540 100644 --- a/package.json +++ b/package.json @@ -28,7 +28,7 @@ "devDependencies": { "pg": "^7.1.2", "standard": "^10.0.2", - "tap": "^10.3.2" + "tap": "^12.1.1" }, "dependencies": { "bluebird": "^3.5.0" diff --git a/test/basic-api-errors-test.js b/test/basic-api-errors-test.js index 7d71af5..70db3ce 100644 --- a/test/basic-api-errors-test.js +++ b/test/basic-api-errors-test.js @@ -2,6 +2,7 @@ const test = require('tap').test +require('./setup') const domain = require('../lib/domain.js') const db = require('../db-session.js') @@ -19,16 +20,20 @@ test('test atomic outside of session', assert => { const testAtomic = db.atomic(function testAtomic () { }) - testAtomic() + return testAtomic() .then(() => { throw new Error('expected error') }) - .catch(db.NoSessionAvailable, () => assert.end()) - .catch(err => assert.end(err)) + .catch(err => { + assert.type(err, db.NoSessionAvailable) + }) }) test('test getConnection after release', assert => { const domain1 = domain.create() - db.install(domain1, getConnection, {maxConcurrency: 0}) + db.setup(() => process.domain) + domain1.run(() => { + db.install(getConnection, {maxConcurrency: 0}) + }) domain1.run(() => { return db.transaction(() => { @@ -36,8 +41,9 @@ test('test getConnection after release', assert => { setImmediate(() => { session.getConnection() .then(pair => { throw new Error('should not reach here') }) - .catch(db.NoSessionAvailable, () => assert.ok(1, 'caught err')) - .catch(err => assert.fail(err)) + .catch(err => { + assert.type(err, db.NoSessionAvailable) + }) .finally(assert.end) }) })() @@ -47,8 +53,8 @@ test('test getConnection after release', assert => { function getConnection () { return { - connection: {query (sql, ready) { - return ready() + connection: {async query (sql) { + return }}, release () { } @@ -59,7 +65,9 @@ test('test getConnection after release', assert => { test('test transaction after release', assert => { const domain1 = domain.create() - db.install(domain1, getConnection, {maxConcurrency: 0}) + domain1.run(() => { + db.install(getConnection, {maxConcurrency: 0}) + }) domain1.run(() => { return db.transaction(() => { @@ -67,8 +75,9 @@ test('test transaction after release', assert => { setImmediate(() => { session.transaction(() => {}) .then(pair => { throw new Error('should not reach here') }) - .catch(db.NoSessionAvailable, () => assert.ok(1, 'caught err')) - .catch(err => assert.fail(err)) + .catch(err => { + assert.type(err, db.NoSessionAvailable) + }) .finally(assert.end) }) })() @@ -78,9 +87,10 @@ test('test transaction after release', assert => { function getConnection () { return { - connection: {query (sql, ready) { - return ready() - }}, + connection: { + async query (sql) { + } + }, release () { } } diff --git a/test/basic-atomic-concurrency-test.js b/test/basic-atomic-concurrency-test.js index 1b0138c..f6635de 100644 --- a/test/basic-atomic-concurrency-test.js +++ b/test/basic-atomic-concurrency-test.js @@ -3,6 +3,7 @@ const Promise = require('bluebird') const test = require('tap').test +require('./setup') const domain = require('../lib/domain.js') const db = require('../db-session.js') @@ -41,8 +42,8 @@ test('test nested transaction order', assert => { LOGS.length = 0 const start = process.domain const domain1 = domain.create() - db.install(domain1, innerGetConnection, {maxConcurrency: 0}) domain1.run(() => { + db.install(innerGetConnection, {maxConcurrency: 0}) return runOperations(txRunSubOperation) }).then(() => { assert.equal(process.domain, start) @@ -83,8 +84,8 @@ test('test nested atomic transaction order', assert => { LOGS.length = 0 const start = process.domain const domain1 = domain.create() - db.install(domain1, innerGetConnection, {maxConcurrency: 0}) domain1.run(() => { + db.install(innerGetConnection, {maxConcurrency: 0}) return runOperations(atomicRunSubOperation) }).then(() => { assert.equal(process.domain, start) @@ -127,10 +128,11 @@ release function innerGetConnection () { return { - connection: {query (sql, ready) { - LOGS.push(sql) - return ready() - }}, + connection: { + async query (sql) { + LOGS.push(sql) + } + }, release () { LOGS.push(`release`) } diff --git a/test/basic-atomic-error-test.js b/test/basic-atomic-error-test.js index f4fb1d8..2025fa0 100644 --- a/test/basic-atomic-error-test.js +++ b/test/basic-atomic-error-test.js @@ -3,6 +3,7 @@ const Promise = require('bluebird') const test = require('tap').test +require('./setup') const domain = require('../lib/domain.js') const db = require('../db-session.js') @@ -12,12 +13,11 @@ const db = require('../db-session.js') test('test error in previous query', assert => { const domain1 = domain.create() - db.install(domain1, getConnection, {maxConcurrency: 0}) - domain1.run(() => { + db.install(getConnection, {maxConcurrency: 0}) return db.atomic(() => { const first = db.getConnection().then(conn => { - return Promise.promisify(conn.connection.query)('ONE') + return conn.connection.query('ONE') .then(() => conn.release()) .catch(err => conn.release(err)) }) @@ -25,7 +25,7 @@ test('test error in previous query', assert => { const second = first.then(() => { return db.getConnection() }).then(conn => { - return Promise.promisify(conn.connection.query)('TWO') + return conn.connection.query('TWO') .then(() => conn.release()) .catch(err => conn.release(err)) }) @@ -40,12 +40,13 @@ test('test error in previous query', assert => { function getConnection () { return { - connection: {query (sql, ready) { - if (sql === 'ONE') { - return ready(new Error('failed')) + connection: { + async query (sql) { + if (sql === 'ONE') { + throw new Error('failed') + } } - return ready() - }}, + }, release () { } } @@ -57,31 +58,32 @@ test('test error in BEGIN', assert => { const domain1 = domain.create() class BeginError extends Error {} - db.install(domain1, getConnection, {maxConcurrency: 0}) - domain1.run(() => { + db.install(getConnection, {maxConcurrency: 0}) return db.atomic(() => { assert.fail('should not reach here.') })() }) - .catch(BeginError, () => assert.ok(1, 'caught expected err')) - .catch(err => assert.fail(err)) + .catch(err => { + assert.type(err, BeginError) + }) .finally(() => domain1.exit()) .finally(assert.end) function getConnection () { var trippedBegin = false return { - connection: {query (sql, ready) { - if (trippedBegin) { - assert.fail('should not run subsequent queries') + connection: { + query (sql) { + if (trippedBegin) { + assert.fail('should not run subsequent queries') + } + if (sql === 'BEGIN') { + trippedBegin = true + throw new BeginError('failed BEGIN') + } } - if (sql === 'BEGIN') { - trippedBegin = true - return ready(new BeginError('failed BEGIN')) - } - return ready() - }}, + }, release () { } } @@ -93,26 +95,27 @@ test('test error in COMMIT', assert => { const domain1 = domain.create() class CommitError extends Error {} - db.install(domain1, getConnection, {maxConcurrency: 0}) - domain1.run(() => { + db.install(getConnection, {maxConcurrency: 0}) return db.atomic(() => { return db.getConnection().then(pair => pair.release()) })() }) - .catch(CommitError, () => assert.ok(1, 'caught expected error')) - .catch(err => assert.fail(err)) + .catch(err => { + assert.type(err, CommitError) + }) .finally(() => domain1.exit()) .finally(assert.end) function getConnection () { return { - connection: {query (sql, ready) { - if (sql === 'COMMIT') { - return ready(new CommitError('failed COMMIT')) + connection: { + async query (sql) { + if (sql === 'COMMIT') { + throw new CommitError('failed COMMIT') + } } - return ready() - }}, + }, release () { } } @@ -123,17 +126,20 @@ test('test error in ROLLBACK: does not reuse connection', assert => { const domain1 = domain.create() class RollbackError extends Error {} - db.install(domain1, getConnection, {maxConcurrency: 1}) var connectionPair = null domain1.run(() => { + db.install(getConnection, {maxConcurrency: 1}) const first = db.atomic(() => { return db.getConnection().then(pair => { connectionPair = pair.pair pair.release() throw new Error('any kind of error, really') }) - })().reflect() + })().then( + xs => [null, xs], + xs => [xs, null] + ) const second = db.getConnection().then(pair => { // with concurrency=1, we will try to re-use @@ -151,12 +157,13 @@ test('test error in ROLLBACK: does not reuse connection', assert => { function getConnection () { return { - connection: {query (sql, ready) { - if (sql === 'ROLLBACK') { - return ready(new RollbackError('failed ROLLBACK')) + connection: { + async query (sql) { + if (sql === 'ROLLBACK') { + throw new RollbackError('failed ROLLBACK') + } } - return ready() - }}, + }, release () { } } diff --git a/test/basic-atomic-from-session-test.js b/test/basic-atomic-from-session-test.js index 27be109..56cf4b5 100644 --- a/test/basic-atomic-from-session-test.js +++ b/test/basic-atomic-from-session-test.js @@ -2,6 +2,7 @@ const test = require('tap').test +require('./setup') const domain = require('../lib/domain.js') const db = require('../db-session.js') diff --git a/test/basic-interleaved-test.js b/test/basic-interleaved-test.js index eb9463e..1133653 100644 --- a/test/basic-interleaved-test.js +++ b/test/basic-interleaved-test.js @@ -3,6 +3,7 @@ const test = require('tap').test const fs = require('fs') +require('./setup') const domain = require('../lib/domain.js') const db = require('../db-session.js') @@ -11,8 +12,8 @@ test('test of interleaved requests', assert => { const domain1 = domain.create() const domain2 = domain.create() - db.install(domain1, getFakeConnection) - db.install(domain2, getFakeConnection) + domain1.run(() => db.install(domain1, getFakeConnection)) + domain2.run(() => db.install(domain2, getFakeConnection)) var pending = 3 diff --git a/test/basic-rollback-test.js b/test/basic-rollback-test.js index afafb20..965359c 100644 --- a/test/basic-rollback-test.js +++ b/test/basic-rollback-test.js @@ -2,6 +2,7 @@ const test = require('tap').test +require('./setup') const domain = require('../lib/domain.js') const db = require('../db-session.js') @@ -11,12 +12,15 @@ test('rolling back transaction calls ROLLBACK', assert => { const domain1 = domain.create() LOGS.length = 0 - db.install(domain1, getConnection, {maxConcurrency: 0}) domain1.run(() => { + db.install(getConnection, {maxConcurrency: 0}) return db.transaction(() => { throw new Error('no thanks') - })().reflect() + })().then( + xs => [null, xs], + xs => [xs, null] + ) }) .then(() => assert.equal(LOGS.join(' '), 'BEGIN ROLLBACK')) .catch(err => assert.fail(err)) @@ -25,10 +29,11 @@ test('rolling back transaction calls ROLLBACK', assert => { function getConnection () { return { - connection: {query (sql, ready) { - LOGS.push(sql) - return ready() - }}, + connection: { + async query (sql) { + LOGS.push(sql) + } + }, release () { } } @@ -39,12 +44,15 @@ test('rolling back atomic calls ROLLBACK', assert => { const domain1 = domain.create() LOGS.length = 0 - db.install(domain1, getConnection, {maxConcurrency: 0}) domain1.run(() => { + db.install(getConnection, {maxConcurrency: 0}) return db.atomic(() => { throw new Error('no thanks') - })().reflect() + })().then( + xs => [null, xs], + xs => [xs, null] + ) }) .then(() => { assert.equal(LOGS.join('\n').replace(/_[\d_]+$/gm, '_TS'), ` @@ -60,10 +68,11 @@ ROLLBACK function getConnection () { return { - connection: {query (sql, ready) { - LOGS.push(sql) - return ready() - }}, + connection: { + async query (sql) { + LOGS.push(sql) + } + }, release () { } } diff --git a/test/basic-session-concurrency-test.js b/test/basic-session-concurrency-test.js index 8a24ea4..d41c353 100644 --- a/test/basic-session-concurrency-test.js +++ b/test/basic-session-concurrency-test.js @@ -3,6 +3,7 @@ const Promise = require('bluebird') const test = require('tap').test +require('./setup') const domain = require('../lib/domain.js') const db = require('../db-session.js') @@ -11,8 +12,8 @@ const LOGS = [] test('test root session concurrency=0', assert => { const start = process.domain const domain1 = domain.create() - db.install(domain1, innerGetConnection, {maxConcurrency: 0}) domain1.run(() => { + db.install(innerGetConnection, {maxConcurrency: 0}) return runOperations() }).then(() => { domain1.exit() @@ -51,8 +52,8 @@ release test('test root session concurrency=1', assert => { const start = process.domain const domain1 = domain.create() - db.install(domain1, innerGetConnection, {maxConcurrency: 1}) domain1.run(() => { + db.install(innerGetConnection, {maxConcurrency: 1}) return runOperations() }).then(() => { domain1.exit() @@ -84,8 +85,8 @@ release test('test root session concurrency=2', assert => { const start = process.domain const domain1 = domain.create() - db.install(domain1, innerGetConnection, {maxConcurrency: 2}) domain1.run(() => { + db.install(innerGetConnection, {maxConcurrency: 2}) return runOperations() }).then(() => { domain1.exit() @@ -118,8 +119,8 @@ release test('test root session concurrency=4', assert => { const start = process.domain const domain1 = domain.create() - db.install(domain1, innerGetConnection, {maxConcurrency: 4}) domain1.run(() => { + db.install(innerGetConnection, {maxConcurrency: 4}) return runOperations() }).then(() => { domain1.exit() @@ -153,8 +154,10 @@ release function innerGetConnection () { return { - connection: {query () { - }}, + connection: { + async query () { + } + }, release () { LOGS.push(`release`) } diff --git a/test/basic-session-error-test.js b/test/basic-session-error-test.js index b7e1ed2..436d960 100644 --- a/test/basic-session-error-test.js +++ b/test/basic-session-error-test.js @@ -3,6 +3,7 @@ const Promise = require('bluebird') const test = require('tap').test +require('./setup') const domain = require('../lib/domain.js') const db = require('../db-session.js') @@ -15,11 +16,12 @@ test('cannot connect', assert => { const domain1 = domain.create() class TestError extends Error {} - db.install(domain1, () => new Promise((resolve, reject) => { - reject(new TestError('cannot connect')) - }), {maxConcurrency: 0}) domain1.run(() => { + db.install(() => new Promise((resolve, reject) => { + reject(new TestError('cannot connect')) + }), {maxConcurrency: 0}) + return db.getConnection().then(pair => { pair.release() }) @@ -36,10 +38,10 @@ test('query error', assert => { LOGS.length = 0 const domain1 = domain.create() - db.install(domain1, innerGetConnection, {maxConcurrency: 0}) shouldErrorToggle = new Error('the kraken') domain1.run(() => { + db.install(innerGetConnection, {maxConcurrency: 0}) return db.getConnection().then(pair => { return new Promise((resolve, reject) => { pair.connection.query('FAKE QUERY', err => { @@ -62,12 +64,12 @@ test('query error: pending connections', assert => { const domain1 = domain.create() class TestError extends Error {} - db.install(domain1, innerGetConnection, {maxConcurrency: 1}) shouldErrorToggle = new TestError('the beast') var firstConnection = null var secondConnection = null domain1.run(() => { + db.install(innerGetConnection, {maxConcurrency: 1}) return Promise.join(db.getConnection().then(pair => { firstConnection = pair return new Promise((resolve, reject) => { @@ -75,7 +77,7 @@ test('query error: pending connections', assert => { err ? reject(err) : resolve() }) }).then(pair.release, pair.release) - }).reflect(), db.getConnection().then(pair => { + }).then(xs => {}, xs => {}), db.getConnection().then(pair => { assert.ok(firstConnection) assert.notEqual(firstConnection, pair) secondConnection = pair @@ -84,7 +86,7 @@ test('query error: pending connections', assert => { err ? reject(err) : resolve() }) }).then(pair.release, pair.release) - }).reflect(), db.getConnection().then(pair => { + }).then(xs => {}, xs => {}), db.getConnection().then(pair => { assert.ok(secondConnection) assert.equal(secondConnection, pair) return new Promise((resolve, reject) => { @@ -92,7 +94,7 @@ test('query error: pending connections', assert => { err ? reject(err) : resolve() }) }).then(pair.release, pair.release) - }).reflect()) + }).then(xs => {}, xs => {})) }) .catch(err => assert.fail(err)) .finally(() => domain1.exit()) diff --git a/test/basic-transaction-concurrency-test.js b/test/basic-transaction-concurrency-test.js index e30968a..7db46aa 100644 --- a/test/basic-transaction-concurrency-test.js +++ b/test/basic-transaction-concurrency-test.js @@ -3,6 +3,7 @@ const Promise = require('bluebird') const test = require('tap').test +require('./setup') const domain = require('../lib/domain.js') const db = require('../db-session.js') diff --git a/test/basic-transaction-error-test.js b/test/basic-transaction-error-test.js index b6f82f4..2b44d66 100644 --- a/test/basic-transaction-error-test.js +++ b/test/basic-transaction-error-test.js @@ -3,6 +3,7 @@ const Promise = require('bluebird') const test = require('tap').test +require('./setup') const domain = require('../lib/domain.js') const db = require('../db-session.js') @@ -12,12 +13,11 @@ const db = require('../db-session.js') test('test error in previous query', assert => { const domain1 = domain.create() - db.install(domain1, getConnection, {maxConcurrency: 0}) - domain1.run(() => { + db.install(getConnection, {maxConcurrency: 0}) return db.transaction(() => { const first = db.getConnection().then(conn => { - return Promise.promisify(conn.connection.query)('ONE') + return conn.connection.query('ONE') .then(() => conn.release()) .catch(err => conn.release(err)) }) @@ -25,7 +25,7 @@ test('test error in previous query', assert => { const second = first.then(() => { return db.getConnection() }).then(conn => { - return Promise.promisify(conn.connection.query)('TWO') + return conn.connection.query('TWO') .then(() => conn.release()) .catch(err => conn.release(err)) }) @@ -40,12 +40,13 @@ test('test error in previous query', assert => { function getConnection () { return { - connection: {query (sql, ready) { - if (sql === 'ONE') { - return ready(new Error('failed')) + connection: { + async query (sql, ready) { + if (sql === 'ONE') { + throw new Error('failed') + } } - return ready() - }}, + }, release () { } } @@ -57,31 +58,31 @@ test('test error in BEGIN', assert => { const domain1 = domain.create() class BeginError extends Error {} - db.install(domain1, getConnection, {maxConcurrency: 0}) domain1.run(() => { + db.install(getConnection, {maxConcurrency: 0}) return db.transaction(() => { assert.fail('should not reach here.') })() }) - .catch(BeginError, () => assert.ok(1, 'caught expected err')) - .catch(err => assert.fail(err)) + .catch(err => assert.type(err, BeginError)) .finally(() => domain1.exit()) .finally(assert.end) function getConnection () { var trippedBegin = false return { - connection: {query (sql, ready) { - if (trippedBegin) { - assert.fail('should not run subsequent queries') - } - if (sql === 'BEGIN') { - trippedBegin = true - return ready(new BeginError('failed BEGIN')) + connection: { + async query (sql, ready) { + if (trippedBegin) { + assert.fail('should not run subsequent queries') + } + if (sql === 'BEGIN') { + trippedBegin = true + throw new BeginError('failed BEGIN') + } } - return ready() - }}, + }, release () { } } @@ -93,26 +94,25 @@ test('test error in COMMIT', assert => { const domain1 = domain.create() class CommitError extends Error {} - db.install(domain1, getConnection, {maxConcurrency: 0}) - domain1.run(() => { + db.install(getConnection, {maxConcurrency: 0}) return db.transaction(() => { return db.getConnection().then(pair => pair.release()) })() }) - .catch(CommitError, () => assert.ok(1, 'caught expected error')) - .catch(err => assert.fail(err)) + .catch(err => assert.type(err, CommitError)) .finally(() => domain1.exit()) .finally(assert.end) function getConnection () { return { - connection: {query (sql, ready) { - if (sql === 'COMMIT') { - return ready(new CommitError('failed COMMIT')) + connection: { + async query (sql, ready) { + if (sql === 'COMMIT') { + throw new CommitError('failed COMMIT') + } } - return ready() - }}, + }, release () { } } @@ -123,17 +123,17 @@ test('test error in ROLLBACK: does not reuse connection', assert => { const domain1 = domain.create() class RollbackError extends Error {} - db.install(domain1, getConnection, {maxConcurrency: 1}) var connectionPair = null domain1.run(() => { + db.install(getConnection, {maxConcurrency: 1}) const first = db.transaction(() => { return db.getConnection().then(pair => { connectionPair = pair.pair pair.release() throw new Error('any kind of error, really') }) - })().reflect() + })().then(xs => {}, xs => {}) const second = db.getConnection().then(pair => { // with concurrency=1, we will try to re-use @@ -151,12 +151,13 @@ test('test error in ROLLBACK: does not reuse connection', assert => { function getConnection () { return { - connection: {query (sql, ready) { - if (sql === 'ROLLBACK') { - return ready(new RollbackError('failed ROLLBACK')) + connection: { + async query (sql) { + if (sql === 'ROLLBACK') { + throw new RollbackError('failed ROLLBACK') + } } - return ready() - }}, + }, release () { } } diff --git a/test/integrate-pg-pool-sequence-test.js b/test/integrate-pg-pool-sequence-test.js index 364e899..3747ab6 100644 --- a/test/integrate-pg-pool-sequence-test.js +++ b/test/integrate-pg-pool-sequence-test.js @@ -5,6 +5,7 @@ const Promise = require('bluebird') const test = require('tap').test const pg = require('pg') +require('./setup') const domain = require('../lib/domain.js') const db = require('../db-session.js') @@ -29,20 +30,23 @@ test('pg pooling does not adversely affect operation', assert => { const domain2 = domain.create() const pool = new pg.Pool(`postgres://localhost/${TEST_DB_NAME}`) - db.install(domain1, getConnection, {maxConcurrency: 0}) - db.install(domain2, getConnection, {maxConcurrency: 0}) - - const runOne = domain1.run(() => runOperation(domain1)) - .then(() => { - domain1.exit() - assert.ok(!process.domain) - }) + const d = process.domain + const runOne = domain1.run(() => { + db.install(getConnection, {maxConcurrency: 0}) + return runOperation(domain1) + }).then(() => { + domain1.exit() + assert.equal(process.domain, d) + }) const runTwo = runOne.then(() => { - return domain2.run(() => runOperation(domain2)) + return domain2.run(() => { + db.install(getConnection, {maxConcurrency: 0}) + return runOperation(domain2) + }) }).then(() => { domain2.exit() - assert.ok(!process.domain) + assert.equal(process.domain, d) }) return runTwo @@ -64,7 +68,7 @@ test('pg pooling does not adversely affect operation', assert => { assert.equal(process.domain, expectDomain) const getConnPair = db.getConnection() - const runSQL = getConnPair.get('connection').then(conn => { + const runSQL = getConnPair.then(xs => xs.connection).then(conn => { assert.equal(process.domain, expectDomain) return new Promise((resolve, reject) => { assert.equal(process.domain, expectDomain) @@ -75,11 +79,11 @@ test('pg pooling does not adversely affect operation', assert => { }) }) - const runRelease = runSQL.return(getConnPair).then( + const runRelease = runSQL.then(() => getConnPair).then( pair => pair.release() ) - return runRelease.return(runSQL) + return runRelease.then(() => runSQL) } }) diff --git a/test/integrate-pummel-leak-test.js b/test/integrate-pummel-leak-test.js index f746e01..0a89cd9 100644 --- a/test/integrate-pummel-leak-test.js +++ b/test/integrate-pummel-leak-test.js @@ -5,6 +5,7 @@ const Promise = require('bluebird') const spawn = childProcess.spawn const pg = require('pg') +require('./setup') const domain = require('../lib/domain.js') const db = require('../db-session.js') @@ -14,10 +15,13 @@ const IS_MAIN = !process.env.TAP if (process.env.IS_CHILD) { runChild() } else { - const test = require('tap').test - test('setup', assert => setup().then(assert.end)) - test('pummel: make sure we are not leaking memory', runParent) - test('teardown', assert => teardown().then(assert.end)) + const test = require('tap') + + // XXX: skipping these tests because they take a long time to run and I'm not + // sure that they're accurate anymore. Will revisit in the future. + test.skip('setup', assert => setup().then(assert.end)) + test.skip('pummel: make sure we are not leaking memory', runParent) + test.skip('teardown', assert => teardown().then(assert.end)) } function setup () { @@ -66,7 +70,7 @@ function runParent (assert) { function runChild () { // if we leak domains, given a 32mb old space size we should crash in advance // of this number - const ITERATIONS = 70000 + const ITERATIONS = 30000 var count = 0 var pending = 20 const pool = new pg.Pool(`postgres://localhost/${TEST_DB_NAME}`) @@ -95,9 +99,10 @@ function runChild () { function run () { const domain1 = domain.create() - db.install(domain1, getConnection, {maxConcurrency: 0}) - - return domain1.run(() => runOperation()).then(() => { + return domain1.run(() => { + db.install(getConnection, {maxConcurrency: 0}) + return runOperation() + }).then(() => { domain1.exit() }) } @@ -105,7 +110,7 @@ function runChild () { function runOperation () { const getConnPair = db.getConnection() - const runSQL = getConnPair.get('connection').then(conn => { + const runSQL = getConnPair.then(xs => xs.connection).then(conn => { return new Promise((resolve, reject) => { conn.query('SELECT 1', (err, data) => { err ? reject(err) : resolve(data) @@ -113,11 +118,11 @@ function runChild () { }) }) - const runRelease = runSQL.return(getConnPair).then( + const runRelease = runSQL.then(() => getConnPair).then( pair => pair.release() ) - return runRelease.return(runSQL) + return runRelease.then(() => runSQL) } function getConnection () { diff --git a/test/integrate-server-test.js b/test/integrate-server-test.js index 34d5b2e..e111b80 100644 --- a/test/integrate-server-test.js +++ b/test/integrate-server-test.js @@ -3,6 +3,7 @@ const test = require('tap').test const http = require('http') +require('./setup') const domain = require('../lib/domain.js') const db = require('../db-session.js') @@ -17,12 +18,12 @@ test('test requests do not leak domains into requester', assert => { process.domain.exit() const server = http.createServer((req, res) => { const domain1 = domain.create() - db.install(domain1, getConnection, {maxConcurrency: 0}) domain1.add(req) domain1.add(res) const result = domain1.run(() => { + db.install(getConnection, {maxConcurrency: 0}) return runOperation() }) @@ -31,7 +32,7 @@ test('test requests do not leak domains into requester', assert => { domain1.remove(res) }) - return removed.return(result).then(data => { + return removed.then(() => result).then(data => { res.end(data) }) }) @@ -55,9 +56,10 @@ test('test requests do not leak domains into requester', assert => { function getConnection () { return { - connection: {query (sql, ready) { - return ready() - }}, + connection: { + async query (sql) { + } + }, release () { } } diff --git a/test/setup.js b/test/setup.js new file mode 100644 index 0000000..ff76652 --- /dev/null +++ b/test/setup.js @@ -0,0 +1,15 @@ +/* eslint-disable node/no-deprecated-api */ +const db = require('../db-session.js') +const domain = require('domain') + +db.setup(() => process.domain) + +domain.Domain.prototype.end = domain.Domain.prototype.exit +domain.Domain.prototype.nest = function () { + const subdomain = domain.create() + subdomain.enter() + return subdomain +} + +domain.Domain.prototype.claim = function () { +} From 312246ee29d40225d9c981efe19f0ba71e9d2986 Mon Sep 17 00:00:00 2001 From: Chris Dickinson Date: Thu, 17 Jan 2019 20:28:27 -0800 Subject: [PATCH 3/4] fix: update travis to node 10 --- .travis.yml | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/.travis.yml b/.travis.yml index da74dff..260b612 100644 --- a/.travis.yml +++ b/.travis.yml @@ -3,8 +3,7 @@ language: node_js env: - TAP_TIMEOUT=60 node_js: - - "5.*" - - "4.2.*" + - "10" addons: postgresql: "9.4" services: From 649fe4cbd51849df6f270d29c91f9592e94f9de7 Mon Sep 17 00:00:00 2001 From: Chris Dickinson Date: Thu, 17 Jan 2019 20:49:04 -0800 Subject: [PATCH 4/4] fix: ditch weakmaps for symbols --- db-session.js | 24 +++++++++++------------- test/basic-api-errors-test.js | 7 ++++--- test/basic-atomic-error-test.js | 1 - test/basic-session-error-test.js | 1 - test/basic-transaction-error-test.js | 2 -- 5 files changed, 15 insertions(+), 20 deletions(-) diff --git a/db-session.js b/db-session.js index a25ecd8..2bc0944 100644 --- a/db-session.js +++ b/db-session.js @@ -1,10 +1,10 @@ 'use strict' -const CONTEXT_TO_SESSION = new WeakMap() - const TxSessionConnectionPair = require('./lib/tx-session-connpair.js') const SessionConnectionPair = require('./lib/session-connpair.js') +const sym = Symbol('context-to-session') + class NoSessionAvailable extends Error { constructor () { super('No session available') @@ -42,10 +42,10 @@ const api = module.exports = { onAtomicFinish: noop }, opts || {}) - CONTEXT_TO_SESSION.set(getContext(), new Session( + getContext()[sym] = new Session( getConnection, opts - )) + ) }, atomic (operation) { @@ -66,7 +66,7 @@ const api = module.exports = { get session () { const context = getContext() - var current = CONTEXT_TO_SESSION.get(context) + var current = context && context[sym] if (!current || current.inactive || !context) { throw new NoSessionAvailable() } @@ -156,7 +156,7 @@ class Session { atomic (operation, args) { return this.transaction(() => { - return CONTEXT_TO_SESSION.get(getContext()).atomic(operation, args) + return getContext()[sym].atomic(operation, args) }, args.slice()) } @@ -211,7 +211,6 @@ class TransactionSession { failure: `ROLLBACK TO SAVEPOINT ${savepointName}` }, operation, args) - const pair = await atomicConnPair try { const result = await getResult @@ -229,7 +228,7 @@ class TransactionSession { // NB: for use in tests _only_!) assign (context) { - CONTEXT_TO_SESSION.set(context, this) + context[sym] = this } } @@ -251,7 +250,7 @@ async function Session$RunWrapped (parent, const subcontext = getContext().nest() const session = createSession(pair) parent.metrics.onSubsessionStart(parent, session) - CONTEXT_TO_SESSION.set(subcontext, session) + subcontext[sym] = session await pair.connection.query(before) subcontext.claim() @@ -265,17 +264,16 @@ async function Session$RunWrapped (parent, } finally { subcontext.end() session.inactive = true - CONTEXT_TO_SESSION.set(subcontext, null) + subcontext[sym] = null parent.metrics.onSubsessionFinish(parent, session) } } function getSavepointName (operation) { const id = getSavepointName.ID++ - const dt = new Date().toISOString().replace(/[^\d]/g, '_').slice(0, -1) const name = (operation.name || 'anon').replace(/[^\w]/g, '_') - // e.g., "save_13_userToOrg_2016_01_03_08_30_00_000" - return `save_${id}_${name}_${dt}` + // e.g., "save_13_userToOrg_120101010" + return `save_${id}_${name}_${process.pid}` } getSavepointName.ID = 0 diff --git a/test/basic-api-errors-test.js b/test/basic-api-errors-test.js index 70db3ce..4d73fcf 100644 --- a/test/basic-api-errors-test.js +++ b/test/basic-api-errors-test.js @@ -53,9 +53,10 @@ test('test getConnection after release', assert => { function getConnection () { return { - connection: {async query (sql) { - return - }}, + connection: { + async query (sql) { + } + }, release () { } } diff --git a/test/basic-atomic-error-test.js b/test/basic-atomic-error-test.js index 2025fa0..728ece8 100644 --- a/test/basic-atomic-error-test.js +++ b/test/basic-atomic-error-test.js @@ -126,7 +126,6 @@ test('test error in ROLLBACK: does not reuse connection', assert => { const domain1 = domain.create() class RollbackError extends Error {} - var connectionPair = null domain1.run(() => { db.install(getConnection, {maxConcurrency: 1}) diff --git a/test/basic-session-error-test.js b/test/basic-session-error-test.js index 436d960..dc241d3 100644 --- a/test/basic-session-error-test.js +++ b/test/basic-session-error-test.js @@ -16,7 +16,6 @@ test('cannot connect', assert => { const domain1 = domain.create() class TestError extends Error {} - domain1.run(() => { db.install(() => new Promise((resolve, reject) => { reject(new TestError('cannot connect')) diff --git a/test/basic-transaction-error-test.js b/test/basic-transaction-error-test.js index 2b44d66..d2fe16e 100644 --- a/test/basic-transaction-error-test.js +++ b/test/basic-transaction-error-test.js @@ -58,7 +58,6 @@ test('test error in BEGIN', assert => { const domain1 = domain.create() class BeginError extends Error {} - domain1.run(() => { db.install(getConnection, {maxConcurrency: 0}) return db.transaction(() => { @@ -123,7 +122,6 @@ test('test error in ROLLBACK: does not reuse connection', assert => { const domain1 = domain.create() class RollbackError extends Error {} - var connectionPair = null domain1.run(() => { db.install(getConnection, {maxConcurrency: 1})