Skip to content

Commit

Permalink
Merge pull request #434 from openaddresses/switch-to-duckdb
Browse files Browse the repository at this point in the history
Switch to using DuckDB node bindings instead of unmaintained parquet libraries
  • Loading branch information
iandees authored Feb 24, 2025
2 parents e6ad170 + 569e826 commit fc6e1ef
Show file tree
Hide file tree
Showing 3 changed files with 690 additions and 463 deletions.
114 changes: 72 additions & 42 deletions task/collect.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import { mkdirp } from 'mkdirp';
import S3 from '@aws-sdk/client-s3';
import { Upload } from '@aws-sdk/lib-storage';
import archiver from 'archiver';
import parquet from '@dsnp/parquetjs';
import duckdb from 'duckdb';
import minimist from 'minimist';
import { Transform } from 'node:stream';
import wkx from 'wkx';
Expand Down Expand Up @@ -318,23 +318,48 @@ function zip_datas(tmp, datas, name) {
}

async function parquet_datas(tmp, datas, name) {
const schema = new parquet.ParquetSchema({
source_name: { type: 'UTF8' },
geometry: { type: 'BYTE_ARRAY', optional: true },
id: { type: 'UTF8', optional: true },
pid: { type: 'UTF8', optional: true },
number: { type: 'UTF8', optional: true },
street: { type: 'UTF8', optional: true },
unit: { type: 'UTF8', optional: true },
city: { type: 'UTF8', optional: true },
postcode: { type: 'UTF8', optional: true },
district: { type: 'UTF8', optional: true },
region: { type: 'UTF8', optional: true },
addrtype: { type: 'UTF8', optional: true },
notes: { type: 'UTF8', optional: true }
});
const writer = await parquet.ParquetWriter.openFile(schema, path.resolve(tmp, `${name}.parquet`));
writer.setRowGroupSize(4096);
const dbFilePath = path.resolve(tmp, `${name}.duckdb`);
const db = new duckdb.Database(dbFilePath);
const connection = db.connect();

const createTableQuery = `
CREATE TABLE data (
source_name VARCHAR,
geometry BLOB,
id VARCHAR,
pid VARCHAR,
number VARCHAR,
street VARCHAR,
unit VARCHAR,
city VARCHAR,
postcode VARCHAR,
district VARCHAR,
region VARCHAR,
addrtype VARCHAR,
notes VARCHAR
);
`;
await connection.run(createTableQuery);

// Parameterized INSERT for one address row. The VALUES clause must carry
// exactly one `?` placeholder per listed column (13), matching the 13 values
// bound by preparedStatement.run(); an extra placeholder leaves one parameter
// unbound.
const insertQuery = `
INSERT INTO data (
source_name,
geometry,
id,
pid,
number,
street,
unit,
city,
postcode,
district,
region,
addrtype,
notes
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);
`;
const preparedStatement = await connection.prepare(insertQuery);


for (const data of datas) {
const resolved_data_filename = path.resolve(tmp, 'sources', data);
Expand All @@ -351,33 +376,38 @@ async function parquet_datas(tmp, datas, name) {

// GeoParquet expects the geometry as a WKB
let wkbGeometry = null;
// if (record.geometry && record.geometry.type) {
// wkbGeometry = wkx.Geometry.parseGeoJSON(record.geometry).toWkb();
// } else {
// console.error(`not ok - ${resolved_data_filename} line ${line_count} has no geometry: ${line}`);
// continue;
// }

await writer.appendRow({
source_name: data,
geometry: wkbGeometry,
id: properties.id,
pid: properties.pid,
number: properties.number,
street: properties.street,
unit: properties.unit,
city: properties.city,
postcode: properties.postcode,
district: properties.district,
region: properties.region,
addrtype: properties.addrtype,
notes: properties.notes
});
if (record.geometry && record.geometry.type) {
wkbGeometry = wkx.Geometry.parseGeoJSON(record.geometry).toWkb();
} else {
console.error(`not ok - ${resolved_data_filename} line ${line_count} has no geometry: ${line}`);
continue;
}

await preparedStatement.run([
data,
wkbGeometry,
properties.id,
properties.pid,
properties.number,
properties.street,
properties.unit,
properties.city,
properties.postcode,
properties.district,
properties.region,
properties.addrtype,
properties.notes
]);
}

console.error(`ok - ${resolved_data_filename} processed ${line_count} lines and appended to parquet file`);
}

await writer.close();
return path.resolve(tmp, `${name}.parquet`);
const parquetFilePath = path.resolve(tmp, `${name}.parquet`);
const exportQuery = `EXPORT TABLE data TO '${parquetFilePath}' (FORMAT 'parquet')`;
await connection.run(exportQuery);

connection.close();

return parquetFilePath;
}
Loading

0 comments on commit fc6e1ef

Please sign in to comment.