Skip to content

Commit 36c3f50

Browse files
committed
[hotfix-33089][hbase-side]Compatible with composite primary keys.
1 parent 8930f0b commit 36c3f50

File tree

2 files changed

+14
-3
lines changed

2 files changed

+14
-3
lines changed

core/src/main/java/com/dtstack/flink/sql/table/AbstractTableParser.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,16 @@
1919

2020
package com.dtstack.flink.sql.table;
2121

22+
import com.dtstack.flink.sql.parser.SqlParser;
2223
import com.dtstack.flink.sql.side.AbstractSideTableInfo;
2324
import com.dtstack.flink.sql.util.ClassUtil;
2425
import com.dtstack.flink.sql.util.DtStringUtil;
2526
import com.google.common.base.Preconditions;
2627
import com.google.common.collect.Maps;
2728
import org.apache.commons.lang3.StringUtils;
2829
import org.apache.flink.api.java.tuple.Tuple2;
30+
import org.slf4j.Logger;
31+
import org.slf4j.LoggerFactory;
2932

3033
import java.util.ArrayList;
3134
import java.util.Arrays;
@@ -45,6 +48,8 @@
4548

4649
public abstract class AbstractTableParser {
4750

51+
private static final Logger LOG = LoggerFactory.getLogger(AbstractTableParser.class);
52+
4853
private static final String PRIMARY_KEY = "primaryKey";
4954
private static final String NEST_JSON_FIELD_KEY = "nestFieldKey";
5055
private static final String CHAR_TYPE_NO_LENGTH = "CHAR";
@@ -117,7 +122,13 @@ public void parseFieldsInfo(String fieldsInfo, AbstractTableInfo tableInfo) {
117122
if (tableInfo instanceof AbstractSideTableInfo) {
118123
tableInfo.getPrimaryKeys().stream()
119124
.filter(pk -> !tableInfo.getFieldList().contains(pk))
120-
.forEach(pk -> handleKeyNotHaveAlias(String.format("%s varchar", pk), tableInfo));
125+
.forEach(pk -> {
126+
try {
127+
handleKeyNotHaveAlias(String.format("%s varchar", pk), tableInfo);
128+
} catch (Exception e) {
129+
LOG.error(String.format("Handle primary key failed. Reason: %s", e.getMessage()));
130+
}
131+
});
121132
}
122133

123134
tableInfo.finish();

docs/plugin/hbaseSide.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,7 @@ CREATE TABLE MyResult(
146146
CREATE TABLE sideTable (
147147
wtz:message varchar as message,
148148
wtz:info varchar as info ,
149-
PRIMARY KEY (rowkey),
149+
PRIMARY KEY (md5(rowkey1) + rowkey2 + 'test'),
150150
PERIOD FOR SYSTEM_TIME
151151
) WITH (
152152
type = 'hbase',
@@ -174,7 +174,7 @@ into
174174
MyTable a
175175
left join
176176
sideTable b
177-
on a.id=b.rowkey;
177+
on a.id=b.rowkey1 and a.name = b.rowkey2;
178178
```
179179
### kerberos维表示例
180180
```

0 commit comments

Comments
 (0)