Commit e28951e

Merge remote-tracking branch 'origin/v1.8.0_dev' into 1.8.0_release

# Conflicts:
#	core/src/main/java/com/dtstack/flink/sql/side/AsyncReqRow.java
#	core/src/main/java/com/dtstack/flink/sql/side/SideSqlExec.java

2 parents: 01105ac + 011fa58

File tree

247 files changed: +7889 −9966 lines


README.md

+20 −1

@@ -26,6 +26,18 @@
 * Java: JDK 8 or above
 * Flink cluster: 1.4, 1.5, 1.8 (standalone mode does not require a Flink cluster installation)
 * Operating system: no restrictions in theory
+* In a kerberos environment, the security.kerberos.login.keytab and security.kerberos.login.principal parameters must be set in flink-conf.yaml. Example configuration:
+```
+## Hadoop configuration file path
+fs.hdfs.hadoopconf: /Users/maqi/tmp/hadoopconf/hadoop_250
+security.kerberos.login.use-ticket-cache: true
+security.kerberos.login.keytab: /Users/maqi/tmp/hadoopconf/hadoop_250/maqi.keytab
+security.kerberos.login.principal: [email protected]
+security.kerberos.login.contexts: Client,KafkaClient
+zookeeper.sasl.service-name: zookeeper
+zookeeper.sasl.login-context-name: Client
+
+```
 
 ### 1.3 Packaging
 
@@ -34,9 +46,16 @@
 ```
 mvn clean package -Dmaven.test.skip
 
-After packaging, a plugins directory is created under the project root; it holds the compiled data-sync plugin packages, while the lib directory holds the job-submission package
 ```
 
+Package structure after the build:
+
+> * dt-center-flinkStreamSQL
+> > * bin: job startup scripts
+> > * lib: storage path of the launcher package, the entry point for job submission
+> > * plugins: storage path of the plugin packages
+> > * ........ : core and plugin code
+
 ### 1.4 Startup
 
 #### 1.4.1 Startup command

cassandra/cassandra-side/cassandra-all-side/pom.xml

+1
+1

@@ -36,6 +36,7 @@
             <goal>shade</goal>
           </goals>
           <configuration>
+            <createDependencyReducedPom>false</createDependencyReducedPom>
             <artifactSet>
               <excludes>
                 <exclude>org.slf4j</exclude>
cassandra/cassandra-side/cassandra-all-side/src/main/java/com/dtstack/flink/sql/side/cassandra/CassandraAllReqRow.java

+9 −8

@@ -38,6 +38,7 @@
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import org.apache.flink.table.runtime.types.CRow;
 import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.Collector;
@@ -129,14 +130,14 @@ protected void reloadCache() {
 
 
     @Override
-    public void flatMap(Row value, Collector<Row> out) throws Exception {
+    public void flatMap(CRow input, Collector<CRow> out) throws Exception {
         List<Object> inputParams = Lists.newArrayList();
         for (Integer conValIndex : sideInfo.getEqualValIndex()) {
-            Object equalObj = value.getField(conValIndex);
+            Object equalObj = input.row().getField(conValIndex);
             if (equalObj == null) {
                 if (sideInfo.getJoinType() == JoinType.LEFT) {
-                    Row data = fillData(value, null);
-                    out.collect(data);
+                    Row data = fillData(input.row(), null);
+                    out.collect(new CRow(data, input.change()));
                 }
                 return;
             }
@@ -148,8 +149,8 @@ public void flatMap(Row value, Collector<Row> out) throws Exception {
         List<Map<String, Object>> cacheList = cacheRef.get().get(key);
         if (CollectionUtils.isEmpty(cacheList)) {
             if (sideInfo.getJoinType() == JoinType.LEFT) {
-                Row row = fillData(value, null);
-                out.collect(row);
+                Row row = fillData(input.row(), null);
+                out.collect(new CRow(row, input.change()));
             } else {
                 return;
             }
@@ -158,7 +159,7 @@ public void flatMap(Row value, Collector<Row> out) throws Exception {
         }
 
         for (Map<String, Object> one : cacheList) {
-            out.collect(fillData(value, one));
+            out.collect(new CRow(fillData(input.row(), one), input.change()));
         }
 
     }
@@ -268,7 +269,7 @@ private void loadData(Map<String, List<Map<String, Object>>> tmpCache) throws SQ
                 LOG.warn("get conn fail, wait for 5 sec and try again, connInfo:" + connInfo);
                 Thread.sleep(5 * 1000);
             } catch (InterruptedException e1) {
-                e1.printStackTrace();
+                LOG.error("", e1);
             }
         }
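The change above is the core pattern of this merge: side-table operators now consume and emit CRow instead of Row so that the accumulate/retract flag survives the dimension join. A minimal sketch of the pattern, using only the CRow accessors visible in the diff; fillData below is a simplified stand-in for the real helper, not the actual implementation:

```java
import org.apache.flink.table.runtime.types.CRow;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;

public class CRowJoinSketch {

    // simplified stand-in for fillData(...): copy the input fields and
    // append one value looked up from the side table
    static Row fillData(Row input, Object sideValue) {
        Row result = new Row(input.getArity() + 1);
        for (int i = 0; i < input.getArity(); i++) {
            result.setField(i, input.getField(i));
        }
        result.setField(input.getArity(), sideValue);
        return result;
    }

    // the pattern from the diff: unwrap with row(), join, then re-wrap
    // with the original change flag so retract messages stay correct
    static void emit(CRow input, Object sideValue, Collector<CRow> out) {
        Row joined = fillData(input.row(), sideValue);
        out.collect(new CRow(joined, input.change()));
    }
}
```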

cassandra/cassandra-side/cassandra-async-side/pom.xml

+1
+1

@@ -52,6 +52,7 @@
             <goal>shade</goal>
           </goals>
           <configuration>
+            <createDependencyReducedPom>false</createDependencyReducedPom>
             <artifactSet>
               <excludes>
                 <exclude>org.slf4j</exclude>

cassandra/cassandra-side/cassandra-async-side/src/main/java/com/dtstack/flink/sql/side/cassandra/CassandraAsyncReqRow.java

+13 −12

@@ -47,6 +47,7 @@
 import org.apache.flink.configuration.Configuration;
 import com.google.common.collect.Lists;
 import org.apache.flink.streaming.api.functions.async.ResultFuture;
+import org.apache.flink.table.runtime.types.CRow;
 import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo;
 import org.apache.flink.types.Row;
 import org.slf4j.Logger;
@@ -160,17 +161,17 @@ private void connCassandraDB(CassandraSideTableInfo tableInfo) {
     }
 
     @Override
-    public void asyncInvoke(Row input, ResultFuture<Row> resultFuture) throws Exception {
-
+    public void asyncInvoke(CRow input, ResultFuture<CRow> resultFuture) throws Exception {
+        CRow inputCopy = new CRow(input.row(), input.change());
         JsonArray inputParams = new JsonArray();
         StringBuffer stringBuffer = new StringBuffer();
         String sqlWhere = " where ";
 
         for (int i = 0; i < sideInfo.getEqualFieldList().size(); i++) {
             Integer conValIndex = sideInfo.getEqualValIndex().get(i);
-            Object equalObj = input.getField(conValIndex);
+            Object equalObj = inputCopy.row().getField(conValIndex);
             if (equalObj == null) {
-                dealMissKey(input, resultFuture);
+                dealMissKey(inputCopy, resultFuture);
                 return;
             }
             inputParams.add(equalObj);
@@ -194,13 +195,13 @@ public void asyncInvoke(Row input, ResultFuture<Row> resultFuture) throws Except
         if (val != null) {
 
             if (ECacheContentType.MissVal == val.getType()) {
-                dealMissKey(input, resultFuture);
+                dealMissKey(inputCopy, resultFuture);
                 return;
             } else if (ECacheContentType.MultiLine == val.getType()) {
-                List<Row> rowList = Lists.newArrayList();
+                List<CRow> rowList = Lists.newArrayList();
                 for (Object jsonArray : (List) val.getContent()) {
-                    Row row = fillData(input, jsonArray);
-                    rowList.add(row);
+                    Row row = fillData(inputCopy.row(), jsonArray);
+                    rowList.add(new CRow(row, inputCopy.change()));
                 }
                 resultFuture.complete(rowList);
             } else {
@@ -238,20 +239,20 @@ public void onSuccess(List<com.datastax.driver.core.Row> rows) {
                 cluster.closeAsync();
                 if (rows.size() > 0) {
                     List<com.datastax.driver.core.Row> cacheContent = Lists.newArrayList();
-                    List<Row> rowList = Lists.newArrayList();
+                    List<CRow> rowList = Lists.newArrayList();
                     for (com.datastax.driver.core.Row line : rows) {
-                        Row row = fillData(input, line);
+                        Row row = fillData(inputCopy.row(), line);
                         if (openCache()) {
                             cacheContent.add(line);
                         }
-                        rowList.add(row);
+                        rowList.add(new CRow(row, inputCopy.change()));
                     }
                     resultFuture.complete(rowList);
                     if (openCache()) {
                         putCache(key, CacheObj.buildCacheObj(ECacheContentType.MultiLine, cacheContent));
                     }
                 } else {
-                    dealMissKey(input, resultFuture);
+                    dealMissKey(inputCopy, resultFuture);
                     if (openCache()) {
                         putCache(key, CacheMissVal.getMissKeyObj());
                     }
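The async variant applies the same CRow wrapping, with one extra step: the input is copied before the lookup callback runs, and every completion path (cache hit, database hit, miss) goes through the copy. A sketch of the completion step, assuming only the ResultFuture and CRow APIs shown above:

```java
import java.util.Collections;

import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.table.runtime.types.CRow;
import org.apache.flink.types.Row;

public class AsyncCRowSketch {

    // hypothetical helper: complete the async lookup with one joined row,
    // reusing the change flag captured in the defensive copy
    static void completeJoined(CRow inputCopy, Row joined, ResultFuture<CRow> resultFuture) {
        resultFuture.complete(Collections.singleton(new CRow(joined, inputCopy.change())));
    }
}
```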

cassandra/cassandra-side/cassandra-side-core/src/main/java/com/dtstack/flink/sql/side/cassandra/table/CassandraSideParser.java

+3 −4

@@ -68,9 +68,8 @@ public class CassandraSideParser extends AbsSideTableParser {
 
     public static final String POOL_TIMEOUT_MILLIS_KEY = "poolTimeoutMillis";
 
-    static {
-        keyPatternMap.put(SIDE_SIGN_KEY, SIDE_TABLE_SIGN);
-        keyHandlerMap.put(SIDE_SIGN_KEY, CassandraSideParser::dealSideSign);
+    public CassandraSideParser() {
+        addParserHandler(SIDE_SIGN_KEY, SIDE_TABLE_SIGN, this::dealSideSign);
     }
 
     @Override
@@ -97,7 +96,7 @@ public TableInfo getTableInfo(String tableName, String fieldsInfo, Map<String, O
         return cassandraSideTableInfo;
     }
 
-    private static void dealSideSign(Matcher matcher, TableInfo tableInfo) {
+    private void dealSideSign(Matcher matcher, TableInfo tableInfo) {
     }
 
     public Class dbTypeConvertToJavaType(String fieldType) {
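The static initializer was replaced because keyPatternMap and keyHandlerMap live in the shared parser base class, so static registration from one subclass leaks into every other parser. A rough sketch of the per-instance registration the new constructor relies on; the base class here is hypothetical, and the real AbsSideTableParser signatures may differ:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

abstract class ParserHandlerSketch {

    // per-instance maps: each parser registers only its own handlers
    // instead of all subclasses writing into shared static maps
    private final Map<String, Pattern> keyPatternMap = new HashMap<>();
    private final Map<String, BiConsumer<Matcher, Object>> keyHandlerMap = new HashMap<>();

    protected void addParserHandler(String key, Pattern pattern,
                                    BiConsumer<Matcher, Object> handler) {
        keyPatternMap.put(key, pattern);
        keyHandlerMap.put(key, handler);
    }
}
```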

cassandra/cassandra-sink/src/main/java/com/dtstack/flink/sql/sink/cassandra/CassandraOutputFormat.java

+13 −4

@@ -48,7 +48,7 @@
 import com.datastax.driver.core.SocketOptions;
 import com.datastax.driver.core.policies.DowngradingConsistencyRetryPolicy;
 import com.datastax.driver.core.policies.RetryPolicy;
-import com.dtstack.flink.sql.sink.MetricOutputFormat;
+import com.dtstack.flink.sql.outputformat.DtRichOutputFormat;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -69,7 +69,7 @@
  * @see Tuple
  * @see DriverManager
  */
-public class CassandraOutputFormat extends MetricOutputFormat {
+public class CassandraOutputFormat extends DtRichOutputFormat<Tuple2> {
     private static final long serialVersionUID = -7994311331389155692L;
 
     private static final Logger LOG = LoggerFactory.getLogger(CassandraOutputFormat.class);
@@ -193,7 +193,6 @@ public void writeRecord(Tuple2 tuple2) throws IOException {
         try {
             if (retract) {
                 insertWrite(row);
-                outRecords.inc();
             } else {
                 //do nothing
             }
@@ -204,14 +203,24 @@ public void writeRecord(Tuple2 tuple2) throws IOException {
 
     private void insertWrite(Row row) {
         try {
+
+            if (outRecords.getCount() % ROW_PRINT_FREQUENCY == 0) {
+                LOG.info("Receive data : {}", row);
+            }
+
             String cql = buildSql(row);
             if (cql != null) {
                 ResultSet resultSet = session.execute(cql);
                 resultSet.wasApplied();
+                outRecords.inc();
             }
         } catch (Exception e) {
+            if (outDirtyRecords.getCount() % DIRTY_PRINT_FREQUENCY == 0) {
+                LOG.error("record insert failed ..", row.toString().substring(0, 100));
+                LOG.error("", e);
+            }
+
             outDirtyRecords.inc();
-            LOG.error("[upsert] is error:" + e.getMessage());
         }
     }
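Both new log statements sample by counter value rather than logging every record. A sketch of the idea, assuming ROW_PRINT_FREQUENCY is a sampling constant inherited from DtRichOutputFormat (its actual value is not visible in this diff):

```java
import org.apache.flink.metrics.Counter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SampledLoggingSketch {

    private static final Logger LOG = LoggerFactory.getLogger(SampledLoggingSketch.class);

    // assumed value; the real constant lives in DtRichOutputFormat
    private static final long ROW_PRINT_FREQUENCY = 1000L;

    // log only every N-th record so a high-throughput sink
    // does not flood the task manager logs
    static void writeSampled(Counter outRecords, Object record) {
        if (outRecords.getCount() % ROW_PRINT_FREQUENCY == 0) {
            LOG.info("Receive data : {}", record);
        }
        outRecords.inc();
    }
}
```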

clickhouse/clickhouse-sink/src/main/java/com/dtstack/flink/sql/sink/clickhouse/ClickhouseDialect.java

+46

@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.dtstack.flink.sql.sink.clickhouse;
+
+import com.dtstack.flink.sql.sink.rdb.dialect.JDBCDialect;
+
+import java.util.Optional;
+
+/**
+ * Date: 2020/1/15
+ * Company: www.dtstack.com
+ * @author maqi
+ */
+public class ClickhouseDialect implements JDBCDialect {
+
+    @Override
+    public boolean canHandle(String url) {
+        return url.startsWith("jdbc:clickhouse:");
+    }
+
+    @Override
+    public Optional<String> defaultDriverName() {
+        return Optional.of("ru.yandex.clickhouse.ClickHouseDriver");
+    }
+
+    @Override
+    public String getUpdateStatement(String tableName, String[] fieldNames, String[] conditionFields) {
+        throw new RuntimeException("Clickhouse does not support update sql, please remove primary key or use append mode");
+    }
+}
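For orientation, a sketch of how a dialect like this is typically consulted, assuming the JDBCDialect contract is just the three methods implemented above; the URL is a standard ClickHouse JDBC URL used for illustration, not one taken from this commit:

```java
public class DialectSelectionSketch {

    public static void main(String[] args) {
        ClickhouseDialect dialect = new ClickhouseDialect();

        // standard ClickHouse JDBC URL shape; host and database are placeholders
        String url = "jdbc:clickhouse://localhost:8123/default";

        if (dialect.canHandle(url)) {
            // fall back to the dialect's default driver when none is configured
            String driver = dialect.defaultDriverName()
                    .orElseThrow(() -> new IllegalStateException("no default driver"));
            System.out.println("loading driver: " + driver);
        }
    }
}
```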

clickhouse/clickhouse-sink/src/main/java/com/dtstack/flink/sql/sink/clickhouse/ClickhouseSink.java

+22 −41

@@ -21,58 +21,39 @@
 
 
 import com.dtstack.flink.sql.sink.IStreamSinkGener;
+import com.dtstack.flink.sql.sink.rdb.JDBCOptions;
 import com.dtstack.flink.sql.sink.rdb.RdbSink;
-import com.dtstack.flink.sql.sink.rdb.format.RetractJDBCOutputFormat;
+import com.dtstack.flink.sql.sink.rdb.format.JDBCUpsertOutputFormat;
 
 import java.util.List;
 import java.util.Map;
 
 
 public class ClickhouseSink extends RdbSink implements IStreamSinkGener<RdbSink> {
-
-    private static final String CLICKHOUSE_DRIVER = "ru.yandex.clickhouse.ClickHouseDriver";
-
     public ClickhouseSink() {
+        super(new ClickhouseDialect());
     }
 
     @Override
-    public RetractJDBCOutputFormat getOutputFormat() {
-        return new RetractJDBCOutputFormat();
-    }
-
-    @Override
-    public void buildSql(String scheam, String tableName, List<String> fields) {
-        buildInsertSql(tableName, fields);
-    }
-
-    @Override
-    public String buildUpdateSql(String schema, String tableName, List<String> fieldNames, Map<String, List<String>> realIndexes, List<String> fullField) {
-        return null;
-    }
-
-    private void buildInsertSql(String tableName, List<String> fields) {
-        String sqlTmp = "insert into " + tableName + " (${fields}) values (${placeholder})";
-        String fieldsStr = "";
-        String placeholder = "";
-
-        for (String fieldName : fields) {
-            fieldsStr += ",`" + fieldName + "`";
-            placeholder += ",?";
-        }
-
-        fieldsStr = fieldsStr.replaceFirst(",", "");
-        placeholder = placeholder.replaceFirst(",", "");
-
-        sqlTmp = sqlTmp.replace("${fields}", fieldsStr).replace("${placeholder}", placeholder);
-        this.sql = sqlTmp;
-        System.out.println("---insert sql----");
-        System.out.println(sql);
-    }
-
-
-    @Override
-    public String getDriverName() {
-        return CLICKHOUSE_DRIVER;
+    public JDBCUpsertOutputFormat getOutputFormat() {
+        JDBCOptions jdbcOptions = JDBCOptions.builder()
+                .setDBUrl(dbURL)
+                .setDialect(jdbcDialect)
+                .setUsername(userName)
+                .setPassword(password)
+                .setTableName(tableName)
+                .build();
+
+        return JDBCUpsertOutputFormat.builder()
+                .setOptions(jdbcOptions)
+                .setFieldNames(fieldNames)
+                .setFlushMaxSize(batchNum)
+                .setFlushIntervalMills(batchWaitInterval)
+                .setFieldTypes(sqlTypes)
+                .setKeyFields(primaryKeys)
+                .setAllReplace(allReplace)
+                .setUpdateMode(updateMode)
+                .build();
     }
 
 
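The deleted buildInsertSql assembled the insert statement with string concatenation and replaceFirst, and printed it to stdout. For reference, the same statement expressed with joining collectors; the dialect-based output format presumably generates something equivalent internally:

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class InsertSqlSketch {

    // produces: insert into visit (`id`,`name`) values (?,?)
    static String buildInsertSql(String tableName, List<String> fields) {
        String columns = fields.stream()
                .map(f -> "`" + f + "`")
                .collect(Collectors.joining(","));
        String placeholders = fields.stream()
                .map(f -> "?")
                .collect(Collectors.joining(","));
        return "insert into " + tableName + " (" + columns + ") values (" + placeholders + ")";
    }

    public static void main(String[] args) {
        System.out.println(buildInsertSql("visit", Arrays.asList("id", "name")));
    }
}
```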
