diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/GobblinTemporalTaskRunner.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/GobblinTemporalTaskRunner.java index 65d91bab3..c8091068a 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/GobblinTemporalTaskRunner.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/GobblinTemporalTaskRunner.java @@ -49,7 +49,6 @@ import com.typesafe.config.ConfigValueFactory; import io.temporal.client.WorkflowClient; -import io.temporal.serviceclient.WorkflowServiceStubs; import lombok.Getter; import lombok.Setter; @@ -74,6 +73,7 @@ import org.apache.gobblin.runtime.api.TaskEventMetadataGenerator; import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys; import org.apache.gobblin.temporal.workflows.client.TemporalWorkflowClientFactory; +import org.apache.gobblin.temporal.workflows.service.ManagedWorkflowServiceStubs; import org.apache.gobblin.util.ClassAliasResolver; import org.apache.gobblin.util.ConfigUtils; import org.apache.gobblin.util.FileUtils; @@ -126,6 +126,7 @@ public class GobblinTemporalTaskRunner implements StandardMetricsBridge { private final boolean isMetricReportingFailureFatal; private final boolean isEventReportingFailureFatal; private final List workers; + private final ManagedWorkflowServiceStubs managedWorkflowServiceStubs; public GobblinTemporalTaskRunner(String applicationName, String applicationId, @@ -163,6 +164,9 @@ public GobblinTemporalTaskRunner(String applicationName, ConfigurationKeys.DEFAULT_GOBBLIN_TASK_EVENT_REPORTING_FAILURE_FATAL); this.workers = new ArrayList<>(); + String connectionUri = clusterConfig.getString(GobblinTemporalConfigurationKeys.TEMPORAL_CONNECTION_STRING); + this.managedWorkflowServiceStubs = TemporalWorkflowClientFactory.createServiceInstance(connectionUri); + logger.info("GobblinTaskRunner({}): applicationName {}, applicationId {}, taskRunnerId {}, config {}, appWorkDir {}", this.isTaskDriver ? "taskDriver" : "worker", applicationName, @@ -241,12 +245,10 @@ public void start() private TemporalWorker initiateWorker() throws Exception { logger.info("Starting Temporal Worker"); - String connectionUri = clusterConfig.getString(GobblinTemporalConfigurationKeys.TEMPORAL_CONNECTION_STRING); - WorkflowServiceStubs service = TemporalWorkflowClientFactory.createServiceInstance(connectionUri); - String namespace = ConfigUtils.getString(clusterConfig, GobblinTemporalConfigurationKeys.GOBBLIN_TEMPORAL_NAMESPACE, GobblinTemporalConfigurationKeys.DEFAULT_GOBBLIN_TEMPORAL_NAMESPACE); - WorkflowClient client = TemporalWorkflowClientFactory.createClientInstance(service, namespace); + WorkflowClient client = TemporalWorkflowClientFactory.createClientInstance( + managedWorkflowServiceStubs.getWorkflowServiceStubs(), namespace); String workerClassName = ConfigUtils.getString(clusterConfig, GobblinTemporalConfigurationKeys.WORKER_CLASS, GobblinTemporalConfigurationKeys.DEFAULT_WORKER_CLASS); @@ -293,6 +295,7 @@ public synchronized void stop() { } workers.forEach(TemporalWorker::shutdown); + managedWorkflowServiceStubs.close(); logger.info("All services are stopped."); diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinJobLauncher.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinJobLauncher.java index c358b368a..ef0e9de0b 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinJobLauncher.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinJobLauncher.java @@ -137,13 +137,9 @@ public GobblinJobLauncher(Properties jobProps, Path appWorkDir, @Override public void close() throws IOException { try { - executeCancellation(); + cleanupWorkingDirectory(); } finally { - try { - cleanupWorkingDirectory(); - } finally { - super.close(); - } + super.close(); } } diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncher.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncher.java index 2d17fe20a..26feccee1 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncher.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncher.java @@ -17,6 +17,7 @@ package org.apache.gobblin.temporal.joblauncher; +import java.io.IOException; import java.util.List; import java.util.Properties; import java.util.concurrent.ConcurrentHashMap; @@ -33,7 +34,6 @@ import io.temporal.client.WorkflowClient; import io.temporal.client.WorkflowFailedException; import io.temporal.client.WorkflowStub; -import io.temporal.serviceclient.WorkflowServiceStubs; import io.temporal.workflow.Workflow; import org.apache.hadoop.fs.FileSystem; @@ -48,6 +48,7 @@ import org.apache.gobblin.source.workunit.WorkUnit; import org.apache.gobblin.temporal.cluster.GobblinTemporalTaskRunner; import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys; +import org.apache.gobblin.temporal.workflows.service.ManagedWorkflowServiceStubs; import org.apache.gobblin.util.ConfigUtils; import static org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys.*; @@ -74,7 +75,7 @@ public abstract class GobblinTemporalJobLauncher extends GobblinJobLauncher { private static final Logger log = Workflow.getLogger(GobblinTemporalJobLauncher.class); private static final int TERMINATION_TIMEOUT_SECONDS = 3; - protected WorkflowServiceStubs workflowServiceStubs; + protected ManagedWorkflowServiceStubs managedWorkflowServiceStubs; protected WorkflowClient client; protected String queueName; protected String namespace; @@ -87,10 +88,10 @@ public GobblinTemporalJobLauncher(Properties jobProps, Path appWorkDir, log.info("GobblinTemporalJobLauncher: appWorkDir {}; jobProps {}", appWorkDir, jobProps); String connectionUri = jobProps.getProperty(TEMPORAL_CONNECTION_STRING); - this.workflowServiceStubs = createServiceInstance(connectionUri); + this.managedWorkflowServiceStubs = createServiceInstance(connectionUri); this.namespace = jobProps.getProperty(GOBBLIN_TEMPORAL_NAMESPACE, DEFAULT_GOBBLIN_TEMPORAL_NAMESPACE); - this.client = createClientInstance(workflowServiceStubs, namespace); + this.client = createClientInstance(managedWorkflowServiceStubs.getWorkflowServiceStubs(), namespace); this.queueName = jobProps.getProperty(GOBBLIN_TEMPORAL_TASK_QUEUE, DEFAULT_GOBBLIN_TEMPORAL_TASK_QUEUE); @@ -139,7 +140,8 @@ protected void executeCancellation() { .setNamespace(this.namespace) .setExecution(workflowStub.getExecution()) .build(); - DescribeWorkflowExecutionResponse response = workflowServiceStubs.blockingStub().describeWorkflowExecution(request); + DescribeWorkflowExecutionResponse response = managedWorkflowServiceStubs.getWorkflowServiceStubs() + .blockingStub().describeWorkflowExecution(request); WorkflowExecutionStatus status; try { @@ -188,4 +190,17 @@ protected void removeTasksFromCurrentJob(List workUnitIdsToRemove) { protected void addTasksToCurrentJob(List workUnitsToAdd) { log.warn("NOT IMPLEMENTED: Temporal addTasksToCurrentJob"); } + + @Override + public void close() throws IOException { + try { + // Calling cancel before calling close on serviceStubs as it will shutdown the service which is required during cancellation. + cancelJob(jobListener); + } catch (Exception e) { + log.error("Exception occurred while cancelling job", e); + } finally { + managedWorkflowServiceStubs.close(); + super.close(); + } + } } diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobScheduler.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobScheduler.java index 34a1cec4d..fe589e18d 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobScheduler.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobScheduler.java @@ -209,16 +209,17 @@ public void handleNewJobConfigArrival(NewJobConfigArrivalEvent newJobArrival) { } else { LOGGER.info("No job schedule found, so running job " + jobUri); GobblinTemporalJobLauncherListener listener = new GobblinTemporalJobLauncherListener(this.launcherMetrics); - JobLauncher launcher = buildJobLauncher(newJobArrival.getJobConfig()); - Runtime.getRuntime().addShutdownHook(new Thread(() -> { - try { - launcher.cancelJob(listener); - } catch (JobException e) { - LOGGER.error("Failed to cancel the job during shutdown", e); - throw new RuntimeException(e); - } - })); - launcher.launchJob(listener); + try (JobLauncher launcher = buildJobLauncher(newJobArrival.getJobConfig())) { + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + try { + launcher.cancelJob(listener); + } catch (JobException e) { + LOGGER.error("Failed to cancel the job during shutdown", e); + throw new RuntimeException(e); + } + })); + launcher.launchJob(listener); + } } } catch (Exception je) { LOGGER.error("Failed to schedule or run job " + jobUri, je); diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/client/TemporalWorkflowClientFactory.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/client/TemporalWorkflowClientFactory.java index c8a1052ae..ec63c7901 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/client/TemporalWorkflowClientFactory.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/client/TemporalWorkflowClientFactory.java @@ -46,11 +46,12 @@ import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys; import org.apache.gobblin.temporal.ddm.work.assistance.MDCContextPropagator; import org.apache.gobblin.temporal.workflows.metrics.TemporalMetricsHelper; +import org.apache.gobblin.temporal.workflows.service.ManagedWorkflowServiceStubs; import org.apache.gobblin.util.ConfigUtils; public class TemporalWorkflowClientFactory { - public static WorkflowServiceStubs createServiceInstance(String connectionUri) throws Exception { + public static ManagedWorkflowServiceStubs createServiceInstance(String connectionUri) throws Exception { GobblinClusterUtils.setSystemProperties(ConfigFactory.load()); Config config = GobblinClusterUtils.addDynamicConfig(ConfigFactory.load()); String SHARED_KAFKA_CONFIG_PREFIX_WITH_DOT = "gobblin.kafka.sharedConfig."; @@ -119,7 +120,7 @@ public static WorkflowServiceStubs createServiceInstance(String connectionUri) t .setSslContext(sslContext) .setMetricsScope(metricsScope) .build(); - return WorkflowServiceStubs.newServiceStubs(options); + return new ManagedWorkflowServiceStubs(WorkflowServiceStubs.newServiceStubs(options)); } public static WorkflowClient createClientInstance(WorkflowServiceStubs service, String namespace) { diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/service/ManagedWorkflowServiceStubs.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/service/ManagedWorkflowServiceStubs.java new file mode 100644 index 000000000..e06c6042b --- /dev/null +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/service/ManagedWorkflowServiceStubs.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.temporal.workflows.service; + +import java.io.Closeable; + +import io.temporal.serviceclient.WorkflowServiceStubs; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; + +/** + * A wrapper class of {@link WorkflowServiceStubs} that implements the Closeable interface. + * It manages the lifecycle of {@link WorkflowServiceStubs}, ensuring proper shutdown and resource cleanup. + */ +@Getter +@Slf4j +public class ManagedWorkflowServiceStubs implements Closeable { + private final WorkflowServiceStubs workflowServiceStubs; + + public ManagedWorkflowServiceStubs(WorkflowServiceStubs serviceStubs) { + this.workflowServiceStubs = serviceStubs; + } + + @Override + public void close() { + try { + workflowServiceStubs.getOptions().getMetricsScope().close(); + } + catch (Exception e) { + log.error("Exception occurred while closing metrics scope", e); + } + try { + workflowServiceStubs.shutdown(); + } + catch (Exception e) { + log.error("Exception occurred while shutting down WorkflowServiceStubs", e); + } + } +} diff --git a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncherTest.java b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncherTest.java index 98d0379b6..9a6446d95 100644 --- a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncherTest.java +++ b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncherTest.java @@ -49,6 +49,7 @@ import org.apache.gobblin.source.workunit.WorkUnit; import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys; import org.apache.gobblin.temporal.workflows.client.TemporalWorkflowClientFactory; +import org.apache.gobblin.temporal.workflows.service.ManagedWorkflowServiceStubs; import org.apache.gobblin.util.JobLauncherUtils; import static org.mockito.Mockito.mock; @@ -83,6 +84,7 @@ protected void submitJob(List workUnits) @BeforeClass public void setUp() throws Exception { mockServiceStubs = mock(WorkflowServiceStubs.class); + ManagedWorkflowServiceStubs managedWorkflowServiceStubs = new ManagedWorkflowServiceStubs(mockServiceStubs); mockClient = mock(WorkflowClient.class); mockExecutionInfo = mock(WorkflowExecutionInfo.class); DescribeWorkflowExecutionResponse mockResponse = mock(DescribeWorkflowExecutionResponse.class); @@ -93,7 +95,7 @@ public void setUp() throws Exception { mockWorkflowClientFactory = Mockito.mockStatic(TemporalWorkflowClientFactory.class); mockWorkflowClientFactory.when(() -> TemporalWorkflowClientFactory.createServiceInstance(Mockito.anyString())) - .thenReturn(mockServiceStubs); + .thenReturn(managedWorkflowServiceStubs); mockWorkflowClientFactory.when(() -> TemporalWorkflowClientFactory.createClientInstance(Mockito.any(), Mockito.anyString())) .thenReturn(mockClient);