From 10ec2bc7006ce53a79b2a8a9d82512d916ff08b3 Mon Sep 17 00:00:00 2001 From: Frederik Bache Date: Thu, 23 Mar 2023 20:23:19 +0100 Subject: [PATCH 1/4] wip --- src/FetchList.vue | 49 +++++++++++++++++++++ src/createStores.ts | 4 ++ src/createVroom.ts | 24 +++++++++++ src/server/Mocket.ts | 97 ++++++++++++++++++++++++++++++++++++++++++ src/server/Server.ts | 4 +- src/server/createDb.ts | 40 +++++++++++++++++ src/sockets.ts | 66 ++++++++++++++++++++++++++++ 7 files changed, 282 insertions(+), 2 deletions(-) create mode 100644 src/server/Mocket.ts create mode 100644 src/sockets.ts diff --git a/src/FetchList.vue b/src/FetchList.vue index 81562a8..7fda382 100644 --- a/src/FetchList.vue +++ b/src/FetchList.vue @@ -13,6 +13,8 @@ import { computed, inject, onUnmounted, ref, useSlots, watch } from 'vue'; import useFetchState from './useFetchState'; +import type Sockets from './sockets'; + type SortSettings = { field: string; dir?: 'ASC' | 'DESC'; @@ -39,11 +41,47 @@ const props = defineProps({ const emit = defineEmits(['ready', 'loaded', 'update:modelValue']); const stores = inject('stores') as any; +const socket = inject('socket') as Sockets; const store = stores[props.model](); const cache = (inject('cache') as any)(); const settings = (inject('models') as any)[props.model]; const slots = useSlots(); +const socketId = ref('#' + Math.random()); + +socketId.value = socket.subscribeToModel(props.model, props.filter, (event) => { + if (event.type === 'db:create') { + store.add([event.data]); + pushId(event.data.id); + } else if (event.type === 'db:update') { + store.add([event.data]); + } else if (event.type === 'db:delete') { + store.localDelete(event.data.id); + } + console.log('Subscription gave event', event); +}); +/* if (socket.readyState === 1) { + socket.send( + JSON.stringify({ + id: socketId.value, + subscribe: props.model, + filter: props.filter, + events: ['db:create', 'db:update', 'db:delete'], + }) + ); +} else { + socket.addEventListener('open', () => { + socket.send( + JSON.stringify({ + id: socketId.value, + subscribe: props.model, + filter: props.filter, + events: ['db:create', 'db:update', 'db:delete'], + }) + ); + }); +} */ + const relations = computed(() => ({ ...settings.hasMany, ...settings.belongsTo, @@ -144,6 +182,7 @@ fetch(); watch(filterString, () => { fetch(); }); + watch(ids, (newIds, oldIds) => { cache.subscribe(props.model, newIds); cache.unsubscribe(props.model, oldIds); @@ -158,11 +197,21 @@ watch(includeIds, (newIds, oldIds) => { }); }); +function handleSocketEvent(event: any) { + if (event.type === 'db:create' && event.subIds.includes(socketId.value)) { + pushId(event.id); + } +} + +// socket.addEventListener('message', handleSocketEvent); + onUnmounted(() => { cache.unsubscribe(props.model, ids.value); Object.keys(includeIds.value).forEach((model) => { cache.unsubscribe(model, includeIds.value[model]); }); + + socket.unsubscribe(socketId.value); }); emit('ready', { diff --git a/src/createStores.ts b/src/createStores.ts index 56317e5..c02357e 100644 --- a/src/createStores.ts +++ b/src/createStores.ts @@ -235,6 +235,10 @@ function createStore( this.items = this.items.filter((item: any) => item.id !== id); }); }, + localDelete(id: ID) { + console.log('local delete', id); + this.items = this.items.filter((item: any) => item.id !== id); + }, bulkDelete(ids: ID[]) { return api .delete(endpoint + '/bulk', ids.map((id) => ({ id })) as any) diff --git a/src/createVroom.ts b/src/createVroom.ts index 8dbec2c..7876bde 100644 --- a/src/createVroom.ts +++ b/src/createVroom.ts @@ -1,5 +1,6 @@ import type { Settings, FieldTypes, IdType } from './types'; import createDb, { VroomDb } from './server/createDb'; +import socketConnection from './server/Mocket'; import createServer from './server/createServer'; import createStores from './createStores'; import createCache from './createCache'; @@ -8,6 +9,7 @@ import FetchList from './FetchList.vue'; import FetchSingle from './FetchSingle.vue'; import FetchSingleton from './FetchSingleton.vue'; import api from './api'; +import Sockets from './sockets'; export default function createVroom( options: Options @@ -30,13 +32,33 @@ export default function createVroom( const server = __DEV__ ? createServer(settings, models, db) : null; + + const socket = new Sockets(settings.ws); + + // const socket = __DEV__ ? socketConnection : new WebSocket(settings.ws); const stores = createStores( models, settings.baseURL, namingWithDefault ); + const cache = createCache(stores); + /* socket.addEventListener('message', (event: any) => { + if (event.type === 'db:delete') { + stores[event.model]().localDelete(event.id); + console.log('Delete something'); + } + + if (event.type === 'db:update') { + stores[event.model]().add([event.data]); + } + + if (event.type === 'db:create') { + stores[event.model]().add([event.data]); + } + }); */ + return { api, db: db as VroomDb['id']>, @@ -44,11 +66,13 @@ export default function createVroom( server, stores, cache, + socket, types: {} as ModelTypes, install(app: any) { app.provide('stores', stores); app.provide('models', models); app.provide('cache', cache); + app.provide('socket', socket); app.provide('vroomTypes', {} as ModelTypes); app.component('FetchList', FetchList); app.component('FetchSingle', FetchSingle); diff --git a/src/server/Mocket.ts b/src/server/Mocket.ts new file mode 100644 index 0000000..8c612d7 --- /dev/null +++ b/src/server/Mocket.ts @@ -0,0 +1,97 @@ +import { parseFilterField } from './handlers/helpers'; +type EventName = 'open' | 'close' | 'message'; + +const bc = new BroadcastChannel('socket:events'); + +class Mocket { + public listeners: { [key in EventName]?: Array<(event: Event) => void> }; + public subscriptions: Array; + public readyState: number; + constructor() { + this.listeners = {}; + this.subscriptions = []; + this.readyState = 0; + + setTimeout(() => { + this.readyState = 1; + this.listeners.open?.forEach((h) => h({} as any)); + }, 150); + } + + public addEventListener(name: EventName, handler: (event: Event) => void) { + if (!this.listeners[name]) { + this.listeners[name] = []; + } + this.listeners[name]?.push(handler); + } + + public removeEventListener(name: EventName, handler: (event: Event) => void) { + this.listeners[name] = this.listeners[name]?.filter((h) => h !== handler); + } + + public send(data: string) { + const object = JSON.parse(data); + if (object.subscribe) { + this.subscriptions.push({ + id: object.id, + model: object.subscribe, + filter: object.filter, + events: object.events || null, + }); + console.log('Subscriptions are now', this.subscriptions); + } else if (object.unsubscribe) { + this.subscriptions = this.subscriptions.filter((s) => s.id !== object.id); + console.log('Subscriptions are now', this.subscriptions); + } else if (object.on) { + this.subscriptions.push({ + id: object.id, + event: object.on, + }); + } else { + sendMessage(object); + console.log('WS: recieved', data); + } + } +} + +const socketConnection = new Mocket(); + +export function sendMessage(message: any) { + if (socketConnection.readyState !== 1) return; + console.log('Sending message', message); + bc.postMessage(message); +} + +function checkIfItemMatchesFilter(item: any, filters: any) { + let match = true; + Object.entries(filters).forEach(([field, value]) => { + const { name, fn } = parseFilterField(field); + if (name in item && !fn(item[name], value)) match = false; + }); + return match; +} + +bc.onmessage = function (ev: any) { + let subscribed = false; + const subIds = [] as string[]; + socketConnection.subscriptions.forEach((s) => { + if (ev.data.event) { + if (ev.data.event === s.event) subscribed = true; + } else if ( + ev.data.model === s.model && + checkIfItemMatchesFilter(ev.data.data, s.filter) && + (!s.events || s.events.includes(ev.data.type)) + ) { + subIds.push(s.id); + subscribed = true; + } + }); + if (!subscribed) return; + const data = { ...ev.data, subIds }; + console.log('WS:', data, socketConnection.listeners.message?.length); + socketConnection.listeners['message']?.forEach((handler) => { + handler(data); + }); +}; + +export default socketConnection; diff --git a/src/server/Server.ts b/src/server/Server.ts index aed2836..b926e27 100644 --- a/src/server/Server.ts +++ b/src/server/Server.ts @@ -517,6 +517,8 @@ export default class Server { // @ts-expect-error window.fetch = async (...args) => { const [path, config] = args; + const delay = this.settings.delay || 150; + await new Promise((r) => setTimeout(r, delay)); const customResponse = this.parseRequest( { @@ -529,8 +531,6 @@ export default class Server { ); if (customResponse !== null) { - const delay = this.settings.delay || 150; - await new Promise((r) => setTimeout(r, delay)); this.logEvent( '🛬 Response', path.toString().replace(this.baseURL, ''), diff --git a/src/server/createDb.ts b/src/server/createDb.ts index 3bf50c2..c8bbe12 100644 --- a/src/server/createDb.ts +++ b/src/server/createDb.ts @@ -1,5 +1,6 @@ import ServerError from '../ServerError'; import type { FieldTypes, HasId, ID } from '../types'; +import socketConnection, { sendMessage } from './Mocket'; type Relation = { [K: string]: () => string; @@ -19,6 +20,8 @@ type Schema = { let db = {} as any; +const bc = new BroadcastChannel('db:changes'); + export class Collection { model: string; schema: Schema; @@ -188,6 +191,14 @@ export class Collection { this.updateRelations(relationsToUpdate, []); } + this.sync(); + sendMessage({ + type: 'db:create', + model: this.model, + id: newId, + data: this.items[this.items.length - 1], + }); + return this.items[this.items.length - 1]; } @@ -410,10 +421,20 @@ export class Collection { this.updateRelations(relationsToUpdate, relationsToRemove); } + this.sync(); + + sendMessage({ + type: 'db:update', + model: this.model, + id: id, + data: this.items[index], + }); + return this.items[index]; } destroy(id: Type['id']) { + const item = this.find(id); this.items = this.items.filter((item) => item.id !== id); if (this.addDevtoolsEvent) { @@ -422,6 +443,9 @@ export class Collection { }); } + this.sync(); + sendMessage({ type: 'db:delete', model: this.model, id: id, data: item }); + // TODO Update relations on delete also } @@ -429,6 +453,15 @@ export class Collection { this.items = []; this.lastId = 0; } + + sync() { + bc.postMessage({ + type: 'db:sync', + model: this.model, + items: this.items, + lastId: this.lastId, + }); + } } type Database = { @@ -463,5 +496,12 @@ export default function createDb(options: any) { ); }); + setTimeout(() => { + bc.onmessage = function (ev: any) { + db[ev.data.model].items = ev.data.items; + db[ev.data.model].lastId = ev.data.lastId; + }; + }, 100); + return db as Database; } diff --git a/src/sockets.ts b/src/sockets.ts new file mode 100644 index 0000000..f176cec --- /dev/null +++ b/src/sockets.ts @@ -0,0 +1,66 @@ +import socketConnection from './server/Mocket'; + +type ModelEvent = 'create' | 'update' | 'delete'; +socketConnection; +export default class Sockets { + private subscriptions: { [key: string]: () => void }; + private connection: typeof socketConnection | WebSocket; + + constructor(wsUrl = '') { + this.connection = __DEV__ ? socketConnection : new WebSocket(wsUrl); + this.subscriptions = {}; + + this.connection.addEventListener('message', (message: Event) => { + this.handleMessage(message); + }); + } + + handleMessage(msg: Object) { + if (msg.subIds) { + msg.subIds.forEach((i) => { + this.subscriptions[i](msg); + }); + } + } + + send(msg: Object) { + if (this.connection.readyState === 1) { + this.connection.send(JSON.stringify(msg)); + } else { + this.connection.addEventListener('open', () => { + this.connection.send(JSON.stringify(msg)); + }); + // TODO remove the event listener again? + } + } + + subscribeToModel( + model: keyof Models, + filter: any, + cb: () => void, + events?: ModelEvent[] + ) { + const socketEvents = events + ? events.map((e) => 'db:' + e) + : ['db:create', 'db:update', 'db:delete']; + const subscriptionId = Math.random(); + this.send({ + id: subscriptionId, + subscribe: model, + filter: filter, + events: socketEvents, + }); + + this.subscriptions[subscriptionId] = cb; + return subscriptionId; + } + + unsubscribe(id: string | number) { + delete this.subscriptions[id]; + + this.send({ + id: id, + unsubscribe: 'any', + }); + } +} From 1661f381e497df9a77eecbc811806b4478e80623 Mon Sep 17 00:00:00 2001 From: Frederik Bache Date: Tue, 4 Apr 2023 14:55:09 +0200 Subject: [PATCH 2/4] wip --- src/FetchList.vue | 18 ++++- src/createStores.ts | 7 ++ src/createVroom.ts | 33 ++++----- src/server/Mocket.ts | 120 +++++++++++++++++++------------- src/server/createDb.ts | 12 ++-- src/sockets.ts | 151 +++++++++++++++++++++++++++++++++-------- 6 files changed, 235 insertions(+), 106 deletions(-) diff --git a/src/FetchList.vue b/src/FetchList.vue index 7fda382..4217963 100644 --- a/src/FetchList.vue +++ b/src/FetchList.vue @@ -49,7 +49,19 @@ const slots = useSlots(); const socketId = ref('#' + Math.random()); -socketId.value = socket.subscribeToModel(props.model, props.filter, (event) => { +/* const sub = socket.subscribeToModel(props.model, props.filter); +sub.on('db:create', (data) => { + store.add([data]); + pushId(data.id); +}); +sub.on('db:update', (data) => { + store.add([data]); +}); +sub.on('db:delete', (data) => { + store.localDelete(data.id); +}); */ + +/* socketId.value = socket.subscribeToModel(props.model, props.filter, (event) => { if (event.type === 'db:create') { store.add([event.data]); pushId(event.data.id); @@ -59,7 +71,7 @@ socketId.value = socket.subscribeToModel(props.model, props.filter, (event) => { store.localDelete(event.data.id); } console.log('Subscription gave event', event); -}); +}); */ /* if (socket.readyState === 1) { socket.send( JSON.stringify({ @@ -211,7 +223,7 @@ onUnmounted(() => { cache.unsubscribe(model, includeIds.value[model]); }); - socket.unsubscribe(socketId.value); + // sub.unsubscribe(); }); emit('ready', { diff --git a/src/createStores.ts b/src/createStores.ts index c02357e..92cd2f1 100644 --- a/src/createStores.ts +++ b/src/createStores.ts @@ -251,6 +251,12 @@ function createStore( garbageCollect(ids: ID[]) { this.items = this.items.filter((item: any) => !ids.includes(item.id)); }, + sync(itemOrItems: any[] | any) { + const asArray = Array.isArray(itemOrItems) + ? itemOrItems + : [itemOrItems]; + this.add(asArray); + }, }, }); } @@ -326,6 +332,7 @@ export default function createStores( delete: (id: Type[K]['id']) => Promise; // @ts-expect-error bulkDelete: (ids: Type[K]['id'][]) => Promise; + sync: (itemOrItems: Partial | Partial[]) => void; } >; }; diff --git a/src/createVroom.ts b/src/createVroom.ts index 7876bde..57f6cc8 100644 --- a/src/createVroom.ts +++ b/src/createVroom.ts @@ -1,6 +1,5 @@ import type { Settings, FieldTypes, IdType } from './types'; import createDb, { VroomDb } from './server/createDb'; -import socketConnection from './server/Mocket'; import createServer from './server/createServer'; import createStores from './createStores'; import createCache from './createCache'; @@ -10,6 +9,9 @@ import FetchSingle from './FetchSingle.vue'; import FetchSingleton from './FetchSingleton.vue'; import api from './api'; import Sockets from './sockets'; +import Mocket from './server/Mocket'; +import type { InjectionKey } from 'vue'; +import createComponent from './democomponent'; export default function createVroom( options: Options @@ -32,10 +34,19 @@ export default function createVroom( const server = __DEV__ ? createServer(settings, models, db) : null; + const mocket = + __DEV__ && settings.server?.enable + ? new Mocket( + db, + settings.identityModel ? settings.identityModel() : null + ) + : null; - const socket = new Sockets(settings.ws); + const socket = new Sockets( + settings.ws, + mocket as any as typeof Mocket + ); - // const socket = __DEV__ ? socketConnection : new WebSocket(settings.ws); const stores = createStores( models, settings.baseURL, @@ -44,21 +55,6 @@ export default function createVroom( const cache = createCache(stores); - /* socket.addEventListener('message', (event: any) => { - if (event.type === 'db:delete') { - stores[event.model]().localDelete(event.id); - console.log('Delete something'); - } - - if (event.type === 'db:update') { - stores[event.model]().add([event.data]); - } - - if (event.type === 'db:create') { - stores[event.model]().add([event.data]); - } - }); */ - return { api, db: db as VroomDb['id']>, @@ -67,6 +63,7 @@ export default function createVroom( stores, cache, socket, + mocket, types: {} as ModelTypes, install(app: any) { app.provide('stores', stores); diff --git a/src/server/Mocket.ts b/src/server/Mocket.ts index 8c612d7..03a70e0 100644 --- a/src/server/Mocket.ts +++ b/src/server/Mocket.ts @@ -1,21 +1,59 @@ -import { parseFilterField } from './handlers/helpers'; type EventName = 'open' | 'close' | 'message'; -const bc = new BroadcastChannel('socket:events'); +export const socketChannel = new BroadcastChannel('socket:events'); -class Mocket { +let readyState = 0; + +export default class Mocket { public listeners: { [key in EventName]?: Array<(event: Event) => void> }; public subscriptions: Array; + public handlers: Array; public readyState: number; - constructor() { + private db: DbType; + private identityModel: string; + private identity: IdentityModel | null; + + constructor(db: DbType, identityModel: string) { this.listeners = {}; this.subscriptions = []; + this.handlers = []; this.readyState = 0; + this.db = db; + this.identityModel = identityModel; + this.identity = null; setTimeout(() => { this.readyState = 1; + readyState = 1; this.listeners.open?.forEach((h) => h({} as any)); }, 150); + + socketChannel.onmessage = (ev: any) => { + let subscribed = false; + + this.subscriptions.forEach((s) => { + // console.log('Message received', ev, s); + if (ev.data.event) { + if (ev.data.event === s.event) subscribed = true; + } else if ( + ev.data.model === s.model && + s.ids && + s.ids.includes(ev.data.id) && + (!s.events || s.events.includes(ev.data.type)) + ) { + subscribed = true; + } + }); + + if (!subscribed) return; + const data = { ...ev.data }; + console.log('🔻', data, this.listeners.message?.length); + this.listeners['message']?.forEach((handler) => { + handler({ + data: JSON.stringify(data), + }); + }); + }; } public addEventListener(name: EventName, handler: (event: Event) => void) { @@ -29,69 +67,53 @@ class Mocket { this.listeners[name] = this.listeners[name]?.filter((h) => h !== handler); } + public addHandler( + model: string, + type: string, + handler: (data: any, db: DbType, identity: IdentityModel | null) => any + ) { + this.handlers.push({ model, type, handler }); + } + + public broadcast(data: Object) { + this.send(JSON.stringify(data)); + } + public send(data: string) { const object = JSON.parse(data); + console.log('🟢:', object); if (object.subscribe) { this.subscriptions.push({ id: object.id, model: object.subscribe, - filter: object.filter, + ids: object.ids, events: object.events || null, }); console.log('Subscriptions are now', this.subscriptions); } else if (object.unsubscribe) { this.subscriptions = this.subscriptions.filter((s) => s.id !== object.id); console.log('Subscriptions are now', this.subscriptions); - } else if (object.on) { - this.subscriptions.push({ - id: object.id, - event: object.on, - }); + } else if (object.auth) { + this.identity = this.db[this.identityModel].find(object.auth); } else { - sendMessage(object); - console.log('WS: recieved', data); + const handler = this.handlers.find( + (h) => h.model === object.model && h.type === object.type + ); + if (handler) sendMessage(handler.handler(object, this.db, this.identity)); + else { + sendMessage(object); + } } } } -const socketConnection = new Mocket(); - export function sendMessage(message: any) { - if (socketConnection.readyState !== 1) return; - console.log('Sending message', message); - bc.postMessage(message); + if (readyState !== 1) return; + socketChannel.postMessage(message); } -function checkIfItemMatchesFilter(item: any, filters: any) { - let match = true; - Object.entries(filters).forEach(([field, value]) => { - const { name, fn } = parseFilterField(field); - if (name in item && !fn(item[name], value)) match = false; - }); - return match; -} +/* const socketConnection = new Mocket(); -bc.onmessage = function (ev: any) { - let subscribed = false; - const subIds = [] as string[]; - socketConnection.subscriptions.forEach((s) => { - if (ev.data.event) { - if (ev.data.event === s.event) subscribed = true; - } else if ( - ev.data.model === s.model && - checkIfItemMatchesFilter(ev.data.data, s.filter) && - (!s.events || s.events.includes(ev.data.type)) - ) { - subIds.push(s.id); - subscribed = true; - } - }); - if (!subscribed) return; - const data = { ...ev.data, subIds }; - console.log('WS:', data, socketConnection.listeners.message?.length); - socketConnection.listeners['message']?.forEach((handler) => { - handler(data); - }); -}; - -export default socketConnection; + + +export default socketConnection; */ diff --git a/src/server/createDb.ts b/src/server/createDb.ts index c8bbe12..2292b99 100644 --- a/src/server/createDb.ts +++ b/src/server/createDb.ts @@ -1,6 +1,6 @@ import ServerError from '../ServerError'; import type { FieldTypes, HasId, ID } from '../types'; -import socketConnection, { sendMessage } from './Mocket'; +import { sendMessage } from './Mocket'; type Relation = { [K: string]: () => string; @@ -192,12 +192,12 @@ export class Collection { } this.sync(); - sendMessage({ + /* sendMessage({ type: 'db:create', model: this.model, id: newId, data: this.items[this.items.length - 1], - }); + }); */ return this.items[this.items.length - 1]; } @@ -423,12 +423,12 @@ export class Collection { this.sync(); - sendMessage({ + /* sendMessage({ type: 'db:update', model: this.model, id: id, data: this.items[index], - }); + }); */ return this.items[index]; } @@ -444,7 +444,7 @@ export class Collection { } this.sync(); - sendMessage({ type: 'db:delete', model: this.model, id: id, data: item }); + /* sendMessage({ type: 'db:delete', model: this.model, id: id, data: item }); */ // TODO Update relations on delete also } diff --git a/src/sockets.ts b/src/sockets.ts index f176cec..5cee823 100644 --- a/src/sockets.ts +++ b/src/sockets.ts @@ -1,58 +1,149 @@ -import socketConnection from './server/Mocket'; +import type Mocket from './server/Mocket'; + +type SubscriptionSettings = { + model: string; + ids: Array; + onUnsubscribe: () => void; + onEmit: (event: string, data: any) => void; +}; + +class Subscription { + onUnsubscribe: () => void; + onEmit: (event: string, data: any) => void; + listeners: { [key: string]: Array<(data: any) => void> }; + public model: string; + public ids: Array; + + constructor({ model, ids, onUnsubscribe, onEmit }: SubscriptionSettings) { + this.onUnsubscribe = onUnsubscribe; + this.onEmit = onEmit; + this.model = model; + this.ids = ids; + this.listeners = {}; + } + + public on(event: string, handler: (data: any) => void) { + if (!this.listeners[event]) { + this.listeners[event] = []; + } + this.listeners[event].push(handler); + } + + public handle(msg: any) { + const handlers = this.listeners[msg.type] || []; + // console.log('Subscription handle', type, item, handlers); + // console.log('Handle', type, data, handlers, this.listeners); + handlers.forEach((handler) => { + handler(msg); + }); + } + + public emit(event: string, data: any) { + this.onEmit(event, data); + } + + public unsubscribe() { + this.onUnsubscribe(); + } +} + +type EventMessage = { + subIds: Array; + type: string; + data: any; +}; -type ModelEvent = 'create' | 'update' | 'delete'; -socketConnection; export default class Sockets { - private subscriptions: { [key: string]: () => void }; - private connection: typeof socketConnection | WebSocket; + private subscriptions: Subscription[]; + private connection: Mocket | WebSocket; + private pongTimeout: NodeJS.Timeout | null; - constructor(wsUrl = '') { - this.connection = __DEV__ ? socketConnection : new WebSocket(wsUrl); - this.subscriptions = {}; + constructor(wsUrl = '', connection: Mocket) { + this.connection = connection || new WebSocket(wsUrl); + this.subscriptions = []; - this.connection.addEventListener('message', (message: Event) => { - this.handleMessage(message); + this.connection.addEventListener('message', (event: Event) => { + console.log('event', event); + if (event.data === 'pong') { + if (this.pongTimeout) clearTimeout(this.pongTimeout); + } else { + const message = JSON.parse(event.data); + this.handleMessage(message as any as EventMessage); + } }); + + /* this.pongTimeout = null; + this.connection.addEventListener('open', () => { + setInterval(() => { + this.sendPingPong(); + }, 10000); + }); */ + } + + sendPingPong() { + this.send('ping'); + + this.pongTimeout = setTimeout(() => { + console.log('Did not get pong back'); + }, 5000); } - handleMessage(msg: Object) { - if (msg.subIds) { + handleMessage(msg: EventMessage) { + // console.log('Handle message', msg, this.subscriptions); + this.subscriptions + .filter( + (subscription) => + subscription.model === msg.model && subscription.ids.includes(msg.id) + ) + .forEach((subscription) => { + subscription.handle(msg); + }); + /* if (msg.subIds) { msg.subIds.forEach((i) => { - this.subscriptions[i](msg); + this.subscriptions[i].handle(msg); }); - } + } */ } - send(msg: Object) { + send(msg: Object | string) { if (this.connection.readyState === 1) { - this.connection.send(JSON.stringify(msg)); + this.connection.send(typeof msg === 'string' ? msg : JSON.stringify(msg)); } else { this.connection.addEventListener('open', () => { - this.connection.send(JSON.stringify(msg)); + this.connection.send( + typeof msg === 'string' ? msg : JSON.stringify(msg) + ); }); // TODO remove the event listener again? } } - subscribeToModel( - model: keyof Models, - filter: any, - cb: () => void, - events?: ModelEvent[] - ) { - const socketEvents = events - ? events.map((e) => 'db:' + e) - : ['db:create', 'db:update', 'db:delete']; + public subscribeToModel(model: keyof Models, ids?: any[]) { const subscriptionId = Math.random(); this.send({ id: subscriptionId, subscribe: model, - filter: filter, - events: socketEvents, + ids: ids, + }); + + const subscription = new Subscription({ + model, + ids, + onUnsubscribe: () => { + this.unsubscribe(subscriptionId); + }, + onEmit: (event, id, data) => { + this.send({ + type: event, + model, + id, + data, + }); + }, }); - this.subscriptions[subscriptionId] = cb; - return subscriptionId; + this.subscriptions.push(subscription); + return subscription; } unsubscribe(id: string | number) { From 65497bc4b47cccc6106135888ee5dbf37abf240e Mon Sep 17 00:00:00 2001 From: Frederik Bache Date: Sat, 8 Apr 2023 12:49:46 +0200 Subject: [PATCH 3/4] wip --- src/components/createUseSingle.ts | 1 + src/createStores.ts | 1 - src/createVroom.ts | 2 +- src/server/Mocket.ts | 2 ++ src/server/createDb.ts | 1 + src/sockets.ts | 23 +++++++++++++---------- src/types.ts | 1 + 7 files changed, 19 insertions(+), 12 deletions(-) diff --git a/src/components/createUseSingle.ts b/src/components/createUseSingle.ts index 80405c0..39b38fd 100644 --- a/src/components/createUseSingle.ts +++ b/src/components/createUseSingle.ts @@ -5,6 +5,7 @@ import { ref, watch, type ComputedRef, + nextTick, } from 'vue'; import useFetchState from '../useFetchState'; import unwrap from './unwrap'; diff --git a/src/createStores.ts b/src/createStores.ts index 7b61260..e529f10 100644 --- a/src/createStores.ts +++ b/src/createStores.ts @@ -168,7 +168,6 @@ function createStore( if (sort.length) params.sort = createSortString(sort); if (include.length) params.include = include.join(','); if ('cursor' in params && params.cursor === undefined) { - console.log('Deleting emoty nextcursor'); delete params.cursor; } const url = overridePath || endpoint; diff --git a/src/createVroom.ts b/src/createVroom.ts index 6067ac1..a82e5a8 100644 --- a/src/createVroom.ts +++ b/src/createVroom.ts @@ -45,7 +45,7 @@ export default function createVroom( const socket = new Sockets( settings.ws, - mocket as any as typeof Mocket + mocket as any as Mocket ); const api = createApi(server); diff --git a/src/server/Mocket.ts b/src/server/Mocket.ts index 03a70e0..3fd5faf 100644 --- a/src/server/Mocket.ts +++ b/src/server/Mocket.ts @@ -50,6 +50,7 @@ export default class Mocket { console.log('🔻', data, this.listeners.message?.length); this.listeners['message']?.forEach((handler) => { handler({ + // @ts-ignore data: JSON.stringify(data), }); }); @@ -94,6 +95,7 @@ export default class Mocket { this.subscriptions = this.subscriptions.filter((s) => s.id !== object.id); console.log('Subscriptions are now', this.subscriptions); } else if (object.auth) { + // @ts-ignore this.identity = this.db[this.identityModel].find(object.auth); } else { const handler = this.handlers.find( diff --git a/src/server/createDb.ts b/src/server/createDb.ts index 1d554dc..9163652 100644 --- a/src/server/createDb.ts +++ b/src/server/createDb.ts @@ -502,6 +502,7 @@ export default function createDb(options: any) { setTimeout(() => { bc.onmessage = function (ev: any) { + console.log('Getting a bc message', ev); db[ev.data.model].items = ev.data.items; db[ev.data.model].lastId = ev.data.lastId; }; diff --git a/src/sockets.ts b/src/sockets.ts index 5cee823..d7ac3fe 100644 --- a/src/sockets.ts +++ b/src/sockets.ts @@ -4,12 +4,12 @@ type SubscriptionSettings = { model: string; ids: Array; onUnsubscribe: () => void; - onEmit: (event: string, data: any) => void; + onEmit?: (event: string, data: any) => void; }; class Subscription { onUnsubscribe: () => void; - onEmit: (event: string, data: any) => void; + onEmit?: (event: string, data: any) => void; listeners: { [key: string]: Array<(data: any) => void> }; public model: string; public ids: Array; @@ -39,7 +39,9 @@ class Subscription { } public emit(event: string, data: any) { - this.onEmit(event, data); + if (this.onEmit) { + this.onEmit(event, data); + } } public unsubscribe() { @@ -61,8 +63,9 @@ export default class Sockets { constructor(wsUrl = '', connection: Mocket) { this.connection = connection || new WebSocket(wsUrl); this.subscriptions = []; + this.pongTimeout = null; - this.connection.addEventListener('message', (event: Event) => { + this.connection.addEventListener('message', (event: any) => { console.log('event', event); if (event.data === 'pong') { if (this.pongTimeout) clearTimeout(this.pongTimeout); @@ -88,7 +91,7 @@ export default class Sockets { }, 5000); } - handleMessage(msg: EventMessage) { + handleMessage(msg: any) { // console.log('Handle message', msg, this.subscriptions); this.subscriptions .filter( @@ -127,19 +130,19 @@ export default class Sockets { }); const subscription = new Subscription({ - model, - ids, + model: model as string, + ids: ids as any, onUnsubscribe: () => { this.unsubscribe(subscriptionId); }, - onEmit: (event, id, data) => { + /* onEmit: (event: string, id: any, data: any) => { this.send({ type: event, model, id, data, }); - }, + }, */ }); this.subscriptions.push(subscription); @@ -147,7 +150,7 @@ export default class Sockets { } unsubscribe(id: string | number) { - delete this.subscriptions[id]; + delete this.subscriptions[id as any]; this.send({ id: id, diff --git a/src/types.ts b/src/types.ts index b2b894b..c4cb83d 100644 --- a/src/types.ts +++ b/src/types.ts @@ -62,6 +62,7 @@ export type FetchRequestOptions = { export type Settings = { baseURL?: string; + ws?: string; server?: ServerSettings; idsAreNumbers?: boolean; idFactory?: (i: number) => string; From 9aeeeac39fc7d8ad1d9d91961e62cb46f4dcc964 Mon Sep 17 00:00:00 2001 From: Frederik Bache Date: Tue, 25 Apr 2023 09:13:05 +0200 Subject: [PATCH 4/4] wip --- src/components/createUseSingle.ts | 1 - src/sockets.ts | 79 +++++++++++++++++++------------ 2 files changed, 50 insertions(+), 30 deletions(-) diff --git a/src/components/createUseSingle.ts b/src/components/createUseSingle.ts index 39b38fd..fb614a9 100644 --- a/src/components/createUseSingle.ts +++ b/src/components/createUseSingle.ts @@ -118,7 +118,6 @@ export default function createUseSingle( // Update cache subscriptions when ids change watch(includedIds, (newIds, oldIds) => { - console.log('Newold', newIds, oldIds); Object.keys(newIds).forEach((model) => { cacheStore.subscribe(model, newIds[model]); }); diff --git a/src/sockets.ts b/src/sockets.ts index d7ac3fe..78ea98b 100644 --- a/src/sockets.ts +++ b/src/sockets.ts @@ -31,8 +31,6 @@ class Subscription { public handle(msg: any) { const handlers = this.listeners[msg.type] || []; - // console.log('Subscription handle', type, item, handlers); - // console.log('Handle', type, data, handlers, this.listeners); handlers.forEach((handler) => { handler(msg); }); @@ -59,21 +57,21 @@ export default class Sockets { private subscriptions: Subscription[]; private connection: Mocket | WebSocket; private pongTimeout: NodeJS.Timeout | null; - - constructor(wsUrl = '', connection: Mocket) { - this.connection = connection || new WebSocket(wsUrl); + private messageQueue: string[]; + private lastId: number; + private mock: Mocket | null; + private wsUrl: string; + + constructor(wsUrl = '', mock: Mocket) { + this.wsUrl = wsUrl; + this.mock = mock; this.subscriptions = []; + this.messageQueue = []; this.pongTimeout = null; + this.lastId = Math.random(); + this.connection = mock || new WebSocket(wsUrl); - this.connection.addEventListener('message', (event: any) => { - console.log('event', event); - if (event.data === 'pong') { - if (this.pongTimeout) clearTimeout(this.pongTimeout); - } else { - const message = JSON.parse(event.data); - this.handleMessage(message as any as EventMessage); - } - }); + this.setupListeners(); /* this.pongTimeout = null; this.connection.addEventListener('open', () => { @@ -83,16 +81,21 @@ export default class Sockets { }); */ } + reconnect() { + console.log('Reconnecting'); + this.connection = this.mock || new WebSocket(this.wsUrl); + } + sendPingPong() { this.send('ping'); this.pongTimeout = setTimeout(() => { console.log('Did not get pong back'); + this.reconnect(); }, 5000); } handleMessage(msg: any) { - // console.log('Handle message', msg, this.subscriptions); this.subscriptions .filter( (subscription) => @@ -101,28 +104,46 @@ export default class Sockets { .forEach((subscription) => { subscription.handle(msg); }); - /* if (msg.subIds) { - msg.subIds.forEach((i) => { - this.subscriptions[i].handle(msg); - }); - } */ + } + + setupListeners() { + this.connection.addEventListener('open', () => { + this.sendMessages(); + }); + + this.connection.addEventListener('message', (event: any) => { + if (event.data === 'pong') { + if (this.pongTimeout) clearTimeout(this.pongTimeout); + } else { + const message = JSON.parse(event.data); + this.handleMessage(message as any as EventMessage); + } + }); + } + + sendMessages() { + while (this.messageQueue.length) { + const m = this.messageQueue.shift(); + this.connection.send(m as string); + } } send(msg: Object | string) { + const payload = typeof msg === 'string' ? msg : JSON.stringify(msg); + this.messageQueue.push(payload); if (this.connection.readyState === 1) { - this.connection.send(typeof msg === 'string' ? msg : JSON.stringify(msg)); - } else { - this.connection.addEventListener('open', () => { - this.connection.send( - typeof msg === 'string' ? msg : JSON.stringify(msg) - ); - }); - // TODO remove the event listener again? + this.sendMessages(); } } + generateId() { + const x = Math.sin(this.lastId + 1) * 10000; + this.lastId += 1; + return (x - Math.floor(x)).toString(36).substring(7); + } + public subscribeToModel(model: keyof Models, ids?: any[]) { - const subscriptionId = Math.random(); + const subscriptionId = this.generateId(); this.send({ id: subscriptionId, subscribe: model,