Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add periodic session refresh for Live Video to Video #3404

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 36 additions & 1 deletion server/ai_session.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,9 +215,32 @@
return nil, err
}

// Periodically refresh sessions for Live Video to Video in order to minimize the necessity of refreshing sessions
// when the AI process is started
if cap == core.Capability_LiveVideoToVideo {
startPeriodicRefresh(sel)
}

Check warning on line 222 in server/ai_session.go

View check run for this annotation

Codecov / codecov/patch

server/ai_session.go#L220-L222

Added lines #L220 - L222 were not covered by tests

return sel, nil
}

func startPeriodicRefresh(sel *AISessionSelector) {
go func() {
// Refresh at 80% of tll to avoid ever getting ttl applied
refreshInterval := 1 * time.Minute
ticker := time.NewTicker(refreshInterval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
if err := sel.Refresh(context.Background()); err != nil {
clog.Infof(context.Background(), "Error refreshing AISessionSelector err=%v", err)
}

Check warning on line 238 in server/ai_session.go

View check run for this annotation

Codecov / codecov/patch

server/ai_session.go#L227-L238

Added lines #L227 - L238 were not covered by tests
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we try to be a good citizen and tear down the loop when the node shuts down?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, would be good. But I believe it's not so trivial....heheh. I guess we would need to move this to starter.go, etc. Unless you have some simple idea how to exit this loop when the node shuts down?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, or some chan / signal from LivepeerNode ... not a big deal though if it seems more trouble than it is worth.

}
}()
}

// newAICapabilities creates a new capabilities object with
func newAICapabilities(cap core.Capability, modelID string, warm bool, minVersion string) *core.Capabilities {
aiCaps := []core.Capability{cap}
Expand All @@ -238,6 +261,10 @@

// selectorIsEmpty returns true if no orchestrators are in the warm or cold pools.
func (sel *AISessionSelector) SelectorIsEmpty() bool {
sel.warmPool.mu.Lock()
sel.coldPool.mu.Lock()
defer sel.coldPool.mu.Unlock()
defer sel.warmPool.mu.Unlock()

Check warning on line 267 in server/ai_session.go

View check run for this annotation

Codecov / codecov/patch

server/ai_session.go#L264-L267

Added lines #L264 - L267 were not covered by tests
return sel.warmPool.Size() == 0 && sel.coldPool.Size() == 0
}

Expand All @@ -261,6 +288,10 @@
}

// Refresh if the selector has expired
sel.warmPool.mu.Lock()
sel.coldPool.mu.Lock()
defer sel.coldPool.mu.Unlock()
defer sel.warmPool.mu.Unlock()

Check warning on line 294 in server/ai_session.go

View check run for this annotation

Codecov / codecov/patch

server/ai_session.go#L291-L294

Added lines #L291 - L294 were not covered by tests
if time.Now().After(sel.lastRefreshTime.Add(sel.ttl)) {
return true
}
Expand Down Expand Up @@ -345,8 +376,12 @@

sel.warmPool.Add(warmSessions)
sel.coldPool.Add(coldSessions)
sel.initialPoolSize = len(warmSessions) + len(coldSessions) + len(sel.suspender.list)

sel.warmPool.mu.Lock()
sel.coldPool.mu.Lock()
defer sel.coldPool.mu.Unlock()
defer sel.warmPool.mu.Unlock()
sel.initialPoolSize = len(warmSessions) + len(coldSessions) + len(sel.suspender.list)

Check warning on line 384 in server/ai_session.go

View check run for this annotation

Codecov / codecov/patch

server/ai_session.go#L380-L384

Added lines #L380 - L384 were not covered by tests
sel.lastRefreshTime = time.Now()

return nil
Expand Down
Loading