Skip to content

Commit 93f5f3e

Browse files
committed
Improve trigger deletion process
1 parent 5e9d306 commit 93f5f3e

2 files changed

Lines changed: 47 additions & 21 deletions

File tree

tools/python/liveanalytics_influxdb3_migration_plugin/liveanalytics_migration_client/liveanalytics_influxdb3_migration_client.py

Lines changed: 41 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -417,7 +417,7 @@ def bulk_invoke_http_trigger(self, metadata):
417417
None
418418
"""
419419
try:
420-
resources_deleted: bool = False
420+
metadata_table_deleted: bool = False
421421
url = f"{self.influx_host}/api/v3/engine/{TRIGGER_NAME}"
422422
headers = {"Authorization": f"Bearer {self.influx_token}"}
423423
for s3_key in metadata:
@@ -438,10 +438,9 @@ def bulk_invoke_http_trigger(self, metadata):
438438
raise RuntimeError(
439439
f"Migrating {s3_key} failed: {response_body['message']}"
440440
)
441-
if not resources_deleted:
442-
self.delete_trigger()
441+
if not metadata_table_deleted:
443442
self.delete_metadata_table()
444-
resources_deleted = True
443+
metadata_table_deleted = True
445444

446445
# Final verification invocation.
447446
verification_params = {"verify": True, "delete_cache": True}
@@ -457,8 +456,12 @@ def bulk_invoke_http_trigger(self, metadata):
457456
f"Final verification failed: {final_invocation_response.json()['message']}"
458457
)
459458
except Exception as e:
460-
self.error(f"HTTP invocation failed: {e}. View processing engine logs for more information")
461-
sys.exit(1)
459+
self.error(
460+
f"HTTP invocation failed: {e}. View processing engine logs for more information"
461+
)
462+
raise
463+
finally:
464+
self.delete_trigger()
462465

463466
def get_num_completed_and_total_parquet_files(self):
464467
"""
@@ -488,28 +491,47 @@ def delete_trigger(self):
488491
"Content-Type": "application/json",
489492
}
490493

491-
trigger_url = (
492-
f"{self.influx_host}/api/v3/configure/processing_engine_trigger"
494+
disable_trigger_url = (
495+
f"{self.influx_host}/api/v3/configure/processing_engine_trigger/disable"
493496
)
497+
494498
trigger_payload = {
495-
"db": self.influx_database,
499+
"db": self.db_name,
500+
"trigger_name": TRIGGER_NAME,
501+
"plugin_filename": "liveanalytics_migration_plugin/liveanalytics_migration_plugin.py",
502+
"trigger_specification": f"request:{TRIGGER_NAME}", # Creates /api/v3/engine/<TRIGGER_NAME> endpoint.
503+
"trigger_settings": {"run_async": False, "error_behavior": "log"},
504+
"disabled": "true",
505+
"trigger_arguments": {
506+
"db_name": self.db_name,
507+
"s3_bucket": self.s3_bucket_name,
508+
"migration_id": self.migration_id,
509+
},
510+
}
511+
512+
disable_response: requests.Response = requests.post(
513+
disable_trigger_url, params=trigger_payload, headers=headers
514+
)
515+
disable_response.raise_for_status()
516+
517+
delete_trigger_url = (
518+
f"{self.influx_host}/api/v3/configure/processing_engine_trigger"
519+
)
520+
521+
delete_body = {
522+
"db": self.db_name,
496523
"trigger_name": TRIGGER_NAME,
497524
"force": True,
498525
}
499526

500-
response: requests.Response = requests.delete(
501-
trigger_url, json=trigger_payload, headers=headers
527+
delete_response = requests.delete(
528+
delete_trigger_url, json=delete_body, headers=headers
502529
)
503-
if response.status_code == 200 or response.status_code == 201:
504-
self.info(
505-
f"Successfully deleted processing engine trigger: {TRIGGER_NAME}"
506-
)
507-
response.raise_for_status()
530+
delete_response.raise_for_status()
508531
except requests.exceptions.RequestException as e:
509-
self.error("Error deleting processing engine trigger: ", str(e))
510-
532+
self.error("Error deleting processing engine trigger:", str(e))
511533
except Exception as e:
512-
self.error("Failed to delete processing engine trigger: ", str(e))
534+
self.error("Failed to delete processing engine trigger:", str(e))
513535
return
514536

515537
def delete_metadata_table(self):

tools/python/liveanalytics_influxdb3_migration_plugin/liveanalytics_migration_plugin/liveanalytics_migration_plugin.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -479,7 +479,9 @@ def ingest_parquet_file_in_chunks(
479479

480480
# Process each record in the chunk.
481481
for row in df_chunk.itertuples(index=False, name=None):
482-
line_protocol = transform_row_to_lp(influxdb3_local, row, table_name, column_types)
482+
line_protocol = transform_row_to_lp(
483+
influxdb3_local, row, table_name, column_types
484+
)
483485
if len(line_protocol.fields.items()) == 0:
484486
influxdb3_local.info(
485487
f"Line protocol was ignored as no fields were set: {line_protocol}"
@@ -562,7 +564,9 @@ def transform_row_to_lp(influxdb3_local, row, table_name, column_types):
562564
elif isinstance(val, (bool, numpy.bool_, pandas.BooleanDtype().type)):
563565
builder.bool_field(column_name, bool(val))
564566
else:
565-
influxdb3_local.error(f"Failed to parse column name: {column_name} value: {val} type: {column_type}")
567+
influxdb3_local.error(
568+
f"Failed to parse column name: {column_name} value: {val} type: {column_type}"
569+
)
566570

567571
return builder
568572

0 commit comments

Comments
 (0)