11var DB = require ( 'sharedb' ) . DB ;
22var pg = require ( 'pg' ) ;
33
4+ const PG_UNIQUE_VIOLATION = '23505' ;
5+
46// Postgres-backed ShareDB database
57
68function PostgresDB ( options ) {
@@ -9,24 +11,32 @@ function PostgresDB(options) {
911
1012 this . closed = false ;
1113
12- this . pg_config = options ;
13- this . pool = new pg . Pool ( options ) ;
14+ this . _pool = new pg . Pool ( options ) ;
1415} ;
1516module . exports = PostgresDB ;
1617
1718PostgresDB . prototype = Object . create ( DB . prototype ) ;
1819
19- PostgresDB . prototype . close = function ( callback ) {
20- this . closed = true ;
21- this . pool . end ( ) ;
22-
23- if ( callback ) callback ( ) ;
20+ PostgresDB . prototype . close = async function ( callback ) {
21+ let error ;
22+ try {
23+ if ( ! this . closed ) {
24+ this . closed = true ;
25+ await this . _pool . end ( ) ;
26+ }
27+ } catch ( err ) {
28+ error = err ;
29+ }
30+
31+ // FIXME: Don't swallow errors. Emit 'error' event?
32+ if ( callback ) callback ( error ) ;
2433} ;
2534
2635
2736// Persists an op and snapshot if it is for the next version. Calls back with
2837// callback(err, succeeded)
29- PostgresDB . prototype . commit = function ( collection , id , op , snapshot , options , callback ) {
38+ PostgresDB . prototype . commit = async function ( collection , id , op , snapshot , options , callback ) {
39+ try {
3040 /*
3141 * op: CreateOp {
3242 * src: '24545654654646',
@@ -37,34 +47,28 @@ PostgresDB.prototype.commit = function(collection, id, op, snapshot, options, ca
3747 * }
3848 * snapshot: PostgresSnapshot
3949 */
40- this . pool . connect ( ( err , client , done ) => {
41- if ( err ) {
42- done ( client ) ;
43- callback ( err ) ;
44- return ;
45- }
4650 /*
47- * This query uses common table expression to upsert the snapshot table
51+ * This query uses common table expression to upsert the snapshot table
4852 * (iff the new version is exactly 1 more than the latest table or if
4953 * the document id does not exists)
5054 *
51- * It will then insert into the ops table if it is exactly 1 more than the
55+ * It will then insert into the ops table if it is exactly 1 more than the
5256 * latest table or it the first operation and iff the previous insert into
5357 * the snapshot table is successful.
5458 *
5559 * This result of this query the version of the newly inserted operation
5660 * If either the ops or the snapshot insert fails then 0 rows are returned
5761 *
58- * If 0 zeros are return then the callback must return false
62+ * If 0 zeros are return then the callback must return false
5963 *
6064 * Casting is required as postgres thinks that collection and doc_id are
61- * not varchar
62- */
65+ * not varchar
66+ */
6367 const query = {
6468 name : 'sdb-commit-op-and-snap' ,
6569 text : `WITH snapshot_id AS (
66- INSERT INTO snapshots (collection, doc_id, doc_type, version , data)
67- SELECT $1::varchar collection, $2::varchar doc_id, $4 doc_type , $3 v , $5 d
70+ INSERT INTO snapshots (collection, doc_id, version, doc_type , data, metadata )
71+ SELECT $1::varchar collection, $2::varchar doc_id, $3 v , $4 doc_type , $5 d, $6 m
6872 WHERE $3 = (
6973 SELECT version+1 v
7074 FROM snapshots
@@ -76,11 +80,11 @@ PostgresDB.prototype.commit = function(collection, id, op, snapshot, options, ca
7680 WHERE collection = $1 AND doc_id = $2
7781 FOR UPDATE
7882 )
79- ON CONFLICT (collection, doc_id) DO UPDATE SET version = $3, data = $5, doc_type = $4
83+ ON CONFLICT (collection, doc_id) DO UPDATE SET version = $3, data = $5, doc_type = $4, metadata = $5
8084 RETURNING version
8185)
8286INSERT INTO ops (collection, doc_id, version, operation)
83- SELECT $1::varchar collection, $2::varchar doc_id, $3 v, $6 operation
87+ SELECT $1::varchar collection, $2::varchar doc_id, $3 v, $7 operation
8488WHERE (
8589 $3 = (
8690 SELECT max(version)+1
@@ -93,66 +97,47 @@ WHERE (
9397 )
9498) AND EXISTS (SELECT 1 FROM snapshot_id)
9599RETURNING version` ,
96- values : [ collection , id , snapshot . v , snapshot . type , snapshot . data , op ]
100+ values : [ collection , id , snapshot . v , snapshot . type , JSON . stringify ( snapshot . data ) , JSON . stringify ( snapshot . m ) , JSON . stringify ( op ) ]
97101 }
98- client . query ( query , ( err , res ) => {
99- if ( err ) {
100- callback ( err )
101- } else if ( res . rows . length === 0 ) {
102- done ( client ) ;
103- callback ( null , false )
104- }
105- else {
106- done ( client ) ;
107- callback ( null , true )
108- }
109- } )
110-
111- } )
102+ const result = await this . _pool . query ( query ) ;
103+ const success = result . rowCount > 0 ;
104+ callback ( null , success ) ;
105+ } catch ( error ) {
106+ // Return non-success instead of duplicate key error, since this is
107+ // expected to occur during simultaneous creates on the same id
108+ if ( error . code === PG_UNIQUE_VIOLATION ) callback ( null , false ) ;
109+ else callback ( error ) ;
110+ }
112111} ;
113112
114113// Get the named document from the database. The callback is called with (err,
115114// snapshot). A snapshot with a version of zero is returned if the docuemnt
116115// has never been created in the database.
117- PostgresDB . prototype . getSnapshot = function ( collection , id , fields , options , callback ) {
118- this . pool . connect ( function ( err , client , done ) {
119- if ( err ) {
120- done ( client ) ;
121- callback ( err ) ;
122- return ;
123- }
124- client . query (
125- 'SELECT version, data, doc_type FROM snapshots WHERE collection = $1 AND doc_id = $2 LIMIT 1' ,
116+ PostgresDB . prototype . getSnapshot = async function ( collection , id , fields , options , callback ) {
117+ fields ||= { } ;
118+ options ||= { } ;
119+ const wantsMetadata = fields . $submit || options . metadata ;
120+ try {
121+ const result = await this . _pool . query (
122+ 'SELECT version, data, doc_type, metadata FROM snapshots WHERE collection = $1 AND doc_id = $2 LIMIT 1' ,
126123 [ collection , id ] ,
127- function ( err , res ) {
128- done ( ) ;
129- if ( err ) {
130- callback ( err ) ;
131- return ;
132- }
133- if ( res . rows . length ) {
134- var row = res . rows [ 0 ]
135- var snapshot = new PostgresSnapshot (
136- id ,
137- row . version ,
138- row . doc_type ,
139- row . data ,
140- undefined // TODO: metadata
141- )
142- callback ( null , snapshot ) ;
143- } else {
144- var snapshot = new PostgresSnapshot (
145- id ,
146- 0 ,
147- null ,
148- undefined ,
149- undefined
150- )
151- callback ( null , snapshot ) ;
152- }
153- }
154- )
155- } )
124+ ) ;
125+
126+ var row = result . rows [ 0 ]
127+ const snapshot = {
128+ id,
129+ v : row ?. version || 0 ,
130+ type : row ?. doc_type || null ,
131+ data : row ?. data || undefined ,
132+ m : wantsMetadata ?
133+ // Postgres returns null but ShareDB expects undefined
134+ ( row ?. metadata || undefined ) :
135+ null ,
136+ } ;
137+ callback ( null , snapshot ) ;
138+ } catch ( error ) {
139+ callback ( error ) ;
140+ }
156141} ;
157142
158143// Get operations between [from, to) noninclusively. (Ie, the range should
@@ -164,37 +149,21 @@ PostgresDB.prototype.getSnapshot = function(collection, id, fields, options, cal
164149// The version will be inferred from the parameters if it is missing.
165150//
166151// Callback should be called as callback(error, [list of ops]);
167- PostgresDB . prototype . getOps = function ( collection , id , from , to , options , callback ) {
168- this . pool . connect ( function ( err , client , done ) {
169- if ( err ) {
170- done ( client ) ;
171- callback ( err ) ;
172- return ;
173- }
174-
152+ PostgresDB . prototype . getOps = async function ( collection , id , from , to , options , callback ) {
153+ from ||= 0 ;
154+ options ||= { } ;
155+ const wantsMetadata = options . metadata ;
156+ try {
175157 var cmd = 'SELECT version, operation FROM ops WHERE collection = $1 AND doc_id = $2 AND version > $3 ' ;
176158 var params = [ collection , id , from ] ;
177159 if ( to || to == 0 ) { cmd += ' AND version <= $4' ; params . push ( to ) }
178160 cmd += ' order by version' ;
179- client . query ( cmd , params ,
180- function ( err , res ) {
181- done ( ) ;
182- if ( err ) {
183- callback ( err ) ;
184- return ;
185- }
186- callback ( null , res . rows . map ( function ( row ) {
187- return row . operation ;
188- } ) ) ;
189- }
190- )
191- } )
161+ const result = await this . _pool . query ( cmd , params ) ;
162+ callback ( null , result . rows . map ( ( { operation} ) => {
163+ if ( ! wantsMetadata ) delete operation . m ;
164+ return operation ;
165+ } ) ) ;
166+ } catch ( error ) {
167+ callback ( error ) ;
168+ }
192169} ;
193-
194- function PostgresSnapshot ( id , version , type , data , meta ) {
195- this . id = id ;
196- this . v = version ;
197- this . type = type ;
198- this . data = data ;
199- this . m = meta ;
200- }
0 commit comments