18
18
19
19
package com .dtstack .flink .sql .sink .kudu ;
20
20
21
+ import com .dtstack .flink .sql .factory .DTThreadFactory ;
21
22
import com .dtstack .flink .sql .outputformat .AbstractDtRichOutputFormat ;
23
+ import com .dtstack .flink .sql .sink .kudu .table .KuduTableInfo ;
22
24
import com .dtstack .flink .sql .util .KrbUtils ;
23
25
import org .apache .flink .api .common .typeinfo .TypeInformation ;
24
26
import org .apache .flink .api .java .tuple .Tuple2 ;
30
32
import org .apache .kudu .client .KuduSession ;
31
33
import org .apache .kudu .client .KuduTable ;
32
34
import org .apache .kudu .client .Operation ;
35
+ import org .apache .kudu .client .OperationResponse ;
33
36
import org .apache .kudu .client .PartialRow ;
37
+ import org .apache .kudu .client .RowError ;
38
+ import org .apache .kudu .client .SessionConfiguration ;
34
39
import org .slf4j .Logger ;
35
40
import org .slf4j .LoggerFactory ;
36
41
40
45
import java .sql .Timestamp ;
41
46
import java .util .Date ;
42
47
import java .util .Objects ;
48
+ import java .util .concurrent .ScheduledExecutorService ;
49
+ import java .util .concurrent .ScheduledFuture ;
50
+ import java .util .concurrent .ScheduledThreadPoolExecutor ;
51
+ import java .util .concurrent .TimeUnit ;
52
+ import java .util .concurrent .atomic .AtomicInteger ;
43
53
44
54
/**
45
55
* @author gituser
46
56
* @modify xiuzhu
47
57
*/
48
- public class KuduOutputFormat extends AbstractDtRichOutputFormat <Tuple2 > {
58
+ public class KuduOutputFormat extends AbstractDtRichOutputFormat <Tuple2 < Boolean , Row > > {
49
59
50
60
private static final long serialVersionUID = 1L ;
51
61
@@ -66,15 +76,31 @@ public class KuduOutputFormat extends AbstractDtRichOutputFormat<Tuple2> {
66
76
67
77
private Integer defaultOperationTimeoutMs ;
68
78
69
- private Integer defaultSocketReadTimeoutMs ;
70
-
71
79
/**
72
80
* kerberos
73
81
*/
74
82
private String principal ;
75
83
private String keytab ;
76
84
private String krb5conf ;
77
85
86
+ /**
87
+ * batch size
88
+ */
89
+ private Integer batchSize ;
90
+ private Integer batchWaitInterval ;
91
+ /**
92
+ * kudu session flush mode
93
+ */
94
+ private String flushMode ;
95
+
96
+ private transient AtomicInteger rowCount ;
97
+
98
+ /**
99
+ * 定时任务
100
+ */
101
+ private transient ScheduledExecutorService scheduler ;
102
+ private transient ScheduledFuture <?> scheduledFuture ;
103
+
78
104
private KuduOutputFormat () {
79
105
}
80
106
@@ -91,16 +117,35 @@ public void configure(Configuration parameters) {
91
117
public void open (int taskNumber , int numTasks ) throws IOException {
92
118
establishConnection ();
93
119
initMetric ();
120
+ initSchedulerTask ();
121
+ rowCount = new AtomicInteger (0 );
122
+ }
123
+
124
+ /**
125
+ * init the scheduler task of {@link KuduOutputFormat#flush()}
126
+ */
127
+ private void initSchedulerTask () {
128
+ try {
129
+ if (batchWaitInterval > 0 ) {
130
+ this .scheduler = new ScheduledThreadPoolExecutor (
131
+ 1 ,
132
+ new DTThreadFactory ("kudu-batch-flusher" )
133
+ );
134
+
135
+ this .scheduledFuture = this .scheduler .scheduleWithFixedDelay (
136
+ this ::flush , batchWaitInterval , batchWaitInterval , TimeUnit .MILLISECONDS );
137
+ }
138
+ } catch (Exception e ) {
139
+ LOG .error ("init schedule task failed !" );
140
+ throw new RuntimeException (e );
141
+ }
94
142
}
95
143
96
144
private void establishConnection () throws IOException {
97
145
KuduClient .KuduClientBuilder kuduClientBuilder = new KuduClient .KuduClientBuilder (kuduMasters );
98
146
if (null != workerCount ) {
99
147
kuduClientBuilder .workerCount (workerCount );
100
148
}
101
- if (null != defaultSocketReadTimeoutMs ) {
102
- kuduClientBuilder .defaultSocketReadTimeoutMs (defaultSocketReadTimeoutMs );
103
- }
104
149
105
150
if (null != defaultOperationTimeoutMs ) {
106
151
kuduClientBuilder .defaultOperationTimeoutMs (defaultOperationTimeoutMs );
@@ -127,38 +172,113 @@ private void establishConnection() throws IOException {
127
172
}
128
173
LOG .info ("connect kudu is succeed!" );
129
174
130
- session = client .newSession ();
175
+ session = buildSessionWithFlushMode (flushMode , client );
176
+ }
177
+
178
+ /**
179
+ * According to the different flush mode, build different session. Detail see {@link SessionConfiguration.FlushMode}
180
+ *
181
+ * @param flushMode flush mode
182
+ * @param kuduClient kudu client
183
+ * @return KuduSession with flush mode
184
+ */
185
+ private KuduSession buildSessionWithFlushMode (String flushMode , KuduClient kuduClient ) {
186
+ KuduSession kuduSession = kuduClient .newSession ();
187
+ if (flushMode .equalsIgnoreCase (KuduTableInfo .KuduFlushMode .MANUAL_FLUSH .name ())) {
188
+ kuduSession .setFlushMode (SessionConfiguration .FlushMode .MANUAL_FLUSH );
189
+ kuduSession .setMutationBufferSpace (
190
+ Integer .parseInt (String .valueOf (Math .round (batchSize * 1.2 )))
191
+ );
192
+ }
193
+
194
+ if (flushMode .equalsIgnoreCase (KuduTableInfo .KuduFlushMode .AUTO_FLUSH_SYNC .name ())) {
195
+ LOG .warn ("Parameter [batchSize] will not take effect at AUTO_FLUSH_SYNC mode." );
196
+ kuduSession .setFlushMode (SessionConfiguration .FlushMode .AUTO_FLUSH_SYNC );
197
+ }
198
+
199
+ if (flushMode .equalsIgnoreCase (KuduTableInfo .KuduFlushMode .AUTO_FLUSH_BACKGROUND .name ())) {
200
+ LOG .warn ("Unable to determine the order of data at AUTO_FLUSH_BACKGROUND mode." );
201
+ kuduSession .setFlushMode (SessionConfiguration .FlushMode .AUTO_FLUSH_BACKGROUND );
202
+ }
203
+
204
+ return kuduSession ;
131
205
}
132
206
133
207
@ Override
134
- public void writeRecord (Tuple2 record ) throws IOException {
135
- Tuple2 <Boolean , Row > tupleTrans = record ;
136
- Boolean retract = tupleTrans .getField (0 );
208
+ public void writeRecord (Tuple2 <Boolean , Row > record ) throws IOException {
209
+ Boolean retract = record .getField (0 );
137
210
if (!retract ) {
138
211
return ;
139
212
}
140
- Row row = tupleTrans .getField (1 );
141
- if (row .getArity () != fieldNames .length ) {
142
- if (outDirtyRecords .getCount () % DIRTY_PRINT_FREQUENCY == 0 ) {
143
- LOG .error ("record insert failed ..{}" , row .toString ());
144
- LOG .error ("cause by row.getArity() != fieldNames.length" );
145
- }
146
- outDirtyRecords .inc ();
147
- return ;
148
- }
213
+ Row row = record .getField (1 );
149
214
150
215
try {
151
216
if (outRecords .getCount () % ROW_PRINT_FREQUENCY == 0 ) {
152
217
LOG .info ("Receive data : {}" , row );
153
218
}
219
+ if (rowCount .getAndIncrement () >= batchSize ) {
220
+ flush ();
221
+ }
222
+ // At AUTO_FLUSH_SYNC mode, kudu automatically flush once session apply operation, then get the response from kudu server.
223
+ if (flushMode .equalsIgnoreCase (SessionConfiguration .FlushMode .AUTO_FLUSH_SYNC .name ())) {
224
+ dealResponse (session .apply (toOperation (writeMode , row )));
225
+ }
226
+
154
227
session .apply (toOperation (writeMode , row ));
155
228
outRecords .inc ();
156
229
} catch (KuduException e ) {
230
+ throw new RuntimeException (e );
231
+ }
232
+ }
233
+
234
+ /**
235
+ * Flush data with session, then deal the responses of operations and reset rowCount.
236
+ * Detail of flush see {@link KuduSession#flush()}
237
+ */
238
+ private synchronized void flush () {
239
+ try {
240
+ if (session .isClosed ()) {
241
+ throw new IllegalStateException ("Session is closed! Flush data error!" );
242
+ }
243
+
244
+ // At AUTO_FLUSH_SYNC mode, kudu automatically flush once session apply operation
245
+ if (flushMode .equalsIgnoreCase (SessionConfiguration .FlushMode .AUTO_FLUSH_SYNC .name ())) {
246
+ return ;
247
+ }
248
+ session .flush ().forEach (this ::dealResponse );
249
+ // clear
250
+ rowCount .set (0 );
251
+ } catch (KuduException kuduException ) {
252
+ LOG .error ("flush data error!" , kuduException );
253
+ throw new RuntimeException (kuduException );
254
+ }
255
+ }
256
+
257
+ /**
258
+ * Deal response when operation apply.
259
+ * At MANUAL_FLUSH mode, response returns after {@link KuduSession#flush()}.
260
+ * But at AUTO_FLUSH_SYNC mode, response returns after {@link KuduSession#apply(Operation)}
261
+ *
262
+ * @param response {@link OperationResponse} response after operation done.
263
+ */
264
+ private void dealResponse (OperationResponse response ) {
265
+ if (response .hasRowError ()) {
266
+ RowError error = response .getRowError ();
267
+ String errorMsg = error .getErrorStatus ().toString ();
157
268
if (outDirtyRecords .getCount () % DIRTY_PRINT_FREQUENCY == 0 ) {
158
- LOG .error ("record insert failed, total dirty record:{} current row:{}" , outDirtyRecords .getCount (), row .toString ());
159
- LOG .error ("" , e );
269
+ LOG .error (errorMsg );
270
+ LOG .error (String .format ("Dirty data count: [%s]. Row data: [%s]" ,
271
+ outDirtyRecords .getCount () + 1 , error .getOperation ().getRow ().toString ()));
160
272
}
161
273
outDirtyRecords .inc ();
274
+
275
+ if (error .getErrorStatus ().isNotFound ()
276
+ || error .getErrorStatus ().isIOError ()
277
+ || error .getErrorStatus ().isRuntimeError ()
278
+ || error .getErrorStatus ().isServiceUnavailable ()
279
+ || error .getErrorStatus ().isIllegalState ()) {
280
+ throw new RuntimeException (errorMsg );
281
+ }
162
282
}
163
283
}
164
284
@@ -179,6 +299,14 @@ public void close() {
179
299
throw new IllegalArgumentException ("[closeKuduClient]:" + e .getMessage ());
180
300
}
181
301
}
302
+
303
+ if (scheduledFuture != null ) {
304
+ scheduledFuture .cancel (false );
305
+ }
306
+
307
+ if (scheduler != null ) {
308
+ scheduler .shutdownNow ();
309
+ }
182
310
}
183
311
184
312
private Operation toOperation (WriteMode writeMode , Row row ) {
@@ -320,11 +448,6 @@ public KuduOutputFormatBuilder setDefaultOperationTimeoutMs(Integer defaultOpera
320
448
return this ;
321
449
}
322
450
323
- public KuduOutputFormatBuilder setDefaultSocketReadTimeoutMs (Integer defaultSocketReadTimeoutMs ) {
324
- kuduOutputFormat .defaultSocketReadTimeoutMs = defaultSocketReadTimeoutMs ;
325
- return this ;
326
- }
327
-
328
451
public KuduOutputFormatBuilder setPrincipal (String principal ) {
329
452
kuduOutputFormat .principal = principal ;
330
453
return this ;
@@ -345,6 +468,21 @@ public KuduOutputFormatBuilder setEnableKrb(boolean enableKrb) {
345
468
return this ;
346
469
}
347
470
471
+ public KuduOutputFormatBuilder setBatchSize (Integer batchSize ) {
472
+ kuduOutputFormat .batchSize = batchSize ;
473
+ return this ;
474
+ }
475
+
476
+ public KuduOutputFormatBuilder setBatchWaitInterval (Integer batchWaitInterval ) {
477
+ kuduOutputFormat .batchWaitInterval = batchWaitInterval ;
478
+ return this ;
479
+ }
480
+
481
+ public KuduOutputFormatBuilder setFlushMode (String flushMode ) {
482
+ kuduOutputFormat .flushMode = flushMode ;
483
+ return this ;
484
+ }
485
+
348
486
public KuduOutputFormat finish () {
349
487
if (kuduOutputFormat .kuduMasters == null ) {
350
488
throw new IllegalArgumentException ("No kuduMasters supplied." );
0 commit comments