Skip to content

Commit b5d1048

Browse files
temp hold sessions with retryable errors
1 parent 232df3a commit b5d1048

File tree

4 files changed

+138
-12
lines changed

4 files changed

+138
-12
lines changed

server/ai_mediaserver.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,7 @@ func aiMediaServerHandle[I, O any](ls *LivepeerServer, decoderFunc func(*I, *htt
111111
node: ls.LivepeerNode,
112112
os: drivers.NodeStorage.NewSession(requestID),
113113
sessManager: ls.AISessionManager,
114+
requestID: requestID,
114115
}
115116

116117
var req I
@@ -172,6 +173,7 @@ func (ls *LivepeerServer) ImageToVideo() http.Handler {
172173
node: ls.LivepeerNode,
173174
os: drivers.NodeStorage.NewSession(requestID),
174175
sessManager: ls.AISessionManager,
176+
requestID: requestID,
175177
}
176178

177179
if !async {
@@ -280,6 +282,7 @@ func (ls *LivepeerServer) LLM() http.Handler {
280282
node: ls.LivepeerNode,
281283
os: drivers.NodeStorage.NewSession(requestID),
282284
sessManager: ls.AISessionManager,
285+
requestID: requestID,
283286
}
284287

285288
start := time.Now()
@@ -554,6 +557,7 @@ func (ls *LivepeerServer) StartLiveVideo() http.Handler {
554557
node: ls.LivepeerNode,
555558
os: drivers.NodeStorage.NewSession(requestID),
556559
sessManager: ls.AISessionManager,
560+
requestID: requestID,
557561

558562
liveParams: liveRequestParams{
559563
segmentReader: ssr,

server/ai_process.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ type aiRequestParams struct {
8383
node *core.LivepeerNode
8484
os drivers.OSSession
8585
sessManager *AISessionManager
86+
requestID string
8687

8788
liveParams liveRequestParams
8889
}
@@ -1520,6 +1521,12 @@ func processAIRequest(ctx context.Context, params aiRequestParams, req interface
15201521
break
15211522
}
15221523

1524+
// Don't suspend the session if the error is a transient error.
1525+
if isRetryableError(err) {
1526+
params.sessManager.PutSessionOnHold(ctx, params.requestID, sess)
1527+
continue
1528+
}
1529+
15231530
// Suspend the session on other errors.
15241531
clog.Infof(ctx, "Error submitting request modelID=%v try=%v orch=%v err=%v", modelID, tries, sess.Transcoder(), err)
15251532
params.sessManager.Remove(ctx, sess) //TODO: Improve session selection logic for live-video-to-video
@@ -1543,6 +1550,25 @@ func processAIRequest(ctx context.Context, params aiRequestParams, req interface
15431550
return resp, nil
15441551
}
15451552

1553+
// isRetryableError checks if the error is a transient error that can be retried.
1554+
var isRetryableError = retryableError
1555+
1556+
func retryableError(err error) bool {
1557+
transientErrorMessages := []string{
1558+
"insufficient capacity", // Caused by limitation in our current implementation.
1559+
"invalid ticket sendernonce", // Caused by gateway nonce mismatch.
1560+
"ticketparams expired", // Caused by ticket expiration.
1561+
}
1562+
1563+
errMsg := strings.ToLower(err.Error())
1564+
for _, msg := range transientErrorMessages {
1565+
if strings.Contains(errMsg, msg) {
1566+
return true
1567+
}
1568+
}
1569+
return false
1570+
}
1571+
15461572
func prepareAIPayment(ctx context.Context, sess *AISession, outPixels int64) (worker.RequestEditorFn, *BalanceUpdate, error) {
15471573
// genSegCreds expects a stream.HLSSegment so in order to reuse it here we pass a dummy object
15481574
segCreds, err := genSegCreds(sess.BroadcastSession, &stream.HLSSegment{}, nil, false)

server/ai_process_test.go

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package server
22

33
import (
44
"context"
5+
"errors"
56
"reflect"
67
"testing"
78

@@ -123,3 +124,35 @@ func TestEncodeReqMetadata(t *testing.T) {
123124
})
124125
}
125126
}
127+
128+
func Test_isRetryableError(t *testing.T) {
129+
tests := []struct {
130+
name string
131+
err error
132+
want bool
133+
}{
134+
{
135+
name: "insufficient capacity error",
136+
err: errors.New("Insufficient capacity"),
137+
want: true,
138+
},
139+
{
140+
name: "INSUFFICIENT capacity ERROR",
141+
err: errors.New("Insufficient capacity"),
142+
want: true,
143+
},
144+
{
145+
name: "non-retryable error",
146+
err: errors.New("some other error"),
147+
want: false,
148+
},
149+
}
150+
151+
for _, tt := range tests {
152+
t.Run(tt.name, func(t *testing.T) {
153+
if got := isRetryableError(tt.err); got != tt.want {
154+
t.Errorf("isRetryableError() = %v, want %v", got, tt.want)
155+
}
156+
})
157+
}
158+
}

server/ai_session.go

Lines changed: 75 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -25,21 +25,23 @@ type AISession struct {
2525
}
2626

2727
type AISessionPool struct {
28-
selector BroadcastSessionsSelector
29-
sessMap map[string]*BroadcastSession
30-
inUseSess []*BroadcastSession
31-
suspender *suspender
32-
penalty int
33-
mu sync.RWMutex
28+
selector BroadcastSessionsSelector
29+
sessMap map[string]*BroadcastSession
30+
inUseSess []*BroadcastSession
31+
sessionsOnHold map[string][]*BroadcastSession
32+
suspender *suspender
33+
penalty int
34+
mu sync.RWMutex
3435
}
3536

3637
func NewAISessionPool(selector BroadcastSessionsSelector, suspender *suspender, penalty int) *AISessionPool {
3738
return &AISessionPool{
38-
selector: selector,
39-
sessMap: make(map[string]*BroadcastSession),
40-
suspender: suspender,
41-
penalty: penalty,
42-
mu: sync.RWMutex{},
39+
selector: selector,
40+
sessMap: make(map[string]*BroadcastSession),
41+
sessionsOnHold: make(map[string][]*BroadcastSession),
42+
suspender: suspender,
43+
penalty: penalty,
44+
mu: sync.RWMutex{},
4345
}
4446
}
4547

@@ -135,6 +137,31 @@ func (pool *AISessionPool) Remove(sess *BroadcastSession) {
135137
pool.suspender.suspend(sess.Transcoder(), penalty)
136138
}
137139

140+
func (pool *AISessionPool) PutSessionOnHold(id string, sess *BroadcastSession) {
141+
pool.mu.Lock()
142+
defer pool.mu.Unlock()
143+
144+
var sessionList []*BroadcastSession
145+
holdList, ok := pool.sessionsOnHold[id]
146+
if ok {
147+
sessionList = holdList
148+
}
149+
pool.sessionsOnHold[id] = append(sessionList, sess)
150+
151+
delete(pool.sessMap, sess.Transcoder())
152+
pool.inUseSess = removeSessionFromList(pool.inUseSess, sess)
153+
}
154+
155+
func (pool *AISessionPool) ReleaseSessionsOnHold(id string) {
156+
sessions, ok := pool.sessionsOnHold[id]
157+
if !ok {
158+
return
159+
}
160+
161+
pool.Add(sessions)
162+
delete(pool.sessionsOnHold, id)
163+
}
164+
138165
func (pool *AISessionPool) Size() int {
139166
pool.mu.RLock()
140167
defer pool.mu.RUnlock()
@@ -245,7 +272,8 @@ func (sel *AISessionSelector) Select(ctx context.Context) *AISession {
245272

246273
// Refresh if the # of sessions across warm and cold pools falls below the smaller of the maxRefreshSessionsThreshold and
247274
// 1/2 the total # of orchs that can be queried during discovery
248-
if sel.warmPool.Size()+sel.coldPool.Size() < int(math.Min(maxRefreshSessionsThreshold, math.Ceil(float64(discoveryPoolSize)/2.0))) {
275+
size := sel.warmPool.Size() + sel.coldPool.Size()
276+
if size < int(math.Min(maxRefreshSessionsThreshold, math.Ceil(float64(discoveryPoolSize)/2.0))) {
249277
return true
250278
}
251279

@@ -293,6 +321,30 @@ func (sel *AISessionSelector) Remove(sess *AISession) {
293321
}
294322
}
295323

324+
func (sel *AISessionSelector) PutSessionOnHold(id string, sess *AISession) {
325+
// start cleanup func on first session on hold for request
326+
_, warmPoolCleanupStarted := sel.warmPool.sessionsOnHold[id]
327+
_, coldPoolCleanupStarted := sel.coldPool.sessionsOnHold[id]
328+
if !warmPoolCleanupStarted && !coldPoolCleanupStarted {
329+
go func(id string, sel *AISessionSelector) {
330+
time.Sleep(sel.node.AIProcesssingRetryTimeout)
331+
sel.ReleaseSessionsOnHold(id)
332+
}(id, sel)
333+
}
334+
335+
// put session on hold
336+
if sess.Warm {
337+
sel.warmPool.PutSessionOnHold(id, sess.BroadcastSession)
338+
} else {
339+
sel.coldPool.PutSessionOnHold(id, sess.BroadcastSession)
340+
}
341+
}
342+
343+
func (sel *AISessionSelector) ReleaseSessionsOnHold(id string) {
344+
sel.warmPool.ReleaseSessionsOnHold(id)
345+
sel.coldPool.ReleaseSessionsOnHold(id)
346+
}
347+
296348
func (sel *AISessionSelector) Refresh(ctx context.Context) error {
297349
// If we try to add new sessions to the pool the suspender
298350
// should treat this as a refresh
@@ -421,6 +473,17 @@ func (c *AISessionManager) Remove(ctx context.Context, sess *AISession) error {
421473
return nil
422474
}
423475

476+
func (c *AISessionManager) PutSessionOnHold(ctx context.Context, id string, sess *AISession) error {
477+
sel, err := c.getSelector(ctx, sess.Cap, sess.ModelID)
478+
if err != nil {
479+
return err
480+
}
481+
482+
sel.PutSessionOnHold(id, sess)
483+
484+
return nil
485+
}
486+
424487
func (c *AISessionManager) Complete(ctx context.Context, sess *AISession) error {
425488
sel, err := c.getSelector(ctx, sess.Cap, sess.ModelID)
426489
if err != nil {

0 commit comments

Comments
 (0)