resource_tracker #

Resource Tracker package for monitoring system resources and detecting cloud environments.

Modules:

Name Description
cloud_info

Detect cloud environment (provider, region, instance type) via VM metadata services.

server_info

Detect server hardware (CPU count, memory amount, disk space, GPU count and VRAM amount) via procfs and nvidia-smi.

tiny_data_frame

A very inefficient data-frame implementation for manipulating resource usage data.

tracker

Track resource usage of a process or server.

Classes:

Name Description
TinyDataFrame

A very inefficient data-frame implementation with a few features.

PidTracker

Track resource usage of a process and optionally its children.

SystemTracker

Track system-wide resource usage.

Functions:

Name Description
get_cloud_info

Detect cloud environment and return standardized information on provider, region, and instance type.

get_server_info

Collects important information about the Linux server.

get_cloud_info cached #

get_cloud_info()

Detect cloud environment and return standardized information on provider, region, and instance type.

Returns:

Type Description
dict

A dictionary containing standardized cloud information:

  • vendor: The cloud provider (aws, gcp, azure, hcloud, upcloud), or "unknown"
  • instance_type: The instance type/size/flavor, or "unknown"
  • region: The region/zone where the instance is running, or "unknown"
Source code in resource_tracker/cloud_info.py
@cache
def get_cloud_info() -> dict:
    """
    Detect cloud environment and return standardized information on provider, region, and instance type.

    Returns:
        A dictionary containing standardized cloud information:

            - `vendor`: The cloud provider (aws, gcp, azure, hcloud, upcloud), or "unknown"
            - `instance_type`: The instance type/size/flavor, or "unknown"
            - `region`: The region/zone where the instance is running, or "unknown"
    """
    start_time = time()
    check_functions = [
        _check_aws,
        _check_gcp,
        _check_azure,
        _check_hetzner,
        _check_upcloud,
    ]

    # run checks in parallel, return early if any check succeeds
    with ThreadPoolExecutor(max_workers=len(check_functions)) as executor:
        futures = {executor.submit(check_fn): check_fn for check_fn in check_functions}
        pending = set(futures.keys())
        while pending:
            done, pending = wait(pending, return_when=FIRST_COMPLETED)
            for future in done:
                with suppress(Exception):
                    info = future.result()
                    if info:
                        # stop all remaining checks early
                        for f in pending:
                            f.cancel()
                        return info | {"discovery_time": time() - start_time}

    return {
        "vendor": "unknown",
        "instance_type": "unknown",
        "region": "unknown",
        "discovery_time": time() - start_time,
    }
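The early-return pattern above (wait with FIRST_COMPLETED, cancel the rest on the first hit) can be sketched standalone; the `slow_miss`/`fast_hit` functions below are dummies standing in for the real metadata probes:

```python
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait
from contextlib import suppress
from time import sleep


def first_successful(check_functions):
    """Run checks in parallel and return the first truthy result, or None."""
    with ThreadPoolExecutor(max_workers=len(check_functions)) as executor:
        futures = {executor.submit(fn): fn for fn in check_functions}
        pending = set(futures)
        while pending:
            done, pending = wait(pending, return_when=FIRST_COMPLETED)
            for future in done:
                with suppress(Exception):
                    info = future.result()
                    if info:
                        # stop all remaining checks early
                        for f in pending:
                            f.cancel()
                        return info
    return None


def slow_miss():
    sleep(0.2)
    return None


def fast_hit():
    return {"vendor": "aws"}


print(first_successful([slow_miss, fast_hit]))  # {'vendor': 'aws'}
```

Note that `Future.cancel` only prevents not-yet-started checks from running; checks already in flight finish in the background while the caller already has its answer.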

get_server_info #

get_server_info()

Collects important information about the Linux server.

Returns:

Type Description
dict

A dictionary containing server information:

  • vcpus: Number of virtual CPUs
  • memory_mb: Total memory in MB
  • gpu_count: Number of GPUs (0 if not available)
  • gpu_names: Names of the GPUs (empty list if not available)
  • gpu_memory_mb: Total VRAM in MB (0 if not available)
Source code in resource_tracker/server_info.py
def get_server_info() -> dict:
    """
    Collects important information about the Linux server.

    Returns:
        A dictionary containing server information:

            - `vcpus`: Number of virtual CPUs
            - `memory_mb`: Total memory in MB
            - `gpu_count`: Number of GPUs (`0` if not available)
            - `gpu_names`: Names of the GPUs (empty list if not available)
            - `gpu_memory_mb`: Total VRAM in MB (`0` if not available)
    """
    gpu_info = get_gpu_info()
    info = {
        "vcpus": cpu_count(),
        "memory_mb": get_total_memory_mb(),
        "gpu_count": gpu_info["count"],
        "gpu_names": gpu_info["gpu_names"],
        "gpu_memory_mb": gpu_info["memory_mb"],
    }
    return info
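`get_total_memory_mb` is presumably backed by procfs; a minimal sketch of parsing `MemTotal` out of `/proc/meminfo`-style text (the function name and sample text here are illustrative, not the package's actual implementation):

```python
def parse_meminfo_mb(meminfo_text: str) -> int:
    """Extract MemTotal from /proc/meminfo-style text, converted from kB to MB."""
    for line in meminfo_text.splitlines():
        if line.startswith("MemTotal:"):
            kb = int(line.split()[1])  # e.g. "MemTotal:  16314556 kB"
            return kb // 1024
    return 0


sample = """MemTotal:       16314556 kB
MemFree:         2048000 kB
MemAvailable:    8192000 kB"""
print(parse_meminfo_mb(sample))  # 15932
```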

TinyDataFrame #

A very inefficient data-frame implementation with a few features.

Supported features:

  • reading CSV files from a remote URL
  • reading CSV files from a local file
  • converting a dictionary of lists/arrays to a data-frame
  • converting a list of dictionaries to a data-frame
  • slicing rows
  • slicing columns
  • slicing rows and columns
  • printing a summary of the data-frame
  • printing the data-frame as a human-readable (grid) table
  • renaming columns
  • writing to a CSV file

Parameters:

Name Type Description Default
data Optional[dict | list]

Dictionary of lists/arrays or list of dictionaries.

None
csv_file_path Optional[str]

Path to a properly quoted CSV file.

None

Example:

>>> df = TinyDataFrame(csv_file_path="https://raw.githubusercontent.com/plotly/datasets/refs/heads/master/mtcars.csv")
>>> df
TinyDataFrame with 32 rows and 12 columns. First row as a dict: {'manufacturer': 'Mazda RX4', 'mpg': 21.0, 'cyl': 6.0, 'disp': 160.0, 'hp': 110.0, 'drat': 3.9, 'wt': 2.62, 'qsec': 16.46, 'vs': 0.0, 'am': 1.0, 'gear': 4.0, 'carb': 4.0}
>>> df[2:5][['manufacturer', 'hp']]
TinyDataFrame with 3 rows and 2 columns. First row as a dict: {'manufacturer': 'Datsun 710', 'hp': 93.0}
>>> print(df[2:5][['manufacturer', 'hp']])  # doctest: +NORMALIZE_WHITESPACE
TinyDataFrame with 3 rows and 2 columns:
manufacturer      | hp
------------------+------
Datsun 710        |  93.0
Hornet 4 Drive    | 110.0
Hornet Sportabout | 175.0
>>> print(df[2:5][['manufacturer', 'hp']].to_csv())  # doctest: +NORMALIZE_WHITESPACE
"manufacturer","hp"
"Datsun 710",93.0
"Hornet 4 Drive",110.0
"Hornet Sportabout",175.0

Methods:

Name Description
__init__

Initialize with either:

__len__

Return the number of rows in the data-frame

__getitem__

Get a single column or multiple columns or a row or a slice of rows. Can be chained.

__setitem__

Set a column with the given key to the provided values.

head

Return first n rows as a new TinyDataFrame.

tail

Return last n rows as a new TinyDataFrame.

__repr__

Return a string representation of the data-frame.

__str__

Print the first 10 rows of the data-frame in a human-readable table.

to_csv

Write the data-frame to a CSV file or return as string if no path is provided.

rename

Rename one or multiple columns.

Source code in resource_tracker/tiny_data_frame.py
class TinyDataFrame:
    """A very inefficient data-frame implementation with a few features.

    Supported features:

    - reading CSV files from a remote URL
    - reading CSV files from a local file
    - converting a dictionary of lists/arrays to a data-frame
    - converting a list of dictionaries to a data-frame
    - slicing rows
    - slicing columns
    - slicing rows and columns
    - printing a summary of the data-frame
    - printing the data-frame as a human-readable (grid) table
    - renaming columns
    - writing to a CSV file

    Args:
        data: Dictionary of lists/arrays or list of dictionaries.
        csv_file_path: Path to a properly quoted CSV file.

    Example:

        >>> df = TinyDataFrame(csv_file_path="https://raw.githubusercontent.com/plotly/datasets/refs/heads/master/mtcars.csv")
        >>> df
        TinyDataFrame with 32 rows and 12 columns. First row as a dict: {'manufacturer': 'Mazda RX4', 'mpg': 21.0, 'cyl': 6.0, 'disp': 160.0, 'hp': 110.0, 'drat': 3.9, 'wt': 2.62, 'qsec': 16.46, 'vs': 0.0, 'am': 1.0, 'gear': 4.0, 'carb': 4.0}
        >>> df[2:5][['manufacturer', 'hp']]
        TinyDataFrame with 3 rows and 2 columns. First row as a dict: {'manufacturer': 'Datsun 710', 'hp': 93.0}
        >>> print(df[2:5][['manufacturer', 'hp']])  # doctest: +NORMALIZE_WHITESPACE
        TinyDataFrame with 3 rows and 2 columns:
        manufacturer      | hp
        ------------------+------
        Datsun 710        |  93.0
        Hornet 4 Drive    | 110.0
        Hornet Sportabout | 175.0
        >>> print(df[2:5][['manufacturer', 'hp']].to_csv())  # doctest: +NORMALIZE_WHITESPACE
        "manufacturer","hp"
        "Datsun 710",93.0
        "Hornet 4 Drive",110.0
        "Hornet Sportabout",175.0
    """

    def __init__(
        self, data: Optional[dict | list] = None, csv_file_path: Optional[str] = None
    ):
        """
        Initialize with either:

        - Dictionary of lists/arrays
        - List of dictionaries
        - CSV file path
        """
        self.columns = []
        self._data = {}

        assert data is not None or csv_file_path is not None, (
            "either data or csv_file_path must be provided"
        )
        assert data is None or csv_file_path is None, (
            "only one of data or csv_file_path must be provided"
        )
        assert data is None or isinstance(data, dict) or isinstance(data, list), (
            "data must be a dictionary or a list"
        )
        assert csv_file_path is None or isinstance(csv_file_path, str), (
            "csv_file_path must be a string"
        )

        if csv_file_path:
            data = self._read_csv(csv_file_path)

        if isinstance(data, dict):
            self._data = {k: list(v) for k, v in data.items()}
            self.columns = list(self._data.keys())
        elif isinstance(data, list) and data and isinstance(data[0], dict):
            # let's preserve column order
            self.columns = []
            seen_columns = set()
            for row in data:
                for col in row.keys():
                    if col not in seen_columns:
                        self.columns.append(col)
                        seen_columns.add(col)
            self._data = {col: [row.get(col) for row in data] for col in self.columns}

    def _read_csv(self, csv_file_path: str) -> list[dict]:
        """Read a CSV file and return a list of dictionaries.

        Args:
            csv_file_path: CSV file path or URL.
        """
        results = []

        parsed = urlparse(csv_file_path)
        if parsed.scheme in ("http", "https"):
            with urlopen(csv_file_path) as response:
                content = response.read().decode("utf-8").splitlines()
                csv_source = content
        else:
            csv_source = open(csv_file_path, "r")

        try:
            reader = DictReader(csv_source, quoting=QUOTE_NONNUMERIC)
            for row in reader:
                results.append(row)
        finally:
            if not isinstance(csv_source, list):
                csv_source.close()

        return results

    def __len__(self):
        """Return the number of rows in the data-frame"""
        return len(next(iter(self._data.values()))) if self.columns else 0

    def __getitem__(
        self, key: Union[str, list[str], int, slice]
    ) -> Union[list, dict, "TinyDataFrame"]:
        """Get a single column or multiple columns or a row or a slice of rows. Can be chained.

        Args:
            key: A single column name, a list of column names, a row index, or a slice of row indexes.

        Returns:
            A single column as a list, a list of columns as a new TinyDataFrame, a row as a dictionary, or a slice of rows as a new TinyDataFrame.
        """
        # a single column
        if isinstance(key, str):
            return self._data[key]
        # multiple columns
        elif isinstance(key, list) and all(isinstance(k, str) for k in key):
            return TinyDataFrame(
                {col: self._data[col] for col in key if col in self._data}
            )
        # row index
        elif isinstance(key, int):
            return {col: self._data[col][key] for col in self.columns}
        # row indexes
        elif isinstance(key, slice):
            return TinyDataFrame({col: self._data[col][key] for col in self.columns})
        else:
            raise TypeError(f"Invalid key type: {type(key)}")

    def __setitem__(self, key: str, value: list) -> None:
        """Set a column with the given key to the provided values.

        Args:
            key: Column name (string)
            value: List of values for the column

        Raises:
            TypeError: If key is not a string
            ValueError: If the length of values doesn't match the dataframe length
        """
        if not isinstance(key, str):
            raise TypeError(f"Column name must be a string, got {type(key)}")

        if len(self) > 0 and len(value) != len(self):
            raise ValueError(
                f"Length of values ({len(value)}) must match dataframe length ({len(self)})"
            )

        if key not in self.columns:
            self.columns.append(key)

        self._data[key] = list(value)

    def head(self, n: int = 5) -> "TinyDataFrame":
        """Return first n rows as a new TinyDataFrame."""
        return self[slice(0, n)]

    def tail(self, n: int = 5) -> "TinyDataFrame":
        """Return last n rows as a new TinyDataFrame."""
        return self[slice(-n, None)]

    def __repr__(self) -> str:
        """Return a string representation of the data-frame."""
        return f"TinyDataFrame with {len(self)} rows and {len(self.columns)} columns. First row as a dict: {self[0]}"

    def __str__(self) -> str:
        """Print the first 10 rows of the data-frame in a human-readable table."""
        header = (
            f"TinyDataFrame with {len(self)} rows and {len(self.columns)} columns:\n"
        )
        if len(self) == 0:
            return header + "Empty dataframe"

        max_rows = min(10, len(self))

        col_widths = {}
        for col in self.columns:
            col_widths[col] = len(str(col))
            for i in range(max_rows):
                col_widths[col] = max(col_widths[col], len(str(self._data[col][i])))

        rows = []
        header_row = " | ".join(str(col).ljust(col_widths[col]) for col in self.columns)
        rows.append(header_row)
        separator = "-+-".join("-" * col_widths[col] for col in self.columns)
        rows.append(separator)

        for i in range(max_rows):
            row_values = []
            for col in self.columns:
                value = str(self._data[col][i])
                # right-align numbers, left-align strings
                try:
                    float(value)  # check if it's a number
                    row_values.append(value.rjust(col_widths[col]))
                except ValueError:
                    row_values.append(value.ljust(col_widths[col]))
            rows.append(" | ".join(row_values))

        # add ellipsis if there are more rows
        if len(self) > max_rows:
            rows.append("..." + " " * (len(rows[0]) - 3))
        return header + "\n".join(rows)

    def to_csv(
        self, csv_file_path: Optional[str] = None, quote_strings: bool = True
    ) -> str:
        """Write the data-frame to a CSV file or return as string if no path is provided.

        Args:
            csv_file_path: Path to write CSV file. If None, returns CSV as string.
            quote_strings: Whether to quote strings.
        """
        if csv_file_path:
            f = open(csv_file_path, "w", newline="")
        else:
            f = StringIO(newline="")

        try:
            writer = csv_writer(
                f, quoting=QUOTE_NONNUMERIC if quote_strings else QUOTE_MINIMAL
            )
            writer.writerow(self.columns)
            for i in range(len(self)):
                writer.writerow([self._data[col][i] for col in self.columns])

            if not csv_file_path:
                return f.getvalue()
        finally:
            f.close()

    def rename(self, columns: dict) -> "TinyDataFrame":
        """Rename one or multiple columns.

        Args:
            columns: Dictionary mapping old column names to new column names.

        Returns:
            Self for method chaining.

        Raises:
            KeyError: If any old column name doesn't exist in the dataframe.
        """
        for old_name in columns.keys():
            if old_name not in self.columns:
                raise KeyError(f"Column '{old_name}' not found in dataframe")

        for i, col in enumerate(self.columns):
            if col in columns:
                self.columns[i] = columns[col]
        # key order in self._data may change here, but column order is tracked by self.columns
        for old_name, new_name in columns.items():
            self._data[new_name] = self._data.pop(old_name)

        return self

__init__ #

__init__(data=None, csv_file_path=None)

Initialize with either:

  • Dictionary of lists/arrays
  • List of dictionaries
  • CSV file path
Source code in resource_tracker/tiny_data_frame.py
def __init__(
    self, data: Optional[dict | list] = None, csv_file_path: Optional[str] = None
):
    """
    Initialize with either:

    - Dictionary of lists/arrays
    - List of dictionaries
    - CSV file path
    """
    self.columns = []
    self._data = {}

    assert data is not None or csv_file_path is not None, (
        "either data or csv_file_path must be provided"
    )
    assert data is None or csv_file_path is None, (
        "only one of data or csv_file_path must be provided"
    )
    assert data is None or isinstance(data, dict) or isinstance(data, list), (
        "data must be a dictionary or a list"
    )
    assert csv_file_path is None or isinstance(csv_file_path, str), (
        "csv_file_path must be a string"
    )

    if csv_file_path:
        data = self._read_csv(csv_file_path)

    if isinstance(data, dict):
        self._data = {k: list(v) for k, v in data.items()}
        self.columns = list(self._data.keys())
    elif isinstance(data, list) and data and isinstance(data[0], dict):
        # let's preserve column order
        self.columns = []
        seen_columns = set()
        for row in data:
            for col in row.keys():
                if col not in seen_columns:
                    self.columns.append(col)
                    seen_columns.add(col)
        self._data = {col: [row.get(col) for row in data] for col in self.columns}

__len__ #

__len__()

Return the number of rows in the data-frame

Source code in resource_tracker/tiny_data_frame.py
def __len__(self):
    """Return the number of rows in the data-frame"""
    return len(next(iter(self._data.values()))) if self.columns else 0

__getitem__ #

__getitem__(key)

Get a single column or multiple columns or a row or a slice of rows. Can be chained.

Parameters:

Name Type Description Default
key Union[str, list[str], int, slice]

A single column name, a list of column names, a row index, or a slice of row indexes.

required

Returns:

Type Description
Union[list, dict, TinyDataFrame]

A single column as a list, a list of columns as a new TinyDataFrame, a row as a dictionary, or a slice of rows as a new TinyDataFrame.

Source code in resource_tracker/tiny_data_frame.py
def __getitem__(
    self, key: Union[str, list[str], int, slice]
) -> Union[list, dict, "TinyDataFrame"]:
    """Get a single column or multiple columns or a row or a slice of rows. Can be chained.

    Args:
        key: A single column name, a list of column names, a row index, or a slice of row indexes.

    Returns:
        A single column as a list, a list of columns as a new TinyDataFrame, a row as a dictionary, or a slice of rows as a new TinyDataFrame.
    """
    # a single column
    if isinstance(key, str):
        return self._data[key]
    # multiple columns
    elif isinstance(key, list) and all(isinstance(k, str) for k in key):
        return TinyDataFrame(
            {col: self._data[col] for col in key if col in self._data}
        )
    # row index
    elif isinstance(key, int):
        return {col: self._data[col][key] for col in self.columns}
    # row indexes
    elif isinstance(key, slice):
        return TinyDataFrame({col: self._data[col][key] for col in self.columns})
    else:
        raise TypeError(f"Invalid key type: {type(key)}")

__setitem__ #

__setitem__(key, value)

Set a column with the given key to the provided values.

Parameters:

Name Type Description Default
key str

Column name (string)

required
value list

List of values for the column

required

Raises:

Type Description
TypeError

If key is not a string

ValueError

If the length of values doesn't match the dataframe length

Source code in resource_tracker/tiny_data_frame.py
def __setitem__(self, key: str, value: list) -> None:
    """Set a column with the given key to the provided values.

    Args:
        key: Column name (string)
        value: List of values for the column

    Raises:
        TypeError: If key is not a string
        ValueError: If the length of values doesn't match the dataframe length
    """
    if not isinstance(key, str):
        raise TypeError(f"Column name must be a string, got {type(key)}")

    if len(self) > 0 and len(value) != len(self):
        raise ValueError(
            f"Length of values ({len(value)}) must match dataframe length ({len(self)})"
        )

    if key not in self.columns:
        self.columns.append(key)

    self._data[key] = list(value)

head #

head(n=5)

Return first n rows as a new TinyDataFrame.

Source code in resource_tracker/tiny_data_frame.py
def head(self, n: int = 5) -> "TinyDataFrame":
    """Return first n rows as a new TinyDataFrame."""
    return self[slice(0, n)]

tail #

tail(n=5)

Return last n rows as a new TinyDataFrame.

Source code in resource_tracker/tiny_data_frame.py
def tail(self, n: int = 5) -> "TinyDataFrame":
    """Return last n rows as a new TinyDataFrame."""
    return self[slice(-n, None)]

__repr__ #

__repr__()

Return a string representation of the data-frame.

Source code in resource_tracker/tiny_data_frame.py
def __repr__(self) -> str:
    """Return a string representation of the data-frame."""
    return f"TinyDataFrame with {len(self)} rows and {len(self.columns)} columns. First row as a dict: {self[0]}"

__str__ #

__str__()

Print the first 10 rows of the data-frame in a human-readable table.

Source code in resource_tracker/tiny_data_frame.py
def __str__(self) -> str:
    """Print the first 10 rows of the data-frame in a human-readable table."""
    header = (
        f"TinyDataFrame with {len(self)} rows and {len(self.columns)} columns:\n"
    )
    if len(self) == 0:
        return header + "Empty dataframe"

    max_rows = min(10, len(self))

    col_widths = {}
    for col in self.columns:
        col_widths[col] = len(str(col))
        for i in range(max_rows):
            col_widths[col] = max(col_widths[col], len(str(self._data[col][i])))

    rows = []
    header_row = " | ".join(str(col).ljust(col_widths[col]) for col in self.columns)
    rows.append(header_row)
    separator = "-+-".join("-" * col_widths[col] for col in self.columns)
    rows.append(separator)

    for i in range(max_rows):
        row_values = []
        for col in self.columns:
            value = str(self._data[col][i])
            # right-align numbers, left-align strings
            try:
                float(value)  # check if it's a number
                row_values.append(value.rjust(col_widths[col]))
            except ValueError:
                row_values.append(value.ljust(col_widths[col]))
        rows.append(" | ".join(row_values))

    # add ellipsis if there are more rows
    if len(self) > max_rows:
        rows.append("..." + " " * (len(rows[0]) - 3))
    return header + "\n".join(rows)

to_csv #

to_csv(csv_file_path=None, quote_strings=True)

Write the data-frame to a CSV file or return as string if no path is provided.

Parameters:

Name Type Description Default
csv_file_path Optional[str]

Path to write CSV file. If None, returns CSV as string.

None
quote_strings bool

Whether to quote strings.

True
Source code in resource_tracker/tiny_data_frame.py
def to_csv(
    self, csv_file_path: Optional[str] = None, quote_strings: bool = True
) -> str:
    """Write the data-frame to a CSV file or return as string if no path is provided.

    Args:
        csv_file_path: Path to write CSV file. If None, returns CSV as string.
        quote_strings: Whether to quote strings.
    """
    if csv_file_path:
        f = open(csv_file_path, "w", newline="")
    else:
        f = StringIO(newline="")

    try:
        writer = csv_writer(
            f, quoting=QUOTE_NONNUMERIC if quote_strings else QUOTE_MINIMAL
        )
        writer.writerow(self.columns)
        for i in range(len(self)):
            writer.writerow([self._data[col][i] for col in self.columns])

        if not csv_file_path:
            return f.getvalue()
    finally:
        f.close()
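The string-return path of `to_csv` is plain `csv.writer` against a `StringIO` buffer, with `QUOTE_NONNUMERIC` quoting strings but not numbers; a standalone equivalent:

```python
from csv import QUOTE_NONNUMERIC, writer as csv_writer
from io import StringIO

columns = ["manufacturer", "hp"]
data = {"manufacturer": ["Datsun 710"], "hp": [93.0]}

buf = StringIO(newline="")
writer = csv_writer(buf, quoting=QUOTE_NONNUMERIC)
writer.writerow(columns)  # header row: strings, so quoted
for i in range(len(data[columns[0]])):
    writer.writerow([data[col][i] for col in columns])  # 93.0 stays unquoted
print(buf.getvalue())
```

On read, the same `QUOTE_NONNUMERIC` flag turns those unquoted numbers back into floats, which is what makes the round trip lossless for numeric columns.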

rename #

rename(columns)

Rename one or multiple columns.

Parameters:

Name Type Description Default
columns dict

Dictionary mapping old column names to new column names.

required

Returns:

Type Description
TinyDataFrame

Self for method chaining.

Raises:

Type Description
KeyError

If any old column name doesn't exist in the dataframe.

Source code in resource_tracker/tiny_data_frame.py
def rename(self, columns: dict) -> "TinyDataFrame":
    """Rename one or multiple columns.

    Args:
        columns: Dictionary mapping old column names to new column names.

    Returns:
        Self for method chaining.

    Raises:
        KeyError: If any old column name doesn't exist in the dataframe.
    """
    for old_name in columns.keys():
        if old_name not in self.columns:
            raise KeyError(f"Column '{old_name}' not found in dataframe")

    for i, col in enumerate(self.columns):
        if col in columns:
            self.columns[i] = columns[col]
    # key order in self._data may change here, but column order is tracked by self.columns
    for old_name, new_name in columns.items():
        self._data[new_name] = self._data.pop(old_name)

    return self

PidTracker #

Track resource usage of a process and optionally its children.

This class monitors system resources such as CPU time and usage, memory usage, GPU and VRAM utilization, and I/O operations for a given process ID and optionally its child processes.

Data is collected every interval seconds and written as CSV to stdout or to output_file (if provided). Currently, the following columns are tracked:

  • timestamp (float): The current timestamp.
  • pid (int): The monitored process ID.
  • children (int | None): The current number of child processes.
  • utime (int): The total user+nice mode CPU time in clock ticks.
  • stime (int): The total system mode CPU time in clock ticks.
  • cpu_usage (float): The current CPU usage between 0 and number of CPUs.
  • pss (int): The current PSS (Proportional Set Size) in kB.
  • read_bytes (int): The total number of bytes read from disk.
  • write_bytes (int): The total number of bytes written to disk.
  • gpu_usage (float): The current GPU utilization between 0 and GPU count.
  • gpu_vram (float): The current GPU memory used in MiB.
  • gpu_utilized (int): The number of GPUs with utilization > 0.

Parameters:

Name Type Description Default
pid int

Process ID to track. Defaults to current process ID.

getpid()
interval float

Sampling interval in seconds. Defaults to 1.

1
children bool

Whether to track child processes. Defaults to True.

True
autostart bool

Whether to start tracking immediately. Defaults to True.

True
output_file str

File to write the output to. Defaults to None, printing to stdout.

None

Methods:

Name Description
__call__

Dummy method to make this class callable.

diff_stats

Calculate stats since last call.

start_tracking

Start an infinite loop tracking resource usage of the process until it exits.

Source code in resource_tracker/tracker.py
class PidTracker:
    """Track resource usage of a process and optionally its children.

    This class monitors system resources such as CPU time and usage, memory
    usage, GPU and VRAM utilization, and I/O operations for a given process
    ID and optionally its child processes.

    Data is collected every `interval` seconds and written as CSV to stdout
    or to `output_file` (if provided). Currently, the following columns are
    tracked:

    - timestamp (float): The current timestamp.
    - pid (int): The monitored process ID.
    - children (int | None): The current number of child processes.
    - utime (int): The total user+nice mode CPU time in clock ticks.
    - stime (int): The total system mode CPU time in clock ticks.
    - cpu_usage (float): The current CPU usage between 0 and number of CPUs.
    - pss (int): The current PSS (Proportional Set Size) in kB.
    - read_bytes (int): The total number of bytes read from disk.
    - write_bytes (int): The total number of bytes written to disk.
    - gpu_usage (float): The current GPU utilization between 0 and GPU count.
    - gpu_vram (float): The current GPU memory used in MiB.
    - gpu_utilized (int): The number of GPUs with utilization > 0.

    Args:
        pid (int, optional): Process ID to track. Defaults to current process ID.
        interval (float, optional): Sampling interval in seconds. Defaults to 1.
        children (bool, optional): Whether to track child processes. Defaults to True.
        autostart (bool, optional): Whether to start tracking immediately. Defaults to True.
        output_file (str, optional): File to write the output to. Defaults to None, printing to stdout.
    """

    def __init__(
        self,
        pid: int = getpid(),
        interval: float = 1,
        children: bool = True,
        autostart: bool = True,
        output_file: str = None,
    ):
        self.pid = pid
        self.status = "running"
        self.interval = interval
        self.cycle = 0
        self.children = children
        self.start_time = time()
        self.stats = get_pid_stats(pid, children)
        if autostart:
            self.start_tracking(output_file)

    def __call__(self):
        """Dummy method to make this class callable."""
        pass

    def diff_stats(self):
        """Calculate stats since last call."""
        last_stats = self.stats
        self.stats = get_pid_stats(self.pid, self.children)
        self.cycle += 1

        return {
            "timestamp": self.stats["timestamp"],
            "pid": self.pid,
            "children": self.stats["children"],
            "utime": max(0, self.stats["utime"] - last_stats["utime"]),
            "stime": max(0, self.stats["stime"] - last_stats["stime"]),
            "cpu_usage": round(
                max(
                    0,
                    (
                        (self.stats["utime"] + self.stats["stime"])
                        - (last_stats["utime"] + last_stats["stime"])
                    )
                    / (self.stats["timestamp"] - last_stats["timestamp"])
                    / sysconf("SC_CLK_TCK"),
                ),
                4,
            ),
            "pss": self.stats["pss"],
            "read_bytes": max(0, self.stats["read_bytes"] - last_stats["read_bytes"]),
            "write_bytes": max(
                0, self.stats["write_bytes"] - last_stats["write_bytes"]
            ),
            "gpu_usage": self.stats["gpu_usage"],
            "gpu_vram": self.stats["gpu_vram"],
            "gpu_utilized": self.stats["gpu_utilized"],
        }

    def start_tracking(
        self, output_file: Optional[str] = None, print_header: bool = True
    ):
        """Start an infinite loop tracking resource usage of the process until it exits.

        A CSV line is written every `interval` seconds.

        Args:
            output_file: File to write the output to. Defaults to None, printing to stdout.
            print_header: Whether to print the header of the CSV. Defaults to True.
        """
        file_handle = open(output_file, "w") if output_file else stdout
        file_writer = csv_writer(file_handle, quoting=QUOTE_NONNUMERIC)
        try:
            while True:
                current_time = time()
                current_stats = self.diff_stats()
                if current_stats["pss"] == 0:
                    # the process has exited
                    self.status = "exited"
                    break
                if self.cycle == 1 and print_header:
                    file_writer.writerow(current_stats.keys())
                else:
                    file_writer.writerow(current_stats.values())
                if output_file:
                    file_handle.flush()
                sleep(max(0, self.interval - (time() - current_time)))
        finally:
            if output_file and not file_handle.closed:
                file_handle.close()

__call__ #

__call__()

Dummy method to make this class callable.

Source code in resource_tracker/tracker.py
def __call__(self):
    """Dummy method to make this class callable."""
    pass

diff_stats #

diff_stats()

Calculate stats since last call.

Source code in resource_tracker/tracker.py
def diff_stats(self):
    """Calculate stats since last call."""
    last_stats = self.stats
    self.stats = get_pid_stats(self.pid, self.children)
    self.cycle += 1

    return {
        "timestamp": self.stats["timestamp"],
        "pid": self.pid,
        "children": self.stats["children"],
        "utime": max(0, self.stats["utime"] - last_stats["utime"]),
        "stime": max(0, self.stats["stime"] - last_stats["stime"]),
        "cpu_usage": round(
            max(
                0,
                (
                    (self.stats["utime"] + self.stats["stime"])
                    - (last_stats["utime"] + last_stats["stime"])
                )
                / (self.stats["timestamp"] - last_stats["timestamp"])
                / sysconf("SC_CLK_TCK"),
            ),
            4,
        ),
        "pss": self.stats["pss"],
        "read_bytes": max(0, self.stats["read_bytes"] - last_stats["read_bytes"]),
        "write_bytes": max(
            0, self.stats["write_bytes"] - last_stats["write_bytes"]
        ),
        "gpu_usage": self.stats["gpu_usage"],
        "gpu_vram": self.stats["gpu_vram"],
        "gpu_utilized": self.stats["gpu_utilized"],
    }
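The `cpu_usage` expression above divides the delta of cumulative CPU ticks by the elapsed wall-clock time and the tick rate. A minimal standalone sketch of that calculation (the function name and the `(timestamp, utime, stime)` tuple shape are illustrative, not part of the package):

```python
import os

def cpu_usage_between(before, after):
    """CPU usage between two (timestamp, utime, stime) samples.

    utime/stime are cumulative clock ticks (as in /proc/<pid>/stat); dividing
    the tick delta by elapsed wall time and SC_CLK_TCK yields a value between
    0 and the number of CPUs, clamped and rounded like the tracker does.
    """
    ticks = (after[1] + after[2]) - (before[1] + before[2])
    elapsed = after[0] - before[0]
    return round(max(0, ticks / elapsed / os.sysconf("SC_CLK_TCK")), 4)
```

A process that burned one full second of CPU over a one-second interval comes out as 1.0; a counter reset (e.g. after PID reuse) is clamped to 0 by the `max()`.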

start_tracking #

start_tracking(output_file=None, print_header=True)

Start an infinite loop tracking resource usage of the process until it exits.

A CSV line is written every interval seconds.

Parameters:

  • output_file (Optional[str]): File to write the output to. Defaults to None, printing to stdout.
  • print_header (bool): Whether to print the header of the CSV. Defaults to True.

Source code in resource_tracker/tracker.py
def start_tracking(
    self, output_file: Optional[str] = None, print_header: bool = True
):
    """Start an infinite loop tracking resource usage of the process until it exits.

    A CSV line is written every `interval` seconds.

    Args:
        output_file: File to write the output to. Defaults to None, printing to stdout.
        print_header: Whether to print the header of the CSV. Defaults to True.
    """
    file_handle = open(output_file, "w") if output_file else stdout
    file_writer = csv_writer(file_handle, quoting=QUOTE_NONNUMERIC)
    try:
        while True:
            current_time = time()
            current_stats = self.diff_stats()
            if current_stats["pss"] == 0:
                # the process has exited
                self.status = "exited"
                break
            if self.cycle == 1 and print_header:
                file_writer.writerow(current_stats.keys())
            else:
                file_writer.writerow(current_stats.values())
            if output_file:
                file_handle.flush()
            sleep(max(0, self.interval - (time() - current_time)))
    finally:
        if output_file and not file_handle.closed:
            file_handle.close()
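The CSV output above writes the dict's keys as a header on the first cycle and its values afterwards (so, as the loop is written with `print_header=True`, the first cycle produces the header and no data row), and `QUOTE_NONNUMERIC` quotes only strings, leaving numbers bare. A quick standalone illustration of that writer configuration (the sample dict is made up):

```python
import csv
import io

stats = {"timestamp": 1700000000.0, "pid": 1234, "pss": 2048}

buf = io.StringIO()
writer = csv.writer(buf, quoting=csv.QUOTE_NONNUMERIC)
writer.writerow(stats.keys())    # header row: strings, so quoted
writer.writerow(stats.values())  # data row: numbers left unquoted
print(buf.getvalue())
```

This prints the quoted header line followed by `1700000000.0,1234,2048`, which downstream tools can read back with numeric types intact.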

SystemTracker #

Track system-wide resource usage.

This class monitors system resources like CPU times and usage, memory usage, GPU and VRAM utilization, disk I/O, and network traffic for the entire system.

Data is collected every interval seconds and written to stdout or to output_file (if provided) as CSV. Currently, the following columns are tracked:

  • timestamp (float): The current timestamp.
  • processes (int): The number of running processes.
  • utime (int): The total user+nice mode CPU time in clock ticks.
  • stime (int): The total system mode CPU time in clock ticks.
  • cpu_usage (float): The current CPU usage between 0 and number of CPUs.
  • memory_free (int): The amount of free memory in kB.
  • memory_used (int): The amount of used memory in kB.
  • memory_buffers (int): The amount of memory used for buffers in kB.
  • memory_cached (int): The amount of memory used for caching in kB.
  • memory_active_anon (int): The amount of memory used for anonymous pages in kB.
  • memory_inactive_anon (int): The amount of memory used for inactive anonymous pages in kB.
  • disk_read_bytes (int): The total number of bytes read from disk.
  • disk_write_bytes (int): The total number of bytes written to disk.
  • disk_space_total_gb (float): The total disk space in GB.
  • disk_space_used_gb (float): The used disk space in GB.
  • disk_space_free_gb (float): The free disk space in GB.
  • net_recv_bytes (int): The total number of bytes received over network.
  • net_sent_bytes (int): The total number of bytes sent over network.
  • gpu_usage (float): The current GPU utilization between 0 and GPU count.
  • gpu_vram (float): The current GPU memory used in MiB.
  • gpu_utilized (int): The number of GPUs with utilization > 0.
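The memory_* columns are reported in kB, matching the units of /proc/meminfo, the natural procfs source for them. A minimal sketch of parsing that file (the function name and the path parameter are illustrative; the package's own collection happens inside get_system_stats):

```python
def read_meminfo(path="/proc/meminfo"):
    """Parse /proc/meminfo into a {field: value} dict.

    Each line looks like "MemFree:  123456 kB"; the values are already in
    kB, so only the first token after the colon is kept, as an int.
    """
    info = {}
    with open(path) as f:
        for line in f:
            key, _, rest = line.partition(":")
            info[key.strip()] = int(rest.split()[0])
    return info
```

Fields without a unit suffix (e.g. HugePages_Total) parse the same way, since only the first token is taken.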

Parameters:

  • interval (float): Sampling interval in seconds. Defaults to 1.
  • autostart (bool): Whether to start tracking immediately. Defaults to True.
  • output_file (str): File to write the output to. Defaults to None, printing to stdout.

Methods:

  • __call__: Dummy method to make this class callable.
  • diff_stats: Calculate stats since last call.
  • start_tracking: Start an infinite loop tracking system resource usage.

Source code in resource_tracker/tracker.py
class SystemTracker:
    """Track system-wide resource usage.

    This class monitors system resources like CPU times and usage, memory usage,
    GPU and VRAM utilization, disk I/O, and network traffic for the entire system.

    Data is collected every `interval` seconds and written to the stdout or
    `output_file` (if provided) as CSV. Currently, the following columns are
    tracked:

    - timestamp (float): The current timestamp.
    - processes (int): The number of running processes.
    - utime (int): The total user+nice mode CPU time in clock ticks.
    - stime (int): The total system mode CPU time in clock ticks.
    - cpu_usage (float): The current CPU usage between 0 and number of CPUs.
    - memory_free (int): The amount of free memory in kB.
    - memory_used (int): The amount of used memory in kB.
    - memory_buffers (int): The amount of memory used for buffers in kB.
    - memory_cached (int): The amount of memory used for caching in kB.
    - memory_active_anon (int): The amount of memory used for anonymous pages in kB.
    - memory_inactive_anon (int): The amount of memory used for inactive anonymous pages in kB.
    - disk_read_bytes (int): The total number of bytes read from disk.
    - disk_write_bytes (int): The total number of bytes written to disk.
    - disk_space_total_gb (float): The total disk space in GB.
    - disk_space_used_gb (float): The used disk space in GB.
    - disk_space_free_gb (float): The free disk space in GB.
    - net_recv_bytes (int): The total number of bytes received over network.
    - net_sent_bytes (int): The total number of bytes sent over network.
    - gpu_usage (float): The current GPU utilization between 0 and GPU count.
    - gpu_vram (float): The current GPU memory used in MiB.
    - gpu_utilized (int): The number of GPUs with utilization > 0.

    Args:
        interval: Sampling interval in seconds. Defaults to 1.
        autostart: Whether to start tracking immediately. Defaults to True.
        output_file: File to write the output to. Defaults to None, print to stdout.
    """

    def __init__(
        self,
        interval: float = 1,
        autostart: bool = True,
        output_file: str = None,
    ):
        self.status = "running"
        self.interval = interval
        self.cycle = 0
        self.start_time = time()

        # get sector sizes for all disks
        self.sector_sizes = {}
        with suppress(FileNotFoundError):
            for disk_path in glob("/sys/block/*/"):
                disk_name = disk_path.split("/")[-2]
                if is_partition(disk_name):
                    continue
                try:
                    with open(f"{disk_path}queue/hw_sector_size", "r") as f:
                        self.sector_sizes[disk_name] = int(f.read().strip())
                except (FileNotFoundError, ValueError):
                    self.sector_sizes[disk_name] = 512

        self.stats = get_system_stats()
        if autostart:
            self.start_tracking(output_file)

    def __call__(self):
        """Dummy method to make this class callable."""
        pass

    def diff_stats(self):
        """Calculate stats since last call."""
        last_stats = self.stats
        self.stats = get_system_stats()
        self.cycle += 1

        time_diff = self.stats["timestamp"] - last_stats["timestamp"]

        # calculate total disk I/O in bytes using per-disk sector sizes
        total_read_bytes = 0
        total_write_bytes = 0
        for disk_name in set(self.stats["disk_stats"]) & set(last_stats["disk_stats"]):
            sector_size = self.sector_sizes.get(disk_name, 512)
            read_sectors = max(
                0,
                self.stats["disk_stats"][disk_name]["read_sectors"]
                - last_stats["disk_stats"][disk_name]["read_sectors"],
            )
            write_sectors = max(
                0,
                self.stats["disk_stats"][disk_name]["write_sectors"]
                - last_stats["disk_stats"][disk_name]["write_sectors"],
            )
            total_read_bytes += read_sectors * sector_size
            total_write_bytes += write_sectors * sector_size

        # Get disk usage information for reporting
        disk_space_total = 0
        disk_space_used = 0
        disk_space_free = 0
        for disk_space in self.stats["disk_spaces"].values():
            disk_space_total += disk_space["total"]
            disk_space_used += disk_space["used"]
            disk_space_free += disk_space["free"]

        return {
            "timestamp": self.stats["timestamp"],
            "processes": self.stats["processes"],
            "utime": max(0, self.stats["utime"] - last_stats["utime"]),
            "stime": max(0, self.stats["stime"] - last_stats["stime"]),
            "cpu_usage": round(
                max(
                    0,
                    (
                        (self.stats["utime"] + self.stats["stime"])
                        - (last_stats["utime"] + last_stats["stime"])
                    )
                    / time_diff
                    / sysconf("SC_CLK_TCK"),
                ),
                4,
            ),
            "memory_free": self.stats["memory_free"],
            "memory_used": self.stats["memory_used"],
            "memory_buffers": self.stats["memory_buffers"],
            "memory_cached": self.stats["memory_cached"],
            "memory_active_anon": self.stats["memory_active_anon"],
            "memory_inactive_anon": self.stats["memory_inactive_anon"],
            "disk_read_bytes": total_read_bytes,
            "disk_write_bytes": total_write_bytes,
            "disk_space_total_gb": round(disk_space_total / (1024**3), 2),
            "disk_space_used_gb": round(disk_space_used / (1024**3), 2),
            "disk_space_free_gb": round(disk_space_free / (1024**3), 2),
            "net_recv_bytes": max(
                0, self.stats["net_recv_bytes"] - last_stats["net_recv_bytes"]
            ),
            "net_sent_bytes": max(
                0, self.stats["net_sent_bytes"] - last_stats["net_sent_bytes"]
            ),
            "gpu_usage": self.stats["gpu_usage"],
            "gpu_vram": self.stats["gpu_vram"],
            "gpu_utilized": self.stats["gpu_utilized"],
        }

    def start_tracking(
        self, output_file: Optional[str] = None, print_header: bool = True
    ):
        """Start an infinite loop tracking system resource usage.

        A CSV line is written every `interval` seconds.

        Args:
            output_file: File to write the output to. Defaults to None, printing to stdout.
            print_header: Whether to print the header of the CSV. Defaults to True.
        """
        file_handle = open(output_file, "w") if output_file else stdout
        file_writer = csv_writer(file_handle, quoting=QUOTE_NONNUMERIC)
        try:
            while True:
                current_time = time()
                current_stats = self.diff_stats()
                if self.cycle == 1 and print_header:
                    file_writer.writerow(current_stats.keys())
                else:
                    file_writer.writerow(current_stats.values())
                if output_file:
                    file_handle.flush()
                sleep(max(0, self.interval - (time() - current_time)))
        finally:
            if output_file and not file_handle.closed:
                file_handle.close()
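The constructor above resolves each disk's hardware sector size from sysfs, falling back to the traditional 512 bytes, so that sector counts read later (e.g. from /proc/diskstats) can be converted to bytes. A standalone sketch of that lookup (the sys_block parameter exists only to make it testable against a fake directory; the real code additionally skips partitions via is_partition):

```python
from glob import glob

def read_sector_sizes(sys_block="/sys/block"):
    """Map each block device name to its hardware sector size, defaulting to 512."""
    sizes = {}
    for disk_path in glob(f"{sys_block}/*/"):
        disk_name = disk_path.rstrip("/").split("/")[-1]
        try:
            # sysfs exposes the sector size as a single decimal number
            with open(f"{disk_path}queue/hw_sector_size") as f:
                sizes[disk_name] = int(f.read().strip())
        except (FileNotFoundError, ValueError):
            sizes[disk_name] = 512
    return sizes
```

Devices missing the queue/hw_sector_size file (or containing unparseable content) get the 512-byte default rather than being dropped.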

__call__ #

__call__()

Dummy method to make this class callable.

Source code in resource_tracker/tracker.py
def __call__(self):
    """Dummy method to make this class callable."""
    pass

diff_stats #

diff_stats()

Calculate stats since last call.

Source code in resource_tracker/tracker.py
def diff_stats(self):
    """Calculate stats since last call."""
    last_stats = self.stats
    self.stats = get_system_stats()
    self.cycle += 1

    time_diff = self.stats["timestamp"] - last_stats["timestamp"]

    # calculate total disk I/O in bytes using per-disk sector sizes
    total_read_bytes = 0
    total_write_bytes = 0
    for disk_name in set(self.stats["disk_stats"]) & set(last_stats["disk_stats"]):
        sector_size = self.sector_sizes.get(disk_name, 512)
        read_sectors = max(
            0,
            self.stats["disk_stats"][disk_name]["read_sectors"]
            - last_stats["disk_stats"][disk_name]["read_sectors"],
        )
        write_sectors = max(
            0,
            self.stats["disk_stats"][disk_name]["write_sectors"]
            - last_stats["disk_stats"][disk_name]["write_sectors"],
        )
        total_read_bytes += read_sectors * sector_size
        total_write_bytes += write_sectors * sector_size

    # Get disk usage information for reporting
    disk_space_total = 0
    disk_space_used = 0
    disk_space_free = 0
    for disk_space in self.stats["disk_spaces"].values():
        disk_space_total += disk_space["total"]
        disk_space_used += disk_space["used"]
        disk_space_free += disk_space["free"]

    return {
        "timestamp": self.stats["timestamp"],
        "processes": self.stats["processes"],
        "utime": max(0, self.stats["utime"] - last_stats["utime"]),
        "stime": max(0, self.stats["stime"] - last_stats["stime"]),
        "cpu_usage": round(
            max(
                0,
                (
                    (self.stats["utime"] + self.stats["stime"])
                    - (last_stats["utime"] + last_stats["stime"])
                )
                / time_diff
                / sysconf("SC_CLK_TCK"),
            ),
            4,
        ),
        "memory_free": self.stats["memory_free"],
        "memory_used": self.stats["memory_used"],
        "memory_buffers": self.stats["memory_buffers"],
        "memory_cached": self.stats["memory_cached"],
        "memory_active_anon": self.stats["memory_active_anon"],
        "memory_inactive_anon": self.stats["memory_inactive_anon"],
        "disk_read_bytes": total_read_bytes,
        "disk_write_bytes": total_write_bytes,
        "disk_space_total_gb": round(disk_space_total / (1024**3), 2),
        "disk_space_used_gb": round(disk_space_used / (1024**3), 2),
        "disk_space_free_gb": round(disk_space_free / (1024**3), 2),
        "net_recv_bytes": max(
            0, self.stats["net_recv_bytes"] - last_stats["net_recv_bytes"]
        ),
        "net_sent_bytes": max(
            0, self.stats["net_sent_bytes"] - last_stats["net_sent_bytes"]
        ),
        "gpu_usage": self.stats["gpu_usage"],
        "gpu_vram": self.stats["gpu_vram"],
        "gpu_utilized": self.stats["gpu_utilized"],
    }

start_tracking #

start_tracking(output_file=None, print_header=True)

Start an infinite loop tracking system resource usage.

A CSV line is written every interval seconds.

Parameters:

  • output_file (Optional[str]): File to write the output to. Defaults to None, printing to stdout.
  • print_header (bool): Whether to print the header of the CSV. Defaults to True.
Source code in resource_tracker/tracker.py
def start_tracking(
    self, output_file: Optional[str] = None, print_header: bool = True
):
    """Start an infinite loop tracking system resource usage.

    A CSV line is written every `interval` seconds.

    Args:
        output_file: File to write the output to. Defaults to None, printing to stdout.
        print_header: Whether to print the header of the CSV. Defaults to True.
    """
    file_handle = open(output_file, "w") if output_file else stdout
    file_writer = csv_writer(file_handle, quoting=QUOTE_NONNUMERIC)
    try:
        while True:
            current_time = time()
            current_stats = self.diff_stats()
            if self.cycle == 1 and print_header:
                file_writer.writerow(current_stats.keys())
            else:
                file_writer.writerow(current_stats.values())
            if output_file:
                file_handle.flush()
            sleep(max(0, self.interval - (time() - current_time)))
    finally:
        if output_file and not file_handle.closed:
            file_handle.close()
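Both trackers keep a steady sampling cadence with the same pattern: record the loop's start time, collect the sample, then sleep only for whatever remains of the interval, with max(0, ...) absorbing samples that take longer than the interval itself. Isolated as a sketch (the function is illustrative, and it uses monotonic rather than the trackers' time() so the cadence is robust against wall-clock adjustments):

```python
from time import monotonic, sleep

def run_at_interval(sample, interval, cycles):
    """Call sample() roughly every `interval` seconds for `cycles` iterations.

    Subtracting the time the sample itself took keeps the cadence from
    drifting; slow samples simply run back to back instead of sleeping.
    """
    for _ in range(cycles):
        start = monotonic()
        sample()
        sleep(max(0, interval - (monotonic() - start)))
```

Without the subtraction, each cycle would last interval plus the sampling cost, and the timestamps in the CSV would slowly drift away from the intended grid.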