Skip to content

Commit

Permalink
Make blob feed API into two-way APIs
Browse files Browse the repository at this point in the history
  • Loading branch information
jianoaix committed Feb 18, 2025
1 parent 268a3a9 commit c6306e7
Show file tree
Hide file tree
Showing 3 changed files with 217 additions and 112 deletions.
255 changes: 179 additions & 76 deletions disperser/dataapi/v2/blobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,129 +10,201 @@ import (

"github.com/Layr-Labs/eigenda/core"
corev2 "github.com/Layr-Labs/eigenda/core/v2"
v2 "github.com/Layr-Labs/eigenda/disperser/common/v2"
"github.com/Layr-Labs/eigenda/disperser/common/v2/blobstore"
"github.com/Layr-Labs/eigenda/disperser/dataapi"
"github.com/gin-gonic/gin"
)

// FetchBlobFeed godoc
// FetchBlobFeedForward godoc
//
// @Summary Fetch blob feed
// @Tags Blobs
// @Produce json
// @Param end query string false "Fetch blobs up to the end time (ISO 8601 format: 2006-01-02T15:04:05Z) [default: now]"
// @Param interval query int false "Fetch blobs starting from an interval (in seconds) before the end time [default: 3600]"
// @Param pagination_token query string false "Fetch blobs starting from the pagination token (exclusively). Overrides the interval param if specified [default: empty]"
// @Param limit query int false "The maximum number of blobs to fetch. System max (1000) if limit <= 0 [default: 20; max: 1000]"
// @Success 200 {object} BlobFeedResponse
// @Failure 400 {object} ErrorResponse "error: Bad request"
// @Failure 404 {object} ErrorResponse "error: Not found"
// @Failure 500 {object} ErrorResponse "error: Server error"
// @Router /blobs/feed [get]
func (s *ServerV2) FetchBlobFeed(c *gin.Context) {
// @Summary Fetch blob feed forward in time (oldest to newest)
// @Tags Blobs
// @Produce json
// @Param after query string false "Fetch blobs after this time (ISO 8601 format) [default: until - 1h]"
// @Param until query string false "Stop fetching at this time (ISO 8601 format) [default: now]"
// @Param cursor query string false "Pagination cursor for fetching newer items; override after [default: empty]"
// @Param limit query int false "Maximum number of blobs to fetch [default: 20; max: 1000]"
// @Success 200 {object} BlobFeedResponse
// @Failure 400 {object} ErrorResponse "error: Bad request"
// @Failure 404 {object} ErrorResponse "error: Not found"
// @Failure 500 {object} ErrorResponse "error: Server error"
// @Router /blobs/feed/forward [get]
func (s *ServerV2) FetchBlobFeedForward(c *gin.Context) {
handlerStart := time.Now()
var err error

now := handlerStart
oldestTime := now.Add(-maxBlobAge)

endTime := now
if c.Query("end") != "" {
endTime, err = time.Parse("2006-01-02T15:04:05Z", c.Query("end"))
if err != nil {
s.metrics.IncrementInvalidArgRequestNum("FetchBlobFeed")
invalidParamsErrorResponse(c, fmt.Errorf("failed to parse end param: %w", err))
return
}
if endTime.Before(oldestTime) {
s.metrics.IncrementInvalidArgRequestNum("FetchBlobFeed")
invalidParamsErrorResponse(c, fmt.Errorf("end time cannot be more than 14 days in the past, found: %s", c.Query("end")))
return
}
}

interval := 3600
if c.Query("interval") != "" {
interval, err = strconv.Atoi(c.Query("interval"))
untilTime := now
if c.Query("until") != "" {
untilTime, err = time.Parse("2006-01-02T15:04:05Z", c.Query("until"))
if err != nil {
s.metrics.IncrementInvalidArgRequestNum("FetchBlobFeed")
invalidParamsErrorResponse(c, fmt.Errorf("failed to parse interval param: %w", err))
s.metrics.IncrementInvalidArgRequestNum("FetchBlobFeedForward")
invalidParamsErrorResponse(c, fmt.Errorf("failed to parse until param: %w", err))
return
}
if interval <= 0 {
s.metrics.IncrementInvalidArgRequestNum("FetchBlobFeed")
invalidParamsErrorResponse(c, fmt.Errorf("interval must be greater than 0, found: %d", interval))
if untilTime.Before(oldestTime) {
s.metrics.IncrementInvalidArgRequestNum("FetchBlobFeedForward")
invalidParamsErrorResponse(c, fmt.Errorf("until time cannot be more than 14 days in the past, found: %s", c.Query("until")))
return
}
}

limit, err := strconv.Atoi(c.DefaultQuery("limit", "20"))
if err != nil {
s.metrics.IncrementInvalidArgRequestNum("FetchBlobFeed")
s.metrics.IncrementInvalidArgRequestNum("FetchBlobFeedForward")
invalidParamsErrorResponse(c, fmt.Errorf("failed to parse limit param: %w", err))
return
}
if limit <= 0 || limit > maxNumBlobsPerBlobFeedResponse {
limit = maxNumBlobsPerBlobFeedResponse
}

paginationCursor := blobstore.BlobFeedCursor{
RequestedAt: 0,
}
if c.Query("pagination_token") != "" {
cursor, err := paginationCursor.FromCursorKey(c.Query("pagination_token"))
var afterCursor blobstore.BlobFeedCursor

// Handle cursor (overrides after)
if cursorStr := c.Query("cursor"); cursorStr != "" {
cursor, err := new(blobstore.BlobFeedCursor).FromCursorKey(cursorStr)
if err != nil {
s.metrics.IncrementInvalidArgRequestNum("FetchBlobFeed")
invalidParamsErrorResponse(c, fmt.Errorf("failed to parse the pagination token: %w", err))
s.metrics.IncrementInvalidArgRequestNum("FetchBlobFeedForward")
invalidParamsErrorResponse(c, fmt.Errorf("failed to parse the cursor: %w", err))
return
}
paginationCursor = *cursor
afterCursor = *cursor
} else {
// Use after parameter if no cursor
afterTime := untilTime.Add(-time.Hour) // default to 1 hour ago
if c.Query("after") != "" {
afterTime, err = time.Parse("2006-01-02T15:04:05Z", c.Query("after"))
if err != nil {
s.metrics.IncrementInvalidArgRequestNum("FetchBlobFeedForward")
invalidParamsErrorResponse(c, fmt.Errorf("failed to parse after param: %w", err))
return
}
if afterTime.Before(oldestTime) {
afterTime = oldestTime
}
}
afterCursor = blobstore.BlobFeedCursor{
RequestedAt: uint64(afterTime.UnixNano()),
}
}

startTime := endTime.Add(-time.Duration(interval) * time.Second)
if startTime.Before(oldestTime) {
startTime = oldestTime
untilCursor := blobstore.BlobFeedCursor{
RequestedAt: uint64(untilTime.UnixNano()),
}
startCursor := blobstore.BlobFeedCursor{
RequestedAt: uint64(startTime.UnixNano()),
}
if startCursor.LessThan(&paginationCursor) {
startCursor = paginationCursor

blobs, nextCursor, err := s.blobMetadataStore.GetBlobMetadataByRequestedAt(
c.Request.Context(),
afterCursor,
untilCursor,
limit,
)
if err != nil {
s.metrics.IncrementFailedRequestNum("FetchBlobFeedForward")
errorResponse(c, fmt.Errorf("failed to fetch feed from blob metadata store: %w", err))
return
}
endCursor := blobstore.BlobFeedCursor{
RequestedAt: uint64(endTime.UnixNano()),
s.sendBlobFeedResponse(c, blobs, nextCursor, handlerStart)
}

// FetchBlobFeedBackward godoc
//
// @Summary Fetch blob feed backward in time (newest to oldest)
// @Tags Blobs
// @Produce json
// @Param before query string false "Fetch blobs before this time (ISO 8601 format) [default: now]"
// @Param until query string false "Stop fetching at this time (ISO 8601 format) [default: now-1h]"
// @Param cursor query string false "Pagination cursor for fetching older items; override before [default: empty]"
// @Param limit query int false "Maximum number of blobs to fetch [default: 20; max: 1000]"
// @Success 200 {object} BlobFeedResponse
// @Failure 400 {object} ErrorResponse "error: Bad request"
// @Failure 404 {object} ErrorResponse "error: Not found"
// @Failure 500 {object} ErrorResponse "error: Server error"
// @Router /blobs/feed/backward [get]
func (s *ServerV2) FetchBlobFeedBackward(c *gin.Context) {
handlerStart := time.Now()
var err error

now := handlerStart
oldestTime := now.Add(-maxBlobAge)

// Handle before parameter
beforeTime := now // default to now
if c.Query("before") != "" {
beforeTime, err = time.Parse("2006-01-02T15:04:05Z", c.Query("before"))
if err != nil {
s.metrics.IncrementInvalidArgRequestNum("FetchBlobFeedBackward")
invalidParamsErrorResponse(c, fmt.Errorf("failed to parse before param: %w", err))
return
}
if beforeTime.Before(oldestTime) {
s.metrics.IncrementInvalidArgRequestNum("FetchBlobFeedBackward")
invalidParamsErrorResponse(c, fmt.Errorf("before time cannot be more than 14 days in the past, found: %s", c.Query("before")))
return
}
}

blobs, paginationToken, err := s.blobMetadataStore.GetBlobMetadataByRequestedAt(c.Request.Context(), startCursor, endCursor, limit)
limit, err := strconv.Atoi(c.DefaultQuery("limit", "20"))
if err != nil {
s.metrics.IncrementFailedRequestNum("FetchBlobFeed")
errorResponse(c, fmt.Errorf("failed to fetch feed from blob metadata store: %w", err))
s.metrics.IncrementInvalidArgRequestNum("FetchBlobFeedBackward")
invalidParamsErrorResponse(c, fmt.Errorf("failed to parse limit param: %w", err))
return
}
if limit <= 0 || limit > maxNumBlobsPerBlobFeedResponse {
limit = maxNumBlobsPerBlobFeedResponse
}

token := ""
if paginationToken != nil {
token = paginationToken.ToCursorKey()
var beforeCursor blobstore.BlobFeedCursor

// Handle cursor (overrides before)
if cursorStr := c.Query("cursor"); cursorStr != "" {
cursor, err := new(blobstore.BlobFeedCursor).FromCursorKey(cursorStr)
if err != nil {
s.metrics.IncrementInvalidArgRequestNum("FetchBlobFeedBackward")
invalidParamsErrorResponse(c, fmt.Errorf("failed to parse the cursor: %w", err))
return
}
beforeCursor = *cursor
} else {
beforeCursor = blobstore.BlobFeedCursor{
RequestedAt: uint64(beforeTime.UnixNano()),
}
}
blobInfo := make([]BlobInfo, len(blobs))
for i := 0; i < len(blobs); i++ {
bk, err := blobs[i].BlobHeader.BlobKey()

// Handle until parameter
untilTime := now.Add(-time.Hour) // default to 1 hour ago
if c.Query("until") != "" {
untilTime, err = time.Parse("2006-01-02T15:04:05Z", c.Query("until"))
if err != nil {
s.metrics.IncrementFailedRequestNum("FetchBlobFeed")
errorResponse(c, fmt.Errorf("failed to serialize blob key: %w", err))
s.metrics.IncrementInvalidArgRequestNum("FetchBlobFeedBackward")
invalidParamsErrorResponse(c, fmt.Errorf("failed to parse until param: %w", err))
return
}
blobInfo[i].BlobKey = bk.Hex()
blobInfo[i].BlobMetadata = blobs[i]
if untilTime.Before(oldestTime) {
untilTime = oldestTime
}
}
response := &BlobFeedResponse{
Blobs: blobInfo,
PaginationToken: token,

untilCursor := blobstore.BlobFeedCursor{
RequestedAt: uint64(untilTime.UnixNano()),
}
c.Writer.Header().Set(cacheControlParam, fmt.Sprintf("max-age=%d", maxFeedBlobAge))
s.metrics.IncrementSuccessfulRequestNum("FetchBlobFeed")
s.metrics.ObserveLatency("FetchBlobFeed", time.Since(handlerStart))
c.JSON(http.StatusOK, response)

// TODO(jianxiao): this is just a placeholder as GetBlobMetadataByRequested is doing forward retrieval
blobs, nextCursor, err := s.blobMetadataStore.GetBlobMetadataByRequestedAt(
c.Request.Context(),
untilCursor,
beforeCursor,
limit,
)
if err != nil {
s.metrics.IncrementFailedRequestNum("FetchBlobFeedBackward")
errorResponse(c, fmt.Errorf("failed to fetch feed from blob metadata store: %w", err))
return
}
s.sendBlobFeedResponse(c, blobs, nextCursor, handlerStart)
}

// FetchBlob godoc
Expand Down Expand Up @@ -356,3 +428,34 @@ func (s *ServerV2) getAllOperatorsForAttestation(ctx context.Context, attestatio

return operatorList, operatorsByQuorum, nil
}

func (s *ServerV2) sendBlobFeedResponse(
c *gin.Context,
blobs []*v2.BlobMetadata,
nextCursor *blobstore.BlobFeedCursor,
handlerStart time.Time,
) {
cursorStr := ""
if nextCursor != nil {
cursorStr = nextCursor.ToCursorKey()
}
blobInfo := make([]BlobInfo, len(blobs))
for i := 0; i < len(blobs); i++ {
bk, err := blobs[i].BlobHeader.BlobKey()
if err != nil {
s.metrics.IncrementFailedRequestNum("FetchBlobFeedForward")
errorResponse(c, fmt.Errorf("failed to serialize blob key: %w", err))
return
}
blobInfo[i].BlobKey = bk.Hex()
blobInfo[i].BlobMetadata = blobs[i]
}
response := &BlobFeedResponse{
Blobs: blobInfo,
Cursor: cursorStr,
}
c.Writer.Header().Set(cacheControlParam, fmt.Sprintf("max-age=%d", maxFeedBlobAge))
s.metrics.IncrementSuccessfulRequestNum("FetchBlobFeedForward")
s.metrics.ObserveLatency("FetchBlobFeedForward", time.Since(handlerStart))
c.JSON(http.StatusOK, response)
}
7 changes: 4 additions & 3 deletions disperser/dataapi/v2/server_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,8 @@ type (
BlobMetadata *disperserv2.BlobMetadata `json:"blob_metadata"`
}
BlobFeedResponse struct {
Blobs []BlobInfo `json:"blobs"`
PaginationToken string `json:"pagination_token"`
Blobs []BlobInfo `json:"blobs"`
Cursor string `json:"cursor"`
}

BatchResponse struct {
Expand Down Expand Up @@ -263,7 +263,8 @@ func (s *ServerV2) Start() error {
{
blobs := v2.Group("/blobs")
{
blobs.GET("/feed", s.FetchBlobFeed)
blobs.GET("/feed/forward", s.FetchBlobFeedForward)
blobs.GET("/feed/backward", s.FetchBlobFeedBackward)
blobs.GET("/:blob_key", s.FetchBlob)
blobs.GET("/:blob_key/certificate", s.FetchBlobCertificate)
blobs.GET("/:blob_key/attestation-info", s.FetchBlobAttestationInfo)
Expand Down
Loading

0 comments on commit c6306e7

Please sign in to comment.