Skip to content

Commit

Permalink
feat: dagster pipeline to ingest edx.org course via API (#1455)
Browse files Browse the repository at this point in the history
* Ingest MITx courses via API and create MITx courses and runs assets

* add new assets to edxorg api data definition

* update the course catalog URL

* update course catalog URL
  • Loading branch information
rachellougee authored Feb 10, 2025
1 parent bf2e815 commit ec4db21
Show file tree
Hide file tree
Showing 3 changed files with 179 additions and 4 deletions.
152 changes: 152 additions & 0 deletions src/ol_orchestrate/assets/edxorg_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,3 +124,155 @@ def edxorg_program_metadata(
data_version=DataVersion(program_course_data_version),
metadata={"object_key": program_course_object_key},
)


@multi_asset(
group_name="edxorg",
outs={
"course_metadata": AssetOut(
description="The metadata for MITx courses extracted from the edxorg's "
"course catalog API",
io_manager_key="s3file_io_manager",
key=AssetKey(("edxorg", "processed_data", "course_metadata")),
),
"course_run_metadata": AssetOut(
description="The metadata for MITx course runs extracted from the edxorg's "
"course catalog API",
io_manager_key="s3file_io_manager",
key=AssetKey(("edxorg", "processed_data", "course_run_metadata")),
),
},
)
def edxorg_mitx_course_metadata(
context: AssetExecutionContext, edxorg_api: OpenEdxApiClientFactory
):
mitx_course_generator = edxorg_api.client.get_edxorg_mitx_courses()

mitx_courses = []
mitx_course_runs = []
data_retrieval_timestamp = datetime.now(tz=UTC).isoformat()
total_extracted_count = 0
for i, data in enumerate(mitx_course_generator):
if i == 0:
count, result_batch = data
context.log.info("Total MITx courses to extract: %d courses", count)
else:
result_batch = data

batch_count = len(result_batch)
total_extracted_count += batch_count
context.log.info(
"Extracted a batch of %d MITx courses. Total so far: %d courses.",
batch_count,
total_extracted_count,
)
for course in result_batch:
course_key = course["key"]
mitx_courses.append(
{
"course_key": course_key,
"title": course["title"],
"owner": (
course["owners"][0].get("key", None)
if course["owners"]
else None
),
"short_description": course["short_description"],
"full_description": course["full_description"],
"level_type": course["level_type"],
"marketing_url": course["marketing_url"],
"image": {
"url": course["image"].get("src"),
"description": course["image"].get("description"),
}
if course["image"] and course["image"].get("src")
else None,
"course_type": course["course_type"],
"subjects": [
{"name": subject.get("name")}
for subject in course.get("subjects", [])
],
"prerequisites": course["prerequisites"],
"prerequisites_raw": course["prerequisites_raw"],
"modified": course["modified"],
"retrieved_at": data_retrieval_timestamp,
}
)
for course_run in course["course_runs"]:
mitx_course_runs.append( # noqa: PERF401
{
"course_key": course_key,
"run_key": course_run["key"],
"title": course_run["title"],
"short_description": course_run["short_description"],
"full_description": course_run["full_description"],
"marketing_url": course_run["marketing_url"],
"level_type": course_run["level_type"],
"languages": course_run["content_language"],
"start_on": course_run["start"],
"end_on": course_run["end"],
"enrollment_start": course_run["enrollment_start"],
"enrollment_end": course_run["enrollment_end"],
"announcement": course_run["announcement"],
"pacing_type": course_run["pacing_type"],
"enrollment_type": course_run["type"],
"availability": course_run["availability"],
"status": course_run["status"],
"image": {
"url": course_run["image"].get("src"),
"description": course_run["image"].get("description"),
}
if course_run["image"] and course_run["image"].get("src")
else None,
"seats": course_run["seats"],
"staff": [
{
"first_name": staff.get("given_name"),
"last_name": staff.get("family_name"),
}
for staff in course_run.get("staff")
],
"weeks_to_complete": course_run["weeks_to_complete"],
"min_effort": course_run["min_effort"],
"max_effort": course_run["max_effort"],
"estimated_hours": course_run["estimated_hours"],
"modified": course_run["modified"],
"retrieved_at": data_retrieval_timestamp,
}
)

context.log.info("Total extracted %d MITx courses....", len(mitx_courses))
context.log.info("Total extracted %d MITx course runs....", len(mitx_course_runs))

course_data_version = hashlib.sha256(
json.dumps(mitx_courses).encode("utf-8")
).hexdigest()
course_run_data_version = hashlib.sha256(
json.dumps(mitx_course_runs).encode("utf-8")
).hexdigest()

course_file = Path(f"course_{course_data_version}.json")
course_run_file = Path(f"course_run_{course_run_data_version}.json")
course_object_key = f"{'/'.join(context.asset_key_for_output('course_metadata').path)}/{course_data_version}.json" # noqa: E501
course_run_object_key = f"{'/'.join(context.asset_key_for_output('course_run_metadata').path)}/{course_run_data_version}.json" # noqa: E501

with (
jsonlines.open(course_file, mode="w") as courses,
jsonlines.open(course_run_file, mode="w") as course_runs,
):
courses.write_all(mitx_courses)
course_runs.write_all(mitx_course_runs)

yield Output(
(course_file, course_object_key),
output_name="course_metadata",
data_version=DataVersion(course_data_version),
metadata={"object_key": course_object_key},
)

yield Output(
(course_run_file, course_run_object_key),
output_name="course_run_metadata",
data_version=DataVersion(course_run_data_version),
metadata={"object_key": course_run_object_key},
)
11 changes: 7 additions & 4 deletions src/ol_orchestrate/definitions/edx/edxorg_api_data_extract.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@
)
from dagster_aws.s3 import S3Resource

from ol_orchestrate.assets.edxorg_api import edxorg_program_metadata
from ol_orchestrate.assets.edxorg_api import (
edxorg_mitx_course_metadata,
edxorg_program_metadata,
)
from ol_orchestrate.io_managers.filepath import S3FileObjectIOManager
from ol_orchestrate.lib.constants import DAGSTER_ENV, VAULT_ADDRESS
from ol_orchestrate.lib.dagster_helpers import default_io_manager
Expand Down Expand Up @@ -40,12 +43,12 @@ def s3_uploads_bucket(

edxorg_api_daily_schedule = ScheduleDefinition(
name="edxorg_api_schedule",
target=AssetSelection.assets(edxorg_program_metadata),
target=AssetSelection.assets(edxorg_program_metadata, edxorg_mitx_course_metadata),
cron_schedule="@daily",
execution_timezone="Etc/UTC",
)

edxorg_program_metadata_extract = Definitions(
edxorg_api_data = Definitions(
resources={
"io_manager": default_io_manager(DAGSTER_ENV),
"s3file_io_manager": S3FileObjectIOManager(
Expand All @@ -56,6 +59,6 @@ def s3_uploads_bucket(
"s3": S3Resource(),
"edxorg_api": OpenEdxApiClientFactory(deployment="edxorg", vault=vault),
},
assets=[edxorg_program_metadata],
assets=[edxorg_program_metadata, edxorg_mitx_course_metadata],
schedules=[edxorg_api_daily_schedule],
)
20 changes: 20 additions & 0 deletions src/ol_orchestrate/resources/openedx.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,26 @@ def get_edxorg_programs(self):
next_page = response_data["next"]
yield response_data["results"]

def get_edxorg_mitx_courses(self):
"""
Retrieve a list of all the active courses in MITx catalog by walking through the
paginated results
Yield: A generator for walking the paginated list of courses
"""
course_catalog_url = "https://discovery.edx.org/api/v1/catalogs/10/courses/"
response_data = self._fetch_with_auth(course_catalog_url)
results = response_data["results"]
next_page = response_data["next"]
count = response_data["count"]
yield count, results
while next_page:
response_data = self._fetch_with_auth(
course_catalog_url, extra_params=parse_qs(next_page)
)
next_page = response_data["next"]
yield response_data["results"]


class OpenEdxApiClientFactory(ConfigurableResource):
deployment: str = Field(
Expand Down

0 comments on commit ec4db21

Please sign in to comment.