diff --git a/dbt/adapters/bigquery/connections.py b/dbt/adapters/bigquery/connections.py
index bb062f330..aa95ee3f7 100644
--- a/dbt/adapters/bigquery/connections.py
+++ b/dbt/adapters/bigquery/connections.py
@@ -209,6 +209,12 @@ def get_table_from_response(cls, resp) -> "agate.Table":
         column_names = [field.name for field in resp.schema]
         return agate_helper.table_from_data_flat(resp, column_names)
 
+    def get_job_labels(self):
+        labels = self.get_labels_from_query_comment()
+        labels["dbt_invocation_id"] = get_invocation_id()
+
+        return labels
+
     def get_labels_from_query_comment(cls):
         if (
             hasattr(cls.profile, "query_comment")
@@ -244,9 +250,7 @@ def raw_execute(
 
         fire_event(SQLQuery(conn_name=conn.name, sql=sql, node_info=get_node_info()))
 
-        labels = self.get_labels_from_query_comment()
-
-        labels["dbt_invocation_id"] = get_invocation_id()
+        labels = self.get_job_labels()
 
         job_params = {
             "use_legacy_sql": use_legacy_sql,
@@ -424,6 +428,7 @@ def copy_bq_table(self, source, destination, write_disposition) -> None:
         destination_ref = self.table_ref(
             destination.database, destination.schema, destination.table
         )
+        labels = self.get_job_labels()
 
         logger.debug(
             'Copying table(s) "{}" to "{}" with disposition: "{}"',
@@ -440,7 +445,7 @@ def copy_bq_table(self, source, destination, write_disposition) -> None:
         copy_job = client.copy_table(
             source_ref_array,
             destination_ref,
-            job_config=CopyJobConfig(write_disposition=write_disposition),
+            job_config=CopyJobConfig(write_disposition=write_disposition, labels=labels),
             retry=self._retry.create_reopen_with_deadline(conn),
         )
         copy_job.result(timeout=self._retry.create_job_execution_timeout(fallback=300))
@@ -456,8 +461,10 @@ def write_dataframe_to_table(
         field_delimiter: str,
         fallback_timeout: Optional[float] = None,
     ) -> None:
+        labels = self.get_job_labels()
         load_config = LoadJobConfig(
             skip_leading_rows=1,
+            labels=labels,
             schema=table_schema,
             field_delimiter=field_delimiter,
         )
@@ -477,6 +484,10 @@ def write_file_to_table(
         config = kwargs["kwargs"]
         if "schema" in config:
             config["schema"] = json.load(config["schema"])
+
+        if "labels" not in config:
+            config["labels"] = self.get_job_labels()
+
        load_config = LoadJobConfig(**config)
         table = self.table_ref(database, schema, identifier)
         self._write_file_to_table(client, file_path, table, load_config, fallback_timeout)
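For context, a minimal standalone sketch (not part of the patch) of the labeling pattern the diff applies across job types, assuming the google-cloud-bigquery client library. The `merge_labels` helper and the example label values are hypothetical stand-ins for `get_labels_from_query_comment()` and `get_invocation_id()`:

```python
# Illustrative only: mirrors how the patch merges query-comment labels with the
# dbt invocation id and attaches the result to BigQuery copy and load job configs.
import uuid

from google.cloud import bigquery


def merge_labels(query_comment_labels: dict, invocation_id: str) -> dict:
    """Combine labels parsed from the query comment with the dbt invocation id."""
    labels = dict(query_comment_labels)
    labels["dbt_invocation_id"] = invocation_id
    return labels


# Hypothetical example values; dbt would supply these at runtime.
labels = merge_labels({"dbt_node": "my_model"}, str(uuid.uuid4()))

# Copy job: labels ride along on CopyJobConfig, as in copy_bq_table.
copy_config = bigquery.CopyJobConfig(
    write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
    labels=labels,
)

# Load job: labels sit next to the other load options, as in write_dataframe_to_table.
load_config = bigquery.LoadJobConfig(
    skip_leading_rows=1,
    field_delimiter=",",
    labels=labels,
)
```

Passing the merged labels through the job config means every copy and load job carries the same `dbt_invocation_id` label that query jobs already had, so all job types can be correlated in BigQuery's job metadata.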