Skip to content

Commit

Permalink
[GOBBLIN-2194] Close temporal metrics scope on job completion (#4097)
Browse files Browse the repository at this point in the history
Close temporal metrics scope on job completion
Override close in GobblinTemporalJobLauncher
  • Loading branch information
abhishekmjain authored and Will-Lo committed Feb 17, 2025
1 parent 6650efd commit 105765a
Show file tree
Hide file tree
Showing 7 changed files with 101 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@
import com.typesafe.config.ConfigValueFactory;

import io.temporal.client.WorkflowClient;
import io.temporal.serviceclient.WorkflowServiceStubs;
import lombok.Getter;
import lombok.Setter;

Expand All @@ -74,6 +73,7 @@
import org.apache.gobblin.runtime.api.TaskEventMetadataGenerator;
import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
import org.apache.gobblin.temporal.workflows.client.TemporalWorkflowClientFactory;
import org.apache.gobblin.temporal.workflows.service.ManagedWorkflowServiceStubs;
import org.apache.gobblin.util.ClassAliasResolver;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.FileUtils;
Expand Down Expand Up @@ -126,6 +126,7 @@ public class GobblinTemporalTaskRunner implements StandardMetricsBridge {
private final boolean isMetricReportingFailureFatal;
private final boolean isEventReportingFailureFatal;
private final List<TemporalWorker> workers;
private final ManagedWorkflowServiceStubs managedWorkflowServiceStubs;

public GobblinTemporalTaskRunner(String applicationName,
String applicationId,
Expand Down Expand Up @@ -163,6 +164,9 @@ public GobblinTemporalTaskRunner(String applicationName,
ConfigurationKeys.DEFAULT_GOBBLIN_TASK_EVENT_REPORTING_FAILURE_FATAL);
this.workers = new ArrayList<>();

String connectionUri = clusterConfig.getString(GobblinTemporalConfigurationKeys.TEMPORAL_CONNECTION_STRING);
this.managedWorkflowServiceStubs = TemporalWorkflowClientFactory.createServiceInstance(connectionUri);

logger.info("GobblinTaskRunner({}): applicationName {}, applicationId {}, taskRunnerId {}, config {}, appWorkDir {}",
this.isTaskDriver ? "taskDriver" : "worker",
applicationName,
Expand Down Expand Up @@ -241,12 +245,10 @@ public void start()
private TemporalWorker initiateWorker() throws Exception {
logger.info("Starting Temporal Worker");

String connectionUri = clusterConfig.getString(GobblinTemporalConfigurationKeys.TEMPORAL_CONNECTION_STRING);
WorkflowServiceStubs service = TemporalWorkflowClientFactory.createServiceInstance(connectionUri);

String namespace = ConfigUtils.getString(clusterConfig, GobblinTemporalConfigurationKeys.GOBBLIN_TEMPORAL_NAMESPACE,
GobblinTemporalConfigurationKeys.DEFAULT_GOBBLIN_TEMPORAL_NAMESPACE);
WorkflowClient client = TemporalWorkflowClientFactory.createClientInstance(service, namespace);
WorkflowClient client = TemporalWorkflowClientFactory.createClientInstance(
managedWorkflowServiceStubs.getWorkflowServiceStubs(), namespace);

String workerClassName = ConfigUtils.getString(clusterConfig,
GobblinTemporalConfigurationKeys.WORKER_CLASS, GobblinTemporalConfigurationKeys.DEFAULT_WORKER_CLASS);
Expand Down Expand Up @@ -293,6 +295,7 @@ public synchronized void stop() {
}

workers.forEach(TemporalWorker::shutdown);
managedWorkflowServiceStubs.close();

logger.info("All services are stopped.");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,13 +137,9 @@ public GobblinJobLauncher(Properties jobProps, Path appWorkDir,
@Override
public void close() throws IOException {
try {
executeCancellation();
cleanupWorkingDirectory();
} finally {
try {
cleanupWorkingDirectory();
} finally {
super.close();
}
super.close();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.gobblin.temporal.joblauncher;

import java.io.IOException;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
Expand All @@ -33,7 +34,6 @@
import io.temporal.client.WorkflowClient;
import io.temporal.client.WorkflowFailedException;
import io.temporal.client.WorkflowStub;
import io.temporal.serviceclient.WorkflowServiceStubs;
import io.temporal.workflow.Workflow;

import org.apache.hadoop.fs.FileSystem;
Expand All @@ -48,6 +48,7 @@
import org.apache.gobblin.source.workunit.WorkUnit;
import org.apache.gobblin.temporal.cluster.GobblinTemporalTaskRunner;
import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
import org.apache.gobblin.temporal.workflows.service.ManagedWorkflowServiceStubs;
import org.apache.gobblin.util.ConfigUtils;

import static org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys.*;
Expand All @@ -74,7 +75,7 @@ public abstract class GobblinTemporalJobLauncher extends GobblinJobLauncher {
private static final Logger log = Workflow.getLogger(GobblinTemporalJobLauncher.class);
private static final int TERMINATION_TIMEOUT_SECONDS = 3;

protected WorkflowServiceStubs workflowServiceStubs;
protected ManagedWorkflowServiceStubs managedWorkflowServiceStubs;
protected WorkflowClient client;
protected String queueName;
protected String namespace;
Expand All @@ -87,10 +88,10 @@ public GobblinTemporalJobLauncher(Properties jobProps, Path appWorkDir,
log.info("GobblinTemporalJobLauncher: appWorkDir {}; jobProps {}", appWorkDir, jobProps);

String connectionUri = jobProps.getProperty(TEMPORAL_CONNECTION_STRING);
this.workflowServiceStubs = createServiceInstance(connectionUri);
this.managedWorkflowServiceStubs = createServiceInstance(connectionUri);

this.namespace = jobProps.getProperty(GOBBLIN_TEMPORAL_NAMESPACE, DEFAULT_GOBBLIN_TEMPORAL_NAMESPACE);
this.client = createClientInstance(workflowServiceStubs, namespace);
this.client = createClientInstance(managedWorkflowServiceStubs.getWorkflowServiceStubs(), namespace);

this.queueName = jobProps.getProperty(GOBBLIN_TEMPORAL_TASK_QUEUE, DEFAULT_GOBBLIN_TEMPORAL_TASK_QUEUE);

Expand Down Expand Up @@ -139,7 +140,8 @@ protected void executeCancellation() {
.setNamespace(this.namespace)
.setExecution(workflowStub.getExecution())
.build();
DescribeWorkflowExecutionResponse response = workflowServiceStubs.blockingStub().describeWorkflowExecution(request);
DescribeWorkflowExecutionResponse response = managedWorkflowServiceStubs.getWorkflowServiceStubs()
.blockingStub().describeWorkflowExecution(request);

WorkflowExecutionStatus status;
try {
Expand Down Expand Up @@ -188,4 +190,17 @@ protected void removeTasksFromCurrentJob(List<String> workUnitIdsToRemove) {
protected void addTasksToCurrentJob(List<WorkUnit> workUnitsToAdd) {
log.warn("NOT IMPLEMENTED: Temporal addTasksToCurrentJob");
}

@Override
public void close() throws IOException {
try {
// Calling cancel before calling close on serviceStubs as it will shutdown the service which is required during cancellation.
cancelJob(jobListener);
} catch (Exception e) {
log.error("Exception occurred while cancelling job", e);
} finally {
managedWorkflowServiceStubs.close();
super.close();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -209,16 +209,17 @@ public void handleNewJobConfigArrival(NewJobConfigArrivalEvent newJobArrival) {
} else {
LOGGER.info("No job schedule found, so running job " + jobUri);
GobblinTemporalJobLauncherListener listener = new GobblinTemporalJobLauncherListener(this.launcherMetrics);
JobLauncher launcher = buildJobLauncher(newJobArrival.getJobConfig());
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
try {
launcher.cancelJob(listener);
} catch (JobException e) {
LOGGER.error("Failed to cancel the job during shutdown", e);
throw new RuntimeException(e);
}
}));
launcher.launchJob(listener);
try (JobLauncher launcher = buildJobLauncher(newJobArrival.getJobConfig())) {
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
try {
launcher.cancelJob(listener);
} catch (JobException e) {
LOGGER.error("Failed to cancel the job during shutdown", e);
throw new RuntimeException(e);
}
}));
launcher.launchJob(listener);
}
}
} catch (Exception je) {
LOGGER.error("Failed to schedule or run job " + jobUri, je);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,12 @@
import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
import org.apache.gobblin.temporal.ddm.work.assistance.MDCContextPropagator;
import org.apache.gobblin.temporal.workflows.metrics.TemporalMetricsHelper;
import org.apache.gobblin.temporal.workflows.service.ManagedWorkflowServiceStubs;
import org.apache.gobblin.util.ConfigUtils;


public class TemporalWorkflowClientFactory {
public static WorkflowServiceStubs createServiceInstance(String connectionUri) throws Exception {
public static ManagedWorkflowServiceStubs createServiceInstance(String connectionUri) throws Exception {
GobblinClusterUtils.setSystemProperties(ConfigFactory.load());
Config config = GobblinClusterUtils.addDynamicConfig(ConfigFactory.load());
String SHARED_KAFKA_CONFIG_PREFIX_WITH_DOT = "gobblin.kafka.sharedConfig.";
Expand Down Expand Up @@ -119,7 +120,7 @@ public static WorkflowServiceStubs createServiceInstance(String connectionUri) t
.setSslContext(sslContext)
.setMetricsScope(metricsScope)
.build();
return WorkflowServiceStubs.newServiceStubs(options);
return new ManagedWorkflowServiceStubs(WorkflowServiceStubs.newServiceStubs(options));
}

public static WorkflowClient createClientInstance(WorkflowServiceStubs service, String namespace) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.gobblin.temporal.workflows.service;

import java.io.Closeable;

import io.temporal.serviceclient.WorkflowServiceStubs;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

/**
* A wrapper class of {@link WorkflowServiceStubs} that implements the Closeable interface.
* It manages the lifecycle of {@link WorkflowServiceStubs}, ensuring proper shutdown and resource cleanup.
*/
@Getter
@Slf4j
public class ManagedWorkflowServiceStubs implements Closeable {
private final WorkflowServiceStubs workflowServiceStubs;

public ManagedWorkflowServiceStubs(WorkflowServiceStubs serviceStubs) {
this.workflowServiceStubs = serviceStubs;
}

@Override
public void close() {
try {
workflowServiceStubs.getOptions().getMetricsScope().close();
}
catch (Exception e) {
log.error("Exception occurred while closing metrics scope", e);
}
try {
workflowServiceStubs.shutdown();
}
catch (Exception e) {
log.error("Exception occurred while shutting down WorkflowServiceStubs", e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.apache.gobblin.source.workunit.WorkUnit;
import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
import org.apache.gobblin.temporal.workflows.client.TemporalWorkflowClientFactory;
import org.apache.gobblin.temporal.workflows.service.ManagedWorkflowServiceStubs;
import org.apache.gobblin.util.JobLauncherUtils;

import static org.mockito.Mockito.mock;
Expand Down Expand Up @@ -83,6 +84,7 @@ protected void submitJob(List<WorkUnit> workUnits)
@BeforeClass
public void setUp() throws Exception {
mockServiceStubs = mock(WorkflowServiceStubs.class);
ManagedWorkflowServiceStubs managedWorkflowServiceStubs = new ManagedWorkflowServiceStubs(mockServiceStubs);
mockClient = mock(WorkflowClient.class);
mockExecutionInfo = mock(WorkflowExecutionInfo.class);
DescribeWorkflowExecutionResponse mockResponse = mock(DescribeWorkflowExecutionResponse.class);
Expand All @@ -93,7 +95,7 @@ public void setUp() throws Exception {

mockWorkflowClientFactory = Mockito.mockStatic(TemporalWorkflowClientFactory.class);
mockWorkflowClientFactory.when(() -> TemporalWorkflowClientFactory.createServiceInstance(Mockito.anyString()))
.thenReturn(mockServiceStubs);
.thenReturn(managedWorkflowServiceStubs);
mockWorkflowClientFactory.when(() -> TemporalWorkflowClientFactory.createClientInstance(Mockito.any(), Mockito.anyString()))
.thenReturn(mockClient);

Expand Down

0 comments on commit 105765a

Please sign in to comment.