Commit

cid gravity (#229)
anjor authored Feb 11, 2025
1 parent 1767cc9 commit 9ec1dfc
Showing 1 changed file with 31 additions and 26 deletions.
57 changes: 31 additions & 26 deletions dataprep-tools/filecoin/boost_create_deals.py
@@ -252,6 +252,27 @@ def execute_boost_deal(deal_arg, params):
     return None # Indicate failure to process the output
 
 
+def get_existing_replications(check_existing_deals, deals_url, deals_file, fields):
+    replications = {}
+    if check_existing_deals[0]:
+        try:
+            client.download_file(deals_url, deals_file)
+            with open(deals_file, "r") as csv_file:
+                reader = csv.DictReader(csv_file, fieldnames=fields)
+                next(reader, None) # skip header
+                for row in reader:
+                    if row["commp_piece_cid"] not in replications:
+                        replications[row["commp_piece_cid"]] = []
+                    replications[row["commp_piece_cid"]].append(row["provider"])
+        except Exception as e:
+            logger.warning(f"Error reading existing deals file {deals_url}: {e}. Proceeding with potentially duplicate deals.")
+            replications = {} # Reset replications to avoid incorrect skipping
+    else:
+        logger.info(f"No existing deals file found at {deals_url}. Creating a new one.")
+
+    return replications
+
+
 def create_deals_for_metadata(metadata_obj, epoch_str, deal_type_suffix=""):
     """
     Creates deals based on the provided metadata CSV content.
@@ -309,24 +330,8 @@ def create_deals_for_metadata(metadata_obj, epoch_str, deal_type_suffix=""):
     if not lock_acquired:
         return 1 # Exit if lock not acquired
 
-    replications = {}
     check_existing_deals = client.check_exists(deals_url)
-    if check_existing_deals[0]:
-        try:
-            client.download_file(deals_url, deals_file)
-            with open(deals_file, "r") as csv_file:
-                reader = csv.DictReader(csv_file, fieldnames=fields)
-                next(reader, None) # skip header
-                for row in reader:
-                    if row["commp_piece_cid"] not in replications:
-                        replications[row["commp_piece_cid"]] = []
-                    replications[row["commp_piece_cid"]].append(row["provider"])
-        except Exception as e:
-            logger.warning(f"Error reading existing deals file {deals_url}: {e}. Proceeding with potentially duplicate deals.")
-            replications = {} # Reset replications to avoid incorrect skipping
-    else:
-        logger.info(f"No existing deals file found at {deals_url}. Creating a new one.")
-
+    replications = get_existing_replications(check_existing_deals, deals_url, deals_file, fields)
 
     deals_created_count = 0
     try:
@@ -349,15 +354,15 @@
             logger.info(f"Found {len(providers)} providers: {providers}")
 
             for provider in providers:
-                if file_item["commp_piece_cid"] in replications and provider in replications[file_item["commp_piece_cid"]]:
-                    logger.info(f"Skipping deal for {file_item['commp_piece_cid']} with {provider}, already has a deal.")
-                    continue
-
-                if file_item["commp_piece_cid"] not in replications:
-                    replications[file_item["commp_piece_cid"]] = []
-                elif len(replications[file_item["commp_piece_cid"]]) >= replication_factor:
-                    logger.info(f"Skipping deal for {file_item['commp_piece_cid']}, already replicated {len(replications[file_item['commp_piece_cid']])} times (replication factor: {replication_factor}).")
-                    continue
+                if not USE_CID_GRAVITY:
+                    if file_item["commp_piece_cid"] not in replications:
+                        replications[file_item["commp_piece_cid"]] = []
+                    elif provider in replications[file_item["commp_piece_cid"]]:
+                        logger.info(f"Skipping deal for {file_item['commp_piece_cid']} with {provider}, already has a deal.")
+                        continue
+                    elif len(replications[file_item["commp_piece_cid"]]) >= replication_factor:
+                        logger.info(f"Skipping deal for {file_item['commp_piece_cid']}, already replicated {len(replications[file_item['commp_piece_cid']])} times (replication factor: {replication_factor}).")
+                        continue
 
                 params = {
                     "provider": provider,
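
Net effect of the diff: the inline CSV parsing moves into the get_existing_replications helper, which folds the existing-deals CSV into a map from piece CID to the providers that already hold a deal for that piece, and the per-provider duplicate and replication-factor checks now run only when USE_CID_GRAVITY is off (presumably because CID gravity takes over provider selection and replication policy). Below is a minimal sketch, not part of the commit, of the mapping the helper builds; it uses in-memory rows in place of the CSV downloaded from deals_url, and the piece CIDs and provider IDs are hypothetical placeholders.

    # Sketch only: reproduces the fold performed by get_existing_replications,
    # with hypothetical rows standing in for the downloaded deals CSV.
    rows = [
        {"commp_piece_cid": "piece-cid-a", "provider": "f01234"},
        {"commp_piece_cid": "piece-cid-a", "provider": "f05678"},
        {"commp_piece_cid": "piece-cid-b", "provider": "f01234"},
    ]

    replications = {}
    for row in rows:
        replications.setdefault(row["commp_piece_cid"], []).append(row["provider"])

    print(replications)
    # {'piece-cid-a': ['f01234', 'f05678'], 'piece-cid-b': ['f01234']}

With that shape, the call-site checks read directly off the map: a provider already listed under a piece CID is skipped, and once the list reaches replication_factor the piece is not offered to further providers.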
