Skip to content

Commit f61e779

Browse files
committed
[opt-34810][hbase][dirty] 调整hbase batch 提交脏数据处理逻辑;优化脏数据日志打印内容
1 parent 86fc22a commit f61e779

File tree

2 files changed

+14
-13
lines changed

2 files changed

+14
-13
lines changed

core/src/main/java/com/dtstack/flink/sql/dirtyManager/manager/DirtyDataManager.java

+6-4
Original file line numberDiff line numberDiff line change
@@ -129,15 +129,17 @@ public void close() {
129129
public void collectDirtyData(String dataInfo, String cause) {
130130
DirtyDataEntity dirtyDataEntity = new DirtyDataEntity(dataInfo, System.currentTimeMillis(), cause);
131131
try {
132-
consumer.collectDirtyData(dirtyDataEntity, blockingInterval);
133132
count.incrementAndGet();
134-
} catch (Exception ignored) {
133+
consumer.collectDirtyData(dirtyDataEntity, blockingInterval);
134+
} catch (Exception e) {
135135
LOG.warn("dirty Data insert error ... Failed number: " + errorCount.incrementAndGet());
136-
LOG.warn("error dirty data:" + dirtyDataEntity.toString());
136+
LOG.warn("error cause: " + e.getMessage());
137+
LOG.warn("error dirty data:" + dirtyDataEntity.getDirtyData());
137138
if (errorCount.get() > Math.ceil(count.longValue() * errorLimitRate)) {
138139
// close consumer and manager
139140
close();
140-
throw new RuntimeException(String.format("The number of failed number 【%s】 reaches the limit, manager fails", errorCount.get()));
141+
throw new RuntimeException(
142+
String.format("The number of failed number 【%s】 reaches the limit, manager fails", errorCount.get()));
141143
}
142144
}
143145
}

hbase/hbase-sink/src/main/java/com/dtstack/flink/sql/sink/hbase/HbaseOutputFormat.java

+8-9
Original file line numberDiff line numberDiff line change
@@ -227,27 +227,26 @@ protected synchronized void dealBatchOperation(List<Row> records) {
227227
}
228228
table.batch(puts, results);
229229

230+
// 打印结果
231+
if (outRecords.getCount() % ROW_PRINT_FREQUENCY == 0) {
232+
// 只打印最后一条数据
233+
LOG.info(records.get(records.size() - 1).toString());
234+
}
235+
} catch (IOException | InterruptedException e) {
230236
// 判断数据是否插入成功
231237
for (int i = 0; i < results.length; i++) {
232-
if (results[i] == null) {
238+
if (results[i] != null) {
233239
// 脏数据记录
234240
dirtyDataManager.collectDirtyData(
235241
records.get(i).toString(),
236-
"Batch Hbase Sink Error"
242+
((Exception) results[i]).getMessage()
237243
);
238244
outDirtyRecords.inc();
239245
} else {
240246
// 输出结果条数记录
241247
outRecords.inc();
242248
}
243249
}
244-
// 打印结果
245-
if (outRecords.getCount() % ROW_PRINT_FREQUENCY == 0) {
246-
// 只打印最后一条数据
247-
LOG.info(records.get(records.size() - 1).toString());
248-
}
249-
} catch (IOException | InterruptedException e) {
250-
LOG.error("", e);
251250
} finally {
252251
// 添加完数据之后数据清空records
253252
records.clear();

0 commit comments

Comments
 (0)