Skip to content

Commit cc0aabe

Browse files
peterbitflyguybrush
authored andcommitted
feat(validator-tagger): reduce clickhouse query memory usage, display validator list, remove ethstore exporter, add lido dvt, add beaconscore to validator view, use 90d period instead of all time
1 parent 0a876e5 commit cc0aabe

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

56 files changed

+7179
-1172
lines changed

Makefile

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ LDFLAGS="-X ${PACKAGE}/version.Version=${VERSION} -X ${PACKAGE}/version.BuildDat
77
CGO_CFLAGS="-O -D__BLST_PORTABLE__"
88
CGO_CFLAGS_ALLOW="-O -D__BLST_PORTABLE__"
99
10-
all: explorer stats frontend-data-updater eth1indexer blobindexer ethstore-exporter rewards-exporter node-jobs-processor signatures notification-sender notification-collector user-service misc validator-tagger
10+
all: explorer stats frontend-data-updater eth1indexer blobindexer rewards-exporter node-jobs-processor signatures notification-sender notification-collector user-service misc validator-tagger
1111
1212
lint:
1313
echo
@@ -34,9 +34,6 @@ stats:
3434
frontend-data-updater:
3535
CGO_CFLAGS=${CGO_CFLAGS} CGO_CFLAGS_ALLOW=${CGO_CFLAGS_ALLOW} go build --ldflags=${LDFLAGS} -o bin/frontend-data-updater cmd/frontend-data-updater/main.go
3636
37-
ethstore-exporter:
38-
CGO_CFLAGS=${CGO_CFLAGS} CGO_CFLAGS_ALLOW=${CGO_CFLAGS_ALLOW} go build --ldflags=${LDFLAGS} -o bin/ethstore-exporter cmd/ethstore-exporter/main.go
39-
4037
rewards-exporter:
4138
CGO_CFLAGS=${CGO_CFLAGS} CGO_CFLAGS_ALLOW=${CGO_CFLAGS_ALLOW} go build --ldflags=${LDFLAGS} -o bin/rewards-exporter cmd/rewards-exporter/main.go
4239

cmd/ethstore-exporter/main.go

Lines changed: 0 additions & 113 deletions
This file was deleted.

cmd/explorer/main.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -519,7 +519,10 @@ func main() {
519519

520520
router.HandleFunc("/ethClients", handlers.EthClientsServices).Methods("GET")
521521
router.HandleFunc("/entities", handlers.Entities).Methods("GET")
522+
router.HandleFunc("/entities/data", handlers.EntitiesData).Methods("GET")
522523
router.HandleFunc("/entity/{entity}/{subEntity}", handlers.EntityDetail).Methods("GET")
524+
router.HandleFunc("/entity/{entity}/{subEntity}/subentities/data", handlers.EntitySubEntitiesData).Methods("GET")
525+
router.HandleFunc("/entity/{entity}/{subEntity}/validators/data", handlers.EntityValidatorsData).Methods("GET")
523526
router.HandleFunc("/relays", handlers.Relays).Methods("GET")
524527
router.HandleFunc("/pools/rocketpool", handlers.PoolsRocketpool).Methods("GET")
525528
router.HandleFunc("/pools/rocketpool/data/minipools", handlers.PoolsRocketpoolDataMinipools).Methods("GET")

cmd/validator-tagger/main.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ func main() {
2222
configPath := flag.String("config", "", "Path to the config file, if empty string defaults will be used")
2323
versionFlag := flag.Bool("version", false, "Show version and exit")
2424
scheduleFlag := flag.Bool("schedule", false, "Start scheduler loop (daily 10:00 UTC and hourly precompute)")
25-
runFlag := flag.String("run", "", "Comma-separated steps to run on demand (import,lido,lido_csm,rocketpool,withdrawal_tagging,deposit_tagging,populate_validator_names,precompute,all)")
25+
runFlag := flag.String("run", "", "Comma-separated steps to run on demand (import,lido,lido_csm,lido_simple_dvt,rocketpool,withdrawal_tagging,deposit_tagging,populate_validator_names,precompute,all)")
2626
flag.Parse()
2727

2828
if *versionFlag {

db/clickhouse.go

Lines changed: 49 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -31,30 +31,57 @@ func FetchClickhouseParquet[T any](ctx context.Context, sql string, yield func(T
3131
scheme := "https"
3232
url := fmt.Sprintf("%s://%s:%d/?database=%s&enable_http_compression=1", scheme, chCfg.Host, 8443, chCfg.Name)
3333

34-
req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, io.NopCloser(strings.NewReader(sql)))
35-
if err != nil {
36-
return fmt.Errorf("new request: %w", err)
37-
}
38-
// Basic auth
39-
req.SetBasicAuth(chCfg.Username, chCfg.Password)
40-
// We'll expect raw Parquet bytes in the body
41-
req.Header.Set("Content-Type", "text/plain; charset=UTF-8")
42-
4334
client := &http.Client{Timeout: time.Second * 120}
44-
resp, err := client.Do(req)
45-
if err != nil {
46-
return fmt.Errorf("http do: %w", err)
47-
}
48-
if resp.StatusCode != http.StatusOK {
49-
defer resp.Body.Close()
50-
b, _ := io.ReadAll(io.LimitReader(resp.Body, 8192))
51-
return fmt.Errorf("clickhouse http status %d: %s", resp.StatusCode, string(b))
35+
36+
var data []byte
37+
var lastErr error
38+
for attempt := 1; attempt <= 5; attempt++ {
39+
req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, io.NopCloser(strings.NewReader(sql)))
40+
if err != nil {
41+
return fmt.Errorf("new request: %w", err)
42+
}
43+
// Basic auth
44+
req.SetBasicAuth(chCfg.Username, chCfg.Password)
45+
// We'll expect raw Parquet bytes in the body
46+
req.Header.Set("Content-Type", "text/plain; charset=UTF-8")
47+
48+
resp, err := client.Do(req)
49+
if err != nil {
50+
lastErr = fmt.Errorf("http do (attempt %d/5): %w", attempt, err)
51+
logger.WithField("attempt", attempt).Warnf("clickhouse parquet http request failed: %v", err)
52+
} else if resp.StatusCode != http.StatusOK {
53+
b, _ := io.ReadAll(io.LimitReader(resp.Body, 8192))
54+
_ = resp.Body.Close()
55+
lastErr = fmt.Errorf("clickhouse http status %d (attempt %d/5): %s", resp.StatusCode, attempt, string(b))
56+
logger.WithFields(map[string]interface{}{"attempt": attempt, "status": resp.StatusCode}).Warnf("clickhouse parquet http non-200: %s", strings.TrimSpace(string(b)))
57+
} else {
58+
logger.Info("fetched parquet http response")
59+
// Read body fully
60+
bodyBytes, err := io.ReadAll(resp.Body)
61+
resp.Body.Close()
62+
if err != nil {
63+
lastErr = fmt.Errorf("read parquet http body (attempt %d/5): %w", attempt, err)
64+
logger.WithField("attempt", attempt).Warnf("failed reading parquet http body: %v", err)
65+
} else {
66+
data = bodyBytes
67+
break
68+
}
69+
}
70+
71+
if attempt < 5 {
72+
// If context is cancelled, abort early
73+
select {
74+
case <-ctx.Done():
75+
return fmt.Errorf("context canceled during clickhouse fetch: %w", ctx.Err())
76+
case <-time.After(1 * time.Second):
77+
}
78+
}
5279
}
53-
logger.Info("fetched parquet http response")
54-
defer resp.Body.Close()
55-
data, err := io.ReadAll(resp.Body)
56-
if err != nil {
57-
return fmt.Errorf("read parquet http body: %w", err)
80+
if data == nil {
81+
if lastErr != nil {
82+
return lastErr
83+
}
84+
return fmt.Errorf("failed to fetch parquet data from clickhouse after 5 attempts")
5885
}
5986
// print the body size in megabytes
6087
logger.Infof("fetched %d Mb of parquet data", len(data)/1024/1024)

db/entities.go

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ func GetEntitiesTreemapData(period string) ([]types.EntityTreemapItem, error) {
103103
logger.WithFields(map[string]interface{}{"period": period, "rows": len(cached)}).Debug("treemap: cache hit")
104104
return cached, nil
105105
}
106-
logger.WithField("period", period).Debug("treemap: cache miss")
106+
logger.WithField("period", period).Warn("treemap: cache miss")
107107
}
108108

109109
// Fetch from DB (singleflight) and optionally warm cache
@@ -296,3 +296,26 @@ func GetSubEntitiesForEntities(entityNames []string, period string) ([]SubEntity
296296
`, pq.Array(entityNames), period)
297297
return subEntities, err
298298
}
299+
300+
// GetSubEntityCountsForEntities returns the number of real sub-entities (sub_entity <> '-') per entity for the given period.
301+
func GetSubEntityCountsForEntities(entityNames []string, period string) (map[string]int, error) {
302+
type countRow struct {
303+
Entity string `db:"entity"`
304+
Count int `db:"cnt"`
305+
}
306+
var rows []countRow
307+
err := ReaderDb.Select(&rows, `
308+
SELECT entity, COUNT(*) AS cnt
309+
FROM validator_entities_data_periods
310+
WHERE period = $2 AND entity = ANY($1) AND sub_entity <> '-'
311+
GROUP BY entity
312+
`, pq.Array(entityNames), period)
313+
if err != nil {
314+
return nil, err
315+
}
316+
result := make(map[string]int, len(rows))
317+
for _, r := range rows {
318+
result[r.Entity] = r.Count
319+
}
320+
return result, nil
321+
}

0 commit comments

Comments
 (0)