Skip to content

Commit e590b3b

Browse files
committed
update doc
1 parent d6d5fcc commit e590b3b

File tree

14 files changed

+39
-47
lines changed

14 files changed

+39
-47
lines changed

docs/content.zh/docs/dev/datastream/fault-tolerance/state.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,7 @@ public class CountWindowAverage extends RichFlatMapFunction<Tuple2<Long, Long>,
160160
}
161161

162162
@Override
163-
public void open(Configuration config) {
163+
public void open(OpenContext ctx) {
164164
ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
165165
new ValueStateDescriptor<>(
166166
"average", // the state name

docs/content.zh/docs/learn-flink/etl.md

+3-3
Original file line numberDiff line numberDiff line change
@@ -236,7 +236,7 @@ minutesByStartCell
236236

237237
对其中的每一个接口,Flink 同样提供了一个所谓 "rich" 的变体,如 `RichFlatMapFunction`,其中增加了以下方法,包括:
238238

239-
- `open(Configuration c)`
239+
- `open(OpenContext context)`
240240
- `close()`
241241
- `getRuntimeContext()`
242242

@@ -280,7 +280,7 @@ public static class Deduplicator extends RichFlatMapFunction<Event, Event> {
280280
ValueState<Boolean> keyHasBeenSeen;
281281

282282
@Override
283-
public void open(Configuration conf) {
283+
public void open(OpenContext ctx) {
284284
ValueStateDescriptor<Boolean> desc = new ValueStateDescriptor<>("keyHasBeenSeen", Types.BOOLEAN);
285285
keyHasBeenSeen = getRuntimeContext().getState(desc);
286286
}
@@ -373,7 +373,7 @@ public static class ControlFunction extends RichCoFlatMapFunction<String, String
373373
private ValueState<Boolean> blocked;
374374

375375
@Override
376-
public void open(Configuration config) {
376+
public void open(OpenContext ctx) {
377377
blocked = getRuntimeContext()
378378
.getState(new ValueStateDescriptor<>("blocked", Boolean.class));
379379
}

docs/content.zh/docs/learn-flink/event_driven.md

+2-2
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ public static class PseudoWindow extends
7979

8080
@Override
8181
// 在初始化期间调用一次。
82-
public void open(Configuration conf) {
82+
public void open(OpenContext ctx) {
8383
. . .
8484
}
8585

@@ -126,7 +126,7 @@ public static class PseudoWindow extends
126126
private transient MapState<Long, Float> sumOfTips;
127127

128128
@Override
129-
public void open(Configuration conf) {
129+
public void open(OpenContext ctx) {
130130

131131
MapStateDescriptor<Long, Float> sumDesc =
132132
new MapStateDescriptor<>("sumOfTips", Long.class, Float.class);

docs/content.zh/docs/ops/metrics.md

+7-7
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ public class MyMapper extends RichMapFunction<String, String> {
5252
private transient Counter counter;
5353

5454
@Override
55-
public void open(Configuration config) {
55+
public void open(OpenContext ctx) {
5656
this.counter = getRuntimeContext()
5757
.getMetricGroup()
5858
.counter("myCounter");
@@ -116,7 +116,7 @@ public class MyMapper extends RichMapFunction<String, String> {
116116
private transient Counter counter;
117117

118118
@Override
119-
public void open(Configuration config) {
119+
public void open(OpenContext ctx) {
120120
this.counter = getRuntimeContext()
121121
.getMetricGroup()
122122
.counter("myCustomCounter", new CustomCounter());
@@ -173,7 +173,7 @@ public class MyMapper extends RichMapFunction<String, String> {
173173
private transient int valueToExpose = 0;
174174

175175
@Override
176-
public void open(Configuration config) {
176+
public void open(OpenContext ctx) {
177177
getRuntimeContext()
178178
.getMetricGroup()
179179
.gauge("MyGauge", new Gauge<Integer>() {
@@ -247,7 +247,7 @@ public class MyMapper extends RichMapFunction<Long, Long> {
247247
private transient Histogram histogram;
248248

249249
@Override
250-
public void open(Configuration config) {
250+
public void open(OpenContext ctx) {
251251
this.histogram = getRuntimeContext()
252252
.getMetricGroup()
253253
.histogram("myHistogram", new MyHistogram());
@@ -307,7 +307,7 @@ public class MyMapper extends RichMapFunction<Long, Long> {
307307
private transient Histogram histogram;
308308

309309
@Override
310-
public void open(Configuration config) {
310+
public void open(OpenContext ctx) {
311311
com.codahale.metrics.Histogram dropwizardHistogram =
312312
new com.codahale.metrics.Histogram(new SlidingWindowReservoir(500));
313313

@@ -366,7 +366,7 @@ public class MyMapper extends RichMapFunction<Long, Long> {
366366
private transient Meter meter;
367367

368368
@Override
369-
public void open(Configuration config) {
369+
public void open(OpenContext ctx) {
370370
this.meter = getRuntimeContext()
371371
.getMetricGroup()
372372
.meter("myMeter", new MyMeter());
@@ -440,7 +440,7 @@ public class MyMapper extends RichMapFunction<Long, Long> {
440440
private transient Meter meter;
441441

442442
@Override
443-
public void open(Configuration config) {
443+
public void open(OpenContext ctx) {
444444
com.codahale.metrics.Meter dropwizardMeter = new com.codahale.metrics.Meter();
445445

446446
this.meter = getRuntimeContext()

docs/content/docs/dev/datastream/fault-tolerance/state.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -192,7 +192,7 @@ public class CountWindowAverage extends RichFlatMapFunction<Tuple2<Long, Long>,
192192
}
193193

194194
@Override
195-
public void open(Configuration config) {
195+
public void open(OpenContext ctx) {
196196
ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
197197
new ValueStateDescriptor<>(
198198
"average", // the state name

docs/content/docs/learn-flink/etl.md

+3-3
Original file line numberDiff line numberDiff line change
@@ -274,7 +274,7 @@ Abstract Method pattern.
274274
For each of these interfaces, Flink also provides a so-called "rich" variant, e.g.,
275275
`RichFlatMapFunction`, which has some additional methods, including:
276276

277-
- `open(Configuration c)`
277+
- `open(OpenContext context)`
278278
- `close()`
279279
- `getRuntimeContext()`
280280

@@ -329,7 +329,7 @@ public static class Deduplicator extends RichFlatMapFunction<Event, Event> {
329329
ValueState<Boolean> keyHasBeenSeen;
330330

331331
@Override
332-
public void open(Configuration conf) {
332+
public void open(OpenContext ctx) {
333333
ValueStateDescriptor<Boolean> desc = new ValueStateDescriptor<>("keyHasBeenSeen", Types.BOOLEAN);
334334
keyHasBeenSeen = getRuntimeContext().getState(desc);
335335
}
@@ -447,7 +447,7 @@ public static class ControlFunction extends RichCoFlatMapFunction<String, String
447447
private ValueState<Boolean> blocked;
448448

449449
@Override
450-
public void open(Configuration config) {
450+
public void open(OpenContext ctx) {
451451
blocked = getRuntimeContext()
452452
.getState(new ValueStateDescriptor<>("blocked", Boolean.class));
453453
}

docs/content/docs/learn-flink/event_driven.md

+2-2
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ public static class PseudoWindow extends
7575

7676
@Override
7777
// Called once during initialization.
78-
public void open(Configuration conf) {
78+
public void open(OpenContext ctx) {
7979
. . .
8080
}
8181

@@ -116,7 +116,7 @@ Things to be aware of:
116116
private transient MapState<Long, Float> sumOfTips;
117117

118118
@Override
119-
public void open(Configuration conf) {
119+
public void open(OpenContext ctx) {
120120

121121
MapStateDescriptor<Long, Float> sumDesc =
122122
new MapStateDescriptor<>("sumOfTips", Long.class, Float.class);

docs/content/docs/ops/metrics.md

+7-7
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ public class MyMapper extends RichMapFunction<String, String> {
5252
private transient Counter counter;
5353

5454
@Override
55-
public void open(Configuration config) {
55+
public void open(OpenContext ctx) {
5656
this.counter = getRuntimeContext()
5757
.getMetricGroup()
5858
.counter("myCounter");
@@ -116,7 +116,7 @@ public class MyMapper extends RichMapFunction<String, String> {
116116
private transient Counter counter;
117117

118118
@Override
119-
public void open(Configuration config) {
119+
public void open(OpenContext ctx) {
120120
this.counter = getRuntimeContext()
121121
.getMetricGroup()
122122
.counter("myCustomCounter", new CustomCounter());
@@ -173,7 +173,7 @@ public class MyMapper extends RichMapFunction<String, String> {
173173
private transient int valueToExpose = 0;
174174

175175
@Override
176-
public void open(Configuration config) {
176+
public void open(OpenContext ctx) {
177177
getRuntimeContext()
178178
.getMetricGroup()
179179
.gauge("MyGauge", new Gauge<Integer>() {
@@ -247,7 +247,7 @@ public class MyMapper extends RichMapFunction<Long, Long> {
247247
private transient Histogram histogram;
248248

249249
@Override
250-
public void open(Configuration config) {
250+
public void open(OpenContext ctx) {
251251
this.histogram = getRuntimeContext()
252252
.getMetricGroup()
253253
.histogram("myHistogram", new MyHistogram());
@@ -307,7 +307,7 @@ public class MyMapper extends RichMapFunction<Long, Long> {
307307
private transient Histogram histogram;
308308

309309
@Override
310-
public void open(Configuration config) {
310+
public void open(OpenContext ctx) {
311311
com.codahale.metrics.Histogram dropwizardHistogram =
312312
new com.codahale.metrics.Histogram(new SlidingWindowReservoir(500));
313313

@@ -366,7 +366,7 @@ public class MyMapper extends RichMapFunction<Long, Long> {
366366
private transient Meter meter;
367367

368368
@Override
369-
public void open(Configuration config) {
369+
public void open(OpenContext ctx) {
370370
this.meter = getRuntimeContext()
371371
.getMetricGroup()
372372
.meter("myMeter", new MyMeter());
@@ -440,7 +440,7 @@ public class MyMapper extends RichMapFunction<Long, Long> {
440440
private transient Meter meter;
441441

442442
@Override
443-
public void open(Configuration config) {
443+
public void open(OpenContext ctx) {
444444
com.codahale.metrics.Meter dropwizardMeter = new com.codahale.metrics.Meter();
445445

446446
this.meter = getRuntimeContext()

flink-core/src/main/java/org/apache/flink/api/common/functions/BroadcastVariableInitializer.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@
4141
*
4242
* private Map<Long, String> map;
4343
*
44-
* public void open(Configuration cfg) throws Exception {
44+
* public void open(OpenContext ctx) throws Exception {
4545
* getRuntimeContext().getBroadcastVariableWithInitializer("mapvar",
4646
* new BroadcastVariableInitializer<Tuple2<Long, String>, Map<Long, String>>() {
4747
*

flink-core/src/main/java/org/apache/flink/api/common/functions/RichFunction.java

-8
Original file line numberDiff line numberDiff line change
@@ -54,14 +54,6 @@ public interface RichFunction extends Function {
5454
* }
5555
* }</pre>
5656
*
57-
* <p>By default, this method does nothing.
58-
*
59-
* <p>1. If you implement {@code open(OpenContext openContext)}, the {@code open(OpenContext
60-
* openContext)} will be invoked and the {@code open(Configuration parameters)} won't be
61-
* invoked. 2. If you don't implement {@code open(OpenContext openContext)}, the {@code
62-
* open(Configuration parameters)} will be invoked in the default implementation of the {@code
63-
* open(OpenContext openContext)}.
64-
*
6557
* @param openContext The context containing information about the context in which the function
6658
* is opened.
6759
* @throws Exception Implementations may forward exceptions, which are caught by the runtime.

flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java

+5-5
Original file line numberDiff line numberDiff line change
@@ -232,7 +232,7 @@ <T, C> C getBroadcastVariableWithInitializer(
232232
*
233233
* private ValueState<Long> state;
234234
*
235-
* public void open(Configuration cfg) {
235+
* public void open(OpenContext ctx) {
236236
* state = getRuntimeContext().getState(
237237
* new ValueStateDescriptor<Long>("count", LongSerializer.INSTANCE, 0L));
238238
* }
@@ -269,7 +269,7 @@ <T, C> C getBroadcastVariableWithInitializer(
269269
*
270270
* private ListState<MyType> state;
271271
*
272-
* public void open(Configuration cfg) {
272+
* public void open(OpenContext ctx) {
273273
* state = getRuntimeContext().getListState(
274274
* new ListStateDescriptor<>("myState", MyType.class));
275275
* }
@@ -310,7 +310,7 @@ <T, C> C getBroadcastVariableWithInitializer(
310310
*
311311
* private ReducingState<Long> state;
312312
*
313-
* public void open(Configuration cfg) {
313+
* public void open(OpenContext ctx) {
314314
* state = getRuntimeContext().getReducingState(
315315
* new ReducingStateDescriptor<>("sum", (a, b) -> a + b, Long.class));
316316
* }
@@ -348,7 +348,7 @@ <T, C> C getBroadcastVariableWithInitializer(
348348
*
349349
* private AggregatingState<MyType, Long> state;
350350
*
351-
* public void open(Configuration cfg) {
351+
* public void open(OpenContext ctx) {
352352
* state = getRuntimeContext().getAggregatingState(
353353
* new AggregatingStateDescriptor<>("sum", aggregateFunction, Long.class));
354354
* }
@@ -388,7 +388,7 @@ <IN, ACC, OUT> AggregatingState<IN, OUT> getAggregatingState(
388388
*
389389
* private MapState<MyType, Long> state;
390390
*
391-
* public void open(Configuration cfg) {
391+
* public void open(OpenContext ctx) {
392392
* state = getRuntimeContext().getMapState(
393393
* new MapStateDescriptor<>("sum", MyType.class, Long.class));
394394
* }

flink-core/src/main/java/org/apache/flink/api/common/state/KeyedStateStore.java

+5-5
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ public interface KeyedStateStore {
4646
*
4747
* private ValueState<Long> state;
4848
*
49-
* public void open(Configuration cfg) {
49+
* public void open(OpenContext ctx) {
5050
* state = getRuntimeContext().getState(
5151
* new ValueStateDescriptor<Long>("count", LongSerializer.INSTANCE, 0L));
5252
* }
@@ -83,7 +83,7 @@ public interface KeyedStateStore {
8383
*
8484
* private ListState<MyType> state;
8585
*
86-
* public void open(Configuration cfg) {
86+
* public void open(OpenContext ctx) {
8787
* state = getRuntimeContext().getListState(
8888
* new ListStateDescriptor<>("myState", MyType.class));
8989
* }
@@ -124,7 +124,7 @@ public interface KeyedStateStore {
124124
*
125125
* private ReducingState<Long> state;
126126
*
127-
* public void open(Configuration cfg) {
127+
* public void open(OpenContext ctx) {
128128
* state = getRuntimeContext().getReducingState(
129129
* new ReducingStateDescriptor<>("sum", (a, b) -> a + b, Long.class));
130130
* }
@@ -162,7 +162,7 @@ public interface KeyedStateStore {
162162
*
163163
* private AggregatingState<MyType, Long> state;
164164
*
165-
* public void open(Configuration cfg) {
165+
* public void open(OpenContext ctx) {
166166
* state = getRuntimeContext().getAggregatingState(
167167
* new AggregatingStateDescriptor<>("sum", aggregateFunction, Long.class));
168168
* }
@@ -202,7 +202,7 @@ <IN, ACC, OUT> AggregatingState<IN, OUT> getAggregatingState(
202202
*
203203
* private MapState<MyType, Long> state;
204204
*
205-
* public void open(Configuration cfg) {
205+
* public void open(OpenContext ctx) {
206206
* state = getRuntimeContext().getMapState(
207207
* new MapStateDescriptor<>("sum", MyType.class, Long.class));
208208
* }

flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -1496,7 +1496,7 @@ public static void openUserCode(Function stub, Configuration parameters) throws
14961496
FunctionUtils.openFunction(stub, DefaultOpenContext.INSTANCE);
14971497
} catch (Throwable t) {
14981498
throw new Exception(
1499-
"The user defined 'open(Configuration)' method in "
1499+
"The user defined 'open(OpenContext)' method in "
15001500
+ stub.getClass().toString()
15011501
+ " caused an exception: "
15021502
+ t.getMessage(),

flink-runtime/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedFunction.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,7 @@
131131
*
132132
* private ValueState<Long> count;
133133
*
134-
* public void open(Configuration cfg) throws Exception {
134+
* public void open(OpenContext ctx) throws Exception {
135135
* count = getRuntimeContext().getState(new ValueStateDescriptor<>("myCount", Long.class));
136136
* }
137137
*

0 commit comments

Comments
 (0)