@@ -19,12 +19,12 @@

 package com.dtstack.flink.sql.sink.hbase;

+import com.dtstack.flink.sql.factory.DTThreadFactory;
 import com.dtstack.flink.sql.outputformat.AbstractDtRichOutputFormat;
 import com.google.common.collect.Maps;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.util.ExecutorThreadFactory;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.Preconditions;
 import org.apache.hadoop.hbase.AuthUtil;
@@ -47,80 +47,70 @@
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;

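Note on the import swap: `ExecutorThreadFactory` comes from Flink's runtime internals, and the PR replaces it with the project's own `DTThreadFactory`. That class is not shown in this diff; a hypothetical sketch of a named daemon-thread factory of the shape the call sites below require (names and behavior are assumptions, not the real class):

    // Hypothetical sketch of a named thread factory like DTThreadFactory;
    // the real implementation lives in com.dtstack.flink.sql.factory.
    import java.util.concurrent.ThreadFactory;
    import java.util.concurrent.atomic.AtomicInteger;

    public class NamedThreadFactory implements ThreadFactory {
        private final String prefix;
        private final AtomicInteger counter = new AtomicInteger(1);

        public NamedThreadFactory(String prefix) {
            this.prefix = prefix;
        }

        @Override
        public Thread newThread(Runnable r) {
            // Name threads so the flusher is easy to spot in thread dumps.
            Thread t = new Thread(r, prefix + "-" + counter.getAndIncrement());
            t.setDaemon(true);
            return t;
        }
    }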
 /**
  * date: 2017-6-29
  */
-public class HbaseOutputFormat extends AbstractDtRichOutputFormat<Tuple2> {
+public class HbaseOutputFormat extends AbstractDtRichOutputFormat<Tuple2<Boolean, Row>> {

     private static final Logger LOG = LoggerFactory.getLogger(HbaseOutputFormat.class);
-
+    private final List<Row> records = new ArrayList<>();

     private String host;
     private String zkParent;
     private String rowkey;
     private String tableName;
     private String[] columnNames;
-    private String updateMode;
-    private String[] columnTypes;
     private Map<String, String> columnNameFamily;
-
     private boolean kerberosAuthEnable;
     private String regionserverKeytabFile;
     private String regionserverPrincipal;
     private String securityKrb5Conf;
     private String zookeeperSaslClient;
     private String clientPrincipal;
     private String clientKeytabFile;
-
     private String[] families;
     private String[] qualifiers;
-
     private transient org.apache.hadoop.conf.Configuration conf;
     private transient Connection conn;
     private transient Table table;
-
     private transient ChoreService choreService;
-
+    /**
+     * Batch write parameters.
+     */
     private Integer batchSize;
     private Long batchWaitInterval;
+    /**
+     * Scheduled flush task.
+     */
+    private transient ScheduledExecutorService scheduler;
+    private transient ScheduledFuture<?> scheduledFuture;

-    private transient ScheduledExecutorService executor;
-    private transient ScheduledFuture scheduledFuture;
-
-    private final List<Row> records = new CopyOnWriteArrayList<>();
+    private HbaseOutputFormat() {
+    }

+    public static HbaseOutputFormatBuilder buildHbaseOutputFormat() {
+        return new HbaseOutputFormatBuilder();
+    }
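Tightening the raw `Tuple2` to `Tuple2<Boolean, Row>` documents the retract-stream contract Flink uses for this kind of sink: `f0` is true for an accumulate message and false for a retraction, which `writeRecord` below silently drops. A minimal illustration of what such elements look like (illustrative values only):

    // Illustrative only: how retract-stream elements reach the sink.
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.types.Row;

    public class RetractExample {
        public static void main(String[] args) {
            Row row = Row.of("rowkey-1", "value-1");
            Tuple2<Boolean, Row> accumulate = Tuple2.of(true, row);  // written to HBase
            Tuple2<Boolean, Row> retract = Tuple2.of(false, row);    // ignored by this sink
            System.out.println(accumulate + " / " + retract);
        }
    }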
     @Override
     public void configure(Configuration parameters) {
-        LOG.warn("---configure---");
-        conf = HBaseConfiguration.create();
+        // Do not do time-consuming work here, otherwise it will cause Akka communication timeouts.
+        // DO NOTHING
     }

     @Override
     public void open(int taskNumber, int numTasks) throws IOException {
         LOG.warn("---open---");
+        conf = HBaseConfiguration.create();
         openConn();
         table = conn.getTable(TableName.valueOf(tableName));
         LOG.warn("---open end(get table from hbase) ---");
         initMetric();
-
-        // set up the scheduled flush task
-        if (batchWaitInterval > 0) {
-            this.executor = Executors.newScheduledThreadPool(
-                1, new ExecutorThreadFactory("hbase-sink-flusher"));
-            this.scheduledFuture = this.executor.scheduleAtFixedRate(() -> {
-                if (!records.isEmpty()) {
-                    dealBatchOperation(records);
-                    records.clear();
-                }
-            }, batchWaitInterval, batchWaitInterval, TimeUnit.MILLISECONDS);
-        }
     }

     private void openConn() {
@@ -137,13 +127,44 @@ private void openConn() {
         } catch (Exception e) {
             throw new RuntimeException(e);
         }
+        initScheduledTask(batchWaitInterval);
+    }

+    /**
+     * Initialize the scheduled batch-write task.
+     *
+     * @param batchWaitInterval flush interval of the scheduled task, in milliseconds
+     */
+    private void initScheduledTask(Long batchWaitInterval) {
+        try {
+            if (batchWaitInterval > 0) {
+                this.scheduler = new ScheduledThreadPoolExecutor(
+                        1,
+                        new DTThreadFactory("hbase-batch-flusher")
+                );
+
+                this.scheduledFuture = this.scheduler.scheduleWithFixedDelay(
+                        () -> {
+                            synchronized (this) {
+                                if (!records.isEmpty()) {
+                                    dealBatchOperation(records);
+                                    records.clear();
+                                }
+                            }
+                        }, batchWaitInterval, batchWaitInterval, TimeUnit.MILLISECONDS
+                );
+            }
+        } catch (Exception e) {
+            LOG.error("init schedule task failed!");
+            throw new RuntimeException(e);
+        }
     }
+
     private void openKerberosConn() throws Exception {
         conf.set(HbaseConfigUtils.KEY_HBASE_ZOOKEEPER_QUORUM, host);
         conf.set(HbaseConfigUtils.KEY_HBASE_ZOOKEEPER_ZNODE_QUORUM, zkParent);

-        LOG.info("kerberos config:{}", this.toString());
+        LOG.info("kerberos config:{}", this.conf.toString());
         Preconditions.checkArgument(!StringUtils.isEmpty(clientPrincipal), " clientPrincipal not null!");
         Preconditions.checkArgument(!StringUtils.isEmpty(clientKeytabFile), " clientKeytabFile not null!");
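The time-based flush now complements the size-based one: buffered rows are written either when the batch fills up or after `batchWaitInterval` milliseconds, whichever comes first. `scheduleWithFixedDelay` (rather than `scheduleAtFixedRate`) also means a slow flush cannot pile up behind itself. The `synchronized (this)` block only protects `records` if the write path takes the same lock; a self-contained sketch of the pattern, with an illustrative `BatchBuffer` class that is not part of the PR:

    // Sketch of the size-or-time batch flush pattern used above.
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    public class BatchBuffer<T> {
        private final List<T> buffer = new ArrayList<>();
        private final int batchSize;
        private final ScheduledExecutorService scheduler =
                Executors.newSingleThreadScheduledExecutor();

        public BatchBuffer(int batchSize, long flushIntervalMs) {
            this.batchSize = batchSize;
            // Fixed delay, not fixed rate: the next flush is scheduled only
            // after the previous one finishes, so flushes never overlap.
            scheduler.scheduleWithFixedDelay(this::flush,
                    flushIntervalMs, flushIntervalMs, TimeUnit.MILLISECONDS);
        }

        // Writer and flusher must take the same lock, otherwise the timer
        // could flush a half-built batch.
        public synchronized void add(T element) {
            buffer.add(element);
            if (buffer.size() >= batchSize) {
                flush();
            }
        }

        public synchronized void flush() {
            if (!buffer.isEmpty()) {
                System.out.println("flushing " + buffer.size() + " records");
                buffer.clear();
            }
        }
    }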
@@ -173,17 +194,13 @@ private void openKerberosConn() throws Exception {
         });
     }

-
     @Override
-    public void writeRecord(Tuple2 tuple2) {
-        Tuple2<Boolean, Row> tupleTrans = tuple2;
-        Boolean retract = tupleTrans.f0;
-        Row row = tupleTrans.f1;
-        if (retract) {
+    public void writeRecord(Tuple2<Boolean, Row> record) {
+        if (record.f0) {
             if (this.batchSize != 0) {
-                writeBatchRecord(row);
+                writeBatchRecord(Row.copy(record.f1));
             } else {
-                dealInsert(row);
+                dealInsert(record.f1);
             }
         }
     }
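Buffered rows are now defensively copied via `Row.copy(record.f1)`. This matters because Flink can reuse the same `Row` instance across `writeRecord` calls when object reuse is enabled, so holding a bare reference in `records` could silently overwrite already-buffered data before the timer flushes it. The direct-insert path skips the copy because the row is fully consumed before the call returns. A small demonstration of the hazard:

    // Demonstrates why buffering requires a copy when the producer
    // reuses one Row instance (as Flink can do with object reuse).
    import java.util.ArrayList;
    import java.util.List;
    import org.apache.flink.types.Row;

    public class RowReuseDemo {
        public static void main(String[] args) {
            List<Row> buffered = new ArrayList<>();
            Row reused = new Row(1);

            reused.setField(0, "first");
            buffered.add(reused);            // BUG: stores a live reference
            buffered.add(Row.copy(reused));  // OK: stores an independent snapshot

            reused.setField(0, "second");    // producer reuses the instance

            System.out.println(buffered.get(0)); // prints "second" (corrupted)
            System.out.println(buffered.get(1)); // prints "first" (preserved)
        }
    }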
@@ -223,7 +240,8 @@ protected void dealBatchOperation(List<Row> records) {
             }
             // log the result
             if (outRecords.getCount() % ROW_PRINT_FREQUENCY == 0) {
-                LOG.info(records.toString());
+                // only log the last record
+                LOG.info(records.get(records.size() - 1).toString());
             }
         } catch (IOException | InterruptedException e) {
             LOG.error("", e);
@@ -233,6 +251,7 @@ protected void dealBatchOperation(List<Row> records) {
     protected void dealInsert(Row record) {
         Put put = getPutByRow(record);
         if (put == null || put.isEmpty()) {
+            // count the dirty record
             outDirtyRecords.inc();
             return;
         }
@@ -306,8 +325,8 @@ public void close() throws IOException {

         if (scheduledFuture != null) {
             scheduledFuture.cancel(false);
-            if (executor != null) {
-                executor.shutdownNow();
+            if (scheduler != null) {
+                scheduler.shutdownNow();
             }
         }
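`cancel(false)` lets an in-flight flush finish rather than interrupting it mid-write, and `shutdownNow()` then stops the timer thread. Rows still sitting in `records` at close time are not flushed by this hunk, though; draining the buffer before cancelling would be needed to avoid losing them on a clean shutdown. A hedged sketch of that shape, assuming the fields of this class (this is a suggestion, not what the PR does):

    // Sketch only: a close() that drains the buffer before stopping the timer.
    @Override
    public void close() throws IOException {
        // Flush whatever is still buffered so a clean shutdown loses nothing.
        synchronized (this) {
            if (!records.isEmpty()) {
                dealBatchOperation(records);
                records.clear();
            }
        }
        if (scheduledFuture != null) {
            scheduledFuture.cancel(false); // let a running flush complete
            if (scheduler != null) {
                scheduler.shutdownNow();
            }
        }
        // ... close table/connection/choreService as in the original method
    }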
@@ -318,16 +337,48 @@ public void close() throws IOException {

     }

-    private HbaseOutputFormat() {
+    private void fillSyncKerberosConfig(org.apache.hadoop.conf.Configuration config,
+                                        String regionserverPrincipal,
+                                        String zookeeperSaslClient,
+                                        String securityKrb5Conf) {
+        if (StringUtils.isEmpty(regionserverPrincipal)) {
+            throw new IllegalArgumentException("Must provide regionserverPrincipal when authentication is Kerberos");
+        }
+        config.set(HbaseConfigUtils.KEY_HBASE_MASTER_KERBEROS_PRINCIPAL, regionserverPrincipal);
+        config.set(HbaseConfigUtils.KEY_HBASE_REGIONSERVER_KERBEROS_PRINCIPAL, regionserverPrincipal);
+        config.set(HbaseConfigUtils.KEY_HBASE_SECURITY_AUTHORIZATION, "true");
+        config.set(HbaseConfigUtils.KEY_HBASE_SECURITY_AUTHENTICATION, "kerberos");
+
+        if (!StringUtils.isEmpty(zookeeperSaslClient)) {
+            System.setProperty(HbaseConfigUtils.KEY_ZOOKEEPER_SASL_CLIENT, zookeeperSaslClient);
+        }
+
+        if (!StringUtils.isEmpty(securityKrb5Conf)) {
+            String krb5ConfPath = System.getProperty("user.dir") + File.separator + securityKrb5Conf;
+            LOG.info("krb5ConfPath:{}", krb5ConfPath);
+            System.setProperty(HbaseConfigUtils.KEY_JAVA_SECURITY_KRB5_CONF, krb5ConfPath);
+        }
     }

-    public static HbaseOutputFormatBuilder buildHbaseOutputFormat() {
-        return new HbaseOutputFormatBuilder();
+    @Override
+    public String toString() {
+        return "HbaseOutputFormat kerberos{" +
+                "kerberosAuthEnable=" + kerberosAuthEnable +
+                ", regionserverKeytabFile='" + regionserverKeytabFile + '\'' +
+                ", regionserverPrincipal='" + regionserverPrincipal + '\'' +
+                ", securityKrb5Conf='" + securityKrb5Conf + '\'' +
+                ", zookeeperSaslClient='" + zookeeperSaslClient + '\'' +
+                ", clientPrincipal='" + clientPrincipal + '\'' +
+                ", clientKeytabFile='" + clientKeytabFile + '\'' +
+                ", batchSize='" + batchSize + '\'' +
+                ", batchWaitInterval='" + batchWaitInterval + '\'' +
+                '}';
     }

     public static class HbaseOutputFormatBuilder {

-        private HbaseOutputFormat format;
+        private final HbaseOutputFormat format;

         private HbaseOutputFormatBuilder() {
             format = new HbaseOutputFormat();
@@ -359,11 +410,6 @@ public HbaseOutputFormatBuilder setColumnNames(String[] columnNames) {
             return this;
         }

-        public HbaseOutputFormatBuilder setColumnTypes(String[] columnTypes) {
-            format.columnTypes = columnTypes;
-            return this;
-        }
-
         public HbaseOutputFormatBuilder setColumnNameFamily(Map<String, String> columnNameFamily) {
             format.columnNameFamily = columnNameFamily;
             return this;
@@ -438,44 +484,5 @@ public HbaseOutputFormat finish() {

             return format;
         }
-
     }
-
-    private void fillSyncKerberosConfig(org.apache.hadoop.conf.Configuration config, String regionserverPrincipal,
-                                        String zookeeperSaslClient, String securityKrb5Conf) throws IOException {
-        if (StringUtils.isEmpty(regionserverPrincipal)) {
-            throw new IllegalArgumentException("Must provide regionserverPrincipal when authentication is Kerberos");
-        }
-        config.set(HbaseConfigUtils.KEY_HBASE_MASTER_KERBEROS_PRINCIPAL, regionserverPrincipal);
-        config.set(HbaseConfigUtils.KEY_HBASE_REGIONSERVER_KERBEROS_PRINCIPAL, regionserverPrincipal);
-        config.set(HbaseConfigUtils.KEY_HBASE_SECURITY_AUTHORIZATION, "true");
-        config.set(HbaseConfigUtils.KEY_HBASE_SECURITY_AUTHENTICATION, "kerberos");
-
-
-        if (!StringUtils.isEmpty(zookeeperSaslClient)) {
-            System.setProperty(HbaseConfigUtils.KEY_ZOOKEEPER_SASL_CLIENT, zookeeperSaslClient);
-        }
-
-        if (!StringUtils.isEmpty(securityKrb5Conf)) {
-            String krb5ConfPath = System.getProperty("user.dir") + File.separator + securityKrb5Conf;
-            LOG.info("krb5ConfPath:{}", krb5ConfPath);
-            System.setProperty(HbaseConfigUtils.KEY_JAVA_SECURITY_KRB5_CONF, krb5ConfPath);
-        }
-    }
-
-    @Override
-    public String toString() {
-        return "HbaseOutputFormat kerberos{" +
-                "kerberosAuthEnable=" + kerberosAuthEnable +
-                ", regionserverKeytabFile='" + regionserverKeytabFile + '\'' +
-                ", regionserverPrincipal='" + regionserverPrincipal + '\'' +
-                ", securityKrb5Conf='" + securityKrb5Conf + '\'' +
-                ", zookeeperSaslClient='" + zookeeperSaslClient + '\'' +
-                ", clientPrincipal='" + clientPrincipal + '\'' +
-                ", clientKeytabFile='" + clientKeytabFile + '\'' +
-                ", batchSize='" + batchSize + '\'' +
-                ", batchWaitInterval='" + batchWaitInterval + '\'' +
-                '}';
-    }
-
 }
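With the kerberos helper and `toString()` moved inside the class body (they previously sat after the builder's closing brace, outside the class, which would not compile), wiring the sink through the builder looks roughly like this. Only `setColumnNames` and `setColumnNameFamily` are visible in this diff; the map variable is illustrative:

    // Hypothetical usage of the builder entry point added above.
    HbaseOutputFormat outputFormat = HbaseOutputFormat.buildHbaseOutputFormat()
            .setColumnNames(new String[]{"cf:name", "cf:age"})
            .setColumnNameFamily(columnNameFamilyMap) // assumed to be prepared elsewhere
            .finish();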