Skip to content

Commit

Permalink
Store upstream schema & evolved schema separately
Browse files Browse the repository at this point in the history
  • Loading branch information
yuxiqian committed Jul 17, 2024
1 parent 0ad032c commit 13746a0
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -470,6 +470,74 @@ void testOneToOneRouting() throws Exception {
"DataChangeEvent{tableId=default_namespace.default_schema.routed1, before=[2, 2], after=[2, x], op=UPDATE, meta=()}");
}

@Test
void testIdenticalOneToOneRouting() throws Exception {
    // Composes a pipeline whose route rules map each table onto itself,
    // then checks that such identity routing behaves as a pass-through.
    FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster();

    // Source definition: single-split, multi-table value events.
    Configuration srcConfig = new Configuration();
    srcConfig.set(
            ValuesDataSourceOptions.EVENT_SET_ID,
            ValuesDataSourceHelper.EventSetId.SINGLE_SPLIT_MULTI_TABLES);
    SourceDef srcDef = new SourceDef(ValuesDataFactory.IDENTIFIER, "Value Source", srcConfig);

    // Sink definition: materialize rows in memory so results can be inspected.
    Configuration snkConfig = new Configuration();
    snkConfig.set(ValuesDataSinkOptions.MATERIALIZED_IN_MEMORY, true);
    SinkDef snkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value Sink", snkConfig);

    // Identity routes: each table routes to its own table ID.
    TableId selfRoutedTable1 = TABLE_1;
    TableId selfRoutedTable2 = TABLE_2;
    List<RouteDef> routes =
            Arrays.asList(
                    new RouteDef(TABLE_1.toString(), selfRoutedTable1.toString(), null, null),
                    new RouteDef(TABLE_2.toString(), selfRoutedTable2.toString(), null, null));

    // Pipeline configuration: force parallelism of 1 for a deterministic event order.
    Configuration pipelineConfig = new Configuration();
    pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
    PipelineDef pipeline =
            new PipelineDef(srcDef, snkDef, routes, Collections.emptyList(), pipelineConfig);

    // Compose and run to completion.
    PipelineExecution execution = composer.compose(pipeline);
    execution.execute();

    // Verify the materialized contents of both routed tables.
    List<String> table1Rows = ValuesDatabase.getResults(selfRoutedTable1);
    assertThat(table1Rows)
            .contains(
                    "default_namespace.default_schema.table1:col1=2;newCol3=x",
                    "default_namespace.default_schema.table1:col1=3;newCol3=");
    List<String> table2Rows = ValuesDatabase.getResults(selfRoutedTable2);
    assertThat(table2Rows)
            .contains(
                    "default_namespace.default_schema.table2:col1=1;col2=1",
                    "default_namespace.default_schema.table2:col1=2;col2=2",
                    "default_namespace.default_schema.table2:col1=3;col2=3");

    // Verify the exact order and rendering of every event the sink observed.
    String[] capturedEvents = outCaptor.toString().trim().split("\n");
    assertThat(capturedEvents)
            .containsExactly(
                    "CreateTableEvent{tableId=default_namespace.default_schema.table1, schema=columns={`col1` STRING,`col2` STRING}, primaryKeys=col1, options=()}",
                    "CreateTableEvent{tableId=default_namespace.default_schema.table2, schema=columns={`col1` STRING,`col2` STRING}, primaryKeys=col1, options=()}",
                    "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[1, 1], op=INSERT, meta=()}",
                    "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[2, 2], op=INSERT, meta=()}",
                    "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[3, 3], op=INSERT, meta=()}",
                    "AddColumnEvent{tableId=default_namespace.default_schema.table1, addedColumns=[ColumnWithPosition{column=`col3` STRING, position=LAST, existedColumnName=null}]}",
                    "DataChangeEvent{tableId=default_namespace.default_schema.table2, before=[], after=[1, 1], op=INSERT, meta=()}",
                    "DataChangeEvent{tableId=default_namespace.default_schema.table2, before=[], after=[2, 2], op=INSERT, meta=()}",
                    "DataChangeEvent{tableId=default_namespace.default_schema.table2, before=[], after=[3, 3], op=INSERT, meta=()}",
                    "RenameColumnEvent{tableId=default_namespace.default_schema.table1, nameMapping={col2=newCol2, col3=newCol3}}",
                    "DropColumnEvent{tableId=default_namespace.default_schema.table1, droppedColumnNames=[newCol2]}",
                    "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[1, 1], after=[], op=DELETE, meta=()}",
                    "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[2, 2], after=[2, x], op=UPDATE, meta=()}");
}

@Test
void testMergingWithRoute() throws Exception {
FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,12 +106,11 @@ public List<SchemaChangeEvent> applySchemaChange(SchemaChangeEvent schemaChangeE
// single source mapping, replace the table ID directly
SchemaChangeEvent derivedSchemaChangeEvent =
ChangeEventUtils.recreateSchemaChangeEvent(schemaChangeEvent, derivedTable);
schemaManager.applyUpstreamSchemaChange(derivedSchemaChangeEvent);
events.add(derivedSchemaChangeEvent);
} else {
// multiple source mapping (merging tables)
Schema derivedTableSchema =
schemaManager.getLatestUpstreamSchema(derivedTable).get();
schemaManager.getLatestEvolvedSchema(derivedTable).get();
if (schemaChangeEvent instanceof CreateTableEvent) {
events.addAll(
handleCreateTableEvent(
Expand Down Expand Up @@ -230,7 +229,6 @@ private List<SchemaChangeEvent> handleRenameColumnEvent(
AddColumnEvent derivedSchemaChangeEvent = new AddColumnEvent(derivedTable, newColumns);
schemaChangeEvents.add(derivedSchemaChangeEvent);
}
schemaChangeEvents.forEach(schemaManager::applyUpstreamSchemaChange);
return schemaChangeEvents;
}

Expand Down Expand Up @@ -262,7 +260,6 @@ private List<SchemaChangeEvent> handleAlterColumnTypeEvent(
new AlterColumnTypeEvent(derivedTable, typeDifference);
schemaChangeEvents.add(derivedSchemaChangeEvent);
}
schemaChangeEvents.forEach(schemaManager::applyUpstreamSchemaChange);
return schemaChangeEvents;
}

Expand Down Expand Up @@ -301,7 +298,6 @@ private List<SchemaChangeEvent> handleAddColumnEvent(
if (!newTypeMapping.isEmpty()) {
schemaChangeEvents.add(new AlterColumnTypeEvent(derivedTable, newTypeMapping));
}
schemaChangeEvents.forEach(schemaManager::applyUpstreamSchemaChange);
return schemaChangeEvents;
}

Expand Down Expand Up @@ -337,7 +333,6 @@ private List<SchemaChangeEvent> handleCreateTableEvent(
if (!newTypeMapping.isEmpty()) {
schemaChangeEvents.add(new AlterColumnTypeEvent(derivedTable, newTypeMapping));
}
schemaChangeEvents.forEach(schemaManager::applyUpstreamSchemaChange);
return schemaChangeEvents;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -265,13 +265,12 @@ private void startNextSchemaChangeRequest() {
PendingSchemaChange pendingSchemaChange = pendingSchemaChanges.get(0);
SchemaChangeRequest request = pendingSchemaChange.changeRequest;
if (request.getSchemaChangeEvent() instanceof CreateTableEvent
&& schemaManager.upstreamSchemaExists(request.getTableId())) {
&& schemaManager.evolvedSchemaExists(request.getTableId())) {
pendingSchemaChange
.getResponseFuture()
.complete(wrap(new SchemaChangeResponse(Collections.emptyList())));
pendingSchemaChanges.remove(0);
} else {
schemaManager.applyUpstreamSchemaChange(request.getSchemaChangeEvent());
List<SchemaChangeEvent> derivedSchemaChangeEvents =
calculateDerivedSchemaChangeEvents(request.getSchemaChangeEvent());
pendingSchemaChange
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ private void broadcastEvent(Event toBroadcast) {
private Schema loadLatestSchemaFromRegistry(TableId tableId) {
Optional<Schema> schema;
try {
schema = schemaEvolutionClient.getLatestUpstreamSchema(tableId);
schema = schemaEvolutionClient.getLatestEvolvedSchema(tableId);
} catch (Exception e) {
throw new RuntimeException(
String.format("Failed to request latest schema for table \"%s\"", tableId), e);
Expand Down

0 comments on commit 13746a0

Please sign in to comment.