Skip to content

Commit 6656716

Browse files
committed
[FLINK-3021] Fix class loading issue for streaming sources
Streaming sources were directly assigned their InputFormat in the StreamingJobGraphGenerator. As a consequence, the input formats were directly serialized/deserialized by Akka when the JobGraph was sent to the JobManager. In cases where the user provided a custom input format or an input format with custom types, this could lead to a ClassDefNotFoundException, because the system class loader instead of the user code class loader is used by Akka for the deserialization. The problem was fixed by wrapping the InputFormat into a UserCodeObjectWrapper which is shipped ot the JobManager via the JobVertex's configuration. By instantiating stream sources as InputFormatVertices, the corresponding InputFormat is retrieved from the Configuration in the initializeOnMaster method call. This closes apache#1368.
1 parent 4bcc154 commit 6656716

File tree

5 files changed

+249
-8
lines changed

5 files changed

+249
-8
lines changed

flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,12 @@
2929

3030
import org.apache.commons.lang.StringUtils;
3131
import org.apache.flink.api.common.ExecutionConfig;
32+
import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
3233
import org.apache.flink.api.java.functions.KeySelector;
3334
import org.apache.flink.api.java.tuple.Tuple2;
3435
import org.apache.flink.configuration.Configuration;
3536
import org.apache.flink.runtime.jobgraph.DistributionPattern;
37+
import org.apache.flink.runtime.jobgraph.InputFormatVertex;
3638
import org.apache.flink.runtime.jobgraph.JobGraph;
3739
import org.apache.flink.runtime.jobgraph.JobVertex;
3840
import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -41,6 +43,7 @@
4143
import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
4244
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
4345
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
46+
import org.apache.flink.runtime.operators.util.TaskConfig;
4447
import org.apache.flink.streaming.api.CheckpointingMode;
4548
import org.apache.flink.streaming.api.operators.StreamOperator;
4649
import org.apache.flink.streaming.api.operators.ChainingStrategy;
@@ -221,10 +224,17 @@ private String createChainedName(Integer vertexID, List<StreamEdge> chainedOutpu
221224
}
222225

223226
private StreamConfig createProcessingVertex(Integer vertexID) {
224-
225-
JobVertex jobVertex = new JobVertex(chainedNames.get(vertexID));
227+
JobVertex jobVertex;
226228
StreamNode vertex = streamGraph.getStreamNode(vertexID);
227229

230+
if (vertex.getInputFormat() != null) {
231+
jobVertex = new InputFormatVertex(chainedNames.get(vertexID));
232+
TaskConfig taskConfig = new TaskConfig(jobVertex.getConfiguration());
233+
taskConfig.setStubWrapper(new UserCodeObjectWrapper<Object>(vertex.getInputFormat()));
234+
} else {
235+
jobVertex = new JobVertex(chainedNames.get(vertexID));
236+
}
237+
228238
jobVertex.setInvokableClass(vertex.getJobVertexClass());
229239

230240
int parallelism = vertex.getParallelism();
@@ -237,10 +247,6 @@ private StreamConfig createProcessingVertex(Integer vertexID) {
237247
LOG.debug("Parallelism set: {} for {}", parallelism, vertexID);
238248
}
239249

240-
if (vertex.getInputFormat() != null) {
241-
jobVertex.setInputSplitSource(vertex.getInputFormat());
242-
}
243-
244250
jobVertices.put(vertexID, jobVertex);
245251
builtVertices.add(vertexID);
246252
jobGraph.addVertex(jobVertex);

flink-tests/pom.xml

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -391,6 +391,25 @@ under the License.
391391
</descriptors>
392392
</configuration>
393393
</execution>
394+
<execution>
395+
<id>create-streaming-custominputsplit-jar</id>
396+
<phase>process-test-classes</phase>
397+
<goals>
398+
<goal>single</goal>
399+
</goals>
400+
<configuration>
401+
<archive>
402+
<manifest>
403+
<mainClass>org.apache.flink.test.classloading.jar.StreamingCustomInputSplitProgram</mainClass>
404+
</manifest>
405+
</archive>
406+
<finalName>streaming-customsplit</finalName>
407+
<attach>false</attach>
408+
<descriptors>
409+
<descriptor>src/test/assembly/test-streaming-custominput-assembly.xml</descriptor>
410+
</descriptors>
411+
</configuration>
412+
</execution>
394413
<execution>
395414
<id>create-streamingclassloader-jar</id>
396415
<phase>process-test-classes</phase>
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
<!--
2+
~ Licensed to the Apache Software Foundation (ASF) under one
3+
~ or more contributor license agreements. See the NOTICE file
4+
~ distributed with this work for additional information
5+
~ regarding copyright ownership. The ASF licenses this file
6+
~ to you under the Apache License, Version 2.0 (the
7+
~ "License"); you may not use this file except in compliance
8+
~ with the License. You may obtain a copy of the License at
9+
~
10+
~ http://www.apache.org/licenses/LICENSE-2.0
11+
~
12+
~ Unless required by applicable law or agreed to in writing, software
13+
~ distributed under the License is distributed on an "AS IS" BASIS,
14+
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
~ See the License for the specific language governing permissions and
16+
~ limitations under the License.
17+
-->
18+
19+
<assembly>
20+
<id>test-jar</id>
21+
<formats>
22+
<format>jar</format>
23+
</formats>
24+
<includeBaseDirectory>false</includeBaseDirectory>
25+
<fileSets>
26+
<fileSet>
27+
<directory>${project.build.testOutputDirectory}</directory>
28+
<outputDirectory>/</outputDirectory>
29+
<!--modify/add include to match your package(s) -->
30+
<includes>
31+
<include>org/apache/flink/test/classloading/jar/StreamingCustomInputSplitProgram.class</include>
32+
<include>org/apache/flink/test/classloading/jar/StreamingCustomInputSplitProgram$*.class</include>
33+
</includes>
34+
</fileSet>
35+
</fileSets>
36+
</assembly>

flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@ public class ClassLoaderITCase {
3737

3838
private static final String INPUT_SPLITS_PROG_JAR_FILE = "target/customsplit-test-jar.jar";
3939

40+
private static final String STREAMING_INPUT_SPLITS_PROG_JAR_FILE = "target/streaming-customsplit-test-jar.jar";
41+
4042
private static final String STREAMING_PROG_JAR_FILE = "target/streamingclassloader-test-jar.jar";
4143

4244
private static final String STREAMING_CHECKPOINTED_PROG_JAR_FILE = "target/streaming-checkpointed-classloader-test-jar.jar";
@@ -76,6 +78,15 @@ public void testJobsWithCustomClassLoader() {
7678
});
7779
inputSplitTestProg.invokeInteractiveModeForExecution();
7880

81+
PackagedProgram streamingInputSplitTestProg = new PackagedProgram(
82+
new File(STREAMING_INPUT_SPLITS_PROG_JAR_FILE),
83+
new String[] { STREAMING_INPUT_SPLITS_PROG_JAR_FILE,
84+
"localhost",
85+
String.valueOf(port),
86+
"4" // parallelism
87+
});
88+
streamingInputSplitTestProg.invokeInteractiveModeForExecution();
89+
7990
String classpath = new File(INPUT_SPLITS_PROG_JAR_FILE).toURI().toURL().toString();
8091
PackagedProgram inputSplitTestProg2 = new PackagedProgram(new File(INPUT_SPLITS_PROG_JAR_FILE),
8192
new String[] { "",
@@ -89,7 +100,7 @@ public void testJobsWithCustomClassLoader() {
89100
// regular streaming job
90101
PackagedProgram streamingProg = new PackagedProgram(
91102
new File(STREAMING_PROG_JAR_FILE),
92-
new String[] {
103+
new String[] {
93104
STREAMING_PROG_JAR_FILE,
94105
"localhost",
95106
String.valueOf(port)
@@ -102,7 +113,7 @@ public void testJobsWithCustomClassLoader() {
102113
PackagedProgram streamingCheckpointedProg = new PackagedProgram(
103114
new File(STREAMING_CHECKPOINTED_PROG_JAR_FILE),
104115
new String[] {
105-
STREAMING_CHECKPOINTED_PROG_JAR_FILE,
116+
STREAMING_CHECKPOINTED_PROG_JAR_FILE,
106117
"localhost",
107118
String.valueOf(port)});
108119
streamingCheckpointedProg.invokeInteractiveModeForExecution();
Lines changed: 169 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,169 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.test.classloading.jar;
20+
21+
import org.apache.flink.api.common.functions.MapFunction;
22+
import org.apache.flink.api.common.io.InputFormat;
23+
import org.apache.flink.api.common.io.statistics.BaseStatistics;
24+
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
25+
import org.apache.flink.api.common.typeinfo.TypeInformation;
26+
import org.apache.flink.api.java.tuple.Tuple2;
27+
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
28+
import org.apache.flink.configuration.ConfigConstants;
29+
import org.apache.flink.configuration.Configuration;
30+
import org.apache.flink.core.io.InputSplit;
31+
import org.apache.flink.core.io.InputSplitAssigner;
32+
import org.apache.flink.streaming.api.datastream.DataStream;
33+
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
34+
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
35+
36+
import java.io.Serializable;
37+
import java.util.ArrayList;
38+
import java.util.Arrays;
39+
import java.util.List;
40+
41+
@SuppressWarnings("serial")
42+
public class StreamingCustomInputSplitProgram {
43+
44+
public static void main(String[] args) throws Exception {
45+
final String jarFile = args[0];
46+
final String host = args[1];
47+
final int port = Integer.parseInt(args[2]);
48+
final int parallelism = Integer.parseInt(args[3]);
49+
50+
Configuration config = new Configuration();
51+
52+
config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, "5 s");
53+
54+
StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(host, port, config, jarFile);
55+
env.getConfig().disableSysoutLogging();
56+
env.setParallelism(parallelism);
57+
58+
DataStream<Integer> data = env.createInput(new CustomInputFormat());
59+
60+
data.map(new MapFunction<Integer, Tuple2<Integer, Double>>() {
61+
@Override
62+
public Tuple2<Integer, Double> map(Integer value) throws Exception {
63+
return new Tuple2<Integer, Double>(value, value * 0.5);
64+
}
65+
}).addSink(new NoOpSink());
66+
67+
env.execute();
68+
}
69+
// --------------------------------------------------------------------------------------------
70+
71+
public static final class CustomInputFormat implements InputFormat<Integer, CustomInputSplit>, ResultTypeQueryable<Integer> {
72+
73+
private static final long serialVersionUID = 1L;
74+
75+
private Integer value;
76+
77+
CustomInputSplit split = new CustomInputSplit(0);
78+
79+
@Override
80+
public void configure(Configuration parameters) {}
81+
82+
@Override
83+
public BaseStatistics getStatistics(BaseStatistics cachedStatistics) {
84+
return null;
85+
}
86+
87+
@Override
88+
public CustomInputSplit[] createInputSplits(int minNumSplits) {
89+
CustomInputSplit[] splits = new CustomInputSplit[minNumSplits];
90+
for (int i = 0; i < minNumSplits; i++) {
91+
splits[i] = new CustomInputSplit(i);
92+
}
93+
return splits;
94+
}
95+
96+
@Override
97+
public InputSplitAssigner getInputSplitAssigner(CustomInputSplit[] inputSplits) {
98+
return new CustomSplitAssigner(inputSplits);
99+
}
100+
101+
@Override
102+
public void open(CustomInputSplit split) {
103+
this.value = split.getSplitNumber();
104+
}
105+
106+
@Override
107+
public boolean reachedEnd() {
108+
return this.value == null;
109+
}
110+
111+
@Override
112+
public Integer nextRecord(Integer reuse) {
113+
Integer val = this.value;
114+
this.value = null;
115+
return val;
116+
}
117+
118+
@Override
119+
public void close() {}
120+
121+
@Override
122+
public TypeInformation<Integer> getProducedType() {
123+
return BasicTypeInfo.INT_TYPE_INFO;
124+
}
125+
}
126+
127+
public static final class CustomInputSplit implements InputSplit {
128+
129+
private static final long serialVersionUID = 1L;
130+
131+
private final int splitNumber;
132+
133+
public CustomInputSplit(int splitNumber) {
134+
this.splitNumber = splitNumber;
135+
}
136+
137+
@Override
138+
public int getSplitNumber() {
139+
return this.splitNumber;
140+
}
141+
}
142+
143+
public static final class CustomSplitAssigner implements InputSplitAssigner, Serializable {
144+
145+
private final List<CustomInputSplit> remainingSplits;
146+
147+
public CustomSplitAssigner(CustomInputSplit[] splits) {
148+
this.remainingSplits = new ArrayList<CustomInputSplit>(Arrays.asList(splits));
149+
}
150+
151+
@Override
152+
public InputSplit getNextInputSplit(String host, int taskId) {
153+
synchronized (this) {
154+
int size = remainingSplits.size();
155+
if (size > 0) {
156+
return remainingSplits.remove(size - 1);
157+
} else {
158+
return null;
159+
}
160+
}
161+
}
162+
}
163+
164+
public static class NoOpSink implements SinkFunction<Tuple2<Integer, Double>> {
165+
@Override
166+
public void invoke(Tuple2<Integer, Double> value) throws Exception {
167+
}
168+
}
169+
}

0 commit comments

Comments
 (0)