-
Notifications
You must be signed in to change notification settings - Fork 71
/
Copy patharchiver.go
351 lines (292 loc) · 11.1 KB
/
archiver.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
package service
import (
"context"
"errors"
"strconv"
"time"
client "github.com/attestantio/go-eth2-client"
"github.com/attestantio/go-eth2-client/api"
v1 "github.com/attestantio/go-eth2-client/api/v1"
"github.com/base/blob-archiver/archiver/flags"
"github.com/base/blob-archiver/archiver/metrics"
"github.com/base/blob-archiver/common/storage"
"github.com/ethereum-optimism/optimism/op-service/retry"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
"github.com/google/uuid"
)
const (
	// liveFetchBlobMaximumRetries bounds retries when persisting blobs for the
	// live head in processBlocksUntilKnownBlock.
	liveFetchBlobMaximumRetries = 10
	// startupFetchBlobMaximumRetries bounds retries for the initial "head"
	// fetch performed in Start.
	startupFetchBlobMaximumRetries = 3
	// rearchiveMaximumRetries bounds retries per slot in rearchiveRange.
	rearchiveMaximumRetries = 3
	// backfillErrorRetryInterval is how long backfillBlobs sleeps before
	// retrying after a failed persist.
	backfillErrorRetryInterval = 5 * time.Second
)
// BeaconClient is the subset of the beacon node API the archiver depends on:
// fetching blob sidecars and beacon block headers.
type BeaconClient interface {
	client.BlobSidecarsProvider
	client.BeaconBlockHeadersProvider
}
// NewArchiver creates an Archiver wired to the given storage backend, beacon
// client and metrics sink, with a freshly generated instance id (used for the
// storage lock) and an initialized stop channel. The error return is
// currently always nil.
func NewArchiver(l log.Logger, cfg flags.ArchiverConfig, dataStoreClient storage.DataStore, client BeaconClient, m metrics.Metricer) (*Archiver, error) {
	archiver := &Archiver{
		log:             l,
		cfg:             cfg,
		dataStoreClient: dataStoreClient,
		beaconClient:    client,
		metrics:         m,
		stopCh:          make(chan struct{}),
		id:              uuid.New().String(),
	}
	return archiver, nil
}
// Archiver polls a beacon node for new blocks, persists their blob sidecars
// to a DataStore, and backfills any gaps back to the configured origin block.
type Archiver struct {
	log log.Logger
	cfg flags.ArchiverConfig
	// dataStoreClient stores blobs, the storage lockfile and backfill state.
	dataStoreClient storage.DataStore
	beaconClient    BeaconClient
	metrics         metrics.Metricer
	// stopCh signals trackLatestBlocks to stop; closed by Stop.
	stopCh chan struct{}
	// id uniquely identifies this archiver instance in the storage lockfile.
	id string
}
// Start starts archiving blobs. It begins polling the beacon node for the latest blocks and persisting blobs for
// them. Concurrently it'll also begin a backfill process (see backfillBlobs) to store all blobs from the current head
// to the previously stored blocks. This ensures that during restarts or outages of an archiver, any gaps will be
// filled in.
func (a *Archiver) Start(ctx context.Context) error {
	// Seed with the current head so both live tracking and backfill have a
	// concrete starting block.
	seed, _, err := retry.Do2(ctx, startupFetchBlobMaximumRetries, retry.Exponential(), func() (*v1.BeaconBlockHeader, bool, error) {
		return a.persistBlobsForBlock(ctx, "head", false)
	})
	if err != nil {
		a.log.Error("failed to seed archiver with initial block", "err", err)
		return err
	}
	// Block until this instance owns the storage lock before writing further.
	a.waitObtainStorageLock(ctx)
	go a.backfillBlobs(ctx, seed)
	return a.trackLatestBlocks(ctx)
}
// Stop stops the archiver service by closing stopCh, which makes
// trackLatestBlocks return. Note that calling Stop more than once panics
// (double close of the channel). The error return is always nil.
func (a *Archiver) Stop(ctx context.Context) error {
	close(a.stopCh)
	return nil
}
// persistBlobsForBlock fetches the blobs for a given block and persists them to S3. It returns the block header
// and a boolean indicating whether the blobs already existed in S3 and any errors that occur.
// If the blobs are already stored, it will not overwrite the data. Currently, the archiver does not
// perform any validation of the blobs, it assumes a trusted beacon node. See:
// https://github.com/base/blob-archiver/issues/4.
func (a *Archiver) persistBlobsForBlock(ctx context.Context, blockIdentifier string, overwrite bool) (*v1.BeaconBlockHeader, bool, error) {
	// Resolve the identifier ("head", a slot, or a root) to a concrete header.
	currentHeader, err := a.beaconClient.BeaconBlockHeader(ctx, &api.BeaconBlockHeaderOpts{
		Block: blockIdentifier,
	})
	if err != nil {
		a.log.Error("failed to fetch latest beacon block header", "err", err)
		return nil, false, err
	}
	// Skip the sidecar fetch and write when the blob is already stored, unless
	// the caller explicitly asked to overwrite (rearchiving).
	exists, err := a.dataStoreClient.Exists(ctx, common.Hash(currentHeader.Data.Root))
	if err != nil {
		a.log.Error("failed to check if blob exists", "err", err)
		return nil, false, err
	}
	if exists && !overwrite {
		a.log.Debug("blob already exists", "hash", currentHeader.Data.Root)
		return currentHeader.Data, true, nil
	}
	// Sidecars are fetched by the resolved root rather than the caller's
	// identifier, pinning the request to the exact block seen above.
	blobSidecars, err := a.beaconClient.BlobSidecars(ctx, &api.BlobSidecarsOpts{
		Block: currentHeader.Data.Root.String(),
	})
	if err != nil {
		a.log.Error("failed to fetch blob sidecars", "err", err)
		return nil, false, err
	}
	a.log.Debug("fetched blob sidecars", "count", len(blobSidecars.Data))
	blobData := storage.BlobData{
		Header: storage.Header{
			BeaconBlockHash: common.Hash(currentHeader.Data.Root),
		},
		BlobSidecars: storage.BlobSidecars{Data: blobSidecars.Data},
	}
	// The blob that is being written has not been validated. It is assumed that the beacon node is trusted.
	err = a.dataStoreClient.WriteBlob(ctx, blobData)
	if err != nil {
		a.log.Error("failed to write blob", "err", err)
		return nil, false, err
	}
	a.metrics.RecordStoredBlobs(len(blobSidecars.Data))
	// exists can only be true here when overwrite was set; callers use it to
	// distinguish "newly stored" from "re-stored".
	return currentHeader.Data, exists, nil
}
// LockUpdateInterval is how often the lock-holding archiver refreshes the
// lockfile timestamp to retain the storage lock.
const LockUpdateInterval = 10 * time.Second

// LockTimeout is the age in seconds (Unix time) after which a lockfile
// timestamp is considered expired and the lock may be taken over.
const LockTimeout = int64(20) // 20 seconds

// ObtainLockRetryInterval is the sleep between lockfile reads while waiting
// for another instance's lock to expire. Declared as a var, presumably so
// tests can shorten it — TODO confirm.
var ObtainLockRetryInterval = 10 * time.Second
// waitObtainStorageLock blocks until this archiver instance holds the storage
// lockfile, then starts a background goroutine that refreshes the lockfile
// timestamp every LockUpdateInterval until ctx is cancelled. Any failure to
// read or write the lockfile is fatal (log.Crit).
//
// NOTE(review): the write below is not a compare-and-swap; two instances that
// both observe an expired lock at the same moment could each believe they hold
// it — confirm the storage backend's consistency guarantees.
func (a *Archiver) waitObtainStorageLock(ctx context.Context) {
	lockfile, err := a.dataStoreClient.ReadLockfile(ctx)
	if err != nil {
		a.log.Crit("failed to read lockfile", "err", err)
	}
	currentTime := time.Now().Unix()
	emptyLockfile := storage.Lockfile{}
	if lockfile != emptyLockfile {
		// Another instance may hold the lock: poll until its timestamp expires
		// (or until we see the lock is actually ours from a previous run).
		for lockfile.ArchiverId != a.id && lockfile.Timestamp+LockTimeout > currentTime {
			// Loop until the timestamp read from storage is expired
			a.log.Info("waiting for storage lock timestamp to expire",
				"timestamp", strconv.FormatInt(lockfile.Timestamp, 10),
				"currentTime", strconv.FormatInt(currentTime, 10),
			)
			time.Sleep(ObtainLockRetryInterval)
			lockfile, err = a.dataStoreClient.ReadLockfile(ctx)
			if err != nil {
				a.log.Crit("failed to read lockfile", "err", err)
			}
			currentTime = time.Now().Unix()
		}
	}
	err = a.dataStoreClient.WriteLockfile(ctx, storage.Lockfile{ArchiverId: a.id, Timestamp: currentTime})
	if err != nil {
		// Fix: the original passed a printf-style format string ("...: %v") to
		// the structured logger, yielding a malformed entry; use key/value
		// pairs like every other log call in this file.
		a.log.Crit("failed to write to lockfile", "err", err)
	}
	a.log.Info("obtained storage lock")
	go func() {
		// Retain storage lock by continually updating the stored timestamp
		ticker := time.NewTicker(LockUpdateInterval)
		defer ticker.Stop()
		for {
			select {
			case <-ticker.C:
				currentTime := time.Now().Unix()
				err := a.dataStoreClient.WriteLockfile(ctx, storage.Lockfile{ArchiverId: a.id, Timestamp: currentTime})
				if err != nil {
					a.log.Error("failed to update lockfile timestamp", "err", err)
				}
			case <-ctx.Done():
				return
			}
		}
	}()
}
// backfillBlobs will persist all blobs from the provided beacon block header, to either the last block that was persisted
// to the archivers storage or the origin block in the configuration. This is used to ensure that any gaps can be filled.
// If an error is encountered persisting a block, it will retry after waiting for a period of time.
func (a *Archiver) backfillBlobs(ctx context.Context, latest *v1.BeaconBlockHeader) {
	// Add backfill process that starts at latest slot, then loop through all backfill processes
	backfillProcesses, err := a.dataStoreClient.ReadBackfillProcesses(ctx)
	if err != nil {
		a.log.Crit("failed to read backfill_processes", "err", err)
	}
	backfillProcesses[common.Hash(latest.Root)] = storage.BackfillProcess{Start: *latest, Current: *latest}
	// Checkpoint writes are best-effort: a failure only loses resume progress.
	_ = a.dataStoreClient.WriteBackfillProcesses(ctx, backfillProcesses)
	// backfillLoop walks the chain backwards from current until it reaches a
	// block already present in storage, or the configured origin block.
	backfillLoop := func(start *v1.BeaconBlockHeader, current *v1.BeaconBlockHeader) {
		curr, alreadyExists, err := current, false, error(nil)
		count := 0
		a.log.Info("backfill process initiated",
			"currHash", curr.Root.String(),
			"currSlot", curr.Header.Message.Slot,
			"startHash", start.Root.String(),
			"startSlot", start.Header.Message.Slot,
		)
		// On completion, drop this process from the checkpoint map so it is
		// not resumed after a restart.
		defer func() {
			a.log.Info("backfill process complete",
				"endHash", curr.Root.String(),
				"endSlot", curr.Header.Message.Slot,
				"startHash", start.Root.String(),
				"startSlot", start.Header.Message.Slot,
			)
			delete(backfillProcesses, common.Hash(start.Root))
			_ = a.dataStoreClient.WriteBackfillProcesses(ctx, backfillProcesses)
		}()
		for !alreadyExists {
			previous := curr
			if common.Hash(curr.Root) == a.cfg.OriginBlock {
				a.log.Info("reached origin block", "hash", curr.Root.String())
				return
			}
			// Walk to the parent of the block we just handled.
			curr, alreadyExists, err = a.persistBlobsForBlock(ctx, previous.Header.Message.ParentRoot.String(), false)
			if err != nil {
				a.log.Error("failed to persist blobs for block, will retry", "err", err, "hash", previous.Header.Message.ParentRoot.String())
				// Revert back to block we failed to fetch
				curr = previous
				time.Sleep(backfillErrorRetryInterval)
				continue
			}
			if !alreadyExists {
				a.metrics.RecordProcessedBlock(metrics.BlockSourceBackfill)
			}
			count++
			// Checkpoint progress every 10 blocks so a restart resumes nearby.
			if count%10 == 0 {
				backfillProcesses[common.Hash(start.Root)] = storage.BackfillProcess{Start: *start, Current: *curr}
				_ = a.dataStoreClient.WriteBackfillProcesses(ctx, backfillProcesses)
			}
		}
	}
	// Run every outstanding backfill (interrupted ones plus the one just added)
	// sequentially; each call completes before the next begins.
	for _, process := range backfillProcesses {
		backfillLoop(&process.Start, &process.Current)
	}
}
// trackLatestBlocks polls the beacon node on the configured interval and
// persists blobs for each new block, until the context is cancelled or Stop
// is called. It always returns nil.
func (a *Archiver) trackLatestBlocks(ctx context.Context) error {
	ticker := time.NewTicker(a.cfg.PollInterval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return nil
		case <-a.stopCh:
			return nil
		case <-ticker.C:
			a.processBlocksUntilKnownBlock(ctx)
		}
	}
}
// processBlocksUntilKnownBlock will fetch and persist blobs for blocks until it finds a block that has been stored before.
// In the case of a reorg, it will fetch the new head and then walk back the chain, storing all blobs until it finds a
// known block -- that already exists in the archivers' storage.
func (a *Archiver) processBlocksUntilKnownBlock(ctx context.Context) {
	a.log.Debug("refreshing live data")
	var first *v1.BeaconBlockHeader
	blockID := "head"
	for {
		header, alreadyExisted, err := retry.Do2(ctx, liveFetchBlobMaximumRetries, retry.Exponential(), func() (*v1.BeaconBlockHeader, bool, error) {
			return a.persistBlobsForBlock(ctx, blockID, false)
		})
		if err != nil {
			a.log.Error("failed to update live blobs for block", "err", err, "blockId", blockID)
			return
		}
		if first == nil {
			first = header
		}
		// A block we have already stored marks the end of the walk.
		if alreadyExisted {
			a.log.Debug("blob already exists", "hash", header.Root.String())
			break
		}
		a.metrics.RecordProcessedBlock(metrics.BlockSourceLive)
		blockID = header.Header.Message.ParentRoot.String()
	}
	a.log.Info("live data refreshed", "startHash", first.Root.String(), "endHash", blockID)
}
// rearchiveRange will rearchive all blocks in the range from the given start to end. It returns the start and end of the
// range that was successfully rearchived. On any persistent errors, it will halt archiving and return the range of blocks
// that were rearchived and the error that halted the process. Slots whose
// block returns a 404 are treated as skipped slots, not errors.
func (a *Archiver) rearchiveRange(from uint64, to uint64) (uint64, uint64, error) {
	for i := from; i <= to; i++ {
		id := strconv.FormatUint(i, 10)
		l := a.log.New("slot", id)
		l.Info("rearchiving block")
		rewritten, err := retry.Do(context.Background(), rearchiveMaximumRetries, retry.Exponential(), func() (bool, error) {
			// overwrite=true: existing blobs for the slot are re-persisted.
			_, _, e := a.persistBlobsForBlock(context.Background(), id, true)
			// If the block is not found, we can assume that the slot has been skipped
			if e != nil {
				var apiErr *api.Error
				if errors.As(e, &apiErr) && apiErr.StatusCode == 404 {
					return false, nil
				}
				return false, e
			}
			return true, nil
		})
		if err != nil {
			return from, i, err
		}
		if !rewritten {
			// Fix: corrected the "reachiving" typo and dropped the duplicate
			// "slot" attribute — logger l already carries it via log.New above.
			l.Info("block not found during rearchiving")
		}
		a.metrics.RecordProcessedBlock(metrics.BlockSourceRearchive)
	}
	return from, to, nil
}