diff --git a/.eslintrc.cjs b/.eslintrc.cjs index a56a62c6..55d11222 100644 --- a/.eslintrc.cjs +++ b/.eslintrc.cjs @@ -57,7 +57,7 @@ module.exports = { 'line-comment-position': 'off', 'linebreak-style': ['error', 'unix'], 'lines-around-directive': 'error', - 'max-depth': 'error', + 'max-depth': 'off', 'max-len': 'off', 'max-lines': 'off', 'max-nested-callbacks': 'error', diff --git a/.github/workflows/main.yaml b/.github/workflows/main.yaml index 9c639ca2..a1449fa3 100644 --- a/.github/workflows/main.yaml +++ b/.github/workflows/main.yaml @@ -10,12 +10,13 @@ env: WEAVIATE_127: 1.27.27 WEAVIATE_128: 1.28.16 WEAVIATE_129: 1.29.11 - WEAVIATE_130: 1.30.23 - WEAVIATE_131: 1.31.21 - WEAVIATE_132: 1.32.26 - WEAVIATE_133: 1.33.11 - WEAVIATE_134: 1.34.7 - WEAVIATE_135: 1.35.2 + WEAVIATE_130: 1.30.22 + WEAVIATE_131: 1.31.22 + WEAVIATE_132: 1.32.27 + WEAVIATE_133: 1.33.12 + WEAVIATE_134: 1.34.10 + WEAVIATE_135: 1.35.3 + WEAVIATE_136: 1.36.0-dev-3038dec concurrency: group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }} @@ -43,17 +44,18 @@ jobs: fail-fast: false matrix: versions: [ - { node: "24.x", weaviate: $WEAVIATE_127}, - { node: "24.x", weaviate: $WEAVIATE_128}, - { node: "24.x", weaviate: $WEAVIATE_129}, - { node: "24.x", weaviate: $WEAVIATE_130}, - { node: "24.x", weaviate: $WEAVIATE_131}, - { node: "24.x", weaviate: $WEAVIATE_132}, - { node: "24.x", weaviate: $WEAVIATE_133}, - { node: "24.x", weaviate: $WEAVIATE_134}, - { node: "20.x", weaviate: $WEAVIATE_135}, - { node: "22.x", weaviate: $WEAVIATE_135}, - { node: "24.x", weaviate: $WEAVIATE_135}, + { node: "24.x", weaviate: $WEAVIATE_127 }, + { node: "24.x", weaviate: $WEAVIATE_128 }, + { node: "24.x", weaviate: $WEAVIATE_129 }, + { node: "24.x", weaviate: $WEAVIATE_130 }, + { node: "24.x", weaviate: $WEAVIATE_131 }, + { node: "24.x", weaviate: $WEAVIATE_132 }, + { node: "24.x", weaviate: $WEAVIATE_133 }, + { node: "24.x", weaviate: $WEAVIATE_134 }, + { node: "20.x", weaviate: 
$WEAVIATE_135 }, + { node: "22.x", weaviate: $WEAVIATE_135 }, + { node: "24.x", weaviate: $WEAVIATE_135 }, + { node: "24.x", weaviate: $WEAVIATE_136 }, ] steps: - uses: actions/checkout@v3 @@ -129,7 +131,7 @@ jobs: registry-url: 'https://registry.npmjs.org' - run: npm ci - run: npm run build - - run: npm publish + - run: npm publish --tag alpha env: NODE_AUTH_TOKEN: ${{ secrets.NPM_AUTOMATION_TOKEN }} - run: npm run docs diff --git a/package-lock.json b/package-lock.json index b97979c3..d67b3814 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "weaviate-client", - "version": "3.11.0", + "version": "3.12.0-alpha.0", "lockfileVersion": 3, "requires": true, "packages": { "": { "name": "weaviate-client", - "version": "3.11.0", + "version": "3.12.0-alpha.0", "license": "BSD-3-Clause", "dependencies": { "@datastructures-js/deque": "^1.0.8", diff --git a/package.json b/package.json index a7d538e2..f54c7a05 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "weaviate-client", - "version": "3.11.0", + "version": "3.12.0-alpha.0", "description": "JS/TS client for Weaviate", "main": "dist/node/cjs/index.js", "type": "module", @@ -20,7 +20,7 @@ "node": ">=20.0.0" }, "scripts": { - "test": "vitest run --no-file-parallelism --isolate=true --reporter=default --reporter=hanging-process --silent=false", + "test": "vitest run --no-file-parallelism --isolate=true --reporter=verbose --reporter=hanging-process --silent=false", "test:coverage": "npm run test -- --coverage", "build": "npm run build:node", "build:web": "tsup", diff --git a/src/collections/data/batch.ts b/src/collections/data/batch.ts new file mode 100644 index 00000000..0de4c7e1 --- /dev/null +++ b/src/collections/data/batch.ts @@ -0,0 +1,453 @@ +import { Deque } from '@datastructures-js/deque'; +import { v4 as uuidv4 } from 'uuid'; +import Connection from '../../connection/grpc.js'; +import { ConsistencyLevel } from '../../index.js'; +import { + BatchObject as 
/**
 * Type guard: `obj` is a batch object, recognised by the presence of `collection`.
 *
 * `null` is part of the accepted input type (a timed-out `Queue.pull` resolves to `null`),
 * so it is rejected explicitly instead of being dereferenced, which used to throw a TypeError.
 *
 * NOTE(review): the generic argument of `BatchObject` was not visible in the reviewed text;
 * `BatchObject<any>` is assumed — confirm against the declaration.
 *
 * @param obj queue entry to classify; may be `null`.
 * @returns `true` only for a non-null value whose `collection` is defined.
 */
const isBatchObject = (obj: BatchObject<any> | BatchReference | null): obj is BatchObject<any> => {
  return obj != null && (obj as BatchObject<any>).collection !== undefined;
};

/**
 * Type guard: `obj` is a batch reference, recognised by the presence of `fromObjectCollection`.
 *
 * Like `isBatchObject`, a `null` input yields `false` rather than throwing.
 *
 * @param obj queue entry to classify; may be `null`.
 * @returns `true` only for a non-null value whose `fromObjectCollection` is defined.
 */
const isBatchReference = (obj: BatchObject<any> | BatchReference | null): obj is BatchReference => {
  return obj != null && (obj as BatchReference).fromObjectCollection !== undefined;
};
=> { + if (err) { + throw err; + } + return batcher; + }; + return { + addObject: (obj) => check(batchingErr).addObject(obj), + addReference: (ref) => check(batchingErr).addReference(ref), + stop: () => { + check(batchingErr).stop(); + return batching; + }, + hasErrors: () => + Object.keys(batcher.objErrors).length > 0 || Object.keys(batcher.refErrors).length > 0, + uuids: () => batcher.uuids, + beacons: () => batcher.beacons, + objErrors: () => batcher.objErrors, + refErrors: () => batcher.refErrors, + }; + }, + }; +} + +type BatcherArgs = { + consistencyLevel?: ConsistencyLevel; + isGcpOnWcd: boolean; +}; + +class Batcher { + private consistencyLevel?: ConsistencyLevel; + private queue: Queue | BatchReference>; + + private inflightObjs: Set = new Set(); + private inflightRefs: Set = new Set(); + private batchSize: number = 1000; + private objsCache: Record>> = {}; + private refsCache: Record> = {}; + private pendingObjs: BatchObjectGRPC[] = []; + private pendingRefs: BatchReferenceGRPC[] = []; + private isStarted: boolean = false; + private isShutdown: boolean = false; + private isShuttingDown: boolean = false; + private isOom: boolean = false; + private isStopped: boolean = false; + private isGcpOnWcd: boolean = false; + private isRenewingStream: boolean = false; + + public objErrors: Record> = {}; + public refErrors: Record = {}; + public uuids: Record = {}; + public beacons: Record = {}; + + constructor(args: BatcherArgs) { + this.consistencyLevel = args.consistencyLevel; + this.queue = new Queue(); + this.isGcpOnWcd = args.isGcpOnWcd; + } + + private healthy() { + return !this.isShuttingDown && !this.isOom && !this.isShutdown; + } + + public addObject = async (obj: BatchObject) => { + while (this.inflightObjs.size >= this.batchSize || !this.healthy()) { + await Batcher.sleep(10); // eslint-disable-line no-await-in-loop + } + if (obj.id === undefined) { + obj.id = uuidv4(); + } + this.queue.push(obj); + return obj.id!; + }; + + public addReference = async 
/**
 * Client half of the batch stream: drains the queue and yields `BatchStreamRequest`s.
 *
 * Entries are packed into one request until adding the next entry would reach
 * `grpcMaxMessageSize` or the request already holds `batchSize` entries of that kind; the
 * request is then yielded and a fresh one is started. The generator ends when
 *  - the server announced shutting-down or out-of-memory: the unsent entries are parked in
 *    `pendingObjs` / `pendingRefs` so the next stream starts with them;
 *  - the connection is GCP-hosted and the stream is older than `GCP_STREAM_TIMEOUT`: buffered
 *    entries are flushed, then a `stop` message is sent;
 *  - `stop()` was called and the queue is empty: buffered entries are flushed, then `stop`.
 *
 * NOTE(review): `objIndex` / `refIndex` are local and restart at 0 every time the stream is
 * re-established, while results are stored by index in `uuids` / `beacons` / `objErrors` /
 * `refErrors`; indices from before and after a restart can therefore collide — confirm and
 * consider keeping the counters on the instance.
 *
 * @param grpcMaxMessageSize upper bound, in bytes, for one encoded request.
 */
// eslint-disable-next-line complexity
private async *generateStreamRequests(grpcMaxMessageSize: number): AsyncGenerator<BatchStreamRequest> {
  while (!this.isStarted) {
    console.info('Waiting for server to start the batch ingestion...');
    await Batcher.sleep(100); // eslint-disable-line no-await-in-loop
  }
  const streamStart = Date.now();

  const perObjectOverhead = 4; // extra overhead bytes per object in the request

  // Start with whatever a previous stream could not send before it was closed.
  let req = BatchStreamRequest.create({
    data: { objects: { values: this.pendingObjs }, references: { values: this.pendingRefs } },
  });
  // The carried-over entries now live in `req`; forget them so that a later restart does not
  // send the same entries a second time.
  this.pendingObjs = [];
  this.pendingRefs = [];
  let totalSize = BatchStreamRequest.encode(req).finish().length;

  let objIndex = 0;
  let refIndex = 0;

  // True when the request under construction holds at least one object or reference.
  const hasBufferedEntries = () =>
    (req.data?.objects?.values.length ?? 0) > 0 || (req.data?.references?.values.length ?? 0) > 0;

  while (true) {
    if (this.isShuttingDown) {
      console.warn('Server shutting down, closing the client-side of the stream');
      this.pendingObjs = req.data?.objects?.values || [];
      this.pendingRefs = req.data?.references?.values || [];
      return;
    }
    if (this.isOom) {
      console.warn('Server out-of-memory, closing the client-side of the stream');
      this.pendingObjs = req.data?.objects?.values || [];
      this.pendingRefs = req.data?.references?.values || [];
      return;
    }
    if (this.isGcpOnWcd && Date.now() - streamStart > GCP_STREAM_TIMEOUT) {
      console.info(
        'GCP connections have a maximum lifetime. Re-establishing the batch stream to avoid timeout errors.'
      );
      this.isRenewingStream = true;
      // Entries already pulled from the queue exist only in `req`; send them before stopping,
      // otherwise they would be dropped when the stream is renewed.
      if (hasBufferedEntries()) {
        yield req;
      }
      yield BatchStreamRequest.create({ stop: {} });
      return;
    }

    const entry = await this.queue.pull(100); // eslint-disable-line no-await-in-loop
    if (entry === null && !this.isStopped) {
      // user may be holding the batcher open without having added anything during the 100 ms poll
      continue;
    }
    if (entry === null && this.isStopped) {
      // user has signaled to stop batching and there's nothing left in the queue, so close the stream
      console.info('Batching stopped by user, closing the client-side of the stream');
      if (hasBufferedEntries()) {
        yield req;
      }

      yield BatchStreamRequest.create({ stop: {} });
      return;
    }

    if (isBatchObject(entry)) {
      const { grpc } = Serialize.batchObject(entry.collection, entry, false, entry.tenant);
      // Remember the entry by uuid so server results can be mapped back to it.
      this.objsCache[grpc.uuid] = { entry, index: objIndex };

      const objSize = BatchObjectGRPC.encode(grpc).finish().length + perObjectOverhead;
      if (totalSize + objSize >= grpcMaxMessageSize || req.data!.objects!.values.length >= this.batchSize) {
        // Current request is full: wait for in-flight objects to drain, then send it.
        while (this.inflightObjs.size > this.batchSize) {
          await Batcher.sleep(10); // eslint-disable-line no-await-in-loop
        }
        this.inflightObjs = new Set(req.data?.objects?.values.map((o) => o.uuid!));

        yield req;
        req = BatchStreamRequest.create({ data: { objects: { values: [] }, references: { values: [] } } });
        totalSize = BatchStreamRequest.encode(req).finish().length;
      }

      req.data!.objects!.values.push(grpc);
      totalSize += objSize;
      objIndex++; // eslint-disable-line no-plusplus
    }

    if (isBatchReference(entry)) {
      const { grpc, beacon } = Serialize.batchReference(entry);
      // Remember the entry by beacon so server results can be mapped back to it.
      this.refsCache[beacon] = { entry, index: refIndex };

      const refSize = BatchReferenceGRPC.encode(grpc).finish().length + perObjectOverhead;
      if (
        totalSize + refSize >= grpcMaxMessageSize ||
        req.data!.references!.values.length >= this.batchSize
      ) {
        this.inflightRefs = new Set(
          req.data?.references?.values.map(
            (r) => `weaviate://localhost/${r.fromCollection}/${r.fromUuid}/${r.name}`
          )
        );
        yield req;
        req = BatchStreamRequest.create({ data: { objects: { values: [] }, references: { values: [] } } });
        totalSize = BatchStreamRequest.encode(req).finish().length;
      }

      req.data!.references!.values.push(grpc);
      totalSize += refSize;
      refIndex++; // eslint-disable-line no-plusplus
    }
  }
}
/**
 * Opens the batch stream on `connection`, feeds it from `generateStreamRequests` and turns the
 * server's replies into per-entry outcomes.
 *
 * Besides yielding results, every reply updates the batcher's state: acknowledgements and
 * out-of-memory notices shrink the in-flight sets, `backoff` adjusts `batchSize`, and
 * `started` / `shuttingDown` / `shutdown` / `outOfMemory` flip the corresponding flags.
 *
 * The in-flight sets are updated with `Set.prototype.delete` rather than
 * `Set.prototype.difference`, which only exists from Node.js 22 while this package declares
 * support for Node.js >= 20.
 *
 * @param connection gRPC connection used to open the stream.
 * @yields one `error` or `success` record per result whose uuid/beacon is still cached.
 */
async *do(
  connection: Connection
): AsyncGenerator<InternalError<BatchObject<any>> | InternalError<BatchReference> | InternalSuccess> {
  const batch = await connection.batch('', this.consistencyLevel);
  const gen = batch.withStream(this.generateStreamRequests(connection.grpcMaxMessageLength));
  for await (const msg of gen) {
    if (msg.acks !== undefined) {
      for (const uuid of msg.acks.uuids) this.inflightObjs.delete(uuid);
      for (const beacon of msg.acks.beacons) this.inflightRefs.delete(beacon);
    }
    if (msg.backoff !== undefined) {
      if (this.batchSize !== msg.backoff.batchSize && this.healthy() && !this.isStopped) {
        this.batchSize = msg.backoff.batchSize;
      }
    }
    if (msg.outOfMemory !== undefined) {
      this.isOom = true;
      // Re-queue everything the server could not take. An id that is no longer cached is
      // skipped instead of crashing the stream on an undefined cache entry.
      for (const uuid of msg.outOfMemory.uuids) {
        const cached = this.objsCache[uuid];
        if (cached !== undefined) this.queue.push(cached.entry);
      }
      for (const beacon of msg.outOfMemory.beacons) {
        const cached = this.refsCache[beacon];
        if (cached !== undefined) this.queue.push(cached.entry);
      }
      for (const uuid of msg.outOfMemory.uuids) this.inflightObjs.delete(uuid);
      for (const beacon of msg.outOfMemory.beacons) this.inflightRefs.delete(beacon);
    }
    if (msg.shuttingDown !== undefined) {
      console.warn('Received shutting down signal from server');
      this.isShuttingDown = true;
      this.isOom = false;
    }
    if (msg.shutdown !== undefined) {
      console.warn('Received shutdown signal from server');
      this.isShutdown = true;
      this.isShuttingDown = false;
    }
    if (msg.started !== undefined) {
      this.isStarted = true;
    }
    if (msg.results !== undefined) {
      for (const error of msg.results.errors) {
        if (error.uuid !== undefined) {
          const cached = this.objsCache[error.uuid];
          if (cached === undefined) {
            continue;
          }
          yield {
            index: cached.index,
            message: error.error,
            entry: cached.entry,
            type: 'error',
          };
          // Each entry is reported once; drop it from the cache afterwards.
          delete this.objsCache[error.uuid];
        }
        if (error.beacon !== undefined) {
          const cached = this.refsCache[error.beacon];
          if (cached === undefined) {
            continue;
          }
          yield {
            index: cached.index,
            message: error.error,
            entry: cached.entry,
            type: 'error',
          };
          delete this.refsCache[error.beacon];
        }
      }
      for (const success of msg.results.successes) {
        if (success.uuid !== undefined) {
          const cached = this.objsCache[success.uuid];
          if (cached === undefined) {
            continue;
          }
          yield {
            index: cached.index,
            uuid: success.uuid,
            type: 'success',
          };
          delete this.objsCache[success.uuid];
        }
        if (success.beacon !== undefined) {
          const cached = this.refsCache[success.beacon];
          if (cached === undefined) {
            continue;
          }
          yield {
            index: cached.index,
            beacon: success.beacon,
            type: 'success',
          };
          delete this.refsCache[success.beacon];
        }
      }
    }
  }
  console.info('Server closed its side of the stream');
}
/**
 * Streams `objs` into this collection through the server-side batching API and collects the
 * outcome once the stream has been stopped.
 *
 * Objects are added one at a time (each `addObject` call is awaited), then `stop()` is awaited
 * before the recorded uuids and errors are read back. `allResponses` is assembled by walking the
 * indices `0 .. uuids + errors - 1` and taking the uuid, or else the error, stored at each index.
 *
 * NOTE(review): every object is spread into `{ collection, ...obj, tenant }`; when `obj` is a
 * bare properties bag rather than a `DataObject`, `collection` and `tenant` end up next to the
 * user's properties — confirm the serializer still treats that shape as intended.
 * NOTE(review): the stream factory records a failed stream instead of rejecting the promise that
 * `stop()` returns, so a failure after `stop()` was requested appears not to surface here — confirm.
 *
 * @param objs objects (or bare property bags) to ingest.
 * @returns responses in index order, the uuid and error maps, `hasErrors` and the elapsed seconds.
 */
ingest: async (objs) => {
  const batching = await batch(connection, dbVersionSupport).stream(consistencyLevel);
  const start = Date.now();

  for (const obj of objs) {
    // eslint-disable-next-line no-await-in-loop
    await batching.addObject({
      collection: name,
      ...obj,
      tenant,
    });
  }
  await batching.stop();

  const errors = batching.objErrors();
  const uuids = batching.uuids();

  // Computed once: evaluating Object.keys() in the loop condition made the loop quadratic.
  const errorCount = Object.keys(errors).length;
  const total = Object.keys(uuids).length + errorCount;

  const allResponses: (string | ErrorObject<T>)[] = [];
  for (let i = 0; i < total; i++) {
    if (uuids[i]) {
      allResponses.push(uuids[i]);
    } else if (errors[i]) {
      allResponses.push(errors[i]);
    }
  }

  const end = Date.now();
  const elapsedSeconds = (end - start) / 1000;

  return {
    allResponses,
    elapsedSeconds,
    errors,
    uuids,
    hasErrors: errorCount > 0,
  };
},
+1,88 @@ +/* eslint-disable no-await-in-loop */ +import { describe, expect, it } from 'vitest'; +import { Queue } from './batch'; + +describe('Unit testing of the Queue class', () => { + it('should push and pull items in FIFO order', async () => { + const queue = new Queue(); + queue.push(1); + queue.push(2); + queue.push(3); + + expect(await queue.pull()).toBe(1); + expect(await queue.pull()).toBe(2); + expect(await queue.pull()).toBe(3); + }); + + it('should return null when pulling from an empty queue with timeout', async () => { + const queue = new Queue(); + const result = await queue.pull(100); // 100ms timeout + expect(result).toBeNull(); + }); + + it('should handle concurrent pushes and pulls correctly', async () => { + const queue = new Queue(); + const results: number[] = []; + + // Push items in the background + setTimeout(() => { + for (let i = 1; i <= 5; i++) { + queue.push(i); + } + }, 50); + + // Pull items concurrently + for (let i = 0; i < 5; i++) { + const item = await queue.pull(); + if (item !== null) { + results.push(item); + } + } + + expect(results).toEqual([1, 2, 3, 4, 5]); + }); + + it('should handle when pull happens before push', async () => { + const queue = new Queue(); + const pullPromise = queue.pull(500); // 500ms timeout + + setTimeout(() => { + queue.push(42); + }, 100); // Push after 100ms + + const result = await pullPromise; + expect(result).toBe(42); + }); + + it('should return the correct length', async () => { + const queue = new Queue(); + expect(queue.length).toBe(0); + queue.push(1); + queue.push(2); + expect(queue.length).toBe(2); + await queue.pull(); + expect(queue.length).toBe(1); + }); + + it('should handle timeouts correctly', async () => { + const queue = new Queue(); + const start = Date.now(); + const result = await queue.pull(200); // 200ms timeout + expect(result).toBeNull(); + expect(Date.now() - start).toBeGreaterThanOrEqual(199); // Allow some leeway for timing + }); + + it('should pull a pushed item while a 
pull is waiting', async () => { + const queue = new Queue(); + const pullPromise = queue.pull(100); // 100ms timeout + queue.push(99); + expect(await pullPromise).toBe(99); + }); + + it('should pull a pushed item after a pull has already timed-out', async () => { + const queue = new Queue(); + expect(await queue.pull(100)).toBeNull(); + queue.push(99); + expect(await queue.pull(0)).toBe(99); + }); +}); diff --git a/src/collections/serialize/index.ts b/src/collections/serialize/index.ts index aa63bed9..58c6fb4b 100644 --- a/src/collections/serialize/index.ts +++ b/src/collections/serialize/index.ts @@ -26,6 +26,7 @@ import { BatchObject_MultiTargetRefProps, BatchObject_Properties, BatchObject_SingleTargetRefProps, + BatchReference as BatchReferenceGRPC, } from '../../proto/v1/batch.js'; import { GenerativeProvider, @@ -109,6 +110,7 @@ import { AggregateBaseOptions, AggregateHybridOptions, AggregateNearOptions, + BatchReference, GenerativeConfigRuntime, GroupByAggregate, GroupedTask, @@ -1889,6 +1891,65 @@ export class Serialize { }; }; + public static batchObject = ( + collection: string, + object: DataObject | NonReferenceInputs, + requiresInsertFix: boolean, + tenant?: string + ) => { + const obj = DataGuards.isDataObject(object) + ? object + : { id: undefined, properties: object, references: undefined, vectors: undefined }; + + let vectorBytes: Uint8Array | undefined; + let vectors: VectorsGrpc[] | undefined; + if (obj.vectors !== undefined && !Array.isArray(obj.vectors)) { + vectors = Object.entries(obj.vectors).flatMap(([k, v]) => + NearVectorInputGuards.is1D(v) + ? 
[ + VectorsGrpc.fromPartial({ + vectorBytes: Serialize.vectorToBytes(v), + name: k, + }), + ] + : v.map((vv) => + VectorsGrpc.fromPartial({ + vectorBytes: Serialize.vectorToBytes(vv), + name: k, + }) + ) + ); + } else if (Array.isArray(obj.vectors) && requiresInsertFix) { + vectors = [ + VectorsGrpc.fromPartial({ + vectorBytes: Serialize.vectorToBytes(obj.vectors), + name: 'default', + }), + ]; + vectorBytes = Serialize.vectorToBytes(obj.vectors); + // required in case collection was made with <1.24.0 and has since been migrated to >=1.24.0 + } else if (obj.vectors !== undefined) { + vectorBytes = Serialize.vectorToBytes(obj.vectors); + } + const uuid = obj.id ? obj.id : uuidv4(); + return { + grpc: BatchObjectGRPC.fromPartial({ + collection: collection, + properties: Serialize.batchProperties(obj.properties, obj.references), + tenant: tenant, + uuid: uuid, + vectorBytes, + vectors, + }), + object: { + ...obj, + id: uuid, + collection: collection, + tenant: tenant, + }, + }; + }; + public static batchObjects = ( collection: string, objects: (DataObject | NonReferenceInputs)[], @@ -1909,58 +1970,14 @@ export class Serialize { return; } - const object = objects[index]; - const obj = DataGuards.isDataObject(object) - ? object - : { id: undefined, properties: object, references: undefined, vectors: undefined }; - - let vectorBytes: Uint8Array | undefined; - let vectors: VectorsGrpc[] | undefined; - if (obj.vectors !== undefined && !Array.isArray(obj.vectors)) { - vectors = Object.entries(obj.vectors).flatMap(([k, v]) => - NearVectorInputGuards.is1D(v) - ? 
[ - VectorsGrpc.fromPartial({ - vectorBytes: Serialize.vectorToBytes(v), - name: k, - }), - ] - : v.map((vv) => - VectorsGrpc.fromPartial({ - vectorBytes: Serialize.vectorToBytes(vv), - name: k, - }) - ) - ); - } else if (Array.isArray(obj.vectors) && requiresInsertFix) { - vectors = [ - VectorsGrpc.fromPartial({ - vectorBytes: Serialize.vectorToBytes(obj.vectors), - name: 'default', - }), - ]; - vectorBytes = Serialize.vectorToBytes(obj.vectors); - // required in case collection was made with <1.24.0 and has since been migrated to >=1.24.0 - } else if (obj.vectors !== undefined) { - vectorBytes = Serialize.vectorToBytes(obj.vectors); - } - - objs.push( - BatchObjectGRPC.fromPartial({ - collection: collection, - properties: Serialize.batchProperties(obj.properties, obj.references), - tenant: tenant, - uuid: obj.id ? obj.id : uuidv4(), - vectorBytes, - vectors, - }) + const { grpc, object } = Serialize.batchObject( + collection, + objects[index], + requiresInsertFix, + tenant ); - - batch.push({ - ...obj, - collection: collection, - tenant: tenant, - }); + objs.push(grpc); + batch.push(object); }; const waitFor = () => { @@ -1981,6 +1998,21 @@ export class Serialize { }); }; + public static batchReference = (ref: BatchReference) => { + const beacon = `weaviate://localhost/${ref.fromObjectCollection}/${ref.fromObjectUuid}/${ref.fromPropertyName}`; + return { + grpc: BatchReferenceGRPC.fromPartial({ + fromCollection: ref.fromObjectCollection, + fromUuid: ref.fromObjectUuid, + toCollection: ref.toObjectCollection, + toUuid: ref.toObjectUuid, + name: ref.fromPropertyName, + tenant: ref.tenant, + }), + beacon, + }; + }; + public static tenants(tenants: T[], mapper: (tenant: T) => M): M[][] { const mapped = []; const batches = Math.ceil(tenants.length / 100); diff --git a/src/collections/types/batch.ts b/src/collections/types/batch.ts index 0048cb41..319fda42 100644 --- a/src/collections/types/batch.ts +++ b/src/collections/types/batch.ts @@ -1,4 +1,4 @@ -import { 
/**
 * Replaces the gRPC client with a new one built from the stored connection parameters and
 * verifies it with a health check.
 *
 * Only the gRPC side is re-created; per the original author's note, REST/GraphQL are stateless
 * and need no reconnect.
 *
 * @throws WeaviateGRPCUnavailableError when the new client's health check reports unhealthy.
 */
public async reconnect() {
  // Drop the old channel first, then open a new one with the same parameters.
  this.grpc.close();
  const freshClient = grpcClient(this.params);
  this.grpc = freshClient;

  const healthy = await freshClient.health();
  if (healthy) {
    return;
  }
  throw new WeaviateGRPCUnavailableError(this.params.grpcAddress);
}
/**
 * Reports whether the configured host looks like a GCP-hosted Weaviate Cloud instance.
 *
 * The check is a case-insensitive substring match: the host must contain `gcp` and at least one
 * of `weaviate.io`, `semi.technology` or `weaviate.cloud`.
 */
public isGcpOnWcd = (): boolean => {
  const host = this.host.toLowerCase();
  const cloudDomains = ['weaviate.io', 'semi.technology', 'weaviate.cloud'];
  const onCloudDomain = cloudDomains.some((domain) => host.includes(domain));
  return onCloudDomain && host.includes('gcp');
};
/**
 * Runs the batch stream call: sends a `start` message carrying this batcher's consistency
 * level, forwards every request produced by `reqs`, and re-yields each reply from the server.
 *
 * @param reqs async generator producing the requests to send after the initial `start`.
 * @yields every reply received from `connection.batchStream`.
 * @throws WeaviateBatchStreamError when the stream fails with a `ClientError`; any other error
 *   is rethrown unchanged.
 */
private async *callStream(reqs: AsyncGenerator<BatchStreamRequest>) {
  // Captured in a local because `generate` is a plain function and has no access to `this`.
  const consistencyLevel = this.consistencyLevel;
  async function* generate() {
    // The first message on the stream is always `start`.
    yield BatchStreamRequest.create({ start: { consistencyLevel } });
    for await (const req of reqs) {
      yield req;
    }
  }
  try {
    for await (const res of this.connection.batchStream(generate(), { metadata: this.metadata })) {
      yield res;
    }
  } catch (err) {
    // Translate transport-level client errors into the library's own error type.
    if (err instanceof ClientError) {
      throw new WeaviateBatchStreamError(err.message);
    }
    throw err;
  }
}
ClientParams): Promise { const ifc: WeaviateClient = { alias: alias(connection), backup: backup(connection), + batch: batch(connection, dbVersionSupport), cluster: cluster(connection), collections: collections(connection, dbVersionSupport), groups: groups(connection), diff --git a/src/utils/dbVersion.ts b/src/utils/dbVersion.ts index dff7653d..75f86d0b 100644 --- a/src/utils/dbVersion.ts +++ b/src/utils/dbVersion.ts @@ -141,6 +141,13 @@ export class DbVersionSupport { supports: version.isAtLeast(1, 30, 0), message: this.errorMessage('Generative config runtime', version.show(), '1.30.0'), })); + + supportsServerSideBatching = () => + this.dbVersionProvider.getVersion().then((version) => ({ + version, + supports: version.isAtLeast(1, 36, 0), + message: this.errorMessage('Server-side batching', version.show(), '1.36.0'), + })); } const EMPTY_VERSION = ''; diff --git a/src/version.ts b/src/version.ts index 171d5a20..7fcb567b 100644 --- a/src/version.ts +++ b/src/version.ts @@ -1 +1 @@ -export const WEAVIATE_CLIENT_VERSION = '3.11.0'; +export const WEAVIATE_CLIENT_VERSION = '3.12.0-alpha.0'; diff --git a/test/collections/data/integration.test.ts b/test/collections/data/integration.test.ts index a7f13beb..9ce88f02 100644 --- a/test/collections/data/integration.test.ts +++ b/test/collections/data/integration.test.ts @@ -1,7 +1,9 @@ /* eslint-disable @typescript-eslint/no-non-null-assertion */ /* eslint-disable @typescript-eslint/no-non-null-asserted-optional-chain */ +/* eslint-disable no-await-in-loop */ +import { afterAll, afterEach, beforeAll, beforeEach, describe, expect, it } from 'vitest'; + import { v4 } from 'uuid'; -import { afterAll, beforeAll, describe, expect, it } from 'vitest'; import { WeaviateUnsupportedFeatureError } from '../../../src/errors.js'; import weaviate, { Collection, @@ -15,6 +17,7 @@ import weaviate, { WeaviateObject, weaviateV2, } from '../../../src/index.js'; +import { requireAtLeast } from '../../version.js'; type TestCollectionData = { 
testProp: string; @@ -1090,3 +1093,80 @@ describe('Testing of BYOV insertion with legacy vectorizer', () => { expect(object?.vectors.default).toEqual([7, 8, 9]); }); }); + +requireAtLeast(1, 36, 0).describe('Testing of the collection.data.{import, ingest} methods', () => { + let client: WeaviateClient; + let collection: Collection; + const collectionName = 'TestCollectionDataIngest'; + + beforeAll(async () => { + client = await weaviate.connectToLocal(); + }); + + beforeEach(async () => { + collection = await client.collections.create({ + name: collectionName, + properties: [ + { + name: 'text', + dataType: 'text', + }, + ], + references: [ + { + name: 'self', + targetCollection: collectionName, + }, + ], + vectorizers: weaviate.configure.vectors.selfProvided(), + }); + }); + + afterEach(() => client.collections.delete(collectionName)); + + it('should be able to ingest 2000 objects with vectors from the collection object', async () => { + const objects: DataObject[] = []; + for (let i = 0; i < 2000; i++) { + objects.push({ + properties: { + text: `object ${i}`, + }, + vectors: Array.from({ length: 128 }, () => Math.random()), + }); + } + const insert = await collection.data.ingest(objects); + expect(insert.hasErrors).toBeFalsy(); + expect(insert.allResponses.length).toEqual(2000); + expect(Object.values(insert.errors).length).toEqual(0); + expect(Object.values(insert.uuids).length).toEqual(2000); + expect(await collection.length()).toEqual(2000); + }); + + it('should be able to ingest 2000 self-referencing objects with vectors from the client object', async () => { + const batching = await client.batch.stream(); + + for (let i = 0; i < 2000; i++) { + const obj = { + collection: collectionName, + properties: { + text: `object ${i}`, + }, + vectors: Array.from({ length: 128 }, () => Math.random()), + }; + const id = await batching.addObject(obj); + await batching.addReference({ + fromObjectCollection: collectionName, + fromObjectUuid: id, + fromPropertyName: 'self', 
+ toObjectUuid: id, + }); + } + + await batching.stop(); + + expect(batching.hasErrors()).toBeFalsy(); + expect(Object.values(batching.objErrors()).length).toEqual(0); + expect(Object.values(batching.uuids()).length).toEqual(2000); + expect(await collection.length()).toEqual(2000); + }); +}); diff --git a/test/collections/data/mock.test.ts b/test/collections/data/mock.test.ts new file mode 100644 index 00000000..11a11aa7 --- /dev/null +++ b/test/collections/data/mock.test.ts @@ -0,0 +1,48 @@ +import { ServerError, Status } from 'nice-grpc-common'; +import { afterAll, beforeAll, describe, expect, it } from 'vitest'; +import { WeaviateBatchStreamError } from '../../../src/errors.js'; +import weaviate from '../../../src/index.js'; +import { listen, makeGrpcApp, makeRestApp } from '../../mocks'; + +describe('Mock testing of batch streaming when the server errors', () => { + let closeFn: () => Promise; + + const serverErr = new ServerError(Status.INTERNAL, 'Simulated server error'); + const clientErr = new WeaviateBatchStreamError( + '/weaviate.v1.Weaviate/BatchStream INTERNAL: Simulated server error' + ); + + beforeAll(async () => { + const restApp = makeRestApp('1.36.0'); + const grpcApp = makeGrpcApp({ + async *batchStream(request, context) { + yield { started: {} }; + throw serverErr; + }, + }); + const { close } = await listen(restApp, grpcApp, 8976, 'localhost:8977'); + closeFn = close; + }); + + afterAll(() => closeFn()); + + it('should handle server errors in batch streaming', async () => { + const client = await weaviate.connectToLocal({ port: 8976, grpcPort: 8977 }); + const batching = await client.batch.stream(); + + // give time for the server to throw the expected error + await new Promise((resolve) => setTimeout(resolve, 1000)); + + // errors are thrown at the creation of the promise and not as part of the promise rejection + expect(() => batching.addObject({ collection: 'Test Object' })).toThrowError(clientErr); + + // verify that error is still thrown if 
the promise is awaited instead + // eslint-disable-next-line no-return-await + await expect(async () => await batching.addObject({ collection: 'Test Object' })).rejects.toThrow( + clientErr + ); + + // ensure that error is thrown when stopping the batch too in case users aren't calling .addObject + expect(() => batching.stop()).toThrowError(clientErr); + }); +}); diff --git a/test/connection/mock.test.ts b/test/connection/mock.test.ts index a6fe8306..74c4378f 100644 --- a/test/connection/mock.test.ts +++ b/test/connection/mock.test.ts @@ -351,7 +351,7 @@ describe('client version header', () => { next(); }); app.get('/v1/test', (req, res) => res.json({ message: 'ok' })); - const port = 40202; + const port = 8960; const server = app.listen(port); beforeAll(() => server); afterAll(() => server.close()); diff --git a/test/data/journey.test.ts b/test/data/journey.test.ts index c0544154..8bb6bced 100644 --- a/test/data/journey.test.ts +++ b/test/data/journey.test.ts @@ -1,6 +1,5 @@ /* eslint-disable @typescript-eslint/no-non-null-assertion */ import { describe, expect, it } from 'vitest'; -import weaviate, { WeaviateClient } from '../../src/v2/index.js'; import { Properties, Tenant, @@ -8,7 +7,8 @@ import { WeaviateError, WeaviateObject, WeaviateObjectsList, -} from '../openapi/types.js'; +} from '../../src/openapi/types.js'; +import weaviate, { WeaviateClient } from '../../src/v2/index.js'; const thingClassName = 'DataJourneyTestThing'; const refSourceClassName = 'DataJourneyTestRefSource'; diff --git a/test/mocks.ts b/test/mocks.ts new file mode 100644 index 00000000..5dff3268 --- /dev/null +++ b/test/mocks.ts @@ -0,0 +1,49 @@ +import express, { Express } from 'express'; +import { createServer, Server as GrpcServer } from 'nice-grpc'; +import { vi } from 'vitest'; +import { + HealthCheckResponse, + HealthCheckResponse_ServingStatus, + HealthDefinition, + HealthServiceImplementation, +} from '../src/proto/google/health/v1/health'; +import { DeepPartial } from 
'../src/proto/v1/batch'; +import { WeaviateDefinition, WeaviateServiceImplementation } from '../src/proto/v1/weaviate'; + +export const makeRestApp = (version: string, endpoints?: (app: Express) => void) => { + const app = express(); + app.get('/v1/meta', (req, res) => res.send({ version })); + if (endpoints) { + endpoints(app); + } + return app; +}; + +export const makeGrpcApp = (methods?: DeepPartial) => { + const weaviateMockImpl: WeaviateServiceImplementation = { + aggregate: methods?.aggregate || vi.fn(), + tenantsGet: methods?.tenantsGet || vi.fn(), + search: methods?.search || vi.fn(), + batchDelete: methods?.batchDelete || vi.fn(), + batchObjects: methods?.batchObjects || vi.fn(), + batchReferences: methods?.batchReferences || vi.fn(), + batchStream: methods?.batchStream || vi.fn(), + }; + const healthMockImpl: HealthServiceImplementation = { + check: (request) => + Promise.resolve(HealthCheckResponse.create({ status: HealthCheckResponse_ServingStatus.SERVING })), + watch: vi.fn(), + }; + + const grpcApp = createServer(); + grpcApp.add(WeaviateDefinition, weaviateMockImpl); + grpcApp.add(HealthDefinition, healthMockImpl); + + return grpcApp; +}; + +export const listen = async (rest: Express, grpc: GrpcServer, httpPort: number, grpcAddress: string) => { + const server = await rest.listen(httpPort); + await grpc.listen(grpcAddress); + return { rest: server, grpc, express, close: () => Promise.all([server.close(), grpc.shutdown()]) }; +}; diff --git a/tools/prepare_release.sh b/tools/prepare_release.sh index 6d14e66c..ff80b5f7 100755 --- a/tools/prepare_release.sh +++ b/tools/prepare_release.sh @@ -29,4 +29,5 @@ fi npm run generate:version $VERSION git add . git commit -m "chore: prepare release $VERSION" +git push npm version "${VERSION/v}"