Skip to content

Commit

Permalink
feat(supabase): support realtime subscription #454 (#472)
Browse files Browse the repository at this point in the history
  • Loading branch information
tshedor authored Oct 30, 2024
1 parent ba89e5a commit 2f7010a
Show file tree
Hide file tree
Showing 17 changed files with 1,303 additions and 157 deletions.
21 changes: 21 additions & 0 deletions docs/offline_first/offline_first_with_supabase_repository.md
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,27 @@ final (client, queue) = OfflineFirstWithSupabaseRepository.clientQueue(

!> This is an admittedly brittle solution for ignoring core Supabase paths. If you change the default values for `ignorePaths`, you are responsible for maintaining the right paths when Supabase changes or upgrades their endpoint paths.

## Realtime

Brick can automatically update with [Supabase realtime events](https://supabase.com/docs/guides/realtime). After setting up [your table](https://supabase.com/docs/guides/realtime?queryGroups=language&language=dart#realtime-api) to broadcast, listen for changes in your application:

```dart
// Listen to all changes
final customers = MyRepository().subscribeToRealtime<Customer>();
// Or listen to results of a specific filter
final customers = MyRepository().subscribeToRealtime<Customer>(query: Query.where('id', 1));
// Use the stream results
final customersSubscription = customers.listen((value) {});
// Always close your streams
await customersSubscription.cancel();
```

Complex queries more than one level deep (e.g. with associations) or with comparison operators that are not [supported by Supabase's `PostgresChangeFilterType`](https://github.com/supabase/supabase-flutter/blob/main/packages/realtime_client/lib/src/types.dart#L239-L260) will be ignored - when such invalid queries are used, the realtime connection will be unfiltered even though Brick will respect the query in the stream's results.

!> Realtime can become [expensive quickly](https://supabase.com/pricing). Be sure to design your application for appropriate scale. For cheaper, on-device reactivity, use `.subscribe()` instead.

### @ConnectOfflineFirstWithSupabase

`@ConnectOfflineFirstWithSupabase` decorates the model that can be serialized by one or more providers. Offline First does not have configuration at the class level and only extends configuration held by its providers:
Expand Down
4 changes: 4 additions & 0 deletions packages/brick_offline_first/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
## Unreleased

## 3.3.0

- Added `subscriptionByQuery` to `OfflineFirstRepository#notifySubscriptionsWithLocalData` to pass a custom map of `StreamControllers`
- Add `GetFirstMixin` for convenient retrieval of the first results of `OfflineFirstRepository#get`
- Close all controllers in `OfflineFirstRepository#subscriptions` and clear the map on `OfflineFirstRepository#reset`

## 3.2.1

Expand Down
14 changes: 12 additions & 2 deletions packages/brick_offline_first/lib/src/offline_first_repository.dart
Original file line number Diff line number Diff line change
Expand Up @@ -302,10 +302,12 @@ abstract class OfflineFirstRepository<RepositoryModel extends OfflineFirstModel>
/// Iterate through subscriptions after an upsert and notify any [subscribe] listeners.
@protected
@visibleForTesting
@visibleForOverriding
Future<void> notifySubscriptionsWithLocalData<TModel extends RepositoryModel>({
bool notifyWhenEmpty = true,
Map<Query?, StreamController<List<RepositoryModel>>>? subscriptionsByQuery,
}) async {
final queriesControllers = subscriptions[TModel]?.entries;
final queriesControllers = (subscriptionsByQuery ?? subscriptions[TModel])?.entries;
if (queriesControllers?.isEmpty ?? true) return;

// create a copy of the controllers to avoid concurrent modification while looping
Expand Down Expand Up @@ -336,9 +338,17 @@ abstract class OfflineFirstRepository<RepositoryModel extends OfflineFirstModel>
Future<void> reset() async {
await sqliteProvider.resetDb();
memoryCacheProvider.reset();
for (final subscription in subscriptions.values) {
for (final controller in subscription.values) {
await controller.close();
}
}
subscriptions.clear();
}

/// Listen for streaming changes when the [sqliteProvider] is `upsert`ed. This method utilizes [remoteProvider]'s [get].
/// Listen for streaming changes when the [sqliteProvider] is invoked. For example,
/// whenever new data is acquired from remote, or data is upserted locally, or
/// data is deleted locally, the stream will be notified with a local fetch of [query].
///
/// [get] is invoked on the [memoryCacheProvider] and [sqliteProvider] following an [upsert]
/// invocation. For more, see [notifySubscriptionsWithLocalData].
Expand Down
2 changes: 1 addition & 1 deletion packages/brick_offline_first/pubspec.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ homepage: https://github.com/GetDutchie/brick/tree/main/packages/brick_offline_f
issue_tracker: https://github.com/GetDutchie/brick/issues
repository: https://github.com/GetDutchie/brick

version: 3.2.1
version: 3.3.0

environment:
sdk: ">=2.18.0 <4.0.0"
Expand Down
4 changes: 4 additions & 0 deletions packages/brick_offline_first_with_supabase/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
## Unreleased

## 1.1.0

- Added `OfflineFirstWithSupabaseRepository#subscribeToRealtime`to sync Brick data with Supabase changes (#472, #454)

## 1.0.0

- Stable release
Expand Down
21 changes: 21 additions & 0 deletions packages/brick_offline_first_with_supabase/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,27 @@ final (client, queue) = OfflineFirstWithSupabaseRepository.clientQueue(

:warning: This is an admittedly brittle solution for ignoring core Supabase paths. If you change the default values for `ignorePaths`, you are responsible for maintaining the right paths when Supabase changes or upgrades their endpoint paths.

## Realtime

Brick can automatically update with [Supabase realtime events](https://supabase.com/docs/guides/realtime). After setting up [your table](https://supabase.com/docs/guides/realtime?queryGroups=language&language=dart#realtime-api) to broadcast, listen for changes in your application:

```dart
// Listen to all changes
final customers = MyRepository().subscribeToRealtime<Customer>();
// Or listen to results of a specific filter
final customers = MyRepository().subscribeToRealtime<Customer>(query: Query.where('id', 1));
// Use the stream results
final customersSubscription = customers.listen((value) {});
// Always close your streams
await customersSubscription.cancel();
```

Complex queries more than one level deep (e.g. with associations) or with comparison operators that are not [supported by Supabase's `PostgresChangeFilterType`](https://github.com/supabase/supabase-flutter/blob/main/packages/realtime_client/lib/src/types.dart#L239-L260) will be ignored - when such invalid queries are used, the realtime connection will be unfiltered even though Brick will respect the query in the stream's results.

:warning: Realtime can become [expensive quickly](https://supabase.com/pricing). Be sure to design your application for appropriate scale. For cheaper, on-device reactivity, use `.subscribe()` instead.

## Models

### @ConnectOfflineFirstWithSupabase
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import 'dart:async';

import 'package:brick_offline_first/brick_offline_first.dart';
import 'package:brick_offline_first_with_rest/offline_queue.dart';
import 'package:brick_offline_first_with_supabase/src/offline_first_with_supabase_model.dart';
Expand Down Expand Up @@ -51,6 +53,14 @@ abstract class OfflineFirstWithSupabaseRepository
@protected
final RestOfflineRequestQueue offlineRequestQueue;

@protected
@visibleForTesting
final Map<
Type,
Map<PostgresChangeEvent,
Map<Query, StreamController<List<OfflineFirstWithSupabaseModel>>>>>
supabaseRealtimeSubscriptions = {};

OfflineFirstWithSupabaseRepository({
super.autoHydrate,
super.loggerName,
Expand Down Expand Up @@ -137,6 +147,221 @@ abstract class OfflineFirstWithSupabaseRepository
await offlineRequestQueue.client.requestManager.migrate();
}

@override
Future<void> notifySubscriptionsWithLocalData<TModel extends OfflineFirstWithSupabaseModel>({
bool notifyWhenEmpty = true,
Map<Query?, StreamController<List<OfflineFirstWithSupabaseModel>>>? subscriptionsByQuery,
}) async {
final supabaseControllers = supabaseRealtimeSubscriptions[TModel]
?.values
.fold(<Query, StreamController<List<OfflineFirstWithSupabaseModel>>>{}, (acc, eventMap) {
acc.addEntries(eventMap.entries);
return acc;
});
await super.notifySubscriptionsWithLocalData<TModel>(
notifyWhenEmpty: notifyWhenEmpty,
subscriptionsByQuery: {
...?subscriptionsByQuery,
...?subscriptions[TModel],
...?supabaseControllers,
},
);
}

/// Supabase's realtime payload only returns unique columns;
/// the instance must be discovered from these values so it
/// can be deleted by all providers.
@protected
@visibleForOverriding
@visibleForTesting
Query queryFromSupabaseDeletePayload(
Map<String, dynamic> payload, {
required Map<String, RuntimeSupabaseColumnDefinition> supabaseDefinitions,
}) {
final columnsToFields = supabaseDefinitions.entries.fold(<String, String>{}, (acc, entry) {
acc[entry.value.columnName] = entry.key;
return acc;
});

final fieldsWithValues = payload.entries.fold(<String, dynamic>{}, (acc, entry) {
if (columnsToFields[entry.key] != null) {
acc[columnsToFields[entry.key]!] = entry.value;
}
return acc;
});

return Query(
where: fieldsWithValues.entries.map((entry) => Where.exact(entry.key, entry.value)).toList(),
providerArgs: {'limit': 1},
);
}

@protected
@visibleForTesting
@visibleForOverriding
PostgresChangeFilter? queryToPostgresChangeFilter<TModel extends OfflineFirstWithSupabaseModel>(
Query query,
) {
final adapter = remoteProvider.modelDictionary.adapterFor[TModel]!;
if (query.where?.isEmpty ?? true) return null;
final condition = query.where!.first;
final column = adapter.fieldsToSupabaseColumns[condition.evaluatedField]?.columnName;

if (column == null) return null;

final type = _compareToFilterParam(condition.compare);
if (type == null) return null;

return PostgresChangeFilter(
type: type,
column: column,
value: condition.value,
);
}

@override
Future<void> reset() async {
await super.reset();
for (final subscription in supabaseRealtimeSubscriptions.values) {
for (final eventType in subscription.values) {
for (final controller in eventType.values) {
await controller.close();
}
}
}
supabaseRealtimeSubscriptions.clear();
}

/// Subscribes to realtime updates using
/// [Supabase channels](https://supabase.com/docs/guides/realtime?queryGroups=language&language=dart).
/// **This will only work if your Supabase table has realtime enabled.**
/// Follow [Supabase's documentation](https://supabase.com/docs/guides/realtime?queryGroups=language&language=dart#realtime-api)
/// to setup your table.
///
/// The resulting stream will also notify for locally-made changes. In an online state, this
/// will result in duplicate events on the stream - the local copy is updated and notifies
/// the caller, then the Supabase realtime event is received and notifies the caller again.
///
/// Supabase's channels can
/// [become expensive quickly](https://supabase.com/docs/guides/realtime/quotas);
/// please consider scale when utilizing this method.
///
/// See [subscribe] for reactivity without using realtime.
///
/// [eventType] is the triggering remote event.
///
/// [policy] determines how data is fetched (local or remote). When [OfflineFirstGetPolicy.localOnly],
/// Supabase channels will not be used.
///
/// [query] is an optional query to filter the data. The query **must be** one level -
/// `Query.where('user', Query.exact('name', 'Tom'))` is invalid but `Query.where('name', 'Tom')`
/// is valid. The [Compare] operator is limited to a [PostgresChangeFilterType] equivalent.
/// See [_compareToFilterParam] for a precise breakdown.
Stream<List<TModel>> subscribeToRealtime<TModel extends OfflineFirstWithSupabaseModel>({
PostgresChangeEvent eventType = PostgresChangeEvent.all,
OfflineFirstGetPolicy policy = OfflineFirstGetPolicy.alwaysHydrate,
Query? query,
String schema = 'public',
}) {
query ??= Query();

if (supabaseRealtimeSubscriptions[TModel]?[eventType]?[query] != null) {
return supabaseRealtimeSubscriptions[TModel]![eventType]![query]!.stream
as Stream<List<TModel>>;
}

final adapter = remoteProvider.modelDictionary.adapterFor[TModel]!;
if (policy == OfflineFirstGetPolicy.localOnly) {
return subscribe<TModel>(policy: policy, query: query);
}

final channel = remoteProvider.client
.channel(adapter.supabaseTableName)
.onPostgresChanges(
event: eventType,
schema: schema,
table: adapter.supabaseTableName,
filter: queryToPostgresChangeFilter<TModel>(query),
callback: (payload) async {
switch (payload.eventType) {
// This code path is likely never hit; `PostgresChangeEvent.all` is used
// to listen to changes but as far as can be determined is not delivered within
// the payload of the callback.
//
// It's handled just in case this behavior changes.
case PostgresChangeEvent.all:
final localResults = await sqliteProvider.get<TModel>(repository: this);
final remoteResults =
await get<TModel>(query: query, policy: OfflineFirstGetPolicy.awaitRemote);
final toDelete = localResults.where((r) => !remoteResults.contains(r));

for (final deletableModel in toDelete) {
await sqliteProvider.delete<TModel>(deletableModel, repository: this);
memoryCacheProvider.delete<TModel>(deletableModel, repository: this);
}

case PostgresChangeEvent.delete:
final query = queryFromSupabaseDeletePayload(
payload.oldRecord,
supabaseDefinitions: adapter.fieldsToSupabaseColumns,
);

if (query.where?.isEmpty ?? true) return;

final results = await get<TModel>(
query: query,
policy: OfflineFirstGetPolicy.localOnly,
seedOnly: true,
);
if (results.isEmpty) return;

await sqliteProvider.delete<TModel>(results.first, repository: this);
memoryCacheProvider.delete<TModel>(results.first, repository: this);

case PostgresChangeEvent.insert || PostgresChangeEvent.update:
final instance = await adapter.fromSupabase(
payload.newRecord,
provider: remoteProvider,
repository: this,
);

await sqliteProvider.upsert<TModel>(instance as TModel, repository: this);
memoryCacheProvider.upsert<TModel>(instance, repository: this);
}

await notifySubscriptionsWithLocalData<TModel>();
},
)
.subscribe();

final controller = StreamController<List<TModel>>(
onCancel: () async {
await channel.unsubscribe();
await supabaseRealtimeSubscriptions[TModel]?[eventType]?[query]?.close();
supabaseRealtimeSubscriptions[TModel]?[eventType]?.remove(query);

if (supabaseRealtimeSubscriptions[TModel]?[eventType]?.isEmpty ?? false) {
supabaseRealtimeSubscriptions[TModel]?.remove(eventType);
}

if (supabaseRealtimeSubscriptions[TModel]?.isEmpty ?? false) {
supabaseRealtimeSubscriptions.remove(TModel);
}
},
);
supabaseRealtimeSubscriptions[TModel] ??= {};
supabaseRealtimeSubscriptions[TModel]![eventType] ??= {};
supabaseRealtimeSubscriptions[TModel]![eventType]![query] = controller;

// Fetch initial data
// ignore: discarded_futures
get<TModel>(query: query, policy: policy).then((results) {
if (!controller.isClosed) controller.add(results);
});

return controller.stream;
}

@override
Future<TModel> upsert<TModel extends OfflineFirstWithSupabaseModel>(
TModel instance, {
Expand All @@ -156,6 +381,29 @@ abstract class OfflineFirstWithSupabaseRepository
}
}

PostgresChangeFilterType? _compareToFilterParam(Compare compare) {
switch (compare) {
case Compare.exact:
return PostgresChangeFilterType.eq;
case Compare.contains:
return PostgresChangeFilterType.inFilter;
case Compare.greaterThan:
return PostgresChangeFilterType.gt;
case Compare.greaterThanOrEqualTo:
return PostgresChangeFilterType.gte;
case Compare.lessThan:
return PostgresChangeFilterType.lt;
case Compare.lessThanOrEqualTo:
return PostgresChangeFilterType.lte;
case Compare.notEqual:
return PostgresChangeFilterType.neq;
case Compare.between:
return null;
case Compare.doesNotContain:
return null;
}
}

/// This is a convenience method to create the basic offline client and queue.
/// The client is used to add offline capabilities to [SupabaseProvider];
/// the queue is used to add offline to the repository.
Expand Down
Loading

0 comments on commit 2f7010a

Please sign in to comment.