diff --git a/Sources/LiveKit/Core/Room+DataStream.swift b/Sources/LiveKit/Core/Room+DataStream.swift new file mode 100644 index 000000000..35948716d --- /dev/null +++ b/Sources/LiveKit/Core/Room+DataStream.swift @@ -0,0 +1,48 @@ +/* + * Copyright 2025 LiveKit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +extension Room { + + /// Registers a handler for new byte streams matching the given topic. + /// + /// - Parameters: + /// - topic: Topic identifier; only streams with this topic will be handled. + /// - handler: Handler closure passed the stream reader (``ByteStreamReader``) and the identity of the remote participant who opened the stream. + /// + public func registerByteStreamHandler(for topic: String, _ handler: @escaping ByteStreamHandler) async throws { + try await incomingStreamManager.registerByteStreamHandler(for: topic, handler) + } + + /// Registers a handler for new text streams matching the given topic. + /// + /// - Parameters: + /// - topic: Topic identifier; only streams with this topic will be handled. + /// - handler: Handler closure passed the stream reader (``TextStreamReader``) and the identity of the remote participant who opened the stream. + /// + public func registerTextStreamHandler(for topic: String, _ handler: @escaping TextStreamHandler) async throws { + try await incomingStreamManager.registerTextStreamHandler(for: topic, handler) + } + + /// Unregisters a byte stream handler that was previously registered for the given topic. + public func unregisterByteStreamHandler(for topic: String) async { + await incomingStreamManager.unregisterByteStreamHandler(for: topic) + } + + /// Unregisters a text stream handler that was previously registered for the given topic. + func unregisterTextStreamHandler(for topic: String) async { + await incomingStreamManager.unregisterTextStreamHandler(for: topic) + } +} diff --git a/Sources/LiveKit/Core/Room.swift b/Sources/LiveKit/Core/Room.swift index da800b788..534e172fa 100644 --- a/Sources/LiveKit/Core/Room.swift +++ b/Sources/LiveKit/Core/Room.swift @@ -110,6 +110,8 @@ public class Room: NSObject, ObservableObject, Loggable { lazy var subscriberDataChannel = DataChannelPair(delegate: self) lazy var publisherDataChannel = DataChannelPair(delegate: self) + + lazy var incomingStreamManager = IncomingStreamManager() var _blockProcessQueue = DispatchQueue(label: "LiveKitSDK.engine.pendingBlocks", qos: .default) @@ -537,6 +539,9 @@ extension Room: DataChannelDelegate { case let .rpcResponse(response): room(didReceiveRpcResponse: response) case let .rpcAck(ack): room(didReceiveRpcAck: ack) case let .rpcRequest(request): room(didReceiveRpcRequest: request, from: dataPacket.participantIdentity) + case let .streamHeader(header): Task { await incomingStreamManager.handle(header: header, from: dataPacket.participantIdentity) } + case let .streamChunk(chunk): Task { await incomingStreamManager.handle(chunk: chunk) } + case let .streamTrailer(trailer): Task { await incomingStreamManager.handle(trailer: trailer) } default: return } } diff --git a/Sources/LiveKit/DataStream/ByteStreamReader.swift b/Sources/LiveKit/DataStream/ByteStreamReader.swift new file mode 100644 index 000000000..e39b7dfdc --- /dev/null +++ b/Sources/LiveKit/DataStream/ByteStreamReader.swift @@ -0,0 +1,150 @@ +/* + * Copyright 2025 LiveKit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import Foundation + +/// An asynchronous sequence of chunks read from a byte data stream. +@objc +public final class ByteStreamReader: NSObject, AsyncSequence, Sendable { + /// Information about the incoming byte stream. + @objc + public let info: ByteStreamInfo + + let source: StreamReader + + init(info: ByteStreamInfo, source: StreamReaderSource) { + self.info = info + self.source = StreamReader(source: source) + } + + public func makeAsyncIterator() -> StreamReader.Iterator { + source.makeAsyncIterator() + } + + /// Reads incoming chunks from the byte stream, concatenating them into a single data object which is returned + /// once the stream closes normally. + /// + /// - Returns: The data consisting of all concatenated chunks. + /// - Throws: ``StreamError`` if an error occurs while reading the stream. + /// + public func readAll() async throws -> Data { + try await source.readAll() + } +} + +extension ByteStreamReader { + /// Reads incoming chunks from the byte stream, writing them to a file as they are received. + /// + /// - Parameters: + /// - directory: The directory to write the file in. The system temporary directory is used if not specified. + /// - nameOverride: The name to use for the written file. If not specified, file name and extension will be automatically + /// inferred from the stream information. + /// - Returns: The URL of the written file on disk. + /// - Throws: ``StreamError`` if an error occurs while reading the stream. + /// + public func readToFile( + in directory: URL = FileManager.default.temporaryDirectory, + name nameOverride: String? = nil + ) async throws -> URL { + guard directory.hasDirectoryPath else { + throw StreamError.notDirectory + } + let fileName = resolveFileName(override: nameOverride) + let fileURL = directory.appendingPathComponent(fileName) + + FileManager.default.createFile(atPath: fileURL.path, contents: nil) + let handle = try FileHandle(forWritingTo: fileURL) + + try await Task { + for try await chunk in self { + guard #available(macOS 10.15.4, iOS 13.4, *) else { + handle.write(chunk) + return + } + try handle.write(contentsOf: chunk) + } + }.value + + try handle.close() + return fileURL + } + + private func resolveFileName(override: String?) -> String { + Self.resolveFileName( + setName: override ?? info.fileName, + fallbackName: info.id, + mimeType: info.mimeType, + fallbackExtension: "bin" + ) + } + + /// Resolves the filename used when writing the stream to disk. + /// + /// - Parameters: + /// - setName: The name set by the user or taken from stream metadata. + /// - fallbackName: Name to fallback on when `setName` is `nil`. + /// - mimeType: MIME type used for determining file extension. + /// - fallbackExtension: File extension to fallback on when MIME type cannot be resolved. + /// - Returns: The resolved file name. + /// + static func resolveFileName( + setName: String?, + fallbackName: String, + mimeType: String, + fallbackExtension: String + ) -> String { + var resolvedExtension: String { + preferredExtension(for: mimeType) ?? fallbackExtension + } + guard let setName else { + return "\(fallbackName).\(resolvedExtension)" + } + guard setName.pathExtension != nil else { + return "\(setName).\(resolvedExtension)" + } + return setName + } +} + +// MARK: - Objective-C compatibility + +public extension ByteStreamReader { + @objc + @available(*, unavailable, message: "Use async readAll() method instead.") + func readAll(onCompletion: @escaping (Data) -> Void, onError: ((Error?) -> Void)?) { + source.readAll(onCompletion: onCompletion, onError: onError) + } + + @objc + @available(*, unavailable, message: "Use for/await on ByteStreamReader reader instead.") + func readChunks(onChunk: @escaping (Data) -> Void, onCompletion: ((Error?) -> Void)?) { + source.readChunks(onChunk: onChunk, onCompletion: onCompletion) + } + + @objc + @available(*, unavailable, message: "Use async readToFile(in:name:) method instead.") + internal func readToFile( + in directory: URL, + name nameOverride: String?, + onCompletion: @escaping (URL) -> Void, + onError: ((Error) -> Void)? + ) { + Task { + do { try onCompletion(await self.readToFile(in: directory, name: nameOverride)) } + catch { onError?(error) } + } + } +} diff --git a/Sources/LiveKit/DataStream/IncomingStreamManager.swift b/Sources/LiveKit/DataStream/IncomingStreamManager.swift new file mode 100644 index 000000000..88532c6d0 --- /dev/null +++ b/Sources/LiveKit/DataStream/IncomingStreamManager.swift @@ -0,0 +1,199 @@ +/* + * Copyright 2025 LiveKit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import Foundation + +/// Manages state of incoming data streams. +actor IncomingStreamManager: Loggable { + + /// Information about an open data stream. + private struct Descriptor { + let info: StreamInfo + var readLength: Int = 0 + let openTime: TimeInterval + let continuation: StreamReaderSource.Continuation + } + + /// Mapping between stream ID and descriptor for open streams. + private var openStreams: [String: Descriptor] = [:] + + private var byteStreamHandlers: [String: ByteStreamHandler] = [:] + private var textStreamHandlers: [String: TextStreamHandler] = [:] + + // MARK: - Handler registration + + func registerByteStreamHandler(for topic: String, _ handler: @escaping ByteStreamHandler) throws { + guard byteStreamHandlers[topic] == nil else { + throw StreamError.handlerAlreadyRegistered + } + byteStreamHandlers[topic] = handler + } + + func registerTextStreamHandler(for topic: String, _ handler: @escaping TextStreamHandler) throws { + guard textStreamHandlers[topic] == nil else { + throw StreamError.handlerAlreadyRegistered + } + textStreamHandlers[topic] = handler + } + + func unregisterByteStreamHandler(for topic: String) { + byteStreamHandlers[topic] = nil + } + + func unregisterTextStreamHandler(for topic: String) { + textStreamHandlers[topic] = nil + } + + // MARK: - State + + private func openStream( + with info: StreamInfo, + continuation: StreamReaderSource.Continuation + ) { + guard openStreams[info.id] == nil else { + continuation.finish(throwing: StreamError.alreadyOpened) + return + } + continuation.onTermination = { @Sendable [weak self] termination in + guard let self else { return } + self.log("Continuation terminated: \(termination)", .debug) + Task { await self.closeStream(with: info.id) } + } + let descriptor = Descriptor( + info: info, + openTime: Date.timeIntervalSinceReferenceDate, + continuation: continuation + ) + log("Opened stream '\(info.id)'", .debug) + openStreams[info.id] = descriptor + } + + private func closeStream(with id: String) { + guard let descriptor = openStreams[id] else { + log("No descriptor for stream '\(id)'", .debug) + return + } + let openDuration = Date.timeIntervalSinceReferenceDate - descriptor.openTime + log("Closed stream '\(id)' (open for \(openDuration))", .debug) + openStreams[id] = nil + } + + // MARK: - Packet processing + + /// Handles a data stream header. + func handle(header: Livekit_DataStream.Header, from identityString: String) { + let identity = Participant.Identity(from: identityString) + + switch header.contentHeader { + case .byteHeader(let byteHeader): + guard let handler = byteStreamHandlers[header.topic] else { + log("No byte handler registered for topic '\(header.topic)'", .info) + return + } + let info = ByteStreamInfo(header, byteHeader) + let reader = ByteStreamReader(info: info, source: createSource(with: info)) + Task { + do { try await handler(reader, identity) } + catch { log("Unhandled error in byte stream handler: \(error)", .error) } + } + + case .textHeader(let textHeader): + guard let handler = textStreamHandlers[header.topic] else { + log("No text handler registered for topic '\(header.topic)'", .info) + return + } + let info = TextStreamInfo(header, textHeader) + let reader = TextStreamReader(info: info, source: createSource(with: info)) + Task { + do { try await handler(reader, identity) } + catch { log("Unhandled error in text stream handler: \(error)", .error) } + } + default: + log("Unknown header type; ignoring stream", .warning) + + } + } + + /// Creates an asynchronous stream whose continuation will be used to send new chunks to the reader. + private func createSource(with info: StreamInfo) -> StreamReaderSource { + StreamReaderSource { [weak self] continuation in + guard let self else { + continuation.finish(throwing: StreamError.terminated) + return + } + Task { await self.openStream(with: info, continuation: continuation) } + } + } + + /// Handles a data stream chunk. + func handle(chunk: Livekit_DataStream.Chunk) { + guard !chunk.content.isEmpty, let descriptor = openStreams[chunk.streamID] else { return } + + let readLength = descriptor.readLength + chunk.content.count + + if let totalLength = descriptor.info.totalLength { + guard readLength <= totalLength else { + descriptor.continuation.finish(throwing: StreamError.lengthExceeded) + return + } + } + openStreams[chunk.streamID]!.readLength = readLength + descriptor.continuation.yield(chunk.content) + } + + /// Handles a data stream trailer. + func handle(trailer: Livekit_DataStream.Trailer) { + guard let descriptor = openStreams[trailer.streamID] else { + log("Received trailer for unknown stream '\(trailer.streamID)'", .warning) + return + } + + if let totalLength = descriptor.info.totalLength { + guard descriptor.readLength == totalLength else { + descriptor.continuation.finish(throwing: StreamError.incomplete) + return + } + } + + // TODO: do something with trailer attributes + + guard trailer.reason.isEmpty else { + // According to protocol documentation, a non-empty reason string indicates an error + let error = StreamError.abnormalEnd(reason: trailer.reason) + descriptor.continuation.finish(throwing: error) + return + } + descriptor.continuation.finish() + } + + // MARK: - Clean up + + deinit { + guard !openStreams.isEmpty else { return } + log("Terminating \(openStreams.count) open stream(s)", .debug) + for descriptor in openStreams.values { + descriptor.continuation.finish(throwing: StreamError.terminated) + } + } +} + +// MARK: - Type aliases + +/// Handler for incoming byte data streams. +public typealias ByteStreamHandler = (ByteStreamReader, Participant.Identity) async throws -> Void + +/// Handler for incoming text data streams. +public typealias TextStreamHandler = (TextStreamReader, Participant.Identity) async throws -> Void diff --git a/Sources/LiveKit/DataStream/PreferredExtension.swift b/Sources/LiveKit/DataStream/PreferredExtension.swift new file mode 100644 index 000000000..c01372e45 --- /dev/null +++ b/Sources/LiveKit/DataStream/PreferredExtension.swift @@ -0,0 +1,45 @@ +/* + * Copyright 2025 LiveKit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#if canImport(UniformTypeIdentifiers) +import UniformTypeIdentifiers +#elseif canImport(CoreServices) +import CoreServices +#else +import MobileCoreServices +#endif + +/// Returns the preferred file extension for the given MIME type. +func preferredExtension(for mimeType: String) -> String? { + guard #available(macOS 11.0, iOS 14.0, *) else { + guard let uti = UTTypeCreatePreferredIdentifierForTag( + kUTTagClassMIMEType, + mimeType as CFString, + nil + )?.takeRetainedValue() else { + return nil + } + guard let fileExtension = UTTypeCopyPreferredTagWithClass( + uti, + kUTTagClassFilenameExtension + )?.takeRetainedValue() else { + return nil + } + return fileExtension as String + } + guard let utType = UTType(mimeType: mimeType) else { return nil } + return utType.preferredFilenameExtension +} diff --git a/Sources/LiveKit/DataStream/StreamError.swift b/Sources/LiveKit/DataStream/StreamError.swift new file mode 100644 index 000000000..b51a80149 --- /dev/null +++ b/Sources/LiveKit/DataStream/StreamError.swift @@ -0,0 +1,41 @@ +/* + * Copyright 2025 LiveKit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +public enum StreamError: Error, Equatable { + /// Unable to open a stream with the same ID more than once. + case alreadyOpened + + /// Stream closed abnormally by remote participant. + case abnormalEnd(reason: String) + + /// Incoming chunk data could not be decoded. + case decodeFailed + + /// Read length exceeded total length specified in stream header. + case lengthExceeded + + /// Read length less than total length specified in stream header. + case incomplete + + /// Stream terminated before completion. + case terminated + + /// Unable to register a stream handler more than once. + case handlerAlreadyRegistered + + /// Given destination URL is not a directory. + case notDirectory +} diff --git a/Sources/LiveKit/DataStream/StreamInfo.swift b/Sources/LiveKit/DataStream/StreamInfo.swift new file mode 100644 index 000000000..23683ec10 --- /dev/null +++ b/Sources/LiveKit/DataStream/StreamInfo.swift @@ -0,0 +1,210 @@ +/* + * Copyright 2025 LiveKit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import Foundation + +/// Information about a data stream. +protocol StreamInfo { + var id: String { get } + var mimeType: String { get } + var topic: String { get } + var timestamp: Date { get } + var totalLength: Int? { get } + var attributes: [String: String] { get } +} + +/// Information about a text data stream. +@objcMembers +public final class TextStreamInfo: NSObject, StreamInfo, Sendable { + public let id: String + public let mimeType: String + public let topic: String + public let timestamp: Date + public let totalLength: Int? + public let attributes: [String : String] + + @objc(TextStreamInfoOperationType) + public enum OperationType: Int, Sendable { + case create + case update + case delete + case reaction + } + + public let operationType: OperationType + public let version: Int + public let replyToStreamID: String? + public let attachedStreamIDs: [String] + public let generated: Bool + + init( + id: String, + mimeType: String, + topic: String, + timestamp: Date, + totalLength: Int?, + attributes: [String: String], + operationType: OperationType, + version: Int, + replyToStreamID: String?, + attachedStreamIDs: [String], + generated: Bool + ) { + self.id = id + self.mimeType = mimeType + self.topic = topic + self.timestamp = timestamp + self.totalLength = totalLength + self.attributes = attributes + self.operationType = operationType + self.version = version + self.replyToStreamID = replyToStreamID + self.attachedStreamIDs = attachedStreamIDs + self.generated = generated + } +} + +/// Information about a byte data stream. +@objcMembers +public final class ByteStreamInfo: NSObject, StreamInfo, Sendable { + public let id: String + public let mimeType: String + public let topic: String + public let timestamp: Date + public let totalLength: Int? + public let attributes: [String: String] + + public let fileName: String? + + init( + id: String, + mimeType: String, + topic: String, + timestamp: Date, + totalLength: Int?, + attributes: [String: String], + fileName: String? + ) { + self.id = id + self.mimeType = mimeType + self.topic = topic + self.timestamp = timestamp + self.totalLength = totalLength + self.attributes = attributes + self.fileName = fileName + } +} + +// MARK: - From protocol types + +extension ByteStreamInfo { + convenience init( + _ header: Livekit_DataStream.Header, + _ byteHeader: Livekit_DataStream.ByteHeader + ) { + self.init( + id: header.streamID, + mimeType: header.mimeType, + topic: header.topic, + timestamp: header.timestampDate, + totalLength: header.hasTotalLength ? Int(header.totalLength) : nil, + attributes: header.attributes, + // --- + fileName: byteHeader.name + ) + } +} + +extension TextStreamInfo { + convenience init( + _ header: Livekit_DataStream.Header, + _ textHeader: Livekit_DataStream.TextHeader + ) { + self.init( + id: header.streamID, + mimeType: header.mimeType, + topic: header.topic, + timestamp: header.timestampDate, + totalLength: header.hasTotalLength ? Int(header.totalLength) : nil, + attributes: header.attributes, + // --- + operationType: TextStreamInfo.OperationType(textHeader.operationType), + version: Int(textHeader.version), + replyToStreamID: !textHeader.replyToStreamID.isEmpty ? textHeader.replyToStreamID : nil, + attachedStreamIDs: textHeader.attachedStreamIds, + generated: textHeader.generated + ) + } +} + +// MARK: - To protocol types + +extension Livekit_DataStream.Header { + + init(_ textStreamInfo: TextStreamInfo) { + let textHeader = Livekit_DataStream.TextHeader.with { + $0.operationType = Livekit_DataStream.OperationType(textStreamInfo.operationType) + $0.version = Int32(textStreamInfo.version) + $0.replyToStreamID = textStreamInfo.replyToStreamID ?? "" + $0.attachedStreamIds = textStreamInfo.attachedStreamIDs + $0.generated = textStreamInfo.generated + } + var baseHeader = Self(textStreamInfo as StreamInfo) + baseHeader.contentHeader = .textHeader(textHeader) + self = baseHeader + } + + init(_ byteStreamInfo: ByteStreamInfo) { + let byteHeader = Livekit_DataStream.ByteHeader.with { + if let fileName = byteStreamInfo.fileName { + $0.name = fileName + } + } + var baseHeader = Self(byteStreamInfo as StreamInfo) + baseHeader.contentHeader = .byteHeader(byteHeader) + self = baseHeader + } + + var timestampDate: Date { + get { Date(timeIntervalSince1970: TimeInterval(timestamp)) } + set { timestamp = Int64(newValue.timeIntervalSince1970) } + } + + private init(_ streamInfo: StreamInfo) { + self = Livekit_DataStream.Header.with { + $0.streamID = streamInfo.id + $0.mimeType = streamInfo.mimeType + $0.topic = streamInfo.topic + $0.timestampDate = streamInfo.timestamp + if let totalLength = streamInfo.totalLength { + $0.totalLength = UInt64(totalLength) + } + $0.attributes = streamInfo.attributes + } + } +} + +extension TextStreamInfo.OperationType { + init(_ operationType: Livekit_DataStream.OperationType) { + self = Self(rawValue: operationType.rawValue) ?? .create + } +} + +extension Livekit_DataStream.OperationType { + init(_ operationType: TextStreamInfo.OperationType) { + self = Livekit_DataStream.OperationType(rawValue: operationType.rawValue) ?? .create + } +} diff --git a/Sources/LiveKit/DataStream/StreamProgress.swift b/Sources/LiveKit/DataStream/StreamProgress.swift new file mode 100644 index 000000000..e69de29bb diff --git a/Sources/LiveKit/DataStream/StreamReader.swift b/Sources/LiveKit/DataStream/StreamReader.swift new file mode 100644 index 000000000..c66d6b262 --- /dev/null +++ b/Sources/LiveKit/DataStream/StreamReader.swift @@ -0,0 +1,80 @@ +/* + * Copyright 2025 LiveKit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import Foundation + +/// An asynchronous sequence of chunks read from a data stream. +public struct StreamReader: AsyncSequence, Sendable where Element: StreamChunk { + let source: StreamReaderSource + + public struct Iterator: AsyncIteratorProtocol { + fileprivate var upstream: StreamReaderSource.Iterator + + public mutating func next() async throws -> Element? { + guard let chunkData = try await upstream.next() else { return nil } + guard let chunk = Element(chunkData) else { throw StreamError.decodeFailed } + return chunk + } + } + + public func makeAsyncIterator() -> Iterator { + Iterator(upstream: source.makeAsyncIterator()) + } +} + +/// Upstream asynchronous sequence from which raw chunk data is read. +typealias StreamReaderSource = AsyncThrowingStream + +public protocol StreamChunk { + init?(_ chunkData: Data) +} + +extension Data: StreamChunk {} + +extension String: StreamChunk { + public init?(_ chunkData: Data) { + guard let string = String(data: chunkData, encoding: .utf8) else { + return nil + } + self = string + } +} + +extension StreamReader where Element: RangeReplaceableCollection { + func readAll() async throws -> Element { + try await reduce(Element()) { $0 + $1 } + } + + func readAll(onCompletion: @escaping (Element) -> Void, onError: ((Error?) -> Void)?) { + Task { + do { try onCompletion(await self.readAll()) } + catch { onError?(error) } + } + } +} + +extension StreamReader { + func readChunks(onChunk: @escaping (Element) -> Void, onCompletion: ((Error?) -> Void)?) { + Task { + do { + for try await chunk in self { onChunk(chunk) } + onCompletion?(nil) + } catch { + onCompletion?(error) + } + } + } +} diff --git a/Sources/LiveKit/DataStream/StreamWriter.swift b/Sources/LiveKit/DataStream/StreamWriter.swift new file mode 100644 index 000000000..e69de29bb diff --git a/Sources/LiveKit/DataStream/TextStreamReader.swift b/Sources/LiveKit/DataStream/TextStreamReader.swift new file mode 100644 index 000000000..14f12a933 --- /dev/null +++ b/Sources/LiveKit/DataStream/TextStreamReader.swift @@ -0,0 +1,64 @@ +/* + * Copyright 2025 LiveKit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import Foundation + + +/// An asynchronous sequence of chunks read from a text data stream. +@objc +public final class TextStreamReader: NSObject, AsyncSequence, Sendable { + + /// Information about the incoming text stream. + @objc + public let info: TextStreamInfo + + let source: StreamReader + + init(info: TextStreamInfo, source: StreamReaderSource) { + self.info = info + self.source = StreamReader(source: source) + } + + public func makeAsyncIterator() -> StreamReader.Iterator { + source.makeAsyncIterator() + } + + /// Reads incoming chunks from the text stream, concatenating them into a single string which is returned + /// once the stream closes normally. + /// + /// - Returns: The string consisting of all concatenated chunks. + /// - Throws: ``StreamError`` if an error occurs while reading the stream. + /// + public func readAll() async throws -> String { + try await source.readAll() + } +} + +// MARK: - Objective-C compatibility + +extension TextStreamReader { + @objc + @available(*, unavailable, message: "Use async readAll() method instead.") + public func readAll(onCompletion: (@escaping (String) -> Void), onError: ((Error?) -> ())?) { + source.readAll(onCompletion: onCompletion, onError: onError) + } + + @objc + @available(*, unavailable, message: "Use for/await on TextStreamReader reader instead.") + public func readChunks(onChunk: (@escaping (String) -> Void), onCompletion: ((Error?) -> Void)?) { + source.readChunks(onChunk: onChunk, onCompletion: onCompletion) + } +} diff --git a/Sources/LiveKit/Extensions/String.swift b/Sources/LiveKit/Extensions/String.swift index 271ae03d8..d0ee6ea78 100644 --- a/Sources/LiveKit/Extensions/String.swift +++ b/Sources/LiveKit/Extensions/String.swift @@ -46,4 +46,10 @@ extension String { return String(prefix(low)) } + + /// The path extension, if any, of the string as interpreted as a path. + var pathExtension: String? { + let pathExtension = (self as NSString).pathExtension + return pathExtension.isEmpty ? nil : pathExtension + } } diff --git a/Sources/LiveKit/Participant/LocalParticipant+DataStream.swift b/Sources/LiveKit/Participant/LocalParticipant+DataStream.swift new file mode 100644 index 000000000..d9a3268ac --- /dev/null +++ b/Sources/LiveKit/Participant/LocalParticipant+DataStream.swift @@ -0,0 +1,66 @@ +/* + * Copyright 2025 LiveKit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import Foundation + +public extension LocalParticipant { + // MARK: - Send + + @objc + func sendText(_ text: String, topic: String) async throws -> TextStreamInfo { + try await sendText(text, options: SendTextOptions(topic: topic)) + } + + @objc + func sendText(_ text: String, options: SendTextOptions) async throws -> TextStreamInfo { + fatalError("Not implemented") + } + + @objc + func sendFile(_ fileURL: URL, topic: String) async throws -> ByteStreamInfo { + try await sendFile(fileURL, options: SendFileOptions(topic: topic)) + } + + @objc + func sendFile(_ fileURL: URL, options: SendFileOptions) async throws -> ByteStreamInfo { + fatalError("Not implemented") + } + + // MARK: - Stream + + typealias TextStreamWriter = () + typealias ByteStreamWriter = () + + @objc + func streamText(topic: String) async throws -> TextStreamWriter { + try await streamText(options: StreamTextOptions(topic: topic)) + } + + @objc + func streamText(options: StreamTextOptions) async throws -> TextStreamWriter { + fatalError("Not implemented") + } + + @objc + func streamBytes(topic: String) async throws -> ByteStreamWriter { + try await streamBytes(options: StreamByteOptions(topic: topic)) + } + + @objc + func streamBytes(options: StreamByteOptions) async throws -> ByteStreamWriter { + fatalError("Not implemented") + } +} diff --git a/Tests/LiveKitTests/DataStream/ByteStreamInfoTests.swift b/Tests/LiveKitTests/DataStream/ByteStreamInfoTests.swift new file mode 100644 index 000000000..d7de880e6 --- /dev/null +++ b/Tests/LiveKitTests/DataStream/ByteStreamInfoTests.swift @@ -0,0 +1,50 @@ +/* + * Copyright 2025 LiveKit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +@testable import LiveKit +import XCTest + +class ByteStreamInfoTests: XCTestCase { + + func testProtocolTypeConversion() { + let info = ByteStreamInfo( + id: "id", + mimeType: "image/jpeg", + topic: "topic", + timestamp: Date(timeIntervalSince1970: 100), + totalLength: 128, + attributes: ["key": "value"], + fileName: "filename.bin" + ) + let header = Livekit_DataStream.Header(info) + XCTAssertEqual(header.streamID, info.id) + XCTAssertEqual(header.mimeType, info.mimeType) + XCTAssertEqual(header.topic, info.topic) + XCTAssertEqual(header.timestamp, Int64(info.timestamp.timeIntervalSince1970)) + XCTAssertEqual(header.totalLength, UInt64(info.totalLength ?? -1)) + XCTAssertEqual(header.attributes, info.attributes) + XCTAssertEqual(header.byteHeader.name, info.fileName) + + let newInfo = ByteStreamInfo(header, header.byteHeader) + XCTAssertEqual(newInfo.id, info.id) + XCTAssertEqual(newInfo.mimeType, info.mimeType) + XCTAssertEqual(newInfo.topic, info.topic) + XCTAssertEqual(newInfo.timestamp, info.timestamp) + XCTAssertEqual(newInfo.totalLength, info.totalLength) + XCTAssertEqual(newInfo.attributes, info.attributes) + XCTAssertEqual(newInfo.fileName, info.fileName) + } +} diff --git a/Tests/LiveKitTests/DataStream/ByteStreamReaderTests.swift b/Tests/LiveKitTests/DataStream/ByteStreamReaderTests.swift new file mode 100644 index 000000000..cdd360239 --- /dev/null +++ b/Tests/LiveKitTests/DataStream/ByteStreamReaderTests.swift @@ -0,0 +1,115 @@ +/* + * Copyright 2025 LiveKit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +@testable import LiveKit +import XCTest + +class ByteStreamReaderTests: XCTestCase { + + func testInitialization() { + let info = ByteStreamInfo( + id: UUID().uuidString, + mimeType: "application/octet-stream", + topic: "someTopic", + timestamp: Date(), + totalLength: nil, + attributes: [:], + fileName: "filename.bin" + ) + let source = StreamReaderSource { _ in } + let reader = ByteStreamReader(info: info, source: source) + + XCTAssertEqual(reader.info, info) + } + + /* + func testReadToFile() async throws { + let writtenExpectation = expectation(description: "File properly written") + Task { + do { + let fileURL = try await reader.readToFile() + XCTAssertEqual(fileURL.lastPathComponent, reader.info.fileName) + + let fileContents = try Data(contentsOf: fileURL) + XCTAssertEqual(fileContents, testPayload) + + writtenExpectation.fulfill() + } catch { + print(error) + } + } + sendPayload() + + await fulfillment( + of: [writtenExpectation], + timeout: 5 + ) + } + */ + + func testResolveFileName() { + XCTAssertEqual( + ByteStreamReader.resolveFileName( + setName: nil, + fallbackName: "[fallback]", + mimeType: "text/plain", + fallbackExtension: "bin" + ), + "[fallback].txt", + "Fallback name should be used when no set name is provided" + ) + XCTAssertEqual( + ByteStreamReader.resolveFileName( + setName: "name", + fallbackName: "[fallback]", + mimeType: "text/plain", + fallbackExtension: "bin" + ), + "name.txt", + "Set name should take precedence over fallback name" + ) + XCTAssertEqual( + ByteStreamReader.resolveFileName( + setName: "name.jpeg", + fallbackName: "[fallback]", + mimeType: "text/plain", + fallbackExtension: "bin" + ), + "name.jpeg", + "File extension in set name should take precedence" + ) + XCTAssertEqual( + ByteStreamReader.resolveFileName( + setName: "name", + fallbackName: "[fallback]", + mimeType: "image/jpeg", + fallbackExtension: "bin" + ), + "name.jpeg", + "File extension should be resolved from MIME type" + ) + XCTAssertEqual( + ByteStreamReader.resolveFileName( + setName: "name", + fallbackName: "[fallback]", + mimeType: "text/invalid", + fallbackExtension: "bin" + ), + "name.bin", + "Fallback extension should be used when MIME type is not recognized" + ) + } +} diff --git a/Tests/LiveKitTests/DataStream/IncomingStreamManagerTests.swift b/Tests/LiveKitTests/DataStream/IncomingStreamManagerTests.swift new file mode 100644 index 000000000..d12684420 --- /dev/null +++ b/Tests/LiveKitTests/DataStream/IncomingStreamManagerTests.swift @@ -0,0 +1,282 @@ +/* + * Copyright 2025 LiveKit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +@testable import LiveKit +import XCTest + +class IncomingStreamManagerTests: XCTestCase { + private var manager: IncomingStreamManager! + + private let topicName = "someTopic" + private let participant = Participant.Identity(from: "someName") + + override func setUp() async throws { + manager = IncomingStreamManager() + } + + func testByteStreamHandlerRegistration() async throws { + try await manager.registerByteStreamHandler(for: topicName) { _, _ in } + + let throwsExpectation = expectation(description: "Throws on duplicate registration") + do { + try await manager.registerByteStreamHandler(for: topicName) { _, _ in } + } catch { + XCTAssertEqual(error as? StreamError, .handlerAlreadyRegistered) + throwsExpectation.fulfill() + } + + await manager.unregisterByteStreamHandler(for: topicName) + + await fulfillment(of: [throwsExpectation], timeout: 5) + } + + func testTextStreamHandlerRegistration() async throws { + try await manager.registerTextStreamHandler(for: topicName) { _, _ in } + + let throwsExpectation = expectation(description: "Throws on duplicate registration") + do { + try await manager.registerTextStreamHandler(for: topicName) { _, _ in } + } catch { + XCTAssertEqual(error as? StreamError, .handlerAlreadyRegistered) + throwsExpectation.fulfill() + } + + await manager.unregisterTextStreamHandler(for: topicName) + + await fulfillment(of: [throwsExpectation], timeout: 5) + } + + func testByteStream() async throws { + let receiveExpectation = expectation(description: "Receives payload") + + let testChunks = [ + Data(repeating: 0xAB, count: 128), + Data(repeating: 0xCD, count: 128), + Data(repeating: 0xEF, count: 256), + Data(repeating: 0x12, count: 32) + ] + let testPayload = testChunks.reduce(Data()) { $0 + $1 } + + try await manager.registerByteStreamHandler(for: topicName) { reader, participant in + XCTAssertEqual(participant, self.participant) + + let payload = try await reader.readAll() + XCTAssertEqual(payload, testPayload) + + receiveExpectation.fulfill() + } + + let streamID = UUID().uuidString + + // 1. Send header packet + var header = Livekit_DataStream.Header() + header.streamID = streamID + header.topic = topicName + header.contentHeader = .byteHeader(Livekit_DataStream.ByteHeader()) + await manager.handle(header: header, from: participant.stringValue) + + // 2. Send chunk packets + for (index, chunkData) in testChunks.enumerated() { + var chunk = Livekit_DataStream.Chunk() + chunk.streamID = streamID + chunk.chunkIndex = UInt64(index) + chunk.content = chunkData + await manager.handle(chunk: chunk) + } + + // 3. Send trailer packet + var trailer = Livekit_DataStream.Trailer() + trailer.streamID = streamID + trailer.reason = "" // indicates normal closure + await manager.handle(trailer: trailer) + + await fulfillment( + of: [receiveExpectation], + timeout: 5 + ) + } + + func testTextStream() async throws { + let receiveExpectation = expectation(description: "Receives payload") + + let testChunks = [ + String(repeating: "A", count: 128), + String(repeating: "B", count: 128), + String(repeating: "C", count: 256), + String(repeating: "D", count: 32) + ] + let testPayload = testChunks.reduce("") { $0 + $1 } + + try await manager.registerTextStreamHandler(for: topicName) { reader, participant in + XCTAssertEqual(participant, self.participant) + + let payload = try await reader.readAll() + XCTAssertEqual(payload, testPayload) + + receiveExpectation.fulfill() + } + + let streamID = UUID().uuidString + + // 1. Send header packet + var header = Livekit_DataStream.Header() + header.streamID = streamID + header.topic = topicName + header.contentHeader = .textHeader(Livekit_DataStream.TextHeader()) + await manager.handle(header: header, from: participant.stringValue) + + // 2. Send chunk packets + for (index, chunkData) in testChunks.enumerated() { + var chunk = Livekit_DataStream.Chunk() + chunk.streamID = streamID + chunk.chunkIndex = UInt64(index) + chunk.content = Data(chunkData.utf8) + await manager.handle(chunk: chunk) + } + + // 3. Send trailer packet + var trailer = Livekit_DataStream.Trailer() + trailer.streamID = streamID + trailer.reason = "" // indicates normal closure + await manager.handle(trailer: trailer) + + await fulfillment( + of: [receiveExpectation], + timeout: 5 + ) + } + + func testNonTextData() async throws { + let throwsExpectation = expectation(description: "Throws error on non-text data") + + // This cannot be decoded as valid UTF-8 + let testPayload = Data(repeating: 0xAB, count: 128) + + try await manager.registerTextStreamHandler(for: topicName) { reader, _ in + do { + _ = try await reader.readAll() + } catch { + XCTAssertEqual(error as? StreamError, .decodeFailed) + throwsExpectation.fulfill() + } + } + + let streamID = UUID().uuidString + + // 1. Send header packet + var header = Livekit_DataStream.Header() + header.streamID = streamID + header.topic = topicName + header.contentHeader = .textHeader(Livekit_DataStream.TextHeader()) + header.totalLength = UInt64(testPayload.count) + await manager.handle(header: header, from: participant.stringValue) + + // 2. Send chunk packet + var chunk = Livekit_DataStream.Chunk() + chunk.streamID = streamID + chunk.chunkIndex = 0 + chunk.content = Data(testPayload) + await manager.handle(chunk: chunk) + + // 3. Send trailer packet + var trailer = Livekit_DataStream.Trailer() + trailer.streamID = streamID + trailer.reason = "" // indicates normal closure + await manager.handle(trailer: trailer) + + await fulfillment( + of: [throwsExpectation], + timeout: 5 + ) + } + + func testAbnormalClosure() async throws { + let throwsExpectation = expectation(description: "Throws error on abnormal closure") + let closureReason = "test" + + try await manager.registerByteStreamHandler(for: topicName) { reader, _ in + do { + _ = try await reader.readAll() + } catch { + XCTAssertEqual(error as? StreamError, .abnormalEnd(reason: closureReason)) + throwsExpectation.fulfill() + } + } + + let streamID = UUID().uuidString + + // 1. Send header packet + var header = Livekit_DataStream.Header() + header.streamID = streamID + header.topic = topicName + header.contentHeader = .byteHeader(Livekit_DataStream.ByteHeader()) + await manager.handle(header: header, from: participant.stringValue) + + // 2. Send trailer packet + var trailer = Livekit_DataStream.Trailer() + trailer.streamID = streamID + trailer.reason = closureReason // indicates abnormal closure + await manager.handle(trailer: trailer) + + await fulfillment( + of: [throwsExpectation], + timeout: 5 + ) + } + + func testIncomplete() async throws { + let throwsExpectation = expectation(description: "Throws error on incomplete stream") + + let testPayload = Data(repeating: 0xAB, count: 128) + + try await manager.registerByteStreamHandler(for: topicName) { reader, _ in + do { + _ = try await reader.readAll() + } catch { + XCTAssertEqual(error as? StreamError, .incomplete) + throwsExpectation.fulfill() + } + } + + let streamID = UUID().uuidString + + // 1. Send header packet + var header = Livekit_DataStream.Header() + header.streamID = streamID + header.topic = topicName + header.contentHeader = .byteHeader(Livekit_DataStream.ByteHeader()) + header.totalLength = UInt64(testPayload.count + 10) // expect more bytes + await manager.handle(header: header, from: participant.stringValue) + + // 2. Send chunk packet + var chunk = Livekit_DataStream.Chunk() + chunk.streamID = streamID + chunk.chunkIndex = 0 + chunk.content = Data(testPayload) + await manager.handle(chunk: chunk) + + // 3. Send trailer packet + var trailer = Livekit_DataStream.Trailer() + trailer.streamID = streamID + trailer.reason = "" // indicates normal closure + await manager.handle(trailer: trailer) + + await fulfillment( + of: [throwsExpectation], + timeout: 5 + ) + } +} diff --git a/Tests/LiveKitTests/DataStream/PreferredExtensionTests.swift b/Tests/LiveKitTests/DataStream/PreferredExtensionTests.swift new file mode 100644 index 000000000..ddc532d37 --- /dev/null +++ b/Tests/LiveKitTests/DataStream/PreferredExtensionTests.swift @@ -0,0 +1,33 @@ +/* + * Copyright 2025 LiveKit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +@testable import LiveKit +import XCTest + +class PreferredExtensionTests: XCTestCase { + + func testCommon() { + XCTAssertEqual(preferredExtension(for: "text/plain"), "txt") + XCTAssertEqual(preferredExtension(for: "application/json"), "json") + XCTAssertEqual(preferredExtension(for: "image/jpeg"), "jpeg") + XCTAssertEqual(preferredExtension(for: "application/pdf"), "pdf") + } + + func testInvalid() { + XCTAssertNil(preferredExtension(for: "text/invalid")) + XCTAssertNil(preferredExtension(for: "")) + } +} diff --git a/Tests/LiveKitTests/DataStream/StreamReaderTests.swift b/Tests/LiveKitTests/DataStream/StreamReaderTests.swift new file mode 100644 index 000000000..ad816b5e9 --- /dev/null +++ b/Tests/LiveKitTests/DataStream/StreamReaderTests.swift @@ -0,0 +1,138 @@ +/* + * Copyright 2025 LiveKit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +@testable import LiveKit +import XCTest + +class StreamReaderTests: XCTestCase { + + private var continuation: StreamReaderSource.Continuation! + private var reader: StreamReader! + + private let testChunks = [ + Data(repeating: 0xAB, count: 128), + Data(repeating: 0xCD, count: 128), + Data(repeating: 0xEF, count: 256), + Data(repeating: 0x12, count: 32) + ] + + /// All chunks combined. + private var testPayload: Data { + testChunks.reduce(Data()) { $0 + $1 } + } + + private func sendPayload(closingError: Error? = nil) { + for chunk in testChunks { + continuation.yield(chunk) + } + continuation.finish(throwing: closingError) + } + + override func setUp() { + super.setUp() + let source = StreamReaderSource { + self.continuation = $0 + } + reader = StreamReader(source: source) + } + + func testChunkRead() async throws { + let receiveExpectation = expectation(description: "Receive all chunks") + let closureExpectation = expectation(description: "Normal closure") + + Task { + var chunkIndex = 0 + for try await chunk in reader { + XCTAssertEqual(chunk, testChunks[chunkIndex]) + if chunkIndex == testChunks.count - 1 { + receiveExpectation.fulfill() + } + chunkIndex += 1 + } + closureExpectation.fulfill() + } + + sendPayload() + + await fulfillment( + of: [receiveExpectation, closureExpectation], + timeout: 5, + enforceOrder: true + ) + } + + func testChunkReadCallback() async { + let receiveExpectation = expectation(description: "Receive all chunks") + let closureExpectation = expectation(description: "Normal closure") + + var chunkIndex = 0 + + reader.readChunks { chunk in + XCTAssertEqual(chunk, self.testChunks[chunkIndex]) + if chunkIndex == self.testChunks.count - 1 { + receiveExpectation.fulfill() + } + chunkIndex += 1 + } onCompletion: { error in + XCTAssertNil(error) + closureExpectation.fulfill() + } + + sendPayload() + + await fulfillment( + of: [receiveExpectation, closureExpectation], + timeout: 5, + enforceOrder: true + ) + } + + func testChunkReadError() async throws { + let throwsExpectation = expectation(description: "Read throws error") + let testError = StreamError.abnormalEnd(reason: "test") + + Task { + do { + for try await _ in reader {} + } catch { + XCTAssertEqual(error as? StreamError, testError) + throwsExpectation.fulfill() + } + } + sendPayload(closingError: testError) + + await fulfillment( + of: [throwsExpectation], + timeout: 5 + ) + } + + func testReadAll() async throws { + let readExpectation = expectation(description: "Read full payload") + + Task { + let fullPayload = try await reader.readAll() + XCTAssertEqual(fullPayload, testPayload) + readExpectation.fulfill() + } + sendPayload() + + await fulfillment( + of: [readExpectation], + timeout: 5 + ) + } +} diff --git a/Tests/LiveKitTests/DataStream/TextStreamInfoTests.swift b/Tests/LiveKitTests/DataStream/TextStreamInfoTests.swift new file mode 100644 index 000000000..1b328b498 --- /dev/null +++ b/Tests/LiveKitTests/DataStream/TextStreamInfoTests.swift @@ -0,0 +1,62 @@ +/* + * Copyright 2025 LiveKit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +@testable import LiveKit +import XCTest + +class TextStreamInfoTests: XCTestCase { + + func testProtocolTypeConversion() { + let info = TextStreamInfo( + id: "id", + mimeType: "text/plain", + topic: "topic", + timestamp: Date(timeIntervalSince1970: 100), + totalLength: 128, + attributes: ["key": "value"], + operationType: .reaction, + version: 10, + replyToStreamID: "replyID", + attachedStreamIDs: ["attachedID"], + generated: true + ) + let header = Livekit_DataStream.Header(info) + XCTAssertEqual(header.streamID, info.id) + XCTAssertEqual(header.mimeType, info.mimeType) + XCTAssertEqual(header.topic, info.topic) + XCTAssertEqual(header.timestamp, Int64(info.timestamp.timeIntervalSince1970)) + XCTAssertEqual(header.totalLength, UInt64(info.totalLength ?? -1)) + XCTAssertEqual(header.attributes, info.attributes) + XCTAssertEqual(header.textHeader.operationType.rawValue, info.operationType.rawValue) + XCTAssertEqual(header.textHeader.version, Int32(info.version)) + XCTAssertEqual(header.textHeader.replyToStreamID, info.replyToStreamID) + XCTAssertEqual(header.textHeader.attachedStreamIds, info.attachedStreamIDs) + XCTAssertEqual(header.textHeader.generated, info.generated) + + let newInfo = TextStreamInfo(header, header.textHeader) + XCTAssertEqual(newInfo.id, info.id) + XCTAssertEqual(newInfo.mimeType, info.mimeType) + XCTAssertEqual(newInfo.topic, info.topic) + XCTAssertEqual(newInfo.timestamp, info.timestamp) + XCTAssertEqual(newInfo.totalLength, info.totalLength) + XCTAssertEqual(newInfo.attributes, info.attributes) + XCTAssertEqual(newInfo.operationType, info.operationType) + XCTAssertEqual(newInfo.version, info.version) + XCTAssertEqual(newInfo.replyToStreamID, info.replyToStreamID) + XCTAssertEqual(newInfo.attachedStreamIDs, info.attachedStreamIDs) + XCTAssertEqual(newInfo.generated, info.generated) + } +} diff --git a/Tests/LiveKitTests/DataStream/TextStreamReaderTests.swift b/Tests/LiveKitTests/DataStream/TextStreamReaderTests.swift new file mode 100644 index 000000000..8f7f0d1c4 --- /dev/null +++ b/Tests/LiveKitTests/DataStream/TextStreamReaderTests.swift @@ -0,0 +1,41 @@ +/* + * Copyright 2025 LiveKit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +@testable import LiveKit +import XCTest + +class TextStreamReaderTests: XCTestCase { + + func testInitialization() { + let info = TextStreamInfo( + id: UUID().uuidString, + mimeType: "text/plain", + topic: "someTopic", + timestamp: Date(), + totalLength: nil, + attributes: [:], + operationType: .create, + version: 1, + replyToStreamID: nil, + attachedStreamIDs: [], + generated: false + ) + let source = StreamReaderSource { _ in } + let reader = TextStreamReader(info: info, source: source) + + XCTAssertEqual(reader.info, info) + } +} diff --git a/Tests/LiveKitTests/DataStreamsTests.swift b/Tests/LiveKitTests/DataStreamsTests.swift new file mode 100644 index 000000000..b703165fe --- /dev/null +++ b/Tests/LiveKitTests/DataStreamsTests.swift @@ -0,0 +1,57 @@ +/* + * Copyright 2025 LiveKit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +@testable import LiveKit +import XCTest + +class DataStreamsTests: XCTestCase { + + // MARK: - Text + + func testSendText() { + XCTFail() + } + + func testStreamText() { + XCTFail() + } + + func testRegisterTextStreamHandler() { + XCTFail() + } + + func testUnregisterTextStreamHandler() { + XCTFail() + } + + // MARK: - Bytes + + func testSendFile() { + XCTFail() + } + + func testStreamBytes() { + XCTFail() + } + + func testRegisterByteStreamHandler() { + XCTFail() + } + + func testUnregisterByteStreamHandler() { + XCTFail() + } +} diff --git a/Tests/LiveKitTests/RpcTests.swift b/Tests/LiveKitTests/RpcTests.swift index a59c488f2..005abce57 100644 --- a/Tests/LiveKitTests/RpcTests.swift +++ b/Tests/LiveKitTests/RpcTests.swift @@ -18,19 +18,6 @@ import XCTest class RpcTests: LKTestCase { - // Mock DataChannelPair to intercept outgoing packets - class MockDataChannelPair: DataChannelPair { - var packetHandler: (Livekit_DataPacket) -> Void - - init(packetHandler: @escaping (Livekit_DataPacket) -> Void) { - self.packetHandler = packetHandler - } - - override func send(dataPacket packet: Livekit_DataPacket) throws { - packetHandler(packet) - } - } - // Test performing RPC calls and verifying outgoing packets func testPerformRpc() async throws { try await withRooms([RoomTestingOptions()]) { rooms in diff --git a/Tests/LiveKitTests/Support/MockDataChannelPair.swift b/Tests/LiveKitTests/Support/MockDataChannelPair.swift new file mode 100644 index 000000000..0c37c25b5 --- /dev/null +++ b/Tests/LiveKitTests/Support/MockDataChannelPair.swift @@ -0,0 +1,30 @@ +/* + * Copyright 2025 LiveKit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +@testable import LiveKit + +/// Mock ``DataChannelPair`` to intercept outgoing packets. +class MockDataChannelPair: DataChannelPair { + var packetHandler: (Livekit_DataPacket) -> Void + + init(packetHandler: @escaping (Livekit_DataPacket) -> Void) { + self.packetHandler = packetHandler + } + + override func send(dataPacket packet: Livekit_DataPacket) throws { + packetHandler(packet) + } +}