|
1 | 1 | package edu.unc.lib.deposit.work;
|
2 | 2 |
|
3 | 3 | import java.io.File;
|
4 |
| -import java.io.IOException; |
5 | 4 | import java.text.MessageFormat;
|
6 | 5 | import java.util.Arrays;
|
7 | 6 | import java.util.HashMap;
|
|
18 | 17 |
|
19 | 18 | import net.greghaines.jesque.Job;
|
20 | 19 | import net.greghaines.jesque.client.Client;
|
| 20 | +import net.greghaines.jesque.meta.QueueInfo; |
| 21 | +import net.greghaines.jesque.meta.dao.QueueInfoDAO; |
21 | 22 | import net.greghaines.jesque.worker.Worker;
|
22 | 23 | import net.greghaines.jesque.worker.WorkerEvent;
|
23 | 24 | import net.greghaines.jesque.worker.WorkerListener;
|
24 | 25 | import net.greghaines.jesque.worker.WorkerPool;
|
25 | 26 |
|
26 |
| -import org.codehaus.jackson.JsonNode; |
27 |
| -import org.codehaus.jackson.map.ObjectMapper; |
28 | 27 | import org.slf4j.Logger;
|
29 | 28 | import org.slf4j.LoggerFactory;
|
30 | 29 | import org.springframework.beans.factory.annotation.Autowired;
|
31 | 30 |
|
32 |
| -import redis.clients.jedis.Jedis; |
33 |
| -import redis.clients.jedis.JedisPool; |
34 | 31 | import edu.unc.lib.deposit.CleanupDepositJob;
|
35 | 32 | import edu.unc.lib.deposit.PrepareResubmitJob;
|
36 | 33 | import edu.unc.lib.deposit.fcrepo3.IngestDeposit;
|
@@ -81,7 +78,7 @@ public class DepositSupervisor implements WorkerListener {
|
81 | 78 | private WorkerPool cdrMetsDepositWorkerPool;
|
82 | 79 |
|
83 | 80 | @Autowired
|
84 |
| - private JedisPool jedisPool; |
| 81 | + private QueueInfoDAO queueDAO; |
85 | 82 |
|
86 | 83 | @Autowired
|
87 | 84 | private DepositEmailHandler depositEmailHandler;
|
@@ -243,40 +240,24 @@ public void run() {
|
243 | 240 |
|
244 | 241 | private Map<String, Set<String>> getQueuedDepositsWithJobs() {
|
245 | 242 | Map<String, Set<String>> depositMap = new HashMap<>();
|
246 |
| - addQueuedDeposits( |
247 |
| - RedisWorkerConstants.RESQUE_QUEUE_PREFIX + RedisWorkerConstants.DEPOSIT_PREPARE_QUEUE, depositMap); |
248 |
| - addQueuedDeposits( |
249 |
| - RedisWorkerConstants.RESQUE_QUEUE_PREFIX + RedisWorkerConstants.DEPOSIT_DELAYED_QUEUE, depositMap); |
250 |
| - addQueuedDeposits( |
251 |
| - RedisWorkerConstants.RESQUE_QUEUE_PREFIX + RedisWorkerConstants.DEPOSIT_CDRMETS_QUEUE, depositMap); |
| 243 | + addQueuedDeposits(RedisWorkerConstants.DEPOSIT_PREPARE_QUEUE, depositMap); |
| 244 | + addQueuedDeposits(RedisWorkerConstants.DEPOSIT_DELAYED_QUEUE, depositMap); |
| 245 | + addQueuedDeposits(RedisWorkerConstants.DEPOSIT_CDRMETS_QUEUE, depositMap); |
252 | 246 | return depositMap;
|
253 | 247 | }
|
254 | 248 |
|
255 | 249 | private void addQueuedDeposits(String queueName, Map<String, Set<String>> depositMap) {
|
256 |
| - Jedis jedis = jedisPool.getResource(); |
257 |
| - Set<String> queue; |
258 |
| - try { |
259 |
| - queue = jedis.zrange(queueName, 0, -1); |
260 |
| - } catch (Exception e) { |
261 |
| - // Resque seems to sometimes switch the type of the queue |
262 |
| - LOG.warn("Redis did not return a zset for {}, trying to retrieve as a list", queueName, e); |
263 |
| - queue = new HashSet<>(jedis.lrange(queueName, 0, -1)); |
264 |
| - } |
| 250 | + QueueInfo info = queueDAO.getQueueInfo(queueName, 0, 0); |
265 | 251 |
|
266 |
| - ObjectMapper mapper = new ObjectMapper(); |
267 |
| - for (String entry : queue) { |
268 |
| - try { |
269 |
| - JsonNode node = mapper.readTree(entry); |
270 |
| - String depositId = node.get("args").get(1).asText(); |
271 |
| - Set<String> jobs = depositMap.get(depositId); |
272 |
| - if (jobs == null) { |
273 |
| - jobs = new HashSet<>(); |
274 |
| - depositMap.put(depositId, jobs); |
275 |
| - } |
276 |
| - jobs.add(node.get("class").asText()); |
277 |
| - } catch (IOException e) { |
278 |
| - LOG.error("Failed to parse deposit job from resque", e); |
| 252 | + for (Job job : info.getJobs()) { |
| 253 | + String depositId = (String) job.getArgs()[1]; |
| 254 | + |
| 255 | + Set<String> jobs = depositMap.get(depositId); |
| 256 | + if (jobs == null) { |
| 257 | + jobs = new HashSet<>(); |
| 258 | + depositMap.put(depositId, jobs); |
279 | 259 | }
|
| 260 | + jobs.add(job.getClassName()); |
280 | 261 | }
|
281 | 262 | }
|
282 | 263 |
|
|
0 commit comments