Skip to content

Commit

Permalink
Add periodic session refresh for Live Video to Video
Browse files Browse the repository at this point in the history
  • Loading branch information
leszko committed Feb 20, 2025
1 parent 232df3a commit f48a4a3
Showing 1 changed file with 32 additions and 1 deletion.
33 changes: 32 additions & 1 deletion server/ai_session.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,9 +204,32 @@ func NewAISessionSelector(ctx context.Context, cap core.Capability, modelID stri
return nil, err
}

// Periodically refresh sessions for Live Video to Video in order to minimize the necessity of refreshing sessions
// when the AI process is started
if cap == core.Capability_LiveVideoToVideo {
startPeriodicRefresh(sel)
}

Check warning on line 211 in server/ai_session.go

View check run for this annotation

Codecov / codecov/patch

server/ai_session.go#L209-L211

Added lines #L209 - L211 were not covered by tests

return sel, nil
}

func startPeriodicRefresh(sel *AISessionSelector) {
go func() {
// Refresh and 80% or tll to avoid ever getting ttl applied
refreshInterval := time.Duration(0.8 * float64(sel.ttl))
ticker := time.NewTicker(refreshInterval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
if err := sel.Refresh(context.Background()); err != nil {
clog.Infof(context.Background(), "Error refreshing AISessionSelector err=%v", err)
}

Check warning on line 227 in server/ai_session.go

View check run for this annotation

Codecov / codecov/patch

server/ai_session.go#L216-L227

Added lines #L216 - L227 were not covered by tests
}
}
}()
}

// newAICapabilities creates a new capabilities object with
func newAICapabilities(cap core.Capability, modelID string, warm bool, minVersion string) *core.Capabilities {
aiCaps := []core.Capability{cap}
Expand All @@ -232,6 +255,11 @@ func (sel *AISessionSelector) SelectorIsEmpty() bool {

func (sel *AISessionSelector) Select(ctx context.Context) *AISession {
shouldRefreshSelector := func() bool {
sel.warmPool.mu.Lock()
sel.coldPool.mu.Lock()

defer sel.coldPool.mu.Unlock()
defer sel.warmPool.mu.Unlock()

Check warning on line 262 in server/ai_session.go

View check run for this annotation

Codecov / codecov/patch

server/ai_session.go#L258-L262

Added lines #L258 - L262 were not covered by tests
discoveryPoolSize := int(math.Min(float64(sel.node.OrchestratorPool.Size()), float64(sel.initialPoolSize)))

// If the selector is empty, release all orchestrators from suspension and
Expand Down Expand Up @@ -332,10 +360,13 @@ func (sel *AISessionSelector) Refresh(ctx context.Context) error {
}
}

sel.warmPool.mu.Lock()
sel.coldPool.mu.Lock()
defer sel.coldPool.mu.Unlock()
defer sel.warmPool.mu.Unlock()

Check warning on line 366 in server/ai_session.go

View check run for this annotation

Codecov / codecov/patch

server/ai_session.go#L363-L366

Added lines #L363 - L366 were not covered by tests
sel.warmPool.Add(warmSessions)
sel.coldPool.Add(coldSessions)
sel.initialPoolSize = len(warmSessions) + len(coldSessions) + len(sel.suspender.list)

sel.lastRefreshTime = time.Now()

return nil
Expand Down

0 comments on commit f48a4a3

Please sign in to comment.