Replace Cron-based Indexing with QuickNode Streams Integration #384

Open
chibie opened this issue Jan 16, 2025 · 0 comments · May be fixed by #420
chibie commented Jan 16, 2025

User Story
As a backend developer, I want to replace the current cron-based blockchain event indexing with QuickNode Streams to improve real-time event processing and reduce unnecessary RPC calls.

Acceptance Criteria

  1. GIVEN a new receive/linked address is generated
    WHEN no existing stream is monitoring this address
    THEN a QuickNode Stream should be created to monitor token deposits for that address

  2. GIVEN a new receive/linked address is generated
    WHEN an existing stream is already monitoring this address
    THEN the existing stream should be reused and no new stream should be created

  3. GIVEN an order is created from a receive/linked address
    WHEN the order creation is successful
    THEN the corresponding address monitoring stream should be deleted

  4. GIVEN the server starts up
    WHEN checking Gateway contract event streams
    THEN streams for the OrderCreated, OrderSettled, and OrderRefunded events should be created on each supported network only where one does not already exist

  5. GIVEN a webhook endpoint is hit by QuickNode Stream
    WHEN the payload contains a relevant event
    THEN process the event according to existing business logic
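
To make criteria 1 to 3 concrete, here is a minimal in-memory sketch of the create-or-reuse and delete-on-order lifecycle. `StreamRegistry`, `EnsureAddressStream`, and `ReleaseAddressStream` are hypothetical names used only for illustration; the real implementation would call the QuickNode Streams API instead of a map.

```go
package main

import (
	"strconv"
	"sync"
)

// streamKey identifies one monitored address on one network.
type streamKey struct {
	Network string
	Address string
}

// StreamRegistry is a hypothetical in-memory stand-in for the stream backend.
type StreamRegistry struct {
	mu      sync.Mutex
	streams map[streamKey]string // key -> stream ID
	nextID  int
}

// NewStreamRegistry returns an empty registry.
func NewStreamRegistry() *StreamRegistry {
	return &StreamRegistry{streams: make(map[streamKey]string)}
}

// EnsureAddressStream returns the stream ID for the address and reports
// whether a new stream had to be created (criteria 1 and 2).
func (r *StreamRegistry) EnsureAddressStream(network, address string) (string, bool) {
	r.mu.Lock()
	defer r.mu.Unlock()

	key := streamKey{Network: network, Address: address}
	if existing, ok := r.streams[key]; ok {
		return existing, false // reuse the existing stream
	}
	r.nextID++
	id := "stream-" + strconv.Itoa(r.nextID)
	r.streams[key] = id
	return id, true
}

// ReleaseAddressStream deletes the stream once an order has been created
// from the address (criterion 3). It reports whether a stream was removed.
func (r *StreamRegistry) ReleaseAddressStream(network, address string) bool {
	r.mu.Lock()
	defer r.mu.Unlock()

	key := streamKey{Network: network, Address: address}
	if _, ok := r.streams[key]; !ok {
		return false
	}
	delete(r.streams, key)
	return true
}
```

Calling `EnsureAddressStream` twice for the same network and address returns the same ID and creates only one stream; the same address on a different network is tracked separately.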

Tech Details

type StreamManager interface {
    // Check if stream exists before creating
    GetAddressStream(ctx context.Context, address string, network *ent.Network) (*types.Stream, error)
    CreateAddressStream(ctx context.Context, address string, network *ent.Network) error
    DeleteAddressStream(ctx context.Context, streamID string) error
    
    // Contract event stream management
    GetContractEventStream(ctx context.Context, eventName string, network *ent.Network) (*types.Stream, error)
    CreateContractEventStream(ctx context.Context, eventName string, network *ent.Network) error
    DeleteContractEventStream(ctx context.Context, streamID string) error
}
  • Add stream existence check helper:
func streamExists(streams []types.Stream, address string, network string) bool {
    for _, stream := range streams {
        if stream.Address == address && stream.Network == network {
            return true
        }
    }
    return false
}
  • Implement webhook controller for QuickNode Stream notifications
  • Comment out existing cron tasks in tasks.go:
// Commented out in favor of QuickNode Streams
// _, err = scheduler.Cron("*/1 * * * *").Do(IndexBlockchainEvents)
// _, err = scheduler.Cron("*/1 * * * *").Do(IndexLinkedAddresses)
  • Add stream verification and creation logic in address generation flow:
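// Pseudocode: assumes GetAddressStream returns (nil, nil) when no stream exists
existingStream, err := streamManager.GetAddressStream(ctx, address, network)
if err != nil {
    return fmt.Errorf("failed to look up stream: %w", err)
}
if existingStream == nil {
    if err := streamManager.CreateAddressStream(ctx, address, network); err != nil {
        return fmt.Errorf("failed to create stream: %w", err)
    }
}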
  • Add startup check and stream creation for Gateway contract events:
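// Pseudocode for each supported network; assumes GetContractEventStream
// returns (nil, nil) when no stream exists
for _, event := range []string{"OrderCreated", "OrderSettled", "OrderRefunded"} {
    existingStream, err := streamManager.GetContractEventStream(ctx, event, network)
    if err != nil {
        return fmt.Errorf("failed to look up contract event stream: %w", err)
    }
    if existingStream != nil {
        continue // stream already exists, nothing to create
    }
    if err := streamManager.CreateContractEventStream(ctx, event, network); err != nil {
        return fmt.Errorf("failed to create contract event stream: %w", err)
    }
}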
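
For the webhook controller item above, a rough sketch of what the handler could look like, using only `net/http`. The `StreamEvent` payload shape and the `EventProcessor` callback are assumptions made for illustration: the actual payload depends on how the stream filter is configured, and the callback stands in for the existing Indexer logic. Request authentication is left out.

```go
package main

import (
	"encoding/json"
	"fmt"
	"io"
	"net/http"
)

// StreamEvent is a hypothetical payload item; the real shape depends on
// the stream's filter configuration.
type StreamEvent struct {
	Name    string `json:"name"`
	Network string `json:"network"`
	TxHash  string `json:"txHash"`
}

// EventProcessor is a hypothetical hook standing in for the existing
// Indexer business logic.
type EventProcessor func(ev StreamEvent) error

// relevantEvents lists the Gateway events named in this issue.
var relevantEvents = map[string]bool{
	"OrderCreated":  true,
	"OrderSettled":  true,
	"OrderRefunded": true,
}

// handlePayload decodes a JSON array of events, skips irrelevant ones,
// and returns how many events were processed.
func handlePayload(body []byte, process EventProcessor) (int, error) {
	var events []StreamEvent
	if err := json.Unmarshal(body, &events); err != nil {
		return 0, fmt.Errorf("decode payload: %w", err)
	}
	processed := 0
	for _, ev := range events {
		if !relevantEvents[ev.Name] {
			continue
		}
		if err := process(ev); err != nil {
			return processed, fmt.Errorf("process %s: %w", ev.Name, err)
		}
		processed++
	}
	return processed, nil
}

// NewWebhookHandler wraps handlePayload in an http.Handler. A non-2xx
// status signals failure to the sender.
func NewWebhookHandler(process EventProcessor) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if r.Method != http.MethodPost {
			http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
			return
		}
		// Cap the body at 1 MiB to avoid unbounded reads.
		body, err := io.ReadAll(http.MaxBytesReader(w, r.Body, 1<<20))
		if err != nil {
			http.Error(w, "cannot read body", http.StatusBadRequest)
			return
		}
		if _, err := handlePayload(body, process); err != nil {
			http.Error(w, "cannot process payload", http.StatusInternalServerError)
			return
		}
		w.WriteHeader(http.StatusOK)
	})
}
```

Keeping the decoding and dispatch in `handlePayload` means the logic can be unit tested without spinning up an HTTP server, and the same function could be reused if the project's router is not `net/http`.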

Notes/Assumptions

  • QuickNode Streams have lower latency than cron jobs polling every 2 seconds
  • Stream creation limits and pricing are acceptable for the current scale
  • All supported networks (Base, Polygon, Arbitrum One, BNB Smart Chain), as well as upcoming ones such as Celo, have QuickNode Streams support
  • Existing business logic in Indexer interface can be reused for processing events

Open Questions

  • How should we handle stream creation failures?
  • How should we handle duplicate events in case of webhook retries?
  • Should we implement a fallback mechanism if QuickNode Streams service is down?
  • Should we implement stream cleanup for addresses that haven't received transactions after a certain period?
  • How should we handle the case where a stream exists but is in an invalid state?