@@ -37,18 +37,6 @@ type GraphQLWebSocketMiddleware<'Root>
37
37
let endpointUrl = PathString options.WebsocketOptions.EndpointUrl
38
38
let connectionInitTimeout = options.WebsocketOptions.ConnectionInitTimeout
39
39
40
- let serializeServerMessage ( jsonSerializerOptions : JsonSerializerOptions ) ( serverMessage : ServerMessage ) = task {
41
- let raw =
42
- match serverMessage with
43
- | ConnectionAck -> { Id = ValueNone; Type = " connection_ack" ; Payload = ValueNone }
44
- | ServerPing -> { Id = ValueNone; Type = " ping" ; Payload = ValueNone }
45
- | ServerPong p -> { Id = ValueNone; Type = " pong" ; Payload = p |> ValueOption.map CustomResponse }
46
- | Next ( id, payload) -> { Id = ValueSome id; Type = " next" ; Payload = ValueSome <| ExecutionResult payload }
47
- | Complete id -> { Id = ValueSome id; Type = " complete" ; Payload = ValueNone }
48
- | Error ( id, errMsgs) -> { Id = ValueSome id; Type = " error" ; Payload = ValueSome <| ErrorMessages errMsgs }
49
- return JsonSerializer.Serialize ( raw, jsonSerializerOptions)
50
- }
51
-
52
40
static let invalidJsonInClientMessageError =
53
41
Result.Error <| InvalidMessage ( 4400 , " Invalid json in client message" )
54
42
@@ -83,7 +71,7 @@ type GraphQLWebSocketMiddleware<'Root>
83
71
let buffer = ArrayPool.Shared.Rent options.ReadBufferSize
84
72
try
85
73
let completeMessage = new PooledList< byte> ()
86
- let mutable segmentResponse : WebSocketReceiveResult = null
74
+ let mutable segmentResponse : WebSocketReceiveResult | null = null
87
75
while ( not cancellationToken.IsCancellationRequested)
88
76
&& socket |> isSocketOpen
89
77
&& (( segmentResponse = null )
@@ -111,17 +99,15 @@ type GraphQLWebSocketMiddleware<'Root>
111
99
ArrayPool.Shared.Return buffer
112
100
}
113
101
114
- let sendMessageViaSocket ( jsonSerializerOptions ) ( socket : WebSocket ) ( message : ServerMessage ) : Task = task {
102
+ let sendMessageViaSocket ( jsonSerializerOptions : JsonSerializerOptions ) ( socket : WebSocket ) ( message : ServerMessage ) : Task = task {
115
103
if not ( socket.State = WebSocketState.Open) then
116
104
logger.LogTrace ( $" Ignoring message to be sent via socket, since its state is not '{nameof WebSocketState.Open}', but '{{state}}'" , socket.State)
117
105
else
118
- // TODO: Allocate string only if a debugger is attached
119
- let! serializedMessage = message |> serializeServerMessage jsonSerializerOptions
120
- let segment = new ArraySegment< byte> ( System.Text.Encoding.UTF8.GetBytes ( serializedMessage))
121
106
if not ( socket.State = WebSocketState.Open) then
122
107
logger.LogTrace ( $" Ignoring message to be sent via socket, since its state is not '{nameof WebSocketState.Open}', but '{{state}}'" , socket.State)
123
108
else
124
- do ! socket.SendAsync ( segment, WebSocketMessageType.Text, endOfMessage = true , cancellationToken = CancellationToken.None)
109
+ let bytes = JsonSerializer.SerializeToUtf8Bytes( message, jsonSerializerOptions)
110
+ do ! socket.SendAsync ( bytes.AsMemory(), WebSocketMessageType.Text, endOfMessage = true , cancellationToken = CancellationToken.None)
125
111
126
112
logger.LogTrace ( " <- Response: {response}" , message)
127
113
}
@@ -169,27 +155,26 @@ type GraphQLWebSocketMiddleware<'Root>
169
155
let sendMsg = sendMessageViaSocket serializerOptions socket
170
156
let rcv () = socket |> rcvMsgViaSocket serializerOptions
171
157
172
- let sendOutput id ( output : SubscriptionExecutionResult ) =
158
+ let sendOutput id ( output : GQLWebSocketResponse ) =
173
159
sendMsg ( Next ( id, output))
174
160
175
161
let sendSubscriptionResponseOutput id subscriptionResult =
176
162
match subscriptionResult with
177
- | SubscriptionResult output -> { Data = ValueSome output ; Errors = [] } |> sendOutput id
178
- | SubscriptionErrors ( output , errors) ->
163
+ | SubscriptionResult data -> GQLWebSocketResponse.Direct data |> sendOutput id
164
+ | SubscriptionErrors ( data , errors) ->
179
165
logger.LogWarning ( " Subscription errors: {subscriptionErrors}" , ( String.Join ( '\n' , errors |> Seq.map ( fun x -> $" - %s {x.Message}" ))))
180
- { Data = ValueNone ; Errors = errors } |> sendOutput id
166
+ GQLWebSocketResponse.Error ( data , errors) |> sendOutput id
181
167
182
168
let sendDeferredResponseOutput id deferredResult =
183
169
match deferredResult with
184
- | DeferredResult ( obj, path) ->
185
- let output = obj :?> Dictionary< string, obj>
186
- { Data = ValueSome output; Errors = [] } |> sendOutput id
187
- | DeferredErrors ( obj, errors, _) ->
170
+ | DeferredResult ( data, path) ->
171
+ GQLWebSocketResponse.Deferred ( data, path) |> sendOutput id
172
+ | DeferredErrors ( data, errors, path) ->
188
173
logger.LogWarning (
189
174
" Deferred response errors: {deferredErrors}" ,
190
175
( String.Join ( '\n' , errors |> Seq.map ( fun x -> $" - %s {x.Message}" )))
191
176
)
192
- { Data = ValueNone ; Errors = errors } |> sendOutput id
177
+ GQLWebSocketResponse.Error ( data , path , errors) |> sendOutput id
193
178
194
179
let sendDeferredResultDelayedBy ( ct : CancellationToken ) ( ms : int ) id deferredResult : Task = task {
195
180
do ! Task.Delay ( ms, ct)
@@ -202,16 +187,16 @@ type GraphQLWebSocketMiddleware<'Root>
202
187
( subscriptions, socket, observableOutput, serializerOptions)
203
188
|> addClientSubscription id sendSubscriptionResponseOutput
204
189
| Deferred ( data, errors, observableOutput) ->
205
- do ! { Data = ValueSome data; Errors = [] } |> sendOutput id
190
+ do ! GQLWebSocketResponse.Direct data |> sendOutput id
206
191
if errors.IsEmpty then
207
192
( subscriptions, socket, observableOutput, serializerOptions)
208
193
|> addClientSubscription id ( sendDeferredResultDelayedBy cancellationToken 5000 )
209
194
else
210
195
()
211
- | Direct ( data, _ ) -> do ! { Data = ValueSome data; Errors = [] } |> sendOutput id
212
- | RequestError problemDetails ->
213
- logger.LogWarning( " Request errors:\n {errors}" , problemDetails )
214
- do ! { Data = ValueNone ; Errors = problemDetails } |> sendOutput id
196
+ | Direct ( data, errors ) -> do ! GQLWebSocketResponse.Error ( data, errors ) |> sendOutput id
197
+ | RequestError errors ->
198
+ logger.LogWarning( " Request errors:\n {errors}" , errors )
199
+ do ! GQLWebSocketResponse.Error errors |> sendOutput id
215
200
}
216
201
217
202
let logMsgReceivedWithOptionalPayload optionalPayload ( msgAsStr : string ) =
@@ -278,7 +263,7 @@ type GraphQLWebSocketMiddleware<'Root>
278
263
do ! planExecutionResult |> applyPlanExecutionResult id socket
279
264
with ex ->
280
265
logger.LogError ( ex, " Unexpected error during subscription with id '{id}'" , id)
281
- do ! sendMsg ( Error ( id, [ new Shared.NameValueLookup ([ ( " subscription " , " Unexpected error during subscription" :> obj ) ]) ]))
266
+ do ! sendMsg ( Error ( id, [ GQLProblemDetails.Create ( " Unexpected error during subscription" , ex ) ]))
282
267
| ClientComplete id ->
283
268
" ClientComplete" |> logMsgWithIdReceived id
284
269
subscriptions
@@ -349,8 +334,7 @@ type GraphQLWebSocketMiddleware<'Root>
349
334
do ! next.Invoke ( ctx)
350
335
else if ctx.WebSockets.IsWebSocketRequest then
351
336
use! socket = ctx.WebSockets.AcceptWebSocketAsync ( " graphql-transport-ws" )
352
- let! connectionInitResult = socket |> waitForConnectionInitAndRespondToClient
353
- match connectionInitResult with
337
+ match ! socket |> waitForConnectionInitAndRespondToClient with
354
338
| Result.Error errMsg -> logger.LogWarning errMsg
355
339
| Ok _ ->
356
340
let longRunningCancellationToken =
0 commit comments