Skip to content

Commit

Permalink
Fix RTMP pull (#3377)
Browse files Browse the repository at this point in the history
* Fix RTMP pull

* Retries for StreamExists check

* add log line
  • Loading branch information
mjh1 authored Jan 31, 2025
1 parent baedf97 commit 654a229
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 9 deletions.
33 changes: 24 additions & 9 deletions media/rtmp2segment.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"syscall"
"time"

"github.com/cenkalti/backoff"
"github.com/livepeer/go-livepeer/clog"
"golang.org/x/sys/unix"
)
Expand Down Expand Up @@ -46,16 +47,23 @@ func (ms *MediaSegmenter) RunSegmentation(ctx context.Context, in string, segmen
processSegments(ctx, segmentHandler, outFilePattern, completionSignal)
}()

retryCount := 0
for {
streamExists, err := ms.MediaMTXClient.StreamExists()
err := backoff.Retry(func() error {
streamExists, err := ms.MediaMTXClient.StreamExists()
if err != nil {
return fmt.Errorf("StreamExists check failed: %w", err)
}
if !streamExists {
clog.Errorf(ctx, "input stream does not exist")
return fmt.Errorf("input stream does not exist")
}
return nil
}, backoff.WithMaxRetries(newExponentialBackOff(), 3))
if err != nil {
clog.Errorf(ctx, "StreamExists check failed. err=%s", err)
}
if retryCount > 2 && !streamExists {
clog.Errorf(ctx, "Stopping segmentation, input stream does not exist. in=%s err=%s", in, err)
clog.Errorf(ctx, "Stopping segmentation in=%s err=%s", in, err)
break
}
clog.Infof(ctx, "Starting segmentation. in=%s", in)
cmd := exec.CommandContext(procCtx, "ffmpeg",
"-i", in,
"-c:a", "copy",
Expand All @@ -66,17 +74,24 @@ func (ms *MediaSegmenter) RunSegmentation(ctx context.Context, in string, segmen
output, err := cmd.CombinedOutput()
if err != nil {
clog.Errorf(ctx, "Error receiving RTMP: %v", err)
clog.Infof(ctx, "Process output: %s", output)
return
break
}
retryCount++
clog.Infof(ctx, "Segmentation ffmpeg output: %s", output)
time.Sleep(5 * time.Second)
}
completionSignal <- true
clog.Infof(ctx, "sent completion signal, now waiting")
wg.Wait()
}

func newExponentialBackOff() *backoff.ExponentialBackOff {
backOff := backoff.NewExponentialBackOff()
backOff.InitialInterval = 500 * time.Millisecond
backOff.MaxInterval = 5 * time.Second
backOff.Reset()
return backOff
}

func createNamedPipe(pipeName string) {
err := syscall.Mkfifo(pipeName, 0666)
if err != nil && !os.IsExist(err) {
Expand Down
1 change: 1 addition & 0 deletions server/ai_live_video.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,7 @@ func startTrickleSubscribe(ctx context.Context, url *url.URL, params aiRequestPa
}
}()
for {
clog.V(6).Infof(ctx, "Starting output rtmp")
if !params.inputStreamExists() {
clog.Errorf(ctx, "Stopping output rtmp stream, input stream does not exist.")
break
Expand Down

0 comments on commit 654a229

Please sign in to comment.