Skip to content

Commit

Permalink
[Supplier] Implement supplier service activation (#707)
Browse files Browse the repository at this point in the history
Co-authored-by: Daniel Olshansky <[email protected]>
  • Loading branch information
red-0ne and Olshansk authored Aug 2, 2024
1 parent 672fbb1 commit 57ced24
Show file tree
Hide file tree
Showing 14 changed files with 784 additions and 200 deletions.
374 changes: 345 additions & 29 deletions api/poktroll/shared/supplier.pulsar.go

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -160,13 +160,15 @@ genesis:
rpc_type: JSON_RPC
url: http://relayminer1:8545
service:
compute_units_per_relay: 1
id: anvil
name: ""
- endpoints:
- configs: []
rpc_type: REST
url: http://relayminer1:8545
service:
compute_units_per_relay: 1
id: ollama
name: ""
stake:
Expand Down
74 changes: 54 additions & 20 deletions e2e/tests/init_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -400,27 +400,11 @@ func (s *suite) TheSupplierIsStakedForService(supplierName string, serviceId str
}

func (s *suite) TheSessionForApplicationAndServiceContainsTheSupplier(appName string, serviceId string, supplierName string) {
app, ok := accNameToAppMap[appName]
require.True(s, ok, "application %s not found", appName)

expectedSupplier, ok := accNameToSupplierMap[supplierName]
require.True(s, ok, "supplier %s not found", supplierName)

argsAndFlags := []string{
"query",
"session",
"get-session",
app.Address,
serviceId,
fmt.Sprintf("--%s=json", cometcli.OutputFlag),
}
res, err := s.pocketd.RunCommandOnHostWithRetry("", numQueryRetries, argsAndFlags...)
require.NoError(s, err, "error getting session for app %s and service %q", appName, serviceId)

var resp sessiontypes.QueryGetSessionResponse
responseBz := []byte(strings.TrimSpace(res.Stdout))
s.cdc.MustUnmarshalJSON(responseBz, &resp)
for _, supplier := range resp.Session.Suppliers {
session := s.getSession(appName, serviceId)
for _, supplier := range session.Suppliers {
if supplier.Address == expectedSupplier.Address {
return
}
Expand Down Expand Up @@ -505,6 +489,32 @@ func (s *suite) getStakedAmount(actorType, accName string) (int, bool) {
return 0, false
}

func (s *suite) TheUserShouldSeeThatTheSupplierForAccountIsStaked(supplierName string) {
supplier := s.getSupplierInfo(supplierName)
accNameToSupplierMap[accAddrToNameMap[supplier.Address]] = *supplier
require.NotNil(s, supplier, "supplier %s not found", supplierName)
}

func (s *suite) TheSessionForApplicationAndServiceDoesNotContain(appName, serviceId, supplierName string) {
session := s.getSession(appName, serviceId)

for _, supplier := range session.Suppliers {
if supplier.Address == accNameToAddrMap[supplierName] {
s.Fatalf(
"ERROR: session for app %s and service %s should not contain supplier %s",
appName,
serviceId,
supplierName,
)
}
}
}

func (s *suite) TheUserWaitsForSupplierToBecomeActiveForService(supplierName, serviceId string) {
supplier := s.getSupplierInfo(supplierName)
s.waitForBlockHeight(int64(supplier.ServicesActivationHeightsMap[serviceId]))
}

func (s *suite) buildAddrMap() {
s.Helper()
res, err := s.pocketd.RunCommand(
Expand Down Expand Up @@ -559,6 +569,29 @@ func (s *suite) buildSupplierMap() {
}
}

// getSession returns the current session for the given application and service.
func (s *suite) getSession(appName string, serviceId string) *sessiontypes.Session {
app, ok := accNameToAppMap[appName]
require.True(s, ok, "application %s not found", appName)

argsAndFlags := []string{
"query",
"session",
"get-session",
app.Address,
serviceId,
fmt.Sprintf("--%s=json", cometcli.OutputFlag),
}
res, err := s.pocketd.RunCommandOnHostWithRetry("", numQueryRetries, argsAndFlags...)
require.NoError(s, err, "error getting session for app %s and service %q", appName, serviceId)

var resp sessiontypes.QueryGetSessionResponse
responseBz := []byte(strings.TrimSpace(res.Stdout))
s.cdc.MustUnmarshalJSON(responseBz, &resp)

return resp.Session
}

// TODO_TECHDEBT(@bryanchriswhite): Cleanup & deduplicate the code related
// to this accessors. Ref: https://github.com/pokt-network/poktroll/pull/448/files#r1547930911
func (s *suite) getAccBalance(accName string) int {
Expand Down Expand Up @@ -601,12 +634,13 @@ func (s *suite) validateAmountChange(prevAmount, currAmount int, expectedAmountC
}

// getSupplierInfo returns the supplier information for a given supplier address
func (s *suite) getSupplierInfo(supplierAddr string) *sharedtypes.Supplier {
func (s *suite) getSupplierInfo(supplierName string) *sharedtypes.Supplier {
supplierAddr := accNameToAddrMap[supplierName]
args := []string{
"query",
"supplier",
"show-supplier",
accNameToAddrMap[supplierAddr],
supplierAddr,
"--output=json",
}

Expand Down
16 changes: 12 additions & 4 deletions e2e/tests/stake_supplier.feature
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
Feature: Stake Supplier Namespace

Scenario: User can stake a Supplier
Scenario: User can stake and unstake a Supplier waiting for it to unbound
Given the user has the pocketd binary installed
And the user verifies the "supplier" for account "supplier2" is not staked
# Stake with 1 uPOKT more than the current stake used in genesis to make
# the transaction succeed.
And the account "supplier2" has a balance greater than "1000070" uPOKT
When the user stakes a "supplier" with "1000070" uPOKT for "anvil" service from the account "supplier2"
Then the user should be able to see standard output containing "txhash:"
Expand All @@ -25,4 +23,14 @@ Feature: Stake Supplier Namespace
And the supplier for account "supplier2" is unbonding
When the user waits for "supplier2" unbonding period to finish
Then the user verifies the "supplier" for account "supplier2" is not staked
And the account balance of "supplier2" should be "1000070" uPOKT "more" than before
And the account balance of "supplier2" should be "1000070" uPOKT "more" than before

Scenario: User can restake a Supplier waiting for it to become active again
Given the user has the pocketd binary installed
And the user verifies the "supplier" for account "supplier2" is not staked
Then the user stakes a "supplier" with "1000070" uPOKT for "anvil" service from the account "supplier2"
And the user should wait for the "supplier" module "StakeSupplier" message to be submitted
Then the user should see that the supplier for account "supplier2" is staked
But the session for application "app1" and service "anvil" does not contain "supplier2"
When the user waits for supplier "supplier2" to become active for service "anvil"
Then the session for application "app1" and service "anvil" contains the supplier "supplier2"
4 changes: 4 additions & 0 deletions proto/poktroll/shared/supplier.proto
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,9 @@ message Supplier {
// The session end height at which an actively unbonding supplier unbonds its stake.
// If the supplier did not unstake, this value will be 0.
uint64 unstake_session_end_height = 4;
// services_activation_heights_map is a map of serviceIds to the height at
// which the staked supplier will become active for that service.
// Activation heights are session start heights.
map<string, uint64> services_activation_heights_map = 5;
}

5 changes: 4 additions & 1 deletion testutil/network/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,13 +137,14 @@ func DefaultSupplierModuleGenesisState(t *testing.T, n int) *suppliertypes.Genes
t.Helper()
state := suppliertypes.DefaultGenesis()
for i := 0; i < n; i++ {
svcId := fmt.Sprintf("svc%d", i)
stake := sdk.NewCoin("upokt", math.NewInt(int64(i)))
supplier := sharedtypes.Supplier{
Address: sample.AccAddress(),
Stake: &stake,
Services: []*sharedtypes.SupplierServiceConfig{
{
Service: &sharedtypes.Service{Id: fmt.Sprintf("svc%d", i)},
Service: &sharedtypes.Service{Id: svcId},
Endpoints: []*sharedtypes.SupplierEndpoint{
{
Url: fmt.Sprintf("http://localhost:%d", i),
Expand All @@ -152,6 +153,7 @@ func DefaultSupplierModuleGenesisState(t *testing.T, n int) *suppliertypes.Genes
},
},
},
ServicesActivationHeightsMap: map[string]uint64{svcId: 0},
}
// TODO_CONSIDERATION: Evaluate whether we need `nullify.Fill` or if we should enforce `(gogoproto.nullable) = false` everywhere
// nullify.Fill(&supplier)
Expand Down Expand Up @@ -180,6 +182,7 @@ func SupplierModuleGenesisStateWithAddresses(t *testing.T, addresses []string) *
},
},
},
ServicesActivationHeightsMap: map[string]uint64{"svc1": 0},
}
state.SupplierList = append(state.SupplierList, supplier)
}
Expand Down
11 changes: 10 additions & 1 deletion x/session/keeper/query_get_session.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"

sdk "github.com/cosmos/cosmos-sdk/types"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

Expand All @@ -25,7 +26,15 @@ func (k Keeper) GetSession(ctx context.Context, req *types.QueryGetSessionReques
// The former is stateful but does not lead to state transitions, while the latter one
// does. The request height depends on how much the node has synched and only acts as a read,
// while the `Msg` server handles the code flow of the validator when a new block is being proposed.
blockHeight := req.BlockHeight
var blockHeight int64
// If the request specifies a block height, use it. Otherwise, use the current
// block height.
// Requesting a session with a block height of 0 allows to get the current session,
// which is useful for querying from CLI.
blockHeight = sdk.UnwrapSDKContext(ctx).BlockHeight()
if req.BlockHeight > 0 {
blockHeight = req.BlockHeight
}

k.Logger().Info(fmt.Sprintf("Getting session for height: %d", blockHeight))

Expand Down
4 changes: 1 addition & 3 deletions x/session/keeper/session_hydrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,9 +168,7 @@ func (k Keeper) hydrateSessionSuppliers(ctx context.Context, sh *sessionHydrator
candidateSuppliers := make([]*sharedtypes.Supplier, 0)
for _, s := range suppliers {
// Exclude suppliers that are inactive (i.e. currently unbonding).
// TODO_TECHDEBT(#695): Suppliers that stake mid-session SHOULD NOT be included
// in the current session's suppliers list and must wait until the next one.
if !s.IsActive(sh.sessionHeader.SessionEndBlockHeight) {
if !s.IsActive(uint64(sh.sessionHeader.SessionEndBlockHeight), sh.sessionHeader.Service.Id) {
continue
}

Expand Down
6 changes: 6 additions & 0 deletions x/shared/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,3 +155,9 @@ func GetEarliestSupplierProofCommitHeight(
//return proofWindowOpenHeight + randCreateProofHeightOffset
return proofWindowOpenHeight
}

// GetNextSessionStartHeight returns the start block height of the session
// following the session that includes queryHeight, given the passed sharedParams.
func GetNextSessionStartHeight(sharedParams *sharedtypes.Params, queryHeight int64) int64 {
return GetSessionEndHeight(sharedParams, queryHeight) + 1
}
22 changes: 17 additions & 5 deletions x/shared/types/supplier.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,23 @@ func (s *Supplier) IsUnbonding() bool {
return s.UnstakeSessionEndHeight != SupplierNotUnstaking
}

// IsActive returns whether the supplier is allowed to serve requests at the
// given query height.
// A supplier that has not submitted an unstake message is always active.
// IsActive returns whether the supplier is allowed to serve requests for the
// given serviceId and query height.
// A supplier is active for a given service starting from the session following
// the one during which the supplier staked for that service.
// A supplier that has submitted an unstake message is active until the end of
// the session containing the height at which unstake message was submitted.
func (s *Supplier) IsActive(queryHeight int64) bool {
return !s.IsUnbonding() || uint64(queryHeight) <= s.UnstakeSessionEndHeight
func (s *Supplier) IsActive(queryHeight uint64, serviceId string) bool {
// Service that has been staked for is not active yet.
if s.ServicesActivationHeightsMap[serviceId] > queryHeight {
return false
}

// If the supplier is not unbonding then its UnstakeSessionEndHeight is 0,
// which returns true for all query heights.
if s.IsUnbonding() {
return queryHeight > s.UnstakeSessionEndHeight
}

return true
}
Loading

0 comments on commit 57ced24

Please sign in to comment.