1
1
var DB = require ( 'sharedb' ) . DB ;
2
2
var pg = require ( 'pg' ) ;
3
3
4
+ const PG_UNIQUE_VIOLATION = 23505 ;
5
+
4
6
// Postgres-backed ShareDB database
5
7
6
8
function PostgresDB ( options ) {
@@ -9,24 +11,33 @@ function PostgresDB(options) {
9
11
10
12
this . closed = false ;
11
13
12
- this . pg_config = options ;
13
- this . pool = new pg . Pool ( options ) ;
14
+ this . _pool = new pg . Pool ( options ) ;
14
15
} ;
15
16
module . exports = PostgresDB ;
16
17
17
18
PostgresDB . prototype = Object . create ( DB . prototype ) ;
18
19
19
- PostgresDB . prototype . close = function ( callback ) {
20
- this . closed = true ;
21
- this . pool . end ( ) ;
22
-
23
- if ( callback ) callback ( ) ;
20
+ PostgresDB . prototype . close = async function ( callback ) {
21
+ let error ;
22
+ try {
23
+ if ( ! this . closed ) {
24
+ this . closed = true ;
25
+ await this . _pool . end ( ) ;
26
+ }
27
+ } catch ( err ) {
28
+ error = err ;
29
+ }
30
+
31
+ // FIXME: Don't swallow errors. Emit 'error' event?
32
+ if ( callback ) callback ( error ) ;
24
33
} ;
25
34
26
35
27
36
// Persists an op and snapshot if it is for the next version. Calls back with
28
37
// callback(err, succeeded)
29
- PostgresDB . prototype . commit = function ( collection , id , op , snapshot , options , callback ) {
38
+ PostgresDB . prototype . commit = async function ( collection , id , op , snapshot , options , callback ) {
39
+ let client ;
40
+ try {
30
41
/*
31
42
* op: CreateOp {
32
43
* src: '24545654654646',
@@ -37,34 +48,29 @@ PostgresDB.prototype.commit = function(collection, id, op, snapshot, options, ca
37
48
* }
38
49
* snapshot: PostgresSnapshot
39
50
*/
40
- this . pool . connect ( ( err , client , done ) => {
41
- if ( err ) {
42
- done ( client ) ;
43
- callback ( err ) ;
44
- return ;
45
- }
51
+ client = await this . _pool . connect ( ) ;
46
52
/*
47
- * This query uses common table expression to upsert the snapshot table
53
+ * This query uses common table expression to upsert the snapshot table
48
54
* (iff the new version is exactly 1 more than the latest table or if
49
55
* the document id does not exists)
50
56
*
51
- * It will then insert into the ops table if it is exactly 1 more than the
57
+ * It will then insert into the ops table if it is exactly 1 more than the
52
58
* latest table or it the first operation and iff the previous insert into
53
59
* the snapshot table is successful.
54
60
*
55
61
* This result of this query the version of the newly inserted operation
56
62
* If either the ops or the snapshot insert fails then 0 rows are returned
57
63
*
58
- * If 0 zeros are return then the callback must return false
64
+ * If 0 zeros are return then the callback must return false
59
65
*
60
66
* Casting is required as postgres thinks that collection and doc_id are
61
- * not varchar
62
- */
67
+ * not varchar
68
+ */
63
69
const query = {
64
70
name : 'sdb-commit-op-and-snap' ,
65
71
text : `WITH snapshot_id AS (
66
- INSERT INTO snapshots (collection, doc_id, doc_type, version , data)
67
- SELECT $1::varchar collection, $2::varchar doc_id, $4 doc_type , $3 v , $5 d
72
+ INSERT INTO snapshots (collection, doc_id, version, doc_type , data, metadata )
73
+ SELECT $1::varchar collection, $2::varchar doc_id, $3 v , $4 doc_type , $5 d, $6 m
68
74
WHERE $3 = (
69
75
SELECT version+1 v
70
76
FROM snapshots
@@ -76,11 +82,11 @@ PostgresDB.prototype.commit = function(collection, id, op, snapshot, options, ca
76
82
WHERE collection = $1 AND doc_id = $2
77
83
FOR UPDATE
78
84
)
79
- ON CONFLICT (collection, doc_id) DO UPDATE SET version = $3, data = $5, doc_type = $4
85
+ ON CONFLICT (collection, doc_id) DO UPDATE SET version = $3, data = $5, doc_type = $4, metadata = $5
80
86
RETURNING version
81
87
)
82
88
INSERT INTO ops (collection, doc_id, version, operation)
83
- SELECT $1::varchar collection, $2::varchar doc_id, $3 v, $6 operation
89
+ SELECT $1::varchar collection, $2::varchar doc_id, $3 v, $7 operation
84
90
WHERE (
85
91
$3 = (
86
92
SELECT max(version)+1
@@ -93,66 +99,53 @@ WHERE (
93
99
)
94
100
) AND EXISTS (SELECT 1 FROM snapshot_id)
95
101
RETURNING version` ,
96
- values : [ collection , id , snapshot . v , snapshot . type , snapshot . data , op ]
102
+ values : [ collection , id , snapshot . v , snapshot . type , JSON . stringify ( snapshot . data ) , JSON . stringify ( snapshot . m ) , JSON . stringify ( op ) ]
97
103
}
98
- client . query ( query , ( err , res ) => {
99
- if ( err ) {
100
- callback ( err )
101
- } else if ( res . rows . length === 0 ) {
102
- done ( client ) ;
103
- callback ( null , false )
104
- }
105
- else {
106
- done ( client ) ;
107
- callback ( null , true )
108
- }
109
- } )
110
-
111
- } )
104
+ const result = await client . query ( query ) ;
105
+ const success = result . rowCount > 0 ;
106
+ callback ( null , success ) ;
107
+ } catch ( error ) {
108
+ // Return non-success instead of duplicate key error, since this is
109
+ // expected to occur during simultaneous creates on the same id
110
+ if ( error . code === PG_UNIQUE_VIOLATION ) callback ( null , false ) ;
111
+ else callback ( error ) ;
112
+ } finally {
113
+ if ( client ) client . release ( true ) ;
114
+ }
112
115
} ;
113
116
114
117
// Get the named document from the database. The callback is called with (err,
115
118
// snapshot). A snapshot with a version of zero is returned if the docuemnt
116
119
// has never been created in the database.
117
- PostgresDB . prototype . getSnapshot = function ( collection , id , fields , options , callback ) {
118
- this . pool . connect ( function ( err , client , done ) {
119
- if ( err ) {
120
- done ( client ) ;
121
- callback ( err ) ;
122
- return ;
123
- }
124
- client . query (
125
- 'SELECT version, data, doc_type FROM snapshots WHERE collection = $1 AND doc_id = $2 LIMIT 1' ,
120
+ PostgresDB . prototype . getSnapshot = async function ( collection , id , fields , options , callback ) {
121
+ fields ||= { } ;
122
+ options ||= { } ;
123
+ const wantsMetadata = fields . $submit || options . metadata ;
124
+ let client ;
125
+ try {
126
+ client = await this . _pool . connect ( ) ;
127
+ const result = await client . query (
128
+ 'SELECT version, data, doc_type, metadata FROM snapshots WHERE collection = $1 AND doc_id = $2 LIMIT 1' ,
126
129
[ collection , id ] ,
127
- function ( err , res ) {
128
- done ( ) ;
129
- if ( err ) {
130
- callback ( err ) ;
131
- return ;
132
- }
133
- if ( res . rows . length ) {
134
- var row = res . rows [ 0 ]
135
- var snapshot = new PostgresSnapshot (
136
- id ,
137
- row . version ,
138
- row . doc_type ,
139
- row . data ,
140
- undefined // TODO: metadata
141
- )
142
- callback ( null , snapshot ) ;
143
- } else {
144
- var snapshot = new PostgresSnapshot (
145
- id ,
146
- 0 ,
147
- null ,
148
- undefined ,
149
- undefined
150
- )
151
- callback ( null , snapshot ) ;
152
- }
153
- }
154
- )
155
- } )
130
+ ) ;
131
+
132
+ var row = result . rows [ 0 ]
133
+ const snapshot = {
134
+ id,
135
+ v : row ?. version || 0 ,
136
+ type : row ?. doc_type || null ,
137
+ data : row ?. data || undefined ,
138
+ m : wantsMetadata ?
139
+ // Postgres returns null but ShareDB expects undefined
140
+ ( row ?. metadata || undefined ) :
141
+ null ,
142
+ } ;
143
+ callback ( null , snapshot ) ;
144
+ } catch ( error ) {
145
+ callback ( error ) ;
146
+ } finally {
147
+ client . release ( true ) ;
148
+ }
156
149
} ;
157
150
158
151
// Get operations between [from, to) noninclusively. (Ie, the range should
@@ -164,37 +157,25 @@ PostgresDB.prototype.getSnapshot = function(collection, id, fields, options, cal
164
157
// The version will be inferred from the parameters if it is missing.
165
158
//
166
159
// Callback should be called as callback(error, [list of ops]);
167
- PostgresDB . prototype . getOps = function ( collection , id , from , to , options , callback ) {
168
- this . pool . connect ( function ( err , client , done ) {
169
- if ( err ) {
170
- done ( client ) ;
171
- callback ( err ) ;
172
- return ;
173
- }
174
-
160
+ PostgresDB . prototype . getOps = async function ( collection , id , from , to , options , callback ) {
161
+ from ||= 0 ;
162
+ options ||= { } ;
163
+ const wantsMetadata = options . metadata ;
164
+ let client ;
165
+ try {
166
+ client = await this . _pool . connect ( ) ;
175
167
var cmd = 'SELECT version, operation FROM ops WHERE collection = $1 AND doc_id = $2 AND version > $3 ' ;
176
168
var params = [ collection , id , from ] ;
177
169
if ( to || to == 0 ) { cmd += ' AND version <= $4' ; params . push ( to ) }
178
170
cmd += ' order by version' ;
179
- client . query ( cmd , params ,
180
- function ( err , res ) {
181
- done ( ) ;
182
- if ( err ) {
183
- callback ( err ) ;
184
- return ;
185
- }
186
- callback ( null , res . rows . map ( function ( row ) {
187
- return row . operation ;
188
- } ) ) ;
189
- }
190
- )
191
- } )
171
+ const result = await client . query ( cmd , params ) ;
172
+ callback ( null , result . rows . map ( ( { operation} ) => {
173
+ if ( ! wantsMetadata ) delete operation . m ;
174
+ return operation ;
175
+ } ) ) ;
176
+ } catch ( error ) {
177
+ callback ( error ) ;
178
+ } finally {
179
+ client . release ( true ) ;
180
+ }
192
181
} ;
193
-
194
- function PostgresSnapshot ( id , version , type , data , meta ) {
195
- this . id = id ;
196
- this . v = version ;
197
- this . type = type ;
198
- this . data = data ;
199
- this . m = meta ;
200
- }
0 commit comments