diff --git a/poetry.lock b/poetry.lock index 5de7067..0da87ee 100644 --- a/poetry.lock +++ b/poetry.lock @@ -188,24 +188,6 @@ files = [ [package.extras] test = ["pytest (>=6)"] -[[package]] -name = "factory-boy" -version = "3.3.1" -description = "A versatile test fixtures replacement based on thoughtbot's factory_bot for Ruby." -optional = false -python-versions = ">=3.8" -files = [ - {file = "factory_boy-3.3.1-py2.py3-none-any.whl", hash = "sha256:7b1113c49736e1e9995bc2a18f4dbf2c52cf0f841103517010b1d825712ce3ca"}, - {file = "factory_boy-3.3.1.tar.gz", hash = "sha256:8317aa5289cdfc45f9cae570feb07a6177316c82e34d14df3c2e1f22f26abef0"}, -] - -[package.dependencies] -Faker = ">=0.7.0" - -[package.extras] -dev = ["Django", "Pillow", "SQLAlchemy", "coverage", "flake8", "isort", "mongoengine", "mongomock", "mypy", "tox", "wheel (>=0.32.0)", "zest.releaser[recommended]"] -doc = ["Sphinx", "sphinx-rtd-theme", "sphinxcontrib-spelling"] - [[package]] name = "faker" version = "28.1.0" @@ -787,4 +769,4 @@ test = ["covdefaults (>=2.3)", "coverage (>=7.2.7)", "coverage-enable-subprocess [metadata] lock-version = "2.0" python-versions = "^3.8.18" -content-hash = "dce910997fc0b722653f85593398486275d2635a45bd110e9754b5fc4a688d95" +content-hash = "0c64047990b087fcb24a87f53b1d7659eb73839b1439e5bb6a3534cf21bdc637" diff --git a/pyproject.toml b/pyproject.toml index 66fbfcf..793d44a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -45,7 +45,6 @@ ruff = "^0.5.4" pre-commit = "3.5.0" faker = "^28.1.0" -factory-boy = "^3.3.1" [build-system] requires = ["poetry-core"] build-backend = "poetry.core.masonry.api" diff --git a/tests/factories/__init__.py b/tests/factories/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/factories/time_entry.py b/tests/factories/time_entry.py new file mode 100644 index 0000000..21913e9 --- /dev/null +++ b/tests/factories/time_entry.py @@ -0,0 +1,112 @@ +from __future__ import annotations + +from datetime import datetime, timedelta +from typing import TYPE_CHECKING, Dict, List, Optional, Union + +from tests.conftest import fake + + +if TYPE_CHECKING: + from backports.zoneinfo import ZoneInfo + from pydantic_core import TzInfo + + +try: + import zoneinfo +except ImportError: + from backports import zoneinfo + + +def _datetime_repr_factory(timezone: Union[ZoneInfo, TzInfo, None] = None) -> str: + if not timezone: + timezone_name = fake.timezone() + timezone = zoneinfo.ZoneInfo(timezone_name) + + return fake.date_time_this_decade(tzinfo=timezone).isoformat(timespec="seconds") + + +def _stop_datetime_repr_factory( + duration: int, start_repr: str, timezone: Union[ZoneInfo, TzInfo] +) -> str: + if duration: + start_datetime = datetime.fromisoformat(start_repr) + stop_datetime = start_datetime + timedelta(seconds=duration) + else: + stop_datetime = fake.date_time_this_decade(tzinfo=timezone) + + return stop_datetime.isoformat(timespec="seconds") + + +def time_entry_request_factory(workspace_id: Optional[int] = None) -> Dict[str, Union[str, int]]: + return { + "created_with": fake.color_name(), + "start": _datetime_repr_factory(), + "workspace_id": workspace_id or fake.random_int(), + } + + +def time_entry_extended_request_factory( + workspace_id: Optional[int] = None, +) -> Dict[str, Union[str, bool, int, None, List[Union[str, int]]]]: + timezone_name = fake.timezone() + timezone = zoneinfo.ZoneInfo(timezone_name) + duration = fake.random_int(min=-1) + start = _datetime_repr_factory(timezone) + + return { + "created_with": fake.color_name(), + "billable": fake.boolean(), + "description": fake.text(max_nb_chars=100), + "duration": duration, + "project_id": fake.random_int(), + "start": start, + "stop": _stop_datetime_repr_factory(duration, start, timezone), + "tag_ids": [fake.random_int() for _ in range(fake.random_int(min=0, max=20))], + "tags": [fake.word() for _ in range(fake.random_int(min=0, max=20))], + "task_id": fake.random_int(), + "user_id": fake.random_int(), + "workspace_id": workspace_id or fake.random_int(), + } + + +def time_entry_response_factory( + workspace_id: int, + start: Optional[str] = None, + billable: Optional[bool] = None, + description: Optional[str] = None, + duration: Optional[int] = None, + stop: Optional[str] = None, + project_id: Optional[int] = None, + tag_ids: Optional[List[int]] = None, + tags: Optional[List[str]] = None, + task_id: Optional[int] = None, + user_id: Optional[int] = None, +) -> Dict[str, Union[str, bool, int, None, List[Union[str, int]]]]: + if start: + tz = datetime.strptime(start, "%Y-%m-%dT%H:%M:%S%z").tzinfo + else: + timezone_name = fake.timezone() + tz = zoneinfo.ZoneInfo(timezone_name) + + return { + "at": _datetime_repr_factory(tz), + "billable": billable or fake.boolean(), + "description": description or fake.text(max_nb_chars=100), + "duration": duration or fake.random_int(), + "duronly": fake.boolean(), + "id": fake.random_number(digits=11, fix_len=True), + "permissions": None, + "project_id": project_id or fake.random_int() if fake.boolean() else None, + "server_deleted_at": ( + fake.date_time_this_month(tzinfo=tz).isoformat(timespec="seconds") + if fake.boolean() + else None + ), + "start": start or _datetime_repr_factory(tz), + "stop": stop or _datetime_repr_factory(tz), + "tag_ids": tag_ids or [], + "tags": tags or [], + "task_id": task_id or fake.random_int() if fake.boolean() else None, + "user_id": user_id or fake.random_int(), + "workspace_id": workspace_id, + } diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index 4bf6e60..022cae7 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -7,6 +7,7 @@ import pytest from toggl_python.auth import TokenAuth from toggl_python.entities.user import CurrentUser +from toggl_python.entities.workspace import Workspace if TYPE_CHECKING: @@ -21,6 +22,14 @@ def i_authed_user() -> CurrentUser: return CurrentUser(auth=auth) +@pytest.fixture(scope="session") +def i_authed_workspace() -> Workspace: + token = os.environ["TOGGL_TOKEN"] + auth = TokenAuth(token=token) + + return Workspace(auth=auth) + + @pytest.fixture() def me_response(i_authed_user: CurrentUser) -> MeResponse: return i_authed_user.me() diff --git a/tests/integration/test_time_entry.py b/tests/integration/test_time_entry.py new file mode 100644 index 0000000..7ba7e22 --- /dev/null +++ b/tests/integration/test_time_entry.py @@ -0,0 +1,62 @@ +from __future__ import annotations + +import os +from typing import TYPE_CHECKING + +from toggl_python.schemas.time_entry import MeTimeEntryResponse + +from tests.factories.time_entry import ( + time_entry_extended_request_factory, + time_entry_request_factory, +) + +# Necessary to mark all tests in module as integration +from tests.integration import pytestmark # noqa: F401 - imported but unused + + +if TYPE_CHECKING: + from toggl_python.entities.workspace import Workspace + + +def test_create_time_entry__only_necessary_fields(i_authed_workspace: Workspace) -> None: + workspace_id = int(os.environ["WORKSPACE_ID"]) + request_body = time_entry_request_factory(workspace_id) + expected_result = set(MeTimeEntryResponse.model_fields.keys()) + + result = i_authed_workspace.create_time_entry( + workspace_id, + start_datetime=request_body["start"], + created_with=request_body["created_with"], + ) + + assert result.model_fields_set == expected_result + + _ = i_authed_workspace.delete_time_entry(workspace_id=workspace_id, time_entry_id=result.id) + + +def test_create_time_entry__all_fields(i_authed_workspace: Workspace) -> None: + """Create TimeEntry without fields `tag_ids` and `task_id`. + + `tag_ids` requires existing Tags and it is complicated to test + and `task_id` is available on paid plan. + """ + workspace_id = int(os.environ["WORKSPACE_ID"]) + request_body = time_entry_extended_request_factory(workspace_id) + expected_result = set(MeTimeEntryResponse.model_fields.keys()) + + result = i_authed_workspace.create_time_entry( + workspace_id, + start_datetime=request_body["start"], + created_with=request_body["created_with"], + billable=request_body["billable"], + description=request_body["description"], + duration=request_body["duration"], + project_id=os.environ["PROJECT_ID"], + stop=request_body["stop"], + tags=request_body["tags"], + user_id=os.environ["USER_ID"], + ) + + assert result.model_fields_set == expected_result + + _ = i_authed_workspace.delete_time_entry(workspace_id=workspace_id, time_entry_id=result.id) diff --git a/tests/test_time_entry.py b/tests/test_time_entry.py index 9551cab..4506223 100644 --- a/tests/test_time_entry.py +++ b/tests/test_time_entry.py @@ -19,6 +19,12 @@ MeWebTimerResponse, ) +from tests.conftest import fake +from tests.factories.time_entry import ( + time_entry_extended_request_factory, + time_entry_request_factory, + time_entry_response_factory, +) from tests.responses.me_get import ME_WEB_TIMER_RESPONSE from tests.responses.time_entry_get import ME_TIME_ENTRY_RESPONSE, ME_TIME_ENTRY_WITH_META_RESPONSE from tests.responses.time_entry_put_and_patch import BULK_EDIT_TIME_ENTRIES_RESPONSE @@ -30,6 +36,89 @@ from toggl_python.entities.workspace import Workspace +def test_create_time_entry__only_required_fields( + response_mock: MockRouter, authed_workspace: Workspace +) -> None: + workspace_id = fake.random_int() + request_body = time_entry_request_factory(workspace_id) + fake_response = time_entry_response_factory(workspace_id, request_body["start"]) + mocked_route = response_mock.post( + f"/workspaces/{workspace_id}/time_entries", json=request_body + ).mock( + return_value=Response(status_code=200, json=fake_response), + ) + expected_result = MeTimeEntryResponse.model_validate(fake_response) + + result = authed_workspace.create_time_entry( + workspace_id, + start_datetime=request_body["start"], + created_with=request_body["created_with"], + ) + + assert mocked_route.called is True + assert result == expected_result + + +def test_create_time_entry__all_fields( + response_mock: MockRouter, authed_workspace: Workspace +) -> None: + workspace_id = fake.random_int() + request_body = time_entry_extended_request_factory(workspace_id) + fake_response = time_entry_response_factory( + workspace_id, + start=request_body["start"], + billable=request_body["billable"], + description=request_body["description"], + duration=request_body["duration"], + project_id=request_body["project_id"], + stop=request_body["stop"], + tag_ids=request_body["tag_ids"], + tags=request_body["tags"], + task_id=request_body["task_id"], + user_id=request_body["user_id"], + ) + mocked_route = response_mock.post( + f"/workspaces/{workspace_id}/time_entries", json=request_body + ).mock( + return_value=Response(status_code=200, json=fake_response), + ) + expected_result = MeTimeEntryResponse.model_validate(fake_response) + + result = authed_workspace.create_time_entry( + workspace_id, + start_datetime=request_body["start"], + created_with=request_body["created_with"], + billable=request_body["billable"], + description=request_body["description"], + duration=request_body["duration"], + project_id=request_body["project_id"], + stop=request_body["stop"], + tag_ids=request_body["tag_ids"], + tags=request_body["tags"], + task_id=request_body["task_id"], + user_id=request_body["user_id"], + ) + + assert mocked_route.called is True + assert result == expected_result + + +def test_create_time_entry__invalid_start_stop_and_duration(authed_workspace: Workspace) -> None: + workspace_id = fake.random_int() + request_body = time_entry_extended_request_factory(workspace_id) + error_message = ( + r"`start`, `stop` and `duration` must be consistent - `start` \+ `duration` == `stop`" + ) + + with pytest.raises(ValidationError, match=error_message): + _ = authed_workspace.create_time_entry( + workspace_id, + start_datetime=request_body["start"], + created_with=request_body["created_with"], + duration=request_body["duration"] + fake.random_int(), + stop=request_body["stop"], + ) + def test_get_time_entry__without_query_params( response_mock: MockRouter, authed_current_user: CurrentUser ) -> None: diff --git a/toggl_python/entities/workspace.py b/toggl_python/entities/workspace.py index 7965a9a..252ae79 100644 --- a/toggl_python/entities/workspace.py +++ b/toggl_python/entities/workspace.py @@ -8,6 +8,7 @@ BulkEditTimeEntriesOperation, BulkEditTimeEntriesResponse, MeTimeEntryResponse, + TimeEntryCreateRequest, TimeEntryRequest, ) from toggl_python.schemas.workspace import GetWorkspacesQueryParams, WorkspaceResponse @@ -92,6 +93,48 @@ def get_projects( # noqa: PLR0913 - Too many arguments in function definition ( return [ProjectResponse.model_validate(project_data) for project_data in response_body] + def create_time_entry( + self, + workspace_id: int, + start_datetime: Union[datetime, str], + created_with: str, + billable: Optional[bool] = None, + description: Optional[str] = None, + duration: Optional[int] = None, + stop: Optional[str] = None, + project_id: Optional[int] = None, + tag_ids: Optional[List[int]] = None, + tags: Optional[List[str]] = None, + task_id: Optional[int] = None, + user_id: Optional[int] = None, + ) -> MeTimeEntryResponse: + request_body_schema = TimeEntryCreateRequest( + created_with=created_with, + start=start_datetime, + workspace_id=workspace_id, + billable=billable, + description=description, + duration=duration, + stop=stop, + project_id=project_id, + tag_ids=tag_ids, + tags=tags, + task_id=task_id, + user_id=user_id, + ) + request_body = request_body_schema.model_dump( + mode="json", exclude_none=True, exclude_unset=True + ) + + response = self.client.post( + url=f"{self.prefix}/{workspace_id}/time_entries", json=request_body + ) + self.raise_for_status(response) + + response_body = response.json() + + return MeTimeEntryResponse.model_validate(response_body) + def update_time_entry( # noqa: PLR0913 - Too many arguments in function definition (13 > 12) self, workspace_id: int, diff --git a/toggl_python/schemas/time_entry.py b/toggl_python/schemas/time_entry.py index 17b4dd1..81c910c 100644 --- a/toggl_python/schemas/time_entry.py +++ b/toggl_python/schemas/time_entry.py @@ -1,10 +1,16 @@ from __future__ import annotations -from datetime import datetime +from datetime import datetime, timedelta from enum import Enum from typing import Dict, List, Optional, Union -from pydantic import AwareDatetime, field_serializer, model_serializer +from pydantic import ( + AwareDatetime, + field_serializer, + model_serializer, + model_validator, +) +from typing_extensions import Self from toggl_python.schemas.base import BaseSchema, SinceParamSchemaMixin @@ -34,7 +40,7 @@ class MeTimeEntryResponseBase(BaseSchema): billable: bool description: Optional[str] project_id: Optional[int] - tag_ids: List[int] + tag_ids: Optional[List[int]] task_id: Optional[int] user_id: int workspace_id: int @@ -49,7 +55,7 @@ class MeTimeEntryResponse(MeTimeEntryResponseBase): server_deleted_at: Optional[datetime] start: datetime stop: Optional[datetime] - tags: List[str] + tags: Optional[List[str]] class MeTimeEntryWithMetaResponse(MeTimeEntryResponse): @@ -109,13 +115,52 @@ def serialize_datetimes(self, value: Optional[datetime]) -> Optional[str]: return value.isoformat() +class TimeEntryCreateRequest(BaseSchema): + created_with: str + start: AwareDatetime + workspace_id: int + billable: Optional[bool] = None + description: Optional[str] = None + duration: Optional[int] = None + stop: Optional[AwareDatetime] = None + project_id: Optional[int] = None + tag_ids: Optional[List[int]] = None + tags: Optional[List[str]] = None + task_id: Optional[int] = None + user_id: Optional[int] = None + + @field_serializer("start", when_used="json") + def serialize_datetimes(self, value: Optional[datetime]) -> Optional[str]: + if not value: + return value + + return value.isoformat() + + @model_validator(mode="after") + def validate_stop_and_duration(self) -> Self: + if ( + self.duration + and self.stop + and (self.start + timedelta(seconds=self.duration) != self.stop) + ): + error_message = ( + "`start`, `stop` and `duration` must be consistent - " + "`start` + `duration` == `stop`" + ) + raise ValueError(error_message) + + return self + + class BulkEditTimeEntriesOperation(BaseSchema): operation: BulkEditTimeEntriesOperations field_name: BulkEditTimeEntriesFieldNames field_value: Union[bool, str, int, AwareDatetime, List[int], List[str]] @model_serializer(when_used="json") - def serialize_schema(self) -> Dict: + def serialize_schema( + self, + ) -> Dict[str, Union[bool, str, int, AwareDatetime, List[int], List[str]]]: return { "op": self.operation, "path": f"/{self.field_name}",