Add media download service (part 1) #104

Merged: 14 commits, Oct 9, 2024
Changes from 10 commits
23,625 changes: 7,631 additions & 15,994 deletions package-lock.json

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions package.json
@@ -12,6 +12,8 @@
"output-handler::start": "AWS_REGION=eu-west-1 STAGE=DEV npm run start --workspace output-handler",
"worker-capacity-manager::build": "npm run build --workspace worker-capacity-manager",
"worker-capacity-manager::start": "AWS_REGION=eu-west-1 STAGE=DEV npm run start --workspace worker-capacity-manager",
"media-download::build": "npm run build --workspace media-download",
"media-download::start": "AWS_REGION=eu-west-1 STAGE=DEV npm run start --workspace media-download",
"worker::build": "npm run build --workspace worker",
"worker::package": "npm run package --workspace worker",
"worker::start": "AWS_REGION=eu-west-1 STAGE=DEV npm run start --workspace worker",
65 changes: 41 additions & 24 deletions packages/api/src/index.ts
@@ -19,12 +19,15 @@ import {
logger,
getObjectText,
getS3Client,
sendMessage,
} from '@guardian/transcription-service-backend-common';
import {
ClientConfig,
TranscriptExportRequest,
inputBucketObjectMetadata,
transcribeFileRequestBody,
transcribeUrlRequestBody,
MediaDownloadJob,
} from '@guardian/transcription-service-common';
import type { SignedUrlResponseBody } from '@guardian/transcription-service-common';
import {
@@ -71,6 +74,43 @@ const getApp = async () => {
}),
]);

apiRouter.post('/transcribe-url', [
checkAuth,
asyncHandler(async (req, res) => {
const userEmail = req.user?.email;
const body = transcribeUrlRequestBody.safeParse(req.body);
const id = uuid4();
if (!body.success || !userEmail) {
res.status(422).send('missing request params');
return;
}
const downloadJob: MediaDownloadJob = {
id,
url: body.data.url,
languageCode: body.data.languageCode,
translationRequested: body.data.translationRequested,
userEmail,
};

const sendResult = await sendMessage(
sqsClient,
config.app.mediaDownloadQueueUrl,
JSON.stringify(downloadJob),
id,
);
if (isSqsFailure(sendResult)) {
res.status(500).send(sendResult.errorMsg);
return;
}
logger.info('API successfully sent the message to SQS', {
id,
url: body.data.url,
userEmail,
});
res.send('Message sent');
}),
]);

apiRouter.post('/transcribe-file', [
checkAuth,
asyncHandler(async (req, res) => {
@@ -125,35 +165,12 @@ const getApp = async () => {
body.data.fileName,
signedUrl,
body.data.languageCode,
false,
body.data.translationRequested,
);
if (isSqsFailure(sendResult)) {
res.status(500).send(sendResult.errorMsg);
return;
}
if (body.data.translationRequested) {
const translationSendResult =
await generateOutputSignedUrlAndSendMessage(
s3Key,
sqsClient,
config.app.taskQueueUrl,
config.app.transcriptionOutputBucket,
config.aws.region,
userEmail,
body.data.fileName,
signedUrl,
body.data.languageCode,
true,
);
if (isSqsFailure(translationSendResult)) {
res
.status(500)
.send(
`Translation request failed: ${translationSendResult.errorMsg}`,
);
return;
}
}
logger.info('API successfully sent the message to SQS', {
id: s3Key,
filename: body.data.fileName,
2 changes: 1 addition & 1 deletion packages/backend-common/package.json
@@ -6,7 +6,7 @@
"@aws-sdk/client-s3": "^3.624.0",
"@aws-sdk/client-sqs": "^3.624.0",
"@aws-sdk/client-ssm": "^3.624.0",
"@aws-sdk/lib-dynamodb": "^3.624.0",
"@aws-sdk/lib-dynamodb": "3.624.0",
"@aws-sdk/client-cloudwatch": "^3.624.0",
"@aws-sdk/client-auto-scaling": "^3.624.0",
"axios": "^1.7.4",
7 changes: 7 additions & 0 deletions packages/backend-common/src/config.ts
@@ -13,6 +13,7 @@ export interface TranscriptionConfig {
rootUrl: string;
taskQueueUrl: string;
deadLetterQueueUrl?: string;
mediaDownloadQueueUrl: string;
stage: string;
emailNotificationFromAddress: string;
sourceMediaBucket: string;
@@ -76,6 +77,11 @@ export const getConfig = async (): Promise<TranscriptionConfig> => {

logger.info(`Parameters fetched: ${parameterNames.join(', ')}`);
const taskQueueUrl = findParameter(parameters, paramPath, 'taskQueueUrl');
const mediaDownloadQueueUrl = findParameter(
parameters,
paramPath,
'mediaDownloadQueueUrl',
);
const deadLetterQueueUrl =
stage === 'DEV'
? undefined
@@ -139,6 +145,7 @@ export const getConfig = async (): Promise<TranscriptionConfig> => {
secret: appSecret,
taskQueueUrl,
deadLetterQueueUrl,
mediaDownloadQueueUrl,
stage,
sourceMediaBucket,
emailNotificationFromAddress,
48 changes: 48 additions & 0 deletions packages/backend-common/src/process.ts
@@ -0,0 +1,48 @@
import { spawn } from 'child_process';
import { logger } from './logging';
export interface ProcessResult {
code?: number;
stdout: string;
stderr: string;
}

export const runSpawnCommand = (
processName: string,
cmd: string,
args: ReadonlyArray<string>,
): Promise<ProcessResult> => {
return new Promise((resolve, reject) => {
const cp = spawn(cmd, args);
const stdout: string[] = [];
const stderr: string[] = [];
cp.stdout.on('data', (data) => {
stdout.push(data.toString());
});

cp.stderr.on('data', (data) => {
stderr.push(data.toString());
});

cp.on('error', (e) => {
stderr.push(e.toString());
});

cp.on('close', (code) => {
const result = {
stdout: stdout.join(''),
stderr: stderr.join(''),
// `??` keeps a successful exit code of 0; `||` would coerce it to undefined
code: code ?? undefined,
};
logger.info('Ignoring stdout to avoid logging sensitive data');
Contributor:

Is it important not to log stdout when running yt-dlp?

Contributor Author:

Good point, I've updated this in ecff3ce.

logger.info(`process ${processName} stderr: ${result.stderr}`);
if (code === 0) {
resolve(result);
} else {
logger.error(
`process ${processName} failed with code ${result.code} due to: ${result.stderr}`,
);
reject(result);
}
});
});
};
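
The stdout logging question raised in the review comment above could be handled with an opt-in flag, so that callers choose whether stdout is safe to log. This is only a minimal sketch under assumptions: `LogOptions` and `buildProcessLogLines` are hypothetical names that do not appear in the PR, and the follow-up commit may have solved this differently.

```typescript
// Mirrors the ProcessResult interface from process.ts in this PR.
interface ProcessResult {
  code?: number;
  stdout: string;
  stderr: string;
}

// Hypothetical option type (an assumption, not part of the PR).
interface LogOptions {
  logStdout: boolean;
}

// Builds the lines a caller would pass to logger.info.
// Kept as a pure function so the logging decision is easy to test.
const buildProcessLogLines = (
  processName: string,
  result: ProcessResult,
  options: LogOptions,
): string[] => {
  const lines: string[] = [];
  if (options.logStdout) {
    lines.push(`process ${processName} stdout: ${result.stdout}`);
  } else {
    lines.push('Ignoring stdout to avoid logging sensitive data');
  }
  lines.push(`process ${processName} stderr: ${result.stderr}`);
  return lines;
};

// Usage: stdout is included only when the caller opts in.
const exampleLines = buildProcessLogLines(
  'yt-dlp',
  { code: 0, stdout: 'download finished', stderr: '' },
  { logStdout: true },
);
console.log(exampleLines.join('\n'));
```

Defaulting to not logging stdout at the call site would preserve the cautious behaviour shown in the diff, while letting individual callers opt in.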
32 changes: 26 additions & 6 deletions packages/backend-common/src/sqs.ts
@@ -66,18 +66,18 @@ export const generateOutputSignedUrlAndSendMessage = async (
originalFilename: string,
inputSignedUrl: string,
languageCode: LanguageCode,
translate: boolean,
translationRequested: boolean,
): Promise<SendResult> => {
const signedUrls = await generateOutputSignedUrls(
s3Key,
region,
outputBucket,
userEmail,
7,
translate,
translationRequested,
);

const jobId = translate ? `${s3Key}-translation` : s3Key;
const jobId = translationRequested ? `${s3Key}-translation` : s3Key;
const job: TranscriptionJob = {
id: jobId, // id of the source file
inputSignedUrl,
@@ -87,12 +87,32 @@ export const generateOutputSignedUrlAndSendMessage = async (
originalFilename,
outputBucketUrls: signedUrls,
languageCode,
translate,
translate: false,
};
return await sendMessage(client, queueUrl, JSON.stringify(job), s3Key);
const messageResult = await sendMessage(
client,
queueUrl,
JSON.stringify(job),
s3Key,
);
if (isSqsFailure(messageResult) && translationRequested) {
logger.info(
`Failed to send message, error message: ${messageResult.errorMsg}`,
);
return messageResult;
}
if (!isSqsFailure(messageResult) && translationRequested) {
Contributor:

Might want to log an error if messageResult is an SQS failure.

return await sendMessage(
client,
queueUrl,
JSON.stringify({ ...job, translate: true }),
s3Key,
);
Comment on lines +105 to +110
Contributor:

I'm not sure I understand this bit. Do we put two messages in the queue if the user has requested a translation? Doesn't the worker create the translation as it's transcribing?

Contributor Author:

The way translation works at the moment is that we send two messages: the translation runs on one worker and the normal transcription on another. The idea was to speed up transcription and translation; if we did it all on the same worker, it could take twice as long.

Contributor:

Ah ok thanks, I hadn't realized this.

}
return messageResult;
};
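
The two-message approach described in the review thread above can be sketched as a small fan-out: one message body for the plain transcription and, when a translation is requested, a second body with `translate: true`, so that two workers can pick the jobs up in parallel. This is a simplified illustration under assumptions: `SketchJob` and `buildJobMessages` are hypothetical names, and the real `TranscriptionJob` type has more fields than shown here.

```typescript
// Simplified, hypothetical stand-in for TranscriptionJob (an assumption).
interface SketchJob {
  id: string;
  languageCode: string;
  translate: boolean;
}

// Returns the message bodies to enqueue: always one transcription job,
// plus a translation job when the user asked for one.
const buildJobMessages = (
  job: Omit<SketchJob, 'translate'>,
  translationRequested: boolean,
): string[] => {
  const transcription: SketchJob = { ...job, translate: false };
  const messages = [JSON.stringify(transcription)];
  if (translationRequested) {
    messages.push(JSON.stringify({ ...transcription, translate: true }));
  }
  return messages;
};

// Usage: a translation request yields two bodies, one per worker.
const exampleBodies = buildJobMessages({ id: 'abc', languageCode: 'en' }, true);
console.log(exampleBodies.length);
```

Each body would then be passed to `sendMessage` in turn; the trade-off named in the thread is two worker runs in parallel instead of one worker doing both tasks sequentially.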

const sendMessage = async (
export const sendMessage = async (
client: SQSClient,
queueUrl: string,
messageBody: string,