feat(incremental): copy multiple tables in parallel (#1237) #1413
base: main
Conversation
Not a maintainer but this is a much needed feature, thanks @AxelThevenot 🙌🏻
Large datasets with a lot of partitions currently perform quite poorly.
I looked into doing this directly with a single copy_table call, but it doesn't look like that's possible currently (unless we delete the partitions upfront and use write_append).
# Runs all the copy jobs in parallel
for source_ref in source_ref_array:
    for partition_id in partition_ids or [None]:
        source_ref_partition = (
            f"{source_ref}${partition_id}" if partition_id else source_ref
        )
        destination_ref_partition = (
            f"{destination_ref}${partition_id}" if partition_id else destination_ref
        )
        copy_job = client.copy_table(
            source_ref_partition,
            destination_ref_partition,
            job_config=CopyJobConfig(write_disposition=write_disposition),
            retry=self._retry.create_reopen_with_deadline(conn),
        )
        copy_jobs.append(copy_job)
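The loop above builds one copy job per (source, partition) pair, using BigQuery's `$` partition decorator to address a single partition. The ref-pairing logic can be isolated as a pure helper; `build_copy_pairs` below is a hypothetical name, not part of the PR:

```python
# Hypothetical helper (not in the PR) illustrating how the loop pairs
# source and destination refs; "$" is BigQuery's partition decorator.
def build_copy_pairs(source_ref_array, destination_ref, partition_ids):
    pairs = []
    for source_ref in source_ref_array:
        for partition_id in partition_ids or [None]:
            if partition_id:
                pairs.append(
                    (f"{source_ref}${partition_id}",
                     f"{destination_ref}${partition_id}")
                )
            else:
                # No partitions: copy the whole table in one job.
                pairs.append((source_ref, destination_ref))
    return pairs

# Each resulting pair becomes one client.copy_table(...) job.
pairs = build_copy_pairs(["proj.ds.src"], "proj.ds.dst", ["20240101", "20240102"])
```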
Would being explicit here clarify the logic?
if partition_ids:
    copy_jobs = [
        client.copy_table(
            f"{source_ref}${partition_id}",
            f"{destination_ref}${partition_id}",
            job_config=CopyJobConfig(write_disposition=write_disposition),
            retry=self._retry.create_reopen_with_deadline(conn),
        )
        for partition_id in partition_ids
        for source_ref in source_ref_array
    ]
else:
    copy_jobs = [
        client.copy_table(
            source_ref_array,
            destination_ref,
            job_config=CopyJobConfig(write_disposition=write_disposition),
            retry=self._retry.create_reopen_with_deadline(conn),
        )
    ]
Got your point, need to change this part :)
copy_job = client.copy_table(
    source_ref_partition,
    destination_ref_partition,
    job_config=CopyJobConfig(write_disposition=write_disposition),
Will there be more than one element in source_ref_partition?
If we're ever in a scenario where source_ref_array has more than one element and write_disposition is set to WRITE_TRUNCATE, we'll be overwriting the same data.
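The concern above can be illustrated with a toy model (plain Python dicts, not BigQuery): with WRITE_TRUNCATE semantics, each copy replaces the destination, so copying several sources one by one into the same destination keeps only the last source's rows.

```python
# Toy model (not the BigQuery API) of the WRITE_TRUNCATE concern.
tables = {"src1": ["a"], "src2": ["b"], "dst": []}

def copy_table(src, dst, write_disposition):
    if write_disposition == "WRITE_TRUNCATE":
        tables[dst] = list(tables[src])               # replaces destination
    elif write_disposition == "WRITE_APPEND":
        tables[dst] = tables[dst] + list(tables[src])  # appends to destination

# source_ref_array with more than one element, truncating each time:
for src in ["src1", "src2"]:
    copy_table(src, "dst", "WRITE_TRUNCATE")

print(tables["dst"])  # only src2's data survives: ['b']
```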
Hi! I just took the old function, passing a list instead of a single source ref :)
I don't see a use case either, but I kept this mechanism to avoid any breaking change.
@mikealfare @colin-rogers-dbt could we get your thoughts / a review of this one?
resolves dbt-labs/dbt-adapters#559
N/A dbt-labs/docs.getdbt.com/#
Problem
Partitions and tables are currently copied sequentially, which is slow when managing a large number of partitions.
Solution
Copy partitions and tables in parallel: submit all the copy jobs, then wait for their results.
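The submit-then-wait pattern can be sketched as follows. This is an assumption modeled on google-cloud-bigquery, where client.copy_table() returns a job immediately (the copy runs server-side) and job.result() blocks until it finishes; DummyJob is a hypothetical stand-in so the sketch runs without a BigQuery client.

```python
# Minimal sketch of submitting all copy jobs before waiting on any of them,
# so the jobs run concurrently. DummyJob stands in for a real CopyJob.
class DummyJob:
    def __init__(self, name):
        self.name = name
        self.done = False

    def result(self):
        # In the real API this blocks until the server-side job completes.
        self.done = True
        return self

def copy_in_parallel(refs):
    jobs = [DummyJob(ref) for ref in refs]   # submit every job first...
    return [job.result() for job in jobs]    # ...then wait for each result

finished = copy_in_parallel(["t$20240101", "t$20240102"])
```

Because the jobs are all submitted before the first result() call, total wall time is roughly the slowest job rather than the sum of all jobs.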
Checklist