diff --git a/.github/workflows/test.yaml b/.github/workflows/test.yaml index 9ba3a3273..cf4966fae 100644 --- a/.github/workflows/test.yaml +++ b/.github/workflows/test.yaml @@ -241,35 +241,6 @@ jobs: - run: make benchmark - backend-race-tests: - runs-on: ubuntu-latest - needs: frontend - permissions: - contents: read - checks: write - - steps: - - name: Checkout repository - uses: actions/checkout@v3 - - - name: Set up Go - uses: actions/setup-go@v3 - with: - go-version-file: go.mod - - - name: Copy built frontend - uses: actions/download-artifact@v3 - with: - name: frontend - path: internal/http/frontend/dist - - - run: go install github.com/tinylib/msgp - - run: go generate ./... - - - name: Race tests - run: | - go test ./... -race - backend-unit-tests: runs-on: ubuntu-latest needs: frontend diff --git a/docker-compose.yml b/docker-compose.yml index 2902f2bb8..38579323d 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -32,5 +32,6 @@ services: ports: - 3005:3005 - 62031:62031/udp + - 62032:62032/udp volumes: postgres: diff --git a/internal/config/config.go b/internal/config/config.go index ccefc93ba..55543dbf2 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -48,6 +48,7 @@ type Config struct { PasswordSalt string ListenAddr string DMRPort int + OpenBridgePort int HTTPPort int CORSHosts []string TrustedProxies []string @@ -80,6 +81,12 @@ func loadConfig() Config { httpPort = 0 } + portStr = os.Getenv("OPENBRIDGE_PORT") + openBridgePort, err := strconv.ParseInt(portStr, 10, 0) + if err != nil { + openBridgePort = 0 + } + tmpConfig := Config{ RedisHost: os.Getenv("REDIS_HOST"), postgresUser: os.Getenv("PG_USER"), @@ -97,6 +104,7 @@ func loadConfig() Config { InitialAdminUserPassword: os.Getenv("INIT_ADMIN_USER_PASSWORD"), RedisPassword: os.Getenv("REDIS_PASSWORD"), Debug: os.Getenv("DEBUG") != "", + OpenBridgePort: int(openBridgePort), } if tmpConfig.RedisHost == "" { tmpConfig.RedisHost = "localhost:6379" @@ -119,11 +127,11 @@ func loadConfig() Config { tmpConfig.PostgresDSN = "host=" + tmpConfig.postgresHost + " port=" + strconv.FormatInt(int64(tmpConfig.postgresPort), 10) + " user=" + tmpConfig.postgresUser + " dbname=" + tmpConfig.postgresDatabase + " password=" + tmpConfig.postgresPassword if tmpConfig.strSecret == "" { tmpConfig.strSecret = "secret" - logging.GetLogger(logging.Error).Log(loadConfig, "Session secret not set, using INSECURE default") + logging.GetLogger(logging.Error).Log(loadConfig, "SECRET not set, using INSECURE default") } if tmpConfig.PasswordSalt == "" { tmpConfig.PasswordSalt = "salt" - logging.GetLogger(logging.Error).Log(loadConfig, "Password salt not set, using INSECURE default") + logging.GetLogger(logging.Error).Log(loadConfig, "PASSWORD_SALT not set, using INSECURE default") } if tmpConfig.ListenAddr == "" { tmpConfig.ListenAddr = "0.0.0.0" @@ -131,11 +139,14 @@ func loadConfig() Config { if tmpConfig.DMRPort == 0 { tmpConfig.DMRPort = 62031 } + if tmpConfig.OpenBridgePort == 0 { + logging.GetLogger(logging.Error).Log(loadConfig, "OPENBRIDGE_PORT not set, disabling OpenBridge support") + } if tmpConfig.HTTPPort == 0 { tmpConfig.HTTPPort = 3005 } if tmpConfig.InitialAdminUserPassword == "" { - logging.GetLogger(logging.Error).Log(loadConfig, "Initial admin user password not set, using auto-generated password") + logging.GetLogger(logging.Error).Log(loadConfig, "INIT_ADMIN_USER_PASSWORD not set, using auto-generated password") const randLen = 15 const randNums = 4 const randSpecial = 2 @@ -147,7 +158,7 @@ func loadConfig() Config { } if tmpConfig.RedisPassword == "" { tmpConfig.RedisPassword = "password" - logging.GetLogger(logging.Error).Log(loadConfig, "Redis password not set, using INSECURE default") + logging.GetLogger(logging.Error).Log(loadConfig, "REDIS_PASSWORD not set, using INSECURE default") } // CORS_HOSTS is a comma separated list of hosts that are allowed to access the API corsHosts := os.Getenv("CORS_HOSTS") diff --git a/internal/db/db.go b/internal/db/db.go index f8af3ef2f..6a4afdadf 100644 --- a/internal/db/db.go +++ b/internal/db/db.go @@ -58,7 +58,7 @@ func MakeDB() *gorm.DB { } } - err = db.AutoMigrate(&models.AppSettings{}, &models.Call{}, &models.Repeater{}, &models.Talkgroup{}, &models.User{}) + err = db.AutoMigrate(&models.AppSettings{}, &models.Call{}, &models.Peer{}, &models.Repeater{}, &models.Talkgroup{}, &models.User{}) if err != nil { logging.GetLogger(logging.Error).Logf(MakeDB, "Could not migrate database: %s", err) os.Exit(1) diff --git a/internal/db/models/peer.go b/internal/db/models/peer.go new file mode 100644 index 000000000..c7347ada1 --- /dev/null +++ b/internal/db/models/peer.go @@ -0,0 +1,104 @@ +// SPDX-License-Identifier: AGPL-3.0-or-later +// DMRHub - Run a DMR network server in a single binary +// Copyright (C) 2023 Jacob McSwain +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . +// +// The source code is available at + +package models + +import ( + "encoding/json" + "time" + + "gorm.io/gorm" + "k8s.io/klog/v2" +) + +// Peer is the model for an OpenBridge DMR peer +// +//go:generate msgp +type Peer struct { + ID uint `json:"id" gorm:"primaryKey" msg:"id"` + LastPing time.Time `json:"last_ping_time" msg:"last_ping"` + IP string `json:"-" gorm:"-" msg:"ip"` + Port int `json:"-" gorm:"-" msg:"port"` + Password string `json:"-" msg:"-"` + Owner User `json:"owner" gorm:"foreignKey:OwnerID" msg:"-"` + OwnerID uint `json:"-" msg:"-"` + Ingress bool `json:"ingress" msg:"-"` + Egress bool `json:"egress" msg:"-"` + CreatedAt time.Time `json:"created_at" msg:"-"` + UpdatedAt time.Time `json:"-" msg:"-"` + DeletedAt gorm.DeletedAt `json:"-" gorm:"index" msg:"-"` +} + +func (p *Peer) String() string { + jsn, err := json.Marshal(p) + if err != nil { + klog.Errorf("Failed to marshal peer to json: %s", err) + return "" + } + return string(jsn) +} + +func ListPeers(db *gorm.DB) []Peer { + var peers []Peer + db.Preload("Owner").Order("id asc").Find(&peers) + return peers +} + +func CountPeers(db *gorm.DB) int { + var count int64 + db.Model(&Peer{}).Count(&count) + return int(count) +} + +func GetUserPeers(db *gorm.DB, id uint) []Peer { + var peers []Peer + db.Preload("Owner").Where("owner_id = ?", id).Order("id asc").Find(&peers) + return peers +} + +func CountUserPeers(db *gorm.DB, id uint) int { + var count int64 + db.Model(&Peer{}).Where("owner_id = ?", id).Count(&count) + return int(count) +} + +func FindPeerByID(db *gorm.DB, id uint) Peer { + var peer Peer + db.Preload("Owner").First(&peer, id) + return peer +} + +func PeerExists(db *gorm.DB, peer Peer) bool { + var count int64 + db.Model(&Peer{}).Where("id = ?", peer.ID).Limit(1).Count(&count) + return count > 0 +} + +func PeerIDExists(db *gorm.DB, id uint) bool { + var count int64 + db.Model(&Peer{}).Where("id = ?", id).Limit(1).Count(&count) + return count > 0 +} + +func DeletePeer(db *gorm.DB, id uint) { + tx := db.Unscoped().Delete(&Peer{ID: id}) + if tx.Error != nil { + klog.Errorf("Error deleting repeater: %s", tx.Error) + } +} diff --git a/internal/dmr/servers/openbridge/redis.go b/internal/dmr/servers/openbridge/redis.go new file mode 100644 index 000000000..f5292501a --- /dev/null +++ b/internal/dmr/servers/openbridge/redis.go @@ -0,0 +1,68 @@ +// SPDX-License-Identifier: AGPL-3.0-or-later +// DMRHub - Run a DMR network server in a single binary +// Copyright (C) 2023 Jacob McSwain +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . +// +// The source code is available at + +package openbridge + +import ( + "context" + "errors" + "fmt" + + "github.com/USA-RedDragon/DMRHub/internal/db/models" + "github.com/redis/go-redis/v9" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/trace" + "k8s.io/klog/v2" +) + +type redisClient struct { + Redis *redis.Client + Tracer trace.Tracer +} + +var ( + errNoSuchPeer = errors.New("no such peer") + errUnmarshalPeer = errors.New("unmarshal peer") + errCastPeer = errors.New("unable to cast peer id") +) + +func makeRedisClient(redis *redis.Client) redisClient { + return redisClient{ + Redis: redis, + Tracer: otel.Tracer("openbridge-redis"), + } +} + +func (s *redisClient) getPeer(ctx context.Context, peerID uint) (models.Peer, error) { + ctx, span := otel.Tracer("DMRHub").Start(ctx, "Server.handlePacket") + defer span.End() + + peerBits, err := s.Redis.Get(ctx, fmt.Sprintf("openbridge:peer:%d", peerID)).Result() + if err != nil { + klog.Errorf("Error getting peer from redis", err) + return models.Peer{}, errNoSuchPeer + } + var peer models.Peer + _, err = peer.UnmarshalMsg([]byte(peerBits)) + if err != nil { + klog.Errorf("Error unmarshalling peer", err) + return models.Peer{}, errUnmarshalPeer + } + return peer, nil +} diff --git a/internal/dmr/servers/openbridge/server.go b/internal/dmr/servers/openbridge/server.go new file mode 100644 index 000000000..be6511c31 --- /dev/null +++ b/internal/dmr/servers/openbridge/server.go @@ -0,0 +1,314 @@ +// SPDX-License-Identifier: AGPL-3.0-or-later +// DMRHub - Run a DMR network server in a single binary +// Copyright (C) 2023 Jacob McSwain +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . +// +// The source code is available at + +package openbridge + +import ( + "context" + "crypto/hmac" + "crypto/sha1" //#nosec G505 -- False positive, used for a protocol + "encoding/binary" + "net" + + "github.com/USA-RedDragon/DMRHub/internal/config" + "github.com/USA-RedDragon/DMRHub/internal/db/models" + "github.com/USA-RedDragon/DMRHub/internal/dmr/calltracker" + "github.com/USA-RedDragon/DMRHub/internal/dmr/utils" + "github.com/USA-RedDragon/DMRHub/internal/dmrconst" + "github.com/redis/go-redis/v9" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/trace" + "gorm.io/gorm" + "k8s.io/klog/v2" +) + +const packetLength = 73 +const largestMessageSize = 73 +const bufferSize = 1000000 // 1MB + +// OpenBridge is the same as HBRP, but with a single packet type. +type Server struct { + Buffer []byte + SocketAddress net.UDPAddr + Server *net.UDPConn + Tracer trace.Tracer + + DB *gorm.DB + Redis redisClient + + CallTracker *calltracker.CallTracker +} + +// MakeServer creates a new DMR server. +func MakeServer(db *gorm.DB, redis *redis.Client, callTracker *calltracker.CallTracker) Server { + return Server{ + Buffer: make([]byte, largestMessageSize), + SocketAddress: net.UDPAddr{ + IP: net.ParseIP(config.GetConfig().ListenAddr), + Port: config.GetConfig().OpenBridgePort, + }, + DB: db, + Redis: makeRedisClient(redis), + CallTracker: callTracker, + Tracer: otel.Tracer("dmr-openbridge-server"), + } +} + +// Start starts the DMR server. +func (s *Server) Start(ctx context.Context) { + ctx, span := otel.Tracer("DMRHub").Start(ctx, "Server.Start") + defer span.End() + + server, err := net.ListenUDP("udp", &s.SocketAddress) + if err != nil { + klog.Exitf("Error opening UDP Socket", err) + } + + err = server.SetReadBuffer(bufferSize) + if err != nil { + klog.Exitf("Error opening UDP Socket", err) + } + err = server.SetWriteBuffer(bufferSize) + if err != nil { + klog.Exitf("Error opening UDP Socket", err) + } + + s.Server = server + + klog.Infof("OpenBridge Server listening at %s on port %d", s.SocketAddress.IP.String(), s.SocketAddress.Port) + + go s.listen(ctx) + go s.subcribeOutgoing(ctx) + + go func() { + for { + length, remoteaddr, err := s.Server.ReadFromUDP(s.Buffer) + if config.GetConfig().Debug { + klog.Infof("Read a message from %v\n", remoteaddr) + } + if err != nil { + klog.Warningf("Error reading from UDP Socket, Swallowing Error: %v", err) + continue + } + go func() { + p := models.RawDMRPacket{ + Data: s.Buffer[:length], + RemoteIP: remoteaddr.IP.String(), + RemotePort: remoteaddr.Port, + } + packedBytes, err := p.MarshalMsg(nil) + if err != nil { + klog.Errorf("Error marshalling packet", err) + return + } + s.Redis.Redis.Publish(ctx, "openbridge:incoming", packedBytes) + }() + } + }() +} + +// Stop stops the DMR server. +func (s *Server) Stop(ctx context.Context) { + ctx, span := otel.Tracer("DMRHub").Start(ctx, "Server.Stop") + defer span.End() +} + +func (s *Server) listen(ctx context.Context) { + ctx, span := otel.Tracer("DMRHub").Start(ctx, "Server.listen") + defer span.End() + + pubsub := s.Redis.Redis.Subscribe(ctx, "openbridge:incoming") + defer func() { + err := pubsub.Close() + if err != nil { + klog.Errorf("Error closing pubsub", err) + } + }() + for msg := range pubsub.Channel() { + var packet models.RawDMRPacket + _, err := packet.UnmarshalMsg([]byte(msg.Payload)) + if err != nil { + klog.Errorf("Error unmarshalling packet", err) + continue + } + s.handlePacket(ctx, &net.UDPAddr{ + IP: net.ParseIP(packet.RemoteIP), + Port: packet.RemotePort, + }, packet.Data) + } +} + +func (s *Server) subcribeOutgoing(ctx context.Context) { + pubsub := s.Redis.Redis.Subscribe(ctx, "openbridge:outgoing") + defer func() { + err := pubsub.Close() + if err != nil { + klog.Errorf("Error closing pubsub", err) + } + }() + for msg := range pubsub.Channel() { + packet, ok := models.UnpackPacket([]byte(msg.Payload)) + if !ok { + klog.Errorf("Error unpacking packet") + continue + } + peer, err := s.Redis.getPeer(ctx, packet.Repeater) + if err != nil { + klog.Errorf("Error getting peer %d from redis", packet.Repeater) + continue + } + _, err = s.Server.WriteToUDP(packet.Encode(), &net.UDPAddr{ + IP: net.ParseIP(peer.IP), + Port: peer.Port, + }) + if err != nil { + klog.Errorf("Error sending packet", err) + } + } +} + +func (s *Server) sendPacket(ctx context.Context, repeaterIDBytes uint, packet models.Packet) { + if packet.Signature != string(dmrconst.CommandDMRD) { + klog.Errorf("Invalid packet type: %s", packet.Signature) + return + } + + if config.GetConfig().Debug { + klog.Infof("Sending Packet: %s\n", packet.String()) + klog.Infof("Sending DMR packet to Repeater ID: %d", repeaterIDBytes) + } + repeater, err := s.Redis.getPeer(ctx, repeaterIDBytes) + if err != nil { + klog.Errorf("Error getting repeater from Redis", err) + return + } + p := models.RawDMRPacket{ + Data: packet.Encode(), + RemoteIP: repeater.IP, + RemotePort: repeater.Port, + } + packedBytes, err := p.MarshalMsg(nil) + if err != nil { + klog.Errorf("Error marshalling packet", err) + return + } + s.Redis.Redis.Publish(ctx, "openbridge:outgoing", packedBytes) +} + +func (s *Server) validateHMAC(ctx context.Context, packetBytes []byte, hmacBytes []byte, peer models.Peer) bool { + ctx, span := otel.Tracer("DMRHub").Start(ctx, "Server.validateHMAC") + defer span.End() + + h := hmac.New(sha1.New, []byte(peer.Password)) + _, err := h.Write(packetBytes) + if err != nil { + klog.Warningf("Error hashing OpenBridge packet: %s", err) + return false + } + if !hmac.Equal(h.Sum(nil), hmacBytes) { + klog.Warningf("Invalid OpenBridge HMAC") + return false + } + return true +} + +func (s *Server) handlePacket(ctx context.Context, remoteAddr *net.UDPAddr, data []byte) { + ctx, span := otel.Tracer("DMRHub").Start(ctx, "Server.handlePacket") + defer span.End() + + const signatureLength = 4 + + if len(data) != packetLength { + klog.Warningf("Invalid OpenBridge packet length: %d", len(data)) + return + } + + if dmrconst.Command(data[:signatureLength]) != dmrconst.CommandDMRD { + klog.Warningf("Unknown command: %s", data[:signatureLength]) + return + } + + packetBytes := data[:dmrconst.HBRPPacketLength] + hmacBytes := data[dmrconst.HBRPPacketLength:packetLength] + + packet, ok := models.UnpackPacket(packetBytes) + if !ok { + klog.Warningf("Invalid OpenBridge packet") + return + } + + if config.GetConfig().Debug { + klog.Infof("DMRD packet: %s", packet.String()) + } + + if packet.Slot { + // Drop TS2 packets on OpenBridge + klog.Warningf("Dropping TS2 packet from OpenBridge") + return + } + + peerIDBytes := data[11:15] + peerID := uint(binary.BigEndian.Uint32(peerIDBytes)) + if config.GetConfig().Debug { + klog.Infof("DMR Data from Peer ID: %d", peerID) + } + + if !models.PeerIDExists(s.DB, peerID) { + klog.Warningf("Unknown peer ID: %d", peerID) + return + } + + peer := models.FindPeerByID(s.DB, peerID) + + if !s.validateHMAC(ctx, packetBytes, hmacBytes, peer) { + klog.Warningf("Invalid OpenBridge HMAC") + return + } + + isVoice, _ := utils.CheckPacketType(packet) + + if packet.Dst == 0 { + return + } + + if peer.Egress { + s.TrackCall(ctx, packet, isVoice) + s.sendPacket(ctx, packet.Dst, packet) + } +} + +func (s *Server) TrackCall(ctx context.Context, packet models.Packet, isVoice bool) { + ctx, span := otel.Tracer("DMRHub").Start(ctx, "Server.TrackCall") + defer span.End() + + // Don't call track unlink + if isVoice { + go func() { + if !s.CallTracker.IsCallActive(ctx, packet) { + s.CallTracker.StartCall(ctx, packet) + } + if s.CallTracker.IsCallActive(ctx, packet) { + s.CallTracker.ProcessCallPacket(ctx, packet) + if packet.FrameType == dmrconst.FrameDataSync && dmrconst.DataType(packet.DTypeOrVSeq) == dmrconst.DTypeVoiceTerm { + s.CallTracker.EndCall(ctx, packet) + } + } + }() + } +} diff --git a/internal/dmr/servers/openbridge/server_test.go b/internal/dmr/servers/openbridge/server_test.go new file mode 100644 index 000000000..a5e4ca285 --- /dev/null +++ b/internal/dmr/servers/openbridge/server_test.go @@ -0,0 +1,29 @@ +// SPDX-License-Identifier: AGPL-3.0-or-later +// DMRHub - Run a DMR network server in a single binary +// Copyright (C) 2023 Jacob McSwain +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . +// +// The source code is available at + +package openbridge_test + +import ( + "testing" +) + +func TestNoop(t *testing.T) { + t.Parallel() + t.Log("Noop") +} diff --git a/internal/dmr/servers/openbridge/subscriptions_manager.go b/internal/dmr/servers/openbridge/subscriptions_manager.go new file mode 100644 index 000000000..b0e6f0408 --- /dev/null +++ b/internal/dmr/servers/openbridge/subscriptions_manager.go @@ -0,0 +1,151 @@ +// SPDX-License-Identifier: AGPL-3.0-or-later +// DMRHub - Run a DMR network server in a single binary +// Copyright (C) 2023 Jacob McSwain +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . +// +// The source code is available at + +package openbridge + +import ( + "context" + "sync" + + "github.com/USA-RedDragon/DMRHub/internal/config" + "github.com/USA-RedDragon/DMRHub/internal/db/models" + "github.com/redis/go-redis/v9" + "go.opentelemetry.io/otel" + "k8s.io/klog/v2" +) + +var subscriptionManager *SubscriptionManager //nolint:golint,gochecknoglobals + +type SubscriptionManager struct { + subscriptions map[uint]context.CancelFunc + subscriptionsMutex *sync.RWMutex + subscriptionCancelMutex map[uint]*sync.RWMutex +} + +func GetSubscriptionManager() *SubscriptionManager { + if subscriptionManager == nil { + subscriptionManager = &SubscriptionManager{ + subscriptions: make(map[uint]context.CancelFunc), + subscriptionsMutex: &sync.RWMutex{}, + subscriptionCancelMutex: make(map[uint]*sync.RWMutex), + } + } + return subscriptionManager +} + +func (m *SubscriptionManager) CancelSubscription(p models.Peer) { + m.subscriptionsMutex.RLock() + m.subscriptionCancelMutex[p.ID].RLock() + cancel, ok := m.subscriptions[p.ID] + m.subscriptionCancelMutex[p.ID].RUnlock() + m.subscriptionsMutex.RUnlock() + if ok { + m.subscriptionsMutex.Lock() + m.subscriptionCancelMutex[p.ID].Lock() + delete(m.subscriptions, p.ID) + m.subscriptionCancelMutex[p.ID].Unlock() + delete(m.subscriptionCancelMutex, p.ID) + m.subscriptionsMutex.Unlock() + cancel() + } +} + +func (m *SubscriptionManager) Subscribe(ctx context.Context, redis *redis.Client, p models.Peer) { + ctx, span := otel.Tracer("DMRHub").Start(ctx, "Server.handlePacket") + defer span.End() + + if !p.Ingress { + return + } + m.subscriptionsMutex.RLock() + _, ok := m.subscriptions[p.ID] + m.subscriptionsMutex.RUnlock() + if !ok { + newCtx, cancel := context.WithCancel(context.Background()) + m.subscriptionsMutex.Lock() + _, ok = m.subscriptionCancelMutex[p.ID] + if !ok { + m.subscriptionCancelMutex[p.ID] = &sync.RWMutex{} + } + m.subscriptionCancelMutex[p.ID].Lock() + m.subscriptions[p.ID] = cancel + m.subscriptionCancelMutex[p.ID].Unlock() + m.subscriptionsMutex.Unlock() + go m.subscribe(newCtx, redis, p) //nolint:golint,contextcheck + } +} + +func (m *SubscriptionManager) subscribe(ctx context.Context, redis *redis.Client, p models.Peer) { + if config.GetConfig().Debug { + klog.Infof("Listening for calls on peer %d", p.ID) + } + pubsub := redis.Subscribe(ctx, "openbridge:packets") + defer func() { + err := pubsub.Unsubscribe(ctx, "openbridge:packets") + if err != nil { + klog.Errorf("Error unsubscribing from openbridge:packets: %s", err) + } + err = pubsub.Close() + if err != nil { + klog.Errorf("Error closing pubsub connection: %s", err) + } + }() + pubsubChannel := pubsub.Channel() + + for { + select { + case <-ctx.Done(): + if config.GetConfig().Debug { + klog.Info("Context canceled, stopping subscription to openbridge:packets") + } + m.subscriptionsMutex.Lock() + _, ok := m.subscriptionCancelMutex[p.ID] + if ok { + m.subscriptionCancelMutex[p.ID].Lock() + } + delete(m.subscriptions, p.ID) + if ok { + m.subscriptionCancelMutex[p.ID].Unlock() + delete(m.subscriptionCancelMutex, p.ID) + } + m.subscriptionsMutex.Unlock() + return + case msg := <-pubsubChannel: + rawPacket := models.RawDMRPacket{} + _, err := rawPacket.UnmarshalMsg([]byte(msg.Payload)) + if err != nil { + klog.Errorf("Failed to unmarshal raw packet: %s", err) + continue + } + packet, ok := models.UnpackPacket(rawPacket.Data) + if !ok { + klog.Errorf("Failed to unpack packet: %s", err) + continue + } + + if packet.Repeater == p.ID { + continue + } + + packet.Repeater = p.ID + packet.Slot = false + redis.Publish(ctx, "openbridge:outgoing", packet.Encode()) + } + } +} diff --git a/internal/http/api/apimodels/peer.go b/internal/http/api/apimodels/peer.go new file mode 100644 index 000000000..bf2083d69 --- /dev/null +++ b/internal/http/api/apimodels/peer.go @@ -0,0 +1,26 @@ +// SPDX-License-Identifier: AGPL-3.0-or-later +// DMRHub - Run a DMR network server in a single binary +// Copyright (C) 2023 Jacob McSwain +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . +// +// The source code is available at + +package apimodels + +type PeerPost struct { + ID uint `json:"id" binding:"required"` + Ingress bool `json:"ingress"` + Egress bool `json:"egress"` +} diff --git a/internal/http/api/controllers/v1/peers/peers.go b/internal/http/api/controllers/v1/peers/peers.go new file mode 100644 index 000000000..86804e3e1 --- /dev/null +++ b/internal/http/api/controllers/v1/peers/peers.go @@ -0,0 +1,225 @@ +// SPDX-License-Identifier: AGPL-3.0-or-later +// DMRHub - Run a DMR network server in a single binary +// Copyright (C) 2023 Jacob McSwain +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . +// +// The source code is available at + +package peers + +import ( + "net/http" + "strconv" + + "github.com/USA-RedDragon/DMRHub/internal/db/models" + "github.com/USA-RedDragon/DMRHub/internal/dmr/servers/openbridge" + "github.com/USA-RedDragon/DMRHub/internal/http/api/apimodels" + "github.com/USA-RedDragon/DMRHub/internal/http/api/utils" + "github.com/gin-contrib/sessions" + "github.com/gin-gonic/gin" + "github.com/redis/go-redis/v9" + "gorm.io/gorm" + "k8s.io/klog/v2" +) + +const ( + LinkTypeDynamic = "dynamic" + LinkTypeStatic = "static" +) + +func GETPeers(c *gin.Context) { + db, ok := c.MustGet("PaginatedDB").(*gorm.DB) + if !ok { + klog.Errorf("Unable to get DB from context") + c.JSON(http.StatusInternalServerError, gin.H{"error": "Try again later"}) + return + } + cDb, ok := c.MustGet("DB").(*gorm.DB) + if !ok { + klog.Errorf("Unable to get DB from context") + c.JSON(http.StatusInternalServerError, gin.H{"error": "Try again later"}) + return + } + peers := models.ListPeers(db) + count := models.CountPeers(cDb) + c.JSON(http.StatusOK, gin.H{"total": count, "peers": peers}) +} + +func GETMyPeers(c *gin.Context) { + db, ok := c.MustGet("PaginatedDB").(*gorm.DB) + if !ok { + klog.Errorf("Unable to get DB from context") + c.JSON(http.StatusInternalServerError, gin.H{"error": "Try again later"}) + return + } + cDb, ok := c.MustGet("DB").(*gorm.DB) + if !ok { + klog.Errorf("Unable to get DB from context") + c.JSON(http.StatusInternalServerError, gin.H{"error": "Try again later"}) + return + } + session := sessions.Default(c) + + userID := session.Get("user_id") + if userID == nil { + klog.Error("userID not found") + c.JSON(http.StatusUnauthorized, gin.H{"error": "Authentication failed"}) + return + } + + uid, ok := userID.(uint) + if !ok { + klog.Errorf("Unable to convert userID to uint: %v", userID) + c.JSON(http.StatusInternalServerError, gin.H{"error": "Try again later"}) + return + } + + // Get all peers owned by user + peers := models.GetUserPeers(db, uid) + if db.Error != nil { + klog.Errorf("Error getting peers owned by user %d: %v", userID, db.Error) + c.JSON(http.StatusInternalServerError, gin.H{"error": "Error getting peers owned by user"}) + return + } + + count := models.CountUserPeers(cDb, uid) + + c.JSON(http.StatusOK, gin.H{"total": count, "peers": peers}) +} + +func GETPeer(c *gin.Context) { + db, ok := c.MustGet("DB").(*gorm.DB) + if !ok { + klog.Errorf("Unable to get DB from context") + c.JSON(http.StatusInternalServerError, gin.H{"error": "Try again later"}) + return + } + id := c.Param("id") + // Convert string id into uint + peerID, err := strconv.ParseUint(id, 10, 32) + if err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": "Invalid peer ID"}) + return + } + if models.PeerIDExists(db, uint(peerID)) { + peer := models.FindPeerByID(db, uint(peerID)) + c.JSON(http.StatusOK, peer) + } else { + c.JSON(http.StatusBadRequest, gin.H{"error": "Peer does not exist"}) + } +} + +func DELETEPeer(c *gin.Context) { + db, ok := c.MustGet("DB").(*gorm.DB) + if !ok { + klog.Errorf("Unable to get DB from context") + c.JSON(http.StatusInternalServerError, gin.H{"error": "Try again later"}) + return + } + idUint64, err := strconv.ParseUint(c.Param("id"), 10, 32) + if err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": "Invalid peer ID"}) + return + } + models.DeletePeer(db, uint(idUint64)) + if db.Error != nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": db.Error.Error()}) + return + } + c.JSON(http.StatusOK, gin.H{"message": "Peer deleted"}) +} + +func POSTPeer(c *gin.Context) { + session := sessions.Default(c) + usID := session.Get("user_id") + if usID == nil { + klog.Error("userID not found") + c.JSON(http.StatusUnauthorized, gin.H{"error": "Authentication failed"}) + } + userID, ok := usID.(uint) + if !ok { + klog.Error("userID cast failed") + c.JSON(http.StatusUnauthorized, gin.H{"error": "Authentication failed"}) + } + db, ok := c.MustGet("DB").(*gorm.DB) + if !ok { + klog.Error("DB cast failed") + c.JSON(http.StatusInternalServerError, gin.H{"error": "Try again later"}) + return + } + redis, ok := c.MustGet("Redis").(*redis.Client) + if !ok { + klog.Error("Redis cast failed") + c.JSON(http.StatusInternalServerError, gin.H{"error": "Try again later"}) + return + } + + var user models.User + db.First(&user, userID) + if db.Error != nil { + klog.Errorf("Error getting user %d: %v", userID, db.Error) + c.JSON(http.StatusInternalServerError, gin.H{"error": "Error getting user"}) + return + } + + var json apimodels.PeerPost + err := c.ShouldBindJSON(&json) + if err != nil { + klog.Errorf("POSTPeer: JSON data is invalid: %v", err) + c.JSON(http.StatusBadRequest, gin.H{"error": "JSON data is invalid"}) + } else { + if models.PeerIDExists(db, json.ID) { + klog.Errorf("POSTPeer: Peer ID already exists: %v", json.ID) + c.JSON(http.StatusBadRequest, gin.H{"error": "Peer ID already exists"}) + return + } + + var peer models.Peer + + peer.Egress = json.Egress + peer.Ingress = json.Ingress + + // Peer validated to fit within a 4 byte integer + if json.ID <= 0 || json.ID > 4294967295 { + klog.Errorf("POSTPeer: Peer ID is invalid: %v", json.ID) + c.JSON(http.StatusBadRequest, gin.H{"error": "Peer ID is invalid"}) + return + } + + peer.ID = json.ID + + // Generate a random password of 12 characters + const randLen = 12 + const randNum = 1 + const randSpecial = 2 + peer.Password, err = utils.RandomPassword(randLen, randNum, randSpecial) + if err != nil { + klog.Errorf("Failed to generate a peer password %v", err) + c.JSON(http.StatusBadRequest, gin.H{"error": "Failed to generate a peer password"}) + return + } + + // Find user by userID + peer.Owner = user + peer.OwnerID = user.ID + db.Preload("Owner").Create(&peer) + if db.Error != nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": db.Error.Error()}) + return + } + c.JSON(http.StatusOK, gin.H{"message": "Peer created", "password": peer.Password}) + go openbridge.GetSubscriptionManager().Subscribe(c.Request.Context(), redis, peer) + } +} diff --git a/internal/http/api/controllers/v1/peers/peers_test.go b/internal/http/api/controllers/v1/peers/peers_test.go new file mode 100644 index 000000000..284839f75 --- /dev/null +++ b/internal/http/api/controllers/v1/peers/peers_test.go @@ -0,0 +1,29 @@ +// SPDX-License-Identifier: AGPL-3.0-or-later +// DMRHub - Run a DMR network server in a single binary +// Copyright (C) 2023 Jacob McSwain +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . +// +// The source code is available at + +package peers_test + +import ( + "testing" +) + +func TestNoop(t *testing.T) { + t.Parallel() + t.Log("Noop") +} diff --git a/internal/http/api/middleware/auth.go b/internal/http/api/middleware/auth.go index 9cf9c1f63..3d97fab6c 100644 --- a/internal/http/api/middleware/auth.go +++ b/internal/http/api/middleware/auth.go @@ -269,6 +269,65 @@ func RequireLogin() gin.HandlerFunc { } } +func RequirePeerOwnerOrAdmin() gin.HandlerFunc { + return func(c *gin.Context) { + session := sessions.Default(c) + id := c.Param("id") + userID := session.Get("user_id") + if userID == nil { + if config.GetConfig().Debug { + klog.Error("RequirePeerOwnerOrAdmin: Failed to get user_id from session") + } + c.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{"error": "Authentication failed"}) + return + } + uid, ok := userID.(uint) + if !ok { + klog.Error("RequirePeerOwnerOrAdmin: Unable to convert user_id to uint") + c.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{"error": "Authentication failed"}) + return + } + ctx := c.Request.Context() + span := trace.SpanFromContext(ctx) + if span.IsRecording() { + span.SetAttributes( + attribute.String("http.auth", "RequirePeerOwnerOrAdmin"), + attribute.Int("user.id", int(uid)), + ) + } + + valid := false + db, ok := c.MustGet("DB").(*gorm.DB) + if !ok { + klog.Error("RequirePeerOwnerOrAdmin: Unable to get DB from context") + c.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{"error": "Authentication failed"}) + return + } + db = db.WithContext(ctx) + // Open up the DB and check if the user is an admin or if they own peer with id = id + var user models.User + db.Find(&user, "id = ?", uid) + if span.IsRecording() { + span.SetAttributes( + attribute.Bool("user.admin", user.Admin), + ) + } + if user.Approved && !user.Suspended && user.Admin { + valid = true + } else { + var peer models.Peer + db.Find(&peer, "radio_id = ?", id) + if peer.OwnerID == user.ID && !user.Suspended && user.Approved { + valid = true + } + } + + if !valid { + c.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{"error": "Authentication failed"}) + } + } +} + func RequireRepeaterOwnerOrAdmin() gin.HandlerFunc { return func(c *gin.Context) { session := sessions.Default(c) diff --git a/internal/http/api/routes.go b/internal/http/api/routes.go index ce317665e..f036080b7 100644 --- a/internal/http/api/routes.go +++ b/internal/http/api/routes.go @@ -23,6 +23,7 @@ import ( v1Controllers "github.com/USA-RedDragon/DMRHub/internal/http/api/controllers/v1" v1AuthControllers "github.com/USA-RedDragon/DMRHub/internal/http/api/controllers/v1/auth" v1LastheardControllers "github.com/USA-RedDragon/DMRHub/internal/http/api/controllers/v1/lastheard" + v1PeersControllers "github.com/USA-RedDragon/DMRHub/internal/http/api/controllers/v1/peers" v1RepeatersControllers "github.com/USA-RedDragon/DMRHub/internal/http/api/controllers/v1/repeaters" v1TalkgroupsControllers "github.com/USA-RedDragon/DMRHub/internal/http/api/controllers/v1/talkgroups" v1UsersControllers "github.com/USA-RedDragon/DMRHub/internal/http/api/controllers/v1/users" @@ -85,6 +86,15 @@ func v1(group *gin.RouterGroup, userSuspension gin.HandlerFunc) { v1Users.PATCH("/:id", middleware.RequireSelfOrAdmin(), userSuspension, v1UsersControllers.PATCHUser) v1Users.DELETE("/:id", middleware.RequireSuperAdmin(), userSuspension, v1UsersControllers.DELETEUser) + v1Peers := group.Group("/peers") + // Paginated + v1Peers.GET("", middleware.RequireAdmin(), v1PeersControllers.GETPeers) + // Paginated + v1Peers.GET("/my", middleware.RequireLogin(), v1PeersControllers.GETMyPeers) + v1Peers.POST("", middleware.RequireLogin(), v1PeersControllers.POSTPeer) + v1Peers.GET("/:id", middleware.RequireLogin(), v1PeersControllers.GETPeer) + v1Peers.DELETE("/:id", middleware.RequirePeerOwnerOrAdmin(), v1PeersControllers.DELETEPeer) + v1Lastheard := group.Group("/lastheard") // Returns the lastheard data for the server, adds personal data if logged in // Paginated diff --git a/internal/http/frontend/cypress/support/e2e.js b/internal/http/frontend/cypress/support/e2e.js index eab64c703..8c3152479 100644 --- a/internal/http/frontend/cypress/support/e2e.js +++ b/internal/http/frontend/cypress/support/e2e.js @@ -14,6 +14,9 @@ // *********************************************************** import '@cypress/code-coverage/support'; import 'cypress-mochawesome-reporter/register'; +import { registerCommand } from 'cypress-wait-for-stable-dom'; + +registerCommand(); // Import commands.js using ES2015 syntax: import './commands'; diff --git a/internal/http/frontend/package-lock.json b/internal/http/frontend/package-lock.json index 89150a87c..8854b9849 100644 --- a/internal/http/frontend/package-lock.json +++ b/internal/http/frontend/package-lock.json @@ -30,6 +30,7 @@ "cypress": "^12.6.0", "cypress-mochawesome-reporter": "^3.3.0", "cypress-multi-reporters": "^1.6.2", + "cypress-wait-for-stable-dom": "^0.1.0", "eslint": "^8.34.0", "eslint-config-google": "^0.14.0", "eslint-plugin-cypress": "^2.12.1", @@ -4219,6 +4220,12 @@ "mocha": ">=3.1.2" } }, + "node_modules/cypress-wait-for-stable-dom": { + "version": "0.1.0", + "resolved": "https://registry.npmjs.org/cypress-wait-for-stable-dom/-/cypress-wait-for-stable-dom-0.1.0.tgz", + "integrity": "sha512-iVJc6CDzlu1xUnTcZph+zbkOlImaDelpvRv4G+3naugvjkF6b9EFpDmRCC/16xL1pqpkFq4rFyfhuNw4C3PQjw==", + "dev": true + }, "node_modules/dashdash": { "version": "1.14.1", "resolved": "https://registry.npmjs.org/dashdash/-/dashdash-1.14.1.tgz", @@ -12675,6 +12682,12 @@ "lodash": "^4.17.15" } }, + "cypress-wait-for-stable-dom": { + "version": "0.1.0", + "resolved": "https://registry.npmjs.org/cypress-wait-for-stable-dom/-/cypress-wait-for-stable-dom-0.1.0.tgz", + "integrity": "sha512-iVJc6CDzlu1xUnTcZph+zbkOlImaDelpvRv4G+3naugvjkF6b9EFpDmRCC/16xL1pqpkFq4rFyfhuNw4C3PQjw==", + "dev": true + }, "dashdash": { "version": "1.14.1", "resolved": "https://registry.npmjs.org/dashdash/-/dashdash-1.14.1.tgz", diff --git a/internal/http/frontend/package.json b/internal/http/frontend/package.json index cb4140ea4..8259fec06 100644 --- a/internal/http/frontend/package.json +++ b/internal/http/frontend/package.json @@ -6,6 +6,7 @@ "dev": "vite --host", "build": "vite build", "preview": "vite preview", + "screenshot": "start-server-and-test preview :4173 \"cypress run --e2e --headed --browser chrome --config excludeSpecPattern='**/no-op' --spec tests/e2e/screenshots/**.cy.js\"", "test:e2e": "start-server-and-test preview :4173 'cypress run --e2e --headed'", "test:e2e:chrome": "start-server-and-test preview :4173 'cypress run --e2e --headed --browser chrome'", "test:e2e:firefox": "start-server-and-test preview :4173 'cypress run --e2e --headed --browser firefox'", @@ -39,6 +40,7 @@ "cypress": "^12.6.0", "cypress-mochawesome-reporter": "^3.3.0", "cypress-multi-reporters": "^1.6.2", + "cypress-wait-for-stable-dom": "^0.1.0", "eslint": "^8.34.0", "eslint-config-google": "^0.14.0", "eslint-plugin-cypress": "^2.12.1", @@ -53,4 +55,4 @@ "vite": "^4.1.4", "vite-plugin-istanbul": "^4.0.0" } -} +} \ No newline at end of file diff --git a/internal/http/frontend/src/components/AppHeader.vue b/internal/http/frontend/src/components/AppHeader.vue index 585a4411a..7454bab8e 100644 --- a/internal/http/frontend/src/components/AppHeader.vue +++ b/internal/http/frontend/src/components/AppHeader.vue @@ -27,9 +27,53 @@