Skip to content

Commit

Permalink
feat(analytics): Add command to migrate analytics data to pg (#3981)
Browse files Browse the repository at this point in the history
  • Loading branch information
gagantrivedi authored May 30, 2024
1 parent 515b34c commit 848db5a
Show file tree
Hide file tree
Showing 3 changed files with 114 additions and 0 deletions.
19 changes: 19 additions & 0 deletions api/app_analytics/management/commands/migrate_analytics.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import argparse
from typing import Any

from app_analytics.migrate_to_pg import migrate_feature_evaluations
from django.core.management import BaseCommand


class Command(BaseCommand):
def add_arguments(self, parser: argparse.ArgumentParser) -> None:
parser.add_argument(
"--migrate-till",
type=int,
dest="migrate_till",
help="Migrate data till n days ago",
default=30,
)

def handle(self, *args: Any, migrate_till: int, **options: Any) -> None:
migrate_feature_evaluations(migrate_till)
28 changes: 28 additions & 0 deletions api/app_analytics/migrate_to_pg.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
from app_analytics.constants import ANALYTICS_READ_BUCKET_SIZE
from app_analytics.influxdb_wrapper import influxdb_client, read_bucket
from app_analytics.models import FeatureEvaluationBucket


def migrate_feature_evaluations(migrate_till: int = 30) -> None:
query_api = influxdb_client.query_api()

for i in range(migrate_till):
range_start = f"-{i+1}d"
range_stop = f"-{i}d"
query = f"from (bucket: {read_bucket}) |> range(start: {range_start}, stop: {range_stop})"

result = query_api.query(query)

feature_evaluations = []
for table in result:
for record in table.records:
feature_evaluations.append(
FeatureEvaluationBucket(
feature_name=record.values["feature_id"],
bucket_size=ANALYTICS_READ_BUCKET_SIZE,
created_at=record.get_time(),
total_count=record.get_value(),
environment_id=record.values["environment_id"],
)
)
FeatureEvaluationBucket.objects.bulk_create(feature_evaluations)
67 changes: 67 additions & 0 deletions api/tests/unit/app_analytics/test_migrate_to_pg.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
import pytest
from app_analytics.migrate_to_pg import migrate_feature_evaluations
from app_analytics.models import FeatureEvaluationBucket
from django.conf import settings
from django.utils import timezone
from pytest_mock import MockerFixture


@pytest.mark.skipif(
"analytics" not in settings.DATABASES,
reason="Skip test if analytics database is not configured",
)
@pytest.mark.django_db(databases=["analytics", "default"])
def test_migrate_feature_evaluations(mocker: MockerFixture) -> None:
# Given
feature_name = "test_feature_one"
environment_id = "1"

# mock the read bucket name
read_bucket = "test_bucket"
mocker.patch("app_analytics.migrate_to_pg.read_bucket", read_bucket)

# Next, mock the influx client and create some records
mock_influxdb_client = mocker.patch("app_analytics.migrate_to_pg.influxdb_client")
mock_query_api = mock_influxdb_client.query_api.return_value
mock_tables = []
for i in range(3):
mock_record = mocker.MagicMock(
values={"feature_id": feature_name, "environment_id": environment_id},
spec_set=["values", "get_time", "get_value"],
)
mock_record.get_time.return_value = timezone.now() - timezone.timedelta(days=i)
mock_record.get_value.return_value = 100

mock_table = mocker.MagicMock(records=[mock_record], spec_set=["records"])
mock_tables.append(mock_table)

mock_query_api.query.side_effect = [[table] for table in mock_tables]

# When
migrate_feature_evaluations(migrate_till=3)

# Then - only 3 records should be created
assert FeatureEvaluationBucket.objects.count() == 3
assert (
FeatureEvaluationBucket.objects.filter(
feature_name=feature_name,
environment_id=environment_id,
bucket_size=15,
total_count=100,
).count()
== 3
)
# And, the query should have been called 3 times
mock_query_api.assert_has_calls(
[
mocker.call.query(
f"from (bucket: {read_bucket}) |> range(start: -1d, stop: -0d)"
),
mocker.call.query(
f"from (bucket: {read_bucket}) |> range(start: -2d, stop: -1d)"
),
mocker.call.query(
f"from (bucket: {read_bucket}) |> range(start: -3d, stop: -2d)"
),
]
)

0 comments on commit 848db5a

Please sign in to comment.