8
8
// Repository: https://github.com/sisk-http/core
9
9
10
10
using System . Net . WebSockets ;
11
- using System . Runtime . CompilerServices ;
11
+ using System . Text ;
12
+ using Sisk . Core . Internal ;
12
13
13
14
namespace Sisk . Core . Http . Streams {
14
15
@@ -17,8 +18,10 @@ namespace Sisk.Core.Http.Streams {
17
18
/// </summary>
18
19
public sealed class HttpWebSocket : IDisposable {
19
20
bool isListening = true ;
21
+ bool isDisposed ;
20
22
readonly HttpStreamPingPolicy pingPolicy ;
21
23
24
+ internal SemaphoreSlim sendSemaphore = new SemaphoreSlim ( 1 ) ;
22
25
internal WebSocketMessage ? lastMessage ;
23
26
internal CancellationTokenSource asyncListenerToken = null ! ;
24
27
internal ManualResetEvent closeEvent = new ManualResetEvent ( false ) ;
@@ -178,40 +181,22 @@ public HttpWebSocket WithPing ( string probeMessage, TimeSpan interval ) {
178
181
return this ;
179
182
}
180
183
181
- /// <summary>
182
- /// Asynchronously sends an message to the remote point.
183
- /// </summary>
184
- /// <param name="message">The target message which will be as an encoded UTF-8 string.</param>
185
- public Task < bool > SendAsync ( object message ) {
186
- return Task . FromResult ( Send ( message ) ) ;
187
- }
188
-
189
184
/// <summary>
190
185
/// Asynchronously sends an text message to the remote point.
191
186
/// </summary>
192
187
/// <param name="message">The target message which will be as an encoded UTF-8 string.</param>
193
- public Task < bool > SendAsync ( string message ) {
194
- return Task . FromResult ( Send ( message ) ) ;
188
+ public ValueTask < bool > SendAsync ( string message ) {
189
+ ArgumentNullException . ThrowIfNull ( message ) ;
190
+
191
+ return SendInternalAsync ( Encoding . UTF8 . GetBytes ( message ) , WebSocketMessageType . Text ) ;
195
192
}
196
193
197
194
/// <summary>
198
195
/// Asynchronously sends an binary message to the remote point.
199
196
/// </summary>
200
197
/// <param name="buffer">The target message which will be as an encoded UTF-8 string.</param>
201
- public Task < bool > SendAsync ( byte [ ] buffer ) {
202
- return Task . FromResult ( Send ( buffer ) ) ;
203
- }
204
-
205
- /// <summary>
206
- /// Sends an text message to the remote point.
207
- /// </summary>
208
- /// <param name="message">The target message which will be as an encoded UTF-8 string.</param>
209
- public bool Send ( object message ) {
210
- string ? t = message . ToString ( ) ;
211
- if ( t is null )
212
- throw new ArgumentNullException ( nameof ( message ) ) ;
213
-
214
- return Send ( t ) ;
198
+ public ValueTask < bool > SendAsync ( ReadOnlyMemory < byte > buffer ) {
199
+ return SendInternalAsync ( buffer , WebSocketMessageType . Binary ) ;
215
200
}
216
201
217
202
/// <summary>
@@ -220,9 +205,7 @@ public bool Send ( object message ) {
220
205
/// <param name="message">The target message which will be as an encoded using the request preferred encoding.</param>
221
206
public bool Send ( string message ) {
222
207
ArgumentNullException . ThrowIfNull ( message ) ;
223
-
224
- byte [ ] messageBytes = request . RequestEncoding . GetBytes ( message ) ;
225
- return SendInternal ( messageBytes , WebSocketMessageType . Text ) ;
208
+ return SendAsync ( message ) . GetSyncronizedResult ( ) ;
226
209
}
227
210
228
211
/// <summary>
@@ -239,15 +222,15 @@ public bool Send ( string message ) {
239
222
/// <param name="length">The number of items in the memory.</param>
240
223
public bool Send ( byte [ ] buffer , int start , int length ) {
241
224
ReadOnlyMemory < byte > span = new ReadOnlyMemory < byte > ( buffer , start , length ) ;
242
- return SendInternal ( span , WebSocketMessageType . Binary ) ;
225
+ return SendAsync ( span ) . GetSyncronizedResult ( ) ;
243
226
}
244
227
245
228
/// <summary>
246
229
/// Sends an binary message to the remote point.
247
230
/// </summary>
248
231
/// <param name="buffer">The target byte memory.</param>
249
232
public bool Send ( ReadOnlyMemory < byte > buffer ) {
250
- return SendInternal ( buffer , WebSocketMessageType . Binary ) ;
233
+ return SendAsync ( buffer ) . GetSyncronizedResult ( ) ;
251
234
}
252
235
253
236
/// <summary>
@@ -263,7 +246,7 @@ public HttpResponse Close () {
263
246
// the resources of this websocket
264
247
try {
265
248
ctx . WebSocket . CloseOutputAsync ( WebSocketCloseStatus . NormalClosure , null , CancellationToken . None )
266
- . Wait ( ) ;
249
+ . GetAwaiter ( ) . GetResult ( ) ;
267
250
}
268
251
catch ( Exception ) {
269
252
;
@@ -282,41 +265,43 @@ public HttpResponse Close () {
282
265
} ;
283
266
}
284
267
285
- [ MethodImpl ( MethodImplOptions . Synchronized ) ]
286
- private bool SendInternal ( ReadOnlyMemory < byte > buffer , WebSocketMessageType msgType ) {
287
- if ( _isClosed ) { return false ; }
268
+ private async ValueTask < bool > SendInternalAsync ( ReadOnlyMemory < byte > buffer , WebSocketMessageType msgType ) {
269
+ if ( _isClosed )
270
+ return false ;
288
271
289
272
if ( closeTimeout . TotalMilliseconds > 0 )
290
273
asyncListenerToken ? . CancelAfter ( closeTimeout ) ;
291
274
275
+ await sendSemaphore . WaitAsync ( asyncListenerToken ? . Token ?? default ) ;
292
276
try {
293
- int totalLength = buffer . Length ;
294
- int chunks = ( int ) Math . Ceiling ( ( double ) totalLength / BUFFER_LENGTH ) ;
277
+ try {
278
+ int totalLength = buffer . Length ;
279
+ int chunks = ( int ) Math . Ceiling ( ( double ) totalLength / BUFFER_LENGTH ) ;
295
280
296
- for ( int i = 0 ; i < chunks ; i ++ ) {
297
- int ca = i * BUFFER_LENGTH ;
298
- int cb = Math . Min ( ca + BUFFER_LENGTH , buffer . Length ) ;
281
+ for ( int i = 0 ; i < chunks ; i ++ ) {
282
+ int ca = i * BUFFER_LENGTH ;
283
+ int cb = Math . Min ( ca + BUFFER_LENGTH , buffer . Length ) ;
299
284
300
- ReadOnlyMemory < byte > chunk = buffer [ ca ..cb ] ;
285
+ ReadOnlyMemory < byte > chunk = buffer [ ca ..cb ] ;
301
286
302
- var sendVt = ctx . WebSocket . SendAsync ( chunk , msgType , i + 1 == chunks , asyncListenerToken ? . Token ?? default ) ;
303
- if ( ! sendVt . IsCompleted ) {
304
- sendVt . AsTask ( ) . GetAwaiter ( ) . GetResult ( ) ;
287
+ await ctx . WebSocket . SendAsync ( chunk , msgType , i + 1 == chunks , asyncListenerToken ? . Token ?? default ) ;
288
+ length += chunk . Length ;
305
289
}
306
290
307
- length += chunk . Length ;
291
+ attempt = 0 ;
308
292
}
309
-
310
- attempt = 0 ;
311
- }
312
- catch ( Exception ) {
313
- attempt ++ ;
314
- if ( MaxAttempts >= 0 && attempt >= MaxAttempts ) {
315
- Close ( ) ;
316
- return false ;
293
+ catch ( Exception ) {
294
+ attempt ++ ;
295
+ if ( MaxAttempts >= 0 && attempt >= MaxAttempts ) {
296
+ Close ( ) ;
297
+ return false ;
298
+ }
317
299
}
300
+ return true ;
301
+ }
302
+ finally {
303
+ sendSemaphore . Release ( ) ;
318
304
}
319
- return true ;
320
305
}
321
306
322
307
/// <summary>
@@ -365,11 +350,22 @@ public void WaitForClose () {
365
350
366
351
/// <inheritdoc/>
367
352
public void Dispose ( ) {
353
+ if ( isDisposed )
354
+ return ;
355
+
356
+ GC . SuppressFinalize ( this ) ;
357
+
368
358
Close ( ) ;
369
359
pingPolicy . Dispose ( ) ;
370
360
closeEvent . Dispose ( ) ;
371
361
waitNextEvent . Dispose ( ) ;
372
362
receiveThread . Join ( ) ;
363
+ isDisposed = true ;
364
+ }
365
+
366
+ /// <exclude/>
367
+ ~ HttpWebSocket ( ) {
368
+ Dispose ( ) ;
373
369
}
374
370
}
375
371
0 commit comments