diff --git a/access/grpc/client.go b/access/grpc/client.go index 1ba647dd1..946d4189e 100644 --- a/access/grpc/client.go +++ b/access/grpc/client.go @@ -49,8 +49,9 @@ const PreviewnetHost = "access.previewnet.nodes.onflow.org:9000" type ClientOption func(*options) type options struct { - dialOptions []grpc.DialOption - jsonOptions []jsoncdc.Option + dialOptions []grpc.DialOption + jsonOptions []jsoncdc.Option + eventEncoding flow.EventEncodingVersion } func DefaultClientOptions() *options { @@ -61,6 +62,7 @@ func DefaultClientOptions() *options { jsonOptions: []jsoncdc.Option{ jsoncdc.WithAllowUnstructuredStaticTypes(true), }, + eventEncoding: flow.EventEncodingVersionCCF, } } @@ -78,6 +80,13 @@ func WithJSONOptions(jsonOpts ...jsoncdc.Option) ClientOption { } } +// WithEventEncoding sets the default event encoding to use when requesting events from the API +func WithEventEncoding(version flow.EventEncodingVersion) ClientOption { + return func(opts *options) { + opts.eventEncoding = version + } +} + // NewClient creates an gRPC client exposing all the common access APIs. // Client will use provided host for connection. func NewClient(host string, opts ...ClientOption) (*Client, error) { @@ -92,6 +101,7 @@ func NewClient(host string, opts ...ClientOption) (*Client, error) { } client.SetJSONOptions(cfg.jsonOptions) + client.SetEventEncoding(cfg.eventEncoding) return &Client{grpc: client}, nil } @@ -103,6 +113,11 @@ type Client struct { grpc *BaseClient } +// RPCClient returns the underlying gRPC client. +func (c *Client) RPCClient() RPCClient { + return c.grpc.RPCClient() +} + func (c *Client) Ping(ctx context.Context) error { return c.grpc.Ping(ctx) } diff --git a/access/grpc/grpc.go b/access/grpc/grpc.go index 13b85c801..7b0093daf 100644 --- a/access/grpc/grpc.go +++ b/access/grpc/grpc.go @@ -31,7 +31,6 @@ import ( "github.com/onflow/cadence" "github.com/onflow/cadence/encoding/json" "github.com/onflow/flow/protobuf/go/flow/access" - "github.com/onflow/flow/protobuf/go/flow/entities" "github.com/onflow/flow/protobuf/go/flow/executiondata" "github.com/onflow/flow-go-sdk" @@ -81,6 +80,7 @@ type BaseClient struct { executionDataClient ExecutionDataRPCClient close func() error jsonOptions []json.Option + eventEncoding flow.EventEncodingVersion } // NewBaseClient creates a new gRPC handler for network communication. @@ -99,14 +99,16 @@ func NewBaseClient(url string, opts ...grpc.DialOption) (*BaseClient, error) { executionDataClient: execDataClient, close: func() error { return conn.Close() }, jsonOptions: []json.Option{json.WithAllowUnstructuredStaticTypes(true)}, + eventEncoding: flow.EventEncodingVersionCCF, }, nil } // NewFromRPCClient initializes a Flow client using a pre-configured gRPC provider. func NewFromRPCClient(rpcClient RPCClient) *BaseClient { return &BaseClient{ - rpcClient: rpcClient, - close: func() error { return nil }, + rpcClient: rpcClient, + close: func() error { return nil }, + eventEncoding: flow.EventEncodingVersionCCF, } } @@ -115,6 +117,7 @@ func NewFromExecutionDataRPCClient(rpcClient ExecutionDataRPCClient) *BaseClient return &BaseClient{ executionDataClient: rpcClient, close: func() error { return nil }, + eventEncoding: flow.EventEncodingVersionCCF, } } @@ -122,6 +125,14 @@ func (c *BaseClient) SetJSONOptions(options []json.Option) { c.jsonOptions = options } +func (c *BaseClient) SetEventEncoding(version flow.EventEncodingVersion) { + c.eventEncoding = version +} + +func (c *BaseClient) RPCClient() RPCClient { + return c.rpcClient +} + // Close closes the client connection. func (c *BaseClient) Close() error { return c.close() @@ -380,7 +391,7 @@ func (c *BaseClient) GetTransactionResult( ) (*flow.TransactionResult, error) { req := &access.GetTransactionRequest{ Id: txID.Bytes(), - EventEncodingVersion: entities.EventEncodingVersion_CCF_V0, + EventEncodingVersion: c.eventEncoding, } res, err := c.rpcClient.GetTransactionResult(ctx, req, opts...) @@ -406,7 +417,7 @@ func (c *BaseClient) GetTransactionResultByIndex( req := &access.GetTransactionByIndexRequest{ BlockId: blockID.Bytes(), Index: index, - EventEncodingVersion: entities.EventEncodingVersion_CCF_V0, + EventEncodingVersion: c.eventEncoding, } res, err := c.rpcClient.GetTransactionResultByIndex(ctx, req, opts...) @@ -429,7 +440,7 @@ func (c *BaseClient) GetTransactionResultsByBlockID( req := &access.GetTransactionsByBlockIDRequest{ BlockId: blockID.Bytes(), - EventEncodingVersion: entities.EventEncodingVersion_CCF_V0, + EventEncodingVersion: c.eventEncoding, } res, err := c.rpcClient.GetTransactionResultsByBlockID(ctx, req, opts...) @@ -607,7 +618,7 @@ func (c *BaseClient) GetEventsForHeightRange( Type: query.Type, StartHeight: query.StartHeight, EndHeight: query.EndHeight, - EventEncodingVersion: entities.EventEncodingVersion_CCF_V0, + EventEncodingVersion: c.eventEncoding, } res, err := c.rpcClient.GetEventsForHeightRange(ctx, req, opts...) @@ -627,7 +638,7 @@ func (c *BaseClient) GetEventsForBlockIDs( req := &access.GetEventsForBlockIDsRequest{ Type: eventType, BlockIds: identifiersToMessages(blockIDs), - EventEncodingVersion: entities.EventEncodingVersion_CCF_V0, + EventEncodingVersion: c.eventEncoding, } res, err := c.rpcClient.GetEventsForBlockIDs(ctx, req, opts...) @@ -724,7 +735,7 @@ func (c *BaseClient) GetExecutionDataByBlockID( ed, err := c.executionDataClient.GetExecutionDataByBlockID(ctx, &executiondata.GetExecutionDataByBlockIDRequest{ BlockId: identifierToMessage(blockID), - EventEncodingVersion: entities.EventEncodingVersion_CCF_V0, + EventEncodingVersion: c.eventEncoding, }, opts...) if err != nil { return nil, newRPCError(err) @@ -741,7 +752,7 @@ func (c *BaseClient) SubscribeExecutionDataByBlockID( ) (<-chan flow.ExecutionDataStreamResponse, <-chan error, error) { req := executiondata.SubscribeExecutionDataRequest{ StartBlockId: startBlockID[:], - EventEncodingVersion: entities.EventEncodingVersion_CCF_V0, + EventEncodingVersion: c.eventEncoding, } return c.subscribeExecutionData(ctx, &req, opts...) } @@ -753,7 +764,7 @@ func (c *BaseClient) SubscribeExecutionDataByBlockHeight( ) (<-chan flow.ExecutionDataStreamResponse, <-chan error, error) { req := executiondata.SubscribeExecutionDataRequest{ StartBlockHeight: startHeight, - EventEncodingVersion: entities.EventEncodingVersion_CCF_V0, + EventEncodingVersion: c.eventEncoding, } return c.subscribeExecutionData(ctx, &req, opts...) } @@ -824,7 +835,7 @@ func (c *BaseClient) SubscribeEventsByBlockID( ) (<-chan flow.BlockEvents, <-chan error, error) { req := executiondata.SubscribeEventsRequest{ StartBlockId: startBlockID[:], - EventEncodingVersion: entities.EventEncodingVersion_CCF_V0, + EventEncodingVersion: c.eventEncoding, } return c.subscribeEvents(ctx, &req, filter, opts...) } @@ -837,7 +848,7 @@ func (c *BaseClient) SubscribeEventsByBlockHeight( ) (<-chan flow.BlockEvents, <-chan error, error) { req := executiondata.SubscribeEventsRequest{ StartBlockHeight: startHeight, - EventEncodingVersion: entities.EventEncodingVersion_CCF_V0, + EventEncodingVersion: c.eventEncoding, } return c.subscribeEvents(ctx, &req, filter, opts...) } diff --git a/access/grpc/grpc_test.go b/access/grpc/grpc_test.go index 15244153e..9c01dca97 100644 --- a/access/grpc/grpc_test.go +++ b/access/grpc/grpc_test.go @@ -25,6 +25,7 @@ import ( "sync" "testing" + "github.com/onflow/cadence/encoding/ccf" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" @@ -927,7 +928,8 @@ func TestClient_GetEventsForHeightRange(t *testing.T) { func TestClient_GetEventsForBlockIDs(t *testing.T) { ids := test.IdentifierGenerator() - events := test.EventGenerator() + ccfEvents := test.EventGenerator().WithEncoding(flow.EventEncodingVersionCCF) + jsonEvents := test.EventGenerator().WithEncoding(flow.EventEncodingVersionJSONCDC) t.Run( "Empty result", @@ -948,10 +950,85 @@ func TestClient_GetEventsForBlockIDs(t *testing.T) { ) t.Run( - "Non-empty result", + "Non-empty result with ccf encoding", clientTest(func(t *testing.T, ctx context.Context, rpc *mocks.MockRPCClient, c *BaseClient) { blockIDA, blockIDB := ids.New(), ids.New() - eventA, eventB, eventC, eventD := events.New(), events.New(), events.New(), events.New() + eventA, eventB, eventC, eventD := ccfEvents.New(), ccfEvents.New(), ccfEvents.New(), ccfEvents.New() + + eventAMsg, _ := eventToMessage(eventA) + eventBMsg, _ := eventToMessage(eventB) + eventCMsg, _ := eventToMessage(eventC) + eventDMsg, _ := eventToMessage(eventD) + + var err error + eventAMsg.Payload, err = ccf.Encode(eventA.Value) + require.NoError(t, err) + + eventBMsg.Payload, err = ccf.Encode(eventB.Value) + require.NoError(t, err) + + eventCMsg.Payload, err = ccf.Encode(eventC.Value) + require.NoError(t, err) + + eventDMsg.Payload, err = ccf.Encode(eventD.Value) + require.NoError(t, err) + + response := &access.EventsResponse{ + Results: []*access.EventsResponse_Result{ + { + BlockId: blockIDA.Bytes(), + BlockHeight: 1, + BlockTimestamp: timestamppb.Now(), + Events: []*entities.Event{ + eventAMsg, + eventBMsg, + }, + }, + { + BlockId: blockIDB.Bytes(), + BlockHeight: 2, + BlockTimestamp: timestamppb.Now(), + Events: []*entities.Event{ + eventCMsg, + eventDMsg, + }, + }, + }, + } + + rpc.On("GetEventsForBlockIDs", ctx, mock.Anything).Return(response, nil) + + blocks, err := c.GetEventsForBlockIDs(ctx, "foo", []flow.Identifier{blockIDA, blockIDB}) + require.NoError(t, err) + + // Force evaluation of type ID, which is cached in type. + // Necessary for equality checks below + for _, block := range blocks { + for _, event := range block.Events { + _ = event.Value.Type().ID() + } + } + + assert.Len(t, blocks, len(response.Results)) + + assert.Equal(t, response.Results[0].BlockId, blocks[0].BlockID.Bytes()) + assert.Equal(t, response.Results[0].BlockHeight, blocks[0].Height) + + assert.Equal(t, response.Results[1].BlockId, blocks[1].BlockID.Bytes()) + assert.Equal(t, response.Results[1].BlockHeight, blocks[1].Height) + + assert.Equal(t, eventA, blocks[0].Events[0]) + assert.Equal(t, eventB, blocks[0].Events[1]) + assert.Equal(t, eventC, blocks[1].Events[0]) + assert.Equal(t, eventD, blocks[1].Events[1]) + }), + ) + + t.Run( + "Non-empty result with json encoding", + clientTest(func(t *testing.T, ctx context.Context, rpc *mocks.MockRPCClient, c *BaseClient) { + blockIDA, blockIDB := ids.New(), ids.New() + eventA, eventB, eventC, eventD := jsonEvents.New(), jsonEvents.New(), jsonEvents.New(), jsonEvents.New() eventAMsg, _ := eventToMessage(eventA) eventBMsg, _ := eventToMessage(eventB) @@ -983,6 +1060,7 @@ func TestClient_GetEventsForBlockIDs(t *testing.T) { rpc.On("GetEventsForBlockIDs", ctx, mock.Anything).Return(response, nil) + c.SetEventEncoding(flow.EventEncodingVersionJSONCDC) blocks, err := c.GetEventsForBlockIDs(ctx, "foo", []flow.Identifier{blockIDA, blockIDB}) require.NoError(t, err) diff --git a/decode.go b/decode.go index f7c8f51c6..68ad97d6f 100644 --- a/decode.go +++ b/decode.go @@ -23,4 +23,12 @@ import ( // as it registers the type ID decoder for the Flow types, // e.g. `flow.AccountCreated` _ "github.com/onflow/cadence/runtime/stdlib" + "github.com/onflow/flow/protobuf/go/flow/entities" +) + +type EventEncodingVersion = entities.EventEncodingVersion + +const ( + EventEncodingVersionCCF = entities.EventEncodingVersion_CCF_V0 + EventEncodingVersionJSONCDC = entities.EventEncodingVersion_JSON_CDC_V0 ) diff --git a/test/entities.go b/test/entities.go index cd696f226..558744831 100644 --- a/test/entities.go +++ b/test/entities.go @@ -255,9 +255,9 @@ func EventGenerator() *Events { func (g *Events) WithEncoding(encoding entities.EventEncodingVersion) *Events { switch encoding { - case entities.EventEncodingVersion_CCF_V0: + case flow.EventEncodingVersionCCF: g.encoding = encoding - case entities.EventEncodingVersion_JSON_CDC_V0: + case flow.EventEncodingVersionJSONCDC: g.encoding = encoding default: panic(fmt.Errorf("unsupported event encoding: %v", encoding)) @@ -297,7 +297,7 @@ func (g *Events) New() flow.Event { var payload []byte var err error - if g.encoding == entities.EventEncodingVersion_CCF_V0 { + if g.encoding == flow.EventEncodingVersionCCF { payload, err = ccf.Encode(testEvent) } else { payload, err = jsoncdc.Encode(testEvent) @@ -490,7 +490,7 @@ func ChunkExecutionDataGenerator() *ChunkExecutionDatas { return &ChunkExecutionDatas{ ids: IdentifierGenerator(), txs: TransactionGenerator(), - events: EventGenerator().WithEncoding(entities.EventEncodingVersion_CCF_V0), + events: EventGenerator().WithEncoding(flow.EventEncodingVersionCCF), trieUpdates: TrieUpdateGenerator(), results: LightTransactionResultGenerator(), }