Skip to content

Commit

Permalink
Merge branch 'master' into ai-video-fix-selection-pr
Browse files Browse the repository at this point in the history
  • Loading branch information
ad-astra-video authored Jan 29, 2025
2 parents f3156a3 + 51e6cbe commit c0df840
Show file tree
Hide file tree
Showing 27 changed files with 717 additions and 151 deletions.
17 changes: 17 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,22 @@
# Changelog

## v0.8.2

### Features ⚒

#### Broadcaster

- [#3321](https://github.com/livepeer/go-livepeer/pull/3321) Add orchestrator info on live AI monitoring events

#### Orchestrator

- [#3355](https://github.com/livepeer/go-livepeer/pull/3355) Allow O/T AI orchs to run without `transcoder` flag.
- [#2968](https://github.com/livepeer/go-livepeer/pull/2968) Enhance payment processing log line for better processing by Loki.

#### Transcoder

- [#3359](https://github.com/livepeer/go-livepeer/pull/3359) Update LPMS to use ffmpeg H.264 parser

## v0.8.1

- [#3279](https://github.com/livepeer/go-livepeer/pull/3279) Enable automatic worker image pulling.
Expand Down
5 changes: 4 additions & 1 deletion CHANGELOG_PENDING.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@

### Features ⚒

- [#3365](https://github.com/livepeer/go-livepeer/pull/3365) Updated AI llm pipeline to new OpenAI compatible API format.

#### General

#### Broadcaster
- [#3321](https://github.com/livepeer/go-livepeer/pull/3321) Add orchestrator info on live AI monitoring events

#### Orchestrator

Expand All @@ -19,6 +20,8 @@

#### CLI

- [#3364](https://github.com/livepeer/go-livepeer/pull/3364) Fix orchestrator status JSON unmarshalling issue.

#### General

#### Broadcaster
Expand Down
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.8.1
0.8.2
3 changes: 2 additions & 1 deletion cmd/livepeer/livepeer.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,8 @@ func parseLivepeerConfig() starter.LivepeerConfig {
cfg.AIWorker = flag.Bool("aiWorker", *cfg.AIWorker, "Set to true to run an AI worker")
cfg.AIModels = flag.String("aiModels", *cfg.AIModels, "Set models (pipeline:model_id) for AI worker to load upon initialization")
cfg.AIModelsDir = flag.String("aiModelsDir", *cfg.AIModelsDir, "Set directory where AI model weights are stored")
cfg.AIRunnerImage = flag.String("aiRunnerImage", *cfg.AIRunnerImage, "Set the docker image for the AI runner: Example - livepeer/ai-runner:0.0.1")
cfg.AIRunnerImage = flag.String("aiRunnerImage", *cfg.AIRunnerImage, "[Deprecated] Specify the base Docker image for the AI runner. Example: livepeer/ai-runner:0.0.1. Use -aiRunnerImageOverrides instead.")
cfg.AIRunnerImageOverrides = flag.String("aiRunnerImageOverrides", *cfg.AIRunnerImageOverrides, `Specify overrides for the Docker images used by the AI runner. Example: '{"default": "livepeer/ai-runner:v1.0", "batch": {"text-to-speech": "livepeer/ai-runner:text-to-speech-v1.0"}, "live": {"another-pipeline": "livepeer/ai-runner:another-pipeline-v1.0"}}'`)

// Live AI:
cfg.MediaMTXApiPassword = flag.String("mediaMTXApiPassword", "", "HTTP basic auth password for MediaMTX API requests")
Expand Down
49 changes: 35 additions & 14 deletions cmd/livepeer/starter/starter.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ type LivepeerConfig struct {
OrchMinLivepeerVersion *string
TestOrchAvail *bool
AIRunnerImage *string
AIRunnerImageOverrides *string
KafkaBootstrapServers *string
KafkaUsername *string
KafkaPassword *string
Expand Down Expand Up @@ -214,6 +215,7 @@ func DefaultLivepeerConfig() LivepeerConfig {
defaultAIModels := ""
defaultAIModelsDir := ""
defaultAIRunnerImage := "livepeer/ai-runner:latest"
defaultAIRunnerImageOverrides := ""
defaultLiveAIAuthWebhookURL := ""
defaultLivePaymentInterval := 5 * time.Second
defaultGatewayHost := ""
Expand Down Expand Up @@ -318,14 +320,15 @@ func DefaultLivepeerConfig() LivepeerConfig {
TestTranscoder: &defaultTestTranscoder,

// AI:
AIServiceRegistry: &defaultAIServiceRegistry,
AIWorker: &defaultAIWorker,
AIModels: &defaultAIModels,
AIModelsDir: &defaultAIModelsDir,
AIRunnerImage: &defaultAIRunnerImage,
LiveAIAuthWebhookURL: &defaultLiveAIAuthWebhookURL,
LivePaymentInterval: &defaultLivePaymentInterval,
GatewayHost: &defaultGatewayHost,
AIServiceRegistry: &defaultAIServiceRegistry,
AIWorker: &defaultAIWorker,
AIModels: &defaultAIModels,
AIModelsDir: &defaultAIModelsDir,
AIRunnerImage: &defaultAIRunnerImage,
AIRunnerImageOverrides: &defaultAIRunnerImageOverrides,
LiveAIAuthWebhookURL: &defaultLiveAIAuthWebhookURL,
LivePaymentInterval: &defaultLivePaymentInterval,
GatewayHost: &defaultGatewayHost,

// Onchain:
EthAcctAddr: &defaultEthAcctAddr,
Expand Down Expand Up @@ -1211,7 +1214,24 @@ func StartLivepeer(ctx context.Context, cfg LivepeerConfig) {
return
}

n.AIWorker, err = worker.NewWorker(*cfg.AIRunnerImage, gpus, modelsDir)
// Retrieve image overrides from the config.
var imageOverrides worker.ImageOverrides
if *cfg.AIRunnerImageOverrides != "" {
if err := json.Unmarshal([]byte(*cfg.AIRunnerImageOverrides), &imageOverrides); err != nil {
glog.Errorf("Error unmarshaling image overrides: %v", err)
return
}
}

// Backwards compatibility for deprecated flags.
if *cfg.AIRunnerImage != "" {
glog.Warning("-aiRunnerImage flag is deprecated and will be removed in a future release. Please use -aiWorkerImageOverrides instead")
if imageOverrides.Default == "" {
imageOverrides.Default = *cfg.AIRunnerImage
}
}

n.AIWorker, err = worker.NewWorker(imageOverrides, gpus, modelsDir)
if err != nil {
glog.Errorf("Error starting AI worker: %v", err)
return
Expand Down Expand Up @@ -1534,10 +1554,11 @@ func StartLivepeer(ctx context.Context, cfg LivepeerConfig) {
*cfg.CliAddr = defaultAddr(*cfg.CliAddr, "127.0.0.1", TranscoderCliPort)
} else if n.NodeType == core.AIWorkerNode {
*cfg.CliAddr = defaultAddr(*cfg.CliAddr, "127.0.0.1", AIWorkerCliPort)
// Need to have default Capabilities if not running transcoder.
if !*cfg.Transcoder {
aiCaps = append(aiCaps, core.DefaultCapabilities()...)
}
}

// Apply default capabilities if not running as a transcoder.
if !*cfg.Transcoder && (n.NodeType == core.AIWorkerNode || n.NodeType == core.OrchestratorNode) {
aiCaps = append(aiCaps, core.DefaultCapabilities()...)
}

n.Capabilities = core.NewCapabilities(append(transcoderCaps, aiCaps...), nil)
Expand Down Expand Up @@ -1588,7 +1609,7 @@ func StartLivepeer(ctx context.Context, cfg LivepeerConfig) {
//Create Livepeer Node

//Set up the media server
s, err := server.NewLivepeerServer(*cfg.RtmpAddr, n, httpIngest, *cfg.TranscodingOptions)
s, err := server.NewLivepeerServer(ctx, *cfg.RtmpAddr, n, httpIngest, *cfg.TranscodingOptions)
if err != nil {
exit("Error creating Livepeer server: err=%q", err)
}
Expand Down
21 changes: 21 additions & 0 deletions common/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ func FromRemoteInfos(infos []*net.OrchestratorInfo) OrchestratorDescriptors {
return ods
}

// MarshalJSON ensures that URL is marshaled as a string.
func (u *OrchestratorLocalInfo) MarshalJSON() ([]byte, error) {
type Alias OrchestratorLocalInfo
return json.Marshal(&struct {
Expand All @@ -97,6 +98,26 @@ func (u *OrchestratorLocalInfo) MarshalJSON() ([]byte, error) {
})
}

// UnmarshalJSON decodes the "Url" JSON string field back into the
// *url.URL held by OrchestratorLocalInfo, mirroring MarshalJSON.
func (o *OrchestratorLocalInfo) UnmarshalJSON(data []byte) error {
	// Alias strips the method set so json.Unmarshal does not recurse
	// back into this UnmarshalJSON implementation.
	type Alias OrchestratorLocalInfo
	wrapper := struct {
		URL string `json:"Url"`
		*Alias
	}{Alias: (*Alias)(o)}

	if err := json.Unmarshal(data, &wrapper); err != nil {
		return err
	}

	u, err := url.Parse(wrapper.URL)
	if err != nil {
		return err
	}
	o.URL = u
	return nil
}

type ScorePred = func(float32) bool
type OrchestratorPool interface {
GetInfos() []OrchestratorLocalInfo
Expand Down
2 changes: 1 addition & 1 deletion core/ai.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ type AI interface {
ImageToVideo(context.Context, worker.GenImageToVideoMultipartRequestBody) (*worker.VideoResponse, error)
Upscale(context.Context, worker.GenUpscaleMultipartRequestBody) (*worker.ImageResponse, error)
AudioToText(context.Context, worker.GenAudioToTextMultipartRequestBody) (*worker.TextResponse, error)
LLM(context.Context, worker.GenLLMFormdataRequestBody) (interface{}, error)
LLM(context.Context, worker.GenLLMJSONRequestBody) (interface{}, error)
SegmentAnything2(context.Context, worker.GenSegmentAnything2MultipartRequestBody) (*worker.MasksResponse, error)
ImageToText(context.Context, worker.GenImageToTextMultipartRequestBody) (*worker.ImageToTextResponse, error)
TextToSpeech(context.Context, worker.GenTextToSpeechJSONRequestBody) (*worker.AudioResponse, error)
Expand Down
7 changes: 5 additions & 2 deletions core/ai_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -651,8 +651,11 @@ func (a *stubAIWorker) SegmentAnything2(ctx context.Context, req worker.GenSegme
return &worker.MasksResponse{Logits: "logits", Masks: "masks", Scores: "scores"}, nil
}

func (a *stubAIWorker) LLM(ctx context.Context, req worker.GenLLMFormdataRequestBody) (interface{}, error) {
return &worker.LLMResponse{Response: "response tokens", TokensUsed: 10}, nil
// LLM returns a canned single-choice response so tests can exercise the
// LLM pipeline without a real worker.
func (a *stubAIWorker) LLM(ctx context.Context, req worker.GenLLMJSONRequestBody) (interface{}, error) {
	choices := []worker.LLMChoice{
		{Delta: &worker.LLMMessage{Content: "choice1", Role: "assistant"}, Index: 0},
	}
	usage := worker.LLMTokenUsage{PromptTokens: 40, CompletionTokens: 10, TotalTokens: 50}
	return &worker.LLMResponse{Choices: choices, Created: 1, Model: "llm_model", Usage: usage}, nil
}

func (a *stubAIWorker) ImageToText(ctx context.Context, req worker.GenImageToTextMultipartRequestBody) (*worker.ImageToTextResponse, error) {
Expand Down
8 changes: 4 additions & 4 deletions core/ai_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -787,14 +787,14 @@ func (orch *orchestrator) SegmentAnything2(ctx context.Context, requestID string
}

// Return type is LLMResponse, but a stream is available as well as chan(string)
func (orch *orchestrator) LLM(ctx context.Context, requestID string, req worker.GenLLMFormdataRequestBody) (interface{}, error) {
func (orch *orchestrator) LLM(ctx context.Context, requestID string, req worker.GenLLMJSONRequestBody) (interface{}, error) {
// local AIWorker processes job if combined orchestrator/ai worker
if orch.node.AIWorker != nil {
// no file response to save, response is text sent back to gateway
return orch.node.AIWorker.LLM(ctx, req)
}

res, err := orch.node.AIWorkerManager.Process(ctx, requestID, "llm", *req.ModelId, "", AIJobRequestData{Request: req})
res, err := orch.node.AIWorkerManager.Process(ctx, requestID, "llm", *req.Model, "", AIJobRequestData{Request: req})
if err != nil {
return nil, err
}
Expand All @@ -805,7 +805,7 @@ func (orch *orchestrator) LLM(ctx context.Context, requestID string, req worker.
if err != nil {
clog.Errorf(ctx, "Error saving remote ai result err=%q", err)
if monitor.Enabled {
monitor.AIResultSaveError(ctx, "llm", *req.ModelId, string(monitor.SegmentUploadErrorUnknown))
monitor.AIResultSaveError(ctx, "llm", *req.Model, string(monitor.SegmentUploadErrorUnknown))
}
return nil, err

Expand Down Expand Up @@ -1050,7 +1050,7 @@ func (n *LivepeerNode) SegmentAnything2(ctx context.Context, req worker.GenSegme
return n.AIWorker.SegmentAnything2(ctx, req)
}

// LLM forwards the request to the node's attached AI worker.
func (n *LivepeerNode) LLM(ctx context.Context, req worker.GenLLMJSONRequestBody) (interface{}, error) {
return n.AIWorker.LLM(ctx, req)
}
}

Expand Down
6 changes: 1 addition & 5 deletions core/orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,8 +160,6 @@ func (orch *orchestrator) ProcessPayment(ctx context.Context, payment net.Paymen
totalEV := big.NewRat(0, 1)
totalTickets := 0
totalWinningTickets := 0
totalFaceValue := big.NewInt(0)
totalWinProb := big.NewRat(0, 1)

var receiveErr error

Expand Down Expand Up @@ -196,8 +194,6 @@ func (orch *orchestrator) ProcessPayment(ctx context.Context, payment net.Paymen
ev := ticket.EV()
orch.node.Balances.Credit(sender, manifestID, ev)
totalEV.Add(totalEV, ev)
totalFaceValue.Add(totalFaceValue, ticket.FaceValue)
totalWinProb.Add(totalWinProb, ticket.WinProbRat())
totalTickets++
}

Expand All @@ -214,7 +210,7 @@ func (orch *orchestrator) ProcessPayment(ctx context.Context, payment net.Paymen
}
}

clog.V(common.DEBUG).Infof(ctx, "Payment tickets processed sessionID=%v faceValue=%v winProb=%v ev=%v", manifestID, eth.FormatUnits(totalFaceValue, "ETH"), totalWinProb.FloatString(10), totalEV.FloatString(2))
clog.V(common.DEBUG).Infof(ctx, "Payment tickets processed sessionID=%v faceValue=%v winProb=%v totalTickets=%v totalEV=%v", manifestID, eth.FormatUnits(ticketParams.FaceValue, "ETH"), ticketParams.WinProbRat().FloatString(10), totalTickets, totalEV.FloatString(2))

if lpmon.Enabled {
lpmon.TicketValueRecv(ctx, sender.Hex(), totalEV)
Expand Down
Loading

0 comments on commit c0df840

Please sign in to comment.