Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rafal/debug-initial-startup-time #3350

Draft
wants to merge 18 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions docker/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -68,4 +68,6 @@ COPY --from=build /usr/bin/grpc_health_probe /usr/local/bin/grpc_health_probe
COPY --from=build /src/tasmodel.pb /tasmodel.pb
COPY --from=build /usr/share/misc/pci.ids /usr/share/misc/pci.ids

RUN apt update && apt install -yqq ffmpeg

ENTRYPOINT ["/usr/local/bin/livepeer"]
18 changes: 12 additions & 6 deletions media/mediamtx.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
"fmt"
"io"
"net/http"
"time"

"github.com/livepeer/go-livepeer/clog"
)
Expand All @@ -27,10 +28,11 @@
}

const (
mediaMTXControlPort = "9997"
mediaMTXControlUser = "admin"
MediaMTXWebrtcSession = "webrtcSession"
MediaMTXRtmpConn = "rtmpConn"
mediaMTXControlPort = "9997"
mediaMTXControlTimeout = 30 * time.Second
mediaMTXControlUser = "admin"
MediaMTXWebrtcSession = "webrtcSession"
MediaMTXRtmpConn = "rtmpConn"
)

func MediamtxSourceTypeToString(s string) (string, error) {
Expand All @@ -44,7 +46,7 @@
}
}

func getApiPath(sourceType string) (string, error) {

Check warning on line 49 in media/mediamtx.go

View workflow job for this annotation

GitHub Actions / Run tests defined for the project

func getApiPath should be getAPIPath
var apiPath string
switch sourceType {
case MediaMTXWebrtcSession:
Expand All @@ -64,7 +66,9 @@
return err
}

req, err := http.NewRequest(http.MethodPost, fmt.Sprintf("http://%s:%s/v3/%s/kick/%s", mc.host, mediaMTXControlPort, apiPath, mc.sourceID), nil)
ctx, cancel := context.WithTimeout(context.Background(), mediaMTXControlTimeout)
defer cancel()
req, err := http.NewRequestWithContext(ctx, http.MethodPost, fmt.Sprintf("http://%s:%s/v3/%s/kick/%s", mc.host, mediaMTXControlPort, apiPath, mc.sourceID), nil)

Check warning on line 71 in media/mediamtx.go

View check run for this annotation

Codecov / codecov/patch

media/mediamtx.go#L69-L71

Added lines #L69 - L71 were not covered by tests
if err != nil {
return fmt.Errorf("failed to create kick request: %w", err)
}
Expand All @@ -85,7 +89,9 @@
if err != nil {
return false, err
}
req, err := http.NewRequest(http.MethodGet, fmt.Sprintf("http://%s:%s/v3/%s/get/%s", mc.host, mediaMTXControlPort, apiPath, mc.sourceID), nil)
ctx, cancel := context.WithTimeout(context.Background(), mediaMTXControlTimeout)
defer cancel()
req, err := http.NewRequestWithContext(ctx, http.MethodGet, fmt.Sprintf("http://%s:%s/v3/%s/get/%s", mc.host, mediaMTXControlPort, apiPath, mc.sourceID), nil)

Check warning on line 94 in media/mediamtx.go

View check run for this annotation

Codecov / codecov/patch

media/mediamtx.go#L92-L94

Added lines #L92 - L94 were not covered by tests
if err != nil {
return false, fmt.Errorf("failed to create get stream request: %w", err)
}
Expand Down
23 changes: 12 additions & 11 deletions media/rtmp2segment.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,14 @@
"log/slog"
"math/rand"
"os"
"os/exec"
"path/filepath"
"strings"
"sync"
"syscall"
"time"

"github.com/livepeer/go-livepeer/clog"
"github.com/livepeer/lpms/ffmpeg"
"golang.org/x/sys/unix"
)

Expand Down Expand Up @@ -49,17 +49,18 @@
clog.Errorf(ctx, "Stopping segmentation, input stream does not exist. in=%s err=%s", in, err)
break
}
ffmpeg.FfmpegSetLogLevel(ffmpeg.FFLogWarning)
_, err = ffmpeg.Transcode3(&ffmpeg.TranscodeOptionsIn{
Fname: in,
}, []ffmpeg.TranscodeOptions{{
Oname: outFilePattern,
AudioEncoder: ffmpeg.ComponentOptions{Name: "copy"},
VideoEncoder: ffmpeg.ComponentOptions{Name: "copy"},
Muxer: ffmpeg.ComponentOptions{Name: "segment"},
}})
cmd := exec.Command("ffmpeg",
"-i", in,
"-c:a", "copy",
"-c:v", "copy",
"-f", "segment",
outFilePattern,
)
output, err := cmd.CombinedOutput()

Check warning on line 59 in media/rtmp2segment.go

View check run for this annotation

Codecov / codecov/patch

media/rtmp2segment.go#L52-L59

Added lines #L52 - L59 were not covered by tests
if err != nil {
clog.Errorf(ctx, "Failed to run segmentation. in=%s err=%s", in, err)
clog.Errorf(ctx, "Error sending RTMP out process: %v", err)
clog.Infof(ctx, "Process output: %s", output)
return

Check warning on line 63 in media/rtmp2segment.go

View check run for this annotation

Codecov / codecov/patch

media/rtmp2segment.go#L61-L63

Added lines #L61 - L63 were not covered by tests
}
retryCount++
time.Sleep(5 * time.Second)
Expand Down Expand Up @@ -174,7 +175,7 @@
// things protected by the mutex mu
mu := &sync.Mutex{}
isComplete := false
var currentSegment *os.File = nil

Check warning on line 178 in media/rtmp2segment.go

View workflow job for this annotation

GitHub Actions / Run tests defined for the project

should drop = nil from declaration of var currentSegment; it is the zero value
pipeCompletion := make(chan bool, 1)

// Start a goroutine to wait for the completion signal
Expand Down Expand Up @@ -237,7 +238,7 @@
}
}

func readSegment(ctx context.Context, segmentHandler SegmentHandler, file *os.File, pipeName string) {

Check warning on line 241 in media/rtmp2segment.go

View workflow job for this annotation

GitHub Actions / Run tests defined for the project

parameter 'ctx' seems to be unused, consider removing or renaming it as _
defer file.Close()
reader := bufio.NewReader(file)
writer := NewMediaWriter()
Expand Down
25 changes: 14 additions & 11 deletions server/ai_live_video.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
"net/http"
"net/url"
"os"
"os/exec"
"strings"
"sync"
"time"
Expand All @@ -21,8 +22,6 @@
"github.com/livepeer/go-livepeer/monitor"
"github.com/livepeer/go-livepeer/trickle"

"github.com/livepeer/lpms/ffmpeg"

"github.com/dustin/go-humanize"
)

Expand Down Expand Up @@ -211,16 +210,20 @@
break
}

_, err = ffmpeg.Transcode3(&ffmpeg.TranscodeOptionsIn{
Fname: fmt.Sprintf("pipe:%d", r.Fd()),
}, []ffmpeg.TranscodeOptions{{
Oname: params.liveParams.outputRTMPURL,
AudioEncoder: ffmpeg.ComponentOptions{Name: "copy"},
VideoEncoder: ffmpeg.ComponentOptions{Name: "copy"},
Muxer: ffmpeg.ComponentOptions{Name: "flv"},
}})
out := fmt.Sprintf("rtmp://%s/out%s", MediaMTXHost, params.liveParams.stream)
cmd := exec.Command("ffmpeg",
"-i", "pipe:0",
"-c:a", "copy",
"-c:v", "copy",
"-f", "flv",
out,
)
cmd.Stdin = r
output, err := cmd.CombinedOutput()

Check warning on line 222 in server/ai_live_video.go

View check run for this annotation

Codecov / codecov/patch

server/ai_live_video.go#L213-L222

Added lines #L213 - L222 were not covered by tests
if err != nil {
clog.Infof(ctx, "Error sending RTMP out: %s", err)
clog.Errorf(ctx, "Error sending RTMP out: %v", err)
clog.Infof(ctx, "Process output: %s", output)
return

Check warning on line 226 in server/ai_live_video.go

View check run for this annotation

Codecov / codecov/patch

server/ai_live_video.go#L224-L226

Added lines #L224 - L226 were not covered by tests
}
time.Sleep(5 * time.Second)
}
Expand Down
6 changes: 6 additions & 0 deletions server/ai_mediaserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,8 @@
})
}

var MediaMTXHost string

func (ls *LivepeerServer) StartLiveVideo() http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
Expand All @@ -373,6 +375,9 @@
http.Error(w, "Missing stream name", http.StatusBadRequest)
return
}
if strings.HasPrefix(streamName, "out") {
return
}

Check warning on line 380 in server/ai_mediaserver.go

View check run for this annotation

Codecov / codecov/patch

server/ai_mediaserver.go#L378-L380

Added lines #L378 - L380 were not covered by tests

ctx = clog.AddVal(ctx, "stream", streamName)
sourceID := r.FormValue("source_id")
Expand Down Expand Up @@ -441,6 +446,7 @@
}

mediaMTXClient := media.NewMediaMTXClient(remoteHost, ls.mediaMTXApiPassword, sourceID, sourceType)
MediaMTXHost = remoteHost

Check warning on line 449 in server/ai_mediaserver.go

View check run for this annotation

Codecov / codecov/patch

server/ai_mediaserver.go#L449

Added line #L449 was not covered by tests

if LiveAIAuthWebhookURL != nil {
authResp, err := authenticateAIStream(LiveAIAuthWebhookURL, ls.liveAIAuthApiKey, AIAuthRequest{
Expand Down
Loading