
Commit 1ceddb7

Merge branch '1.10_release_4.0.x' into feat_1.10_4.0.x_hbaseBatchSize
2 parents: 3d91393 + d8af848

File tree

55 files changed: +1611 / -767 lines

Only a subset of the 55 changed files is shown below.

cassandra/cassandra-side/cassandra-all-side/src/main/java/com/dtstack/flink/sql/side/cassandra/CassandraAllReqRow.java

Lines changed: 12 additions & 12 deletions
@@ -18,10 +18,6 @@
 
 package com.dtstack.flink.sql.side.cassandra;
 
-import org.apache.flink.api.java.typeutils.RowTypeInfo;
-import org.apache.flink.types.Row;
-import org.apache.flink.util.Collector;
-
 import com.datastax.driver.core.Cluster;
 import com.datastax.driver.core.ConsistencyLevel;
 import com.datastax.driver.core.HostDistance;
@@ -43,7 +39,10 @@
 import org.apache.calcite.sql.JoinType;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.table.dataformat.BaseRow;
+import org.apache.flink.table.dataformat.GenericRow;
+import org.apache.flink.util.Collector;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -103,14 +102,15 @@ protected void reloadCache() {
 
 
     @Override
-    public void flatMap(Row input, Collector<BaseRow> out) throws Exception {
+    public void flatMap(BaseRow input, Collector<BaseRow> out) throws Exception {
+        GenericRow genericRow = (GenericRow) input;
         List<Object> inputParams = Lists.newArrayList();
         for (Integer conValIndex : sideInfo.getEqualValIndex()) {
-            Object equalObj = input.getField(conValIndex);
+            Object equalObj = genericRow.getField(conValIndex);
             if (equalObj == null) {
                 if (sideInfo.getJoinType() == JoinType.LEFT) {
-                    Row row = fillData(input, null);
-                    RowDataComplete.collectRow(out, row);
+                    BaseRow row = fillData(input, null);
+                    RowDataComplete.collectBaseRow(out, row);
                 }
                 return;
             }
@@ -122,8 +122,8 @@ public void flatMap(Row input, Collector<BaseRow> out) throws Exception {
         List<Map<String, Object>> cacheList = cacheRef.get().get(key);
         if (CollectionUtils.isEmpty(cacheList)) {
             if (sideInfo.getJoinType() == JoinType.LEFT) {
-                Row row = fillData(input, null);
-                RowDataComplete.collectRow(out, row);
+                BaseRow row = fillData(input, null);
+                RowDataComplete.collectBaseRow(out, row);
             } else {
                 return;
             }
@@ -132,8 +132,8 @@ public void flatMap(Row input, Collector<BaseRow> out) throws Exception {
         }
 
         for (Map<String, Object> one : cacheList) {
-            Row row = fillData(input, one);
-            RowDataComplete.collectRow(out, row);
+            BaseRow row = fillData(input, one);
+            RowDataComplete.collectBaseRow(out, row);
         }
 
     }
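
Note: the pattern in this file, repeated across the side-table connectors in this merge, is the move from org.apache.flink.types.Row to the blink planner's BaseRow/GenericRow types, with RowDataComplete.collectRow swapped for collectBaseRow. A minimal standalone sketch of the GenericRow access pattern the new flatMap relies on (illustrative only, not dtstack code; assumes Flink 1.10's blink table runtime on the classpath):

import org.apache.flink.table.dataformat.BaseRow;
import org.apache.flink.table.dataformat.GenericRow;

public class GenericRowDemo {
    public static void main(String[] args) {
        // GenericRow is the blink planner's boxed-object implementation of BaseRow;
        // fields are read and written positionally, as in the flatMap above.
        GenericRow row = new GenericRow(2);
        row.setField(0, "user_1");
        row.setField(1, 42L);

        // Operators receive rows through the BaseRow interface and cast back
        // to GenericRow for untyped, index-based field access.
        BaseRow asBase = row;
        Object key = ((GenericRow) asBase).getField(0);
        System.out.println(key); // prints user_1
    }
}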

cassandra/cassandra-side/cassandra-async-side/src/main/java/com/dtstack/flink/sql/side/cassandra/CassandraAsyncReqRow.java

Lines changed: 23 additions & 10 deletions
@@ -19,11 +19,22 @@
 
 package com.dtstack.flink.sql.side.cassandra;
 
-import com.datastax.driver.core.*;
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.ConsistencyLevel;
+import com.datastax.driver.core.HostDistance;
+import com.datastax.driver.core.PoolingOptions;
+import com.datastax.driver.core.QueryOptions;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.SocketOptions;
 import com.datastax.driver.core.policies.DowngradingConsistencyRetryPolicy;
 import com.datastax.driver.core.policies.RetryPolicy;
 import com.dtstack.flink.sql.enums.ECacheContentType;
-import com.dtstack.flink.sql.side.*;
+import com.dtstack.flink.sql.side.AbstractSideTableInfo;
+import com.dtstack.flink.sql.side.BaseAsyncReqRow;
+import com.dtstack.flink.sql.side.CacheMissVal;
+import com.dtstack.flink.sql.side.FieldInfo;
+import com.dtstack.flink.sql.side.JoinInfo;
 import com.dtstack.flink.sql.side.cache.CacheObj;
 import com.dtstack.flink.sql.side.cassandra.table.CassandraSideTableInfo;
 import com.dtstack.flink.sql.util.RowDataComplete;
@@ -38,7 +49,7 @@
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.async.ResultFuture;
 import org.apache.flink.table.dataformat.BaseRow;
-import org.apache.flink.types.Row;
+import org.apache.flink.table.dataformat.GenericRow;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -149,7 +160,7 @@ private void connCassandraDB(CassandraSideTableInfo tableInfo) {
     }
 
     @Override
-    public void handleAsyncInvoke(Map<String, Object> inputParams, Row input, ResultFuture<BaseRow> resultFuture) throws Exception {
+    public void handleAsyncInvoke(Map<String, Object> inputParams, BaseRow input, ResultFuture<BaseRow> resultFuture) throws Exception {
 
         String key = buildCacheKey(inputParams);
         //connect Cassandra
@@ -180,15 +191,15 @@ public void onSuccess(List<com.datastax.driver.core.Row> rows) {
                 cluster.closeAsync();
                 if (rows.size() > 0) {
                     List<com.datastax.driver.core.Row> cacheContent = Lists.newArrayList();
-                    List<Row> rowList = Lists.newArrayList();
+                    List<BaseRow> rowList = Lists.newArrayList();
                     for (com.datastax.driver.core.Row line : rows) {
-                        Row row = fillData(input, line);
+                        BaseRow row = fillData(input, line);
                         if (openCache()) {
                             cacheContent.add(line);
                         }
                         rowList.add(row);
                     }
-                    RowDataComplete.completeRow(resultFuture, rowList);
+                    RowDataComplete.completeBaseRow(resultFuture, rowList);
                     if (openCache()) {
                         putCache(key, CacheObj.buildCacheObj(ECacheContentType.MultiLine, cacheContent));
                     }
@@ -230,11 +241,13 @@ private String buildWhereCondition(Map<String, Object> inputParams){
     }
 
     @Override
-    public Row fillData(Row input, Object line) {
+    public BaseRow fillData(BaseRow input, Object line) {
+        GenericRow genericRow = (GenericRow) input;
         com.datastax.driver.core.Row rowArray = (com.datastax.driver.core.Row) line;
-        Row row = new Row(sideInfo.getOutFieldInfoList().size());
+        GenericRow row = new GenericRow(sideInfo.getOutFieldInfoList().size());
+        row.setHeader(genericRow.getHeader());
        for (Map.Entry<Integer, Integer> entry : sideInfo.getInFieldIndex().entrySet()) {
-            Object obj = input.getField(entry.getValue());
+            Object obj = genericRow.getField(entry.getValue());
             obj = convertTimeIndictorTypeInfo(entry.getValue(), obj);
             row.setField(entry.getKey(), obj);
         }
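
Note: the new fillData copies the input's header byte onto the output row. In the blink runtime the BaseRow header marks the change kind of the row (accumulate vs. retract), so a derived row that drops it could turn a retraction into an insert. A hypothetical helper sketching that copy step (not dtstack code; assumes the Flink 1.10 blink runtime):

import org.apache.flink.table.dataformat.BaseRow;
import org.apache.flink.table.dataformat.GenericRow;

public final class RowHeaderCopy {

    // Hypothetical helper mirroring the fillData pattern above: build a wider
    // output row and carry the input's header byte (change-kind flag) over.
    public static GenericRow widen(BaseRow input, int inputArity, int outputArity) {
        GenericRow in = (GenericRow) input;
        GenericRow out = new GenericRow(outputArity);
        out.setHeader(in.getHeader()); // without this, a retract row would be re-emitted as an insert
        for (int i = 0; i < inputArity; i++) {
            out.setField(i, in.getField(i));
        }
        return out;
    }
}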

core/pom.xml

Lines changed: 0 additions & 15 deletions
@@ -17,7 +17,6 @@
     <properties>
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
         <project.package.name>core</project.package.name>
-        <calcite.server.version>1.16.0</calcite.server.version>
         <jackson.version>2.7.9</jackson.version>
         <guava.version>19.0</guava.version>
         <logger.tool.version>1.0.0-SNAPSHOT</logger.tool.version>
@@ -79,20 +78,6 @@
         <version>${flink.version}</version>
     </dependency>
 
-
-    <dependency>
-        <groupId>org.apache.calcite</groupId>
-        <artifactId>calcite-server</artifactId>
-        <!-- When updating the Calcite version, make sure to update the dependency exclusions -->
-        <version>${calcite.server.version}</version>
-        <exclusions>
-            <exclusion>
-                <artifactId>jackson-databind</artifactId>
-                <groupId>com.fasterxml.jackson.core</groupId>
-            </exclusion>
-        </exclusions>
-    </dependency>
-
     <dependency>
         <groupId>com.fasterxml.jackson.core</groupId>
         <artifactId>jackson-databind</artifactId>

core/src/main/java/com/dtstack/flink/sql/exec/ExecuteProcessHelper.java

Lines changed: 23 additions & 7 deletions
@@ -170,7 +170,13 @@ public static StreamExecutionEnvironment getStreamExecution(ParamsInfo paramsInf
         // cache classPathSets
         ExecuteProcessHelper.registerPluginUrlToCachedFile(env, classPathSets);
 
-        ExecuteProcessHelper.sqlTranslation(paramsInfo.getLocalSqlPluginPath(), paramsInfo.getPluginLoadMode(),tableEnv, sqlTree, sideTableMap, registerTableCache);
+        ExecuteProcessHelper.sqlTranslation(
+                paramsInfo.getLocalSqlPluginPath(),
+                paramsInfo.getPluginLoadMode(),
+                tableEnv,
+                sqlTree,
+                sideTableMap,
+                registerTableCache);
 
         if (env instanceof MyLocalStreamEnvironment) {
             ((MyLocalStreamEnvironment) env).setClasspaths(ClassLoaderManager.getClassPath());
@@ -282,8 +288,14 @@ public static void registerUserDefinedFunction(SqlTree sqlTree, List<URL> jarUrl
      * @return
      * @throws Exception
      */
-    public static Set<URL> registerTable(SqlTree sqlTree, StreamExecutionEnvironment env, StreamTableEnvironment tableEnv, String localSqlPluginPath,
-                                         String remoteSqlPluginPath, String pluginLoadMode, Map<String, AbstractSideTableInfo> sideTableMap, Map<String, Table> registerTableCache) throws Exception {
+    public static Set<URL> registerTable(SqlTree sqlTree,
+                                         StreamExecutionEnvironment env,
+                                         StreamTableEnvironment tableEnv,
+                                         String localSqlPluginPath,
+                                         String remoteSqlPluginPath,
+                                         String pluginLoadMode,
+                                         Map<String, AbstractSideTableInfo> sideTableMap,
+                                         Map<String, Table> registerTableCache) throws Exception {
         Set<URL> pluginClassPathSets = Sets.newHashSet();
         WaterMarkerAssigner waterMarkerAssigner = new WaterMarkerAssigner();
         for (AbstractTableInfo tableInfo : sqlTree.getTableInfoMap().values()) {
@@ -311,7 +323,7 @@ public static Set<URL> registerTable(SqlTree sqlTree, StreamExecutionEnvironment
             }
 
             Table regTable = tableEnv.fromDataStream(adaptStream, fields);
-            tableEnv.registerTable(tableInfo.getName(), regTable);
+            tableEnv.createTemporaryView(tableInfo.getName(), regTable);
             if (LOG.isInfoEnabled()) {
                 LOG.info("registe table {} success.", tableInfo.getName());
             }
@@ -320,10 +332,14 @@ public static Set<URL> registerTable(SqlTree sqlTree, StreamExecutionEnvironment
             URL sourceTablePathUrl = PluginUtil.buildSourceAndSinkPathByLoadMode(tableInfo.getType(), AbstractSourceTableInfo.SOURCE_SUFFIX, localSqlPluginPath, remoteSqlPluginPath, pluginLoadMode);
             pluginClassPathSets.add(sourceTablePathUrl);
         } else if (tableInfo instanceof AbstractTargetTableInfo) {
-
             TableSink tableSink = StreamSinkFactory.getTableSink((AbstractTargetTableInfo) tableInfo, localSqlPluginPath, pluginLoadMode);
-            TypeInformation[] flinkTypes = FunctionManager.transformTypes(tableInfo.getFieldClasses());
-            tableEnv.registerTableSink(tableInfo.getName(), tableInfo.getFields(), flinkTypes, tableSink);
+            // TODO The Kafka sink can be registered directly; other sinks need fixing first.
+            if (tableInfo.getType().startsWith("kafka")) {
+                tableEnv.registerTableSink(tableInfo.getName(), tableSink);
+            } else {
+                TypeInformation[] flinkTypes = FunctionManager.transformTypes(tableInfo.getFieldClasses());
+                tableEnv.registerTableSink(tableInfo.getName(), tableInfo.getFields(), flinkTypes, tableSink);
+            }
 
             URL sinkTablePathUrl = PluginUtil.buildSourceAndSinkPathByLoadMode(tableInfo.getType(), AbstractTargetTableInfo.TARGET_SUFFIX, localSqlPluginPath, remoteSqlPluginPath, pluginLoadMode);
             pluginClassPathSets.add(sinkTablePathUrl);
core/src/main/java/com/dtstack/flink/sql/function/FunctionManager.java

Lines changed: 1 addition & 2 deletions
@@ -124,13 +124,12 @@ public static void registerAggregateUDF(String classPath, String funcName, Table
         }
     }
 
-
     public static TypeInformation[] transformTypes(Class[] fieldTypes) {
         TypeInformation[] types = new TypeInformation[fieldTypes.length];
         for (int i = 0; i < fieldTypes.length; i++) {
             types[i] = TypeInformation.of(fieldTypes[i]);
         }
-
         return types;
     }
+
 }
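
Note: transformTypes is a thin wrapper over TypeInformation.of, mapping each declared field class to its Flink type for the non-Kafka sink registration path above. A small usage sketch (the field classes are illustrative):

import org.apache.flink.api.common.typeinfo.TypeInformation;

public class TransformTypesDemo {
    public static void main(String[] args) {
        Class[] fieldClasses = {String.class, Long.class, java.sql.Timestamp.class};

        // Same loop as FunctionManager.transformTypes: derive a TypeInformation
        // instance from each raw field class.
        TypeInformation[] types = new TypeInformation[fieldClasses.length];
        for (int i = 0; i < fieldClasses.length; i++) {
            types[i] = TypeInformation.of(fieldClasses[i]);
        }

        for (TypeInformation t : types) {
            System.out.println(t); // e.g. String, Long, Timestamp
        }
    }
}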
