Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: dagster pipeline to ingest edx.org course via API #1455

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
152 changes: 152 additions & 0 deletions src/ol_orchestrate/assets/edxorg_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,3 +124,155 @@ def edxorg_program_metadata(
data_version=DataVersion(program_course_data_version),
metadata={"object_key": program_course_object_key},
)


@multi_asset(
group_name="edxorg",
outs={
"course_metadata": AssetOut(
description="The metadata for MITx courses extracted from the edxorg's "
"course catalog API",
io_manager_key="s3file_io_manager",
key=AssetKey(("edxorg", "processed_data", "course_metadata")),
),
"course_run_metadata": AssetOut(
description="The metadata for MITx course runs extracted from the edxorg's "
"course catalog API",
io_manager_key="s3file_io_manager",
key=AssetKey(("edxorg", "processed_data", "course_run_metadata")),
),
},
)
def edxorg_mitx_course_metadata(
context: AssetExecutionContext, edxorg_api: OpenEdxApiClientFactory
):
mitx_course_generator = edxorg_api.client.get_edxorg_mitx_courses()

mitx_courses = []
mitx_course_runs = []
data_retrieval_timestamp = datetime.now(tz=UTC).isoformat()
total_extracted_count = 0
for i, data in enumerate(mitx_course_generator):
if i == 0:
count, result_batch = data
context.log.info("Total MITx courses to extract: %d courses", count)
else:
result_batch = data

batch_count = len(result_batch)
total_extracted_count += batch_count
context.log.info(
"Extracted a batch of %d MITx courses. Total so far: %d courses.",
batch_count,
total_extracted_count,
)
for course in result_batch:
course_key = course["key"]
mitx_courses.append(
{
"course_key": course_key,
"title": course["title"],
"owner": (
course["owners"][0].get("key", None)
if course["owners"]
else None
),
"short_description": course["short_description"],
"full_description": course["full_description"],
"level_type": course["level_type"],
"marketing_url": course["marketing_url"],
"image": {
"url": course["image"].get("src"),
"description": course["image"].get("description"),
}
if course["image"] and course["image"].get("src")
else None,
"course_type": course["course_type"],
"subjects": [
{"name": subject.get("name")}
for subject in course.get("subjects", [])
],
"prerequisites": course["prerequisites"],
"prerequisites_raw": course["prerequisites_raw"],
"modified": course["modified"],
"retrieved_at": data_retrieval_timestamp,
}
)
for course_run in course["course_runs"]:
mitx_course_runs.append( # noqa: PERF401
{
"course_key": course_key,
"run_key": course_run["key"],
"title": course_run["title"],
"short_description": course_run["short_description"],
"full_description": course_run["full_description"],
"marketing_url": course_run["marketing_url"],
"level_type": course_run["level_type"],
"languages": course_run["content_language"],
"start_on": course_run["start"],
"end_on": course_run["end"],
"enrollment_start": course_run["enrollment_start"],
"enrollment_end": course_run["enrollment_end"],
"announcement": course_run["announcement"],
"pacing_type": course_run["pacing_type"],
"enrollment_type": course_run["type"],
"availability": course_run["availability"],
"status": course_run["status"],
"image": {
"url": course_run["image"].get("src"),
"description": course_run["image"].get("description"),
}
if course_run["image"] and course_run["image"].get("src")
else None,
"seats": course_run["seats"],
"staff": [
{
"first_name": staff.get("given_name"),
"last_name": staff.get("family_name"),
}
for staff in course_run.get("staff")
],
"weeks_to_complete": course_run["weeks_to_complete"],
"min_effort": course_run["min_effort"],
"max_effort": course_run["max_effort"],
"estimated_hours": course_run["estimated_hours"],
"modified": course_run["modified"],
"retrieved_at": data_retrieval_timestamp,
}
)

context.log.info("Total extracted %d MITx courses....", len(mitx_courses))
context.log.info("Total extracted %d MITx course runs....", len(mitx_course_runs))

course_data_version = hashlib.sha256(
json.dumps(mitx_courses).encode("utf-8")
).hexdigest()
course_run_data_version = hashlib.sha256(
json.dumps(mitx_course_runs).encode("utf-8")
).hexdigest()

course_file = Path(f"course_{course_data_version}.json")
course_run_file = Path(f"course_run_{course_run_data_version}.json")
course_object_key = f"{'/'.join(context.asset_key_for_output('course_metadata').path)}/{course_data_version}.json" # noqa: E501
course_run_object_key = f"{'/'.join(context.asset_key_for_output('course_run_metadata').path)}/{course_run_data_version}.json" # noqa: E501

with (
jsonlines.open(course_file, mode="w") as courses,
jsonlines.open(course_run_file, mode="w") as course_runs,
):
courses.write_all(mitx_courses)
course_runs.write_all(mitx_course_runs)

yield Output(
(course_file, course_object_key),
output_name="course_metadata",
data_version=DataVersion(course_data_version),
metadata={"object_key": course_object_key},
)

yield Output(
(course_run_file, course_run_object_key),
output_name="course_run_metadata",
data_version=DataVersion(course_run_data_version),
metadata={"object_key": course_run_object_key},
)
11 changes: 7 additions & 4 deletions src/ol_orchestrate/definitions/edx/edxorg_api_data_extract.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@
)
from dagster_aws.s3 import S3Resource

from ol_orchestrate.assets.edxorg_api import edxorg_program_metadata
from ol_orchestrate.assets.edxorg_api import (
edxorg_mitx_course_metadata,
edxorg_program_metadata,
)
from ol_orchestrate.io_managers.filepath import S3FileObjectIOManager
from ol_orchestrate.lib.constants import DAGSTER_ENV, VAULT_ADDRESS
from ol_orchestrate.lib.dagster_helpers import default_io_manager
Expand Down Expand Up @@ -40,12 +43,12 @@ def s3_uploads_bucket(

edxorg_api_daily_schedule = ScheduleDefinition(
name="edxorg_api_schedule",
target=AssetSelection.assets(edxorg_program_metadata),
target=AssetSelection.assets(edxorg_program_metadata, edxorg_mitx_course_metadata),
cron_schedule="@daily",
execution_timezone="Etc/UTC",
)

edxorg_program_metadata_extract = Definitions(
edxorg_api_data = Definitions(
resources={
"io_manager": default_io_manager(DAGSTER_ENV),
"s3file_io_manager": S3FileObjectIOManager(
Expand All @@ -56,6 +59,6 @@ def s3_uploads_bucket(
"s3": S3Resource(),
"edxorg_api": OpenEdxApiClientFactory(deployment="edxorg", vault=vault),
},
assets=[edxorg_program_metadata],
assets=[edxorg_program_metadata, edxorg_mitx_course_metadata],
schedules=[edxorg_api_daily_schedule],
)
20 changes: 20 additions & 0 deletions src/ol_orchestrate/resources/openedx.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,26 @@ def get_edxorg_programs(self):
next_page = response_data["next"]
yield response_data["results"]

def get_edxorg_mitx_courses(self):
"""
Retrieve a list of all the active courses in MITx catalog by walking through the
paginated results
Yield: A generator for walking the paginated list of courses
"""
course_catalog_url = "https://api.edx.org/catalog/v1/catalogs/10/courses"
response_data = self._fetch_with_auth(course_catalog_url)
results = response_data["results"]
next_page = response_data["next"]
count = response_data["count"]
yield count, results
while next_page:
response_data = self._fetch_with_auth(
course_catalog_url, extra_params=parse_qs(next_page)
)
next_page = response_data["next"]
yield response_data["results"]


class OpenEdxApiClientFactory(ConfigurableResource):
deployment: str = Field(
Expand Down