Skip to content

Commit 23df6e1

Browse files
authored
fix: opfs support issues (#1973)
* update async_bindings.ts update opfs tests change dropFiles args * add test for copy csv * Merge branch 'develop' into feature/from_opfs_path * add opfs_util and fix * change space indents * fix registerOPFSFileName * fix * fix * remove patch * add fix * fix(format) * fix: mkdirs * fix: dropFiles * Bump to DuckDB v1.2.1 * Bump duckdb to 7c039464e4 * del: no need patch * fix: support duckdb 1.3 * fix: support duckdb 1.3
1 parent e4ee560 commit 23df6e1

File tree

17 files changed

+431
-212
lines changed

17 files changed

+431
-212
lines changed

lib/CMakeLists.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -332,4 +332,4 @@ if(NOT EMSCRIPTEN)
332332

333333
add_executable(tester ${TEST_CC})
334334
target_link_libraries(tester ${TEST_LIBS})
335-
endif()
335+
endif()

lib/base_exported_list.txt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,9 @@ _main
22
_malloc
33
_calloc
44
_free
5+
stringToUTF8
6+
lengthBytesUTF8
7+
stackAlloc
58
_duckdb_web_clear_response
69
_duckdb_web_collect_file_stats
710
_duckdb_web_connect

lib/src/webdb_api.cc

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
#include <cstring>
2+
#include <exception>
13
#include <iostream>
24
#include <stdexcept>
35

@@ -93,10 +95,25 @@ void duckdb_web_fs_drop_file(WASMResponse* packed, const char* file_name) {
9395
GET_WEBDB(*packed);
9496
WASMResponseBuffer::Get().Store(*packed, webdb.DropFile(file_name));
9597
}
96-
/// Drop a file
97-
void duckdb_web_fs_drop_files(WASMResponse* packed) {
98+
/// Drop files
99+
void duckdb_web_fs_drop_files(WASMResponse* packed, const char** names, int name_count) {
98100
GET_WEBDB(*packed);
99-
WASMResponseBuffer::Get().Store(*packed, webdb.DropFiles());
101+
if (name_count == 0 || names == NULL) {
102+
WASMResponseBuffer::Get().Store(*packed, webdb.DropFiles());
103+
} else {
104+
for (int i = 0; i < name_count; i++) {
105+
const char* name = names[i];
106+
if (name == NULL) {
107+
std::cerr << "Error: NULL pointer detected at index " << i << std::endl;
108+
continue;
109+
}
110+
if (std::strlen(name) == 0) {
111+
std::cerr << "Error: Empty string detected at index " << i << std::endl;
112+
continue;
113+
}
114+
WASMResponseBuffer::Get().Store(*packed, webdb.DropFile(name));
115+
}
116+
}
100117
}
101118
/// Glob file infos
102119
void duckdb_web_fs_glob_file_infos(WASMResponse* packed, const char* file_name) {

packages/duckdb-wasm/src/bindings/bindings_base.ts

Lines changed: 46 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -622,12 +622,52 @@ export abstract class DuckDBBindingsBase implements DuckDBBindings {
622622
dropResponseBuffers(this.mod);
623623
}
624624
/** Drop files */
625-
public dropFiles(): void {
626-
const [s, d, n] = callSRet(this.mod, 'duckdb_web_fs_drop_files', [], []);
627-
if (s !== StatusCode.SUCCESS) {
628-
throw new Error(readString(this.mod, d, n));
625+
public dropFiles(names?:string[]): void {
626+
const pointers:number[] = [];
627+
let pointerOfArray:number = -1;
628+
try {
629+
for (const str of (names ?? [])) {
630+
if (str !== null && str !== undefined && str.length > 0) {
631+
const size = this.mod.lengthBytesUTF8(str) + 1;
632+
const ret = this.mod._malloc(size);
633+
if (!ret) {
634+
throw new Error(`Failed to allocate memory for string: ${str}`);
635+
}
636+
this.mod.stringToUTF8(str, ret, size);
637+
pointers.push(ret);
638+
}
639+
}
640+
pointerOfArray = this.mod._malloc(pointers.length * 4);
641+
if (!pointerOfArray) {
642+
throw new Error(`Failed to allocate memory for pointers array`);
643+
}
644+
for (let i = 0; i < pointers.length; i++) {
645+
this.mod.HEAP32[(pointerOfArray >> 2) + i] = pointers[i];
646+
}
647+
const [s, d, n] = callSRet(
648+
this.mod,
649+
'duckdb_web_fs_drop_files',
650+
[
651+
'number',
652+
'number'
653+
],
654+
[
655+
pointerOfArray,
656+
pointers.length
657+
]
658+
);
659+
if (s !== StatusCode.SUCCESS) {
660+
throw new Error(readString(this.mod, d, n));
661+
}
662+
dropResponseBuffers(this.mod);
663+
} finally {
664+
for (const pointer of pointers) {
665+
this.mod._free(pointer);
666+
}
667+
if( pointerOfArray > 0 ){
668+
this.mod._free(pointerOfArray);
669+
}
629670
}
630-
dropResponseBuffers(this.mod);
631671
}
632672
/** Flush all files */
633673
public flushFiles(): void {
@@ -654,7 +694,7 @@ export abstract class DuckDBBindingsBase implements DuckDBBindings {
654694
return copy;
655695
}
656696
/** Register an OPFS file by name */
657-
public registerOPFSFileName(file: string): Promise<void> {
697+
public async registerOPFSFileName(file: string): Promise<void> {
658698
if (file.startsWith('opfs://')) {
659699
return this.prepareFileHandle(file, DuckDBDataProtocol.BROWSER_FSACCESS);
660700
} else {

packages/duckdb-wasm/src/bindings/bindings_interface.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,11 +58,11 @@ export interface DuckDBBindings {
5858
prepareDBFileHandle(path: string, protocol: DuckDBDataProtocol): Promise<void>;
5959
globFiles(path: string): WebFile[];
6060
dropFile(name: string): void;
61-
dropFiles(): void;
61+
dropFiles(names?: string[]): void;
6262
flushFiles(): void;
6363
copyFileToPath(name: string, path: string): void;
6464
copyFileToBuffer(name: string): Uint8Array;
65-
registerOPFSFileName(file: string): void;
65+
registerOPFSFileName(file: string): Promise<void>;
6666
collectFileStatistics(file: string, enable: boolean): void;
6767
exportFileStatistics(file: string): FileStatistics;
6868
}

packages/duckdb-wasm/src/bindings/config.ts

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,15 @@ export interface DuckDBFilesystemConfig {
3333
forceFullHTTPReads?: boolean;
3434
}
3535

36+
export interface DuckDBOPFSConfig {
37+
/**
38+
* Defines how `opfs://` files are handled during SQL execution.
39+
* - "auto": Automatically register `opfs://` files and drop them after execution.
40+
* - "manual": Files must be manually registered and dropped.
41+
*/
42+
fileHandling?: "auto" | "manual";
43+
}
44+
3645
export enum DuckDBAccessMode {
3746
UNDEFINED = 0,
3847
AUTOMATIC = 1,
@@ -78,4 +87,8 @@ export interface DuckDBConfig {
7887
* Custom user agent string
7988
*/
8089
customUserAgent?: string;
90+
/**
91+
* OPFS file handling configuration
92+
*/
93+
opfs?: DuckDBOPFSConfig;
8194
}

packages/duckdb-wasm/src/bindings/duckdb_module.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ export interface DuckDBModule extends EmscriptenModule {
77
stackSave: typeof stackSave;
88
stackAlloc: typeof stackAlloc;
99
stackRestore: typeof stackRestore;
10+
lengthBytesUTF8: typeof lengthBytesUTF8;
11+
stringToUTF8: typeof stringToUTF8;
1012

1113
ccall: typeof ccall;
1214
PThread: PThread;

packages/duckdb-wasm/src/bindings/runtime_browser.ts

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -129,11 +129,14 @@ export const BROWSER_RUNTIME: DuckDBRuntime & {
129129
let fileName = opfsPath;
130130
if (PATH_SEP_REGEX.test(opfsPath)) {
131131
const folders = opfsPath.split(PATH_SEP_REGEX);
132-
fileName = folders.pop()!;
132+
if (folders.length === 0) {
133+
throw new Error(`Invalid path ${opfsPath}`);
134+
}
135+
fileName = folders[folders.length - 1];
133136
if (!fileName) {
134-
throw new Error(`Invalid path ${path}`);
137+
throw new Error(`Invalid path ${opfsPath}. File Not Found.`);
135138
}
136-
// mkdir -p
139+
folders.pop();
137140
for (const folder of folders) {
138141
dirHandle = await dirHandle.getDirectoryHandle(folder, { create: true });
139142
}

packages/duckdb-wasm/src/parallel/async_bindings.ts

Lines changed: 69 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import { InstantiationProgress } from '../bindings/progress';
1818
import { arrowToSQLField } from '../json_typedef';
1919
import { WebFile } from '../bindings/web_file';
2020
import { DuckDBDataProtocol } from '../bindings';
21+
import { searchOPFSFiles, isOPFSProtocol } from "../utils/opfs_util";
2122
import { ProgressEntry } from '../log';
2223

2324
const TEXT_ENCODER = new TextEncoder();
@@ -49,6 +50,8 @@ export class AsyncDuckDB implements AsyncDuckDBBindings {
4950
protected _nextMessageId = 0;
5051
/** The pending requests */
5152
protected _pendingRequests: Map<number, WorkerTaskVariant> = new Map();
53+
/** The DuckDBConfig */
54+
protected _config: DuckDBConfig = {};
5255

5356
constructor(logger: Logger, worker: Worker | null = null) {
5457
this._logger = logger;
@@ -63,6 +66,11 @@ export class AsyncDuckDB implements AsyncDuckDBBindings {
6366
return this._logger;
6467
}
6568

69+
/** Get the config */
70+
public get config(): DuckDBConfig {
71+
return this._config;
72+
}
73+
6674
/** Attach to worker */
6775
protected attach(worker: Worker): void {
6876
this._worker = worker;
@@ -104,7 +112,7 @@ export class AsyncDuckDB implements AsyncDuckDBBindings {
104112
transfer: ArrayBuffer[] = [],
105113
): Promise<WorkerTaskReturnType<W>> {
106114
if (!this._worker) {
107-
console.error('cannot send a message since the worker is not set!');
115+
console.error('cannot send a message since the worker is not set!:' + task.type+"," + task.data);
108116
return undefined as any;
109117
}
110118
const mid = this._nextMessageId++;
@@ -327,8 +335,8 @@ export class AsyncDuckDB implements AsyncDuckDBBindings {
327335
return await this.postTask(task);
328336
}
329337
/** Try to drop files */
330-
public async dropFiles(): Promise<null> {
331-
const task = new WorkerTask<WorkerRequestType.DROP_FILES, null, null>(WorkerRequestType.DROP_FILES, null);
338+
public async dropFiles(names?: string[]): Promise<null> {
339+
const task = new WorkerTask<WorkerRequestType.DROP_FILES, string[] | undefined, null>(WorkerRequestType.DROP_FILES, names);
332340
return await this.postTask(task);
333341
}
334342
/** Flush all files */
@@ -370,6 +378,7 @@ export class AsyncDuckDB implements AsyncDuckDBBindings {
370378

371379
/** Open a new database */
372380
public async open(config: DuckDBConfig): Promise<void> {
381+
this._config = config;
373382
const task = new WorkerTask<WorkerRequestType.OPEN, DuckDBConfig, null>(WorkerRequestType.OPEN, config);
374383
await this.postTask(task);
375384
}
@@ -404,6 +413,21 @@ export class AsyncDuckDB implements AsyncDuckDBBindings {
404413

405414
/** Run a query */
406415
public async runQuery(conn: ConnectionID, text: string): Promise<Uint8Array> {
416+
if( this.shouldOPFSFileHandling() ){
417+
const files = await this.registerOPFSFileFromSQL(text);
418+
try {
419+
return await this._runQueryAsync(conn, text);
420+
} finally {
421+
if( files.length > 0 ){
422+
await this.dropFiles(files);
423+
}
424+
}
425+
} else {
426+
return await this._runQueryAsync(conn, text);
427+
}
428+
}
429+
430+
private async _runQueryAsync(conn: ConnectionID, text: string): Promise<Uint8Array> {
407431
const task = new WorkerTask<WorkerRequestType.RUN_QUERY, [ConnectionID, string], Uint8Array>(
408432
WorkerRequestType.RUN_QUERY,
409433
[conn, text],
@@ -416,6 +440,25 @@ export class AsyncDuckDB implements AsyncDuckDBBindings {
416440
conn: ConnectionID,
417441
text: string,
418442
allowStreamResult: boolean = false,
443+
): Promise<Uint8Array | null> {
444+
if( this.shouldOPFSFileHandling() ){
445+
const files = await this.registerOPFSFileFromSQL(text);
446+
try {
447+
return await this._startPendingQueryAsync(conn, text, allowStreamResult);
448+
} finally {
449+
if( files.length > 0 ){
450+
await this.dropFiles(files);
451+
}
452+
}
453+
} else {
454+
return await this._startPendingQueryAsync(conn, text, allowStreamResult);
455+
}
456+
}
457+
458+
private async _startPendingQueryAsync(
459+
conn: ConnectionID,
460+
text: string,
461+
allowStreamResult: boolean = false,
419462
): Promise<Uint8Array | null> {
420463
const task = new WorkerTask<
421464
WorkerRequestType.START_PENDING_QUERY,
@@ -424,6 +467,7 @@ export class AsyncDuckDB implements AsyncDuckDBBindings {
424467
>(WorkerRequestType.START_PENDING_QUERY, [conn, text, allowStreamResult]);
425468
return await this.postTask(task);
426469
}
470+
427471
/** Poll a pending query */
428472
public async pollPendingQuery(conn: ConnectionID): Promise<Uint8Array | null> {
429473
const task = new WorkerTask<WorkerRequestType.POLL_PENDING_QUERY, ConnectionID, Uint8Array | null>(
@@ -657,4 +701,26 @@ export class AsyncDuckDB implements AsyncDuckDBBindings {
657701
);
658702
await this.postTask(task);
659703
}
704+
705+
private shouldOPFSFileHandling():boolean {
706+
if( isOPFSProtocol(this.config.path ?? "")){
707+
return this.config.opfs?.fileHandling == "auto";
708+
}
709+
return false;
710+
}
711+
712+
private async registerOPFSFileFromSQL(text: string) {
713+
const files = searchOPFSFiles(text);
714+
const result: string[] = [];
715+
for (const file of files) {
716+
try {
717+
await this.registerOPFSFileName(file);
718+
result.push(file);
719+
} catch (e) {
720+
console.error(e);
721+
throw new Error("File Not found:" + file);
722+
}
723+
}
724+
return result;
725+
}
660726
}

packages/duckdb-wasm/src/parallel/async_bindings_interface.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,4 +32,7 @@ export interface AsyncDuckDBBindings {
3232
insertArrowFromIPCStream(conn: number, buffer: Uint8Array, options?: CSVInsertOptions): Promise<void>;
3333
insertCSVFromPath(conn: number, path: string, options: CSVInsertOptions): Promise<void>;
3434
insertJSONFromPath(conn: number, path: string, options: JSONInsertOptions): Promise<void>;
35+
36+
dropFile(name: string):Promise<null>;
37+
dropFiles(names?: string[]):Promise<null>;
3538
}

0 commit comments

Comments
 (0)