
Commit 24f1897

[FLINK-37219][table] Support value state in PTFs
This closes #26133.
1 parent 9ea6182 commit 24f1897

File tree: 24 files changed, +1127 −152 lines changed


flink-table/flink-table-common/src/main/java/org/apache/flink/table/annotation/StateHint.java (+28 −5)

@@ -40,12 +40,12 @@
  * support a single state entry at the beginning of an accumulate()/retract() method (i.e. the
  * accumulator).
  *
- * <p>For example, {@code @StateHint(name = "count", type = @DataTypeHint("BIGINT"))} is a state
- * entry with the data type BIGINT named "count".
+ * <p>Because state needs to be mutable for read and write access, only row or structured types
+ * qualify as a data type for state entries. For example, {@code @StateHint(name = "count", type
+ * = @DataTypeHint("ROW<count BIGINT>"))} is a state entry named "count" of type ROW<count BIGINT>.
  *
- * <p>Note: Usually, a state entry is partitioned by a key and can not be accessed globally. The
- * partitioning (or whether it is only a single partition) is defined by the corresponding function
- * call.
+ * <p>Note: A state entry is partitioned by a key and cannot be accessed globally. The partitioning
+ * (or a single partition in case of no partitioning) is defined by the corresponding function call.
  *
  * @see FunctionHint
  */
@@ -70,4 +70,27 @@
      * or provide hints for the reflection-based extraction of the data type.
      */
     DataTypeHint type() default @DataTypeHint();
+
+    /**
+     * The time-to-live (TTL) duration after which the state entry is automatically cleaned up.
+     *
+     * <p>It specifies a minimum time interval for how long idle state (i.e., state that was not
+     * updated by a create or write operation) will be retained. State is never cleared before it
+     * has been idle for at least this minimum time, and is cleared at some point after it became
+     * idle.
+     *
+     * <p>Use this to keep an ever-growing state size under control or to comply with data
+     * protection requirements.
+     *
+     * <p>The cleanup is based on processing time, which effectively corresponds to the wall-clock
+     * time as defined by {@link System#currentTimeMillis()}.
+     *
+     * <p>The provided string must use Flink's duration syntax (e.g., "3 days", "45 min", "3 hours",
+     * "60 s"). If no unit is specified, the value is interpreted as milliseconds. The TTL set on a
+     * state entry takes precedence over the global state TTL configuration of the entire pipeline.
+     *
+     * @see org.apache.flink.util.TimeUtils#parseDuration(String)
+     */
+    String ttl() default "";
 }
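
For illustration only (not part of the diff above): a minimal sketch of how the new ttl attribute could be combined with a row-typed state entry in a PTF. The class, state, and field names below are hypothetical; only StateHint, DataTypeHint, ArgumentHint, and the ttl attribute come from the API shown in this commit.

// Hypothetical usage sketch of StateHint#ttl(); names are illustrative, not from the commit.
import org.apache.flink.table.annotation.ArgumentHint;
import org.apache.flink.table.annotation.ArgumentTrait;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.StateHint;
import org.apache.flink.table.functions.ProcessTableFunction;
import org.apache.flink.types.Row;

public class LastSeenFunction extends ProcessTableFunction<String> {

    // Row-typed state entry named "memory"; entries that stay idle for roughly one hour
    // are cleaned up automatically because of the ttl attribute.
    public void eval(
            @StateHint(name = "memory", type = @DataTypeHint("ROW<last STRING>"), ttl = "1 h") Row memory,
            @ArgumentHint(ArgumentTrait.TABLE_AS_SET) Row input) {
        final Object previous = memory.getField("last");
        collect(previous == null ? "First event seen" : "Previously seen: " + previous);
        memory.setField("last", input.toString());
    }
}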

flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/ProcessTableFunction.java (+113 −2)

@@ -23,6 +23,7 @@
 import org.apache.flink.table.annotation.ArgumentTrait;
 import org.apache.flink.table.annotation.DataTypeHint;
 import org.apache.flink.table.annotation.FunctionHint;
+import org.apache.flink.table.annotation.StateHint;
 import org.apache.flink.table.catalog.DataTypeFactory;
 import org.apache.flink.table.types.extraction.TypeInferenceExtractor;
 import org.apache.flink.table.types.inference.TypeInference;
@@ -144,7 +145,7 @@
  * <pre>{@code
  * // Function with explicit table argument type of row
  * class MyPTF extends ProcessTableFunction<String> {
- *   public void eval(Context ctx, @ArgumentHint(value = ArgumentTrait.TABLE_AS_SET, type = "ROW < s STRING >") Row t) {
+ *   public void eval(Context ctx, @ArgumentHint(value = ArgumentTrait.TABLE_AS_SET, type = @DataTypeHint("ROW < s STRING >")) Row t) {
  *     TableSemantics semantics = ctx.tableSemanticsFor("t");
  *     // Always returns "ROW < s STRING >"
  *     semantics.dataType();
@@ -194,6 +195,96 @@
  * }
  * }</pre>
  *
+ * <h1>State</h1>
+ *
+ * <p>A PTF that takes set semantic tables can be stateful. Intermediate results can be buffered,
+ * cached, aggregated, or simply stored for repeated access. A function can have one or more state
+ * entries which are managed by the framework. Flink takes care of storing and restoring those
+ * during failures or restarts (i.e. Flink managed state).
+ *
+ * <p>A state entry is partitioned by a key and cannot be accessed globally. The partitioning (or a
+ * single partition in case of no partitioning) is defined by the corresponding function call. In
+ * other words: Similar to how a virtual processor has access only to a portion of the entire table,
+ * a PTF has access only to the portion of the entire state that is defined by the PARTITION BY
+ * clause. In Flink, this concept is also known as keyed state.
+ *
+ * <p>State entries can be added as mutable parameters to the eval() method. In order to
+ * distinguish them from call arguments, they must be declared before any other argument, but after
+ * an optional {@link Context} parameter. Furthermore, they must be annotated either via {@link
+ * StateHint} or declared as part of {@link FunctionHint#state()}.
+ *
+ * <p>For read and write access, only row or structured types (i.e. POJOs with a default
+ * constructor) qualify as a data type. If no state is present yet, all fields are set to null (in
+ * case of a row type) or to their default values (in case of a structured type). For state
+ * efficiency, it is recommended to keep all fields nullable.
+ *
+ * <pre>{@code
+ * // A function that counts and stores its intermediate result in the CountState object,
+ * // which will be persisted by Flink
+ * class CountingFunction extends ProcessTableFunction<String> {
+ *   public static class CountState {
+ *     public long count = 0L;
+ *   }
+ *
+ *   public void eval(@StateHint CountState memory, @ArgumentHint(TABLE_AS_SET) Row input) {
+ *     memory.count++;
+ *     collect("Seen rows: " + memory.count);
+ *   }
+ * }
+ *
+ * // A function that waits for a second event to come in
+ * class CountingFunction extends ProcessTableFunction<String> {
+ *   public static class SeenState {
+ *     public String first;
+ *   }
+ *
+ *   public void eval(@StateHint SeenState memory, @ArgumentHint(TABLE_AS_SET) Row input) {
+ *     if (memory.first == null) {
+ *       memory.first = input.toString();
+ *     } else {
+ *       collect("Event 1: " + memory.first + " and Event 2: " + input.toString());
+ *     }
+ *   }
+ * }
+ *
+ * // A function that uses Row for state
+ * class CountingFunction extends ProcessTableFunction<String> {
+ *   public void eval(@StateHint(type = @DataTypeHint("ROW < count BIGINT >")) Row memory, @ArgumentHint(TABLE_AS_SET) Row input) {
+ *     Long newCount = 1L;
+ *     if (memory.getField("count") != null) {
+ *       newCount += memory.getFieldAs("count");
+ *     }
+ *     memory.setField("count", newCount);
+ *     collect("Seen rows: " + newCount);
+ *   }
+ * }
+ * }</pre>
+ *
+ * <h2>Efficiency and Design Principles</h2>
+ *
+ * <p>A stateful function also means that data layout and data retention should be well thought
+ * through. An ever-growing state can be caused by an unlimited number of partitions (i.e. an open
+ * keyspace) or by unbounded growth within a partition. Consider setting a {@link StateHint#ttl()}
+ * or calling {@link Context#clearAllState()} eventually:
+ *
+ * <pre>{@code
+ * // A function that waits for a second event to come in BUT with better state efficiency
+ * class CountingFunction extends ProcessTableFunction<String> {
+ *   public static class SeenState {
+ *     public String first;
+ *   }
+ *
+ *   public void eval(Context ctx, @StateHint(ttl = "1 day") SeenState memory, @ArgumentHint(TABLE_AS_SET) Row input) {
+ *     if (memory.first == null) {
+ *       memory.first = input.toString();
+ *     } else {
+ *       collect("Event 1: " + memory.first + " and Event 2: " + input.toString());
+ *       ctx.clearAllState();
+ *     }
+ *   }
+ * }
+ * }</pre>
+ *
  * @param <T> The type of the output row. Either an explicit composite type or an atomic type that
  *     is implicitly wrapped into a row consisting of one field.
  */
@@ -241,8 +332,28 @@ public interface Context {
         /**
          * Returns additional information about the semantics of a table argument.
          *
-         * @param argName name of the table argument
+         * @param argName name of the table argument; either reflectively extracted or manually
+         *     defined via {@link ArgumentHint#name()}.
         */
        TableSemantics tableSemanticsFor(String argName);
+
+        /**
+         * Clears the given state entry within the virtual partition once the eval() method returns.
+         *
+         * <p>Semantically, this is equal to setting all fields of the state entry to null shortly
+         * before the eval() method returns.
+         *
+         * @param stateName name of the state entry; either reflectively extracted or manually
+         *     defined via {@link StateHint#name()}.
+         */
+        void clearState(String stateName);
+
+        /**
+         * Clears all state entries within the virtual partition once the eval() method returns.
+         *
+         * <p>Semantically, this is equal to calling {@link #clearState(String)} on all state
+         * entries.
+         */
+        void clearAllState();
     }
 }
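
The javadoc above only demonstrates clearAllState(). As a complementary, purely illustrative sketch (the class and state entry names are hypothetical and not part of this commit), a function with two named state entries could use the new Context#clearState(String) to drop one entry while keeping the other:

// Hypothetical sketch of Context#clearState(String); names are illustrative, not from the commit.
import org.apache.flink.table.annotation.ArgumentHint;
import org.apache.flink.table.annotation.ArgumentTrait;
import org.apache.flink.table.annotation.StateHint;
import org.apache.flink.table.functions.ProcessTableFunction;
import org.apache.flink.types.Row;

public class VisitTrackingFunction extends ProcessTableFunction<String> {

    public static class CountState {
        public Long count;
    }

    public static class LatestState {
        public String latest;
    }

    public void eval(
            Context ctx,
            @StateHint(name = "count") CountState count,
            @StateHint(name = "latest") LatestState latest,
            @ArgumentHint(ArgumentTrait.TABLE_AS_SET) Row input) {
        count.count = count.count == null ? 1L : count.count + 1L;
        latest.latest = input.toString();
        collect("Visits: " + count.count + ", latest: " + latest.latest);
        if (count.count >= 100) {
            // Reset only the per-partition counter; the "latest" entry is kept.
            // Per the javadoc above, clearing takes effect once eval() returns.
            ctx.clearState("count");
        }
    }
}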

flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/UserDefinedFunctionHelper.java (+2 −0)

@@ -95,6 +95,8 @@ public final class UserDefinedFunctionHelper {

     public static final String PROCESS_TABLE_EVAL = "eval";

+    public static final String DEFAULT_ACCUMULATOR_NAME = "acc";
+
     /**
      * Tries to infer the TypeInformation of an AggregateFunction's accumulator type.
      *

flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/BaseMappingExtractor.java (+19 −5)

@@ -27,11 +27,13 @@
 import org.apache.flink.table.catalog.DataTypeFactory;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.functions.UserDefinedFunction;
+import org.apache.flink.table.functions.UserDefinedFunctionHelper;
 import org.apache.flink.table.procedures.Procedure;
 import org.apache.flink.table.types.CollectionDataType;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.extraction.FunctionResultTemplate.FunctionOutputTemplate;
 import org.apache.flink.table.types.extraction.FunctionResultTemplate.FunctionStateTemplate;
+import org.apache.flink.table.types.extraction.FunctionResultTemplate.FunctionStateTemplate.StateInfoTemplate;
 import org.apache.flink.table.types.inference.StaticArgumentTrait;
 import org.apache.flink.types.Row;

@@ -177,8 +179,10 @@ static ResultExtraction createStateFromGenericInClassOrParametersExtraction(
                             baseClass,
                             genericPos,
                             extractor.getFunctionClass());
-            final LinkedHashMap<String, DataType> state = new LinkedHashMap<>();
-            state.put("acc", dataType);
+            final LinkedHashMap<String, StateInfoTemplate> state = new LinkedHashMap<>();
+            state.put(
+                    UserDefinedFunctionHelper.DEFAULT_ACCUMULATOR_NAME,
+                    StateInfoTemplate.of(dataType, null));
             return FunctionResultTemplate.ofState(state);
         }
         return createStateTemplateFromParameters(extractor, method, stateParameters);
@@ -325,9 +329,19 @@ private static FunctionStateTemplate createStateTemplateFromParameters(
                                                 s.pos))
                         .collect(Collectors.toList());

-        final LinkedHashMap<String, DataType> state =
-                IntStream.range(0, dataTypes.size())
-                        .mapToObj(i -> Map.entry(argumentNames[i], dataTypes.get(i)))
+        final LinkedHashMap<String, StateInfoTemplate> state =
+                IntStream.range(0, argumentNames.length)
+                        .mapToObj(
+                                i -> {
+                                    final DataType dataType = dataTypes.get(i);
+                                    final StateHint hint =
+                                            stateParameters
+                                                    .get(i)
+                                                    .parameter
+                                                    .getAnnotation(StateHint.class);
+                                    return Map.entry(
+                                            argumentNames[i], StateInfoTemplate.of(dataType, hint));
+                                })
                         .collect(
                                 Collectors.toMap(
                                         Map.Entry::getKey,

flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/ExtractionUtils.java (+26 −0)

@@ -26,8 +26,12 @@
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.catalog.DataTypeFactory;
+import org.apache.flink.table.functions.ProcessTableFunction;
 import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
 import org.apache.flink.table.types.logical.StructuredType;
+import org.apache.flink.table.types.logical.utils.LogicalTypeChecks;

 import org.apache.flink.shaded.asm9.org.objectweb.asm.ClassReader;
 import org.apache.flink.shaded.asm9.org.objectweb.asm.ClassVisitor;
@@ -401,6 +405,28 @@ public static Class<?> getClassFromType(Type type) {
         return (Class<?>) type;
     }

+    /**
+     * Checks whether the given data type can be used as a state entry for {@link
+     * ProcessTableFunction}.
+     */
+    public static void checkStateDataType(DataType dataType) {
+        final LogicalType type = dataType.getLogicalType();
+        if (!LogicalTypeChecks.isCompositeType(type)) {
+            throw extractionError(
+                    "State entries must use a mutable, composite data type. But was: %s", dataType);
+        }
+        if (type.is(LogicalTypeRoot.ROW)) {
+            return;
+        }
+        if (!hasInvokableConstructor(dataType.getConversionClass())) {
+            throw extractionError(
+                    "Class '%s' cannot be used as state because a default constructor is missing. "
+                            + "State entries must provide an argument-less constructor so that all "
+                            + "fields are mutable.",
+                    dataType.getConversionClass().getName());
+        }
+    }
+
     // --------------------------------------------------------------------------------------------
     // Methods intended for this package
     // --------------------------------------------------------------------------------------------
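
To make the new validation concrete, here is a small illustrative sketch of which data types pass checkStateDataType. The example class and the calls below are assumptions for demonstration, not part of the commit.

// Illustrative sketch: data types accepted or rejected by the new checkStateDataType.
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.extraction.ExtractionUtils;

public class StateTypeCheckExample {

    // Qualifies as state: a POJO with an implicit default constructor and mutable fields.
    public static class Memory {
        public Long count;
    }

    public static void main(String[] args) {
        // A row type is composite and therefore qualifies.
        ExtractionUtils.checkStateDataType(
                DataTypes.ROW(DataTypes.FIELD("count", DataTypes.BIGINT())));

        // A structured type backed by a default-constructible class qualifies as well.
        ExtractionUtils.checkStateDataType(
                DataTypes.STRUCTURED(Memory.class, DataTypes.FIELD("count", DataTypes.BIGINT())));

        // An atomic type such as BIGINT is not composite and would trigger the
        // "State entries must use a mutable, composite data type" error from above.
        // ExtractionUtils.checkStateDataType(DataTypes.BIGINT());
    }
}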
