Skip to content

chore: New task execution task id test #1352

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

import io.dapr.testcontainers.Component;
import io.dapr.testcontainers.DaprContainer;
import io.dapr.testcontainers.DaprLogLevel;
Expand All @@ -40,6 +41,7 @@
import java.util.Map;

import static io.dapr.it.testcontainers.ContainerConstants.DAPR_RUNTIME_IMAGE_TAG;
import static org.junit.Assert.assertTrue;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;

Expand Down Expand Up @@ -117,6 +119,29 @@ public void testWorkflows() throws Exception {
assertEquals(instanceId, workflowOutput.getWorkflowId());
}

@Test
public void testExecutionKeyWorkflows() throws Exception {
TestWorkflowPayload payload = new TestWorkflowPayload(new ArrayList<>());
String instanceId = workflowClient.scheduleNewWorkflow(TestExecutionKeysWorkflow.class, payload);

workflowClient.waitForInstanceStart(instanceId, Duration.ofSeconds(100), false);

Duration timeout = Duration.ofSeconds(1000);
WorkflowInstanceStatus workflowStatus = workflowClient.waitForInstanceCompletion(instanceId, timeout, true);

assertNotNull(workflowStatus);

TestWorkflowPayload workflowOutput = deserialize(workflowStatus.getSerializedOutput());

assertEquals(1, workflowOutput.getPayloads().size());
assertEquals("Execution key found", workflowOutput.getPayloads().get(0));

String executionKey = workflowOutput.getWorkflowId() +"-"+"io.dapr.it.testcontainers.TaskExecutionKeyActivity";
assertTrue(KeyStore.getInstance().getKey(executionKey));

assertEquals(instanceId, workflowOutput.getWorkflowId());
}

private TestWorkflowPayload deserialize(String value) throws JsonProcessingException {
return OBJECT_MAPPER.readValue(value, TestWorkflowPayload.class);
}
Expand Down
39 changes: 39 additions & 0 deletions sdk-tests/src/test/java/io/dapr/it/testcontainers/KeyStore.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package io.dapr.it.testcontainers;

import java.util.HashMap;
import java.util.Map;

public class KeyStore {

private final Map<String, Boolean> keyStore = new HashMap<>();

private static KeyStore instance;

private KeyStore() {
}

public static KeyStore getInstance() {
if (instance == null) {
synchronized (KeyStore.class) {
if (instance == null) {
instance = new KeyStore();
}
}
}
return instance;
}


public void addKey(String key, Boolean value) {
keyStore.put(key, value);
}

public Boolean getKey(String key) {
return keyStore.get(key);
}

public void removeKey(String key) {
keyStore.remove(key);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright 2024 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
limitations under the License.
*/

package io.dapr.it.testcontainers;

import io.dapr.workflows.WorkflowActivity;
import io.dapr.workflows.WorkflowActivityContext;

public class TaskExecutionKeyActivity implements WorkflowActivity {

@Override
public Object run(WorkflowActivityContext ctx) {
TestWorkflowPayload workflowPayload = ctx.getInput(TestWorkflowPayload.class);
KeyStore keyStore = KeyStore.getInstance();
Boolean exists = keyStore.getKey(ctx.getTaskExecutionKey());
if (!Boolean.TRUE.equals(exists)) {
keyStore.addKey(ctx.getTaskExecutionKey(), true);
workflowPayload.getPayloads().add("Execution key not found");
throw new IllegalStateException("Task execution key not found");
}
workflowPayload.getPayloads().add("Execution key found");
return workflowPayload;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,12 @@ public WorkflowRuntimeBuilder workflowRuntimeBuilder(
WorkflowRuntimeBuilder builder = new WorkflowRuntimeBuilder(new Properties(overrides));

builder.registerWorkflow(TestWorkflow.class);
builder.registerWorkflow(TestExecutionKeysWorkflow.class);
builder.registerActivity(FirstActivity.class);
builder.registerActivity(SecondActivity.class);

builder.registerActivity(TaskExecutionKeyActivity.class);


return builder;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Copyright 2024 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
limitations under the License.
*/

package io.dapr.it.testcontainers;

import io.dapr.durabletask.Task;
import io.dapr.workflows.Workflow;
import io.dapr.workflows.WorkflowStub;
import io.dapr.workflows.WorkflowTaskOptions;
import io.dapr.workflows.WorkflowTaskRetryPolicy;

import java.time.Duration;

import org.slf4j.Logger;

public class TestExecutionKeysWorkflow implements Workflow {

@Override
public WorkflowStub create() {
return ctx -> {

Logger logger = ctx.getLogger();
String instanceId = ctx.getInstanceId();
logger.info("Starting Workflow: " + ctx.getName());
logger.info("Instance ID: " + instanceId);
logger.info("Current Orchestration Time: " + ctx.getCurrentInstant());

TestWorkflowPayload workflowPayload = ctx.getInput(TestWorkflowPayload.class);
workflowPayload.setWorkflowId(instanceId);

WorkflowTaskOptions options = new WorkflowTaskOptions(WorkflowTaskRetryPolicy.newBuilder()
.setMaxNumberOfAttempts(3)
.setFirstRetryInterval(Duration.ofSeconds(1))
.setMaxRetryInterval(Duration.ofSeconds(10))
.setBackoffCoefficient(2.0)
.setRetryTimeout(Duration.ofSeconds(50))
.build());


Task<TestWorkflowPayload> t = ctx.callActivity(TaskExecutionKeyActivity.class.getName(), workflowPayload, options,TestWorkflowPayload.class);

TestWorkflowPayload payloadAfterExecution = t.await();

ctx.complete(payloadAfterExecution);
};
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package io.dapr.it.testcontainers;

import java.util.ArrayList;
import java.util.List;

/**
* Singleton class to hold workflow-related classes and their instances.
*/
public class WorkflowSingleton {
private static WorkflowSingleton instance;
private final List<String> payloads;
private String workflowId;

private WorkflowSingleton() {
this.payloads = new ArrayList<>();
}

public static synchronized WorkflowSingleton getInstance() {
if (instance == null) {
instance = new WorkflowSingleton();
}
return instance;
}

public List<String> getPayloads() {
return payloads;
}

public void addPayload(String payload) {
payloads.add(payload);
}

public String getWorkflowId() {
return workflowId;
}

public void setWorkflowId(String workflowId) {
this.workflowId = workflowId;
}

public void clear() {
payloads.clear();
workflowId = null;
}
}
2 changes: 1 addition & 1 deletion sdk-workflows/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
<dependency>
<groupId>io.dapr</groupId>
<artifactId>durabletask-client</artifactId>
<version>1.5.2</version>
<version>1.5.3</version>
</dependency>
<!--
manually declare durabletask-client's jackson dependencies
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ public interface WorkflowActivityContext {

String getName();

String getTaskExecutionKey();

<T> T getInput(Class<T> targetType);

}
Original file line number Diff line number Diff line change
Expand Up @@ -56,4 +56,9 @@
public <T> T getInput(Class<T> targetType) {
return this.innerContext.getInput(targetType);
}

@Override
public String getTaskExecutionKey() {
return this.innerContext.getTaskExecutionKey();

Check warning on line 62 in sdk-workflows/src/main/java/io/dapr/workflows/runtime/DefaultWorkflowActivityContext.java

View check run for this annotation

Codecov / codecov/patch

sdk-workflows/src/main/java/io/dapr/workflows/runtime/DefaultWorkflowActivityContext.java#L62

Added line #L62 was not covered by tests
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,7 @@ public void callChildWorkflowWithOptions() {
WorkflowTaskRetryPolicy retryPolicy = WorkflowTaskRetryPolicy.newBuilder()
.setMaxNumberOfAttempts(1)
.setFirstRetryInterval(Duration.ofSeconds(10))
.setRetryTimeout(Duration.ofSeconds(10))
.build();
WorkflowTaskOptions executionOptions = new WorkflowTaskOptions(retryPolicy);
ArgumentCaptor<TaskOptions> captor = ArgumentCaptor.forClass(TaskOptions.class);
Expand Down
Loading