fix(ai): update ai-video selection suspension #3033

Merged
24 changes: 24 additions & 0 deletions server/ai_process.go
@@ -1370,6 +1370,23 @@
return txtResp, nil
}

// isRetryableError checks if the error is a transient error that can be retried.
func isRetryableError(err error) bool {
transientErrorMessages := []string{
"insufficient capacity", // Caused by limitation in our current implementation.
"invalid ticket sendernonce", // Caused by gateway nonce mismatch.
"ticketparams expired", // Caused by ticket expiration.
}

errMsg := strings.ToLower(err.Error())
for _, msg := range transientErrorMessages {
if strings.Contains(errMsg, msg) {
return true
}
}
return false
}

func processAIRequest(ctx context.Context, params aiRequestParams, req interface{}) (interface{}, error) {
var cap core.Capability
var modelID string
@@ -1519,6 +1536,13 @@
break
}

// Don't suspend the session if the error is a transient error.
if isRetryableError(err) {
params.sessManager.Complete(ctx, sess)
continue

}

// Suspend the session on other errors.
clog.Infof(ctx, "Error submitting request modelID=%v try=%v orch=%v err=%v", modelID, tries, sess.Transcoder(), err)
params.sessManager.Remove(ctx, sess) //TODO: Improve session selection logic for live-video-to-video

33 changes: 33 additions & 0 deletions server/ai_process_test.go
@@ -2,6 +2,7 @@ package server

import (
"context"
"errors"
"reflect"
"testing"

@@ -88,6 +89,38 @@ func Test_submitAudioToText(t *testing.T) {
}
}

func Test_isRetryableError(t *testing.T) {
tests := []struct {
name string
err error
want bool
}{
{
name: "insufficient capacity error",
err: errors.New("Insufficient capacity"),
want: true,
},
{
name: "INSUFFICIENT capacity ERROR",
err: errors.New("Insufficient capacity"),
want: true,
},
{
name: "non-retryable error",
err: errors.New("some other error"),
want: false,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := isRetryableError(tt.err); got != tt.want {
t.Errorf("isRetryableError() = %v, want %v", got, tt.want)
}
})
}
}

func TestEncodeReqMetadata(t *testing.T) {
tests := []struct {
name string
58 changes: 47 additions & 11 deletions server/ai_session.go
@@ -29,14 +29,16 @@
sessMap map[string]*BroadcastSession
inUseSess []*BroadcastSession
suspender *suspender
penalty int
mu sync.RWMutex
}

func NewAISessionPool(selector BroadcastSessionsSelector, suspender *suspender) *AISessionPool {
func NewAISessionPool(selector BroadcastSessionsSelector, suspender *suspender, penalty int) *AISessionPool {

return &AISessionPool{
selector: selector,
sessMap: make(map[string]*BroadcastSession),
suspender: suspender,
penalty: penalty,

mu: sync.RWMutex{},
}
}
@@ -101,10 +103,6 @@
pool.mu.Lock()
defer pool.mu.Unlock()

// If we try to add new sessions to the pool the suspender
// should treat this as a refresh
pool.suspender.signalRefresh()

var uniqueSessions []*BroadcastSession
for _, sess := range sessions {
if _, ok := pool.sessMap[sess.Transcoder()]; ok {
@@ -126,10 +124,14 @@
delete(pool.sessMap, sess.Transcoder())
pool.inUseSess = removeSessionFromList(pool.inUseSess, sess)

// Magic number for now
penalty := 3
// If this method is called assume that the orch should be suspended
// as well
// as well. Since AISessionManager re-uses the pools, the suspension
// penalty needs to take the current suspender count into account.
lastCount, ok := pool.suspender.list[sess.Transcoder()]
penalty := pool.suspender.count + pool.penalty
if ok {
penalty -= lastCount
}

pool.suspender.suspend(sess.Transcoder(), penalty)
}
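A worked example of the penalty arithmetic in the Remove hunk above. This is not code from the PR: the numbers are illustrative, and it assumes suspend() tops up any existing entry for the orchestrator, which is what subtracting lastCount implies.

package main

import "fmt"

func main() {
	suspenderCount := 5 // refreshes the suspender has already signalled
	poolPenalty := 3    // pool.penalty: suspend for 3 future refreshes
	lastCount := 6      // existing suspender.list entry for this orch

	penalty := suspenderCount + poolPenalty // 5 + 3 = 8
	if lastCount > 0 {                      // stands in for the `ok` map lookup above
		penalty -= lastCount // 8 - 6 = 2
	}
	// suspend() would then raise the orch's entry from 6 to 8 (count + pool.penalty),
	// so the orch stays suspended for exactly 3 more refreshes rather than 3 plus
	// whatever was left of its previous suspension.
	fmt.Println(penalty) // 2
}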

@@ -156,12 +158,14 @@
// The time until the pools should be refreshed with orchs from discovery
ttl time.Duration
lastRefreshTime time.Time
initialPoolSize int

cap core.Capability
modelID string

node *core.LivepeerNode
suspender *suspender
penalty int
os drivers.OSSession
}

@@ -180,8 +184,10 @@
// The latency score in this context is just the latency of the last completed request for a session
// The "good enough" latency score is set to 0.0 so the selector will always select unknown sessions first
minLS := 0.0
warmPool := NewAISessionPool(NewMinLSSelector(stakeRdr, minLS, node.SelectionAlgorithm, node.OrchPerfScore, warmCaps), suspender)
coldPool := NewAISessionPool(NewMinLSSelector(stakeRdr, minLS, node.SelectionAlgorithm, node.OrchPerfScore, coldCaps), suspender)
// The session pool suspender starts at 0. An orchestrator that returns errors is suspended for 3 requests.
penalty := 3
warmPool := NewAISessionPool(NewMinLSSelector(stakeRdr, minLS, node.SelectionAlgorithm, node.OrchPerfScore, warmCaps), suspender, penalty)
coldPool := NewAISessionPool(NewMinLSSelector(stakeRdr, minLS, node.SelectionAlgorithm, node.OrchPerfScore, coldCaps), suspender, penalty)

sel := &AISessionSelector{
warmPool: warmPool,
coldPool: coldPool,
@@ -190,6 +196,7 @@
modelID: modelID,
node: node,
suspender: suspender,
penalty: penalty,

os: drivers.NodeStorage.NewSession(strconv.Itoa(int(cap)) + "_" + modelID),
}

@@ -218,11 +225,26 @@
return caps
}

// SelectorIsEmpty returns true if no orchestrators are in the warm or cold pools.
func (sel *AISessionSelector) SelectorIsEmpty() bool {
return sel.warmPool.Size() == 0 && sel.coldPool.Size() == 0

}

func (sel *AISessionSelector) Select(ctx context.Context) *AISession {
shouldRefreshSelector := func() bool {
discoveryPoolSize := int(math.Min(float64(sel.node.OrchestratorPool.Size()), float64(sel.initialPoolSize)))

// If the selector is empty, release all orchestrators from suspension and
// try to refresh.
if sel.SelectorIsEmpty() {
clog.Infof(ctx, "refreshing sessions, no orchestrators in pools")
for i := 0; i < sel.penalty; i++ {
sel.suspender.signalRefresh()
Contributor (commenting on "release all orchestrators" above):

Shouldn't we then just remove them from suspender.list rather than calling signalRefresh()? My understanding is that if penalty = 3, then we would need to call signalRefresh() 3 times in order to "release all orchestrators from suspension".

Collaborator (author):

I did it this way thinking that there could be more than 3 orchestrators suspended, so it would be fewer loops to just call signalRefresh() 3 times. An alternative would be to create a new suspender for the selector to clear it, or to kick all the orchs out of the suspended list (the second option would require a new function).

(A sketch of that second option appears after this file's diff.)

}

}

// Refresh if the # of sessions across warm and cold pools falls below the smaller of the maxRefreshSessionsThreshold and
// 1/2 the total # of orchs that can be queried during discovery
discoveryPoolSize := sel.node.OrchestratorPool.Size()
if sel.warmPool.Size()+sel.coldPool.Size() < int(math.Min(maxRefreshSessionsThreshold, math.Ceil(float64(discoveryPoolSize)/2.0))) {
return true
}
@@ -272,6 +294,10 @@
}

func (sel *AISessionSelector) Refresh(ctx context.Context) error {
// If we try to add new sessions to the pool the suspender
// should treat this as a refresh
sel.suspender.signalRefresh()

sessions, err := sel.getSessions(ctx)
if err != nil {
return err
Expand All @@ -286,6 +312,13 @@
continue
}

// We request 100 orchestrators in getSessions above so all orchestrators are returned with refreshed information.
// This keeps suspended orchestrators out of the pool until the selector is empty or 30 minutes have passed (refresh happens every 10 minutes).
if sel.suspender.Suspended(sess.Transcoder()) > 0 {
clog.V(common.DEBUG).Infof(ctx, "skipping suspended orchestrator=%s", sess.Transcoder())
continue

}

// If the constraints for the modelID are missing, skip this session
modelConstraint, ok := constraints.Models[sel.modelID]
if !ok {
@@ -301,6 +334,7 @@

sel.warmPool.Add(warmSessions)
sel.coldPool.Add(coldSessions)
sel.initialPoolSize = len(warmSessions) + len(coldSessions) + len(sel.suspender.list)


sel.lastRefreshTime = time.Now()

@@ -371,6 +405,8 @@
return nil, err
}

clog.V(common.DEBUG).Infof(ctx, "selected orchestrator=%s", sess.Transcoder())

return sess, nil
}

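A minimal sketch of the "second option" raised in the review thread above: removing orchestrators from suspension directly instead of calling signalRefresh() penalty times. The toySuspender type, its fields, and releaseAll are hypothetical stand-ins rather than code from this PR or the repo; only the idea that an orchestrator counts as suspended while its recorded value exceeds the suspender's global count mirrors how the diff uses suspender.list and suspender.count.

package main

import "fmt"

// toySuspender is a hypothetical stand-in for the suspender used above.
type toySuspender struct {
	list  map[string]int // orch -> count value until which it stays suspended
	count int            // global refresh count
}

func (s *toySuspender) signalRefresh()          { s.count++ }
func (s *toySuspender) suspended(o string) bool { return s.list[o] > s.count }

// releaseAll is the hypothetical "new function": drop every suspension in one pass.
func (s *toySuspender) releaseAll() {
	for o := range s.list {
		delete(s.list, o)
	}
}

func main() {
	penalty := 3
	a := &toySuspender{list: map[string]int{"orch-a": 2, "orch-b": 3}}
	b := &toySuspender{list: map[string]int{"orch-a": 2, "orch-b": 3}}

	// Approach taken in the PR: advance the refresh count penalty times; with the
	// suspension penalty capped at 3, this releases every suspended orch.
	for i := 0; i < penalty; i++ {
		a.signalRefresh()
	}
	fmt.Println(a.suspended("orch-a"), a.suspended("orch-b")) // false false

	// Alternative from the thread: empty the list outright, one pass over the map.
	b.releaseAll()
	fmt.Println(b.suspended("orch-a"), b.suspended("orch-b")) // false false
}

Either way the selector sees no suspensions afterwards; the trade-off is whether the loop scales with the penalty or with the number of suspended orchestrators.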
82 changes: 82 additions & 0 deletions server/handlers.go
@@ -296,6 +296,88 @@
})
}

// poolOrchestrator contains information about an orchestrator in a pool.
type poolOrchestrator struct {
Url string `json:"url"`
LatencyScore float64 `json:"latency_score"`
InFlight int `json:"in_flight"`
}

// aiPoolInfo contains information about an AI pool.
type aiPoolInfo struct {
Size int `json:"size"`
InUse int `json:"in_use"`
Orchestrators []poolOrchestrator `json:"orchestrators"`
}

// suspendedInfo contains information about suspended orchestrators.
type suspendedInfo struct {
List map[string]int `json:"list"`
CurrentCount int `json:"current_count"`
}

// aiOrchestratorPools contains information about all AI pools.
type aiOrchestratorPools struct {
Cold aiPoolInfo `json:"cold"`
Warm aiPoolInfo `json:"warm"`
LastRefresh time.Time `json:"last_refresh"`
Suspended suspendedInfo `json:"suspended"`
}

// getAIPoolsInfoHandler returns information about the AI orchestrator pools.
func (s *LivepeerServer) getAIPoolsInfoHandler() http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
aiPoolsInfoResp := make(map[string]aiOrchestratorPools)

s.AISessionManager.mu.Lock()
defer s.AISessionManager.mu.Unlock()

// Return if no selectors are present.
if len(s.AISessionManager.selectors) == 0 {
glog.Warning("Orchestrator pools are not yet initialized")
respondJson(w, aiPoolsInfoResp)
return
}


// Loop through selectors and get pools info.
for cap, pool := range s.AISessionManager.selectors {
warmPool := aiPoolInfo{
Size: pool.warmPool.Size(),
InUse: len(pool.warmPool.inUseSess),
}
for _, sess := range pool.warmPool.sessMap {
poolOrchestrator := poolOrchestrator{
Url: sess.Transcoder(),
LatencyScore: sess.LatencyScore,
InFlight: len(sess.SegsInFlight),
}
warmPool.Orchestrators = append(warmPool.Orchestrators, poolOrchestrator)
}


coldPool := aiPoolInfo{
Size: pool.coldPool.Size(),
InUse: len(pool.coldPool.inUseSess),
}
for _, sess := range pool.coldPool.sessMap {
coldPool.Orchestrators = append(coldPool.Orchestrators, poolOrchestrator{
Url: sess.Transcoder(),
LatencyScore: sess.LatencyScore,
InFlight: len(sess.SegsInFlight),
})
}


aiPoolsInfoResp[cap] = aiOrchestratorPools{
Cold: coldPool,
Warm: warmPool,
LastRefresh: pool.lastRefreshTime,
Suspended: suspendedInfo{List: pool.suspender.list, CurrentCount: pool.suspender.count},
}

}

respondJson(w, aiPoolsInfoResp)

})
}

// Rounds
func currentRoundHandler(client eth.LivepeerEthClient) http.Handler {
return mustHaveClient(client, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
1 change: 1 addition & 0 deletions server/webserver.go
@@ -50,6 +50,7 @@ func (s *LivepeerServer) cliWebServerHandlers(bindAddr string) *http.ServeMux {
mux.Handle("/getBroadcastConfig", getBroadcastConfigHandler())
mux.Handle("/getAvailableTranscodingOptions", getAvailableTranscodingOptionsHandler())
mux.Handle("/setMaxPriceForCapability", mustHaveFormParams(s.setMaxPriceForCapability(), "maxPricePerUnit", "pixelsPerUnit", "currency", "pipeline", "modelID"))
mux.Handle("/getAISessionPoolsInfo", s.getAIPoolsInfoHandler())

// Rounds
mux.Handle("/currentRound", currentRoundHandler(client))
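To show how the route registered above could be consumed, a minimal standalone sketch that queries /getAISessionPoolsInfo and prints a per-pipeline summary. The structs only mirror the JSON tags added in handlers.go; the client itself, and the assumption that the gateway's CLI web server is reachable at localhost:7935, are not part of this PR.

package main

import (
	"encoding/json"
	"fmt"
	"net/http"
	"time"
)

// The types below mirror the JSON tags of poolOrchestrator, aiPoolInfo,
// suspendedInfo and aiOrchestratorPools from handlers.go.
type poolOrch struct {
	Url          string  `json:"url"`
	LatencyScore float64 `json:"latency_score"`
	InFlight     int     `json:"in_flight"`
}

type poolInfo struct {
	Size          int        `json:"size"`
	InUse         int        `json:"in_use"`
	Orchestrators []poolOrch `json:"orchestrators"`
}

type orchestratorPools struct {
	Cold        poolInfo  `json:"cold"`
	Warm        poolInfo  `json:"warm"`
	LastRefresh time.Time `json:"last_refresh"`
	Suspended   struct {
		List         map[string]int `json:"list"`
		CurrentCount int            `json:"current_count"`
	} `json:"suspended"`
}

func main() {
	// Assumed address of the gateway's CLI web server.
	resp, err := http.Get("http://localhost:7935/getAISessionPoolsInfo")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	// The handler keys the response by capability/model, so decode into a map.
	var info map[string]orchestratorPools
	if err := json.NewDecoder(resp.Body).Decode(&info); err != nil {
		panic(err)
	}
	for key, p := range info {
		fmt.Printf("%s: warm=%d cold=%d suspended=%d (last refresh %s)\n",
			key, p.Warm.Size, p.Cold.Size, len(p.Suspended.List), p.LastRefresh.Format(time.RFC3339))
	}
}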