Skip to content

Commit 6afb9c3

Browse files
committed
feat: Add --lrs-urls option to transform_tracking_logs command
- Introduced the --lrs-urls flag to specify target Learning Record Stores (LRS) by their route_url. - Updated documentation to reflect the new usage of the --lrs-urls option. - Added tests to ensure correct functionality when using the --lrs-urls flag. close #483
1 parent 54b9067 commit 6afb9c3

File tree

6 files changed

+179
-8
lines changed

6 files changed

+179
-8
lines changed

docs/howto/how_to_bulk_transform.rst

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ Modes Of Operation
3535

3636
The command can work in a few distinct ways.
3737

38-
**File(s) to learning record store (LRS)** - this will use the existing event-routing-backends configuration to route any log replays to **all** configured LRS backends just like the event was being emitted right now. This can be used to backfill old data, capture old events that didn't previously have transforms, or fix up lost data from downtime issues.
38+
**File(s) to learning record store (LRS)** - this will use the existing event-routing-backends configuration to route any log replays to **all** configured LRS backends by default, just like the event was being emitted right now. This can be used to backfill old data, capture old events that didn't previously have transforms, or fix up lost data from downtime issues. To target specific LRSs, you can use the ``--lrs-urls`` option to specify one or more LRS endpoints by their `route_url`. When provided, the command will route the transformed events exclusively to the specified LRSs, rather than all configured ones.
3939

4040
**File(s) to file(s)** - This will perform the same transformations as usual, but instead of routing them to an LRS they can be saved as a file to any libcloud destination. In this mode all events are saved to a single file and no filters are applied.
4141

@@ -65,6 +65,16 @@ Examples
6565
--destination_provider LRS \
6666
--transformer_type xapi
6767

68+
::
69+
70+
# Transform all events in the local file /openedx/data/tracking.log to the specified LRSs
71+
python manage.py lms transform_tracking_logs \
72+
--source_provider LOCAL \
73+
--source_config '{"key": "/openedx/data/", "prefix": "tracking.log", "container": "logs"}' \
74+
--destination_provider LRS \
75+
--transformer_type xapi \
76+
--lrs-urls http://lrs1.example.com http://lrs2.example.com
77+
6878
::
6979

7080
# Transform all events in the local file /openedx/data/tracking.log to all configured LRSs

event_routing_backends/backends/events_router.py

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -64,11 +64,15 @@ def configure_host(self, host, router):
6464

6565
return host
6666

67-
def prepare_to_send(self, events):
67+
def prepare_to_send(self, events, router_urls=None):
6868
"""
6969
Prepare a list of events to be sent and create a processed, filtered batch for each router.
70+
If router_urls are explicitly mentioned, then only use the specified routers
7071
"""
7172
routers = RouterConfiguration.get_enabled_routers(self.backend_name)
73+
if router_urls:
74+
routers = routers.filter(route_url__in=router_urls)
75+
7276
business_critical_events = get_business_critical_events()
7377
route_events = {}
7478

@@ -139,7 +143,7 @@ def get_failed_events(self, batch_size):
139143
return []
140144
return [json.loads(event.decode('utf-8')) for event in failed_events]
141145

142-
def bulk_send(self, events):
146+
def bulk_send(self, events, router_urls=None):
143147
"""
144148
Send the event to configured routers after processing it.
145149
@@ -150,7 +154,7 @@ def bulk_send(self, events):
150154
Arguments:
151155
events (list[dict]): list of original event dictionaries
152156
"""
153-
event_routes = self.prepare_to_send(events)
157+
event_routes = self.prepare_to_send(events, router_urls)
154158

155159
for events_for_route in event_routes.values():
156160
prepared_events = []

event_routing_backends/backends/tests/test_events_router.py

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -917,6 +917,66 @@ def test_successful_routing_of_bulk_events(
917917
# test mocked oauth client
918918
mocked_oauth_client.assert_not_called()
919919

920+
@patch("event_routing_backends.tasks.dispatch_bulk_events.delay")
921+
@patch("event_routing_backends.utils.http_client.requests.post")
922+
@patch("event_routing_backends.utils.xapi_lrs_client.RemoteLRS")
923+
def test_bulk_send_routes_events_based_on_configured_urls(
924+
self, mocked_lrs, mocked_post, mock_dispatch_event
925+
):
926+
TieredCache.dangerous_clear_all_tiers()
927+
mocked_oauth_client = MagicMock()
928+
mocked_api_key_client = MagicMock()
929+
930+
MOCKED_MAP = {
931+
"AUTH_HEADERS": HttpClient,
932+
"OAUTH2": mocked_oauth_client,
933+
"API_KEY": mocked_api_key_client,
934+
"XAPI_LRS": LrsClient,
935+
}
936+
RouterConfigurationFactory.create(
937+
backend_name=RouterConfiguration.XAPI_BACKEND,
938+
enabled=True,
939+
route_url="http://test1.com",
940+
auth_scheme=RouterConfiguration.AUTH_BASIC,
941+
auth_key=None,
942+
username="abc",
943+
password="xyz",
944+
configurations=ROUTER_CONFIG_FIXTURE[0],
945+
)
946+
RouterConfigurationFactory.create(
947+
backend_name=RouterConfiguration.XAPI_BACKEND,
948+
enabled=True,
949+
route_url="http://test2.com",
950+
auth_scheme=RouterConfiguration.AUTH_BASIC,
951+
auth_key=None,
952+
username="abc1",
953+
password="xyz1",
954+
configurations=ROUTER_CONFIG_FIXTURE[0],
955+
)
956+
957+
router = AsyncEventsRouter(
958+
processors=[], backend_name=RouterConfiguration.XAPI_BACKEND
959+
)
960+
961+
with patch.dict(
962+
"event_routing_backends.tasks.ROUTER_STRATEGY_MAPPING", MOCKED_MAP
963+
):
964+
router.bulk_send(self.bulk_transformed_events)
965+
966+
assert mock_dispatch_event.call_count == 2
967+
968+
# Reset mock before the next call
969+
mock_dispatch_event.reset_mock()
970+
971+
with patch.dict(
972+
"event_routing_backends.tasks.ROUTER_STRATEGY_MAPPING", MOCKED_MAP
973+
):
974+
router.bulk_send(
975+
self.bulk_transformed_events, router_urls=["http://test1.com"]
976+
)
977+
978+
assert mock_dispatch_event.call_count == 1
979+
920980

921981
@ddt.ddt
922982
class TestSyncEventsRouter(TestEventsRouter): # pylint: disable=test-inherits-tests

event_routing_backends/management/commands/helpers/queued_sender.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,8 @@ def __init__( # pylint: disable=too-many-positional-arguments
2424
transformer_type,
2525
max_queue_size=10000,
2626
sleep_between_batches_secs=1.0,
27-
dry_run=False
27+
dry_run=False,
28+
lrs_urls=None
2829
):
2930
self.destination = destination
3031
self.destination_container = destination_container
@@ -34,6 +35,7 @@ def __init__( # pylint: disable=too-many-positional-arguments
3435
self.max_queue_size = max_queue_size
3536
self.sleep_between_batches = sleep_between_batches_secs
3637
self.dry_run = dry_run
38+
self.lrs_urls = lrs_urls or []
3739

3840
# Bookkeeping
3941
self.queued_lines = 0
@@ -101,7 +103,7 @@ def send(self):
101103
"""
102104
if self.destination == "LRS":
103105
print(f"Sending {len(self.event_queue)} events to LRS...")
104-
self.backend.bulk_send(self.event_queue)
106+
self.backend.bulk_send(self.event_queue, self.lrs_urls)
105107
else:
106108
print("Skipping send, we're storing with libcloud instead of an LRS.")
107109

event_routing_backends/management/commands/tests/test_transform_tracking_logs.py

Lines changed: 64 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,24 @@ def command_options():
163163
},
164164
"whitelist": ["problem_check"]
165165
},
166+
# Test with LRS URLs
167+
{
168+
"transformer_type": "xapi",
169+
"source_provider": "MINIO",
170+
"source_config": REMOTE_CONFIG,
171+
"lrs_urls": ["http://lrs1.com", "http://lrs2.com"],
172+
"expected_results": {
173+
"expected_batches_sent": 1,
174+
"log_lines": [
175+
"Looking for log files in test_bucket/xapi_statements/*",
176+
"Finalizing 2 events to LRS",
177+
"Sending to LRS!",
178+
"Sending 2 events to LRS...",
179+
"Queued 2 log lines, could not parse 2 log lines, skipped 8 log lines, sent 1 batches.",
180+
],
181+
},
182+
"whitelist": ["problem_check"],
183+
},
166184
]
167185

168186
for option in options:
@@ -189,7 +207,8 @@ def _get_raw_log_stream(_, start_bytes, chunk_size):
189207

190208

191209
@pytest.mark.parametrize("command_opts", command_options())
192-
def test_transform_command(command_opts, mock_common_calls, caplog, capsys):
210+
@patch("event_routing_backends.management.commands.transform_tracking_logs.RouterConfiguration")
211+
def test_transform_command(MockRouterConfiguration, command_opts, mock_common_calls, caplog, capsys):
193212
"""
194213
Test the command and QueuedSender with a variety of options.
195214
"""
@@ -198,6 +217,12 @@ def test_transform_command(command_opts, mock_common_calls, caplog, capsys):
198217
expected_results = command_opts.pop("expected_results")
199218
transform_tracking_logs.CHUNK_SIZE = command_opts.pop("chunk_size", 1024*1024*2)
200219

220+
# Mock RouterConfiguration to return specific URLs
221+
MockRouterConfiguration.objects.filter.return_value.values_list.return_value = [
222+
"http://lrs1.com",
223+
"http://lrs2.com",
224+
]
225+
201226
mm = MagicMock()
202227

203228
mock_log_object = MagicMock()
@@ -244,6 +269,44 @@ def test_transform_command(command_opts, mock_common_calls, caplog, capsys):
244269
assert line in caplog.text or line in captured.out
245270

246271

272+
@patch("event_routing_backends.management.commands.transform_tracking_logs.RouterConfiguration")
273+
def test_invalid_lrs_urls(MockRouterConfiguration, mock_common_calls, caplog):
274+
"""
275+
Test that a ValueError is raised when invalid LRS URLs are provided.
276+
"""
277+
command_opts = {
278+
"transformer_type": "xapi",
279+
"source_provider": "MINIO",
280+
"source_config": REMOTE_CONFIG,
281+
"lrs_urls": ["http://lrs3-invalid.com"],
282+
}
283+
284+
mock_libcloud_provider, mock_libcloud_get_driver = mock_common_calls
285+
286+
MockRouterConfiguration.objects.filter.return_value.values_list.return_value = [
287+
"http://lrs1.com",
288+
"http://lrs2.com",
289+
]
290+
291+
transform_tracking_logs.CHUNK_SIZE = command_opts.pop("chunk_size", 1024 * 1024 * 2)
292+
293+
mm = MagicMock()
294+
295+
mock_log_object = MagicMock()
296+
mock_log_object.__str__.return_value = "tracking.log"
297+
mock_log_object.name = "tracking.log"
298+
mock_log_object.size = _get_raw_log_size()
299+
300+
# Fake finding one log file in each container, it will be loaded and parsed twice
301+
mm.return_value.iterate_container_objects.return_value = [mock_log_object]
302+
mm.return_value.download_object_range_as_stream = _get_raw_log_stream
303+
mock_libcloud_get_driver.return_value = mm
304+
305+
# Run command with invalid route_urls and assert ValueError is raised
306+
with pytest.raises(ValueError):
307+
call_command("transform_tracking_logs", **command_opts)
308+
309+
247310
def test_queued_sender_store_on_lrs(mock_common_calls, capsys):
248311
"""
249312
Test that we don't attempt to store on an LRS backend.

event_routing_backends/management/commands/transform_tracking_logs.py

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
from libcloud.storage.types import Provider
1414

1515
from event_routing_backends.management.commands.helpers.queued_sender import QueuedSender
16+
from event_routing_backends.models import RouterConfiguration
1617

1718
# Number of bytes to download at a time, this is 2 MB
1819
CHUNK_SIZE = 1024 * 1024 * 2
@@ -159,6 +160,25 @@ def validate_destination(driver, container_name, prefix, source_objects):
159160
print(f"Wrote source file list to '{container_name}/{full_path}'")
160161

161162

163+
def validate_lrs_routes(lrs_urls):
164+
"""
165+
Validate that the provided LRS URLs are present and enabled in the RouterConfiguration.
166+
167+
Raises a ValueError if any of the URLs are missing or not enabled.
168+
"""
169+
if lrs_urls:
170+
missing_urls = set(lrs_urls) - set(
171+
RouterConfiguration.objects.filter(
172+
route_url__in=lrs_urls, enabled=True
173+
).values_list("route_url", flat=True)
174+
)
175+
if missing_urls:
176+
raise ValueError(
177+
"The following LRS URLs are not present or not enabled in the ",
178+
f"RouterConfiguration: {', '.join(missing_urls)}"
179+
)
180+
181+
162182
def get_libcloud_drivers(source_provider, source_config, destination_provider, destination_config):
163183
"""
164184
Attempt to configure the libcloud drivers for source and destination.
@@ -256,6 +276,15 @@ def add_arguments(self, parser):
256276
help="Attempt to transform all lines from all files, but do not send to the destination.",
257277
)
258278

279+
parser.add_argument(
280+
'--lrs-urls',
281+
nargs='+',
282+
type=str,
283+
default=None,
284+
help="Specify the LRS route_url(s) to send data to "
285+
"(e.g., --lrs-urls http://lrs1.example.com http://lrs2.example.com).",
286+
)
287+
259288
def handle(self, *args, **options):
260289
"""
261290
Configure the command and start the transform process.
@@ -272,11 +301,13 @@ def handle(self, *args, **options):
272301
options["destination_provider"],
273302
dest_config
274303
)
304+
lrs_urls = options.get('lrs_urls')
275305

276306
source_file_list = validate_source_and_files(source_driver, source_container, source_prefix)
277307
if dest_driver != "LRS":
278308
validate_destination(dest_driver, dest_container, dest_prefix, source_file_list)
279309
else:
310+
validate_lrs_routes(lrs_urls)
280311
print(f"Found {len(source_file_list)} source files: ", *source_file_list, sep="\n")
281312

282313
sender = QueuedSender(
@@ -286,7 +317,8 @@ def handle(self, *args, **options):
286317
options["transformer_type"],
287318
max_queue_size=options["batch_size"],
288319
sleep_between_batches_secs=options["sleep_between_batches_secs"],
289-
dry_run=options["dry_run"]
320+
dry_run=options["dry_run"],
321+
lrs_urls=lrs_urls
290322
)
291323

292324
transform_tracking_logs(

0 commit comments

Comments
 (0)