Skip to content

Commit 2551e72

Browse files
committed
error handling changes for sdk
1 parent 531f7d5 commit 2551e72

File tree

68 files changed

+421
-230
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

68 files changed

+421
-230
lines changed

athena-federation-sdk/src/main/java/com/amazonaws/athena/connector/lambda/QueryStatusChecker.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,13 @@
1919
*/
2020
package com.amazonaws.athena.connector.lambda;
2121

22+
import com.amazonaws.athena.connector.lambda.exceptions.AthenaConnectorException;
2223
import com.amazonaws.services.athena.AmazonAthena;
2324
import com.amazonaws.services.athena.model.GetQueryExecutionRequest;
2425
import com.amazonaws.services.athena.model.GetQueryExecutionResult;
2526
import com.amazonaws.services.athena.model.InvalidRequestException;
27+
import com.amazonaws.services.glue.model.ErrorDetails;
28+
import com.amazonaws.services.glue.model.FederationSourceErrorCode;
2629
import com.google.common.collect.ImmutableSet;
2730
import org.slf4j.Logger;
2831
import org.slf4j.LoggerFactory;
@@ -126,7 +129,7 @@ private void checkStatus(String queryId, int attempt)
126129
if (e instanceof InvalidRequestException) {
127130
// query does not exist, so no need to keep calling Athena
128131
logger.debug("Athena reports query {} not found. Interrupting checker thread", queryId);
129-
throw new InterruptedException();
132+
throw new AthenaConnectorException(e, e.getMessage(), new ErrorDetails().withErrorCode(FederationSourceErrorCode.InternalServiceException.toString()));
130133
}
131134
}
132135
}

athena-federation-sdk/src/main/java/com/amazonaws/athena/connector/lambda/ThrottlingInvoker.java

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,9 @@
2121
*/
2222

2323
import com.amazonaws.athena.connector.lambda.data.BlockSpiller;
24-
import com.amazonaws.athena.connector.lambda.exceptions.FederationThrottleException;
24+
import com.amazonaws.athena.connector.lambda.exceptions.AthenaConnectorException;
25+
import com.amazonaws.services.glue.model.ErrorDetails;
26+
import com.amazonaws.services.glue.model.FederationSourceErrorCode;
2527
import com.google.common.base.MoreObjects;
2628
import org.apache.arrow.util.VisibleForTesting;
2729
import org.slf4j.Logger;
@@ -99,15 +101,15 @@ public ThrottlingInvoker(Builder builder)
99101
BlockSpiller spiller)
100102
{
101103
if (decrease > 1 || decrease < .001) {
102-
throw new IllegalArgumentException("decrease was " + decrease + " but should be between .001 and 1");
104+
throw new AthenaConnectorException("decrease was " + decrease + " but should be between .001 and 1", new ErrorDetails().withErrorCode(FederationSourceErrorCode.InvalidInputException.toString()));
103105
}
104106

105107
if (maxDelayMs < 1) {
106-
throw new IllegalArgumentException("maxDelayMs was " + maxDelayMs + " but must be >= 1");
108+
throw new AthenaConnectorException("maxDelayMs was " + maxDelayMs + " but must be >= 1", new ErrorDetails().withErrorCode(FederationSourceErrorCode.InvalidInputException.toString()));
107109
}
108110

109111
if (increase < 1) {
110-
throw new IllegalArgumentException("increase was " + increase + " but must be >= 1");
112+
throw new AthenaConnectorException("increase was " + increase + " but must be >= 1", new ErrorDetails().withErrorCode(FederationSourceErrorCode.InvalidInputException.toString()));
111113
}
112114

113115
this.initialDelayMs = initialDelayMs;
@@ -198,7 +200,7 @@ public <T> T invoke(Callable<T> callable, long timeoutMillis)
198200
}
199201
while (!isTimedOut(startTime, timeoutMillis));
200202

201-
throw new TimeoutException("Timed out before call succeeded after " + (System.currentTimeMillis() - startTime) + " ms");
203+
throw new AthenaConnectorException("Timed out before call succeeded after " + (System.currentTimeMillis() - startTime) + " ms", new ErrorDetails().withErrorCode(FederationSourceErrorCode.OperationTimeoutException.toString()));
202204
}
203205

204206
/**
@@ -254,7 +256,7 @@ else if (newDelay > maxDelayMs) {
254256

255257
if (spillerRef.get() != null && !spillerRef.get().spilled()) {
256258
//If no blocks have spilled, it is better to signal the Throttle to Athena by propagating.
257-
throw new FederationThrottleException("ThrottlingInvoker requesting slow down due to " + ex, ex);
259+
throw new AthenaConnectorException("ThrottlingInvoker requesting slow down due to " + ex, new ErrorDetails().withErrorCode(FederationSourceErrorCode.ThrottlingException.toString()));
258260
}
259261
}
260262

@@ -281,7 +283,7 @@ private void applySleep()
281283
}
282284
catch (InterruptedException ex) {
283285
Thread.currentThread().interrupt();
284-
throw new RuntimeException(ex);
286+
throw new AthenaConnectorException(ex, ex.getMessage(), new ErrorDetails().withErrorCode(FederationSourceErrorCode.InternalServiceException.toString()));
285287
}
286288
}
287289
}

athena-federation-sdk/src/main/java/com/amazonaws/athena/connector/lambda/data/ArrowTypeComparator.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,9 @@
2020
* #L%
2121
*/
2222

23+
import com.amazonaws.athena.connector.lambda.exceptions.AthenaConnectorException;
24+
import com.amazonaws.services.glue.model.ErrorDetails;
25+
import com.amazonaws.services.glue.model.FederationSourceErrorCode;
2326
import org.apache.arrow.vector.complex.reader.FieldReader;
2427
import org.apache.arrow.vector.types.Types;
2528
import org.apache.arrow.vector.types.pojo.ArrowType;
@@ -115,7 +118,7 @@ else if (lhs.hashCode() < rhs.hashCode()) {
115118
default:
116119
//logging because throwing in a comparator gets swallowed in many unit tests that use equality asserts
117120
logger.warn("compare: Unknown type " + type + " object: " + lhs.getClass());
118-
throw new IllegalArgumentException("Unknown type " + type + " object: " + lhs.getClass());
121+
throw new AthenaConnectorException("Unknown type " + type + " object: " + lhs.getClass(), new ErrorDetails().withErrorCode(FederationSourceErrorCode.InvalidInputException.toString()));
119122
}
120123
}
121124
}

athena-federation-sdk/src/main/java/com/amazonaws/athena/connector/lambda/data/AthenaFederationIpcOption.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,9 @@
2020

2121
package com.amazonaws.athena.connector.lambda.data;
2222

23+
import com.amazonaws.athena.connector.lambda.exceptions.AthenaConnectorException;
24+
import com.amazonaws.services.glue.model.ErrorDetails;
25+
import com.amazonaws.services.glue.model.FederationSourceErrorCode;
2326
import org.apache.arrow.vector.ipc.message.IpcOption;
2427
import org.apache.arrow.vector.types.MetadataVersion;
2528
import org.slf4j.Logger;
@@ -60,7 +63,7 @@ private static IpcOption getIpcOption()
6063
}
6164
catch (ReflectiveOperationException ex) {
6265
// Rethrow as unchecked because most of the callers do not already declare any throws
63-
throw new RuntimeException(ex);
66+
throw new AthenaConnectorException(ex.getMessage(), new ErrorDetails().withErrorCode(FederationSourceErrorCode.InternalServiceException.toString()).withErrorMessage(ex.getMessage()));
6467
}
6568
}
6669
}

athena-federation-sdk/src/main/java/com/amazonaws/athena/connector/lambda/data/BlockAllocatorImpl.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,9 @@
2020
* #L%
2121
*/
2222

23+
import com.amazonaws.athena.connector.lambda.exceptions.AthenaConnectorException;
24+
import com.amazonaws.services.glue.model.ErrorDetails;
25+
import com.amazonaws.services.glue.model.FederationSourceErrorCode;
2326
import org.apache.arrow.memory.ArrowBuf;
2427
import org.apache.arrow.memory.BufferAllocator;
2528
import org.apache.arrow.memory.RootAllocator;
@@ -197,7 +200,7 @@ public synchronized ArrowRecordBatch registerBatch(BatchGenerator generator)
197200
throw ex;
198201
}
199202
catch (Exception ex) {
200-
throw new RuntimeException(ex);
203+
throw new AthenaConnectorException(ex.getMessage(), new ErrorDetails().withErrorCode(FederationSourceErrorCode.InternalServiceException.toString()));
201204
}
202205
}
203206

athena-federation-sdk/src/main/java/com/amazonaws/athena/connector/lambda/data/BlockUtils.java

Lines changed: 25 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,9 @@
2020
* #L%
2121
*/
2222

23+
import com.amazonaws.athena.connector.lambda.exceptions.AthenaConnectorException;
24+
import com.amazonaws.services.glue.model.ErrorDetails;
25+
import com.amazonaws.services.glue.model.FederationSourceErrorCode;
2326
import org.apache.arrow.memory.ArrowBuf;
2427
import org.apache.arrow.memory.BufferAllocator;
2528
import org.apache.arrow.util.VisibleForTesting;
@@ -151,7 +154,7 @@ public static Block newBlock(BlockAllocator allocator, String columnName, ArrowT
151154
setValue(block.getFieldVector(columnName), count++, next);
152155
}
153156
catch (Exception ex) {
154-
throw new RuntimeException("Error for " + type + " " + columnName + " " + next, ex);
157+
throw new AthenaConnectorException("Error for " + type + " " + columnName + " " + next, new ErrorDetails().withErrorCode(FederationSourceErrorCode.InvalidInputException.toString()).withErrorMessage(ex.getMessage()));
155158
}
156159
}
157160
block.setRowCount(count);
@@ -217,8 +220,8 @@ else if (vector instanceof StructVector) {
217220
resolver);
218221
}
219222
else {
220-
throw new RuntimeException("Unsupported 'Complex' vector " +
221-
vector.getClass().getSimpleName() + " for field " + vector.getField().getName());
223+
throw new AthenaConnectorException("Unsupported 'Complex' vector " +
224+
vector.getClass().getSimpleName() + " for field " + vector.getField().getName(), new ErrorDetails().withErrorMessage(FederationSourceErrorCode.OperationNotSupportedException.toString()));
222225
}
223226
}
224227

@@ -391,14 +394,14 @@ else if (value instanceof Boolean && (boolean) value) {
391394
}
392395
break;
393396
default:
394-
throw new IllegalArgumentException("Unknown type " + vector.getMinorType());
397+
throw new AthenaConnectorException("Unknown type " + vector.getMinorType(), new ErrorDetails().withErrorCode(FederationSourceErrorCode.InvalidInputException.toString()));
395398
}
396399
}
397400
catch (RuntimeException ex) {
398401
String fieldName = (vector != null) ? vector.getField().getName() : "null_vector";
399-
throw new RuntimeException("Unable to set value for field " + fieldName
402+
throw new AthenaConnectorException("Unable to set value for field " + fieldName
400403
+ " using value " + value
401-
+ " of type " + vector.getMinorType(), ex);
404+
+ " of type " + vector.getMinorType(), new ErrorDetails().withErrorCode(FederationSourceErrorCode.InvalidInputException.toString()).withErrorMessage(ex.getMessage()));
402405
}
403406
}
404407

@@ -413,7 +416,7 @@ else if (value instanceof Boolean && (boolean) value) {
413416
public static String rowToString(Block block, int row)
414417
{
415418
if (row > block.getRowCount()) {
416-
throw new IllegalArgumentException(row + " exceeds available rows " + block.getRowCount());
419+
throw new AthenaConnectorException(row + " exceeds available rows " + block.getRowCount(), new ErrorDetails().withErrorCode(FederationSourceErrorCode.InvalidInputException.toString()));
417420
}
418421

419422
StringBuilder sb = new StringBuilder();
@@ -430,7 +433,7 @@ public static String rowToString(Block block, int row)
430433
sb.append("]");
431434
}
432435
catch (RuntimeException ex) {
433-
throw new RuntimeException("Error processing field " + nextReader.getField().getName(), ex);
436+
throw new AthenaConnectorException("Error processing field " + nextReader.getField().getName(), new ErrorDetails().withErrorCode(FederationSourceErrorCode.InternalServiceException.toString()).withErrorMessage(ex.getMessage()));
434437
}
435438
}
436439

@@ -527,8 +530,8 @@ public static String fieldToString(FieldReader reader)
527530
public static int copyRows(Block srcBlock, Block dstBlock, int firstRow, int lastRow)
528531
{
529532
if (firstRow > lastRow || lastRow > srcBlock.getRowCount() - 1) {
530-
throw new RuntimeException("src has " + srcBlock.getRowCount()
531-
+ " but requested copy of " + firstRow + " to " + lastRow);
533+
throw new AthenaConnectorException("src has " + srcBlock.getRowCount()
534+
+ " but requested copy of " + firstRow + " to " + lastRow, new ErrorDetails().withErrorCode(FederationSourceErrorCode.InternalServiceException.toString()));
532535
}
533536

534537
for (FieldReader src : srcBlock.getFieldReaders()) {
@@ -555,8 +558,8 @@ public static int copyRows(Block srcBlock, Block dstBlock, int firstRow, int las
555558
public static boolean isNullRow(Block block, int row)
556559
{
557560
if (row > block.getRowCount() - 1) {
558-
throw new RuntimeException("block has " + block.getRowCount()
559-
+ " rows but requested to check " + row);
561+
throw new AthenaConnectorException("block has " + block.getRowCount()
562+
+ " rows but requested to check " + row, new ErrorDetails().withErrorCode(FederationSourceErrorCode.InternalServiceException.toString()));
560563
}
561564

562565
//If any column is non-null then return false
@@ -677,26 +680,26 @@ protected static void writeMap(BufferAllocator allocator,
677680
List<Field> children = field.getChildren();
678681
Field keyValueStructField;
679682
if (children.size() != 1) {
680-
throw new IllegalStateException("Invalid Arrow Map schema: " + field);
683+
throw new AthenaConnectorException("Invalid Arrow Map schema: " + field, new ErrorDetails().withErrorCode(FederationSourceErrorCode.InvalidInputException.toString()));
681684
}
682685
else {
683686
keyValueStructField = children.get(0);
684687
if (!MapVector.DATA_VECTOR_NAME.equals(keyValueStructField.getName()) || !(keyValueStructField.getType() instanceof ArrowType.Struct)) {
685-
throw new IllegalStateException("Invalid Arrow Map schema: " + field);
688+
throw new AthenaConnectorException("Invalid Arrow Map schema: " + field, new ErrorDetails().withErrorCode(FederationSourceErrorCode.InvalidInputException.toString()));
686689
}
687690
}
688691

689692
List<Field> keyValueChildren = keyValueStructField.getChildren();
690693
Field keyField;
691694
Field valueField;
692695
if (keyValueChildren.size() != 2) {
693-
throw new IllegalStateException("Invalid Arrow Map schema: " + field);
696+
throw new AthenaConnectorException("Invalid Arrow Map schema: " + field, new ErrorDetails().withErrorCode(FederationSourceErrorCode.InvalidInputException.toString()));
694697
}
695698
else {
696699
keyField = keyValueChildren.get(0);
697700
valueField = keyValueChildren.get(1);
698701
if (!MapVector.KEY_NAME.equals(keyField.getName()) || !MapVector.VALUE_NAME.equals(valueField.getName())) {
699-
throw new IllegalStateException("Invalid Arrow Map schema: " + field);
702+
throw new AthenaConnectorException("Invalid Arrow Map schema: " + field, new ErrorDetails().withErrorCode(FederationSourceErrorCode.InvalidInputException.toString()));
700703
}
701704
}
702705

@@ -1011,13 +1014,13 @@ else if (value instanceof Boolean && (boolean) value) {
10111014
}
10121015
break;
10131016
default:
1014-
throw new IllegalArgumentException("Unknown type " + type);
1017+
throw new AthenaConnectorException("Unknown type " + type, new ErrorDetails().withErrorCode(FederationSourceErrorCode.InvalidInputException.toString()));
10151018
}
10161019
}
10171020
catch (RuntimeException ex) {
1018-
throw new RuntimeException("Unable to write value for field "
1021+
throw new AthenaConnectorException("Unable to write value for field "
10191022
+ field.getName() + " using value " + value
1020-
+ " with minor type " + Types.getMinorTypeForArrowType(type), ex);
1023+
+ " with minor type " + Types.getMinorTypeForArrowType(type), new ErrorDetails().withErrorCode(FederationSourceErrorCode.InternalServiceException.toString()).withErrorMessage(ex.getMessage()));
10211024
}
10221025
}
10231026

@@ -1085,7 +1088,7 @@ private static void setNullValue(FieldVector vector, int pos)
10851088
((BitVector) vector).setNull(pos);
10861089
break;
10871090
default:
1088-
throw new IllegalArgumentException("Unknown type " + vector.getMinorType());
1091+
throw new AthenaConnectorException("Unknown type " + vector.getMinorType(), new ErrorDetails().withErrorCode(FederationSourceErrorCode.InvalidInputException.toString()));
10891092
}
10901093
}
10911094

@@ -1169,7 +1172,7 @@ public static void unsetRow(int row, Block block)
11691172
((MapVector) vector).setNull(row);
11701173
break;
11711174
default:
1172-
throw new IllegalArgumentException("Unknown type " + vector.getMinorType());
1175+
throw new AthenaConnectorException("Unknown type " + vector.getMinorType(), new ErrorDetails().withErrorCode(FederationSourceErrorCode.InvalidInputException.toString()));
11731176
}
11741177
}
11751178
}
@@ -1220,7 +1223,7 @@ public static Class getJavaType(Types.MinorType minorType)
12201223
case STRUCT:
12211224
return Map.class;
12221225
default:
1223-
throw new IllegalArgumentException("Unknown type " + minorType);
1226+
throw new AthenaConnectorException("Unknown type " + minorType, new ErrorDetails().withErrorCode(FederationSourceErrorCode.InvalidInputException.toString()));
12241227
}
12251228
}
12261229

athena-federation-sdk/src/main/java/com/amazonaws/athena/connector/lambda/data/DateTimeFormatterUtil.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,9 @@
1919
*/
2020
package com.amazonaws.athena.connector.lambda.data;
2121

22+
import com.amazonaws.athena.connector.lambda.exceptions.AthenaConnectorException;
23+
import com.amazonaws.services.glue.model.ErrorDetails;
24+
import com.amazonaws.services.glue.model.FederationSourceErrorCode;
2225
import com.amazonaws.util.StringUtils;
2326
import com.google.common.annotations.VisibleForTesting;
2427
import org.apache.arrow.vector.types.pojo.ArrowType;
@@ -330,7 +333,7 @@ public static ZonedDateTime constructZonedDateTime(long epochTimestamp, ArrowTyp
330333
ZoneId timeZone = ZoneId.of(arrowType.getTimezone());
331334
if (packTimezone) {
332335
if (!org.apache.arrow.vector.types.TimeUnit.MILLISECOND.equals(timeunit)) {
333-
throw new UnsupportedOperationException("Unpacking is only supported for milliseconds");
336+
throw new AthenaConnectorException("Unpacking is only supported for milliseconds", new ErrorDetails().withErrorCode(FederationSourceErrorCode.OperationNotSupportedException.toString()));
334337
}
335338
// arrowType's timezone is ignored in this case since the timezone is packed into the long
336339
TimeZoneKey timeZoneKey = unpackZoneKey(epochTimestamp);
@@ -355,7 +358,7 @@ public static java.time.temporal.ChronoUnit arrowTimeUnitToChronoUnit(org.apache
355358
return java.time.temporal.ChronoUnit.SECONDS;
356359
}
357360

358-
throw new UnsupportedOperationException(String.format("Unsupported timeunit: %s", timeunit));
361+
throw new AthenaConnectorException(String.format("Unsupported timeunit: %s", timeunit), new ErrorDetails().withErrorCode(FederationSourceErrorCode.OperationNotSupportedException.toString()));
359362
}
360363

361364
public static ZonedDateTime zonedDateTimeFromObject(Object value)
@@ -370,7 +373,7 @@ else if (value instanceof Date) {
370373
return ((Date) value).toInstant().atZone(ZoneId.of("UTC"));
371374
}
372375

373-
throw new UnsupportedOperationException(String.format("Type: %s not supported", value.getClass()));
376+
throw new AthenaConnectorException(String.format("Type: %s not supported", value.getClass()), new ErrorDetails().withErrorCode(FederationSourceErrorCode.OperationNotSupportedException.toString()));
374377
}
375378

376379
public static org.apache.arrow.vector.holders.TimeStampMilliTZHolder timestampMilliTzHolderFromObject(Object value, String targetTimeZoneId)
@@ -401,7 +404,7 @@ public static org.apache.arrow.vector.holders.TimeStampMilliTZHolder timestampMi
401404
public static org.apache.arrow.vector.holders.TimeStampMicroTZHolder timestampMicroTzHolderFromObject(Object value, String targetTimeZoneId)
402405
{
403406
if (packTimezone) {
404-
throw new UnsupportedOperationException("Packing for TimeStampMicroTZ is not currently supported");
407+
throw new AthenaConnectorException("Packing for TimeStampMicroTZ is not currently supported", new ErrorDetails().withErrorCode(FederationSourceErrorCode.OperationNotSupportedException.toString()));
405408
}
406409

407410
ZonedDateTime zdt = zonedDateTimeFromObject(value);

0 commit comments

Comments
 (0)