diff --git a/api/.sqlx/query-1d1876dcffdc0d3d1eead78a1514f3cac87830214e21abdeb5be583c1f6804b1.json b/api/.sqlx/query-1d1876dcffdc0d3d1eead78a1514f3cac87830214e21abdeb5be583c1f6804b1.json new file mode 100644 index 000000000..b07f6e607 --- /dev/null +++ b/api/.sqlx/query-1d1876dcffdc0d3d1eead78a1514f3cac87830214e21abdeb5be583c1f6804b1.json @@ -0,0 +1,16 @@ +{ + "db_name": "PostgreSQL", + "query": "DELETE FROM package_versions WHERE scope = $1 AND name = $2 AND version = $3", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Text", + "Text", + "Text" + ] + }, + "nullable": [] + }, + "hash": "1d1876dcffdc0d3d1eead78a1514f3cac87830214e21abdeb5be583c1f6804b1" +} diff --git a/api/migrations/20250310055436_package_version_drop.sql b/api/migrations/20250310055436_package_version_drop.sql new file mode 100644 index 000000000..602d9091d --- /dev/null +++ b/api/migrations/20250310055436_package_version_drop.sql @@ -0,0 +1,27 @@ +alter table package_version_dependencies + drop constraint package_version_dependencies_package_scope_package_name_fkey; + +alter table package_version_dependencies + add foreign key (package_scope, package_name) references packages ON UPDATE CASCADE ON DELETE CASCADE; + + +alter table package_version_dependencies + drop constraint package_version_dependencies_package_scope_package_name_pa_fkey; + +alter table package_version_dependencies + add constraint package_version_dependencies_package_scope_package_name_pa_fkey + foreign key (package_scope, package_name, package_version) references package_versions ON UPDATE CASCADE ON DELETE CASCADE; + +alter table package_files + drop constraint package_files_scope_name_version_fkey; + +alter table package_files + add foreign key (scope, name, version) references package_versions + ON UPDATE CASCADE ON DELETE CASCADE; + +alter table npm_tarballs + drop constraint npm_tarballs_scope_name_version_fkey; + +alter table npm_tarballs + add foreign key (scope, name, version) references package_versions + on UPDATE CASCADE ON DELETE CASCADE; diff --git a/api/src/api/errors.rs b/api/src/api/errors.rs index 5ba9d127b..5d2cb3da8 100644 --- a/api/src/api/errors.rs +++ b/api/src/api/errors.rs @@ -237,6 +237,10 @@ errors!( status: BAD_REQUEST, "The requested package is archived. Unarchive it to modify settings or publish to it.", }, + DeleteVersionHasDependents { + status: BAD_REQUEST, + "The requested package version has dependents. Only a version without dependents can be deleted.", + }, ); pub fn map_unique_violation(err: sqlx::Error, new_err: ApiError) -> ApiError { diff --git a/api/src/api/package.rs b/api/src/api/package.rs index 2322e9512..1047d5aef 100644 --- a/api/src/api/package.rs +++ b/api/src/api/package.rs @@ -145,6 +145,10 @@ pub fn package_router() -> Router { "/:package/versions/:version", util::auth(version_update_handler), ) + .delete( + "/:package/versions/:version", + util::auth(version_delete_handler), + ) .post( "/:package/versions/:version/provenance", util::auth(version_provenance_statements_handler), @@ -942,8 +946,98 @@ pub async fn version_update_handler( gzip_encoded: false, }, ) - .await - .unwrap(); + .await?; + + let npm_version_manifest_path = + crate::gcs_paths::npm_version_manifest_path(&scope, &package); + let npm_version_manifest = + generate_npm_version_manifest(db, npm_url, &scope, &package).await?; + let content = serde_json::to_vec_pretty(&npm_version_manifest)?; + buckets + .npm_bucket + .upload( + npm_version_manifest_path.into(), + UploadTaskBody::Bytes(content.into()), + GcsUploadOptions { + content_type: Some("application/json".into()), + cache_control: Some(CACHE_CONTROL_DO_NOT_CACHE.into()), + gzip_encoded: false, + }, + ) + .await?; + + Ok( + Response::builder() + .status(StatusCode::NO_CONTENT) + .body(Body::empty()) + .unwrap(), + ) +} + +#[instrument( + name = "DELETE /api/scopes/:scope/packages/:package/versions/:version", + skip(req), + err, + fields(scope, package, version) +)] +pub async fn version_delete_handler( + req: Request, +) -> ApiResult> { + let scope = req.param_scope()?; + let package = req.param_package()?; + let version = req.param_version()?; + Span::current().record("scope", field::display(&scope)); + Span::current().record("package", field::display(&package)); + Span::current().record("version", field::display(&version)); + + let db = req.data::().unwrap(); + let buckets = req.data::().unwrap().clone(); + let npm_url = &req.data::().unwrap().0; + + let iam = req.iam(); + iam.check_admin_access()?; + + let count = db + .count_package_dependents( + crate::db::DependencyKind::Jsr, + &format!("@{}/{}", scope, package), + ) + .await?; + + if count > 0 { + return Err(ApiError::DeleteVersionHasDependents); + } + + db.delete_package_version(&scope, &package, &version) + .await?; + + let path = crate::gcs_paths::docs_v1_path(&scope, &package, &version); + buckets.docs_bucket.delete_file(path.into()).await?; + + let path = crate::gcs_paths::version_metadata(&scope, &package, &version); + buckets.modules_bucket.delete_file(path.into()).await?; + + let path = + crate::gcs_paths::file_path_root_directory(&scope, &package, &version); + buckets.modules_bucket.delete_directory(path.into()).await?; + + let package_metadata_path = + crate::gcs_paths::package_metadata(&scope, &package); + let package_metadata = PackageMetadata::create(db, &scope, &package).await?; + + let content = serde_json::to_vec_pretty(&package_metadata)?; + buckets + .modules_bucket + .upload( + package_metadata_path.into(), + UploadTaskBody::Bytes(content.into()), + GcsUploadOptions { + content_type: Some("application/json".into()), + cache_control: Some(CACHE_CONTROL_DO_NOT_CACHE.into()), + gzip_encoded: false, + }, + ) + .await?; let npm_version_manifest_path = crate::gcs_paths::npm_version_manifest_path(&scope, &package); @@ -4093,4 +4187,66 @@ ggHohNAjhbzDaY2iBW/m3NC5dehGUP4T2GBo/cwGhg== assert_eq!(tasks.len(), 1); assert_eq!(tasks[0].id, task2.id); } + + #[tokio::test] + async fn delete_version() { + let mut t = TestSetup::new().await; + let staff_token = t.staff_user.token.clone(); + + // unpublished package + let mut resp = t + .http() + .get("/api/scopes/scope/packages/foo/versions/0.0.1/dependencies/graph") + .call() + .await + .unwrap(); + resp + .expect_err_code(StatusCode::NOT_FOUND, "packageVersionNotFound") + .await; + + let task = process_tarball_setup(&t, create_mock_tarball("ok")).await; + assert_eq!(task.status, PublishingTaskStatus::Success, "{:?}", task); + + // Now publish a package that has a few deps + let package_name = PackageName::try_from("bar").unwrap(); + let version = Version::try_from("1.2.3").unwrap(); + let task = crate::publish::tests::process_tarball_setup2( + &t, + create_mock_tarball("depends_on_ok"), + &package_name, + &version, + false, + ) + .await; + assert_eq!(task.status, PublishingTaskStatus::Success, "{:?}", task); + + let mut resp = t + .http() + .delete("/api/scopes/scope/packages/foo/versions/0.0.1") + .token(Some(&staff_token)) + .call() + .await + .unwrap(); + resp + .expect_err_code(StatusCode::BAD_REQUEST, "deleteVersionHasDependents") + .await; + + let mut resp = t + .http() + .delete("/api/scopes/scope/packages/bar/versions/1.2.3") + .token(Some(&staff_token)) + .call() + .await + .unwrap(); + resp.expect_ok_no_content().await; + + let mut resp = t + .http() + .delete("/api/scopes/scope/packages/foo/versions/0.0.1") + .token(Some(&staff_token)) + .call() + .await + .unwrap(); + resp.expect_ok_no_content().await; + } } diff --git a/api/src/buckets.rs b/api/src/buckets.rs index 5533da005..6f255dd2d 100644 --- a/api/src/buckets.rs +++ b/api/src/buckets.rs @@ -8,6 +8,7 @@ use futures::Future; use futures::FutureExt; use futures::Stream; use futures::StreamExt; +use futures::TryStreamExt; use tokio::sync::mpsc; use tokio_stream::wrappers::UnboundedReceiverStream; use tracing::instrument; @@ -24,6 +25,8 @@ pub struct BucketWithQueue { pub bucket: gcp::Bucket, upload_queue: DynamicBackgroundTaskQueue, download_queue: DynamicBackgroundTaskQueue, + delete_queue: DynamicBackgroundTaskQueue, + list_queue: DynamicBackgroundTaskQueue, } impl BucketWithQueue { @@ -32,6 +35,8 @@ impl BucketWithQueue { bucket, upload_queue: DynamicBackgroundTaskQueue::default(), download_queue: DynamicBackgroundTaskQueue::default(), + delete_queue: DynamicBackgroundTaskQueue::default(), + list_queue: DynamicBackgroundTaskQueue::default(), } } @@ -72,6 +77,40 @@ impl BucketWithQueue { .await .unwrap() } + + #[instrument(name = "BucketWithQueue::delete_file", skip(self), err)] + pub async fn delete_file(&self, path: Arc) -> Result { + self + .delete_queue + .run(DeleteFileTask { + bucket: self.bucket.clone(), + path, + }) + .await + .unwrap() + } + + #[instrument(name = "BucketWithQueue::delete_directory", skip(self), err)] + pub async fn delete_directory(&self, path: Arc) -> Result<(), GcsError> { + let list = self + .list_queue + .run(ListDirectoryTask { + bucket: self.bucket.clone(), + path, + }) + .await + .unwrap()?; + + if let Some(list) = list { + let stream = futures::stream::iter(list.items) + .map(|item| self.delete_file(item.name.into())) + .buffer_unordered(64); + + let _ = stream.try_collect::>().await?; + } + + Ok(()) + } } #[derive(Clone)] @@ -191,3 +230,61 @@ impl RestartableTask for DownloadTask { .boxed() } } + +struct DeleteFileTask { + bucket: gcp::Bucket, + path: Arc, +} + +impl RestartableTask for DeleteFileTask { + type Ok = bool; + type Err = gcp::GcsError; + type Fut = + Pin> + Send + 'static>>; + + fn run(self) -> Self::Fut { + async move { + let res = self.bucket.delete_file(&self.path).await; + match res { + Ok(data) => RestartableTaskResult::Ok(data), + Err(e) if e.is_retryable() => { + RestartableTaskResult::Backoff(DeleteFileTask { + bucket: self.bucket, + path: self.path, + }) + } + Err(e) => RestartableTaskResult::Error(e), + } + } + .boxed() + } +} + +struct ListDirectoryTask { + bucket: gcp::Bucket, + path: Arc, +} + +impl RestartableTask for ListDirectoryTask { + type Ok = Option; + type Err = gcp::GcsError; + type Fut = + Pin> + Send + 'static>>; + + fn run(self) -> Self::Fut { + async move { + let res = self.bucket.list(&self.path).await; + match res { + Ok(data) => RestartableTaskResult::Ok(data), + Err(e) if e.is_retryable() => { + RestartableTaskResult::Backoff(ListDirectoryTask { + bucket: self.bucket, + path: self.path, + }) + } + Err(e) => RestartableTaskResult::Error(e), + } + } + .boxed() + } +} diff --git a/api/src/db/database.rs b/api/src/db/database.rs index 06af6a26a..fa59fc4c7 100644 --- a/api/src/db/database.rs +++ b/api/src/db/database.rs @@ -1649,6 +1649,26 @@ impl Database { .await } + #[instrument(name = "Database::delete_package_version", skip(self), err)] + pub async fn delete_package_version( + &self, + scope: &ScopeName, + name: &PackageName, + version: &Version, + ) -> Result<()> { + sqlx::query_as!( + PackageVersion, + r#"DELETE FROM package_versions WHERE scope = $1 AND name = $2 AND version = $3"#, + scope as _, + name as _, + version as _ + ) + .execute(&self.pool) + .await?; + + Ok(()) + } + #[instrument(name = "Database::get_package_file", skip(self), err)] pub async fn get_package_file( &self, diff --git a/api/src/gcp.rs b/api/src/gcp.rs index 58b6a864f..667027d47 100644 --- a/api/src/gcp.rs +++ b/api/src/gcp.rs @@ -233,7 +233,8 @@ impl Bucket { Ok(bucket) } - #[instrument(name = "gcp::Bucket::download_resp", skip(self), err, fields(bucket = %self.name))] + #[instrument(name = "gcp::Bucket::download_resp", skip(self), err, fields(bucket = %self.name + ))] pub async fn download_resp(&self, path: &str) -> Result { let path = percent_encoding::utf8_percent_encode(path, NON_ALPHANUMERIC); let url = format!( @@ -255,7 +256,8 @@ impl Bucket { Ok(resp) } - #[instrument(name = "gcp::Bucket::download", skip(self), err, fields(bucket = %self.name))] + #[instrument(name = "gcp::Bucket::download", skip(self), err, fields(bucket = %self.name + ))] pub async fn download(&self, path: &str) -> Result, GcsError> { let resp = self.download_resp(path).await?; if resp.status() == 404 { @@ -266,7 +268,8 @@ impl Bucket { Ok(Some(bytes)) } - #[instrument(name = "gcp::Bucket::download_stream", skip(self), err, fields(bucket = %self.name))] + #[instrument(name = "gcp::Bucket::download_stream", skip(self), err, fields(bucket = %self.name + ))] pub async fn download_stream( &self, path: &str, @@ -279,7 +282,12 @@ impl Bucket { .map(|x| x.map(|x| x.1)) } - #[instrument(name = "gcp::Bucket::download_stream_with_encoding", skip(self), err, fields(bucket = %self.name))] + #[instrument( + name = "gcp::Bucket::download_stream_with_encoding", + skip(self), + err, + fields(bucket = %self.name) + )] pub async fn download_stream_with_encoding( &self, path: &str, @@ -359,7 +367,8 @@ impl Bucket { Ok(()) } - #[instrument(name = "gcp::Bucket::upload", skip(self, data), err, fields(bucket = %self.name, size = %data.len()))] + #[instrument(name = "gcp::Bucket::upload", skip(self, data), err, fields(bucket = %self.name, size = %data.len() + ))] pub async fn upload( &self, path: &str, @@ -371,7 +380,12 @@ impl Bucket { .await } - #[instrument(name = "gcp::Bucket::upload_stream", skip(self, stream), err, fields(bucket = %self.name))] + #[instrument( + name = "gcp::Bucket::upload_stream", + skip(self, stream), + err, + fields(bucket = %self.name) + )] pub async fn upload_stream< S: Stream> + Send + Sync + 'static, >( @@ -384,6 +398,72 @@ impl Bucket { .upload_inner(path, Part::stream(Body::wrap_stream(stream)), options) .await } + + #[instrument(name = "gcp::Bucket::list", skip(self), err, fields(bucket = %self.name + ))] + pub async fn list(&self, path: &str) -> Result, GcsError> { + let path = percent_encoding::utf8_percent_encode(path, NON_ALPHANUMERIC); + let url = format!( + "{}/storage/v1/b/{}/o?prefix={}", + self.endpoint, self.name, path + ); + let token = self + .client + .get_access_token() + .await + .map_err(GcsError::AccessToken)?; + let resp = self + .client + .http() + .get(url) + .bearer_auth(token) + .send() + .await?; + + if resp.status() == 404 { + return Ok(None); + } + let resp = Bucket::error_if_failed(resp)?; + let json = resp.json().await?; + Ok(Some(json)) + } + + #[instrument(name = "gcp::Bucket::delete", skip(self), err, fields(bucket = %self.name + ))] + pub async fn delete_file(&self, path: &str) -> Result { + let path = percent_encoding::utf8_percent_encode(path, NON_ALPHANUMERIC); + let url = + format!("{}/storage/v1/b/{}/o/{}", self.endpoint, self.name, path); + let token = self + .client + .get_access_token() + .await + .map_err(GcsError::AccessToken)?; + let resp = self + .client + .http() + .delete(url) + .bearer_auth(token) + .send() + .await?; + + if resp.status() == 404 { + return Ok(true); + } + let _ = Bucket::error_if_failed(resp)?; + Ok(false) + } +} + +#[derive(serde::Deserialize)] +pub struct List { + #[serde(default)] + pub items: Vec, +} + +#[derive(serde::Deserialize)] +pub struct ListItem { + pub name: String, } #[derive(Clone)] @@ -403,7 +483,8 @@ impl Queue { } } - #[instrument("gcp::Queue::task_buffer", skip(self), err, fields(queue_id = self.id))] + #[instrument("gcp::Queue::task_buffer", skip(self), err, fields(queue_id = self.id + ))] pub async fn task_buffer( &self, id: Option, @@ -475,7 +556,8 @@ impl BigQuery { } } - #[instrument("gcp::BigQuery::query", skip(self), err, fields(project = %self.project))] + #[instrument("gcp::BigQuery::query", skip(self), err, fields(project = %self.project + ))] pub async fn query( &self, query: &str, @@ -505,7 +587,8 @@ impl BigQuery { Ok(json) } - #[instrument("gcp::BigQuery::get_query_results", skip(self), err, fields(project = %self.project))] + #[instrument("gcp::BigQuery::get_query_results", skip(self), err, fields(project = %self.project + ))] pub async fn get_query_results( &self, job_id: &str, diff --git a/api/src/gcs_paths.rs b/api/src/gcs_paths.rs index 2cf95aafc..9a86bff8e 100644 --- a/api/src/gcs_paths.rs +++ b/api/src/gcs_paths.rs @@ -15,6 +15,14 @@ pub fn file_path( format!("@{scope}/{package_name}/{version}{path}") } +pub fn file_path_root_directory( + scope: &ScopeName, + package_name: &PackageName, + version: &Version, +) -> String { + format!("@{scope}/{package_name}/{version}/") +} + pub fn docs_v1_path( scope: &ScopeName, package_name: &PackageName, diff --git a/frontend/routes/package/versions.tsx b/frontend/routes/package/versions.tsx index 666ec8370..014127c09 100644 --- a/frontend/routes/package/versions.tsx +++ b/frontend/routes/package/versions.tsx @@ -164,13 +164,17 @@ function Version({ }) { const isPublished = version !== null; const isFailed = tasks.length > 0 && tasks[0].status === "failure"; + const isSuccess = tasks.length > 0 && + tasks.some((task) => task.status === "success"); return (
{version?.yanked @@ -204,7 +210,7 @@ function Version({ : (isFailed ? : !isPublished - ? "..." + ? isSuccess ? : "..." : releaseTrack)}
@@ -254,6 +260,19 @@ function Version({ )} + {isPublished && iam.isStaff && ( +
+ + +
+ )}
    {tasks.map((task, i) => ( @@ -357,6 +376,17 @@ export const handler = define.handlers({ headers: { Location: `/@${scope}/${packageName}/versions` }, }); } + case "delete": { + const version = String(data.get("version")); + const res = await api.delete( + path`/scopes/${scope}/packages/${packageName}/versions/${version}`, + ); + if (!res.ok) throw res; + return new Response(null, { + status: 303, + headers: { Location: `/@${scope}/${packageName}/versions` }, + }); + } default: { throw new Error("Invalid action " + action); }