Skip to content

Commit

Permalink
feat: recoverState method to allow SDK components to initiaite state …
Browse files Browse the repository at this point in the history
…recovery
  • Loading branch information
szuperaz committed Feb 27, 2025
1 parent fbc73bd commit 5e8c7dc
Show file tree
Hide file tree
Showing 8 changed files with 154 additions and 53 deletions.
1 change: 1 addition & 0 deletions projects/stream-chat-angular/src/assets/i18n/en.ts
Original file line number Diff line number Diff line change
Expand Up @@ -131,5 +131,6 @@ export const en = {
'You currently have {{count}} attachments, the maximum is {{max}}':
'You currently have {{count}} attachments, the maximum is {{max}}',
'and others': 'and others',
'Reload channels': 'Reload channels',
},
};
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,13 @@
</div>
} @else {
@if (isError$ | async) {
<div data-testid="chatdown-container" class="str-chat__down">
<ng-container *ngTemplateOutlet="loadingChannels" />
<div
data-testid="chatdown-container"
class="str-chat__dow str-chat__channel-list-empty"
>
<button (click)="recoverState()" class="str-chat__cta-button">
{{ "streamChat.Reload channels" | translate }}
</button>
</div>
}
@if (isInitializing$ | async) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,10 +84,7 @@ describe('ChannelListComponent', () => {
it('should display error indicator, if error happened', () => {
expect(queryChatdownContainer()).toBeNull();

channelServiceMock.channelQueryState$.next({
state: 'error',
error: new Error('error'),
});
channelServiceMock.shouldRecoverState$.next(true);
fixture.detectChanges();

expect(queryChatdownContainer()).not.toBeNull();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,7 @@ export class ChannelListComponent implements OnDestroy {
this.theme$ = this.themeService.theme$;
this.channels$ = this.channelService.channels$;
this.hasMoreChannels$ = this.channelService.hasMoreChannels$;
this.isError$ = this.channelService.channelQueryState$.pipe(
map((s) => !this.isLoadingMoreChannels && s?.state === 'error'),
);
this.isError$ = this.channelService.shouldRecoverState$;
this.isInitializing$ = this.channelService.channelQueryState$.pipe(
map((s) => !this.isLoadingMoreChannels && s?.state === 'in-progress'),
);
Expand All @@ -56,6 +54,10 @@ export class ChannelListComponent implements OnDestroy {
this.isLoadingMoreChannels = false;
}

recoverState() {
void this.channelService.recoverState();
}

trackByChannelId(_: number, item: Channel<DefaultStreamChatGenerics>) {
return item.cid;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Component, Input, NgZone, OnDestroy, OnInit } from '@angular/core';
import { Component, Input, OnDestroy, OnInit } from '@angular/core';
import { Subscription } from 'rxjs';
import { filter } from 'rxjs/operators';
import { Channel, Event, FormatMessageResponse } from 'stream-chat';
Expand Down Expand Up @@ -41,7 +41,6 @@ export class ChannelPreviewComponent implements OnInit, OnDestroy {

constructor(
private channelService: ChannelService,
private ngZone: NgZone,
private chatClientService: ChatClientService,
messageService: MessageService,
public customTemplatesService: CustomTemplatesService,
Expand Down
53 changes: 53 additions & 0 deletions projects/stream-chat-angular/src/lib/channel.service.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1841,6 +1841,9 @@ describe('ChannelService', () => {
});

it('should reset state after connection recovered', async () => {
const spy = jasmine.createSpy();
service.shouldRecoverState$.subscribe(spy);
spy.calls.reset();
await init();
mockChatClient.queryChannels.calls.reset();
events$.next({ eventType: 'connection.recovered' } as ClientEvent);
Expand All @@ -1851,6 +1854,8 @@ describe('ChannelService', () => {
jasmine.any(Object),
jasmine.any(Object),
);

expect(spy).not.toHaveBeenCalled();
});

it(`shouldn't do duplicate state reset after connection recovered`, async () => {
Expand Down Expand Up @@ -2414,4 +2419,52 @@ describe('ChannelService', () => {

expect(activeChannel.markRead).toHaveBeenCalledTimes(2);
});

it('should signal if state recovery is needed - initial load', async () => {
const spy = jasmine.createSpy();
service.shouldRecoverState$.subscribe(spy);

expect(spy).toHaveBeenCalledWith(false);
spy.calls.reset();
const error = 'there was an error';

await expectAsync(
init(undefined, undefined, undefined, () =>
mockChatClient.queryChannels.and.rejectWith(error),
),
).toBeRejectedWith(error);

expect(spy).toHaveBeenCalledWith(true);

spy.calls.reset();
mockChatClient.queryChannels.and.resolveTo([]);
await service.recoverState();

expect(spy).toHaveBeenCalledWith(false);
});

it('should signal if state recovery is needed - failed state recover after connection.recovered', fakeAsync(() => {
void init();
tick();
const spy = jasmine.createSpy();
service.shouldRecoverState$.subscribe(spy);
spy.calls.reset();
mockChatClient.queryChannels.and.rejectWith(
new Error('there was an error'),
);
events$.next({ eventType: 'connection.recovered' } as ClientEvent);

tick();
flush();

expect(spy).toHaveBeenCalledWith(true);

spy.calls.reset();
mockChatClient.queryChannels.and.resolveTo([]);
void service.recoverState();
tick();
flush();

expect(spy).toHaveBeenCalledWith(false);
}));
});
125 changes: 83 additions & 42 deletions projects/stream-chat-angular/src/lib/channel.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,14 @@ import {
ReplaySubject,
Subscription,
} from 'rxjs';
import { filter, first, map, shareReplay, take } from 'rxjs/operators';
import {
distinctUntilChanged,
filter,
first,
map,
shareReplay,
take,
} from 'rxjs/operators';
import {
Attachment,
Channel,
Expand Down Expand Up @@ -65,6 +72,12 @@ export class ChannelService<
* The result of the latest channel query request.
*/
channelQueryState$: Observable<ChannelQueryState | undefined>;
/**
* Emits `true` when the state needs to be recovered after an error
*
* You can recover it by calling the `recoverState` method
*/
shouldRecoverState$: Observable<boolean>;
/**
* Emits the currently active channel.
*
Expand Down Expand Up @@ -224,7 +237,7 @@ export class ChannelService<
private _shouldMarkActiveChannelAsRead = true;
private shouldSetActiveChannel = true;
private clientEventsSubscription: Subscription | undefined;
private isStateRecoveryInProgress = false;
private isStateRecoveryInProgress$ = new BehaviorSubject(false);
private channelQueryStateSubject = new BehaviorSubject<
ChannelQueryState | undefined
>(undefined);
Expand Down Expand Up @@ -323,6 +336,20 @@ export class ChannelService<
this.channelQueryState$ = this.channelQueryStateSubject
.asObservable()
.pipe(shareReplay(1));
this.shouldRecoverState$ = combineLatest([
this.channels$,
this.channelQueryState$,
this.isStateRecoveryInProgress$,
]).pipe(
map(([channels, queryState, isStateRecoveryInProgress]) => {
return (
(!channels || channels.length === 0) &&
queryState?.state === 'error' &&
!isStateRecoveryInProgress
);
}),
distinctUntilChanged(),
);
}

/**
Expand Down Expand Up @@ -583,6 +610,7 @@ export class ChannelService<
this.dismissErrorNotification = undefined;
this.channelQueryConfig = undefined;
this.destroyChannelManager();
this.isStateRecoveryInProgress$.next(false);
}

/**
Expand Down Expand Up @@ -1120,36 +1148,52 @@ export class ChannelService<
}
}

private async handleNotification(clientEvent: ClientEvent<T>) {
/**
* Reloads all channels and messages. Useful if state is empty due to an error.
*
* The SDK will automatically call this after `connection.recovered` event. In other cases it's up to integrators to recover state.
*
* Use the `shouldRecoverState$` to know if state recover is necessary.
* @returns when recovery is completed
*/
async recoverState() {
if (this.isStateRecoveryInProgress$.getValue()) {
return;
}
this.isStateRecoveryInProgress$.next(true);
try {
await this.queryChannels('recover-state');
if (this.activeChannelSubject.getValue()) {
// Thread messages are not refetched so active thread gets deselected to avoid displaying stale messages
void this.setAsActiveParentMessage(undefined);
// Update and reselect message to quote
const messageToQuote = this.messageToQuoteSubject.getValue();
this.setChannelState(this.activeChannelSubject.getValue()!);
let messages!: StreamMessage<T>[];
this.activeChannelMessages$
.pipe(take(1))
.subscribe((m) => (messages = m));
const updatedMessageToQuote = messages.find(
(m) => m.id === messageToQuote?.id,
);
if (updatedMessageToQuote) {
this.selectMessageToQuote(updatedMessageToQuote);
}
}
} finally {
this.isStateRecoveryInProgress$.next(false);
}
}

private handleNotification(clientEvent: ClientEvent<T>) {
switch (clientEvent.eventType) {
case 'connection.recovered': {
if (this.isStateRecoveryInProgress) {
return;
}
this.isStateRecoveryInProgress = true;
try {
await this.queryChannels('recover-state');
if (this.activeChannelSubject.getValue()) {
// Thread messages are not refetched so active thread gets deselected to avoid displaying stale messages
void this.setAsActiveParentMessage(undefined);
// Update and reselect message to quote
const messageToQuote = this.messageToQuoteSubject.getValue();
this.setChannelState(this.activeChannelSubject.getValue()!);
let messages!: StreamMessage<T>[];
this.activeChannelMessages$
.pipe(take(1))
.subscribe((m) => (messages = m));
const updatedMessageToQuote = messages.find(
(m) => m.id === messageToQuote?.id,
);
if (updatedMessageToQuote) {
this.selectMessageToQuote(updatedMessageToQuote);
}
}
this.isStateRecoveryInProgress = false;
} catch {
this.isStateRecoveryInProgress = false;
}
void this.recoverState().catch((error) =>
this.chatClientService.chatClient.logger(
'warn',
`Failed to recover state after connection recovery: ${error}`,
),
);
break;
}
case 'user.updated': {
Expand Down Expand Up @@ -1610,6 +1654,13 @@ export class ChannelService<
if (queryType === 'recover-state') {
this.channelManager.setChannels([]);
}
if (queryType !== 'next-page') {
this.dismissErrorNotification =
this.notificationService.addPermanentNotification(
'streamChat.Error loading channels',
'error',
);
}
throw error;
}
}
Expand Down Expand Up @@ -1818,7 +1869,7 @@ export class ChannelService<
this.markReadTimeout = undefined;
}

private async _init(
private _init(
options: ChannelServiceOptions<T> & { messagePageSize: number },
) {
this.messagePageSize = options.messagePageSize;
Expand All @@ -1838,17 +1889,7 @@ export class ChannelService<
this.clientEventsSubscription = this.chatClientService.events$.subscribe(
(notification) => void this.handleNotification(notification),
);
try {
const result = await this.queryChannels('first-page');
return result;
} catch (error) {
this.dismissErrorNotification =
this.notificationService.addPermanentNotification(
'streamChat.Error loading channels',
'error',
);
throw error;
}
return this.queryChannels('first-page');
}

private createChannelManager({
Expand Down
3 changes: 3 additions & 0 deletions projects/stream-chat-angular/src/lib/mocks/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,7 @@ export type MockChannelService = {
usersTypingInThread$: BehaviorSubject<UserResponse[]>;
jumpToMessage$: BehaviorSubject<{ id?: string; parentId?: string }>;
channelQueryState$: BehaviorSubject<ChannelQueryState | undefined>;
shouldRecoverState$: BehaviorSubject<boolean>;
activeChannelLastReadMessageId?: string;
activeChannelUnreadCount?: number;
activeChannel?: Channel<DefaultStreamChatGenerics>;
Expand Down Expand Up @@ -319,6 +320,7 @@ export const mockChannelService = (): MockChannelService => {
const channelQueryState$ = new BehaviorSubject<ChannelQueryState | undefined>(
undefined,
);
const shouldRecoverState$ = new BehaviorSubject(false);

return {
activeChannelMessages$,
Expand All @@ -343,6 +345,7 @@ export const mockChannelService = (): MockChannelService => {
clearMessageJump,
channelQueryState$,
activeChannel,
shouldRecoverState$,
};
};

Expand Down

0 comments on commit 5e8c7dc

Please sign in to comment.