Skip to content

Commit

Permalink
Fix flaky tests
Browse files Browse the repository at this point in the history
Signed-off-by: Prudhvi Godithi <[email protected]>
  • Loading branch information
prudhvigodithi committed Jan 29, 2024
1 parent 761c9bb commit 04142ca
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 63 deletions.
5 changes: 5 additions & 0 deletions spi/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ plugins {
id 'jacoco'
id 'maven-publish'
id 'signing'
id "org.gradle.test-retry" version "1.5.8"
}

apply plugin: 'opensearch.java'
Expand Down Expand Up @@ -70,6 +71,10 @@ shadowJar {
}

test {
retry {
failOnPassedAfterRetry = false
maxRetries = 5
}
doFirst {
// reverse operation of https://github.com/elastic/elasticsearch/blob/7.6/buildSrc/src/main/groovy/org/elasticsearch/gradle/BuildPlugin.groovy#L736-L743
// to fix the classpath for unit tests
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,20 +86,20 @@ public void setup() {
// thus the OpenSearchIntegTestCase.clusterService() will throw exception.
this.clusterService = Mockito.mock(ClusterService.class, Mockito.RETURNS_DEEP_STUBS);
Mockito.when(this.clusterService.state().routingTable().hasIndex(".opendistro-job-scheduler-lock"))
.thenReturn(false)
.thenReturn(true);
.thenReturn(false)
.thenReturn(true);
}

public void testSanity() throws Exception {
String uniqSuffix = "_sanity";
CountDownLatch latch = new CountDownLatch(1);
LockService lockService = new LockService(client(), this.clusterService);
final JobExecutionContext context = new JobExecutionContext(
Instant.now(),
new JobDocVersion(0, 0, 0),
lockService,
JOB_INDEX_NAME + uniqSuffix,
JOB_ID + uniqSuffix
Instant.now(),
new JobDocVersion(0, 0, 0),
lockService,
JOB_INDEX_NAME + uniqSuffix,
JOB_ID + uniqSuffix
);
Instant testTime = Instant.now();
lockService.setTime(testTime);
Expand All @@ -108,9 +108,9 @@ public void testSanity() throws Exception {
assertEquals("job_id does not match.", JOB_ID + uniqSuffix, lock.getJobId());
assertEquals("job_index_name does not match.", JOB_INDEX_NAME + uniqSuffix, lock.getJobIndexName());
assertEquals(
"lock_id does not match.",
LockModel.generateLockId(JOB_INDEX_NAME + uniqSuffix, JOB_ID + uniqSuffix),
lock.getLockId()
"lock_id does not match.",
LockModel.generateLockId(JOB_INDEX_NAME + uniqSuffix, JOB_ID + uniqSuffix),
lock.getLockId()
);
assertEquals("lock_duration_seconds does not match.", LOCK_DURATION_SECONDS, lock.getLockDurationSeconds());
assertEquals("lock_time does not match.", testTime.getEpochSecond(), lock.getLockTime().getEpochSecond());
Expand All @@ -133,11 +133,11 @@ public void testSanityWithCustomLockID() throws Exception {
CountDownLatch latch = new CountDownLatch(1);
LockService lockService = new LockService(client(), this.clusterService);
final JobExecutionContext context = new JobExecutionContext(
Instant.now(),
new JobDocVersion(0, 0, 0),
lockService,
JOB_INDEX_NAME + uniqSuffix,
JOB_ID + uniqSuffix
Instant.now(),
new JobDocVersion(0, 0, 0),
lockService,
JOB_INDEX_NAME + uniqSuffix,
JOB_ID + uniqSuffix
);
Instant testTime = Instant.now();
lockService.setTime(testTime);
Expand Down Expand Up @@ -167,11 +167,11 @@ public void testSecondAcquireLockFail() throws Exception {
CountDownLatch latch = new CountDownLatch(1);
LockService lockService = new LockService(client(), this.clusterService);
final JobExecutionContext context = new JobExecutionContext(
Instant.now(),
new JobDocVersion(0, 0, 0),
lockService,
JOB_INDEX_NAME + uniqSuffix,
JOB_ID + uniqSuffix
Instant.now(),
new JobDocVersion(0, 0, 0),
lockService,
JOB_INDEX_NAME + uniqSuffix,
JOB_ID + uniqSuffix
);

lockService.acquireLockWithId(context.getJobIndexName(), LOCK_DURATION_SECONDS, lockID, ActionListener.wrap(lock -> {
Expand All @@ -196,11 +196,11 @@ public void testAcquireLockWithLongIdFail() throws Exception {
CountDownLatch latch = new CountDownLatch(1);
LockService lockService = new LockService(client(), this.clusterService);
final JobExecutionContext context = new JobExecutionContext(
Instant.now(),
new JobDocVersion(0, 0, 0),
lockService,
JOB_INDEX_NAME + uniqSuffix,
JOB_ID + uniqSuffix
Instant.now(),
new JobDocVersion(0, 0, 0),
lockService,
JOB_INDEX_NAME + uniqSuffix,
JOB_ID + uniqSuffix
);

lockService.acquireLockWithId(context.getJobIndexName(), LOCK_DURATION_SECONDS, lockID, ActionListener.wrap(lock -> {
Expand All @@ -218,11 +218,11 @@ public void testLockReleasedAndAcquired() throws Exception {
CountDownLatch latch = new CountDownLatch(1);
LockService lockService = new LockService(client(), this.clusterService);
final JobExecutionContext context = new JobExecutionContext(
Instant.now(),
new JobDocVersion(0, 0, 0),
lockService,
JOB_INDEX_NAME + uniqSuffix,
JOB_ID + uniqSuffix
Instant.now(),
new JobDocVersion(0, 0, 0),
lockService,
JOB_INDEX_NAME + uniqSuffix,
JOB_ID + uniqSuffix
);

lockService.acquireLockWithId(context.getJobIndexName(), LOCK_DURATION_SECONDS, lockID, ActionListener.wrap(lock -> {
Expand Down Expand Up @@ -252,11 +252,11 @@ public void testLockExpired() throws Exception {
// Set lock time in the past.
lockService.setTime(Instant.now().minus(Duration.ofSeconds(LOCK_DURATION_SECONDS + LOCK_DURATION_SECONDS)));
final JobExecutionContext context = new JobExecutionContext(
Instant.now(),
new JobDocVersion(0, 0, 0),
lockService,
JOB_INDEX_NAME + uniqSuffix,
JOB_ID + uniqSuffix
Instant.now(),
new JobDocVersion(0, 0, 0),
lockService,
JOB_INDEX_NAME + uniqSuffix,
JOB_ID + uniqSuffix
);

lockService.acquireLockWithId(context.getJobIndexName(), LOCK_DURATION_SECONDS, lockID, ActionListener.wrap(lock -> {
Expand Down Expand Up @@ -316,11 +316,11 @@ public void testMultiThreadCreateLock() throws Exception {
CountDownLatch latch = new CountDownLatch(1);
final LockService lockService = new LockService(client(), this.clusterService);
final JobExecutionContext context = new JobExecutionContext(
Instant.now(),
new JobDocVersion(0, 0, 0),
lockService,
JOB_INDEX_NAME + uniqSuffix,
JOB_ID + uniqSuffix
Instant.now(),
new JobDocVersion(0, 0, 0),
lockService,
JOB_INDEX_NAME + uniqSuffix,
JOB_ID + uniqSuffix
);

lockService.createLockIndex(ActionListener.wrap(created -> {
Expand Down Expand Up @@ -379,11 +379,11 @@ public void testMultiThreadAcquireLock() throws Exception {
CountDownLatch latch = new CountDownLatch(1);
final LockService lockService = new LockService(client(), this.clusterService);
final JobExecutionContext context = new JobExecutionContext(
Instant.now(),
new JobDocVersion(0, 0, 0),
lockService,
JOB_INDEX_NAME + uniqSuffix,
JOB_ID + uniqSuffix
Instant.now(),
new JobDocVersion(0, 0, 0),
lockService,
JOB_INDEX_NAME + uniqSuffix,
JOB_ID + uniqSuffix
);

lockService.createLockIndex(ActionListener.wrap(created -> {
Expand All @@ -400,16 +400,16 @@ public void testMultiThreadAcquireLock() throws Exception {
Callable<Boolean> callable = () -> {
CountDownLatch callableLatch = new CountDownLatch(1);
lockService.acquireLockWithId(
context.getJobIndexName(),
LOCK_DURATION_SECONDS,
lockID,
ActionListener.wrap(lock -> {
if (lock != null) {
lockModelAtomicReference.set(lock);
Integer test = multiThreadAcquireLockCounter.getAndAdd(1);
}
callableLatch.countDown();
}, exception -> fail(exception.getMessage()))
context.getJobIndexName(),
LOCK_DURATION_SECONDS,
lockID,
ActionListener.wrap(lock -> {
if (lock != null) {
lockModelAtomicReference.set(lock);
Integer test = multiThreadAcquireLockCounter.getAndAdd(1);
}
callableLatch.countDown();
}, exception -> fail(exception.getMessage()))
);
callableLatch.await(5L, TimeUnit.SECONDS);
return true;
Expand Down Expand Up @@ -446,11 +446,11 @@ public void testRenewLock() throws Exception {
CountDownLatch latch = new CountDownLatch(1);
LockService lockService = new LockService(client(), this.clusterService);
final JobExecutionContext context = new JobExecutionContext(
Instant.now(),
new JobDocVersion(0, 0, 0),
lockService,
JOB_INDEX_NAME + uniqSuffix,
JOB_ID + uniqSuffix
Instant.now(),
new JobDocVersion(0, 0, 0),
lockService,
JOB_INDEX_NAME + uniqSuffix,
JOB_ID + uniqSuffix
);

lockService.acquireLockWithId(context.getJobIndexName(), LOCK_DURATION_SECONDS, lockID, ActionListener.wrap(lock -> {
Expand All @@ -463,9 +463,9 @@ public void testRenewLock() throws Exception {
assertNotNull("Expected to successfully renew lock", renewedLock);
assertEquals("lock_time is expected to be the renewal time.", now, renewedLock.getLockTime());
assertEquals(
"lock_duration is expected to be unchanged.",
lock.getLockDurationSeconds(),
renewedLock.getLockDurationSeconds()
"lock_duration is expected to be unchanged.",
lock.getLockDurationSeconds(),
renewedLock.getLockDurationSeconds()
);
lockService.release(lock, ActionListener.wrap(released -> {
assertTrue("Failed to release lock.", released);
Expand Down

0 comments on commit 04142ca

Please sign in to comment.