@@ -245,15 +245,8 @@ Result<std::set<std::string>> OrphanFilesCleanerImpl::GetUsedFiles() const {
245245 used_files.insert (SnapshotManager::LATEST);
246246 Duration duration;
247247 PAIMON_ASSIGN_OR_RAISE (std::vector<Snapshot> snapshots, snapshot_manager_->GetAllSnapshots ());
248+ std::vector<std::future<Result<std::set<std::string>>>> used_files_futures;
248249 for (const auto & snapshot : snapshots) {
249- used_files.insert (SnapshotManager::SNAPSHOT_PREFIX + std::to_string (snapshot.Id ()));
250- used_files.insert (snapshot.BaseManifestList ());
251- used_files.insert (snapshot.DeltaManifestList ());
252- std::vector<ManifestFileMeta> manifests;
253- PAIMON_RETURN_NOT_OK (manifest_list_->ReadIfFileExist (snapshot.BaseManifestList (),
254- /* filter=*/ nullptr , &manifests));
255- PAIMON_RETURN_NOT_OK (manifest_list_->ReadIfFileExist (snapshot.DeltaManifestList (),
256- /* filter=*/ nullptr , &manifests));
257250 const std::optional<std::string>& changelog_manifest_list =
258251 snapshot.ChangelogManifestList ();
259252 if (changelog_manifest_list) {
@@ -266,18 +259,46 @@ Result<std::set<std::string>> OrphanFilesCleanerImpl::GetUsedFiles() const {
266259 // TODO(jinli.zjw): support IndexManifestEntry and add tests
267260 // used_files.insert(index_manifest_name.value());
268261 }
269- for (const auto & manifest : manifests) {
270- used_files.insert (manifest.FileName ());
271- std::vector<ManifestEntry> manifest_entries;
272- PAIMON_RETURN_NOT_OK (manifest_file_->ReadIfFileExist (
273- manifest.FileName (), /* filter=*/ nullptr , &manifest_entries));
274- for (const auto & manifest_entry : manifest_entries) {
275- used_files.insert (manifest_entry.FileName ());
276- }
277- }
262+
263+ used_files_futures.emplace_back (
264+ Via (executor_.get (), [this , snapshot] { return GetUsedFilesBySnapshot (snapshot); }));
265+ }
266+
267+ for (const auto & used_files_result : CollectAll (used_files_futures)) {
268+ PAIMON_RETURN_NOT_OK (used_files_result);
269+ used_files.insert (used_files_result.value ().begin (), used_files_result.value ().end ());
278270 }
271+
279272 metrics_->SetCounter (CleanMetrics::CLEAN_LIST_USED_FILES_DURATION, duration.Get ());
280273 metrics_->SetCounter (CleanMetrics::CLEAN_USED_FILES, static_cast <uint64_t >(used_files.size ()));
274+ metrics_->SetCounter (CleanMetrics::CLEAN_SNAPSHOT_FILES,
275+ static_cast <uint64_t >(snapshots.size ()));
276+ return used_files;
277+ }
278+
279+ Result<std::set<std::string>> OrphanFilesCleanerImpl::GetUsedFilesBySnapshot (
280+ const Snapshot& snapshot) const {
281+ std::set<std::string> used_files;
282+
283+ used_files.insert (SnapshotManager::SNAPSHOT_PREFIX + std::to_string (snapshot.Id ()));
284+ used_files.insert (snapshot.BaseManifestList ());
285+ used_files.insert (snapshot.DeltaManifestList ());
286+ std::vector<ManifestFileMeta> manifests;
287+ PAIMON_RETURN_NOT_OK (manifest_list_->ReadIfFileExist (snapshot.BaseManifestList (),
288+ /* filter=*/ nullptr , &manifests));
289+ PAIMON_RETURN_NOT_OK (manifest_list_->ReadIfFileExist (snapshot.DeltaManifestList (),
290+ /* filter=*/ nullptr , &manifests));
291+
292+ for (const auto & manifest : manifests) {
293+ used_files.insert (manifest.FileName ());
294+ std::vector<ManifestEntry> manifest_entries;
295+ PAIMON_RETURN_NOT_OK (manifest_file_->ReadIfFileExist (
296+ manifest.FileName (), /* filter=*/ nullptr , &manifest_entries));
297+ for (const auto & manifest_entry : manifest_entries) {
298+ used_files.insert (manifest_entry.FileName ());
299+ }
300+ }
301+
281302 return used_files;
282303}
283304
0 commit comments