Skip to content

Commit 14573f3

Browse files
tillrohrmann authored and rmetzger committed
[FLINK-3021] Fix class loading issue for streaming sources
Streaming sources were directly assigned their InputFormat in the StreamingJobGraphGenerator. As a consequence, the input formats were directly serialized/deserialized by Akka when the JobGraph was sent to the JobManager. In cases where the user provided a custom input format or an input format with custom types, this could lead to a ClassNotFoundException, because Akka uses the system class loader instead of the user code class loader for the deserialization. The problem was fixed by wrapping the InputFormat into a UserCodeObjectWrapper which is shipped to the JobManager via the JobVertex's configuration. By instantiating stream sources as InputFormatVertices, the corresponding InputFormat is retrieved from the Configuration in the initializeOnMaster method call.
1 parent 8605400 commit 14573f3

File tree

5 files changed

+249
-8
lines changed

5 files changed

+249
-8
lines changed

flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,12 @@
2929

3030
import org.apache.commons.lang.StringUtils;
3131
import org.apache.flink.api.common.ExecutionConfig;
32+
import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
3233
import org.apache.flink.api.java.functions.KeySelector;
3334
import org.apache.flink.api.java.tuple.Tuple2;
3435
import org.apache.flink.configuration.Configuration;
3536
import org.apache.flink.runtime.jobgraph.DistributionPattern;
37+
import org.apache.flink.runtime.jobgraph.InputFormatVertex;
3638
import org.apache.flink.runtime.jobgraph.JobGraph;
3739
import org.apache.flink.runtime.jobgraph.JobVertex;
3840
import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -41,6 +43,7 @@
4143
import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
4244
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
4345
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
46+
import org.apache.flink.runtime.operators.util.TaskConfig;
4447
import org.apache.flink.streaming.api.CheckpointingMode;
4548
import org.apache.flink.streaming.api.operators.StreamOperator;
4649
import org.apache.flink.streaming.api.operators.ChainingStrategy;
@@ -221,10 +224,17 @@ private String createChainedName(Integer vertexID, List<StreamEdge> chainedOutpu
221224
}
222225

223226
private StreamConfig createProcessingVertex(Integer vertexID) {
224-
225-
JobVertex jobVertex = new JobVertex(chainedNames.get(vertexID));
227+
JobVertex jobVertex;
226228
StreamNode vertex = streamGraph.getStreamNode(vertexID);
227229

230+
if (vertex.getInputFormat() != null) {
231+
jobVertex = new InputFormatVertex(chainedNames.get(vertexID));
232+
TaskConfig taskConfig = new TaskConfig(jobVertex.getConfiguration());
233+
taskConfig.setStubWrapper(new UserCodeObjectWrapper<Object>(vertex.getInputFormat()));
234+
} else {
235+
jobVertex = new JobVertex(chainedNames.get(vertexID));
236+
}
237+
228238
jobVertex.setInvokableClass(vertex.getJobVertexClass());
229239

230240
int parallelism = vertex.getParallelism();
@@ -237,10 +247,6 @@ private StreamConfig createProcessingVertex(Integer vertexID) {
237247
LOG.debug("Parallelism set: {} for {}", parallelism, vertexID);
238248
}
239249

240-
if (vertex.getInputFormat() != null) {
241-
jobVertex.setInputSplitSource(vertex.getInputFormat());
242-
}
243-
244250
jobVertices.put(vertexID, jobVertex);
245251
builtVertices.add(vertexID);
246252
jobGraph.addVertex(jobVertex);

flink-tests/pom.xml

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -391,6 +391,25 @@ under the License.
391391
</descriptors>
392392
</configuration>
393393
</execution>
394+
<execution>
395+
<id>create-streaming-custominputsplit-jar</id>
396+
<phase>process-test-classes</phase>
397+
<goals>
398+
<goal>single</goal>
399+
</goals>
400+
<configuration>
401+
<archive>
402+
<manifest>
403+
<mainClass>org.apache.flink.test.classloading.jar.StreamingCustomInputSplitProgram</mainClass>
404+
</manifest>
405+
</archive>
406+
<finalName>streaming-customsplit</finalName>
407+
<attach>false</attach>
408+
<descriptors>
409+
<descriptor>src/test/assembly/test-streaming-custominput-assembly.xml</descriptor>
410+
</descriptors>
411+
</configuration>
412+
</execution>
394413
<execution>
395414
<id>create-streamingclassloader-jar</id>
396415
<phase>process-test-classes</phase>
flink-tests/src/test/assembly/test-streaming-custominput-assembly.xml

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
<!--
2+
~ Licensed to the Apache Software Foundation (ASF) under one
3+
~ or more contributor license agreements. See the NOTICE file
4+
~ distributed with this work for additional information
5+
~ regarding copyright ownership. The ASF licenses this file
6+
~ to you under the Apache License, Version 2.0 (the
7+
~ "License"); you may not use this file except in compliance
8+
~ with the License. You may obtain a copy of the License at
9+
~
10+
~ http://www.apache.org/licenses/LICENSE-2.0
11+
~
12+
~ Unless required by applicable law or agreed to in writing, software
13+
~ distributed under the License is distributed on an "AS IS" BASIS,
14+
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
~ See the License for the specific language governing permissions and
16+
~ limitations under the License.
17+
-->
18+
19+
<assembly>
20+
<id>test-jar</id>
21+
<formats>
22+
<format>jar</format>
23+
</formats>
24+
<includeBaseDirectory>false</includeBaseDirectory>
25+
<fileSets>
26+
<fileSet>
27+
<directory>${project.build.testOutputDirectory}</directory>
28+
<outputDirectory>/</outputDirectory>
29+
<!--modify/add include to match your package(s) -->
30+
<includes>
31+
<include>org/apache/flink/test/classloading/jar/StreamingCustomInputSplitProgram.class</include>
32+
<include>org/apache/flink/test/classloading/jar/StreamingCustomInputSplitProgram$*.class</include>
33+
</includes>
34+
</fileSet>
35+
</fileSets>
36+
</assembly>

flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@ public class ClassLoaderITCase {
3737

3838
private static final String INPUT_SPLITS_PROG_JAR_FILE = "target/customsplit-test-jar.jar";
3939

40+
private static final String STREAMING_INPUT_SPLITS_PROG_JAR_FILE = "target/streaming-customsplit-test-jar.jar";
41+
4042
private static final String STREAMING_PROG_JAR_FILE = "target/streamingclassloader-test-jar.jar";
4143

4244
private static final String STREAMING_CHECKPOINTED_PROG_JAR_FILE = "target/streaming-checkpointed-classloader-test-jar.jar";
@@ -76,6 +78,15 @@ public void testJobsWithCustomClassLoader() {
7678
});
7779
inputSplitTestProg.invokeInteractiveModeForExecution();
7880

81+
PackagedProgram streamingInputSplitTestProg = new PackagedProgram(
82+
new File(STREAMING_INPUT_SPLITS_PROG_JAR_FILE),
83+
new String[] { STREAMING_INPUT_SPLITS_PROG_JAR_FILE,
84+
"localhost",
85+
String.valueOf(port),
86+
"4" // parallelism
87+
});
88+
streamingInputSplitTestProg.invokeInteractiveModeForExecution();
89+
7990
String classpath = new File(INPUT_SPLITS_PROG_JAR_FILE).toURI().toURL().toString();
8091
PackagedProgram inputSplitTestProg2 = new PackagedProgram(new File(INPUT_SPLITS_PROG_JAR_FILE),
8192
new String[] { "",
@@ -89,7 +100,7 @@ public void testJobsWithCustomClassLoader() {
89100
// regular streaming job
90101
PackagedProgram streamingProg = new PackagedProgram(
91102
new File(STREAMING_PROG_JAR_FILE),
92-
new String[] {
103+
new String[] {
93104
STREAMING_PROG_JAR_FILE,
94105
"localhost",
95106
String.valueOf(port)
@@ -102,7 +113,7 @@ public void testJobsWithCustomClassLoader() {
102113
PackagedProgram streamingCheckpointedProg = new PackagedProgram(
103114
new File(STREAMING_CHECKPOINTED_PROG_JAR_FILE),
104115
new String[] {
105-
STREAMING_CHECKPOINTED_PROG_JAR_FILE,
116+
STREAMING_CHECKPOINTED_PROG_JAR_FILE,
106117
"localhost",
107118
String.valueOf(port)});
108119
streamingCheckpointedProg.invokeInteractiveModeForExecution();
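flink-tests/src/test/java/org/apache/flink/test/classloading/jar/StreamingCustomInputSplitProgram.java

Lines changed: 169 additions & 0 deletions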
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,169 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.test.classloading.jar;
20+
21+
import org.apache.flink.api.common.functions.MapFunction;
22+
import org.apache.flink.api.common.io.InputFormat;
23+
import org.apache.flink.api.common.io.statistics.BaseStatistics;
24+
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
25+
import org.apache.flink.api.common.typeinfo.TypeInformation;
26+
import org.apache.flink.api.java.tuple.Tuple2;
27+
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
28+
import org.apache.flink.configuration.ConfigConstants;
29+
import org.apache.flink.configuration.Configuration;
30+
import org.apache.flink.core.io.InputSplit;
31+
import org.apache.flink.core.io.InputSplitAssigner;
32+
import org.apache.flink.streaming.api.datastream.DataStream;
33+
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
34+
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
35+
36+
import java.io.Serializable;
37+
import java.util.ArrayList;
38+
import java.util.Arrays;
39+
import java.util.List;
40+
41+
@SuppressWarnings("serial")
42+
public class StreamingCustomInputSplitProgram {
43+
44+
public static void main(String[] args) throws Exception {
45+
final String jarFile = args[0];
46+
final String host = args[1];
47+
final int port = Integer.parseInt(args[2]);
48+
final int parallelism = Integer.parseInt(args[3]);
49+
50+
Configuration config = new Configuration();
51+
52+
config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, "5 s");
53+
54+
StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(host, port, config, jarFile);
55+
env.getConfig().disableSysoutLogging();
56+
env.setParallelism(parallelism);
57+
58+
DataStream<Integer> data = env.createInput(new CustomInputFormat());
59+
60+
data.map(new MapFunction<Integer, Tuple2<Integer, Double>>() {
61+
@Override
62+
public Tuple2<Integer, Double> map(Integer value) throws Exception {
63+
return new Tuple2<Integer, Double>(value, value * 0.5);
64+
}
65+
}).addSink(new NoOpSink());
66+
67+
env.execute();
68+
}
69+
// --------------------------------------------------------------------------------------------
70+
71+
public static final class CustomInputFormat implements InputFormat<Integer, CustomInputSplit>, ResultTypeQueryable<Integer> {
72+
73+
private static final long serialVersionUID = 1L;
74+
75+
private Integer value;
76+
77+
CustomInputSplit split = new CustomInputSplit(0);
78+
79+
@Override
80+
public void configure(Configuration parameters) {}
81+
82+
@Override
83+
public BaseStatistics getStatistics(BaseStatistics cachedStatistics) {
84+
return null;
85+
}
86+
87+
@Override
88+
public CustomInputSplit[] createInputSplits(int minNumSplits) {
89+
CustomInputSplit[] splits = new CustomInputSplit[minNumSplits];
90+
for (int i = 0; i < minNumSplits; i++) {
91+
splits[i] = new CustomInputSplit(i);
92+
}
93+
return splits;
94+
}
95+
96+
@Override
97+
public InputSplitAssigner getInputSplitAssigner(CustomInputSplit[] inputSplits) {
98+
return new CustomSplitAssigner(inputSplits);
99+
}
100+
101+
@Override
102+
public void open(CustomInputSplit split) {
103+
this.value = split.getSplitNumber();
104+
}
105+
106+
@Override
107+
public boolean reachedEnd() {
108+
return this.value == null;
109+
}
110+
111+
@Override
112+
public Integer nextRecord(Integer reuse) {
113+
Integer val = this.value;
114+
this.value = null;
115+
return val;
116+
}
117+
118+
@Override
119+
public void close() {}
120+
121+
@Override
122+
public TypeInformation<Integer> getProducedType() {
123+
return BasicTypeInfo.INT_TYPE_INFO;
124+
}
125+
}
126+
127+
public static final class CustomInputSplit implements InputSplit {
128+
129+
private static final long serialVersionUID = 1L;
130+
131+
private final int splitNumber;
132+
133+
public CustomInputSplit(int splitNumber) {
134+
this.splitNumber = splitNumber;
135+
}
136+
137+
@Override
138+
public int getSplitNumber() {
139+
return this.splitNumber;
140+
}
141+
}
142+
143+
public static final class CustomSplitAssigner implements InputSplitAssigner, Serializable {
144+
145+
private final List<CustomInputSplit> remainingSplits;
146+
147+
public CustomSplitAssigner(CustomInputSplit[] splits) {
148+
this.remainingSplits = new ArrayList<CustomInputSplit>(Arrays.asList(splits));
149+
}
150+
151+
@Override
152+
public InputSplit getNextInputSplit(String host, int taskId) {
153+
synchronized (this) {
154+
int size = remainingSplits.size();
155+
if (size > 0) {
156+
return remainingSplits.remove(size - 1);
157+
} else {
158+
return null;
159+
}
160+
}
161+
}
162+
}
163+
164+
public static class NoOpSink implements SinkFunction<Tuple2<Integer, Double>> {
165+
@Override
166+
public void invoke(Tuple2<Integer, Double> value) throws Exception {
167+
}
168+
}
169+
}

0 commit comments

Comments
 (0)