diff --git a/CHANGELOG.md b/CHANGELOG.md
index bf002df3bb..09575a61db 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,5 +1,22 @@
# Changelog
+## v0.8.2
+
+### Features ⚒
+
+#### Broadcaster
+
+- [#3321](https://github.com/livepeer/go-livepeer/pull/3321) Add orchestrator info on live AI monitoring events
+
+#### Orchestrator
+
+- [#3355](https://github.com/livepeer/go-livepeer/pull/3355) Allow O/T AI orchs to run without `transcoder` flag.
+- [#2968](https://github.com/livepeer/go-livepeer/pull/2968) Enhance payment processing log line for better processing by Loki.
+
+#### Transcoder
+
+- [#3359](https://github.com/livepeer/go-livepeer/pull/3359) Update LPMS to use ffmpeg H.264 parser
+
## v0.8.1
- [#3279](https://github.com/livepeer/go-livepeer/pull/3279) Enable automatic worker image pulling.
diff --git a/CHANGELOG_PENDING.md b/CHANGELOG_PENDING.md
index dba2dfde60..422c7bcd3c 100644
--- a/CHANGELOG_PENDING.md
+++ b/CHANGELOG_PENDING.md
@@ -6,10 +6,11 @@
### Features ⚒
+- [#3365](https://github.com/livepeer/go-livepeer/pull/3365) Updated the AI LLM pipeline to the new OpenAI-compatible API format.
+
#### General
#### Broadcaster
-- [#3321](https://github.com/livepeer/go-livepeer/pull/3321) Add orchestrator info on live AI monitoring events
#### Orchestrator
@@ -19,6 +20,8 @@
#### CLI
+- [#3364](https://github.com/livepeer/go-livepeer/pull/3364) Fix orchestrator status JSON unmarshalling issue.
+
#### General
#### Broadcaster
diff --git a/VERSION b/VERSION
index c18d72be30..53a48a1e8c 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-0.8.1
\ No newline at end of file
+0.8.2
\ No newline at end of file
diff --git a/cmd/livepeer/livepeer.go b/cmd/livepeer/livepeer.go
index fc2c9d4a58..b4564ea170 100755
--- a/cmd/livepeer/livepeer.go
+++ b/cmd/livepeer/livepeer.go
@@ -161,7 +161,8 @@ func parseLivepeerConfig() starter.LivepeerConfig {
cfg.AIWorker = flag.Bool("aiWorker", *cfg.AIWorker, "Set to true to run an AI worker")
cfg.AIModels = flag.String("aiModels", *cfg.AIModels, "Set models (pipeline:model_id) for AI worker to load upon initialization")
cfg.AIModelsDir = flag.String("aiModelsDir", *cfg.AIModelsDir, "Set directory where AI model weights are stored")
- cfg.AIRunnerImage = flag.String("aiRunnerImage", *cfg.AIRunnerImage, "Set the docker image for the AI runner: Example - livepeer/ai-runner:0.0.1")
+ cfg.AIRunnerImage = flag.String("aiRunnerImage", *cfg.AIRunnerImage, "[Deprecated] Specify the base Docker image for the AI runner. Example: livepeer/ai-runner:0.0.1. Use -aiRunnerImageOverrides instead.")
+ cfg.AIRunnerImageOverrides = flag.String("aiRunnerImageOverrides", *cfg.AIRunnerImageOverrides, `Specify overrides for the Docker images used by the AI runner. Example: '{"default": "livepeer/ai-runner:v1.0", "batch": {"text-to-speech": "livepeer/ai-runner:text-to-speech-v1.0"}, "live": {"another-pipeline": "livepeer/ai-runner:another-pipeline-v1.0"}}'`)
// Live AI:
cfg.MediaMTXApiPassword = flag.String("mediaMTXApiPassword", "", "HTTP basic auth password for MediaMTX API requests")
diff --git a/cmd/livepeer/starter/starter.go b/cmd/livepeer/starter/starter.go
index 78dfc96f46..45285952b5 100755
--- a/cmd/livepeer/starter/starter.go
+++ b/cmd/livepeer/starter/starter.go
@@ -164,6 +164,7 @@ type LivepeerConfig struct {
OrchMinLivepeerVersion *string
TestOrchAvail *bool
AIRunnerImage *string
+ AIRunnerImageOverrides *string
KafkaBootstrapServers *string
KafkaUsername *string
KafkaPassword *string
@@ -214,6 +215,7 @@ func DefaultLivepeerConfig() LivepeerConfig {
defaultAIModels := ""
defaultAIModelsDir := ""
defaultAIRunnerImage := "livepeer/ai-runner:latest"
+ defaultAIRunnerImageOverrides := ""
defaultLiveAIAuthWebhookURL := ""
defaultLivePaymentInterval := 5 * time.Second
defaultGatewayHost := ""
@@ -318,14 +320,15 @@ func DefaultLivepeerConfig() LivepeerConfig {
TestTranscoder: &defaultTestTranscoder,
// AI:
- AIServiceRegistry: &defaultAIServiceRegistry,
- AIWorker: &defaultAIWorker,
- AIModels: &defaultAIModels,
- AIModelsDir: &defaultAIModelsDir,
- AIRunnerImage: &defaultAIRunnerImage,
- LiveAIAuthWebhookURL: &defaultLiveAIAuthWebhookURL,
- LivePaymentInterval: &defaultLivePaymentInterval,
- GatewayHost: &defaultGatewayHost,
+ AIServiceRegistry: &defaultAIServiceRegistry,
+ AIWorker: &defaultAIWorker,
+ AIModels: &defaultAIModels,
+ AIModelsDir: &defaultAIModelsDir,
+ AIRunnerImage: &defaultAIRunnerImage,
+ AIRunnerImageOverrides: &defaultAIRunnerImageOverrides,
+ LiveAIAuthWebhookURL: &defaultLiveAIAuthWebhookURL,
+ LivePaymentInterval: &defaultLivePaymentInterval,
+ GatewayHost: &defaultGatewayHost,
// Onchain:
EthAcctAddr: &defaultEthAcctAddr,
@@ -1211,7 +1214,24 @@ func StartLivepeer(ctx context.Context, cfg LivepeerConfig) {
return
}
- n.AIWorker, err = worker.NewWorker(*cfg.AIRunnerImage, gpus, modelsDir)
+ // Retrieve image overrides from the config.
+ var imageOverrides worker.ImageOverrides
+ if *cfg.AIRunnerImageOverrides != "" {
+ if err := json.Unmarshal([]byte(*cfg.AIRunnerImageOverrides), &imageOverrides); err != nil {
+ glog.Errorf("Error unmarshaling image overrides: %v", err)
+ return
+ }
+ }
+
+ // Backwards compatibility for deprecated flags.
+ if *cfg.AIRunnerImage != "" {
+ glog.Warning("-aiRunnerImage flag is deprecated and will be removed in a future release. Please use -aiRunnerImageOverrides instead")
+ if imageOverrides.Default == "" {
+ imageOverrides.Default = *cfg.AIRunnerImage
+ }
+ }
+
+ n.AIWorker, err = worker.NewWorker(imageOverrides, gpus, modelsDir)
if err != nil {
glog.Errorf("Error starting AI worker: %v", err)
return
@@ -1534,10 +1554,11 @@ func StartLivepeer(ctx context.Context, cfg LivepeerConfig) {
*cfg.CliAddr = defaultAddr(*cfg.CliAddr, "127.0.0.1", TranscoderCliPort)
} else if n.NodeType == core.AIWorkerNode {
*cfg.CliAddr = defaultAddr(*cfg.CliAddr, "127.0.0.1", AIWorkerCliPort)
- // Need to have default Capabilities if not running transcoder.
- if !*cfg.Transcoder {
- aiCaps = append(aiCaps, core.DefaultCapabilities()...)
- }
+ }
+
+ // Apply default capabilities if not running as a transcoder.
+ if !*cfg.Transcoder && (n.NodeType == core.AIWorkerNode || n.NodeType == core.OrchestratorNode) {
+ aiCaps = append(aiCaps, core.DefaultCapabilities()...)
}
n.Capabilities = core.NewCapabilities(append(transcoderCaps, aiCaps...), nil)
@@ -1588,7 +1609,7 @@ func StartLivepeer(ctx context.Context, cfg LivepeerConfig) {
//Create Livepeer Node
//Set up the media server
- s, err := server.NewLivepeerServer(*cfg.RtmpAddr, n, httpIngest, *cfg.TranscodingOptions)
+ s, err := server.NewLivepeerServer(ctx, *cfg.RtmpAddr, n, httpIngest, *cfg.TranscodingOptions)
if err != nil {
exit("Error creating Livepeer server: err=%q", err)
}
diff --git a/common/types.go b/common/types.go
index dcb258add2..925a2e6916 100644
--- a/common/types.go
+++ b/common/types.go
@@ -86,6 +86,7 @@ func FromRemoteInfos(infos []*net.OrchestratorInfo) OrchestratorDescriptors {
return ods
}
+// MarshalJSON ensures that the URL is marshaled as a string.
func (u *OrchestratorLocalInfo) MarshalJSON() ([]byte, error) {
type Alias OrchestratorLocalInfo
return json.Marshal(&struct {
@@ -97,6 +98,26 @@ func (u *OrchestratorLocalInfo) MarshalJSON() ([]byte, error) {
})
}
+// UnmarshalJSON ensures that the URL string is unmarshaled into a url.URL.
+func (o *OrchestratorLocalInfo) UnmarshalJSON(data []byte) error {
+ type Alias OrchestratorLocalInfo
+ aux := &struct {
+ URL string `json:"Url"`
+ *Alias
+ }{
+ Alias: (*Alias)(o),
+ }
+ if err := json.Unmarshal(data, aux); err != nil {
+ return err
+ }
+ parsedURL, err := url.Parse(aux.URL)
+ if err != nil {
+ return err
+ }
+ o.URL = parsedURL
+ return nil
+}
+
type ScorePred = func(float32) bool
type OrchestratorPool interface {
GetInfos() []OrchestratorLocalInfo
diff --git a/core/ai.go b/core/ai.go
index a9eeae9f72..a30c6412ce 100644
--- a/core/ai.go
+++ b/core/ai.go
@@ -23,7 +23,7 @@ type AI interface {
ImageToVideo(context.Context, worker.GenImageToVideoMultipartRequestBody) (*worker.VideoResponse, error)
Upscale(context.Context, worker.GenUpscaleMultipartRequestBody) (*worker.ImageResponse, error)
AudioToText(context.Context, worker.GenAudioToTextMultipartRequestBody) (*worker.TextResponse, error)
- LLM(context.Context, worker.GenLLMFormdataRequestBody) (interface{}, error)
+ LLM(context.Context, worker.GenLLMJSONRequestBody) (interface{}, error)
SegmentAnything2(context.Context, worker.GenSegmentAnything2MultipartRequestBody) (*worker.MasksResponse, error)
ImageToText(context.Context, worker.GenImageToTextMultipartRequestBody) (*worker.ImageToTextResponse, error)
TextToSpeech(context.Context, worker.GenTextToSpeechJSONRequestBody) (*worker.AudioResponse, error)
diff --git a/core/ai_test.go b/core/ai_test.go
index 3e4ab8207b..863dfcbb81 100644
--- a/core/ai_test.go
+++ b/core/ai_test.go
@@ -651,8 +651,11 @@ func (a *stubAIWorker) SegmentAnything2(ctx context.Context, req worker.GenSegme
return &worker.MasksResponse{Logits: "logits", Masks: "masks", Scores: "scores"}, nil
}
-func (a *stubAIWorker) LLM(ctx context.Context, req worker.GenLLMFormdataRequestBody) (interface{}, error) {
- return &worker.LLMResponse{Response: "response tokens", TokensUsed: 10}, nil
+func (a *stubAIWorker) LLM(ctx context.Context, req worker.GenLLMJSONRequestBody) (interface{}, error) {
+ var choices []worker.LLMChoice
+ choices = append(choices, worker.LLMChoice{Delta: &worker.LLMMessage{Content: "choice1", Role: "assistant"}, Index: 0})
+ tokensUsed := worker.LLMTokenUsage{PromptTokens: 40, CompletionTokens: 10, TotalTokens: 50}
+ return &worker.LLMResponse{Choices: choices, Created: 1, Model: "llm_model", Usage: tokensUsed}, nil
}
func (a *stubAIWorker) ImageToText(ctx context.Context, req worker.GenImageToTextMultipartRequestBody) (*worker.ImageToTextResponse, error) {
diff --git a/core/ai_worker.go b/core/ai_worker.go
index b38e951730..0e7aa4a424 100644
--- a/core/ai_worker.go
+++ b/core/ai_worker.go
@@ -787,14 +787,14 @@ func (orch *orchestrator) SegmentAnything2(ctx context.Context, requestID string
}
// Return type is LLMResponse, but a stream is available as well as chan(string)
-func (orch *orchestrator) LLM(ctx context.Context, requestID string, req worker.GenLLMFormdataRequestBody) (interface{}, error) {
+func (orch *orchestrator) LLM(ctx context.Context, requestID string, req worker.GenLLMJSONRequestBody) (interface{}, error) {
// local AIWorker processes job if combined orchestrator/ai worker
if orch.node.AIWorker != nil {
// no file response to save, response is text sent back to gateway
return orch.node.AIWorker.LLM(ctx, req)
}
- res, err := orch.node.AIWorkerManager.Process(ctx, requestID, "llm", *req.ModelId, "", AIJobRequestData{Request: req})
+ res, err := orch.node.AIWorkerManager.Process(ctx, requestID, "llm", *req.Model, "", AIJobRequestData{Request: req})
if err != nil {
return nil, err
}
@@ -805,7 +805,7 @@ func (orch *orchestrator) LLM(ctx context.Context, requestID string, req worker.
if err != nil {
clog.Errorf(ctx, "Error saving remote ai result err=%q", err)
if monitor.Enabled {
- monitor.AIResultSaveError(ctx, "llm", *req.ModelId, string(monitor.SegmentUploadErrorUnknown))
+ monitor.AIResultSaveError(ctx, "llm", *req.Model, string(monitor.SegmentUploadErrorUnknown))
}
return nil, err
@@ -1050,7 +1050,7 @@ func (n *LivepeerNode) SegmentAnything2(ctx context.Context, req worker.GenSegme
return n.AIWorker.SegmentAnything2(ctx, req)
}
-func (n *LivepeerNode) LLM(ctx context.Context, req worker.GenLLMFormdataRequestBody) (interface{}, error) {
+func (n *LivepeerNode) LLM(ctx context.Context, req worker.GenLLMJSONRequestBody) (interface{}, error) {
return n.AIWorker.LLM(ctx, req)
}
diff --git a/core/orchestrator.go b/core/orchestrator.go
index beb8b72954..089b66a4da 100644
--- a/core/orchestrator.go
+++ b/core/orchestrator.go
@@ -160,8 +160,6 @@ func (orch *orchestrator) ProcessPayment(ctx context.Context, payment net.Paymen
totalEV := big.NewRat(0, 1)
totalTickets := 0
totalWinningTickets := 0
- totalFaceValue := big.NewInt(0)
- totalWinProb := big.NewRat(0, 1)
var receiveErr error
@@ -196,8 +194,6 @@ func (orch *orchestrator) ProcessPayment(ctx context.Context, payment net.Paymen
ev := ticket.EV()
orch.node.Balances.Credit(sender, manifestID, ev)
totalEV.Add(totalEV, ev)
- totalFaceValue.Add(totalFaceValue, ticket.FaceValue)
- totalWinProb.Add(totalWinProb, ticket.WinProbRat())
totalTickets++
}
@@ -214,7 +210,7 @@ func (orch *orchestrator) ProcessPayment(ctx context.Context, payment net.Paymen
}
}
- clog.V(common.DEBUG).Infof(ctx, "Payment tickets processed sessionID=%v faceValue=%v winProb=%v ev=%v", manifestID, eth.FormatUnits(totalFaceValue, "ETH"), totalWinProb.FloatString(10), totalEV.FloatString(2))
+ clog.V(common.DEBUG).Infof(ctx, "Payment tickets processed sessionID=%v faceValue=%v winProb=%v totalTickets=%v totalEV=%v", manifestID, eth.FormatUnits(ticketParams.FaceValue, "ETH"), ticketParams.WinProbRat().FloatString(10), totalTickets, totalEV.FloatString(2))
if lpmon.Enabled {
lpmon.TicketValueRecv(ctx, sender.Hex(), totalEV)
diff --git a/doc/development.md b/doc/development.md
index 414d3150a6..61aee5677f 100644
--- a/doc/development.md
+++ b/doc/development.md
@@ -4,11 +4,383 @@
Some tests depend on access to the JSON-RPC API of an Ethereum node connected to mainnet or Rinkeby.
-- To run mainnet tests, the `MAINNET_ETH_URL` environment variable should be set. If the variable is not set, the mainnet tests will be skipped.
-- To run Rinkeby tests, the `RINKEBY_ETH_URL` environment variable should be set. If the variable is not set, the Rinkeby tests will b eskipped
+- To run mainnet tests, the `MAINNET_ETH_URL` environment variable should be set. If the variable is not set, the mainnet tests will be skipped.
+- To run Rinkeby tests, the `RINKEBY_ETH_URL` environment variable should be set. If the variable is not set, the Rinkeby tests will be skipped.
To run tests:
-```
+```bash
bash test.sh
-```
\ No newline at end of file
+```
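+
+For example, to run the mainnet tests against an Ethereum JSON-RPC endpoint (the URL below is a placeholder; substitute your own provider):
+
+```bash
+export MAINNET_ETH_URL="https://mainnet.infura.io/v3/<project-id>"
+bash test.sh
+```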
+
+## Debugging
+
+To debug the code, we recommend using [Visual Studio Code](https://code.visualstudio.com/) with the [Go extension](https://marketplace.visualstudio.com/items?itemName=golang.Go); example VSCode configuration files are provided below. For more information on how to interact with the [go-livepeer](https://github.com/livepeer/go-livepeer) software, check out the [Livepeer Docs](https://docs.livepeer.org/orchestrators/guides/get-started). Ensure that you have followed the steps in the [Build from Source documentation](https://docs.livepeer.org/orchestrators/guides/install-go-livepeer#build-from-source) and that the required dependencies and environment variables are set in your shell configuration file (e.g., `.bashrc`, `.zshrc`).
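+
+For example, if you built FFmpeg with the repository's `install_ffmpeg.sh` script, your shell configuration would typically include something like the following (a sketch; adjust the path to wherever your dependencies are installed):
+
+```bash
+export PKG_CONFIG_PATH="$HOME/compiled/lib/pkgconfig"
+```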
+
+### Configuration Files
+
+#### Launch.json (transcoding)
+
+```json
+{
+ "version": "0.2.0",
+ "configurations": [
+ {
+ "name": "Run CLI",
+ "type": "go",
+ "request": "launch",
+ "mode": "debug",
+ "program": "cmd/livepeer_cli",
+ "console": "integratedTerminal",
+ "buildFlags": "-ldflags=-extldflags=-lm", // Fix missing symbol error.
+ "args": [
+ // "--http=7935", // Uncomment for Orch CLI.
+ "--http=5935" // Uncomment for Gateway CLI.
+ ]
+ },
+ {
+ "name": "Launch O/T (off-chain)",
+ "type": "go",
+ "request": "launch",
+ "mode": "debug",
+ "program": "cmd/livepeer",
+ "buildFlags": "-ldflags=-extldflags=-lm", // Fix missing symbol error.
+ "args": [
+ "-orchestrator",
+ "-transcoder",
+ "-serviceAddr=0.0.0.0:8935",
+ "-v=6",
+ "-nvidia=all"
+ ]
+ },
+ {
+ "name": "Launch O (off-chain)",
+ "type": "go",
+ "request": "launch",
+ "mode": "debug",
+ "program": "cmd/livepeer",
+ "buildFlags": "-ldflags=-extldflags=-lm", // Fix missing symbol error.
+ "args": [
+ "-orchestrator",
+ "-orchSecret=orchSecret",
+ "-serviceAddr=0.0.0.0:8935",
+ "-v=6"
+ ]
+ },
+ {
+ "name": "Launch T (off-chain)",
+ "type": "go",
+ "request": "launch",
+ "mode": "debug",
+ "program": "cmd/livepeer",
+ "buildFlags": "-ldflags=-extldflags=-lm", // Fix missing symbol error.
+ "args": [
+ "-transcoder",
+ "-orchSecret=orchSecret",
+ "-orchAddr=0.0.0.0:8935",
+ "-v=6",
+ "-nvidia=all"
+ ]
+ },
+ {
+ "name": "Launch G (off-chain)",
+ "type": "go",
+ "request": "launch",
+ "mode": "debug",
+ "program": "cmd/livepeer",
+ "buildFlags": "-ldflags=-extldflags=-lm", // Fix missing symbol error.
+ "args": [
+ "-gateway",
+ "-transcodingOptions=${env:HOME}/.lpData/offchain/transcodingOptions.json",
+ "-orchAddr=0.0.0.0:8935",
+ "-httpAddr=0.0.0.0:9935",
+ "-v",
+ "6"
+ ]
+ },
+ {
+ "name": "Launch O/T (on-chain)",
+ "type": "go",
+ "request": "launch",
+ "mode": "debug",
+ "program": "cmd/livepeer",
+ "buildFlags": "-tags=mainnet,experimental -ldflags=-extldflags=-lm", // Fix missing symbol error and enable mainnet.
+ "args": [
+ "-orchestrator",
+ "-transcoder",
+ "-serviceAddr=0.0.0.0:8935",
+ "-v=6",
+ "-nvidia=all",
+ "-network=arbitrum-one-mainnet",
+ "-ethUrl=https://arb1.arbitrum.io/rpc",
+ "-ethPassword=",
+ "-ethAcctAddr=",
+ "-ethOrchAddr=",
+ "-pricePerUnit="
+ ]
+ },
+ {
+ "name": "Launch O (on-chain)",
+ "type": "go",
+ "request": "launch",
+ "mode": "debug",
+ "program": "cmd/livepeer",
+ "buildFlags": "-tags=mainnet,experimental -ldflags=-extldflags=-lm", // Fix missing symbol error and enable mainnet.
+ "args": [
+ "-orchestrator",
+ "-orchSecret=orchSecret",
+ "-serviceAddr=0.0.0.0:8935",
+ "-v=6",
+ "-network=arbitrum-one-mainnet",
+ "-ethUrl=https://arb1.arbitrum.io/rpc",
+ "-ethPassword=",
+ "-ethAcctAddr=",
+ "-ethOrchAddr=",
+ "-pricePerUnit="
+ ]
+ },
+ {
+ "name": "Launch T (on-chain)",
+ "type": "go",
+ "request": "launch",
+ "mode": "debug",
+ "program": "cmd/livepeer",
+ "buildFlags": "-tags=mainnet,experimental -ldflags=-extldflags=-lm", // Fix missing symbol error and enable mainnet.
+ "args": [
+ "-transcoder",
+ "-orchSecret=orchSecret",
+ "-orchAddr=0.0.0.0:8935",
+ "-v=6",
+ "-nvidia=all"
+ ]
+ },
+ {
+ "name": "Launch G (on-chain)",
+ "type": "go",
+ "request": "launch",
+ "mode": "debug",
+ "program": "cmd/livepeer",
+ "buildFlags": "-tags=mainnet,experimental -ldflags=-extldflags=-lm", // Fix missing symbol error and enable mainnet.
+ "args": [
+ "-gateway",
+ "-transcodingOptions=${env:HOME}/.lpData/offchain/transcodingOptions.json",
+ "-orchAddr=0.0.0.0:8935",
+ "-httpAddr=0.0.0.0:9935",
+ "-v",
+ "6",
+ "-httpIngest",
+ "-network=arbitrum-one-mainnet",
+ "-ethUrl=https://arb1.arbitrum.io/rpc",
+ "-ethPassword=",
+ "-ethAcctAddr="
+ ]
+ }
+ ],
+ "compounds": [
+ {
+ "name": "Launch full stack (off-chain)",
+ "configurations": ["Launch O/T (off-chain)", "Launch G (off-chain)"],
+ "stopAll": true
+ },
+ {
+ "name": "Launch full stack (on-chain)",
+ "configurations": ["Launch O/T (on-chain)", "Launch G (on-chain)"],
+ "stopAll": true
+ }
+ ]
+}
+```
+
+#### Launch.json (AI)
+
+```json
+{
+ "version": "0.2.0",
+ "configurations": [
+ {
+ "name": "Run AI CLI",
+ "type": "go",
+ "request": "launch",
+ "mode": "debug",
+ "program": "cmd/livepeer_cli",
+ "console": "integratedTerminal",
+ "buildFlags": "-ldflags=-extldflags=-lm", // Fix missing symbol error.
+ "args": [
+ // "--http=7935", // Uncomment for Orch CLI.
+ "--http=5935" // Uncomment for Gateway CLI.
+ ]
+ },
+ {
+ "name": "Launch AI O/W (off-chain)",
+ "type": "go",
+ "request": "launch",
+ "mode": "debug",
+ "program": "cmd/livepeer",
+ "buildFlags": "-ldflags=-extldflags=-lm", // Fix missing symbol error.
+ "args": [
+ "-orchestrator",
+ "-aiWorker",
+ "-serviceAddr=0.0.0.0:8935",
+ "-v=6",
+ "-nvidia=all",
+ "-aiModels=${env:HOME}/.lpData/cfg/aiModels.json",
+ "-aiModelsDir=${env:HOME}/.lpData/models"
+ ]
+ },
+ {
+ "name": "Launch AI O (off-chain)",
+ "type": "go",
+ "request": "launch",
+ "mode": "debug",
+ "program": "cmd/livepeer",
+ "buildFlags": "-ldflags='-extldflags=-lm -X github.com/livepeer/go-livepeer/core.LivepeerVersion=0.0.0'", // Fix missing symbol and version mismatch errors.
+ "args": [
+ "-orchestrator",
+ "-orchSecret=orchSecret",
+ "-serviceAddr=0.0.0.0:8935",
+ "-v=6"
+ ]
+ },
+ {
+ "name": "Launch AI W (off-chain)",
+ "type": "go",
+ "request": "launch",
+ "mode": "debug",
+ "program": "cmd/livepeer",
+ "buildFlags": "-ldflags='-extldflags=-lm -X github.com/livepeer/go-livepeer/core.LivepeerVersion=0.0.0'", // Fix missing symbol and version mismatch errors.
+ "args": [
+ "-aiWorker",
+ "-orchSecret=orchSecret",
+ "-orchAddr=0.0.0.0:8935",
+ "-v=6",
+ "-nvidia=all",
+ "-aiModels=${env:HOME}/.lpData/cfg/aiModels.json",
+ "-aiModelsDir=${env:HOME}/.lpData/models"
+ ]
+ },
+ {
+ "name": "Launch AI G (off-chain)",
+ "type": "go",
+ "request": "launch",
+ "mode": "debug",
+ "program": "cmd/livepeer",
+ "buildFlags": "-ldflags=-extldflags=-lm", // Fix missing symbol error.
+ "args": [
+ "-gateway",
+ "-datadir=${env:HOME}/.lpData2",
+ "-orchAddr=0.0.0.0:8935",
+ "-httpAddr=0.0.0.0:9935",
+ "-v",
+ "6",
+ "-httpIngest"
+ ]
+ },
+ {
+ "name": "Launch AI O/W (on-chain)",
+ "type": "go",
+ "request": "launch",
+ "mode": "debug",
+ "program": "cmd/livepeer",
+ "buildFlags": "-tags=mainnet,experimental -ldflags=-extldflags=-lm", // Fix missing symbol error and enable mainnet.
+ "args": [
+ "-orchestrator",
+ "-aiWorker",
+ "-aiServiceRegistry",
+ "-serviceAddr=0.0.0.0:8935",
+ "-v=6",
+ "-nvidia=all",
+ "-aiModels=${env:HOME}/.lpData/cfg/aiModels.json",
+ "-aiModelsDir=${env:HOME}/.lpData/models",
+ "-network=arbitrum-one-mainnet",
+ "-ethUrl=https://arb1.arbitrum.io/rpc",
+ "-ethPassword=",
+ "-ethAcctAddr=",
+ "-ethOrchAddr="
+ ]
+ },
+ {
+ "name": "Launch AI O (on-chain)",
+ "type": "go",
+ "request": "launch",
+ "mode": "debug",
+ "program": "cmd/livepeer",
+ "buildFlags": "-tags=mainnet,experimental -ldflags='-extldflags=-lm -X github.com/livepeer/go-livepeer/core.LivepeerVersion=0.0.0'", // Fix missing symbol error, version mismatch error and enable mainnet.
+ "args": [
+ "-orchestrator",
+ "-orchSecret=orchSecret",
+ "-aiServiceRegistry",
+ "-serviceAddr=0.0.0.0:8935",
+ "-v=6",
+ "-network=arbitrum-one-mainnet",
+ "-ethUrl=https://arb1.arbitrum.io/rpc",
+ "-ethPassword=",
+ "-ethAcctAddr=",
+ "-ethOrchAddr=",
+ "-pricePerUnit=0"
+ ]
+ },
+ {
+ "name": "Launch AI W (on-chain)",
+ "type": "go",
+ "request": "launch",
+ "mode": "debug",
+ "program": "cmd/livepeer",
+ "buildFlags": "-tags=mainnet,experimental -ldflags='-extldflags=-lm -X github.com/livepeer/go-livepeer/core.LivepeerVersion=0.0.0'", // Fix missing symbol error, version mismatch error and enable mainnet.
+ "args": [
+ "-aiWorker",
+ "-orchSecret=orchSecret",
+ "-orchAddr=0.0.0.0:8935",
+ "-v=6",
+ "-nvidia=all",
+ "-aiModels=${env:HOME}/.lpData/cfg/aiModels.json",
+ "-aiModelsDir=${env:HOME}/.lpData/models"
+ ]
+ },
+ {
+ "name": "Launch AI G (on-chain)",
+ "type": "go",
+ "request": "launch",
+ "mode": "debug",
+ "program": "cmd/livepeer",
+ "buildFlags": "-tags=mainnet,experimental -ldflags=-extldflags=-lm", // Fix missing symbol error and enable mainnet.
+ "args": [
+ "-gateway",
+ "-aiServiceRegistry",
+ "-datadir=${env:HOME}/.lpData2",
+ "-orchAddr=0.0.0.0:8935",
+ "-httpAddr=0.0.0.0:9935",
+ "-v",
+ "6",
+ "-httpIngest",
+ "-network=arbitrum-one-mainnet",
+ "-ethUrl=https://arb1.arbitrum.io/rpc",
+ "-ethPassword=",
+ "-ethAcctAddr="
+ ]
+ }
+ ],
+ "compounds": [
+ {
+ "name": "Launch full AI stack (off-chain)",
+ "configurations": [
+ "Launch AI O/W (off-chain)",
+ "Launch AI G (off-chain)"
+ ],
+ "stopAll": true
+ },
+ {
+ "name": "Launch full AI stack (on-chain)",
+ "configurations": ["Launch AI O/W (on-chain)", "Launch AI G (on-chain)"],
+ "stopAll": true
+ }
+ ]
+}
+```
diff --git a/go.mod b/go.mod
index 2b7806e0c0..0e82044924 100644
--- a/go.mod
+++ b/go.mod
@@ -6,6 +6,7 @@ require (
contrib.go.opencensus.io/exporter/prometheus v0.4.2
github.com/Masterminds/semver/v3 v3.2.1
github.com/cenkalti/backoff v2.2.1+incompatible
+ github.com/dustin/go-humanize v1.0.1
github.com/ethereum/go-ethereum v1.13.5
github.com/getkin/kin-openapi v0.128.0
github.com/golang/glog v1.2.1
@@ -14,10 +15,10 @@ require (
github.com/google/uuid v1.6.0
github.com/jaypipes/ghw v0.10.0
github.com/jaypipes/pcidb v1.0.0
- github.com/livepeer/ai-worker v0.12.7-0.20241219141308-c19289d128a3
+ github.com/livepeer/ai-worker v0.13.3
github.com/livepeer/go-tools v0.3.6-0.20240130205227-92479de8531b
github.com/livepeer/livepeer-data v0.7.5-0.20231004073737-06f1f383fb18
- github.com/livepeer/lpms v0.0.0-20241203012405-fc96cadb6393
+ github.com/livepeer/lpms v0.0.0-20250118014304-79e6dcf08057
github.com/livepeer/m3u8 v0.11.1
github.com/mattn/go-sqlite3 v1.14.18
github.com/oapi-codegen/nethttp-middleware v1.0.1
@@ -84,7 +85,6 @@ require (
github.com/docker/go-connections v0.5.0 // indirect
github.com/docker/go-units v0.5.0 // indirect
github.com/dop251/goja v0.0.0-20230806174421-c933cf95e127 // indirect
- github.com/dustin/go-humanize v1.0.1 // indirect
github.com/ethereum/c-kzg-4844 v0.4.0 // indirect
github.com/fatih/color v1.13.0 // indirect
github.com/felixge/httpsnoop v1.0.4 // indirect
diff --git a/go.sum b/go.sum
index 14a5b8a051..1f5416e8f8 100644
--- a/go.sum
+++ b/go.sum
@@ -605,16 +605,16 @@ github.com/libp2p/go-netroute v0.2.0 h1:0FpsbsvuSnAhXFnCY0VLFbJOzaK0VnP0r1QT/o4n
github.com/libp2p/go-netroute v0.2.0/go.mod h1:Vio7LTzZ+6hoT4CMZi5/6CpY3Snzh2vgZhWgxMNwlQI=
github.com/libp2p/go-openssl v0.1.0 h1:LBkKEcUv6vtZIQLVTegAil8jbNpJErQ9AnT+bWV+Ooo=
github.com/libp2p/go-openssl v0.1.0/go.mod h1:OiOxwPpL3n4xlenjx2h7AwSGaFSC/KZvf6gNdOBQMtc=
-github.com/livepeer/ai-worker v0.12.7-0.20241219141308-c19289d128a3 h1:uutmGZq2YdIKnKhn6QGHtGnKfBGYAUMMOr44LXYs23w=
-github.com/livepeer/ai-worker v0.12.7-0.20241219141308-c19289d128a3/go.mod h1:ZibfmZQQh6jFvnPLHeIPInghfX5ln+JpN845nS3GuyM=
+github.com/livepeer/ai-worker v0.13.3 h1:vcKUK56GRwiHIhz0UbNeKffFBAPtJPWuZNmcJrhAV8o=
+github.com/livepeer/ai-worker v0.13.3/go.mod h1:rbcoIzQewbf5rvosCvG2M9DLg/ZMl7yMsaSce3svXFA=
github.com/livepeer/go-tools v0.3.6-0.20240130205227-92479de8531b h1:VQcnrqtCA2UROp7q8ljkh2XA/u0KRgVv0S1xoUvOweE=
github.com/livepeer/go-tools v0.3.6-0.20240130205227-92479de8531b/go.mod h1:hwJ5DKhl+pTanFWl+EUpw1H7ukPO/H+MFpgA7jjshzw=
github.com/livepeer/joy4 v0.1.2-0.20191121080656-b2fea45cbded h1:ZQlvR5RB4nfT+cOQee+WqmaDOgGtP2oDMhcVvR4L0yA=
github.com/livepeer/joy4 v0.1.2-0.20191121080656-b2fea45cbded/go.mod h1:xkDdm+akniYxVT9KW1Y2Y7Hso6aW+rZObz3nrA9yTHw=
github.com/livepeer/livepeer-data v0.7.5-0.20231004073737-06f1f383fb18 h1:4oH3NqV0NvcdS44Ld3zK2tO8IUiNozIggm74yobQeZg=
github.com/livepeer/livepeer-data v0.7.5-0.20231004073737-06f1f383fb18/go.mod h1:Jpf4jHK+fbWioBHRDRM1WadNT1qmY27g2YicTdO0Rtc=
-github.com/livepeer/lpms v0.0.0-20241203012405-fc96cadb6393 h1:aoDFI66Kj1pQueka93PLY59WlnI7jy4cJUfPxteIgCE=
-github.com/livepeer/lpms v0.0.0-20241203012405-fc96cadb6393/go.mod h1:z5ROP1l5OzAKSoqVRLc34MjUdueil6wHSecQYV7llIw=
+github.com/livepeer/lpms v0.0.0-20250118014304-79e6dcf08057 h1:ciq0bEaG+2LlOTYMd01s8KffQDoqq4QlSsLhqrWYuxg=
+github.com/livepeer/lpms v0.0.0-20250118014304-79e6dcf08057/go.mod h1:z5ROP1l5OzAKSoqVRLc34MjUdueil6wHSecQYV7llIw=
github.com/livepeer/m3u8 v0.11.1 h1:VkUJzfNTyjy9mqsgp5JPvouwna8wGZMvd/gAfT5FinU=
github.com/livepeer/m3u8 v0.11.1/go.mod h1:IUqAtwWPAG2CblfQa4SVzTQoDcEMPyfNOaBSxqHMS04=
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 h1:6E+4a0GO5zZEnZ81pIr0yLvtUWk2if982qA3F3QD6H4=
diff --git a/media/rtmp2segment.go b/media/rtmp2segment.go
index 4cb3310cf1..60783844e2 100644
--- a/media/rtmp2segment.go
+++ b/media/rtmp2segment.go
@@ -22,7 +22,12 @@ import (
"golang.org/x/sys/unix"
)
-var waitTimeout = 20 * time.Second
+const (
+ waitTimeout = 20 * time.Second
+ fileCleanupInterval = time.Hour
+ fileCleanupMaxAge = 4 * time.Hour
+ outFileSuffix = ".ts"
+)
type MediaSegmenter struct {
Workdir string
@@ -30,12 +35,14 @@ type MediaSegmenter struct {
}
func (ms *MediaSegmenter) RunSegmentation(ctx context.Context, in string, segmentHandler SegmentHandler) {
- outFilePattern := filepath.Join(ms.Workdir, randomString()+"-%d.ts")
+ outFilePattern := filepath.Join(ms.Workdir, randomString()+"-%d"+outFileSuffix)
completionSignal := make(chan bool, 1)
+ procCtx, procCancel := context.WithCancel(context.Background()) // parent ctx is a short lived http request
wg := &sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
+ defer procCancel()
processSegments(ctx, segmentHandler, outFilePattern, completionSignal)
}()
@@ -49,7 +56,7 @@ func (ms *MediaSegmenter) RunSegmentation(ctx context.Context, in string, segmen
clog.Errorf(ctx, "Stopping segmentation, input stream does not exist. in=%s err=%s", in, err)
break
}
- cmd := exec.Command("ffmpeg",
+ cmd := exec.CommandContext(procCtx, "ffmpeg",
"-i", in,
"-c:a", "copy",
"-c:v", "copy",
@@ -58,7 +65,7 @@ func (ms *MediaSegmenter) RunSegmentation(ctx context.Context, in string, segmen
)
output, err := cmd.CombinedOutput()
if err != nil {
- clog.Errorf(ctx, "Error sending RTMP out process: %v", err)
+ clog.Errorf(ctx, "Error receiving RTMP: %v", err)
clog.Infof(ctx, "Process output: %s", output)
return
}
@@ -185,12 +192,12 @@ func processSegments(ctx context.Context, segmentHandler SegmentHandler, outFile
defer mu.Unlock()
if currentSegment != nil {
// Trigger EOF on the current segment by closing the file
- slog.Info("Completion signal received. Closing current segment to trigger EOF.")
+ clog.Infof(ctx, "Completion signal received. Closing current segment to trigger EOF.")
currentSegment.Close()
}
isComplete = true
pipeCompletion <- true
- slog.Info("Got completion signal")
+ clog.Infof(ctx, "Got completion signal")
}()
pipeNum := 0
@@ -207,7 +214,7 @@ func processSegments(ctx context.Context, segmentHandler SegmentHandler, outFile
// Blocks if no writer is available so do some tricks to it
file, err := openNonBlockingWithRetry(pipeName, waitTimeout, pipeCompletion)
if err != nil {
- slog.Error("Error opening pipe", "pipeName", pipeName, "err", err)
+ clog.Errorf(ctx, "Error opening pipe pipeName=%s err=%s", pipeName, err)
cleanUpPipe(pipeName)
cleanUpPipe(nextPipeName)
break
@@ -255,3 +262,44 @@ func randomString() string {
}
return strings.TrimRight(base32.StdEncoding.EncodeToString(b), "=")
}
+
+// StartFileCleanup starts a goroutine to periodically remove any old temporary files accidentally left behind
+func StartFileCleanup(ctx context.Context, workDir string) {
+ go func() {
+ ticker := time.NewTicker(fileCleanupInterval)
+ defer ticker.Stop()
+ for {
+ select {
+ case <-ctx.Done():
+ return
+ case <-ticker.C:
+ if err := cleanUpLocalTmpFiles(ctx, workDir, "*"+outFileSuffix, fileCleanupMaxAge); err != nil {
+ clog.Errorf(ctx, "Error cleaning up segment files: %v", err)
+ }
+ }
+ }
+ }()
+}
+
+func cleanUpLocalTmpFiles(ctx context.Context, dir string, filenamePattern string, maxAge time.Duration) error {
+ filesRemoved := 0
+ err := filepath.Walk(dir, func(path string, info os.FileInfo, err error) error {
+ if err != nil {
+ return nil
+ }
+ if info.Mode().IsRegular() {
+ if match, _ := filepath.Match(filenamePattern, info.Name()); match {
+ if time.Since(info.ModTime()) > maxAge {
+ err = os.Remove(path)
+ if err != nil {
+ return fmt.Errorf("error removing file %s: %w", path, err)
+ }
+ filesRemoved++
+ }
+ }
+ }
+ return nil
+ })
+ clog.Infof(ctx, "Segment file cleanup removed %d files", filesRemoved)
+ return err
+}
diff --git a/media/rtmp2segment_windows.go b/media/rtmp2segment_windows.go
index 444943a990..b9d03aca4a 100644
--- a/media/rtmp2segment_windows.go
+++ b/media/rtmp2segment_windows.go
@@ -12,3 +12,7 @@ type MediaSegmenter struct {
func (ms *MediaSegmenter) RunSegmentation(ctx context.Context, in string, segmentHandler SegmentHandler) {
// Not supported for Windows
}
+
+func StartFileCleanup(ctx context.Context, workDir string) {
+ // Not supported for Windows
+}
diff --git a/server/ai_http.go b/server/ai_http.go
index 020c6dc0c6..f8e96f65a4 100644
--- a/server/ai_http.go
+++ b/server/ai_http.go
@@ -66,7 +66,7 @@ func startAIServer(lp *lphttp) error {
lp.transRPC.Handle("/image-to-video", oapiReqValidator(aiHttpHandle(lp, multipartDecoder[worker.GenImageToVideoMultipartRequestBody])))
lp.transRPC.Handle("/upscale", oapiReqValidator(aiHttpHandle(lp, multipartDecoder[worker.GenUpscaleMultipartRequestBody])))
lp.transRPC.Handle("/audio-to-text", oapiReqValidator(aiHttpHandle(lp, multipartDecoder[worker.GenAudioToTextMultipartRequestBody])))
- lp.transRPC.Handle("/llm", oapiReqValidator(aiHttpHandle(lp, multipartDecoder[worker.GenLLMFormdataRequestBody])))
+ lp.transRPC.Handle("/llm", oapiReqValidator(aiHttpHandle(lp, jsonDecoder[worker.GenLLMJSONRequestBody])))
lp.transRPC.Handle("/segment-anything-2", oapiReqValidator(aiHttpHandle(lp, multipartDecoder[worker.GenSegmentAnything2MultipartRequestBody])))
lp.transRPC.Handle("/image-to-text", oapiReqValidator(aiHttpHandle(lp, multipartDecoder[worker.GenImageToTextMultipartRequestBody])))
lp.transRPC.Handle("/text-to-speech", oapiReqValidator(aiHttpHandle(lp, jsonDecoder[worker.GenTextToSpeechJSONRequestBody])))
@@ -404,10 +404,10 @@ func handleAIRequest(ctx context.Context, w http.ResponseWriter, r *http.Request
return
}
outPixels *= 1000 // Convert to milliseconds
- case worker.GenLLMFormdataRequestBody:
+ case worker.GenLLMJSONRequestBody:
pipeline = "llm"
cap = core.Capability_LLM
- modelID = *v.ModelId
+ modelID = *v.Model
submitFn = func(ctx context.Context) (interface{}, error) {
return orch.LLM(ctx, requestID, v)
}
@@ -585,7 +585,7 @@ func handleAIRequest(ctx context.Context, w http.ResponseWriter, r *http.Request
}
// Check if the response is a streaming response
- if streamChan, ok := resp.(<-chan worker.LlmStreamChunk); ok {
+ if streamChan, ok := resp.(<-chan *worker.LLMResponse); ok {
glog.Infof("Streaming response for request id=%v", requestID)
// Set headers for SSE
@@ -609,7 +609,7 @@ func handleAIRequest(ctx context.Context, w http.ResponseWriter, r *http.Request
fmt.Fprintf(w, "data: %s\n\n", data)
flusher.Flush()
- if chunk.Done {
+ if len(chunk.Choices) > 0 && chunk.Choices[0].FinishReason != nil && *chunk.Choices[0].FinishReason != "" {
break
}
}
@@ -682,8 +682,8 @@ func (h *lphttp) AIResults() http.Handler {
case "text/event-stream":
resultType = "streaming"
glog.Infof("Received %s response from remote worker=%s taskId=%d", resultType, r.RemoteAddr, tid)
- resChan := make(chan worker.LlmStreamChunk, 100)
- workerResult.Results = (<-chan worker.LlmStreamChunk)(resChan)
+ resChan := make(chan *worker.LLMResponse, 100)
+ workerResult.Results = (<-chan *worker.LLMResponse)(resChan)
defer r.Body.Close()
defer close(resChan)
@@ -702,12 +702,12 @@ func (h *lphttp) AIResults() http.Handler {
line := scanner.Text()
if strings.HasPrefix(line, "data: ") {
data := strings.TrimPrefix(line, "data: ")
- var chunk worker.LlmStreamChunk
+ var chunk worker.LLMResponse
if err := json.Unmarshal([]byte(data), &chunk); err != nil {
clog.Errorf(ctx, "Error unmarshaling stream data: %v", err)
continue
}
- resChan <- chunk
+ resChan <- &chunk
}
}
}
diff --git a/server/ai_mediaserver.go b/server/ai_mediaserver.go
index 16e45b59d9..eceed362ee 100644
--- a/server/ai_mediaserver.go
+++ b/server/ai_mediaserver.go
@@ -10,11 +10,13 @@ import (
"log/slog"
"net/http"
"net/url"
+ "os/exec"
"strings"
"time"
"github.com/livepeer/go-livepeer/monitor"
+ "github.com/cenkalti/backoff"
"github.com/getkin/kin-openapi/openapi3filter"
"github.com/livepeer/ai-worker/worker"
"github.com/livepeer/go-livepeer/clog"
@@ -51,7 +53,7 @@ const (
Complete ImageToVideoStatus = "complete"
)
-func startAIMediaServer(ls *LivepeerServer) error {
+func startAIMediaServer(ctx context.Context, ls *LivepeerServer) error {
swagger, err := worker.GetSwagger()
if err != nil {
return err
@@ -86,10 +88,12 @@ func startAIMediaServer(ls *LivepeerServer) error {
ls.HTTPMux.Handle("/live/video-to-video/{stream}/start", ls.StartLiveVideo())
ls.HTTPMux.Handle("/live/video-to-video/{prefix}/{stream}/start", ls.StartLiveVideo())
ls.HTTPMux.Handle("/live/video-to-video/{stream}/update", ls.UpdateLiveVideo())
+ ls.HTTPMux.Handle("/live/video-to-video/smoketest", ls.SmokeTestLiveVideo())
// Stream status
ls.HTTPMux.Handle("/live/video-to-video/{streamId}/status", ls.GetLiveVideoToVideoStatus())
+ media.StartFileCleanup(ctx, ls.LivepeerNode.WorkDir)
return nil
}
@@ -255,20 +259,19 @@ func (ls *LivepeerServer) LLM() http.Handler {
requestID := string(core.RandomManifestID())
ctx = clog.AddVal(ctx, "request_id", requestID)
- var req worker.GenLLMFormdataRequestBody
-
- multiRdr, err := r.MultipartReader()
- if err != nil {
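+ // The request body follows the OpenAI chat-completions shape; an illustrative (hypothetical) example:
+ // {"model": "model-id", "messages": [{"role": "user", "content": "..."}], "max_tokens": 256, "stream": false}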
+ var req worker.GenLLMJSONRequestBody
+ if err := jsonDecoder(&req, r); err != nil {
respondJsonError(ctx, w, err, http.StatusBadRequest)
return
}
- if err := runtime.BindMultipart(&req, *multiRdr); err != nil {
- respondJsonError(ctx, w, err, http.StatusBadRequest)
+ //check required fields
+ if req.Model == nil || req.Messages == nil || req.Stream == nil || req.MaxTokens == nil || len(req.Messages) == 0 {
+ respondJsonError(ctx, w, errors.New("missing required fields"), http.StatusBadRequest)
return
}
- clog.V(common.VERBOSE).Infof(ctx, "Received LLM request prompt=%v model_id=%v stream=%v", req.Prompt, *req.ModelId, *req.Stream)
+ clog.V(common.VERBOSE).Infof(ctx, "Received LLM request model_id=%v stream=%v", *req.Model, *req.Stream)
params := aiRequestParams{
node: ls.LivepeerNode,
@@ -289,9 +292,9 @@ func (ls *LivepeerServer) LLM() http.Handler {
}
took := time.Since(start)
- clog.V(common.VERBOSE).Infof(ctx, "Processed LLM request prompt=%v model_id=%v took=%v", req.Prompt, *req.ModelId, took)
+ clog.V(common.VERBOSE).Infof(ctx, "Processed LLM request model_id=%v took=%v", *req.Model, took)
- if streamChan, ok := resp.(chan worker.LlmStreamChunk); ok {
+ if streamChan, ok := resp.(chan *worker.LLMResponse); ok {
// Handle streaming response (SSE)
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
@@ -301,7 +304,7 @@ func (ls *LivepeerServer) LLM() http.Handler {
data, _ := json.Marshal(chunk)
fmt.Fprintf(w, "data: %s\n\n", data)
w.(http.Flusher).Flush()
- if chunk.Done {
+ if len(chunk.Choices) > 0 && chunk.Choices[0].FinishReason != nil && *chunk.Choices[0].FinishReason != "" {
break
}
}
@@ -652,3 +655,86 @@ func (ls *LivepeerServer) cleanupLive(stream string) {
pub.StopControl()
}
}
+
+const defaultSmokeTestDuration = 5 * time.Minute
+const maxSmokeTestDuration = 60 * time.Minute
+
+type smokeTestRequest struct {
+ StreamURL string `json:"stream_url"`
+ DurationSecs int `json:"duration_secs"`
+}
+
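+// SmokeTestLiveVideo starts an ffmpeg test stream (a video test card plus a sine audio
+// track) against the given ingest URL for the requested duration. Illustrative usage,
+// assuming the gateway's HTTP address is 0.0.0.0:9935 (it depends on -httpAddr):
+//
+// curl -X PUT http://localhost:9935/live/video-to-video/smoketest \
+//   -d '{"stream_url": "rtmp://<host>/<stream>", "duration_secs": 60}'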
+func (ls *LivepeerServer) SmokeTestLiveVideo() http.Handler {
+ return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ if r.Method != http.MethodPut {
+ http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
+ return
+ }
+
+ var req smokeTestRequest
+ defer r.Body.Close()
+ d := json.NewDecoder(r.Body)
+ err := d.Decode(&req)
+ if err != nil {
+ http.Error(w, "Failed to parse request body", http.StatusBadRequest)
+ return
+ }
+ if req.StreamURL == "" {
+ http.Error(w, "Missing stream url", http.StatusBadRequest)
+ return
+ }
+
+ ingestURL := req.StreamURL
+ duration := defaultSmokeTestDuration
+ if req.DurationSecs != 0 {
+ if float64(req.DurationSecs) > maxSmokeTestDuration.Seconds() {
+ http.Error(w, "Request exceeds max duration "+maxSmokeTestDuration.String(), http.StatusBadRequest)
+ return
+ }
+ duration = time.Duration(req.DurationSecs) * time.Second
+ }
+ // Use an ffmpeg test card as the input source.
+ var params = []string{
+ "-re",
+ "-f", "lavfi",
+ "-i", "testsrc=size=1920x1080:rate=30,format=yuv420p",
+ "-f", "lavfi",
+ "-i", "sine",
+ "-c:v", "libx264",
+ "-b:v", "1000k",
+ "-x264-params", "keyint=60",
+ "-c:a", "aac",
+ "-to", fmt.Sprintf("%f", duration.Seconds()),
+ "-f", "flv",
+ ingestURL,
+ }
+
+ ctx, cancel := context.WithTimeout(context.Background(), duration+time.Minute)
+ cmd := exec.CommandContext(ctx, "ffmpeg", params...)
+ var outputBuf bytes.Buffer
+ cmd.Stdout = &outputBuf
+ cmd.Stderr = &outputBuf
+
+ clog.Infof(ctx, "Starting smoke test for %s duration %s", ingestURL, duration)
+
+ if err := cmd.Start(); err != nil {
+ cancel()
+ clog.Errorf(ctx, "failed to start ffmpeg. Error: %s\nCommand: ffmpeg %s", err, strings.Join(params, " "))
+ http.Error(w, "Failed to start stream", http.StatusInternalServerError)
+ return
+ }
+
+ go func() {
+ defer cancel()
+ _ = backoff.Retry(func() error {
+ state, err := cmd.Process.Wait()
+ if err != nil || state.ExitCode() != 0 {
+ // state can be nil when Wait returns an error, so guard it before reading the exit code.
+ exitCode := -1
+ if state != nil {
+ exitCode = state.ExitCode()
+ }
+ clog.Errorf(ctx, "failed to run ffmpeg. Exit Code: %d, Error: %s\nCommand: ffmpeg %s\n", exitCode, err, strings.Join(params, " "))
+ clog.Errorf(ctx, "ffmpeg output:\n%s\n", outputBuf.String())
+ return fmt.Errorf("ffmpeg failed")
+ }
+ clog.Infof(ctx, "Smoke test finished successfully for %s", ingestURL)
+ return nil
+ }, backoff.WithMaxRetries(backoff.NewConstantBackOff(30*time.Second), 3))
+ }()
+ })
+}
diff --git a/server/ai_process.go b/server/ai_process.go
index bb2839de8d..cd38feb2cc 100644
--- a/server/ai_process.go
+++ b/server/ai_process.go
@@ -1107,14 +1107,14 @@ func CalculateLLMLatencyScore(took time.Duration, tokensUsed int) float64 {
return took.Seconds() / float64(tokensUsed)
}
-func processLLM(ctx context.Context, params aiRequestParams, req worker.GenLLMFormdataRequestBody) (interface{}, error) {
+func processLLM(ctx context.Context, params aiRequestParams, req worker.GenLLMJSONRequestBody) (interface{}, error) {
resp, err := processAIRequest(ctx, params, req)
if err != nil {
return nil, err
}
if req.Stream != nil && *req.Stream {
- streamChan, ok := resp.(chan worker.LlmStreamChunk)
+ streamChan, ok := resp.(chan *worker.LLMResponse)
if !ok {
return nil, errors.New("unexpected response type for streaming request")
}
@@ -1129,20 +1129,12 @@ func processLLM(ctx context.Context, params aiRequestParams, req worker.GenLLMFo
return llmResp, nil
}
-func submitLLM(ctx context.Context, params aiRequestParams, sess *AISession, req worker.GenLLMFormdataRequestBody) (interface{}, error) {
- var buf bytes.Buffer
- mw, err := worker.NewLLMMultipartWriter(&buf, req)
- if err != nil {
- if monitor.Enabled {
- monitor.AIRequestError(err.Error(), "llm", *req.ModelId, nil)
- }
- return nil, err
- }
+func submitLLM(ctx context.Context, params aiRequestParams, sess *AISession, req worker.GenLLMJSONRequestBody) (interface{}, error) {
client, err := worker.NewClientWithResponses(sess.Transcoder(), worker.WithHTTPClient(httpClient))
if err != nil {
if monitor.Enabled {
- monitor.AIRequestError(err.Error(), "llm", *req.ModelId, sess.OrchestratorInfo)
+ monitor.AIRequestError(err.Error(), "llm", *req.Model, sess.OrchestratorInfo)
}
return nil, err
}
@@ -1155,17 +1147,17 @@ func submitLLM(ctx context.Context, params aiRequestParams, sess *AISession, req
setHeaders, balUpdate, err := prepareAIPayment(ctx, sess, int64(*req.MaxTokens))
if err != nil {
if monitor.Enabled {
- monitor.AIRequestError(err.Error(), "llm", *req.ModelId, sess.OrchestratorInfo)
+ monitor.AIRequestError(err.Error(), "llm", *req.Model, sess.OrchestratorInfo)
}
return nil, err
}
defer completeBalanceUpdate(sess.BroadcastSession, balUpdate)
start := time.Now()
- resp, err := client.GenLLMWithBody(ctx, mw.FormDataContentType(), &buf, setHeaders)
+ resp, err := client.GenLLM(ctx, req, setHeaders)
if err != nil {
if monitor.Enabled {
- monitor.AIRequestError(err.Error(), "llm", *req.ModelId, sess.OrchestratorInfo)
+ monitor.AIRequestError(err.Error(), "llm", *req.Model, sess.OrchestratorInfo)
}
return nil, err
}
@@ -1175,6 +1167,12 @@ func submitLLM(ctx context.Context, params aiRequestParams, sess *AISession, req
return nil, fmt.Errorf("unexpected status code: %d, body: %s", resp.StatusCode, string(body))
}
+ // Treat a response as "receiving change", where the change is the difference between the credit and the debit for the update.
+ // TODO: move this to after the stream response is received in handleSSEStream and handleNonStreamingResponse so that input tokens are counted.
+ if balUpdate != nil {
+ balUpdate.Status = ReceivedChange
+ }
+
if req.Stream != nil && *req.Stream {
return handleSSEStream(ctx, resp.Body, sess, req, start)
}
@@ -1182,28 +1180,29 @@ func submitLLM(ctx context.Context, params aiRequestParams, sess *AISession, req
return handleNonStreamingResponse(ctx, resp.Body, sess, req, start)
}
-func handleSSEStream(ctx context.Context, body io.ReadCloser, sess *AISession, req worker.GenLLMFormdataRequestBody, start time.Time) (chan worker.LlmStreamChunk, error) {
- streamChan := make(chan worker.LlmStreamChunk, 100)
+func handleSSEStream(ctx context.Context, body io.ReadCloser, sess *AISession, req worker.GenLLMJSONRequestBody, start time.Time) (chan *worker.LLMResponse, error) {
+ streamChan := make(chan *worker.LLMResponse, 100)
go func() {
defer close(streamChan)
defer body.Close()
scanner := bufio.NewScanner(body)
- var totalTokens int
+ var totalTokens worker.LLMTokenUsage
for scanner.Scan() {
line := scanner.Text()
if strings.HasPrefix(line, "data: ") {
data := strings.TrimPrefix(line, "data: ")
- if data == "[DONE]" {
- streamChan <- worker.LlmStreamChunk{Done: true, TokensUsed: totalTokens}
- break
- }
- var chunk worker.LlmStreamChunk
+
+ var chunk worker.LLMResponse
if err := json.Unmarshal([]byte(data), &chunk); err != nil {
clog.Errorf(ctx, "Error unmarshaling SSE data: %v", err)
continue
}
- totalTokens += chunk.TokensUsed
- streamChan <- chunk
+ totalTokens = chunk.Usage
+ streamChan <- &chunk
+ // Check whether the stream is finished.
+ if len(chunk.Choices) > 0 && chunk.Choices[0].FinishReason != nil && *chunk.Choices[0].FinishReason != "" {
+ break
+ }
}
}
if err := scanner.Err(); err != nil {
@@ -1211,26 +1210,26 @@ func handleSSEStream(ctx context.Context, body io.ReadCloser, sess *AISession, r
}
took := time.Since(start)
- sess.LatencyScore = CalculateLLMLatencyScore(took, totalTokens)
+ sess.LatencyScore = CalculateLLMLatencyScore(took, totalTokens.TotalTokens)
if monitor.Enabled {
var pricePerAIUnit float64
if priceInfo := sess.OrchestratorInfo.GetPriceInfo(); priceInfo != nil && priceInfo.PixelsPerUnit != 0 {
pricePerAIUnit = float64(priceInfo.PricePerUnit) / float64(priceInfo.PixelsPerUnit)
}
- monitor.AIRequestFinished(ctx, "llm", *req.ModelId, monitor.AIJobInfo{LatencyScore: sess.LatencyScore, PricePerUnit: pricePerAIUnit}, sess.OrchestratorInfo)
+ monitor.AIRequestFinished(ctx, "llm", *req.Model, monitor.AIJobInfo{LatencyScore: sess.LatencyScore, PricePerUnit: pricePerAIUnit}, sess.OrchestratorInfo)
}
}()
return streamChan, nil
}
-func handleNonStreamingResponse(ctx context.Context, body io.ReadCloser, sess *AISession, req worker.GenLLMFormdataRequestBody, start time.Time) (*worker.LLMResponse, error) {
+func handleNonStreamingResponse(ctx context.Context, body io.ReadCloser, sess *AISession, req worker.GenLLMJSONRequestBody, start time.Time) (*worker.LLMResponse, error) {
data, err := io.ReadAll(body)
defer body.Close()
if err != nil {
if monitor.Enabled {
- monitor.AIRequestError(err.Error(), "llm", *req.ModelId, sess.OrchestratorInfo)
+ monitor.AIRequestError(err.Error(), "llm", *req.Model, sess.OrchestratorInfo)
}
return nil, err
}
@@ -1238,20 +1237,21 @@ func handleNonStreamingResponse(ctx context.Context, body io.ReadCloser, sess *A
var res worker.LLMResponse
if err := json.Unmarshal(data, &res); err != nil {
if monitor.Enabled {
- monitor.AIRequestError(err.Error(), "llm", *req.ModelId, sess.OrchestratorInfo)
+ monitor.AIRequestError(err.Error(), "llm", *req.Model, sess.OrchestratorInfo)
}
return nil, err
}
took := time.Since(start)
- sess.LatencyScore = CalculateLLMLatencyScore(took, res.TokensUsed)
+
+ sess.LatencyScore = CalculateLLMLatencyScore(took, res.Usage.TotalTokens)
if monitor.Enabled {
var pricePerAIUnit float64
if priceInfo := sess.OrchestratorInfo.GetPriceInfo(); priceInfo != nil && priceInfo.PixelsPerUnit != 0 {
pricePerAIUnit = float64(priceInfo.PricePerUnit) / float64(priceInfo.PixelsPerUnit)
}
- monitor.AIRequestFinished(ctx, "llm", *req.ModelId, monitor.AIJobInfo{LatencyScore: sess.LatencyScore, PricePerUnit: pricePerAIUnit}, sess.OrchestratorInfo)
+ monitor.AIRequestFinished(ctx, "llm", *req.Model, monitor.AIJobInfo{LatencyScore: sess.LatencyScore, PricePerUnit: pricePerAIUnit}, sess.OrchestratorInfo)
}
return &res, nil
@@ -1410,16 +1410,16 @@ func processAIRequest(ctx context.Context, params aiRequestParams, req interface
submitFn = func(ctx context.Context, params aiRequestParams, sess *AISession) (interface{}, error) {
return submitAudioToText(ctx, params, sess, v)
}
- case worker.GenLLMFormdataRequestBody:
+ case worker.GenLLMJSONRequestBody:
cap = core.Capability_LLM
modelID = defaultLLMModelID
- if v.ModelId != nil {
- modelID = *v.ModelId
+ if v.Model != nil {
+ modelID = *v.Model
}
submitFn = func(ctx context.Context, params aiRequestParams, sess *AISession) (interface{}, error) {
return submitLLM(ctx, params, sess, v)
}
- ctx = clog.AddVal(ctx, "prompt", v.Prompt)
+
case worker.GenSegmentAnything2MultipartRequestBody:
cap = core.Capability_SegmentAnything2
modelID = defaultSegmentAnything2ModelID
diff --git a/server/ai_process_test.go b/server/ai_process_test.go
index 52cf9b483b..dd156d77f0 100644
--- a/server/ai_process_test.go
+++ b/server/ai_process_test.go
@@ -13,7 +13,7 @@ func Test_submitLLM(t *testing.T) {
ctx context.Context
params aiRequestParams
sess *AISession
- req worker.GenLLMFormdataRequestBody
+ req worker.GenLLMJSONRequestBody
}
tests := []struct {
name string
diff --git a/server/ai_worker.go b/server/ai_worker.go
index 34dc722bf8..02ac6e2d1c 100644
--- a/server/ai_worker.go
+++ b/server/ai_worker.go
@@ -271,12 +271,12 @@ func runAIJob(n *core.LivepeerNode, orchAddr string, httpc *http.Client, notify
}
reqOk = true
case "llm":
- var req worker.GenLLMFormdataRequestBody
+ var req worker.GenLLMJSONRequestBody
err = json.Unmarshal(reqData.Request, &req)
- if err != nil || req.ModelId == nil {
+ if err != nil || req.Model == nil {
break
}
- modelID = *req.ModelId
+ modelID = *req.Model
resultType = "application/json"
if req.Stream != nil && *req.Stream {
resultType = "text/event-stream"
@@ -354,7 +354,7 @@ func runAIJob(n *core.LivepeerNode, orchAddr string, httpc *http.Client, notify
if resp != nil {
if resultType == "text/event-stream" {
- streamChan, ok := resp.(<-chan worker.LlmStreamChunk)
+ streamChan, ok := resp.(<-chan *worker.LLMResponse)
if ok {
sendStreamingAIResult(ctx, n, orchAddr, notify.AIJobData.Pipeline, httpc, resultType, streamChan)
return
@@ -530,7 +530,7 @@ func sendAIResult(ctx context.Context, n *core.LivepeerNode, orchAddr string, pi
}
func sendStreamingAIResult(ctx context.Context, n *core.LivepeerNode, orchAddr string, pipeline string, httpc *http.Client,
- contentType string, streamChan <-chan worker.LlmStreamChunk,
+ contentType string, streamChan <-chan *worker.LLMResponse,
) {
clog.Infof(ctx, "sending streaming results back to Orchestrator")
taskId := clog.GetVal(ctx, "taskId")
@@ -571,7 +571,7 @@ func sendStreamingAIResult(ctx context.Context, n *core.LivepeerNode, orchAddr s
}
fmt.Fprintf(pWriter, "data: %s\n\n", data)
- if chunk.Done {
+ if len(chunk.Choices) > 0 && chunk.Choices[0].FinishReason != nil && *chunk.Choices[0].FinishReason != "" {
pWriter.Close()
clog.Infof(ctx, "streaming results finished")
return
diff --git a/server/ai_worker_test.go b/server/ai_worker_test.go
index ab31a3e712..de169fbfe4 100644
--- a/server/ai_worker_test.go
+++ b/server/ai_worker_test.go
@@ -19,7 +19,6 @@ import (
"github.com/livepeer/ai-worker/worker"
"github.com/livepeer/go-livepeer/common"
"github.com/livepeer/go-livepeer/core"
- "github.com/livepeer/go-livepeer/eth"
"github.com/livepeer/go-livepeer/net"
"github.com/livepeer/go-tools/drivers"
oapitypes "github.com/oapi-codegen/runtime/types"
@@ -324,7 +323,7 @@ func TestRunAIJob(t *testing.T) {
assert.Equal("7", headers.Get("TaskId"))
assert.Equal(len(results.Files), 0)
- expectedResp, _ := wkr.LLM(context.Background(), worker.GenLLMFormdataRequestBody{})
+ expectedResp, _ := wkr.LLM(context.Background(), worker.GenLLMJSONRequestBody{})
assert.Equal(expectedResp, &jsonRes)
case "image-to-text":
res, _ := json.Marshal(results.Results)
@@ -372,7 +371,10 @@ func createAIJob(taskId int64, pipeline, modelId, inputUrl string) *net.NotifyAI
inputFile.InitFromBytes(nil, inputUrl)
req = worker.GenSegmentAnything2MultipartRequestBody{ModelId: &modelId, Image: inputFile}
case "llm":
- req = worker.GenLLMFormdataRequestBody{Prompt: "tell me a story", ModelId: &modelId}
+ var msgs []worker.LLMMessage
+ msgs = append(msgs, worker.LLMMessage{Role: "system", Content: "you are a robot"})
+ msgs = append(msgs, worker.LLMMessage{Role: "user", Content: "tell me a story"})
+ req = worker.GenLLMJSONRequestBody{Messages: msgs, Model: &modelId}
case "image-to-text":
inputFile.InitFromBytes(nil, inputUrl)
req = worker.GenImageToImageMultipartRequestBody{Prompt: "test prompt", ModelId: &modelId, Image: inputFile}
@@ -403,11 +405,6 @@ func createAIJob(taskId int64, pipeline, modelId, inputUrl string) *net.NotifyAI
return notify
}
-type stubResult struct {
- Attachment []byte
- Result string
-}
-
func aiResultsTest(l lphttp, w *httptest.ResponseRecorder, r *http.Request) (int, string) {
handler := l.AIResults()
handler.ServeHTTP(w, r)
@@ -418,23 +415,6 @@ func aiResultsTest(l lphttp, w *httptest.ResponseRecorder, r *http.Request) (int
return resp.StatusCode, string(body)
}
-func newMockAIOrchestratorServer() *httptest.Server {
- n, _ := core.NewLivepeerNode(ð.StubClient{}, "./tmp", nil)
- n.NodeType = core.OrchestratorNode
- n.AIWorkerManager = core.NewRemoteAIWorkerManager()
- s, _ := NewLivepeerServer("127.0.0.1:1938", n, true, "")
- mux := s.cliWebServerHandlers("addr")
- srv := httptest.NewServer(mux)
- return srv
-}
-
-func connectWorker(n *core.LivepeerNode) {
- strm := &StubAIWorkerServer{}
- caps := createStubAIWorkerCapabilities()
- go func() { n.AIWorkerManager.Manage(strm, caps.ToNetCapabilities()) }()
- time.Sleep(1 * time.Millisecond)
-}
-
func createStubAIWorkerCapabilities() *core.Capabilities {
//create capabilities and constraints the ai worker sends to orch
constraints := make(core.PerCapabilityConstraints)
@@ -597,12 +577,15 @@ func (a *stubAIWorker) SegmentAnything2(ctx context.Context, req worker.GenSegme
}
}
-func (a *stubAIWorker) LLM(ctx context.Context, req worker.GenLLMFormdataRequestBody) (interface{}, error) {
+func (a *stubAIWorker) LLM(ctx context.Context, req worker.GenLLMJSONRequestBody) (interface{}, error) {
a.Called++
if a.Err != nil {
return nil, a.Err
} else {
- return &worker.LLMResponse{Response: "output tokens", TokensUsed: 10}, nil
+ var choices []worker.LLMChoice
+ choices = append(choices, worker.LLMChoice{Delta: &worker.LLMMessage{Content: "choice1", Role: "assistant"}, Index: 0})
+ tokensUsed := worker.LLMTokenUsage{PromptTokens: 40, CompletionTokens: 10, TotalTokens: 50}
+ return &worker.LLMResponse{Choices: choices, Created: 1, Model: "llm_model", Usage: tokensUsed}, nil
}
}
diff --git a/server/cliserver_test.go b/server/cliserver_test.go
index 2083759021..6b3f9800cc 100644
--- a/server/cliserver_test.go
+++ b/server/cliserver_test.go
@@ -2,6 +2,7 @@ package server
import (
"bytes"
+ "context"
"encoding/json"
"errors"
"fmt"
@@ -33,7 +34,9 @@ func newMockServer() *httptest.Server {
go func() { n.TranscoderManager.Manage(strm, 5, nil) }()
time.Sleep(1 * time.Millisecond)
n.Transcoder = n.TranscoderManager
- s, _ := NewLivepeerServer("127.0.0.1:1938", n, true, "")
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+ s, _ := NewLivepeerServer(ctx, "127.0.0.1:1938", n, true, "")
mux := s.cliWebServerHandlers("addr")
srv := httptest.NewServer(mux)
return srv
@@ -52,7 +55,9 @@ func TestActivateOrchestrator(t *testing.T) {
go func() { n.TranscoderManager.Manage(strm, 5, nil) }()
time.Sleep(1 * time.Millisecond)
n.Transcoder = n.TranscoderManager
- s, _ := NewLivepeerServer("127.0.0.1:1938", n, true, "")
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+ s, _ := NewLivepeerServer(ctx, "127.0.0.1:1938", n, true, "")
mux := s.cliWebServerHandlers("addr")
srv := httptest.NewServer(mux)
defer srv.Close()
@@ -220,7 +225,9 @@ func TestGetEthChainID(t *testing.T) {
err = dbh.SetChainID(big.NewInt(1))
require.Nil(err)
n, _ := core.NewLivepeerNode(ð.StubClient{}, "./tmp", dbh)
- s, _ := NewLivepeerServer("127.0.0.1:1938", n, true, "")
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+ s, _ := NewLivepeerServer(ctx, "127.0.0.1:1938", n, true, "")
mux := s.cliWebServerHandlers("addr")
srv := httptest.NewServer(mux)
defer srv.Close()
@@ -308,7 +315,9 @@ func TestRegisteredOrchestrators(t *testing.T) {
n, _ := core.NewLivepeerNode(eth, "./tmp", dbh)
- s, _ := NewLivepeerServer("127.0.0.1:1938", n, true, "")
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+ s, _ := NewLivepeerServer(ctx, "127.0.0.1:1938", n, true, "")
mux := s.cliWebServerHandlers("addr")
srv := httptest.NewServer(mux)
defer srv.Close()
diff --git a/server/mediaserver.go b/server/mediaserver.go
index aaec9210f6..95bd8571af 100644
--- a/server/mediaserver.go
+++ b/server/mediaserver.go
@@ -153,7 +153,7 @@ type authWebhookResponse struct {
ForceSessionReinit bool `json:"forceSessionReinit"`
}
-func NewLivepeerServer(rtmpAddr string, lpNode *core.LivepeerNode, httpIngest bool, transcodingOptions string) (*LivepeerServer, error) {
+func NewLivepeerServer(ctx context.Context, rtmpAddr string, lpNode *core.LivepeerNode, httpIngest bool, transcodingOptions string) (*LivepeerServer, error) {
opts := lpmscore.LPMSOpts{
RtmpAddr: rtmpAddr,
RtmpDisabled: true,
@@ -201,7 +201,9 @@ func NewLivepeerServer(rtmpAddr string, lpNode *core.LivepeerNode, httpIngest bo
if lpNode.NodeType == core.BroadcasterNode && httpIngest {
opts.HttpMux.HandleFunc("/live/", ls.HandlePush)
- startAIMediaServer(ls)
+ if err := startAIMediaServer(ctx, ls); err != nil {
+ return nil, fmt.Errorf("failed to start AI media server: %w", err)
+ }
}
opts.HttpMux.HandleFunc("/recordings/", ls.HandleRecordings)
return ls, nil
diff --git a/server/mediaserver_test.go b/server/mediaserver_test.go
index 16b4d91e23..6f97de4582 100644
--- a/server/mediaserver_test.go
+++ b/server/mediaserver_test.go
@@ -88,7 +88,7 @@ func setupServerWithCancel() (*LivepeerServer, context.CancelFunc) {
}
n, _ := core.NewLivepeerNode(nil, "./tmp", nil)
// doesn't really starts server at 1938
- S, _ = NewLivepeerServer("127.0.0.1:1938", n, true, "")
+ S, _ = NewLivepeerServer(ctx, "127.0.0.1:1938", n, true, "")
// rtmpurl := fmt.Sprintf("rtmp://127.0.0.1:%d", port)
// S, _ = NewLivepeerServer(rtmpurl, n, true, "")
// glog.Errorf("++> rtmp server with port %d", port)
diff --git a/server/push_test.go b/server/push_test.go
index bb17ac8a11..254b6d64e8 100644
--- a/server/push_test.go
+++ b/server/push_test.go
@@ -426,7 +426,7 @@ func TestPush_HTTPIngest(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
// HTTP ingest disabled
- s, _ := NewLivepeerServer("127.0.0.1:1938", n, false, "")
+ s, _ := NewLivepeerServer(ctx, "127.0.0.1:1938", n, false, "")
s.SetContextFromUnitTest(ctx)
h, pattern := s.HTTPMux.Handler(req)
assert.Equal("", pattern)
@@ -440,7 +440,7 @@ func TestPush_HTTPIngest(t *testing.T) {
ctx, cancel = context.WithCancel(context.Background())
// HTTP ingest enabled
- s, _ = NewLivepeerServer("127.0.0.1:1938", n, true, "")
+ s, _ = NewLivepeerServer(ctx, "127.0.0.1:1938", n, true, "")
s.SetContextFromUnitTest(ctx)
h, pattern = s.HTTPMux.Handler(req)
assert.Equal("/live/", pattern)
@@ -1106,8 +1106,8 @@ func TestPush_OSPerStream(t *testing.T) {
assert := assert.New(t)
drivers.NodeStorage = drivers.NewMemoryDriver(nil)
n, _ := core.NewLivepeerNode(nil, "./tmp", nil)
- s, _ := NewLivepeerServer("127.0.0.1:1939", n, true, "")
serverCtx, serverCancel := context.WithCancel(context.Background())
+ s, _ := NewLivepeerServer(serverCtx, "127.0.0.1:1939", n, true, "")
s.SetContextFromUnitTest(serverCtx)
defer serverCleanup(s)
@@ -1273,8 +1273,8 @@ func TestPush_ConcurrentSegments(t *testing.T) {
drivers.NodeStorage = drivers.NewMemoryDriver(nil)
n, _ := core.NewLivepeerNode(nil, "./tmp", nil)
n.NodeType = core.BroadcasterNode
- s, _ := NewLivepeerServer("127.0.0.1:1938", n, true, "")
serverCtx, serverCancel := context.WithCancel(context.Background())
+ s, _ := NewLivepeerServer(serverCtx, "127.0.0.1:1938", n, true, "")
s.SetContextFromUnitTest(serverCtx)
oldURL := AuthWebhookURL
defer func() { AuthWebhookURL = oldURL }()
diff --git a/server/rpc.go b/server/rpc.go
index 7223c56a98..ff3c05e43c 100644
--- a/server/rpc.go
+++ b/server/rpc.go
@@ -74,7 +74,7 @@ type Orchestrator interface {
ImageToVideo(ctx context.Context, requestID string, req worker.GenImageToVideoMultipartRequestBody) (interface{}, error)
Upscale(ctx context.Context, requestID string, req worker.GenUpscaleMultipartRequestBody) (interface{}, error)
AudioToText(ctx context.Context, requestID string, req worker.GenAudioToTextMultipartRequestBody) (interface{}, error)
- LLM(ctx context.Context, requestID string, req worker.GenLLMFormdataRequestBody) (interface{}, error)
+ LLM(ctx context.Context, requestID string, req worker.GenLLMJSONRequestBody) (interface{}, error)
SegmentAnything2(ctx context.Context, requestID string, req worker.GenSegmentAnything2MultipartRequestBody) (interface{}, error)
ImageToText(ctx context.Context, requestID string, req worker.GenImageToTextMultipartRequestBody) (interface{}, error)
TextToSpeech(ctx context.Context, requestID string, req worker.GenTextToSpeechJSONRequestBody) (interface{}, error)
diff --git a/server/rpc_test.go b/server/rpc_test.go
index 43ec1a3045..fad0b63365 100644
--- a/server/rpc_test.go
+++ b/server/rpc_test.go
@@ -210,7 +210,7 @@ func (r *stubOrchestrator) Upscale(ctx context.Context, requestID string, req wo
func (r *stubOrchestrator) AudioToText(ctx context.Context, requestID string, req worker.GenAudioToTextMultipartRequestBody) (interface{}, error) {
return nil, nil
}
-func (r *stubOrchestrator) LLM(ctx context.Context, requestID string, req worker.GenLLMFormdataRequestBody) (interface{}, error) {
+func (r *stubOrchestrator) LLM(ctx context.Context, requestID string, req worker.GenLLMJSONRequestBody) (interface{}, error) {
return nil, nil
}
func (r *stubOrchestrator) SegmentAnything2(ctx context.Context, requestID string, req worker.GenSegmentAnything2MultipartRequestBody) (interface{}, error) {
@@ -1417,7 +1417,7 @@ func (r *mockOrchestrator) Upscale(ctx context.Context, requestID string, req wo
func (r *mockOrchestrator) AudioToText(ctx context.Context, requestID string, req worker.GenAudioToTextMultipartRequestBody) (interface{}, error) {
return nil, nil
}
-func (r *mockOrchestrator) LLM(ctx context.Context, requestID string, req worker.GenLLMFormdataRequestBody) (interface{}, error) {
+func (r *mockOrchestrator) LLM(ctx context.Context, requestID string, req worker.GenLLMJSONRequestBody) (interface{}, error) {
return nil, nil
}
func (r *mockOrchestrator) SegmentAnything2(ctx context.Context, requestID string, req worker.GenSegmentAnything2MultipartRequestBody) (interface{}, error) {