Skip to content

Commit 456cd17

Browse files
authored
[Improve][Connector-V2] Replace CommonErrorCodeDeprecated.JSON_OPERATION_FAILED (#5978)
1 parent 2c5b48e commit 456cd17

File tree

13 files changed

+460
-69
lines changed

13 files changed

+460
-69
lines changed

release-note.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,7 @@
125125
- [Connector-v2] [Neo4j] Supports neo4j sink batch write mode (#4835)
126126
- [Transform-V2] Optimize SQL Transform package and Fix Spark type conversion bug of transform (#4490)
127127
- [Connector-V2] [Common] Remove assert key word (#5915)
128+
- [Connector-V2] Replace CommonErrorCodeDeprecated.JSON_OPERATION_FAILED. (#5978)
128129

129130
### CI
130131

seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonError.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -120,14 +120,14 @@ public static SeaTunnelRuntimeException getCatalogTablesWithUnsupportedType(
120120
GET_CATALOG_TABLES_WITH_UNSUPPORTED_TYPE_ERROR, params);
121121
}
122122

123-
public static SeaTunnelRuntimeException jsonOperationError(String format, String payload) {
124-
return jsonOperationError(format, payload, null);
123+
public static SeaTunnelRuntimeException jsonOperationError(String identifier, String payload) {
124+
return jsonOperationError(identifier, payload, null);
125125
}
126126

127127
public static SeaTunnelRuntimeException jsonOperationError(
128-
String format, String payload, Throwable cause) {
128+
String identifier, String payload, Throwable cause) {
129129
Map<String, String> params = new HashMap<>();
130-
params.put("format", format);
130+
params.put("identifier", identifier);
131131
params.put("payload", payload);
132132
SeaTunnelErrorCode code = JSON_OPERATION_FAILED;
133133

seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonErrorCode.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,8 @@
1919

2020
/** SeaTunnel connector error code interface */
2121
public enum CommonErrorCode implements SeaTunnelErrorCode {
22-
JSON_OPERATION_FAILED("COMMON-02", "<format> JSON convert/parse '<payload>' operation failed."),
22+
JSON_OPERATION_FAILED(
23+
"COMMON-02", "<identifier> JSON convert/parse '<payload>' operation failed."),
2324

2425
UNSUPPORTED_DATA_TYPE(
2526
"COMMON-07", "'<identifier>' unsupported data type '<dataType>' of '<field>'"),

seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonErrorCodeDeprecated.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
public enum CommonErrorCodeDeprecated implements SeaTunnelErrorCode {
2727
FILE_OPERATION_FAILED(
2828
"COMMON-01", "File operation failed, such as (read,list,write,move,copy,sync) etc..."),
29-
JSON_OPERATION_FAILED("COMMON-02", "Json covert/parse operation failed"),
3029
REFLECT_CLASS_OPERATION_FAILED("COMMON-03", "Reflect class operation failed"),
3130
SERIALIZE_OPERATION_FAILED("COMMON-04", "Serialize class operation failed"),
3231
UNSUPPORTED_OPERATION("COMMON-05", "Unsupported operation"),

seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/inject/StringInjectFunction.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,7 @@
2020
import org.apache.seatunnel.shade.com.fasterxml.jackson.core.JsonProcessingException;
2121
import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper;
2222

23-
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
24-
import org.apache.seatunnel.connectors.seatunnel.clickhouse.exception.ClickhouseConnectorException;
23+
import org.apache.seatunnel.common.exception.CommonError;
2524

2625
import java.sql.PreparedStatement;
2726
import java.sql.SQLException;
@@ -51,8 +50,7 @@ public void injectFields(PreparedStatement statement, int index, Object value)
5150
statement.setString(index, value.toString());
5251
}
5352
} catch (JsonProcessingException e) {
54-
throw new ClickhouseConnectorException(
55-
CommonErrorCodeDeprecated.JSON_OPERATION_FAILED, e.getMessage());
53+
throw CommonError.jsonOperationError("Clickhouse", value.toString(), e);
5654
}
5755
}
5856

seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/ElasticsearchRowSerializer.java

Lines changed: 64 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222

2323
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
2424
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
25+
import org.apache.seatunnel.common.exception.CommonError;
2526
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
2627
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.ElasticsearchClusterInfo;
2728
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.IndexInfo;
@@ -82,65 +83,82 @@ public String serializeRow(SeaTunnelRow row) {
8283
private String serializeUpsert(SeaTunnelRow row) {
8384
String key = keyExtractor.apply(row);
8485
Map<String, Object> document = toDocumentMap(row);
86+
String documentStr;
8587

8688
try {
87-
if (key != null) {
88-
Map<String, String> upsertMetadata = createMetadata(row, key);
89-
/**
90-
* format example: { "update" : {"_index" : "${your_index}", "_id" :
91-
* "${your_document_id}"} }\n { "doc" : ${your_document_json}, "doc_as_upsert" :
92-
* true }
93-
*/
94-
return new StringBuilder()
95-
.append("{ \"update\" :")
96-
.append(objectMapper.writeValueAsString(upsertMetadata))
97-
.append("}")
98-
.append("\n")
99-
.append("{ \"doc\" :")
100-
.append(objectMapper.writeValueAsString(document))
101-
.append(", \"doc_as_upsert\" : true }")
102-
.toString();
103-
} else {
104-
Map<String, String> indexMetadata = createMetadata(row);
105-
/**
106-
* format example: { "index" : {"_index" : "${your_index}", "_id" :
107-
* "${your_document_id}"} }\n ${your_document_json}
108-
*/
109-
return new StringBuilder()
110-
.append("{ \"index\" :")
111-
.append(objectMapper.writeValueAsString(indexMetadata))
112-
.append("}")
113-
.append("\n")
114-
.append(objectMapper.writeValueAsString(document))
115-
.toString();
89+
documentStr = objectMapper.writeValueAsString(document);
90+
} catch (JsonProcessingException e) {
91+
throw CommonError.jsonOperationError(
92+
"Elasticsearch", "document:" + document.toString(), e);
93+
}
94+
95+
if (key != null) {
96+
Map<String, String> upsertMetadata = createMetadata(row, key);
97+
String upsertMetadataStr;
98+
try {
99+
upsertMetadataStr = objectMapper.writeValueAsString(upsertMetadata);
100+
} catch (JsonProcessingException e) {
101+
throw CommonError.jsonOperationError(
102+
"Elasticsearch", "upsertMetadata:" + upsertMetadata.toString(), e);
116103
}
104+
105+
/**
106+
* format example: { "update" : {"_index" : "${your_index}", "_id" :
107+
* "${your_document_id}"} }\n { "doc" : ${your_document_json}, "doc_as_upsert" : true }
108+
*/
109+
return new StringBuilder()
110+
.append("{ \"update\" :")
111+
.append(upsertMetadataStr)
112+
.append(" }")
113+
.append("\n")
114+
.append("{ \"doc\" :")
115+
.append(documentStr)
116+
.append(", \"doc_as_upsert\" : true }")
117+
.toString();
118+
}
119+
120+
Map<String, String> indexMetadata = createMetadata(row);
121+
String indexMetadataStr;
122+
try {
123+
indexMetadataStr = objectMapper.writeValueAsString(indexMetadata);
117124
} catch (JsonProcessingException e) {
118-
throw new ElasticsearchConnectorException(
119-
CommonErrorCodeDeprecated.JSON_OPERATION_FAILED,
120-
"Object json deserialization exception.",
121-
e);
125+
throw CommonError.jsonOperationError(
126+
"Elasticsearch", "indexMetadata:" + indexMetadata.toString(), e);
122127
}
128+
129+
/**
130+
* format example: { "index" : {"_index" : "${your_index}", "_id" : "${your_document_id}"}
131+
* }\n ${your_document_json}
132+
*/
133+
return new StringBuilder()
134+
.append("{ \"index\" :")
135+
.append(indexMetadataStr)
136+
.append(" }")
137+
.append("\n")
138+
.append(documentStr)
139+
.toString();
123140
}
124141

125142
private String serializeDelete(SeaTunnelRow row) {
126143
String key = keyExtractor.apply(row);
127144
Map<String, String> deleteMetadata = createMetadata(row, key);
145+
String deleteMetadataStr;
128146
try {
129-
/**
130-
* format example: { "delete" : {"_index" : "${your_index}", "_id" :
131-
* "${your_document_id}"} }
132-
*/
133-
return new StringBuilder()
134-
.append("{ \"delete\" :")
135-
.append(objectMapper.writeValueAsString(deleteMetadata))
136-
.append("}")
137-
.toString();
147+
deleteMetadataStr = objectMapper.writeValueAsString(deleteMetadata);
138148
} catch (JsonProcessingException e) {
139-
throw new ElasticsearchConnectorException(
140-
CommonErrorCodeDeprecated.JSON_OPERATION_FAILED,
141-
"Object json deserialization exception.",
142-
e);
149+
throw CommonError.jsonOperationError(
150+
"Elasticsearch", "deleteMetadata:" + deleteMetadata.toString(), e);
143151
}
152+
153+
/**
154+
* format example: { "delete" : {"_index" : "${your_index}", "_id" : "${your_document_id}"}
155+
* }
156+
*/
157+
return new StringBuilder()
158+
.append("{ \"delete\" :")
159+
.append(deleteMetadataStr)
160+
.append(" }")
161+
.toString();
144162
}
145163

146164
private Map<String, Object> toDocumentMap(SeaTunnelRow row) {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,189 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.connectors.seatunnel.elasticsearch.serialize;
19+
20+
import org.apache.seatunnel.shade.com.typesafe.config.Config;
21+
import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
22+
23+
import org.apache.seatunnel.api.table.type.RowKind;
24+
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
25+
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
26+
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
27+
import org.apache.seatunnel.common.exception.CommonError;
28+
import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
29+
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SinkConfig;
30+
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.ElasticsearchClusterInfo;
31+
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.IndexInfo;
32+
33+
import org.junit.jupiter.api.Assertions;
34+
import org.junit.jupiter.api.Test;
35+
36+
import java.util.Arrays;
37+
import java.util.HashMap;
38+
import java.util.Map;
39+
40+
import static org.apache.seatunnel.api.table.type.BasicType.STRING_TYPE;
41+
42+
public class ElasticsearchRowSerializerTest {
43+
@Test
44+
public void testSerializeUpsert() {
45+
String index = "st_index";
46+
String primaryKey = "id";
47+
Map<String, Object> confMap = new HashMap<>();
48+
confMap.put(SinkConfig.INDEX.key(), index);
49+
confMap.put(SinkConfig.PRIMARY_KEYS.key(), Arrays.asList(primaryKey));
50+
51+
Config pluginConf = ConfigFactory.parseMap(confMap);
52+
ElasticsearchClusterInfo clusterInfo =
53+
ElasticsearchClusterInfo.builder().clusterVersion("8.0.0").build();
54+
IndexInfo indexInfo = new IndexInfo(pluginConf);
55+
SeaTunnelRowType schema =
56+
new SeaTunnelRowType(
57+
new String[] {primaryKey, "name"},
58+
new SeaTunnelDataType[] {STRING_TYPE, STRING_TYPE});
59+
60+
final ElasticsearchRowSerializer serializer =
61+
new ElasticsearchRowSerializer(clusterInfo, indexInfo, schema);
62+
63+
String id = "0001";
64+
String name = "jack";
65+
SeaTunnelRow row = new SeaTunnelRow(new Object[] {id, name});
66+
row.setRowKind(RowKind.UPDATE_AFTER);
67+
68+
String expected =
69+
"{ \"update\" :{\"_index\":\""
70+
+ index
71+
+ "\",\"_id\":\""
72+
+ id
73+
+ "\"} }\n"
74+
+ "{ \"doc\" :{\"name\":\""
75+
+ name
76+
+ "\",\"id\":\""
77+
+ id
78+
+ "\"}, \"doc_as_upsert\" : true }";
79+
80+
String upsertStr = serializer.serializeRow(row);
81+
Assertions.assertEquals(expected, upsertStr);
82+
}
83+
84+
@Test
85+
public void testSerializeUpsertWithoutKey() {
86+
String index = "st_index";
87+
Map<String, Object> confMap = new HashMap<>();
88+
confMap.put(SinkConfig.INDEX.key(), index);
89+
90+
Config pluginConf = ConfigFactory.parseMap(confMap);
91+
ElasticsearchClusterInfo clusterInfo =
92+
ElasticsearchClusterInfo.builder().clusterVersion("8.0.0").build();
93+
IndexInfo indexInfo = new IndexInfo(pluginConf);
94+
SeaTunnelRowType schema =
95+
new SeaTunnelRowType(
96+
new String[] {"id", "name"},
97+
new SeaTunnelDataType[] {STRING_TYPE, STRING_TYPE});
98+
99+
final ElasticsearchRowSerializer serializer =
100+
new ElasticsearchRowSerializer(clusterInfo, indexInfo, schema);
101+
102+
String id = "0001";
103+
String name = "jack";
104+
SeaTunnelRow row = new SeaTunnelRow(new Object[] {id, name});
105+
row.setRowKind(RowKind.UPDATE_AFTER);
106+
107+
String expected =
108+
"{ \"index\" :{\"_index\":\""
109+
+ index
110+
+ "\"} }\n"
111+
+ "{\"name\":\""
112+
+ name
113+
+ "\",\"id\":\""
114+
+ id
115+
+ "\"}";
116+
117+
String upsertStr = serializer.serializeRow(row);
118+
Assertions.assertEquals(expected, upsertStr);
119+
}
120+
121+
@Test
122+
public void testSerializeUpsertDocumentError() {
123+
String index = "st_index";
124+
String primaryKey = "id";
125+
Map<String, Object> confMap = new HashMap<>();
126+
confMap.put(SinkConfig.INDEX.key(), index);
127+
confMap.put(SinkConfig.PRIMARY_KEYS.key(), Arrays.asList(primaryKey));
128+
129+
Config pluginConf = ConfigFactory.parseMap(confMap);
130+
ElasticsearchClusterInfo clusterInfo =
131+
ElasticsearchClusterInfo.builder().clusterVersion("8.0.0").build();
132+
IndexInfo indexInfo = new IndexInfo(pluginConf);
133+
SeaTunnelRowType schema =
134+
new SeaTunnelRowType(
135+
new String[] {primaryKey, "name"},
136+
new SeaTunnelDataType[] {STRING_TYPE, STRING_TYPE});
137+
138+
final ElasticsearchRowSerializer serializer =
139+
new ElasticsearchRowSerializer(clusterInfo, indexInfo, schema);
140+
141+
String id = "0001";
142+
Object mockObj = new Object();
143+
SeaTunnelRow row = new SeaTunnelRow(new Object[] {id, mockObj});
144+
row.setRowKind(RowKind.UPDATE_AFTER);
145+
146+
Map<String, Object> expectedMap = new HashMap<>();
147+
expectedMap.put(primaryKey, id);
148+
expectedMap.put("name", mockObj);
149+
150+
SeaTunnelRuntimeException expected =
151+
CommonError.jsonOperationError(
152+
"Elasticsearch", "document:" + expectedMap.toString());
153+
SeaTunnelRuntimeException actual =
154+
Assertions.assertThrows(
155+
SeaTunnelRuntimeException.class, () -> serializer.serializeRow(row));
156+
Assertions.assertEquals(expected.getMessage(), actual.getMessage());
157+
}
158+
159+
@Test
160+
public void testSerializeDelete() {
161+
String index = "st_index";
162+
String primaryKey = "id";
163+
Map<String, Object> confMap = new HashMap<>();
164+
confMap.put(SinkConfig.INDEX.key(), index);
165+
confMap.put(SinkConfig.PRIMARY_KEYS.key(), Arrays.asList(primaryKey));
166+
167+
Config pluginConf = ConfigFactory.parseMap(confMap);
168+
ElasticsearchClusterInfo clusterInfo =
169+
ElasticsearchClusterInfo.builder().clusterVersion("8.0.0").build();
170+
IndexInfo indexInfo = new IndexInfo(pluginConf);
171+
SeaTunnelRowType schema =
172+
new SeaTunnelRowType(
173+
new String[] {primaryKey, "name"},
174+
new SeaTunnelDataType[] {STRING_TYPE, STRING_TYPE});
175+
176+
final ElasticsearchRowSerializer serializer =
177+
new ElasticsearchRowSerializer(clusterInfo, indexInfo, schema);
178+
179+
String id = "0001";
180+
String name = "jack";
181+
SeaTunnelRow row = new SeaTunnelRow(new Object[] {id, name});
182+
row.setRowKind(RowKind.DELETE);
183+
184+
String expected = "{ \"delete\" :{\"_index\":\"" + index + "\",\"_id\":\"" + id + "\"} }";
185+
186+
String upsertStr = serializer.serializeRow(row);
187+
Assertions.assertEquals(expected, upsertStr);
188+
}
189+
}

0 commit comments

Comments
 (0)