Skip to content

Commit

Permalink
Rebase fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
onitake authored and srgoni committed Oct 31, 2022
1 parent 4b4be50 commit e0b2b86
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 5 deletions.
2 changes: 1 addition & 1 deletion cmd/restreamer/restreamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ func main() {

auth := auth.NewAuthenticator(streamdef.Authentication, config.UserList)

streamer := streaming.NewStreamer(streamdef.Serve, config.OutputBuffer, controller, auth)
streamer := streaming.NewStreamer(streamdef.Serve, config.OutputBuffer, config.CacheSize, controller, auth)
streamer.SetCollector(reg)
streamer.SetNotifier(queue)

Expand Down
3 changes: 3 additions & 0 deletions configuration/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,9 @@ type Configuration struct {
Resources []Resource `json:"resources"`
// Notifications defines event callbacks.
Notifications []Notification `json:"notifications"`
// CacheSize is the number of packets to precache to boost client stream
// startup time.
CacheSize uint `json:"cachesize"`
}

// DefaultConfiguration creates and returns a configuration object
Expand Down
22 changes: 18 additions & 4 deletions streaming/streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,8 @@ type Streamer struct {
auth auth.Authenticator
// promCounter allows enabling/disabling Prometheus packet metrics.
promCounter bool
// cacheSize is the number of bytes to keep in precache
cacheSize int
}

// ConnectionBroker represents a policy handler for new connections.
Expand All @@ -178,9 +180,10 @@ type ConnectionBroker interface {
// NewStreamer creates a new packet streamer.
// queue is an input packet queue.
// qsize is the length of each connection's queue (in packets).
// cachesize is the size of the precache buffer, in number of packets
// broker handles policy enforcement
// stats is a statistics collector object.
func NewStreamer(name string, qsize uint, broker ConnectionBroker, auth auth.Authenticator) *Streamer {
func NewStreamer(name string, cachesize uint, qsize uint, broker ConnectionBroker, auth auth.Authenticator) *Streamer {
streamer := &Streamer{
name: name,
broker: broker,
Expand All @@ -189,6 +192,7 @@ func NewStreamer(name string, qsize uint, broker ConnectionBroker, auth auth.Aut
stats: &metrics.DummyCollector{},
request: make(chan *ConnectionRequest),
auth: auth,
cacheSize: int(cachesize) * protocol.MpegTsPacketSize,
}
// start the command eater
go streamer.eatCommands()
Expand Down Expand Up @@ -248,9 +252,10 @@ func (streamer *Streamer) eatCommands() {
// This routine will block; you should run it asynchronously like this:
//
// queue := make(chan protocol.MpegTsPacket, inputQueueSize)
// go func() {
// log.Fatal(streamer.Stream(queue))
// }
//
// go func() {
// log.Fatal(streamer.Stream(queue))
// }
//
// or simply:
//
Expand All @@ -271,6 +276,9 @@ func (streamer *Streamer) Stream(queue <-chan protocol.MpegTsPacket) error {
Command: streamerCommandStart,
}

// prepare the precache buffer
precache := util.CreateSlidingWindow(streamer.cacheSize)

logger.Logkv(
"event", eventStreamerStart,
"message", "Starting streaming",
Expand All @@ -286,7 +294,10 @@ func (streamer *Streamer) Stream(queue <-chan protocol.MpegTsPacket) error {
//log.Printf("Got packet (length %d):\n%s\n", len(packet), hex.Dump(packet))
//log.Printf("Got packet (length %d)\n", len(packet))

precache.Put(packet)

for conn := range pool {

select {
case conn.Queue <- packet:
// packet distributed, done
Expand Down Expand Up @@ -338,6 +349,9 @@ func (streamer *Streamer) Stream(queue <-chan protocol.MpegTsPacket) error {
)
pool[request.Connection] = true
request.Ok = true
// write precached data
// TODO maybe don't write this directly, use the queue?
request.Connection.writer.Write(precache.Get())
} else {
logger.Logkv(
"event", eventStreamerError,
Expand Down

0 comments on commit e0b2b86

Please sign in to comment.