Skip to content

Commit 4c45d13

Browse files
[edison-jobs]: add local scheduling
1 parent bfc0b31 commit 4c45d13

File tree

3 files changed

+155
-0
lines changed

3 files changed

+155
-0
lines changed

edison-jobs/src/main/java/de/otto/edison/jobs/configuration/JobsConfiguration.java

+8
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,10 @@
2323
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
2424
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
2525
import org.springframework.boot.context.properties.EnableConfigurationProperties;
26+
import org.springframework.boot.task.ThreadPoolTaskSchedulerBuilder;
2627
import org.springframework.context.annotation.Bean;
2728
import org.springframework.context.annotation.Configuration;
29+
import org.springframework.scheduling.TaskScheduler;
2830
import org.springframework.scheduling.annotation.EnableAsync;
2931
import org.springframework.scheduling.annotation.EnableScheduling;
3032

@@ -80,6 +82,12 @@ public Thread newThread(Runnable r) {
8082
});
8183
}
8284

85+
@Bean
86+
@ConditionalOnMissingBean(TaskScheduler.class)
87+
public TaskScheduler taskScheduler(ThreadPoolTaskSchedulerBuilder builder) {
88+
return builder.build();
89+
}
90+
8391
@Bean
8492
@ConditionalOnMissingBean(JobMetaRepository.class)
8593
public JobMetaRepository jobMetaRepository() {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
package de.otto.edison.jobs.service;
2+
3+
import org.slf4j.Logger;
4+
import org.slf4j.LoggerFactory;
5+
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
6+
import org.springframework.boot.context.event.ApplicationReadyEvent;
7+
import org.springframework.context.event.EventListener;
8+
import org.springframework.scheduling.TaskScheduler;
9+
import org.springframework.scheduling.Trigger;
10+
import org.springframework.scheduling.TriggerContext;
11+
import org.springframework.scheduling.support.CronTrigger;
12+
import org.springframework.scheduling.support.PeriodicTrigger;
13+
import org.springframework.scheduling.support.SimpleTriggerContext;
14+
import org.springframework.stereotype.Service;
15+
16+
import java.util.List;
17+
18+
@Service
19+
@ConditionalOnProperty(value = "edison.jobs.localScheduling.enabled", havingValue = "true")
20+
public class LocalJobScheduler {
21+
private static final Logger LOG = LoggerFactory.getLogger(LocalJobScheduler.class);
22+
23+
private final List<JobRunnable> jobRunnables;
24+
private final JobService jobService;
25+
private final TaskScheduler taskScheduler;
26+
27+
public LocalJobScheduler(List<JobRunnable> jobRunnables, JobService jobService, TaskScheduler taskScheduler) {
28+
this.jobRunnables = jobRunnables;
29+
this.jobService = jobService;
30+
this.taskScheduler = taskScheduler;
31+
}
32+
33+
@EventListener(ApplicationReadyEvent.class)
34+
public void schedule() {
35+
TriggerContext dummyTriggerContext = new SimpleTriggerContext();
36+
jobRunnables.stream()
37+
.map(JobRunnable::getJobDefinition)
38+
.filter(jobDefinition -> jobDefinition.cron().isPresent() || jobDefinition.fixedDelay().isPresent())
39+
.forEach(jobDefinition -> {
40+
Trigger trigger;
41+
if (jobDefinition.cron().isPresent()) {
42+
trigger = new CronTrigger(jobDefinition.cron().get());
43+
} else {
44+
trigger = new PeriodicTrigger(jobDefinition.fixedDelay().get());
45+
}
46+
47+
taskScheduler.schedule(() -> jobService.startAsyncJob(jobDefinition.jobType()), trigger);
48+
LOG.info("Scheduled {} for {}", jobDefinition.jobType(), trigger.nextExecution(dummyTriggerContext));
49+
});
50+
}
51+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
package de.otto.edison.jobs.service;
2+
3+
import org.junit.jupiter.api.BeforeEach;
4+
import org.junit.jupiter.api.Test;
5+
import org.mockito.ArgumentCaptor;
6+
import org.mockito.Mock;
7+
import org.springframework.scheduling.TaskScheduler;
8+
import org.springframework.scheduling.Trigger;
9+
import org.springframework.scheduling.support.CronTrigger;
10+
import org.springframework.scheduling.support.PeriodicTrigger;
11+
12+
import java.time.Duration;
13+
import java.util.List;
14+
import java.util.Optional;
15+
16+
import static de.otto.edison.jobs.definition.DefaultJobDefinition.*;
17+
import static org.junit.jupiter.api.Assertions.assertTrue;
18+
import static org.mockito.Mockito.*;
19+
import static org.mockito.MockitoAnnotations.openMocks;
20+
21+
class LocalJobSchedulerTest {
22+
23+
@Mock
24+
private JobRunnable fixedDelayJobRunnable;
25+
@Mock
26+
private JobRunnable manualJobRunnable;
27+
@Mock
28+
private JobRunnable cronJobRunnable;
29+
30+
@Mock
31+
private JobService jobService;
32+
@Mock
33+
private TaskScheduler taskScheduler;
34+
35+
@BeforeEach
36+
public void setUp() {
37+
openMocks(this);
38+
when(fixedDelayJobRunnable.getJobDefinition()).thenReturn(
39+
fixedDelayJobDefinition("FIXED", "", "", Duration.ofSeconds(2), 0, Optional.empty())
40+
);
41+
when(manualJobRunnable.getJobDefinition()).thenReturn(
42+
manuallyTriggerableJobDefinition("MANUAL", "", "", 0, Optional.empty())
43+
);
44+
when(cronJobRunnable.getJobDefinition()).thenReturn(
45+
cronJobDefinition("CRON", "", "", "0 0 * * * *", 0, Optional.empty())
46+
);
47+
}
48+
49+
@Test
50+
public void shouldScheduleRunnable() {
51+
// given
52+
LocalJobScheduler localJobScheduler = new LocalJobScheduler(List.of(fixedDelayJobRunnable, cronJobRunnable), jobService, taskScheduler);
53+
54+
// when
55+
localJobScheduler.schedule();
56+
57+
// then
58+
ArgumentCaptor<Trigger> triggerCaptor = ArgumentCaptor.forClass(Trigger.class);
59+
verify(taskScheduler, times(2)).schedule(any(), triggerCaptor.capture());
60+
61+
assertTrue(triggerCaptor.getAllValues().stream().anyMatch(trigger -> trigger instanceof CronTrigger));
62+
assertTrue(triggerCaptor.getAllValues().stream().anyMatch(trigger -> trigger instanceof PeriodicTrigger));
63+
}
64+
65+
@Test
66+
public void shouldStartJobFromRunnable() {
67+
// given
68+
doAnswer(invocation -> {
69+
Runnable runnable = invocation.getArgument(0);
70+
runnable.run();
71+
return null;
72+
}).when(taskScheduler).schedule(any(), (Trigger) any());
73+
LocalJobScheduler localJobScheduler = new LocalJobScheduler(List.of(fixedDelayJobRunnable, cronJobRunnable), jobService, taskScheduler);
74+
75+
// when
76+
localJobScheduler.schedule();
77+
78+
// then
79+
ArgumentCaptor<String> stringCaptor = ArgumentCaptor.forClass(String.class);
80+
verify(jobService, times(2)).startAsyncJob(stringCaptor.capture());
81+
assertTrue(stringCaptor.getAllValues().contains("FIXED"));
82+
assertTrue(stringCaptor.getAllValues().contains("CRON"));
83+
}
84+
85+
@Test
86+
public void shouldFilterManuallyTriggeredJobs() {
87+
// given
88+
LocalJobScheduler localJobScheduler = new LocalJobScheduler(List.of(fixedDelayJobRunnable, cronJobRunnable, manualJobRunnable), jobService, taskScheduler);
89+
90+
// when
91+
localJobScheduler.schedule();
92+
93+
// then
94+
verify(taskScheduler, times(2)).schedule(any(), (Trigger) any());
95+
}
96+
}

0 commit comments

Comments
 (0)