32
32
import org .apache .kudu .client .KuduSession ;
33
33
import org .apache .kudu .client .KuduTable ;
34
34
import org .apache .kudu .client .Operation ;
35
+ import org .apache .kudu .client .OperationResponse ;
35
36
import org .apache .kudu .client .PartialRow ;
37
+ import org .apache .kudu .client .RowError ;
36
38
import org .apache .kudu .client .SessionConfiguration ;
37
39
import org .slf4j .Logger ;
38
40
import org .slf4j .LoggerFactory ;
@@ -74,8 +76,6 @@ public class KuduOutputFormat extends AbstractDtRichOutputFormat<Tuple2<Boolean,
74
76
75
77
private Integer defaultOperationTimeoutMs ;
76
78
77
- private Integer defaultSocketReadTimeoutMs ;
78
-
79
79
/**
80
80
* kerberos
81
81
*/
@@ -133,12 +133,7 @@ private void initSchedulerTask() {
133
133
);
134
134
135
135
this .scheduledFuture = this .scheduler .scheduleWithFixedDelay (
136
- () -> {
137
- synchronized (KuduOutputFormat .this ) {
138
- flush ();
139
- }
140
- }, batchWaitInterval , batchWaitInterval , TimeUnit .MILLISECONDS
141
- );
136
+ this ::flush , batchWaitInterval , batchWaitInterval , TimeUnit .MILLISECONDS );
142
137
}
143
138
} catch (Exception e ) {
144
139
LOG .error ("init schedule task failed !" );
@@ -151,9 +146,6 @@ private void establishConnection() throws IOException {
151
146
if (null != workerCount ) {
152
147
kuduClientBuilder .workerCount (workerCount );
153
148
}
154
- if (null != defaultSocketReadTimeoutMs ) {
155
- kuduClientBuilder .defaultSocketReadTimeoutMs (defaultSocketReadTimeoutMs );
156
- }
157
149
158
150
if (null != defaultOperationTimeoutMs ) {
159
151
kuduClientBuilder .defaultOperationTimeoutMs (defaultOperationTimeoutMs );
@@ -184,14 +176,13 @@ private void establishConnection() throws IOException {
184
176
}
185
177
186
178
/**
187
- * According to the different flush mode, construct different session. Detail see {@link SessionConfiguration.FlushMode}
179
+ * Builds a session configured according to the given flush mode. For details see {@link SessionConfiguration.FlushMode}
188
180
*
189
181
* @param flushMode flush mode
190
182
* @param kuduClient kudu client
191
183
* @return KuduSession with flush mode
192
- * @throws KuduException kudu exception when session flush
193
184
*/
194
- private KuduSession buildSessionWithFlushMode (String flushMode , KuduClient kuduClient ) throws KuduException {
185
+ private KuduSession buildSessionWithFlushMode (String flushMode , KuduClient kuduClient ) {
195
186
KuduSession kuduSession = kuduClient .newSession ();
196
187
if (flushMode .equalsIgnoreCase (KuduTableInfo .KuduFlushMode .MANUAL_FLUSH .name ())) {
197
188
kuduSession .setFlushMode (SessionConfiguration .FlushMode .MANUAL_FLUSH );
@@ -220,14 +211,6 @@ public void writeRecord(Tuple2<Boolean, Row> record) throws IOException {
220
211
return ;
221
212
}
222
213
Row row = record .getField (1 );
223
- if (row .getArity () != fieldNames .length ) {
224
- if (outDirtyRecords .getCount () % DIRTY_PRINT_FREQUENCY == 0 ) {
225
- LOG .error ("record insert failed ..{}" , row .toString ());
226
- LOG .error ("cause by row.getArity() != fieldNames.length" );
227
- }
228
- outDirtyRecords .inc ();
229
- return ;
230
- }
231
214
232
215
try {
233
216
if (outRecords .getCount () % ROW_PRINT_FREQUENCY == 0 ) {
@@ -236,33 +219,69 @@ public void writeRecord(Tuple2<Boolean, Row> record) throws IOException {
236
219
if (rowCount .getAndIncrement () >= batchSize ) {
237
220
flush ();
238
221
}
222
+ // At AUTO_FLUSH_SYNC mode, kudu automatically flush once session apply operation, then get the response from kudu server.
223
+ if (flushMode .equalsIgnoreCase (SessionConfiguration .FlushMode .AUTO_FLUSH_SYNC .name ())) {
224
+ dealResponse (session .apply (toOperation (writeMode , row )));
225
+ }
226
+
239
227
session .apply (toOperation (writeMode , row ));
240
228
outRecords .inc ();
241
229
} catch (KuduException e ) {
242
- if (outDirtyRecords .getCount () % DIRTY_PRINT_FREQUENCY == 0 ) {
243
- LOG .error ("record insert failed, total dirty record:{} current row:{}" , outDirtyRecords .getCount (), row .toString ());
244
- LOG .error ("" , e );
245
- }
246
- outDirtyRecords .inc ();
230
+ throw new RuntimeException (e );
247
231
}
248
232
}
249
233
234
+ /**
235
+ * Flushes the operations buffered in the session, hands every returned response to
+ * {@link #dealResponse(OperationResponse)} and then resets rowCount to 0.
236
+ * In AUTO_FLUSH_SYNC mode the method returns early: nothing is flushed and rowCount is left unchanged.
+ * A closed session raises an IllegalStateException, which is not handled by the catch clause below;
+ * a KuduException raised while flushing is logged and rethrown wrapped in a RuntimeException.
+ * For details of the flush itself see {@link KuduSession#flush()}
237
+ */
250
238
private synchronized void flush () {
251
239
try {
252
240
if (session .isClosed ()) {
253
- throw new IllegalStateException ("session is closed! flush data error!" );
241
+ throw new IllegalStateException ("Session is closed! Flush data error!" );
254
242
}
255
243
256
- session .flush ();
244
+ // In AUTO_FLUSH_SYNC mode Kudu flushes each operation as soon as the session applies it, so there is nothing to flush here
245
+ if (flushMode .equalsIgnoreCase (SessionConfiguration .FlushMode .AUTO_FLUSH_SYNC .name ())) {
246
+ return ;
247
+ }
248
+ session .flush ().forEach (this ::dealResponse );
257
249
// the batch has been flushed: reset the pending-row counter
258
250
rowCount .set (0 );
259
251
} catch (KuduException kuduException ) {
260
- LOG .error (
261
- "flush data error!" , kuduException );
252
+ LOG .error ("flush data error!" , kuduException );
262
253
throw new RuntimeException (kuduException );
263
254
}
264
255
}
265
256
257
+ /**
258
+ * Deal response when operation apply.
259
+ * At MANUAL_FLUSH mode, response returns after {@link KuduSession#flush()}.
260
+ * But at AUTO_FLUSH_SYNC mode, response returns after {@link KuduSession#apply(Operation)}
261
+ *
262
+ * @param response {@link OperationResponse} response after operation done.
263
+ */
264
+ private void dealResponse (OperationResponse response ) {
265
+ if (response .hasRowError ()) {
266
+ RowError error = response .getRowError ();
267
+ String errorMsg = error .getErrorStatus ().toString ();
268
+ if (outDirtyRecords .getCount () % DIRTY_PRINT_FREQUENCY == 0 ) {
269
+ LOG .error (errorMsg );
270
+ LOG .error (String .format ("Dirty data count: [%s]. Row data: [%s]" ,
271
+ outDirtyRecords .getCount () + 1 , error .getOperation ().getRow ().toString ()));
272
+ }
273
+ outDirtyRecords .inc ();
274
+
275
+ if (error .getErrorStatus ().isNotFound ()
276
+ || error .getErrorStatus ().isIOError ()
277
+ || error .getErrorStatus ().isRuntimeError ()
278
+ || error .getErrorStatus ().isServiceUnavailable ()
279
+ || error .getErrorStatus ().isIllegalState ()) {
280
+ throw new RuntimeException (errorMsg );
281
+ }
282
+ }
283
+ }
284
+
266
285
@ Override
267
286
public void close () {
268
287
if (Objects .nonNull (session ) && !session .isClosed ()) {
@@ -429,11 +448,6 @@ public KuduOutputFormatBuilder setDefaultOperationTimeoutMs(Integer defaultOpera
429
448
return this ;
430
449
}
431
450
432
- public KuduOutputFormatBuilder setDefaultSocketReadTimeoutMs (Integer defaultSocketReadTimeoutMs ) {
433
- kuduOutputFormat .defaultSocketReadTimeoutMs = defaultSocketReadTimeoutMs ;
434
- return this ;
435
- }
436
-
437
451
public KuduOutputFormatBuilder setPrincipal (String principal ) {
438
452
kuduOutputFormat .principal = principal ;
439
453
return this ;
0 commit comments