Skip to content

Commit

Permalink
[FLINK-36355][runtime] Remove deprecated xxxStreams#with
Browse files Browse the repository at this point in the history
  • Loading branch information
yunfengzhou-hub authored and reswqa committed Sep 27, 2024
1 parent 21092bf commit be4549d
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 139 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -364,12 +364,8 @@ public WithWindow<T1, T2, KEY, W> allowedLateness(@Nullable Duration newLateness
/**
* Completes the co-group operation with the user function that is executed for windowed
* groups.
*
* <p>Note: This method's return type does not support setting an operator-specific
* parallelism. Due to binary backwards compatibility, this cannot be altered. Use the
* {@link #with(CoGroupFunction)} method to set an operator-specific parallelism.
*/
public <T> DataStream<T> apply(CoGroupFunction<T1, T2, T> function) {
public <T> SingleOutputStreamOperator<T> apply(CoGroupFunction<T1, T2, T> function) {

TypeInformation<T> resultType =
TypeExtractor.getCoGroupReturnTypes(
Expand All @@ -381,30 +377,8 @@ public <T> DataStream<T> apply(CoGroupFunction<T1, T2, T> function) {
/**
* Completes the co-group operation with the user function that is executed for windowed
* groups.
*
* <p><b>Note:</b> This is a temporary workaround while the {@link #apply(CoGroupFunction)}
* method has the wrong return type and hence does not allow one to set an operator-specific
* parallelism
*
* @deprecated This method will be removed once the {@link #apply(CoGroupFunction)} method
* is fixed in the next major version of Flink (2.0).
*/
@PublicEvolving
@Deprecated
public <T> SingleOutputStreamOperator<T> with(CoGroupFunction<T1, T2, T> function) {
return (SingleOutputStreamOperator<T>) apply(function);
}

/**
* Completes the co-group operation with the user function that is executed for windowed
* groups.
*
* <p>Note: This method's return type does not support setting an operator-specific
* parallelism. Due to binary backwards compatibility, this cannot be altered. Use the
* {@link #with(CoGroupFunction, TypeInformation)} method to set an operator-specific
* parallelism.
*/
public <T> DataStream<T> apply(
public <T> SingleOutputStreamOperator<T> apply(
CoGroupFunction<T1, T2, T> function, TypeInformation<T> resultType) {
// clean the closure
function = input1.getExecutionEnvironment().clean(function);
Expand Down Expand Up @@ -446,24 +420,6 @@ public <T> DataStream<T> apply(
new CoGroupWindowFunction<T1, T2, T, KEY, W>(function), resultType);
}

/**
* Completes the co-group operation with the user function that is executed for windowed
* groups.
*
* <p><b>Note:</b> This is a temporary workaround while the {@link #apply(CoGroupFunction,
* TypeInformation)} method has the wrong return type and hence does not allow one to set an
* operator-specific parallelism
*
* @deprecated This method will be removed once the {@link #apply(CoGroupFunction,
* TypeInformation)} method is fixed in the next major version of Flink (2.0).
*/
@PublicEvolving
@Deprecated
public <T> SingleOutputStreamOperator<T> with(
CoGroupFunction<T1, T2, T> function, TypeInformation<T> resultType) {
return (SingleOutputStreamOperator<T>) apply(function, resultType);
}

/** @deprecated Use {@link #getAllowedLatenessDuration()} */
@Deprecated
@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -355,12 +355,8 @@ public WithWindow<T1, T2, KEY, W> allowedLateness(@Nullable Duration newLateness
/**
* Completes the join operation with the user function that is executed for each combination
* of elements with the same key in a window.
*
* <p>Note: This method's return type does not support setting an operator-specific
* parallelism. Due to binary backwards compatibility, this cannot be altered. Use the
* {@link #with(JoinFunction)} method to set an operator-specific parallelism.
*/
public <T> DataStream<T> apply(JoinFunction<T1, T2, T> function) {
public <T> SingleOutputStreamOperator<T> apply(JoinFunction<T1, T2, T> function) {
TypeInformation<T> resultType =
TypeExtractor.getBinaryOperatorReturnType(
function,
Expand All @@ -380,30 +376,8 @@ public <T> DataStream<T> apply(JoinFunction<T1, T2, T> function) {
/**
* Completes the join operation with the user function that is executed for each combination
* of elements with the same key in a window.
*
* <p><b>Note:</b> This is a temporary workaround while the {@link #apply(JoinFunction)}
* method has the wrong return type and hence does not allow one to set an operator-specific
* parallelism
*
* @deprecated This method will be removed once the {@link #apply(JoinFunction)} method is
* fixed in the next major version of Flink (2.0).
*/
@PublicEvolving
@Deprecated
public <T> SingleOutputStreamOperator<T> with(JoinFunction<T1, T2, T> function) {
return (SingleOutputStreamOperator<T>) apply(function);
}

/**
* Completes the join operation with the user function that is executed for each combination
* of elements with the same key in a window.
*
* <p>Note: This method's return type does not support setting an operator-specific
* parallelism. Due to binary backwards compatibility, this cannot be altered. Use the
* {@link #with(JoinFunction, TypeInformation)}, method to set an operator-specific
* parallelism.
*/
public <T> DataStream<T> apply(
public <T> SingleOutputStreamOperator<T> apply(
FlatJoinFunction<T1, T2, T> function, TypeInformation<T> resultType) {
// clean the closure
function = input1.getExecutionEnvironment().clean(function);
Expand All @@ -424,31 +398,8 @@ public <T> DataStream<T> apply(
/**
* Completes the join operation with the user function that is executed for each combination
* of elements with the same key in a window.
*
* <p><b>Note:</b> This is a temporary workaround while the {@link #apply(JoinFunction,
* TypeInformation)} method has the wrong return type and hence does not allow one to set an
* operator-specific parallelism
*
* @deprecated This method will be replaced by {@link #apply(FlatJoinFunction,
* TypeInformation)} in Flink 2.0. So use the {@link #apply(FlatJoinFunction,
* TypeInformation)} in the future.
*/
@PublicEvolving
@Deprecated
public <T> SingleOutputStreamOperator<T> with(
FlatJoinFunction<T1, T2, T> function, TypeInformation<T> resultType) {
return (SingleOutputStreamOperator<T>) apply(function, resultType);
}

/**
* Completes the join operation with the user function that is executed for each combination
* of elements with the same key in a window.
*
* <p>Note: This method's return type does not support setting an operator-specific
* parallelism. Due to binary backwards compatibility, this cannot be altered. Use the
* {@link #with(FlatJoinFunction)}, method to set an operator-specific parallelism.
*/
public <T> DataStream<T> apply(FlatJoinFunction<T1, T2, T> function) {
public <T> SingleOutputStreamOperator<T> apply(FlatJoinFunction<T1, T2, T> function) {
TypeInformation<T> resultType =
TypeExtractor.getBinaryOperatorReturnType(
function,
Expand All @@ -468,30 +419,8 @@ public <T> DataStream<T> apply(FlatJoinFunction<T1, T2, T> function) {
/**
* Completes the join operation with the user function that is executed for each combination
* of elements with the same key in a window.
*
* <p><b>Note:</b> This is a temporary workaround while the {@link #apply(FlatJoinFunction)}
* method has the wrong return type and hence does not allow one to set an operator-specific
* parallelism.
*
* @deprecated This method will be removed once the {@link #apply(FlatJoinFunction)} method
* is fixed in the next major version of Flink (2.0).
*/
@PublicEvolving
@Deprecated
public <T> SingleOutputStreamOperator<T> with(FlatJoinFunction<T1, T2, T> function) {
return (SingleOutputStreamOperator<T>) apply(function);
}

/**
* Completes the join operation with the user function that is executed for each combination
* of elements with the same key in a window.
*
* <p>Note: This method's return type does not support setting an operator-specific
* parallelism. Due to binary backwards compatibility, this cannot be altered. Use the
* {@link #with(JoinFunction, TypeInformation)}, method to set an operator-specific
* parallelism.
*/
public <T> DataStream<T> apply(
public <T> SingleOutputStreamOperator<T> apply(
JoinFunction<T1, T2, T> function, TypeInformation<T> resultType) {
// clean the closure
function = input1.getExecutionEnvironment().clean(function);
Expand All @@ -508,24 +437,6 @@ public <T> DataStream<T> apply(
return coGroupedWindowedStream.apply(new JoinCoGroupFunction<>(function), resultType);
}

/**
* Completes the join operation with the user function that is executed for each combination
* of elements with the same key in a window.
*
* <p><b>Note:</b> This is a temporary workaround while the {@link #apply(FlatJoinFunction,
* TypeInformation)} method has the wrong return type and hence does not allow one to set an
* operator-specific parallelism
*
* @deprecated This method will be removed once the {@link #apply(JoinFunction,
* TypeInformation)} method is fixed in the next major version of Flink (2.0).
*/
@PublicEvolving
@Deprecated
public <T> SingleOutputStreamOperator<T> with(
JoinFunction<T1, T2, T> function, TypeInformation<T> resultType) {
return (SingleOutputStreamOperator<T>) apply(function, resultType);
}

/** @deprecated Use {@link #getAllowedLatenessDuration()}} */
@VisibleForTesting
@Nullable
Expand Down

0 comments on commit be4549d

Please sign in to comment.