diff --git a/docs/howto/how_to_bulk_transform.rst b/docs/howto/how_to_bulk_transform.rst
index 806de623..1164054d 100644
--- a/docs/howto/how_to_bulk_transform.rst
+++ b/docs/howto/how_to_bulk_transform.rst
@@ -35,7 +35,7 @@ Modes Of Operation
 
 The command can work in a few distinct ways.
 
-**File(s) to learning record store (LRS)** - this will use the existing event-routing-backends configuration to route any log replays to **all** configured LRS backends just like the event was being emitted right now. This can be used to backfill old data, capture old events that didn't previously have transforms, or fix up lost data from downtime issues.
+**File(s) to learning record store (LRS)** - this will use the existing event-routing-backends configuration to route any log replays to **all** configured LRS backends by default, just like the event was being emitted right now. This can be used to backfill old data, capture old events that didn't previously have transforms, or fix up lost data from downtime issues. To target specific LRSs, you can use the ``--lrs-urls`` option to specify one or more LRS endpoints by their ``route_url``. When provided, the command will route the transformed events exclusively to the specified LRSs, rather than all configured ones.
 
 **File(s) to file(s)** - This will perform the same transformations as usual, but instead of routing them to an LRS they can be saved as a file to any libcloud destination. In this mode all events are saved to a single file and no filters are applied.
 
@@ -65,6 +65,16 @@ Examples
         --destination_provider LRS \
         --transformer_type xapi
 
+::
+
+    # Transform all events in the local file /openedx/data/tracking.log to the specified LRSs
+    python manage.py lms transform_tracking_logs \
+        --source_provider LOCAL \
+        --source_config '{"key": "/openedx/data/", "prefix": "tracking.log", "container": "logs"}' \
+        --destination_provider LRS \
+        --transformer_type xapi \
+        --lrs-urls http://lrs1.example.com http://lrs2.example.com
+
 ::
 
     # Transform all events in the local file /openedx/data/tracking.log to all configured LRSs
diff --git a/event_routing_backends/backends/events_router.py b/event_routing_backends/backends/events_router.py
index 6d6965b9..cb903d21 100644
--- a/event_routing_backends/backends/events_router.py
+++ b/event_routing_backends/backends/events_router.py
@@ -64,11 +64,15 @@ def configure_host(self, host, router):
 
         return host
 
-    def prepare_to_send(self, events):
+    def prepare_to_send(self, events, router_urls=None):
         """
         Prepare a list of events to be sent and create a processed, filtered
         batch for each router.
+        If router_urls is provided, only the routers matching those URLs are used.
         """
         routers = RouterConfiguration.get_enabled_routers(self.backend_name)
+        if router_urls:
+            routers = routers.filter(route_url__in=router_urls)
+
         business_critical_events = get_business_critical_events()
         route_events = {}
 
@@ -139,7 +143,7 @@ def get_failed_events(self, batch_size):
             return []
         return [json.loads(event.decode('utf-8')) for event in failed_events]
 
-    def bulk_send(self, events):
+    def bulk_send(self, events, router_urls=None):
         """
         Send the event to configured routers after processing it.
@@ -150,7 +154,7 @@ def bulk_send(self, events):
         Arguments:
             events (list[dict]): list of original event dictionaries
         """
-        event_routes = self.prepare_to_send(events)
+        event_routes = self.prepare_to_send(events, router_urls)
 
         for events_for_route in event_routes.values():
             prepared_events = []
diff --git a/event_routing_backends/backends/tests/test_events_router.py b/event_routing_backends/backends/tests/test_events_router.py
index 511400ff..f140dc27 100644
--- a/event_routing_backends/backends/tests/test_events_router.py
+++ b/event_routing_backends/backends/tests/test_events_router.py
@@ -917,6 +917,66 @@ def test_successful_routing_of_bulk_events(
         # test mocked oauth client
         mocked_oauth_client.assert_not_called()
 
+    @patch("event_routing_backends.tasks.dispatch_bulk_events.delay")
+    @patch("event_routing_backends.utils.http_client.requests.post")
+    @patch("event_routing_backends.utils.xapi_lrs_client.RemoteLRS")
+    def test_bulk_send_routes_events_based_on_configured_urls(
+        self, mocked_lrs, mocked_post, mock_dispatch_event
+    ):
+        TieredCache.dangerous_clear_all_tiers()
+        mocked_oauth_client = MagicMock()
+        mocked_api_key_client = MagicMock()
+
+        MOCKED_MAP = {
+            "AUTH_HEADERS": HttpClient,
+            "OAUTH2": mocked_oauth_client,
+            "API_KEY": mocked_api_key_client,
+            "XAPI_LRS": LrsClient,
+        }
+        RouterConfigurationFactory.create(
+            backend_name=RouterConfiguration.XAPI_BACKEND,
+            enabled=True,
+            route_url="http://test1.com",
+            auth_scheme=RouterConfiguration.AUTH_BASIC,
+            auth_key=None,
+            username="abc",
+            password="xyz",
+            configurations=ROUTER_CONFIG_FIXTURE[0],
+        )
+        RouterConfigurationFactory.create(
+            backend_name=RouterConfiguration.XAPI_BACKEND,
+            enabled=True,
+            route_url="http://test2.com",
+            auth_scheme=RouterConfiguration.AUTH_BASIC,
+            auth_key=None,
+            username="abc1",
+            password="xyz1",
+            configurations=ROUTER_CONFIG_FIXTURE[0],
+        )
+
+        router = AsyncEventsRouter(
+            processors=[], backend_name=RouterConfiguration.XAPI_BACKEND
+        )
+
+        with patch.dict(
+            "event_routing_backends.tasks.ROUTER_STRATEGY_MAPPING", MOCKED_MAP
+        ):
+            router.bulk_send(self.bulk_transformed_events)
+
+        assert mock_dispatch_event.call_count == 2
+
+        # Reset mock before the next call
+        mock_dispatch_event.reset_mock()
+
+        with patch.dict(
+            "event_routing_backends.tasks.ROUTER_STRATEGY_MAPPING", MOCKED_MAP
+        ):
+            router.bulk_send(
+                self.bulk_transformed_events, router_urls=["http://test1.com"]
+            )
+
+        assert mock_dispatch_event.call_count == 1
+
 
 @ddt.ddt
 class TestSyncEventsRouter(TestEventsRouter):  # pylint: disable=test-inherits-tests
diff --git a/event_routing_backends/management/commands/helpers/queued_sender.py b/event_routing_backends/management/commands/helpers/queued_sender.py
index 8a7ecf5e..b2b5a045 100644
--- a/event_routing_backends/management/commands/helpers/queued_sender.py
+++ b/event_routing_backends/management/commands/helpers/queued_sender.py
@@ -24,7 +24,8 @@ def __init__(  # pylint: disable=too-many-positional-arguments
         transformer_type,
         max_queue_size=10000,
         sleep_between_batches_secs=1.0,
-        dry_run=False
+        dry_run=False,
+        lrs_urls=None
     ):
         self.destination = destination
         self.destination_container = destination_container
@@ -34,6 +35,7 @@ def __init__(  # pylint: disable=too-many-positional-arguments
         self.max_queue_size = max_queue_size
         self.sleep_between_batches = sleep_between_batches_secs
         self.dry_run = dry_run
+        self.lrs_urls = lrs_urls or []
 
         # Bookkeeping
         self.queued_lines = 0
@@ -101,7 +103,7 @@ def send(self):
         """
         if self.destination == "LRS":
             print(f"Sending {len(self.event_queue)} events to LRS...")
-            self.backend.bulk_send(self.event_queue)
+            self.backend.bulk_send(self.event_queue, self.lrs_urls)
         else:
             print("Skipping send, we're storing with libcloud instead of an LRS.")
 
diff --git a/event_routing_backends/management/commands/tests/test_transform_tracking_logs.py b/event_routing_backends/management/commands/tests/test_transform_tracking_logs.py
index 60cc412d..fb5c6d59 100644
--- a/event_routing_backends/management/commands/tests/test_transform_tracking_logs.py
+++ b/event_routing_backends/management/commands/tests/test_transform_tracking_logs.py
@@ -163,6 +163,24 @@ def command_options():
             },
             "whitelist": ["problem_check"]
         },
+        # Test with LRS URLs
+        {
+            "transformer_type": "xapi",
+            "source_provider": "MINIO",
+            "source_config": REMOTE_CONFIG,
+            "lrs_urls": ["http://lrs1.com", "http://lrs2.com"],
+            "expected_results": {
+                "expected_batches_sent": 1,
+                "log_lines": [
+                    "Looking for log files in test_bucket/xapi_statements/*",
+                    "Finalizing 2 events to LRS",
+                    "Sending to LRS!",
+                    "Sending 2 events to LRS...",
+                    "Queued 2 log lines, could not parse 2 log lines, skipped 8 log lines, sent 1 batches.",
+                ],
+            },
+            "whitelist": ["problem_check"],
+        },
     ]
 
     for option in options:
@@ -189,7 +207,8 @@ def _get_raw_log_stream(_, start_bytes, chunk_size):
 
 
 @pytest.mark.parametrize("command_opts", command_options())
-def test_transform_command(command_opts, mock_common_calls, caplog, capsys):
+@patch("event_routing_backends.management.commands.transform_tracking_logs.RouterConfiguration")
+def test_transform_command(MockRouterConfiguration, command_opts, mock_common_calls, caplog, capsys):
     """
     Test the command and QueuedSender with a variety of options.
     """
@@ -198,6 +217,12 @@ def test_transform_command(MockRouterConfiguration, command_opts, mock_common_calls, caplog, capsys):
     expected_results = command_opts.pop("expected_results")
     transform_tracking_logs.CHUNK_SIZE = command_opts.pop("chunk_size", 1024*1024*2)
 
+    # Mock RouterConfiguration to return specific URLs
+    MockRouterConfiguration.objects.filter.return_value.values_list.return_value = [
+        "http://lrs1.com",
+        "http://lrs2.com",
+    ]
+
     mm = MagicMock()
 
     mock_log_object = MagicMock()
@@ -244,6 +269,44 @@
         assert line in caplog.text or line in captured.out
 
 
+@patch("event_routing_backends.management.commands.transform_tracking_logs.RouterConfiguration")
+def test_invalid_lrs_urls(MockRouterConfiguration, mock_common_calls, caplog):
+    """
+    Test that a ValueError is raised when invalid LRS URLs are provided.
+    """
+    command_opts = {
+        "transformer_type": "xapi",
+        "source_provider": "MINIO",
+        "source_config": REMOTE_CONFIG,
+        "lrs_urls": ["http://lrs3-invalid.com"],
+    }
+
+    mock_libcloud_provider, mock_libcloud_get_driver = mock_common_calls
+
+    MockRouterConfiguration.objects.filter.return_value.values_list.return_value = [
+        "http://lrs1.com",
+        "http://lrs2.com",
+    ]
+
+    transform_tracking_logs.CHUNK_SIZE = command_opts.pop("chunk_size", 1024 * 1024 * 2)
+
+    mm = MagicMock()
+
+    mock_log_object = MagicMock()
+    mock_log_object.__str__.return_value = "tracking.log"
+    mock_log_object.name = "tracking.log"
+    mock_log_object.size = _get_raw_log_size()
+
+    # Fake finding one log file; the command should fail URL validation before parsing it
+    mm.return_value.iterate_container_objects.return_value = [mock_log_object]
+    mm.return_value.download_object_range_as_stream = _get_raw_log_stream
+    mock_libcloud_get_driver.return_value = mm
+
+    # Run command with invalid route_urls and assert ValueError is raised
+    with pytest.raises(ValueError):
+        call_command("transform_tracking_logs", **command_opts)
+
+
 def test_queued_sender_store_on_lrs(mock_common_calls, capsys):
     """
     Test that we don't attempt to store on an LRS backend.
diff --git a/event_routing_backends/management/commands/transform_tracking_logs.py b/event_routing_backends/management/commands/transform_tracking_logs.py
index 4b8b7750..74c35dce 100644
--- a/event_routing_backends/management/commands/transform_tracking_logs.py
+++ b/event_routing_backends/management/commands/transform_tracking_logs.py
@@ -13,6 +13,7 @@
 from libcloud.storage.types import Provider
 
 from event_routing_backends.management.commands.helpers.queued_sender import QueuedSender
+from event_routing_backends.models import RouterConfiguration
 
 # Number of bytes to download at a time, this is 2 MB
 CHUNK_SIZE = 1024 * 1024 * 2
@@ -159,6 +160,25 @@ def validate_destination(driver, container_name, prefix, source_objects):
     print(f"Wrote source file list to '{container_name}/{full_path}'")
 
 
+def validate_lrs_routes(lrs_urls):
+    """
+    Validate that the provided LRS URLs are present and enabled in the RouterConfiguration.
+
+    Raises a ValueError if any of the URLs are missing or not enabled.
+    """
+    if lrs_urls:
+        missing_urls = set(lrs_urls) - set(
+            RouterConfiguration.objects.filter(
+                route_url__in=lrs_urls, enabled=True
+            ).values_list("route_url", flat=True)
+        )
+        if missing_urls:
+            raise ValueError(
+                "The following LRS URLs are not present or not enabled in the "
+                f"RouterConfiguration: {', '.join(missing_urls)}"
+            )
+
+
 def get_libcloud_drivers(source_provider, source_config, destination_provider, destination_config):
     """
     Attempt to configure the libcloud drivers for source and destination.
@@ -256,6 +276,15 @@ def add_arguments(self, parser):
             help="Attempt to transform all lines from all files, but do not send to the destination.",
         )
 
+        parser.add_argument(
+            '--lrs-urls',
+            nargs='+',
+            type=str,
+            default=None,
+            help="Specify the LRS route_url(s) to send data to "
+                 "(e.g., --lrs-urls http://lrs1.example.com http://lrs2.example.com).",
+        )
+
     def handle(self, *args, **options):
         """
         Configure the command and start the transform process.
@@ -272,11 +301,13 @@ def handle(self, *args, **options):
             options["destination_provider"],
             dest_config
         )
+        lrs_urls = options.get('lrs_urls')
         source_file_list = validate_source_and_files(source_driver, source_container, source_prefix)
 
         if dest_driver != "LRS":
             validate_destination(dest_driver, dest_container, dest_prefix, source_file_list)
+        else:
+            validate_lrs_routes(lrs_urls)
 
         print(f"Found {len(source_file_list)} source files: ", *source_file_list, sep="\n")
 
         sender = QueuedSender(
@@ -286,7 +317,8 @@ def handle(self, *args, **options):
             options["transformer_type"],
             max_queue_size=options["batch_size"],
             sleep_between_batches_secs=options["sleep_between_batches_secs"],
-            dry_run=options["dry_run"]
+            dry_run=options["dry_run"],
+            lrs_urls=lrs_urls
         )
 
         transform_tracking_logs(
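For reviewers, a minimal stand-alone sketch (not part of the patch) of the routing behaviour the change introduces: ``bulk_send`` passes ``router_urls`` through to ``prepare_to_send``, which narrows the enabled routers only when the argument is non-empty. The snippet below imitates that narrowing with a plain list in place of the ``RouterConfiguration`` queryset; the ``enabled_routers`` values are assumptions for illustration only.

::

    # Plain-Python stand-in for the queryset narrowing done in prepare_to_send().
    # enabled_routers mimics RouterConfiguration.get_enabled_routers(); the URLs are made up.
    def select_routers(enabled_routers, router_urls=None):
        if not router_urls:
            # Falsy router_urls (None or []) keeps the default "send to every enabled LRS" behaviour.
            return list(enabled_routers)
        return [url for url in enabled_routers if url in router_urls]

    enabled_routers = ["http://lrs1.example.com", "http://lrs2.example.com"]
    assert select_routers(enabled_routers) == enabled_routers
    assert select_routers(enabled_routers, ["http://lrs1.example.com"]) == ["http://lrs1.example.com"]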
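Similarly, the new ``validate_lrs_routes`` check boils down to a set difference between the requested URLs and the enabled ``route_url`` values. A sketch with a hard-coded set standing in for the database query; the URLs here are illustrative, not real configuration.

::

    # configured stands in for the enabled route_url values that the real code
    # reads from RouterConfiguration; the values are assumptions for the example.
    def check_lrs_urls(requested, configured):
        missing = set(requested) - set(configured)
        if missing:
            raise ValueError(
                "The following LRS URLs are not present or not enabled in the "
                f"RouterConfiguration: {', '.join(sorted(missing))}"
            )

    configured = {"http://lrs1.com", "http://lrs2.com"}
    check_lrs_urls(["http://lrs1.com"], configured)   # passes silently
    # check_lrs_urls(["http://lrs3-invalid.com"], configured) would raise ValueError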