Skip to content

Commit

Permalink
fix: tus-metadata
Browse files Browse the repository at this point in the history
  • Loading branch information
fenos committed Jan 18, 2024
1 parent 830ea71 commit beb2836
Show file tree
Hide file tree
Showing 11 changed files with 1,327 additions and 1,282 deletions.
2,375 changes: 1,217 additions & 1,158 deletions package-lock.json

Large diffs are not rendered by default.

6 changes: 3 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@
"@fastify/swagger": "^8.3.1",
"@fastify/swagger-ui": "^1.7.0",
"@isaacs/ttlcache": "^1.4.1",
"@tus/file-store": "1.1.0",
"@tus/s3-store": "1.2.0",
"@tus/server": "1.2.0",
"@tus/file-store": "1.2.0",
"@tus/s3-store": "1.3.0",
"@tus/server": "1.3.0",
"agentkeepalive": "^4.2.1",
"async-retry": "^1.3.3",
"axios": "^1.6.3",
Expand Down
33 changes: 0 additions & 33 deletions src/http/routes/tus/handlers.ts

This file was deleted.

32 changes: 20 additions & 12 deletions src/http/routes/tus/index.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,18 @@
import { FastifyBaseLogger, FastifyInstance } from 'fastify'

Check failure on line 1 in src/http/routes/tus/index.ts

View workflow job for this annotation

GitHub Actions / Test / OS ubuntu-20.04 / Node 20

Type '(rawReq: IncomingMessage, metadata: Record<string, string | null>) => string' is not assignable to type '(req: IncomingMessage, metadata?: Record<string, string | null> | undefined) => string'.
import { Server } from '@tus/server'
import * as http from 'http'
import { Server, ServerOptions, DataStore } from '@tus/server'
import { jwt, storage, db, dbSuperUser } from '../../plugins'
import { getConfig } from '../../../config'
import * as http from 'http'
import { getFileSizeLimit } from '../../../storage/limits'
import { Storage } from '../../../storage'
import {
FileStore,
LockNotifier,
PgLocker,
S3Store,
UploadId,
AlsMemoryKV,
} from '../../../storage/tus'
import {
namingFunction,
onCreate,
Expand All @@ -13,15 +22,8 @@ import {
generateUrl,
getFileIdFromRequest,
} from './lifecycle'
import { ServerOptions, DataStore } from '@tus/server'
import { getFileSizeLimit } from '../../../storage/limits'
import { UploadId } from './upload-id'
import { FileStore } from './file-store'
import { TenantConnection } from '../../../database/connection'
import { PgLocker, LockNotifier } from './postgres-locker'
import { PubSub } from '../../../database/pubsub'
import { S3Store } from './s3-store'
import { DeleteHandler } from './handlers'

const {
storageS3Bucket,
Expand Down Expand Up @@ -49,6 +51,7 @@ function createTusStore() {
return new S3Store({
partSize: 6 * 1024 * 1024, // Each uploaded part will have ~6MB,
expirationPeriodInMilliseconds: tusUrlExpiryMs,
cache: new AlsMemoryKV(),
s3ClientConfig: {
bucket: storageS3Bucket,
region: storageS3Region,
Expand All @@ -70,6 +73,7 @@ function createTusServer(lockNotifier: LockNotifier) {
} = {
path: tusPath,
datastore: datastore,
disableTerminationForFinishedUploads: true,
locker: (rawReq: http.IncomingMessage) => {
const req = rawReq as MultiPartRequest
return new PgLocker(req.upload.storage.db, lockNotifier)
Expand Down Expand Up @@ -106,9 +110,7 @@ function createTusServer(lockNotifier: LockNotifier) {
return fileSizeLimit
},
}
const server = new Server(serverOptions)
server.handlers.DELETE = new DeleteHandler(datastore, serverOptions)
return server
return new Server(serverOptions)
}

export default async function routes(fastify: FastifyInstance) {
Expand All @@ -126,6 +128,12 @@ export default async function routes(fastify: FastifyInstance) {
fastify.register(db)
fastify.register(storage)

fastify.addHook('onRequest', (req, res, done) => {
AlsMemoryKV.localStorage.run(new Map(), () => {
done()
})
})

fastify.addHook('preHandler', async (req) => {
;(req.raw as MultiPartRequest).log = req.log
;(req.raw as MultiPartRequest).upload = {
Expand Down
62 changes: 41 additions & 21 deletions src/http/routes/tus/lifecycle.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
import http from 'http'
import { isRenderableError, Storage } from '../../../storage'
import { Metadata, Upload } from '@tus/server'
import { getConfig } from '../../../config'
import { BaseLogger } from 'pino'
import { Upload } from '@tus/server'
import { randomUUID } from 'crypto'
import { UploadId } from './upload-id'
import { isRenderableError, Storage, StorageBackendError } from '../../../storage'
import { getConfig } from '../../../config'
import { Uploader } from '../../../storage/uploader'
import { TenantConnection } from '../../../database/connection'
import { BaseLogger } from 'pino'
import { UploadId } from '../../../storage/tus'

const { storageS3Bucket, tusPath } = getConfig()
const reExtractFileID = /([^/]+)\/?$/
Expand All @@ -22,6 +22,9 @@ export type MultiPartRequest = http.IncomingMessage & {
}
}

/**
* Runs on every TUS incoming request
*/
export async function onIncomingRequest(
rawReq: http.IncomingMessage,
res: http.ServerResponse,
Expand Down Expand Up @@ -51,22 +54,24 @@ export async function onIncomingRequest(
})
}

/**
* Generate URL for TUS upload, it encodes the uploadID to base64url
*/
export function generateUrl(
_: http.IncomingMessage,
{
proto,
host,
baseUrl,
path,
id,
}: { proto: string; host: string; baseUrl: string; path: string; id: string }
{ proto, host, path, id }: { proto: string; host: string; path: string; id: string }
) {
proto = process.env.NODE_ENV === 'production' ? 'https' : proto

// remove the tenant-id from the url, since we'll be using the tenant-id from the request
id = id.split('/').slice(1).join('/')
id = Buffer.from(id, 'utf-8').toString('base64url')
return `${proto}://${host}${path}/${id}`
}

/**
* Extract the uploadId from the request and decodes it from base64url
*/
export function getFileIdFromRequest(rawRwq: http.IncomingMessage) {
const req = rawRwq as MultiPartRequest
const match = reExtractFileID.exec(req.url as string)
Expand All @@ -79,35 +84,43 @@ export function getFileIdFromRequest(rawRwq: http.IncomingMessage) {
return req.upload.tenantId + '/' + idMatch
}

export function namingFunction(rawReq: http.IncomingMessage) {
/**
* Generate the uploadId for the TUS upload
* the URL structure is as follows:
*
* /tenant-id/bucket-name/object-name/version
*/
export function namingFunction(
rawReq: http.IncomingMessage,
metadata: Record<string, string | null>
) {
const req = rawReq as MultiPartRequest

if (!req.url) {
throw new Error('no url set')
}

const metadataHeader = req.headers['upload-metadata']

if (typeof metadataHeader !== 'string') {
throw new Error('no metadata')
if (!metadata) {
throw new StorageBackendError('metadata_header_invalid', 400, 'metadata header invalid')
}

try {
const params = Metadata.parse(metadataHeader)

const version = randomUUID()

return new UploadId({
tenant: req.upload.tenantId,
bucket: params.bucketName || '',
objectName: params.objectName || '',
bucket: metadata.bucketName || '',
objectName: metadata.objectName || '',
version,
}).toString()
} catch (e) {
throw e
}
}

/**
* Runs before the upload URL is created
*/
export async function onCreate(
rawReq: http.IncomingMessage,
res: http.ServerResponse,
Expand Down Expand Up @@ -137,6 +150,9 @@ export async function onCreate(
return res
}

/**
* Runs when the upload to the underline store is completed
*/
export async function onUploadFinish(
rawReq: http.IncomingMessage,
res: http.ServerResponse,
Expand Down Expand Up @@ -178,11 +194,15 @@ export async function onUploadFinish(

type TusError = { status_code: number; body: string }

/**
* Runs when there is an error on the TUS upload
*/
export function onResponseError(
req: http.IncomingMessage,
res: http.ServerResponse,
e: TusError | Error
) {
console.log(e)
if (e instanceof Error) {
;(res as any).executionError = e
}
Expand Down
19 changes: 19 additions & 0 deletions src/storage/tus/als-memory-kv.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import { AsyncLocalStorage } from 'async_hooks'
import { KvStore } from '@tus/server'
import { MetadataValue } from '@tus/s3-store'

export class AlsMemoryKV implements KvStore<MetadataValue> {
static localStorage = new AsyncLocalStorage<Map<string, MetadataValue>>()

async delete(value: string): Promise<void> {
AlsMemoryKV.localStorage.getStore()?.delete(value)
}

async get(value: string): Promise<MetadataValue | undefined> {
return AlsMemoryKV.localStorage.getStore()?.get(value)
}

async set(key: string, value: MetadataValue): Promise<void> {
AlsMemoryKV.localStorage.getStore()?.set(key, value)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ import { FileStore as TusFileStore } from '@tus/file-store'
import { Upload } from '@tus/server'
import fsExtra from 'fs-extra'
import path from 'path'
import { FileBackend } from '../../../storage/backend'
import { Configstore } from '@tus/file-store'
import { FileBackend } from '../backend'

type FileStoreOptions = {
directory: string
Expand Down
5 changes: 5 additions & 0 deletions src/storage/tus/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
export * from './file-store'
export * from './s3-store'
export * from './postgres-locker'
export * from './upload-id'
export * from './als-memory-kv'
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import { Lock, Locker, RequestRelease } from '@tus/server'
import { Database, DBError } from '../../../storage/database'
import { PubSubAdapter } from '../../../pubsub'
import { UploadId } from './upload-id'
import { clearTimeout } from 'timers'
import EventEmitter from 'events'
import { Database, DBError } from '../database'
import { PubSubAdapter } from '../../pubsub'
import { UploadId } from './upload-id'

const REQUEST_LOCK_RELEASE_MESSAGE = 'REQUEST_LOCK_RELEASE'

Expand Down
Loading

0 comments on commit beb2836

Please sign in to comment.