Skip to content

Commit

Permalink
concurrent map updates
Browse files Browse the repository at this point in the history
  • Loading branch information
USA-RedDragon committed Feb 25, 2023
1 parent 3cee169 commit 8ec12b3
Show file tree
Hide file tree
Showing 7 changed files with 121 additions and 232 deletions.
50 changes: 13 additions & 37 deletions internal/dmr/calltracker/call_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ package calltracker
import (
"context"
"encoding/json"
"strconv"
"time"

"github.com/USA-RedDragon/DMRHub/internal/config"
Expand Down Expand Up @@ -54,7 +53,7 @@ type callMapStruct struct {
GroupCall bool
}

func getCallHashFromPacket(packet models.Packet) (string, error) {
func getCallHashFromPacket(packet models.Packet) (uint64, error) {
v := callMapStruct{
Active: true,
StreamID: packet.StreamID,
Expand All @@ -68,10 +67,10 @@ func getCallHashFromPacket(packet models.Packet) (string, error) {
if err != nil {
klog.Errorf("CallTracker: Error hashing call: %v", err)
}
return strconv.Itoa(int(hash)), err
return hash, err //nolint:golint,wrapcheck
}

func getCallHash(call models.Call) (string, error) {
func getCallHash(call models.Call) (uint64, error) {
v := callMapStruct{
Active: call.Active,
StreamID: call.StreamID,
Expand All @@ -85,25 +84,24 @@ func getCallHash(call models.Call) (string, error) {
if err != nil {
klog.Errorf("CallTracker: Error hashing call: %v", err)
}
return strconv.Itoa(int(hash)), err
return hash, err //nolint:golint,wrapcheck
}

// CallTracker is a struct that holds the state of the calls that are currently in progress.
type CallTracker struct {
db *gorm.DB
redis *redis.Client
callEndTimers *xsync.Map
inFlightCalls *xsync.Map
callEndTimers *xsync.MapOf[uint64, *time.Timer]
inFlightCalls *xsync.MapOf[uint64, *models.Call]
}

// NewCallTracker creates a new CallTracker.
func NewCallTracker(db *gorm.DB, redis *redis.Client) *CallTracker {
xsync.NewMap()
return &CallTracker{
db: db,
redis: redis,
callEndTimers: xsync.NewMap(),
inFlightCalls: xsync.NewMap(),
callEndTimers: xsync.NewIntegerMapOf[uint64, *time.Timer](),
inFlightCalls: xsync.NewIntegerMapOf[uint64, *models.Call](),
}
}

Expand Down Expand Up @@ -360,12 +358,7 @@ func (c *CallTracker) updateCall(ctx context.Context, call *models.Call, packet
return
}

timerInterface, ok := c.callEndTimers.Load(hash)
if !ok {
return
}

timer, ok := timerInterface.(*time.Timer)
timer, ok := c.callEndTimers.Load(hash)
if !ok {
return
}
Expand Down Expand Up @@ -498,13 +491,7 @@ func (c *CallTracker) ProcessCallPacket(ctx context.Context, packet models.Packe
return
}

callInterface, ok := c.inFlightCalls.Load(hash)
if !ok {
klog.Errorf("Active call not found")
return
}

call, ok := callInterface.(*models.Call)
call, ok := c.inFlightCalls.Load(hash)
if !ok {
klog.Errorf("Active call not found")
return
Expand Down Expand Up @@ -534,13 +521,7 @@ func (c *CallTracker) EndCall(ctx context.Context, packet models.Packet) {
return
}

callInterface, ok := c.inFlightCalls.LoadAndDelete(hash)
if !ok {
klog.Errorf("Active call not found")
return
}

call, ok := callInterface.(*models.Call)
call, ok := c.inFlightCalls.LoadAndDelete(hash)
if !ok {
klog.Errorf("Active call not found")
return
Expand All @@ -553,16 +534,11 @@ func (c *CallTracker) EndCall(ctx context.Context, packet models.Packet) {
}

// Delete the call end timer
timerInterface, ok := c.callEndTimers.LoadAndDelete(hash)
timer, ok := c.callEndTimers.LoadAndDelete(hash)
if !ok {
klog.Errorf("Call end timer not found")
} else {
timer, ok := timerInterface.(*time.Timer)
if !ok {
klog.Errorf("Call end timer not found")
} else {
timer.Stop()
}
timer.Stop()
}

call.Duration = time.Since(call.StartTime)
Expand Down
6 changes: 3 additions & 3 deletions internal/dmr/servers/hbrp/packet_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func (s *Server) switchDynamicTalkgroup(ctx context.Context, packet models.Packe
// field, then we need to update the database entry to reflect
// the new dynamic talkgroup on the appropriate slot.

ctx, span := otel.Tracer("DMRHub").Start(ctx, "Server.switchDynamicTalkgroup")
_, span := otel.Tracer("DMRHub").Start(ctx, "Server.switchDynamicTalkgroup")
defer span.End()

repeaterExists, err := models.RepeaterIDExists(s.DB, packet.Repeater)
Expand Down Expand Up @@ -116,7 +116,7 @@ func (s *Server) switchDynamicTalkgroup(ctx context.Context, packet models.Packe
logging.GetLogger(logging.Access).Logf(s.switchDynamicTalkgroup, "Dynamically Linking %d timeslot 2 to %d", packet.Repeater, packet.Dst)
repeater.TS2DynamicTalkgroup = talkgroup
repeater.TS2DynamicTalkgroupID = &packet.Dst
go GetSubscriptionManager().ListenForCallsOn(ctx, s.Redis.Redis, repeater, packet.Dst)
go GetSubscriptionManager().ListenForCallsOn(s.Redis.Redis, repeater, packet.Dst) //nolint:golint,contextcheck
err := s.DB.Save(&repeater).Error
if err != nil {
klog.Errorf("Error saving repeater: %s", err.Error())
Expand All @@ -127,7 +127,7 @@ func (s *Server) switchDynamicTalkgroup(ctx context.Context, packet models.Packe
logging.GetLogger(logging.Access).Logf(s.switchDynamicTalkgroup, "Dynamically Linking %d timeslot 1 to %d", packet.Repeater, packet.Dst)
repeater.TS1DynamicTalkgroup = talkgroup
repeater.TS1DynamicTalkgroupID = &packet.Dst
go GetSubscriptionManager().ListenForCallsOn(ctx, s.Redis.Redis, repeater, packet.Dst)
go GetSubscriptionManager().ListenForCallsOn(s.Redis.Redis, repeater, packet.Dst) //nolint:golint,contextcheck
err := s.DB.Save(&repeater).Error
if err != nil {
klog.Errorf("Error saving repeater: %s", err.Error())
Expand Down
Loading

0 comments on commit 8ec12b3

Please sign in to comment.