@@ -70,7 +70,7 @@ Queue.prototype.add = function(payload, opts, callback) {
70
70
opts = { }
71
71
}
72
72
var delay = opts . delay || self . delay
73
- var visible = delay ? nowPlusSecs ( delay ) : now ( )
73
+ var visible = delay ? ( delay instanceof Date ? delay . toISOString ( ) : nowPlusSecs ( delay ) ) : now ( )
74
74
75
75
var msgs = [ ]
76
76
if ( payload instanceof Array ) {
@@ -92,9 +92,9 @@ Queue.prototype.add = function(payload, opts, callback) {
92
92
}
93
93
94
94
self . col . insertMany ( msgs , function ( err , results ) {
95
- if ( err ) return callback ( err )
96
- if ( payload instanceof Array ) return callback ( null , '' + results . insertedIds )
97
- callback ( null , '' + results . ops [ 0 ] . _id )
95
+ if ( err ) return callback ( err ) ;
96
+ if ( payload instanceof Array ) return callback ( null , '' + results . insertedIds ) ;
97
+ callback ( null , '' + results . insertedIds [ "0" ] ) ;
98
98
} )
99
99
}
100
100
@@ -121,8 +121,11 @@ Queue.prototype.get = function(opts, callback) {
121
121
}
122
122
}
123
123
124
- self . col . findOneAndUpdate ( query , update , { sort : sort , returnOriginal : false } , function ( err , result ) {
125
- if ( err ) return callback ( err )
124
+ self . col . findOneAndUpdate ( query , update , { sort : sort , returnDocument : 'after' } , function ( err , result ) {
125
+ if ( err ) {
126
+ return callback ( err ) ;
127
+ }
128
+
126
129
var msg = result . value
127
130
if ( ! msg ) return callback ( )
128
131
@@ -152,7 +155,6 @@ Queue.prototype.get = function(opts, callback) {
152
155
return
153
156
}
154
157
}
155
-
156
158
callback ( null , msg )
157
159
} )
158
160
}
@@ -175,7 +177,7 @@ Queue.prototype.ping = function(ack, opts, callback) {
175
177
visible : nowPlusSecs ( visibility )
176
178
}
177
179
}
178
- self . col . findOneAndUpdate ( query , update , { returnOriginal : false } , function ( err , msg , blah ) {
180
+ self . col . findOneAndUpdate ( query , update , { returnDocument : 'after' } , function ( err , msg , blah ) {
179
181
if ( err ) return callback ( err )
180
182
if ( ! msg . value ) {
181
183
return callback ( new Error ( "Queue.ping(): Unidentified ack : " + ack ) )
@@ -197,7 +199,8 @@ Queue.prototype.ack = function(ack, callback) {
197
199
deleted : now ( ) ,
198
200
}
199
201
}
200
- self . col . findOneAndUpdate ( query , update , { returnOriginal : false } , function ( err , msg , blah ) {
202
+
203
+ self . col . findOneAndUpdate ( query , update , { returnDocument : 'after' } , function ( err , msg , blah ) {
201
204
if ( err ) return callback ( err )
202
205
if ( ! msg . value ) {
203
206
return callback ( new Error ( "Queue.ack(): Unidentified ack : " + ack ) )
0 commit comments