Skip to content

Commit 4fa2168

Browse files
committed
update proto and implement slots
1 parent 0cc9941 commit 4fa2168

File tree

6 files changed

+952
-535
lines changed

6 files changed

+952
-535
lines changed

client-web/proto/sfu.proto

Lines changed: 101 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -1,70 +1,129 @@
11
syntax = "proto3";
2-
32
package sfu;
43

5-
// Represents the kind of media track.
6-
enum TrackKind {
7-
TRACK_KIND_UNSPECIFIED = 0;
8-
VIDEO = 1;
9-
AUDIO = 2;
10-
}
4+
// -------------------------------------
5+
// Common Types
6+
// -------------------------------------
117

12-
// --- Client to Server Messages ---
8+
// Media configuration for a participant's stream
9+
message MediaConfig {
10+
bool audio = 1; // True if audio is active
11+
bool video = 2; // True if video is active
12+
}
1313

14-
message ClientSubscribePayload {
15-
string mid = 1; // The client's MID (transceiver slot) to use for this track.
16-
string remote_track_id = 2; // The application-level ID of the remote track to subscribe to.
14+
// Video quality preferences
15+
message VideoSettings {
16+
int32 max_height = 1; // Maximum height for the video stream
1717
}
1818

19-
message ClientUnsubscribePayload {
20-
string mid = 1; // The client's MID (transceiver slot) to unsubscribe from.
19+
// State of a participant's stream
20+
message ParticipantStream {
21+
string participant_id = 1; // Unique SFU-internal ID
22+
string external_participant_id = 2; // Developer-provided user ID
23+
optional MediaConfig media = 3; // Media state (if unset, participant has left)
2124
}
2225

23-
// ClientMessage encapsulates all possible messages from client to SFU.
26+
// -------------------------------------
27+
// Client → Server
28+
// -------------------------------------
29+
30+
// Client message wrapper with sequence number for reliability
2431
message ClientMessage {
25-
oneof payload {
26-
ClientSubscribePayload subscribe = 1;
27-
ClientUnsubscribePayload unsubscribe = 2;
32+
uint32 sequence = 1; // Sequence number for message ordering/acknowledgment
33+
oneof msg {
34+
PublishIntent publish_intent = 2;
35+
VideoSubscription video_subscription = 3;
2836
}
2937
}
3038

39+
// Intent to publish media
40+
message PublishIntent {
41+
MediaConfig media = 1; // Media to publish (audio/video)
42+
}
43+
44+
// Subscriptions for receiving media from specific participants
45+
message VideoSubscription {
46+
repeated ParticipantSubscription subscriptions = 1;
47+
}
48+
49+
// Subscription for a specific participant's video
50+
message ParticipantSubscription {
51+
string participant_id = 1;
52+
VideoSettings video_settings = 2;
53+
}
54+
55+
// -------------------------------------
56+
// Server → Client
57+
// -------------------------------------
3158

32-
// --- Server to Client Messages ---
59+
// Server message wrapper
60+
message ServerMessage {
61+
oneof msg {
62+
RoomSnapshot room_snapshot = 1;
63+
StreamStateUpdate stream_update = 2;
64+
ActiveSpeakersUpdate active_speakers = 3;
65+
MessageAck message_ack = 4;
66+
ConnectionQuality connection_quality = 5;
67+
ErrorNotification error = 6;
68+
}
69+
}
3370

34-
message TrackInfo {
35-
string track_id = 1; // The ID of the newly available remote track.
36-
TrackKind kind = 2; // The kind of track.
37-
string participant_id = 3; // The ID of the participant who published this track.
38-
// map<string, string> metadata = 4; // Optional: any other app-specific metadata about the track.
71+
// Full snapshot of the room state
72+
message RoomSnapshot {
73+
repeated ParticipantStream participants = 1;
74+
string room_id = 2; // Room identifier for verification
3975
}
4076

41-
message TrackSwitchInfo {
42-
string mid = 2; // The client's MID that the SFU will use (confirming client's request).
43-
optional TrackInfo remote_track = 3;
77+
// Incremental update of a participant's stream
78+
message StreamStateUpdate {
79+
ParticipantStream participant_stream = 1;
4480
}
4581

46-
message TrackPublishedPayload {
47-
repeated TrackInfo remote_tracks = 1;
82+
// Current active speakers in the room
83+
message ActiveSpeakersUpdate {
84+
repeated string participant_ids = 1; // Ordered by speaking activity (most active first)
85+
uint64 timestamp = 2; // Server timestamp for this update
4886
}
4987

50-
message TrackUnpublishedPayload {
51-
repeated string remote_track_ids = 1; // The ID of the remote track that is no longer available.
88+
// Connection quality metrics for adaptive streaming
89+
message ConnectionQuality {
90+
string participant_id = 1; // Which participant this applies to (empty = self)
91+
Quality quality = 2; // Connection quality level
92+
optional uint32 rtt_ms = 3; // Round-trip time in milliseconds
5293
}
5394

54-
message TrackSwitchedPayload {
55-
repeated TrackSwitchInfo switches = 1;
95+
// Message acknowledgment (only for client message processing)
96+
message MessageAck {
97+
uint32 sequence = 1; // Sequence number being acknowledged
98+
bool success = 2; // Whether the message was processed successfully
99+
optional string message = 3; // Error details if success is false
56100
}
57101

58-
message ErrorPayload {
59-
string description = 1; // General error message from the SFU.
102+
// Server-initiated error notification
103+
message ErrorNotification {
104+
ErrorType type = 1; // Type of error
105+
string message = 2; // Human-readable error message
106+
bool fatal = 3; // Whether client should disconnect
60107
}
61108

62-
// ServerMessage encapsulates all possible messages from SFU to client.
63-
message ServerMessage {
64-
oneof payload {
65-
ErrorPayload error = 1; // General error from SFU.
66-
TrackPublishedPayload track_published = 2; // SFU informs client a new remote track is available.
67-
TrackUnpublishedPayload track_unpublished = 3; // SFU informs client a remote track is no longer available.
68-
TrackSwitchedPayload track_switched = 4; // SFU confirms track switching for a mid
69-
}
109+
// Connection quality levels
110+
enum Quality {
111+
EXCELLENT = 0;
112+
GOOD = 1;
113+
FAIR = 2;
114+
POOR = 3;
115+
DISCONNECTED = 4;
116+
}
117+
118+
// Server-initiated error types
119+
enum ErrorType {
120+
ROOM_CLOSED = 0; // Room was closed by moderator/system
121+
PARTICIPANT_KICKED = 1; // Participant was removed from room
122+
SERVER_SHUTDOWN = 2; // Server is shutting down
123+
ROOM_CAPACITY_CHANGED = 3; // Room capacity reduced, participant removed
124+
AUTHENTICATION_EXPIRED = 4; // Auth token expired
125+
DUPLICATE_CONNECTION = 5; // Same participant connected elsewhere
126+
PROTOCOL_VIOLATION = 6; // Client sent malformed/invalid messages
127+
UNKNOWN_ERROR = 7;
70128
}
129+

client-web/src/index.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
export { MyElement } from "./my-element.ts";
21
import { ClientCore } from "./lib";
32

43
(async () => {

client-web/src/lib/core.ts

Lines changed: 75 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,19 @@
1-
import { ClientMessage, ServerMessage, TrackInfo } from "./sfu.ts";
1+
import { ClientMessage, MediaConfig, ServerMessage } from "./sfu.ts";
22

3-
const MAX_DOWNSTREAMS = 9;
4-
5-
type MID = string;
3+
const MAX_DOWNSTREAMS = 16;
4+
const LAST_N_AUDIO = 3;
65

76
// Internal Ids
87
type ParticipantId = string;
9-
type TrackId = string;
108

11-
interface Slot {
12-
transceiver: RTCRtpTransceiver;
13-
track: MediaStreamTrack;
14-
info?: TrackInfo;
9+
interface VideoSlot {
10+
trans: RTCRtpTransceiver;
11+
participantId?: ParticipantId;
1512
}
1613

17-
interface ParticipantSlot {
18-
video?: MID;
19-
audio?: MID;
14+
interface ParticipantMeta {
15+
externalParticipantId: string;
16+
media?: MediaConfig;
2017
}
2118

2219
export interface ClientCoreConfig {
@@ -32,12 +29,13 @@ export class ClientCore {
3229
#audioSender: RTCRtpTransceiver;
3330
#closed: boolean;
3431

35-
#slots: Record<MID, Slot>;
36-
#participantSlots: ParticipantSlot[];
37-
#availableTracks: Record<ParticipantId, Record<TrackId, TrackInfo>>;
32+
#videoSlots: VideoSlot[];
33+
#audioSlots: RTCRtpTransceiver[];
34+
35+
#participants: Record<ParticipantId, ParticipantMeta>;
3836

39-
onStateChanged = (state: RTCPeerConnectionState) => { };
40-
onTrack = (track: RTCPeerConnection) => { };
37+
onStateChanged = (state: RTCPeerConnectionState) => {};
38+
onTrack = (track: RTCPeerConnection) => {};
4139

4240
constructor(cfg: ClientCoreConfig) {
4341
this.#sfuUrl = cfg.sfuUrl;
@@ -46,9 +44,9 @@ export class ClientCore {
4644
0,
4745
);
4846
this.#closed = false;
49-
this.#slots = {};
50-
this.#availableTracks = {};
51-
this.#participantSlots = [];
47+
this.#videoSlots = [];
48+
this.#audioSlots = [];
49+
this.#participants = {};
5250

5351
this.#pc = new RTCPeerConnection();
5452
this.#pc.onconnectionstatechange = () => {
@@ -66,48 +64,6 @@ export class ClientCore {
6664
}
6765
};
6866

69-
this.#pc.ontrack = (event: RTCTrackEvent) => {
70-
const mid = event.transceiver?.mid;
71-
const track = event.track;
72-
const transceiver = event.transceiver;
73-
if (!mid || !track) {
74-
this.#close("Received track event without MID or track object.");
75-
return;
76-
}
77-
78-
console.log(event);
79-
this.#slots[mid] = {
80-
track,
81-
transceiver,
82-
};
83-
84-
if (track.kind === "video") {
85-
for (const slot of this.#participantSlots) {
86-
if (!slot.video) {
87-
slot.video = mid;
88-
return;
89-
}
90-
}
91-
92-
this.#participantSlots.push({
93-
video: mid,
94-
});
95-
} else if (track.kind === "audio") {
96-
for (const slot of this.#participantSlots) {
97-
if (!slot.audio) {
98-
slot.audio = mid;
99-
return;
100-
}
101-
}
102-
103-
this.#participantSlots.push({
104-
audio: mid,
105-
});
106-
} else {
107-
console.warn("unknown track kind, ignoring:", track.kind);
108-
}
109-
};
110-
11167
// SFU RPC DataChannel
11268
this.#rpc = this.#pc.createDataChannel("pulsebeam::rpc");
11369
this.#rpc.binaryType = "arraybuffer";
@@ -116,22 +72,39 @@ export class ClientCore {
11672
const serverMessage = ServerMessage.fromBinary(
11773
new Uint8Array(event.data as ArrayBuffer),
11874
);
119-
const payload = serverMessage.payload;
120-
const payloadKind = payload.oneofKind;
121-
if (!payloadKind) {
75+
const msg = serverMessage.msg;
76+
const msgKind = msg.oneofKind;
77+
if (!msgKind) {
12278
console.warn("Received SFU message with undefined payload kind.");
12379
return;
12480
}
12581

126-
switch (payloadKind) {
127-
case "trackPublished":
128-
break;
129-
case "trackUnpublished":
82+
switch (msgKind) {
83+
case "roomSnapshot":
84+
for (const participant of msg.roomSnapshot.participants) {
85+
this.#participants[participant.participantId] = {
86+
externalParticipantId: participant.externalParticipantId,
87+
media: participant.media,
88+
};
89+
}
13090
break;
131-
case "trackSwitched":
91+
case "streamUpdate":
92+
if (msg.streamUpdate.participantStream) {
93+
const stream = msg.streamUpdate.participantStream;
94+
if (stream.participantId in this.#participants) {
95+
const participant = this.#participants[stream.participantId];
96+
participant.media = stream.media;
97+
participant.externalParticipantId =
98+
stream.externalParticipantId;
99+
} else {
100+
this.#participants[stream.participantId] = {
101+
externalParticipantId: stream.externalParticipantId,
102+
media: stream.media,
103+
};
104+
}
105+
}
132106
break;
133107
}
134-
135108
// TODO: implement this
136109
} catch (e: any) {
137110
this.#close(`Error processing SFU RPC message: ${e}`);
@@ -152,13 +125,14 @@ export class ClientCore {
152125
direction: "sendonly",
153126
});
154127

155-
for (let i = 0; i < maxDownstreams; i++) {
156-
// ontrack will be fired with acknowledgement from the server
157-
this.#pc.addTransceiver("video", {
128+
for (let i = 0; i < LAST_N_AUDIO; i++) {
129+
this.#pc.addTransceiver("audio", {
158130
direction: "recvonly",
159131
});
132+
}
160133

161-
this.#pc.addTransceiver("audio", {
134+
for (let i = 0; i < maxDownstreams; i++) {
135+
this.#pc.addTransceiver("video", {
162136
direction: "recvonly",
163137
});
164138
}
@@ -182,6 +156,13 @@ export class ClientCore {
182156
throw new Error(errorMessage); // More direct feedback to developer
183157
}
184158

159+
if (this.#pc.connectionState != "new") {
160+
const errorMessage =
161+
"This client instance has been initiated and cannot be reused.";
162+
console.error(errorMessage);
163+
throw new Error(errorMessage); // More direct feedback to developer
164+
}
165+
185166
try {
186167
const offer = await this.#pc.createOffer();
187168
await this.#pc.setLocalDescription(offer);
@@ -203,7 +184,25 @@ export class ClientCore {
203184
type: "answer",
204185
sdp: await response.text(),
205186
});
206-
// Status transitions to "connected" will be handled by onconnectionstatechange and data channel onopen events.
187+
188+
// https://blog.mozilla.org/webrtc/rtcrtptransceiver-explored/
189+
// transceivers order is stable, and mid is only defined after setLocalDescription
190+
const transceivers = this.#pc.getTransceivers();
191+
for (const trans of transceivers) {
192+
if (trans.direction === "sendonly") {
193+
continue;
194+
}
195+
196+
if (trans.receiver.track.kind === "audio") {
197+
this.#audioSlots.push(trans);
198+
} else if (trans.receiver.track.kind === "video") {
199+
this.#videoSlots.push({
200+
trans,
201+
});
202+
}
203+
}
204+
205+
// Status transitions to "connected" will be handled by onconnectionstatechange
207206
} catch (error: any) {
208207
this.#close(
209208
error.message || "Signaling process failed unexpectedly.",

0 commit comments

Comments
 (0)