From dc4518fec6eb86fff38db7a59359a0239c95cc77 Mon Sep 17 00:00:00 2001 From: Zakelly Date: Sun, 9 Feb 2025 23:58:52 +0800 Subject: [PATCH] [FLINK-36439][docs] Documents for Disaggregated State --- .../datastream/fault-tolerance/state_v2.md | 4 +- .../docs/ops/state/disaggregated_state.md | 193 ++++++++++++++++++ .../datastream/fault-tolerance/state_v2.md | 4 +- .../docs/ops/state/disaggregated_state.md | 193 ++++++++++++++++++ docs/content/docs/ops/state/state_backends.md | 2 +- 5 files changed, 393 insertions(+), 3 deletions(-) create mode 100644 docs/content.zh/docs/ops/state/disaggregated_state.md create mode 100644 docs/content/docs/ops/state/disaggregated_state.md diff --git a/docs/content.zh/docs/dev/datastream/fault-tolerance/state_v2.md b/docs/content.zh/docs/dev/datastream/fault-tolerance/state_v2.md index 6ef16802f8010..ee46a2ab8ae06 100644 --- a/docs/content.zh/docs/dev/datastream/fault-tolerance/state_v2.md +++ b/docs/content.zh/docs/dev/datastream/fault-tolerance/state_v2.md @@ -35,7 +35,9 @@ to learn about the concepts behind stateful stream processing. The new state API is designed to be more flexible than the previous API. User can perform asynchronous state operations, thus making it more powerful and more efficient. The asynchronous state access is essential for the state backend to be able to handle -large state sizes and to be able to spill to remote file systems when necessary. +large state sizes and to be able to spill to remote file systems when necessary. +This is called the 'disaggregated state management'. For more information about this, +please see [Disaggregated State Management]({{< ref "docs/ops/state/disaggregated_state" >}}). ## Keyed DataStream diff --git a/docs/content.zh/docs/ops/state/disaggregated_state.md b/docs/content.zh/docs/ops/state/disaggregated_state.md new file mode 100644 index 0000000000000..146de00094a29 --- /dev/null +++ b/docs/content.zh/docs/ops/state/disaggregated_state.md @@ -0,0 +1,193 @@ +--- +title: "Disaggregated State Management" +weight: 20 +type: docs +aliases: + - /ops/state/disaggregated_state.html + - /apis/streaming/disaggregated_state.html +--- + + +# Disaggregated State Management + +## Overview + +For the first ten years of Flink, the state management is based on memory or local disk of the TaskManager. +This approach works well for most use cases, but it has some limitations: + * **Local Disk Constraints**: The state size is limited by the memory or disk size of the TaskManager. + * **Spiky Resource Usage**: The local state model triggers periodic CPU and network I/O bursts during checkpointing or SST files compaction. + * **Heavy Recovery**: State needs to be downloaded during recovery. The recovery time is +proportional to the state size, which can be slow for large state sizes. + +In Flink 2.0, we introduced the disaggregated state management. This feature allows users to store +the state in external storage systems like S3, HDFS, etc. This is useful when the state size +is extremely large. It could be used to store the state in a more cost-effective way, or to +persist or recovery the state in a more lightweight way. The benefits of disaggregated state management are: + * **Unlimited State Size**: The state size is only limited by the external storage system. + * **Stable Resource Usage**: The state is stored in external storage, thus the checkpoint could be very lightweight. +And the SST files compaction could be done remotely (TODO). + * **Fast Recovery**: No need to download the state during recovery. The recovery time is +independent of the state size. + * **Flexible**: Users can easily choose different external storage systems or I/O performance levels, +or scale the storage based on their requirements without change their hardware. + * **Cost-effective**: External storage are usually cheaper than local disk. Users can flexibly +adjust computing resources and storage resources independently if there is any bottleneck. + +The disaggregated state management contains three parts: + * **ForSt State Backend**: A state backend that stores the state in external storage systems. It +can also leverage the local disk for caching and buffering. The asynchronous I/O model is used to +read and write the state. For more details, see [ForSt State Backend]({{< ref "docs/ops/state/state_backends#the-forststatebackend" >}}). + * **New State APIs**: The new state APIs (State V2) are introduced to perform asynchronous state +reads and writes, which is essential for overcoming the high network latency when accessing +the disaggregated state. For more details, see [New State APIs]({{< ref "docs/dev/datastream/fault-tolerance/state_v2" >}}). + * **SQL Support**: Many SQL operators are rewritten to support the disaggregated state management +and asynchronous state access. User can easily enable these by setting the configuration. + +{{ }} +Disaggregated state and asynchronous state access are encouraged for large state. However, when +the state size is small, the local state management with synchronous state access is a better +choice. +{{ }} + +{{ }} +The disaggregated state management is still in experimental state. We are working on improving +the performance and stability of this feature. The APIs and configurations may change in future +release. +{{ }} + +## Quick Start + +### For SQL Jobs + +To enable the disaggregated state management in SQL jobs, you can set the following configurations: +```yaml +state.backend.type: forst +table.exec.async-state.enabled: true + +# enable checkpoints, checkpoint directory is required +execution.checkpointing.incremental: true +execution.checkpointing.dir: s3://your-bucket/flink-checkpoints + +# We don't support the mini-batch and two-phase aggregation in asynchronous state access yet. +table.exec.mini-batch.enabled: false +table.optimizer.agg-phase-strategy: ONE_PHASE +``` +Thus, you could leverage the disaggregated state management and asynchronous state access in +your SQL jobs. We haven't implemented the full support for the asynchronous state access +in SQL yet. If the SQL operators you are using are not supported, the operator will fall back +to the synchronous state implementation automatically. The performance may not be optimal in +this case. The supported stateful operators are: + - Rank (Top1, Append TopN) + - Row Time Deduplicate + - Aggregate (without distinct) + - Join + - Window Join + - Tumble / Hop / Cumulative Window Aggregate + + +### For DataStream Jobs + +To enable the disaggregated state management in DataStream jobs, firstly you should use +the `ForStStateBackend`. Configure via code in per-job mode: +```java +Configuration config = new Configuration(); +config.set(StateBackendOptions.STATE_BACKEND, "forst"); +config.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, "s3://your-bucket/flink-checkpoints"); +config.set(CheckpointingOptions.INCREMENTAL_CHECKPOINTS, true); +env.configure(config); +``` +Or configure via `config.yaml`: +```yaml +state.backend.type: forst + +# enable checkpoints, checkpoint directory is required +execution.checkpointing.incremental: true +execution.checkpointing.dir: s3://your-bucket/flink-checkpoints +``` + +Then, you should write your datastream jobs with the new state APIs. For more +details, see [State V2]({{< ref "docs/dev/datastream/fault-tolerance/state_v2" >}}). + +## Advanced Tuning Options + +### Tuning ForSt State Backend + +The `ForStStateBackend` has many configurations to tune the performance. +The design of ForSt is very similar to RocksDB, and the configurable options are almost the same, +so you can refer to [large state tuning]({{< ref "docs/ops/state/large_state_tuning#tuning-rocksdb-or-forst" >}}) +to tune the ForSt state backend. + +Besides that, the following sections introduce some unique configurations for ForSt. + +#### ForSt Primary Storage Location + +By default, ForSt stores the state in the checkpoint directory. In this case, +ForSt could perform lightweight checkpoints and fast recovery. However, users may +want to store the state in a different location, e.g., a different bucket in S3. +You can set the following configuration to specify the primary storage location: +```yaml +state.backend.forst.primary-dir: s3://your-bucket/forst-state +``` + +**Note**: If you set this configuration, you may not be able to leverage the lightweight +checkpoint and fast recovery, since the ForSt will perform file copy between the primary +storage location and the checkpoint directory during checkpointing and recovery. + + +#### ForSt File Cache + +ForSt uses the local disk for caching and buffering. The granularity of the cache is whole file. +This is enabled by default, except when the primary storage location is set to local. +There are two capacity limit policies for the cache: + - Size-based: The cache will evict the oldest files when the cache size exceeds the limit. + - Reserved-based: The cache will evict the oldest files when the reserved space on disk +(the disk where cache directory is) is not enough. +Corresponding configurations are: +```yaml +state.backend.forst.cache.size-based-limit: 1GB +state.backend.forst.cache.reserve-size: 10GB +``` +Those can take effect together. If so, the cache will evict the oldest files when the cache +size exceeds either the size-based limit or the reserved size limit. + +One can also specify the cache directory via: +```yaml +state.backend.forst.cache.dir: /tmp/forst-cache +``` + +#### ForSt Asynchronous Threads + +ForSt uses asynchronous I/O to read and write the state. There are three types of threads: + - Coordinator thread: The thread that coordinates the asynchronous read and write. + - Read thread: The thread that reads the state asynchronously. + - Write thread: The thread that writes the state asynchronously. + +The number of asynchronous threads is configurable. Typically, you don't need to adjust these +values since the default values are good enough for most cases. +In case for special needs, you can set the following configuration to specify the number of +asynchronous threads: + - `state.backend.forst.executor.read-io-parallelism`: The number of asynchronous threads for read. Default is 3. + - `state.backend.forst.executor.write-io-parallelism`: The number of asynchronous threads for write. Default is 1. + - `state.backend.forst.executor.inline-write`: Whether to inline the write operation in the coordinator thread. +Default is true. Setting this to false will raise the CPU usage. + - `state.backend.forst.executor.inline-coordinator`: Whether to let task thread be the coordinator thread. +Default is true. Setting this to false will raise the CPU usage. + +{{ }} diff --git a/docs/content/docs/dev/datastream/fault-tolerance/state_v2.md b/docs/content/docs/dev/datastream/fault-tolerance/state_v2.md index d1abd47ae49ee..a63ae36c001ef 100644 --- a/docs/content/docs/dev/datastream/fault-tolerance/state_v2.md +++ b/docs/content/docs/dev/datastream/fault-tolerance/state_v2.md @@ -35,7 +35,9 @@ to learn about the concepts behind stateful stream processing. The new state API is designed to be more flexible than the previous API. User can perform asynchronous state operations, thus making it more powerful and more efficient. The asynchronous state access is essential for the state backend to be able to handle -large state sizes and to be able to spill to remote file systems when necessary. +large state sizes and to be able to spill to remote file systems when necessary. +This is called the 'disaggregated state management'. For more information about this, +please see [Disaggregated State Management]({{< ref "docs/ops/state/disaggregated_state" >}}). ## Keyed DataStream diff --git a/docs/content/docs/ops/state/disaggregated_state.md b/docs/content/docs/ops/state/disaggregated_state.md new file mode 100644 index 0000000000000..146de00094a29 --- /dev/null +++ b/docs/content/docs/ops/state/disaggregated_state.md @@ -0,0 +1,193 @@ +--- +title: "Disaggregated State Management" +weight: 20 +type: docs +aliases: + - /ops/state/disaggregated_state.html + - /apis/streaming/disaggregated_state.html +--- + + +# Disaggregated State Management + +## Overview + +For the first ten years of Flink, the state management is based on memory or local disk of the TaskManager. +This approach works well for most use cases, but it has some limitations: + * **Local Disk Constraints**: The state size is limited by the memory or disk size of the TaskManager. + * **Spiky Resource Usage**: The local state model triggers periodic CPU and network I/O bursts during checkpointing or SST files compaction. + * **Heavy Recovery**: State needs to be downloaded during recovery. The recovery time is +proportional to the state size, which can be slow for large state sizes. + +In Flink 2.0, we introduced the disaggregated state management. This feature allows users to store +the state in external storage systems like S3, HDFS, etc. This is useful when the state size +is extremely large. It could be used to store the state in a more cost-effective way, or to +persist or recovery the state in a more lightweight way. The benefits of disaggregated state management are: + * **Unlimited State Size**: The state size is only limited by the external storage system. + * **Stable Resource Usage**: The state is stored in external storage, thus the checkpoint could be very lightweight. +And the SST files compaction could be done remotely (TODO). + * **Fast Recovery**: No need to download the state during recovery. The recovery time is +independent of the state size. + * **Flexible**: Users can easily choose different external storage systems or I/O performance levels, +or scale the storage based on their requirements without change their hardware. + * **Cost-effective**: External storage are usually cheaper than local disk. Users can flexibly +adjust computing resources and storage resources independently if there is any bottleneck. + +The disaggregated state management contains three parts: + * **ForSt State Backend**: A state backend that stores the state in external storage systems. It +can also leverage the local disk for caching and buffering. The asynchronous I/O model is used to +read and write the state. For more details, see [ForSt State Backend]({{< ref "docs/ops/state/state_backends#the-forststatebackend" >}}). + * **New State APIs**: The new state APIs (State V2) are introduced to perform asynchronous state +reads and writes, which is essential for overcoming the high network latency when accessing +the disaggregated state. For more details, see [New State APIs]({{< ref "docs/dev/datastream/fault-tolerance/state_v2" >}}). + * **SQL Support**: Many SQL operators are rewritten to support the disaggregated state management +and asynchronous state access. User can easily enable these by setting the configuration. + +{{ }} +Disaggregated state and asynchronous state access are encouraged for large state. However, when +the state size is small, the local state management with synchronous state access is a better +choice. +{{ }} + +{{ }} +The disaggregated state management is still in experimental state. We are working on improving +the performance and stability of this feature. The APIs and configurations may change in future +release. +{{ }} + +## Quick Start + +### For SQL Jobs + +To enable the disaggregated state management in SQL jobs, you can set the following configurations: +```yaml +state.backend.type: forst +table.exec.async-state.enabled: true + +# enable checkpoints, checkpoint directory is required +execution.checkpointing.incremental: true +execution.checkpointing.dir: s3://your-bucket/flink-checkpoints + +# We don't support the mini-batch and two-phase aggregation in asynchronous state access yet. +table.exec.mini-batch.enabled: false +table.optimizer.agg-phase-strategy: ONE_PHASE +``` +Thus, you could leverage the disaggregated state management and asynchronous state access in +your SQL jobs. We haven't implemented the full support for the asynchronous state access +in SQL yet. If the SQL operators you are using are not supported, the operator will fall back +to the synchronous state implementation automatically. The performance may not be optimal in +this case. The supported stateful operators are: + - Rank (Top1, Append TopN) + - Row Time Deduplicate + - Aggregate (without distinct) + - Join + - Window Join + - Tumble / Hop / Cumulative Window Aggregate + + +### For DataStream Jobs + +To enable the disaggregated state management in DataStream jobs, firstly you should use +the `ForStStateBackend`. Configure via code in per-job mode: +```java +Configuration config = new Configuration(); +config.set(StateBackendOptions.STATE_BACKEND, "forst"); +config.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, "s3://your-bucket/flink-checkpoints"); +config.set(CheckpointingOptions.INCREMENTAL_CHECKPOINTS, true); +env.configure(config); +``` +Or configure via `config.yaml`: +```yaml +state.backend.type: forst + +# enable checkpoints, checkpoint directory is required +execution.checkpointing.incremental: true +execution.checkpointing.dir: s3://your-bucket/flink-checkpoints +``` + +Then, you should write your datastream jobs with the new state APIs. For more +details, see [State V2]({{< ref "docs/dev/datastream/fault-tolerance/state_v2" >}}). + +## Advanced Tuning Options + +### Tuning ForSt State Backend + +The `ForStStateBackend` has many configurations to tune the performance. +The design of ForSt is very similar to RocksDB, and the configurable options are almost the same, +so you can refer to [large state tuning]({{< ref "docs/ops/state/large_state_tuning#tuning-rocksdb-or-forst" >}}) +to tune the ForSt state backend. + +Besides that, the following sections introduce some unique configurations for ForSt. + +#### ForSt Primary Storage Location + +By default, ForSt stores the state in the checkpoint directory. In this case, +ForSt could perform lightweight checkpoints and fast recovery. However, users may +want to store the state in a different location, e.g., a different bucket in S3. +You can set the following configuration to specify the primary storage location: +```yaml +state.backend.forst.primary-dir: s3://your-bucket/forst-state +``` + +**Note**: If you set this configuration, you may not be able to leverage the lightweight +checkpoint and fast recovery, since the ForSt will perform file copy between the primary +storage location and the checkpoint directory during checkpointing and recovery. + + +#### ForSt File Cache + +ForSt uses the local disk for caching and buffering. The granularity of the cache is whole file. +This is enabled by default, except when the primary storage location is set to local. +There are two capacity limit policies for the cache: + - Size-based: The cache will evict the oldest files when the cache size exceeds the limit. + - Reserved-based: The cache will evict the oldest files when the reserved space on disk +(the disk where cache directory is) is not enough. +Corresponding configurations are: +```yaml +state.backend.forst.cache.size-based-limit: 1GB +state.backend.forst.cache.reserve-size: 10GB +``` +Those can take effect together. If so, the cache will evict the oldest files when the cache +size exceeds either the size-based limit or the reserved size limit. + +One can also specify the cache directory via: +```yaml +state.backend.forst.cache.dir: /tmp/forst-cache +``` + +#### ForSt Asynchronous Threads + +ForSt uses asynchronous I/O to read and write the state. There are three types of threads: + - Coordinator thread: The thread that coordinates the asynchronous read and write. + - Read thread: The thread that reads the state asynchronously. + - Write thread: The thread that writes the state asynchronously. + +The number of asynchronous threads is configurable. Typically, you don't need to adjust these +values since the default values are good enough for most cases. +In case for special needs, you can set the following configuration to specify the number of +asynchronous threads: + - `state.backend.forst.executor.read-io-parallelism`: The number of asynchronous threads for read. Default is 3. + - `state.backend.forst.executor.write-io-parallelism`: The number of asynchronous threads for write. Default is 1. + - `state.backend.forst.executor.inline-write`: Whether to inline the write operation in the coordinator thread. +Default is true. Setting this to false will raise the CPU usage. + - `state.backend.forst.executor.inline-coordinator`: Whether to let task thread be the coordinator thread. +Default is true. Setting this to false will raise the CPU usage. + +{{ }} diff --git a/docs/content/docs/ops/state/state_backends.md b/docs/content/docs/ops/state/state_backends.md index b3cb746233398..05e35e87bd04f 100644 --- a/docs/content/docs/ops/state/state_backends.md +++ b/docs/content/docs/ops/state/state_backends.md @@ -99,7 +99,7 @@ The total memory amount of RocksDB instance(s) per slot can also be bounded, ple The *ForStStateBackend* is a state backend that is based on [ForSt project](https://github.com/ververica/ForSt), which is also a LSM-tree structured key-value store and built on top of the RocksDB. -It is designed to provide a more efficient way to store and access state in Flink applications. +It is designed for disaggregated state management, for more details, see [here]({{< ref "docs/ops/state/disaggregated_state" >}}). Most importantly, it can hold its sst files on remote file systems that Flink supports, such as HDFS, S3, etc. This allows Flink to scale the state size beyond the local disk capacity of the TaskManager. Moreover, by putting the sst files on remote file systems, it can also provide a more lightweight