Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Data streams #593

Draft
wants to merge 39 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
9bbd075
Move `MockDataChannelPair` to seperate file
ladvoc Feb 13, 2025
0941ed2
Create test suite
ladvoc Feb 13, 2025
1e4717b
Update proto to 1.33.0
ladvoc Feb 14, 2025
9326959
Merge remote-tracking branch 'upstream/main' into proto-update
ladvoc Feb 14, 2025
590787a
Merge branch 'proto-update' into data-streams
ladvoc Feb 14, 2025
4efb4f0
Define steam info classes
ladvoc Feb 18, 2025
6e76d98
Define stream error
ladvoc Feb 18, 2025
2f0fd70
Define stream readers
ladvoc Feb 18, 2025
9d551c5
Define stream manager
ladvoc Feb 18, 2025
1b66c71
Integrate stream manager
ladvoc Feb 18, 2025
5c13aa1
Publicly expose handler registration methods
ladvoc Feb 18, 2025
5044030
Update protocol definition
ladvoc Feb 18, 2025
ecf4b49
Add default implementations
ladvoc Feb 18, 2025
4d350fe
Add error cases and documentation
ladvoc Feb 18, 2025
e2b6136
Remove skeleton methods
ladvoc Feb 18, 2025
18c1270
Implement read to file for byte stream
ladvoc Feb 18, 2025
21396b1
Test stream readers
ladvoc Feb 18, 2025
ea81b31
Merge remote-tracking branch 'upstream/main' into data-streams
ladvoc Feb 18, 2025
ddec23e
Rename property
ladvoc Feb 18, 2025
f277d5b
Keep track of bytes read
ladvoc Feb 18, 2025
08f90de
Terminate open streams on deinit
ladvoc Feb 18, 2025
7ef4dae
Terminate stream if manager is no longer available
ladvoc Feb 18, 2025
35e8342
Use typealias
ladvoc Feb 18, 2025
07b97fc
Implement default file name
ladvoc Feb 19, 2025
e4463f9
Refactor file name resolution
ladvoc Feb 19, 2025
cb765e1
Rename `StreamManager` to `IncomingStreamManager`
ladvoc Feb 19, 2025
9cb5ce4
Protocol type conversion
ladvoc Feb 19, 2025
219612c
Add iOS to version check
ladvoc Feb 20, 2025
26fe835
Move test case
ladvoc Feb 20, 2025
275b314
Update tests
ladvoc Feb 20, 2025
a332eb3
Rename type
ladvoc Feb 20, 2025
5a06e24
Refactor tests
ladvoc Feb 20, 2025
72422a4
Test incoming stream manager
ladvoc Feb 20, 2025
860eb85
Clean up
ladvoc Feb 20, 2025
58b6729
Refactor stream reader
ladvoc Feb 20, 2025
5bfdce3
Rename error case
ladvoc Feb 20, 2025
d8cd81f
Organize
ladvoc Feb 21, 2025
8cdc9b2
Create skeleton methods
ladvoc Feb 21, 2025
47b6c31
Merge remote-tracking branch 'upstream/main' into data-streams
ladvoc Feb 22, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 48 additions & 0 deletions Sources/LiveKit/Core/Room+DataStream.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright 2025 LiveKit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

extension Room {

/// Registers a handler for new byte streams matching the given topic.
///
/// - Parameters:
/// - topic: Topic identifier; only streams with this topic will be handled.
/// - handler: Handler closure passed the stream reader (``ByteStreamReader``) and the identity of the remote participant who opened the stream.
///
public func registerByteStreamHandler(for topic: String, _ handler: @escaping ByteStreamHandler) async throws {
try await incomingStreamManager.registerByteStreamHandler(for: topic, handler)
}

/// Registers a handler for new text streams matching the given topic.
///
/// - Parameters:
/// - topic: Topic identifier; only streams with this topic will be handled.
/// - handler: Handler closure passed the stream reader (``TextStreamReader``) and the identity of the remote participant who opened the stream.
///
public func registerTextStreamHandler(for topic: String, _ handler: @escaping TextStreamHandler) async throws {
try await incomingStreamManager.registerTextStreamHandler(for: topic, handler)
}

/// Unregisters a byte stream handler that was previously registered for the given topic.
public func unregisterByteStreamHandler(for topic: String) async {
await incomingStreamManager.unregisterByteStreamHandler(for: topic)
}

/// Unregisters a text stream handler that was previously registered for the given topic.
func unregisterTextStreamHandler(for topic: String) async {
await incomingStreamManager.unregisterTextStreamHandler(for: topic)
}
}
5 changes: 5 additions & 0 deletions Sources/LiveKit/Core/Room.swift
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,8 @@ public class Room: NSObject, ObservableObject, Loggable {

lazy var subscriberDataChannel = DataChannelPair(delegate: self)
lazy var publisherDataChannel = DataChannelPair(delegate: self)

lazy var incomingStreamManager = IncomingStreamManager()

var _blockProcessQueue = DispatchQueue(label: "LiveKitSDK.engine.pendingBlocks",
qos: .default)
Expand Down Expand Up @@ -537,6 +539,9 @@ extension Room: DataChannelDelegate {
case let .rpcResponse(response): room(didReceiveRpcResponse: response)
case let .rpcAck(ack): room(didReceiveRpcAck: ack)
case let .rpcRequest(request): room(didReceiveRpcRequest: request, from: dataPacket.participantIdentity)
case let .streamHeader(header): Task { await incomingStreamManager.handle(header: header, from: dataPacket.participantIdentity) }
case let .streamChunk(chunk): Task { await incomingStreamManager.handle(chunk: chunk) }
case let .streamTrailer(trailer): Task { await incomingStreamManager.handle(trailer: trailer) }
default: return
}
}
Expand Down
150 changes: 150 additions & 0 deletions Sources/LiveKit/DataStream/ByteStreamReader.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
/*
* Copyright 2025 LiveKit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import Foundation

/// An asynchronous sequence of chunks read from a byte data stream.
@objc
public final class ByteStreamReader: NSObject, AsyncSequence, Sendable {

Check failure on line 21 in Sources/LiveKit/DataStream/ByteStreamReader.swift

View workflow job for this annotation

GitHub Actions / test (macos-14, 15.4, macOS)

type 'ByteStreamReader' does not conform to protocol 'AsyncSequence'

Check failure on line 21 in Sources/LiveKit/DataStream/ByteStreamReader.swift

View workflow job for this annotation

GitHub Actions / test (macos-14, 15.4, tvOS Simulator,name=Apple TV)

type 'ByteStreamReader' does not conform to protocol 'AsyncSequence'

Check failure on line 21 in Sources/LiveKit/DataStream/ByteStreamReader.swift

View workflow job for this annotation

GitHub Actions / test (macos-14, 15.4, macOS,variant=Mac Catalyst)

type 'ByteStreamReader' does not conform to protocol 'AsyncSequence'

Check failure on line 21 in Sources/LiveKit/DataStream/ByteStreamReader.swift

View workflow job for this annotation

GitHub Actions / test (macos-14, 15.4, iOS Simulator,OS=17.5,name=iPhone 15 Pro)

type 'ByteStreamReader' does not conform to protocol 'AsyncSequence'

Check failure on line 21 in Sources/LiveKit/DataStream/ByteStreamReader.swift

View workflow job for this annotation

GitHub Actions / test (macos-13, 14.2, macOS,variant=Mac Catalyst)

type 'ByteStreamReader' does not conform to protocol 'AsyncSequence'

Check failure on line 21 in Sources/LiveKit/DataStream/ByteStreamReader.swift

View workflow job for this annotation

GitHub Actions / test (macos-13, 14.2, macOS)

type 'ByteStreamReader' does not conform to protocol 'AsyncSequence'

Check failure on line 21 in Sources/LiveKit/DataStream/ByteStreamReader.swift

View workflow job for this annotation

GitHub Actions / test (macos-14, 15.4, visionOS Simulator,name=Apple Vision Pro)

type 'ByteStreamReader' does not conform to protocol 'AsyncSequence'

Check failure on line 21 in Sources/LiveKit/DataStream/ByteStreamReader.swift

View workflow job for this annotation

GitHub Actions / test (macos-13, 14.2, iOS Simulator,OS=17.2,name=iPhone 14 Pro)

type 'ByteStreamReader' does not conform to protocol 'AsyncSequence'
/// Information about the incoming byte stream.
@objc
public let info: ByteStreamInfo

let source: StreamReader<Data>

init(info: ByteStreamInfo, source: StreamReaderSource) {
self.info = info
self.source = StreamReader(source: source)
}

public func makeAsyncIterator() -> StreamReader<Data>.Iterator {
source.makeAsyncIterator()
}

/// Reads incoming chunks from the byte stream, concatenating them into a single data object which is returned
/// once the stream closes normally.
///
/// - Returns: The data consisting of all concatenated chunks.
/// - Throws: ``StreamError`` if an error occurs while reading the stream.
///
public func readAll() async throws -> Data {
try await source.readAll()
}
}

extension ByteStreamReader {
/// Reads incoming chunks from the byte stream, writing them to a file as they are received.
///
/// - Parameters:
/// - directory: The directory to write the file in. The system temporary directory is used if not specified.
/// - nameOverride: The name to use for the written file. If not specified, file name and extension will be automatically
/// inferred from the stream information.
/// - Returns: The URL of the written file on disk.
/// - Throws: ``StreamError`` if an error occurs while reading the stream.
///
public func readToFile(
in directory: URL = FileManager.default.temporaryDirectory,
name nameOverride: String? = nil
) async throws -> URL {
guard directory.hasDirectoryPath else {
throw StreamError.notDirectory
}
let fileName = resolveFileName(override: nameOverride)
let fileURL = directory.appendingPathComponent(fileName)

FileManager.default.createFile(atPath: fileURL.path, contents: nil)
let handle = try FileHandle(forWritingTo: fileURL)

try await Task {
for try await chunk in self {
guard #available(macOS 10.15.4, iOS 13.4, *) else {
handle.write(chunk)
return
}
try handle.write(contentsOf: chunk)
}
}.value

try handle.close()
return fileURL
}

private func resolveFileName(override: String?) -> String {
Self.resolveFileName(
setName: override ?? info.fileName,
fallbackName: info.id,
mimeType: info.mimeType,
fallbackExtension: "bin"
)
}

/// Resolves the filename used when writing the stream to disk.
///
/// - Parameters:
/// - setName: The name set by the user or taken from stream metadata.
/// - fallbackName: Name to fallback on when `setName` is `nil`.
/// - mimeType: MIME type used for determining file extension.
/// - fallbackExtension: File extension to fallback on when MIME type cannot be resolved.
/// - Returns: The resolved file name.
///
static func resolveFileName(
setName: String?,
fallbackName: String,
mimeType: String,
fallbackExtension: String
) -> String {
var resolvedExtension: String {
preferredExtension(for: mimeType) ?? fallbackExtension
}
guard let setName else {
return "\(fallbackName).\(resolvedExtension)"
}
guard setName.pathExtension != nil else {
return "\(setName).\(resolvedExtension)"
}
return setName
}
}

// MARK: - Objective-C compatibility

public extension ByteStreamReader {
@objc
@available(*, unavailable, message: "Use async readAll() method instead.")
func readAll(onCompletion: @escaping (Data) -> Void, onError: ((Error?) -> Void)?) {
source.readAll(onCompletion: onCompletion, onError: onError)
}

@objc
@available(*, unavailable, message: "Use for/await on ByteStreamReader reader instead.")
func readChunks(onChunk: @escaping (Data) -> Void, onCompletion: ((Error?) -> Void)?) {
source.readChunks(onChunk: onChunk, onCompletion: onCompletion)
}

@objc
@available(*, unavailable, message: "Use async readToFile(in:name:) method instead.")
internal func readToFile(
in directory: URL,
name nameOverride: String?,
onCompletion: @escaping (URL) -> Void,
onError: ((Error) -> Void)?
) {
Task {
do { try onCompletion(await self.readToFile(in: directory, name: nameOverride)) }
catch { onError?(error) }
}
}
}
Loading
Loading