diff --git a/src/ol_orchestrate/assets/edxorg_api.py b/src/ol_orchestrate/assets/edxorg_api.py index 1e184bc11..68bef5384 100644 --- a/src/ol_orchestrate/assets/edxorg_api.py +++ b/src/ol_orchestrate/assets/edxorg_api.py @@ -124,3 +124,155 @@ def edxorg_program_metadata( data_version=DataVersion(program_course_data_version), metadata={"object_key": program_course_object_key}, ) + + +@multi_asset( + group_name="edxorg", + outs={ + "course_metadata": AssetOut( + description="The metadata for MITx courses extracted from the edxorg's " + "course catalog API", + io_manager_key="s3file_io_manager", + key=AssetKey(("edxorg", "processed_data", "course_metadata")), + ), + "course_run_metadata": AssetOut( + description="The metadata for MITx course runs extracted from the edxorg's " + "course catalog API", + io_manager_key="s3file_io_manager", + key=AssetKey(("edxorg", "processed_data", "course_run_metadata")), + ), + }, +) +def edxorg_mitx_course_metadata( + context: AssetExecutionContext, edxorg_api: OpenEdxApiClientFactory +): + mitx_course_generator = edxorg_api.client.get_edxorg_mitx_courses() + + mitx_courses = [] + mitx_course_runs = [] + data_retrieval_timestamp = datetime.now(tz=UTC).isoformat() + total_extracted_count = 0 + for i, data in enumerate(mitx_course_generator): + if i == 0: + count, result_batch = data + context.log.info("Total MITx courses to extract: %d courses", count) + else: + result_batch = data + + batch_count = len(result_batch) + total_extracted_count += batch_count + context.log.info( + "Extracted a batch of %d MITx courses. Total so far: %d courses.", + batch_count, + total_extracted_count, + ) + for course in result_batch: + course_key = course["key"] + mitx_courses.append( + { + "course_key": course_key, + "title": course["title"], + "owner": ( + course["owners"][0].get("key", None) + if course["owners"] + else None + ), + "short_description": course["short_description"], + "full_description": course["full_description"], + "level_type": course["level_type"], + "marketing_url": course["marketing_url"], + "image": { + "url": course["image"].get("src"), + "description": course["image"].get("description"), + } + if course["image"] and course["image"].get("src") + else None, + "course_type": course["course_type"], + "subjects": [ + {"name": subject.get("name")} + for subject in course.get("subjects", []) + ], + "prerequisites": course["prerequisites"], + "prerequisites_raw": course["prerequisites_raw"], + "modified": course["modified"], + "retrieved_at": data_retrieval_timestamp, + } + ) + for course_run in course["course_runs"]: + mitx_course_runs.append( # noqa: PERF401 + { + "course_key": course_key, + "run_key": course_run["key"], + "title": course_run["title"], + "short_description": course_run["short_description"], + "full_description": course_run["full_description"], + "marketing_url": course_run["marketing_url"], + "level_type": course_run["level_type"], + "languages": course_run["content_language"], + "start_on": course_run["start"], + "end_on": course_run["end"], + "enrollment_start": course_run["enrollment_start"], + "enrollment_end": course_run["enrollment_end"], + "announcement": course_run["announcement"], + "pacing_type": course_run["pacing_type"], + "enrollment_type": course_run["type"], + "availability": course_run["availability"], + "status": course_run["status"], + "image": { + "url": course_run["image"].get("src"), + "description": course_run["image"].get("description"), + } + if course_run["image"] and course_run["image"].get("src") + else None, + "seats": course_run["seats"], + "staff": [ + { + "first_name": staff.get("given_name"), + "last_name": staff.get("family_name"), + } + for staff in course_run.get("staff") + ], + "weeks_to_complete": course_run["weeks_to_complete"], + "min_effort": course_run["min_effort"], + "max_effort": course_run["max_effort"], + "estimated_hours": course_run["estimated_hours"], + "modified": course_run["modified"], + "retrieved_at": data_retrieval_timestamp, + } + ) + + context.log.info("Total extracted %d MITx courses....", len(mitx_courses)) + context.log.info("Total extracted %d MITx course runs....", len(mitx_course_runs)) + + course_data_version = hashlib.sha256( + json.dumps(mitx_courses).encode("utf-8") + ).hexdigest() + course_run_data_version = hashlib.sha256( + json.dumps(mitx_course_runs).encode("utf-8") + ).hexdigest() + + course_file = Path(f"course_{course_data_version}.json") + course_run_file = Path(f"course_run_{course_run_data_version}.json") + course_object_key = f"{'/'.join(context.asset_key_for_output('course_metadata').path)}/{course_data_version}.json" # noqa: E501 + course_run_object_key = f"{'/'.join(context.asset_key_for_output('course_run_metadata').path)}/{course_run_data_version}.json" # noqa: E501 + + with ( + jsonlines.open(course_file, mode="w") as courses, + jsonlines.open(course_run_file, mode="w") as course_runs, + ): + courses.write_all(mitx_courses) + course_runs.write_all(mitx_course_runs) + + yield Output( + (course_file, course_object_key), + output_name="course_metadata", + data_version=DataVersion(course_data_version), + metadata={"object_key": course_object_key}, + ) + + yield Output( + (course_run_file, course_run_object_key), + output_name="course_run_metadata", + data_version=DataVersion(course_run_data_version), + metadata={"object_key": course_run_object_key}, + ) diff --git a/src/ol_orchestrate/definitions/edx/edxorg_api_data_extract.py b/src/ol_orchestrate/definitions/edx/edxorg_api_data_extract.py index cdc3895f6..bb9ee9e43 100644 --- a/src/ol_orchestrate/definitions/edx/edxorg_api_data_extract.py +++ b/src/ol_orchestrate/definitions/edx/edxorg_api_data_extract.py @@ -7,7 +7,10 @@ ) from dagster_aws.s3 import S3Resource -from ol_orchestrate.assets.edxorg_api import edxorg_program_metadata +from ol_orchestrate.assets.edxorg_api import ( + edxorg_mitx_course_metadata, + edxorg_program_metadata, +) from ol_orchestrate.io_managers.filepath import S3FileObjectIOManager from ol_orchestrate.lib.constants import DAGSTER_ENV, VAULT_ADDRESS from ol_orchestrate.lib.dagster_helpers import default_io_manager @@ -40,12 +43,12 @@ def s3_uploads_bucket( edxorg_api_daily_schedule = ScheduleDefinition( name="edxorg_api_schedule", - target=AssetSelection.assets(edxorg_program_metadata), + target=AssetSelection.assets(edxorg_program_metadata, edxorg_mitx_course_metadata), cron_schedule="@daily", execution_timezone="Etc/UTC", ) -edxorg_program_metadata_extract = Definitions( +edxorg_api_data = Definitions( resources={ "io_manager": default_io_manager(DAGSTER_ENV), "s3file_io_manager": S3FileObjectIOManager( @@ -56,6 +59,6 @@ def s3_uploads_bucket( "s3": S3Resource(), "edxorg_api": OpenEdxApiClientFactory(deployment="edxorg", vault=vault), }, - assets=[edxorg_program_metadata], + assets=[edxorg_program_metadata, edxorg_mitx_course_metadata], schedules=[edxorg_api_daily_schedule], ) diff --git a/src/ol_orchestrate/resources/openedx.py b/src/ol_orchestrate/resources/openedx.py index ae12a5a2e..c506df356 100644 --- a/src/ol_orchestrate/resources/openedx.py +++ b/src/ol_orchestrate/resources/openedx.py @@ -206,6 +206,26 @@ def get_edxorg_programs(self): next_page = response_data["next"] yield response_data["results"] + def get_edxorg_mitx_courses(self): + """ + Retrieve a list of all the active courses in MITx catalog by walking through the + paginated results + + Yield: A generator for walking the paginated list of courses + """ + course_catalog_url = "https://discovery.edx.org/api/v1/catalogs/10/courses/" + response_data = self._fetch_with_auth(course_catalog_url) + results = response_data["results"] + next_page = response_data["next"] + count = response_data["count"] + yield count, results + while next_page: + response_data = self._fetch_with_auth( + course_catalog_url, extra_params=parse_qs(next_page) + ) + next_page = response_data["next"] + yield response_data["results"] + class OpenEdxApiClientFactory(ConfigurableResource): deployment: str = Field(