Skip to content

Commit 9af7cb6

Browse files
NicoKiaruhinerm
authored andcommitted
Task interface and DefaultTask modifications - backward compatible
With this commit task events are published automatically when an asynchronous task (task.run(runnable))) is started and finished. Additionally, task can be run in a synchronous manner and can trigger task events with additional methods added to the Task interface. This commit brings additional methods to the Task interface (empty default implementations provided): * start and finish method: a job can notify when it is starting and ending in a synchronous manner * setCancelCallback(runnable) method: Can be used to add a callback when a task is canceled The DefaultTask implementation is modified: * TaskEvent are published when start() and finish() are called (synchronous task) * TaskEvent are published just before the runnable is started and after it is finished * A try finally statement is used in the async case to ensure a taskevent is sent even in the case of job execution failure * And event is called when cancel is called, and: * Async job : future.cancel(true) is called, + the custom cancel callback is called, if set * Sync job : the custom cancel callback is called, if set * adds a test to check whether many parallel tasks send an event when they are starting and when they are done * documentation
1 parent 9a9e241 commit 9af7cb6

File tree

3 files changed

+210
-9
lines changed

3 files changed

+210
-9
lines changed

src/main/java/org/scijava/task/DefaultTask.java

Lines changed: 63 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -36,11 +36,32 @@
3636
import org.scijava.thread.ThreadService;
3737

3838
/**
39-
* Default implementation of {@link Task}. It launches code via the linked
40-
* {@link ThreadService}, and reports status updates via the linked
41-
* {@link EventService}.
39+
* Default implementation of {@link Task}. Throughout the task (or job),
40+
* {@link Task#setProgressValue(long)} can be called to inform
41+
* how the job is progressing.
4242
*
43-
* @author Curtis Rueden
43+
* Asynchronous case:
44+
* - A job (runnable) is sent for execution to the linked {@link ThreadService}.
45+
* It reports status updates via the linked {@link EventService}.
46+
* A {@link org.scijava.task.event.TaskEvent} is sent before the job
47+
* is started and when finished.
48+
* In the asynchronous case, upon task cancellation ({@link Task#cancel(String)} call),
49+
* the runnable associated to the ThreadService is attempted to be stopped
50+
* by calling {@link Future#cancel(boolean)}.
51+
* This default behaviour can be supplemented by an additional
52+
* custom callback which can be set in {@link Task#setCancelCallBack(Runnable)}.
53+
*
54+
* Synchronous case:
55+
* - A job that reports its status in between calls of {@link Task#start()},
56+
* and {@link Task#finish()}. It also reports its status via
57+
* the linked {@link EventService}.
58+
* Start and finish calls allow publishing proper {@link org.scijava.task.event.TaskEvent}
59+
* to subscribers (with the EventService).
60+
* Upon cancellation of a synchronous task, it is the responsibility
61+
* of the synchronous task to handle its own cancellation through
62+
* a custom callback which can be set via {@link Task#setCancelCallBack(Runnable)}.
63+
*
64+
* @author Curtis Rueden, Nicolas Chiaruttini
4465
*/
4566
public class DefaultTask implements Task {
4667

@@ -51,13 +72,16 @@ public class DefaultTask implements Task {
5172

5273
private boolean canceled;
5374
private String cancelReason;
75+
volatile boolean isDone = false;
5476

5577
private String status;
5678
private long step;
5779
private long max;
5880

5981
private String name;
6082

83+
private Runnable cancelCallBack;
84+
6185
/**
6286
* Creates a new task.
6387
*
@@ -76,20 +100,35 @@ public DefaultTask(final ThreadService threadService,
76100

77101
// -- Task methods --
78102

103+
// - Asynchronous
79104
@Override
80105
public void run(final Runnable r) {
81106
if (r == null) throw new NullPointerException();
82107
future(r);
83108
}
84109

110+
// - Asynchronous
85111
@Override
86112
public void waitFor() throws InterruptedException, ExecutionException {
87113
future().get();
88114
}
89115

116+
// - Synchronous
117+
@Override
118+
public void start() {
119+
fireTaskEvent();
120+
}
121+
122+
// - Synchronous
123+
@Override
124+
public void finish() {
125+
isDone=true;
126+
fireTaskEvent();
127+
}
128+
90129
@Override
91130
public boolean isDone() {
92-
return future != null && future.isDone();
131+
return (isDone) || (future != null && future.isDone());
93132
}
94133

95134
@Override
@@ -136,6 +175,16 @@ public boolean isCanceled() {
136175
public void cancel(final String reason) {
137176
canceled = true;
138177
cancelReason = reason;
178+
if (cancelCallBack!=null) cancelCallBack.run();
179+
if (future!=null) {
180+
isDone = future.cancel(true);
181+
}
182+
fireTaskEvent();
183+
}
184+
185+
@Override
186+
public void setCancelCallBack(Runnable r) {
187+
this.cancelCallBack = r;
139188
}
140189

141190
@Override
@@ -169,7 +218,15 @@ private Future<?> future(final Runnable r) {
169218
private synchronized void initFuture(final Runnable r) {
170219
if (future != null) return;
171220
if (r == null) throw new IllegalArgumentException("Must call run first");
172-
future = threadService.run(r);
221+
future = threadService.run(() -> {
222+
try {
223+
fireTaskEvent(); // Triggers an event just before the task is executed
224+
r.run();
225+
} finally {
226+
isDone = true;
227+
fireTaskEvent(); // Triggers an event just after the task has successfully completed or failed
228+
}
229+
});
173230
}
174231

175232
private void fireTaskEvent() {

src/main/java/org/scijava/task/Task.java

Lines changed: 41 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,26 +37,56 @@
3737
/**
3838
* A self-aware job which reports its status and progress as it runs.
3939
*
40-
* @author Curtis Rueden
40+
* There are two ways to use a Task object:
41+
* - A job can be run asynchronously by using {@link Task#run(Runnable)}, and
42+
* can report its progression from within the Runnable.
43+
*
44+
* - A {@link Task} object can simply be used to report in a synchronous manner
45+
* the progression of a piece of code. In the case of synchronous reporting,
46+
* the job is considered started when {@link Task#start()} is called and
47+
* finished when {@link Task#finish()} is called. A finished job can be finished
48+
* either because it is done or because it has been cancelled.
49+
*
50+
* A cancel callback can be set with {@link Task#setCancelCallBack(Runnable)}.
51+
* The runnable argument will be executed in the case of an external event
52+
* requesting a cancellation of the task - typically, if a user clicks
53+
* a cancel button on the GUI, task.cancel("User cancellation requested") will
54+
* be called. As a result, the task implementors should run the callback.
55+
* This callback can be used to make the task aware that a cancellation
56+
* has been requested, and should proceed to stop its execution.
57+
*
58+
* See also {@link TaskService}, {@link DefaultTask}
59+
*
60+
* @author Curtis Rueden, Nicolas Chiaruttini
4161
*/
4262
public interface Task extends Cancelable, Named {
4363

4464
/**
45-
* Starts running the task.
65+
* Starts running the task - asynchronous job
4666
*
4767
* @throws IllegalStateException if the task was already started.
4868
*/
4969
void run(Runnable r);
5070

5171
/**
52-
* Waits for the task to complete.
72+
* Waits for the task to complete - asynchronous job
5373
*
5474
* @throws IllegalStateException if {@link #run} has not been called yet.
5575
* @throws InterruptedException if the task is interrupted.
5676
* @throws ExecutionException if the task throws an exception while running.
5777
*/
5878
void waitFor() throws InterruptedException, ExecutionException;
5979

80+
/**
81+
* reports that the task is started - synchronous job
82+
*/
83+
default void start() {}
84+
85+
/**
86+
* reports that the task is finished - synchronous job
87+
*/
88+
default void finish() {}
89+
6090
/** Checks whether the task has completed. */
6191
boolean isDone();
6292

@@ -102,4 +132,12 @@ public interface Task extends Cancelable, Named {
102132
* @see #getProgressMaximum()
103133
*/
104134
void setProgressMaximum(long max);
135+
136+
/**
137+
* If the task is cancelled (external call to {@link Task#cancel(String)}),
138+
* the input runnable argument should be executed by task implementors.
139+
*
140+
* @param runnable : should be executed if this task is cancelled through {@link Task#cancel(String)}
141+
*/
142+
default void setCancelCallBack(Runnable runnable) {}
105143
}
Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
package org.scijava.task;
2+
3+
import org.junit.After;
4+
import org.junit.Before;
5+
import org.junit.Test;
6+
import org.scijava.Context;
7+
import org.scijava.event.EventHandler;
8+
import org.scijava.task.event.TaskEvent;
9+
10+
import java.util.HashSet;
11+
import java.util.Set;
12+
13+
import static org.junit.Assert.assertEquals;
14+
15+
/**
16+
* Tests whether many tasks run in parallel consistently trigger an Event
17+
* when each task is started and when each task is ended.
18+
*
19+
* The test fails inconsistently, sometimes with a few tasks remaining, sometimes with almost all tasks remaining.
20+
*/
21+
22+
public class TaskEventTest {
23+
24+
private TaskService taskService;
25+
private TaskEventListener eventListener;
26+
27+
static int nTasks = 500; // Putting higher value can lead to issues because too many threads cannot be launched in parallel
28+
29+
@Before
30+
public void setUp() {
31+
final Context ctx = new Context(TaskService.class);
32+
taskService = ctx.service(TaskService.class);
33+
eventListener = new TaskEventListener();
34+
ctx.inject(eventListener);
35+
}
36+
37+
@After
38+
public void tearDown() {
39+
taskService.context().dispose();
40+
}
41+
42+
@Test
43+
public void testManyTasks() throws InterruptedException {
44+
for (int i=0;i<nTasks;i++) {
45+
createTask(taskService, "Task_"+i, 100, 10, 100);
46+
}
47+
Thread.sleep(5000);
48+
assertEquals(0, eventListener.getLeftOvers().size());
49+
}
50+
51+
public static void createTask(
52+
TaskService taskService,
53+
String taskName,
54+
int msBeforeStart,
55+
int msUpdate,
56+
int msTaskDuration) {
57+
Task task = taskService.createTask(taskName);
58+
59+
new Thread(
60+
() -> {
61+
try {
62+
System.out.println("Waiting to start task "+taskName);
63+
Thread.sleep(msBeforeStart);
64+
65+
// Task started
66+
task.setProgressMaximum(100);
67+
68+
task.run(() -> {
69+
int totalMs = 0;
70+
while(totalMs<msTaskDuration) {
71+
try {
72+
Thread.sleep(msUpdate);
73+
} catch (InterruptedException e) {
74+
e.printStackTrace();
75+
}
76+
totalMs+=msUpdate;
77+
task.setProgressValue((int)(((double)totalMs/msTaskDuration)*100.0));
78+
}
79+
// Task ended
80+
});
81+
82+
} catch (InterruptedException e) {
83+
e.printStackTrace();
84+
}
85+
}).start();
86+
}
87+
88+
public static class TaskEventListener {
89+
90+
Set<Task> tasks = new HashSet<>();
91+
92+
@EventHandler
93+
private synchronized void onEvent(final TaskEvent evt) {
94+
Task task = evt.getTask();
95+
if (task.isDone()) {
96+
tasks.remove(task);
97+
} else {
98+
tasks.add(task);
99+
}
100+
}
101+
102+
public synchronized Set<Task> getLeftOvers() {
103+
return new HashSet<>(tasks);
104+
}
105+
}
106+
}

0 commit comments

Comments
 (0)