Skip to content

Commit

Permalink
CM-27209 - Add latest CLI version check (#276)
Browse files Browse the repository at this point in the history
  • Loading branch information
MarshalX authored Jan 28, 2025
1 parent 1c4d549 commit a4c7e86
Show file tree
Hide file tree
Showing 7 changed files with 448 additions and 4 deletions.
4 changes: 3 additions & 1 deletion .github/workflows/tests_full.yml
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,9 @@ jobs:
run: poetry install

- name: Run executable test
if: matrix.python-version != '3.13' # we will migrate pyinstaller to 3.13 later
# we care about the one Python version that will be used to build the executable
# TODO(MarshalX): upgrade to Python 3.13
if: matrix.python-version == '3.12'
run: |
poetry run pyinstaller pyinstaller.spec
./dist/cycode-cli version
Expand Down
32 changes: 31 additions & 1 deletion cycode/cli/commands/main_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,15 @@

import click

from cycode import __version__
from cycode.cli.commands.ai_remediation.ai_remediation_command import ai_remediation_command
from cycode.cli.commands.auth.auth_command import auth_command
from cycode.cli.commands.configure.configure_command import configure_command
from cycode.cli.commands.ignore.ignore_command import ignore_command
from cycode.cli.commands.report.report_command import report_command
from cycode.cli.commands.scan.scan_command import scan_command
from cycode.cli.commands.status.status_command import status_command
from cycode.cli.commands.version.version_checker import version_checker
from cycode.cli.commands.version.version_command import version_command
from cycode.cli.consts import (
CLI_CONTEXT_SETTINGS,
Expand Down Expand Up @@ -48,6 +50,12 @@
default=False,
help='Do not show the progress meter.',
)
@click.option(
'--no-update-notifier',
is_flag=True,
default=False,
help='Do not check CLI for updates.',
)
@click.option(
'--output',
'-o',
Expand All @@ -63,7 +71,12 @@
)
@click.pass_context
def main_cli(
context: click.Context, verbose: bool, no_progress_meter: bool, output: str, user_agent: Optional[str]
context: click.Context,
verbose: bool,
no_progress_meter: bool,
no_update_notifier: bool,
output: str,
user_agent: Optional[str],
) -> None:
init_sentry()
add_breadcrumb('cycode')
Expand All @@ -85,3 +98,20 @@ def main_cli(
if user_agent:
user_agent_option = UserAgentOptionScheme().loads(user_agent)
CycodeClientBase.enrich_user_agent(user_agent_option.user_agent_suffix)

if not no_update_notifier:
context.call_on_close(lambda: check_latest_version_on_close())


@click.pass_context
def check_latest_version_on_close(context: click.Context) -> None:
output = context.obj.get('output')
# don't print anything if the output is JSON
if output == 'json':
return

# we always want to check the latest version for "version" and "status" commands
should_use_cache = context.invoked_subcommand not in {'version', 'status'}
version_checker.check_and_notify_update(
current_version=__version__, use_color=context.color, use_cache=should_use_cache
)
209 changes: 209 additions & 0 deletions cycode/cli/commands/version/version_checker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,209 @@
import os
import re
import time
from pathlib import Path
from typing import List, Optional, Tuple

import click

from cycode.cli.user_settings.configuration_manager import ConfigurationManager
from cycode.cli.utils.path_utils import get_file_content
from cycode.cyclient.cycode_client_base import CycodeClientBase


def _compare_versions(
current_parts: List[int],
latest_parts: List[int],
current_is_pre: bool,
latest_is_pre: bool,
latest_version: str,
) -> Optional[str]:
"""Compare version numbers and determine if an update is needed.
Implements version comparison logic with special handling for pre-release versions:
- Won't suggest downgrading from stable to pre-release
- Will suggest upgrading from pre-release to stable of the same version
Args:
current_parts: List of numeric version components for the current version
latest_parts: List of numeric version components for the latest version
current_is_pre: Whether the current version is pre-release
latest_is_pre: Whether the latest version is pre-release
latest_version: The full latest version string
Returns:
str | None: The latest version string if an update is recommended,
None if no update is needed
"""
# If current is stable and latest is pre-release, don't suggest update
if not current_is_pre and latest_is_pre:
return None

# Compare version numbers
for current, latest in zip(current_parts, latest_parts):
if latest > current:
return latest_version
if current > latest:
return None

# If all numbers are equal, suggest update if current is pre-release and latest is stable
if current_is_pre and not latest_is_pre:
return latest_version

return None


class VersionChecker(CycodeClientBase):
PYPI_API_URL = 'https://pypi.org/pypi'
PYPI_PACKAGE_NAME = 'cycode'

GIT_CHANGELOG_URL_PREFIX = 'https://github.com/cycodehq/cycode-cli/releases/tag/v'

DAILY = 24 * 60 * 60 # 24 hours in seconds
WEEKLY = DAILY * 7

def __init__(self) -> None:
"""Initialize the VersionChecker.
Sets up the version checker with PyPI API URL and configure the cache file location
using the global configuration directory.
"""
super().__init__(self.PYPI_API_URL)

configuration_manager = ConfigurationManager()
config_dir = configuration_manager.global_config_file_manager.get_config_directory_path()
self.cache_file = Path(config_dir) / '.version_check'

def get_latest_version(self) -> Optional[str]:
"""Fetch the latest version of the package from PyPI.
Makes an HTTP request to PyPI's JSON API to get the latest version information.
Returns:
str | None: The latest version string if successful, None if the request fails
or the version information is not available.
"""
try:
response = self.get(f'{self.PYPI_PACKAGE_NAME}/json')
data = response.json()
return data.get('info', {}).get('version')
except Exception:
return None

@staticmethod
def _parse_version(version: str) -> Tuple[List[int], bool]:
"""Parse version string into components and identify if it's a pre-release.
Extracts numeric version components and determines if the version is a pre-release
by checking for 'dev' in the version string.
Args:
version: The version string to parse (e.g., '1.2.3' or '1.2.3dev4')
Returns:
tuple: A tuple containing:
- List[int]: List of numeric version components
- bool: True if this is a pre-release version, False otherwise
"""
version_parts = [int(x) for x in re.findall(r'\d+', version)]
is_prerelease = 'dev' in version

return version_parts, is_prerelease

def _should_check_update(self, is_prerelease: bool) -> bool:
"""Determine if an update check should be performed based on the last check time.
Implements a time-based caching mechanism where update checks are performed:
- Daily for pre-release versions
- Weekly for stable versions
Args:
is_prerelease: Whether the current version is a pre-release
Returns:
bool: True if an update check should be performed, False otherwise
"""
if not os.path.exists(self.cache_file):
return True

file_content = get_file_content(self.cache_file)
if file_content is None:
return True

try:
last_check = float(file_content.strip())
except ValueError:
return True

duration = self.DAILY if is_prerelease else self.WEEKLY
return time.time() - last_check >= duration

def _update_last_check(self) -> None:
"""Update the timestamp of the last update check.
Creates the cache directory if it doesn't exist and write the current timestamp
to the cache file. Silently handle any IO errors that might occur during the process.
"""
try:
os.makedirs(os.path.dirname(self.cache_file), exist_ok=True)
with open(self.cache_file, 'w', encoding='UTF-8') as f:
f.write(str(time.time()))
except IOError:
pass

def check_for_update(self, current_version: str, use_cache: bool = True) -> Optional[str]:
"""Check if an update is available for the current version.
Respects the update check frequency (daily/weekly) based on the version type
Args:
current_version: The current version string of the CLI
use_cache: If True, use the cached timestamp to determine if an update check is needed
Returns:
str | None: The latest version string if an update is recommended,
None if no update is needed or if check should be skipped
"""
current_parts, current_is_pre = self._parse_version(current_version)

# Check if we should perform the update check based on frequency
if use_cache and not self._should_check_update(current_is_pre):
return None

latest_version = self.get_latest_version()
if not latest_version:
return None

# Update the last check timestamp
use_cache and self._update_last_check()

latest_parts, latest_is_pre = self._parse_version(latest_version)
return _compare_versions(current_parts, latest_parts, current_is_pre, latest_is_pre, latest_version)

def check_and_notify_update(self, current_version: str, use_color: bool = True, use_cache: bool = True) -> None:
"""Check for updates and display a notification if a new version is available.
Performs the version check and displays a formatted message with update instructions
if a newer version is available. The message includes:
- Current and new version numbers
- Link to the changelog
- Command to perform the update
Args:
current_version: Current version of the CLI
use_color: If True, use colored output in the terminal
use_cache: If True, use the cached timestamp to determine if an update check is needed
"""
latest_version = self.check_for_update(current_version, use_cache)
should_update = bool(latest_version)
if should_update:
update_message = (
'\nNew version of cycode available! '
f"{click.style(current_version, fg='yellow')}{click.style(latest_version, fg='bright_blue')}\n"
f"Changelog: {click.style(f'{self.GIT_CHANGELOG_URL_PREFIX}{latest_version}', fg='bright_blue')}\n"
f"Run {click.style('pip install --upgrade cycode', fg='green')} to update\n"
)
click.echo(update_message, color=use_color)


version_checker = VersionChecker()
7 changes: 5 additions & 2 deletions cycode/cli/utils/path_utils.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
import json
import os
from functools import lru_cache
from typing import AnyStr, List, Optional
from typing import TYPE_CHECKING, AnyStr, List, Optional, Union

import click
from binaryornot.helpers import is_binary_string

from cycode.cyclient import logger

if TYPE_CHECKING:
from os import PathLike


@lru_cache(maxsize=None)
def is_sub_path(path: str, sub_path: str) -> bool:
Expand Down Expand Up @@ -73,7 +76,7 @@ def join_paths(path: str, filename: str) -> str:
return os.path.join(path, filename)


def get_file_content(file_path: str) -> Optional[AnyStr]:
def get_file_content(file_path: Union[str, 'PathLike']) -> Optional[AnyStr]:
try:
with open(file_path, 'r', encoding='UTF-8') as f:
return f.read()
Expand Down
71 changes: 71 additions & 0 deletions tests/cli/commands/test_check_latest_version_on_close.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
from unittest.mock import patch

import pytest
from click.testing import CliRunner

from cycode import __version__
from cycode.cli.commands.main_cli import main_cli
from cycode.cli.commands.version.version_checker import VersionChecker
from tests.conftest import CLI_ENV_VARS

_NEW_LATEST_VERSION = '999.0.0' # Simulate a newer version available
_UPDATE_MESSAGE_PART = 'new version of cycode available'


@patch.object(VersionChecker, 'check_for_update')
def test_version_check_with_json_output(mock_check_update: patch) -> None:
# When output is JSON, version check should be skipped
mock_check_update.return_value = _NEW_LATEST_VERSION

args = ['--output', 'json', 'version']
result = CliRunner().invoke(main_cli, args, env=CLI_ENV_VARS)

# Version check message should not be present in JSON output
assert _UPDATE_MESSAGE_PART not in result.output.lower()
mock_check_update.assert_not_called()


@pytest.fixture
def mock_auth_info() -> 'patch':
# Mock the authorization info to avoid API calls
with patch('cycode.cli.commands.status.status_command.get_authorization_info', return_value=None) as mock:
yield mock


@pytest.mark.parametrize('command', ['version', 'status'])
@patch.object(VersionChecker, 'check_for_update')
def test_version_check_for_special_commands(mock_check_update: patch, mock_auth_info: patch, command: str) -> None:
# Version and status commands should always check the version without cache
mock_check_update.return_value = _NEW_LATEST_VERSION

result = CliRunner().invoke(main_cli, [command], env=CLI_ENV_VARS)

# Version information should be present in output
assert _UPDATE_MESSAGE_PART in result.output.lower()
# Version check must be called without a cache
mock_check_update.assert_called_once_with(__version__, False)


@patch.object(VersionChecker, 'check_for_update')
def test_version_check_with_text_output(mock_check_update: patch) -> None:
# Regular commands with text output should check the version using cache
mock_check_update.return_value = _NEW_LATEST_VERSION

args = ['version']
result = CliRunner().invoke(main_cli, args, env=CLI_ENV_VARS)

# Version check message should be present in JSON output
assert _UPDATE_MESSAGE_PART in result.output.lower()


@patch.object(VersionChecker, 'check_for_update')
def test_version_check_disabled(mock_check_update: patch) -> None:
# When --no-update-notifier is used, version check should be skipped
mock_check_update.return_value = _NEW_LATEST_VERSION

args = ['--no-update-notifier', 'version']
result = CliRunner().invoke(main_cli, args, env=CLI_ENV_VARS)

# Version check message should not be present
assert _UPDATE_MESSAGE_PART not in result.output.lower()
mock_check_update.assert_not_called()
Empty file.
Loading

0 comments on commit a4c7e86

Please sign in to comment.