Skip to content

Commit c2b7366

Browse files
authored
Bidirectional stream events (#583)
1 parent f6d37bb commit c2b7366

File tree

14 files changed

+391
-0
lines changed

14 files changed

+391
-0
lines changed

Examples/README.md

+2
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@ The following packages show working with various content types, such as JSON, UR
3434
- [various-content-types-server-example](./various-content-types-server-example) - A server showing how to handle and provide the various content types.
3535
- [event-streams-client-example](./event-streams-client-example) - A client showing how to provide and handle event streams.
3636
- [event-streams-server-example](./event-streams-server-example) - A server showing how to handle and provide event streams.
37+
- [bidirectional-event-streams-client-example](./bidirectional-event-streams-client-example) - A client showing how to provide and handle bidirectional event streams.
38+
- [bidirectional-event-streams-server-example](./bidirectional-event-streams-server-example) - A server showing how to handle and provide bidirectional event streams.
3739

3840
## Integrations
3941

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
.DS_Store
2+
.build
3+
/Packages
4+
/*.xcodeproj
5+
xcuserdata/
6+
DerivedData/
7+
.swiftpm/xcode/package.xcworkspace/contents.xcworkspacedata
8+
.vscode
9+
/Package.resolved
10+
.ci/
11+
.docc-build/
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
// swift-tools-version:5.9
2+
//===----------------------------------------------------------------------===//
3+
//
4+
// This source file is part of the SwiftOpenAPIGenerator open source project
5+
//
6+
// Copyright (c) 2024 Apple Inc. and the SwiftOpenAPIGenerator project authors
7+
// Licensed under Apache License v2.0
8+
//
9+
// See LICENSE.txt for license information
10+
// See CONTRIBUTORS.txt for the list of SwiftOpenAPIGenerator project authors
11+
//
12+
// SPDX-License-Identifier: Apache-2.0
13+
//
14+
//===----------------------------------------------------------------------===//
15+
import PackageDescription
16+
17+
let package = Package(
18+
name: "bidirectional-event-streams-client-example",
19+
platforms: [.macOS(.v10_15), .iOS(.v13), .tvOS(.v13), .watchOS(.v6), .visionOS(.v1)],
20+
dependencies: [
21+
.package(url: "https://github.com/apple/swift-openapi-generator", from: "1.0.0"),
22+
.package(url: "https://github.com/apple/swift-openapi-runtime", from: "1.2.0"),
23+
.package(url: "https://github.com/swift-server/swift-openapi-async-http-client", from: "1.0.0"),
24+
],
25+
targets: [
26+
.executableTarget(
27+
name: "BidirectionalEventStreamsClient",
28+
dependencies: [
29+
.product(name: "OpenAPIRuntime", package: "swift-openapi-runtime"),
30+
.product(name: "OpenAPIAsyncHTTPClient", package: "swift-openapi-async-http-client"),
31+
],
32+
plugins: [.plugin(name: "OpenAPIGenerator", package: "swift-openapi-generator")]
33+
)
34+
]
35+
)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
# Client handling bidirectional event streams
2+
3+
An example project using [Swift OpenAPI Generator](https://github.com/apple/swift-openapi-generator).
4+
5+
> **Disclaimer:** This example is deliberately simplified and is intended for illustrative purposes only.
6+
7+
## Overview
8+
9+
A command-line tool that uses a generated client to show how to work with bidirectional event streams.
10+
11+
Instead of [URLSession](https://developer.apple.com/documentation/foundation/urlsession), which will return stream only until at least “some” bytes of the body have also been received (see [comment](https://github.com/apple/swift-openapi-urlsession/blob/main/Tests/OpenAPIURLSessionTests/URLSessionBidirectionalStreamingTests/URLSessionBidirectionalStreamingTests.swift#L193-L206)), tool uses the [AsyncHTTPClient](https://github.com/swift-server/async-http-client) API to perform the HTTP call, wrapped in the [AsyncHTTPClient Transport for Swift OpenAPI Generator](https://github.com/swift-server/swift-openapi-async-http-client). A workaround for URLSession could be sending an `empty`, `.joined` or some kind of hearbeat message from server first when initialising a stream.
12+
13+
The server can be started by running `bidirectional-event-streams-server-example` locally.
14+
15+
## Usage
16+
17+
Build and run the client CLI using:
18+
19+
```console
20+
% swift run
21+
Sending and fetching back greetings using JSON Lines
22+
...
23+
```
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// This source file is part of the SwiftOpenAPIGenerator open source project
4+
//
5+
// Copyright (c) 2023 Apple Inc. and the SwiftOpenAPIGenerator project authors
6+
// Licensed under Apache License v2.0
7+
//
8+
// See LICENSE.txt for license information
9+
// See CONTRIBUTORS.txt for the list of SwiftOpenAPIGenerator project authors
10+
//
11+
// SPDX-License-Identifier: Apache-2.0
12+
//
13+
//===----------------------------------------------------------------------===//
14+
import OpenAPIRuntime
15+
import OpenAPIAsyncHTTPClient
16+
import Foundation
17+
18+
@main struct BidirectionalEventStreamsClient {
19+
private static let templates: [String] = [
20+
"Hello, %@!", "Good morning, %@!", "Hi, %@!", "Greetings, %@!", "Hey, %@!", "Hi there, %@!",
21+
"Good evening, %@!",
22+
]
23+
static func main() async throws {
24+
let client = Client(serverURL: URL(string: "http://localhost:8080/api")!, transport: AsyncHTTPClientTransport())
25+
do {
26+
print("Sending and fetching back greetings using JSON Lines")
27+
let (stream, continuation) = AsyncStream<Components.Schemas.Greeting>.makeStream()
28+
/// To keep it simple, using JSON Lines, as it most straightforward and easy way to have streams.
29+
/// For SSE and JSON Sequences cases please check `event-streams-client-example`.
30+
let requestBody: Operations.getGreetingsStream.Input.Body = .application_jsonl(
31+
.init(stream.asEncodedJSONLines(), length: .unknown, iterationBehavior: .single)
32+
)
33+
let response = try await client.getGreetingsStream(query: .init(name: "Example"), body: requestBody)
34+
let greetingStream = try response.ok.body.application_jsonl.asDecodedJSONLines(
35+
of: Components.Schemas.Greeting.self
36+
)
37+
try await withThrowingTaskGroup(of: Void.self) { group in
38+
// Listen for upcoming messages
39+
group.addTask {
40+
for try await greeting in greetingStream {
41+
try Task.checkCancellation()
42+
print("Got greeting: \(greeting.message)")
43+
}
44+
}
45+
// Send messages
46+
group.addTask {
47+
for template in Self.templates {
48+
try Task.checkCancellation()
49+
continuation.yield(.init(message: template))
50+
try await Task.sleep(nanoseconds: 1 * 1_000_000_000)
51+
}
52+
continuation.finish()
53+
}
54+
return try await group.waitForAll()
55+
}
56+
}
57+
}
58+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
generate:
2+
- types
3+
- client
4+
accessModifier: internal
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
openapi: '3.1.0'
2+
info:
3+
title: EventStreamsService
4+
version: 1.0.0
5+
servers:
6+
- url: https://example.com/api
7+
description: Example service deployment.
8+
paths:
9+
/greetings:
10+
post:
11+
operationId: getGreetingsStream
12+
parameters:
13+
- name: name
14+
required: false
15+
in: query
16+
description: The name used in the returned greeting.
17+
schema:
18+
type: string
19+
requestBody:
20+
description: A body with a greetings stream.
21+
required: true
22+
content:
23+
application/jsonl: {}
24+
responses:
25+
'200':
26+
description: A success response with a greetings stream.
27+
content:
28+
application/jsonl: {}
29+
components:
30+
schemas:
31+
Greeting:
32+
type: object
33+
description: A value with the greeting contents.
34+
properties:
35+
message:
36+
type: string
37+
description: The string representation of the greeting.
38+
required:
39+
- message
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
.DS_Store
2+
.build
3+
/Packages
4+
/*.xcodeproj
5+
xcuserdata/
6+
DerivedData/
7+
.swiftpm/xcode/package.xcworkspace/contents.xcworkspacedata
8+
.vscode
9+
/Package.resolved
10+
.ci/
11+
.docc-build/
12+
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
// swift-tools-version:5.9
2+
//===----------------------------------------------------------------------===//
3+
//
4+
// This source file is part of the SwiftOpenAPIGenerator open source project
5+
//
6+
// Copyright (c) 2024 Apple Inc. and the SwiftOpenAPIGenerator project authors
7+
// Licensed under Apache License v2.0
8+
//
9+
// See LICENSE.txt for license information
10+
// See CONTRIBUTORS.txt for the list of SwiftOpenAPIGenerator project authors
11+
//
12+
// SPDX-License-Identifier: Apache-2.0
13+
//
14+
//===----------------------------------------------------------------------===//
15+
import PackageDescription
16+
17+
let package = Package(
18+
name: "bidirectional-event-streams-server-example",
19+
platforms: [.macOS(.v10_15)],
20+
dependencies: [
21+
.package(url: "https://github.com/apple/swift-openapi-generator", from: "1.0.0"),
22+
.package(url: "https://github.com/apple/swift-openapi-runtime", from: "1.2.0"),
23+
.package(url: "https://github.com/hummingbird-project/hummingbird.git", from: "2.0.0-rc.1"),
24+
.package(url: "https://github.com/swift-server/swift-openapi-hummingbird.git", from: "2.0.0-beta.4"),
25+
],
26+
targets: [
27+
.executableTarget(
28+
name: "BidirectionalEventStreamsServer",
29+
dependencies: [
30+
.product(name: "OpenAPIRuntime", package: "swift-openapi-runtime"),
31+
.product(name: "OpenAPIHummingbird", package: "swift-openapi-hummingbird"),
32+
.product(name: "Hummingbird", package: "hummingbird"),
33+
],
34+
plugins: [.plugin(name: "OpenAPIGenerator", package: "swift-openapi-generator")]
35+
)
36+
]
37+
)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
# Server supporting bidirectional event streams
2+
3+
An example project using [Swift OpenAPI Generator](https://github.com/apple/swift-openapi-generator).
4+
5+
> **Disclaimer:** This example is deliberately simplified and is intended for illustrative purposes only.
6+
7+
## Overview
8+
9+
A server that uses generated server stubs to show how to work with bidirectional event streams.
10+
11+
The tool uses the [Hummingbird](https://github.com/hummingbird-project/hummingbird) server framework to handle HTTP requests, wrapped in the [Swift OpenAPI Hummingbird](https://github.com/swift-server/swift-openapi-hummingbird).
12+
13+
The CLI starts the server on `http://localhost:8080` and can be invoked by running `bidirectional-event-streams-client-example`.
14+
15+
## Usage
16+
17+
Build and run the server CLI using:
18+
19+
```console
20+
% swift run
21+
2024-07-04T08:56:23+0200 info Hummingbird : [HummingbirdCore] Server started and listening on 127.0.0.1:8080
22+
...
23+
```
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// This source file is part of the SwiftOpenAPIGenerator open source project
4+
//
5+
// Copyright (c) 2023 Apple Inc. and the SwiftOpenAPIGenerator project authors
6+
// Licensed under Apache License v2.0
7+
//
8+
// See LICENSE.txt for license information
9+
// See CONTRIBUTORS.txt for the list of SwiftOpenAPIGenerator project authors
10+
//
11+
// SPDX-License-Identifier: Apache-2.0
12+
//
13+
//===----------------------------------------------------------------------===//
14+
import OpenAPIRuntime
15+
import OpenAPIHummingbird
16+
import Hummingbird
17+
import Foundation
18+
19+
struct Handler: APIProtocol {
20+
private let storage: StreamStorage = .init()
21+
func getGreetingsStream(_ input: Operations.getGreetingsStream.Input) async throws
22+
-> Operations.getGreetingsStream.Output
23+
{
24+
let eventStream = await self.storage.makeStream(input: input)
25+
/// To keep it simple, using JSON Lines, as it most straightforward and easy way to have streams.
26+
/// For SSE and JSON Sequences cases please check `event-streams-server-example`.
27+
let responseBody = Operations.getGreetingsStream.Output.Ok.Body.application_jsonl(
28+
.init(eventStream.asEncodedJSONLines(), length: .unknown, iterationBehavior: .single)
29+
)
30+
return .ok(.init(body: responseBody))
31+
}
32+
}
33+
34+
@main struct BidirectionalEventStreamsServer {
35+
static func main() async throws {
36+
let router = Router()
37+
let handler = Handler()
38+
try handler.registerHandlers(on: router, serverURL: URL(string: "/api")!)
39+
let app = Application(router: router, configuration: .init())
40+
try await app.run()
41+
}
42+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// This source file is part of the SwiftOpenAPIGenerator open source project
4+
//
5+
// Copyright (c) 2023 Apple Inc. and the SwiftOpenAPIGenerator project authors
6+
// Licensed under Apache License v2.0
7+
//
8+
// See LICENSE.txt for license information
9+
// See CONTRIBUTORS.txt for the list of SwiftOpenAPIGenerator project authors
10+
//
11+
// SPDX-License-Identifier: Apache-2.0
12+
//
13+
//===----------------------------------------------------------------------===//
14+
15+
import Foundation
16+
17+
actor StreamStorage: Sendable {
18+
private typealias StreamType = AsyncStream<Components.Schemas.Greeting>
19+
private var streams: [String: Task<Void, any Error>] = [:]
20+
init() {}
21+
private func finishedStream(id: String) {
22+
guard self.streams[id] != nil else { return }
23+
self.streams.removeValue(forKey: id)
24+
}
25+
private func cancelStream(id: String) {
26+
guard let task = self.streams[id] else { return }
27+
self.streams.removeValue(forKey: id)
28+
task.cancel()
29+
print("Canceled stream \(id)")
30+
}
31+
func makeStream(input: Operations.getGreetingsStream.Input) -> AsyncStream<Components.Schemas.Greeting> {
32+
let name = input.query.name ?? "Stranger"
33+
let id = UUID().uuidString
34+
print("Creating stream \(id) for name: \(name)")
35+
let (stream, continuation) = StreamType.makeStream()
36+
continuation.onTermination = { termination in
37+
Task { [weak self] in
38+
switch termination {
39+
case .cancelled: await self?.cancelStream(id: id)
40+
case .finished: await self?.finishedStream(id: id)
41+
@unknown default: await self?.finishedStream(id: id)
42+
}
43+
}
44+
}
45+
let inputStream =
46+
switch input.body {
47+
case .application_jsonl(let body): body.asDecodedJSONLines(of: Components.Schemas.Greeting.self)
48+
}
49+
let task = Task<Void, any Error> {
50+
for try await message in inputStream {
51+
try Task.checkCancellation()
52+
print("Recieved a message \(message)")
53+
print("Sending greeting back for \(id)")
54+
let greetingText = String(format: message.message, name)
55+
continuation.yield(.init(message: greetingText))
56+
}
57+
continuation.finish()
58+
}
59+
self.streams[id] = task
60+
return stream
61+
}
62+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
generate:
2+
- types
3+
- server
4+
accessModifier: internal

0 commit comments

Comments
 (0)