Skip to content

feat: refresh watched queries on schema changes #377

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 23 commits into from
Nov 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
73daf49
update watched queries when schema updated
stevensJourney Oct 22, 2024
1125911
Using iterateAsyncListeners for schema change event.
Chriztiaan Oct 22, 2024
4dc0e6d
Fixed throttle overflow test breaking.
Chriztiaan Oct 22, 2024
e27e2b6
Fix iterating schemaChange listeners causing infinite loop.
Chriztiaan Oct 23, 2024
7e8f1a1
Broke out triggerWatchQuery to separate function.
Chriztiaan Oct 23, 2024
65f7773
Updated watchWithAsyncGenerator to use watchWithCallback instead of o…
Chriztiaan Oct 23, 2024
3ddc420
Separated runOnSchemaChange from AbstractPowerSyncData.
Chriztiaan Oct 24, 2024
3deaa97
useQuery will recalculate dependant tables if schema changes. Fixed a…
Chriztiaan Oct 28, 2024
9c2576b
useSuspenseQuery will recalculate tables on a schema change. Fix unit…
Chriztiaan Oct 28, 2024
55260d4
Merge branch 'main' into feat/watch-schema-changes
Chriztiaan Oct 28, 2024
9cc05f8
Tanstack queries will nou recalculate dependent tables when schema ch…
Chriztiaan Oct 28, 2024
ab34416
Changeset entries.
Chriztiaan Oct 28, 2024
7816d88
Vue queries will recalculate dependent tables on schema change.
Chriztiaan Oct 28, 2024
515b6fc
Added refreshSchema to DBAdapter.
Chriztiaan Oct 31, 2024
5d8fb96
Added changeset entries for react-native, web, and op-sqlite.
Chriztiaan Oct 31, 2024
185d766
Merge branch 'main' into feat/watch-schema-changes
Chriztiaan Nov 5, 2024
28d0fd2
Bumped react-native rnqslite version.
Chriztiaan Nov 5, 2024
19c4df8
Merge branch 'main' into feat/watch-schema-changes
Chriztiaan Nov 5, 2024
e0c72a1
Merge branch 'main' into feat/watch-schema-changes
Chriztiaan Nov 5, 2024
21a0f0c
Merge branch 'feat/watch-schema-changes' of github.com:powersync-ja/p…
Chriztiaan Nov 5, 2024
944bc83
Updated OPSqliteAdapter with incoming changes.
Chriztiaan Nov 5, 2024
ac2839f
Minor comment and await on initialized in OPSQlite refreshSchema.
Chriztiaan Nov 5, 2024
506aaee
Merge branch 'main' into feat/watch-schema-changes
Chriztiaan Nov 6, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/itchy-years-drop.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@powersync/common': minor
---

Updated watch functions to recalculate depedendent tables if schema is updated.
7 changes: 7 additions & 0 deletions .changeset/short-owls-play.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
'@powersync/op-sqlite': minor
'@powersync/react-native': minor
'@powersync/web': minor
---

Added `refreshSchema()` which will cause all connections to be aware of a schema change.
5 changes: 5 additions & 0 deletions .changeset/ten-birds-camp.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@powersync/vue': patch
---

Queries will recalculate dependent tables if schema is updated.
6 changes: 6 additions & 0 deletions .changeset/tender-llamas-shop.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
'@powersync/tanstack-react-query': patch
'@powersync/react': patch
---

Queries will recalculate dependent tables if schema is updated.
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ export default function ViewsLayout({ children }: { children: React.ReactNode })
beforeNavigate: async () => {
// If user is logged in, sign out and stay on the current page
if (supabase?.currentSession) {
await supabase?.client.auth.signOut();
await supabase?.logout();
await powerSync.disconnectAndClear();
setSyncEnabled(powerSync.database.name, false);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,11 @@ export class SupabaseConnector extends BaseObserver<SupabaseConnectorListener> i
this.updateSession(session);
}

async logout() {
await this.client.auth.signOut();
this.updateSession(null);
}

async fetchCredentials() {
const {
data: { session },
Expand Down
39 changes: 24 additions & 15 deletions packages/common/src/client/AbstractPowerSyncDatabase.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import {
StreamingSyncImplementation,
StreamingSyncImplementationListener
} from './sync/stream/AbstractStreamingSyncImplementation.js';
import { runOnSchemaChange } from './runOnSchemaChange.js';

export interface DisconnectAndClearOptions {
/** When set to false, data in local-only tables is preserved. */
Expand Down Expand Up @@ -103,6 +104,7 @@ export interface WatchOnChangeHandler {

export interface PowerSyncDBListener extends StreamingSyncImplementationListener {
initialized: () => void;
schemaChanged: (schema: Schema) => void;
}

export interface PowerSyncCloseOptions {
Expand Down Expand Up @@ -360,7 +362,10 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
this.options.logger?.warn('Schema validation failed. Unexpected behaviour could occur', ex);
}
this._schema = schema;

await this.database.execute('SELECT powersync_replace_schema(?)', [JSON.stringify(this.schema.toJSON())]);
await this.database.refreshSchema();
this.iterateListeners(async (cb) => cb.schemaChanged?.(schema));
}

/**
Expand Down Expand Up @@ -758,10 +763,9 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
throw new Error('onResult is required');
}

(async () => {
const watchQuery = async (abortSignal: AbortSignal) => {
try {
const resolvedTables = await this.resolveTables(sql, parameters, options);

// Fetch initial data
const result = await this.executeReadOnly(sql, parameters);
onResult(result);
Expand All @@ -780,13 +784,17 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
},
{
...(options ?? {}),
tables: resolvedTables
tables: resolvedTables,
// Override the abort signal since we intercept it
signal: abortSignal
}
);
} catch (error) {
onError?.(error);
}
})();
};

runOnSchemaChange(watchQuery, this, options);
}

/**
Expand All @@ -796,19 +804,20 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
*/
watchWithAsyncGenerator(sql: string, parameters?: any[], options?: SQLWatchOptions): AsyncIterable<QueryResult> {
return new EventIterator<QueryResult>((eventOptions) => {
(async () => {
const resolvedTables = await this.resolveTables(sql, parameters, options);
const handler: WatchHandler = {
onResult: (result) => {
eventOptions.push(result);
},
onError: (error) => {
eventOptions.fail(error);
}
};

// Fetch initial data
eventOptions.push(await this.executeReadOnly(sql, parameters));
this.watchWithCallback(sql, parameters, handler, options);

for await (const event of this.onChangeWithAsyncGenerator({
...(options ?? {}),
tables: resolvedTables
})) {
eventOptions.push(await this.executeReadOnly(sql, parameters));
}
})();
options?.signal?.addEventListener('abort', () => {
eventOptions.stop();
});
});
}

Expand Down
31 changes: 31 additions & 0 deletions packages/common/src/client/runOnSchemaChange.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import { AbstractPowerSyncDatabase, SQLWatchOptions } from './AbstractPowerSyncDatabase.js';

export function runOnSchemaChange(
callback: (signal: AbortSignal) => void,
db: AbstractPowerSyncDatabase,
options?: SQLWatchOptions
): void {
const triggerWatchedQuery = () => {
const abortController = new AbortController();
let disposeSchemaListener: (() => void) | null = null;
const stopWatching = () => {
abortController.abort('Abort triggered');
disposeSchemaListener?.();
disposeSchemaListener = null;
// Stop listening to upstream abort for this watch
options?.signal?.removeEventListener('abort', stopWatching);
};

options?.signal?.addEventListener('abort', stopWatching);
disposeSchemaListener = db.registerListener({
schemaChanged: async () => {
stopWatching();
// Re trigger the watched query (recursively), setTimeout ensures that we don't modify the list of listeners while iterating through them
setTimeout(() => triggerWatchedQuery(), 0);
}
});
callback(abortController.signal);
};

triggerWatchedQuery();
}
4 changes: 4 additions & 0 deletions packages/common/src/db/DBAdapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,10 @@ export interface DBAdapter extends BaseObserverInterface<DBAdapterListener>, DBG
readTransaction: <T>(fn: (tx: Transaction) => Promise<T>, options?: DBLockOptions) => Promise<T>;
writeLock: <T>(fn: (tx: LockContext) => Promise<T>, options?: DBLockOptions) => Promise<T>;
writeTransaction: <T>(fn: (tx: Transaction) => Promise<T>, options?: DBLockOptions) => Promise<T>;
/**
* This method refreshes the schema information across all connections. This is for advanced use cases, and should generally not be needed.
*/
refreshSchema: () => Promise<void>;
}

export function isBatchedUpdateNotification(
Expand Down
1 change: 1 addition & 0 deletions packages/common/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ export * from './client/SQLOpenFactory.js';
export * from './client/connection/PowerSyncBackendConnector.js';
export * from './client/connection/PowerSyncCredentials.js';
export * from './client/sync/bucket/BucketStorageAdapter.js';
export { runOnSchemaChange } from './client/runOnSchemaChange.js';
export { UpdateType, CrudEntry, OpId } from './client/sync/bucket/CrudEntry.js';
export * from './client/sync/bucket/SqliteBucketStorage.js';
export * from './client/sync/bucket/CrudBatch.js';
Expand Down
4 changes: 4 additions & 0 deletions packages/powersync-op-sqlite/src/db/OPSQLiteConnection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -79,4 +79,8 @@ export class OPSQLiteConnection extends BaseObserver<DBAdapterListener> {
}
return result as T;
}

async refreshSchema() {
await this.get("PRAGMA table_info('sqlite_master')");
}
}
9 changes: 9 additions & 0 deletions packages/powersync-op-sqlite/src/db/OPSqliteAdapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -271,4 +271,13 @@ export class OPSQLiteDBAdapter extends BaseObserver<DBAdapterListener> implement
throw ex;
}
}

async refreshSchema(): Promise<void> {
await this.initialized;
await this.writeConnection!.refreshSchema();

for (let readConnection of this.readConnections) {
await readConnection.connection.refreshSchema();
}
}
}
6 changes: 3 additions & 3 deletions packages/react-native/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@
},
"homepage": "https://docs.powersync.com/",
"peerDependencies": {
"@journeyapps/react-native-quick-sqlite": "^2.0.0",
"@powersync/common": "workspace:^1.20.2",
"@journeyapps/react-native-quick-sqlite": "^2.1.0",
"@powersync/common": "workspace:^1.20.1",
"react": "*",
"react-native": "*"
},
Expand All @@ -46,7 +46,7 @@
},
"devDependencies": {
"@craftzdog/react-native-buffer": "^6.0.5",
"@journeyapps/react-native-quick-sqlite": "^2.0.0",
"@journeyapps/react-native-quick-sqlite": "^2.1.0",
"@rollup/plugin-alias": "^5.1.0",
"@rollup/plugin-commonjs": "^25.0.7",
"@rollup/plugin-inject": "^5.0.5",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,4 +124,8 @@ export class RNQSDBAdapter extends BaseObserver<DBAdapterListener> implements DB
}
};
}

async refreshSchema() {
await this.baseDB.refreshSchema();
}
}
16 changes: 12 additions & 4 deletions packages/react/src/WatchedQuery.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,11 @@
import { AbstractPowerSyncDatabase, BaseListener, BaseObserver, CompilableQuery, Disposable } from '@powersync/common';
import {
AbstractPowerSyncDatabase,
BaseListener,
BaseObserver,
CompilableQuery,
Disposable,
runOnSchemaChange
} from '@powersync/common';
import { AdditionalOptions } from './hooks/useQuery';

export class Query<T> {
Expand Down Expand Up @@ -117,7 +124,7 @@ export class WatchedQuery extends BaseObserver<WatchedQueryListener> implements
this.setError(error);
};

(async () => {
const watchQuery = async (abortSignal: AbortSignal) => {
await this.fetchTables();
await this.fetchData();

Expand All @@ -131,12 +138,13 @@ export class WatchedQuery extends BaseObserver<WatchedQueryListener> implements
},
{
...this.options,
signal: this.controller.signal,
signal: abortSignal,
tables: this.tables
}
);
}
})();
};
runOnSchemaChange(watchQuery, this.db, { signal: this.controller.signal });
}

private setData(results: any[]) {
Expand Down
12 changes: 10 additions & 2 deletions packages/react/src/hooks/useQuery.ts
Original file line number Diff line number Diff line change
Expand Up @@ -118,10 +118,18 @@ export const useQuery = <T = any>(
};

React.useEffect(() => {
(async () => {
const updateData = async () => {
await fetchTables();
await fetchData();
})();
};

updateData();

const l = powerSync.registerListener({
schemaChanged: updateData
});

return () => l?.();
}, [powerSync, memoizedParams, sqlStatement]);

React.useEffect(() => {
Expand Down
12 changes: 3 additions & 9 deletions packages/react/tests/useQuery.test.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,7 @@ import { useQuery } from '../src/hooks/useQuery';

const mockPowerSync = {
currentStatus: { status: 'initial' },
registerListener: vi.fn(() => ({
statusChanged: vi.fn(() => 'updated')
})),
registerListener: vi.fn(() => {}),
resolveTables: vi.fn(() => ['table1', 'table2']),
onChangeWithCallback: vi.fn(),
getAll: vi.fn(() => Promise.resolve(['list1', 'list2']))
Expand Down Expand Up @@ -92,9 +90,7 @@ describe('useQuery', () => {
it('should set error when error occurs and runQueryOnce flag is set', async () => {
const mockPowerSyncError = {
currentStatus: { status: 'initial' },
registerListener: vi.fn(() => ({
statusChanged: vi.fn(() => 'updated')
})),
registerListener: vi.fn(() => {}),
onChangeWithCallback: vi.fn(),
resolveTables: vi.fn(() => ['table1', 'table2']),
getAll: vi.fn(() => {
Expand All @@ -119,9 +115,7 @@ describe('useQuery', () => {
it('should set error when error occurs', async () => {
const mockPowerSyncError = {
currentStatus: { status: 'initial' },
registerListener: vi.fn(() => ({
statusChanged: vi.fn(() => 'updated')
})),
registerListener: vi.fn(() => {}),
onChangeWithCallback: vi.fn(),
resolveTables: vi.fn(() => ['table1', 'table2']),
getAll: vi.fn(() => {
Expand Down
4 changes: 1 addition & 3 deletions packages/react/tests/useStatus.test.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,7 @@ describe('useStatus', () => {
it.skip('should update the status when the listener is called', () => {
const mockPowerSyncInTest = {
currentStatus: { status: 'initial' },
registerListener: () => ({
statusChanged: () => 'updated'
})
registerListener: vi.fn(() => {})
};

const wrapper = ({ children }) => (
Expand Down
4 changes: 1 addition & 3 deletions packages/react/tests/useSuspenseQuery.test.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,7 @@ const defaultQueryResult = ['list1', 'list2'];
const createMockPowerSync = () => {
return {
currentStatus: { status: 'initial' },
registerListener: vi.fn(() => ({
statusChanged: vi.fn(() => 'updated')
})),
registerListener: vi.fn(() => {}),
resolveTables: vi.fn(() => ['table1', 'table2']),
onChangeWithCallback: vi.fn(),
getAll: vi.fn(() => Promise.resolve(defaultQueryResult)) as Mock<any, any>
Expand Down
11 changes: 10 additions & 1 deletion packages/tanstack-react-query/src/hooks/useQuery.ts
Original file line number Diff line number Diff line change
Expand Up @@ -102,11 +102,20 @@ function useQueryCore<
};

React.useEffect(() => {
if (!query) return;
if (!query) return () => {};

(async () => {
await fetchTables();
})();

const l = powerSync.registerListener({
schemaChanged: async () => {
await fetchTables();
queryClient.invalidateQueries({ queryKey: options.queryKey });
}
});

return () => l?.();
}, [powerSync, sqlStatement, stringifiedParams]);

const queryFn = React.useCallback(async () => {
Expand Down
Loading
Loading