Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix: use per request cache on tus metadata #422

Merged
merged 1 commit into from
Jan 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2,375 changes: 1,217 additions & 1,158 deletions package-lock.json

Large diffs are not rendered by default.

6 changes: 3 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@
"@fastify/swagger": "^8.3.1",
"@fastify/swagger-ui": "^1.7.0",
"@isaacs/ttlcache": "^1.4.1",
"@tus/file-store": "1.1.0",
"@tus/s3-store": "1.2.0",
"@tus/server": "1.2.0",
"@tus/file-store": "1.2.0",
"@tus/s3-store": "1.3.0",
"@tus/server": "1.3.0",
"agentkeepalive": "^4.2.1",
"async-retry": "^1.3.3",
"axios": "^1.6.3",
Expand Down
33 changes: 0 additions & 33 deletions src/http/routes/tus/handlers.ts

This file was deleted.

32 changes: 20 additions & 12 deletions src/http/routes/tus/index.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,18 @@
import { FastifyBaseLogger, FastifyInstance } from 'fastify'
import { Server } from '@tus/server'
import * as http from 'http'
import { Server, ServerOptions, DataStore } from '@tus/server'
import { jwt, storage, db, dbSuperUser } from '../../plugins'
import { getConfig } from '../../../config'
import * as http from 'http'
import { getFileSizeLimit } from '../../../storage/limits'
import { Storage } from '../../../storage'
import {
FileStore,
LockNotifier,
PgLocker,
S3Store,
UploadId,
AlsMemoryKV,
} from '../../../storage/tus'
import {
namingFunction,
onCreate,
Expand All @@ -13,15 +22,8 @@ import {
generateUrl,
getFileIdFromRequest,
} from './lifecycle'
import { ServerOptions, DataStore } from '@tus/server'
import { getFileSizeLimit } from '../../../storage/limits'
import { UploadId } from './upload-id'
import { FileStore } from './file-store'
import { TenantConnection } from '../../../database/connection'
import { PgLocker, LockNotifier } from './postgres-locker'
import { PubSub } from '../../../database/pubsub'
import { S3Store } from './s3-store'
import { DeleteHandler } from './handlers'

const {
storageS3Bucket,
Expand Down Expand Up @@ -49,6 +51,7 @@ function createTusStore() {
return new S3Store({
partSize: 6 * 1024 * 1024, // Each uploaded part will have ~6MB,
expirationPeriodInMilliseconds: tusUrlExpiryMs,
cache: new AlsMemoryKV(),
s3ClientConfig: {
bucket: storageS3Bucket,
region: storageS3Region,
Expand All @@ -70,6 +73,7 @@ function createTusServer(lockNotifier: LockNotifier) {
} = {
path: tusPath,
datastore: datastore,
disableTerminationForFinishedUploads: true,
locker: (rawReq: http.IncomingMessage) => {
const req = rawReq as MultiPartRequest
return new PgLocker(req.upload.storage.db, lockNotifier)
Expand Down Expand Up @@ -106,9 +110,7 @@ function createTusServer(lockNotifier: LockNotifier) {
return fileSizeLimit
},
}
const server = new Server(serverOptions)
server.handlers.DELETE = new DeleteHandler(datastore, serverOptions)
return server
return new Server(serverOptions)
}

export default async function routes(fastify: FastifyInstance) {
Expand All @@ -126,6 +128,12 @@ export default async function routes(fastify: FastifyInstance) {
fastify.register(db)
fastify.register(storage)

fastify.addHook('onRequest', (req, res, done) => {
AlsMemoryKV.localStorage.run(new Map(), () => {
done()
})
})

fastify.addHook('preHandler', async (req) => {
;(req.raw as MultiPartRequest).log = req.log
;(req.raw as MultiPartRequest).upload = {
Expand Down
61 changes: 40 additions & 21 deletions src/http/routes/tus/lifecycle.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
import http from 'http'
import { isRenderableError, Storage } from '../../../storage'
import { Metadata, Upload } from '@tus/server'
import { getConfig } from '../../../config'
import { BaseLogger } from 'pino'
import { Upload } from '@tus/server'
import { randomUUID } from 'crypto'
import { UploadId } from './upload-id'
import { isRenderableError, Storage, StorageBackendError } from '../../../storage'
import { getConfig } from '../../../config'
import { Uploader } from '../../../storage/uploader'
import { TenantConnection } from '../../../database/connection'
import { BaseLogger } from 'pino'
import { UploadId } from '../../../storage/tus'

const { storageS3Bucket, tusPath } = getConfig()
const reExtractFileID = /([^/]+)\/?$/
Expand All @@ -22,6 +22,9 @@ export type MultiPartRequest = http.IncomingMessage & {
}
}

/**
* Runs on every TUS incoming request
*/
export async function onIncomingRequest(
rawReq: http.IncomingMessage,
res: http.ServerResponse,
Expand Down Expand Up @@ -51,22 +54,24 @@ export async function onIncomingRequest(
})
}

/**
* Generate URL for TUS upload, it encodes the uploadID to base64url
*/
export function generateUrl(
_: http.IncomingMessage,
{
proto,
host,
baseUrl,
path,
id,
}: { proto: string; host: string; baseUrl: string; path: string; id: string }
{ proto, host, path, id }: { proto: string; host: string; path: string; id: string }
) {
proto = process.env.NODE_ENV === 'production' ? 'https' : proto

// remove the tenant-id from the url, since we'll be using the tenant-id from the request
id = id.split('/').slice(1).join('/')
id = Buffer.from(id, 'utf-8').toString('base64url')
return `${proto}://${host}${path}/${id}`
}

/**
* Extract the uploadId from the request and decodes it from base64url
*/
export function getFileIdFromRequest(rawRwq: http.IncomingMessage) {
const req = rawRwq as MultiPartRequest
const match = reExtractFileID.exec(req.url as string)
Expand All @@ -79,35 +84,43 @@ export function getFileIdFromRequest(rawRwq: http.IncomingMessage) {
return req.upload.tenantId + '/' + idMatch
}

export function namingFunction(rawReq: http.IncomingMessage) {
/**
* Generate the uploadId for the TUS upload
* the URL structure is as follows:
*
* /tenant-id/bucket-name/object-name/version
*/
export function namingFunction(
rawReq: http.IncomingMessage,
metadata?: Record<string, string | null>
) {
const req = rawReq as MultiPartRequest

if (!req.url) {
throw new Error('no url set')
}

const metadataHeader = req.headers['upload-metadata']

if (typeof metadataHeader !== 'string') {
throw new Error('no metadata')
if (!metadata) {
throw new StorageBackendError('metadata_header_invalid', 400, 'metadata header invalid')
}

try {
const params = Metadata.parse(metadataHeader)

const version = randomUUID()

return new UploadId({
tenant: req.upload.tenantId,
bucket: params.bucketName || '',
objectName: params.objectName || '',
bucket: metadata.bucketName || '',
objectName: metadata.objectName || '',
version,
}).toString()
} catch (e) {
throw e
}
}

/**
* Runs before the upload URL is created
*/
export async function onCreate(
rawReq: http.IncomingMessage,
res: http.ServerResponse,
Expand Down Expand Up @@ -137,6 +150,9 @@ export async function onCreate(
return res
}

/**
* Runs when the upload to the underline store is completed
*/
export async function onUploadFinish(
rawReq: http.IncomingMessage,
res: http.ServerResponse,
Expand Down Expand Up @@ -178,6 +194,9 @@ export async function onUploadFinish(

type TusError = { status_code: number; body: string }

/**
* Runs when there is an error on the TUS upload
*/
export function onResponseError(
req: http.IncomingMessage,
res: http.ServerResponse,
Expand Down
19 changes: 19 additions & 0 deletions src/storage/tus/als-memory-kv.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import { AsyncLocalStorage } from 'async_hooks'
import { KvStore } from '@tus/server'
import { MetadataValue } from '@tus/s3-store'

export class AlsMemoryKV implements KvStore<MetadataValue> {
static localStorage = new AsyncLocalStorage<Map<string, MetadataValue>>()

async delete(value: string): Promise<void> {
AlsMemoryKV.localStorage.getStore()?.delete(value)
}

async get(value: string): Promise<MetadataValue | undefined> {
return AlsMemoryKV.localStorage.getStore()?.get(value)
}

async set(key: string, value: MetadataValue): Promise<void> {
AlsMemoryKV.localStorage.getStore()?.set(key, value)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ import { FileStore as TusFileStore } from '@tus/file-store'
import { Upload } from '@tus/server'
import fsExtra from 'fs-extra'
import path from 'path'
import { FileBackend } from '../../../storage/backend'
import { Configstore } from '@tus/file-store'
import { FileBackend } from '../backend'

type FileStoreOptions = {
directory: string
Expand Down
5 changes: 5 additions & 0 deletions src/storage/tus/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
export * from './file-store'
export * from './s3-store'
export * from './postgres-locker'
export * from './upload-id'
export * from './als-memory-kv'
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import { Lock, Locker, RequestRelease } from '@tus/server'
import { Database, DBError } from '../../../storage/database'
import { PubSubAdapter } from '../../../pubsub'
import { UploadId } from './upload-id'
import { clearTimeout } from 'timers'
import EventEmitter from 'events'
import { Database, DBError } from '../database'
import { PubSubAdapter } from '../../pubsub'
import { UploadId } from './upload-id'

const REQUEST_LOCK_RELEASE_MESSAGE = 'REQUEST_LOCK_RELEASE'

Expand Down
59 changes: 13 additions & 46 deletions src/http/routes/tus/s3-store.ts → src/storage/tus/s3-store.ts
Original file line number Diff line number Diff line change
@@ -1,46 +1,10 @@
import { S3Store as BaseS3Store } from '@tus/s3-store'
import { TUS_RESUMABLE, Upload } from '@tus/server'
import AWS, { S3 } from '@aws-sdk/client-s3'

type MetadataValue = {
file: Upload
'upload-id': string
'tus-version': string
}
import { MetadataValue, S3Store as BaseS3Store } from '@tus/s3-store'
import { Upload } from '@tus/server'
import { S3 } from '@aws-sdk/client-s3'

// eslint-disable-next-line @typescript-eslint/ban-ts-comment
// @ts-ignore - overwriting private getMetadata function for backwards compatibility
export class S3Store extends BaseS3Store {
public async create(upload: Upload) {
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
// @ts-ignore
const bucket = this.bucket as string
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
// @ts-ignore
const client = this.client as S3

const request: AWS.CreateMultipartUploadCommandInput = {
Bucket: bucket,
Key: upload.id,
Metadata: { 'tus-version': TUS_RESUMABLE },
}

if (upload.metadata?.contentType) {
request.ContentType = upload.metadata.contentType
}

if (upload.metadata?.cacheControl) {
request.CacheControl = upload.metadata.cacheControl
}

upload.creation_date = new Date().toISOString()

const res = await client.createMultipartUpload(request)
await (this as any).saveMetadata(upload, res.UploadId as string)

return upload
}

/**
* Get the metadata for a file.
* It keeps backwards compatibility from version 0.9 to 1.0.0
Expand Down Expand Up @@ -69,8 +33,9 @@ export class S3Store extends BaseS3Store {

if (Metadata?.file) {
// OLD Implementation
// TODO: remove this after all tenants are migrated to the new tus server version
const file = JSON.parse(Metadata.file as string)
cache.set(id, {
const metadata: MetadataValue = {
'tus-version': Metadata?.['tus_version'] as string,
'upload-id': Metadata?.['upload_id'] as string,
file: new Upload({
Expand All @@ -80,14 +45,14 @@ export class S3Store extends BaseS3Store {
metadata: file.metadata,
creation_date: Metadata?.['creation_date'] || undefined,
}),
})
}
cache.set(id, metadata)

return cache.get(id) as MetadataValue
return metadata
}

const file = JSON.parse((await Body?.transformToString()) as string)

cache.set(id, {
const metadata: MetadataValue = {
'tus-version': Metadata?.['tus-version'] as string,
'upload-id': Metadata?.['upload-id'] as string,
file: new Upload({
Expand All @@ -97,7 +62,9 @@ export class S3Store extends BaseS3Store {
metadata: file.metadata,
creation_date: file.creation_date,
}),
})
return cache.get(id) as MetadataValue
}

cache.set(id, metadata)
return metadata
}
}
Loading
Loading