Skip to content

Commit 96ae0a5

Browse files
bbejeckguozhangwang
authored andcommitted
MINOR: Follow-up from KAFKA-3842 to fix tempDir
…p directories, waitForCondition Author: bbejeck <[email protected]> Reviewers: Guozhang Wang, Ismael Juma Closes apache#1554 from bbejeck/follow_up_for_KAFKA-3842
1 parent cf03f34 commit 96ae0a5

File tree

9 files changed

+49
-49
lines changed

9 files changed

+49
-49
lines changed

clients/src/test/java/org/apache/kafka/test/TestUtils.java

+30-20
Original file line numberDiff line numberDiff line change
@@ -123,20 +123,17 @@ public static File tempFile() throws IOException {
123123
*
124124
* @param prefix The prefix of the temporary directory, if null using "kafka-" as default prefix
125125
*/
126-
public static File tempDirectory(String prefix) throws IOException {
126+
public static File tempDirectory(String prefix) {
127127
return tempDirectory(null, prefix);
128128
}
129129

130130
/**
131-
* Create a temporary directory named "test" under /temp
131+
* Create a temporary relative directory in the default temporary-file directory with a
132+
* prefix of "kafka-"
132133
* @return the temporary directory just created.
133134
*/
134-
public static File tempDir() {
135-
try {
136-
return tempDirectory(new File("/tmp").toPath(), "test");
137-
} catch (IOException ex) {
138-
throw new RuntimeException("Failed to create a temp dir", ex);
139-
}
135+
public static File tempDirectory() {
136+
return tempDirectory(null);
140137
}
141138

142139
/**
@@ -145,10 +142,15 @@ public static File tempDir() {
145142
* @param parent The parent folder path name, if null using the default temporary-file directory
146143
* @param prefix The prefix of the temporary directory, if null using "kafka-" as default prefix
147144
*/
148-
public static File tempDirectory(Path parent, String prefix) throws IOException {
149-
final File file = parent == null ?
150-
Files.createTempDirectory(prefix == null ? "kafka-" : prefix).toFile() :
151-
Files.createTempDirectory(parent, prefix == null ? "kafka-" : prefix).toFile();
145+
public static File tempDirectory(Path parent, String prefix) {
146+
final File file;
147+
prefix = prefix == null ? "kafka-" : prefix;
148+
try {
149+
file = parent == null ?
150+
Files.createTempDirectory(prefix).toFile() : Files.createTempDirectory(parent, prefix).toFile();
151+
} catch (IOException ex) {
152+
throw new RuntimeException("Failed to create a temp dir", ex);
153+
}
152154
file.deleteOnExit();
153155

154156
Runtime.getRuntime().addShutdownHook(new Thread() {
@@ -225,21 +227,29 @@ public static Properties consumerConfig(final String bootstrapServers, Class key
225227
}
226228

227229
/**
228-
* uses default value of 30 seconds for timeout
230+
* uses default value of 15 seconds for timeout
229231
*/
230-
public static void waitForCondition(TestCondition testCondition) throws InterruptedException {
231-
waitForCondition(testCondition, 30000);
232+
public static void waitForCondition(TestCondition testCondition, String conditionDetails) throws InterruptedException {
233+
waitForCondition(testCondition, 15000, conditionDetails);
232234
}
233235

234236
/**
235-
* Used to wait for specific conditions/state to be me during a test
236-
* this is meant to be a replacement for using Thread.sleep
237+
* Wait for condition to be met for at most {@code maxWaitMs} and throw assertion failure otherwise.
238+
* This should be used instead of {@code Thread.sleep} whenever possible as it allows a longer timeout to be used
239+
* without unnecessarily increasing test time (as the condition is checked frequently). The longer timeout is needed to
240+
* avoid transient failures due to slow or overloaded machines.
237241
*/
238-
public static void waitForCondition(TestCondition testCondition, long maxTimeMillis) throws InterruptedException {
242+
public static void waitForCondition(TestCondition testCondition, long maxWaitMs, String conditionDetails) throws InterruptedException {
239243
long startTime = System.currentTimeMillis();
240244

241-
while (!testCondition.conditionMet() && ((System.currentTimeMillis() - startTime) < maxTimeMillis)) {
242-
Thread.sleep(Math.min(maxTimeMillis, 100L));
245+
246+
while (!testCondition.conditionMet() && ((System.currentTimeMillis() - startTime) < maxWaitMs)) {
247+
Thread.sleep(Math.min(maxWaitMs, 100L));
248+
}
249+
250+
if (!testCondition.conditionMet()) {
251+
conditionDetails = conditionDetails != null ? conditionDetails : "";
252+
throw new AssertionError("Condition not met within timeout " + maxWaitMs + ". " + conditionDetails);
243253
}
244254
}
245255

streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ public void shouldCompactTopicsForStateChangelogs() throws Exception {
123123
streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, CLUSTER.zKConnectString());
124124
streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
125125
streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
126-
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDir().getPath());
126+
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
127127
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
128128
KStreamBuilder builder = new KStreamBuilder();
129129

streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,7 @@ public void shouldCountClicksPerRegion() throws Exception {
149149
// with automatically) we don't need to set this anymore and can update `purgeLocalStreamsState`
150150
// accordingly.
151151
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG,
152-
TestUtils.tempDir().getPath());
152+
TestUtils.tempDirectory().getPath());
153153

154154
// Remove any state from previous test runs
155155
IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);

streams/src/test/java/org/apache/kafka/streams/integration/KGroupedStreamIntegrationTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ public void before() {
8080
.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
8181
streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, CLUSTER.zKConnectString());
8282
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
83-
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDir().getPath());
83+
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
8484

8585
KeyValueMapper<Integer, String, String>
8686
mapper =

streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ public void before() {
8383
.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
8484
streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, CLUSTER.zKConnectString());
8585
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
86-
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDir().getPath());
86+
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
8787
streamsConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 3);
8888

8989

streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java

+9-10
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@ public class RegexSourceIntegrationTest {
8686
private static final String DEFAULT_OUTPUT_TOPIC = "outputTopic";
8787
private static final String STRING_SERDE_CLASSNAME = Serdes.String().getClass().getName();
8888
private Properties streamsConfiguration;
89+
private static final String STREAM_TASKS_NOT_UPDATED = "Stream tasks not updated";
8990

9091

9192
@BeforeClass
@@ -146,11 +147,12 @@ public void testRegexMatchesTopicsAWhenCreated() throws Exception {
146147
streamThreads[0] = testStreamThread;
147148
streams.start();
148149

149-
TestUtils.waitForCondition(tasksUpdated);
150+
TestUtils.waitForCondition(tasksUpdated, STREAM_TASKS_NOT_UPDATED);
151+
testStreamThread.streamTaskUpdated = false;
150152

151153
CLUSTER.createTopic("TEST-TOPIC-2");
152154

153-
TestUtils.waitForCondition(tasksUpdated);
155+
TestUtils.waitForCondition(tasksUpdated, STREAM_TASKS_NOT_UPDATED);
154156

155157
streams.close();
156158

@@ -193,11 +195,13 @@ public void testRegexMatchesTopicsAWhenDeleted() throws Exception {
193195

194196
streams.start();
195197

196-
TestUtils.waitForCondition(tasksUpdated);
198+
TestUtils.waitForCondition(tasksUpdated, STREAM_TASKS_NOT_UPDATED);
199+
//reset
200+
testStreamThread.streamTaskUpdated = false;
197201

198202
CLUSTER.deleteTopic("TEST-TOPIC-A");
199203

200-
TestUtils.waitForCondition(tasksUpdated);
204+
TestUtils.waitForCondition(tasksUpdated, STREAM_TASKS_NOT_UPDATED);
201205

202206
streams.close();
203207

@@ -331,12 +335,7 @@ private TestCondition createTasksUpdatedCondition(final TestStreamThread testStr
331335
return new TestCondition() {
332336
@Override
333337
public boolean conditionMet() {
334-
if (testStreamThread.streamTaskUpdated) {
335-
testStreamThread.streamTaskUpdated = false;
336-
return true;
337-
} else {
338-
return false;
339-
}
338+
return testStreamThread.streamTaskUpdated;
340339
}
341340
};
342341
}

streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java

+4-13
Original file line numberDiff line numberDiff line change
@@ -200,13 +200,9 @@ public boolean conditionMet() {
200200
}
201201
};
202202

203-
TestUtils.waitForCondition(valuesRead, waitTime);
203+
String conditionDetails = "Did not receive " + expectedNumRecords + " number of records";
204204

205-
if (accumData.size() < expectedNumRecords) {
206-
throw new AssertionError("Expected " + expectedNumRecords +
207-
" but received only " + accumData.size() +
208-
" records before timeout " + waitTime + " ms");
209-
}
205+
TestUtils.waitForCondition(valuesRead, waitTime, conditionDetails);
210206

211207
return accumData;
212208
}
@@ -243,16 +239,11 @@ public boolean conditionMet() {
243239
}
244240
};
245241

246-
TestUtils.waitForCondition(valuesRead, waitTime);
242+
String conditionDetails = "Did not receive " + expectedNumRecords + " number of records";
247243

248-
if (accumData.size() < expectedNumRecords) {
249-
throw new AssertionError("Expected " + expectedNumRecords +
250-
" but received only " + accumData.size() +
251-
" records before timeout " + waitTime + " ms");
252-
}
244+
TestUtils.waitForCondition(valuesRead, waitTime, conditionDetails);
253245

254246
return accumData;
255-
256247
}
257248

258249
}

streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ public class ProcessorTopologyTest {
6666
@Before
6767
public void setup() {
6868
// Create a new directory in which we'll put all of the state for this test, enabling running tests in parallel ...
69-
File localState = TestUtils.tempDir();
69+
File localState = TestUtils.tempDirectory();
7070
Properties props = new Properties();
7171
props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "processor-topology-test");
7272
props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");

streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -211,7 +211,7 @@ public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySeria
211211
send(record, keySerializer, valueSerializer);
212212
}
213213
};
214-
this.stateDir = TestUtils.tempDir();
214+
this.stateDir = TestUtils.tempDirectory();
215215
this.stateDir.mkdirs();
216216

217217
Properties props = new Properties();

0 commit comments

Comments
 (0)