|
19 | 19 |
|
20 | 20 | package com.dtstack.flink.sql.table;
|
21 | 21 |
|
| 22 | +import com.dtstack.flink.sql.side.AbstractSideTableInfo; |
22 | 23 | import com.dtstack.flink.sql.util.ClassUtil;
|
23 | 24 | import com.dtstack.flink.sql.util.DtStringUtil;
|
24 | 25 | import com.google.common.base.Preconditions;
|
25 | 26 | import com.google.common.collect.Maps;
|
26 | 27 | import org.apache.commons.lang3.StringUtils;
|
27 | 28 | import org.apache.flink.api.java.tuple.Tuple2;
|
| 29 | +import org.slf4j.Logger; |
| 30 | +import org.slf4j.LoggerFactory; |
28 | 31 |
|
29 | 32 | import java.util.Arrays;
|
30 | 33 | import java.util.List;
|
|
43 | 46 |
|
44 | 47 | public abstract class AbstractTableParser {
|
45 | 48 |
|
| 49 | + private static final Logger LOG = LoggerFactory.getLogger(AbstractTableParser.class); |
| 50 | + |
46 | 51 | private static final String PRIMARY_KEY = "primaryKey";
|
47 | 52 | private static final String NEST_JSON_FIELD_KEY = "nestFieldKey";
|
48 | 53 | private static final String CHAR_TYPE_NO_LENGTH = "CHAR";
|
@@ -105,30 +110,50 @@ public void parseFieldsInfo(String fieldsInfo, AbstractTableInfo tableInfo) {
|
105 | 110 | continue;
|
106 | 111 | }
|
107 | 112 |
|
108 |
| - Tuple2<String, String> t = extractType(fieldRow, tableInfo.getName()); |
109 |
| - String fieldName = t.f0; |
110 |
| - String fieldType = t.f1; |
| 113 | + handleKeyNotHaveAlias(fieldRow, tableInfo); |
| 114 | + } |
111 | 115 |
|
112 |
| - Class fieldClass; |
113 |
| - AbstractTableInfo.FieldExtraInfo fieldExtraInfo = null; |
| 116 | + /* |
| 117 | + * check whether filed list contains pks and then add pks into field list. |
| 118 | + * because some no-sql database is not primary key. eg :redis、hbase etc... |
| 119 | + */ |
| 120 | + if (tableInfo instanceof AbstractSideTableInfo) { |
| 121 | + tableInfo.getPrimaryKeys().stream() |
| 122 | + .filter(pk -> !tableInfo.getFieldList().contains(pk)) |
| 123 | + .forEach(pk -> { |
| 124 | + try { |
| 125 | + handleKeyNotHaveAlias(String.format("%s varchar", pk), tableInfo); |
| 126 | + } catch (Exception e) { |
| 127 | + LOG.error(String.format("Handle primary key failed. Reason: %s", e.getMessage())); |
| 128 | + } |
| 129 | + }); |
| 130 | + } |
114 | 131 |
|
115 |
| - Matcher matcher = charTypePattern.matcher(fieldType); |
116 |
| - if (matcher.find()) { |
117 |
| - fieldClass = dbTypeConvertToJavaType(CHAR_TYPE_NO_LENGTH); |
118 |
| - fieldExtraInfo = new AbstractTableInfo.FieldExtraInfo(); |
119 |
| - fieldExtraInfo.setLength(Integer.parseInt(matcher.group(1))); |
120 |
| - } else { |
121 |
| - fieldClass = dbTypeConvertToJavaType(fieldType); |
122 |
| - } |
| 132 | + tableInfo.finish(); |
| 133 | + } |
123 | 134 |
|
124 |
| - tableInfo.addPhysicalMappings(fieldName, fieldName); |
125 |
| - tableInfo.addField(fieldName); |
126 |
| - tableInfo.addFieldClass(fieldClass); |
127 |
| - tableInfo.addFieldType(fieldType); |
128 |
| - tableInfo.addFieldExtraInfo(fieldExtraInfo); |
| 135 | + private void handleKeyNotHaveAlias(String fieldRow, AbstractTableInfo tableInfo) { |
| 136 | + Tuple2<String, String> t = extractType(fieldRow, tableInfo.getName()); |
| 137 | + String fieldName = t.f0; |
| 138 | + String fieldType = t.f1; |
| 139 | + |
| 140 | + Class fieldClass; |
| 141 | + AbstractTableInfo.FieldExtraInfo fieldExtraInfo = null; |
| 142 | + |
| 143 | + Matcher matcher = charTypePattern.matcher(fieldType); |
| 144 | + if (matcher.find()) { |
| 145 | + fieldClass = dbTypeConvertToJavaType(CHAR_TYPE_NO_LENGTH); |
| 146 | + fieldExtraInfo = new AbstractTableInfo.FieldExtraInfo(); |
| 147 | + fieldExtraInfo.setLength(Integer.parseInt(matcher.group(1))); |
| 148 | + } else { |
| 149 | + fieldClass = dbTypeConvertToJavaType(fieldType); |
129 | 150 | }
|
130 | 151 |
|
131 |
| - tableInfo.finish(); |
| 152 | + tableInfo.addPhysicalMappings(fieldName, fieldName); |
| 153 | + tableInfo.addField(fieldName); |
| 154 | + tableInfo.addFieldClass(fieldClass); |
| 155 | + tableInfo.addFieldType(fieldType); |
| 156 | + tableInfo.addFieldExtraInfo(fieldExtraInfo); |
132 | 157 | }
|
133 | 158 |
|
134 | 159 | private Tuple2<String, String> extractType(String fieldRow, String tableName) {
|
|
0 commit comments