File tree Expand file tree Collapse file tree
paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc Expand file tree Collapse file tree Original file line number Diff line number Diff line change 3636import java .util .List ;
3737import java .util .Map ;
3838import java .util .Set ;
39+ import java .util .regex .Pattern ;
3940import java .util .stream .Collectors ;
4041
4142import static org .apache .paimon .flink .action .MultiTablesSinkMode .COMBINED ;
@@ -302,19 +303,19 @@ public static String combinedModeTableList(
302303 // be excluded by excluding pattern at the same time
303304 String includingPattern =
304305 String .format ("(%s)\\ .(%s)" , databasePattern , includingTablePattern );
305-
306+ LOG . info ( "Combined mode including pattern is {}" , includingPattern );
306307 if (excludedTables .isEmpty ()) {
307308 return includingPattern ;
308309 }
309310
311+ // Use Pattern.quote to escape special regex characters (e.g., '$' in mysql table name)
312+ // in database and table names, ensuring exact literal matching in the regex.
310313 String excludingPattern =
311314 excludedTables .stream ()
312- .map (
313- t ->
314- String .format (
315- "(^%s$)" ,
316- t .getDatabaseName () + "\\ ." + t .getObjectName ()))
315+ .map (Identifier ::getFullName )
316+ .map (n -> String .format ("(^%s$)" , Pattern .quote (n )))
317317 .collect (Collectors .joining ("|" ));
318+ LOG .info ("Combined mode excluding pattern is {}" , excludingPattern );
318319 excludingPattern = "?!" + excludingPattern ;
319320 return String .format ("(%s)(%s)" , excludingPattern , includingPattern );
320321 }
You can’t perform that action at this time.
0 commit comments