Skip to content

Commit

Permalink
rename configuration
Browse files Browse the repository at this point in the history
  • Loading branch information
wangjunbo committed Feb 7, 2025
1 parent 432f663 commit 9197b9b
Show file tree
Hide file tree
Showing 12 changed files with 31 additions and 31 deletions.
2 changes: 1 addition & 1 deletion docs/content.zh/docs/connectors/flink-sources/mysql-cdc.md
Original file line number Diff line number Diff line change
Expand Up @@ -390,7 +390,7 @@ Flink SQL> SELECT * FROM orders;
</td>
</tr>
<tr>
<td>scan.incremental.snapshot.assign-ending-first.enabled</td>
<td>scan.incremental.snapshot.assign-ending-chunk-first.enabled</td>
<td>optional</td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ pipeline:
<td>是否将TINYINT(1)类型当做Boolean类型处理,默认true。</td>
</tr>
<tr>
<td>scan.incremental.snapshot.assign-ending-first.enabled</td>
<td>scan.incremental.snapshot.assign-ending-chunk-first.enabled</td>
<td>optional</td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
Expand Down
2 changes: 1 addition & 1 deletion docs/content/docs/connectors/flink-sources/mysql-cdc.md
Original file line number Diff line number Diff line change
Expand Up @@ -416,7 +416,7 @@ During a snapshot operation, the connector will query each included table to pro
</td>
</tr>
<tr>
<td>scan.incremental.snapshot.assign-ending-first.enabled</td>
<td>scan.incremental.snapshot.assign-ending-chunk-first.enabled</td>
<td>optional</td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
Expand Down
2 changes: 1 addition & 1 deletion docs/content/docs/connectors/pipeline-connectors/mysql.md
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,7 @@ pipeline:
</td>
</tr>
<tr>
<td>scan.incremental.snapshot.assign-ending-first.enabled</td>
<td>scan.incremental.snapshot.assign-ending-chunk-first.enabled</td>
<td>optional</td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.PORT;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_BINLOG_NEWLY_ADDED_TABLE_ENABLED;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_FIRST;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_CHUNK_FIRST;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_NEWLY_ADDED_TABLE_ENABLED;
Expand Down Expand Up @@ -152,7 +152,7 @@ public DataSource createDataSource(Context context) {
boolean isParsingOnLineSchemaChanges = config.get(PARSE_ONLINE_SCHEMA_CHANGES);
boolean useLegacyJsonFormat = config.get(USE_LEGACY_JSON_FORMAT);
boolean isAssignEndingChunkFirst =
config.get(SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_FIRST);
config.get(SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_CHUNK_FIRST);

validateIntegerOption(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE, splitSize, 1);
validateIntegerOption(CHUNK_META_GROUP_SIZE, splitMetaGroupSize, 1);
Expand Down Expand Up @@ -341,7 +341,7 @@ public Set<ConfigOption<?>> optionalOptions() {
options.add(USE_LEGACY_JSON_FORMAT);
options.add(TREAT_TINYINT1_AS_BOOLEAN_ENABLED);
options.add(PARSE_ONLINE_SCHEMA_CHANGES);
options.add(SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_FIRST);
options.add(SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_CHUNK_FIRST);
return options;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -315,8 +315,8 @@ public class MySqlDataSourceOptions {
"Whether to use legacy json format. The default value is true, which means there is no whitespace before value and after comma in json format.");

@Experimental
public static final ConfigOption<Boolean> SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_FIRST =
ConfigOptions.key("scan.incremental.snapshot.assign-ending-first.enabled")
public static final ConfigOption<Boolean> SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_CHUNK_FIRST =
ConfigOptions.key("scan.incremental.snapshot.assign-ending-chunk-first.enabled")
.booleanType()
.defaultValue(false)
.withDescription(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.PASSWORD;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.PORT;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_BINLOG_NEWLY_ADDED_TABLE_ENABLED;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_FIRST;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_CHUNK_FIRST;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.TABLES;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.TABLES_EXCLUDE;
Expand Down Expand Up @@ -261,7 +261,7 @@ public void testOptionalOption() {
// optional option
options.put(TREAT_TINYINT1_AS_BOOLEAN_ENABLED.key(), "false");
options.put(PARSE_ONLINE_SCHEMA_CHANGES.key(), "false");
options.put(SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_FIRST.key(), "false");
options.put(SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_CHUNK_FIRST.key(), "false");

Factory.Context context = new MockContext(Configuration.fromMap(options));
MySqlDataSourceFactory factory = new MySqlDataSourceFactory();
Expand All @@ -270,7 +270,7 @@ public void testOptionalOption() {
assertThat(factory.optionalOptions().contains(PARSE_ONLINE_SCHEMA_CHANGES)).isEqualTo(true);
assertThat(
factory.optionalOptions()
.contains(SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_FIRST))
.contains(SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_CHUNK_FIRST))
.isEqualTo(true);

MySqlDataSource dataSource = (MySqlDataSource) factory.createDataSource(context);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public abstract class JdbcSourceConfigFactory implements Factory<JdbcSourceConfi
protected boolean scanNewlyAddedTableEnabled =
JdbcSourceOptions.SCAN_NEWLY_ADDED_TABLE_ENABLED.defaultValue();
protected boolean assignEndingChunkFirst =
JdbcSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_FIRST.defaultValue();
JdbcSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_CHUNK_FIRST.defaultValue();

/** Integer port number of the database server. */
public JdbcSourceConfigFactory hostname(String hostname) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,8 +139,8 @@ public class SourceOptions {
"Whether capture the newly added tables when restoring from a savepoint/checkpoint or not, by default is false.");

@Experimental
public static final ConfigOption<Boolean> SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_FIRST =
ConfigOptions.key("scan.incremental.snapshot.assign-ending-first.enabled")
public static final ConfigOption<Boolean> SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_CHUNK_FIRST =
ConfigOptions.key("scan.incremental.snapshot.assign-ending-chunk-first.enabled")
.booleanType()
.defaultValue(false)
.withDescription(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -280,8 +280,8 @@ public class MySqlSourceOptions {
"Whether to use legacy json format. The default value is true, which means there is no whitespace before value and after comma in json format.");

@Experimental
public static final ConfigOption<Boolean> SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_FIRST =
ConfigOptions.key("scan.incremental.snapshot.assign-ending-first.enabled")
public static final ConfigOption<Boolean> SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_CHUNK_FIRST =
ConfigOptions.key("scan.incremental.snapshot.assign-ending-chunk-first.enabled")
.booleanType()
.defaultValue(false)
.withDescription(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ public DynamicTableSource createDynamicTableSource(Context context) {
config.get(MySqlSourceOptions.PARSE_ONLINE_SCHEMA_CHANGES);
boolean useLegacyJsonFormat = config.get(MySqlSourceOptions.USE_LEGACY_JSON_FORMAT);
boolean assignEndingChunkFirst =
config.get(MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_FIRST);
config.get(MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_CHUNK_FIRST);

if (enableParallelRead) {
validatePrimaryKeyIfEnableParallel(physicalSchema, chunkKeyColumn);
Expand Down Expand Up @@ -201,7 +201,7 @@ public Set<ConfigOption<?>> optionalOptions() {
options.add(MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP);
options.add(MySqlSourceOptions.PARSE_ONLINE_SCHEMA_CHANGES);
options.add(MySqlSourceOptions.USE_LEGACY_JSON_FORMAT);
options.add(MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_FIRST);
options.add(MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_CHUNK_FIRST);
return options;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@
import static org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceOptions.CONNECT_TIMEOUT;
import static org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceOptions.HEARTBEAT_INTERVAL;
import static org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceOptions.PARSE_ONLINE_SCHEMA_CHANGES;
import static org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_FIRST;
import static org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_CHUNK_FIRST;
import static org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP;
import static org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE;
import static org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ENABLED;
Expand Down Expand Up @@ -133,7 +133,7 @@ public void testCommonProperties() {
SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(),
PARSE_ONLINE_SCHEMA_CHANGES.defaultValue(),
USE_LEGACY_JSON_FORMAT.defaultValue(),
SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_FIRST.defaultValue());
SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_CHUNK_FIRST.defaultValue());
assertEquals(expectedSource, actualSource);
}

Expand Down Expand Up @@ -182,7 +182,7 @@ public void testEnableParallelReadSource() {
SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(),
PARSE_ONLINE_SCHEMA_CHANGES.defaultValue(),
USE_LEGACY_JSON_FORMAT.defaultValue(),
SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_FIRST.defaultValue());
SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_CHUNK_FIRST.defaultValue());
assertEquals(expectedSource, actualSource);
}

Expand Down Expand Up @@ -227,7 +227,7 @@ public void testEnableParallelReadSourceWithSingleServerId() {
SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(),
PARSE_ONLINE_SCHEMA_CHANGES.defaultValue(),
USE_LEGACY_JSON_FORMAT.defaultValue(),
SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_FIRST.defaultValue());
SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_CHUNK_FIRST.defaultValue());
assertEquals(expectedSource, actualSource);
}

Expand Down Expand Up @@ -270,7 +270,7 @@ public void testEnableParallelReadSourceLatestOffset() {
SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(),
PARSE_ONLINE_SCHEMA_CHANGES.defaultValue(),
USE_LEGACY_JSON_FORMAT.defaultValue(),
SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_FIRST.defaultValue());
SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_CHUNK_FIRST.defaultValue());
assertEquals(expectedSource, actualSource);
}

Expand Down Expand Up @@ -330,7 +330,7 @@ public void testOptionalProperties() {
true,
PARSE_ONLINE_SCHEMA_CHANGES.defaultValue(),
true,
SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_FIRST.defaultValue());
SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_CHUNK_FIRST.defaultValue());
assertEquals(expectedSource, actualSource);
assertTrue(actualSource instanceof MySqlTableSource);
MySqlTableSource actualMySqlTableSource = (MySqlTableSource) actualSource;
Expand Down Expand Up @@ -387,7 +387,7 @@ public void testStartupFromSpecificOffset() {
SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(),
PARSE_ONLINE_SCHEMA_CHANGES.defaultValue(),
USE_LEGACY_JSON_FORMAT.defaultValue(),
SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_FIRST.defaultValue());
SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_CHUNK_FIRST.defaultValue());
assertEquals(expectedSource, actualSource);
}

Expand Down Expand Up @@ -428,7 +428,7 @@ public void testStartupFromInitial() {
SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(),
PARSE_ONLINE_SCHEMA_CHANGES.defaultValue(),
USE_LEGACY_JSON_FORMAT.defaultValue(),
SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_FIRST.defaultValue());
SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_CHUNK_FIRST.defaultValue());
assertEquals(expectedSource, actualSource);
}

Expand Down Expand Up @@ -470,7 +470,7 @@ public void testStartupFromEarliestOffset() {
SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(),
PARSE_ONLINE_SCHEMA_CHANGES.defaultValue(),
USE_LEGACY_JSON_FORMAT.defaultValue(),
SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_FIRST.defaultValue());
SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_CHUNK_FIRST.defaultValue());
assertEquals(expectedSource, actualSource);
}

Expand Down Expand Up @@ -513,7 +513,7 @@ public void testStartupFromSpecificTimestamp() {
SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(),
PARSE_ONLINE_SCHEMA_CHANGES.defaultValue(),
USE_LEGACY_JSON_FORMAT.defaultValue(),
SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_FIRST.defaultValue());
SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_CHUNK_FIRST.defaultValue());
assertEquals(expectedSource, actualSource);
}

Expand Down Expand Up @@ -554,7 +554,7 @@ public void testStartupFromLatestOffset() {
SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(),
PARSE_ONLINE_SCHEMA_CHANGES.defaultValue(),
USE_LEGACY_JSON_FORMAT.defaultValue(),
SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_FIRST.defaultValue());
SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_CHUNK_FIRST.defaultValue());
assertEquals(expectedSource, actualSource);
}

Expand Down Expand Up @@ -600,7 +600,7 @@ public void testMetadataColumns() {
SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(),
PARSE_ONLINE_SCHEMA_CHANGES.defaultValue(),
USE_LEGACY_JSON_FORMAT.defaultValue(),
SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_FIRST.defaultValue());
SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_CHUNK_FIRST.defaultValue());
expectedSource.producedDataType = SCHEMA_WITH_METADATA.toSourceRowDataType();
expectedSource.metadataKeys = Arrays.asList("op_ts", "database_name");

Expand Down

0 comments on commit 9197b9b

Please sign in to comment.