resource_tracker #

Resource Tracker package for monitoring resource usage, detecting cloud environments, and more.

Modules:

Name Description
cloud_info

Detect cloud environment (provider, region, instance type) via VM metadata services.

helpers

General helpers.

nvidia

Helpers to monitor NVIDIA GPUs.

server_info

Detect server hardware (CPU count, memory amount, disk space, GPU count and VRAM amount) via procfs or psutil, and nvidia-smi.

tiny_data_frame

A very inefficient data-frame implementation for manipulating resource usage data.

tracker

Track resource usage of a process and/or the system.

tracker_procfs

Helpers to track resource usage via procfs.

tracker_psutil

Helpers to track resource usage via psutil.

Classes:

Name Description
TinyDataFrame

A very inefficient data-frame implementation with a few features.

PidTracker

Track resource usage of a process and optionally its children.

ResourceTracker

Track resource usage of processes and the system in a non-blocking way.

SystemTracker

Track system-wide resource usage.

Functions:

Name Description
get_cloud_info

Detect cloud environment and return standardized information on provider, region, and instance type.

get_server_info

Collects important information about the Linux server.

get_cloud_info cached #

get_cloud_info()

Detect cloud environment and return standardized information on provider, region, and instance type.

Returns:

Type Description
dict

A dictionary containing standardized cloud information:

  • vendor: The cloud provider (aws, gcp, azure, hcloud, upcloud), or "unknown"
  • instance_type: The instance type/size/flavor, or "unknown"
  • region: The region/zone where the instance is running, or "unknown"
Source code in resource_tracker/cloud_info.py
@cache
def get_cloud_info() -> dict:
    """
    Detect cloud environment and return standardized information on provider, region, and instance type.

    Returns:
        A dictionary containing standardized cloud information:

            - `vendor`: The cloud provider (aws, gcp, azure, hcloud, upcloud), or "unknown"
            - `instance_type`: The instance type/size/flavor, or "unknown"
            - `region`: The region/zone where the instance is running, or "unknown"
    """
    start_time = time()
    check_functions = [
        _check_aws,
        _check_gcp,
        _check_azure,
        _check_hetzner,
        _check_upcloud,
    ]

    # run checks in parallel, return early if any check succeeds
    with ThreadPoolExecutor(max_workers=len(check_functions)) as executor:
        futures = {executor.submit(check_fn): check_fn for check_fn in check_functions}
        pending = set(futures.keys())
        while pending:
            done, pending = wait(pending, return_when=FIRST_COMPLETED)
            for future in done:
                with suppress(Exception):
                    info = future.result()
                    if info:
                        # stop all remaining checks early
                        for f in pending:
                            f.cancel()
                        return info | {"discovery_time": time() - start_time}

    return {
        "vendor": "unknown",
        "instance_type": "unknown",
        "region": "unknown",
        "discovery_time": time() - start_time,
    }
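The early-return pattern used above can be sketched independently of any real metadata service: submit all probes in parallel, take the first truthy result, and cancel whatever has not started yet. The check functions below are illustrative stand-ins, not the package's real probes.

```python
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait
from contextlib import suppress
from time import sleep, time


def first_successful(check_fns: list) -> dict:
    """Run checks in parallel and return the first truthy result."""
    start = time()
    with ThreadPoolExecutor(max_workers=len(check_fns)) as executor:
        pending = {executor.submit(fn) for fn in check_fns}
        while pending:
            done, pending = wait(pending, return_when=FIRST_COMPLETED)
            for future in done:
                with suppress(Exception):
                    info = future.result()
                    if info:
                        # cancel checks that have not started yet
                        for f in pending:
                            f.cancel()
                        return info | {"discovery_time": time() - start}
    return {"vendor": "unknown", "discovery_time": time() - start}


def check_slow():
    sleep(0.5)  # still running when the winner returns
    return None


def check_broken():
    raise RuntimeError("metadata endpoint unreachable")  # suppressed


def check_fast():
    return {"vendor": "aws", "region": "us-east-1"}


result = first_successful([check_slow, check_broken, check_fast])
print(result["vendor"])  # aws
```

Note that `Future.cancel()` only prevents not-yet-started checks from running; a probe already executing (like the slow one here) runs to completion, which is why the executor still blocks briefly on shutdown.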

get_server_info #

get_server_info()

Collects important information about the Linux server.

Returns:

Type Description
dict

A dictionary containing server information:

  • os: Operating system
  • vcpus: Number of virtual CPUs
  • memory_mb: Total memory in MB
  • gpu_count: Number of GPUs (0 if not available)
  • gpu_names: Names of the detected GPUs
  • gpu_memory_mb: Total VRAM in MB (0 if not available)
Source code in resource_tracker/server_info.py
def get_server_info() -> dict:
    """
    Collects important information about the Linux server.

    Returns:
        A dictionary containing server information:

            - `os`: Operating system
            - `vcpus`: Number of virtual CPUs
            - `memory_mb`: Total memory in MB
            - `gpu_count`: Number of GPUs (`0` if not available)
            - `gpu_names`: Names of the detected GPUs
            - `gpu_memory_mb`: Total VRAM in MB (`0` if not available)
    """
    gpu_info = get_gpu_info()
    info = {
        "os": system(),
        "vcpus": cpu_count(),
        "memory_mb": get_total_memory_mb(),
        "gpu_count": gpu_info["count"],
        "gpu_names": gpu_info["gpu_names"],
        "gpu_memory_mb": gpu_info["memory_mb"],
    }
    return info
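A dependency-free approximation of the returned dict can be assembled from the standard library alone. GPU detection, which the real module delegates to nvidia-smi, is stubbed to zero here, and the memory helper is a best-effort POSIX sketch rather than the package's implementation.

```python
import os
from platform import system


def get_total_memory_mb() -> float:
    """Best-effort total RAM in MB via POSIX sysconf; 0 if unavailable."""
    try:
        return os.sysconf("SC_PAGE_SIZE") * os.sysconf("SC_PHYS_PAGES") / 1024**2
    except (AttributeError, ValueError, OSError):
        return 0


def basic_server_info() -> dict:
    # GPU fields are stubbed; the real helper queries nvidia-smi
    return {
        "os": system(),
        "vcpus": os.cpu_count(),
        "memory_mb": get_total_memory_mb(),
        "gpu_count": 0,
        "gpu_memory_mb": 0,
    }


print(basic_server_info())
```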

TinyDataFrame #

A very inefficient data-frame implementation with a few features.

Supported features:

  • reading CSV files from a remote URL
  • reading CSV files from a local file
  • converting a dictionary of lists/arrays to a data-frame
  • converting a list of dictionaries to a data-frame
  • slicing rows
  • slicing columns
  • slicing rows and columns
  • printing a summary of the data-frame
  • printing the data-frame as a human-readable (grid) table
  • renaming columns
  • writing to a CSV file

Parameters:

Name Type Description Default
data Optional[Union[Dict[str, List[float]], List[Dict[str, float]]]]

Dictionary of lists/arrays or list of dictionaries.

None
csv_file_path Optional[str]

Path to a properly quoted CSV file.

None

Example:

>>> df = TinyDataFrame(csv_file_path="https://raw.githubusercontent.com/plotly/datasets/refs/heads/master/mtcars.csv")
>>> df
TinyDataFrame with 32 rows and 12 columns. First row as a dict: {'manufacturer': 'Mazda RX4', 'mpg': 21.0, 'cyl': 6.0, 'disp': 160.0, 'hp': 110.0, 'drat': 3.9, 'wt': 2.62, 'qsec': 16.46, 'vs': 0.0, 'am': 1.0, 'gear': 4.0, 'carb': 4.0}
>>> df[2:5][['manufacturer', 'hp']]
TinyDataFrame with 3 rows and 2 columns. First row as a dict: {'manufacturer': 'Datsun 710', 'hp': 93.0}
>>> print(df[2:5][['manufacturer', 'hp']])  # doctest: +NORMALIZE_WHITESPACE
TinyDataFrame with 3 rows and 2 columns:
manufacturer      | hp
------------------+------
Datsun 710        |  93.0
Hornet 4 Drive    | 110.0
Hornet Sportabout | 175.0
>>> print(df[2:5][['manufacturer', 'hp']].to_csv())  # doctest: +NORMALIZE_WHITESPACE
"manufacturer","hp"
"Datsun 710",93.0
"Hornet 4 Drive",110.0
"Hornet Sportabout",175.0

Methods:

Name Description
__init__

Initialize with either:

__len__

Return the number of rows in the data-frame

__getitem__

Get a single column or multiple columns or a row or a slice of rows. Can be chained.

__setitem__

Set a column with the given key to the provided values.

head

Return first n rows as a new TinyDataFrame.

tail

Return last n rows as a new TinyDataFrame.

__repr__

Return a string representation of the data-frame.

__str__

Print the first 10 rows of the data-frame in a human-readable table.

to_csv

Write the data-frame to a CSV file or return as string if no path is provided.

rename

Rename one or multiple columns.

Source code in resource_tracker/tiny_data_frame.py
class TinyDataFrame:
    """A very inefficient data-frame implementation with a few features.

    Supported features:

    - reading CSV files from a remote URL
    - reading CSV files from a local file
    - converting a dictionary of lists/arrays to a data-frame
    - converting a list of dictionaries to a data-frame
    - slicing rows
    - slicing columns
    - slicing rows and columns
    - printing a summary of the data-frame
    - printing the data-frame as a human-readable (grid) table
    - renaming columns
    - writing to a CSV file

    Args:
        data: Dictionary of lists/arrays or list of dictionaries.
        csv_file_path: Path to a properly quoted CSV file.

    Example:

        >>> df = TinyDataFrame(csv_file_path="https://raw.githubusercontent.com/plotly/datasets/refs/heads/master/mtcars.csv")
        >>> df
        TinyDataFrame with 32 rows and 12 columns. First row as a dict: {'manufacturer': 'Mazda RX4', 'mpg': 21.0, 'cyl': 6.0, 'disp': 160.0, 'hp': 110.0, 'drat': 3.9, 'wt': 2.62, 'qsec': 16.46, 'vs': 0.0, 'am': 1.0, 'gear': 4.0, 'carb': 4.0}
        >>> df[2:5][['manufacturer', 'hp']]
        TinyDataFrame with 3 rows and 2 columns. First row as a dict: {'manufacturer': 'Datsun 710', 'hp': 93.0}
        >>> print(df[2:5][['manufacturer', 'hp']])  # doctest: +NORMALIZE_WHITESPACE
        TinyDataFrame with 3 rows and 2 columns:
        manufacturer      | hp
        ------------------+------
        Datsun 710        |  93.0
        Hornet 4 Drive    | 110.0
        Hornet Sportabout | 175.0
        >>> print(df[2:5][['manufacturer', 'hp']].to_csv())  # doctest: +NORMALIZE_WHITESPACE
        "manufacturer","hp"
        "Datsun 710",93.0
        "Hornet 4 Drive",110.0
        "Hornet Sportabout",175.0
    """

    def __init__(
        self,
        data: Optional[Union[Dict[str, List[float]], List[Dict[str, float]]]] = None,
        csv_file_path: Optional[str] = None,
    ):
        """
        Initialize with either:

        - Dictionary of lists/arrays
        - List of dictionaries
        - CSV file path
        """
        self.columns = []
        self._data = {}

        assert data is not None or csv_file_path is not None, (
            "either data or csv_file_path must be provided"
        )
        assert data is None or csv_file_path is None, (
            "only one of data or csv_file_path must be provided"
        )
        assert data is None or isinstance(data, dict) or isinstance(data, list), (
            "data must be a dictionary or a list"
        )
        assert csv_file_path is None or isinstance(csv_file_path, str), (
            "csv_file_path must be a string"
        )

        if csv_file_path:
            data = self._read_csv(csv_file_path)

        if isinstance(data, dict):
            self._data = {k: list(v) for k, v in data.items()}
            self.columns = list(self._data.keys())
        elif isinstance(data, list) and data and isinstance(data[0], dict):
            # let's preserve column order
            self.columns = []
            seen_columns = set()
            for row in data:
                for col in row.keys():
                    if col not in seen_columns:
                        self.columns.append(col)
                        seen_columns.add(col)
            self._data = {col: [row.get(col) for row in data] for col in self.columns}

    def _read_csv(self, csv_file_path: str) -> list[dict]:
        """Read a CSV file and return a list of dictionaries.

        Args:
            csv_file_path: CSV file path or URL.
        """
        results = []

        parsed = urlparse(csv_file_path)
        if parsed.scheme in ("http", "https"):
            with urlopen(csv_file_path) as response:
                content = response.read().decode("utf-8").splitlines()
                csv_source = content
        else:
            csv_source = open(csv_file_path, "r")

        try:
            reader = DictReader(csv_source, quoting=QUOTE_NONNUMERIC)
            results = list(reader)
        finally:
            if hasattr(csv_source, "close"):
                csv_source.close()

        return results

    def __len__(self):
        """Return the number of rows in the data-frame"""
        return len(next(iter(self._data.values()))) if self.columns else 0

    def __getitem__(
        self, key: Union[str, List[str], int, slice]
    ) -> Union[List[float], Dict[str, float], "TinyDataFrame"]:
        """Get a single column or multiple columns or a row or a slice of rows. Can be chained.

        Args:
            key: A single column name, a list of column names, a row index, or a slice of row indexes.

        Returns:
            A single column as a list, a list of columns as a new TinyDataFrame, a row as a dictionary, or a slice of rows as a new TinyDataFrame.
        """
        # a single column
        if isinstance(key, str):
            return self._data[key]
        # multiple columns
        elif isinstance(key, List) and all(isinstance(k, str) for k in key):
            return TinyDataFrame(
                {col: self._data[col] for col in key if col in self._data}
            )
        # row index
        elif isinstance(key, int):
            return {col: self._data[col][key] for col in self.columns}
        # row indexes
        elif isinstance(key, slice):
            return TinyDataFrame({col: self._data[col][key] for col in self.columns})
        else:
            raise TypeError(f"Invalid key type: {type(key)}")

    def __setitem__(self, key: str, value: List[float]) -> None:
        """Set a column with the given key to the provided values.

        Args:
            key: Column name (string)
            value: List of values for the column

        Raises:
            TypeError: If key is not a string
            ValueError: If the length of values doesn't match the dataframe length
        """
        if not isinstance(key, str):
            raise TypeError(f"Column name must be a string, got {type(key)}")

        if len(self) > 0 and len(value) != len(self):
            raise ValueError(
                f"Length of values ({len(value)}) must match dataframe length ({len(self)})"
            )

        if key not in self.columns:
            self.columns.append(key)

        self._data[key] = list(value)

    def head(self, n: int = 5) -> "TinyDataFrame":
        """Return first n rows as a new TinyDataFrame."""
        return self[slice(0, n)]

    def tail(self, n: int = 5) -> "TinyDataFrame":
        """Return last n rows as a new TinyDataFrame."""
        return self[slice(-n, None)]

    def __repr__(self) -> str:
        """Return a string representation of the data-frame."""
        return f"TinyDataFrame with {len(self)} rows and {len(self.columns)} columns. First row as a dict: {self[0]}"

    def __str__(self) -> str:
        """Print the first 10 rows of the data-frame in a human-readable table."""
        header = (
            f"TinyDataFrame with {len(self)} rows and {len(self.columns)} columns:\n"
        )
        if len(self) == 0:
            return header + "Empty dataframe"

        max_rows = min(10, len(self))

        col_widths = {}
        for col in self.columns:
            col_widths[col] = len(str(col))
            for i in range(max_rows):
                col_widths[col] = max(col_widths[col], len(str(self._data[col][i])))

        rows = []
        header_row = " | ".join(str(col).ljust(col_widths[col]) for col in self.columns)
        rows.append(header_row)
        separator = "-+-".join("-" * col_widths[col] for col in self.columns)
        rows.append(separator)

        for i in range(max_rows):
            row_values = []
            for col in self.columns:
                value = str(self._data[col][i])
                # right-align numbers, left-align strings
                try:
                    float(value)  # check if it's a number
                    row_values.append(value.rjust(col_widths[col]))
                except ValueError:
                    row_values.append(value.ljust(col_widths[col]))
            rows.append(" | ".join(row_values))

        # add ellipsis if there are more rows
        if len(self) > max_rows:
            rows.append("..." + " " * (len(rows[0]) - 3))
        return header + "\n".join(rows)

    def to_csv(
        self, csv_file_path: Optional[str] = None, quote_strings: bool = True
    ) -> str:
        """Write the data-frame to a CSV file or return as string if no path is provided.

        Args:
            csv_file_path: Path to write CSV file. If None, returns CSV as string.
            quote_strings: Whether to quote strings.
        """
        if csv_file_path:
            f = open(csv_file_path, "w", newline="")
        else:
            f = StringIO(newline="")

        try:
            writer = csv_writer(
                f, quoting=QUOTE_NONNUMERIC if quote_strings else QUOTE_MINIMAL
            )
            writer.writerow(self.columns)
            for i in range(len(self)):
                writer.writerow([self._data[col][i] for col in self.columns])

            if not csv_file_path:
                return f.getvalue()
        finally:
            f.close()

    def rename(self, columns: dict) -> "TinyDataFrame":
        """Rename one or multiple columns.

        Args:
            columns: Dictionary mapping old column names to new column names.

        Returns:
            Self for method chaining.

        Raises:
            KeyError: If any old column name doesn't exist in the dataframe.
        """
        for old_name in columns.keys():
            if old_name not in self.columns:
                raise KeyError(f"Column '{old_name}' not found in dataframe")

        for i, col in enumerate(self.columns):
            if col in columns:
                self.columns[i] = columns[col]
        # key order in _data may change here, but column order is defined by self.columns
        for old_name, new_name in columns.items():
            self._data[new_name] = self._data.pop(old_name)

        return self

__init__ #

__init__(data=None, csv_file_path=None)

Initialize with either:

  • Dictionary of lists/arrays
  • List of dictionaries
  • CSV file path
Source code in resource_tracker/tiny_data_frame.py
def __init__(
    self,
    data: Optional[Union[Dict[str, List[float]], List[Dict[str, float]]]] = None,
    csv_file_path: Optional[str] = None,
):
    """
    Initialize with either:

    - Dictionary of lists/arrays
    - List of dictionaries
    - CSV file path
    """
    self.columns = []
    self._data = {}

    assert data is not None or csv_file_path is not None, (
        "either data or csv_file_path must be provided"
    )
    assert data is None or csv_file_path is None, (
        "only one of data or csv_file_path must be provided"
    )
    assert data is None or isinstance(data, dict) or isinstance(data, list), (
        "data must be a dictionary or a list"
    )
    assert csv_file_path is None or isinstance(csv_file_path, str), (
        "csv_file_path must be a string"
    )

    if csv_file_path:
        data = self._read_csv(csv_file_path)

    if isinstance(data, dict):
        self._data = {k: list(v) for k, v in data.items()}
        self.columns = list(self._data.keys())
    elif isinstance(data, list) and data and isinstance(data[0], dict):
        # let's preserve column order
        self.columns = []
        seen_columns = set()
        for row in data:
            for col in row.keys():
                if col not in seen_columns:
                    self.columns.append(col)
                    seen_columns.add(col)
        self._data = {col: [row.get(col) for row in data] for col in self.columns}

__len__ #

__len__()

Return the number of rows in the data-frame

Source code in resource_tracker/tiny_data_frame.py
def __len__(self):
    """Return the number of rows in the data-frame"""
    return len(next(iter(self._data.values()))) if self.columns else 0

__getitem__ #

__getitem__(key)

Get a single column or multiple columns or a row or a slice of rows. Can be chained.

Parameters:

Name Type Description Default
key Union[str, List[str], int, slice]

A single column name, a list of column names, a row index, or a slice of row indexes.

required

Returns:

Type Description
Union[List[float], Dict[str, float], TinyDataFrame]

A single column as a list, a list of columns as a new TinyDataFrame, a row as a dictionary, or a slice of rows as a new TinyDataFrame.

Source code in resource_tracker/tiny_data_frame.py
def __getitem__(
    self, key: Union[str, List[str], int, slice]
) -> Union[List[float], Dict[str, float], "TinyDataFrame"]:
    """Get a single column or multiple columns or a row or a slice of rows. Can be chained.

    Args:
        key: A single column name, a list of column names, a row index, or a slice of row indexes.

    Returns:
        A single column as a list, a list of columns as a new TinyDataFrame, a row as a dictionary, or a slice of rows as a new TinyDataFrame.
    """
    # a single column
    if isinstance(key, str):
        return self._data[key]
    # multiple columns
    elif isinstance(key, List) and all(isinstance(k, str) for k in key):
        return TinyDataFrame(
            {col: self._data[col] for col in key if col in self._data}
        )
    # row index
    elif isinstance(key, int):
        return {col: self._data[col][key] for col in self.columns}
    # row indexes
    elif isinstance(key, slice):
        return TinyDataFrame({col: self._data[col][key] for col in self.columns})
    else:
        raise TypeError(f"Invalid key type: {type(key)}")
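The dispatch above maps four key types to four return shapes: column name to list, list of names to sub-frame, row index to dict, slice to sub-frame. The same semantics can be sketched on a plain dict of lists (the column storage used by `_data`); the `get` helper below is illustrative, not part of the package.

```python
data = {"name": ["a", "b", "c"], "hp": [93.0, 110.0, 175.0]}


def get(data: dict, key):
    """Mimic TinyDataFrame.__getitem__ dispatch on a plain dict of lists."""
    if isinstance(key, str):  # single column -> list of values
        return data[key]
    if isinstance(key, list):  # multiple columns -> column subset
        return {col: data[col] for col in key if col in data}
    if isinstance(key, int):  # row index -> dict of scalars
        return {col: values[key] for col, values in data.items()}
    if isinstance(key, slice):  # row slice -> dict of shortened lists
        return {col: values[key] for col, values in data.items()}
    raise TypeError(f"Invalid key type: {type(key)}")


print(get(data, "hp"))  # [93.0, 110.0, 175.0]
print(get(data, 1))  # {'name': 'b', 'hp': 110.0}
print(get(data, slice(0, 2)))  # {'name': ['a', 'b'], 'hp': [93.0, 110.0]}
```

Because slicing returns the same shape it consumes, calls chain naturally, which is what makes `df[2:5][['manufacturer', 'hp']]` in the class example work.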

__setitem__ #

__setitem__(key, value)

Set a column with the given key to the provided values.

Parameters:

Name Type Description Default
key str

Column name (string)

required
value List[float]

List of values for the column

required

Raises:

Type Description
TypeError

If key is not a string

ValueError

If the length of values doesn't match the dataframe length

Source code in resource_tracker/tiny_data_frame.py
def __setitem__(self, key: str, value: List[float]) -> None:
    """Set a column with the given key to the provided values.

    Args:
        key: Column name (string)
        value: List of values for the column

    Raises:
        TypeError: If key is not a string
        ValueError: If the length of values doesn't match the dataframe length
    """
    if not isinstance(key, str):
        raise TypeError(f"Column name must be a string, got {type(key)}")

    if len(self) > 0 and len(value) != len(self):
        raise ValueError(
            f"Length of values ({len(value)}) must match dataframe length ({len(self)})"
        )

    if key not in self.columns:
        self.columns.append(key)

    self._data[key] = list(value)

head #

head(n=5)

Return first n rows as a new TinyDataFrame.

Source code in resource_tracker/tiny_data_frame.py
def head(self, n: int = 5) -> "TinyDataFrame":
    """Return first n rows as a new TinyDataFrame."""
    return self[slice(0, n)]

tail #

tail(n=5)

Return last n rows as a new TinyDataFrame.

Source code in resource_tracker/tiny_data_frame.py
def tail(self, n: int = 5) -> "TinyDataFrame":
    """Return last n rows as a new TinyDataFrame."""
    return self[slice(-n, None)]

__repr__ #

__repr__()

Return a string representation of the data-frame.

Source code in resource_tracker/tiny_data_frame.py
def __repr__(self) -> str:
    """Return a string representation of the data-frame."""
    return f"TinyDataFrame with {len(self)} rows and {len(self.columns)} columns. First row as a dict: {self[0]}"

__str__ #

__str__()

Print the first 10 rows of the data-frame in a human-readable table.

Source code in resource_tracker/tiny_data_frame.py
def __str__(self) -> str:
    """Print the first 10 rows of the data-frame in a human-readable table."""
    header = (
        f"TinyDataFrame with {len(self)} rows and {len(self.columns)} columns:\n"
    )
    if len(self) == 0:
        return header + "Empty dataframe"

    max_rows = min(10, len(self))

    col_widths = {}
    for col in self.columns:
        col_widths[col] = len(str(col))
        for i in range(max_rows):
            col_widths[col] = max(col_widths[col], len(str(self._data[col][i])))

    rows = []
    header_row = " | ".join(str(col).ljust(col_widths[col]) for col in self.columns)
    rows.append(header_row)
    separator = "-+-".join("-" * col_widths[col] for col in self.columns)
    rows.append(separator)

    for i in range(max_rows):
        row_values = []
        for col in self.columns:
            value = str(self._data[col][i])
            # right-align numbers, left-align strings
            try:
                float(value)  # check if it's a number
                row_values.append(value.rjust(col_widths[col]))
            except ValueError:
                row_values.append(value.ljust(col_widths[col]))
        rows.append(" | ".join(row_values))

    # add ellipsis if there are more rows
    if len(self) > max_rows:
        rows.append("..." + " " * (len(rows[0]) - 3))
    return header + "\n".join(rows)

to_csv #

to_csv(csv_file_path=None, quote_strings=True)

Write the data-frame to a CSV file or return as string if no path is provided.

Parameters:

Name Type Description Default
csv_file_path Optional[str]

Path to write CSV file. If None, returns CSV as string.

None
quote_strings bool

Whether to quote strings.

True
Source code in resource_tracker/tiny_data_frame.py
def to_csv(
    self, csv_file_path: Optional[str] = None, quote_strings: bool = True
) -> str:
    """Write the data-frame to a CSV file or return as string if no path is provided.

    Args:
        csv_file_path: Path to write CSV file. If None, returns CSV as string.
        quote_strings: Whether to quote strings.
    """
    if csv_file_path:
        f = open(csv_file_path, "w", newline="")
    else:
        f = StringIO(newline="")

    try:
        writer = csv_writer(
            f, quoting=QUOTE_NONNUMERIC if quote_strings else QUOTE_MINIMAL
        )
        writer.writerow(self.columns)
        for i in range(len(self)):
            writer.writerow([self._data[col][i] for col in self.columns])

        if not csv_file_path:
            return f.getvalue()
    finally:
        f.close()
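`to_csv` defaults to `QUOTE_NONNUMERIC`, the same quoting mode `_read_csv` uses when parsing, so strings round-trip quoted and unquoted numeric fields come back as floats. A standalone round-trip using only the standard library:

```python
from csv import DictReader, writer, QUOTE_NONNUMERIC
from io import StringIO

buf = StringIO(newline="")
w = writer(buf, quoting=QUOTE_NONNUMERIC)
w.writerow(["manufacturer", "hp"])
w.writerow(["Datsun 710", 93.0])
print(buf.getvalue())
# "manufacturer","hp"
# "Datsun 710",93.0

# reading back with the same quoting parses unquoted fields as floats
buf.seek(0)
rows = list(DictReader(buf, quoting=QUOTE_NONNUMERIC))
print(rows[0])  # {'manufacturer': 'Datsun 710', 'hp': 93.0}
```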

rename #

rename(columns)

Rename one or multiple columns.

Parameters:

Name Type Description Default
columns dict

Dictionary mapping old column names to new column names.

required

Returns:

Type Description
TinyDataFrame

Self for method chaining.

Raises:

Type Description
KeyError

If any old column name doesn't exist in the dataframe.

Source code in resource_tracker/tiny_data_frame.py
def rename(self, columns: dict) -> "TinyDataFrame":
    """Rename one or multiple columns.

    Args:
        columns: Dictionary mapping old column names to new column names.

    Returns:
        Self for method chaining.

    Raises:
        KeyError: If any old column name doesn't exist in the dataframe.
    """
    for old_name in columns.keys():
        if old_name not in self.columns:
            raise KeyError(f"Column '{old_name}' not found in dataframe")

    for i, col in enumerate(self.columns):
        if col in columns:
            self.columns[i] = columns[col]
    # key order in _data may change here, but column order is defined by self.columns
    for old_name, new_name in columns.items():
        self._data[new_name] = self._data.pop(old_name)

    return self
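`rename` works in two passes: validate and update `self.columns` in place, then move each column's values under its new key in `_data`. The same logic on bare structures (an illustrative helper, not part of the package):

```python
def rename(columns: list, data: dict, mapping: dict):
    """Rename columns in a (column list, dict-of-lists) pair in place."""
    # validate first so a bad name leaves everything untouched
    for old in mapping:
        if old not in columns:
            raise KeyError(f"Column '{old}' not found")
    columns[:] = [mapping.get(c, c) for c in columns]
    for old, new in mapping.items():
        data[new] = data.pop(old)  # key order in data may shift; columns rules
    return columns, data


cols, data = ["a", "b"], {"a": [1], "b": [2]}
rename(cols, data, {"a": "x"})
print(cols)  # ['x', 'b']
print(data)  # {'b': [2], 'x': [1]}
```

Validating before mutating is what lets the method promise that a `KeyError` leaves the frame unchanged.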

PidTracker #

Track resource usage of a process and optionally its children.

This class monitors system resources such as CPU time and usage, memory usage, GPU and VRAM utilization, and I/O operations for a given process ID and, optionally, its child processes.

Data is collected every interval seconds and written to stdout, or to output_file if provided, as CSV. Currently, the following columns are tracked:

  • timestamp (float): The current timestamp.
  • pid (int): The monitored process ID.
  • children (int | None): The current number of child processes.
  • utime (int): The total user+nice mode CPU time in seconds.
  • stime (int): The total system mode CPU time in seconds.
  • cpu_usage (float): The current CPU usage between 0 and number of CPUs.
  • memory (int): The current memory usage in kB. Implementation depends on the operating system: preferably PSS (Proportional Set Size) on Linux, USS (Unique Set Size) on macOS and Windows, falling back to RSS (Resident Set Size).
  • read_bytes (int): The total number of bytes read from disk.
  • write_bytes (int): The total number of bytes written to disk.
  • gpu_usage (float): The current GPU utilization between 0 and GPU count.
  • gpu_vram (float): The current GPU memory used in MiB.
  • gpu_utilized (int): The number of GPUs with utilization > 0.

Parameters:

Name Type Description Default
pid int

Process ID to track. Defaults to current process ID.

getpid()
interval float

Sampling interval in seconds. Defaults to 1.

1
children bool

Whether to track child processes. Defaults to True.

True
autostart bool

Whether to start tracking immediately. Defaults to True.

True
output_file str

File to write the output to. Defaults to None, print to stdout.

None

Methods:

Name Description
__call__

Dummy method to make this class callable.

diff_stats

Calculate stats since last call.

start_tracking

Start an infinite loop tracking resource usage of the process until it exits.

Source code in resource_tracker/tracker.py
class PidTracker:
    """Track resource usage of a process and optionally its children.

    This class monitors system resources such as CPU time and usage, memory
    usage, GPU and VRAM utilization, and I/O operations for a given process ID
    and, optionally, its child processes.

    Data is collected every `interval` seconds and written to stdout, or to
    `output_file` if provided, as CSV. Currently, the following columns are
    tracked:

    - timestamp (float): The current timestamp.
    - pid (int): The monitored process ID.
    - children (int | None): The current number of child processes.
    - utime (int): The total user+nice mode CPU time in seconds.
    - stime (int): The total system mode CPU time in seconds.
    - cpu_usage (float): The current CPU usage between 0 and number of CPUs.
    - memory (int): The current memory usage in kB. Implementation depends on
      the operating system: preferably PSS (Proportional Set Size) on Linux,
      USS (Unique Set Size) on macOS and Windows, falling back to RSS
      (Resident Set Size).
    - read_bytes (int): The total number of bytes read from disk.
    - write_bytes (int): The total number of bytes written to disk.
    - gpu_usage (float): The current GPU utilization between 0 and GPU count.
    - gpu_vram (float): The current GPU memory used in MiB.
    - gpu_utilized (int): The number of GPUs with utilization > 0.

    Args:
        pid (int, optional): Process ID to track. Defaults to current process ID.
        interval (float, optional): Sampling interval in seconds. Defaults to 1.
        children (bool, optional): Whether to track child processes. Defaults to True.
        autostart (bool, optional): Whether to start tracking immediately. Defaults to True.
        output_file (str, optional): File to write the output to. Defaults to None, print to stdout.
    """

    def __init__(
        self,
        pid: int = getpid(),
        interval: float = 1,
        children: bool = True,
        autostart: bool = True,
        output_file: Optional[str] = None,
    ):
        self.get_pid_stats, _ = get_tracker_implementation()

        self.pid = pid
        self.status = "running"
        self.interval = interval
        self.cycle = 0
        self.children = children
        self.start_time = time()
        self.stats = self.get_pid_stats(pid, children)
        if autostart:
            self.start_tracking(output_file)

    def __call__(self):
        """Dummy method to make this class callable."""
        pass

    def diff_stats(self):
        """Calculate stats since last call."""
        last_stats = self.stats
        self.stats = self.get_pid_stats(self.pid, self.children)
        self.cycle += 1

        return {
            "timestamp": self.stats["timestamp"],
            "pid": self.pid,
            "children": self.stats["children"],
            "utime": max(0, self.stats["utime"] - last_stats["utime"]),
            "stime": max(0, self.stats["stime"] - last_stats["stime"]),
            "cpu_usage": round(
                max(
                    0,
                    (
                        (self.stats["utime"] + self.stats["stime"])
                        - (last_stats["utime"] + last_stats["stime"])
                    )
                    / (self.stats["timestamp"] - last_stats["timestamp"]),
                ),
                4,
            ),
            "memory": self.stats["memory"],
            "read_bytes": max(0, self.stats["read_bytes"] - last_stats["read_bytes"]),
            "write_bytes": max(
                0, self.stats["write_bytes"] - last_stats["write_bytes"]
            ),
            "gpu_usage": self.stats["gpu_usage"],
            "gpu_vram": self.stats["gpu_vram"],
            "gpu_utilized": self.stats["gpu_utilized"],
        }

    def start_tracking(
        self, output_file: Optional[str] = None, print_header: bool = True
    ):
        """Start an infinite loop tracking resource usage of the process until it exits.

        A CSV line is written every `interval` seconds.

        Args:
            output_file: File to write the output to. Defaults to None, printing to stdout.
            print_header: Whether to print the header of the CSV. Defaults to True.
        """
        file_handle = open(output_file, "w") if output_file else stdout
        file_writer = csv_writer(file_handle, quoting=QUOTE_NONNUMERIC)
        try:
            while True:
                current_time = time()
                current_stats = self.diff_stats()
                if current_stats["memory"] == 0:
                    # the process has exited
                    self.status = "exited"
                    break
                if self.cycle == 1 and print_header:
                    file_writer.writerow(current_stats.keys())
                else:
                    file_writer.writerow(current_stats.values())
                if output_file:
                    file_handle.flush()
                sleep(max(0, self.interval - (time() - current_time)))
        finally:
            if output_file and not file_handle.closed:
                file_handle.close()

__call__ #

__call__()

Dummy method to make this class callable.

Source code in resource_tracker/tracker.py
def __call__(self):
    """Dummy method to make this class callable."""
    pass

diff_stats #

diff_stats()

Calculate stats since last call.

Source code in resource_tracker/tracker.py
def diff_stats(self):
    """Calculate stats since last call."""
    last_stats = self.stats
    self.stats = self.get_pid_stats(self.pid, self.children)
    self.cycle += 1

    return {
        "timestamp": self.stats["timestamp"],
        "pid": self.pid,
        "children": self.stats["children"],
        "utime": max(0, self.stats["utime"] - last_stats["utime"]),
        "stime": max(0, self.stats["stime"] - last_stats["stime"]),
        "cpu_usage": round(
            max(
                0,
                (
                    (self.stats["utime"] + self.stats["stime"])
                    - (last_stats["utime"] + last_stats["stime"])
                )
                / (self.stats["timestamp"] - last_stats["timestamp"]),
            ),
            4,
        ),
        "memory": self.stats["memory"],
        "read_bytes": max(0, self.stats["read_bytes"] - last_stats["read_bytes"]),
        "write_bytes": max(
            0, self.stats["write_bytes"] - last_stats["write_bytes"]
        ),
        "gpu_usage": self.stats["gpu_usage"],
        "gpu_vram": self.stats["gpu_vram"],
        "gpu_utilized": self.stats["gpu_utilized"],
    }
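The `cpu_usage` value above is the CPU-time delta (user plus system) divided by the wall-clock delta between two samples, clamped at zero. A minimal sketch of that arithmetic, using made-up sample values rather than real tracker output:

```python
def cpu_usage(last: dict, curr: dict) -> float:
    """Average CPU usage between two samples: CPU-time delta over wall-clock delta."""
    cpu_delta = (curr["utime"] + curr["stime"]) - (last["utime"] + last["stime"])
    wall_delta = curr["timestamp"] - last["timestamp"]
    # negative deltas (e.g. after a counter reset) are clamped to zero
    return round(max(0, cpu_delta / wall_delta), 4)

# two illustrative samples taken 2 wall-clock seconds apart, during which
# the process accumulated 1.5 seconds of CPU time
last = {"timestamp": 100.0, "utime": 10.0, "stime": 2.0}
curr = {"timestamp": 102.0, "utime": 11.0, "stime": 2.5}
print(cpu_usage(last, curr))  # 0.75, i.e. 75% of a single core
```

A value above 1 means the process (with its children) used more than one core over the interval.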

start_tracking #

start_tracking(output_file=None, print_header=True)

Start an infinite loop tracking resource usage of the process until it exits.

A CSV line is written every interval seconds.

Parameters:

- output_file (Optional[str]): File to write the output to. Defaults to None, printing to stdout.
- print_header (bool): Whether to print the header of the CSV. Defaults to True.
Source code in resource_tracker/tracker.py
def start_tracking(
    self, output_file: Optional[str] = None, print_header: bool = True
):
    """Start an infinite loop tracking resource usage of the process until it exits.

    A CSV line is written every `interval` seconds.

    Args:
        output_file: File to write the output to. Defaults to None, printing to stdout.
        print_header: Whether to print the header of the CSV. Defaults to True.
    """
    file_handle = open(output_file, "w") if output_file else stdout
    file_writer = csv_writer(file_handle, quoting=QUOTE_NONNUMERIC)
    try:
        while True:
            current_time = time()
            current_stats = self.diff_stats()
            if current_stats["memory"] == 0:
                # the process has exited
                self.status = "exited"
                break
            if self.cycle == 1 and print_header:
                file_writer.writerow(current_stats.keys())
            else:
                file_writer.writerow(current_stats.values())
            if output_file:
                file_handle.flush()
            sleep(max(0, self.interval - (time() - current_time)))
    finally:
        if output_file and not file_handle.closed:
            file_handle.close()
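The CSV emission in `start_tracking` can be illustrated with stdlib pieces alone: a `csv.writer` with `QUOTE_NONNUMERIC` quoting, and a header row written on the first cycle in place of the first differenced sample. A sketch against an in-memory buffer, with hypothetical sample rows:

```python
from csv import writer as csv_writer, QUOTE_NONNUMERIC
from io import StringIO

buffer = StringIO()
file_writer = csv_writer(buffer, quoting=QUOTE_NONNUMERIC)

# hypothetical differenced samples standing in for diff_stats() results
samples = [
    {"timestamp": 100.0, "pid": 1234, "memory": 2048},
    {"timestamp": 101.0, "pid": 1234, "memory": 2112},
]
for cycle, stats in enumerate(samples, start=1):
    if cycle == 1:
        # mirroring start_tracking: the first cycle emits the header row
        # instead of the first differenced sample
        file_writer.writerow(stats.keys())
    else:
        file_writer.writerow(stats.values())

print(buffer.getvalue())
```

With `QUOTE_NONNUMERIC`, the string column names come out quoted while the numeric values do not, which is what lets a reader distinguish the header from data rows.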

ResourceTracker #

Track resource usage of processes and the system in a non-blocking way.

Start a resource_tracker.PidTracker and/or a resource_tracker.SystemTracker in the background as spawned or forked process(es), and make the collected data available easily in the main process via the pid_tracker and system_tracker properties.

Parameters:

- pid (int): Process ID to track. Defaults to the current process ID (getpid()).
- children (bool): Whether to track child processes. Defaults to True.
- interval (float): Sampling interval in seconds. Defaults to 1.
- method (Optional[str]): Multiprocessing method. Defaults to None, which tries to fork on Linux and macOS, and spawn on Windows.
- autostart (bool): Whether to start tracking immediately. Defaults to True.
- track_processes (bool): Whether to track resource usage at the process level. Defaults to True.
- track_system (bool): Whether to track system-wide resource usage. Defaults to True.

Methods:

- start: Start the selected resource trackers in the background as subprocess(es).
- stop: Stop the previously started resource trackers' background processes.

Attributes:

- pid_tracker (Union[TinyDataFrame, List]): Collected data from the resource_tracker.PidTracker.
- system_tracker (Union[TinyDataFrame, List]): Collected data from the resource_tracker.SystemTracker.

Source code in resource_tracker/tracker.py
class ResourceTracker:
    """Track resource usage of processes and the system in a non-blocking way.

    Start a [resource_tracker.PidTracker][] and/or a [resource_tracker.SystemTracker][] in the background as spawned
    or forked process(es), and make the collected data available easily in the
    main process via the `pid_tracker` and `system_tracker` properties.

    Args:
        pid: Process ID to track. Defaults to current process ID.
        children: Whether to track child processes. Defaults to True.
        interval: Sampling interval in seconds. Defaults to 1.
        method: Multiprocessing method. Defaults to None, which tries to fork on
            Linux and macOS, and spawn on Windows.
        autostart: Whether to start tracking immediately. Defaults to True.
        track_processes: Whether to track resource usage at the process level.
            Defaults to True.
        track_system: Whether to track system-wide resource usage. Defaults to True.
    """

    def __init__(
        self,
        pid: int = getpid(),
        children: bool = True,
        interval: float = 1,
        method: Optional[str] = None,
        autostart: bool = True,
        track_processes: bool = True,
        track_system: bool = True,
    ):
        self.pid = pid
        self.children = children
        self.interval = interval
        self.method = method
        self.autostart = autostart
        self.trackers = []
        if track_processes:
            self.trackers.append("pid_tracker")
        if track_system:
            self.trackers.append("system_tracker")

        if method is None:
            # try to fork when possible due to leaked semaphores on older Python versions
            # see e.g. https://github.com/python/cpython/issues/90549
            if platform in ["linux", "darwin"]:
                self.mpc = get_context("fork")
            else:
                self.mpc = get_context("spawn")
        else:
            self.mpc = get_context(method)

        # error details from subprocesses
        self.error_queue = SimpleQueue()

        # create temporary CSV file(s) for the tracker(s), and record only the file path(s)
        # to be passed later to subprocess(es) avoiding pickling the file object(s)
        for tracker_name in self.trackers:
            temp_file = NamedTemporaryFile(delete=False)
            setattr(self, f"{tracker_name}_filepath", temp_file.name)
            temp_file.close()
        # make sure to cleanup the temp file(s)
        finalize(
            self,
            cleanup_files,
            [
                getattr(self, f"{tracker_name}_filepath")
                for tracker_name in self.trackers
            ],
        )

        if autostart:
            self.start()

    def start(self):
        """Start the selected resource trackers in the background as subprocess(es)."""
        self.start_time = time()

        if "pid_tracker" in self.trackers:
            self.pid_tracker_process = self.mpc.Process(
                target=_run_tracker,
                args=("pid", self.error_queue),
                kwargs={
                    "pid": self.pid,
                    "interval": self.interval,
                    "children": self.children,
                    "output_file": self.pid_tracker_filepath,
                },
                daemon=True,
            )
            self.pid_tracker_process.start()

        if "system_tracker" in self.trackers:
            self.system_tracker_process = self.mpc.Process(
                target=_run_tracker,
                args=("system", self.error_queue),
                kwargs={
                    "interval": self.interval,
                    "output_file": self.system_tracker_filepath,
                },
                daemon=True,
            )
            self.system_tracker_process.start()

        # make sure to cleanup the started subprocess(es)
        finalize(
            self,
            cleanup_processes,
            [
                getattr(self, f"{tracker_name}_process")
                for tracker_name in self.trackers
            ],
        )

    def stop(self):
        """Stop the previously started resource trackers' background processes."""
        self.stop_time = time()
        # check for errors in the subprocesses
        if not self.error_queue.empty():
            error_data = self.error_queue.get()
            logger.warning(
                "Resource tracker subprocess failed!\n"
                f"Error type: {error_data['name']} (from module {error_data['module']})\n"
                f"Error message: {error_data['message']}\n"
                f"Original traceback:\n{error_data['traceback']}"
            )
        # terminate tracker processes
        for tracker_name in self.trackers:
            process_attr = f"{tracker_name}_process"
            if hasattr(self, process_attr):
                cleanup_processes([getattr(self, process_attr)])
        self.error_queue.close()
        logger.debug(
            "Resource tracker stopped after %s seconds, logging %d process-level and %d system-wide records",
            self.stop_time - self.start_time,
            len(self.pid_tracker),
            len(self.system_tracker),
        )

    @property
    def pid_tracker(self) -> Union[TinyDataFrame, List]:
        """Collected data from the [resource_tracker.PidTracker][].

        Returns:
            A [resource_tracker.TinyDataFrame][] object containing the collected data or an empty list if the [resource_tracker.PidTracker][] is not running.
        """
        try:
            return TinyDataFrame(
                csv_file_path=self.pid_tracker_filepath,
            )
        except Exception:
            return []

    @property
    def system_tracker(self) -> Union[TinyDataFrame, List]:
        """Collected data from the [resource_tracker.SystemTracker][].

        Returns:
            A [resource_tracker.TinyDataFrame][] object containing the collected data or an empty list if the [resource_tracker.SystemTracker][] is not running.
        """
        try:
            return TinyDataFrame(
                csv_file_path=self.system_tracker_filepath,
            )
        except Exception:
            return []
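The temp-file handling in `__init__` above (create the file eagerly, keep only its path, and tie deletion to `weakref.finalize`) can be sketched in isolation. `Recorder` and the `cleanup_files` helper below are illustrative stand-ins, not the library's own:

```python
from os import unlink
from os.path import exists
from tempfile import NamedTemporaryFile
from weakref import finalize

def cleanup_files(paths):
    # hypothetical stand-in for the cleanup_files helper used above
    for path in paths:
        if exists(path):
            unlink(path)

class Recorder:
    def __init__(self):
        # create the file, keep only its path: a plain string can be passed
        # to a subprocess without pickling an open file object
        temp_file = NamedTemporaryFile(delete=False)
        self.filepath = temp_file.name
        temp_file.close()
        # tie removal of the file to garbage collection of this instance
        finalize(self, cleanup_files, [self.filepath])

recorder = Recorder()
path = recorder.filepath
print(exists(path))   # True while the instance is alive
del recorder          # the finalizer runs once the object is collected
print(exists(path))   # False
```

Compared to `__del__`, `weakref.finalize` also runs at interpreter exit, which is why it suits cleanup of on-disk artifacts.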

start #

start()

Start the selected resource trackers in the background as subprocess(es).

Source code in resource_tracker/tracker.py
def start(self):
    """Start the selected resource trackers in the background as subprocess(es)."""
    self.start_time = time()

    if "pid_tracker" in self.trackers:
        self.pid_tracker_process = self.mpc.Process(
            target=_run_tracker,
            args=("pid", self.error_queue),
            kwargs={
                "pid": self.pid,
                "interval": self.interval,
                "children": self.children,
                "output_file": self.pid_tracker_filepath,
            },
            daemon=True,
        )
        self.pid_tracker_process.start()

    if "system_tracker" in self.trackers:
        self.system_tracker_process = self.mpc.Process(
            target=_run_tracker,
            args=("system", self.error_queue),
            kwargs={
                "interval": self.interval,
                "output_file": self.system_tracker_filepath,
            },
            daemon=True,
        )
        self.system_tracker_process.start()

    # make sure to cleanup the started subprocess(es)
    finalize(
        self,
        cleanup_processes,
        [
            getattr(self, f"{tracker_name}_process")
            for tracker_name in self.trackers
        ],
    )

stop #

stop()

Stop the previously started resource trackers' background processes.

Source code in resource_tracker/tracker.py
def stop(self):
    """Stop the previously started resource trackers' background processes."""
    self.stop_time = time()
    # check for errors in the subprocesses
    if not self.error_queue.empty():
        error_data = self.error_queue.get()
        logger.warning(
            "Resource tracker subprocess failed!\n"
            f"Error type: {error_data['name']} (from module {error_data['module']})\n"
            f"Error message: {error_data['message']}\n"
            f"Original traceback:\n{error_data['traceback']}"
        )
    # terminate tracker processes
    for tracker_name in self.trackers:
        process_attr = f"{tracker_name}_process"
        if hasattr(self, process_attr):
            cleanup_processes([getattr(self, process_attr)])
    self.error_queue.close()
    logger.debug(
        "Resource tracker stopped after %s seconds, logging %d process-level and %d system-wide records",
        self.stop_time - self.start_time,
        len(self.pid_tracker),
        len(self.system_tracker),
    )
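The error reporting that `stop` drains can be sketched with a `multiprocessing.SimpleQueue` alone; the dictionary shape below is inferred from the fields `stop` reads, and no subprocess is actually started:

```python
from multiprocessing import SimpleQueue
from traceback import format_exc

error_queue = SimpleQueue()

# what a failing tracker subprocess would enqueue before exiting
try:
    raise RuntimeError("tracker backend unavailable")
except RuntimeError as exc:
    error_queue.put({
        "name": type(exc).__name__,
        "module": type(exc).__module__,
        "message": str(exc),
        "traceback": format_exc(),
    })

# the main process drains the queue on stop() and logs a warning
if not error_queue.empty():
    error_data = error_queue.get()
    print(f"{error_data['name']}: {error_data['message']}")
error_queue.close()
```

Shipping the formatted traceback as a string sidesteps the fact that traceback objects themselves cannot be pickled across the process boundary.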

pid_tracker property #

pid_tracker

Collected data from the resource_tracker.PidTracker.

Returns:

- Union[TinyDataFrame, List]: A resource_tracker.TinyDataFrame object containing the collected data, or an empty list if the resource_tracker.PidTracker is not running.

system_tracker property #

system_tracker

Collected data from the resource_tracker.SystemTracker.

Returns:

- Union[TinyDataFrame, List]: A resource_tracker.TinyDataFrame object containing the collected data, or an empty list if the resource_tracker.SystemTracker is not running.
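The start-method selection in `ResourceTracker.__init__` can be reproduced with the stdlib alone; `pick_context` below is an illustrative stand-in mirroring that default, not part of the package:

```python
from multiprocessing import get_context
from sys import platform

def pick_context(method=None):
    """Mirror ResourceTracker's default: fork on Linux/macOS, spawn elsewhere."""
    if method is None:
        # fork avoids leaked semaphores seen with spawn on older Pythons
        # (see https://github.com/python/cpython/issues/90549)
        method = "fork" if platform in ("linux", "darwin") else "spawn"
    return get_context(method)

ctx = pick_context()
print(ctx.get_start_method())  # "fork" on Linux/macOS, "spawn" on Windows
```

Passing an explicit `method` bypasses the platform default, just as the `method` argument does on `ResourceTracker`.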

SystemTracker #

Track system-wide resource usage.

This class monitors system resources like CPU times and usage, memory usage, GPU and VRAM utilization, disk I/O, and network traffic for the entire system.

Data is collected every interval seconds and written to stdout or to output_file (if provided) as CSV. Currently, the following columns are tracked:

  • timestamp (float): The current timestamp.
  • processes (int): The number of running processes.
  • utime (int): The total user+nice mode CPU time in seconds.
  • stime (int): The total system mode CPU time in seconds.
  • cpu_usage (float): The current CPU usage between 0 and number of CPUs.
  • memory_free (int): The amount of free memory in kB.
  • memory_used (int): The amount of used memory in kB.
  • memory_buffers (int): The amount of memory used for buffers in kB.
  • memory_cached (int): The amount of memory used for caching in kB.
  • memory_active (int): The amount of memory used for active pages in kB.
  • memory_inactive (int): The amount of memory used for inactive pages in kB.
  • disk_read_bytes (int): The total number of bytes read from disk.
  • disk_write_bytes (int): The total number of bytes written to disk.
  • disk_space_total_gb (float): The total disk space in GB.
  • disk_space_used_gb (float): The used disk space in GB.
  • disk_space_free_gb (float): The free disk space in GB.
  • net_recv_bytes (int): The total number of bytes received over network.
  • net_sent_bytes (int): The total number of bytes sent over network.
  • gpu_usage (float): The current GPU utilization between 0 and GPU count.
  • gpu_vram (float): The current GPU memory used in MiB.
  • gpu_utilized (int): The number of GPUs with utilization > 0.

Parameters:

- interval (float): Sampling interval in seconds. Defaults to 1.
- autostart (bool): Whether to start tracking immediately. Defaults to True.
- output_file (str): File to write the output to. Defaults to None, which prints to stdout.

Methods:

- __call__: Dummy method to make this class callable.
- diff_stats: Calculate stats since last call.
- start_tracking: Start an infinite loop tracking system resource usage.

Source code in resource_tracker/tracker.py
class SystemTracker:
    """Track system-wide resource usage.

    This class monitors system resources like CPU times and usage, memory usage,
    GPU and VRAM utilization, disk I/O, and network traffic for the entire system.

    Data is collected every `interval` seconds and written to the stdout or
    `output_file` (if provided) as CSV. Currently, the following columns are
    tracked:

    - timestamp (float): The current timestamp.
    - processes (int): The number of running processes.
    - utime (int): The total user+nice mode CPU time in seconds.
    - stime (int): The total system mode CPU time in seconds.
    - cpu_usage (float): The current CPU usage between 0 and number of CPUs.
    - memory_free (int): The amount of free memory in kB.
    - memory_used (int): The amount of used memory in kB.
    - memory_buffers (int): The amount of memory used for buffers in kB.
    - memory_cached (int): The amount of memory used for caching in kB.
    - memory_active (int): The amount of memory used for active pages in kB.
    - memory_inactive (int): The amount of memory used for inactive pages in kB.
    - disk_read_bytes (int): The total number of bytes read from disk.
    - disk_write_bytes (int): The total number of bytes written to disk.
    - disk_space_total_gb (float): The total disk space in GB.
    - disk_space_used_gb (float): The used disk space in GB.
    - disk_space_free_gb (float): The free disk space in GB.
    - net_recv_bytes (int): The total number of bytes received over network.
    - net_sent_bytes (int): The total number of bytes sent over network.
    - gpu_usage (float): The current GPU utilization between 0 and GPU count.
    - gpu_vram (float): The current GPU memory used in MiB.
    - gpu_utilized (int): The number of GPUs with utilization > 0.

    Args:
        interval: Sampling interval in seconds. Defaults to 1.
        autostart: Whether to start tracking immediately. Defaults to True.
        output_file: File to write the output to. Defaults to None, print to stdout.
    """

    def __init__(
        self,
        interval: float = 1,
        autostart: bool = True,
        output_file: str = None,
    ):
        _, self.get_system_stats = get_tracker_implementation()

        self.status = "running"
        self.interval = interval
        self.cycle = 0
        self.start_time = time()

        self.stats = self.get_system_stats()
        if autostart:
            self.start_tracking(output_file)

    def __call__(self):
        """Dummy method to make this class callable."""
        pass

    def diff_stats(self):
        """Calculate stats since last call."""
        last_stats = self.stats
        self.stats = self.get_system_stats()
        self.cycle += 1

        time_diff = self.stats["timestamp"] - last_stats["timestamp"]

        total_read_bytes = 0
        total_write_bytes = 0
        for disk_name in set(self.stats["disk_stats"]) & set(last_stats["disk_stats"]):
            read_bytes = max(
                0,
                self.stats["disk_stats"][disk_name]["read_bytes"]
                - last_stats["disk_stats"][disk_name]["read_bytes"],
            )
            write_bytes = max(
                0,
                self.stats["disk_stats"][disk_name]["write_bytes"]
                - last_stats["disk_stats"][disk_name]["write_bytes"],
            )
            total_read_bytes += read_bytes
            total_write_bytes += write_bytes

        disk_space_total = 0
        disk_space_used = 0
        disk_space_free = 0
        for disk_space in self.stats["disk_spaces"].values():
            disk_space_total += disk_space["total"]
            disk_space_used += disk_space["used"]
            disk_space_free += disk_space["free"]

        return {
            "timestamp": self.stats["timestamp"],
            "processes": self.stats["processes"],
            "utime": max(0, self.stats["utime"] - last_stats["utime"]),
            "stime": max(0, self.stats["stime"] - last_stats["stime"]),
            "cpu_usage": round(
                max(
                    0,
                    (
                        (self.stats["utime"] + self.stats["stime"])
                        - (last_stats["utime"] + last_stats["stime"])
                    )
                    / time_diff,
                ),
                4,
            ),
            "memory_free": self.stats["memory_free"],
            "memory_used": self.stats["memory_used"],
            "memory_buffers": self.stats["memory_buffers"],
            "memory_cached": self.stats["memory_cached"],
            "memory_active": self.stats["memory_active"],
            "memory_inactive": self.stats["memory_inactive"],
            "disk_read_bytes": total_read_bytes,
            "disk_write_bytes": total_write_bytes,
            "disk_space_total_gb": round(disk_space_total / (1024**3), 2),
            "disk_space_used_gb": round(disk_space_used / (1024**3), 2),
            "disk_space_free_gb": round(disk_space_free / (1024**3), 2),
            "net_recv_bytes": max(
                0, self.stats["net_recv_bytes"] - last_stats["net_recv_bytes"]
            ),
            "net_sent_bytes": max(
                0, self.stats["net_sent_bytes"] - last_stats["net_sent_bytes"]
            ),
            "gpu_usage": self.stats["gpu_usage"],
            "gpu_vram": self.stats["gpu_vram"],
            "gpu_utilized": self.stats["gpu_utilized"],
        }

    def start_tracking(
        self, output_file: Optional[str] = None, print_header: bool = True
    ):
        """Start an infinite loop tracking system resource usage.

        A CSV line is written every `interval` seconds.

        Args:
            output_file: File to write the output to. Defaults to None, printing to stdout.
            print_header: Whether to print the header of the CSV. Defaults to True.
        """
        file_handle = open(output_file, "w") if output_file else stdout
        file_writer = csv_writer(file_handle, quoting=QUOTE_NONNUMERIC)
        try:
            while True:
                current_time = time()
                current_stats = self.diff_stats()
                if self.cycle == 1 and print_header:
                    file_writer.writerow(current_stats.keys())
                else:
                    file_writer.writerow(current_stats.values())
                if output_file:
                    file_handle.flush()
                sleep(max(0, self.interval - (time() - current_time)))
        finally:
            if output_file and not file_handle.closed:
                file_handle.close()

__call__ #

__call__()

Dummy method to make this class callable.

Source code in resource_tracker/tracker.py
def __call__(self):
    """Dummy method to make this class callable."""
    pass

diff_stats #

diff_stats()

Calculate stats since last call.

Source code in resource_tracker/tracker.py
def diff_stats(self):
    """Calculate stats since last call."""
    last_stats = self.stats
    self.stats = self.get_system_stats()
    self.cycle += 1

    time_diff = self.stats["timestamp"] - last_stats["timestamp"]

    total_read_bytes = 0
    total_write_bytes = 0
    for disk_name in set(self.stats["disk_stats"]) & set(last_stats["disk_stats"]):
        read_bytes = max(
            0,
            self.stats["disk_stats"][disk_name]["read_bytes"]
            - last_stats["disk_stats"][disk_name]["read_bytes"],
        )
        write_bytes = max(
            0,
            self.stats["disk_stats"][disk_name]["write_bytes"]
            - last_stats["disk_stats"][disk_name]["write_bytes"],
        )
        total_read_bytes += read_bytes
        total_write_bytes += write_bytes

    disk_space_total = 0
    disk_space_used = 0
    disk_space_free = 0
    for disk_space in self.stats["disk_spaces"].values():
        disk_space_total += disk_space["total"]
        disk_space_used += disk_space["used"]
        disk_space_free += disk_space["free"]

    return {
        "timestamp": self.stats["timestamp"],
        "processes": self.stats["processes"],
        "utime": max(0, self.stats["utime"] - last_stats["utime"]),
        "stime": max(0, self.stats["stime"] - last_stats["stime"]),
        "cpu_usage": round(
            max(
                0,
                (
                    (self.stats["utime"] + self.stats["stime"])
                    - (last_stats["utime"] + last_stats["stime"])
                )
                / time_diff,
            ),
            4,
        ),
        "memory_free": self.stats["memory_free"],
        "memory_used": self.stats["memory_used"],
        "memory_buffers": self.stats["memory_buffers"],
        "memory_cached": self.stats["memory_cached"],
        "memory_active": self.stats["memory_active"],
        "memory_inactive": self.stats["memory_inactive"],
        "disk_read_bytes": total_read_bytes,
        "disk_write_bytes": total_write_bytes,
        "disk_space_total_gb": round(disk_space_total / (1024**3), 2),
        "disk_space_used_gb": round(disk_space_used / (1024**3), 2),
        "disk_space_free_gb": round(disk_space_free / (1024**3), 2),
        "net_recv_bytes": max(
            0, self.stats["net_recv_bytes"] - last_stats["net_recv_bytes"]
        ),
        "net_sent_bytes": max(
            0, self.stats["net_sent_bytes"] - last_stats["net_sent_bytes"]
        ),
        "gpu_usage": self.stats["gpu_usage"],
        "gpu_vram": self.stats["gpu_vram"],
        "gpu_utilized": self.stats["gpu_utilized"],
    }
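The per-disk aggregation above only diffs disks present in both samples (so a newly attached or unmounted disk cannot skew the totals) and clamps negative deltas, e.g. from counter resets, to zero. A self-contained sketch of that logic with invented numbers:

```python
def aggregate_disk_deltas(last: dict, curr: dict) -> dict:
    """Sum per-disk byte deltas over disks present in both samples,
    clamping negative deltas to zero."""
    totals = {"read_bytes": 0, "write_bytes": 0}
    # only disks seen in both samples contribute to the totals
    for disk in set(curr) & set(last):
        for key in totals:
            totals[key] += max(0, curr[disk][key] - last[disk][key])
    return totals

last = {
    "sda": {"read_bytes": 1_000, "write_bytes": 4_000},
    "sdb": {"read_bytes": 500, "write_bytes": 0},
}
curr = {
    "sda": {"read_bytes": 1_600, "write_bytes": 4_250},
    # "sdb" vanished (e.g. unmounted); a new disk appears and is ignored
    "sdc": {"read_bytes": 9_999, "write_bytes": 9_999},
}
print(aggregate_disk_deltas(last, curr))  # {'read_bytes': 600, 'write_bytes': 250}
```

The network counters in the same method get the identical `max(0, curr - last)` treatment, just without the per-device loop.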

start_tracking #

start_tracking(output_file=None, print_header=True)

Start an infinite loop tracking system resource usage.

A CSV line is written every interval seconds.

Parameters:

- output_file (Optional[str]): File to write the output to. Defaults to None, printing to stdout.
- print_header (bool): Whether to print the header of the CSV. Defaults to True.
Source code in resource_tracker/tracker.py
def start_tracking(
    self, output_file: Optional[str] = None, print_header: bool = True
):
    """Start an infinite loop tracking system resource usage.

    A CSV line is written every `interval` seconds.

    Args:
        output_file: File to write the output to. Defaults to None, printing to stdout.
        print_header: Whether to print the header of the CSV. Defaults to True.
    """
    file_handle = open(output_file, "w") if output_file else stdout
    file_writer = csv_writer(file_handle, quoting=QUOTE_NONNUMERIC)
    try:
        while True:
            current_time = time()
            current_stats = self.diff_stats()
            if self.cycle == 1 and print_header:
                file_writer.writerow(current_stats.keys())
            else:
                file_writer.writerow(current_stats.values())
            if output_file:
                file_handle.flush()
            sleep(max(0, self.interval - (time() - current_time)))
    finally:
        if output_file and not file_handle.closed:
            file_handle.close()
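Both trackers keep their sampling cadence with `sleep(max(0, interval - elapsed))`: the time spent collecting stats is subtracted from the sleep, so samples stay roughly `interval` apart regardless of how long collection takes. A stdlib-only sketch of that drift compensation, using `monotonic` for illustration and a dummy workload in place of stat collection:

```python
from time import monotonic, sleep

def run_sampling_loop(sample, interval=0.05, cycles=3):
    """Drift-compensated loop: sleep only for what is left of the
    interval after the sampling work, as in start_tracking."""
    timestamps = []
    for _ in range(cycles):
        started = monotonic()
        sample()  # stand-in for diff_stats() plus CSV writing
        timestamps.append(started)
        sleep(max(0, interval - (monotonic() - started)))
    return timestamps

# a sample that itself takes ~10 ms still yields ~50 ms spacing
stamps = run_sampling_loop(lambda: sleep(0.01), interval=0.05, cycles=3)
gaps = [b - a for a, b in zip(stamps, stamps[1:])]
print(all(g >= 0.045 for g in gaps))  # True
```

The `max(0, ...)` guard matters when one collection cycle takes longer than the interval: the loop then simply runs the next sample immediately instead of passing a negative duration to `sleep`.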