@@ -10,8 +10,7 @@ import {
10
10
connect as tls ,
11
11
createSecureContext ,
12
12
} from 'node:tls' ;
13
-
14
- import { Event as TypedEvent } from 'ts-typed-events' ;
13
+ import { EventEmitter } from 'node:events' ;
15
14
16
15
import { Defaults , Environment } from './defaults.js' ;
17
16
import * as logger from './logging.js' ;
@@ -94,7 +93,7 @@ export interface Configuration
94
93
export interface Notification {
95
94
processId : number ;
96
95
channel : string ;
97
- payload ? : string ;
96
+ payload : string ;
98
97
}
99
98
100
99
export interface PreparedStatement < T = ResultRecord > {
@@ -107,17 +106,6 @@ export interface PreparedStatement<T = ResultRecord> {
107
106
) => ResultIterator < T > ;
108
107
}
109
108
110
- export type Callback < T > = ( data : T ) => void ;
111
-
112
- /* eslint-disable @typescript-eslint/no-explicit-any */
113
- type CallbackOf < U > = U extends any ? Callback < U > : never ;
114
-
115
- type Event = ClientNotice | DatabaseError | Notification ;
116
-
117
- type Connect = Error | null ;
118
-
119
- type End = NodeJS . ErrnoException | null ;
120
-
121
109
type CloseHandler = ( ) => void ;
122
110
123
111
interface RowDataHandler {
@@ -161,20 +149,31 @@ interface PreFlightQueue {
161
149
162
150
const DEFAULTS = new Defaults ( env as unknown as Environment ) ;
163
151
152
+ export type EventMap <
153
+ T = {
154
+ error : DatabaseError ;
155
+ notice : ClientNotice ;
156
+ notification : Notification ;
157
+ } ,
158
+ > = {
159
+ [ K in keyof T ] : [ T [ K ] ] ;
160
+ } ;
161
+
162
+ type Resolve < T > = ( value ?: T ) => void ;
163
+
164
+ export type EventListener < K > = K extends keyof EventMap ? (
165
+ ( ...args : EventMap [ K ] ) => void
166
+ ) : never ;
167
+
164
168
export class ClientImpl {
165
- private readonly events = {
166
- connect : new TypedEvent < Connect > ( ) ,
167
- end : new TypedEvent < End > ( ) ,
168
- error : new TypedEvent < DatabaseError > ( ) ,
169
- notice : new TypedEvent < ClientNotice > ( ) ,
170
- notification : new TypedEvent < Notification > ( ) ,
171
- } ;
172
-
173
- private ending = false ;
169
+ private readonly events = new EventEmitter < EventMap > ( ) ;
170
+
174
171
private connected = false ;
175
- private connecting = false ;
176
172
private error = false ;
177
173
174
+ private ending ?: Resolve < NodeJS . ErrnoException > ;
175
+ private connecting ?: Resolve < Error > ;
176
+
178
177
private readonly encoding : BufferEncoding ;
179
178
private readonly writer : Writer ;
180
179
@@ -217,7 +216,7 @@ export class ClientImpl {
217
216
218
217
this . stream . on ( 'close' , ( ) => {
219
218
this . closed = true ;
220
- this . events . end . emit ( null ) ;
219
+ this . ending ?. ( ) ;
221
220
} ) ;
222
221
223
222
this . stream . on ( 'connect' , ( ) => {
@@ -237,14 +236,14 @@ export class ClientImpl {
237
236
/* istanbul ignore next */
238
237
this . stream . on ( 'error' , ( error : NodeJS . ErrnoException ) => {
239
238
if ( this . connecting ) {
240
- this . events . connect . emit ( error ) ;
239
+ this . connecting ( error ) ;
241
240
} else {
242
241
// Don't raise ECONNRESET errors - they can & should be
243
242
// ignored during disconnect.
244
- if ( this . ending && error . errno === constants . errno . ECONNRESET )
245
- return ;
246
-
247
- this . events . end . emit ( error ) ;
243
+ if ( this . ending ) {
244
+ if ( error . errno === constants . errno . ECONNRESET ) return ;
245
+ this . ending ( ) ;
246
+ }
248
247
}
249
248
} ) ;
250
249
@@ -294,7 +293,7 @@ export class ClientImpl {
294
293
295
294
const abort = ( error : Error ) => {
296
295
this . handleError ( error ) ;
297
- this . events . connect . emit ( error ) ;
296
+ this . connecting ?. ( error ) ;
298
297
} ;
299
298
300
299
const startup = ( stream ?: Socket ) => {
@@ -384,12 +383,14 @@ export class ClientImpl {
384
383
const read = this . handle ( buffer , offset , size ) ;
385
384
offset += read ;
386
385
remaining = size - read ;
387
- } catch ( error ) {
386
+ } catch ( error : unknown ) {
388
387
logger . warn ( error ) ;
389
388
if ( this . connecting ) {
390
- this . events . connect . emit ( error as Error ) ;
389
+ this . connecting ( error as Error ) ;
391
390
} else {
392
391
try {
392
+ // In normal operation (including regular handling of errors),
393
+ // there's nothing further to clean up at this point.
393
394
while ( this . handleError ( error as Error ) ) {
394
395
logger . info (
395
396
'Cancelled query due to an internal error' ,
@@ -420,20 +421,25 @@ export class ClientImpl {
420
421
if ( this . error ) {
421
422
throw new Error ( "Can't connect in error state" ) ;
422
423
}
423
- this . connecting = true ;
424
424
425
425
const timeout =
426
426
this . config . connectionTimeout ?? DEFAULTS . connectionTimeout ;
427
427
428
- let p = this . events . connect . once ( ) . then ( ( error : Connect ) => {
429
- if ( error ) {
430
- this . connecting = false ;
431
- this . stream . destroy ( ) ;
432
- throw error ;
433
- }
434
- return {
435
- encrypted : this . stream instanceof TLSSocket ,
436
- parameters : this . parameters as ReadonlyMap < string , string > ,
428
+ let p = new Promise < ConnectionInfo > ( ( resolve , reject ) => {
429
+ this . connecting = ( error ?: Error ) => {
430
+ this . connecting = undefined ;
431
+ if ( error ) {
432
+ this . stream . destroy ( ) ;
433
+ reject ( error ) ;
434
+ } else {
435
+ resolve ( {
436
+ encrypted : this . stream instanceof TLSSocket ,
437
+ parameters : this . parameters as ReadonlyMap <
438
+ string ,
439
+ string
440
+ > ,
441
+ } ) ;
442
+ }
437
443
} ;
438
444
} ) ;
439
445
@@ -476,8 +482,6 @@ export class ClientImpl {
476
482
throw new Error ( 'Connection unexpectedly destroyed' ) ;
477
483
}
478
484
479
- this . ending = true ;
480
-
481
485
if ( this . connected ) {
482
486
this . writer . end ( ) ;
483
487
this . send ( ) ;
@@ -486,32 +490,21 @@ export class ClientImpl {
486
490
} else {
487
491
this . stream . destroy ( ) ;
488
492
}
489
- return new Promise < void > ( ( resolve , reject ) =>
490
- this . events . end . once ( ) . then ( ( value ) => {
491
- if ( value === null ) resolve ( ) ;
492
- reject ( value ) ;
493
- } ) ,
494
- ) ;
493
+ return new Promise < void > ( ( resolve , reject ) => {
494
+ this . ending = ( error ?: NodeJS . ErrnoException ) => {
495
+ this . ending = undefined ;
496
+ if ( ! error ) resolve ( ) ;
497
+ reject ( error ) ;
498
+ } ;
499
+ } ) ;
495
500
}
496
501
497
- on ( event : 'notification' , callback : Callback < Notification > ) : void ;
498
- on ( event : 'error' , callback : Callback < DatabaseError > ) : void ;
499
- on ( event : 'notice' , callback : Callback < ClientNotice > ) : void ;
500
- on ( event : string , callback : CallbackOf < Event > ) : void {
501
- switch ( event ) {
502
- case 'error' : {
503
- this . events . error . on ( callback as Callback < DatabaseError > ) ;
504
- break ;
505
- }
506
- case 'notice' : {
507
- this . events . notice . on ( callback as Callback < ClientNotice > ) ;
508
- break ;
509
- }
510
- case 'notification' : {
511
- this . events . notification . on ( callback as Callback < Notification > ) ;
512
- break ;
513
- }
514
- }
502
+ on < K extends keyof EventMap > ( event : K , listener : EventListener < K > ) : void {
503
+ this . events . on ( event , listener ) ;
504
+ }
505
+
506
+ off < K extends keyof EventMap > ( event : K , listener : EventListener < K > ) : void {
507
+ this . events . off ( event , listener ) ;
515
508
}
516
509
517
510
/** Prepare a statement for later execution.
@@ -954,7 +947,7 @@ export class ClientImpl {
954
947
outer: switch ( code ) {
955
948
case 0 : {
956
949
nextTick ( ( ) => {
957
- this . events . connect . emit ( null ) ;
950
+ this . connecting ?. ( ) ;
958
951
} ) ;
959
952
break ;
960
953
}
@@ -1094,10 +1087,16 @@ export class ClientImpl {
1094
1087
1095
1088
if ( this . connecting ) throw error ;
1096
1089
1097
- this . events . error . emit ( error ) ;
1098
- loop: if ( ! this . handleError ( error ) ) {
1090
+ try {
1091
+ this . events . emit ( 'error' , error ) ;
1092
+ } catch {
1093
+ // If there are no subscribers for the event, an error
1094
+ // is raised. We're not interesting in this behavior.
1095
+ }
1096
+
1097
+ if ( ! this . handleError ( error ) ) {
1099
1098
throw new Error (
1100
- 'Internal error occurred while processing database error ' ,
1099
+ 'An error occurred without an active query ' ,
1101
1100
) ;
1102
1101
}
1103
1102
break ;
@@ -1106,15 +1105,15 @@ export class ClientImpl {
1106
1105
const notice = this . parseError (
1107
1106
buffer . subarray ( start , start + length ) ,
1108
1107
) ;
1109
- this . events . notice . emit ( notice ) ;
1108
+ this . events . emit ( 'notice' , notice ) ;
1110
1109
break ;
1111
1110
}
1112
1111
case Message . NotificationResponse : {
1113
1112
const reader = new Reader ( buffer , start ) ;
1114
1113
const processId = reader . readInt32BE ( ) ;
1115
1114
const channel = reader . readCString ( this . encoding ) ;
1116
1115
const payload = reader . readCString ( this . encoding ) ;
1117
- this . events . notification . emit ( {
1116
+ this . events . emit ( 'notification' , {
1118
1117
processId : processId ,
1119
1118
channel : channel ,
1120
1119
payload : payload ,
@@ -1150,7 +1149,6 @@ export class ClientImpl {
1150
1149
this . errorHandlerQueue . shift ( ) ;
1151
1150
this . cleanupQueue . expect ( Cleanup . ErrorHandler ) ;
1152
1151
} else {
1153
- this . connecting = false ;
1154
1152
this . connected = true ;
1155
1153
}
1156
1154
const status = buffer . readInt8 ( start ) ;
0 commit comments