Skip to content

Commit 3ab74c2

Browse files
committed
fix: unify waitForSnapshotStarted / waitForSinkSize / waitForUpsertSinkSize utility methods with timeout
1 parent 3f84c46 commit 3ab74c2

File tree

59 files changed

+672
-1195
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

59 files changed

+672
-1195
lines changed

flink-cdc-common/pom.xml

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,17 @@ limitations under the License.
2323
<groupId>org.apache.flink</groupId>
2424
<version>${revision}</version>
2525
</parent>
26+
27+
<dependencies>
28+
<dependency>
29+
<groupId>org.apache.flink</groupId>
30+
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
31+
<version>${flink.version}</version>
32+
<type>test-jar</type>
33+
<scope>test</scope>
34+
</dependency>
35+
</dependencies>
36+
2637
<modelVersion>4.0.0</modelVersion>
2738

2839
<artifactId>flink-cdc-common</artifactId>

flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/TestCaseUtils.java

Lines changed: 0 additions & 94 deletions
This file was deleted.
Lines changed: 227 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,227 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.flink.cdc.common.testutils;
19+
20+
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
21+
import org.apache.flink.util.function.SupplierWithException;
22+
23+
import java.time.Duration;
24+
import java.util.ArrayList;
25+
import java.util.Collections;
26+
import java.util.Iterator;
27+
import java.util.List;
28+
import java.util.concurrent.ConcurrentLinkedQueue;
29+
import java.util.concurrent.atomic.AtomicReference;
30+
import java.util.function.Function;
31+
import java.util.function.Predicate;
32+
import java.util.function.Supplier;
33+
import java.util.stream.Collectors;
34+
35+
/** Some utility methods for creating repeated-checking test cases. */
36+
public class TestCaseUtils {
37+
38+
public static final Duration DEFAULT_TIMEOUT = Duration.ofSeconds(30);
39+
public static final Duration DEFAULT_INTERVAL = Duration.ofSeconds(1);
40+
41+
/** Fetch with a ({@code timeout}, {@code interval}) duration. */
42+
public static void repeatedCheck(Supplier<Boolean> fetcher) {
43+
repeatedCheck(fetcher, DEFAULT_TIMEOUT);
44+
}
45+
46+
/** Fetch with a ({@code timeout}, {@code interval}) duration. */
47+
public static void repeatedCheck(Supplier<Boolean> fetcher, Duration timeout) {
48+
repeatedCheck(fetcher, timeout, DEFAULT_INTERVAL);
49+
}
50+
51+
/** Fetch with a ({@code timeout}, {@code interval}) duration. */
52+
public static void repeatedCheck(
53+
Supplier<Boolean> fetcher, Duration timeout, Duration interval) {
54+
repeatedCheck(fetcher::get, timeout, interval, Collections.emptyList());
55+
}
56+
57+
/** Fetch and wait with a ({@code timeout}, {@code interval}) duration. */
58+
public static <T> void repeatedCheck(
59+
Supplier<T> fetcher, Predicate<T> validator, Duration timeout, Duration interval) {
60+
repeatedCheckAndValidate(
61+
fetcher::get, validator, timeout, interval, Collections.emptyList());
62+
}
63+
64+
/** Waiting for fetching values with a ({@code timeout}, {@code interval}) duration. */
65+
public static void repeatedCheck(
66+
SupplierWithException<Boolean, Throwable> fetcher,
67+
Duration timeout,
68+
Duration interval,
69+
List<Class<? extends Throwable>> allowedThrowsList) {
70+
repeatedCheckAndValidate(fetcher, b -> b, timeout, interval, allowedThrowsList);
71+
}
72+
73+
/** Fetch and validate, with a ({@code timeout}, {@code interval}) duration. */
74+
public static <T> void repeatedCheckAndValidate(
75+
SupplierWithException<T, Throwable> fetcher,
76+
Predicate<T> validator,
77+
Duration timeout,
78+
Duration interval,
79+
List<Class<? extends Throwable>> allowedThrowsList) {
80+
81+
long start = System.currentTimeMillis();
82+
while (System.currentTimeMillis() - start < timeout.toMillis()) {
83+
try {
84+
if (validator.test(fetcher.get())) {
85+
return;
86+
}
87+
} catch (Throwable t) {
88+
if (allowedThrowsList.stream()
89+
.noneMatch(clazz -> clazz.isAssignableFrom(t.getClass()))) {
90+
throw new RuntimeException("Fetcher has thrown an unexpected exception: ", t);
91+
}
92+
}
93+
try {
94+
Thread.sleep(interval.toMillis());
95+
} catch (InterruptedException ignored) {
96+
// ignored
97+
}
98+
}
99+
throw new RuntimeException("Timeout when waiting for state to be ready.");
100+
}
101+
102+
public static <T> List<T> fetch(Iterator<T> iter, final int size) throws InterruptedException {
103+
return fetch(iter, size, DEFAULT_TIMEOUT);
104+
}
105+
106+
public static <T> List<T> fetch(Iterator<T> iter, final int size, Duration timeout)
107+
throws InterruptedException {
108+
return fetch(iter, size, timeout, DEFAULT_INTERVAL);
109+
}
110+
111+
/**
112+
* Fetches at most {@code size} entries from {@link Iterator} {@code iter}. <br>
113+
* It may return a list with less than {@code size} elements, if {@code iter} doesn't provide
114+
* results or {@code timeout} exceeds.
115+
*/
116+
public static <T> List<T> fetch(
117+
Iterator<T> iter, final int size, Duration timeout, Duration interval)
118+
throws InterruptedException {
119+
long deadline = System.currentTimeMillis() + timeout.toMillis();
120+
121+
ConcurrentLinkedQueue<T> results = new ConcurrentLinkedQueue<>();
122+
AtomicReference<Throwable> fetchException = new AtomicReference<>();
123+
124+
Thread thread =
125+
new Thread(
126+
() -> {
127+
try {
128+
int remainingSize = size;
129+
while (remainingSize > 0 && iter.hasNext()) {
130+
T row = iter.next();
131+
results.add(row);
132+
remainingSize--;
133+
}
134+
} catch (Throwable t) {
135+
fetchException.set(t);
136+
}
137+
});
138+
139+
thread.start();
140+
141+
while (true) {
142+
// Raise any exception thrown by the fetching thread
143+
if (fetchException.get() != null) {
144+
throw (RuntimeException) fetchException.get();
145+
}
146+
147+
// Stop if fetching thread has exited
148+
if (!thread.isAlive()) {
149+
break;
150+
}
151+
152+
// Stop waiting if deadline has arrived
153+
if (System.currentTimeMillis() > deadline) {
154+
thread.interrupt();
155+
break;
156+
}
157+
158+
Thread.sleep(interval.toMillis());
159+
}
160+
161+
return new ArrayList<>(results);
162+
}
163+
164+
public static <S, T> List<T> fetchAndConvert(
165+
Iterator<S> iter, int size, Function<S, T> converter) throws InterruptedException {
166+
return fetch(iter, size).stream().map(converter).collect(Collectors.toList());
167+
}
168+
169+
public static <S, T> List<T> fetchAndConvert(
170+
Iterator<S> iter, int size, Duration timeout, Function<S, T> converter)
171+
throws InterruptedException {
172+
return fetch(iter, size, timeout).stream().map(converter).collect(Collectors.toList());
173+
}
174+
175+
public static void waitForSnapshotStarted(Iterator<?> iter) {
176+
repeatedCheck(iter::hasNext);
177+
}
178+
179+
public static void waitForSnapshotStarted(String sinkName) throws InterruptedException {
180+
waitForSinkSize(sinkName, false, 1);
181+
}
182+
183+
public static void waitForSinkSize(String sinkName, boolean upsertMode, int expectedSize)
184+
throws InterruptedException {
185+
waitForSinkSize(sinkName, upsertMode, DEFAULT_TIMEOUT, expectedSize);
186+
}
187+
188+
public static void waitForSinkSize(
189+
String sinkName, boolean upsertMode, Duration timeout, int expectedSize)
190+
throws InterruptedException {
191+
waitForSinkSize(sinkName, upsertMode, expectedSize, timeout, DEFAULT_INTERVAL);
192+
}
193+
194+
public static void waitForSinkSize(
195+
String sinkName,
196+
boolean upsertMode,
197+
int expectedSize,
198+
Duration timeout,
199+
Duration interval)
200+
throws InterruptedException {
201+
long deadline = System.currentTimeMillis() + timeout.toMillis();
202+
while (sinkSize(sinkName, upsertMode) < expectedSize) {
203+
if (System.nanoTime() > deadline) {
204+
throw new RuntimeException(
205+
String.format(
206+
"Wait for sink size timeout. Expected %s, got actual %s",
207+
expectedSize, sinkSize(sinkName, upsertMode)));
208+
}
209+
Thread.sleep(interval.toMillis());
210+
}
211+
}
212+
213+
public static int sinkSize(String sinkName, boolean upsertMode) {
214+
synchronized (TestValuesTableFactory.class) {
215+
try {
216+
if (upsertMode) {
217+
return TestValuesTableFactory.getResultsAsStrings(sinkName).size();
218+
} else {
219+
return TestValuesTableFactory.getRawResultsAsStrings(sinkName).size();
220+
}
221+
} catch (IllegalArgumentException e) {
222+
// job is not started yet
223+
return 0;
224+
}
225+
}
226+
}
227+
}

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/pom.xml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,14 @@ limitations under the License.
6262
</dependency>
6363

6464
<!-- Test dependencies -->
65+
<dependency>
66+
<groupId>org.apache.flink</groupId>
67+
<artifactId>flink-cdc-common</artifactId>
68+
<version>${project.version}</version>
69+
<type>test-jar</type>
70+
<scope>test</scope>
71+
</dependency>
72+
6573
<dependency>
6674
<groupId>org.testcontainers</groupId>
6775
<artifactId>db2</artifactId>

0 commit comments

Comments
 (0)