File tree 1 file changed +56
-0
lines changed
docs/content.zh/docs/connectors/flink-sources
1 file changed +56
-0
lines changed Original file line number Diff line number Diff line change @@ -488,6 +488,62 @@ public class MongoDBIncrementalSourceExample {
488
488
- 如果使用数据库正则表达式,则需要 `readAnyDatabase` 角色。
489
489
- 增量快照功能仅支持 MongoDB 4.0 之后的版本。
490
490
491
+ ### 完整的 Changelog
492
+
493
+ MongoDB 6.0 以及更高的版本支持发送变更流事件,其中包含文档的更新前和更新后的内容(或者说数据的前后镜像)。
494
+
495
+ - 前镜像是指被替换、更新或删除之前的文档。对于插入操作没有前镜像。
496
+
497
+ - 后镜像是指被替换、更新或删除之后的文档。对于删除操作没有后镜像。
498
+
499
+ MongoDB CDC 能够使用前镜像和后镜像来生成完整的变更日志流,包括插入、更新前、更新后和删除的数据行,从而避免了额外的 `ChangelogNormalize` 下游节点。
500
+
501
+ 为了启用此功能,你需要满足以下条件:
502
+
503
+ - MongoDB 的版本必须为 6.0 或更高版本。
504
+ - 启用 `preAndPostImages` 功能。
505
+
506
+ ```javascript
507
+ db.runCommand({
508
+ setClusterParameter: {
509
+ changeStreamOptions: {
510
+ preAndPostImages: {
511
+ expireAfterSeconds: ' off' // replace with custom image expiration time
512
+ }
513
+ }
514
+ }
515
+ })
516
+ ```
517
+
518
+ - 为希望监控的 collection 启用 `changeStreamPreAndPostImages` 功能:
519
+ ```javascript
520
+ db.runCommand({
521
+ collMod: "<< collection name >>",
522
+ changeStreamPreAndPostImages: {
523
+ enabled: true
524
+ }
525
+ })
526
+ ```
527
+
528
+ 在 DataStream 中开启 MongoDB CDC 的 `scan.full-changelog` 功能:
529
+
530
+ ```java
531
+ MongoDBSource.builder()
532
+ .scanFullChangelog(true)
533
+ ...
534
+ .build()
535
+ ```
536
+
537
+ 或者使用 Flink SQL:
538
+
539
+ ```SQL
540
+ CREATE TABLE mongodb_source (...) WITH (
541
+ ' connector' = ' mongodb- cdc' ,
542
+ ' scan. full- changelog' = ' true ' ,
543
+ ...
544
+ )
545
+ ```
546
+
491
547
数据类型映射
492
548
----------------
493
549
[BSON](https://docs.mongodb.com/manual/reference/bson-types/) **二进制 JSON**的缩写是一种类似 JSON 格式的二进制编码序列,用于在 MongoDB 中存储文档和进行远程过程调用。
You can’t perform that action at this time.
0 commit comments