diff --git a/src/otaclient/app/ota_client.py b/src/otaclient/app/ota_client.py index bf6b5fdfd..8c3ef74ca 100644 --- a/src/otaclient/app/ota_client.py +++ b/src/otaclient/app/ota_client.py @@ -37,6 +37,7 @@ from ota_metadata.legacy import parser as ota_metadata_parser from ota_metadata.legacy import types as ota_metadata_types from otaclient import __version__ +from otaclient.app.create_standby.common import DeltaBundle from otaclient_api.v2 import types as api_types from otaclient_common.common import ensure_otaproxy_start from otaclient_common.downloader import ( @@ -58,6 +59,8 @@ logger = logging.getLogger(__name__) +DEFAULT_STATUS_QUERY_INTERVAL = 1 + class LiveOTAStatus: def __init__(self, ota_status: api_types.StatusOta) -> None: @@ -154,7 +157,7 @@ def _download_exception_handler(_fut: Future[Any]) -> bool: # handled exceptions, let the upper caller do the retry return False finally: - exc = None # drop ref to exc instance + del exc, _fut # drop ref to exc instance class _OTAUpdater: @@ -170,7 +173,7 @@ def __init__( boot_controller: BootControllerProtocol, create_standby_cls: Type[StandbySlotCreatorProtocol], control_flags: OTAClientControlFlags, - status_query_interval: int = 1, + status_query_interval: int = DEFAULT_STATUS_QUERY_INTERVAL, ) -> None: self._shutdown = False self._update_status = api_types.UpdateStatus() @@ -249,32 +252,23 @@ def __init__( self._update_stats_collector = OTAUpdateStatsCollector() self._update_stats_collector.start_collector() - def _download_file( - self, entry: ota_metadata_types.RegularInf - ) -> tuple[int, int, int]: - """Download a single OTA image file. - - This is the single task being executed in the downloader pool. - - Returns: - Retry counts, downloaded files size and traffic on wire. 
- """ - _fhash_str = entry.get_hash() - # special treatment to empty file - if _fhash_str == EMPTY_FILE_SHA256: - return 0, 0, 0 - - entry_url, compression_alg = self._otameta.get_download_url(entry) - downloader = self._downloader_mapper[threading.get_native_id()] - return downloader.download( - entry_url, - self._ota_tmp_on_standby / _fhash_str, - digest=_fhash_str, - size=entry.size, - compression_alg=compression_alg, - ) - - def _download_files(self, download_list: Iterator[ota_metadata_types.RegularInf]): + def _calculate_delta( + self, + standby_slot_creator: StandbySlotCreatorProtocol, + ) -> DeltaBundle: + logger.info("start to calculate and prepare delta...") + delta_bundle = standby_slot_creator.calculate_and_prepare_delta() + # update dynamic information + self.total_download_files_num = len(delta_bundle.download_list) + self.total_download_fiies_size = delta_bundle.total_download_files_size + self.total_remove_files_num = len(delta_bundle.rm_delta) + return delta_bundle + + def _download_files( + self, + ota_metadata: ota_metadata_parser.OTAMetadata, + download_list: Iterator[ota_metadata_types.RegularInf], + ): """Download all needed OTA image files indicated by calculated bundle.""" logger.debug("download neede OTA image files...") @@ -288,6 +282,31 @@ def _thread_initializer(): self._downloader_pool.get_instance() ) + def _download_file( + entry: ota_metadata_types.RegularInf, + ) -> tuple[int, int, int]: + """Download a single OTA image file. + + This is the single task being executed in the downloader pool. + + Returns: + Retry counts, downloaded files size and traffic on wire. 
+ """ + _fhash_str = entry.get_hash() + # special treatment to empty file + if _fhash_str == EMPTY_FILE_SHA256: + return 0, 0, 0 + + entry_url, compression_alg = ota_metadata.get_download_url(entry) + downloader = self._downloader_mapper[threading.get_native_id()] + return downloader.download( + entry_url, + self._ota_tmp_on_standby / _fhash_str, + digest=_fhash_str, + size=entry.size, + compression_alg=compression_alg, + ) + with ThreadPoolExecutorWithRetry( max_concurrent=cfg.MAX_CONCURRENT_DOWNLOAD_TASKS, max_workers=cfg.MAX_DOWNLOAD_THREAD, @@ -302,7 +321,7 @@ def _thread_initializer(): max_idle_timeout=cfg.DOWNLOAD_GROUP_INACTIVE_TIMEOUT, ), ) as _mapper: - for _fut in _mapper.ensure_tasks(self._download_file, download_list): + for _fut in _mapper.ensure_tasks(_download_file, download_list): if _download_exception_handler(_fut): # donwload succeeded err_count, file_size, _ = _fut.result() self._update_stats_collector.report_stat( @@ -323,86 +342,9 @@ def _thread_initializer(): # release the downloader instances self._downloader_pool.release_all_instances() - - def _update_standby_slot(self): - """Apply OTA update to standby slot.""" - # ------ pre_update ------ # - # --- prepare standby slot --- # - # NOTE: erase standby slot or not based on the used StandbySlotCreator - logger.debug("boot controller prepares standby slot...") - self._boot_controller.pre_update( - self.updating_version, - standby_as_ref=False, # NOTE: this option is deprecated and not used by bootcontroller - erase_standby=self._create_standby_cls.should_erase_standby_slot(), - ) - # prepare the tmp storage on standby slot after boot_controller.pre_update finished - self._ota_tmp_on_standby.mkdir(exist_ok=True) - self._ota_tmp_image_meta_dir_on_standby.mkdir(exist_ok=True) - - # --- init standby_slot creator, calculate delta --- # - logger.info("start to calculate and prepare delta...") - self._update_stats_collector.delta_calculation_started() - - self.update_phase = 
api_types.UpdatePhase.CALCULATING_DELTA - self._standby_slot_creator = self._create_standby_cls( - ota_metadata=self._otameta, - boot_dir=str(self._boot_controller.get_standby_boot_dir()), - standby_slot_mount_point=cfg.MOUNT_POINT, - active_slot_mount_point=cfg.ACTIVE_ROOT_MOUNT_POINT, - stats_collector=self._update_stats_collector, - ) - try: - _delta_bundle = self._standby_slot_creator.calculate_and_prepare_delta() - # update dynamic information - self.total_download_files_num = len(_delta_bundle.download_list) - self.total_download_fiies_size = _delta_bundle.total_download_files_size - self.total_remove_files_num = len(_delta_bundle.rm_delta) - except Exception as e: - _err_msg = f"failed to generate delta: {e!r}" - logger.error(_err_msg) - raise ota_errors.UpdateDeltaGenerationFailed( - _err_msg, module=__name__ - ) from e - - self._update_stats_collector.delta_calculation_finished() - - # --- download needed files --- # - logger.info( - "start to download needed files..." - f"total_download_files_size={_delta_bundle.total_download_files_size:,}bytes" - ) - self.update_phase = api_types.UpdatePhase.DOWNLOADING_OTA_FILES - self._update_stats_collector.download_started() - - try: - self._download_files(_delta_bundle.get_download_list()) - except ota_errors.OTAError: - raise # no need to wrap OTA Error again - except Exception as e: - _err_msg = f"failed to finish downloading files: {e!r}" - logger.error(_err_msg) - raise ota_errors.NetworkError(_err_msg, module=__name__) from e - - # shutdown downloader on download finished - self._update_stats_collector.download_finished() self._downloader_pool.shutdown() - # ------ in_update ------ # - logger.info("start to apply changes to standby slot...") - self.update_phase = api_types.UpdatePhase.APPLYING_UPDATE - self._update_stats_collector.apply_update_started() - - try: - self._standby_slot_creator.create_standby_slot() - except Exception as e: - _err_msg = f"failed to apply update to standby slot: {e!r}" - 
logger.error(_err_msg) - raise ota_errors.ApplyOTAUpdateFailed(_err_msg, module=__name__) from e - - logger.info("finished updating standby slot") - self._update_stats_collector.apply_update_finished() - - def _process_persistents(self): + def _process_persistents(self, ota_metadata: ota_metadata_parser.OTAMetadata): logger.info("start persist files handling...") standby_slot_mp = Path(cfg.MOUNT_POINT) @@ -415,7 +357,7 @@ def _process_persistents(self): dst_root=cfg.MOUNT_POINT, ) - for _perinf in self._otameta.iter_metafile( + for _perinf in ota_metadata.iter_metafile( ota_metadata_parser.MetafilesV1.PERSISTENT_FNAME ): _per_fpath = Path(_perinf.path) @@ -441,35 +383,22 @@ def _process_persistents(self): _handler.preserve_persist_entry(_per_fpath) def _execute_update(self): - """OTA update workflow implementation. - - e.g. - cookies = { - "CloudFront-Policy": "eyJTdGF0ZW1lbnQ...", - "CloudFront-Signature": "o4ojzMrJwtSIg~izsy...", - "CloudFront-Key-Pair-Id": "K2...", - } - """ + """Implementation of OTA updating.""" logger.info(f"execute local update: {self.updating_version=},{self.url_base=}") # ------ init, processing metadata ------ # - self.update_phase = api_types.UpdatePhase.PROCESSING_METADATA - - # process metadata.jwt and ota metafiles logger.debug("process metadata.jwt...") - downloader = self._downloader_pool.get_instance() + self.update_phase = api_types.UpdatePhase.PROCESSING_METADATA try: # TODO(20240619): ota_metadata should not be responsible for downloading anything - self._otameta = ota_metadata_parser.OTAMetadata( + otameta = ota_metadata_parser.OTAMetadata( url_base=self.url_base, - downloader=downloader, + downloader=self._downloader_pool.get_instance(), run_dir=Path(cfg.RUN_DIR), certs_dir=Path(cfg.CERTS_DIR), ) - self.total_files_num = self._otameta.total_files_num - self.total_files_size_uncompressed = ( - self._otameta.total_files_size_uncompressed - ) + self.total_files_num = otameta.total_files_num + self.total_files_size_uncompressed = 
otameta.total_files_size_uncompressed except ota_metadata_parser.MetadataJWTVerificationFailed as e: _err_msg = f"failed to verify metadata.jwt: {e!r}" logger.error(_err_msg) @@ -487,22 +416,58 @@ def _execute_update(self): finally: self._downloader_pool.release_instance() - # ------ execute local update ------ # + # ------ pre-update ------ # logger.info("enter local OTA update...") + self._boot_controller.pre_update( + self.updating_version, + standby_as_ref=False, # NOTE: this option is deprecated and not used by bootcontroller + erase_standby=self._create_standby_cls.should_erase_standby_slot(), + ) + # prepare the tmp storage on standby slot after boot_controller.pre_update finished + self._ota_tmp_on_standby.mkdir(exist_ok=True) + self._ota_tmp_image_meta_dir_on_standby.mkdir(exist_ok=True) + + # ------ in-update ------ # + standby_slot_creator = self._create_standby_cls( + ota_metadata=otameta, + boot_dir=str(self._boot_controller.get_standby_boot_dir()), + standby_slot_mount_point=cfg.MOUNT_POINT, + active_slot_mount_point=cfg.ACTIVE_ROOT_MOUNT_POINT, + stats_collector=self._update_stats_collector, + ) + + self.update_phase = api_types.UpdatePhase.CALCULATING_DELTA + self._update_stats_collector.delta_calculation_started() try: - self._update_standby_slot() - except ota_errors.OTAError: - raise # no need to wrap an OTAError again + delta_bundle = self._calculate_delta(standby_slot_creator) except Exception as e: - raise ota_errors.ApplyOTAUpdateFailed( - f"unspecific applying OTA update failure: {e!r}", module=__name__ - ) + _err_msg = f"failed to generate delta: {e!r}" + logger.error(_err_msg) + raise ota_errors.UpdateDeltaGenerationFailed( + _err_msg, module=__name__ + ) from e + self._update_stats_collector.delta_calculation_finished() + + # NOTE(20240705): download_files raises OTA Error directly, no need to capture exc here + self.update_phase = api_types.UpdatePhase.DOWNLOADING_OTA_FILES + self._update_stats_collector.download_started() + try: + 
self._download_files(otameta, delta_bundle.get_download_list()) + finally: + del delta_bundle + self._update_stats_collector.download_finished() + + self.update_phase = api_types.UpdatePhase.APPLYING_UPDATE + self._update_stats_collector.apply_update_started() + logger.info("start to apply changes to standby slot...") + standby_slot_creator.create_standby_slot() + self._update_stats_collector.apply_update_finished() - # ------ post update ------ # + # ------ post-update ------ # logger.info("enter post update phase...") self.update_phase = api_types.UpdatePhase.PROCESSING_POSTUPDATE # NOTE(20240219): move persist file handling here - self._process_persistents() + self._process_persistents(otameta) # boot controller postupdate next(_postupdate_gen := self._boot_controller.post_update()) @@ -621,8 +586,6 @@ def __init__( ): try: self.my_ecu_id = my_ecu_id - # ensure only one update/rollback session is running - self._lock = threading.Lock() self.boot_controller = boot_controller self.create_standby_cls = create_standby_cls @@ -664,63 +627,51 @@ def _on_failure(self, exc: ota_errors.OTAError, ota_status: api_types.StatusOta) # API def update(self, version: str, url_base: str, cookies_json: str) -> None: - if self._lock.acquire(blocking=False): - try: - logger.info("[update] entering local update...") - self._update_executor = _OTAUpdater( - version=version, - raw_url_base=url_base, - cookies_json=cookies_json, - boot_controller=self.boot_controller, - create_standby_cls=self.create_standby_cls, - control_flags=self.control_flags, - upper_otaproxy=self.proxy, - ) - - self.last_failure_type = api_types.FailureType.NO_FAILURE - self.last_failure_reason = "" - self.last_failure_traceback = "" - - self.live_ota_status.set_ota_status(api_types.StatusOta.UPDATING) - self._update_executor.execute() - except ota_errors.OTAError as e: - self._on_failure(e, api_types.StatusOta.FAILURE) - finally: - self._update_executor = None - gc.collect() # trigger a forced gc - 
self._lock.release() - else: - logger.warning( - "ignore incoming rollback request as local update/rollback is ongoing" + try: + logger.info("[update] entering local update...") + self._update_executor = _OTAUpdater( + version=version, + raw_url_base=url_base, + cookies_json=cookies_json, + boot_controller=self.boot_controller, + create_standby_cls=self.create_standby_cls, + control_flags=self.control_flags, + upper_otaproxy=self.proxy, ) - def rollback(self): - if self._lock.acquire(blocking=False): - try: - logger.info("[rollback] entering...") - self._rollback_executor = _OTARollbacker( - boot_controller=self.boot_controller - ) + self.last_failure_type = api_types.FailureType.NO_FAILURE + self.last_failure_reason = "" + self.last_failure_traceback = "" - # clear failure information on handling new rollback request - self.last_failure_type = api_types.FailureType.NO_FAILURE - self.last_failure_reason = "" - self.last_failure_traceback = "" - - # entering rollback - self.live_ota_status.set_ota_status(api_types.StatusOta.ROLLBACKING) - self._rollback_executor.execute() - # silently ignore overlapping request - except ota_errors.OTAError as e: - self._on_failure(e, api_types.StatusOta.ROLLBACK_FAILURE) - finally: - self._rollback_executor = None # type: ignore - self._lock.release() - else: - logger.warning( - "ignore incoming rollback request as local update/rollback is ongoing" + self.live_ota_status.set_ota_status(api_types.StatusOta.UPDATING) + self._update_executor.execute() + except ota_errors.OTAError as e: + self._on_failure(e, api_types.StatusOta.FAILURE) + finally: + self._update_executor = None + gc.collect() # trigger a forced gc + + def rollback(self): + try: + logger.info("[rollback] entering...") + self._rollback_executor = _OTARollbacker( + boot_controller=self.boot_controller ) + # clear failure information on handling new rollback request + self.last_failure_type = api_types.FailureType.NO_FAILURE + self.last_failure_reason = "" + 
self.last_failure_traceback = "" + + # entering rollback + self.live_ota_status.set_ota_status(api_types.StatusOta.ROLLBACKING) + self._rollback_executor.execute() + # NOTE(review): overlapping requests are no longer rejected in this method (no lock is taken here); presumably the caller serializes update/rollback requests - confirm + except ota_errors.OTAError as e: + self._on_failure(e, api_types.StatusOta.ROLLBACK_FAILURE) + finally: + self._rollback_executor = None # type: ignore + def status(self) -> api_types.StatusResponseEcuV2: live_ota_status = self.live_ota_status.get_ota_status() status_report = api_types.StatusResponseEcuV2( diff --git a/src/otaclient/app/update_stats.py b/src/otaclient/app/update_stats.py index f0530922a..433b7578d 100644 --- a/src/otaclient/app/update_stats.py +++ b/src/otaclient/app/update_stats.py @@ -44,6 +44,20 @@ class OperationRecord(BaseModel): errors: int = 0 +def _calculate_elapsed_time( + _started_timestamp: int, /, _current_time: int | None = None +) -> int: + """Calculate the elapsed time from <_started_timestamp> to current. + + The precision of elapsed time is in second. + NOTE(20240709): If the calculation result is zero due to interval less than + 1s, we round up the result to 1s. 
+ """ + if _current_time is None: + _current_time = int(time.time()) + return max(1, _current_time - _started_timestamp) + + class OTAUpdateStatsCollector: def __init__(self, *, collect_interval: int = 1) -> None: @@ -86,12 +100,12 @@ def total_elapsed_time(self) -> int: @property def delta_calculation_elapsed_time(self) -> int: if self._delta_calculation_started_timestamp == 0: - return 0 + return 0 # not yet started if self._delta_calculation_finished_timestamp == 0: - return int(time.time()) - self._delta_calculation_started_timestamp - return ( - self._delta_calculation_finished_timestamp - - self._delta_calculation_started_timestamp + return _calculate_elapsed_time(self._delta_calculation_started_timestamp) + return _calculate_elapsed_time( + self._delta_calculation_started_timestamp, + self._delta_calculation_finished_timestamp, ) @property @@ -99,17 +113,21 @@ def download_elapsed_time(self) -> int: if self._download_started_timestamp == 0: return 0 if self._download_finished_timestamp == 0: - return int(time.time()) - self._download_started_timestamp - return self._download_finished_timestamp - self._download_started_timestamp + return _calculate_elapsed_time(self._download_started_timestamp) + return _calculate_elapsed_time( + self._download_started_timestamp, + self._download_finished_timestamp, + ) @property def apply_update_elapsed_time(self) -> int: if self._apply_update_started_timestamp == 0: return 0 if self._apply_update_finished_timestamp == 0: - return int(time.time()) - self._apply_update_started_timestamp - return ( - self._apply_update_finished_timestamp - self._apply_update_started_timestamp + return _calculate_elapsed_time(self._apply_update_started_timestamp) + return _calculate_elapsed_time( + self._apply_update_started_timestamp, + self._apply_update_finished_timestamp, ) def _stats_collector(self): diff --git a/tests/test_otaclient/test_create_standby.py b/tests/test_otaclient/test_create_standby.py index 32f38ec96..c73ff5af4 100644 --- 
a/tests/test_otaclient/test_create_standby.py +++ b/tests/test_otaclient/test_create_standby.py @@ -23,7 +23,10 @@ from pytest_mock import MockerFixture from otaclient.app.boot_control import BootControllerProtocol +from otaclient.app.configs import BaseConfig from otaclient.app.configs import config as otaclient_cfg +from otaclient.app.create_standby.rebuild_mode import RebuildMode +from otaclient.app.ota_client import OTAClientControlFlags, _OTAUpdater from tests.conftest import TestConfiguration as cfg from tests.utils import SlotMeta, compare_dir @@ -63,7 +66,6 @@ def prepare_ab_slots(self, tmp_path: Path, ab_slots: SlotMeta): @pytest.fixture(autouse=True) def mock_setup(self, mocker: MockerFixture, prepare_ab_slots): - from otaclient.app.configs import BaseConfig # ------ mock boot_controller ------ # self._boot_control = typing.cast( @@ -80,9 +82,6 @@ def mock_setup(self, mocker: MockerFixture, prepare_ab_slots): mocker.patch(f"{cfg.CREATE_STANDBY_MODULE_PATH}.rebuild_mode.cfg", _cfg) def test_update_with_create_standby_RebuildMode(self, mocker: MockerFixture): - from otaclient.app.create_standby.rebuild_mode import RebuildMode - from otaclient.app.ota_client import OTAClientControlFlags, _OTAUpdater - # ------ execution ------ # otaclient_control_flags = typing.cast( OTAClientControlFlags, mocker.MagicMock(spec=OTAClientControlFlags) @@ -116,6 +115,9 @@ def test_update_with_create_standby_RebuildMode(self, mocker: MockerFixture): assert collector.processed_files_size assert collector.downloaded_files_num assert collector.downloaded_files_size + assert collector.download_elapsed_time + assert collector.delta_calculation_elapsed_time + assert collector.total_elapsed_time assert collector.apply_update_elapsed_time # --- check slot creating result, ensure slot_a and slot_b is the same --- # diff --git a/tests/test_otaclient/test_ota_client.py b/tests/test_otaclient/test_ota_client.py index 960268955..9aff426a3 100644 --- a/tests/test_otaclient/test_ota_client.py 
+++ b/tests/test_otaclient/test_ota_client.py @@ -221,10 +221,6 @@ def mock_setup(self, mocker: pytest_mock.MockerFixture): self.control_flags = typing.cast( OTAClientControlFlags, mocker.MagicMock(spec=OTAClientControlFlags) ) - # NOTE: threading.Lock is an alias, so we specs it with its instance - self.ota_lock = typing.cast( - threading.Lock, mocker.MagicMock(spec=threading.Lock()) - ) self.ota_updater = typing.cast(_OTAUpdater, mocker.MagicMock(spec=_OTAUpdater)) self.boot_controller = typing.cast( @@ -247,8 +243,6 @@ def mock_setup(self, mocker: pytest_mock.MockerFixture): mocker.patch( f"{cfg.OTACLIENT_MODULE_PATH}._OTAUpdater", return_value=self.ota_updater ) - # inject lock into otaclient - self.ota_client._lock = self.ota_lock # inject otaclient version self.ota_client.OTACLIENT_VERSION = self.OTACLIENT_VERSION @@ -261,9 +255,7 @@ def test_update_normal_finished(self): ) # --- assert on update finished(before reboot) --- # - self.ota_lock.acquire.assert_called_once_with(blocking=False) self.ota_updater.execute.assert_called_once() - self.ota_lock.release.assert_called_once() assert ( self.ota_client.live_ota_status.get_ota_status() == api_types.StatusOta.UPDATING @@ -282,9 +274,7 @@ def test_update_interrupted(self): ) # --- assertion on interrupted OTA update --- # - self.ota_lock.acquire.assert_called_once_with(blocking=False) self.ota_updater.execute.assert_called_once() - self.ota_lock.release.assert_called_once() assert ( self.ota_client.live_ota_status.get_ota_status() @@ -292,10 +282,6 @@ def test_update_interrupted(self): ) assert self.ota_client.last_failure_type == api_types.FailureType.RECOVERABLE - def test_rollback(self): - # TODO - pass - def test_status_not_in_update(self): # --- query status --- # _status = self.ota_client.status()