Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add dry-run option #102

Merged
merged 4 commits into from
Feb 14, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
127 changes: 87 additions & 40 deletions NGPIris/cli/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,14 @@ def cli(context : Context, credentials : str):
@click.argument("bucket")
@click.argument("source")
@click.argument("destination")
@click.option(
"-dr",
"--dry_run",
help = "Simulate the command execution without making actual changes. Useful for testing and verification",
is_flag = True
)
@click.pass_context
def upload(context : Context, bucket : str, source : str, destination : str):
def upload(context : Context, bucket : str, source : str, destination : str, dry_run : bool):
"""
Upload files to an HCP bucket/namespace.

Expand All @@ -97,11 +103,17 @@ def upload(context : Context, bucket : str, source : str, destination : str):
destination = add_trailing_slash(destination)
if Path(source).is_dir():
source = add_trailing_slash(source)
hcph.upload_folder(source, destination)
if dry_run:
click.echo("This command would have uploaded the folder \"" + source + "\" to \"" + destination + "\"")
else:
hcph.upload_folder(source, destination)
else:
file_name = Path(source).name
destination += file_name
hcph.upload_file(source, destination)
if dry_run:
click.echo("This command would have uploaded the file \"" + source + "\" to \"" + destination + "\"")
else:
hcph.upload_file(source, destination)

@cli.command()
@click.argument("bucket")
Expand All @@ -119,8 +131,14 @@ def upload(context : Context, bucket : str, source : str, destination : str):
help = "Ignore the download limit",
is_flag = True
)
@click.option(
"-dr",
"--dry_run",
help = "Simulate the command execution without making actual changes. Useful for testing and verification",
is_flag = True
)
@click.pass_context
def download(context : Context, bucket : str, source : str, destination : str, force : bool, ignore_warning : bool):
def download(context : Context, bucket : str, source : str, destination : str, force : bool, ignore_warning : bool, dry_run : bool):
"""
Download a file or folder from an HCP bucket/namespace.

Expand All @@ -134,45 +152,58 @@ def download(context : Context, bucket : str, source : str, destination : str, f
hcph.mount_bucket(bucket)
if not Path(destination).exists():
Path(destination).mkdir()

if object_is_folder(source, hcph):
if source == "/":
source = ""

cumulative_download_size = Byte(0)
if not ignore_warning:
click.echo("Computing download size...")
for object in hcph.list_objects(source):
object : dict
cumulative_download_size += Byte(object["Size"])
if cumulative_download_size >= TiB(1):
click.echo("WARNING: You are about to download more than 1 TB of data. Is this your intention? [y/N]: ", nl = False)
inp = click.getchar(True)
if inp == "y" or inp == "Y":
break
else: # inp == "n" or inp == "N" or something else
exit("\nAborting download")

hcph.download_folder(source, Path(destination).as_posix())

if not dry_run:
if object_is_folder(source, hcph):
if source == "/":
source = ""

cumulative_download_size = Byte(0)
if not ignore_warning:
click.echo("Computing download size...")
for object in hcph.list_objects(source):
object : dict
cumulative_download_size += Byte(object["Size"])
if cumulative_download_size >= TiB(1):
click.echo("WARNING: You are about to download more than 1 TB of data. Is this your intention? [y/N]: ", nl = False)
inp = click.getchar(True)
if inp == "y" or inp == "Y":
break
else: # inp == "n" or inp == "N" or something else
exit("\nAborting download")

hcph.download_folder(source, Path(destination).as_posix())
else:
if Byte(hcph.get_object(source)["ContentLength"]) >= TiB(1):
click.echo("WARNING: You are about to download more than 1 TB of data. Is this your intention? [y/N]: ", nl = False)
inp = click.getchar(True)
if inp == "y" or inp == "Y":
pass
else: # inp == "n" or inp == "N" or something else
exit("\nAborting download")

downloaded_source = Path(destination) / Path(source).name
if downloaded_source.exists() and not force:
exit("Object already exists. If you wish to overwrite the existing file, use the -f, --force option")
hcph.download_file(source, downloaded_source.as_posix())
else:
if Byte(hcph.get_object(source)["ContentLength"]) >= TiB(1):
click.echo("WARNING: You are about to download more than 1 TB of data. Is this your intention? [y/N]: ", nl = False)
inp = click.getchar(True)
if inp == "y" or inp == "Y":
pass
else: # inp == "n" or inp == "N" or something else
exit("\nAborting download")

downloaded_source = Path(destination) / Path(source).name
if downloaded_source.exists() and not force:
exit("Object already exists. If you wish to overwrite the existing file, use the -f, --force option")
hcph.download_file(source, downloaded_source.as_posix())
if object_is_folder(source, hcph):
click.echo("This command would have downloaded the folder \"" + source + "\". If you wish to know the contents of this folder, use the 'list-objects' command")
else:
click.echo("This command would have downloaded the object \"" + source + "\":")
click.echo(list(hcph.list_objects(source))[0])

@cli.command()
@click.argument("bucket")
@click.argument("object")
@click.option(
"-dr",
"--dry_run",
help = "Simulate the command execution without making actual changes. Useful for testing and verification",
is_flag = True
)
@click.pass_context
def delete_object(context : Context, bucket : str, object : str):
def delete_object(context : Context, bucket : str, object : str, dry_run : bool):
"""
Delete an object from an HCP bucket/namespace.

Expand All @@ -182,13 +213,24 @@ def delete_object(context : Context, bucket : str, object : str):
"""
hcph : HCPHandler = get_HCPHandler(context)
hcph.mount_bucket(bucket)
hcph.delete_object(object)
if not dry_run:
hcph.delete_object(object)
else:
click.echo("This command would delete:")
click.echo(list(hcph.list_objects(object))[0])


@cli.command()
@click.argument("bucket")
@click.argument("folder")
@click.option(
"-dr",
"--dry_run",
help = "Simulate the command execution without making actual changes. Useful for testing and verification",
is_flag = True
)
@click.pass_context
def delete_folder(context : Context, bucket : str, folder : str):
def delete_folder(context : Context, bucket : str, folder : str, dry_run : bool):
"""
Delete a folder from an HCP bucket/namespace.

Expand All @@ -198,7 +240,12 @@ def delete_folder(context : Context, bucket : str, folder : str):
"""
hcph : HCPHandler = get_HCPHandler(context)
hcph.mount_bucket(bucket)
hcph.delete_folder(folder)
if not dry_run:
hcph.delete_folder(folder)
else:
click.echo("By deleting \"" + folder + "\", the following objects would have been deleted (not including objects in sub-folders):")
for obj in hcph.list_objects(folder):
click.echo(obj)

@cli.command()
@click.pass_context
Expand Down