Skip to content

Commit 3b33d92

Browse files
RyanSkraba authored and MartijnVisser committed
[docs] Minor documentation fixes
1 parent 459d773 commit 3b33d92

File tree

6 files changed

+586
-15
lines changed

6 files changed

+586
-15
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,181 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* https://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*
18+
*/
19+
20+
package org.apache.flink.connector.gcp.pubsub;
21+
22+
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
23+
import org.apache.flink.api.common.functions.RichMapFunction;
24+
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
25+
import org.apache.flink.api.common.serialization.SimpleStringSchema;
26+
import org.apache.flink.connector.gcp.pubsub.source.PubSubSource;
27+
import org.apache.flink.streaming.api.datastream.DataStream;
28+
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
29+
import org.apache.flink.streaming.connectors.gcp.pubsub.emulator.EmulatorCredentials;
30+
import org.apache.flink.streaming.connectors.gcp.pubsub.emulator.GCloudUnitTestBase;
31+
import org.apache.flink.streaming.connectors.gcp.pubsub.emulator.PubSubSubscriberFactoryForEmulator;
32+
import org.apache.flink.streaming.connectors.gcp.pubsub.emulator.PubsubHelper;
33+
34+
import com.google.cloud.pubsub.v1.Publisher;
35+
import com.google.protobuf.ByteString;
36+
import com.google.pubsub.v1.PubsubMessage;
37+
import org.junit.After;
38+
import org.junit.Before;
39+
import org.junit.Test;
40+
41+
import java.time.Duration;
42+
import java.util.ArrayList;
43+
import java.util.Arrays;
44+
import java.util.List;
45+
import java.util.concurrent.ExecutionException;
46+
47+
import static org.junit.Assert.assertEquals;
48+
import static org.junit.Assert.assertTrue;
49+
50+
/** Test of {@link PubSubSource} against the GCP Pub/Sub emulator SDK. */
51+
public class EmulatedPubSubSourceTest extends GCloudUnitTestBase {
52+
private static final String PROJECT_NAME = "FLProject";
53+
private static final String TOPIC_NAME = "FLTopic";
54+
private static final String SUBSCRIPTION_NAME = "FLSubscription";
55+
56+
private static PubsubHelper pubsubHelper;
57+
58+
@Before
59+
public void setUp() throws Exception {
60+
pubsubHelper = getPubsubHelper();
61+
pubsubHelper.createTopic(PROJECT_NAME, TOPIC_NAME);
62+
pubsubHelper.createSubscription(PROJECT_NAME, SUBSCRIPTION_NAME, PROJECT_NAME, TOPIC_NAME);
63+
}
64+
65+
@After
66+
public void tearDown() throws Exception {
67+
pubsubHelper.deleteSubscription(PROJECT_NAME, SUBSCRIPTION_NAME);
68+
pubsubHelper.deleteTopic(PROJECT_NAME, TOPIC_NAME);
69+
}
70+
71+
public void testFlinkSource(boolean testWithFailure) throws Exception {
72+
// Create some messages and put them into pubsub
73+
List<String> input =
74+
Arrays.asList(
75+
"One", "Two", "Three", "Four", "Five", "Six", "Seven", "Eight", "Nine",
76+
"Ten");
77+
78+
List<String> messagesToSend = new ArrayList<>(input);
79+
80+
// Publish the messages into PubSub
81+
Publisher publisher = pubsubHelper.createPublisher(PROJECT_NAME, TOPIC_NAME);
82+
messagesToSend.forEach(
83+
s -> {
84+
try {
85+
publisher
86+
.publish(
87+
PubsubMessage.newBuilder()
88+
.setData(ByteString.copyFromUtf8(s))
89+
.build())
90+
.get();
91+
} catch (InterruptedException | ExecutionException e) {
92+
e.printStackTrace();
93+
}
94+
});
95+
96+
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
97+
env.enableCheckpointing(100);
98+
env.setParallelism(1);
99+
if (testWithFailure) {
100+
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(2, 2000));
101+
} else {
102+
env.setRestartStrategy(RestartStrategies.noRestart());
103+
}
104+
105+
PubSubSource<String> source =
106+
PubSubSource.<String>builder()
107+
.setDeserializationSchema(new SimpleStringSchema())
108+
.setProjectName(PROJECT_NAME)
109+
.setSubscriptionName(SUBSCRIPTION_NAME)
110+
.setCredentials(EmulatorCredentials.getInstance())
111+
.setPubSubSubscriberFactory(
112+
new PubSubSubscriberFactoryForEmulator(
113+
getPubSubHostPort(),
114+
PROJECT_NAME,
115+
SUBSCRIPTION_NAME,
116+
10,
117+
Duration.ofSeconds(1),
118+
3))
119+
.build();
120+
121+
DataStream<String> fromPubSub =
122+
env.fromSource(source, WatermarkStrategy.noWatermarks(), "test-pubsub-new-source");
123+
124+
if (testWithFailure) {
125+
fromPubSub = fromPubSub.map(new FailureMapFunction<>(3));
126+
}
127+
128+
// Asking for any more elements would wait forever, and there isn't a graceful way to
129+
// indicate end of stream.
130+
List<String> output = fromPubSub.executeAndCollect(input.size());
131+
132+
assertEquals("Wrong number of elements", input.size(), output.size());
133+
for (String test : input) {
134+
assertTrue("Missing " + test, output.contains(test));
135+
}
136+
}
137+
138+
private static class FailureMapFunction<T> extends RichMapFunction<T, T> {
139+
private final long numberOfRecordsUntilFailure;
140+
private long numberOfRecordsProcessed;
141+
142+
private FailureMapFunction(long numberOfRecordsBeforeFailure) {
143+
this.numberOfRecordsUntilFailure = numberOfRecordsBeforeFailure;
144+
}
145+
146+
@Override
147+
public T map(T value) throws Exception {
148+
numberOfRecordsProcessed++;
149+
150+
if (shouldThrowException()) {
151+
throw new Exception(
152+
"Deliberately thrown exception to induce crash for failure recovery testing.");
153+
}
154+
return value;
155+
}
156+
157+
private boolean shouldThrowException() {
158+
return getRuntimeContext().getAttemptNumber() <= 1
159+
&& (numberOfRecordsProcessed >= numberOfRecordsUntilFailure);
160+
}
161+
}
162+
163+
// IMPORTANT: This test makes use of things that happen in the emulated PubSub that
164+
// are GUARANTEED to be different in the real Google hosted PubSub.
165+
// So running these tests against the real thing will have a very high probability of
166+
// failing.
167+
// The assumptions:
168+
// 1) The ordering of the messages is maintained.
169+
// 2) Exactly once: We assume that every message we put in comes out exactly once.
170+
// In the real PubSub there are a lot of situations (mostly failure/retry) where this is not
171+
// true.
172+
@Test
173+
public void testFlinkSourceOk() throws Exception {
174+
testFlinkSource(false);
175+
}
176+
177+
@Test
178+
public void testFlinkSourceFailure() throws Exception {
179+
testFlinkSource(true);
180+
}
181+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,180 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.streaming.connectors.gcp.pubsub;
20+
21+
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
22+
import org.apache.flink.api.common.functions.RichMapFunction;
23+
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
24+
import org.apache.flink.api.common.serialization.SimpleStringSchema;
25+
import org.apache.flink.streaming.api.datastream.DataStream;
26+
import org.apache.flink.streaming.api.datastream.DataStreamUtils;
27+
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
28+
import org.apache.flink.streaming.connectors.gcp.pubsub.emulator.EmulatorCredentials;
29+
import org.apache.flink.streaming.connectors.gcp.pubsub.emulator.GCloudUnitTestBase;
30+
import org.apache.flink.streaming.connectors.gcp.pubsub.emulator.PubSubSubscriberFactoryForEmulator;
31+
import org.apache.flink.streaming.connectors.gcp.pubsub.emulator.PubsubHelper;
32+
import org.apache.flink.streaming.connectors.gcp.pubsub.source.PubSubSource;
33+
34+
import com.google.cloud.pubsub.v1.Publisher;
35+
import com.google.protobuf.ByteString;
36+
import com.google.pubsub.v1.PubsubMessage;
37+
import org.junit.After;
38+
import org.junit.Before;
39+
import org.junit.Test;
40+
41+
import java.time.Duration;
42+
import java.util.ArrayList;
43+
import java.util.Arrays;
44+
import java.util.List;
45+
import java.util.concurrent.ExecutionException;
46+
47+
import static org.junit.Assert.assertEquals;
48+
import static org.junit.Assert.assertTrue;
49+
50+
/** Test of {@link PubSubSource} against the GCP Pub/Sub emulator SDK. */
51+
public class EmulatedPubSubNewSourceTest extends GCloudUnitTestBase {
52+
private static final String PROJECT_NAME = "FLProject";
53+
private static final String TOPIC_NAME = "FLTopic";
54+
private static final String SUBSCRIPTION_NAME = "FLSubscription";
55+
56+
private static PubsubHelper pubsubHelper;
57+
58+
@Before
59+
public void setUp() throws Exception {
60+
pubsubHelper = getPubsubHelper();
61+
pubsubHelper.createTopic(PROJECT_NAME, TOPIC_NAME);
62+
pubsubHelper.createSubscription(PROJECT_NAME, SUBSCRIPTION_NAME, PROJECT_NAME, TOPIC_NAME);
63+
}
64+
65+
@After
66+
public void tearDown() throws Exception {
67+
pubsubHelper.deleteSubscription(PROJECT_NAME, SUBSCRIPTION_NAME);
68+
pubsubHelper.deleteTopic(PROJECT_NAME, TOPIC_NAME);
69+
}
70+
71+
public void testFlinkSource(boolean testWithFailure) throws Exception {
72+
// Create some messages and put them into pubsub
73+
List<String> input =
74+
Arrays.asList(
75+
"One", "Two", "Three", "Four", "Five", "Six", "Seven", "Eight", "Nine",
76+
"Ten");
77+
78+
List<String> messagesToSend = new ArrayList<>(input);
79+
80+
// Publish the messages into PubSub
81+
Publisher publisher = pubsubHelper.createPublisher(PROJECT_NAME, TOPIC_NAME);
82+
messagesToSend.forEach(
83+
s -> {
84+
try {
85+
publisher
86+
.publish(
87+
PubsubMessage.newBuilder()
88+
.setData(ByteString.copyFromUtf8(s))
89+
.build())
90+
.get();
91+
} catch (InterruptedException | ExecutionException e) {
92+
e.printStackTrace();
93+
}
94+
});
95+
96+
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
97+
env.enableCheckpointing(100);
98+
env.setParallelism(1);
99+
if (testWithFailure) {
100+
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(2, 2000));
101+
} else {
102+
env.setRestartStrategy(RestartStrategies.noRestart());
103+
}
104+
105+
PubSubSource<String> source =
106+
PubSubSource.<String>builder()
107+
.setDeserializationSchema(new SimpleStringSchema())
108+
.setProjectName(PROJECT_NAME)
109+
.setSubscriptionName(SUBSCRIPTION_NAME)
110+
.setCredentials(EmulatorCredentials.getInstance())
111+
.setPubSubSubscriberFactory(
112+
new PubSubSubscriberFactoryForEmulator(
113+
getPubSubHostPort(),
114+
PROJECT_NAME,
115+
SUBSCRIPTION_NAME,
116+
10,
117+
Duration.ofSeconds(1),
118+
3))
119+
.build();
120+
121+
DataStream<String> fromPubSub =
122+
env.fromSource(source, WatermarkStrategy.noWatermarks(), "test-pubsub-new-source");
123+
124+
if (testWithFailure) {
125+
fromPubSub = fromPubSub.map(new FailureMapFunction<>(3));
126+
}
127+
128+
List<String> output = new ArrayList<>();
129+
DataStreamUtils.collect(fromPubSub).forEachRemaining(output::add);
130+
131+
assertEquals("Wrong number of elements", input.size(), output.size());
132+
for (String test : input) {
133+
assertTrue("Missing " + test, output.contains(test));
134+
}
135+
}
136+
137+
private class FailureMapFunction<T> extends RichMapFunction<T, T> {
138+
private final long numberOfRecordsUntilFailure;
139+
private long numberOfRecordsProcessed;
140+
141+
private FailureMapFunction(long numberOfRecordsBeforeFailure) {
142+
this.numberOfRecordsUntilFailure = numberOfRecordsBeforeFailure;
143+
}
144+
145+
@Override
146+
public T map(T value) throws Exception {
147+
numberOfRecordsProcessed++;
148+
149+
if (shouldThrowException()) {
150+
throw new Exception(
151+
"Deliberately thrown exception to induce crash for failure recovery testing.");
152+
}
153+
return value;
154+
}
155+
156+
private boolean shouldThrowException() {
157+
return getRuntimeContext().getAttemptNumber() <= 1
158+
&& (numberOfRecordsProcessed >= numberOfRecordsUntilFailure);
159+
}
160+
}
161+
162+
// IMPORTANT: This test makes use of things that happen in the emulated PubSub that
163+
// are GUARANTEED to be different in the real Google hosted PubSub.
164+
// So running these tests against the real thing will have a very high probability of
165+
// failing.
166+
// The assumptions:
167+
// 1) The ordering of the messages is maintained.
168+
// 2) Exactly once: We assume that every message we put in comes out exactly once.
169+
// In the real PubSub there are a lot of situations (mostly failure/retry) where this is not
170+
// true.
171+
@Test
172+
public void testFlinkSourceOk() throws Exception {
173+
testFlinkSource(false);
174+
}
175+
176+
@Test
177+
public void testFlinkSourceFailure() throws Exception {
178+
testFlinkSource(true);
179+
}
180+
}

0 commit comments

Comments
 (0)