Skip to content

Commit ef5fac3

Browse files
committed
Verifying that jobs are not already queued for a deposit before resuming or re-enqueuing during service startup to prevent duplicate jobs being created
1 parent f02cd1c commit ef5fac3

File tree

2 files changed

+97
-8
lines changed

2 files changed

+97
-8
lines changed

deposit/src/main/java/edu/unc/lib/deposit/work/DepositSupervisor.java

+92-8
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,11 @@
11
package edu.unc.lib.deposit.work;
22

33
import java.io.File;
4+
import java.io.IOException;
45
import java.text.MessageFormat;
56
import java.util.Arrays;
7+
import java.util.HashMap;
8+
import java.util.HashSet;
69
import java.util.List;
710
import java.util.Map;
811
import java.util.Set;
@@ -20,10 +23,14 @@
2023
import net.greghaines.jesque.worker.WorkerListener;
2124
import net.greghaines.jesque.worker.WorkerPool;
2225

26+
import org.codehaus.jackson.JsonNode;
27+
import org.codehaus.jackson.map.ObjectMapper;
2328
import org.slf4j.Logger;
2429
import org.slf4j.LoggerFactory;
2530
import org.springframework.beans.factory.annotation.Autowired;
2631

32+
import redis.clients.jedis.Jedis;
33+
import redis.clients.jedis.JedisPool;
2734
import edu.unc.lib.deposit.CleanupDepositJob;
2835
import edu.unc.lib.deposit.PrepareResubmitJob;
2936
import edu.unc.lib.deposit.fcrepo3.IngestDeposit;
@@ -36,14 +43,15 @@
3643
import edu.unc.lib.deposit.normalize.UnpackDepositJob;
3744
import edu.unc.lib.deposit.normalize.VocabularyEnforcementJob;
3845
import edu.unc.lib.deposit.validate.PackageIntegrityCheckJob;
46+
import edu.unc.lib.deposit.validate.ValidateFileAvailabilityJob;
3947
import edu.unc.lib.deposit.validate.ValidateMODS;
4048
import edu.unc.lib.deposit.validate.VirusScanJob;
41-
import edu.unc.lib.deposit.validate.ValidateFileAvailabilityJob;
4249
import edu.unc.lib.dl.fedora.FedoraTimeoutException;
4350
import edu.unc.lib.dl.util.DepositConstants;
4451
import edu.unc.lib.dl.util.DepositStatusFactory;
4552
import edu.unc.lib.dl.util.JobStatusFactory;
4653
import edu.unc.lib.dl.util.PackagingType;
54+
import edu.unc.lib.dl.util.RedisWorkerConstants;
4755
import edu.unc.lib.dl.util.RedisWorkerConstants.DepositAction;
4856
import edu.unc.lib.dl.util.RedisWorkerConstants.DepositField;
4957
import edu.unc.lib.dl.util.RedisWorkerConstants.DepositState;
@@ -72,6 +80,9 @@ public class DepositSupervisor implements WorkerListener {
7280
@Autowired
7381
private WorkerPool cdrMetsDepositWorkerPool;
7482

83+
@Autowired
84+
private JedisPool jedisPool;
85+
7586
@Autowired
7687
private DepositEmailHandler depositEmailHandler;
7788

@@ -137,6 +148,9 @@ public void init() {
137148
}
138149

139150
public void start() {
151+
// Repopulate the queue
152+
requeueAll();
153+
140154
LOG.info("Starting deposit checks and worker pool");
141155
if (timer != null)
142156
return;
@@ -225,15 +239,53 @@ public void run() {
225239
LOG.info("Starting deposit workers");
226240
cdrMetsDepositWorkerPool.run();
227241
}
228-
229-
// Repopulate the queue
230-
requeueAll();
242+
}
243+
244+
private Map<String, Set<String>> getQueuedDepositsWithJobs() {
245+
Map<String, Set<String>> depositMap = new HashMap<>();
246+
addQueuedDeposits(
247+
RedisWorkerConstants.RESQUE_QUEUE_PREFIX + RedisWorkerConstants.DEPOSIT_PREPARE_QUEUE, depositMap);
248+
addQueuedDeposits(
249+
RedisWorkerConstants.RESQUE_QUEUE_PREFIX + RedisWorkerConstants.DEPOSIT_DELAYED_QUEUE, depositMap);
250+
addQueuedDeposits(
251+
RedisWorkerConstants.RESQUE_QUEUE_PREFIX + RedisWorkerConstants.DEPOSIT_CDRMETS_QUEUE, depositMap);
252+
return depositMap;
253+
}
254+
255+
private void addQueuedDeposits(String queueName, Map<String, Set<String>> depositMap) {
256+
Jedis jedis = jedisPool.getResource();
257+
Set<String> queue;
258+
try {
259+
queue = jedis.zrange(queueName, 0, -1);
260+
} catch (Exception e) {
261+
// Resque seems to sometimes switch the type of the queue
262+
LOG.warn("Redis did not return a zset for {}, trying to retrieve as a list", queueName, e);
263+
queue = new HashSet<>(jedis.lrange(queueName, 0, -1));
264+
}
265+
266+
ObjectMapper mapper = new ObjectMapper();
267+
for (String entry : queue) {
268+
try {
269+
JsonNode node = mapper.readTree(entry);
270+
String depositId = node.get("args").get(1).asText();
271+
Set<String> jobs = depositMap.get(depositId);
272+
if (jobs == null) {
273+
jobs = new HashSet<>();
274+
depositMap.put(depositId, jobs);
275+
}
276+
jobs.add(node.get("class").asText());
277+
} catch (IOException e) {
278+
LOG.error("Failed to parse deposit job from resque", e);
279+
}
280+
}
231281
}
232282

233283
/**
234284
* Add jobs previously running or queued back to the queue
235285
*/
236286
private void requeueAll() {
287+
288+
Map<String, Set<String>> depositSet = getQueuedDepositsWithJobs();
237289
Set<Map<String, String>> depositStatuses = depositStatusFactory.getAll();
238290

239291
LOG.info("Repopulating the deposit queue, {} items in backlog", depositStatuses.size());
@@ -246,7 +298,16 @@ private void requeueAll() {
246298
// Job may have been locked to a particular supervisor depend on when it was interrupted
247299
depositStatusFactory.removeSupervisorLock(uuid);
248300
// Inform supervisor to resume this deposit from where it left off
249-
depositStatusFactory.setActionRequest(uuid, DepositAction.resume);
301+
if (depositSet.containsKey(uuid)) {
302+
// If the job is queued but the job it is waiting on is a cleanup, then it is finished
303+
if (depositSet.get(uuid).contains(CleanupDepositJob.class.getName())) {
304+
depositStatusFactory.setState(uuid, DepositState.finished);
305+
} else {
306+
LOG.debug("Skipping resumption of deposit {} because it already is in the queue", uuid);
307+
}
308+
} else {
309+
depositStatusFactory.setActionRequest(uuid, DepositAction.resume);
310+
}
250311
}
251312
}
252313

@@ -257,7 +318,21 @@ private void requeueAll() {
257318

258319
depositStatusFactory.removeSupervisorLock(uuid);
259320
// Re-register as a new deposit
260-
depositStatusFactory.setActionRequest(uuid, DepositAction.register);
321+
if (depositSet.containsKey(uuid)) {
322+
if (depositSet.get(uuid).contains(CleanupDepositJob.class.getName())) {
323+
depositStatusFactory.setState(uuid, DepositState.finished);
324+
} else {
325+
LOG.debug("Skipping resumption of queued deposit {} because it already is in the queue", uuid);
326+
}
327+
} else {
328+
List<String> successfulJobs = jobStatusFactory.getSuccessfulJobNames(uuid);
329+
if (successfulJobs != null && successfulJobs.size() > 0) {
330+
// Queued but had already performed some jobs, so this is a resumption rather than new deposit
331+
depositStatusFactory.setActionRequest(uuid, DepositAction.resume);
332+
} else {
333+
depositStatusFactory.setActionRequest(uuid, DepositAction.register);
334+
}
335+
}
261336
}
262337
}
263338
}
@@ -607,9 +682,18 @@ private void resumeDeposit(String uuid, Map<String, String> status, long delay)
607682
// Clear out the previous failed job if there was one
608683
jobStatusFactory.clearStale(uuid);
609684
depositStatusFactory.deleteField(uuid, DepositField.errorMessage);
685+
686+
boolean enqueueNext = true;
687+
if (DepositState.paused.name().equals(status.get(DepositField.state.name()))) {
688+
Map<String, Set<String>> depositSet = getQueuedDepositsWithJobs();
689+
enqueueNext = !depositSet.containsKey(uuid);
690+
LOG.info("Resuming from paused state. {} will enqueue a new job {}", uuid, enqueueNext);
691+
}
610692

611-
List<String> successfulJobs = jobStatusFactory.getSuccessfulJobNames(uuid);
612-
queueNextJob(null, uuid, status, successfulJobs, delay);
693+
if (enqueueNext) {
694+
List<String> successfulJobs = jobStatusFactory.getSuccessfulJobNames(uuid);
695+
queueNextJob(null, uuid, status, successfulJobs, delay);
696+
}
613697

614698
depositStatusFactory.setState(uuid, DepositState.queued);
615699
} catch (DepositFailedException e) {

metadata/src/main/java/edu/unc/lib/dl/util/RedisWorkerConstants.java

+5
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,11 @@ public class RedisWorkerConstants {
99
public static final String DEPOSIT_TO_JOBS_PREFIX = "deposit-to-jobs:";
1010
public static final String JOB_STATUS_PREFIX = "job-status:";
1111

12+
public static final String DEPOSIT_PREPARE_QUEUE = "PREPARE";
13+
public static final String DEPOSIT_DELAYED_QUEUE = "DELAYED_PREPARE";
14+
public static final String DEPOSIT_CDRMETS_QUEUE = "CDRMETSCONVERT";
15+
public static final String RESQUE_QUEUE_PREFIX = "resque:queue:";
16+
1217
public static enum DepositField {
1318
uuid, state, actionRequest, contactName, depositorName, intSenderIdentifier, intSenderDescription,
1419
fileName, resubmitDirName, resubmitFileName, isResubmit, depositMethod, containerId, payLoadOctets,

0 commit comments

Comments
 (0)