File tree 4 files changed +9
-10
lines changed
core/src/main/java/com/dtstack/flink/sql/dirtyManager/consumer
console/src/main/java/com/dtstack/flink/sql/dirty/console
mysql/src/main/java/com/dtstack/flink/sql/dirty/mysql
hbase/hbase-sink/src/main/java/com/dtstack/flink/sql/sink/hbase
4 files changed +9
-10
lines changed Original file line number Diff line number Diff line change @@ -83,8 +83,7 @@ public void run() {
83
83
}
84
84
} catch (Exception e ) {
85
85
LOG .error ("consume dirtyData error" , e );
86
- errorCount .incrementAndGet ();
87
- if (errorCount .get () > errorLimit ) {
86
+ if (errorCount .getAndIncrement () > errorLimit ) {
88
87
throw new RuntimeException ("The task failed due to the number of dirty data consume failed reached the limit " + errorLimit );
89
88
}
90
89
}
Original file line number Diff line number Diff line change @@ -43,8 +43,7 @@ public class ConsoleDirtyDataConsumer extends AbstractDirtyDataConsumer {
43
43
@ Override
44
44
public void consume () throws InterruptedException {
45
45
DirtyDataEntity dataEntity = queue .take ();
46
- count .incrementAndGet ();
47
- if (count .get () % printLimit == 0 ) {
46
+ if (count .getAndIncrement () % printLimit == 0 ) {
48
47
LOG .warn ("\n get dirtyData: " + dataEntity .getDirtyData () + "\n "
49
48
+ "cause: " + dataEntity .getCause () + "\n "
50
49
+ "processTime: " + dataEntity .getProcessDate () + "\n "
Original file line number Diff line number Diff line change @@ -131,6 +131,7 @@ public void consume() throws Exception {
131
131
statement .addBatch ();
132
132
133
133
if (count .get () % batchSize == 0 ) {
134
+ LOG .warn ("Get dirty Data: " + entity .getDirtyData ());
134
135
statement .executeBatch ();
135
136
}
136
137
}
Original file line number Diff line number Diff line change 19
19
20
20
package com .dtstack .flink .sql .sink .hbase ;
21
21
22
- import com .dtstack .flink .sql .factory .DTThreadFactory ;
23
22
import com .dtstack .flink .sql .dirtyManager .manager .DirtyDataManager ;
24
- import com .dtstack .flink .sql .enums . EUpdateMode ;
23
+ import com .dtstack .flink .sql .factory . DTThreadFactory ;
25
24
import com .dtstack .flink .sql .outputformat .AbstractDtRichOutputFormat ;
26
25
import com .google .common .collect .Maps ;
27
26
import org .apache .commons .lang3 .StringUtils ;
@@ -202,7 +201,7 @@ private void openKerberosConn() throws Exception {
202
201
@ Override
203
202
public void writeRecord (Tuple2 <Boolean , Row > record ) {
204
203
if (record .f0 ) {
205
- if (this .batchSize != 0 ) {
204
+ if (this .batchSize > 1 ) {
206
205
writeBatchRecord (record .f1 );
207
206
} else {
208
207
dealInsert (record .f1 );
@@ -231,10 +230,11 @@ protected synchronized void dealBatchOperation(List<Row> records) {
231
230
// 判断数据是否插入成功
232
231
for (int i = 0 ; i < results .length ; i ++) {
233
232
if (results [i ] == null ) {
234
- if (outDirtyRecords .getCount () % DIRTY_PRINT_FREQUENCY == 0 || LOG .isDebugEnabled ()) {
235
- LOG .error ("record insert failed ..{}" , records .get (i ).toString ());
236
- }
237
233
// 脏数据记录
234
+ dirtyDataManager .collectDirtyData (
235
+ records .get (i ).toString (),
236
+ "Batch Hbase Sink Error"
237
+ );
238
238
outDirtyRecords .inc ();
239
239
} else {
240
240
// 输出结果条数记录
You can’t perform that action at this time.
0 commit comments