Skip to content

Commit 0223348

Browse files
authored
[core] Fix TagAutoCreation.forceCreatingSnapshot to use SINK_PROCESS_TIME_ZONE (#7600)
TagAutoCreation.forceCreatingSnapshot in the ProcessTimeExtractor branch uses LocalDateTime.now() (machine timezone) to determine whether to force creating a snapshot. When sink.process-time-zone is configured differently from the machine timezone (e.g. UTC on an Asia/Shanghai machine), the tag creation time is incorrect — it triggers at the machine's midnight instead of the configured timezone's midnight.
1 parent 2b557d4 commit 0223348

File tree

2 files changed

+59
-5
lines changed

2 files changed

+59
-5
lines changed

paimon-core/src/main/java/org/apache/paimon/tag/TagAutoCreation.java

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import org.apache.paimon.CoreOptions;
2222
import org.apache.paimon.Snapshot;
23+
import org.apache.paimon.annotation.VisibleForTesting;
2324
import org.apache.paimon.operation.TagDeletion;
2425
import org.apache.paimon.table.sink.TagCallback;
2526
import org.apache.paimon.tag.TagTimeExtractor.ProcessTimeExtractor;
@@ -60,6 +61,7 @@ public class TagAutoCreation {
6061
private final List<TagCallback> callbacks;
6162
private final Duration idlenessTimeout;
6263
private final boolean automaticCompletion;
64+
private final ZoneId sinkProcessTimeZone;
6365

6466
private LocalDateTime nextTag;
6567
private long nextSnapshot;
@@ -75,7 +77,8 @@ private TagAutoCreation(
7577
@Nullable Duration defaultTimeRetained,
7678
Duration idlenessTimeout,
7779
boolean automaticCompletion,
78-
List<TagCallback> callbacks) {
80+
List<TagCallback> callbacks,
81+
ZoneId sinkProcessTimeZone) {
7982
this.snapshotManager = snapshotManager;
8083
this.tagManager = tagManager;
8184
this.tagDeletion = tagDeletion;
@@ -87,6 +90,7 @@ private TagAutoCreation(
8790
this.callbacks = callbacks;
8891
this.idlenessTimeout = idlenessTimeout;
8992
this.automaticCompletion = automaticCompletion;
93+
this.sinkProcessTimeZone = sinkProcessTimeZone;
9094

9195
this.periodHandler.validateDelay(delay);
9296

@@ -123,13 +127,17 @@ public boolean forceCreatingSnapshot() {
123127

124128
return isAfterOrEqual(LocalDateTime.now().minus(idlenessTimeout), snapshotTime);
125129
} else if (timeExtractor instanceof ProcessTimeExtractor) {
126-
return nextTag == null
127-
|| isAfterOrEqual(
128-
LocalDateTime.now().minus(delay), periodHandler.nextTagTime(nextTag));
130+
return forceCreatingSnapshotProcessTime(LocalDateTime.now(sinkProcessTimeZone));
129131
}
130132
return false;
131133
}
132134

135+
@VisibleForTesting
136+
boolean forceCreatingSnapshotProcessTime(LocalDateTime now) {
137+
return nextTag == null
138+
|| isAfterOrEqual(now.minus(delay), periodHandler.nextTagTime(nextTag));
139+
}
140+
133141
public void run() {
134142
while (true) {
135143
if (snapshotManager.snapshotExists(nextSnapshot)) {
@@ -230,6 +238,7 @@ public static TagAutoCreation create(
230238
options.tagDefaultTimeRetained(),
231239
options.snapshotWatermarkIdleTimeout(),
232240
options.tagAutomaticCompletion(),
233-
callbacks);
241+
callbacks,
242+
options.sinkProcessTimeZone());
234243
}
235244
}

paimon-core/src/test/java/org/apache/paimon/tag/TagAutoManagerTest.java

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import java.time.ZoneId;
3737
import java.util.Collections;
3838

39+
import static org.apache.paimon.CoreOptions.SINK_PROCESS_TIME_ZONE;
3940
import static org.apache.paimon.CoreOptions.SINK_WATERMARK_TIME_ZONE;
4041
import static org.apache.paimon.CoreOptions.SNAPSHOT_NUM_RETAINED_MAX;
4142
import static org.apache.paimon.CoreOptions.SNAPSHOT_NUM_RETAINED_MIN;
@@ -519,6 +520,50 @@ public void testAutoExpireTagWithoutDashes() {
519520
assertThat(tagManager.allTagNames()).containsOnly("20230718");
520521
}
521522

523+
@Test
524+
public void testForceCreatingSnapshotProcessTime() throws Exception {
525+
// sink.process-time-zone=UTC, machine timezone=Asia/Shanghai.
526+
// Daily tag should trigger at UTC 00:00 (Shanghai 08:00), not Shanghai 00:00.
527+
528+
Options options = new Options();
529+
options.set(TAG_AUTOMATIC_CREATION, TagCreationMode.PROCESS_TIME);
530+
options.set(TAG_CREATION_PERIOD, TagCreationPeriod.DAILY);
531+
options.set(SINK_PROCESS_TIME_ZONE, "UTC");
532+
533+
FileStoreTable table = this.table.copy(options.toMap());
534+
535+
// Commit a snapshot to set nextTag
536+
TableCommitImpl commit = table.newCommit(commitUser).ignoreEmptyCommit(false);
537+
commit.commit(new ManifestCommittable(0));
538+
commit.close();
539+
540+
TagAutoCreation tagAutoCreation =
541+
TagAutoCreation.create(
542+
table.coreOptions(),
543+
table.snapshotManager(),
544+
table.store().newTagManager(),
545+
table.store().newTagDeletion(),
546+
Collections.emptyList());
547+
548+
// threshold = tagTime + 2 days (nextTag + 1 period)
549+
TagManager tagManager = table.store().newTagManager();
550+
String createdTag = tagManager.allTagNames().get(0);
551+
LocalDateTime tagTime = LocalDateTime.parse(createdTag + "T00:00:00");
552+
LocalDateTime thresholdUtc = tagTime.plusDays(2);
553+
554+
// Shanghai 00:00 = UTC 16:00 previous day, before threshold -> false
555+
LocalDateTime shanghaiMidnightAsUtc = thresholdUtc.minusHours(8);
556+
assertThat(tagAutoCreation.forceCreatingSnapshotProcessTime(shanghaiMidnightAsUtc))
557+
.isFalse();
558+
559+
// UTC 00:00 = threshold -> true
560+
assertThat(tagAutoCreation.forceCreatingSnapshotProcessTime(thresholdUtc)).isTrue();
561+
562+
// After threshold -> true
563+
assertThat(tagAutoCreation.forceCreatingSnapshotProcessTime(thresholdUtc.plusHours(1)))
564+
.isTrue();
565+
}
566+
522567
private long localZoneMills(String timestamp) {
523568
return LocalDateTime.parse(timestamp)
524569
.atZone(ZoneId.systemDefault())

0 commit comments

Comments
 (0)