Skip to content

Commit 3a44c1f

Browse files
X-czhhuwh
authored andcommitted
[FLINK-32821][examples] Add integrated tests for streaming examples
1 parent 79f7bdb commit 3a44c1f

File tree

3 files changed

+80
-2
lines changed

3 files changed

+80
-2
lines changed

flink-end-to-end-tests/run-nightly-tests.sh

+2
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,8 @@ function run_group_2 {
176176

177177
run_test "Flink CLI end-to-end test" "$END_TO_END_DIR/test-scripts/test_cli.sh"
178178

179+
run_test "Flink streaming examples end-to-end test" "$END_TO_END_DIR/test-scripts/test_streaming_examples.sh"
180+
179181
run_test "Queryable state (rocksdb) end-to-end test" "$END_TO_END_DIR/test-scripts/test_queryable_state.sh rocksdb"
180182
run_test "Queryable state (rocksdb) with TM restart end-to-end test" "$END_TO_END_DIR/test-scripts/test_queryable_state_restart_tm.sh" "skip_check_exceptions"
181183

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
#!/usr/bin/env bash
2+
################################################################################
3+
# Licensed to the Apache Software Foundation (ASF) under one
4+
# or more contributor license agreements. See the NOTICE file
5+
# distributed with this work for additional information
6+
# regarding copyright ownership. The ASF licenses this file
7+
# to you under the Apache License, Version 2.0 (the
8+
# "License"); you may not use this file except in compliance
9+
# with the License. You may obtain a copy of the License at
10+
#
11+
# http://www.apache.org/licenses/LICENSE-2.0
12+
#
13+
# Unless required by applicable law or agreed to in writing, software
14+
# distributed under the License is distributed on an "AS IS" BASIS,
15+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
# See the License for the specific language governing permissions and
17+
# limitations under the License.
18+
################################################################################
19+
20+
# End to end test for streaming examples. It only validates that the job graph can be successfully generated
21+
# and submitted to a standalone session cluster.
22+
# Usage:
23+
# FLINK_DIR=<flink dir> TEST_DATA_DIR-<test data dir> flink-end-to-end-tests/test-scripts/test_streaming_examples.sh
24+
25+
source "$(dirname "$0")"/common.sh
26+
27+
start_cluster
28+
$FLINK_DIR/bin/taskmanager.sh start
29+
$FLINK_DIR/bin/taskmanager.sh start
30+
$FLINK_DIR/bin/taskmanager.sh start
31+
32+
EXIT_CODE=0
33+
34+
function run_example() {
35+
printf "\n==============================================================================\n"
36+
printf "Test operation on running $1 example\n"
37+
printf "==============================================================================\n"
38+
if [ $EXIT_CODE == 0 ]; then
39+
TEST_PROGRAM_JAR=$FLINK_DIR/examples/streaming/$1.jar
40+
RETURN=`$FLINK_DIR/bin/flink run -d $TEST_PROGRAM_JAR --output file:///${TEST_DATA_DIR}/result1`
41+
EXIT_CODE=$?
42+
echo "$RETURN"
43+
fi
44+
}
45+
46+
run_example "Iteration"
47+
run_example "SessionWindowing"
48+
run_example "StateMachineExample"
49+
run_example "TopSpeedWindowing"
50+
run_example "WindowJoin"
51+
run_example "WordCount"
52+
53+
exit $EXIT_CODE

flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java

+25-2
Original file line numberDiff line numberDiff line change
@@ -24,17 +24,24 @@
2424
import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier;
2525
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
2626
import org.apache.flink.api.common.functions.JoinFunction;
27+
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
2728
import org.apache.flink.api.java.functions.KeySelector;
2829
import org.apache.flink.api.java.tuple.Tuple2;
2930
import org.apache.flink.api.java.tuple.Tuple3;
3031
import org.apache.flink.api.java.utils.ParameterTool;
32+
import org.apache.flink.configuration.MemorySize;
33+
import org.apache.flink.connector.file.sink.FileSink;
34+
import org.apache.flink.core.fs.Path;
3135
import org.apache.flink.streaming.api.datastream.DataStream;
3236
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
37+
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
3338
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
3439
import org.apache.flink.streaming.api.windowing.time.Time;
3540
import org.apache.flink.streaming.examples.join.WindowJoinSampleData.GradeSource;
3641
import org.apache.flink.streaming.examples.join.WindowJoinSampleData.SalarySource;
3742

43+
import java.time.Duration;
44+
3845
/**
3946
* Example illustrating a windowed stream join between two data streams.
4047
*
@@ -55,6 +62,7 @@ public static void main(String[] args) throws Exception {
5562
final ParameterTool params = ParameterTool.fromArgs(args);
5663
final long windowSize = params.getLong("windowSize", 2000);
5764
final long rate = params.getLong("rate", 3L);
65+
final boolean fileOutput = params.has("output");
5866

5967
System.out.println("Using windowSize=" + windowSize + ", data rate=" + rate);
6068
System.out.println(
@@ -80,8 +88,23 @@ public static void main(String[] args) throws Exception {
8088
DataStream<Tuple3<String, Integer, Integer>> joinedStream =
8189
runWindowJoin(grades, salaries, windowSize);
8290

83-
// print the results with a single thread, rather than in parallel
84-
joinedStream.print().setParallelism(1);
91+
if (fileOutput) {
92+
joinedStream
93+
.sinkTo(
94+
FileSink.<Tuple3<String, Integer, Integer>>forRowFormat(
95+
new Path(params.get("output")),
96+
new SimpleStringEncoder<>())
97+
.withRollingPolicy(
98+
DefaultRollingPolicy.builder()
99+
.withMaxPartSize(MemorySize.ofMebiBytes(1))
100+
.withRolloverInterval(Duration.ofSeconds(10))
101+
.build())
102+
.build())
103+
.name("output");
104+
} else {
105+
// print the results with a single thread, rather than in parallel
106+
joinedStream.print().setParallelism(1);
107+
}
85108

86109
// execute program
87110
env.execute("Windowed Join Example");

0 commit comments

Comments
 (0)