Skip to content

Commit 4facda6

Browse files
committed
[hotfix] Make ForSt state backend use checkpoint directory as its primary storage by default
1 parent 381be9c commit 4facda6

File tree

9 files changed

+58
-30
lines changed

9 files changed

+58
-30
lines changed

docs/layouts/shortcodes/generated/forst_configuration.html

+4-4
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
<td><h5>state.backend.forst.cache.dir</h5></td>
1313
<td style="word-wrap: break-word;">(none)</td>
1414
<td>String</td>
15-
<td>The directory where ForSt caches its SST files, fallback to state.backend.forst.local-dir/cache if not configured.</td>
15+
<td>The directory where ForSt caches its SST files, fallback to the subdirectory of '/cache' under the value of 'state.backend.forst.local-dir' if not configured.</td>
1616
</tr>
1717
<tr>
1818
<td><h5>state.backend.forst.cache.reserve-size</h5></td>
@@ -99,10 +99,10 @@
9999
<td>The options factory class for users to add customized options in DBOptions and ColumnFamilyOptions for ForSt. If set, the ForSt state backend will load the class and apply configs to DBOptions and ColumnFamilyOptions after loading ones from 'ForStConfigurableOptions' and pre-defined options.</td>
100100
</tr>
101101
<tr>
102-
<td><h5>state.backend.forst.remote-dir</h5></td>
103-
<td style="word-wrap: break-word;">(none)</td>
102+
<td><h5>state.backend.forst.primary-dir</h5></td>
103+
<td style="word-wrap: break-word;">"checkpoint-dir"</td>
104104
<td>String</td>
105-
<td>The remote directory where ForSt puts its SST files, fallback to state.backend.forst.local-dir if not configured. Recognized shortcut name is 'checkpoint-dir', which means that forst shares the directory with checkpoint.</td>
105+
<td>The primary directory where ForSt puts its SST files. By default, it will be the same as the checkpoint directory. Recognized shortcut name is 'checkpoint-dir', which means that ForSt shares the directory with checkpoint, and 'local-dir', which means that ForSt will use the local directory of TaskManager.</td>
106106
</tr>
107107
<tr>
108108
<td><h5>state.backend.forst.timer-service.cache-size</h5></td>

docs/layouts/shortcodes/generated/state_backend_forst_section.html

+4-4
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
<td><h5>state.backend.forst.cache.dir</h5></td>
1313
<td style="word-wrap: break-word;">(none)</td>
1414
<td>String</td>
15-
<td>The directory where ForSt caches its SST files, fallback to state.backend.forst.local-dir/cache if not configured.</td>
15+
<td>The directory where ForSt caches its SST files, fallback to the subdirectory of '/cache' under the value of 'state.backend.forst.local-dir' if not configured.</td>
1616
</tr>
1717
<tr>
1818
<td><h5>state.backend.forst.cache.reserve-size</h5></td>
@@ -45,10 +45,10 @@
4545
<td>With partitioning, the index/filter block of an SST file is partitioned into smaller blocks with an additional top-level index on them. When reading an index/filter, only top-level index is loaded into memory. The partitioned index/filter then uses the top-level index to load on demand into the block cache the partitions that are required to perform the index/filter query. This option only has an effect when 'state.backend.forst.memory.managed' or 'state.backend.forst.memory.fixed-per-slot' are configured.</td>
4646
</tr>
4747
<tr>
48-
<td><h5>state.backend.forst.remote-dir</h5></td>
49-
<td style="word-wrap: break-word;">(none)</td>
48+
<td><h5>state.backend.forst.primary-dir</h5></td>
49+
<td style="word-wrap: break-word;">"checkpoint-dir"</td>
5050
<td>String</td>
51-
<td>The remote directory where ForSt puts its SST files, fallback to state.backend.forst.local-dir if not configured. Recognized shortcut name is 'checkpoint-dir', which means that forst shares the directory with checkpoint.</td>
51+
<td>The primary directory where ForSt puts its SST files. By default, it will be the same as the checkpoint directory. Recognized shortcut name is 'checkpoint-dir', which means that ForSt shares the directory with checkpoint, and 'local-dir', which means that ForSt will use the local directory of TaskManager.</td>
5252
</tr>
5353
<tr>
5454
<td><h5>state.backend.forst.timer-service.cache-size</h5></td>

flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStOptions.java

+12-8
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,9 @@
2727
import org.apache.flink.configuration.description.Description;
2828
import org.apache.flink.configuration.description.TextElement;
2929

30+
import static org.apache.flink.state.forst.ForStStateBackend.CHECKPOINT_DIR_AS_PRIMARY_SHORTCUT;
31+
import static org.apache.flink.state.forst.ForStStateBackend.LOCAL_DIR_AS_PRIMARY_SHORTCUT;
3032
import static org.apache.flink.state.forst.ForStStateBackend.PriorityQueueStateType.ForStDB;
31-
import static org.apache.flink.state.forst.ForStStateBackend.REMOTE_SHORTCUT_CHECKPOINT;
3233

3334
/** Configuration options for the ForStStateBackend. */
3435
@Experimental
@@ -52,15 +53,17 @@ public class ForStOptions {
5253

5354
/** The primary directory where ForSt puts its SST files. */
5455
@Documentation.Section(Documentation.Sections.STATE_BACKEND_FORST)
55-
public static final ConfigOption<String> REMOTE_DIRECTORY =
56-
ConfigOptions.key("state.backend.forst.remote-dir")
56+
public static final ConfigOption<String> PRIMARY_DIRECTORY =
57+
ConfigOptions.key("state.backend.forst.primary-dir")
5758
.stringType()
58-
.noDefaultValue()
59+
.defaultValue(CHECKPOINT_DIR_AS_PRIMARY_SHORTCUT)
5960
.withDescription(
6061
String.format(
61-
"The remote directory where ForSt puts its SST files, fallback to %s if not configured."
62-
+ " Recognized shortcut name is '%s', which means that forst shares the directory with checkpoint.",
63-
LOCAL_DIRECTORIES.key(), REMOTE_SHORTCUT_CHECKPOINT));
62+
"The primary directory where ForSt puts its SST files. By default, it will be the same as the checkpoint directory. "
63+
+ "Recognized shortcut name is '%s', which means that ForSt shares the directory with checkpoint, "
64+
+ "and '%s', which means that ForSt will use the local directory of TaskManager.",
65+
CHECKPOINT_DIR_AS_PRIMARY_SHORTCUT,
66+
LOCAL_DIR_AS_PRIMARY_SHORTCUT));
6467

6568
@Documentation.Section(Documentation.Sections.STATE_BACKEND_FORST)
6669
public static final ConfigOption<String> CACHE_DIRECTORY =
@@ -69,7 +72,8 @@ public class ForStOptions {
6972
.noDefaultValue()
7073
.withDescription(
7174
String.format(
72-
"The directory where ForSt caches its SST files, fallback to %s/cache if not configured.",
75+
"The directory where ForSt caches its SST files, fallback to the "
76+
+ "subdirectory of '/cache' under the value of '%s' if not configured.",
7377
LOCAL_DIRECTORIES.key()));
7478

7579
@Documentation.Section(Documentation.Sections.STATE_BACKEND_FORST)

flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStResourceContainer.java

+5-1
Original file line numberDiff line numberDiff line change
@@ -386,7 +386,11 @@ public void prepareDirectories() throws Exception {
386386
remoteForStPath.toUri(),
387387
localForStPath,
388388
ForStFlinkFileSystem.getFileBasedCache(
389-
cacheBasePath, cacheCapacity, cacheReservedSize, metricGroup));
389+
cacheBasePath,
390+
remoteForStPath,
391+
cacheCapacity,
392+
cacheReservedSize,
393+
metricGroup));
390394
} else {
391395
forStFileSystem = null;
392396
}

flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateBackend.java

+8-4
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,8 @@
9595
public class ForStStateBackend extends AbstractManagedMemoryStateBackend
9696
implements ConfigurableStateBackend {
9797

98-
public static final String REMOTE_SHORTCUT_CHECKPOINT = "checkpoint-dir";
98+
public static final String CHECKPOINT_DIR_AS_PRIMARY_SHORTCUT = "checkpoint-dir";
99+
public static final String LOCAL_DIR_AS_PRIMARY_SHORTCUT = "local-dir";
99100

100101
private static final long serialVersionUID = 1L;
101102

@@ -216,12 +217,15 @@ private ForStStateBackend(
216217
if (original.remoteForStDirectory != null) {
217218
this.remoteForStDirectory = original.remoteForStDirectory;
218219
} else {
219-
String remoteDirStr = config.get(ForStOptions.REMOTE_DIRECTORY);
220-
if (REMOTE_SHORTCUT_CHECKPOINT.equals(remoteDirStr)) {
220+
String remoteDirStr = config.get(ForStOptions.PRIMARY_DIRECTORY);
221+
if (CHECKPOINT_DIR_AS_PRIMARY_SHORTCUT.equals(remoteDirStr)) {
221222
this.remoteForStDirectory = null;
222223
this.remoteShareWithCheckpoint = true;
223224
} else {
224-
this.remoteForStDirectory = remoteDirStr == null ? null : new Path(remoteDirStr);
225+
this.remoteForStDirectory =
226+
remoteDirStr == null || LOCAL_DIR_AS_PRIMARY_SHORTCUT.equals(remoteDirStr)
227+
? null
228+
: new Path(remoteDirStr);
225229
}
226230
}
227231

flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/ForStFlinkFileSystem.java

+11-1
Original file line numberDiff line numberDiff line change
@@ -108,11 +108,21 @@ public static ForStFlinkFileSystem get(URI uri, Path localBase, FileBasedCache f
108108
}
109109

110110
public static FileBasedCache getFileBasedCache(
111-
Path cacheBase, long cacheCapacity, long cacheReservedSize, MetricGroup metricGroup)
111+
Path cacheBase,
112+
Path remoteForStPath,
113+
long cacheCapacity,
114+
long cacheReservedSize,
115+
MetricGroup metricGroup)
112116
throws IOException {
113117
if (cacheBase == null || cacheCapacity <= 0 && cacheReservedSize <= 0) {
114118
return null;
115119
}
120+
if (cacheBase.getFileSystem().equals(remoteForStPath.getFileSystem())) {
121+
LOG.info(
122+
"Skip creating ForSt cache "
123+
+ "since the cache and primary path are on the same file system.");
124+
return null;
125+
}
116126
CacheLimitPolicy cacheLimitPolicy = null;
117127
if (cacheCapacity > 0 && cacheReservedSize > 0) {
118128
cacheLimitPolicy =

flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateBackendConfigTest.java

+6-3
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@
6969
import java.util.Collection;
7070
import java.util.Collections;
7171

72+
import static org.apache.flink.state.forst.ForStStateBackend.LOCAL_DIR_AS_PRIMARY_SHORTCUT;
7273
import static org.apache.flink.state.forst.ForStTestUtils.createKeyedStateBackend;
7374
import static org.hamcrest.CoreMatchers.anyOf;
7475
import static org.hamcrest.MatcherAssert.assertThat;
@@ -283,6 +284,7 @@ public void testCleanRelocatedDbLogs() throws Exception {
283284
conf.set(ForStConfigurableOptions.LOG_LEVEL, InfoLogLevel.DEBUG_LEVEL);
284285
conf.set(ForStConfigurableOptions.LOG_FILE_NUM, 4);
285286
conf.set(ForStConfigurableOptions.LOG_MAX_FILE_SIZE, MemorySize.parse("1kb"));
287+
conf.set(ForStOptions.PRIMARY_DIRECTORY, LOCAL_DIR_AS_PRIMARY_SHORTCUT);
286288
final ForStStateBackend forStBackend =
287289
new ForStStateBackend().configure(conf, getClass().getClassLoader());
288290
final String dbStoragePath = new Path(folder.toURI().toString()).toString();
@@ -742,10 +744,11 @@ public void testConfigureIllegalMemoryControlParameters() {
742744
}
743745

744746
@Test
745-
public void testRemoteDirectory() throws Exception {
747+
public void testPrimaryDirectory() throws Exception {
746748
FileSystem.initialize(new Configuration(), null);
747749
Configuration configuration = new Configuration();
748-
configuration.set(ForStOptions.REMOTE_DIRECTORY, tempFolder.newFolder().toURI().toString());
750+
configuration.set(
751+
ForStOptions.PRIMARY_DIRECTORY, tempFolder.newFolder().toURI().toString());
749752
ForStStateBackend forStStateBackend =
750753
new ForStStateBackend().configure(configuration, null);
751754
ForStKeyedStateBackend<Integer> keyedBackend = null;
@@ -759,7 +762,7 @@ public void testRemoteDirectory() throws Exception {
759762
keyedBackend
760763
.getRemoteBasePath()
761764
.toString()
762-
.startsWith(configuration.get(ForStOptions.REMOTE_DIRECTORY)));
765+
.startsWith(configuration.get(ForStOptions.PRIMARY_DIRECTORY)));
763766
} finally {
764767
if (keyedBackend != null) {
765768
keyedBackend.dispose();

flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateBackendV2Test.java

+7-4
Original file line numberDiff line numberDiff line change
@@ -54,8 +54,9 @@
5454
import static org.apache.flink.state.forst.ForStConfigurableOptions.USE_DELETE_FILES_IN_RANGE_DURING_RESCALING;
5555
import static org.apache.flink.state.forst.ForStConfigurableOptions.USE_INGEST_DB_RESTORE_MODE;
5656
import static org.apache.flink.state.forst.ForStOptions.LOCAL_DIRECTORIES;
57-
import static org.apache.flink.state.forst.ForStOptions.REMOTE_DIRECTORY;
58-
import static org.apache.flink.state.forst.ForStStateBackend.REMOTE_SHORTCUT_CHECKPOINT;
57+
import static org.apache.flink.state.forst.ForStOptions.PRIMARY_DIRECTORY;
58+
import static org.apache.flink.state.forst.ForStStateBackend.CHECKPOINT_DIR_AS_PRIMARY_SHORTCUT;
59+
import static org.apache.flink.state.forst.ForStStateBackend.LOCAL_DIR_AS_PRIMARY_SHORTCUT;
5960
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
6061

6162
/** Tests for the async keyed state backend part of {@link ForStStateBackend}. */
@@ -169,7 +170,9 @@ protected ConfigurableStateBackend getStateBackend() throws Exception {
169170
config.set(LOCAL_DIRECTORIES, tempFolderForForStLocal.toString());
170171
}
171172
if (hasRemoteDir) {
172-
config.set(REMOTE_DIRECTORY, tempFolderForForstRemote.toString());
173+
config.set(PRIMARY_DIRECTORY, tempFolderForForstRemote.toString());
174+
} else {
175+
config.set(PRIMARY_DIRECTORY, LOCAL_DIR_AS_PRIMARY_SHORTCUT);
173176
}
174177
config.set(USE_INGEST_DB_RESTORE_MODE, useIngestDbRestoreMode);
175178
config.set(USE_DELETE_FILES_IN_RANGE_DURING_RESCALING, useDeleteFileInRange);
@@ -207,7 +210,7 @@ void testRemoteDirShareCheckpointDir(boolean createJob) throws Exception {
207210

208211
Configuration config = new Configuration();
209212
config.set(LOCAL_DIRECTORIES, tempFolderForForStLocal.toString());
210-
config.set(REMOTE_DIRECTORY, REMOTE_SHORTCUT_CHECKPOINT);
213+
config.set(PRIMARY_DIRECTORY, CHECKPOINT_DIR_AS_PRIMARY_SHORTCUT);
211214
config.set(CheckpointingOptions.CREATE_CHECKPOINT_SUB_DIR, createJob);
212215

213216
checkpointStorage =

flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateTestBase.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ public class ForStStateTestBase {
6161
public void setup(@TempDir File temporaryFolder) throws IOException {
6262
FileSystem.initialize(new Configuration(), null);
6363
Configuration configuration = new Configuration();
64-
configuration.set(ForStOptions.REMOTE_DIRECTORY, temporaryFolder.toURI().toString());
64+
configuration.set(ForStOptions.PRIMARY_DIRECTORY, temporaryFolder.toURI().toString());
6565
ForStStateBackend forStStateBackend =
6666
new ForStStateBackend().configure(configuration, null);
6767

0 commit comments

Comments
 (0)