Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enables locking on an arbitrary lockID #164

Merged
merged 7 commits into from
Apr 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ public LockModel(String jobIndexName, String jobId, Instant lockTime,
long lockDurationSeconds, boolean released, long seqNo, long primaryTerm) {
this.lockId = jobIndexName + LOCK_ID_DELIMITR + jobId;
this.jobIndexName = jobIndexName;
// The jobId parameter does not necessarily need to represent the id of a job scheduler job, as it is being used
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

may be create an issue in repo to rename this completely to just lockId everywhere? so the paradigm is changed in job scheduler - cleaning up stale locks and any other refactor needed with this change

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done!

// to scope the lock, and could represent any resource.
this.jobId = jobId;
this.lockTime = lockTime;
this.lockDurationSeconds = lockDurationSeconds;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,22 +106,46 @@ public void acquireLock(final ScheduledJobParameter jobParameter,
final JobExecutionContext context, ActionListener<LockModel> listener) {
final String jobIndexName = context.getJobIndexName();
final String jobId = context.getJobId();
if (jobParameter.getLockDurationSeconds() == null) {
final long lockDurationSeconds = jobParameter.getLockDurationSeconds();
acquireLockWithId(jobIndexName, lockDurationSeconds, jobId, listener);
}

/**
* Attempts to acquire a lock with a specific lock Id. If the lock does not exist it attempts to create the lock document.
* If the Lock document exists, it will try to update and acquire the lock.
*
* @param jobIndexName a non-null job index name.
* @param lockDurationSeconds the amount of time in seconds that the lock should exist
* @param lockId the unique Id for the lock. This should represent the resource that the lock is on, whether it be
* a job, or some other arbitrary resource. If the lockID matches a jobID, then the lock will be deleted
* when the job is deleted.
* @param listener an {@code ActionListener} that has onResponse and onFailure that is used to return the lock if it was acquired
* or else null. Passes {@code IllegalArgumentException} to onFailure if the {@code ScheduledJobParameter} does not
* have {@code LockDurationSeconds}.
*/
public void acquireLockWithId(final String jobIndexName,
final Long lockDurationSeconds,
final String lockId,
ActionListener<LockModel> listener) {
if (lockDurationSeconds == null) {
listener.onFailure(new IllegalArgumentException("Job LockDuration should not be null"));
} else if (jobIndexName == null) {
listener.onFailure(new IllegalArgumentException("Job index name should not be null"));
} else if (lockId == null) {
listener.onFailure(new IllegalArgumentException("Lock ID should not be null"));
} else {
final long lockDurationSecond = jobParameter.getLockDurationSeconds();
createLockIndex(ActionListener.wrap(
created -> {
if (created) {
try {
findLock(LockModel.generateLockId(jobIndexName, jobId), ActionListener.wrap(
findLock(LockModel.generateLockId(jobIndexName, lockId), ActionListener.wrap(
existingLock -> {
if (existingLock != null) {
if (isLockReleasedOrExpired(existingLock)) {
// Lock is expired. Attempt to acquire lock.
logger.debug("lock is released or expired: " + existingLock);
LockModel updateLock = new LockModel(existingLock, getNow(),
lockDurationSecond, false);
lockDurationSeconds, false);
updateLock(updateLock, listener);
} else {
logger.debug("Lock is NOT released or expired. " + existingLock);
Expand All @@ -130,8 +154,9 @@ public void acquireLock(final ScheduledJobParameter jobParameter,
}
} else {
// There is no lock object and it is first time. Create new lock.
LockModel tempLock = new LockModel(jobIndexName, jobId, getNow(),
lockDurationSecond, false);
// Note that the lockID will be set to {jobIndexName}-{lockId}
LockModel tempLock = new LockModel(jobIndexName, lockId, getNow(),
lockDurationSeconds, false);
logger.debug("Lock does not exist. Creating new lock" + tempLock);
createLock(tempLock, listener);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,18 @@

package org.opensearch.jobscheduler.spi.utils;

import org.junit.Before;
import org.junit.Ignore;
import org.mockito.Mockito;
import org.opensearch.action.ActionListener;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.xcontent.XContentBuilder;
import org.opensearch.jobscheduler.spi.JobDocVersion;
import org.opensearch.jobscheduler.spi.JobExecutionContext;
import org.opensearch.jobscheduler.spi.LockModel;
import org.opensearch.jobscheduler.spi.ScheduledJobParameter;
import org.opensearch.jobscheduler.spi.schedule.Schedule;
import org.opensearch.action.ActionListener;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.xcontent.XContentBuilder;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.junit.Before;
import org.junit.Ignore;
import org.mockito.Mockito;

import java.io.IOException;
import java.time.Duration;
Expand Down Expand Up @@ -117,17 +117,56 @@ public void testSanity() throws Exception {
latch.await(5L, TimeUnit.SECONDS);
}

public void testSanityWithCustomLockID() throws Exception {
String lockID = "sanity_test_lock";
String uniqSuffix = "_sanity";
CountDownLatch latch = new CountDownLatch(1);
LockService lockService = new LockService(client(), this.clusterService);
final JobExecutionContext context = new JobExecutionContext(Instant.now(), new JobDocVersion(0, 0, 0),
lockService, JOB_INDEX_NAME + uniqSuffix, JOB_ID + uniqSuffix);
Instant testTime = Instant.now();
lockService.setTime(testTime);
lockService.acquireLockWithId(context.getJobIndexName(), LOCK_DURATION_SECONDS, lockID, ActionListener.wrap(
lock -> {
assertNotNull("Expected to successfully grab lock.", lock);
assertEquals("job_id does not match.", lockID, lock.getJobId());
assertEquals("job_index_name does not match.", JOB_INDEX_NAME + uniqSuffix, lock.getJobIndexName());
assertEquals("lock_id does not match.", lock.getJobIndexName() + "-" + lockID, lock.getLockId());
assertEquals("lock_duration_seconds does not match.", LOCK_DURATION_SECONDS, lock.getLockDurationSeconds());
assertEquals("lock_time does not match.", testTime.getEpochSecond(), lock.getLockTime().getEpochSecond());
assertFalse("Lock should not be released.", lock.isReleased());
assertFalse("Lock should not expire.", lock.isExpired());
lockService.release(lock, ActionListener.wrap(
released -> {
assertTrue("Failed to release lock.", released);
lockService.deleteLock(lock.getLockId(), ActionListener.wrap(
deleted -> {
assertTrue("Failed to delete lock.", deleted);
latch.countDown();
},
exception -> fail(exception.getMessage())
));
},
exception -> fail(exception.getMessage())
));
},
exception -> fail(exception.getMessage())
));
latch.await(5L, TimeUnit.SECONDS);
}

public void testSecondAcquireLockFail() throws Exception {
String uniqSuffix = "_second_acquire";
String lockID = randomAlphaOfLengthBetween(6, 15);
CountDownLatch latch = new CountDownLatch(1);
LockService lockService = new LockService(client(), this.clusterService);
final JobExecutionContext context = new JobExecutionContext(Instant.now(), new JobDocVersion(0, 0, 0),
lockService, JOB_INDEX_NAME + uniqSuffix, JOB_ID + uniqSuffix);

lockService.acquireLock(TEST_SCHEDULED_JOB_PARAM, context, ActionListener.wrap(
lockService.acquireLockWithId(context.getJobIndexName(), LOCK_DURATION_SECONDS, lockID, ActionListener.wrap(
lock -> {
assertNotNull("Expected to successfully grab lock", lock);
lockService.acquireLock(TEST_SCHEDULED_JOB_PARAM, context, ActionListener.wrap(
lockService.acquireLockWithId(context.getJobIndexName(), LOCK_DURATION_SECONDS, lockID, ActionListener.wrap(
lock2 -> {
assertNull("Expected to failed to get lock.", lock2);
lockService.release(lock, ActionListener.wrap(
Expand All @@ -154,18 +193,19 @@ public void testSecondAcquireLockFail() throws Exception {

public void testLockReleasedAndAcquired() throws Exception {
String uniqSuffix = "_lock_release+acquire";
String lockID = randomAlphaOfLengthBetween(6, 15);
CountDownLatch latch = new CountDownLatch(1);
LockService lockService = new LockService(client(), this.clusterService);
final JobExecutionContext context = new JobExecutionContext(Instant.now(), new JobDocVersion(0, 0, 0),
lockService, JOB_INDEX_NAME + uniqSuffix, JOB_ID + uniqSuffix);

lockService.acquireLock(TEST_SCHEDULED_JOB_PARAM, context, ActionListener.wrap(
lockService.acquireLockWithId(context.getJobIndexName(), LOCK_DURATION_SECONDS, lockID, ActionListener.wrap(
lock -> {
assertNotNull("Expected to successfully grab lock", lock);
lockService.release(lock, ActionListener.wrap(
released -> {
assertTrue("Failed to release lock.", released);
lockService.acquireLock(TEST_SCHEDULED_JOB_PARAM, context, ActionListener.wrap(
lockService.acquireLockWithId(context.getJobIndexName(), LOCK_DURATION_SECONDS, lockID, ActionListener.wrap(
lock2 -> {
assertNotNull("Expected to successfully grab lock2", lock2);
lockService.release(lock2, ActionListener.wrap(
Expand Down Expand Up @@ -195,20 +235,20 @@ public void testLockReleasedAndAcquired() throws Exception {

public void testLockExpired() throws Exception {
String uniqSuffix = "_lock_expire";
String lockID = randomAlphaOfLengthBetween(6, 15);
CountDownLatch latch = new CountDownLatch(1);
LockService lockService = new LockService(client(), this.clusterService);
// Set lock time in the past.
lockService.setTime(Instant.now().minus(Duration.ofSeconds(LOCK_DURATION_SECONDS + LOCK_DURATION_SECONDS)));
final JobExecutionContext context = new JobExecutionContext(Instant.now(), new JobDocVersion(0, 0, 0),
lockService, JOB_INDEX_NAME + uniqSuffix, JOB_ID + uniqSuffix);


lockService.acquireLock(TEST_SCHEDULED_JOB_PARAM, context, ActionListener.wrap(
lockService.acquireLockWithId(context.getJobIndexName(), LOCK_DURATION_SECONDS, lockID, ActionListener.wrap(
lock -> {
assertNotNull("Expected to successfully grab lock", lock);
// Set lock back to current time to make the lock expire.
lockService.setTime(null);
lockService.acquireLock(TEST_SCHEDULED_JOB_PARAM, context, ActionListener.wrap(
lockService.acquireLockWithId(context.getJobIndexName(), LOCK_DURATION_SECONDS, lockID, ActionListener.wrap(
lock2 -> {
assertNotNull("Expected to successfully grab lock", lock2);
lockService.release(lock, ActionListener.wrap(
Expand Down Expand Up @@ -280,20 +320,20 @@ public void testDeleteNonExistingLock() throws Exception {
@Ignore
public void testMultiThreadCreateLock() throws Exception {
String uniqSuffix = "_multi_thread_create";
String lockID = randomAlphaOfLengthBetween(6, 15);
CountDownLatch latch = new CountDownLatch(1);
final LockService lockService = new LockService(client(), this.clusterService);
final JobExecutionContext context = new JobExecutionContext(Instant.now(), new JobDocVersion(0, 0, 0),
lockService, JOB_INDEX_NAME + uniqSuffix, JOB_ID + uniqSuffix);


lockService.createLockIndex(ActionListener.wrap(
created -> {
if (created) {
ExecutorService executor = Executors.newFixedThreadPool(3);
final AtomicReference<LockModel> lockModelAtomicReference = new AtomicReference<>(null);
Callable<Boolean> callable = () -> {
CountDownLatch callableLatch = new CountDownLatch(1);
lockService.acquireLock(TEST_SCHEDULED_JOB_PARAM, context, ActionListener.wrap(
lockService.acquireLockWithId(context.getJobIndexName(), LOCK_DURATION_SECONDS, lockID, ActionListener.wrap(
lock -> {
if (lock != null) {
lockModelAtomicReference.set(lock);
Expand Down Expand Up @@ -355,6 +395,7 @@ public void testMultiThreadCreateLock() throws Exception {
@Ignore
public void testMultiThreadAcquireLock() throws Exception {
String uniqSuffix = "_multi_thread_acquire";
String lockID = randomAlphaOfLengthBetween(6, 15);
CountDownLatch latch = new CountDownLatch(1);
final LockService lockService = new LockService(client(), this.clusterService);
final JobExecutionContext context = new JobExecutionContext(Instant.now(), new JobDocVersion(0, 0, 0),
Expand All @@ -365,7 +406,7 @@ public void testMultiThreadAcquireLock() throws Exception {
if (created) {
// Set lock time in the past.
lockService.setTime(Instant.now().minus(Duration.ofSeconds(LOCK_DURATION_SECONDS + LOCK_DURATION_SECONDS)));
lockService.acquireLock(TEST_SCHEDULED_JOB_PARAM, context, ActionListener.wrap(
lockService.acquireLockWithId(context.getJobIndexName(), LOCK_DURATION_SECONDS, lockID, ActionListener.wrap(
createdLock -> {
assertNotNull(createdLock);
// Set lock back to current time to make the lock expire.
Expand All @@ -375,7 +416,7 @@ public void testMultiThreadAcquireLock() throws Exception {
final AtomicReference<LockModel> lockModelAtomicReference = new AtomicReference<>(null);
Callable<Boolean> callable = () -> {
CountDownLatch callableLatch = new CountDownLatch(1);
lockService.acquireLock(TEST_SCHEDULED_JOB_PARAM, context, ActionListener.wrap(
lockService.acquireLockWithId(context.getJobIndexName(), LOCK_DURATION_SECONDS, lockID, ActionListener.wrap(
lock -> {
if (lock != null) {
lockModelAtomicReference.set(lock);
Expand Down Expand Up @@ -430,12 +471,13 @@ public void testMultiThreadAcquireLock() throws Exception {

public void testRenewLock() throws Exception {
String uniqSuffix = "_lock_renew";
String lockID = randomAlphaOfLengthBetween(6, 15);
CountDownLatch latch = new CountDownLatch(1);
LockService lockService = new LockService(client(), this.clusterService);
final JobExecutionContext context = new JobExecutionContext(Instant.now(), new JobDocVersion(0, 0, 0),
lockService, JOB_INDEX_NAME + uniqSuffix, JOB_ID + uniqSuffix);

lockService.acquireLock(TEST_SCHEDULED_JOB_PARAM, context, ActionListener.wrap(
lockService.acquireLockWithId(context.getJobIndexName(), LOCK_DURATION_SECONDS, lockID, ActionListener.wrap(
lock -> {
assertNotNull("Expected to successfully grab lock", lock);
// Set the time of LockService (the 'lockTime' of acquired locks) to a fixed time.
Expand Down