Skip to content

Pluggable cls #2

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@ language: node_js
env:
- TAP_TIMEOUT=60
node_js:
- "5.*"
- "4.2.*"
- "10"
addons:
postgresql: "9.4"
services:
Expand Down
186 changes: 78 additions & 108 deletions db-session.js
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
'use strict'

const DOMAIN_TO_SESSION = new WeakMap()
const Promise = require('bluebird')

const TxSessionConnectionPair = require('./lib/tx-session-connpair.js')
const SessionConnectionPair = require('./lib/session-connpair.js')
const domain = require('./lib/domain')

const sym = Symbol('context-to-session')

class NoSessionAvailable extends Error {
constructor () {
Expand All @@ -17,8 +15,14 @@ class NoSessionAvailable extends Error {
function noop () {
}

let getContext = null

const api = module.exports = {
install (domain, getConnection, opts) {
setup (_getContext) {
getContext = _getContext
},

install (getConnection, opts) {
opts = Object.assign({
maxConcurrency: Infinity,
onSubsessionStart: noop,
Expand All @@ -37,27 +41,22 @@ const api = module.exports = {
onAtomicStart: noop,
onAtomicFinish: noop
}, opts || {})
DOMAIN_TO_SESSION.set(domain, new Session(

getContext()[sym] = new Session(
getConnection,
opts
))
)
},

atomic (operation) {
return function atomic$operation () {
return Promise.try(() => {
const args = [].slice.call(arguments)
return api.session.atomic(operation.bind(this), args)
})
return async function atomic$operation (...args) {
return api.session.atomic(operation.bind(this), args)
}
},

transaction (operation) {
return function transaction$operation () {
return Promise.try(() => {
const args = [].slice.call(arguments)
return api.session.transaction(operation.bind(this), args)
})
return async function transaction$operation (...args) {
return api.session.transaction(operation.bind(this), args)
}
},

Expand All @@ -66,8 +65,9 @@ const api = module.exports = {
},

get session () {
var current = DOMAIN_TO_SESSION.get(process.domain)
if (!current || current.inactive || !process.domain) {
const context = getContext()
var current = context && context[sym]
if (!current || current.inactive || !context) {
throw new NoSessionAvailable()
}
return current
Expand Down Expand Up @@ -127,7 +127,7 @@ class Session {
})
}

transaction (operation, args) {
async transaction (operation, args) {
const baton = {}
const getConnPair = this.getConnection()
this.metrics.onTransactionRequest(baton, operation, args)
Expand All @@ -139,21 +139,24 @@ class Session {
failure: `ROLLBACK`
}, operation, args)

const releasePair = getConnPair.then(pair => {
return getResult.reflect().then(result => {
this.metrics.onTransactionFinish(baton, operation, args, result)
return result.isFulfilled()
? pair.release()
: pair.release(result.reason())
})
})
const pair = await getConnPair
try {
const result = await getResult
pair.release()

return result
} catch (err) {
pair.release(err)

return releasePair.return(getResult)
throw err
} finally {
this.metrics.onTransactionFinish(baton, operation, args)
}
}

atomic (operation, args) {
return this.transaction(() => {
return DOMAIN_TO_SESSION.get(process.domain).atomic(operation, args)
return getContext()[sym].atomic(operation, args)
}, args.slice())
}

Expand Down Expand Up @@ -195,7 +198,7 @@ class TransactionSession {
return operation.apply(null, args)
}

atomic (operation, args) {
async atomic (operation, args) {
const baton = {}
const atomicConnPair = this.getConnection()
const savepointName = getSavepointName(operation)
Expand All @@ -208,21 +211,24 @@ class TransactionSession {
failure: `ROLLBACK TO SAVEPOINT ${savepointName}`
}, operation, args)

const releasePair = atomicConnPair.then(pair => {
return getResult.reflect().then(result => {
this.metrics.onAtomicFinish(baton, operation, args, result)
return result.isFulfilled()
? pair.release()
: pair.release(result.reason())
})
})
const pair = await atomicConnPair
try {
const result = await getResult
pair.release()

return result
} catch (err) {
pair.release(err)

return releasePair.return(getResult)
throw err
} finally {
this.metrics.onAtomicFinish(baton, operation, args)
}
}

// NB: for use in tests _only_!)
assign (domain) {
DOMAIN_TO_SESSION.set(domain, this)
assign (context) {
context[sym] = this
}
}

Expand All @@ -233,80 +239,44 @@ class AtomicSession extends TransactionSession {
}
}

function Session$RunWrapped (parent,
createSession,
getConnPair,
before,
after,
operation,
args) {
return getConnPair.then(pair => {
const subdomain = domain.create()
const session = createSession(pair)
parent.metrics.onSubsessionStart(parent, session)
DOMAIN_TO_SESSION.set(subdomain, session)

const runBefore = new Promise((resolve, reject) => {
return pair.connection.query(
before,
err => err ? reject(err) : resolve()
)
})

return runBefore.then(() => {
const getResult = Promise.resolve(
subdomain.run(() => Promise.try(() => {
return operation.apply(null, args)
}))
)

const reflectedResult = getResult.reflect()

const waitOperation = Promise.join(
reflectedResult,
reflectedResult.then(() => session.operation)
)
.finally(markInactive(subdomain))
.return(reflectedResult)

const runCommitStep = waitOperation.then(result => {
return new Promise((resolve, reject) => {
return pair.connection.query(
result.isFulfilled()
? after.success
: after.failure,
err => err ? reject(err) : resolve()
)
})
}).then(
() => parent.metrics.onSubsessionFinish(parent, session),
err => {
parent.metrics.onSubsessionFinish(parent, session)
throw err
}
)
return runCommitStep.return(getResult)
})
})
async function Session$RunWrapped (parent,
createSession,
getConnPair,
before,
after,
operation,
args) {
const pair = await getConnPair
const subcontext = getContext().nest()
const session = createSession(pair)
parent.metrics.onSubsessionStart(parent, session)
subcontext[sym] = session

await pair.connection.query(before)
subcontext.claim()
try {
var result = await operation(...args)
await pair.connection.query(after.success)
return result
} catch (err) {
await pair.connection.query(after.failure)
throw err
} finally {
subcontext.end()
session.inactive = true
subcontext[sym] = null
parent.metrics.onSubsessionFinish(parent, session)
}
}

function getSavepointName (operation) {
const id = getSavepointName.ID++
const dt = new Date().toISOString().replace(/[^\d]/g, '_').slice(0, -1)
const name = (operation.name || 'anon').replace(/[^\w]/g, '_')
// e.g., "save_13_userToOrg_2016_01_03_08_30_00_000"
return `save_${id}_${name}_${dt}`
// e.g., "save_13_userToOrg_120101010"
return `save_${id}_${name}_${process.pid}`
}
getSavepointName.ID = 0

function markInactive (subdomain) {
return () => {
subdomain.exit()
DOMAIN_TO_SESSION.get(subdomain).inactive = true
DOMAIN_TO_SESSION.set(subdomain, null)
}
}

function _defer () {
const pending = {
resolve: null,
Expand Down
4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@
},
"homepage": "https://github.com/npm/pg-db-session#readme",
"devDependencies": {
"pg": "^6.1.5",
"pg": "^7.1.2",
"standard": "^10.0.2",
"tap": "^10.3.2"
"tap": "^12.1.1"
},
"dependencies": {
"bluebird": "^3.5.0"
Expand Down
41 changes: 26 additions & 15 deletions test/basic-api-errors-test.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

const test = require('tap').test

require('./setup')
const domain = require('../lib/domain.js')
const db = require('../db-session.js')

Expand All @@ -19,25 +20,30 @@ test('test atomic outside of session', assert => {
const testAtomic = db.atomic(function testAtomic () {
})

testAtomic()
return testAtomic()
.then(() => { throw new Error('expected error') })
.catch(db.NoSessionAvailable, () => assert.end())
.catch(err => assert.end(err))
.catch(err => {
assert.type(err, db.NoSessionAvailable)
})
})

test('test getConnection after release', assert => {
const domain1 = domain.create()

db.install(domain1, getConnection, {maxConcurrency: 0})
db.setup(() => process.domain)
domain1.run(() => {
db.install(getConnection, {maxConcurrency: 0})
})

domain1.run(() => {
return db.transaction(() => {
const session = db.session
setImmediate(() => {
session.getConnection()
.then(pair => { throw new Error('should not reach here') })
.catch(db.NoSessionAvailable, () => assert.ok(1, 'caught err'))
.catch(err => assert.fail(err))
.catch(err => {
assert.type(err, db.NoSessionAvailable)
})
.finally(assert.end)
})
})()
Expand All @@ -47,9 +53,10 @@ test('test getConnection after release', assert => {

function getConnection () {
return {
connection: {query (sql, ready) {
return ready()
}},
connection: {
async query (sql) {
}
},
release () {
}
}
Expand All @@ -59,16 +66,19 @@ test('test getConnection after release', assert => {
test('test transaction after release', assert => {
const domain1 = domain.create()

db.install(domain1, getConnection, {maxConcurrency: 0})
domain1.run(() => {
db.install(getConnection, {maxConcurrency: 0})
})

domain1.run(() => {
return db.transaction(() => {
const session = db.session
setImmediate(() => {
session.transaction(() => {})
.then(pair => { throw new Error('should not reach here') })
.catch(db.NoSessionAvailable, () => assert.ok(1, 'caught err'))
.catch(err => assert.fail(err))
.catch(err => {
assert.type(err, db.NoSessionAvailable)
})
.finally(assert.end)
})
})()
Expand All @@ -78,9 +88,10 @@ test('test transaction after release', assert => {

function getConnection () {
return {
connection: {query (sql, ready) {
return ready()
}},
connection: {
async query (sql) {
}
},
release () {
}
}
Expand Down
Loading