Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support Task Operations and shell commands for task logs #3393

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,23 @@ public interface TaskOperations {
*/
TaskExecutionResource taskExecutionStatus(long id);

/**
* Return the task execution log. The platform to from which to retrieve the log will be set to {@code default}.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can remove to

*
* @param externalExecutionId the external execution identifier of the task execution.
* @return {@link String} containing the log.
*/
String taskExecutionLog(String externalExecutionId);

/**
* Return the task execution log.
*
* @param externalExecutionId the external execution identifier of the task execution.
* @param platform the platform from which to obtain the log.
* @return {@link String} containing the log.
*/
String taskExecutionLog(String externalExecutionId, String platform);

/**
* Return information including the count of currently executing tasks and task execution
* limits.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

Expand Down Expand Up @@ -71,6 +72,8 @@ public class TaskTemplate implements TaskOperations {

private static final String PLATFORM_LIST_RELATION = "tasks/platforms";

private static final String RETRIEVE_LOG = "tasks/logs";

private final RestTemplate restTemplate;

private final Link definitionsLink;
Expand All @@ -91,6 +94,8 @@ public class TaskTemplate implements TaskOperations {

private final String dataFlowServerVersion;

private final Link retrieveLogLink;

TaskTemplate(RestTemplate restTemplate, ResourceSupport resources, String dataFlowServerVersion) {
Assert.notNull(resources, "URI Resources must not be be null");
Assert.notNull(resources.getLink(EXECUTIONS_RELATION), "Executions relation is required");
Expand All @@ -101,6 +106,7 @@ public class TaskTemplate implements TaskOperations {
Assert.notNull(resources.getLink(EXECUTION_RELATION), "Execution relation is required");
Assert.notNull(resources.getLink(EXECUTION_RELATION_BY_NAME), "Execution by name relation is required");
Assert.notNull(dataFlowServerVersion, "dataFlowVersion must not be null");
Assert.notNull(resources.getLink(RETRIEVE_LOG), "Log relation is required");

this.dataFlowServerVersion = dataFlowServerVersion;

Expand All @@ -125,6 +131,7 @@ public class TaskTemplate implements TaskOperations {
this.executionsCurrentLink = resources.getLink(EXECUTIONS_CURRENT_RELATION);
this.validationLink = resources.getLink(VALIDATION_REL);
this.platformListLink = resources.getLink(PLATFORM_LIST_RELATION);
this.retrieveLogLink = resources.getLink(RETRIEVE_LOG);
}

@Override
Expand Down Expand Up @@ -194,6 +201,19 @@ public TaskExecutionResource taskExecutionStatus(long id) {
return restTemplate.getForObject(executionLink.expand(id).getHref(), TaskExecutionResource.class);
}

@Override
public String taskExecutionLog(String externalExecutionId) {
return taskExecutionLog(externalExecutionId, "default");
}

@Override
public String taskExecutionLog(String externalExecutionId, String platform) {
Map<String,String> map = new HashMap<>();
map.put("taskExternalExecutionId",externalExecutionId);
map.put("platformName", platform);
return restTemplate.getForObject(retrieveLogLink.expand(map).getHref(), String.class);
}

@Override
public Collection<CurrentTaskExecutionsResource> currentTaskExecutions() {
ParameterizedTypeReference<Collection<CurrentTaskExecutionsResource>> typeReference =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ public class TaskCommands implements CommandMarker {

private static final String LAUNCH = "task launch";
private static final String STOP = "task execution stop";
private static final String LOG = "task execution log";

// Destroy Role

Expand Down Expand Up @@ -229,6 +230,20 @@ public String stop(@CliOption(key = { "", "ids" }, help = "the task execution id
return String.format("Request to stop the task execution with id(s): %s has been submitted", ids);
}

@CliCommand(value = LOG, help = "Retrieve task execution log")
public String retrieveTaskExecutionLog(@CliOption(key = { "", "id" }, help = "the task execution id", mandatory = true) long id,
@CliOption(key = { "", "platform" }, help = "the platform of the task execution", mandatory = false) String platform) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can remove key "" option for the platform key as we can only have one of the options with the default empty key.

TaskExecutionResource taskExecutionResource = taskOperations().taskExecutionStatus(id);
String result;
if(platform != null) {
result = taskOperations().taskExecutionLog(taskExecutionResource.getExternalExecutionId(), platform);
}
else {
result = taskOperations().taskExecutionLog(taskExecutionResource.getExternalExecutionId());
}
return result;
}

@CliCommand(value = DESTROY, help = "Destroy an existing task")
public String destroy(
@CliOption(key = { "", "name" }, help = "the name of the task to destroy", mandatory = true,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@
*/
public class TaskCommandTemplate {

private final static int WAIT_INTERVAL = 500;

private final static int MAX_WAIT_TIME = 3000;

private final JLineShellComponent shell;

private List<String> tasks = new ArrayList<String>();
Expand Down Expand Up @@ -103,6 +107,65 @@ public long launchWithAlternateCTR(String taskName, String ctrAppName) {
return value;
}

/**
* Launch a task and validate the result from shell on default platform.
*
* @param taskName the name of the task
*/
public String getTaskExecutionLog(String taskName) throws Exception{
long id = launchTaskExecutionForLog(taskName);
CommandResult cr = shell.executeCommand("task execution log --id " + id);
assertTrue(cr.toString().contains("Starting TimestampTaskApplication"));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Checking TimestampTaskApplication here may not be correct as this method looks generic.


return cr.toString();
}

/**
* Launch a task with invalid platform.
*
* @param taskName the name of the task
*/
public void getTaskExecutionLogInvalidPlatform(String taskName) throws Exception{
long id = launchTaskExecutionForLog(taskName);
shell.executeCommand(String.format("task execution log --id %s --platform %s", id, "foo"));
}

/**
* Launch a task with invalid task execution id

*/
public void getTaskExecutionLogInvalidId() throws Exception{
CommandResult cr = shell.executeCommand(String.format("task execution log --id %s", 88));
}

private long launchTaskExecutionForLog(String taskName) throws Exception{
// add the task name to the tasks list before assertion
tasks.add(taskName);
CommandResult cr = shell.executeCommand(String.format("task launch %s", taskName));
CommandResult idResult = shell.executeCommand("task execution list --name " + taskName);
Table taskExecutionResult = (Table) idResult.getResult();

long id = (long) taskExecutionResult.getModel().getValue(1, 1);
assertTrue(cr.toString().contains("with execution id " + id));
waitForDBToBePopulated(id);
return id;
}

private void waitForDBToBePopulated(long id) throws Exception {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure why do we need this method.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sorry, I get this now.

for (int waitTime = 0; waitTime <= MAX_WAIT_TIME; waitTime += WAIT_INTERVAL) {
Thread.sleep(WAIT_INTERVAL);
if (isEndTime(id)) {
break;
}
}
}

private boolean isEndTime(long id) {
CommandResult cr = taskExecutionStatus(id);
Table table = (Table) cr.getResult();
return (table.getModel().getValue(6, 1) != null);

}
/**
* Stop a task execution.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,27 @@ public void testTaskLaunchCTRUsingAltCtrName() {
task().launchWithAlternateCTR(taskName, "timestamp");
}

@Test
public void testGetLog() throws Exception{
logger.info("Retrieving task execution log");
String taskName = generateUniqueStreamOrTaskName();
task().create(taskName, "timestamp");
task().getTaskExecutionLog(taskName);
}

@Test(expected = IllegalArgumentException.class)
public void testGetLogInvalidPlatform() throws Exception{
logger.info("Retrieving task execution log");
String taskName = generateUniqueStreamOrTaskName();
task().create(taskName, "timestamp");
task().getTaskExecutionLogInvalidPlatform(taskName);
}

@Test(expected = IllegalArgumentException.class)
public void testGetLogInvalidId() throws Exception{
task().getTaskExecutionLogInvalidId();
}

@Test
public void testTaskLaunchCTRUsingInvalidAltCtrAppName() {
testInvalidCTRLaunch("1: timestamp && 2: timestamp", "timesdaftamp",
Expand Down Expand Up @@ -245,8 +266,7 @@ public void testViewExecution() {

CommandResult idResult = task().taskExecutionList();
Table result = (Table) idResult.getResult();

long value = (long) result.getModel().getValue(1, 1);
long value = (long) result.getModel().getValue(findRowForExecutionId(result, TASK_EXECUTION_ID), 1);
logger.info("Looking up id " + value);
CommandResult cr = task().taskExecutionStatus(value);
assertTrue("task execution status command must be successful", cr.isSuccess());
Expand Down Expand Up @@ -339,4 +359,17 @@ private void verifyTableValue(Table table, int row, int col, Object expected) {
assertEquals(String.format("Row %d, Column %d should be: %s", row, col, expected),expected,
table.getModel().getValue(row, col));
}

private int findRowForExecutionId(Table table, long id) {
int result = -1;
for(int rowNum = 0; rowNum < table.getModel().getRowCount(); rowNum++) {
if(table.getModel().getValue(rowNum, 1).equals(id)) {
result = rowNum;
break;
}
}
assertTrue("Task Execution Id specified was not found in execution list", id > -1);
return result;
}

}