Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add query IDs filter #87

Merged
merged 12 commits into from
May 30, 2024
1 change: 1 addition & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ RELAYER_TARGET_CHAIN_DEBUG=true
RELAYER_TARGET_CHAIN_OUTPUT_FORMAT=json

RELAYER_REGISTRY_ADDRESSES=neutron14hj2tavq8fpesdwxxcu44rty3hh90vhujrvcmstl4zr3txmfvw9s5c2epq
RELAYER_REGISTRY_QUERY_IDS=

RELAYER_ALLOW_TX_QUERIES=true
RELAYER_ALLOW_KV_CALLBACKS=true
Expand Down
1 change: 1 addition & 0 deletions .env.example.dev
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ RELAYER_TARGET_CHAIN_OUTPUT_FORMAT=json
RELAYER_TARGET_CHAIN_SIGN_MODE_STR=direct

RELAYER_REGISTRY_ADDRESSES=
RELAYER_REGISTRY_QUERY_IDS=

RELAYER_ALLOW_TX_QUERIES=true
RELAYER_ALLOW_KV_CALLBACKS=true
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ Relayer:
| `RELAYER_TARGET_CHAIN_DEBUG ` | `bool` | flag to run target chain provider in debug mode | optional |
| `RELAYER_TARGET_CHAIN_OUTPUT_FORMAT` | `json` or `yaml` | target chain provider output format | optional |
| `RELAYER_REGISTRY_ADDRESSES` | `string` | a list of comma-separated smart-contract addresses for which the relayer processes interchain queries | required |
| `RELAYER_REGISTRY_QUERY_IDS` | `string` | a list of comma-separated query IDs which complements to `RELAYER_REGISTRY_ADDRESSES` to further filter out interchain queries being processed | optional |
| `RELAYER_ALLOW_TX_QUERIES` | `bool` | if true relayer will process tx queries (if `false`, relayer will drop them) | required |
| `RELAYER_ALLOW_KV_CALLBACKS` | `bool` | if `true`, will pass proofs as sudo callbacks to contracts | required |
| `RELAYER_MIN_KV_UPDATE_PERIOD` | `uint` | minimal period of queries execution and submission (not less than `n` blocks) | optional |
Expand Down
29 changes: 23 additions & 6 deletions internal/registry/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,36 +3,53 @@ package registry
// RegistryConfig represents the config structure for the Registry.
type RegistryConfig struct {
Addresses []string
QueryIDs []uint64 `envconfig:"QUERY_IDS"`
}

// New instantiates a new *Registry based on the cfg.
func New(cfg *RegistryConfig) *Registry {
r := &Registry{
addresses: make(map[string]struct{}, len(cfg.Addresses)),
queryIDs: make(map[uint64]struct{}, len(cfg.QueryIDs)),
}
for _, addr := range cfg.Addresses {
r.addresses[addr] = struct{}{}
}
for _, queryID := range cfg.QueryIDs {
r.queryIDs[queryID] = struct{}{}
}
return r
}

// Registry is the relayer's watch list registry. It contains a list of addresses, and the relayer
// only works with interchain queries that are under these addresses' ownership.
// Registry is the relayer's watch list registry. It contains a list of addresses and a list of queryIDs,
// and the relayer only works with interchain queries that are under these addresses' ownership and match the queryIDs.
type Registry struct {
addresses map[string]struct{}
queryIDs map[uint64]struct{}
}

// IsEmpty returns true if the registry addresses list is empty.
func (r *Registry) IsEmpty() bool {
// IsAddressesEmpty returns true if the registry addresses list is empty.
func (r *Registry) IsAddressesEmpty() bool {
return len(r.addresses) == 0
}

// Contains returns true if the addr is in the registry.
func (r *Registry) Contains(addr string) bool {
// IsQueryIDsEmpty returns true if the registry queryIDs list is empty.
func (r *Registry) IsQueryIDsEmpty() bool {
return len(r.queryIDs) == 0
}

// ContainsAddress returns true if the addr is in the registry.
func (r *Registry) ContainsAddress(addr string) bool {
_, ex := r.addresses[addr]
return ex
}

// ContainsQueryID returns true if the queryID is in the registry.
func (r *Registry) ContainsQueryID(queryID uint64) bool {
_, ex := r.queryIDs[queryID]
return ex
}

func (r *Registry) GetAddresses() []string {
var out []string
for addr := range r.addresses {
Expand Down
15 changes: 13 additions & 2 deletions internal/subscriber/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package subscriber
import (
"context"
"fmt"
"strconv"
"sync"
"time"

Expand Down Expand Up @@ -33,8 +34,8 @@ type SubscriberConfig struct {
ConnectionID string
// WatchedTypes is the list of query types to be observed and handled.
WatchedTypes []neutrontypes.InterchainQueryType
// Registry is a watch list registry. It contains a list of addresses, and the Subscriber only
// works with interchain queries and events that are under these addresses' ownership.
// Registry is a watch list registry. It contains a list of addresses and a list of queryIDs, and the Subscriber only
// works with interchain queries and events that are under ownership of these addresses and match the queryIDs.
Registry *rg.Registry
}

Expand Down Expand Up @@ -193,6 +194,16 @@ func (s *Subscriber) processUpdateEvent(ctx context.Context, event tmtypes.Resul
zap.String("query_id", queryID))
continue
}
queryIDNumber, err := strconv.ParseUint(queryID, 10, 64)
if err != nil {
return fmt.Errorf("failed to parse queryID: %w", err)
}

if !s.isWatchedQueryID(queryIDNumber) {
s.logger.Debug("Skipping query (wrong queryID)", zap.String("owner", owner),
zap.String("query_id", queryID))
continue
}

// Load all information about the neutronQuery directly from Neutron.
neutronQuery, err := s.getNeutronRegisteredQuery(ctx, queryID)
Expand Down
13 changes: 11 additions & 2 deletions internal/subscriber/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func (s *Subscriber) getNeutronRegisteredQuery(ctx context.Context, queryId stri
return neutronQuery, nil
}

// getNeutronRegisteredQueries retrieves the list of registered queries filtered by owner, connection, and query type.
// getNeutronRegisteredQueries retrieves the list of registered queries filtered by owner, connection, query type, and queryID.
func (s *Subscriber) getNeutronRegisteredQueries(ctx context.Context) (map[string]*neutrontypes.RegisteredQuery, error) {
var out = map[string]*neutrontypes.RegisteredQuery{}
var pageKey *strfmt.Base64
Expand Down Expand Up @@ -95,6 +95,9 @@ func (s *Subscriber) getNeutronRegisteredQueries(ctx context.Context) (map[strin
if !s.isWatchedMsgType(neutronQuery.QueryType) {
continue
}
if !s.isWatchedQueryID(neutronQuery.Id) {
continue
}
out[restQuery.ID] = neutronQuery
}
if payload.Pagination != nil && payload.Pagination.NextKey.String() != "" {
Expand Down Expand Up @@ -168,8 +171,14 @@ func (s *Subscriber) isWatchedMsgType(msgType string) bool {
return ex
}

// isWatchedQueryID returns true if the queryID is within the registry watched queryIDs or there
// are no registry watched queryIDs configured for the subscriber meaning all queryIDs are watched.
func (s *Subscriber) isWatchedQueryID(queryID uint64) bool {
return s.registry.IsQueryIDsEmpty() || s.registry.ContainsQueryID(queryID)
}

// isWatchedAddress returns true if the address is within the registry watched addresses or there
// are no registry watched addresses configured for the subscriber meaning all addresses are watched.
func (s *Subscriber) isWatchedAddress(address string) bool {
return s.registry.IsEmpty() || s.registry.Contains(address)
return s.registry.IsAddressesEmpty() || s.registry.ContainsAddress(address)
}
Loading