Skip to content

Commit

Permalink
[FLINK-35601][test] Revert the junit5 migration of InitOutputPathTest
Browse files Browse the repository at this point in the history
PR#24881 introduce some issue during the migration to junit5, temporarily revert the changes to `InitOutputPathTest` file.
  • Loading branch information
reswqa committed Jun 17, 2024
1 parent 2385cc0 commit 4b51067
Showing 1 changed file with 51 additions and 63 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,52 +23,59 @@
import org.apache.flink.core.fs.local.LocalFileSystem;
import org.apache.flink.core.testutils.CheckedThread;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.testutils.junit.utils.TempDirUtils;

import lombok.SneakyThrows;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.nio.file.FileAlreadyExistsException;
import java.util.concurrent.locks.ReentrantLock;

import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.junit.Assert.fail;
import static org.powermock.api.mockito.PowerMockito.whenNew;

/** A test validating that the initialization of local output paths is properly synchronized. */
class InitOutputPathTest {
@RunWith(PowerMockRunner.class)
@PrepareForTest(LocalFileSystem.class)
public class InitOutputPathTest {

@TempDir private static java.nio.file.Path tempFolder;
@Rule public final TemporaryFolder tempDir = new TemporaryFolder();

/**
* This test validates that this test case makes sense - that the error can be produced in the
* absence of synchronization, if the threads make progress in a certain way, here enforced by
* latches.
*/
@Test
void testErrorOccursUnSynchronized() throws Exception {
public void testErrorOccursUnSynchronized() throws Exception {
// deactivate the lock to produce the original un-synchronized state
Field lock = FileSystem.class.getDeclaredField("OUTPUT_DIRECTORY_INIT_LOCK");
lock.setAccessible(true);

Field modifiers = Field.class.getDeclaredField("modifiers");
modifiers.setAccessible(true);
modifiers.setInt(lock, lock.getModifiers() & ~Modifier.FINAL);

lock.set(null, new NoOpLock());
// in the original un-synchronized state, we can force the race to occur by using
// the proper latch order to control the process of the concurrent threads
assertThatThrownBy(() -> runTest(true)).isInstanceOf(FileNotFoundException.class);
lock.set(null, new ReentrantLock(true));

try {
// in the original un-synchronized state, we can force the race to occur by using
// the proper latch order to control the process of the concurrent threads
runTest(true);
fail("should fail with an exception");
} catch (FileNotFoundException e) {
// expected
} finally {
// reset the proper value
lock.set(null, new ReentrantLock(true));
}
}

@Test
void testProperSynchronized() throws Exception {
public void testProperSynchronized() throws Exception {
// in the synchronized variant, we cannot use the "await latches" because not
// both threads can make process interleaved (due to the synchronization)
// the test uses sleeps (rather than latches) to produce the same interleaving.
Expand All @@ -80,7 +87,7 @@ void testProperSynchronized() throws Exception {
}

private void runTest(final boolean useAwaits) throws Exception {
final File tempFile = TempDirUtils.newFile(tempFolder);
final File tempFile = tempDir.newFile();
final Path path1 = new Path(tempFile.getAbsolutePath(), "1");
final Path path2 = new Path(tempFile.getAbsolutePath(), "2");

Expand All @@ -97,23 +104,32 @@ private void runTest(final boolean useAwaits) throws Exception {
final OneShotLatch createAwaitLatch = new OneShotLatch();
final OneShotLatch createTriggerLatch = new OneShotLatch();

// this "new LocalDataOutputStream()" is in the end called by the async threads
whenNew(LocalDataOutputStream.class)
.withAnyArguments()
.thenAnswer(
new Answer<LocalDataOutputStream>() {

@Override
public LocalDataOutputStream answer(InvocationOnMock invocation)
throws Throwable {
createAwaitLatch.trigger();
createTriggerLatch.await();

final File file = (File) invocation.getArguments()[0];
return new LocalDataOutputStream(file);
}
});

final LocalFileSystem fs1 =
new SyncedFileSystem(
deleteAwaitLatch1,
mkdirsAwaitLatch1,
deleteTriggerLatch1,
mkdirsTriggerLatch1,
createAwaitLatch,
createTriggerLatch);
deleteAwaitLatch1, mkdirsAwaitLatch1,
deleteTriggerLatch1, mkdirsTriggerLatch1);

final LocalFileSystem fs2 =
new SyncedFileSystem(
deleteAwaitLatch2,
mkdirsAwaitLatch2,
deletetriggerLatch2,
mkdirsTriggerLatch2,
createAwaitLatch,
createTriggerLatch);
deleteAwaitLatch2, mkdirsAwaitLatch2,
deletetriggerLatch2, mkdirsTriggerLatch2);

// start the concurrent file creators
FileCreator thread1 = new FileCreator(fs1, path1);
Expand Down Expand Up @@ -195,44 +211,16 @@ private static class SyncedFileSystem extends LocalFileSystem {
private final OneShotLatch deleteAwaitLatch;
private final OneShotLatch mkdirsAwaitLatch;

private final OneShotLatch createAwaitLatch;
private final OneShotLatch createTriggerLatch;

SyncedFileSystem(
OneShotLatch deleteTriggerLatch,
OneShotLatch mkdirsTriggerLatch,
OneShotLatch deleteAwaitLatch,
OneShotLatch mkdirsAwaitLatch,
OneShotLatch createAwaitLatch,
OneShotLatch createTriggerLatch) {
OneShotLatch mkdirsAwaitLatch) {

this.deleteTriggerLatch = deleteTriggerLatch;
this.mkdirsTriggerLatch = mkdirsTriggerLatch;
this.deleteAwaitLatch = deleteAwaitLatch;
this.mkdirsAwaitLatch = mkdirsAwaitLatch;
this.createAwaitLatch = createAwaitLatch;
this.createTriggerLatch = createTriggerLatch;
}

@Override
@SneakyThrows
public FSDataOutputStream create(final Path filePath, final WriteMode overwrite)
throws IOException {
checkNotNull(filePath, "filePath");

if (exists(filePath) && overwrite == WriteMode.NO_OVERWRITE) {
throw new FileAlreadyExistsException("File already exists: " + filePath);
}

final Path parent = filePath.getParent();
if (parent != null && !mkdirs(parent)) {
throw new IOException("Mkdirs failed to create " + parent);
}

final File file = pathToFile(filePath);
createAwaitLatch.trigger();
createTriggerLatch.await();
return new LocalDataOutputStream(file);
}

@Override
Expand Down

0 comments on commit 4b51067

Please sign in to comment.