Skip to content

wip transcription test #1102

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 23 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .changeset/ninety-kangaroos-kneel.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
"@livekit/components-core": patch
"@livekit/components-react": minor
"@livekit/components-styles": patch
---

Add support for datastream based chat
4 changes: 3 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,13 @@
"@livekit/changesets-changelog-github": "^0.0.4",
"@rushstack/heft": "^0.68.0",
"glob": "^11.0.0",
"globals": "^15.14.0",
"husky": "^8.0.3",
"nodemon": "^3.0.3",
"prettier": "^3.2.5",
"turbo": "^2.1.1",
"typescript": "5.7.3"
"typescript": "5.7.3",
"typescript-eslint": "^8.24.0"
},
"engines": {
"node": ">=18"
Expand Down
26 changes: 13 additions & 13 deletions packages/core/etc/components-core.api.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import { ChatMessage } from 'livekit-client';
import { ConnectionQuality } from 'livekit-client';
import { ConnectionState } from 'livekit-client';
import { DataPacket_Kind } from 'livekit-client';
import type { DataPublishOptions } from 'livekit-client';
import { DataPublishOptions } from 'livekit-client';
import { LocalAudioTrack } from 'livekit-client';
import { LocalParticipant } from 'livekit-client';
import { LocalVideoTrack } from 'livekit-client';
Expand All @@ -27,6 +27,7 @@ import { Room } from 'livekit-client';
import { RoomEvent } from 'livekit-client';
import type { RoomEventCallbacks } from 'livekit-client/dist/src/room/Room';
import type { ScreenShareCaptureOptions } from 'livekit-client';
import { SendTextOptions } from 'livekit-client';
import { setLogLevel as setLogLevel_2 } from 'livekit-client';
import { Track } from 'livekit-client';
import { TrackEvent as TrackEvent_2 } from 'livekit-client';
Expand Down Expand Up @@ -156,8 +157,7 @@ export const cssPrefix = "lk";

// @public (undocumented)
export const DataTopic: {
readonly CHAT: "lk-chat-topic";
readonly CHAT_UPDATE: "lk-chat-update-topic";
readonly CHAT: "lk.chat";
};

// @public (undocumented)
Expand Down Expand Up @@ -262,13 +262,18 @@ export function isWeb(): boolean;
// @public (undocumented)
export interface LegacyChatMessage extends ChatMessage {
// (undocumented)
ignore?: boolean;
ignoreLegacy?: boolean;
}

// @public @deprecated (undocumented)
export const LegacyDataTopic: {
readonly CHAT: "lk-chat-topic";
};

// @public (undocumented)
export interface LegacyReceivedChatMessage extends ReceivedChatMessage {
// (undocumented)
ignore?: boolean;
ignoreLegacy?: boolean;
}

// @alpha
Expand Down Expand Up @@ -497,24 +502,19 @@ export type SetMediaDeviceOptions = {
export function setupChat(room: Room, options?: ChatOptions): {
messageObservable: Observable<ReceivedChatMessage[]>;
isSendingObservable: BehaviorSubject<boolean>;
send: (message: string) => Promise<ChatMessage>;
update: (message: string, originalMessageOrId: string | ChatMessage) => Promise<{
readonly message: string;
readonly editTimestamp: number;
readonly id: string;
readonly timestamp: number;
}>;
send: (message: string, options?: SendTextOptions) => Promise<ReceivedChatMessage>;
};

// @public (undocumented)
export function setupChatMessageHandler(room: Room): {
chatObservable: Observable<[message: ChatMessage, participant?: LocalParticipant | RemoteParticipant | undefined]>;
send: (text: string) => Promise<ChatMessage>;
send: (text: string, options: SendTextOptions) => Promise<ReceivedChatMessage>;
edit: (text: string, originalMsg: ChatMessage) => Promise<{
readonly message: string;
readonly editTimestamp: number;
readonly id: string;
readonly timestamp: number;
readonly attachedFiles?: Array<File>;
}>;
};

Expand Down
190 changes: 92 additions & 98 deletions packages/core/src/components/chat.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
/* eslint-disable camelcase */
import type { Participant, Room, ChatMessage } from 'livekit-client';
import type { Participant, Room, ChatMessage, SendTextOptions } from 'livekit-client';
import { compareVersions, RoomEvent } from 'livekit-client';
import { BehaviorSubject, Subject, scan, map, takeUntil, merge } from 'rxjs';
import { BehaviorSubject, Subject, scan, map, takeUntil, from, filter } from 'rxjs';
import {
DataTopic,
LegacyDataTopic,
sendMessage,
setupChatMessageHandler,
setupDataMessageHandler,
} from '../observables/dataChannel';
import { log } from '../logger';

/** @public */
export type { ChatMessage };
Expand All @@ -18,11 +19,11 @@ export interface ReceivedChatMessage extends ChatMessage {
}

export interface LegacyChatMessage extends ChatMessage {
ignore?: boolean;
ignoreLegacy?: boolean;
}

export interface LegacyReceivedChatMessage extends ReceivedChatMessage {
ignore?: boolean;
ignoreLegacy?: boolean;
}

/**
Expand All @@ -41,83 +42,87 @@ export type ChatOptions = {
messageEncoder?: (message: LegacyChatMessage) => Uint8Array;
/** @deprecated the new chat API doesn't rely on encoders and decoders anymore and uses a dedicated chat API instead */
messageDecoder?: (message: Uint8Array) => LegacyReceivedChatMessage;
/** @deprecated the new chat API doesn't rely on topics anymore and uses a dedicated chat API instead */
channelTopic?: string;
/** @deprecated the new chat API doesn't rely on topics anymore and uses a dedicated chat API instead */
/** @deprecated the new chat API doesn't rely on update topics anymore and uses a dedicated chat API instead */
updateChannelTopic?: string;
};

type RawMessage = {
payload: Uint8Array;
topic: string | undefined;
from: Participant | undefined;
};

const encoder = new TextEncoder();
const decoder = new TextDecoder();
const topicSubjectMap: WeakMap<Room, Map<string, Subject<ReceivedChatMessage>>> = new WeakMap();

const topicSubjectMap: Map<Room, Map<string, Subject<RawMessage>>> = new Map();
function isIgnorableChatMessage(msg: ReceivedChatMessage | LegacyReceivedChatMessage) {
return (msg as LegacyChatMessage).ignoreLegacy == true;
}

const encode = (message: LegacyReceivedChatMessage) => encoder.encode(JSON.stringify(message));
const decodeLegacyMsg = (message: Uint8Array) =>
JSON.parse(new TextDecoder().decode(message)) as LegacyReceivedChatMessage | ReceivedChatMessage;

const decode = (message: Uint8Array) =>
JSON.parse(decoder.decode(message)) as LegacyReceivedChatMessage | ReceivedChatMessage;
const encodeLegacyMsg = (message: LegacyReceivedChatMessage) =>
new TextEncoder().encode(JSON.stringify(message));

export function setupChat(room: Room, options?: ChatOptions) {
const onDestroyObservable = new Subject<void>();

const serverSupportsChatApi = () =>
const serverSupportsDataStreams = () =>
room.serverInfo?.edition === 1 ||
(!!room.serverInfo?.version && compareVersions(room.serverInfo?.version, '1.17.2') > 0);

const { messageDecoder, messageEncoder, channelTopic, updateChannelTopic } = options ?? {};
(!!room.serverInfo?.version && compareVersions(room.serverInfo?.version, '1.8.2') > 0);

const topic = channelTopic ?? DataTopic.CHAT;
const onDestroyObservable = new Subject<void>();

const updateTopic = updateChannelTopic ?? DataTopic.CHAT_UPDATE;
const topic = options?.channelTopic ?? DataTopic.CHAT;
const legacyTopic = options?.channelTopic ?? LegacyDataTopic.CHAT;

let needsSetup = false;
if (!topicSubjectMap.has(room)) {
needsSetup = true;
}
const topicMap = topicSubjectMap.get(room) ?? new Map<string, Subject<RawMessage>>();
const messageSubject = topicMap.get(topic) ?? new Subject<RawMessage>();
const topicMap = topicSubjectMap.get(room) ?? new Map<string, Subject<ReceivedChatMessage>>();
const messageSubject = topicMap.get(topic) ?? new Subject<ReceivedChatMessage>();
topicMap.set(topic, messageSubject);
topicSubjectMap.set(room, topicMap);

const finalMessageDecoder = options?.messageDecoder ?? decodeLegacyMsg;
if (needsSetup) {
/** Subscribe to all appropriate messages sent over the wire. */
const { messageObservable } = setupDataMessageHandler(room, [topic, updateTopic]);
messageObservable.pipe(takeUntil(onDestroyObservable)).subscribe(messageSubject);
room.registerTextStreamHandler(topic, async (reader, participantInfo) => {
const { id, timestamp } = reader.info;
const streamObservable = from(reader).pipe(
scan((acc: string, chunk: string) => {
return acc + chunk;
}),
map((chunk: string) => {
console.log('text stream updated', { id, chunk, attributes: reader.info.attributes });
return {
id,
timestamp,
message: chunk,
from: room.getParticipantByIdentity(participantInfo.identity),
// editTimestamp: type === 'update' ? timestamp : undefined,
} as ReceivedChatMessage;
}),
);
streamObservable.subscribe({
next: (value) => messageSubject.next(value),
});
});

/** legacy chat protocol handling */
const { messageObservable } = setupDataMessageHandler(room, [legacyTopic]);
messageObservable
.pipe(
map((msg) => {
const parsedMessage = finalMessageDecoder(msg.payload);
if (isIgnorableChatMessage(parsedMessage)) {
return undefined;
}
const newMessage: ReceivedChatMessage = { ...parsedMessage, from: msg.from };
return newMessage;
}),
filter((msg) => !!msg),
takeUntil(onDestroyObservable),
)
.subscribe(messageSubject);
}
const { chatObservable, send: sendChatMessage } = setupChatMessageHandler(room);

const finalMessageDecoder = messageDecoder ?? decode;

/** Build up the message array over time. */
const messagesObservable = merge(
messageSubject.pipe(
map((msg) => {
const parsedMessage = finalMessageDecoder(msg.payload);
const newMessage = { ...parsedMessage, from: msg.from };
if (isIgnorableChatMessage(newMessage)) {
return undefined;
}
return newMessage;
}),
),
chatObservable.pipe(
map(([msg, participant]) => {
return { ...msg, from: participant };
}),
),
).pipe(
scan<ReceivedChatMessage | undefined, ReceivedChatMessage[]>((acc, value) => {
// ignore legacy message updates
if (!value) {
return acc;
}
// handle message updates
const messagesObservable = messageSubject.pipe(
scan<ReceivedChatMessage, ReceivedChatMessage[]>((acc, value) => {
if (
'id' in value &&
acc.find((msg) => msg.from?.identity === value.from?.identity && msg.id === value.id)
Expand All @@ -128,10 +133,9 @@ export function setupChat(room: Room, options?: ChatOptions) {
acc[replaceIndex] = {
...value,
timestamp: originalMsg.timestamp,
editTimestamp: value.editTimestamp ?? value.timestamp,
editTimestamp: value.timestamp,
};
}

return [...acc];
}
return [...acc, value];
Expand All @@ -140,42 +144,37 @@ export function setupChat(room: Room, options?: ChatOptions) {
);

const isSending$ = new BehaviorSubject<boolean>(false);
const finalMessageEncoder = options?.messageEncoder ?? encodeLegacyMsg;

const finalMessageEncoder = messageEncoder ?? encode;

const send = async (message: string) => {
isSending$.next(true);
try {
const chatMessage = await sendChatMessage(message);
const encodedLegacyMsg = finalMessageEncoder({
...chatMessage,
ignore: serverSupportsChatApi(),
});
await sendMessage(room.localParticipant, encodedLegacyMsg, {
reliable: true,
topic,
});
return chatMessage;
} finally {
isSending$.next(false);
const send = async (message: string, options?: SendTextOptions) => {
if (!options) {
options = {};
}
};

const update = async (message: string, originalMessageOrId: string | ChatMessage) => {
const timestamp = Date.now();
const originalMessage: ChatMessage =
typeof originalMessageOrId === 'string'
? { id: originalMessageOrId, message: '', timestamp }
: originalMessageOrId;
options.topic ??= topic;
isSending$.next(true);

try {
const editedMessage = await room.localParticipant.editChatMessage(message, originalMessage);
const encodedLegacyMessage = finalMessageEncoder(editedMessage);
await sendMessage(room.localParticipant, encodedLegacyMessage, {
topic: updateTopic,
reliable: true,
const info = await room.localParticipant.sendText(message, options);
const chatMsg: ReceivedChatMessage = {
id: info.id,
timestamp: Date.now(),
message,
from: room.localParticipant,
};
messageSubject.next(chatMsg);
const encodedLegacyMsg = finalMessageEncoder({
...chatMsg,
ignoreLegacy: serverSupportsDataStreams(),
});
return editedMessage;
try {
await sendMessage(room.localParticipant, encodedLegacyMsg, {
reliable: true,
topic: legacyTopic,
});
} catch (error) {
log.info('could not send message in legacy chat format', error);
}
return chatMsg;
} finally {
isSending$.next(false);
}
Expand All @@ -184,20 +183,15 @@ export function setupChat(room: Room, options?: ChatOptions) {
function destroy() {
onDestroyObservable.next();
onDestroyObservable.complete();
messageSubject.complete();
topicSubjectMap.delete(room);
room.unregisterTextStreamHandler(topic);
}
room.once(RoomEvent.Disconnected, destroy);

return {
messageObservable: messagesObservable,
isSendingObservable: isSending$,
send,
update,
};
}

function isIgnorableChatMessage(
msg: ReceivedChatMessage | LegacyReceivedChatMessage,
): msg is ReceivedChatMessage {
return (msg as LegacyChatMessage).ignore == true;
}
Loading