@@ -38,9 +38,13 @@ function RedisClient(stream, options) {
38
38
39
39
this . pipeline = 0 ;
40
40
if ( ! stream . cork ) {
41
- stream . cork = function noop ( ) {
42
- self . pipeline_queue = new Queue ( ) ;
43
- } ;
41
+ this . cork = function noop ( len ) { } ;
42
+ this . once ( 'ready' , function ( ) {
43
+ self . cork = function ( len ) {
44
+ self . pipeline = len ;
45
+ self . pipeline_queue = new Queue ( len ) ;
46
+ } ;
47
+ } ) ;
44
48
stream . uncork = function noop ( ) { } ;
45
49
this . write = this . writeStream ;
46
50
}
@@ -128,6 +132,10 @@ RedisClient.prototype.install_stream_listeners = function() {
128
132
} ) ;
129
133
} ;
130
134
135
+ RedisClient . prototype . cork = function ( len ) {
136
+ this . stream . cork ( ) ;
137
+ } ;
138
+
131
139
RedisClient . prototype . initialize_retry_vars = function ( ) {
132
140
this . retry_timer = null ;
133
141
this . retry_totaltime = 0 ;
@@ -1074,8 +1082,7 @@ Multi.prototype.exec_transaction = function (callback) {
1074
1082
var cb ;
1075
1083
this . errors = [ ] ;
1076
1084
this . callback = callback ;
1077
- this . _client . stream . cork ( ) ;
1078
- this . _client . pipeline = len + 2 ;
1085
+ this . _client . cork ( len + 2 ) ;
1079
1086
this . wants_buffers = new Array ( len ) ;
1080
1087
this . send_command ( 'multi' , [ ] ) ;
1081
1088
// drain queue, callback will catch 'QUEUED' or error
@@ -1192,8 +1199,7 @@ Multi.prototype.exec = Multi.prototype.EXEC = Multi.prototype.exec_batch = funct
1192
1199
return true ;
1193
1200
}
1194
1201
this . results = new Array ( len ) ;
1195
- this . _client . stream . cork ( ) ;
1196
- this . _client . pipeline = len ;
1202
+ this . _client . cork ( len ) ;
1197
1203
var lastCallback = function ( cb ) {
1198
1204
return function ( err , res ) {
1199
1205
cb ( err , res ) ;
0 commit comments