Skip to content

Commit 3ec7760

Browse files
author
gituser
committed
Merge branch 'hotfix_1.10_4.0.x_33089' into 1.10_release_4.0.x
2 parents 46c4530 + 2146aac commit 3ec7760

File tree

4 files changed

+14
-6
lines changed

4 files changed

+14
-6
lines changed

core/src/main/java/com/dtstack/flink/sql/table/AbstractTableParser.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@
2626
import com.google.common.collect.Maps;
2727
import org.apache.commons.lang3.StringUtils;
2828
import org.apache.flink.api.java.tuple.Tuple2;
29+
import org.slf4j.Logger;
30+
import org.slf4j.LoggerFactory;
2931

3032
import java.util.Arrays;
3133
import java.util.List;
@@ -44,6 +46,8 @@
4446

4547
public abstract class AbstractTableParser {
4648

49+
private static final Logger LOG = LoggerFactory.getLogger(AbstractTableParser.class);
50+
4751
private static final String PRIMARY_KEY = "primaryKey";
4852
private static final String NEST_JSON_FIELD_KEY = "nestFieldKey";
4953
private static final String CHAR_TYPE_NO_LENGTH = "CHAR";
@@ -115,8 +119,14 @@ public void parseFieldsInfo(String fieldsInfo, AbstractTableInfo tableInfo) {
115119
*/
116120
if (tableInfo instanceof AbstractSideTableInfo) {
117121
tableInfo.getPrimaryKeys().stream()
118-
.filter(pk -> (!tableInfo.getFieldList().contains(pk) && pk.equals("rowkey")))
119-
.forEach(pk -> handleKeyNotHaveAlias(String.format("%s varchar", pk), tableInfo));
122+
.filter(pk -> (!tableInfo.getFieldList().contains(pk)))
123+
.forEach(pk -> {
124+
try {
125+
handleKeyNotHaveAlias(String.format("%s varchar", pk.trim()), tableInfo);
126+
} catch (Exception e) {
127+
LOG.error(String.format("Add primary key to field list failed. Reason: %s", e.getMessage()));
128+
}
129+
});
120130
}
121131

122132
tableInfo.finish();

hbase/hbase-side/hbase-async-side/src/main/java/com/dtstack/flink/sql/side/hbase/rowkeydealer/AbstractRowKeyModeDealer.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,8 +55,6 @@ public abstract class AbstractRowKeyModeDealer {
5555

5656
protected List<FieldInfo> outFieldInfoList;
5757

58-
protected static final String ROWKEY = "rowkey";
59-
6058
//key:Returns the value of the position, returns the index values ​​in the input data
6159
protected Map<Integer, Integer> inFieldIndex = Maps.newHashMap();
6260

hbase/hbase-side/hbase-async-side/src/main/java/com/dtstack/flink/sql/side/hbase/rowkeydealer/PreRowKeyModeDealerDealer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ private String dealOneRow(ArrayList<ArrayList<KeyValue>> args, String rowKeyStr,
111111
List<Object> sideVal = Lists.newArrayList();
112112
for (String key : colNames) {
113113

114-
if (ROWKEY.equalsIgnoreCase(key)) {
114+
if (!sideMap.containsKey(key)) {
115115
sideVal.add(rowKeyStr);
116116
continue;
117117
}

hbase/hbase-side/hbase-async-side/src/main/java/com/dtstack/flink/sql/side/hbase/rowkeydealer/RowKeyEqualModeDealer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ public void asyncGetData(String tableName, String rowKeyStr, BaseRow input, Resu
8585
List<Object> sideVal = Lists.newArrayList();
8686
for(String key : colNames){
8787

88-
if (ROWKEY.equalsIgnoreCase(key)) {
88+
if (!sideMap.containsKey(key)) {
8989
sideVal.add(rowKeyStr);
9090
continue;
9191
}

0 commit comments

Comments
 (0)