Skip to content

Commit

Permalink
[2/N][multi quorum ejection] Fetch operator quorum opt-in/out info (L…
Browse files Browse the repository at this point in the history
  • Loading branch information
jianoaix authored Mar 5, 2024
1 parent ecc092f commit 8b71e69
Show file tree
Hide file tree
Showing 5 changed files with 260 additions and 0 deletions.
36 changes: 36 additions & 0 deletions disperser/dataapi/subgraph/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ type (
QueryDeregisteredOperatorsGreaterThanBlockTimestamp(ctx context.Context, blockTimestamp uint64) ([]*Operator, error)
QueryRegisteredOperatorsGreaterThanBlockTimestamp(ctx context.Context, blockTimestamp uint64) ([]*Operator, error)
QueryOperatorInfoByOperatorIdAtBlockNumber(ctx context.Context, operatorId core.OperatorID, blockNumber uint32) (*IndexedOperatorInfo, error)
QueryOperatorAddedToQuorum(ctx context.Context, startBlock, endBlock uint32) ([]*OperatorQuorum, error)
QueryOperatorRemovedFromQuorum(ctx context.Context, startBlock, endBlock uint32) ([]*OperatorQuorum, error)
}

api struct {
Expand Down Expand Up @@ -207,3 +209,37 @@ func (a *api) QueryOperatorInfoByOperatorIdAtBlockNumber(ctx context.Context, op

return &query.Operator, nil
}

// QueryOperatorAddedToQuorum finds operators' quorum opt-in history in range [startBlock, endBlock].
func (a *api) QueryOperatorAddedToQuorum(ctx context.Context, startBlock, endBlock uint32) ([]*OperatorQuorum, error) {
if startBlock > endBlock {
return nil, fmt.Errorf("startBlock must be no less than endBlock, startBlock: %d, endBlock: %d", startBlock, endBlock)
}
variables := map[string]any{
"blockNumber_gt": graphql.Int(startBlock - 1),
"blockNumber_lt": graphql.Int(endBlock + 1),
}
query := new(queryOperatorAddedToQuorum)
err := a.operatorStateGql.Query(ctx, &query, variables)
if err != nil {
return nil, err
}
return query.OperatorAddedToQuorum, nil
}

// QueryOperatorRemovedFromQuorum finds operators' quorum opt-out history in range [startBlock, endBlock].
func (a *api) QueryOperatorRemovedFromQuorum(ctx context.Context, startBlock, endBlock uint32) ([]*OperatorQuorum, error) {
if startBlock > endBlock {
return nil, fmt.Errorf("startBlock must be no less than endBlock, startBlock: %d, endBlock: %d", startBlock, endBlock)
}
variables := map[string]any{
"blockNumber_gt": graphql.Int(startBlock - 1),
"blockNumber_lt": graphql.Int(endBlock + 1),
}
query := new(queryOperatorRemovedFromQuorum)
err := a.operatorStateGql.Query(ctx, &query, variables)
if err != nil {
return nil, err
}
return query.OperatorRemovedFromQuorum, nil
}
22 changes: 22 additions & 0 deletions disperser/dataapi/subgraph/mock/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,3 +125,25 @@ func (m *MockSubgraphApi) QueryOperatorInfoByOperatorIdAtBlockNumber(ctx context

return value, args.Error(1)
}

func (m *MockSubgraphApi) QueryOperatorAddedToQuorum(ctx context.Context, startBlock, endBlock uint32) ([]*subgraph.OperatorQuorum, error) {
args := m.Called()

var value []*subgraph.OperatorQuorum
if args.Get(0) != nil {
value = args.Get(0).([]*subgraph.OperatorQuorum)
}

return value, args.Error(1)
}

func (m *MockSubgraphApi) QueryOperatorRemovedFromQuorum(ctx context.Context, startBlock, endBlock uint32) ([]*subgraph.OperatorQuorum, error) {
args := m.Called()

var value []*subgraph.OperatorQuorum
if args.Get(0) != nil {
value = args.Get(0).([]*subgraph.OperatorQuorum)
}

return value, args.Error(1)
}
12 changes: 12 additions & 0 deletions disperser/dataapi/subgraph/queries.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,12 @@ type (
BlockNumber graphql.String
TransactionHash graphql.String
}
OperatorQuorum struct {
Id graphql.String
Operator graphql.String
QuorumNumbers graphql.String
BlockNumber graphql.String
}
BatchNonSigningOperatorIds struct {
NonSigning struct {
NonSigners []struct {
Expand Down Expand Up @@ -95,4 +101,10 @@ type (
queryOperatorById struct {
Operator IndexedOperatorInfo `graphql:"operator(id: $id)"`
}
queryOperatorAddedToQuorum struct {
OperatorAddedToQuorum []*OperatorQuorum `graphql:"operatorAddedToQuorum(orderBy: blockTimestamp, where: {and [{blockNumber_gt: $blockNumber_gt}, {blockNumber_lt: $blockNumber_lt}]})"`
}
queryOperatorRemovedFromQuorum struct {
OperatorRemovedFromQuorum []*OperatorQuorum `graphql:"operatorRemovedFromQuorum(orderBy: blockTimestamp, where: {and [{blockNumber_gt: $blockNumber_gt}, {blockNumber_lt: $blockNumber_lt}]})"`
}
)
97 changes: 97 additions & 0 deletions disperser/dataapi/subgraph_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,19 @@ type (
BlockNumber uint64
TransactionHash []byte
}
OperatorQuorum struct {
Operator string
QuorumNumbers []byte
BlockNumber uint32
}
OperatorQuorumEvents struct {
// AddedToQuorum is mapping from operator address to a list of sorted events
// (ascending by BlockNumber) where the operator was added to quorums.
AddedToQuorum map[string][]*OperatorQuorum
// RemovedFromQuorum is mapping from operator address to a list of sorted events
// (ascending by BlockNumber) where the operator was removed from quorums.
RemovedFromQuorum map[string][]*OperatorQuorum
}
DeregisteredOperatorInfo struct {
IndexedOperatorInfo *core.IndexedOperatorInfo
// BlockNumber is the block number at which the operator was deregistered.
Expand Down Expand Up @@ -159,6 +172,67 @@ func (sc *subgraphClient) QueryBatchNonSigningOperatorIdsInInterval(ctx context.
return batchNonSigningOperatorIds, nil
}

func (sc *subgraphClient) QueryOperatorQuorumEvent(ctx context.Context, startBlock, endBlock uint32) (*OperatorQuorumEvents, error) {
var (
operatorAddedQuorum []*subgraph.OperatorQuorum
operatorRemovedQuorum []*subgraph.OperatorQuorum
err error
pool = workerpool.New(maxWorkerPoolSize)
)

pool.Submit(func() {
added, errQ := sc.api.QueryOperatorAddedToQuorum(ctx, startBlock, endBlock)
if errQ != nil {
err = errQ
}
operatorAddedQuorum = added
})

pool.Submit(func() {
removed, errQ := sc.api.QueryOperatorRemovedFromQuorum(ctx, startBlock, endBlock)

if errQ != nil {
err = errQ
}
operatorRemovedQuorum = removed
})
pool.StopWait()

if err != nil {
return nil, err
}

addedQuorum, err := parseOperatorQuorum(operatorAddedQuorum)
if err != nil {
return nil, err
}
removedQuorum, err := parseOperatorQuorum(operatorRemovedQuorum)
if err != nil {
return nil, err
}

addedQuorumMap := make(map[string][]*OperatorQuorum)
for _, opq := range addedQuorum {
if _, ok := addedQuorumMap[opq.Operator]; !ok {
addedQuorumMap[opq.Operator] = make([]*OperatorQuorum, 0)
}
addedQuorumMap[opq.Operator] = append(addedQuorumMap[opq.Operator], opq)
}

removedQuorumMap := make(map[string][]*OperatorQuorum)
for _, opq := range removedQuorum {
if _, ok := removedQuorumMap[opq.Operator]; !ok {
removedQuorumMap[opq.Operator] = make([]*OperatorQuorum, 0)
}
removedQuorumMap[opq.Operator] = append(removedQuorumMap[opq.Operator], opq)
}

return &OperatorQuorumEvents{
AddedToQuorum: addedQuorumMap,
RemovedFromQuorum: removedQuorumMap,
}, nil
}

func (sc *subgraphClient) QueryNumBatchesByOperatorsInThePastBlockTimestamp(ctx context.Context, blockTimestamp uint64, nonSigners map[string]int) (map[string]int, error) {
var (
registeredOperators []*subgraph.Operator
Expand Down Expand Up @@ -550,6 +624,29 @@ func addOperatorWithErrorDetail(operators map[core.OperatorID]*DeregisteredOpera
}
}

func parseOperatorQuorum(operatorQuorum []*subgraph.OperatorQuorum) ([]*OperatorQuorum, error) {
parsed := make([]*OperatorQuorum, len(operatorQuorum))
for i, opq := range operatorQuorum {
blockNum, err := strconv.ParseUint(string(opq.BlockNumber), 10, 64)
if err != nil {
return nil, err
}
parsed[i] = &OperatorQuorum{
Operator: string(opq.Operator),
QuorumNumbers: []byte(opq.QuorumNumbers),
BlockNumber: uint32(blockNum),
}
}
// Sort the quorum events by ascending order of block number.
sort.SliceStable(parsed, func(i, j int) bool {
if parsed[i].BlockNumber == parsed[j].BlockNumber {
return parsed[i].Operator < parsed[j].Operator
}
return parsed[i].BlockNumber < parsed[j].BlockNumber
})
return parsed, nil
}

func convertNonSigningInfo(infoGql *subgraph.BatchNonSigningInfo) (*BatchNonSigningInfo, error) {
quorums := make([]uint8, len(infoGql.BatchHeader.QuorumNumbers))
for i, q := range infoGql.BatchHeader.QuorumNumbers {
Expand Down
93 changes: 93 additions & 0 deletions disperser/dataapi/subgraph_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,41 @@ var (
"0xe22dae12a0074f20b8fc96a0489376db34075e545ef60c4845d264a732568311": 1,
}

operatorAddedToQuorum = []*subgraph.OperatorQuorum{
{
Operator: "operator-2",
QuorumNumbers: "2",
BlockNumber: "82",
},
{
Operator: "operator-1",
QuorumNumbers: "2",
BlockNumber: "82",
},
{
Operator: "operator-1",
QuorumNumbers: "01",
BlockNumber: "80",
},
}
operatorRemovedFromQuorum = []*subgraph.OperatorQuorum{
{
Operator: "operator-1",
QuorumNumbers: "0",
BlockNumber: "81",
},
{
Operator: "operator-2",
QuorumNumbers: "2",
BlockNumber: "83",
},
{
Operator: "operator-1",
QuorumNumbers: "1",
BlockNumber: "83",
},
}

batchNonSigningInfo = []*subgraph.BatchNonSigningInfo{
{
BatchId: "1",
Expand Down Expand Up @@ -497,3 +532,61 @@ func assertGasFees(t *testing.T, gasFees *dataapi.GasFees) {
assert.Equal(t, uint64(1000045336), gasFees.GasPrice)
assert.Equal(t, uint64(249826325612840), gasFees.TxFee)
}

func TestQueryOperatorQuorumEvent(t *testing.T) {
mockSubgraphApi := &subgraphmock.MockSubgraphApi{}
mockSubgraphApi.On("QueryOperatorAddedToQuorum").Return(operatorAddedToQuorum, nil)
mockSubgraphApi.On("QueryOperatorRemovedFromQuorum").Return(operatorRemovedFromQuorum, nil)
subgraphClient := dataapi.NewSubgraphClient(mockSubgraphApi, &commock.Logger{})
result, err := subgraphClient.QueryOperatorQuorumEvent(context.Background(), uint32(78), uint32(88))
assert.NoError(t, err)

addedMap := result.AddedToQuorum
assert.Equal(t, 2, len(addedMap))
// Quorum events for operator-1.
added1, ok := addedMap["operator-1"]
assert.True(t, ok)
assert.Equal(t, 2, len(added1))
assert.Equal(t, "operator-1", added1[0].Operator)
assert.Equal(t, uint32(80), added1[0].BlockNumber)
assert.Equal(t, 2, len(added1[0].QuorumNumbers))
// Note: the quorumId is 48 not 01 is because the string "01" is in UTF-8
// encoding (the default in golang), and it corresponding to 48 in decimal.
assert.Equal(t, uint8(48), added1[0].QuorumNumbers[0])
assert.Equal(t, uint8(49), added1[0].QuorumNumbers[1])
assert.Equal(t, "operator-1", added1[1].Operator)
assert.Equal(t, uint32(82), added1[1].BlockNumber)
assert.Equal(t, 1, len(added1[1].QuorumNumbers))
assert.Equal(t, uint8(50), added1[1].QuorumNumbers[0])
// Quorum events for operator-2.
added2, ok := addedMap["operator-2"]
assert.True(t, ok)
assert.Equal(t, 1, len(added2))
assert.Equal(t, "operator-2", added2[0].Operator)
assert.Equal(t, uint32(82), added2[0].BlockNumber)
assert.Equal(t, 1, len(added2[0].QuorumNumbers))
assert.Equal(t, uint8(50), added2[0].QuorumNumbers[0])

removedMap := result.RemovedFromQuorum
assert.Equal(t, 2, len(removedMap))
// Quorum events for operator-1.
removed1, ok := removedMap["operator-1"]
assert.True(t, ok)
assert.Equal(t, 2, len(removed1))
assert.Equal(t, "operator-1", removed1[0].Operator)
assert.Equal(t, uint32(81), removed1[0].BlockNumber)
assert.Equal(t, 1, len(removed1[0].QuorumNumbers))
assert.Equal(t, uint8(48), removed1[0].QuorumNumbers[0])
assert.Equal(t, "operator-1", removed1[1].Operator)
assert.Equal(t, uint32(83), removed1[1].BlockNumber)
assert.Equal(t, 1, len(removed1[1].QuorumNumbers))
assert.Equal(t, uint8(49), removed1[1].QuorumNumbers[0])
// Quorum events for operator-2.
removed2, ok := removedMap["operator-2"]
assert.True(t, ok)
assert.Equal(t, 1, len(removed2))
assert.Equal(t, "operator-2", removed2[0].Operator)
assert.Equal(t, uint32(83), removed2[0].BlockNumber)
assert.Equal(t, 1, len(removed2[0].QuorumNumbers))
assert.Equal(t, uint8(50), removed2[0].QuorumNumbers[0])
}

0 comments on commit 8b71e69

Please sign in to comment.