1
1
import { BotConnectorResponse } from '../model/responses' ;
2
- import {
3
- EventSourceMessage ,
4
- EventStreamContentType ,
5
- fetchEventSource ,
6
- } from '@microsoft/fetch-event-source' ;
7
-
8
- class RetriableError extends Error { }
9
- class FatalError extends Error { }
10
2
11
3
const INITIAL_RETRY_DELAY = 0 ;
12
4
const RETRY_DELAY_INCREMENT = 1000 ;
13
5
const MAX_RETRY_DELAY = 15000 ;
14
6
7
+ enum SseStatus {
8
+ /**
9
+ * The server is not answering, or answering with a 1XX, 3XX, 429, or 5XX HTTP status code
10
+ */
11
+ SERVER_UNAVAILABLE = - 1 ,
12
+ /**
13
+ * The server is answering with a 4XX HTTP status code, except 429 (rate limit)
14
+ */
15
+ UNSUPPORTED = 0 ,
16
+ /**
17
+ * The server is answering with a 2XX HTTP status code
18
+ */
19
+ SUPPORTED = 1 ,
20
+ }
21
+
22
+ async function getSseStatus ( url : string ) {
23
+ try {
24
+ const response = await fetch ( url ) ;
25
+ if ( response . ok ) {
26
+ return SseStatus . SUPPORTED ;
27
+ } else if (
28
+ response . status >= 400 &&
29
+ response . status < 500 &&
30
+ response . status !== 429
31
+ ) {
32
+ return SseStatus . UNSUPPORTED ;
33
+ } else {
34
+ return SseStatus . SERVER_UNAVAILABLE ;
35
+ }
36
+ } catch ( _ ) {
37
+ return SseStatus . SERVER_UNAVAILABLE ;
38
+ }
39
+ }
40
+
15
41
export class TockEventSource {
16
42
private initialized : boolean ;
17
- private abortController : AbortController ;
43
+ private eventSource : EventSource | null ;
18
44
private retryDelay : number ;
19
45
onResponse : ( botResponse : BotConnectorResponse ) => void ;
20
46
onStateChange : ( state : number ) => void ;
@@ -38,60 +64,55 @@ export class TockEventSource {
38
64
*/
39
65
open ( endpoint : string , userId : string ) : Promise < void > {
40
66
this . onStateChange ( EventSource . CONNECTING ) ;
41
- this . abortController = new AbortController ( ) ;
67
+ const url = ` ${ endpoint } /sse?userid= ${ userId } ` ;
42
68
return new Promise < void > ( ( resolve , reject ) : void => {
43
- fetchEventSource ( `${ endpoint } /sse?userid=${ userId } ` , {
44
- signal : this . abortController . signal ,
45
- onopen : async ( response ) => {
46
- if (
47
- response . ok &&
48
- response . headers
49
- . get ( 'content-type' )
50
- ?. includes ( EventStreamContentType )
51
- ) {
52
- this . onStateChange ( EventSource . OPEN ) ;
53
- this . initialized = true ;
54
- resolve ( ) ;
55
- return ;
56
- } else if (
57
- response . status >= 400 &&
58
- response . status < 500 &&
59
- response . status !== 429
60
- ) {
61
- throw new FatalError ( ) ;
62
- } else {
63
- throw new RetriableError ( ) ;
64
- }
65
- } ,
66
- onmessage : ( e : EventSourceMessage ) => {
67
- if ( e . event === 'message' ) {
68
- this . onResponse ( JSON . parse ( e . data ) ) ;
69
- }
70
- } ,
71
- onerror : ( err ) => {
72
- if ( err instanceof FatalError ) {
73
- throw err ; // rethrow to stop the operation
74
- } else {
75
- const retryDelay = this . retryDelay ;
76
- this . retryDelay = Math . min (
77
- MAX_RETRY_DELAY ,
78
- retryDelay + RETRY_DELAY_INCREMENT ,
79
- ) ;
80
- return retryDelay ;
81
- }
82
- } ,
83
- } )
84
- . catch ( ( e ) => console . error ( e ) )
85
- . finally ( ( ) => {
86
- reject ( ) ;
87
- this . onStateChange ( EventSource . CLOSED ) ;
88
- this . initialized = false ;
89
- } ) ;
69
+ this . tryOpen ( url , resolve , reject ) ;
70
+ } ) ;
71
+ }
72
+
73
+ private tryOpen ( url : string , resolve : ( ) => void , reject : ( ) => void ) {
74
+ this . eventSource = new EventSource ( url ) ;
75
+ this . eventSource . addEventListener ( 'open' , ( ) => {
76
+ this . onStateChange ( EventSource . OPEN ) ;
77
+ this . initialized = true ;
78
+ this . retryDelay = INITIAL_RETRY_DELAY ;
79
+ resolve ( ) ;
90
80
} ) ;
81
+ this . eventSource . addEventListener ( 'error' , ( ) => {
82
+ this . eventSource ?. close ( ) ;
83
+ this . retry ( url , reject , resolve ) ;
84
+ } ) ;
85
+ this . eventSource . addEventListener ( 'message' , ( e ) => {
86
+ this . onResponse ( JSON . parse ( e . data ) ) ;
87
+ } ) ;
88
+ }
89
+
90
+ private retry ( url : string , reject : ( ) => void , resolve : ( ) => void ) {
91
+ const retryDelay = this . retryDelay ;
92
+ this . retryDelay = Math . min (
93
+ MAX_RETRY_DELAY ,
94
+ retryDelay + RETRY_DELAY_INCREMENT ,
95
+ ) ;
96
+ setTimeout ( async ( ) => {
97
+ switch ( await getSseStatus ( url ) ) {
98
+ case SseStatus . UNSUPPORTED :
99
+ reject ( ) ;
100
+ this . close ( ) ;
101
+ break ;
102
+ case SseStatus . SUPPORTED :
103
+ this . tryOpen ( url , resolve , reject ) ;
104
+ break ;
105
+ case SseStatus . SERVER_UNAVAILABLE :
106
+ this . retry ( url , reject , resolve ) ;
107
+ break ;
108
+ }
109
+ } , retryDelay ) ;
91
110
}
92
111
93
112
close ( ) {
94
- this . abortController ?. abort ( ) ;
113
+ this . eventSource ?. close ( ) ;
114
+ this . eventSource = null ;
95
115
this . initialized = false ;
116
+ this . onStateChange ( EventSource . CLOSED ) ;
96
117
}
97
118
}
0 commit comments