
Commit 0de75f7

deryrahman and arinda-arif authored and committed
fix: dry-run on loader (#30)
* fix: dry-run on loader
* fix: test
* fix: test on dry run
* fix: remove dummy for dry-run
* refactor: use type for bq service
* fix: logging result only on non dry-run
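
In short: the transformation layer no longer swaps in a DummyService and skips the load when dry_run is set. The real BigqueryService is always constructed, the loaders forward a dry_run flag, and the service sets QueryJobConfig.dry_run on the job config, logs "dry-run finished", and returns None instead of waiting for a job result.
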
1 parent 4ee8e1e commit 0de75f7

File tree (5 files changed, +57 -39 lines):

* task/bq2bq/executor/bumblebee/bigquery_service.py
* task/bq2bq/executor/bumblebee/bq2bq.py
* task/bq2bq/executor/bumblebee/loader.py
* task/bq2bq/executor/bumblebee/transformation.py
* task/bq2bq/executor/tests/test_transformation.py

task/bq2bq/executor/bumblebee/bigquery_service.py (+20 -2)

@@ -35,6 +35,7 @@ def transform_load(self,
                       destination_table=None,
                       write_disposition=None,
                       create_disposition=CreateDisposition.CREATE_NEVER,
+                      dry_run=False,
                       allow_field_addition=False):
        pass

@@ -77,8 +78,9 @@ def __init__(self, client, labels, writer, retry_timeout = None, on_job_finish =
        self.on_job_finish = on_job_finish
        self.on_job_register = on_job_register

-    def execute_query(self, query):
+    def execute_query(self, query, dry_run=False):
        query_job_config = QueryJobConfig()
+        query_job_config.dry_run = dry_run
        query_job_config.use_legacy_sql = False
        query_job_config.labels = self.labels

@@ -89,6 +91,10 @@ def execute_query(self, query):
        query_job = self.client.query(query=query,
                                      job_config=query_job_config,
                                      retry=self.retry)
+        if dry_run:
+            logger.info("dry-run finished")
+            return None
+
        logger.info("Job {} is initially in state {} of {} project".format(query_job.job_id, query_job.state,
                                                                           query_job.project))

@@ -111,6 +117,9 @@ def execute_query(self, query):

        if self.on_job_finish is not None:
            self.on_job_finish(query_job)
+
+        logger.info(result)
+        logger.info("finished")
        return result

    def transform_load(self,
@@ -119,11 +128,13 @@ def transform_load(self,
                       destination_table=None,
                       write_disposition=None,
                       create_disposition=CreateDisposition.CREATE_NEVER,
+                      dry_run=False,
                       allow_field_addition=False):
        if query is None or len(query) == 0:
            raise ValueError("query must not be Empty")

        query_job_config = QueryJobConfig()
+        query_job_config.dry_run = dry_run
        query_job_config.create_disposition = create_disposition
        query_job_config.write_disposition = write_disposition
        query_job_config.use_legacy_sql = False
@@ -142,6 +153,10 @@ def transform_load(self,
        query_job = self.client.query(query=query,
                                      job_config=query_job_config,
                                      retry=self.retry)
+        if dry_run:
+            logger.info("dry-run finished")
+            return None
+
        logger.info("Job {} is initially in state {} of {} project".format(query_job.job_id, query_job.state,
                                                                           query_job.project))

@@ -164,6 +179,9 @@ def transform_load(self,

        if self.on_job_finish is not None:
            self.on_job_finish(query_job)
+
+        logger.info(result)
+        logger.info("finished")
        return result

    def create_table(self, full_table_name, schema_file,
@@ -251,7 +269,7 @@ def execute_query(self, query):
        return []

    def transform_load(self, query, source_project_id=None, destination_table=None, write_disposition=None,
-                       create_disposition=CreateDisposition.CREATE_NEVER, allow_field_addition=False):
+                       create_disposition=CreateDisposition.CREATE_NEVER, dry_run=False, allow_field_addition=False):
        log = """ transform and load with config :
            {}
            {}
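
For reference, this is the standard dry-run pattern in the google-cloud-bigquery client that the dry_run flag above switches on; a minimal sketch with a made-up project, dataset, and table, separate from the code in this commit, and assuming default credentials are available. A dry-run query is validated and priced but never executed, which is why execute_query and transform_load can log and return None right after submitting it.

    from google.cloud import bigquery

    client = bigquery.Client()
    job_config = bigquery.QueryJobConfig(dry_run=True, use_query_cache=False)

    # The query is validated and priced, but no bytes are read and no job runs.
    query_job = client.query(
        "SELECT column_a FROM `my_project.my_dataset.my_table`",
        job_config=job_config,
    )
    print("This query would process {} bytes.".format(query_job.total_bytes_processed))
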

task/bq2bq/executor/bumblebee/bq2bq.py (+1 -4)

@@ -37,10 +37,7 @@ def bq2bq(properties_file: str,
    else:
        task_config = TaskConfigFromEnv()

-    bigquery_service = DummyService()
-    if not dry_run:
-        bigquery_service = create_bigquery_service(task_config, job_labels, writer, on_job_finish=on_job_finish, on_job_register=on_job_register)
-
+    bigquery_service = create_bigquery_service(task_config, job_labels, writer, on_job_finish=on_job_finish, on_job_register=on_job_register)
    transformation = Transformation(bigquery_service,
                                    task_config,
                                    task_files.query,
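
With dry-run handled inside BigqueryService itself (see bigquery_service.py above), the DummyService stand-in is no longer needed here: the real service is always created, and the dry_run flag decides at query time whether a job actually runs.
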

task/bq2bq/executor/bumblebee/loader.py (+9 -6)

@@ -4,6 +4,7 @@
from abc import ABC
from abc import abstractmethod
from bumblebee.config import LoadMethod
+from bumblebee.bigquery_service import BigqueryService

class BaseLoader(ABC):

@@ -14,36 +15,38 @@ def load(self, query):

class PartitionLoader(BaseLoader):

-    def __init__(self, bigquery_service, destination: str, load_method: LoadMethod, partition: datetime, allow_field_addition=False):
+    def __init__(self, bigquery_service: BigqueryService, destination: str, load_method: LoadMethod, partition: datetime, allow_field_addition=False):
        self.bigquery_service = bigquery_service
        self.destination_name = destination
        self.load_method = load_method
        self.partition_date = partition
        self.allow_field_addition = allow_field_addition

-    def load(self, query):
+    def load(self, query, dry_run=False):
        partition_date_str = self.partition_date.strftime("%Y%m%d")
        load_destination = "{}${}".format(self.destination_name, partition_date_str)
        write_disposition = self.load_method.write_disposition
        allow_field_addition = self.allow_field_addition
        return self.bigquery_service.transform_load(query=query,
                                                    write_disposition=write_disposition,
                                                    destination_table=load_destination,
+                                                    dry_run=dry_run,
                                                    allow_field_addition=allow_field_addition)


class TableLoader(BaseLoader):

-    def __init__(self, bigquery_service, destination: str, load_method: LoadMethod, allow_field_addition=False):
+    def __init__(self, bigquery_service: BigqueryService, destination: str, load_method: LoadMethod, allow_field_addition=False):
        self.bigquery_service = bigquery_service
        self.full_table_name = destination
        self.load_method = load_method
        self.allow_field_addition = allow_field_addition

-    def load(self, query):
+    def load(self, query, dry_run=False):
        return self.bigquery_service.transform_load(query=query,
                                                    write_disposition=self.load_method.write_disposition,
                                                    destination_table=self.full_table_name,
+                                                    dry_run=dry_run,
                                                    allow_field_addition=self.allow_field_addition)

@@ -52,5 +55,5 @@ def __init__(self,bigquery_service: BigqueryService, destination: str):
        self.bigquery_service = bigquery_service
        self.full_table_name = destination

-    def load(self, query):
-        return self.bigquery_service.execute_query(query)
+    def load(self, query, dry_run=False):
+        return self.bigquery_service.execute_query(query, dry_run=dry_run)
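
A small illustrative sketch of the new loader contract (not part of the commit; the query string and table name are made up, the service and load method are stand-in mocks, and the bumblebee package is assumed to be importable). It mirrors the assertions in the updated tests below: whatever dry_run value load() receives is forwarded unchanged to BigqueryService.transform_load.

    from unittest import mock

    from bumblebee.loader import TableLoader

    service = mock.Mock()        # stands in for bumblebee.bigquery_service.BigqueryService
    load_method = mock.Mock()    # stands in for a bumblebee.config.LoadMethod value

    loader = TableLoader(service, "bq_project.playground_dev.abcd", load_method)
    loader.load("select 1", dry_run=True)

    # The flag reaches the service untouched; the service decides what a dry run means.
    service.transform_load.assert_called_once_with(
        query="select 1",
        write_disposition=load_method.write_disposition,
        destination_table="bq_project.playground_dev.abcd",
        dry_run=True,
        allow_field_addition=False,
    )
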

task/bq2bq/executor/bumblebee/transformation.py (+7 -23)

@@ -190,14 +190,7 @@ def execute(self):
        query = query.apply_parameter(parameter)
        query.print_with_logger(logger)

-        result = None
-
-        if not self.dry_run:
-            result = self.loader.load(query)
-            logger.info(result)
-            logger.info("finished")
-        else:
-            logger.info("dry-run finished")
+        self.loader.load(query, dry_run=self.dry_run)

class TableTransformation:
    """
@@ -292,13 +285,7 @@ def execute(self):
        logger.info("start transformation job")
        self.query.print_with_logger(logger)

-        result = None
-        if not self.dry_run:
-            result = self.loader.load(self.query)
-            logger.info(result)
-            logger.info("finished")
-        else:
-            logger.info("dry-run finished")
+        self.loader.load(self.query, dry_run=self.dry_run)

    async def async_execute(self):
        self.execute()
@@ -353,11 +340,10 @@ def transform(self):
                                              self.partition_column_type)
        query.print_with_logger(logger)

-        if not self.dry_run:
-            result = self.loader.load(query)
-            logger.info("finished {}".format(result.total_rows))
-        else:
-            logger.info("dry-run finished")
+
+        result = self.loader.load(query, dry_run=self.dry_run)
+        if not self.dry_run and result:
+            logger.info("total rows: {}".format(result.total_rows))


class MultiPartitionTransformation:
@@ -513,9 +499,7 @@ def collect_datetimes(self):
                                                  destination_parameter)
        query.print()

-        results = None
-        if not self.dry_run:
-            results = self.bigquery_service.execute_query(query)
+        results = self.bigquery_service.execute_query(query)

        dates = [row[0] for row in results]
        datetimes = [datetime.combine(d, datetime.min.time()) for d in dates]
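
Two behavioural notes fall out of these hunks: the per-partition result is only logged when one exists (on dry-run the loader returns None, so the total_rows log is skipped), and collect_datetimes now always executes its window query, since previously a dry-run left results as None and the iteration over it on the next line would fail.
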

task/bq2bq/executor/tests/test_transformation.py (+20 -4)

@@ -51,6 +51,7 @@ def test_partition_transform_execute(self, BigqueryServiceMock):
        bigquery_service.transform_load.assert_called_with(query=final_query,
                                                           write_disposition=WriteDisposition.WRITE_TRUNCATE,
                                                           destination_table="bq_project.playground_dev.abcd$20190101",
+                                                           dry_run=False,
                                                           allow_field_addition=False)

    @mock.patch("bumblebee.bigquery_service.BigqueryService")
@@ -90,6 +91,7 @@ def test_table_transform(self, BigqueryServiceMock):
        bigquery_service.transform_load.assert_called_with(query=final_query,
                                                           write_disposition=WriteDisposition.WRITE_TRUNCATE,
                                                           destination_table="bq_project.playground_dev.abcd",
+                                                           dry_run=False,
                                                           allow_field_addition=False)

    @mock.patch("bumblebee.bigquery_service.BigqueryService")
@@ -113,6 +115,7 @@ def test_single_partition_transform_1d_window_0_offset_without_spillover(self, B
        bigquery_service.transform_load.assert_called_with(query=final_query,
                                                           write_disposition=WriteDisposition.WRITE_TRUNCATE,
                                                           destination_table="bq_project.playground_dev.abcd$20190101",
+                                                           dry_run=False,
                                                           allow_field_addition=False)

    @mock.patch("bumblebee.bigquery_service.BigqueryService")
@@ -135,6 +138,7 @@ def test_single_partition_transform_2d_window_24h_offset_without_spillover(self,
        bigquery_service.transform_load.assert_called_with(query=final_query,
                                                           write_disposition=WriteDisposition.WRITE_TRUNCATE,
                                                           destination_table="bq_project.playground_dev.abcd$20190104",
+                                                           dry_run=False,
                                                           allow_field_addition=False)

    @mock.patch("bumblebee.bigquery_service.BigqueryService")
@@ -174,6 +178,7 @@ def test_single_partition_transform_7d_window_without_spillover(self, BigquerySe
        bigquery_service.transform_load.assert_called_with(query=final_query,
                                                           write_disposition=WriteDisposition.WRITE_TRUNCATE,
                                                           destination_table="bq_project.playground_dev.abcd$20190103",
+                                                           dry_run=False,
                                                           allow_field_addition=False)

    @mock.patch("bumblebee.bigquery_service.BigqueryService")
@@ -197,9 +202,11 @@ def test_single_partition_transform_2d_with_spillover(self, BigqueryServiceMock)

        calls = [call(query=final_query_1, write_disposition=WriteDisposition.WRITE_TRUNCATE,
                      destination_table="bq_project.playground_dev.abcd$20190103",
+                      dry_run=False,
                      allow_field_addition=False),
                 call(query=final_query_2, write_disposition=WriteDisposition.WRITE_TRUNCATE,
                      destination_table="bq_project.playground_dev.abcd$20190104",
+                      dry_run=False,
                      allow_field_addition=False)]
        bigquery_service.transform_load.assert_has_calls(calls, any_order=True)
        self.assertEqual(len(bigquery_service.transform_load.call_args_list), len(calls))
@@ -220,7 +227,7 @@ def test_dml_transform(self, BigqueryServiceMock):
        task.execute()

        final_query = """select count(1) from table where date >= '2019-01-02' and date < '2019-01-03'"""
-        bigquery_service.execute_query.assert_called_with(final_query)
+        bigquery_service.execute_query.assert_called_with(final_query,dry_run=False)

    @mock.patch("bumblebee.bigquery_service.BigqueryService")
    def test_execute_dry_run(self, BigqueryServiceMock):
@@ -237,7 +244,13 @@ def test_execute_dry_run(self, BigqueryServiceMock):
        task = TableTransformation(bigquery_service, task_config, query, localized_start_time,
                                   localized_end_time, dry_run, localized_execution_time)
        task.transform()
-        bigquery_service.transform_load.assert_not_called()
+
+        final_query = "select count(1) from table where date >= '2019-01-01 00:00:00' and date < '2019-01-01 00:00:00'"
+        bigquery_service.transform_load.assert_called_with(query=final_query,
+                                                           write_disposition=WriteDisposition.WRITE_TRUNCATE,
+                                                           destination_table="bq_project.playground_dev.abcd",
+                                                           dry_run=True,
+                                                           allow_field_addition=False)


    @mock.patch("bumblebee.bigquery_service.BigqueryService")
@@ -262,6 +275,7 @@ def test_allow_field_addition(self, BigqueryServiceMock):
        bigquery_service.transform_load.assert_called_with(query=final_query,
                                                           write_disposition=WriteDisposition.WRITE_TRUNCATE,
                                                           destination_table="bq_project.playground_dev.abcd",
+                                                           dry_run=False,
                                                           allow_field_addition=True)


@@ -289,7 +303,7 @@ def test_should_run_dml_merge_statements(self, BigqueryServiceMock):
        transformation.transform()

        final_query = """select count(1) from table where date >= '2019-02-01' and date < '2019-02-02'"""
-        bigquery_service.execute_query.assert_called_with(final_query)
+        bigquery_service.execute_query.assert_called_with(final_query,dry_run=False)

    @mock.patch("bumblebee.bigquery_service.BigqueryService")
    def test_should_run_table_task(self, BigqueryServiceMock):
@@ -325,6 +339,7 @@ def get_table_mock(table_name):
        bigquery_service.transform_load.assert_called_with(query=final_query,
                                                           write_disposition=WriteDisposition.WRITE_APPEND,
                                                           destination_table="bq_project.playground_dev.abcd",
+                                                           dry_run=False,
                                                           allow_field_addition=False)

    @mock.patch("bumblebee.bigquery_service.BigqueryService")
@@ -362,6 +377,7 @@ def get_table_mock(table_name):
        bigquery_service.transform_load.assert_called_with(query=final_query,
                                                           write_disposition=WriteDisposition.WRITE_TRUNCATE,
                                                           destination_table="bq_project.playground_dev.abcd",
+                                                           dry_run=False,
                                                           allow_field_addition=False)

    @mock.patch("bumblebee.bigquery_service.BigqueryService")
@@ -401,7 +417,7 @@ def get_table_mock(table_name):
        transformation.transform()

        final_query = """-- Optimus generated\nDECLARE partitions ARRAY<DATE>;\n\n\n\nCREATE TEMP TABLE `opt__partitions` AS (\n select count(1) from table where date >= '__dstart__' and date < '__dend__'\n);\n\nSET (partitions) = (\n SELECT AS STRUCT\n array_agg(DISTINCT DATE(`event_timestamp`))\n FROM opt__partitions\n);\n\nMERGE INTO\n `bq_project.playground_dev.abcd` AS target\nUSING\n (\n Select * from `opt__partitions`\n ) AS source\nON FALSE\nWHEN NOT MATCHED BY SOURCE AND DATE(`event_timestamp`) IN UNNEST(partitions)\nTHEN DELETE\nWHEN NOT MATCHED THEN INSERT\n (\n \n )\nVALUES\n (\n \n );\n"""
-        bigquery_service.execute_query.assert_called_with(final_query)
+        bigquery_service.execute_query.assert_called_with(final_query,dry_run=False)

    @mock.patch("bumblebee.bigquery_service.BigqueryService")
    def test_should_fail_if_partition_task_for_ingestion_time_without_filter_in_REPLACE_MERGE(self, BigqueryServiceMock):
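
The dry-run test changes to match the new behaviour: rather than asserting that transform_load was never called, it now asserts that the load was issued with dry_run=True, since dry-run queries are submitted to BigQuery as dry-run jobs instead of being skipped locally.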
