Skip to content

Commit da0c758

Browse files
committed
[FLINK-36366][core] Remove deprecated APIs in flink-core, excluding the connector and state parts
1 parent 7e7fd7b commit da0c758

File tree

99 files changed

+290
-1166
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

99 files changed

+290
-1166
lines changed

flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSink.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -429,7 +429,7 @@ private static OutputFormat<RowData> createBulkWriterOutputFormat(
429429
public void configure(Configuration parameters) {}
430430

431431
@Override
432-
public void open(int taskNumber, int numTasks) throws IOException {
432+
public void open(InitializationContext context) throws IOException {
433433
this.stream = path.getFileSystem().create(path, FileSystem.WriteMode.OVERWRITE);
434434
this.writer = factory.create(stream);
435435
}
@@ -460,7 +460,7 @@ private static OutputFormat<RowData> createEncoderOutputFormat(
460460
public void configure(Configuration parameters) {}
461461

462462
@Override
463-
public void open(int taskNumber, int numTasks) throws IOException {
463+
public void open(InitializationContext context) throws IOException {
464464
this.output = path.getFileSystem().create(path, FileSystem.WriteMode.OVERWRITE);
465465
}
466466

flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/PartitionWriter.java

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,23 @@ OutputFormat<T> createNewOutputFormat(Path path) throws IOException {
5858
OutputFormat<T> format = factory.createOutputFormat(path);
5959
format.configure(conf);
6060
// Here we just think of it as a single file format, so there can only be a single task.
61-
format.open(0, 1);
61+
format.open(
62+
new OutputFormat.InitializationContext() {
63+
@Override
64+
public int getNumTasks() {
65+
return 1;
66+
}
67+
68+
@Override
69+
public int getTaskNumber() {
70+
return 0;
71+
}
72+
73+
@Override
74+
public int getAttemptNumber() {
75+
return 0;
76+
}
77+
});
6278
return format;
6379
}
6480
}

flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/testutils/TestingFileSystem.java

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import org.apache.flink.core.fs.FSDataOutputStream;
2424
import org.apache.flink.core.fs.FileStatus;
2525
import org.apache.flink.core.fs.FileSystem;
26-
import org.apache.flink.core.fs.FileSystemKind;
2726
import org.apache.flink.core.fs.Path;
2827

2928
import javax.annotation.Nullable;
@@ -254,12 +253,6 @@ public boolean rename(Path src, Path dst) throws IOException {
254253
public boolean isDistributedFS() {
255254
return false;
256255
}
257-
258-
@Override
259-
public FileSystemKind getKind() {
260-
return FileSystemKind.FILE_SYSTEM;
261-
}
262-
263256
// ------------------------------------------------------------------------
264257

265258
/** Test implementation of a {@link FileStatus}. */

flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/PartitionWriterTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ void after() {
7171
public void configure(Configuration parameters) {}
7272

7373
@Override
74-
public void open(int taskNumber, int numTasks) {
74+
public void open(InitializationContext context) {
7575
records.put(getKey(), new ArrayList<>());
7676
}
7777

flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/write/HiveOutputFormatFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ private HiveOutputFormat(RecordWriter recordWriter, Function<Row, Writable> rowC
6262
public void configure(Configuration parameters) {}
6363

6464
@Override
65-
public void open(int taskNumber, int numTasks) {}
65+
public void open(InitializationContext context) {}
6666

6767
@Override
6868
public void writeRecord(Row record) throws IOException {

flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormatBase.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ public void configure(Configuration parameters) {
113113
* @throws java.io.IOException
114114
*/
115115
@Override
116-
public void open(int taskNumber, int numTasks) throws IOException {
116+
public void open(InitializationContext context) throws IOException {
117117

118118
// enforce sequential open() calls
119119
synchronized (OPEN_MUTEX) {
@@ -178,7 +178,7 @@ public void close() throws IOException {
178178
}
179179

180180
@Override
181-
public void finalizeGlobal(int parallelism) throws IOException {
181+
public void finalizeGlobal(FinalizationContext context) throws IOException {
182182

183183
try {
184184
JobContext jobContext = new JobContextImpl(this.jobConf, new JobID());

flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopOutputFormatBase.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -98,12 +98,10 @@ public void configure(Configuration parameters) {
9898
/**
9999
* create the temporary output file for hadoop RecordWriter.
100100
*
101-
* @param taskNumber The number of the parallel instance.
102-
* @param numTasks The number of parallel tasks.
103101
* @throws java.io.IOException
104102
*/
105103
@Override
106-
public void open(int taskNumber, int numTasks) throws IOException {
104+
public void open(InitializationContext context) throws IOException {
107105

108106
// enforce sequential open() calls
109107
synchronized (OPEN_MUTEX) {
@@ -207,7 +205,7 @@ public void close() throws IOException {
207205
}
208206

209207
@Override
210-
public void finalizeGlobal(int parallelism) throws IOException {
208+
public void finalizeGlobal(FinalizationContext context) throws IOException {
211209

212210
JobContext jobContext;
213211
TaskAttemptContext taskContext;

flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormatTest.java

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818

1919
package org.apache.flink.api.java.hadoop.mapred;
2020

21+
import org.apache.flink.api.common.io.FinalizeOnMaster;
22+
import org.apache.flink.api.common.io.FirstAttemptInitializationContext;
2123
import org.apache.flink.api.java.tuple.Tuple2;
2224

2325
import org.apache.hadoop.conf.Configurable;
@@ -61,7 +63,7 @@ void testOpen() throws Exception {
6163
HadoopOutputFormat<String, Long> outputFormat =
6264
new HadoopOutputFormat<>(dummyOutputFormat, jobConf);
6365

64-
outputFormat.open(1, 1);
66+
outputFormat.open(FirstAttemptInitializationContext.of(1, 1));
6567

6668
verify(jobConf, times(2)).getOutputCommitter();
6769
verify(outputCommitter, times(1)).setupJob(any(JobContext.class));
@@ -163,7 +165,18 @@ void testFinalizeGlobal() throws Exception {
163165
HadoopOutputFormat<String, Long> outputFormat =
164166
new HadoopOutputFormat<>(dummyOutputFormat, jobConf);
165167

166-
outputFormat.finalizeGlobal(1);
168+
outputFormat.finalizeGlobal(
169+
new FinalizeOnMaster.FinalizationContext() {
170+
@Override
171+
public int getParallelism() {
172+
return 1;
173+
}
174+
175+
@Override
176+
public int getFinishedAttempt(int subtaskIndex) {
177+
return 0;
178+
}
179+
});
167180

168181
verify(outputCommitter, times(1)).commitJob(any(JobContext.class));
169182
}

flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopOutputFormatTest.java

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818

1919
package org.apache.flink.api.java.hadoop.mapreduce;
2020

21+
import org.apache.flink.api.common.io.FinalizeOnMaster;
22+
import org.apache.flink.api.common.io.FirstAttemptInitializationContext;
2123
import org.apache.flink.api.java.tuple.Tuple2;
2224

2325
import org.apache.hadoop.conf.Configurable;
@@ -80,7 +82,7 @@ void testOpen() throws Exception {
8082
setupOutputCommitter(true),
8183
new Configuration());
8284

83-
hadoopOutputFormat.open(1, 4);
85+
hadoopOutputFormat.open(FirstAttemptInitializationContext.of(1, 4));
8486

8587
verify(hadoopOutputFormat.outputCommitter, times(1)).setupJob(any(JobContext.class));
8688
verify(hadoopOutputFormat.mapreduceOutputFormat, times(1))
@@ -152,7 +154,18 @@ void testFinalizedGlobal() throws Exception {
152154
null,
153155
new Configuration());
154156

155-
hadoopOutputFormat.finalizeGlobal(1);
157+
hadoopOutputFormat.finalizeGlobal(
158+
new FinalizeOnMaster.FinalizationContext() {
159+
@Override
160+
public int getParallelism() {
161+
return 1;
162+
}
163+
164+
@Override
165+
public int getFinishedAttempt(int subtaskIndex) {
166+
return 0;
167+
}
168+
});
156169

157170
verify(hadoopOutputFormat.outputCommitter, times(1)).commitJob(any(JobContext.class));
158171
}

flink-core/src/main/java/org/apache/flink/api/common/ArchivedExecutionConfig.java

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,6 @@
3333
public class ArchivedExecutionConfig implements Serializable {
3434

3535
private static final long serialVersionUID = 2907040336948181163L;
36-
37-
private final String executionMode;
3836
private final String restartStrategyDescription;
3937
private final int parallelism;
4038
private final int maxParallelism;
@@ -43,7 +41,6 @@ public class ArchivedExecutionConfig implements Serializable {
4341
private final Map<String, String> globalJobParameters;
4442

4543
public ArchivedExecutionConfig(ExecutionConfig ec) {
46-
executionMode = ec.getExecutionMode().name();
4744
restartStrategyDescription =
4845
RestartStrategyDescriptionUtils.getRestartStrategyDescription(ec.toConfiguration());
4946

@@ -59,14 +56,12 @@ public ArchivedExecutionConfig(ExecutionConfig ec) {
5956
}
6057

6158
public ArchivedExecutionConfig(
62-
String executionMode,
6359
String restartStrategyDescription,
6460
int maxParallelism,
6561
int parallelism,
6662
boolean objectReuseEnabled,
6763
long periodicMaterializeIntervalMillis,
6864
Map<String, String> globalJobParameters) {
69-
this.executionMode = executionMode;
7065
this.restartStrategyDescription = restartStrategyDescription;
7166
this.maxParallelism = maxParallelism;
7267
this.parallelism = parallelism;
@@ -75,10 +70,6 @@ public ArchivedExecutionConfig(
7570
this.globalJobParameters = globalJobParameters;
7671
}
7772

78-
public String getExecutionMode() {
79-
return executionMode;
80-
}
81-
8273
public String getRestartStrategyDescription() {
8374
return restartStrategyDescription;
8475
}

flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java

Lines changed: 0 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -452,32 +452,6 @@ public ExecutionMode getExecutionMode() {
452452
return configuration.get(EXECUTION_MODE);
453453
}
454454

455-
/**
456-
* This method is deprecated. It was used to set the {@link InputDependencyConstraint} utilized
457-
* by the old scheduler implementations which got removed as part of FLINK-20589. The current
458-
* implementation has no effect.
459-
*
460-
* @param ignored Ignored parameter.
461-
* @deprecated due to the deprecation of {@code InputDependencyConstraint}.
462-
*/
463-
@PublicEvolving
464-
@Deprecated
465-
public void setDefaultInputDependencyConstraint(InputDependencyConstraint ignored) {}
466-
467-
/**
468-
* This method is deprecated. It was used to return the {@link InputDependencyConstraint}
469-
* utilized by the old scheduler implementations. These implementations were removed as part of
470-
* FLINK-20589.
471-
*
472-
* @return The previous default constraint {@link InputDependencyConstraint#ANY}.
473-
* @deprecated due to the deprecation of {@code InputDependencyConstraint}.
474-
*/
475-
@PublicEvolving
476-
@Deprecated
477-
public InputDependencyConstraint getDefaultInputDependencyConstraint() {
478-
return InputDependencyConstraint.ANY;
479-
}
480-
481455
/**
482456
* Force TypeExtractor to use Kryo serializer for POJOS even though we could analyze as POJO. In
483457
* some cases this might be preferable. For example, when using interfaces with subclasses that
@@ -1000,16 +974,6 @@ public String toString() {
1000974
+ '}';
1001975
}
1002976

1003-
/**
1004-
* This method simply checks whether the object is an {@link ExecutionConfig} instance.
1005-
*
1006-
* @deprecated It is not intended to be used by users.
1007-
*/
1008-
@Deprecated
1009-
public boolean canEqual(Object obj) {
1010-
return obj instanceof ExecutionConfig;
1011-
}
1012-
1013977
@Override
1014978
@Internal
1015979
public ArchivedExecutionConfig archive() {

flink-core/src/main/java/org/apache/flink/api/common/InputDependencyConstraint.java

Lines changed: 0 additions & 39 deletions
This file was deleted.

flink-core/src/main/java/org/apache/flink/api/common/JobExecutionResult.java

Lines changed: 0 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
package org.apache.flink.api.common;
2020

2121
import org.apache.flink.annotation.Public;
22-
import org.apache.flink.annotation.PublicEvolving;
2322
import org.apache.flink.api.common.accumulators.AccumulatorHelper;
2423
import org.apache.flink.util.OptionalFailure;
2524

@@ -136,42 +135,4 @@ public String toString() {
136135

137136
return result.toString();
138137
}
139-
140-
/**
141-
* Gets the accumulator with the given name as an integer.
142-
*
143-
* @param accumulatorName Name of the counter
144-
* @return Result of the counter, or null if the counter does not exist
145-
* @throws java.lang.ClassCastException Thrown, if the accumulator was not aggregating a {@link
146-
* java.lang.Integer}
147-
* @deprecated Will be removed in future versions. Use {@link #getAccumulatorResult} instead.
148-
*/
149-
@Deprecated
150-
@PublicEvolving
151-
public Integer getIntCounterResult(String accumulatorName) {
152-
Object result = this.accumulatorResults.get(accumulatorName).getUnchecked();
153-
if (result == null) {
154-
return null;
155-
}
156-
if (!(result instanceof Integer)) {
157-
throw new ClassCastException(
158-
"Requested result of the accumulator '"
159-
+ accumulatorName
160-
+ "' should be Integer but has type "
161-
+ result.getClass());
162-
}
163-
return (Integer) result;
164-
}
165-
166-
/**
167-
* Returns a dummy object for wrapping a JobSubmissionResult.
168-
*
169-
* @param result The SubmissionResult
170-
* @return a JobExecutionResult
171-
* @deprecated Will be removed in future versions.
172-
*/
173-
@Deprecated
174-
public static JobExecutionResult fromJobSubmissionResult(JobSubmissionResult result) {
175-
return new JobExecutionResult(result.getJobID(), -1, null);
176-
}
177138
}

0 commit comments

Comments (0)