From a35bff34819024c0ff078e48b32bcf2db5e7cda3 Mon Sep 17 00:00:00 2001 From: Niranjan Kumar Date: Tue, 7 Jan 2025 17:19:28 +0530 Subject: [PATCH 1/3] Added Background Processing Using Job Queue --- g2p_social_registry_importer/__manifest__.py | 2 + .../data/ir_config_params.xml | 7 + .../data/ir_cron_data.xml | 13 + .../models/__init__.py | 1 + .../fetch_social_registry_beneficiary.py | 485 ++++++++++++++---- .../models/res_config.py | 9 + ...etch_social_registry_beneficiary_views.xml | 94 +++- .../views/res_config_view.xml | 27 + 8 files changed, 537 insertions(+), 101 deletions(-) create mode 100644 g2p_social_registry_importer/data/ir_cron_data.xml create mode 100644 g2p_social_registry_importer/models/res_config.py create mode 100644 g2p_social_registry_importer/views/res_config_view.xml diff --git a/g2p_social_registry_importer/__manifest__.py b/g2p_social_registry_importer/__manifest__.py index 52e1170d..f10ba993 100644 --- a/g2p_social_registry_importer/__manifest__.py +++ b/g2p_social_registry_importer/__manifest__.py @@ -15,10 +15,12 @@ ], "data": [ "data/ir_config_params.xml", + "data/ir_cron_data.xml", "data/social_registry_data_source.xml", "data/search_criteria.xml", "security/ir.model.access.csv", "views/fetch_social_registry_beneficiary_views.xml", + "views/res_config_view.xml", ], "external_dependencies": {"python": ["camel_converter"]}, "application": True, diff --git a/g2p_social_registry_importer/data/ir_config_params.xml b/g2p_social_registry_importer/data/ir_config_params.xml index 30ef1531..43bd9e66 100644 --- a/g2p_social_registry_importer/data/ir_config_params.xml +++ b/g2p_social_registry_importer/data/ir_config_params.xml @@ -20,4 +20,11 @@ name="set_param" eval="('g2p_import_social_registry.max_registrants_count_job_queue', 100)" /> + + + diff --git a/g2p_social_registry_importer/data/ir_cron_data.xml b/g2p_social_registry_importer/data/ir_cron_data.xml new file mode 100644 index 00000000..3b89f5cb --- /dev/null +++ b/g2p_social_registry_importer/data/ir_cron_data.xml @@ -0,0 +1,13 @@ + + + + Process Social Registry Imports + + code + model.fetch_social_registry_beneficiary() + 10 + minutes + -1 + + + diff --git a/g2p_social_registry_importer/models/__init__.py b/g2p_social_registry_importer/models/__init__.py index 1f279e70..04b57bcc 100644 --- a/g2p_social_registry_importer/models/__init__.py +++ b/g2p_social_registry_importer/models/__init__.py @@ -2,3 +2,4 @@ from . import fetch_social_registry_beneficiary from . import imported_registrants from . import res_partner +from . import res_config diff --git a/g2p_social_registry_importer/models/fetch_social_registry_beneficiary.py b/g2p_social_registry_importer/models/fetch_social_registry_beneficiary.py index 1f928124..b5d55278 100644 --- a/g2p_social_registry_importer/models/fetch_social_registry_beneficiary.py +++ b/g2p_social_registry_importer/models/fetch_social_registry_beneficiary.py @@ -49,6 +49,127 @@ class G2PFetchSocialRegistryBeneficiary(models.Model): readonly=True, ) + # Job status tracking fields + job_status = fields.Selection( + [ + ("draft", "Draft"), + ("started", "Started"), + ("running", "Running"), + ("completed", "Completed"), + ("failed", "Failed"), + ], + default="draft", + ) + + # Timestamps for tracking job execution + start_datetime = fields.Datetime("Start Time") + end_datetime = fields.Datetime("End Time") + cron_id = fields.Many2one("ir.cron", string="Cron Job") # Reference to scheduled job + + interval_number = fields.Integer(default=1, help="Repeat every x.") + interval_type = fields.Selection( + [ + ("minutes", "Minutes"), + ("hours", "Hours"), + ("days", "Days"), + ("weeks", "Weeks"), + ("months", "Months"), + ], + string="Interval Unit", + default="hours", + ) + cron_running = fields.Boolean(compute="_compute_cron_running") + + def test_connection(self): + """Test the connection to the Social Registry API by validating: + 1. Data source configuration + 2. Authentication endpoints + 3. Authentication credentials + 4. Successful token retrieval + """ + self.ensure_one() + try: + # Step 1: Validate data source configuration + if not self.data_source_id: + raise ValidationError(_("Data source is not configured")) + + if not self.data_source_id.url: + raise ValidationError(_("Data source URL is not configured")) + + # Step 2: Get and validate paths + try: + paths = self.get_data_source_paths() + auth_url = self.get_social_registry_auth_url(paths) + except ValidationError as e: + raise ValidationError(_("Data source path configuration error: %s") % str(e)) from e + + # Step 3: Validate required credentials + client_id = ( + self.env["ir.config_parameter"].sudo().get_param("g2p_import_social_registry.client_id") + ) + client_secret = ( + self.env["ir.config_parameter"].sudo().get_param("g2p_import_social_registry.client_password") + ) + grant_type = ( + self.env["ir.config_parameter"].sudo().get_param("g2p_import_social_registry.grant_type") + ) + + if not all([client_id, client_secret, grant_type]): + raise ValidationError( + _("Missing credentials: Please configure client ID, secret and grant type") + ) + + # Step 4: Test authentication + try: + auth_token = self.get_auth_token(auth_url) + if not auth_token: + raise ValidationError(_("Authentication failed: No token received")) + except requests.exceptions.ConnectionError as e: + raise ValidationError( + _("Connection failed: Unable to reach authentication server at %s") % auth_url + ) from e + except requests.exceptions.Timeout as e: + raise ValidationError(_("Connection timed out while contacting authentication server")) from e + except requests.exceptions.RequestException as e: + raise ValidationError(_("Connection error: %s") % str(e)) from e + + # If we get here, all checks passed + return { + "type": "ir.actions.client", + "tag": "display_notification", + "params": { + "title": _("Connection Test"), + "message": _("Successfully connected to Social Registry API and authenticated"), + "type": "success", + "sticky": False, + }, + } + + except ValidationError as ve: + _logger.warning("Connection test failed: %s", str(ve)) + return { + "type": "ir.actions.client", + "tag": "display_notification", + "params": { + "title": _("Connection Test Failed"), + "message": str(ve), + "type": "danger", + "sticky": True, + }, + } + except Exception as e: + _logger.error("Unexpected error during connection test: %s", str(e), exc_info=True) + return { + "type": "ir.actions.client", + "tag": "display_notification", + "params": { + "title": _("Connection Test Failed"), + "message": _("Unexpected error occurred. Please check the logs."), + "type": "danger", + "sticky": True, + }, + } + @api.onchange("registry") def onchange_target_registry(self): for rec in self: @@ -430,131 +551,297 @@ def process_registrants(self, registrants): self.process_record(record) def process_registrants_async(self, registrants, count): - max_registrant = int( - self.env["ir.config_parameter"] - .sudo() - .get_param("g2p_import_social_registry.max_registrants_count_job_queue") - ) - _logger.warning("Fetching Registrant Asynchronously!") - jobs = [] - for i in range(0, count, max_registrant): - jobs.append(self.delayable().process_registrants(registrants[i : i + max_registrant])) - main_job = group(*jobs) - main_job.delay() + """Queue registrants for asynchronous processing in batches.""" + try: + self.write({"job_status": "running"}) + + max_registrant = int( + self.env["ir.config_parameter"] + .sudo() + .get_param("g2p_import_social_registry.max_registrants_count_job_queue") + ) + + actual_count = min(len(registrants), count) + total_batches = -(-(actual_count) // max_registrant) + + jobs = [] + for i in range(0, actual_count, max_registrant): + batch = registrants[i : i + max_registrant] + batch_num = i // max_registrant + 1 + + description = f"{self.name} - Batch {batch_num}/{total_batches} ({len(batch)} registrants)" + jobs.append(self.with_delay(description=description).process_registrants(batch)) + + # Queue all batch jobs and add a final job to update status when complete + main_job = group(*jobs) + # Add callback to mark completion and update timestamps + main_job.on_done( + self.with_delay(description="Update import status").write( + { + "job_status": "completed", + "end_datetime": fields.Datetime.now(), + "last_sync_date": fields.Datetime.now(), + } + ) + ) + main_job.delay() + + return True + + except Exception as e: + self.write({"job_status": "failed", "end_datetime": fields.Datetime.now()}) + _logger.error(f"Failed to queue batch jobs: {str(e)}") + raise def fetch_social_registry_beneficiary(self): - config_parameters = self.env["ir.config_parameter"].sudo() - today_isoformat = datetime.now(timezone.utc).isoformat() - social_registry_version = config_parameters.get_param("social_registry_version") - max_registrant = int( - config_parameters.get_param("g2p_import_social_registry.max_registrants_count_job_queue") + """Main entry point for fetching beneficiaries. + Handles both sync and async processing based on configuration""" + enable_async = ( + self.env["ir.config_parameter"].sudo().get_param("g2p_import_social_registry.enable_async") ) - message_id = str(uuid.uuid4()) - transaction_id = str(uuid.uuid4()) - reference_id = str(uuid.uuid4()) + if enable_async: + return self._schedule_async_import() + else: + return self._fetch_Beneficiary() + + def _fetch_Beneficiary(self): + """Synchronous import process""" + try: + # Update status to running when sync process starts + self.write({"job_status": "running", "start_datetime": fields.Datetime.now()}) + + config_parameters = self.env["ir.config_parameter"].sudo() + today_isoformat = datetime.now(timezone.utc).isoformat() + social_registry_version = config_parameters.get_param("social_registry_version") + max_registrant = int( + config_parameters.get_param("g2p_import_social_registry.max_registrants_count_job_queue") + ) - # Define Data Source - paths = self.get_data_source_paths() + message_id = str(uuid.uuid4()) + transaction_id = str(uuid.uuid4()) + reference_id = str(uuid.uuid4()) - # Define Social Registry auth url + # Define Data Source + paths = self.get_data_source_paths() - full_social_registry_auth_url = self.get_social_registry_auth_url(paths) + # Define Social Registry auth url - # Retrieve auth token + full_social_registry_auth_url = self.get_social_registry_auth_url(paths) - auth_token = self.get_auth_token(full_social_registry_auth_url) + # Retrieve auth token - # Define Social Registry search url - full_social_registry_search_url = self.get_social_registry_search_url(paths) + auth_token = self.get_auth_token(full_social_registry_auth_url) - # Define header - header = self.get_header_for_body( - social_registry_version, - today_isoformat, - message_id, - ) + # Define Social Registry search url + full_social_registry_search_url = self.get_social_registry_search_url(paths) - # Define message - message = self.get_message( - today_isoformat, - transaction_id=transaction_id, - reference_id=reference_id, - ) + # Define header + header = self.get_header_for_body( + social_registry_version, + today_isoformat, + message_id, + ) - signature = "" + # Define message + message = self.get_message( + today_isoformat, + transaction_id=transaction_id, + reference_id=reference_id, + ) - # Define data - data = self.get_data( - signature, - header, - message, - ) + signature = "" - data = json.dumps(data) + # Define data + data = self.get_data( + signature, + header, + message, + ) - # POST Request - response = requests.post( - full_social_registry_search_url, - data=data, - headers={"Authorization": auth_token}, - timeout=constants.REQUEST_TIMEOUT, - ) + data = json.dumps(data) - if not response.ok: - _logger.error("Social Registry Search API response: %s", response.text) - response.raise_for_status() + # POST Request + response = requests.post( + full_social_registry_search_url, + data=data, + headers={"Authorization": auth_token}, + timeout=constants.REQUEST_TIMEOUT, + ) - sticky = False + if not response.ok: + _logger.error("Social Registry Search API response: %s", response.text) + response.raise_for_status() - # Process response - if response.ok: - kind = "success" - message = _("Successfully Imported Social Registry Beneficiaries") + sticky = False - search_responses = response.json().get("message", {}).get("search_response", []) - if not search_responses: - kind = "warning" - message = _("No imported beneficiary") + # Process response + if response.ok: + kind = "success" + message = _("Successfully Imported Social Registry Beneficiaries") - for search_response in search_responses: - reg_record = search_response.get("data", {}).get("reg_records", []) - registrants = reg_record.get("getRegistrants", []) - total_partners_count = reg_record.get("totalRegistrantCount", "") + search_responses = response.json().get("message", {}).get("search_response", []) + if not search_responses: + kind = "warning" + message = _("No imported beneficiary") - if total_partners_count: - if total_partners_count < max_registrant: - self.process_registrants(registrants) + for search_response in search_responses: + reg_record = search_response.get("data", {}).get("reg_records", []) + registrants = reg_record.get("getRegistrants", []) + total_partners_count = reg_record.get("totalRegistrantCount", "") + + if total_partners_count: + if total_partners_count < max_registrant: + self.process_registrants(registrants) + + else: + self.process_registrants_async(registrants, total_partners_count) + kind = "success" + message = _("Fetching from Social Registry Started Asynchronously.") + sticky = True else: - self.process_registrants_async(registrants, total_partners_count) kind = "success" - message = _("Fetching from Social Registry Started Asynchronously.") - sticky = True + message = _("No matching records found.") - else: - kind = "success" - message = _("No matching records found.") + # Update completion status and force refresh + self.write( + { + "job_status": "completed", + "end_datetime": fields.Datetime.now(), + "last_sync_date": fields.Datetime.now(), + } + ) + + else: + self.write({"job_status": "failed", "end_datetime": fields.Datetime.now()}) + kind = "danger" + message = response.json().get("error", {}).get("message", "") + if not message: + message = _("{reason}: Unable to connect to API.").format(reason=response.reason) + + action = { + "type": "ir.actions.client", + "tag": "display_notification", + "params": { + "title": _("Social Registry"), + "message": message, + "sticky": sticky, + "type": kind, + "next": { + "type": "ir.actions.act_window_close", + }, + }, + } + return action + + except Exception as e: + self.write({"job_status": "failed", "end_datetime": fields.Datetime.now()}) + _logger.error("Social Registry import failed: %s", str(e)) + return { + "type": "ir.actions.client", + "tag": "display_notification", + "params": { + "title": _("Social Registry"), + "message": _("Import failed: %s") % str(e), + "type": "danger", + "sticky": True, + }, + } + + def social_registry_import_action_trigger(self): + """ + Trigger the social registry import action - starts/stops the automated import process + """ + # Check if asynchronous import is enabled + enable_async = ( + self.env["ir.config_parameter"].sudo().get_param("g2p_import_social_registry.enable_async") + ) - self.last_sync_date = fields.Datetime.now() + if not enable_async: + return { + "type": "ir.actions.client", + "tag": "display_notification", + "params": { + "title": _("Asynchronous Import Disabled"), + "message": _("Asynchronous import is disabled in system configuration."), + "type": "warning", + "sticky": True, + }, + } - else: - kind = "danger" - message = response.json().get("error", {}).get("message", "") - if not message: - message = _("{reason}: Unable to connect to API.").format(reason=response.reason) - - action = { - "type": "ir.actions.client", - "tag": "display_notification", - "params": { - "title": _("Social Registry"), - "message": message, - "sticky": sticky, - "type": kind, - "next": { - "type": "ir.actions.act_window_close", + for rec in self: + if rec.job_status == "draft" or rec.job_status == "completed": + # Start the import job + _logger.info("Job Started") + rec.job_status = "started" + + # Create scheduled job (cron) + ir_cron = self.env["ir.cron"].sudo() + rec.cron_id = ir_cron.create( + { + "name": f"Social Registry Import Cron {rec.name} #{rec.id}", + "active": True, + "interval_number": self.interval_number, + "interval_type": self.interval_type, + "model_id": self.env["ir.model"] + .search([("model", "=", "g2p.fetch.social.registry.beneficiary")]) + .id, + "state": "code", + "code": f"model.browse({rec.id}).fetch_social_registry_beneficiary()", + "doall": False, + "numbercall": -1, + } + ) + rec.job_status = "running" + + elif rec.job_status == "started" or rec.job_status == "running": + # Stop the import job + _logger.info("Job Stopped") + rec.job_status = "completed" + rec.sudo().cron_id.unlink() + rec.cron_id = None + + def _schedule_async_import(self): + """Schedule asynchronous import process""" + try: + _logger.info(f"Starting async import for {self.name}") + + self.write({"job_status": "started", "start_datetime": fields.Datetime.now()}) + + description = f"Scheduled Social Registry Import - {self.name}" + job = self.with_delay(description=description)._fetch_Beneficiary() + + _logger.info(f"Successfully queued async import job {job.uuid}") + + return { + "type": "ir.actions.client", + "tag": "display_notification", + "params": { + "title": _("Social Registry"), + "message": _("Async import scheduled successfully"), + "type": "success", + "sticky": False, }, - }, - } - return action + } + + except Exception as e: + _logger.error(f"Failed to schedule async import: {str(e)}", exc_info=True) + self.write({"job_status": "failed", "end_datetime": fields.Datetime.now()}) + return { + "type": "ir.actions.client", + "tag": "display_notification", + "params": { + "title": _("Social Registry"), + "message": _("Failed to schedule async import: %s") % str(e), + "type": "danger", + "sticky": True, + }, + } + + def _compute_cron_running(self): + """Compute whether the cron job is currently running for this import""" + for rec in self: + rec.cron_running = bool( + rec.cron_id and rec.cron_id.active and rec.job_status in ["started", "running"] + ) diff --git a/g2p_social_registry_importer/models/res_config.py b/g2p_social_registry_importer/models/res_config.py new file mode 100644 index 00000000..8d1c705d --- /dev/null +++ b/g2p_social_registry_importer/models/res_config.py @@ -0,0 +1,9 @@ +from odoo import fields, models + +class RegistryConfig(models.TransientModel): + _inherit = "res.config.settings" + + enable_social_registry_async = fields.Boolean( + config_parameter="g2p_import_social_registry.enable_async", + string="Enable Background Processing" + ) diff --git a/g2p_social_registry_importer/views/fetch_social_registry_beneficiary_views.xml b/g2p_social_registry_importer/views/fetch_social_registry_beneficiary_views.xml index faa9a4fb..2a69ae42 100644 --- a/g2p_social_registry_importer/views/fetch_social_registry_beneficiary_views.xml +++ b/g2p_social_registry_importer/views/fetch_social_registry_beneficiary_views.xml @@ -1,6 +1,17 @@ + + + Import Social Registry beneficiary + + + code + + action = records.social_registry_import_action_trigger() + + + fetch_social_registry_beneficiary_tree g2p.fetch.social.registry.beneficiary @@ -8,6 +19,31 @@ + +