diff --git a/beekeeper-api/src/main/java/com/expediagroup/beekeeper/api/error/BeekeeperExceptionHandler.java b/beekeeper-api/src/main/java/com/expediagroup/beekeeper/api/error/BeekeeperExceptionHandler.java index b83a56ed..93df672e 100644 --- a/beekeeper-api/src/main/java/com/expediagroup/beekeeper/api/error/BeekeeperExceptionHandler.java +++ b/beekeeper-api/src/main/java/com/expediagroup/beekeeper/api/error/BeekeeperExceptionHandler.java @@ -16,6 +16,8 @@ package com.expediagroup.beekeeper.api.error; +import java.time.LocalDateTime; + import javax.servlet.http.HttpServletRequest; import org.springframework.data.mapping.PropertyReferenceException; @@ -24,8 +26,6 @@ import org.springframework.web.bind.annotation.ExceptionHandler; import org.springframework.web.bind.annotation.RestControllerAdvice; -import java.time.LocalDateTime; - @RestControllerAdvice public class BeekeeperExceptionHandler { diff --git a/beekeeper-api/src/test/java/com/expediagroup/beekeeper/api/error/BeekeeperExceptionHandlerTest.java b/beekeeper-api/src/test/java/com/expediagroup/beekeeper/api/error/BeekeeperExceptionHandlerTest.java index 11406e9d..325cd95c 100644 --- a/beekeeper-api/src/test/java/com/expediagroup/beekeeper/api/error/BeekeeperExceptionHandlerTest.java +++ b/beekeeper-api/src/test/java/com/expediagroup/beekeeper/api/error/BeekeeperExceptionHandlerTest.java @@ -1,5 +1,5 @@ /** - * Copyright (C) 2019-2023 Expedia, Inc. + * Copyright (C) 2019-2024 Expedia, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
@@ -15,24 +15,22 @@ */ package com.expediagroup.beekeeper.api.error; +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.Collections; +import java.util.List; + import org.junit.jupiter.api.Test; -import org.springframework.http.HttpStatus; -import org.springframework.http.ResponseEntity; -import org.springframework.mock.web.MockHttpServletRequest; -import org.springframework.data.mapping.PropertyReferenceException; import org.springframework.data.mapping.PropertyPath; +import org.springframework.data.mapping.PropertyReferenceException; import org.springframework.data.util.ClassTypeInformation; import org.springframework.data.util.TypeInformation; +import org.springframework.http.HttpStatus; +import org.springframework.http.ResponseEntity; +import org.springframework.mock.web.MockHttpServletRequest; -import static org.assertj.core.api.Assertions.assertThat; - -import com.expediagroup.beekeeper.api.error.BeekeeperExceptionHandler; -import com.expediagroup.beekeeper.api.error.ErrorResponse; import com.expediagroup.beekeeper.core.model.HousekeepingPath; -import java.util.Collections; -import java.util.List; - public class BeekeeperExceptionHandlerTest { private final BeekeeperExceptionHandler exceptionHandler = new BeekeeperExceptionHandler(); diff --git a/beekeeper-core/src/main/java/com/expediagroup/beekeeper/core/repository/HousekeepingMetadataRepository.java b/beekeeper-core/src/main/java/com/expediagroup/beekeeper/core/repository/HousekeepingMetadataRepository.java index a31b66c1..a6b2ce06 100644 --- a/beekeeper-core/src/main/java/com/expediagroup/beekeeper/core/repository/HousekeepingMetadataRepository.java +++ b/beekeeper-core/src/main/java/com/expediagroup/beekeeper/core/repository/HousekeepingMetadataRepository.java @@ -1,5 +1,5 @@ /** - * Copyright (C) 2019-2023 Expedia, Inc. + * Copyright (C) 2019-2025 Expedia, Inc. 
* * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -145,4 +145,20 @@ void deleteScheduledOrFailedPartitionRecordsForTable( @Query(value = "delete from HousekeepingMetadata t where t.cleanupTimestamp < :instant " + "and (t.housekeepingStatus = 'DELETED' or t.housekeepingStatus = 'DISABLED')") void cleanUpOldDeletedRecords(@Param("instant") LocalDateTime instant); + + /** + * Returns the partition records of the given table that are scheduled or failed, if there are any. + * + * @param databaseName name of the database the table belongs to + * @param tableName name of the table whose partition records are looked up + * @return List of records that match the inputs given. + */ + @Query(value = "from HousekeepingMetadata t " + + "where t.databaseName = :databaseName " + + "and t.tableName = :tableName " + + "and t.partitionName IS NOT NULL " + + "and (t.housekeepingStatus = 'SCHEDULED' or t.housekeepingStatus = 'FAILED')") + List findRecordsForCleanupByDbAndTableName( + @Param("databaseName") String databaseName, + @Param("tableName") String tableName); } diff --git a/beekeeper-integration-tests/src/test/java/com/expediagroup/beekeeper/integration/BeekeeperExpiredMetadataSchedulerApiaryIntegrationTest.java b/beekeeper-integration-tests/src/test/java/com/expediagroup/beekeeper/integration/BeekeeperExpiredMetadataSchedulerApiaryIntegrationTest.java index 057ebc8c..398033b4 100644 --- a/beekeeper-integration-tests/src/test/java/com/expediagroup/beekeeper/integration/BeekeeperExpiredMetadataSchedulerApiaryIntegrationTest.java +++ b/beekeeper-integration-tests/src/test/java/com/expediagroup/beekeeper/integration/BeekeeperExpiredMetadataSchedulerApiaryIntegrationTest.java @@ -1,5 +1,5 @@ /** - * Copyright (C) 2019-2024 Expedia, Inc. + * Copyright (C) 2019-2025 Expedia, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
@@ -15,8 +15,12 @@ */ package com.expediagroup.beekeeper.integration; +import static org.apache.hadoop.fs.s3a.Constants.ACCESS_KEY; +import static org.apache.hadoop.fs.s3a.Constants.ENDPOINT; +import static org.apache.hadoop.fs.s3a.Constants.SECRET_KEY; import static org.assertj.core.api.Assertions.assertThat; import static org.awaitility.Awaitility.await; +import static org.testcontainers.containers.localstack.LocalStackContainer.Service.S3; import static org.testcontainers.containers.localstack.LocalStackContainer.Service.SQS; import static com.expediagroup.beekeeper.core.model.HousekeepingStatus.SCHEDULED; @@ -33,9 +37,12 @@ import java.net.URISyntaxException; import java.sql.SQLException; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; +import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; +import org.apache.hadoop.hive.metastore.api.Table; import org.apache.http.client.methods.HttpGet; import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.HttpClientBuilder; @@ -45,6 +52,7 @@ import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; import org.testcontainers.containers.localstack.LocalStackContainer; import org.testcontainers.junit.jupiter.Container; import org.testcontainers.junit.jupiter.Testcontainers; @@ -53,9 +61,12 @@ import io.micrometer.core.instrument.MeterRegistry; import io.micrometer.core.instrument.composite.CompositeMeterRegistry; +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.model.CreateBucketRequest; import com.amazonaws.services.sqs.AmazonSQS; import com.amazonaws.services.sqs.model.PurgeQueueRequest; import com.amazonaws.services.sqs.model.SendMessageRequest; +import com.google.common.collect.ImmutableMap; import com.expediagroup.beekeeper.core.model.HousekeepingMetadata; import 
com.expediagroup.beekeeper.core.model.PeriodDuration; @@ -65,8 +76,11 @@ import com.expediagroup.beekeeper.integration.model.AlterTableSqsMessage; import com.expediagroup.beekeeper.integration.model.CreateTableSqsMessage; import com.expediagroup.beekeeper.integration.utils.ContainerTestUtils; +import com.expediagroup.beekeeper.integration.utils.HiveTestUtils; import com.expediagroup.beekeeper.scheduler.apiary.BeekeeperSchedulerApiary; +import com.hotels.beeju.extensions.ThriftHiveMetaStoreJUnitExtension; + @Testcontainers public class BeekeeperExpiredMetadataSchedulerApiaryIntegrationTest extends BeekeeperIntegrationTestBase { @@ -75,6 +89,7 @@ public class BeekeeperExpiredMetadataSchedulerApiaryIntegrationTest extends Beek protected static final String QUEUE = "apiary-receiver-queue"; protected static final String SCHEDULED_EXPIRED_METRIC = "metadata-scheduled"; + protected static final String METASTORE_URI_PROPERTY = "properties.metastore-uri"; protected static final String HEALTHCHECK_URI = "http://localhost:8080/actuator/health"; protected static final String PROMETHEUS_URI = "http://localhost:8080/actuator/prometheus"; @@ -85,10 +100,41 @@ public class BeekeeperExpiredMetadataSchedulerApiaryIntegrationTest extends Beek protected static final String PARTITION_B_NAME = "event_date=2020-01-01/event_hour=1"; protected static final String LOCATION_A = "s3://bucket/table1/partition"; protected static final String LOCATION_B = "s3://bucket/table2/partition"; + protected static final String BUCKET = "test-path-bucket"; + protected static final String PARTITIONED_TABLE_NAME = TABLE_NAME_VALUE + "_partitioned"; + protected static final String ROOT_PATH = "s3a://" + BUCKET + "/" + DATABASE_NAME_VALUE + "/"; + protected static final String PARTITIONED_TABLE_PATH = ROOT_PATH + PARTITIONED_TABLE_NAME; + protected static final String PARTITION_ROOT_PATH = ROOT_PATH + "some_location/"; + protected static final List PARTITION_VALUES = List.of("2020-01-01", "0", "A"); + + 
protected static final String S3_ACCESS_KEY = "access"; + protected static final String S3_SECRET_KEY = "secret"; @Container protected static final LocalStackContainer SQS_CONTAINER = ContainerTestUtils.awsContainer(SQS); + @Container + protected static final LocalStackContainer S3_CONTAINER = ContainerTestUtils.awsContainer(S3); + protected static AmazonSQS amazonSQS; + protected HiveTestUtils hiveTestUtils; + protected HiveMetaStoreClient metastoreClient; + + static { + S3_CONTAINER.start(); + } + + protected static AmazonS3 amazonS3; + + private static Map metastoreProperties = ImmutableMap + .builder() + .put(ENDPOINT, ContainerTestUtils.awsServiceEndpoint(S3_CONTAINER, S3)) + .put(ACCESS_KEY, S3_ACCESS_KEY) + .put(SECRET_KEY, S3_SECRET_KEY) + .build(); + + @RegisterExtension + protected ThriftHiveMetaStoreJUnitExtension thriftHiveMetaStore = new ThriftHiveMetaStoreJUnitExtension( + DATABASE_NAME_VALUE, metastoreProperties); @BeforeAll public static void init() { @@ -97,18 +143,29 @@ public static void init() { amazonSQS = ContainerTestUtils.sqsClient(SQS_CONTAINER, AWS_REGION); amazonSQS.createQueue(QUEUE); + + amazonS3 = ContainerTestUtils.s3Client(S3_CONTAINER, AWS_REGION); + amazonS3.createBucket(new CreateBucketRequest(BUCKET, AWS_REGION)); } @AfterAll public static void teardown() { System.clearProperty(APIARY_QUEUE_URL_PROPERTY); + System.clearProperty(METASTORE_URI_PROPERTY); amazonSQS.shutdown(); + amazonS3.shutdown(); } @BeforeEach public void setup() { + System.setProperty(METASTORE_URI_PROPERTY, thriftHiveMetaStore.getThriftConnectionUri()); + metastoreClient = thriftHiveMetaStore.client(); + hiveTestUtils = new HiveTestUtils(metastoreClient); + amazonSQS.purgeQueue(new PurgeQueueRequest(ContainerTestUtils.queueUrl(SQS_CONTAINER, QUEUE))); + amazonS3.listObjectsV2(BUCKET).getObjectSummaries() + .forEach(object -> amazonS3.deleteObject(BUCKET, object.getKey())); executorService.execute(() -> BeekeeperSchedulerApiary.main(new String[] {})); 
await().atMost(Duration.ONE_MINUTE).until(BeekeeperSchedulerApiary::isRunning); } @@ -156,7 +213,7 @@ public void expiredMetadataAddPartitionEvent() throws SQLException, IOException, List expiredMetadata = getExpiredMetadata(); // check first entry is for the table - assertThat(expiredMetadata.get(0).getPartitionName()).isEqualTo(null); + assertThat(expiredMetadata.get(0).getPartitionName()).isNull(); assertExpiredMetadata(expiredMetadata.get(1), LOCATION_A, PARTITION_A_NAME); } @@ -176,7 +233,7 @@ public void expiredMetadataMultipleAddPartitionEvents() throws SQLException, IOE List expiredMetadata = getExpiredMetadata(); // check first entry is for the table - assertThat(expiredMetadata.get(0).getPartitionName()).isEqualTo(null); + assertThat(expiredMetadata.get(0).getPartitionName()).isNull(); assertExpiredMetadata(expiredMetadata.get(1), LOCATION_A, PARTITION_A_NAME); assertExpiredMetadata(expiredMetadata.get(2), LOCATION_B, PARTITION_B_NAME); } @@ -222,7 +279,7 @@ public void expiredMetadataCreateIcebergTableEvent() throws SQLException, IOExce await().atMost(TIMEOUT, TimeUnit.SECONDS).until(() -> getExpiredMetadataRowCount() == 0); List expiredMetadata = getExpiredMetadata(); - assertThat(expiredMetadata.size()).isEqualTo(0); + assertThat(expiredMetadata).isEmpty(); } @Test @@ -240,6 +297,43 @@ public void testEventAddedToHistoryTable() throws SQLException, IOException, URI assertThat(history.getHousekeepingStatus()).isEqualTo(SCHEDULED.name()); } + @Test + void scheduleExistingPartitionsWhenPropertiesExpireInTable() throws Exception { + Table table = hiveTestUtils.createTable(PARTITIONED_TABLE_PATH, TABLE_NAME_VALUE, true, true); + hiveTestUtils.addPartitionsToTable(PARTITION_ROOT_PATH, table, PARTITION_VALUES); + hiveTestUtils.addPartitionsToTable(PARTITION_ROOT_PATH, table, List.of("2020-01-01", "1", "B")); + + AlterTableSqsMessage alterTableSqsMessage = new AlterTableSqsMessage(LOCATION_A, true); + 
amazonSQS.sendMessage(sendMessageRequest(alterTableSqsMessage.getFormattedString())); + + await() + .atMost(TIMEOUT, TimeUnit.SECONDS).until(() -> getExpiredMetadataRowCount() == 3); + + assertThat(metastoreClient.tableExists(DATABASE_NAME_VALUE, TABLE_NAME_VALUE)).isTrue(); + List expiredMetadata = getExpiredMetadata(); + assertThat(expiredMetadata).hasSize(3); + } + + @Test + void scheduleMissingPartitionsWhenPropertiesExpireInTable() throws Exception { + Table table = hiveTestUtils.createTable(PARTITIONED_TABLE_PATH, TABLE_NAME_VALUE, true, true); + hiveTestUtils.addPartitionsToTable(PARTITION_ROOT_PATH, table, PARTITION_VALUES); + hiveTestUtils.addPartitionsToTable(PARTITION_ROOT_PATH, table, List.of("2020-01-01", "1", "B")); + + insertExpiredMetadata(PARTITION_ROOT_PATH, null); + insertExpiredMetadata(PARTITION_ROOT_PATH + PARTITION_A_NAME + "/event_type=C", PARTITION_A_NAME + "/event_type=C"); + + AlterTableSqsMessage alterTableSqsMessage = new AlterTableSqsMessage(PARTITION_ROOT_PATH, true); + amazonSQS.sendMessage(sendMessageRequest(alterTableSqsMessage.getFormattedString())); + + await() + .atMost(TIMEOUT, TimeUnit.SECONDS).until(() -> getExpiredMetadataRowCount() == 4); + + assertThat(metastoreClient.tableExists(DATABASE_NAME_VALUE, TABLE_NAME_VALUE)).isTrue(); + List expiredMetadata = getExpiredMetadata(); + assertThat(expiredMetadata).hasSize(4); + } + @Test public void healthCheck() { CloseableHttpClient client = HttpClientBuilder.create().build(); @@ -253,7 +347,7 @@ public void healthCheck() { public void prometheus() { CloseableHttpClient client = HttpClientBuilder.create().build(); HttpGet request = new HttpGet(PROMETHEUS_URI); - await().atMost(30, TimeUnit.SECONDS).until(() -> client.execute(request).getStatusLine().getStatusCode() == 200); + await().atMost(TIMEOUT, TimeUnit.SECONDS).until(() -> client.execute(request).getStatusLine().getStatusCode() == 200); } protected SendMessageRequest sendMessageRequest(String payload) { diff --git 
a/beekeeper-integration-tests/src/test/java/com/expediagroup/beekeeper/integration/BeekeeperUnreferencedPathSchedulerApiaryIntegrationTest.java b/beekeeper-integration-tests/src/test/java/com/expediagroup/beekeeper/integration/BeekeeperUnreferencedPathSchedulerApiaryIntegrationTest.java index f2f36253..f04051f1 100644 --- a/beekeeper-integration-tests/src/test/java/com/expediagroup/beekeeper/integration/BeekeeperUnreferencedPathSchedulerApiaryIntegrationTest.java +++ b/beekeeper-integration-tests/src/test/java/com/expediagroup/beekeeper/integration/BeekeeperUnreferencedPathSchedulerApiaryIntegrationTest.java @@ -15,6 +15,8 @@ */ package com.expediagroup.beekeeper.integration; +import static java.util.Collections.emptyMap; + import static org.assertj.core.api.Assertions.assertThat; import static org.awaitility.Awaitility.await; import static org.testcontainers.containers.localstack.LocalStackContainer.Service.SQS; @@ -45,6 +47,7 @@ import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; import org.testcontainers.containers.localstack.LocalStackContainer; import org.testcontainers.junit.jupiter.Container; import org.testcontainers.junit.jupiter.Testcontainers; @@ -67,11 +70,14 @@ import com.expediagroup.beekeeper.integration.utils.ContainerTestUtils; import com.expediagroup.beekeeper.scheduler.apiary.BeekeeperSchedulerApiary; +import com.hotels.beeju.extensions.ThriftHiveMetaStoreJUnitExtension; + @Testcontainers public class BeekeeperUnreferencedPathSchedulerApiaryIntegrationTest extends BeekeeperIntegrationTestBase { protected static final int TIMEOUT = 5; protected static final String APIARY_QUEUE_URL_PROPERTY = "properties.apiary.queue-url"; + protected static final String METASTORE_URI_PROPERTY = "properties.metastore-uri"; protected static final String QUEUE = "apiary-receiver-queue"; protected static final String SCHEDULED_ORPHANED_METRIC = 
"paths-scheduled"; @@ -82,6 +88,10 @@ public class BeekeeperUnreferencedPathSchedulerApiaryIntegrationTest extends Bee protected static final LocalStackContainer SQS_CONTAINER = ContainerTestUtils.awsContainer(SQS); protected static AmazonSQS amazonSQS; + @RegisterExtension + protected ThriftHiveMetaStoreJUnitExtension thriftHiveMetaStore = new ThriftHiveMetaStoreJUnitExtension( + DATABASE_NAME_VALUE, emptyMap()); + @BeforeAll public static void init() { String queueUrl = ContainerTestUtils.queueUrl(SQS_CONTAINER, QUEUE); @@ -100,6 +110,8 @@ public static void teardown() { @BeforeEach public void setup() { + System.setProperty(METASTORE_URI_PROPERTY, thriftHiveMetaStore.getThriftConnectionUri()); + amazonSQS.purgeQueue(new PurgeQueueRequest(ContainerTestUtils.queueUrl(SQS_CONTAINER, QUEUE))); executorService.execute(() -> BeekeeperSchedulerApiary.main(new String[] {})); await().atMost(Duration.ONE_MINUTE).until(BeekeeperSchedulerApiary::isRunning); diff --git a/beekeeper-integration-tests/src/test/java/com/expediagroup/beekeeper/integration/api/BeekeeperApiIntegrationTest.java b/beekeeper-integration-tests/src/test/java/com/expediagroup/beekeeper/integration/api/BeekeeperApiIntegrationTest.java index 8af7159b..afc0e0c1 100644 --- a/beekeeper-integration-tests/src/test/java/com/expediagroup/beekeeper/integration/api/BeekeeperApiIntegrationTest.java +++ b/beekeeper-integration-tests/src/test/java/com/expediagroup/beekeeper/integration/api/BeekeeperApiIntegrationTest.java @@ -1,5 +1,5 @@ /** - * Copyright (C) 2019-2023 Expedia, Inc. + * Copyright (C) 2019-2024 Expedia, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
@@ -20,7 +20,6 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.springframework.http.HttpStatus.OK; -import static com.expediagroup.beekeeper.core.model.HousekeepingStatus.SCHEDULED; import static com.expediagroup.beekeeper.integration.CommonTestVariables.CLEANUP_ATTEMPTS_VALUE; import static com.expediagroup.beekeeper.integration.CommonTestVariables.CLIENT_ID_FIELD; import static com.expediagroup.beekeeper.integration.CommonTestVariables.CREATION_TIMESTAMP_VALUE; diff --git a/beekeeper-metadata-cleanup/src/test/java/com/expediagroup/beekeeper/metadata/cleanup/service/PagingMetadataCleanupServiceTest.java b/beekeeper-metadata-cleanup/src/test/java/com/expediagroup/beekeeper/metadata/cleanup/service/PagingMetadataCleanupServiceTest.java index e0b2ac27..52cc8041 100644 --- a/beekeeper-metadata-cleanup/src/test/java/com/expediagroup/beekeeper/metadata/cleanup/service/PagingMetadataCleanupServiceTest.java +++ b/beekeeper-metadata-cleanup/src/test/java/com/expediagroup/beekeeper/metadata/cleanup/service/PagingMetadataCleanupServiceTest.java @@ -61,7 +61,6 @@ import com.expediagroup.beekeeper.cleanup.hive.HiveClientFactory; import com.expediagroup.beekeeper.cleanup.metadata.MetadataCleaner; import com.expediagroup.beekeeper.cleanup.path.PathCleaner; -import com.expediagroup.beekeeper.core.error.BeekeeperException; import com.expediagroup.beekeeper.core.model.HousekeepingMetadata; import com.expediagroup.beekeeper.core.model.HousekeepingPath; import com.expediagroup.beekeeper.core.model.HousekeepingStatus; diff --git a/beekeeper-scheduler-apiary/src/main/java/com/expediagroup/beekeeper/scheduler/apiary/context/CommonBeans.java b/beekeeper-scheduler-apiary/src/main/java/com/expediagroup/beekeeper/scheduler/apiary/context/CommonBeans.java index 35dd5247..bd9d6853 100644 --- a/beekeeper-scheduler-apiary/src/main/java/com/expediagroup/beekeeper/scheduler/apiary/context/CommonBeans.java +++ 
b/beekeeper-scheduler-apiary/src/main/java/com/expediagroup/beekeeper/scheduler/apiary/context/CommonBeans.java @@ -1,5 +1,5 @@ /** - * Copyright (C) 2019-2020 Expedia, Inc. + * Copyright (C) 2019-2025 Expedia, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,7 +17,9 @@ import java.util.EnumMap; import java.util.List; +import java.util.function.Supplier; +import org.apache.hadoop.hive.conf.HiveConf; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.domain.EntityScan; @@ -53,8 +55,14 @@ import com.expediagroup.beekeeper.scheduler.apiary.messaging.BeekeeperEventReader; import com.expediagroup.beekeeper.scheduler.apiary.messaging.MessageReaderAdapter; import com.expediagroup.beekeeper.scheduler.apiary.messaging.RetryingMessageReader; +import com.expediagroup.beekeeper.scheduler.hive.HiveClientFactory; +import com.expediagroup.beekeeper.scheduler.hive.PartitionIteratorFactory; import com.expediagroup.beekeeper.scheduler.service.SchedulerService; +import com.hotels.hcommon.hive.metastore.client.api.CloseableMetaStoreClient; +import com.hotels.hcommon.hive.metastore.client.closeable.CloseableMetaStoreClientFactory; +import com.hotels.hcommon.hive.metastore.client.supplier.HiveMetaStoreClientSupplier; + @Configuration @ComponentScan(basePackages = { "com.expediagroup.beekeeper.core", "com.expediagroup.beekeeper.scheduler" }) @EntityScan(basePackages = { "com.expediagroup.beekeeper.core" }) @@ -149,4 +157,34 @@ public BeekeeperEventReader eventReader( BeekeeperHistoryService beekeeperHistoryService(BeekeeperHistoryRepository beekeeperHistoryRepository) { return new BeekeeperHistoryService(beekeeperHistoryRepository); } + + @Bean + public HiveConf hiveConf(@Value("${properties.metastore-uri}") String metastoreUri) { + HiveConf conf = new HiveConf(); + 
conf.setVar(HiveConf.ConfVars.METASTOREURIS, metastoreUri); + return conf; + } + + @Bean + public CloseableMetaStoreClientFactory metaStoreClientFactory() { + return new CloseableMetaStoreClientFactory(); + } + + @Bean + Supplier metaStoreClientSupplier(CloseableMetaStoreClientFactory metaStoreClientFactory, + HiveConf hiveConf) { + String name = "beekeeper-scheduler"; + return new HiveMetaStoreClientSupplier(metaStoreClientFactory, hiveConf, name); + } + + @Bean + public PartitionIteratorFactory partitionIteratorFactory() { + return new PartitionIteratorFactory(); + } + + @Bean(name = "hiveClientFactory") + public HiveClientFactory clientFactory(Supplier metaStoreClientSupplier, + PartitionIteratorFactory partitionIteratorFactory) { + return new HiveClientFactory(metaStoreClientSupplier, partitionIteratorFactory); + } } diff --git a/beekeeper-scheduler-apiary/src/main/java/com/expediagroup/beekeeper/scheduler/apiary/filter/IcebergTableListenerEventFilter.java b/beekeeper-scheduler-apiary/src/main/java/com/expediagroup/beekeeper/scheduler/apiary/filter/IcebergTableListenerEventFilter.java index 3b2c2b20..574e2202 100644 --- a/beekeeper-scheduler-apiary/src/main/java/com/expediagroup/beekeeper/scheduler/apiary/filter/IcebergTableListenerEventFilter.java +++ b/beekeeper-scheduler-apiary/src/main/java/com/expediagroup/beekeeper/scheduler/apiary/filter/IcebergTableListenerEventFilter.java @@ -15,16 +15,17 @@ */ package com.expediagroup.beekeeper.scheduler.apiary.filter; +import java.util.Map; + import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.springframework.stereotype.Component; import com.expedia.apiary.extensions.receiver.common.event.ListenerEvent; + import com.expediagroup.beekeeper.core.model.LifecycleEventType; import com.expediagroup.beekeeper.core.predicate.IsIcebergTablePredicate; -import java.util.Map; - @Component public class IcebergTableListenerEventFilter implements ListenerEventFilter { diff --git 
a/beekeeper-scheduler-apiary/src/test/java/com/expediagroup/beekeeper/scheduler/apiary/context/CommonBeansTest.java b/beekeeper-scheduler-apiary/src/test/java/com/expediagroup/beekeeper/scheduler/apiary/context/CommonBeansTest.java index ec7e4cec..ff1d5443 100644 --- a/beekeeper-scheduler-apiary/src/test/java/com/expediagroup/beekeeper/scheduler/apiary/context/CommonBeansTest.java +++ b/beekeeper-scheduler-apiary/src/test/java/com/expediagroup/beekeeper/scheduler/apiary/context/CommonBeansTest.java @@ -1,5 +1,5 @@ /** - * Copyright (C) 2019-2020 Expedia, Inc. + * Copyright (C) 2019-2025 Expedia, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,12 +21,15 @@ import java.util.Collections; import java.util.EnumMap; import java.util.List; +import java.util.function.Supplier; +import org.apache.hadoop.hive.conf.HiveConf; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; +import org.mockito.Mockito; import org.mockito.junit.jupiter.MockitoExtension; import com.expedia.apiary.extensions.receiver.common.messaging.MessageReader; @@ -45,6 +48,10 @@ import com.expediagroup.beekeeper.scheduler.apiary.messaging.RetryingMessageReader; import com.expediagroup.beekeeper.scheduler.service.SchedulerService; +import com.hotels.hcommon.hive.metastore.client.api.CloseableMetaStoreClient; +import com.hotels.hcommon.hive.metastore.client.closeable.CloseableMetaStoreClientFactory; +import com.hotels.hcommon.hive.metastore.client.supplier.HiveMetaStoreClientSupplier; + @ExtendWith(MockitoExtension.class) public class CommonBeansTest { @@ -142,4 +149,14 @@ public void verifyBeekeeperHistoryService() { BeekeeperHistoryService beekeeperHistoryService = commonBeans.beekeeperHistoryService(beekeeperHistoryRepository); 
assertThat(beekeeperHistoryService).isInstanceOf(BeekeeperHistoryService.class); } + + @Test + public void verifyMetaStoreClientSupplier() { + CloseableMetaStoreClientFactory metaStoreClientFactory = commonBeans.metaStoreClientFactory(); + HiveConf hiveConf = Mockito.mock(HiveConf.class); + + Supplier metaStoreClientSupplier = commonBeans + .metaStoreClientSupplier(metaStoreClientFactory, hiveConf); + assertThat(metaStoreClientSupplier).isInstanceOf(HiveMetaStoreClientSupplier.class); + } } diff --git a/beekeeper-scheduler-apiary/src/test/java/com/expediagroup/beekeeper/scheduler/apiary/filter/IcebergTableListenerEventFilterTest.java b/beekeeper-scheduler-apiary/src/test/java/com/expediagroup/beekeeper/scheduler/apiary/filter/IcebergTableListenerEventFilterTest.java index 66dceebd..8a57424b 100644 --- a/beekeeper-scheduler-apiary/src/test/java/com/expediagroup/beekeeper/scheduler/apiary/filter/IcebergTableListenerEventFilterTest.java +++ b/beekeeper-scheduler-apiary/src/test/java/com/expediagroup/beekeeper/scheduler/apiary/filter/IcebergTableListenerEventFilterTest.java @@ -15,8 +15,7 @@ */ package com.expediagroup.beekeeper.scheduler.apiary.filter; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; +import static org.assertj.core.api.Assertions.assertThat; import java.util.HashMap; import java.util.Map; @@ -34,42 +33,42 @@ public class IcebergTableListenerEventFilterTest { public void shouldFilterWhenTableTypeIsIceberg() { ListenerEvent event = createListenerEventWithTableType("ICEBERG"); boolean isFiltered = filter.isFiltered(event, LifecycleEventType.EXPIRED); - assertTrue(isFiltered); + assertThat(isFiltered).isTrue(); } @Test public void shouldNotFilterWhenTableTypeIsNotIceberg() { ListenerEvent event = createListenerEventWithTableType("HIVE"); boolean isFiltered = filter.isFiltered(event, LifecycleEventType.EXPIRED); - assertFalse(isFiltered); + assertThat(isFiltered).isFalse(); } @Test public void 
shouldFilterWhenTableTypeIsIcebergIgnoreCase() { ListenerEvent event = createListenerEventWithTableType("iceberg"); boolean isFiltered = filter.isFiltered(event, LifecycleEventType.EXPIRED); - assertTrue(isFiltered); + assertThat(isFiltered).isTrue(); } @Test public void shouldFilterWhenMetadataLocationIsPresent() { ListenerEvent event = createListenerEventWithMetadataLocation("s3://example/path/to/metadata"); boolean isFiltered = filter.isFiltered(event, LifecycleEventType.EXPIRED); - assertTrue(isFiltered); + assertThat(isFiltered).isTrue(); } @Test public void shouldNotFilterWhenMetadataLocationIsEmpty() { ListenerEvent event = createListenerEventWithMetadataLocation(""); boolean isFiltered = filter.isFiltered(event, LifecycleEventType.EXPIRED); - assertFalse(isFiltered); + assertThat(isFiltered).isFalse(); } @Test public void shouldHandleNullTableParameters() { ListenerEvent event = createListenerEventWithTableParameters(null); boolean isFiltered = filter.isFiltered(event, LifecycleEventType.EXPIRED); - assertFalse(isFiltered); + assertThat(isFiltered).isFalse(); } private ListenerEvent createListenerEventWithTableType(String tableType) { diff --git a/beekeeper-scheduler-apiary/src/test/java/com/expediagroup/beekeeper/scheduler/apiary/handler/MessageEventHandlerTest.java b/beekeeper-scheduler-apiary/src/test/java/com/expediagroup/beekeeper/scheduler/apiary/handler/MessageEventHandlerTest.java index 9a7f2a92..c1a9dc34 100644 --- a/beekeeper-scheduler-apiary/src/test/java/com/expediagroup/beekeeper/scheduler/apiary/handler/MessageEventHandlerTest.java +++ b/beekeeper-scheduler-apiary/src/test/java/com/expediagroup/beekeeper/scheduler/apiary/handler/MessageEventHandlerTest.java @@ -1,5 +1,5 @@ /** - * Copyright (C) 2019-2020 Expedia, Inc. + * Copyright (C) 2019-2024 Expedia, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
@@ -16,6 +16,7 @@ package com.expediagroup.beekeeper.scheduler.apiary.handler; import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -23,7 +24,6 @@ import java.util.List; -import static org.junit.jupiter.api.Assertions.assertTrue; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; diff --git a/beekeeper-scheduler/pom.xml b/beekeeper-scheduler/pom.xml index 32d26a43..f13f5ec3 100644 --- a/beekeeper-scheduler/pom.xml +++ b/beekeeper-scheduler/pom.xml @@ -1,5 +1,6 @@ - + 4.0.0 @@ -29,5 +30,52 @@ org.springframework.boot spring-boot-starter-web + + + org.apache.hive + hive-metastore + ${hive.version} + + + org.apache.hbase + hbase-client + + + org.slf4j + slf4j-log4j12 + + + log4j-slf4j-impl + org.apache.logging.log4j + + + junit + junit + + + org.eclipse.jetty.aggregate + jetty-all + + + org.eclipse.jetty.orbit + javax.servlet + + + javax.servlet + servlet-api + + + + + com.hotels + hcommon-hive-metastore + ${hcommon-hive-metastore.version} + + + net.java.dev.jna + jna + + + diff --git a/beekeeper-scheduler/src/main/java/com/expediagroup/beekeeper/scheduler/hive/HiveClient.java b/beekeeper-scheduler/src/main/java/com/expediagroup/beekeeper/scheduler/hive/HiveClient.java new file mode 100644 index 00000000..06df3d5f --- /dev/null +++ b/beekeeper-scheduler/src/main/java/com/expediagroup/beekeeper/scheduler/hive/HiveClient.java @@ -0,0 +1,75 @@ +/** + * Copyright (C) 2019-2025 Expedia, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.expediagroup.beekeeper.scheduler.hive; + +import java.io.Closeable; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.hive.metastore.Warehouse; +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.thrift.TException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.hotels.hcommon.hive.metastore.client.api.CloseableMetaStoreClient; +import com.hotels.hcommon.hive.metastore.iterator.PartitionIterator; + +public class HiveClient implements Closeable { + + private static final Logger log = LoggerFactory.getLogger(HiveClient.class); + + protected final CloseableMetaStoreClient metaStoreClient; + protected final PartitionIteratorFactory partitionIteratorFactory; + + public HiveClient(CloseableMetaStoreClient client, PartitionIteratorFactory partitionIteratorFactory) { + this.metaStoreClient = client; + this.partitionIteratorFactory = partitionIteratorFactory; + } + + public Map getTablePartitionsAndPaths(String databaseName, String tableName) { + try { + Map partitionNamePathMap = new HashMap<>(); + + Table table = metaStoreClient.getTable(databaseName, tableName); + List partitionKeys = table.getPartitionKeys(); + + PartitionIterator iterator = partitionIteratorFactory.newInstance(metaStoreClient, table); + while (iterator.hasNext()) { + Partition partition = iterator.next(); + List values = 
partition.getValues(); + String path = partition.getSd().getLocation(); + String partitionName = Warehouse.makePartName(partitionKeys, values); + partitionNamePathMap.put(partitionName, path); + + log.debug("Retrieved partition values '{}' with path '{}' for table {}.{}", + values, path, databaseName, table); + } + return partitionNamePathMap; + } catch (TException e) { + log.warn("Got error. Returning empty list. Error message: {}", e.getMessage()); + return Collections.emptyMap(); + } + } + + public void close() { + metaStoreClient.close(); + } +} diff --git a/beekeeper-scheduler/src/main/java/com/expediagroup/beekeeper/scheduler/hive/HiveClientFactory.java b/beekeeper-scheduler/src/main/java/com/expediagroup/beekeeper/scheduler/hive/HiveClientFactory.java new file mode 100644 index 00000000..4e1414ff --- /dev/null +++ b/beekeeper-scheduler/src/main/java/com/expediagroup/beekeeper/scheduler/hive/HiveClientFactory.java @@ -0,0 +1,36 @@ +/** + * Copyright (C) 2019-2025 Expedia, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.expediagroup.beekeeper.scheduler.hive; + +import java.util.function.Supplier; + +import com.hotels.hcommon.hive.metastore.client.api.CloseableMetaStoreClient; + +public class HiveClientFactory { + + private final Supplier metaStoreClientSupplier; + private final PartitionIteratorFactory partitionIteratorFactory; + + public HiveClientFactory(Supplier metaStoreClientSupplier, + PartitionIteratorFactory partitionIteratorFactory) { + this.metaStoreClientSupplier = metaStoreClientSupplier; + this.partitionIteratorFactory = partitionIteratorFactory; + } + + public HiveClient newInstance() { + return new HiveClient(metaStoreClientSupplier.get(), partitionIteratorFactory); + } +} diff --git a/beekeeper-scheduler/src/main/java/com/expediagroup/beekeeper/scheduler/hive/PartitionIteratorFactory.java b/beekeeper-scheduler/src/main/java/com/expediagroup/beekeeper/scheduler/hive/PartitionIteratorFactory.java new file mode 100644 index 00000000..a0c407d1 --- /dev/null +++ b/beekeeper-scheduler/src/main/java/com/expediagroup/beekeeper/scheduler/hive/PartitionIteratorFactory.java @@ -0,0 +1,31 @@ +/** + * Copyright (C) 2019-2025 Expedia, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.expediagroup.beekeeper.scheduler.hive; + +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.thrift.TException; + +import com.hotels.hcommon.hive.metastore.client.api.CloseableMetaStoreClient; +import com.hotels.hcommon.hive.metastore.iterator.PartitionIterator; + +public class PartitionIteratorFactory { + + private static final short MAX_PARTITIONS = (short) 1000; + + public PartitionIterator newInstance(CloseableMetaStoreClient client, Table table) throws TException { + return new PartitionIterator(client, table, MAX_PARTITIONS, PartitionIterator.Ordering.NATURAL); + } +} diff --git a/beekeeper-scheduler/src/main/java/com/expediagroup/beekeeper/scheduler/service/ExpiredHousekeepingMetadataSchedulerService.java b/beekeeper-scheduler/src/main/java/com/expediagroup/beekeeper/scheduler/service/ExpiredHousekeepingMetadataSchedulerService.java index 768d804c..b0fd62e2 100644 --- a/beekeeper-scheduler/src/main/java/com/expediagroup/beekeeper/scheduler/service/ExpiredHousekeepingMetadataSchedulerService.java +++ b/beekeeper-scheduler/src/main/java/com/expediagroup/beekeeper/scheduler/service/ExpiredHousekeepingMetadataSchedulerService.java @@ -1,5 +1,5 @@ /** - * Copyright (C) 2019-2020 Expedia, Inc. + * Copyright (C) 2019-2025 Expedia, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
@@ -21,8 +21,13 @@ import static com.expediagroup.beekeeper.core.model.HousekeepingStatus.SCHEDULED; import static com.expediagroup.beekeeper.core.model.LifecycleEventType.EXPIRED; +import java.time.Clock; import java.time.LocalDateTime; +import java.util.List; +import java.util.Map; import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -34,9 +39,12 @@ import com.expediagroup.beekeeper.core.model.HousekeepingMetadata; import com.expediagroup.beekeeper.core.model.HousekeepingStatus; import com.expediagroup.beekeeper.core.model.LifecycleEventType; +import com.expediagroup.beekeeper.core.model.PeriodDuration; import com.expediagroup.beekeeper.core.monitoring.TimedTaggable; import com.expediagroup.beekeeper.core.repository.HousekeepingMetadataRepository; import com.expediagroup.beekeeper.core.service.BeekeeperHistoryService; +import com.expediagroup.beekeeper.scheduler.hive.HiveClient; +import com.expediagroup.beekeeper.scheduler.hive.HiveClientFactory; @Service public class ExpiredHousekeepingMetadataSchedulerService implements SchedulerService { @@ -46,12 +54,16 @@ public class ExpiredHousekeepingMetadataSchedulerService implements SchedulerSer private final HousekeepingMetadataRepository housekeepingMetadataRepository; private final BeekeeperHistoryService beekeeperHistoryService; + private final HiveClientFactory hiveClientFactory; + private final Clock clock; @Autowired public ExpiredHousekeepingMetadataSchedulerService(HousekeepingMetadataRepository housekeepingMetadataRepository, - BeekeeperHistoryService beekeeperHistoryService) { + BeekeeperHistoryService beekeeperHistoryService, HiveClientFactory hiveClientFactory) { this.housekeepingMetadataRepository = housekeepingMetadataRepository; this.beekeeperHistoryService = beekeeperHistoryService; + this.hiveClientFactory = hiveClientFactory; + this.clock = Clock.systemDefaultZone(); } @Override @@ -66,7 +78,7 @@ public void 
scheduleForHousekeeping(HousekeepingEntity housekeepingEntity) { (HousekeepingMetadata) housekeepingEntity); try { housekeepingMetadataRepository.save(housekeepingMetadata); - log.info(format("Successfully scheduled %s", housekeepingMetadata)); + log.info("Successfully scheduled {}", housekeepingMetadata); saveHistory(housekeepingMetadata, SCHEDULED); } catch (Exception e) { saveHistory(housekeepingMetadata, FAILED_TO_SCHEDULE); @@ -80,44 +92,48 @@ private HousekeepingMetadata createOrUpdateHousekeepingMetadata(HousekeepingMeta housekeepingMetadata.getPartitionName()); if (housekeepingMetadataOptional.isEmpty()) { - if (housekeepingMetadata.getPartitionName() != null) { - updateTableCleanupTimestamp(housekeepingMetadata); - } + handleNewMetadata(housekeepingMetadata); return housekeepingMetadata; } - HousekeepingMetadata existingHousekeepingMetadata = housekeepingMetadataOptional.get(); - existingHousekeepingMetadata.setPath(housekeepingMetadata.getPath()); - existingHousekeepingMetadata.setHousekeepingStatus(housekeepingMetadata.getHousekeepingStatus()); - existingHousekeepingMetadata.setCleanupDelay(housekeepingMetadata.getCleanupDelay()); - existingHousekeepingMetadata.setClientId(housekeepingMetadata.getClientId()); + updateExistingMetadata(existingHousekeepingMetadata, housekeepingMetadata); - if (isPartitionedTable(housekeepingMetadata)) { - updateTableCleanupTimestampToMax(existingHousekeepingMetadata); + if (existingHousekeepingMetadata.getPartitionName() == null) { + handlerAlterTable(existingHousekeepingMetadata); } return existingHousekeepingMetadata; } + private void handleNewMetadata(HousekeepingMetadata housekeepingMetadata) { + if (housekeepingMetadata.getPartitionName() != null) { + updateTableCleanupTimestamp(housekeepingMetadata); + } else { + scheduleTablePartitions(housekeepingMetadata); + } + } + + private void handlerAlterTable(HousekeepingMetadata existingHousekeepingMetadata) { + List scheduledPartitions = 
housekeepingMetadataRepository.findRecordsForCleanupByDbAndTableName( + existingHousekeepingMetadata.getDatabaseName(), existingHousekeepingMetadata.getTableName()); + scheduleMissingPartitions(existingHousekeepingMetadata, scheduledPartitions); + updateTableCleanupTimestampToMax(existingHousekeepingMetadata); + if (isActionableUpdate(existingHousekeepingMetadata, scheduledPartitions)) { + updateScheduledPartitions(existingHousekeepingMetadata, scheduledPartitions); + } + } + /** - * When the cleanup delay of a table with partitions is altered, the delay should be updated but the cleanup - * timestamp should be the max timestamp of any of the partitions which the table has. + * When an alteration to the table occurs pre-existing partitions should be scheduled. * - * e.g. if the cleanup delay was 10 but now its being updated to 2, the cleanup timestamp should match any partition - * with delay 10 (or above) to prevent premature attempts to cleanup the table. - * - * @param housekeepingMetadata + * @param tableMetadata Entity that stored the cleanup delay. */ - private void updateTableCleanupTimestampToMax(HousekeepingMetadata housekeepingMetadata) { - LocalDateTime currentCleanupTimestamp = housekeepingMetadata.getCleanupTimestamp(); - LocalDateTime maxCleanupTimestamp = housekeepingMetadataRepository.findMaximumCleanupTimestampForDbAndTable( - housekeepingMetadata.getDatabaseName(), housekeepingMetadata.getTableName()); - - if (maxCleanupTimestamp != null && maxCleanupTimestamp.isAfter(currentCleanupTimestamp)) { - log.info("Updating entry for \"{}.{}\". 
Cleanup timestamp is now \"{}\".", housekeepingMetadata.getDatabaseName(), - housekeepingMetadata.getTableName(), maxCleanupTimestamp); - housekeepingMetadata.setCleanupTimestamp(maxCleanupTimestamp); - } + private void scheduleTablePartitions(HousekeepingMetadata tableMetadata) { + log.info("Scheduling all partitions for table {}.{}", tableMetadata.getDatabaseName(), + tableMetadata.getTableName()); + Map partitionNamesAndPaths = retrieveTablePartitions(tableMetadata.getDatabaseName(), + tableMetadata.getTableName()); + schedule(partitionNamesAndPaths, tableMetadata); } /** @@ -143,11 +159,114 @@ private void updateTableCleanupTimestamp(HousekeepingMetadata partitionMetadata) } } - private boolean isPartitionedTable(HousekeepingMetadata housekeepingMetadata) { - Long numPartitions = housekeepingMetadataRepository.countRecordsForGivenDatabaseAndTableWherePartitionIsNotNull( + private void updateExistingMetadata(HousekeepingMetadata existingMetadata, HousekeepingMetadata newMetadata) { + existingMetadata.setPath(newMetadata.getPath()); + existingMetadata.setHousekeepingStatus(newMetadata.getHousekeepingStatus()); + existingMetadata.setCleanupDelay(newMetadata.getCleanupDelay()); + existingMetadata.setClientId(newMetadata.getClientId()); + } + + /** + * Compares all partitions on the table with any that are currently scheduled. + * If any partitions on the table are missing, they will be scheduled. 
+ */ + private void scheduleMissingPartitions(HousekeepingMetadata tableMetadata, + List scheduledPartitions) { + Map unscheduledPartitionNames = findUnscheduledPartitionNames(tableMetadata, scheduledPartitions); + if (unscheduledPartitionNames.isEmpty()) { + log.info("All table partitions have already been scheduled."); + return; + } + schedule(unscheduledPartitionNames, tableMetadata); + } + + /** + * When the cleanup delay of a table with partitions is altered, the delay should be updated but the cleanup + * timestamp should be the max timestamp of any of the partitions which the table has. + * + * e.g. if the cleanup delay was 10 but now its being updated to 2, the cleanup timestamp should match any partition + * with delay 10 (or above) to prevent premature attempts to cleanup the table. + * + * @param housekeepingMetadata + */ + private void updateTableCleanupTimestampToMax(HousekeepingMetadata housekeepingMetadata) { + LocalDateTime currentCleanupTimestamp = housekeepingMetadata.getCleanupTimestamp(); + LocalDateTime maxCleanupTimestamp = housekeepingMetadataRepository.findMaximumCleanupTimestampForDbAndTable( housekeepingMetadata.getDatabaseName(), housekeepingMetadata.getTableName()); - return numPartitions > 0 && housekeepingMetadata.getPartitionName() == null; + if (maxCleanupTimestamp != null && maxCleanupTimestamp.isAfter(currentCleanupTimestamp)) { + log.info("Updating entry for \"{}.{}\". 
Cleanup timestamp is now \"{}\".", housekeepingMetadata.getDatabaseName(), + housekeepingMetadata.getTableName(), maxCleanupTimestamp); + housekeepingMetadata.setCleanupTimestamp(maxCleanupTimestamp); + } + } + + private boolean isActionableUpdate(HousekeepingMetadata metadata, List scheduledPartitions) { + if(scheduledPartitions.isEmpty()) { + return false; + } + PeriodDuration tableCleanupDelay = metadata.getCleanupDelay(); + + HousekeepingMetadata scheduledPartition = scheduledPartitions.get(0); + PeriodDuration metadataCleanupDelay = scheduledPartition.getCleanupDelay(); + + return (!tableCleanupDelay.equals(metadataCleanupDelay)); + } + + private void updateScheduledPartitions(HousekeepingMetadata metadata, + List partitions) { + log.info("Updating scheduled partitions."); + partitions.forEach(partition -> { + partition.setCleanupDelay(metadata.getCleanupDelay()); + housekeepingMetadataRepository.save(partition); + beekeeperHistoryService.saveHistory(partition, SCHEDULED); + }); + } + + private Map findUnscheduledPartitionNames(HousekeepingMetadata tableMetadata, + List scheduledPartitions) { + Map tablePartitionNamesAndPaths = retrieveTablePartitions(tableMetadata.getDatabaseName(), + tableMetadata.getTableName()); + + Set scheduledPartitionNames = scheduledPartitions.stream() + .map(HousekeepingMetadata::getPartitionName) + .collect(Collectors.toSet()); + + return tablePartitionNamesAndPaths.entrySet().stream() + .filter(entry -> !scheduledPartitionNames.contains(entry.getKey())) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + } + + private Map retrieveTablePartitions(String database, String tableName) { + try (HiveClient hiveClient = hiveClientFactory.newInstance()) { + return hiveClient.getTablePartitionsAndPaths(database, tableName); + } + } + + private void schedule(Map partitionNamesAndPaths, HousekeepingMetadata tableMetadata) { + partitionNamesAndPaths.forEach((partitionName, path) -> { + HousekeepingMetadata partitionMetadata 
= createNewMetadata(tableMetadata, partitionName, path); + + housekeepingMetadataRepository.save(partitionMetadata); + beekeeperHistoryService.saveHistory(partitionMetadata, SCHEDULED); + }); + log.info("Scheduled {} partitions for table {}.{}", partitionNamesAndPaths.size(), tableMetadata.getDatabaseName(), + tableMetadata.getTableName()); + } + + private HousekeepingMetadata createNewMetadata(HousekeepingMetadata tableMetadata, String partitionName, + String path) { + return HousekeepingMetadata + .builder() + .housekeepingStatus(SCHEDULED) + .creationTimestamp(LocalDateTime.now(clock)) + .cleanupDelay(tableMetadata.getCleanupDelay()) + .lifecycleType(LIFECYCLE_EVENT_TYPE.toString()) + .path(path) + .databaseName(tableMetadata.getDatabaseName()) + .tableName(tableMetadata.getTableName()) + .partitionName(partitionName) + .build(); } private void saveHistory(HousekeepingMetadata housekeepingMetadata, HousekeepingStatus status) { diff --git a/beekeeper-scheduler/src/test/java/com/expediagroup/beekeeper/scheduler/hive/hive/HiveClientTest.java b/beekeeper-scheduler/src/test/java/com/expediagroup/beekeeper/scheduler/hive/hive/HiveClientTest.java new file mode 100644 index 00000000..a6fdb95a --- /dev/null +++ b/beekeeper-scheduler/src/test/java/com/expediagroup/beekeeper/scheduler/hive/hive/HiveClientTest.java @@ -0,0 +1,142 @@ +/** + * Copyright (C) 2019-2025 Expedia, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.expediagroup.beekeeper.scheduler.hive.hive; + +import static java.util.Collections.emptyMap; + +import static org.assertj.core.api.AssertionsForClassTypes.assertThat; +import static org.mockito.Mockito.when; + +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.StorageDescriptor; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.thrift.TException; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +import com.expediagroup.beekeeper.scheduler.hive.HiveClient; +import com.expediagroup.beekeeper.scheduler.hive.PartitionIteratorFactory; + +import com.hotels.hcommon.hive.metastore.client.api.CloseableMetaStoreClient; +import com.hotels.hcommon.hive.metastore.iterator.PartitionIterator; + +@ExtendWith(MockitoExtension.class) +public class HiveClientTest { + + private static final String DATABASE_NAME = "database"; + private static final String TABLE_NAME = "table"; + private static final short NO_LIMIT = (short) -1; + private static final String PARTITION_NAME = "event_date=2024-01-01/event_hour=1"; + private static final String PARTITION_PATH = "path/" + PARTITION_NAME; + + private @Mock CloseableMetaStoreClient metaStoreClient; + private @Mock Table table; + private @Mock FieldSchema eventDatePartitionKey; + private @Mock FieldSchema eventHourPartitionKey; + private @Mock Partition partition; + private @Mock Partition partition2; + private @Mock Partition partition3; + private @Mock StorageDescriptor storageDescriptor; + private @Mock StorageDescriptor storageDescriptor2; + private @Mock StorageDescriptor storageDescriptor3; + private @Mock PartitionIteratorFactory partitionIteratorFactory; + private @Mock 
PartitionIterator partitionIterator; + + public HiveClient hiveClient; + + @BeforeEach + public void setup() throws TException { + hiveClient = new HiveClient(metaStoreClient, partitionIteratorFactory); + } + + @Test + public void retrievePartitionAndPaths() throws TException { + List partitionNames = List.of(PARTITION_NAME); + + when(table.getDbName()).thenReturn(DATABASE_NAME); + when(table.getTableName()).thenReturn(TABLE_NAME); + when(table.getPartitionKeys()).thenReturn(List.of(eventDatePartitionKey, eventHourPartitionKey)); + when(eventDatePartitionKey.getName()).thenReturn("EVENT_DATE"); + when(eventHourPartitionKey.getName()).thenReturn("event_hour"); + + when(metaStoreClient.listPartitionNames(DATABASE_NAME, TABLE_NAME, NO_LIMIT)).thenReturn(partitionNames); + when(metaStoreClient.getPartitionsByNames(DATABASE_NAME, TABLE_NAME, partitionNames)).thenReturn( + List.of(partition)); + when(metaStoreClient.getTable(DATABASE_NAME, TABLE_NAME)).thenReturn(table); + partitionIterator = new PartitionIteratorFactory().newInstance(metaStoreClient, table); + when(partitionIteratorFactory.newInstance(metaStoreClient, table)).thenReturn(partitionIterator); + + when(partition.getValues()).thenReturn(List.of("2024-01-01", "1")); + when(partition.getSd()).thenReturn(storageDescriptor); + when(storageDescriptor.getLocation()).thenReturn(PARTITION_PATH); + + Map tablePartitionsAndPaths = hiveClient.getTablePartitionsAndPaths(DATABASE_NAME, TABLE_NAME); + assertThat(tablePartitionsAndPaths).isEqualTo(Map.of(PARTITION_NAME, PARTITION_PATH)); + } + + @Test + public void typicalRetrieveMultiLevelPartitionsAndPaths() throws TException { + List partitionNames = List.of(PARTITION_NAME, "event_date=2024-01-01/event_hour=2", + "event_date=2024-01-01/event_hour=3"); + + when(table.getDbName()).thenReturn(DATABASE_NAME); + when(table.getTableName()).thenReturn(TABLE_NAME); + when(table.getPartitionKeys()).thenReturn(List.of(eventDatePartitionKey, eventHourPartitionKey)); + 
when(eventDatePartitionKey.getName()).thenReturn("event_date"); + when(eventHourPartitionKey.getName()).thenReturn("event_hour"); + + when(metaStoreClient.listPartitionNames(DATABASE_NAME, TABLE_NAME, NO_LIMIT)).thenReturn(partitionNames); + when(metaStoreClient.getPartitionsByNames(DATABASE_NAME, TABLE_NAME, partitionNames)).thenReturn( + List.of(partition, partition2, partition3)); + when(metaStoreClient.getTable(DATABASE_NAME, TABLE_NAME)).thenReturn(table); + partitionIterator = new PartitionIteratorFactory().newInstance(metaStoreClient, table); + when(partitionIteratorFactory.newInstance(metaStoreClient, table)).thenReturn(partitionIterator); + + when(partition.getValues()).thenReturn(List.of("2024-01-01", "1")); + when(partition.getSd()).thenReturn(storageDescriptor); + when(partition2.getValues()).thenReturn(List.of("2024-01-01", "2")); + when(partition2.getSd()).thenReturn(storageDescriptor2); + when(partition3.getValues()).thenReturn(List.of("2024-01-01", "3")); + when(partition3.getSd()).thenReturn(storageDescriptor3); + + String partition2Name = "event_date=2024-01-01/event_hour=2"; + String partition23Name = "event_date=2024-01-01/event_hour=3"; + String partition2Path = "path/event_date=2024-01-01/event_hour=2"; + String partition3Path = "path/event_date=2024-01-01/event_hour=3"; + when(storageDescriptor.getLocation()).thenReturn(PARTITION_PATH); + when(storageDescriptor2.getLocation()).thenReturn(partition2Path); + when(storageDescriptor3.getLocation()).thenReturn(partition3Path); + + Map tablePartitionsAndPaths = hiveClient.getTablePartitionsAndPaths(DATABASE_NAME, TABLE_NAME); + assertThat(tablePartitionsAndPaths).isEqualTo( + Map.of(PARTITION_NAME, PARTITION_PATH, partition2Name, partition2Path, partition23Name, partition3Path)); + } + + @Test + public void throwsExceptionOnTableRetrieval() throws TException { + when(metaStoreClient.getTable(DATABASE_NAME, TABLE_NAME)).thenThrow(TException.class); + + Map partitionNames = 
hiveClient.getTablePartitionsAndPaths(DATABASE_NAME, TABLE_NAME); + assertThat(partitionNames).isEqualTo(emptyMap()); + } +} diff --git a/beekeeper-scheduler/src/test/java/com/expediagroup/beekeeper/scheduler/service/ExpiredHousekeepingMetadataSchedulerServiceTest.java b/beekeeper-scheduler/src/test/java/com/expediagroup/beekeeper/scheduler/service/ExpiredHousekeepingMetadataSchedulerServiceTest.java index 1e95ee09..3f9a45ef 100644 --- a/beekeeper-scheduler/src/test/java/com/expediagroup/beekeeper/scheduler/service/ExpiredHousekeepingMetadataSchedulerServiceTest.java +++ b/beekeeper-scheduler/src/test/java/com/expediagroup/beekeeper/scheduler/service/ExpiredHousekeepingMetadataSchedulerServiceTest.java @@ -1,5 +1,5 @@ /** - * Copyright (C) 2019-2023 Expedia, Inc. + * Copyright (C) 2019-2025 Expedia, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,12 +16,14 @@ package com.expediagroup.beekeeper.scheduler.service; import static java.lang.String.format; +import static java.util.Collections.singletonList; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatExceptionOfType; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -32,6 +34,7 @@ import java.time.Duration; import java.time.LocalDateTime; import java.time.ZoneId; +import java.util.Map; import java.util.Optional; import org.junit.jupiter.api.Test; @@ -45,6 +48,8 @@ import com.expediagroup.beekeeper.core.model.PeriodDuration; import com.expediagroup.beekeeper.core.repository.HousekeepingMetadataRepository; import com.expediagroup.beekeeper.core.service.BeekeeperHistoryService; +import com.expediagroup.beekeeper.scheduler.hive.HiveClient; +import 
com.expediagroup.beekeeper.scheduler.hive.HiveClientFactory; @ExtendWith(MockitoExtension.class) public class ExpiredHousekeepingMetadataSchedulerServiceTest { @@ -53,6 +58,7 @@ public class ExpiredHousekeepingMetadataSchedulerServiceTest { private static final String DATABASE_NAME = "database"; private static final String TABLE_NAME = "table"; private static final String PARTITION_NAME = "event_date=2020-01-01/event_hour=0/event_type=A"; + private static final String PARTITION_PATH = PATH + "/" + PARTITION_NAME; private static final LocalDateTime CREATION_TIMESTAMP = LocalDateTime.now(ZoneId.of("UTC")); @Mock @@ -61,6 +67,12 @@ public class ExpiredHousekeepingMetadataSchedulerServiceTest { @Mock private BeekeeperHistoryService beekeeperHistoryService; + @Mock + private HiveClientFactory hiveClientFactory; + + @Mock + private HiveClient hiveClient; + @InjectMocks private ExpiredHousekeepingMetadataSchedulerService expiredHousekeepingMetadataSchedulerService; @@ -70,6 +82,7 @@ public void typicalCreateScheduleForHousekeeping() { when(housekeepingMetadataRepository.findRecordForCleanupByDbTableAndPartitionName(DATABASE_NAME, TABLE_NAME, null)) .thenReturn(Optional.empty()); + when(hiveClientFactory.newInstance()).thenReturn(hiveClient); expiredHousekeepingMetadataSchedulerService.scheduleForHousekeeping(metadata); @@ -81,6 +94,7 @@ public void typicalCreateScheduleForHousekeeping() { public void typicalCreatePartitionScheduleForHousekeeping() { HousekeepingMetadata metadata = createHousekeepingMetadataPartition(); HousekeepingMetadata tableMetadata = createHousekeepingMetadataTable(); + tableMetadata.setCleanupDelay(PeriodDuration.parse("P1D")); when(housekeepingMetadataRepository .findRecordForCleanupByDbTableAndPartitionName(DATABASE_NAME, TABLE_NAME, PARTITION_NAME)) @@ -102,6 +116,7 @@ public void typicalUpdateScheduleForHousekeepingWhenChangingCleanupDelay() { when(housekeepingMetadataRepository.findRecordForCleanupByDbTableAndPartitionName(DATABASE_NAME, 
TABLE_NAME, null)) .thenReturn(Optional.of(existingTable)); + when(hiveClientFactory.newInstance()).thenReturn(hiveClient); expiredHousekeepingMetadataSchedulerService.scheduleForHousekeeping(metadata); @@ -123,8 +138,7 @@ public void typicalUpdatePartitionedTableWithShorterCleanupDelay() { .thenReturn(Optional.of(existingTable)); when(housekeepingMetadataRepository.findMaximumCleanupTimestampForDbAndTable(DATABASE_NAME, TABLE_NAME)) .thenReturn(CREATION_TIMESTAMP.plus(Duration.parse("P30D"))); - when(housekeepingMetadataRepository - .countRecordsForGivenDatabaseAndTableWherePartitionIsNotNull(DATABASE_NAME, TABLE_NAME)).thenReturn(1L); + when(hiveClientFactory.newInstance()).thenReturn(hiveClient); expiredHousekeepingMetadataSchedulerService.scheduleForHousekeeping(metadata); @@ -147,7 +161,7 @@ public void verifyLifecycleType() { @Test public void scheduleFails() { HousekeepingMetadata metadata = createHousekeepingMetadataTable(); - + when(hiveClientFactory.newInstance()).thenReturn(hiveClient); when(housekeepingMetadataRepository.save(metadata)).thenThrow(new RuntimeException()); assertThatExceptionOfType(BeekeeperException.class) @@ -157,6 +171,55 @@ public void scheduleFails() { verify(beekeeperHistoryService).saveHistory(any(), eq(FAILED_TO_SCHEDULE)); } + @Test + public void schedulePreexistingTablePartitionsOnTableEvent() { + HousekeepingMetadata tableMetadata = createHousekeepingMetadataTable(); + + String partitionName2 = "event_date=2020-01-01/event_hour=1/event_type=B"; + String partitionName3 = "event_date=2020-01-01/event_hour=3/event_type=C"; + Map partitionNamesPathMap = Map.of(PARTITION_NAME, PARTITION_PATH, partitionName2, + "path/" + partitionName2, partitionName3, "path/" + partitionName3); + + when(hiveClientFactory.newInstance()).thenReturn(hiveClient); + when(hiveClient.getTablePartitionsAndPaths(DATABASE_NAME, TABLE_NAME)).thenReturn(partitionNamesPathMap); + when(housekeepingMetadataRepository + 
.findRecordForCleanupByDbTableAndPartitionName(DATABASE_NAME, TABLE_NAME, null)) + .thenReturn(Optional.empty()); + + expiredHousekeepingMetadataSchedulerService.scheduleForHousekeeping(tableMetadata); + + verify(housekeepingMetadataRepository, times(4)).save(any()); + verify(beekeeperHistoryService, times(4)).saveHistory(any(), eq(SCHEDULED)); + } + + @Test + public void updateScheduledPartsAndCreateNewEntriesForExistingPartsOnTableEvent() { + HousekeepingMetadata existingPartition = spy(createHousekeepingMetadataPartition()); + existingPartition.setCleanupDelay(PeriodDuration.parse("P4M")); + HousekeepingMetadata existingTable = createHousekeepingMetadataTable(); + existingTable.setCleanupDelay(PeriodDuration.parse("P4M")); + + HousekeepingMetadata tableMetadata = createHousekeepingMetadataTable(); + String partitionName2 = "event_date=2020-01-01/event_hour=1/event_type=B"; + Map partitionNamesPathMap = Map.of(PARTITION_NAME, PARTITION_PATH, partitionName2, + "path/" + partitionName2); + + when(hiveClientFactory.newInstance()).thenReturn(hiveClient); + when(hiveClient.getTablePartitionsAndPaths(DATABASE_NAME, TABLE_NAME)).thenReturn(partitionNamesPathMap); + + when(housekeepingMetadataRepository + .findRecordForCleanupByDbTableAndPartitionName(DATABASE_NAME, TABLE_NAME, null)) + .thenReturn(Optional.of(existingTable)); + when(housekeepingMetadataRepository.findRecordsForCleanupByDbAndTableName(DATABASE_NAME, TABLE_NAME)).thenReturn( + singletonList(existingPartition)); + + expiredHousekeepingMetadataSchedulerService.scheduleForHousekeeping(tableMetadata); + + verify(housekeepingMetadataRepository, times(3)).save(any()); + verify(existingPartition).setCleanupDelay(PeriodDuration.parse("P3D")); + verify(beekeeperHistoryService, times(3)).saveHistory(any(), eq(SCHEDULED)); + } + private HousekeepingMetadata createHousekeepingMetadataPartition() { return createEntityHousekeepingTable(PARTITION_NAME); } diff --git a/pom.xml b/pom.xml index 0e7ade96..ae0e07b6 100644 
--- a/pom.xml +++ b/pom.xml @@ -53,6 +53,8 @@ 1.17.6 11-slim 2.17.2 + 2.3.7 + 1.4.2