Skip to content

Commit a498cf2

Browse files
authored
[FLINK-37205][python] Correct the state cache behavior during bump beam version (#26058)
1 parent aef8c86 commit a498cf2

File tree

1 file changed

+34
-2
lines changed

1 file changed

+34
-2
lines changed

flink-python/pyflink/fn_execution/beam/beam_sdk_worker_main.py

+34-2
Original file line numberDiff line numberDiff line change
@@ -15,21 +15,53 @@
1515
# See the License for the specific language governing permissions and
1616
# limitations under the License.
1717
################################################################################
18+
import re
1819
import sys
1920

2021
# force to register the operations to SDK Harness
22+
from apache_beam.options.pipeline_options import DebugOptions, PipelineOptions
23+
2124
import pyflink.fn_execution.beam.beam_operations # noqa # pylint: disable=unused-import
2225

2326
# force to register the coders to SDK Harness
2427
import pyflink.fn_execution.beam.beam_coders # noqa # pylint: disable=unused-import
2528

26-
import apache_beam.runners.worker.sdk_worker_main
29+
import apache_beam
2730

2831
# disable bundle processor shutdown
29-
from apache_beam.runners.worker import sdk_worker
32+
from apache_beam.runners.worker import sdk_worker, sdk_worker_main, statecache
3033
sdk_worker.DEFAULT_BUNDLE_PROCESSOR_CACHE_SHUTDOWN_THRESHOLD_S = 86400 * 30
3134

3235

36+
# Currently PyFlink only support count-based state cache strategy
37+
def get_deep_size(*objs):
38+
return 1
39+
40+
41+
def get_state_cache_size(options):
42+
"""
43+
Return the maximum size of state cache in count.
44+
"""
45+
if isinstance(options, PipelineOptions):
46+
experiments = options.view_as(DebugOptions).experiments or []
47+
else:
48+
experiments = options
49+
50+
for experiment in experiments:
51+
# There should only be 1 match so returning from the loop
52+
if re.match(r'state_cache_size=', experiment):
53+
return int(
54+
re.match(r'state_cache_size=(?P<state_cache_size>.*)',
55+
experiment).group('state_cache_size'))
56+
return 0
57+
58+
59+
statecache.get_deep_size = get_deep_size
60+
sdk_worker_main._get_state_cache_size = get_state_cache_size
61+
# since Beam 2.52.0, _get_state_cache_size is renamed to _get_state_cache_size_bytes
62+
sdk_worker_main._get_state_cache_size_bytes = get_state_cache_size
63+
64+
3365
def print_to_logging(logging_func, msg, *args, **kwargs):
3466
if msg != '\n':
3567
logging_func(msg, *args, **kwargs)

0 commit comments

Comments
 (0)