-
Notifications
You must be signed in to change notification settings - Fork 3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
refactor(ingest/s3): enhance readability #12609
base: master
Are you sure you want to change the base?
refactor(ingest/s3): enhance readability #12609
Conversation
- Refactor S3Source().get_folder_info() to enhance readability - Add a test to ensure that get_folder_info() returns the expected result.
Codecov ReportAll modified and coverable lines are covered by tests ✅
... and 52 files with indirect coverage changes Continue to review full report in Codecov by Sentry.
|
@@ -877,51 +876,28 @@ def _is_allowed_path(path_spec_: PathSpec, s3_uri: str) -> bool: | |||
s3_objects = ( | |||
obj | |||
for obj in bucket.objects.filter(Prefix=prefix).page_size(PAGE_SIZE) | |||
if _is_allowed_path(path_spec, f"s3://{obj.bucket_name}/{obj.key}") | |||
if _is_allowed_path(path_spec, self.create_s3_path(obj.bucket_name, obj.key)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if modification_time is None: | ||
logger.warning( | ||
f"Unable to find any files in the folder {key}. Skipping..." | ||
) | ||
continue |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Remove this because groupby_unsorted
never returns empty group.
Folder( | ||
partition_id=id, | ||
is_partition=bool(id), | ||
creation_time=creation_time if creation_time else None, # type: ignore[arg-type] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We don't need null handling. The type of creation_time
is datetime
.
creation_time
is the value ofitem.last_modified
. (L896)- The type of
item
is ObjectSummary - The type of
ObjectSummary.last_modified
isdatetime
.
@@ -847,7 +847,7 @@ def get_folder_info( | |||
path_spec: PathSpec, | |||
bucket: "Bucket", | |||
prefix: str, | |||
) -> List[Folder]: | |||
) -> Iterable[Folder]: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Change this for memory efficiency.
max_file = max(group, key=lambda x: x.last_modified) | ||
max_file_s3_path = self.create_s3_path(max_file.bucket_name, max_file.key) | ||
|
||
# If partition_id is None, it means the folder is not a partition | ||
partition_id = path_spec.get_partition_from_path(max_file_s3_path) | ||
|
||
yield Folder( | ||
partition_id=partition_id, | ||
is_partition=bool(partition_id), | ||
creation_time=max_file.last_modified, | ||
modification_time=max_file.last_modified, | ||
sample_file=max_file_s3_path, | ||
size=sum(obj.size for obj in group), | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Before | After |
---|---|
O(n) | O(2n) |
The performance loss is minimal because the time complexity has only increased from O(n) to O(2n).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why O(2n)? Because max() and sum().
Checklist