Skip to content

Commit

Permalink
Merge pull request #673 from onflow/petera/make-eventencoding-configu…
Browse files Browse the repository at this point in the history
…rable

Make event encoding configurable over grpc
  • Loading branch information
peterargue authored May 30, 2024
2 parents a2a93c1 + 11aba65 commit bb9a5c3
Show file tree
Hide file tree
Showing 5 changed files with 134 additions and 22 deletions.
19 changes: 17 additions & 2 deletions access/grpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,9 @@ const PreviewnetHost = "access.previewnet.nodes.onflow.org:9000"
type ClientOption func(*options)

type options struct {
dialOptions []grpc.DialOption
jsonOptions []jsoncdc.Option
dialOptions []grpc.DialOption
jsonOptions []jsoncdc.Option
eventEncoding flow.EventEncodingVersion
}

func DefaultClientOptions() *options {
Expand All @@ -61,6 +62,7 @@ func DefaultClientOptions() *options {
jsonOptions: []jsoncdc.Option{
jsoncdc.WithAllowUnstructuredStaticTypes(true),
},
eventEncoding: flow.EventEncodingVersionCCF,
}
}

Expand All @@ -78,6 +80,13 @@ func WithJSONOptions(jsonOpts ...jsoncdc.Option) ClientOption {
}
}

// WithEventEncoding sets the default event encoding to use when requesting events from the API
func WithEventEncoding(version flow.EventEncodingVersion) ClientOption {
return func(opts *options) {
opts.eventEncoding = version
}
}

// NewClient creates an gRPC client exposing all the common access APIs.
// Client will use provided host for connection.
func NewClient(host string, opts ...ClientOption) (*Client, error) {
Expand All @@ -92,6 +101,7 @@ func NewClient(host string, opts ...ClientOption) (*Client, error) {
}

client.SetJSONOptions(cfg.jsonOptions)
client.SetEventEncoding(cfg.eventEncoding)

return &Client{grpc: client}, nil
}
Expand All @@ -103,6 +113,11 @@ type Client struct {
grpc *BaseClient
}

// RPCClient returns the underlying gRPC client.
func (c *Client) RPCClient() RPCClient {
return c.grpc.RPCClient()
}

func (c *Client) Ping(ctx context.Context) error {
return c.grpc.Ping(ctx)
}
Expand Down
37 changes: 24 additions & 13 deletions access/grpc/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import (
"github.com/onflow/cadence"
"github.com/onflow/cadence/encoding/json"
"github.com/onflow/flow/protobuf/go/flow/access"
"github.com/onflow/flow/protobuf/go/flow/entities"
"github.com/onflow/flow/protobuf/go/flow/executiondata"

"github.com/onflow/flow-go-sdk"
Expand Down Expand Up @@ -81,6 +80,7 @@ type BaseClient struct {
executionDataClient ExecutionDataRPCClient
close func() error
jsonOptions []json.Option
eventEncoding flow.EventEncodingVersion
}

// NewBaseClient creates a new gRPC handler for network communication.
Expand All @@ -99,14 +99,16 @@ func NewBaseClient(url string, opts ...grpc.DialOption) (*BaseClient, error) {
executionDataClient: execDataClient,
close: func() error { return conn.Close() },
jsonOptions: []json.Option{json.WithAllowUnstructuredStaticTypes(true)},
eventEncoding: flow.EventEncodingVersionCCF,
}, nil
}

// NewFromRPCClient initializes a Flow client using a pre-configured gRPC provider.
func NewFromRPCClient(rpcClient RPCClient) *BaseClient {
return &BaseClient{
rpcClient: rpcClient,
close: func() error { return nil },
rpcClient: rpcClient,
close: func() error { return nil },
eventEncoding: flow.EventEncodingVersionCCF,
}
}

Expand All @@ -115,13 +117,22 @@ func NewFromExecutionDataRPCClient(rpcClient ExecutionDataRPCClient) *BaseClient
return &BaseClient{
executionDataClient: rpcClient,
close: func() error { return nil },
eventEncoding: flow.EventEncodingVersionCCF,
}
}

func (c *BaseClient) SetJSONOptions(options []json.Option) {
c.jsonOptions = options
}

func (c *BaseClient) SetEventEncoding(version flow.EventEncodingVersion) {
c.eventEncoding = version
}

func (c *BaseClient) RPCClient() RPCClient {
return c.rpcClient
}

// Close closes the client connection.
func (c *BaseClient) Close() error {
return c.close()
Expand Down Expand Up @@ -380,7 +391,7 @@ func (c *BaseClient) GetTransactionResult(
) (*flow.TransactionResult, error) {
req := &access.GetTransactionRequest{
Id: txID.Bytes(),
EventEncodingVersion: entities.EventEncodingVersion_CCF_V0,
EventEncodingVersion: c.eventEncoding,
}

res, err := c.rpcClient.GetTransactionResult(ctx, req, opts...)
Expand All @@ -406,7 +417,7 @@ func (c *BaseClient) GetTransactionResultByIndex(
req := &access.GetTransactionByIndexRequest{
BlockId: blockID.Bytes(),
Index: index,
EventEncodingVersion: entities.EventEncodingVersion_CCF_V0,
EventEncodingVersion: c.eventEncoding,
}

res, err := c.rpcClient.GetTransactionResultByIndex(ctx, req, opts...)
Expand All @@ -429,7 +440,7 @@ func (c *BaseClient) GetTransactionResultsByBlockID(

req := &access.GetTransactionsByBlockIDRequest{
BlockId: blockID.Bytes(),
EventEncodingVersion: entities.EventEncodingVersion_CCF_V0,
EventEncodingVersion: c.eventEncoding,
}

res, err := c.rpcClient.GetTransactionResultsByBlockID(ctx, req, opts...)
Expand Down Expand Up @@ -607,7 +618,7 @@ func (c *BaseClient) GetEventsForHeightRange(
Type: query.Type,
StartHeight: query.StartHeight,
EndHeight: query.EndHeight,
EventEncodingVersion: entities.EventEncodingVersion_CCF_V0,
EventEncodingVersion: c.eventEncoding,
}

res, err := c.rpcClient.GetEventsForHeightRange(ctx, req, opts...)
Expand All @@ -627,7 +638,7 @@ func (c *BaseClient) GetEventsForBlockIDs(
req := &access.GetEventsForBlockIDsRequest{
Type: eventType,
BlockIds: identifiersToMessages(blockIDs),
EventEncodingVersion: entities.EventEncodingVersion_CCF_V0,
EventEncodingVersion: c.eventEncoding,
}

res, err := c.rpcClient.GetEventsForBlockIDs(ctx, req, opts...)
Expand Down Expand Up @@ -724,7 +735,7 @@ func (c *BaseClient) GetExecutionDataByBlockID(

ed, err := c.executionDataClient.GetExecutionDataByBlockID(ctx, &executiondata.GetExecutionDataByBlockIDRequest{
BlockId: identifierToMessage(blockID),
EventEncodingVersion: entities.EventEncodingVersion_CCF_V0,
EventEncodingVersion: c.eventEncoding,
}, opts...)
if err != nil {
return nil, newRPCError(err)
Expand All @@ -741,7 +752,7 @@ func (c *BaseClient) SubscribeExecutionDataByBlockID(
) (<-chan flow.ExecutionDataStreamResponse, <-chan error, error) {
req := executiondata.SubscribeExecutionDataRequest{
StartBlockId: startBlockID[:],
EventEncodingVersion: entities.EventEncodingVersion_CCF_V0,
EventEncodingVersion: c.eventEncoding,
}
return c.subscribeExecutionData(ctx, &req, opts...)
}
Expand All @@ -753,7 +764,7 @@ func (c *BaseClient) SubscribeExecutionDataByBlockHeight(
) (<-chan flow.ExecutionDataStreamResponse, <-chan error, error) {
req := executiondata.SubscribeExecutionDataRequest{
StartBlockHeight: startHeight,
EventEncodingVersion: entities.EventEncodingVersion_CCF_V0,
EventEncodingVersion: c.eventEncoding,
}
return c.subscribeExecutionData(ctx, &req, opts...)
}
Expand Down Expand Up @@ -824,7 +835,7 @@ func (c *BaseClient) SubscribeEventsByBlockID(
) (<-chan flow.BlockEvents, <-chan error, error) {
req := executiondata.SubscribeEventsRequest{
StartBlockId: startBlockID[:],
EventEncodingVersion: entities.EventEncodingVersion_CCF_V0,
EventEncodingVersion: c.eventEncoding,
}
return c.subscribeEvents(ctx, &req, filter, opts...)
}
Expand All @@ -837,7 +848,7 @@ func (c *BaseClient) SubscribeEventsByBlockHeight(
) (<-chan flow.BlockEvents, <-chan error, error) {
req := executiondata.SubscribeEventsRequest{
StartBlockHeight: startHeight,
EventEncodingVersion: entities.EventEncodingVersion_CCF_V0,
EventEncodingVersion: c.eventEncoding,
}
return c.subscribeEvents(ctx, &req, filter, opts...)
}
Expand Down
84 changes: 81 additions & 3 deletions access/grpc/grpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"sync"
"testing"

"github.com/onflow/cadence/encoding/ccf"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -927,7 +928,8 @@ func TestClient_GetEventsForHeightRange(t *testing.T) {

func TestClient_GetEventsForBlockIDs(t *testing.T) {
ids := test.IdentifierGenerator()
events := test.EventGenerator()
ccfEvents := test.EventGenerator().WithEncoding(flow.EventEncodingVersionCCF)
jsonEvents := test.EventGenerator().WithEncoding(flow.EventEncodingVersionJSONCDC)

t.Run(
"Empty result",
Expand All @@ -948,10 +950,85 @@ func TestClient_GetEventsForBlockIDs(t *testing.T) {
)

t.Run(
"Non-empty result",
"Non-empty result with ccf encoding",
clientTest(func(t *testing.T, ctx context.Context, rpc *mocks.MockRPCClient, c *BaseClient) {
blockIDA, blockIDB := ids.New(), ids.New()
eventA, eventB, eventC, eventD := events.New(), events.New(), events.New(), events.New()
eventA, eventB, eventC, eventD := ccfEvents.New(), ccfEvents.New(), ccfEvents.New(), ccfEvents.New()

eventAMsg, _ := eventToMessage(eventA)
eventBMsg, _ := eventToMessage(eventB)
eventCMsg, _ := eventToMessage(eventC)
eventDMsg, _ := eventToMessage(eventD)

var err error
eventAMsg.Payload, err = ccf.Encode(eventA.Value)
require.NoError(t, err)

eventBMsg.Payload, err = ccf.Encode(eventB.Value)
require.NoError(t, err)

eventCMsg.Payload, err = ccf.Encode(eventC.Value)
require.NoError(t, err)

eventDMsg.Payload, err = ccf.Encode(eventD.Value)
require.NoError(t, err)

response := &access.EventsResponse{
Results: []*access.EventsResponse_Result{
{
BlockId: blockIDA.Bytes(),
BlockHeight: 1,
BlockTimestamp: timestamppb.Now(),
Events: []*entities.Event{
eventAMsg,
eventBMsg,
},
},
{
BlockId: blockIDB.Bytes(),
BlockHeight: 2,
BlockTimestamp: timestamppb.Now(),
Events: []*entities.Event{
eventCMsg,
eventDMsg,
},
},
},
}

rpc.On("GetEventsForBlockIDs", ctx, mock.Anything).Return(response, nil)

blocks, err := c.GetEventsForBlockIDs(ctx, "foo", []flow.Identifier{blockIDA, blockIDB})
require.NoError(t, err)

// Force evaluation of type ID, which is cached in type.
// Necessary for equality checks below
for _, block := range blocks {
for _, event := range block.Events {
_ = event.Value.Type().ID()
}
}

assert.Len(t, blocks, len(response.Results))

assert.Equal(t, response.Results[0].BlockId, blocks[0].BlockID.Bytes())
assert.Equal(t, response.Results[0].BlockHeight, blocks[0].Height)

assert.Equal(t, response.Results[1].BlockId, blocks[1].BlockID.Bytes())
assert.Equal(t, response.Results[1].BlockHeight, blocks[1].Height)

assert.Equal(t, eventA, blocks[0].Events[0])
assert.Equal(t, eventB, blocks[0].Events[1])
assert.Equal(t, eventC, blocks[1].Events[0])
assert.Equal(t, eventD, blocks[1].Events[1])
}),
)

t.Run(
"Non-empty result with json encoding",
clientTest(func(t *testing.T, ctx context.Context, rpc *mocks.MockRPCClient, c *BaseClient) {
blockIDA, blockIDB := ids.New(), ids.New()
eventA, eventB, eventC, eventD := jsonEvents.New(), jsonEvents.New(), jsonEvents.New(), jsonEvents.New()

eventAMsg, _ := eventToMessage(eventA)
eventBMsg, _ := eventToMessage(eventB)
Expand Down Expand Up @@ -983,6 +1060,7 @@ func TestClient_GetEventsForBlockIDs(t *testing.T) {

rpc.On("GetEventsForBlockIDs", ctx, mock.Anything).Return(response, nil)

c.SetEventEncoding(flow.EventEncodingVersionJSONCDC)
blocks, err := c.GetEventsForBlockIDs(ctx, "foo", []flow.Identifier{blockIDA, blockIDB})
require.NoError(t, err)

Expand Down
8 changes: 8 additions & 0 deletions decode.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,12 @@ import (
// as it registers the type ID decoder for the Flow types,
// e.g. `flow.AccountCreated`
_ "github.com/onflow/cadence/runtime/stdlib"
"github.com/onflow/flow/protobuf/go/flow/entities"
)

type EventEncodingVersion = entities.EventEncodingVersion

const (
EventEncodingVersionCCF = entities.EventEncodingVersion_CCF_V0
EventEncodingVersionJSONCDC = entities.EventEncodingVersion_JSON_CDC_V0
)
8 changes: 4 additions & 4 deletions test/entities.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,9 +255,9 @@ func EventGenerator() *Events {

func (g *Events) WithEncoding(encoding entities.EventEncodingVersion) *Events {
switch encoding {
case entities.EventEncodingVersion_CCF_V0:
case flow.EventEncodingVersionCCF:
g.encoding = encoding
case entities.EventEncodingVersion_JSON_CDC_V0:
case flow.EventEncodingVersionJSONCDC:
g.encoding = encoding
default:
panic(fmt.Errorf("unsupported event encoding: %v", encoding))
Expand Down Expand Up @@ -297,7 +297,7 @@ func (g *Events) New() flow.Event {

var payload []byte
var err error
if g.encoding == entities.EventEncodingVersion_CCF_V0 {
if g.encoding == flow.EventEncodingVersionCCF {
payload, err = ccf.Encode(testEvent)
} else {
payload, err = jsoncdc.Encode(testEvent)
Expand Down Expand Up @@ -490,7 +490,7 @@ func ChunkExecutionDataGenerator() *ChunkExecutionDatas {
return &ChunkExecutionDatas{
ids: IdentifierGenerator(),
txs: TransactionGenerator(),
events: EventGenerator().WithEncoding(entities.EventEncodingVersion_CCF_V0),
events: EventGenerator().WithEncoding(flow.EventEncodingVersionCCF),
trieUpdates: TrieUpdateGenerator(),
results: LightTransactionResultGenerator(),
}
Expand Down

0 comments on commit bb9a5c3

Please sign in to comment.