From a4c7e860a7060c94e6da3817f29e9cdcaa91867d Mon Sep 17 00:00:00 2001 From: Ilya Siamionau Date: Tue, 28 Jan 2025 16:29:46 +0100 Subject: [PATCH] CM-27209 - Add latest CLI version check (#276) --- .github/workflows/tests_full.yml | 4 +- cycode/cli/commands/main_cli.py | 32 ++- .../cli/commands/version/version_checker.py | 209 ++++++++++++++++++ cycode/cli/utils/path_utils.py | 7 +- .../test_check_latest_version_on_close.py | 71 ++++++ tests/cli/commands/version/__init__.py | 0 .../commands/version/test_version_checker.py | 129 +++++++++++ 7 files changed, 448 insertions(+), 4 deletions(-) create mode 100644 cycode/cli/commands/version/version_checker.py create mode 100644 tests/cli/commands/test_check_latest_version_on_close.py create mode 100644 tests/cli/commands/version/__init__.py create mode 100644 tests/cli/commands/version/test_version_checker.py diff --git a/.github/workflows/tests_full.yml b/.github/workflows/tests_full.yml index a760d617..985a3d36 100644 --- a/.github/workflows/tests_full.yml +++ b/.github/workflows/tests_full.yml @@ -65,7 +65,9 @@ jobs: run: poetry install - name: Run executable test - if: matrix.python-version != '3.13' # we will migrate pyinstaller to 3.13 later + # we care about the one Python version that will be used to build the executable + # TODO(MarshalX): upgrade to Python 3.13 + if: matrix.python-version == '3.12' run: | poetry run pyinstaller pyinstaller.spec ./dist/cycode-cli version diff --git a/cycode/cli/commands/main_cli.py b/cycode/cli/commands/main_cli.py index f97e0749..59b8625f 100644 --- a/cycode/cli/commands/main_cli.py +++ b/cycode/cli/commands/main_cli.py @@ -3,6 +3,7 @@ import click +from cycode import __version__ from cycode.cli.commands.ai_remediation.ai_remediation_command import ai_remediation_command from cycode.cli.commands.auth.auth_command import auth_command from cycode.cli.commands.configure.configure_command import configure_command @@ -10,6 +11,7 @@ from cycode.cli.commands.report.report_command import report_command from cycode.cli.commands.scan.scan_command import scan_command from cycode.cli.commands.status.status_command import status_command +from cycode.cli.commands.version.version_checker import version_checker from cycode.cli.commands.version.version_command import version_command from cycode.cli.consts import ( CLI_CONTEXT_SETTINGS, @@ -48,6 +50,12 @@ default=False, help='Do not show the progress meter.', ) +@click.option( + '--no-update-notifier', + is_flag=True, + default=False, + help='Do not check CLI for updates.', +) @click.option( '--output', '-o', @@ -63,7 +71,12 @@ ) @click.pass_context def main_cli( - context: click.Context, verbose: bool, no_progress_meter: bool, output: str, user_agent: Optional[str] + context: click.Context, + verbose: bool, + no_progress_meter: bool, + no_update_notifier: bool, + output: str, + user_agent: Optional[str], ) -> None: init_sentry() add_breadcrumb('cycode') @@ -85,3 +98,20 @@ def main_cli( if user_agent: user_agent_option = UserAgentOptionScheme().loads(user_agent) CycodeClientBase.enrich_user_agent(user_agent_option.user_agent_suffix) + + if not no_update_notifier: + context.call_on_close(lambda: check_latest_version_on_close()) + + +@click.pass_context +def check_latest_version_on_close(context: click.Context) -> None: + output = context.obj.get('output') + # don't print anything if the output is JSON + if output == 'json': + return + + # we always want to check the latest version for "version" and "status" commands + should_use_cache = context.invoked_subcommand not in {'version', 'status'} + version_checker.check_and_notify_update( + current_version=__version__, use_color=context.color, use_cache=should_use_cache + ) diff --git a/cycode/cli/commands/version/version_checker.py b/cycode/cli/commands/version/version_checker.py new file mode 100644 index 00000000..c5ec9d4f --- /dev/null +++ b/cycode/cli/commands/version/version_checker.py @@ -0,0 +1,209 @@ +import os +import re +import time +from pathlib import Path +from typing import List, Optional, Tuple + +import click + +from cycode.cli.user_settings.configuration_manager import ConfigurationManager +from cycode.cli.utils.path_utils import get_file_content +from cycode.cyclient.cycode_client_base import CycodeClientBase + + +def _compare_versions( + current_parts: List[int], + latest_parts: List[int], + current_is_pre: bool, + latest_is_pre: bool, + latest_version: str, +) -> Optional[str]: + """Compare version numbers and determine if an update is needed. + + Implements version comparison logic with special handling for pre-release versions: + - Won't suggest downgrading from stable to pre-release + - Will suggest upgrading from pre-release to stable of the same version + + Args: + current_parts: List of numeric version components for the current version + latest_parts: List of numeric version components for the latest version + current_is_pre: Whether the current version is pre-release + latest_is_pre: Whether the latest version is pre-release + latest_version: The full latest version string + + Returns: + str | None: The latest version string if an update is recommended, + None if no update is needed + """ + # If current is stable and latest is pre-release, don't suggest update + if not current_is_pre and latest_is_pre: + return None + + # Compare version numbers + for current, latest in zip(current_parts, latest_parts): + if latest > current: + return latest_version + if current > latest: + return None + + # If all numbers are equal, suggest update if current is pre-release and latest is stable + if current_is_pre and not latest_is_pre: + return latest_version + + return None + + +class VersionChecker(CycodeClientBase): + PYPI_API_URL = 'https://pypi.org/pypi' + PYPI_PACKAGE_NAME = 'cycode' + + GIT_CHANGELOG_URL_PREFIX = 'https://github.com/cycodehq/cycode-cli/releases/tag/v' + + DAILY = 24 * 60 * 60 # 24 hours in seconds + WEEKLY = DAILY * 7 + + def __init__(self) -> None: + """Initialize the VersionChecker. + + Sets up the version checker with PyPI API URL and configure the cache file location + using the global configuration directory. + """ + super().__init__(self.PYPI_API_URL) + + configuration_manager = ConfigurationManager() + config_dir = configuration_manager.global_config_file_manager.get_config_directory_path() + self.cache_file = Path(config_dir) / '.version_check' + + def get_latest_version(self) -> Optional[str]: + """Fetch the latest version of the package from PyPI. + + Makes an HTTP request to PyPI's JSON API to get the latest version information. + + Returns: + str | None: The latest version string if successful, None if the request fails + or the version information is not available. + """ + try: + response = self.get(f'{self.PYPI_PACKAGE_NAME}/json') + data = response.json() + return data.get('info', {}).get('version') + except Exception: + return None + + @staticmethod + def _parse_version(version: str) -> Tuple[List[int], bool]: + """Parse version string into components and identify if it's a pre-release. + + Extracts numeric version components and determines if the version is a pre-release + by checking for 'dev' in the version string. + + Args: + version: The version string to parse (e.g., '1.2.3' or '1.2.3dev4') + + Returns: + tuple: A tuple containing: + - List[int]: List of numeric version components + - bool: True if this is a pre-release version, False otherwise + """ + version_parts = [int(x) for x in re.findall(r'\d+', version)] + is_prerelease = 'dev' in version + + return version_parts, is_prerelease + + def _should_check_update(self, is_prerelease: bool) -> bool: + """Determine if an update check should be performed based on the last check time. + + Implements a time-based caching mechanism where update checks are performed: + - Daily for pre-release versions + - Weekly for stable versions + + Args: + is_prerelease: Whether the current version is a pre-release + + Returns: + bool: True if an update check should be performed, False otherwise + """ + if not os.path.exists(self.cache_file): + return True + + file_content = get_file_content(self.cache_file) + if file_content is None: + return True + + try: + last_check = float(file_content.strip()) + except ValueError: + return True + + duration = self.DAILY if is_prerelease else self.WEEKLY + return time.time() - last_check >= duration + + def _update_last_check(self) -> None: + """Update the timestamp of the last update check. + + Creates the cache directory if it doesn't exist and write the current timestamp + to the cache file. Silently handle any IO errors that might occur during the process. + """ + try: + os.makedirs(os.path.dirname(self.cache_file), exist_ok=True) + with open(self.cache_file, 'w', encoding='UTF-8') as f: + f.write(str(time.time())) + except IOError: + pass + + def check_for_update(self, current_version: str, use_cache: bool = True) -> Optional[str]: + """Check if an update is available for the current version. + + Respects the update check frequency (daily/weekly) based on the version type + + Args: + current_version: The current version string of the CLI + use_cache: If True, use the cached timestamp to determine if an update check is needed + + Returns: + str | None: The latest version string if an update is recommended, + None if no update is needed or if check should be skipped + """ + current_parts, current_is_pre = self._parse_version(current_version) + + # Check if we should perform the update check based on frequency + if use_cache and not self._should_check_update(current_is_pre): + return None + + latest_version = self.get_latest_version() + if not latest_version: + return None + + # Update the last check timestamp + use_cache and self._update_last_check() + + latest_parts, latest_is_pre = self._parse_version(latest_version) + return _compare_versions(current_parts, latest_parts, current_is_pre, latest_is_pre, latest_version) + + def check_and_notify_update(self, current_version: str, use_color: bool = True, use_cache: bool = True) -> None: + """Check for updates and display a notification if a new version is available. + + Performs the version check and displays a formatted message with update instructions + if a newer version is available. The message includes: + - Current and new version numbers + - Link to the changelog + - Command to perform the update + + Args: + current_version: Current version of the CLI + use_color: If True, use colored output in the terminal + use_cache: If True, use the cached timestamp to determine if an update check is needed + """ + latest_version = self.check_for_update(current_version, use_cache) + should_update = bool(latest_version) + if should_update: + update_message = ( + '\nNew version of cycode available! ' + f"{click.style(current_version, fg='yellow')} → {click.style(latest_version, fg='bright_blue')}\n" + f"Changelog: {click.style(f'{self.GIT_CHANGELOG_URL_PREFIX}{latest_version}', fg='bright_blue')}\n" + f"Run {click.style('pip install --upgrade cycode', fg='green')} to update\n" + ) + click.echo(update_message, color=use_color) + + +version_checker = VersionChecker() diff --git a/cycode/cli/utils/path_utils.py b/cycode/cli/utils/path_utils.py index 4f8be3f1..a2d8816b 100644 --- a/cycode/cli/utils/path_utils.py +++ b/cycode/cli/utils/path_utils.py @@ -1,13 +1,16 @@ import json import os from functools import lru_cache -from typing import AnyStr, List, Optional +from typing import TYPE_CHECKING, AnyStr, List, Optional, Union import click from binaryornot.helpers import is_binary_string from cycode.cyclient import logger +if TYPE_CHECKING: + from os import PathLike + @lru_cache(maxsize=None) def is_sub_path(path: str, sub_path: str) -> bool: @@ -73,7 +76,7 @@ def join_paths(path: str, filename: str) -> str: return os.path.join(path, filename) -def get_file_content(file_path: str) -> Optional[AnyStr]: +def get_file_content(file_path: Union[str, 'PathLike']) -> Optional[AnyStr]: try: with open(file_path, 'r', encoding='UTF-8') as f: return f.read() diff --git a/tests/cli/commands/test_check_latest_version_on_close.py b/tests/cli/commands/test_check_latest_version_on_close.py new file mode 100644 index 00000000..189973b4 --- /dev/null +++ b/tests/cli/commands/test_check_latest_version_on_close.py @@ -0,0 +1,71 @@ +from unittest.mock import patch + +import pytest +from click.testing import CliRunner + +from cycode import __version__ +from cycode.cli.commands.main_cli import main_cli +from cycode.cli.commands.version.version_checker import VersionChecker +from tests.conftest import CLI_ENV_VARS + +_NEW_LATEST_VERSION = '999.0.0' # Simulate a newer version available +_UPDATE_MESSAGE_PART = 'new version of cycode available' + + +@patch.object(VersionChecker, 'check_for_update') +def test_version_check_with_json_output(mock_check_update: patch) -> None: + # When output is JSON, version check should be skipped + mock_check_update.return_value = _NEW_LATEST_VERSION + + args = ['--output', 'json', 'version'] + result = CliRunner().invoke(main_cli, args, env=CLI_ENV_VARS) + + # Version check message should not be present in JSON output + assert _UPDATE_MESSAGE_PART not in result.output.lower() + mock_check_update.assert_not_called() + + +@pytest.fixture +def mock_auth_info() -> 'patch': + # Mock the authorization info to avoid API calls + with patch('cycode.cli.commands.status.status_command.get_authorization_info', return_value=None) as mock: + yield mock + + +@pytest.mark.parametrize('command', ['version', 'status']) +@patch.object(VersionChecker, 'check_for_update') +def test_version_check_for_special_commands(mock_check_update: patch, mock_auth_info: patch, command: str) -> None: + # Version and status commands should always check the version without cache + mock_check_update.return_value = _NEW_LATEST_VERSION + + result = CliRunner().invoke(main_cli, [command], env=CLI_ENV_VARS) + + # Version information should be present in output + assert _UPDATE_MESSAGE_PART in result.output.lower() + # Version check must be called without a cache + mock_check_update.assert_called_once_with(__version__, False) + + +@patch.object(VersionChecker, 'check_for_update') +def test_version_check_with_text_output(mock_check_update: patch) -> None: + # Regular commands with text output should check the version using cache + mock_check_update.return_value = _NEW_LATEST_VERSION + + args = ['version'] + result = CliRunner().invoke(main_cli, args, env=CLI_ENV_VARS) + + # Version check message should be present in JSON output + assert _UPDATE_MESSAGE_PART in result.output.lower() + + +@patch.object(VersionChecker, 'check_for_update') +def test_version_check_disabled(mock_check_update: patch) -> None: + # When --no-update-notifier is used, version check should be skipped + mock_check_update.return_value = _NEW_LATEST_VERSION + + args = ['--no-update-notifier', 'version'] + result = CliRunner().invoke(main_cli, args, env=CLI_ENV_VARS) + + # Version check message should not be present + assert _UPDATE_MESSAGE_PART not in result.output.lower() + mock_check_update.assert_not_called() diff --git a/tests/cli/commands/version/__init__.py b/tests/cli/commands/version/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/cli/commands/version/test_version_checker.py b/tests/cli/commands/version/test_version_checker.py new file mode 100644 index 00000000..eb0b9bd9 --- /dev/null +++ b/tests/cli/commands/version/test_version_checker.py @@ -0,0 +1,129 @@ +import time +from typing import TYPE_CHECKING, Optional +from unittest.mock import MagicMock, patch + +import pytest + +if TYPE_CHECKING: + from pathlib import Path + +from cycode.cli.commands.version.version_checker import VersionChecker + + +@pytest.fixture +def version_checker() -> 'VersionChecker': + return VersionChecker() + + +@pytest.fixture +def version_checker_cached(tmp_path: 'Path', version_checker: 'VersionChecker') -> 'VersionChecker': + version_checker.cache_file = tmp_path / '.version_check' + return version_checker + + +class TestVersionChecker: + def test_parse_version_stable(self, version_checker: 'VersionChecker') -> None: + version = '1.2.3' + parts, is_pre = version_checker._parse_version(version) + + assert parts == [1, 2, 3] + assert not is_pre + + def test_parse_version_prerelease(self, version_checker: 'VersionChecker') -> None: + version = '1.2.3dev4' + parts, is_pre = version_checker._parse_version(version) + + assert parts == [1, 2, 3, 4] + assert is_pre + + def test_parse_version_complex(self, version_checker: 'VersionChecker') -> None: + version = '1.2.3.dev4.post5' + parts, is_pre = version_checker._parse_version(version) + + assert parts == [1, 2, 3, 4, 5] + assert is_pre + + def test_should_check_update_no_cache(self, version_checker_cached: 'VersionChecker') -> None: + assert version_checker_cached._should_check_update(is_prerelease=False) is True + + def test_should_check_update_invalid_cache(self, version_checker_cached: 'VersionChecker') -> None: + version_checker_cached.cache_file.write_text('invalid') + assert version_checker_cached._should_check_update(is_prerelease=False) is True + + def test_should_check_update_expired(self, version_checker_cached: 'VersionChecker') -> None: + # Write a timestamp from 8 days ago + old_time = time.time() - (8 * 24 * 60 * 60) + version_checker_cached.cache_file.write_text(str(old_time)) + + assert version_checker_cached._should_check_update(is_prerelease=False) is True + + def test_should_check_update_not_expired(self, version_checker_cached: 'VersionChecker') -> None: + # Write a recent timestamp + version_checker_cached.cache_file.write_text(str(time.time())) + + assert version_checker_cached._should_check_update(is_prerelease=False) is False + + def test_should_check_update_prerelease_daily(self, version_checker_cached: 'VersionChecker') -> None: + # Write a timestamp from 25 hours ago + old_time = time.time() - (25 * 60 * 60) + version_checker_cached.cache_file.write_text(str(old_time)) + + assert version_checker_cached._should_check_update(is_prerelease=True) is True + + @pytest.mark.parametrize( + 'current_version, latest_version, expected_result', + [ + # Stable version comparisons + ('1.2.3', '1.2.4', '1.2.4'), # Higher patch version + ('1.2.3', '1.3.0', '1.3.0'), # Higher minor version + ('1.2.3', '2.0.0', '2.0.0'), # Higher major version + ('1.2.3', '1.2.3', None), # Same version + ('1.2.4', '1.2.3', None), # Current higher than latest + # Pre-release version comparisons + ('1.2.3dev1', '1.2.3', '1.2.3'), # Pre-release to stable + ('1.2.3', '1.2.4dev1', None), # Stable to pre-release + ('1.2.3dev1', '1.2.3dev2', '1.2.3dev2'), # Pre-release to higher pre-release + ('1.2.3dev2', '1.2.3dev1', None), # Pre-release to lower pre-release + # Edge cases + ('1.0.0dev1', '1.0.0', '1.0.0'), # Pre-release to same version stable + ('2.0.0', '2.0.0dev1', None), # Stable to same version pre-release + ('2.2.1.dev4', '2.2.0', None), # Pre-release to lower stable + ], + ) + def test_check_for_update_scenarios( + self, + version_checker_cached: 'VersionChecker', + current_version: str, + latest_version: str, + expected_result: Optional[str], + ) -> None: + with patch.multiple( + version_checker_cached, + _should_check_update=MagicMock(return_value=True), + get_latest_version=MagicMock(return_value=latest_version), + _update_last_check=MagicMock(), + ): + result = version_checker_cached.check_for_update(current_version) + assert result == expected_result + + def test_get_latest_version_success(self, version_checker: 'VersionChecker') -> None: + mock_response = MagicMock() + mock_response.json.return_value = {'info': {'version': '1.2.3'}} + with patch.object(version_checker, 'get', return_value=mock_response): + assert version_checker.get_latest_version() == '1.2.3' + + def test_get_latest_version_failure(self, version_checker: 'VersionChecker') -> None: + with patch.object(version_checker, 'get', side_effect=Exception): + assert version_checker.get_latest_version() is None + + def test_update_last_check(self, version_checker_cached: 'VersionChecker') -> None: + version_checker_cached._update_last_check() + assert version_checker_cached.cache_file.exists() + + timestamp = float(version_checker_cached.cache_file.read_text().strip()) + assert abs(timestamp - time.time()) < 1 # Should be within 1 second + + def test_update_last_check_permission_error(self, version_checker_cached: 'VersionChecker') -> None: + with patch('builtins.open', side_effect=IOError): + version_checker_cached._update_last_check() + # Should not raise an exception