From eaa15e4c8d72c8e002c4e6b663776ede4e2505b8 Mon Sep 17 00:00:00 2001 From: CosmosNi <40288034+CosmosNi@users.noreply.github.com> Date: Tue, 7 Jan 2025 18:58:32 +0800 Subject: [PATCH] [Feature][connector-elasticsearch] elasticsearch support nested type (#8462) --- .../seatunnel/api/table/type/ArrayType.java | 2 +- .../api/table/type/SeaTunnelRow.java | 10 +++ .../catalog/ElasticSearchTypeConverter.java | 8 +- .../DefaultSeaTunnelRowDeserializer.java | 30 ++++++++ .../elasticsearch/ElasticsearchIT.java | 74 +++++++++++++++++++ ...asticsearch_source_and_sink_with_nest.conf | 53 +++++++++++++ .../elasticsearch/st_index_nest_data.json | 15 ++++ .../elasticsearch/st_index_nest_mapping.json | 23 ++++++ 8 files changed, 212 insertions(+), 3 deletions(-) create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/resources/elasticsearch/elasticsearch_source_and_sink_with_nest.conf create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/resources/elasticsearch/st_index_nest_data.json create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/resources/elasticsearch/st_index_nest_mapping.json diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/ArrayType.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/ArrayType.java index 36c3362108a..65f7651e790 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/ArrayType.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/ArrayType.java @@ -56,7 +56,7 @@ public class ArrayType implements SeaTunnelDataType { private final Class arrayClass; private final SeaTunnelDataType elementType; - protected ArrayType(Class arrayClass, SeaTunnelDataType elementType) { + public ArrayType(Class arrayClass, SeaTunnelDataType elementType) { this.arrayClass = arrayClass; this.elementType = elementType; } diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRow.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRow.java index 39f61aee5d0..5c253b4cf57 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRow.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRow.java @@ -305,6 +305,16 @@ private int getBytesForValue(Object v) { return getBytesForArray(v, BasicType.FLOAT_TYPE); case "Double[]": return getBytesForArray(v, BasicType.DOUBLE_TYPE); + case "Map[]": + int sizeMaps = 0; + for (Map o : (Map[]) v) { + for (Map.Entry entry : ((Map) o).entrySet()) { + sizeMaps += + getBytesForValue(entry.getKey()) + + getBytesForValue(entry.getValue()); + } + } + return sizeMaps; case "HashMap": case "LinkedHashMap": int size = 0; diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/ElasticSearchTypeConverter.java b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/ElasticSearchTypeConverter.java index 412342cb828..d92d0839988 100644 --- a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/ElasticSearchTypeConverter.java +++ b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/ElasticSearchTypeConverter.java @@ -64,7 +64,6 @@ import static org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType.LONG; import static org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType.LONG_RANGE; import static org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType.MATCH_ONLY_TEXT; -import static org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType.NESTED; import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType.OBJECT; import static org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType.PERCOLATOR; import static org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType.POINT; @@ -150,6 +149,12 @@ public Column convert(BasicTypeDefine typeDefine) { }); builder.dataType(rowType); break; + case EsType.NESTED: + builder.dataType( + new ArrayType<>( + Map[].class, + new MapType<>(BasicType.STRING_TYPE, BasicType.STRING_TYPE))); + break; case INTEGER: case TOKEN_COUNT: builder.dataType(BasicType.INT_TYPE); @@ -207,7 +212,6 @@ public Column convert(BasicTypeDefine typeDefine) { case COMPLETION: case STRING: case GEO_SHAPE: - case NESTED: case PERCOLATOR: case POINT: case RANK_FEATURES: diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/source/DefaultSeaTunnelRowDeserializer.java b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/source/DefaultSeaTunnelRowDeserializer.java index fd176f2f034..f39aeae8fd0 100644 --- a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/source/DefaultSeaTunnelRowDeserializer.java +++ b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/source/DefaultSeaTunnelRowDeserializer.java @@ -46,6 +46,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import static org.apache.seatunnel.api.table.type.BasicType.BOOLEAN_TYPE; import static org.apache.seatunnel.api.table.type.BasicType.BYTE_TYPE; @@ -177,6 +178,35 @@ Object convertValue(SeaTunnelDataType fieldType, String fieldValue) } else if (fieldType instanceof ArrayType) { ArrayType arrayType = (ArrayType) fieldType; SeaTunnelDataType 
elementType = arrayType.getElementType(); + if (elementType instanceof MapType) { + MapType mapType = (MapType) elementType; + List mapList = JsonUtils.toList(fieldValue, Map.class); + Object arr = Array.newInstance(elementType.getTypeClass(), mapList.size()); + SeaTunnelDataType keyType = mapType.getKeyType(); + SeaTunnelDataType valueType = mapType.getValueType(); + for (int i = 0; i < mapList.size(); i++) { + Map map = mapList.get(i); + Map convertMap = new HashMap<>(); + for (Map.Entry entry : map.entrySet()) { + Object convertKey = + convertValue( + keyType, + Objects.isNull(entry.getKey()) + ? null + : String.valueOf(entry.getKey())); + Object convertValue = + convertValue( + valueType, + Objects.isNull(entry.getValue()) + ? null + : String.valueOf(entry.getValue())); + convertMap.put(convertKey, convertValue); + } + Array.set(arr, i, convertMap); + } + return arr; + } + List stringList = JsonUtils.toList(fieldValue, String.class); Object arr = Array.newInstance(elementType.getTypeClass(), stringList.size()); for (int i = 0; i < stringList.size(); i++) { diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java index 87730fee46c..fa805d851b9 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java @@ -133,6 +133,7 @@ public void startUp() throws Exception { createIndexDocs(); createIndexWithFullType(); createIndexForResourceNull("st_index4"); + createIndexWithNestType(); } /** create a index,and bulk some documents */ @@ -156,6 +157,31 @@ 
private void createIndexDocsByName(String indexName, List testDataSet) { esRestClient.bulk(requestBody.toString()); } + private void createIndexWithNestType() throws IOException, InterruptedException { + String mapping = + IOUtils.toString( + ContainerUtil.getResourcesFile("/elasticsearch/st_index_nest_mapping.json") + .toURI(), + StandardCharsets.UTF_8); + esRestClient.createIndex("st_index_nest", mapping); + esRestClient.createIndex("st_index_nest_copy", mapping); + BulkResponse response = + esRestClient.bulk( + "{ \"index\" : { \"_index\" : \"st_index_nest\", \"_id\" : \"1\" } }\n" + + IOUtils.toString( + ContainerUtil.getResourcesFile( + "/elasticsearch/st_index_nest_data.json") + .toURI(), + StandardCharsets.UTF_8) + .replace("\n", "") + + "\n"); + Assertions.assertFalse(response.isErrors(), response.getResponse()); + // waiting index refresh + Thread.sleep(INDEX_REFRESH_MILL_DELAY); + Assertions.assertEquals( + 3, esRestClient.getIndexDocsCount("st_index_nest").get(0).getDocsCount()); + } + private void createIndexWithFullType() throws IOException, InterruptedException { String mapping = IOUtils.toString( @@ -202,6 +228,21 @@ public void testElasticsearchWithSchema(TestContainer container) Assertions.assertIterableEquals(mapTestDatasetForDSL(), sinkData); } + @TestTemplate + public void testElasticsearchWithNestSchema(TestContainer container) + throws IOException, InterruptedException { + Container.ExecResult execResult = + container.executeJob("/elasticsearch/elasticsearch_source_and_sink_with_nest.conf"); + Assertions.assertEquals(0, execResult.getExitCode()); + + List sinkData = readSinkDataWithNestSchema("st_index_nest_copy"); + String data = + "{\"address\":[{\"zipcode\":\"10001\",\"city\":\"New York\",\"street\":\"123 Main St\"}," + + "{\"zipcode\":\"90001\",\"city\":\"Los Angeles\",\"street\":\"456 Elm St\"}],\"name\":\"John Doe\"}"; + + Assertions.assertIterableEquals(Lists.newArrayList(data), sinkData); + } + @TestTemplate public void 
testElasticsSearchWithMultiSourceByFilter(TestContainer container) throws InterruptedException, IOException { @@ -546,6 +587,13 @@ private List readSinkDataWithSchema(String index) throws InterruptedExce return getDocsWithTransformTimestamp(source, index); } + private List readSinkDataWithNestSchema(String index) throws InterruptedException { + // wait for index refresh + Thread.sleep(INDEX_REFRESH_MILL_DELAY); + List source = Lists.newArrayList("name", "address"); + return getDocsWithNestType(source, index); + } + private List readMultiSinkData(String index, List source) throws InterruptedException { // wait for index refresh @@ -604,6 +652,25 @@ private List getDocsWithTransformTimestamp(List source, String i return docs; } + private List getDocsWithNestType(List source, String index) { + Map query = new HashMap<>(); + query.put("match_all", new HashMap<>()); + ScrollResult scrollResult = esRestClient.searchByScroll(index, source, query, "1m", 1000); + scrollResult + .getDocs() + .forEach( + x -> { + x.remove("_index"); + x.remove("_type"); + x.remove("_id"); + }); + List docs = + scrollResult.getDocs().stream() + .map(JsonUtils::toJsonString) + .collect(Collectors.toList()); + return docs; + } + private List getDocsWithTransformDate(List source, String index) { return getDocsWithTransformDate(source, index, Collections.emptyList()); } @@ -739,6 +806,13 @@ private List mapTestDatasetForDSL(List testDataset) { .collect(Collectors.toList()); } + private List mapTestDatasetForNest(List testDataset) { + return testDataset.stream() + .map(JsonUtils::parseObject) + .map(JsonNode::toString) + .collect(Collectors.toList()); + } + /** * Use custom filtering criteria to query data * diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/resources/elasticsearch/elasticsearch_source_and_sink_with_nest.conf 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/resources/elasticsearch/elasticsearch_source_and_sink_with_nest.conf new file mode 100644 index 00000000000..6b07c9b80f2 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/resources/elasticsearch/elasticsearch_source_and_sink_with_nest.conf @@ -0,0 +1,53 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +###### +###### This config file is a demonstration of batch processing in seatunnel config +###### + +env { + parallelism = 1 + job.mode = "BATCH" + #checkpoint.interval = 10000 +} + +source { +Elasticsearch { + hosts = ["https://elasticsearch:9200"] + username = "elastic" + password = "elasticsearch" + index = "st_index_nest" + source = ["address","name"] + query = {"match_all": {}} + tls_verify_certificate = false + tls_verify_hostname = false + } +} + +transform { +} + +sink { + Elasticsearch { + hosts = ["https://elasticsearch:9200"] + username = "elastic" + password = "elasticsearch" + index = "st_index_nest_copy" + tls_verify_certificate = false + tls_verify_hostname = false + } +} diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/resources/elasticsearch/st_index_nest_data.json b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/resources/elasticsearch/st_index_nest_data.json new file mode 100644 index 00000000000..b63bdf962f0 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/resources/elasticsearch/st_index_nest_data.json @@ -0,0 +1,15 @@ +{ + "name": "John Doe", + "address": [ + { + "street": "123 Main St", + "city": "New York", + "zipcode": "10001" + }, + { + "street": "456 Elm St", + "city": "Los Angeles", + "zipcode": "90001" + } + ] +} diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/resources/elasticsearch/st_index_nest_mapping.json b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/resources/elasticsearch/st_index_nest_mapping.json new file mode 100644 index 00000000000..1b4d15a1023 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/resources/elasticsearch/st_index_nest_mapping.json @@ -0,0 +1,23 @@ +{ + "mappings": { + "properties": { + "name": { + "type": "text" + }, + "address": { + "type": "nested", +
"properties": { + "street": { + "type": "text" + }, + "city": { + "type": "keyword" + }, + "zipcode": { + "type": "keyword" + } + } + } + } + } +}