@@ -3,32 +3,30 @@ package subscriber
3
3
import (
4
4
"context"
5
5
"fmt"
6
+ nlogger "github.com/neutron-org/neutron-logger"
7
+ "github.com/neutron-org/neutron-query-relayer/internal/app"
8
+ "github.com/neutron-org/neutron-query-relayer/internal/config"
9
+ "github.com/neutron-org/neutron-query-relayer/internal/relay"
6
10
"sync"
7
11
"time"
8
12
13
+ "github.com/neutron-org/neutron-query-relayer/internal/registry"
14
+
9
15
instrumenters "github.com/neutron-org/neutron-query-relayer/internal/metrics"
10
16
11
- "github.com/cometbft/cometbft/rpc/client/http"
12
17
tmtypes "github.com/cometbft/cometbft/rpc/core/types"
13
18
"go.uber.org/zap"
14
19
15
20
rg "github.com/neutron-org/neutron-query-relayer/internal/registry"
16
- restclient "github.com/neutron-org/neutron-query-relayer/internal/subscriber/querier/client"
17
21
neutrontypes "github.com/neutron-org/neutron/x/interchainqueries/types"
18
22
)
19
23
20
24
var (
21
25
unsubscribeTimeout = time .Second * 5
22
26
)
23
27
24
- // SubscriberConfig contains configurable fields for the Subscriber.
25
- type SubscriberConfig struct {
26
- // RPCAddress represents the address for RPC calls to the chain.
27
- RPCAddress string
28
- // RESTAddress represents the address for REST calls to the chain.
29
- RESTAddress string
30
- // Timeout defines time limit for requests executed by the Subscriber.
31
- Timeout time.Duration
28
+ // Config contains configurable fields for the Subscriber.
29
+ type Config struct {
32
30
// ConnectionID is the Neutron's side connection ID used to filter out queries.
33
31
ConnectionID string
34
32
// WatchedTypes is the list of query types to be observed and handled.
@@ -38,24 +36,50 @@ type SubscriberConfig struct {
38
36
Registry * rg.Registry
39
37
}
40
38
41
- // NewSubscriber creates a new Subscriber instance ready to subscribe to Neutron events.
42
- func NewSubscriber (
43
- cfg * SubscriberConfig ,
44
- logger * zap.Logger ,
45
- ) (* Subscriber , error ) {
39
+ func NewDefaultSubscriber (cfg config.NeutronQueryRelayerConfig , logRegistry * nlogger.Registry ) (relay.Subscriber , error ) {
40
+ watchedMsgTypes := []neutrontypes.InterchainQueryType {neutrontypes .InterchainQueryTypeKV }
41
+ if cfg .AllowTxQueries {
42
+ watchedMsgTypes = append (watchedMsgTypes , neutrontypes .InterchainQueryTypeTX )
43
+ }
44
+
46
45
// rpcClient is used to subscribe to Neutron events.
47
- rpcClient , err := newRPCClient (cfg .RPCAddress , cfg .Timeout )
46
+ rpcClient , err := NewRPCClient (cfg .NeutronChain . RPCAddr , cfg . NeutronChain .Timeout )
48
47
if err != nil {
49
- return nil , fmt .Errorf ("could not create new tendermint rpcClient: %w" , err )
50
- }
51
- if err = rpcClient .Start (); err != nil {
52
- return nil , fmt .Errorf ("could not start tendermint rpcClient: %w" , err )
48
+ return nil , fmt .Errorf ("could not create new tendermint rpcClient for Subscriber: %w" , err )
53
49
}
54
50
55
51
// restClient is used to retrieve registered queries from Neutron.
56
- restClient , err := newRESTClient (cfg .RESTAddress , cfg .Timeout )
52
+ restClient , err := NewRESTClient (cfg .NeutronChain .RESTAddr , cfg .NeutronChain .Timeout )
53
+ if err != nil {
54
+ return nil , fmt .Errorf ("failed to get NewRESTClient for Subscriber: %w" , err )
55
+ }
56
+
57
+ sub , err := NewSubscriber (
58
+ & Config {
59
+ ConnectionID : cfg .NeutronChain .ConnectionID ,
60
+ WatchedTypes : watchedMsgTypes ,
61
+ Registry : registry .New (cfg .Registry ),
62
+ },
63
+ rpcClient ,
64
+ restClient .Query ,
65
+ logRegistry .Get (app .SubscriberContext ),
66
+ )
57
67
if err != nil {
58
- return nil , fmt .Errorf ("failed to get newRESTClient: %w" , err )
68
+ return nil , fmt .Errorf ("failed to create a NewSubscriber: %s" , err )
69
+ }
70
+
71
+ return sub , nil
72
+ }
73
+
74
+ // NewSubscriber creates a new Subscriber instance ready to subscribe to Neutron events.
75
+ func NewSubscriber (
76
+ cfg * Config ,
77
+ rpcClient RpcHttpClient ,
78
+ restClient RestHttpQuery ,
79
+ logger * zap.Logger ,
80
+ ) (* Subscriber , error ) {
81
+ if err := rpcClient .Start (); err != nil {
82
+ return nil , fmt .Errorf ("could not start tendermint rpcClient: %w" , err )
59
83
}
60
84
61
85
// Contains the types of queries that we are ready to serve (KV / TX).
@@ -65,8 +89,8 @@ func NewSubscriber(
65
89
}
66
90
67
91
return & Subscriber {
68
- rpcClient : rpcClient ,
69
- restClient : restClient ,
92
+ rpcClient : rpcClient ,
93
+ restClientQuery : restClient ,
70
94
71
95
connectionID : cfg .ConnectionID ,
72
96
registry : cfg .Registry ,
@@ -81,13 +105,12 @@ func NewSubscriber(
81
105
// filters them in accordance with the Registry configuration and watchedTypes, and provides a
82
106
// stream of split to KV and TX messages.
83
107
type Subscriber struct {
84
- rpcClient * http.HTTP // Used to subscribe to events
85
- restClient * restclient.HTTPAPIConsole // Used to run Neutron-specific queries using the REST
86
-
87
- connectionID string
88
- registry * rg.Registry
89
- logger * zap.Logger
90
- watchedTypes map [neutrontypes.InterchainQueryType ]struct {}
108
+ rpcClient RpcHttpClient // Used to subscribe to events
109
+ restClientQuery RestHttpQuery // Used to run Neutron-specific queries using the REST
110
+ connectionID string
111
+ registry * rg.Registry
112
+ logger * zap.Logger
113
+ watchedTypes map [neutrontypes.InterchainQueryType ]struct {}
91
114
92
115
activeQueries map [string ]* neutrontypes.RegisteredQuery
93
116
}
@@ -183,10 +206,10 @@ func (s *Subscriber) processUpdateEvent(ctx context.Context, event tmtypes.Resul
183
206
// There can be multiple events of the same type associated with our connection id in a
184
207
// single tmtypes.ResultEvent value. We need to process all of them.
185
208
var events = event .Events
186
- for idx := range events [connectionIdAttr ] {
209
+ for idx := range events [ConnectionIdAttr ] {
187
210
var (
188
- owner = events [ownerAttr ][idx ]
189
- queryID = events [queryIdAttr ][idx ]
211
+ owner = events [OwnerAttr ][idx ]
212
+ queryID = events [QueryIdAttr ][idx ]
190
213
)
191
214
if ! s .isWatchedAddress (owner ) {
192
215
s .logger .Debug ("Skipping query (wrong owner)" , zap .String ("owner" , owner ),
@@ -197,7 +220,8 @@ func (s *Subscriber) processUpdateEvent(ctx context.Context, event tmtypes.Resul
197
220
// Load all information about the neutronQuery directly from Neutron.
198
221
neutronQuery , err := s .getNeutronRegisteredQuery (ctx , queryID )
199
222
if err != nil {
200
- return fmt .Errorf ("failed to getNeutronRegisteredQuery: %w" , err )
223
+ s .logger .Debug ("Skipping query (could not find by id, probably removed)" , zap .String ("queryId" , queryID ))
224
+ continue
201
225
}
202
226
203
227
if ! s .isWatchedMsgType (neutronQuery .QueryType ) {
@@ -228,9 +252,9 @@ func (s *Subscriber) processRemoveEvent(event tmtypes.ResultEvent) error {
228
252
// There can be multiple events of the same type associated with our connection id in a
229
253
// single tmtypes.ResultEvent value. We need to process all of them.
230
254
var events = event .Events
231
- for idx := range events [connectionIdAttr ] {
255
+ for idx := range events [ConnectionIdAttr ] {
232
256
var (
233
- queryID = events [queryIdAttr ][idx ]
257
+ queryID = events [QueryIdAttr ][idx ]
234
258
)
235
259
236
260
// Delete the query from the active queries list.
0 commit comments