From 9ec1dfc44556cbe1c59490481e7dbfbdcd127e42 Mon Sep 17 00:00:00 2001
From: Anjor Kanekar
Date: Tue, 11 Feb 2025 09:32:11 +0000
Subject: [PATCH] cid gravity (#229)

---
 dataprep-tools/filecoin/boost_create_deals.py | 57 ++++++++++---------
 1 file changed, 31 insertions(+), 26 deletions(-)

diff --git a/dataprep-tools/filecoin/boost_create_deals.py b/dataprep-tools/filecoin/boost_create_deals.py
index 20b586a..4c3c284 100644
--- a/dataprep-tools/filecoin/boost_create_deals.py
+++ b/dataprep-tools/filecoin/boost_create_deals.py
@@ -252,6 +252,27 @@ def execute_boost_deal(deal_arg, params):
         return None # Indicate failure to process the output
 
 
+def get_existing_replications(check_existing_deals, deals_url, deals_file, fields):
+    replications = {}
+    if check_existing_deals[0]:
+        try:
+            client.download_file(deals_url, deals_file)
+            with open(deals_file, "r") as csv_file:
+                reader = csv.DictReader(csv_file, fieldnames=fields)
+                next(reader, None) # skip header
+                for row in reader:
+                    if row["commp_piece_cid"] not in replications:
+                        replications[row["commp_piece_cid"]] = []
+                    replications[row["commp_piece_cid"]].append(row["provider"])
+        except Exception as e:
+            logger.warning(f"Error reading existing deals file {deals_url}: {e}. Proceeding with potentially duplicate deals.")
+            replications = {} # Reset replications to avoid incorrect skipping
+    else:
+        logger.info(f"No existing deals file found at {deals_url}. Creating a new one.")
+
+    return replications
+
+
 def create_deals_for_metadata(metadata_obj, epoch_str, deal_type_suffix=""):
     """
     Creates deals based on the provided metadata CSV content.
@@ -309,24 +330,8 @@ def create_deals_for_metadata(metadata_obj, epoch_str, deal_type_suffix=""):
     if not lock_acquired:
         return 1 # Exit if lock not acquired
 
-    replications = {}
     check_existing_deals = client.check_exists(deals_url)
-    if check_existing_deals[0]:
-        try:
-            client.download_file(deals_url, deals_file)
-            with open(deals_file, "r") as csv_file:
-                reader = csv.DictReader(csv_file, fieldnames=fields)
-                next(reader, None) # skip header
-                for row in reader:
-                    if row["commp_piece_cid"] not in replications:
-                        replications[row["commp_piece_cid"]] = []
-                    replications[row["commp_piece_cid"]].append(row["provider"])
-        except Exception as e:
-            logger.warning(f"Error reading existing deals file {deals_url}: {e}. Proceeding with potentially duplicate deals.")
-            replications = {} # Reset replications to avoid incorrect skipping
-    else:
-        logger.info(f"No existing deals file found at {deals_url}. Creating a new one.")
-
+    replications = get_existing_replications(check_existing_deals, deals_url, deals_file, fields)
 
     deals_created_count = 0
     try:
@@ -349,15 +354,15 @@ def create_deals_for_metadata(metadata_obj, epoch_str, deal_type_suffix=""):
             logger.info(f"Found {len(providers)} providers: {providers}")
 
             for provider in providers:
-                if file_item["commp_piece_cid"] in replications and provider in replications[file_item["commp_piece_cid"]]:
-                    logger.info(f"Skipping deal for {file_item['commp_piece_cid']} with {provider}, already has a deal.")
-                    continue
-
-                if file_item["commp_piece_cid"] not in replications:
-                    replications[file_item["commp_piece_cid"]] = []
-                elif len(replications[file_item["commp_piece_cid"]]) >= replication_factor:
-                    logger.info(f"Skipping deal for {file_item['commp_piece_cid']}, already replicated {len(replications[file_item['commp_piece_cid']])} times (replication factor: {replication_factor}).")
-                    continue
+                if not USE_CID_GRAVITY:
+                    if file_item["commp_piece_cid"] not in replications:
+                        replications[file_item["commp_piece_cid"]] = []
+                    elif provider in replications[file_item["commp_piece_cid"]]:
+                        logger.info(f"Skipping deal for {file_item['commp_piece_cid']} with {provider}, already has a deal.")
+                        continue
+                    elif len(replications[file_item["commp_piece_cid"]]) >= replication_factor:
+                        logger.info(f"Skipping deal for {file_item['commp_piece_cid']}, already replicated {len(replications[file_item['commp_piece_cid']])} times (replication factor: {replication_factor}).")
+                        continue
 
             params = {
                 "provider": provider,
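
Note on the USE_CID_GRAVITY gating (reviewer sketch, not part of the patch):
when the flag is set, the local replications bookkeeping is bypassed and no
deal is skipped client-side (presumably CID gravity handles provider
selection and replication on its end); when the flag is off, the previous
skip logic still applies, now ordered so the piece CID's provider list is
initialized before the duplicate-provider and replication-factor checks.
Below is a minimal sketch of that logic, assuming USE_CID_GRAVITY is a
module-level flag defined elsewhere in boost_create_deals.py;
should_skip_deal is a hypothetical helper introduced here for illustration
only:

    # Sketch only: should_skip_deal is hypothetical, not part of the patch.
    USE_CID_GRAVITY = False  # assumption: actually configured elsewhere

    def should_skip_deal(piece_cid, provider, replications, replication_factor):
        # Mirror the patched loop: never skip locally when CID gravity is in use.
        if USE_CID_GRAVITY:
            return False
        # Equivalent to the patch's if/elif/elif chain: a newly seen piece
        # CID gets an empty provider list and is never skipped.
        providers_so_far = replications.setdefault(piece_cid, [])
        if provider in providers_so_far:
            return True  # this provider already holds a deal for the piece
        if len(providers_so_far) >= replication_factor:
            return True  # piece already replicated enough times
        return False

For example, with replications = {"bafy...": ["f01", "f02"]}, the call
should_skip_deal("bafy...", "f03", replications, 2) returns True, matching
the patched replication-factor branch when the flag is off.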