Skip to content

Commit ccae517

Browse files
committed
Releasing jedis resources after finished with them in bulk import code
1 parent 89fdd7f commit ccae517

File tree

2 files changed

+58
-53
lines changed

2 files changed

+58
-53
lines changed

persistence/src/main/java/edu/unc/lib/dl/update/BulkMetadataUIPProcessor.java

+39-35
Original file line numberDiff line numberDiff line change
@@ -215,22 +215,24 @@ public void process(UpdateInformationPackage uip) throws UpdateException, UIPExc
215215
}
216216

217217
private void updateResumptionPoint(PID uipPID, BulkMetadataDatastreamUIP singleUIP) {
218-
Jedis jedis = jedisPool.getResource();
219-
Map<String, String> values = new HashMap<>();
220-
values.put("lastPid", singleUIP.getPID().getPid());
221-
values.put("lastDatastream", singleUIP.getDatastream());
222-
jedis.hmset(RedisWorkerConstants.BULK_RESUME_PREFIX + uipPID.getPid(), values);
218+
try (Jedis jedis = jedisPool.getResource()) {
219+
Map<String, String> values = new HashMap<>();
220+
values.put("lastPid", singleUIP.getPID().getPid());
221+
values.put("lastDatastream", singleUIP.getDatastream());
222+
jedis.hmset(RedisWorkerConstants.BULK_RESUME_PREFIX + uipPID.getPid(), values);
223+
}
223224
}
224225

225226
private void storeUpdateInformation(BulkMetadataUIP uip) {
226-
Jedis jedis = jedisPool.getResource();
227-
Map<String, String> values = new HashMap<>();
228-
values.put("email", uip.getEmailAddress());
229-
values.put("user", uip.getUser());
230-
values.put("groups", uip.getGroups().toString());
231-
values.put("filePath", uip.getImportFile().getAbsolutePath());
232-
values.put("originalFilename", uip.getOriginalFilename());
233-
jedis.hmset(RedisWorkerConstants.BULK_UPDATE_PREFIX + uip.getPID().getPid(), values);
227+
try (Jedis jedis = jedisPool.getResource()) {
228+
Map<String, String> values = new HashMap<>();
229+
values.put("email", uip.getEmailAddress());
230+
values.put("user", uip.getUser());
231+
values.put("groups", uip.getGroups().toString());
232+
values.put("filePath", uip.getImportFile().getAbsolutePath());
233+
values.put("originalFilename", uip.getOriginalFilename());
234+
jedis.hmset(RedisWorkerConstants.BULK_UPDATE_PREFIX + uip.getPID().getPid(), values);
235+
}
234236
}
235237

236238
/**
@@ -242,26 +244,27 @@ private void storeUpdateInformation(BulkMetadataUIP uip) {
242244
* @throws UpdateException
243245
*/
244246
private void resume(BulkMetadataUIP uip) throws UpdateException {
245-
Jedis jedis = jedisPool.getResource();
246-
Map<String, String> resumeValues = jedis.hgetAll(RedisWorkerConstants.BULK_RESUME_PREFIX + uip.getPID().getPid());
247-
if (resumeValues == null) {
248-
// No resumption info, so store update info just in case
249-
storeUpdateInformation(uip);
250-
return;
251-
}
252-
253-
// If the update file doesn't exist anymore, clear this update out so it doesn't stick around forever
254-
if (!uip.getImportFile().exists()) {
255-
cleanup(uip);
256-
throw new UpdateException("Unable to resume update " + uip.getPID() + ", could not find update file");
247+
try (Jedis jedis = jedisPool.getResource()) {
248+
Map<String, String> resumeValues = jedis.hgetAll(RedisWorkerConstants.BULK_RESUME_PREFIX + uip.getPID().getPid());
249+
if (resumeValues == null) {
250+
// No resumption info, so store update info just in case
251+
storeUpdateInformation(uip);
252+
return;
253+
}
254+
255+
// If the update file doesn't exist anymore, clear this update out so it doesn't stick around forever
256+
if (!uip.getImportFile().exists()) {
257+
cleanup(uip);
258+
throw new UpdateException("Unable to resume update " + uip.getPID() + ", could not find update file");
259+
}
260+
261+
// Move the update cursor past the last updated object
262+
try {
263+
uip.seekNextUpdate(new PID(resumeValues.get("lastPid")), resumeValues.get("lastDatastream"));
264+
} catch (Exception e) {
265+
cleanup(uip);
266+
throw new UpdateException("Failed to parse update package while resuming", e);
257267
}
258-
259-
// Move the update cursor past the last updated object
260-
try {
261-
uip.seekNextUpdate(new PID(resumeValues.get("lastPid")), resumeValues.get("lastDatastream"));
262-
} catch (Exception e) {
263-
cleanup(uip);
264-
throw new UpdateException("Failed to parse update package while resuming", e);
265268
}
266269
}
267270

@@ -272,9 +275,10 @@ private void resume(BulkMetadataUIP uip) throws UpdateException {
272275
private void cleanup(BulkMetadataUIP uip) {
273276
String pid = uip.getPID().getPid();
274277

275-
Jedis jedis = jedisPool.getResource();
276-
jedis.del(RedisWorkerConstants.BULK_UPDATE_PREFIX + pid);
277-
jedis.del(RedisWorkerConstants.BULK_RESUME_PREFIX + pid);
278+
try (Jedis jedis = jedisPool.getResource()) {
279+
jedis.del(RedisWorkerConstants.BULK_UPDATE_PREFIX + pid);
280+
jedis.del(RedisWorkerConstants.BULK_RESUME_PREFIX + pid);
281+
}
278282

279283
uip.getImportFile().delete();
280284
}

services-worker/src/main/java/edu/unc/lib/dl/cdr/services/processing/BulkMetadataUpdateConductor.java

+19-18
Original file line numberDiff line numberDiff line change
@@ -57,25 +57,26 @@ public void add(String updateId, String email, String username, Collection<Strin
5757
}
5858

5959
public void resumeIncompleteUpdates() {
60-
Jedis jedis = jedisPool.getResource();
61-
Set<String> incompleteUpdates = jedis.keys(RedisWorkerConstants.BULK_UPDATE_PREFIX + "*");
62-
63-
for (String incomplete : incompleteUpdates) {
64-
Map<String, String> updateValues = jedis.hgetAll(incomplete);
65-
66-
String updateId = incomplete.split(":", 2)[1];
60+
try (Jedis jedis = jedisPool.getResource()) {
61+
Set<String> incompleteUpdates = jedis.keys(RedisWorkerConstants.BULK_UPDATE_PREFIX + "*");
6762

68-
// If the import file doesn't exist, then can't resume
69-
File importFile = new File(updateValues.get("filePath"));
70-
if (!importFile.exists()) {
71-
log.warn("Failed to resume update {} for user {} because the file no longer existed",
72-
updateValues.get("originalFilename"), updateValues.get("user"));
73-
jedis.del(incomplete);
74-
jedis.del(RedisWorkerConstants.BULK_RESUME_PREFIX + updateId);
75-
} else {
76-
add(updateId, updateValues.get("email"), updateValues.get("user"),
77-
Arrays.asList(updateValues.get("groups").split(" ")),
78-
importFile, updateValues.get("originalFilename"));
63+
for (String incomplete : incompleteUpdates) {
64+
Map<String, String> updateValues = jedis.hgetAll(incomplete);
65+
66+
String updateId = incomplete.split(":", 2)[1];
67+
68+
// If the import file doesn't exist, then can't resume
69+
File importFile = new File(updateValues.get("filePath"));
70+
if (!importFile.exists()) {
71+
log.warn("Failed to resume update {} for user {} because the file no longer existed",
72+
updateValues.get("originalFilename"), updateValues.get("user"));
73+
jedis.del(incomplete);
74+
jedis.del(RedisWorkerConstants.BULK_RESUME_PREFIX + updateId);
75+
} else {
76+
add(updateId, updateValues.get("email"), updateValues.get("user"),
77+
Arrays.asList(updateValues.get("groups").split(" ")),
78+
importFile, updateValues.get("originalFilename"));
79+
}
7980
}
8081
}
8182
}

0 commit comments

Comments
 (0)