Skip to content

Commit b4d57b1

Browse files
committed
[feat-34672][kudu] kudu结果表支持batchSize
1 parent bd501e6 commit b4d57b1

File tree

5 files changed

+208
-14
lines changed

5 files changed

+208
-14
lines changed

kudu/kudu-sink/src/main/java/com/dtstack/flink/sql/sink/kudu/KuduOutputFormat.java

Lines changed: 130 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,9 @@
1818

1919
package com.dtstack.flink.sql.sink.kudu;
2020

21+
import com.dtstack.flink.sql.factory.DTThreadFactory;
2122
import com.dtstack.flink.sql.outputformat.AbstractDtRichOutputFormat;
23+
import com.dtstack.flink.sql.sink.kudu.table.KuduTableInfo;
2224
import com.dtstack.flink.sql.util.KrbUtils;
2325
import org.apache.flink.api.common.typeinfo.TypeInformation;
2426
import org.apache.flink.api.java.tuple.Tuple2;
@@ -31,6 +33,7 @@
3133
import org.apache.kudu.client.KuduTable;
3234
import org.apache.kudu.client.Operation;
3335
import org.apache.kudu.client.PartialRow;
36+
import org.apache.kudu.client.SessionConfiguration;
3437
import org.slf4j.Logger;
3538
import org.slf4j.LoggerFactory;
3639

@@ -40,12 +43,17 @@
4043
import java.sql.Timestamp;
4144
import java.util.Date;
4245
import java.util.Objects;
46+
import java.util.concurrent.ScheduledExecutorService;
47+
import java.util.concurrent.ScheduledFuture;
48+
import java.util.concurrent.ScheduledThreadPoolExecutor;
49+
import java.util.concurrent.TimeUnit;
50+
import java.util.concurrent.atomic.AtomicInteger;
4351

4452
/**
4553
* @author gituser
4654
* @modify xiuzhu
4755
*/
48-
public class KuduOutputFormat extends AbstractDtRichOutputFormat<Tuple2> {
56+
public class KuduOutputFormat extends AbstractDtRichOutputFormat<Tuple2<Boolean, Row>> {
4957

5058
private static final long serialVersionUID = 1L;
5159

@@ -75,6 +83,24 @@ public class KuduOutputFormat extends AbstractDtRichOutputFormat<Tuple2> {
7583
private String keytab;
7684
private String krb5conf;
7785

86+
/**
87+
* batch size
88+
*/
89+
private Integer batchSize;
90+
private Integer batchWaitInterval;
91+
/**
92+
* kudu session flush mode
93+
*/
94+
private String flushMode;
95+
96+
private transient AtomicInteger rowCount;
97+
98+
/**
99+
* 定时任务
100+
*/
101+
private transient ScheduledExecutorService scheduler;
102+
private transient ScheduledFuture<?> scheduledFuture;
103+
78104
private KuduOutputFormat() {
79105
}
80106

@@ -91,6 +117,33 @@ public void configure(Configuration parameters) {
91117
public void open(int taskNumber, int numTasks) throws IOException {
92118
establishConnection();
93119
initMetric();
120+
initSchedulerTask();
121+
rowCount = new AtomicInteger(0);
122+
}
123+
124+
/**
125+
* init the scheduler task of {@link KuduOutputFormat#flush()}
126+
*/
127+
private void initSchedulerTask() {
128+
try {
129+
if (batchWaitInterval > 0) {
130+
this.scheduler = new ScheduledThreadPoolExecutor(
131+
1,
132+
new DTThreadFactory("kudu-batch-flusher")
133+
);
134+
135+
this.scheduledFuture = this.scheduler.scheduleWithFixedDelay(
136+
() -> {
137+
synchronized (KuduOutputFormat.this) {
138+
flush();
139+
}
140+
}, batchWaitInterval, batchWaitInterval, TimeUnit.MILLISECONDS
141+
);
142+
}
143+
} catch (Exception e) {
144+
LOG.error("init schedule task failed !");
145+
throw new RuntimeException(e);
146+
}
94147
}
95148

96149
private void establishConnection() throws IOException {
@@ -127,17 +180,46 @@ private void establishConnection() throws IOException {
127180
}
128181
LOG.info("connect kudu is succeed!");
129182

130-
session = client.newSession();
183+
session = buildSessionWithFlushMode(flushMode, client);
184+
}
185+
186+
/**
187+
* According to the different flush mode, construct different session. Detail see {@link SessionConfiguration.FlushMode}
188+
*
189+
* @param flushMode flush mode
190+
* @param kuduClient kudu client
191+
* @return KuduSession with flush mode
192+
* @throws KuduException kudu exception when session flush
193+
*/
194+
private KuduSession buildSessionWithFlushMode(String flushMode, KuduClient kuduClient) throws KuduException {
195+
KuduSession kuduSession = kuduClient.newSession();
196+
if (flushMode.equalsIgnoreCase(KuduTableInfo.KuduFlushMode.MANUAL_FLUSH.name())) {
197+
kuduSession.setFlushMode(SessionConfiguration.FlushMode.MANUAL_FLUSH);
198+
kuduSession.setMutationBufferSpace(
199+
Integer.parseInt(String.valueOf(Math.round(batchSize * 1.2)))
200+
);
201+
}
202+
203+
if (flushMode.equalsIgnoreCase(KuduTableInfo.KuduFlushMode.AUTO_FLUSH_SYNC.name())) {
204+
LOG.warn("Parameter [batchSize] will not take effect at AUTO_FLUSH_SYNC mode.");
205+
kuduSession.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC);
206+
}
207+
208+
if (flushMode.equalsIgnoreCase(KuduTableInfo.KuduFlushMode.AUTO_FLUSH_BACKGROUND.name())) {
209+
LOG.warn("Unable to determine the order of data at AUTO_FLUSH_BACKGROUND mode.");
210+
kuduSession.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND);
211+
}
212+
213+
return kuduSession;
131214
}
132215

133216
@Override
134-
public void writeRecord(Tuple2 record) throws IOException {
135-
Tuple2<Boolean, Row> tupleTrans = record;
136-
Boolean retract = tupleTrans.getField(0);
217+
public void writeRecord(Tuple2<Boolean, Row> record) throws IOException {
218+
Boolean retract = record.getField(0);
137219
if (!retract) {
138220
return;
139221
}
140-
Row row = tupleTrans.getField(1);
222+
Row row = record.getField(1);
141223
if (row.getArity() != fieldNames.length) {
142224
if (outDirtyRecords.getCount() % DIRTY_PRINT_FREQUENCY == 0) {
143225
LOG.error("record insert failed ..{}", row.toString());
@@ -151,6 +233,9 @@ public void writeRecord(Tuple2 record) throws IOException {
151233
if (outRecords.getCount() % ROW_PRINT_FREQUENCY == 0) {
152234
LOG.info("Receive data : {}", row);
153235
}
236+
if (rowCount.getAndIncrement() >= batchSize) {
237+
flush();
238+
}
154239
session.apply(toOperation(writeMode, row));
155240
outRecords.inc();
156241
} catch (KuduException e) {
@@ -162,6 +247,22 @@ public void writeRecord(Tuple2 record) throws IOException {
162247
}
163248
}
164249

250+
private synchronized void flush() {
251+
try {
252+
if (session.isClosed()) {
253+
throw new IllegalStateException("session is closed! flush data error!");
254+
}
255+
256+
session.flush();
257+
// clear
258+
rowCount.set(0);
259+
} catch (KuduException kuduException) {
260+
LOG.error(
261+
"flush data error!", kuduException);
262+
throw new RuntimeException(kuduException);
263+
}
264+
}
265+
165266
@Override
166267
public void close() {
167268
if (Objects.nonNull(session) && !session.isClosed()) {
@@ -179,6 +280,14 @@ public void close() {
179280
throw new IllegalArgumentException("[closeKuduClient]:" + e.getMessage());
180281
}
181282
}
283+
284+
if (scheduledFuture != null) {
285+
scheduledFuture.cancel(false);
286+
}
287+
288+
if (scheduler != null) {
289+
scheduler.shutdownNow();
290+
}
182291
}
183292

184293
private Operation toOperation(WriteMode writeMode, Row row) {
@@ -345,6 +454,21 @@ public KuduOutputFormatBuilder setEnableKrb(boolean enableKrb) {
345454
return this;
346455
}
347456

457+
public KuduOutputFormatBuilder setBatchSize(Integer batchSize) {
458+
kuduOutputFormat.batchSize = batchSize;
459+
return this;
460+
}
461+
462+
public KuduOutputFormatBuilder setBatchWaitInterval(Integer batchWaitInterval) {
463+
kuduOutputFormat.batchWaitInterval = batchWaitInterval;
464+
return this;
465+
}
466+
467+
public KuduOutputFormatBuilder setFlushMode(String flushMode) {
468+
kuduOutputFormat.flushMode = flushMode;
469+
return this;
470+
}
471+
348472
public KuduOutputFormat finish() {
349473
if (kuduOutputFormat.kuduMasters == null) {
350474
throw new IllegalArgumentException("No kuduMasters supplied.");

kudu/kudu-sink/src/main/java/com/dtstack/flink/sql/sink/kudu/KuduSink.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,9 @@ public class KuduSink implements RetractStreamTableSink<Row>, Serializable, IStr
3434
private String principal;
3535
private String keytab;
3636
private String krb5conf;
37+
private Integer batchSize;
38+
private Integer batchWaitInterval;
39+
private String flushMode;
3740

3841
@Override
3942
public KuduSink genStreamSink(AbstractTargetTableInfo targetTableInfo) {
@@ -51,6 +54,9 @@ public KuduSink genStreamSink(AbstractTargetTableInfo targetTableInfo) {
5154
this.parallelism = Objects.isNull(kuduTableInfo.getParallelism()) ?
5255
parallelism : kuduTableInfo.getParallelism();
5356

57+
this.batchSize = kuduTableInfo.getBatchSize();
58+
this.batchWaitInterval = kuduTableInfo.getBatchWaitInterval();
59+
this.flushMode = kuduTableInfo.getFlushMode();
5460
return this;
5561
}
5662

@@ -74,6 +80,9 @@ public DataStreamSink<Tuple2<Boolean, Row>> consumeDataStream(DataStream<Tuple2<
7480
.setKeytab(this.keytab)
7581
.setKrb5conf(this.krb5conf)
7682
.setEnableKrb(this.enableKrb)
83+
.setBatchSize(this.batchSize)
84+
.setBatchWaitInterval(this.batchWaitInterval)
85+
.setFlushMode(this.flushMode)
7786
.finish();
7887
RichSinkFunction richSinkFunction = new OutputFormatSinkFunction(kuduOutputFormat);
7988
DataStreamSink dataStreamSink = dataStream.addSink(richSinkFunction);

kudu/kudu-sink/src/main/java/com/dtstack/flink/sql/sink/kudu/table/KuduSinkParser.java

Lines changed: 27 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,15 @@
22

33
import com.dtstack.flink.sql.constant.PluginParamConsts;
44
import com.dtstack.flink.sql.sink.kudu.KuduOutputFormat;
5-
import com.dtstack.flink.sql.table.AbstractTableParser;
65
import com.dtstack.flink.sql.table.AbstractTableInfo;
6+
import com.dtstack.flink.sql.table.AbstractTableParser;
77
import com.dtstack.flink.sql.util.MathUtil;
88

99
import java.math.BigDecimal;
1010
import java.sql.Date;
1111
import java.sql.Timestamp;
1212
import java.util.Map;
13+
import java.util.Objects;
1314

1415
import static com.dtstack.flink.sql.table.AbstractTableInfo.PARALLELISM_KEY;
1516

@@ -27,6 +28,16 @@ public class KuduSinkParser extends AbstractTableParser {
2728

2829
public static final String SOCKET_READ_TIMEOUT_MS = "defaultSocketReadTimeoutMs";
2930

31+
public static final String BATCH_SIZE_KEY = "batchSize";
32+
33+
public static final Integer DEFAULT_BATCH_SIZE = 1000;
34+
35+
public static final String BATCH_WAIT_INTERVAL_KEY = "batchWaitInterval";
36+
37+
public static final Integer DEFAULT_BATCH_WAIT_INTERVAL = 60 * 1000;
38+
39+
public static final String SESSION_FLUSH_MODE_KEY = "flushMode";
40+
3041
@Override
3142
public AbstractTableInfo getTableInfo(String tableName, String fieldsInfo, Map<String, Object> props) {
3243
KuduTableInfo kuduTableInfo = new KuduTableInfo();
@@ -40,15 +51,27 @@ public AbstractTableInfo getTableInfo(String tableName, String fieldsInfo, Map<S
4051
kuduTableInfo.setWorkerCount(MathUtil.getIntegerVal(props.get(WORKER_COUNT.toLowerCase())));
4152
kuduTableInfo.setDefaultOperationTimeoutMs(MathUtil.getIntegerVal(props.get(OPERATION_TIMEOUT_MS.toLowerCase())));
4253
kuduTableInfo.setDefaultSocketReadTimeoutMs(MathUtil.getIntegerVal(props.get(SOCKET_READ_TIMEOUT_MS.toLowerCase())));
54+
kuduTableInfo.setBatchSize(MathUtil.getIntegerVal(props.getOrDefault(BATCH_SIZE_KEY, DEFAULT_BATCH_SIZE)));
55+
kuduTableInfo.setBatchWaitInterval(MathUtil.getIntegerVal(props.getOrDefault(BATCH_WAIT_INTERVAL_KEY, DEFAULT_BATCH_WAIT_INTERVAL)));
56+
57+
if (Objects.isNull(props.get(SESSION_FLUSH_MODE_KEY))) {
58+
if (kuduTableInfo.getBatchSize() > 1) {
59+
kuduTableInfo.setFlushMode(KuduTableInfo.KuduFlushMode.MANUAL_FLUSH.name());
60+
} else {
61+
kuduTableInfo.setFlushMode(KuduTableInfo.KuduFlushMode.AUTO_FLUSH_SYNC.name());
62+
}
63+
} else {
64+
kuduTableInfo.setFlushMode(MathUtil.getString(props.get(SESSION_FLUSH_MODE_KEY)));
65+
}
4366

4467
kuduTableInfo.setPrincipal(
45-
MathUtil.getString(props.get(PluginParamConsts.PRINCIPAL))
68+
MathUtil.getString(props.get(PluginParamConsts.PRINCIPAL))
4669
);
4770
kuduTableInfo.setKeytab(
48-
MathUtil.getString(props.get(PluginParamConsts.KEYTAB))
71+
MathUtil.getString(props.get(PluginParamConsts.KEYTAB))
4972
);
5073
kuduTableInfo.setKrb5conf(
51-
MathUtil.getString(props.get(PluginParamConsts.KRB5_CONF))
74+
MathUtil.getString(props.get(PluginParamConsts.KRB5_CONF))
5275
);
5376
kuduTableInfo.judgeKrbEnable();
5477

kudu/kudu-sink/src/main/java/com/dtstack/flink/sql/sink/kudu/table/KuduTableInfo.java

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,15 @@ public class KuduTableInfo extends AbstractTargetTableInfo implements KerberosTa
2828
private String keytab;
2929
private String krb5conf;
3030
boolean enableKrb;
31+
/**
32+
* batchSize
33+
*/
34+
private Integer batchSize;
35+
private Integer batchWaitInterval;
36+
/**
37+
* kudu session flush mode
38+
*/
39+
private String flushMode;
3140

3241
public KuduTableInfo() {
3342
setType(CURR_TYPE);
@@ -133,4 +142,33 @@ public void setEnableKrb(boolean enableKrb) {
133142
this.enableKrb = enableKrb;
134143
}
135144

145+
public Integer getBatchSize() {
146+
return batchSize;
147+
}
148+
149+
public void setBatchSize(Integer batchSize) {
150+
this.batchSize = batchSize;
151+
}
152+
153+
public Integer getBatchWaitInterval() {
154+
return batchWaitInterval;
155+
}
156+
157+
public void setBatchWaitInterval(Integer batchWaitInterval) {
158+
this.batchWaitInterval = batchWaitInterval;
159+
}
160+
161+
public String getFlushMode() {
162+
return flushMode;
163+
}
164+
165+
public void setFlushMode(String flushMode) {
166+
this.flushMode = flushMode;
167+
}
168+
169+
public enum KuduFlushMode {
170+
AUTO_FLUSH_SYNC,
171+
AUTO_FLUSH_BACKGROUND,
172+
MANUAL_FLUSH
173+
}
136174
}

localTest/pom.xml

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -77,25 +77,25 @@
7777

7878
<dependency>
7979
<groupId>com.dtstack.flink</groupId>
80-
<artifactId>sql.mysql</artifactId>
80+
<artifactId>sql.kudu</artifactId>
8181
<version>1.0-SNAPSHOT</version>
8282
</dependency>
8383

8484
<dependency>
8585
<groupId>com.dtstack.flink</groupId>
86-
<artifactId>sql.side.all.mysql</artifactId>
86+
<artifactId>sql.side.all.kudu</artifactId>
8787
<version>1.0-SNAPSHOT</version>
8888
</dependency>
8989

9090
<dependency>
9191
<groupId>com.dtstack.flink</groupId>
92-
<artifactId>sql.sink.mysql</artifactId>
92+
<artifactId>sql.sink.kudu</artifactId>
9393
<version>1.0-SNAPSHOT</version>
9494
</dependency>
9595

9696
<dependency>
9797
<groupId>com.dtstack.flink</groupId>
98-
<artifactId>sql.side.async.mysql</artifactId>
98+
<artifactId>sql.side.async.kudu</artifactId>
9999
<version>1.0-SNAPSHOT</version>
100100
</dependency>
101101

0 commit comments

Comments
 (0)