Commit 988d545

Merge pull request #29 from bitwiseguy/ss/finish-old-backfills
Store backfill_processes status for protection against interruptions
2 parents 1ef67f6 + cfba816
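
This merge adds two safeguards against interrupted or concurrent archivers: a lockfile in storage, held under a per-instance archiver id and refreshed on a ticker, so only one archiver writes to a backend at a time; and persisted backfill_processes checkpoints, so a backfill cut short mid-run resumes from its last recorded block on the next start.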

11 files changed: +496 additions, -52 deletions

api/service/api.go

Lines changed: 1 addition & 1 deletion
@@ -179,7 +179,7 @@ func (a *API) blobSidecarHandler(w http.ResponseWriter, r *http.Request) {
 		return
 	}
 
-	result, storageErr := a.dataStoreClient.Read(r.Context(), beaconBlockHash)
+	result, storageErr := a.dataStoreClient.ReadBlob(r.Context(), beaconBlockHash)
 	if storageErr != nil {
 		if errors.Is(storageErr, storage.ErrNotFound) {
 			errUnknownBlock.write(w)
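
The Read to ReadBlob rename lands because the datastore no longer holds only blobs: this commit adds a lockfile and a set of backfill checkpoints alongside them. Pieced together from call sites across the diff, the client surface looks roughly like the sketch below. The package name, the BlobData fields, and the exact signatures are assumptions; only the method names and the Lockfile/BackfillProcess field names are visible in this commit.

// Rough sketch inferred from call sites in this commit; not the repository's
// actual storage package.
package storagesketch

import (
	"context"

	v1 "github.com/attestantio/go-eth2-client/api/v1"
	"github.com/ethereum/go-ethereum/common"
)

// Lockfile is the lease record guarding a storage backend.
type Lockfile struct {
	ArchiverId string // unique per archiver instance (a uuid in this commit)
	Timestamp  int64  // unix seconds, refreshed while the lock is held
}

// BackfillProcess checkpoints one backfill run.
type BackfillProcess struct {
	Start   v1.BeaconBlockHeader // block the backfill started from
	Current v1.BeaconBlockHeader // last checkpointed block
}

// BackfillProcesses is keyed by the start block root.
type BackfillProcesses map[common.Hash]BackfillProcess

// BlobData's real fields (Header, BlobSidecars) are elided here.
type BlobData struct{}

// DataStoreClient collects the methods used across this diff.
type DataStoreClient interface {
	ReadBlob(ctx context.Context, hash common.Hash) (BlobData, error)
	WriteBlob(ctx context.Context, data BlobData) error
	Exists(ctx context.Context, hash common.Hash) (bool, error)

	ReadLockfile(ctx context.Context) (Lockfile, error)
	WriteLockfile(ctx context.Context, lockfile Lockfile) error

	ReadBackfillProcesses(ctx context.Context) (BackfillProcesses, error)
	WriteBackfillProcesses(ctx context.Context, procs BackfillProcesses) error
}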

api/service/api_test.go

Lines changed: 2 additions & 2 deletions
@@ -92,10 +92,10 @@ func TestAPIService(t *testing.T) {
 		},
 	}
 
-	err := fs.Write(context.Background(), blockOne)
+	err := fs.WriteBlob(context.Background(), blockOne)
 	require.NoError(t, err)
 
-	err = fs.Write(context.Background(), blockTwo)
+	err = fs.WriteBlob(context.Background(), blockTwo)
 	require.NoError(t, err)
 
 	beaconClient.Headers["finalized"] = &v1.BeaconBlockHeader{

archiver/service/archiver.go

Lines changed: 114 additions & 22 deletions
@@ -15,6 +15,7 @@ import (
 	"github.com/ethereum-optimism/optimism/op-service/retry"
 	"github.com/ethereum/go-ethereum/common"
 	"github.com/ethereum/go-ethereum/log"
+	"github.com/google/uuid"
 )
 
 const (
@@ -37,6 +38,7 @@ func NewArchiver(l log.Logger, cfg flags.ArchiverConfig, dataStoreClient storage
 		metrics:      m,
 		beaconClient: client,
 		stopCh:       make(chan struct{}),
+		id:           uuid.New().String(),
 	}, nil
 }
 
@@ -47,6 +49,7 @@ type Archiver struct {
 	beaconClient BeaconClient
 	metrics      metrics.Metricer
 	stopCh       chan struct{}
+	id           string
 }
 
 // Start starts archiving blobs. It begins polling the beacon node for the latest blocks and persisting blobs for
@@ -63,6 +66,8 @@ func (a *Archiver) Start(ctx context.Context) error {
 		return err
 	}
 
+	a.waitObtainStorageLock(ctx)
+
 	go a.backfillBlobs(ctx, currentBlock)
 
 	return a.trackLatestBlocks(ctx)
@@ -119,7 +124,7 @@ func (a *Archiver) persistBlobsForBlockToS3(ctx context.Context, blockIdentifier
 	}
 
 	// The blob that is being written has not been validated. It is assumed that the beacon node is trusted.
-	err = a.dataStoreClient.Write(ctx, blobData)
+	err = a.dataStoreClient.WriteBlob(ctx, blobData)
 
 	if err != nil {
 		a.log.Error("failed to write blob", "err", err)
@@ -131,36 +136,123 @@ func (a *Archiver) persistBlobsForBlockToS3(ctx context.Context, blockIdentifier
 	return currentHeader.Data, exists, nil
 }
 
+const LockUpdateInterval = 10 * time.Second
+const LockTimeout = int64(20) // 20 seconds
+var ObtainLockRetryInterval = 10 * time.Second
+
+func (a *Archiver) waitObtainStorageLock(ctx context.Context) {
+	lockfile, err := a.dataStoreClient.ReadLockfile(ctx)
+	if err != nil {
+		a.log.Crit("failed to read lockfile", "err", err)
+	}
+
+	currentTime := time.Now().Unix()
+	emptyLockfile := storage.Lockfile{}
+	if lockfile != emptyLockfile {
+		for lockfile.ArchiverId != a.id && lockfile.Timestamp+LockTimeout > currentTime {
+			// Loop until the timestamp read from storage is expired
+			a.log.Info("waiting for storage lock timestamp to expire",
+				"timestamp", strconv.FormatInt(lockfile.Timestamp, 10),
+				"currentTime", strconv.FormatInt(currentTime, 10),
+			)
+			time.Sleep(ObtainLockRetryInterval)
+			lockfile, err = a.dataStoreClient.ReadLockfile(ctx)
+			if err != nil {
+				a.log.Crit("failed to read lockfile", "err", err)
+			}
+			currentTime = time.Now().Unix()
+		}
+	}
+
+	err = a.dataStoreClient.WriteLockfile(ctx, storage.Lockfile{ArchiverId: a.id, Timestamp: currentTime})
+	if err != nil {
+		a.log.Crit("failed to write to lockfile: %v", err)
+	}
+	a.log.Info("obtained storage lock")
+
+	go func() {
+		// Retain storage lock by continually updating the stored timestamp
+		ticker := time.NewTicker(LockUpdateInterval)
+		for {
+			select {
+			case <-ticker.C:
+				currentTime := time.Now().Unix()
+				err := a.dataStoreClient.WriteLockfile(ctx, storage.Lockfile{ArchiverId: a.id, Timestamp: currentTime})
+				if err != nil {
+					a.log.Error("failed to update lockfile timestamp", "err", err)
				}
+			case <-ctx.Done():
+				ticker.Stop()
+				return
+			}
+		}
+	}()
+}
+
 // backfillBlobs will persist all blobs from the provided beacon block header, to either the last block that was persisted
 // to the archivers storage or the origin block in the configuration. This is used to ensure that any gaps can be filled.
 // If an error is encountered persisting a block, it will retry after waiting for a period of time.
 func (a *Archiver) backfillBlobs(ctx context.Context, latest *v1.BeaconBlockHeader) {
-	current, alreadyExists, err := latest, false, error(nil)
-
-	defer func() {
-		a.log.Info("backfill complete", "endHash", current.Root.String(), "startHash", latest.Root.String())
-	}()
+	// Add backfill process that starts at latest slot, then loop through all backfill processes
+	backfillProcesses, err := a.dataStoreClient.ReadBackfillProcesses(ctx)
+	if err != nil {
+		a.log.Crit("failed to read backfill_processes", "err", err)
+	}
+	backfillProcesses[common.Hash(latest.Root)] = storage.BackfillProcess{Start: *latest, Current: *latest}
+	a.dataStoreClient.WriteBackfillProcesses(ctx, backfillProcesses)
+
+	backfillLoop := func(start *v1.BeaconBlockHeader, current *v1.BeaconBlockHeader) {
+		curr, alreadyExists, err := current, false, error(nil)
+		count := 0
+		a.log.Info("backfill process initiated",
+			"currHash", curr.Root.String(),
+			"currSlot", curr.Header.Message.Slot,
+			"startHash", start.Root.String(),
+			"startSlot", start.Header.Message.Slot,
+		)
+
+		defer func() {
+			a.log.Info("backfill process complete",
+				"endHash", curr.Root.String(),
+				"endSlot", curr.Header.Message.Slot,
+				"startHash", start.Root.String(),
+				"startSlot", start.Header.Message.Slot,
+			)
+			delete(backfillProcesses, common.Hash(start.Root))
+			a.dataStoreClient.WriteBackfillProcesses(ctx, backfillProcesses)
+		}()
+
+		for !alreadyExists {
+			previous := curr
+
+			if common.Hash(curr.Root) == a.cfg.OriginBlock {
+				a.log.Info("reached origin block", "hash", curr.Root.String())
+				return
+			}
 
-	for !alreadyExists {
-		previous := current
+			curr, alreadyExists, err = a.persistBlobsForBlockToS3(ctx, previous.Header.Message.ParentRoot.String(), false)
+			if err != nil {
+				a.log.Error("failed to persist blobs for block, will retry", "err", err, "hash", previous.Header.Message.ParentRoot.String())
+				// Revert back to block we failed to fetch
+				curr = previous
+				time.Sleep(backfillErrorRetryInterval)
+				continue
+			}
 
-		if common.Hash(current.Root) == a.cfg.OriginBlock {
-			a.log.Info("reached origin block", "hash", current.Root.String())
-			return
-		}
+			if !alreadyExists {
+				a.metrics.RecordProcessedBlock(metrics.BlockSourceBackfill)
+			}
 
-		current, alreadyExists, err = a.persistBlobsForBlockToS3(ctx, previous.Header.Message.ParentRoot.String(), false)
-		if err != nil {
-			a.log.Error("failed to persist blobs for block, will retry", "err", err, "hash", previous.Header.Message.ParentRoot.String())
-			// Revert back to block we failed to fetch
-			current = previous
-			time.Sleep(backfillErrorRetryInterval)
-			continue
+			count++
+			if count%10 == 0 {
+				backfillProcesses[common.Hash(start.Root)] = storage.BackfillProcess{Start: *start, Current: *curr}
+				a.dataStoreClient.WriteBackfillProcesses(ctx, backfillProcesses)
+			}
 		}
+	}
 
-		if !alreadyExists {
-			a.metrics.RecordProcessedBlock(metrics.BlockSourceBackfill)
-		}
+	for _, process := range backfillProcesses {
+		backfillLoop(&process.Start, &process.Current)
 	}
 }
 
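Taken together, waitObtainStorageLock implements a coarse lease over the storage backend: an archiver proceeds once the stored lockfile is empty, already its own, or stale (timestamp older than LockTimeout), and a background goroutine then refreshes the timestamp every LockUpdateInterval. Since LockUpdateInterval (10s) is half of LockTimeout (20s), a healthy holder refreshes twice per expiry window, while a crashed holder's lease lapses within roughly 20 seconds. A minimal, self-contained sketch of the acquisition rule, with an in-memory record standing in for the datastore (names here are illustrative, not from the repo):

package main

import (
	"fmt"
	"time"
)

type lockfile struct {
	archiverID string
	timestamp  int64 // unix seconds
}

const lockTimeout = int64(20) // mirrors LockTimeout above

// acquire blocks until the stored lease is free, ours, or expired, then claims it.
func acquire(store *lockfile, id string) {
	for {
		now := time.Now().Unix()
		if store.archiverID == "" || store.archiverID == id || store.timestamp+lockTimeout <= now {
			// Claim the lease. Like the real ReadLockfile/WriteLockfile pair,
			// this read-then-write is not atomic.
			*store = lockfile{archiverID: id, timestamp: now}
			return
		}
		time.Sleep(time.Second) // stands in for ObtainLockRetryInterval
	}
}

func main() {
	// A crashed archiver left a lease that expired lockTimeout seconds ago.
	store := &lockfile{archiverID: "other", timestamp: time.Now().Unix() - lockTimeout}
	acquire(store, "me") // succeeds immediately: the old lease is stale
	fmt.Println("lock holder:", store.archiverID)
}

Because the read and the write are separate datastore calls, two archivers racing past the same expired lease could both claim it; the lockfile is best-effort protection against concurrent writers rather than a strict mutex.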

archiver/service/archiver_test.go

Lines changed: 101 additions & 4 deletions
@@ -89,7 +89,7 @@ func TestArchiver_BackfillToOrigin(t *testing.T) {
 	svc, fs := setup(t, beacon)
 
 	// We have the current head, which is block 5 written to storage
-	err := fs.Write(context.Background(), storage.BlobData{
+	err := fs.WriteBlob(context.Background(), storage.BlobData{
 		Header: storage.Header{
 			BeaconBlockHash: blobtest.Five,
 		},
@@ -119,7 +119,7 @@ func TestArchiver_BackfillToExistingBlock(t *testing.T) {
 	svc, fs := setup(t, beacon)
 
 	// We have the current head, which is block 5 written to storage
-	err := fs.Write(context.Background(), storage.BlobData{
+	err := fs.WriteBlob(context.Background(), storage.BlobData{
 		Header: storage.Header{
 			BeaconBlockHash: blobtest.Five,
 		},
@@ -130,7 +130,7 @@
 	require.NoError(t, err)
 
 	// We also have block 1 written to storage
-	err = fs.Write(context.Background(), storage.BlobData{
+	err = fs.WriteBlob(context.Background(), storage.BlobData{
 		Header: storage.Header{
 			BeaconBlockHash: blobtest.One,
 		},
@@ -156,13 +156,110 @@
 		require.NoError(t, err)
 		require.True(t, exists)
 
-		data, err := fs.Read(context.Background(), blob)
+		data, err := fs.ReadBlob(context.Background(), blob)
 		require.NoError(t, err)
 		require.NotNil(t, data)
 		require.Equal(t, data.BlobSidecars.Data, beacon.Blobs[blob.String()])
 	}
 }
 
+func TestArchiver_ObtainLockfile(t *testing.T) {
+	beacon := beacontest.NewDefaultStubBeaconClient(t)
+	svc, _ := setup(t, beacon)
+
+	currentTime := time.Now().Unix()
+	expiredTime := currentTime - 19
+	err := svc.dataStoreClient.WriteLockfile(context.Background(), storage.Lockfile{ArchiverId: "FAKEID", Timestamp: expiredTime})
+	require.NoError(t, err)
+
+	ObtainLockRetryInterval = 1 * time.Second
+	svc.waitObtainStorageLock(context.Background())
+
+	lockfile, err := svc.dataStoreClient.ReadLockfile(context.Background())
+	require.NoError(t, err)
+	require.Equal(t, svc.id, lockfile.ArchiverId)
+	require.True(t, lockfile.Timestamp >= currentTime)
+}
+
+func TestArchiver_BackfillFinishOldProcess(t *testing.T) {
+	beacon := beacontest.NewDefaultStubBeaconClient(t)
+	svc, fs := setup(t, beacon)
+
+	// We have the current head, which is block 5 written to storage
+	err := fs.WriteBlob(context.Background(), storage.BlobData{
+		Header: storage.Header{
+			BeaconBlockHash: blobtest.Five,
+		},
+		BlobSidecars: storage.BlobSidecars{
+			Data: beacon.Blobs[blobtest.Five.String()],
+		},
+	})
+	require.NoError(t, err)
+
+	// We also have block 3 written to storage
+	err = fs.WriteBlob(context.Background(), storage.BlobData{
+		Header: storage.Header{
+			BeaconBlockHash: blobtest.Three,
+		},
+		BlobSidecars: storage.BlobSidecars{
+			Data: beacon.Blobs[blobtest.Three.String()],
+		},
+	})
+	require.NoError(t, err)
+
+	// We also have block 1 written to storage
+	err = fs.WriteBlob(context.Background(), storage.BlobData{
+		Header: storage.Header{
+			BeaconBlockHash: blobtest.One,
+		},
+		BlobSidecars: storage.BlobSidecars{
+			Data: beacon.Blobs[blobtest.One.String()],
+		},
+	})
+	require.NoError(t, err)
+
+	// We expect to backfill blob 4 first, then 2 in a separate process
+	expectedBlobs := []common.Hash{blobtest.Four, blobtest.Two}
+
+	for _, blob := range expectedBlobs {
+		exists, err := fs.Exists(context.Background(), blob)
+		require.NoError(t, err)
+		require.False(t, exists)
+	}
+
+	actualProcesses, err := svc.dataStoreClient.ReadBackfillProcesses(context.Background())
+	expectedProcesses := make(storage.BackfillProcesses)
+	require.NoError(t, err)
+	require.Equal(t, expectedProcesses, actualProcesses)
+
+	expectedProcesses[blobtest.Three] = storage.BackfillProcess{Start: *beacon.Headers[blobtest.Three.String()], Current: *beacon.Headers[blobtest.Three.String()]}
+	err = svc.dataStoreClient.WriteBackfillProcesses(context.Background(), expectedProcesses)
+	require.NoError(t, err)
+
+	actualProcesses, err = svc.dataStoreClient.ReadBackfillProcesses(context.Background())
+	require.NoError(t, err)
+	require.Equal(t, expectedProcesses, actualProcesses)
+
+	svc.backfillBlobs(context.Background(), beacon.Headers[blobtest.Five.String()])
+
+	for _, blob := range expectedBlobs {
+		exists, err := fs.Exists(context.Background(), blob)
+		require.NoError(t, err)
+		require.True(t, exists)
+
+		data, err := fs.ReadBlob(context.Background(), blob)
+		require.NoError(t, err)
+		require.NotNil(t, data)
+		require.Equal(t, data.BlobSidecars.Data, beacon.Blobs[blob.String()])
+	}
+
+	actualProcesses, err = svc.dataStoreClient.ReadBackfillProcesses(context.Background())
+	require.NoError(t, err)
+	svc.log.Info("backfill processes", "processes", actualProcesses)
+	require.Equal(t, storage.BackfillProcesses{}, actualProcesses)
+
+}
+
 func TestArchiver_LatestStopsAtExistingBlock(t *testing.T) {
 	beacon := beacontest.NewDefaultStubBeaconClient(t)
 	svc, fs := setup(t, beacon)
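
TestArchiver_BackfillFinishOldProcess simulates an interrupted run: storage already holds blocks 5, 3 and 1, and a leftover backfill_processes entry records a process that had reached block 3. A single backfillBlobs call must then fill both gaps, block 4 via the fresh process started at head 5 and block 2 by resuming the stale process, leaving the process map empty afterwards. Here is that recovery shape distilled into a runnable toy; a plain map stands in for the datastore and all names are illustrative:

// Toy model of the crash-recovery flow exercised above; illustrative only.
package main

import "fmt"

type process struct{ start, current string }

func main() {
	// A previous run died while backfilling from block 3, so block 2 is missing.
	processes := map[string]process{
		"block3": {start: "block3", current: "block3"},
	}

	// A new run first registers a process for the current head...
	processes["block5"] = process{start: "block5", current: "block5"}

	// ...then drains every stored process, stale and fresh alike, deleting
	// each entry once its gap is filled (the real code does this in a defer).
	for key, p := range processes {
		fmt.Printf("backfilling from %s, resuming at %s\n", p.start, p.current)
		delete(processes, key)
	}

	fmt.Println("remaining processes:", len(processes)) // 0
}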

common/blobtest/helpers.go

Lines changed: 2 additions & 0 deletions
@@ -17,6 +17,8 @@ var (
 	Three = common.Hash{3}
 	Four  = common.Hash{4}
 	Five  = common.Hash{5}
+	Six   = common.Hash{6}
+	Seven = common.Hash{7}
 
 	StartSlot = uint64(10)
 	EndSlot   = uint64(15)
