Merge pull request #437 from openaddresses/give-up-on-parquet
Remove DuckDB and attempts to write out Parquet
Authored by iandees on Feb 25, 2025
Commit c035c39 (2 parents: 1d2ccda + 1098236)
Showing 4 changed files with 3 additions and 260 deletions.
task/Dockerfile (2 changes: 1 addition & 1 deletion)
@@ -1,4 +1,4 @@
-FROM node:22 AS node
+FROM node:22-alpine3.21 AS node
 FROM 847904970422.dkr.ecr.us-east-1.amazonaws.com/batch-machine:9.9.0
 
 COPY --from=node /usr/lib /usr/lib
task/collect.js (142 changes: 0 additions & 142 deletions)
@@ -14,10 +14,8 @@ import { mkdirp } from 'mkdirp';
 import S3 from '@aws-sdk/client-s3';
 import { Upload } from '@aws-sdk/lib-storage';
 import archiver from 'archiver';
-import { DuckDBInstance } from '@duckdb/node-api';
 import minimist from 'minimist';
 import { Transform } from 'node:stream';
-import wkx from 'wkx';
 
 const s3 = new S3.S3Client({
     region: process.env.AWS_DEFAULT_REGION
@@ -111,12 +109,6 @@ async function collect(tmp, collection, oa) {
     await upload_zip_collection(zip, collection.name);
     console.error('ok - archive uploaded');
 
-    const pq = await parquet_datas(tmp, collection_data, collection.name);
-
-    console.error(`ok - parquet created: ${pq}`);
-    await upload_parquet_collection(pq, collection.name);
-    console.error('ok - parquet uploaded');
-
     await oa.cmd('collection', 'update', {
         ':collection': collection.id,
         size: fs.statSync(zip).size
@@ -238,45 +230,6 @@ async function upload_zip_collection(file, name) {
     console.error(`ok - uploaded: r2://${process.env.R2Bucket}/v2.openaddresses.io/${process.env.StackName}/collection-${name}.zip`);
 }
-
-async function upload_parquet_collection(file, name) {
-    const s3uploader = new Upload({
-        client: s3,
-        params: {
-            ContentType: 'application/vnd.apache.parquet',
-            Body: fs.createReadStream(file),
-            Bucket: process.env.Bucket,
-            Key: `${process.env.StackName}/collection-${name}.parquet`
-        }
-    });
-
-    await s3uploader.done();
-
-    console.error(`ok - s3://${process.env.Bucket}/${process.env.StackName}/collection-${name}.parquet`);
-
-    const r2 = new S3.S3Client({
-        region: 'auto',
-        credentials: {
-            accessKeyId: process.env.R2_ACCESS_KEY_ID,
-            secretAccessKey: process.env.R2_SECRET_ACCESS_KEY
-        },
-        endpoint: `https://${process.env.CLOUDFLARE_ACCOUNT_ID}.r2.cloudflarestorage.com`
-    });
-
-    const r2uploader = new Upload({
-        client: r2,
-        params: {
-            ContentType: 'application/vnd.apache.parquet',
-            Body: fs.createReadStream(file),
-            Bucket: process.env.R2Bucket,
-            Key: `v2.openaddresses.io/${process.env.StackName}/collection-${name}.parquet`
-        }
-    });
-
-    await r2uploader.done();
-
-    console.error(`ok - uploaded: r2://${process.env.R2Bucket}/v2.openaddresses.io/${process.env.StackName}/collection-${name}.parquet`);
-}
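Note that the removed upload_parquet_collection duplicates upload_zip_collection apart from content type, bucket, and key. A hypothetical consolidation, using only the @aws-sdk/lib-storage Upload API already present in this file (the helper name and signature are inventions for illustration, not part of the codebase):

// Hypothetical shared helper: one streaming uploader for any collection artifact.
async function upload_collection(client, bucket, key, file, contentType) {
    const uploader = new Upload({
        client,
        params: {
            ContentType: contentType,
            Body: fs.createReadStream(file),
            Bucket: bucket,
            Key: key
        }
    });

    await uploader.done();
    console.error(`ok - uploaded: ${bucket}/${key}`);
}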

 function zip_datas(tmp, datas, name) {
     return new Promise((resolve, reject) => {
         const output = fs.createWriteStream(path.resolve(tmp, `${name}.zip`))
@@ -316,98 +269,3 @@ function zip_datas(tmp, datas, name) {
         archive.finalize();
     });
 }
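The body of zip_datas is mostly collapsed above; the visible lines (createWriteStream, archive.finalize) match archiver's standard streaming pattern. A minimal sketch of that pattern, under the assumption that each entry is appended with archive.file (the collapsed lines may differ):

// Sketch of the usual archiver flow; resolves with the zip path once the stream closes.
function zip_datas_sketch(tmp, datas, name) {
    return new Promise((resolve, reject) => {
        const zipPath = path.resolve(tmp, `${name}.zip`);
        const output = fs.createWriteStream(zipPath);
        const archive = archiver('zip', { zlib: { level: 9 } });

        output.on('close', () => resolve(zipPath));
        archive.on('error', reject);
        archive.pipe(output);

        // Append each source file under its own name inside the zip.
        for (const data of datas) {
            archive.file(path.resolve(tmp, 'sources', data), { name: data });
        }

        archive.finalize();
    });
}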

-async function parquet_datas(tmp, datas, name) {
-    const dbFilePath = path.resolve(tmp, `${name}.duckdb`);
-    const db = await DuckDBInstance.create(dbFilePath);
-    const connection = db.connect();
-
-    const createTableQuery = `
-        CREATE TABLE data (
-            source_name VARCHAR,
-            geometry BLOB,
-            id VARCHAR,
-            pid VARCHAR,
-            number VARCHAR,
-            street VARCHAR,
-            unit VARCHAR,
-            city VARCHAR,
-            postcode VARCHAR,
-            district VARCHAR,
-            region VARCHAR,
-            addrtype VARCHAR,
-            notes VARCHAR
-        );
-    `;
-    await connection.run(createTableQuery);
-
-    const insertQuery = `
-        INSERT INTO data (
-            source_name,
-            geometry,
-            id,
-            pid,
-            number,
-            street,
-            unit,
-            city,
-            postcode,
-            district,
-            region,
-            addrtype,
-            notes
-        ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);
-    `;
-    const preparedStatement = await connection.prepare(insertQuery);
-
-    for (const data of datas) {
-        const resolved_data_filename = path.resolve(tmp, 'sources', data);
-
-        // Read the file and parse it as linefeed-delimited JSON
-        const data_stream = fs.createReadStream(resolved_data_filename);
-        const data_lines = data_stream.pipe(split());
-        let line_count = 0;
-
-        for await (const line of data_lines) {
-            line_count++;
-            const record = JSON.parse(line);
-            const properties = record.properties;
-
-            // GeoParquet expects the geometry as a WKB
-            let wkbGeometry = null;
-            if (record.geometry && record.geometry.type) {
-                wkbGeometry = wkx.Geometry.parseGeoJSON(record.geometry).toWkb();
-            } else {
-                console.error(`not ok - ${resolved_data_filename} line ${line_count} has no geometry: ${line}`);
-                continue;
-            }
-
-            await preparedStatement.run([
-                data,
-                wkbGeometry,
-                properties.id,
-                properties.pid,
-                properties.number,
-                properties.street,
-                properties.unit,
-                properties.city,
-                properties.postcode,
-                properties.district,
-                properties.region,
-                properties.addrtype,
-                properties.notes
-            ]);
-        }
-
-        console.error(`ok - ${resolved_data_filename} processed ${line_count} lines and appended to parquet file`);
-    }
-
-    const parquetFilePath = path.resolve(tmp, `${name}.parquet`);
-    const exportQuery = `EXPORT TABLE data TO '${parquetFilePath}' (FORMAT 'parquet')`;
-    await connection.run(exportQuery);
-
-    connection.close();
-
-    return parquetFilePath;
-}
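Two defects are visible in the removed parquet_datas and may explain why the Parquet path was abandoned: the INSERT names 13 columns but supplies 14 ? placeholders, and EXPORT TABLE is not a DuckDB statement (DuckDB's documented Parquet export is COPY ... TO; EXPORT DATABASE exists, but not per-table). The un-awaited db.connect() also looks suspect, since @duckdb/node-api's connect() returns a promise, though the pinned alpha may have differed. A corrected sketch of the export step, reusing the connection and data table from the removed code:

// DuckDB writes a table to Parquet with COPY, not EXPORT TABLE.
const parquetFilePath = path.resolve(tmp, `${name}.parquet`);
await connection.run(`COPY data TO '${parquetFilePath}' (FORMAT 'parquet')`);
return parquetFilePath;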
task/package-lock.json (115 changes: 1 addition & 114 deletions)

Some generated files are not rendered by default.

task/package.json (4 changes: 1 addition & 3 deletions)
@@ -22,7 +22,6 @@
"@aws-sdk/client-s3": "^3.405.0",
"@aws-sdk/client-secrets-manager": "^3.363.0",
"@aws-sdk/lib-storage": "^3.405.0",
"@duckdb/node-api": "^1.2.0-alpha.15",
"@openaddresses/lib": "^4.4.0",
"@supercharge/promise-pool": "^3.0.0",
"@turf/turf": "^6.3.0",
@@ -41,8 +40,7 @@
"request": "^2.88.2",
"split2": "^4.1.0",
"tilebase": "^4.0.0",
"wellknown": "^0.5.0",
"wkx": "^0.5.0"
"wellknown": "^0.5.0"
},
"devDependencies": {
"eslint": "^9.0.0",
