 import com.google.cloud.bigquery.spi.BigQueryRpcFactory;
 import com.google.cloud.bigquery.spi.v2.HttpBigQueryRpc;
 import java.io.IOException;
+import java.net.ConnectException;
 import java.net.SocketException;
+import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.Random;
@@ -114,18 +116,19 @@ public void testCreate() throws IOException {
   }
 
   @Test
-  public void testCreateRetryableError() throws IOException {
-    BigQueryException exception = new BigQueryException(new SocketException("Socket closed"));
+  public void testCreateRetryableErrors() throws IOException {
     when(bigqueryRpcMock.openSkipExceptionTranslation(
             new com.google.api.services.bigquery.model.Job()
                 .setJobReference(JOB_INFO.getJobId().toPb())
                 .setConfiguration(LOAD_CONFIGURATION.toPb())))
-        .thenThrow(exception)
+        .thenThrow(new SocketException("Socket closed"))
+        .thenThrow(new UnknownHostException())
+        .thenThrow(new ConnectException())
         .thenReturn(UPLOAD_ID);
     writer = new TableDataWriteChannel(options, JOB_INFO.getJobId(), LOAD_CONFIGURATION);
     assertTrue(writer.isOpen());
     assertNull(writer.getJob());
-    verify(bigqueryRpcMock, times(2))
+    verify(bigqueryRpcMock, times(4))
         .openSkipExceptionTranslation(
             new com.google.api.services.bigquery.model.Job()
                 .setJobReference(JOB_INFO.getJobId().toPb())
@@ -134,12 +137,11 @@ public void testCreateRetryableError() throws IOException {
 
   @Test
   public void testCreateNonRetryableError() throws IOException {
-    RuntimeException ex = new RuntimeException("expected");
     when(bigqueryRpcMock.openSkipExceptionTranslation(
             new com.google.api.services.bigquery.model.Job()
                 .setJobReference(JOB_INFO.getJobId().toPb())
                 .setConfiguration(LOAD_CONFIGURATION.toPb())))
-        .thenThrow(ex);
+        .thenThrow(new RuntimeException("expected"));
     try (TableDataWriteChannel channel =
         new TableDataWriteChannel(options, JOB_INFO.getJobId(), LOAD_CONFIGURATION)) {
       Assert.fail();
@@ -207,7 +209,7 @@ public void testWriteWithFlush() throws IOException {
   }
 
   @Test
-  public void testWritesAndFlush() throws IOException {
+  public void testWritesAndFlushRetryableErrors() throws IOException {
     when(bigqueryRpcMock.openSkipExceptionTranslation(
             new com.google.api.services.bigquery.model.Job()
                 .setJobReference(JOB_INFO.getJobId().toPb())
@@ -220,6 +222,9 @@ public void testWritesAndFlush() throws IOException {
             eq(0L),
             eq(DEFAULT_CHUNK_SIZE),
             eq(false)))
+        .thenThrow(new SocketException("Socket closed"))
+        .thenThrow(new UnknownHostException())
+        .thenThrow(new ConnectException())
         .thenReturn(null);
     writer = new TableDataWriteChannel(options, JOB_INFO.getJobId(), LOAD_CONFIGURATION);
     ByteBuffer[] buffers = new ByteBuffer[DEFAULT_CHUNK_SIZE / MIN_CHUNK_SIZE];
@@ -239,7 +244,48 @@ public void testWritesAndFlush() throws IOException {
             new com.google.api.services.bigquery.model.Job()
                 .setJobReference(JOB_INFO.getJobId().toPb())
                 .setConfiguration(LOAD_CONFIGURATION.toPb()));
+    verify(bigqueryRpcMock, times(4))
+        .writeSkipExceptionTranslation(
+            eq(UPLOAD_ID),
+            capturedBuffer.capture(),
+            eq(0),
+            eq(0L),
+            eq(DEFAULT_CHUNK_SIZE),
+            eq(false));
+  }
+
+  @Test
+  public void testWritesAndFlushNonRetryableError() throws IOException {
+    when(bigqueryRpcMock.openSkipExceptionTranslation(
+            new com.google.api.services.bigquery.model.Job()
+                .setJobReference(JOB_INFO.getJobId().toPb())
+                .setConfiguration(LOAD_CONFIGURATION.toPb())))
+        .thenReturn(UPLOAD_ID);
+    when(bigqueryRpcMock.writeSkipExceptionTranslation(
+            eq(UPLOAD_ID),
+            capturedBuffer.capture(),
+            eq(0),
+            eq(0L),
+            eq(DEFAULT_CHUNK_SIZE),
+            eq(false)))
+        .thenThrow(new RuntimeException("expected"));
+    try {
+      writer = new TableDataWriteChannel(options, JOB_INFO.getJobId(), LOAD_CONFIGURATION);
+      ByteBuffer[] buffers = new ByteBuffer[DEFAULT_CHUNK_SIZE / MIN_CHUNK_SIZE];
+      for (int i = 0; i < buffers.length; i++) {
+        buffers[i] = randomBuffer(MIN_CHUNK_SIZE);
+        assertEquals(MIN_CHUNK_SIZE, writer.write(buffers[i]));
+      }
+      Assert.fail();
+    } catch (RuntimeException expected) {
+      Assert.assertEquals("java.lang.RuntimeException: expected", expected.getMessage());
+    }
     verify(bigqueryRpcMock)
+        .openSkipExceptionTranslation(
+            new com.google.api.services.bigquery.model.Job()
+                .setJobReference(JOB_INFO.getJobId().toPb())
+                .setConfiguration(LOAD_CONFIGURATION.toPb()));
+    verify(bigqueryRpcMock, times(1))
         .writeSkipExceptionTranslation(
             eq(UPLOAD_ID),
             capturedBuffer.capture(),
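The stubbing pattern these tests rely on is Mockito's chained answers: each `thenThrow(...)` or `thenReturn(...)` is consumed by exactly one invocation of the stubbed method, so three thrown network exceptions followed by a return models three transient failures and then a success, and `verify(..., times(4))` asserts the channel retried exactly that many times. Below is a minimal, self-contained sketch of the same pattern; the `Uploader` interface, the `openWithRetries` helper, and the attempt budget are hypothetical illustrations invented for this sketch, not part of `TableDataWriteChannel` or the BigQuery client.

import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.io.IOException;
import java.net.ConnectException;
import java.net.SocketException;
import java.net.UnknownHostException;
import org.junit.Test;

public class RetryStubbingSketchTest {

  // Hypothetical stand-in for the RPC layer; not the BigQuery client's API.
  interface Uploader {
    String open() throws IOException;
  }

  // Hypothetical retry helper: keep calling open() until it succeeds or the
  // attempt budget runs out. It only illustrates the shape the tests exercise.
  private static String openWithRetries(Uploader uploader, int maxAttempts) throws IOException {
    IOException last = null;
    for (int attempt = 0; attempt < maxAttempts; attempt++) {
      try {
        return uploader.open();
      } catch (IOException e) {
        // SocketException, UnknownHostException, ConnectException are treated as transient here.
        last = e;
      }
    }
    throw last;
  }

  @Test
  public void retriesUntilSuccess() throws IOException {
    Uploader uploader = mock(Uploader.class);
    // One chained answer is consumed per call: three transient failures, then success.
    when(uploader.open())
        .thenThrow(new SocketException("Socket closed"))
        .thenThrow(new UnknownHostException())
        .thenThrow(new ConnectException())
        .thenReturn("upload-id");

    assertEquals("upload-id", openWithRetries(uploader, 5));
    // The mock records every call, so the retry count can be asserted,
    // mirroring verify(bigqueryRpcMock, times(4)) in the diff above.
    verify(uploader, times(4)).open();
  }
}

In the diff itself the same chaining is applied to openSkipExceptionTranslation and writeSkipExceptionTranslation on bigqueryRpcMock, and the non-retryable cases confirm that a RuntimeException stops the channel after a single attempt.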