Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Error handling changes for Elasticsearch connector #2279

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
*/
package com.amazonaws.athena.connectors.elasticsearch;

import com.amazonaws.athena.connector.lambda.exceptions.AthenaConnectorException;
import org.apache.http.Header;
import org.apache.http.HttpEntityEnclosingRequest;
import org.apache.http.HttpException;
Expand All @@ -35,6 +36,8 @@
import software.amazon.awssdk.http.SdkHttpMethod;
import software.amazon.awssdk.http.auth.aws.signer.AwsV4HttpSigner;
import software.amazon.awssdk.http.auth.spi.signer.SignedRequest;
import software.amazon.awssdk.services.glue.model.ErrorDetails;
import software.amazon.awssdk.services.glue.model.FederationSourceErrorCode;

import java.io.ByteArrayInputStream;
import java.io.IOException;
Expand Down Expand Up @@ -98,7 +101,7 @@ public void process(final HttpRequest request, final HttpContext context) throws
uriBuilder = new URIBuilder(request.getRequestLine().getUri());
}
catch (URISyntaxException e) {
throw new IOException("Invalid URI", e);
throw new AthenaConnectorException("Invalid URI", ErrorDetails.builder().errorCode(FederationSourceErrorCode.INTERNAL_SERVICE_EXCEPTION.toString()).build());
}

// Build the SdkHttpFullRequest
Expand All @@ -111,7 +114,7 @@ public void process(final HttpRequest request, final HttpContext context) throws
.headers(headerArrayToMap(request.getAllHeaders()));
}
catch (URISyntaxException e) {
throw new IOException("Invalid URI", e);
throw new AthenaConnectorException("Invalid URI", ErrorDetails.builder().errorCode(FederationSourceErrorCode.INTERNAL_SERVICE_EXCEPTION.toString()).build());
}

// Set the endpoint (host) if present in the context
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
*/
package com.amazonaws.athena.connectors.elasticsearch;

import com.amazonaws.athena.connector.lambda.exceptions.AthenaConnectorException;
import com.google.common.base.Splitter;
import org.apache.http.HttpHost;
import org.apache.http.HttpRequestInterceptor;
Expand Down Expand Up @@ -47,6 +48,8 @@
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.http.auth.aws.signer.AwsV4HttpSigner;
import software.amazon.awssdk.services.elasticsearch.ElasticsearchClient;
import software.amazon.awssdk.services.glue.model.ErrorDetails;
import software.amazon.awssdk.services.glue.model.FederationSourceErrorCode;

import java.io.IOException;
import java.util.LinkedHashMap;
Expand Down Expand Up @@ -80,8 +83,14 @@ public AwsRestHighLevelClient(RestClientBuilder builder)
public Set<String> getAliases()
throws IOException
{
GetAliasesRequest getAliasesRequest = new GetAliasesRequest();
GetAliasesResponse getAliasesResponse = indices().getAlias(getAliasesRequest, RequestOptions.DEFAULT);
GetAliasesResponse getAliasesResponse = null;
try {
GetAliasesRequest getAliasesRequest = new GetAliasesRequest();
getAliasesResponse = indices().getAlias(getAliasesRequest, RequestOptions.DEFAULT);
}
catch (Exception e) {
throw new AthenaConnectorException(e.getMessage(), ErrorDetails.builder().errorCode(FederationSourceErrorCode.ACCESS_DENIED_EXCEPTION.toString()).build());
}
return getAliasesResponse.getAliases().keySet();
}

Expand Down Expand Up @@ -124,7 +133,7 @@ public LinkedHashMap<String, Object> getMapping(String index)
Map.Entry<String, MappingMetadata> dsmapping = mappingsResponse.mappings().entrySet()
.stream()
.findFirst()
.orElseThrow(() -> new RuntimeException(String.format("Could not find mapping for data stream name: %s", index)));
.orElseThrow(() -> new AthenaConnectorException(String.format("Could not find mapping for data stream name: %s", index), ErrorDetails.builder().errorCode(FederationSourceErrorCode.ENTITY_NOT_FOUND_EXCEPTION.toString()).build()));
mappingMetadata = dsmapping.getValue();
}

Expand All @@ -151,17 +160,17 @@ public Set<Integer> getShardIds(String index, long timeout)
ClusterHealthResponse response = cluster().health(request, RequestOptions.DEFAULT);

if (response.isTimedOut()) {
throw new RuntimeException("Request timed out for index (" + index + ").");
throw new AthenaConnectorException("Request timed out for index (" + index + ").", ErrorDetails.builder().errorCode(FederationSourceErrorCode.OPERATION_TIMEOUT_EXCEPTION.toString()).build());
}
else if (response.getActiveShards() == 0) {
throw new RuntimeException("There are no active shards for index (" + index + ").");
throw new AthenaConnectorException("There are no active shards for index (" + index + ").", ErrorDetails.builder().errorCode(FederationSourceErrorCode.INTERNAL_SERVICE_EXCEPTION.toString()).build());
}
else if (response.getStatus() == ClusterHealthStatus.RED) {
throw new RuntimeException("Request aborted for index (" + index +
") due to cluster's status (RED) - One or more primary shards are unassigned.");
throw new AthenaConnectorException("Request aborted for index (" + index +
") due to cluster's status (RED) - One or more primary shards are unassigned.", ErrorDetails.builder().errorCode(FederationSourceErrorCode.INTERNAL_SERVICE_EXCEPTION.toString()).build());
}
else if (!response.getIndices().containsKey(index)) {
throw new RuntimeException("Request has an invalid index (" + index + ").");
throw new AthenaConnectorException("Request has an invalid index (" + index + ").", ErrorDetails.builder().errorCode(FederationSourceErrorCode.INVALID_INPUT_EXCEPTION.toString()).build());
}

return response.getIndices().get(index).getShards().keySet();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
*/
package com.amazonaws.athena.connectors.elasticsearch;

import com.amazonaws.athena.connector.lambda.exceptions.AthenaConnectorException;
import com.google.common.base.Splitter;
import org.apache.arrow.util.VisibleForTesting;
import org.slf4j.Logger;
Expand All @@ -27,6 +28,8 @@
import software.amazon.awssdk.services.elasticsearch.model.DescribeElasticsearchDomainsRequest;
import software.amazon.awssdk.services.elasticsearch.model.DescribeElasticsearchDomainsResponse;
import software.amazon.awssdk.services.elasticsearch.model.ListDomainNamesResponse;
import software.amazon.awssdk.services.glue.model.ErrorDetails;
import software.amazon.awssdk.services.glue.model.FederationSourceErrorCode;

import java.util.ArrayList;
import java.util.HashMap;
Expand Down Expand Up @@ -128,14 +131,11 @@ private Map<String, String> getDomainMapFromAmazonElasticsearch()
}

if (domainMap.isEmpty()) {
throw new RuntimeException("Amazon Elasticsearch Service has no domain information for user.");
throw new AthenaConnectorException("Amazon Elasticsearch Service has no domain information for user.", ErrorDetails.builder().errorCode(FederationSourceErrorCode.INVALID_INPUT_EXCEPTION.toString()).build());
}

return domainMap;
}
catch (Exception error) {
throw new RuntimeException("Unable to create domain map: " + error.getMessage(), error);
}
finally {
awsEsClient.close();
}
Expand All @@ -152,20 +152,20 @@ private Map<String, String> getDomainMapFromAmazonElasticsearch()
private Map<String, String> getDomainMapFromEnvironmentVar(String domainMapping)
{
if (domainMapping == null || domainMapping.isEmpty()) {
throw new RuntimeException("Unable to create domain map: Empty or null value found in DomainMapping.");
throw new AthenaConnectorException("Unable to create domain map: Empty or null value found in DomainMapping.", ErrorDetails.builder().errorCode(FederationSourceErrorCode.INVALID_INPUT_EXCEPTION.toString()).build());
}
Map<String, String> domainMap;
try {
domainMap = domainSplitter.split(domainMapping);
}
catch (Exception error) {
// Intentional obfuscation of error message as it may contain sensitive info (e.g. username/password).
throw new RuntimeException("Unable to create domain map: DomainMapping Parsing error.");
throw new AthenaConnectorException("Unable to create domain map: DomainMapping Parsing error.", ErrorDetails.builder().errorCode(FederationSourceErrorCode.INVALID_INPUT_EXCEPTION.toString()).build());
}

if (domainMap.isEmpty()) {
// Intentional obfuscation of error message: domainMapping contains sensitive info (e.g. username/password).
throw new RuntimeException("Unable to create domain map: Invalid DomainMapping value.");
throw new AthenaConnectorException("Unable to create domain map: Invalid DomainMapping value.", ErrorDetails.builder().errorCode(FederationSourceErrorCode.INVALID_INPUT_EXCEPTION.toString()).build());
}

return domainMap;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,13 @@
package com.amazonaws.athena.connectors.elasticsearch;

import com.amazonaws.athena.connector.lambda.data.FieldResolver;
import com.amazonaws.athena.connector.lambda.exceptions.AthenaConnectorException;
import org.apache.arrow.vector.types.Types;
import org.apache.arrow.vector.types.pojo.Field;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.glue.model.ErrorDetails;
import software.amazon.awssdk.services.glue.model.FederationSourceErrorCode;

import java.time.LocalDateTime;
import java.time.ZoneOffset;
Expand Down Expand Up @@ -76,8 +79,8 @@ public Object getFieldValue(Field field, Object originalValue)
}
}
else {
throw new IllegalArgumentException("Invalid argument type. Expecting a Map, but got: " +
originalValue.getClass().getTypeName());
throw new AthenaConnectorException("Invalid argument type. Expecting a Map, but got: " +
originalValue.getClass().getTypeName(), ErrorDetails.builder().errorCode(FederationSourceErrorCode.INVALID_INPUT_EXCEPTION.toString()).build());
}

switch (fieldType) {
Expand All @@ -93,8 +96,8 @@ public Object getFieldValue(Field field, Object originalValue)
return coerceField(field, fieldValue);
}

throw new RuntimeException("Invalid field value encountered in Document for field: " + field +
",value: " + fieldValue);
throw new AthenaConnectorException("Invalid field value encountered in Document for field: " + field +
",value: " + fieldValue, ErrorDetails.builder().errorCode(FederationSourceErrorCode.INVALID_INPUT_EXCEPTION.toString()).build());
}

// Return the field value of a map key
Expand Down Expand Up @@ -151,8 +154,8 @@ else if (!(fieldValue instanceof Map)) {
break;
}

throw new RuntimeException("Invalid field value encountered in Document for field: " + field.toString() +
",value: " + fieldValue.toString());
throw new AthenaConnectorException("Invalid field value encountered in Document for field: " + field.toString() +
",value: " + fieldValue.toString(), ErrorDetails.builder().errorCode(FederationSourceErrorCode.INVALID_INPUT_EXCEPTION.toString()).build());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.amazonaws.athena.connector.lambda.data.SchemaBuilder;
import com.amazonaws.athena.connector.lambda.domain.Split;
import com.amazonaws.athena.connector.lambda.domain.TableName;
import com.amazonaws.athena.connector.lambda.exceptions.AthenaConnectorException;
import com.amazonaws.athena.connector.lambda.handlers.GlueMetadataHandler;
import com.amazonaws.athena.connector.lambda.metadata.GetDataSourceCapabilitiesRequest;
import com.amazonaws.athena.connector.lambda.metadata.GetDataSourceCapabilitiesResponse;
Expand Down Expand Up @@ -53,6 +54,8 @@
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.athena.AthenaClient;
import software.amazon.awssdk.services.glue.GlueClient;
import software.amazon.awssdk.services.glue.model.ErrorDetails;
import software.amazon.awssdk.services.glue.model.FederationSourceErrorCode;
import software.amazon.awssdk.services.secretsmanager.SecretsManagerClient;

import java.io.IOException;
Expand Down Expand Up @@ -324,7 +327,7 @@ public GetTableResponse doGetQueryPassthroughSchema(BlockAllocator allocator, Ge
{
logger.debug("doGetQueryPassthroughSchema: enter - " + request);
if (!request.isQueryPassthrough()) {
throw new IllegalArgumentException("No Query passed through [{}]" + request);
throw new AthenaConnectorException("No Query passed through [{}]" + request, ErrorDetails.builder().errorCode(FederationSourceErrorCode.INTERNAL_SERVICE_EXCEPTION.toString()).build());
}
queryPassthrough.verify(request.getQueryPassthroughArguments());
String index = request.getQueryPassthroughArguments().get(ElasticsearchQueryPassthrough.INDEX);
Expand All @@ -344,8 +347,8 @@ private Schema getSchema(String index, String endpoint)
schema = ElasticsearchSchemaUtils.parseMapping(mappings);
}
catch (IOException error) {
throw new RuntimeException("Error retrieving mapping information for index (" +
index + ") ", error);
throw new AthenaConnectorException("Error retrieving mapping information for index (" +
index + ") ", ErrorDetails.builder().errorCode(FederationSourceErrorCode.INTERNAL_SERVICE_EXCEPTION.toString()).build());
}
return schema;
}
Expand Down Expand Up @@ -382,7 +385,7 @@ private Set<Integer> getShardsIDsFromES(AwsRestHighLevelClient client, String in
return client.getShardIds(index, queryTimeout);
}
catch (IOException error) {
throw new RuntimeException(String.format("Error trying to get shards ids for index: %s, error message: %s", index, error.getMessage()), error);
throw new AthenaConnectorException(String.format("Error trying to get shards ids for index: %s, error message: %s", index, error.getMessage()), ErrorDetails.builder().errorCode(FederationSourceErrorCode.INTERNAL_SERVICE_EXCEPTION.toString()).build());
}
}

Expand All @@ -406,7 +409,7 @@ private String getDomainEndpoint(String domain)
}

if (endpoint == null) {
throw new RuntimeException("Unable to find domain: " + domain);
throw new AthenaConnectorException("Unable to find domain: " + domain, ErrorDetails.builder().errorCode(FederationSourceErrorCode.ENTITY_NOT_FOUND_EXCEPTION.toString()).build());
}

return endpoint;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.amazonaws.athena.connector.lambda.data.BlockSpiller;
import com.amazonaws.athena.connector.lambda.data.writers.GeneratedRowWriter;
import com.amazonaws.athena.connector.lambda.data.writers.extractors.Extractor;
import com.amazonaws.athena.connector.lambda.exceptions.AthenaConnectorException;
import com.amazonaws.athena.connector.lambda.handlers.RecordHandler;
import com.amazonaws.athena.connector.lambda.records.ReadRecordsRequest;
import com.amazonaws.athena.connectors.elasticsearch.qpt.ElasticsearchQueryPassthrough;
Expand All @@ -43,6 +44,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.athena.AthenaClient;
import software.amazon.awssdk.services.glue.model.ErrorDetails;
import software.amazon.awssdk.services.glue.model.FederationSourceErrorCode;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.secretsmanager.SecretsManagerClient;

Expand Down Expand Up @@ -197,7 +200,7 @@ protected void readWithConstraint(BlockSpiller spiller, ReadRecordsRequest recor
SearchScrollRequest scrollRequest = new SearchScrollRequest(searchResponse.getScrollId()).scroll(scroll);
searchResponse = client.scroll(scrollRequest, RequestOptions.DEFAULT);
if (searchResponse.isTimedOut()) {
throw new RuntimeException("Request for index (" + index + ") " + shard + " timed out.");
throw new AthenaConnectorException("Request for index (" + index + ") " + shard + " timed out.", ErrorDetails.builder().errorCode(FederationSourceErrorCode.OPERATION_TIMEOUT_EXCEPTION.toString()).build());
}
}

Expand All @@ -206,7 +209,7 @@ protected void readWithConstraint(BlockSpiller spiller, ReadRecordsRequest recor
client.clearScroll(clearScrollRequest, RequestOptions.DEFAULT);
}
catch (IOException error) {
throw new RuntimeException("Error sending search query: " + error.getMessage(), error);
throw new AthenaConnectorException("Error sending search query: " + error.getMessage(), ErrorDetails.builder().errorCode(FederationSourceErrorCode.INTERNAL_SERVICE_EXCEPTION.toString()).errorMessage(error.getMessage()).build());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,16 @@
package com.amazonaws.athena.connectors.elasticsearch;

import com.amazonaws.athena.connector.lambda.data.SchemaBuilder;
import com.amazonaws.athena.connector.lambda.exceptions.AthenaConnectorException;
import org.apache.arrow.util.VisibleForTesting;
import org.apache.arrow.vector.types.Types;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.FieldType;
import org.apache.arrow.vector.types.pojo.Schema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.glue.model.ErrorDetails;
import software.amazon.awssdk.services.glue.model.FederationSourceErrorCode;

import java.util.ArrayList;
import java.util.Collections;
Expand Down Expand Up @@ -107,7 +110,7 @@ private static Field inferField(String fieldName, String qualifiedName,
if (meta.containsKey(qualifiedName)) {
String metaValue = (String) meta.get(qualifiedName);
if (!metaValue.equalsIgnoreCase("list")) {
throw new IllegalArgumentException(String.format("_meta only support value `list`, key:%s, value:%s", qualifiedName, metaValue));
throw new AthenaConnectorException(String.format("_meta only support value `list`, key:%s, value:%s", qualifiedName, metaValue), ErrorDetails.builder().errorCode(FederationSourceErrorCode.INVALID_INPUT_EXCEPTION.toString()).build());
}

Field field1 = new Field(fieldName, FieldType.nullable(Types.MinorType.STRUCT.getType()), children);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import com.amazonaws.athena.connector.lambda.data.writers.fieldwriters.FieldWriterFactory;
import com.amazonaws.athena.connector.lambda.data.writers.holders.NullableVarCharHolder;
import com.amazonaws.athena.connector.lambda.domain.predicate.ConstraintProjector;
import com.amazonaws.athena.connector.lambda.exceptions.AthenaConnectorException;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.holders.NullableBigIntHolder;
import org.apache.arrow.vector.holders.NullableBitHolder;
Expand All @@ -47,6 +48,8 @@
import org.apache.arrow.vector.types.pojo.Field;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.glue.model.ErrorDetails;
import software.amazon.awssdk.services.glue.model.FederationSourceErrorCode;

import java.time.LocalDateTime;
import java.time.ZoneId;
Expand Down Expand Up @@ -525,7 +528,7 @@ protected FieldWriterFactory makeFactory(Field field)
return true;
};
default:
throw new RuntimeException(fieldType + " is not supported");
throw new AthenaConnectorException(fieldType + " is not supported", ErrorDetails.builder().errorCode(FederationSourceErrorCode.INVALID_INPUT_EXCEPTION.toString()).build());
}
}
}
Loading