Skip to content

Commit 451c841

Browse files
committed
Merge pull request #352 from UNC-Libraries/prevent-duplicate-jobs
Prevent duplicate deposit jobs from enqueuing when resuming
2 parents 07d9abc + f49bc60 commit 451c841

File tree

4 files changed

+89
-7
lines changed

4 files changed

+89
-7
lines changed

Diff for: deposit/src/main/java/edu/unc/lib/deposit/work/DepositSupervisor.java

+74-7
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33
import java.io.File;
44
import java.text.MessageFormat;
55
import java.util.Arrays;
6+
import java.util.HashMap;
7+
import java.util.HashSet;
68
import java.util.List;
79
import java.util.Map;
810
import java.util.Set;
@@ -15,6 +17,8 @@
1517

1618
import net.greghaines.jesque.Job;
1719
import net.greghaines.jesque.client.Client;
20+
import net.greghaines.jesque.meta.QueueInfo;
21+
import net.greghaines.jesque.meta.dao.QueueInfoDAO;
1822
import net.greghaines.jesque.worker.Worker;
1923
import net.greghaines.jesque.worker.WorkerEvent;
2024
import net.greghaines.jesque.worker.WorkerListener;
@@ -44,6 +48,7 @@
4448
import edu.unc.lib.dl.util.DepositStatusFactory;
4549
import edu.unc.lib.dl.util.JobStatusFactory;
4650
import edu.unc.lib.dl.util.PackagingType;
51+
import edu.unc.lib.dl.util.RedisWorkerConstants;
4752
import edu.unc.lib.dl.util.RedisWorkerConstants.DepositAction;
4853
import edu.unc.lib.dl.util.RedisWorkerConstants.DepositField;
4954
import edu.unc.lib.dl.util.RedisWorkerConstants.DepositState;
@@ -72,6 +77,9 @@ public class DepositSupervisor implements WorkerListener {
7277
@Autowired
7378
private WorkerPool cdrMetsDepositWorkerPool;
7479

80+
@Autowired
81+
private QueueInfoDAO queueDAO;
82+
7583
@Autowired
7684
private DepositEmailHandler depositEmailHandler;
7785

@@ -137,6 +145,9 @@ public void init() {
137145
}
138146

139147
public void start() {
148+
// Repopulate the queue
149+
requeueAll();
150+
140151
LOG.info("Starting deposit checks and worker pool");
141152
if (timer != null)
142153
return;
@@ -225,15 +236,37 @@ public void run() {
225236
LOG.info("Starting deposit workers");
226237
cdrMetsDepositWorkerPool.run();
227238
}
228-
229-
// Repopulate the queue
230-
requeueAll();
239+
}
240+
241+
private Map<String, Set<String>> getQueuedDepositsWithJobs() {
242+
Map<String, Set<String>> depositMap = new HashMap<>();
243+
addQueuedDeposits(RedisWorkerConstants.DEPOSIT_PREPARE_QUEUE, depositMap);
244+
addQueuedDeposits(RedisWorkerConstants.DEPOSIT_DELAYED_QUEUE, depositMap);
245+
addQueuedDeposits(RedisWorkerConstants.DEPOSIT_CDRMETS_QUEUE, depositMap);
246+
return depositMap;
247+
}
248+
249+
private void addQueuedDeposits(String queueName, Map<String, Set<String>> depositMap) {
250+
QueueInfo info = queueDAO.getQueueInfo(queueName, 0, 0);
251+
252+
for (Job job : info.getJobs()) {
253+
String depositId = (String) job.getArgs()[1];
254+
255+
Set<String> jobs = depositMap.get(depositId);
256+
if (jobs == null) {
257+
jobs = new HashSet<>();
258+
depositMap.put(depositId, jobs);
259+
}
260+
jobs.add(job.getClassName());
261+
}
231262
}
232263

233264
/**
234265
* Add jobs previously running or queued back to the queue
235266
*/
236267
private void requeueAll() {
268+
269+
Map<String, Set<String>> depositSet = getQueuedDepositsWithJobs();
237270
Set<Map<String, String>> depositStatuses = depositStatusFactory.getAll();
238271

239272
LOG.info("Repopulating the deposit queue, {} items in backlog", depositStatuses.size());
@@ -246,7 +279,16 @@ private void requeueAll() {
246279
// Job may have been locked to a particular supervisor depend on when it was interrupted
247280
depositStatusFactory.removeSupervisorLock(uuid);
248281
// Inform supervisor to resume this deposit from where it left off
249-
depositStatusFactory.setActionRequest(uuid, DepositAction.resume);
282+
if (depositSet.containsKey(uuid)) {
283+
// If the job is queued but the job it is waiting on is a cleanup, then it is finished
284+
if (depositSet.get(uuid).contains(CleanupDepositJob.class.getName())) {
285+
depositStatusFactory.setState(uuid, DepositState.finished);
286+
} else {
287+
LOG.debug("Skipping resumption of deposit {} because it already is in the queue", uuid);
288+
}
289+
} else {
290+
depositStatusFactory.setActionRequest(uuid, DepositAction.resume);
291+
}
250292
}
251293
}
252294

@@ -257,7 +299,21 @@ private void requeueAll() {
257299

258300
depositStatusFactory.removeSupervisorLock(uuid);
259301
// Re-register as a new deposit
260-
depositStatusFactory.setActionRequest(uuid, DepositAction.register);
302+
if (depositSet.containsKey(uuid)) {
303+
if (depositSet.get(uuid).contains(CleanupDepositJob.class.getName())) {
304+
depositStatusFactory.setState(uuid, DepositState.finished);
305+
} else {
306+
LOG.debug("Skipping resumption of queued deposit {} because it already is in the queue", uuid);
307+
}
308+
} else {
309+
List<String> successfulJobs = jobStatusFactory.getSuccessfulJobNames(uuid);
310+
if (successfulJobs != null && successfulJobs.size() > 0) {
311+
// Queued but had already performed some jobs, so this is a resumption rather than new deposit
312+
depositStatusFactory.setActionRequest(uuid, DepositAction.resume);
313+
} else {
314+
depositStatusFactory.setActionRequest(uuid, DepositAction.register);
315+
}
316+
}
261317
}
262318
}
263319
}
@@ -607,9 +663,20 @@ private void resumeDeposit(String uuid, Map<String, String> status, long delay)
607663
// Clear out the previous failed job if there was one
608664
jobStatusFactory.clearStale(uuid);
609665
depositStatusFactory.deleteField(uuid, DepositField.errorMessage);
666+
667+
// since we already checked for queued jobs at startup, only check when resuming from a paused state
668+
boolean enqueueNext = true;
669+
if (DepositState.paused.name().equals(status.get(DepositField.state.name()))) {
670+
Map<String, Set<String>> depositSet = getQueuedDepositsWithJobs();
671+
enqueueNext = !depositSet.containsKey(uuid);
672+
}
610673

611-
List<String> successfulJobs = jobStatusFactory.getSuccessfulJobNames(uuid);
612-
queueNextJob(null, uuid, status, successfulJobs, delay);
674+
if (enqueueNext) {
675+
List<String> successfulJobs = jobStatusFactory.getSuccessfulJobNames(uuid);
676+
queueNextJob(null, uuid, status, successfulJobs, delay);
677+
} else {
678+
LOG.info("Resuming {} from paused state. A job is already queued so no new jobs will be enqueued", uuid);
679+
}
613680

614681
depositStatusFactory.setState(uuid, DepositState.queued);
615682
} catch (DepositFailedException e) {

Diff for: deposit/src/main/resources/service-context.xml

+5
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,11 @@
6767
<constructor-arg ref="jedisPool" />
6868
</bean>
6969

70+
<bean id="queueDAO" class="net.greghaines.jesque.meta.dao.impl.QueueInfoDAORedisImpl">
71+
<constructor-arg ref="jesqueConfig" />
72+
<constructor-arg ref="jedisPool" />
73+
</bean>
74+
7075
<bean id="dataSet" class="com.hp.hpl.jena.tdb.TDBFactory" factory-method="createDataset" destroy-method="close">
7176
<constructor-arg value="${deposits.dir}/jena-tdb-dataset"/>
7277
</bean>

Diff for: deposit/src/test/resources/service-context.xml

+5
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,11 @@
7373
<constructor-arg ref="jedisPool" />
7474
</bean>
7575

76+
<bean id="queueDAO" class="net.greghaines.jesque.meta.dao.impl.QueueInfoDAORedisImpl">
77+
<constructor-arg ref="jesqueConfig" />
78+
<constructor-arg ref="jedisPool" />
79+
</bean>
80+
7681
<bean id="dataSet" class="com.hp.hpl.jena.tdb.TDBFactory" factory-method="createDataset" destroy-method="close">
7782
<constructor-arg value="#{depositsDirectory+'/jena-tdb-dataset'}"/>
7883
</bean>

Diff for: metadata/src/main/java/edu/unc/lib/dl/util/RedisWorkerConstants.java

+5
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,11 @@ public class RedisWorkerConstants {
1212
public static final String BULK_RESUME_PREFIX = "bulk-resume:";
1313
public static final String BULK_UPDATE_QUEUE = "bulk-md-update";
1414

15+
public static final String DEPOSIT_PREPARE_QUEUE = "PREPARE";
16+
public static final String DEPOSIT_DELAYED_QUEUE = "DELAYED_PREPARE";
17+
public static final String DEPOSIT_CDRMETS_QUEUE = "CDRMETSCONVERT";
18+
public static final String RESQUE_QUEUE_PREFIX = "resque:queue:";
19+
1520
public static enum DepositField {
1621
uuid, state, actionRequest, contactName, depositorName, intSenderIdentifier, intSenderDescription,
1722
fileName, resubmitDirName, resubmitFileName, isResubmit, depositMethod, containerId, payLoadOctets,

0 commit comments

Comments
 (0)