Skip to content

Commit c5f7fb6

Browse files
committed
Track and resume in-progress syncs
1 parent ed1deae commit c5f7fb6

11 files changed

Lines changed: 582 additions & 386 deletions

src/config.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,8 @@ const (
5252
STORAGE_TYPE_S3 = "S3"
5353
)
5454

55+
var STORAGE_TYPES = []string{STORAGE_TYPE_LOCAL, STORAGE_TYPE_S3}
56+
5557
type AwsConfig struct {
5658
Region string
5759
S3Endpoint string // optional

src/iceberg_writer_table.go

Lines changed: 33 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -34,12 +34,8 @@ func (writer *IcebergWriterTable) Write(loadRows func() ([][]string, InternalTab
3434
metadataDirPath := writer.storage.CreateMetadataDir(writer.schemaTable)
3535

3636
var lastSequenceNumber int
37-
var firstNewParquetFile ParquetFile
38-
var newParquetCount int
37+
newManifestListItemsSortedDesc := []ManifestListItem{}
3938
existingManifestListItemsSortedDesc := []ManifestListItem{}
40-
loadMoreRows := true
41-
42-
finalManifestListItemsSortedAsc := []ManifestListItem{}
4339
finalManifestListFilesSortedAsc := []ManifestListFile{}
4440

4541
if writer.continuedRefresh {
@@ -49,23 +45,31 @@ func (writer *IcebergWriterTable) Write(loadRows func() ([][]string, InternalTab
4945
existingManifestListItemsSortedDesc, err = writer.storage.ExistingManifestListItems(existingManifestListFilesSortedAsc[len(existingManifestListFilesSortedAsc)-1])
5046
PanicIfError(writer.config, err)
5147

52-
finalManifestListFilesSortedAsc = existingManifestListFilesSortedAsc
5348
lastSequenceNumber = existingManifestListItemsSortedDesc[0].SequenceNumber
49+
finalManifestListFilesSortedAsc = existingManifestListFilesSortedAsc
5450
}
5551

52+
var firstNewParquetFile ParquetFile
53+
var newParquetCount int
54+
loadMoreRows := true
55+
5656
for loadMoreRows {
57-
newParquetFile, newInternalTableMetadata, loadedAllRows, err := writer.storage.CreateParquet(
57+
newParquetFile, newInternalTableMetadata, err := writer.storage.CreateParquet(
5858
dataDirPath,
5959
writer.pgSchemaColumns,
6060
writer.maxParquetPayloadThreshold,
6161
loadRows,
6262
)
6363
PanicIfError(writer.config, err)
6464

65-
// If Parquet is empty and we are continuing to refresh / process subsequent chunks, delete it and exit (no trailing Parquet files)
65+
// If Parquet is empty and we are continuing to refresh / process subsequent chunks, delete it, mark the sync as completed and exit (no trailing Parquet files)
6666
if newParquetFile.RecordCount == 0 && (writer.continuedRefresh || newParquetCount > 0) {
6767
err = writer.storage.DeleteParquet(newParquetFile)
6868
PanicIfError(writer.config, err)
69+
70+
err = writer.storage.WriteInternalTableMetadata(metadataDirPath, newInternalTableMetadata)
71+
PanicIfError(writer.config, err)
72+
6973
return
7074
}
7175

@@ -74,20 +78,18 @@ func (writer *IcebergWriterTable) Write(loadRows func() ([][]string, InternalTab
7478
firstNewParquetFile = newParquetFile
7579
}
7680

77-
if writer.continuedRefresh {
78-
var overwrittenManifestListItemsSortedAsc []ManifestListItem
81+
if writer.continuedRefresh && (newInternalTableMetadata.LastRefreshMode == RefreshModeIncremental || newInternalTableMetadata.LastRefreshMode == RefreshModeIncrementalInProgress) {
7982
var overwrittenManifestListFilesSortedAsc []ManifestListFile
8083

81-
overwrittenManifestListItemsSortedAsc, overwrittenManifestListFilesSortedAsc, lastSequenceNumber = writer.overwriteExistingFiles(
84+
existingManifestListItemsSortedDesc, overwrittenManifestListFilesSortedAsc, lastSequenceNumber = writer.overwriteExistingFiles(
8285
dataDirPath,
8386
metadataDirPath,
84-
Reverse(existingManifestListItemsSortedDesc),
87+
existingManifestListItemsSortedDesc,
8588
newParquetFile,
8689
firstNewParquetFile,
8790
lastSequenceNumber,
8891
)
8992

90-
finalManifestListItemsSortedAsc = append(finalManifestListItemsSortedAsc, overwrittenManifestListItemsSortedAsc...)
9193
finalManifestListFilesSortedAsc = append(finalManifestListFilesSortedAsc, overwrittenManifestListFilesSortedAsc...)
9294
}
9395

@@ -96,39 +98,36 @@ func (writer *IcebergWriterTable) Write(loadRows func() ([][]string, InternalTab
9698

9799
lastSequenceNumber++
98100
newManifestListItem := ManifestListItem{SequenceNumber: lastSequenceNumber, ManifestFile: newManifestFile}
99-
finalManifestListItemsSortedAsc = append(finalManifestListItemsSortedAsc, newManifestListItem)
101+
newManifestListItemsSortedDesc = append([]ManifestListItem{newManifestListItem}, newManifestListItemsSortedDesc...)
100102

101-
newManifestListFile, err := writer.storage.CreateManifestList(metadataDirPath, firstNewParquetFile.Uuid, Reverse(finalManifestListItemsSortedAsc))
103+
finalManifestListItemsSortedDesc := append(newManifestListItemsSortedDesc, existingManifestListItemsSortedDesc...)
104+
newManifestListFile, err := writer.storage.CreateManifestList(metadataDirPath, firstNewParquetFile.Uuid, finalManifestListItemsSortedDesc)
102105
PanicIfError(writer.config, err)
103106

104107
finalManifestListFilesSortedAsc = append(finalManifestListFilesSortedAsc, newManifestListFile)
105108
_, err = writer.storage.CreateMetadata(metadataDirPath, writer.pgSchemaColumns, finalManifestListFilesSortedAsc)
106109
PanicIfError(writer.config, err)
107110

108-
loadMoreRows = !loadedAllRows
109-
110-
if !loadMoreRows {
111-
newInternalTableMetadata.LastRefreshMode = REFRESH_MODE_FULL
112-
}
113111
err = writer.storage.WriteInternalTableMetadata(metadataDirPath, newInternalTableMetadata)
114112
PanicIfError(writer.config, err)
115113

114+
loadMoreRows = newInternalTableMetadata.InProgress()
116115
LogDebug(writer.config, "Written", newParquetCount, "Parquet file(s). Load more rows:", loadMoreRows)
117116
}
118117
}
119118

120119
func (writer *IcebergWriterTable) overwriteExistingFiles(
121120
dataDirPath string,
122121
metadataDirPath string,
123-
existingManifestListItemsSortedAsc []ManifestListItem,
122+
originalExistingManifestListItemsSortedDesc []ManifestListItem,
124123
newParquetFile ParquetFile,
125124
firstNewParquetFile ParquetFile,
126-
lastSequenceNumber int,
127-
) ([]ManifestListItem, []ManifestListFile, int) {
128-
overwrittenManifestListItemsSortedAsc := []ManifestListItem{}
129-
overwrittenManifestListFilesSortedAsc := []ManifestListFile{}
125+
originalLastSequenceNumber int,
126+
) (existingManifestListItemsSortedDesc []ManifestListItem, overwrittenManifestListFilesSortedAsc []ManifestListFile, lastSequenceNumber int) {
127+
originalExistingManifestListItemsSortedAsc := Reverse(originalExistingManifestListItemsSortedDesc)
128+
lastSequenceNumber = originalLastSequenceNumber
130129

131-
for i, existingManifestListItem := range existingManifestListItemsSortedAsc {
130+
for i, existingManifestListItem := range originalExistingManifestListItemsSortedAsc {
132131
existingManifestFile := existingManifestListItem.ManifestFile
133132
existingParquetFilePath, err := writer.storage.ExistingParquetFilePath(existingManifestFile)
134133
PanicIfError(writer.config, err)
@@ -139,7 +138,7 @@ func (writer *IcebergWriterTable) overwriteExistingFiles(
139138
// Keep as is if no overlapping records found
140139
if overwrittenParquetFile.Path == "" {
141140
LogDebug(writer.config, "No overlapping records found")
142-
overwrittenManifestListItemsSortedAsc = append(overwrittenManifestListItemsSortedAsc, existingManifestListItem)
141+
existingManifestListItemsSortedDesc = append([]ManifestListItem{existingManifestListItem}, existingManifestListItemsSortedDesc...)
143142
continue
144143
}
145144

@@ -151,17 +150,17 @@ func (writer *IcebergWriterTable) overwriteExistingFiles(
151150
PanicIfError(writer.config, err)
152151

153152
// Constructing a new manifest list without the previous manifest file and with the new "deleted" manifest file
154-
overwrittenManifestListItemsSortedAsc := []ManifestListItem{}
155-
for j, existingItem := range existingManifestListItemsSortedAsc {
153+
finalManifestListItemsSortedAsc := []ManifestListItem{}
154+
for j, existingItem := range originalExistingManifestListItemsSortedAsc {
156155
if i != j {
157-
overwrittenManifestListItemsSortedAsc = append(overwrittenManifestListItemsSortedAsc, existingItem)
156+
finalManifestListItemsSortedAsc = append(finalManifestListItemsSortedAsc, existingItem)
158157
}
159158
}
160159
lastSequenceNumber++
161160
overwrittenManifestListItem := ManifestListItem{SequenceNumber: lastSequenceNumber, ManifestFile: deletedRecsManifestFile}
162-
overwrittenManifestListItemsSortedAsc = append(overwrittenManifestListItemsSortedAsc, overwrittenManifestListItem)
161+
finalManifestListItemsSortedAsc = append(finalManifestListItemsSortedAsc, overwrittenManifestListItem)
163162

164-
overwrittenManifestList, err := writer.storage.CreateManifestList(metadataDirPath, firstNewParquetFile.Uuid, Reverse(overwrittenManifestListItemsSortedAsc))
163+
overwrittenManifestList, err := writer.storage.CreateManifestList(metadataDirPath, firstNewParquetFile.Uuid, Reverse(finalManifestListItemsSortedAsc))
165164
PanicIfError(writer.config, err)
166165
overwrittenManifestListFilesSortedAsc = append(overwrittenManifestListFilesSortedAsc, overwrittenManifestList)
167166
continue
@@ -181,10 +180,10 @@ func (writer *IcebergWriterTable) overwriteExistingFiles(
181180
overwrittenManifestList, err := writer.storage.CreateManifestList(metadataDirPath, firstNewParquetFile.Uuid, []ManifestListItem{overwrittenManifestListItem, deletedRecsManifestListItem})
182181
PanicIfError(writer.config, err)
183182

184-
overwrittenManifestListItemsSortedAsc = append(overwrittenManifestListItemsSortedAsc, overwrittenManifestListItem)
183+
existingManifestListItemsSortedDesc = append([]ManifestListItem{overwrittenManifestListItem}, existingManifestListItemsSortedDesc...)
185184
overwrittenManifestListFilesSortedAsc = append(overwrittenManifestListFilesSortedAsc, overwrittenManifestList)
186185
}
187186
}
188187

189-
return overwrittenManifestListItemsSortedAsc, overwrittenManifestListFilesSortedAsc, lastSequenceNumber
188+
return existingManifestListItemsSortedDesc, overwrittenManifestListFilesSortedAsc, lastSequenceNumber
190189
}

0 commit comments

Comments
 (0)