Skip to content

Commit bd501e6

Browse files
committed
[feat-34632][HBase] HBase BatchSize 添加synchronized关键字避免线程安全问题
1 parent 7d60026 commit bd501e6

File tree

1 file changed

+15
-9
lines changed

1 file changed

+15
-9
lines changed

hbase/hbase-sink/src/main/java/com/dtstack/flink/sql/sink/hbase/HbaseOutputFormat.java

Lines changed: 15 additions & 9 deletions
Original file line number · Diff line number · Diff line change
@@ -59,7 +59,6 @@
5959
public class HbaseOutputFormat extends AbstractDtRichOutputFormat<Tuple2<Boolean, Row>> {
6060

6161
private static final Logger LOG = LoggerFactory.getLogger(HbaseOutputFormat.class);
62-
private final List<Row> records = new ArrayList<>();
6362
private String host;
6463
private String zkParent;
6564
private String rowkey;
@@ -79,6 +78,8 @@ public class HbaseOutputFormat extends AbstractDtRichOutputFormat<Tuple2<Boolean
7978
private transient Connection conn;
8079
private transient Table table;
8180
private transient ChoreService choreService;
81+
private transient List<Row> records;
82+
private transient volatile boolean closed = false;
8283
/**
8384
* 批量写入的参数
8485
*/
@@ -106,6 +107,7 @@ public void configure(Configuration parameters) {
106107
@Override
107108
public void open(int taskNumber, int numTasks) throws IOException {
108109
LOG.warn("---open---");
110+
records = new ArrayList<>();
109111
conf = HBaseConfiguration.create();
110112
openConn();
111113
table = conn.getTable(TableName.valueOf(tableName));
@@ -145,10 +147,9 @@ private void initScheduledTask(Long batchWaitInterval) {
145147

146148
this.scheduledFuture = this.scheduler.scheduleWithFixedDelay(
147149
() -> {
148-
synchronized (this) {
150+
synchronized (HbaseOutputFormat.this) {
149151
if (!records.isEmpty()) {
150152
dealBatchOperation(records);
151-
records.clear();
152153
}
153154
}
154155
}, batchWaitInterval, batchWaitInterval, TimeUnit.MILLISECONDS
@@ -198,7 +199,7 @@ private void openKerberosConn() throws Exception {
198199
public void writeRecord(Tuple2<Boolean, Row> record) {
199200
if (record.f0) {
200201
if (this.batchSize != 0) {
201-
writeBatchRecord(Row.copy(record.f1));
202+
writeBatchRecord(record.f1);
202203
} else {
203204
dealInsert(record.f1);
204205
}
@@ -210,12 +211,10 @@ public void writeBatchRecord(Row row) {
210211
// 数据累计到batchSize之后开始处理
211212
if (records.size() == this.batchSize) {
212213
dealBatchOperation(records);
213-
// 添加完数据之后数据清空records
214-
records.clear();
215214
}
216215
}
217216

218-
protected void dealBatchOperation(List<Row> records) {
217+
protected synchronized void dealBatchOperation(List<Row> records) {
219218
// A null in the result array means that the call for that action failed, even after retries.
220219
Object[] results = new Object[records.size()];
221220
try {
@@ -245,6 +244,9 @@ protected void dealBatchOperation(List<Row> records) {
245244
}
246245
} catch (IOException | InterruptedException e) {
247246
LOG.error("", e);
247+
} finally {
248+
// 添加完数据之后数据清空records
249+
records.clear();
248250
}
249251
}
250252

@@ -318,7 +320,12 @@ private Map<String, Object> rowConvertMap(Row record) {
318320
}
319321

320322
@Override
321-
public void close() throws IOException {
323+
public synchronized void close() throws IOException {
324+
if (closed) {
325+
return;
326+
}
327+
328+
closed = true;
322329
if (!records.isEmpty()) {
323330
dealBatchOperation(records);
324331
}
@@ -334,7 +341,6 @@ public void close() throws IOException {
334341
conn.close();
335342
conn = null;
336343
}
337-
338344
}
339345

340346
private void fillSyncKerberosConfig(org.apache.hadoop.conf.Configuration config,

0 commit comments

Comments (0)