11
11
import com .clickhouse .client .data .ClickHouseRowBinaryProcessor ;
12
12
import com .clickhouse .client .data .ClickHouseTabSeparatedProcessor ;
13
13
import com .clickhouse .client .stream .BlockingPipedOutputStream ;
14
+ import com .clickhouse .client .stream .CapacityPolicy ;
14
15
import com .clickhouse .client .stream .NonBlockingPipedOutputStream ;
15
16
16
17
/**
@@ -44,7 +45,7 @@ public static ClickHouseDataStreamFactory getInstance() {
44
45
* {@code input} is null
45
46
* @param settings nullable settings
46
47
* @param columns nullable list of columns
47
- * @return data processor
48
+ * @return data processor, which might be null
48
49
* @throws IOException when failed to read columns from input stream
49
50
*/
50
51
public ClickHouseDataProcessor getProcessor (ClickHouseConfig config , ClickHouseInputStream input ,
@@ -131,7 +132,7 @@ public ClickHousePipedStream createPipedStream(ClickHouseConfig config) {
131
132
return config != null
132
133
? new ClickHousePipedStream (config .getWriteBufferSize (), config .getMaxQueuedBuffers (),
133
134
config .getSocketTimeout ())
134
- : new ClickHousePipedStream ((int ) ClickHouseClientOption .WRITE_BUFFER_SIZE .getDefaultValue (),
135
+ : new ClickHousePipedStream ((int ) ClickHouseClientOption .BUFFER_SIZE .getDefaultValue (),
135
136
(int ) ClickHouseClientOption .MAX_QUEUED_BUFFERS .getDefaultValue (),
136
137
(int ) ClickHouseClientOption .SOCKET_TIMEOUT .getDefaultValue ());
137
138
}
@@ -146,27 +147,31 @@ public ClickHousePipedOutputStream createPipedOutputStream(ClickHouseConfig conf
146
147
final int bufferSize = ClickHouseChecker .nonNull (config , "config" ).getWriteBufferSize ();
147
148
final boolean blocking ;
148
149
final int queue ;
150
+ final CapacityPolicy policy ;
149
151
final int timeout ;
150
152
151
153
if (config .getResponseBuffering () == ClickHouseBufferingMode .PERFORMANCE ) {
152
154
blocking = false ;
153
155
queue = 0 ;
156
+ policy = null ;
154
157
timeout = 0 ; // questionable
155
158
} else {
156
159
blocking = config .isUseBlockingQueue ();
157
160
queue = config .getMaxQueuedBuffers ();
161
+ policy = config .getBufferQueueVariation () < 1 ? CapacityPolicy .fixedCapacity (queue )
162
+ : CapacityPolicy .linearDynamicCapacity (1 , queue , config .getBufferQueueVariation ());
158
163
timeout = config .getSocketTimeout ();
159
164
}
160
165
return blocking
161
166
? new BlockingPipedOutputStream (bufferSize , queue , timeout , postCloseAction )
162
- : new NonBlockingPipedOutputStream (bufferSize , queue , timeout , null , postCloseAction );
167
+ : new NonBlockingPipedOutputStream (bufferSize , queue , timeout , policy , postCloseAction );
163
168
}
164
169
165
- public ClickHousePipedOutputStream createPipedOutputStream (int writeBufferSize , int queueSize , int timeout ,
170
+ public ClickHousePipedOutputStream createPipedOutputStream (int bufferSize , int queueSize , int timeout ,
166
171
Runnable postCloseAction ) {
167
172
return new BlockingPipedOutputStream (
168
- ClickHouseUtils .getBufferSize (writeBufferSize ,
169
- (int ) ClickHouseClientOption .WRITE_BUFFER_SIZE .getDefaultValue (),
173
+ ClickHouseUtils .getBufferSize (bufferSize ,
174
+ (int ) ClickHouseClientOption .BUFFER_SIZE .getDefaultValue (),
170
175
(int ) ClickHouseClientOption .MAX_BUFFER_SIZE .getDefaultValue ()),
171
176
queueSize , timeout , postCloseAction );
172
177
}
0 commit comments