Skip to content

Commit

Permalink
initial implementations
Browse files Browse the repository at this point in the history
  • Loading branch information
justincdavis committed Oct 7, 2024
1 parent c7b38fd commit afa0e65
Show file tree
Hide file tree
Showing 6 changed files with 374 additions and 63 deletions.
13 changes: 1 addition & 12 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ authors = [
maintainers = [
{name="Justin Davis", email="[email protected]"},
]
description = "Extended functionality on top of OpenCV"
description = "Tools for working with NVIDIA Jetson devices."
readme = "README.md"
classifiers = [
"Development Status :: 4 - Beta",
Expand All @@ -24,7 +24,6 @@ classifiers = [
"Programming Language :: Python :: 3.12",
"Programming Language :: Python :: Implementation :: CPython",
"License :: OSI Approved :: MIT License",
"Operating System :: Microsoft :: Windows",
"Operating System :: POSIX :: Linux",
"Natural Language :: English",
"Intended Audience :: Developers",
Expand All @@ -44,9 +43,6 @@ dependencies = [
"Bug Tracker" = "https://github.com/justincdavis/jetsontools/issues"

[project.optional-dependencies]
jit = [
"numba>=0.55.0",
]
ci = [
"pyupgrade>=3.10",
"ruff>=0.4.5",
Expand Down Expand Up @@ -158,13 +154,6 @@ disallow_incomplete_defs = true
disallow_untyped_defs = true
no_implicit_reexport = true
warn_return_any = true
plugins = "numpy.typing.mypy_plugin"

[[tool.mypy.overrides]]
module = [
'numba',
]
ignore_errors = true

[tool.pyright]
include = ["src"]
Expand Down
67 changes: 16 additions & 51 deletions src/jetsontools/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,21 @@
"""
Package for bounding boxes and their operations.
Submodules
----------
info
Tools for getting information about the Jetson device.
Classes
-------
Tegrastats
Runs tegrastats in a separate process and stores output in a file.
Functions
---------
set_log_level
Set the log level for the jetsontools package.
enable_jit
Enable just-in-time compilation using Numba for some functions.
"""

from __future__ import annotations
Expand Down Expand Up @@ -71,7 +80,7 @@ def set_log_level(level: str) -> None:
_setup_logger(level)


level = os.getenv("jetsontools_LOG_LEVEL")
level = os.getenv("JETSONTOOLS_LOG_LEVEL")
_setup_logger(level)
_log = logging.getLogger(__name__)
if level is not None and level.upper() not in [
Expand All @@ -83,56 +92,12 @@ def set_log_level(level: str) -> None:
]:
_log.warning(f"Invalid log level: {level}. Using default log level: WARNING")


from dataclasses import dataclass


@dataclass
class _FLAGS:
"""
Class for storing flags for jetsontools.
Attributes
----------
USEJIT : bool
Whether or not to use jit.
PARALLEL : bool
Whether or not to use parallel compilation in the jit.
"""

USEJIT: bool = False
PARALLEL: bool = False


_FLAGSOBJ = _FLAGS()


def enable_jit(*, on: bool | None = None, parallel: bool | None = None) -> None:
"""
Enable just-in-time compilation using Numba for some functions.
Parameters
----------
on : bool | None
If True, enable jit. If False, disable jit. If None, enable jit.
parallel : bool | None
If True, enable parallel jit. If False, disable parallel jit. If None, disable parallel jit.
"""
if on is None:
on = True
if parallel is None:
parallel = False
_FLAGSOBJ.USEJIT = on
_FLAGSOBJ.PARALLEL = parallel
_log.info(f"JIT is {'enabled' if on else 'disabled'}; parallel: {parallel}.")

from . import info
from ._tegrastats import Tegrastats

__all__ = [
"_FLAGSOBJ",
"enable_jit",
"Tegrastats",
"info",
"set_log_level",
]
__version__ = "0.0.1"
Expand Down
145 changes: 145 additions & 0 deletions src/jetsontools/_tegrastats.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
# Copyright (c) 2024 Justin Davis ([email protected])
#
# MIT License
# ruff: noqa: S404, S603
from __future__ import annotations

import logging
import multiprocessing as mp
import subprocess
import time
from pathlib import Path
from typing import TYPE_CHECKING

if TYPE_CHECKING:
from types import TracebackType

from typing_extensions import Self

_log = logging.getLogger(__name__)


class Tegrastats:
"""Runs tegrastats in a seperate process and stores output in a file."""

def __init__(
self: Self,
output: Path | str,
interval: int = 1000,
*,
readall: bool | None = None,
) -> None:
"""
Create an instance of tegrastats with outputs to a file.
Parameters
----------
output : Path | str
The path to the output file
interval : int, optional
The interval to run tegrastats in milliseconds, by default 1000
readall : bool, optional
Optionally, read all possible information through tegrastats.
Additional information varies by board.
Can consume additional CPU resources.
By default, will NOT readall
"""
self._output = Path(output)
self._interval = interval
self._readall = readall

self._process = mp.Process(
target=self._run,
args=(self._output, self._interval),
daemon=True,
)

def __enter__(self: Self) -> Self:
self._process.start()
return self

def __exit__(
self: Self,
exc_type: type[BaseException] | None,
exc_value: BaseException | None,
traceback: TracebackType | None,
) -> None:
_log.debug("Stopping tegrastats")
self._process.terminate()
command = ["tegrastats", "--stop"]
subprocess.run(
command,
check=True,
)

def _run(
self: Self,
output: Path,
interval: int,
) -> None:
"""
Target function for process running tegrastats.
Parameters
----------
output : Path
The path to the output file.
interval : int
The interval to update tegrastats info (ms).
Raises
------
RuntimeError
If the process created by Popen does not have stdout/stderr
CalledProcessError
If the process has any stderr output
"""
# maintain the file in open state
with output.open("w+") as f:
_log.debug(f"Open file {output} for writing")

# create the command and run the Popen call
command = ["tegrastats", "--interval", str(interval)]
if self._readall:
command.append("--readall")
process = subprocess.Popen(
command,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
bufsize=1,
)

_log.debug(f"Ran tegrastats with command: {command}")

# ensure stdout/stderr streams exist
if process.stdout is None or process.stderr is None:
err_msg = "Cannot access stdout or stderr streams in Tegrastat process."
raise RuntimeError(err_msg)

_log.debug("No errors from process found")

# read output while it exists
# this will be stopped by the __exit__ call
# which will call tegrastats --stop
# resulting in the lines to cease
while True:
line = process.stdout.readline()
if not line:
break
timestamp = time.time()
f.write(f"{timestamp:.6f}:: {line}")
f.flush()

_log.debug("Stopped reading from tegrastats process")

# check for any errors
stderr_output = process.stderr.read()
if stderr_output:
raise subprocess.CalledProcessError(
process.returncode,
process.args,
stderr=stderr_output,
)
23 changes: 23 additions & 0 deletions src/jetsontools/info/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# Copyright (c) 2024 Justin Davis ([email protected])
#
# MIT License
"""
Tools for getting information about the Jetson device.
Classes
-------
JetsonInfo
Class to store information about the Jetson device.
Functions
---------
get_info
Get information about the Jetson device.
"""

from __future__ import annotations

from ._info import JetsonInfo, get_info

__all__ = ["JetsonInfo", "get_info"]
11 changes: 11 additions & 0 deletions src/jetsontools/info/__main__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# Copyright (c) 2024 Justin Davis ([email protected])
#
# MIT License
"""Print information about the Jetson device."""

from __future__ import annotations

from ._info import get_info

if __name__ == "__main__":
_ = get_info(verbose=True)
Loading

0 comments on commit afa0e65

Please sign in to comment.