Skip to content

Commit

Permalink
Add --tail option to gantry follow (#90)
Browse files Browse the repository at this point in the history
* Add `--tail` option to `gantry follow`

* clean up

* make sure to refresh the job

* fix
  • Loading branch information
epwalsh authored May 31, 2024
1 parent f1e62c2 commit 861baae
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 63 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## Unreleased

### Added

- Added `--tail` option to `gantry follow` to only tail the logs as opposed to dumping the entire history.

## [v1.2.0](https://github.com/allenai/beaker-gantry/releases/tag/v1.2.0) - 2024-05-30

### Changed
Expand Down
7 changes: 5 additions & 2 deletions gantry/commands/follow.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,14 @@

@main.command(**CLICK_COMMAND_DEFAULTS)
@click.argument("experiment", nargs=1, required=True, type=str)
def follow(experiment: str):
@click.option(
"-t", "--tail", is_flag=True, help="Only tail the logs as opposed to printing all logs so far."
)
def follow(experiment: str, tail: bool = False):
"""
Follow the logs for a running experiment.
"""
beaker = Beaker.from_env(session=True)
exp = beaker.experiment.get(experiment)
job = util.follow_experiment(beaker, exp)
job = util.follow_experiment(beaker, exp, tail=tail)
util.display_results(beaker, exp, job)
79 changes: 19 additions & 60 deletions gantry/util.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import tempfile
import time
from datetime import datetime
from datetime import datetime, timedelta
from enum import Enum
from pathlib import Path
from typing import TYPE_CHECKING, Iterable, Optional, Tuple, Union, cast
from typing import Optional, Tuple, cast

import requests
import rich
Expand All @@ -15,7 +15,6 @@
Digest,
Experiment,
Job,
JobTimeoutError,
SecretNotFound,
WorkspaceNotSet,
)
Expand All @@ -27,9 +26,6 @@
from .exceptions import *
from .version import VERSION

if TYPE_CHECKING:
from datetime import timedelta


class StrEnum(str, Enum):
def __str__(self) -> str:
Expand Down Expand Up @@ -74,43 +70,10 @@ def parse_git_remote_url(url: str) -> Tuple[str, str]:
return account, repo


def display_logs(logs: Iterable[bytes], ignore_timestamp: Optional[str] = None) -> Optional[str]:
def follow_experiment(
beaker: Beaker, experiment: Experiment, timeout: int = 0, tail: bool = False
) -> Job:
console = rich.get_console()
latest_timestamp: Optional[str] = None

def print_line(line: str):
if not line:
return
nonlocal latest_timestamp
# Remove timestamp
try:
timestamp, line = line.split("Z ", maxsplit=1)
latest_timestamp = f"{timestamp}Z"
if ignore_timestamp is not None and latest_timestamp == ignore_timestamp:
return
except ValueError:
pass
console.print(line, highlight=False, markup=False)

line_buffer = ""
for bytes_chunk in logs:
chunk = line_buffer + bytes_chunk.decode(errors="ignore")
chunk = chunk.replace("\r", "\n")
lines = chunk.split("\n")
if chunk.endswith("\n"):
line_buffer = ""
else:
# Last line chunk is probably incomplete.
lines, line_buffer = lines[:-1], lines[-1]
for line in lines:
print_line(line)

print_line(line_buffer)
return latest_timestamp


def follow_experiment(beaker: Beaker, experiment: Experiment, timeout: int = 0) -> Job:
start = time.monotonic()

# Wait for job to start...
job: Optional[Job] = beaker.experiment.tasks(experiment.id)[0].latest_job # type: ignore
Expand All @@ -123,30 +86,26 @@ def follow_experiment(beaker: Beaker, experiment: Experiment, timeout: int = 0)

exit_code: Optional[int] = job.status.exit_code

stream_logs = exit_code is None
stream_logs = exit_code is None and not job.is_finalized
if stream_logs:
print()
rich.get_console().rule("Logs")

last_timestamp: Optional[str] = None
since: Optional[Union[str, datetime]] = datetime.utcnow()
while stream_logs and exit_code is None and not job.is_finalized:
job = beaker.experiment.tasks(experiment.id)[0].latest_job # type: ignore
assert job is not None
exit_code = job.status.exit_code
last_timestamp = display_logs(
beaker.job.logs(job, quiet=True, since=since),
ignore_timestamp=last_timestamp,
)
since = last_timestamp or since
time.sleep(2.0)
if timeout > 0 and time.monotonic() - start >= timeout:
raise JobTimeoutError(f"Job did not finish within {timeout} seconds")

if stream_logs:
for line_bytes in beaker.job.follow(
job,
timeout=timeout if timeout > 0 else None,
include_timestamps=False,
since=datetime.utcnow() - timedelta(seconds=5) if tail else None,
):
line = line_bytes.decode(errors="ignore")
if line.endswith("\n"):
line = line[:-1]
console.print(line, highlight=False, markup=False)
rich.get_console().rule("End logs")
print()

# Refresh the job.
job = beaker.job.get(job.id)

return job


Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ authors = [
license = {file = "LICENSE"}
requires-python = ">3.7"
dependencies = [
"beaker-py>=1.26.13,<2.0",
"beaker-py>=1.27.0,<2.0",
"GitPython>=3.0,<4.0",
"rich",
"click",
Expand Down

0 comments on commit 861baae

Please sign in to comment.