Skip to content

Commit 09adecb

Browse files
authored
Fix race condition so all messages are processed before dispatching next request (#1234)
1 parent 00b709e commit 09adecb

File tree

1 file changed

+43
-18
lines changed

1 file changed

+43
-18
lines changed

mesop/web/src/services/channel.ts

+43-18
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,12 @@ export class Channel {
7272
private overridedTitle = '';
7373

7474
private messageQueue: QueuedMessage[] = [];
75-
private isProcessingMessage = false;
75+
76+
private processingMessageDeferred: {
77+
promise: Promise<void>;
78+
resolve: () => void;
79+
reject: (reason?: any) => void;
80+
} | null = null;
7681

7782
constructor(
7883
private title: Title,
@@ -137,14 +142,15 @@ export class Channel {
137142

138143
this.eventSource.addEventListener('message', (e) => {
139144
// Looks like Angular has a bug where it's not intercepting EventSource onmessage.
140-
zone.run(() => {
145+
zone.run(async () => {
141146
const data = (e as any).data;
142147
if (data === STREAM_END) {
143148
this.eventSource.close();
144149
this.status = ChannelStatus.CLOSED;
145150
clearTimeout(this.isWaitingTimeout);
146151
this.isWaiting = false;
147152
this._isHotReloading = false;
153+
await this.processMessageQueue();
148154
this.dequeueEvent();
149155
return;
150156
}
@@ -185,7 +191,7 @@ export class Channel {
185191
};
186192

187193
this.webSocket.onmessage = (event) => {
188-
zone.run(() => {
194+
zone.run(async () => {
189195
const prefix = 'data: ';
190196
const payloadData = (
191197
event.data.slice(prefix.length) as string
@@ -194,6 +200,7 @@ export class Channel {
194200
if (payloadData === STREAM_END) {
195201
this._isHotReloading = false;
196202
this.status = ChannelStatus.CLOSED;
203+
await this.processMessageQueue();
197204
this.dequeueEvent();
198205
return;
199206
}
@@ -357,28 +364,32 @@ export class Channel {
357364
request,
358365
response,
359366
});
360-
this.processNextMessage();
367+
this.processMessageQueue();
361368
}
362369

363-
private async processNextMessage() {
364-
if (this.isProcessingMessage || this.messageQueue.length === 0) {
365-
return;
370+
private async processMessageQueue() {
371+
if (this.processingMessageDeferred) {
372+
return this.processingMessageDeferred.promise;
366373
}
367374

368-
this.isProcessingMessage = true;
369-
try {
375+
this.processingMessageDeferred = createDeferred();
376+
377+
while (this.messageQueue.length > 0) {
370378
const queuedMessage = this.messageQueue.shift()!;
371-
await this.handleUiResponse(
372-
queuedMessage.request,
373-
queuedMessage.response,
374-
this.initParams,
375-
);
376-
} finally {
377-
this.isProcessingMessage = false;
378-
if (this.messageQueue.length > 0) {
379-
await this.processNextMessage();
379+
try {
380+
await this.handleUiResponse(
381+
queuedMessage.request,
382+
queuedMessage.response,
383+
this.initParams,
384+
);
385+
} catch (error) {
386+
console.error('Error handling UI response:', error);
380387
}
381388
}
389+
390+
// All queued messages processed; resolve the promise and clear it.
391+
this.processingMessageDeferred.resolve();
392+
this.processingMessageDeferred = null;
382393
}
383394

384395
dispatch(userEvent: UserEvent) {
@@ -506,6 +517,20 @@ export class Channel {
506517
}
507518
}
508519

520+
function createDeferred<T = void>(): {
521+
promise: Promise<T>;
522+
resolve: (value: T | PromiseLike<T>) => void;
523+
reject: (reason?: any) => void;
524+
} {
525+
let resolve: (value: T | PromiseLike<T>) => void = () => {};
526+
let reject: (reason?: any) => void = () => {};
527+
const promise = new Promise<T>((res, rej) => {
528+
resolve = res;
529+
reject = rej;
530+
});
531+
return {promise, resolve, reject};
532+
}
533+
509534
function generatePayloadString(request: UiRequest): string {
510535
request.setPath(window.location.pathname);
511536
const array = request.serializeBinary();

0 commit comments

Comments
 (0)