

resource_tracker.helpers #

General helpers.

Functions:

Name Description
is_partition

Determine if a disk name represents a partition rather than a whole disk.

is_psutil_available

Check if psutil is installed and available for import.

is_procfs_available

Check if procfs is available on the system.

get_tracker_implementation

Determine which tracker implementation to use based on available system resources.

get_zfs_pools_space

Get the total, used, and free space of ZFS pools.

cleanup_files

Clean up files.

cleanup_processes

Gracefully, then forcefully if needed, terminate and close processes.

aggregate_stats

Aggregate statistics from multiple sources.

render_csv_row

Format a single CSV row in memory and return it as UTF-8 bytes.

is_partition cached #

is_partition(disk_name)

Determine if a disk name represents a partition rather than a whole disk.

Parameters:

Name Type Description Default
disk_name str

Name of the disk device (e.g., 'sda1', 'nvme0n1p1')

required

Returns:

Type Description
bool

True if the device is likely a partition, False otherwise

Source code in resource_tracker/helpers.py
@cache
def is_partition(disk_name: str) -> bool:
    """
    Determine if a disk name represents a partition rather than a whole disk.

    Args:
        disk_name: Name of the disk device (e.g., 'sda1', 'nvme0n1p1')

    Returns:
        True if the device is likely a partition, False otherwise
    """
    # common partition name patterns: sdXN, nvmeXnYpZ, mmcblkXpY
    if search(r"(sd[a-z]+|nvme\d+n\d+|mmcblk\d+)p?\d+$", disk_name):
        # check if there's a parent device in /sys/block/
        parent_devices = [d.split("/")[-2] for d in glob("/sys/block/*/")]
        if any(
            disk_name.startswith(parent) and disk_name != parent
            for parent in parent_devices
        ):
            return True
    return False
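
A usage sketch (the device names are illustrative, and the result also depends on what is listed under /sys/block on the current machine):

from resource_tracker.helpers import is_partition

# matches the partition name pattern and has a parent device under /sys/block/
print(is_partition("sda1"))       # True on a machine with an 'sda' disk
print(is_partition("nvme0n1p1"))  # True on a machine with an 'nvme0n1' disk
# whole-disk names lack a trailing partition number
print(is_partition("sda"))        # False
print(is_partition("nvme0n1"))    # False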

is_psutil_available cached #

is_psutil_available()

Check if psutil is installed and available for import.

Returns:

Name Type Description
bool bool

True if psutil is available, False otherwise

Source code in resource_tracker/helpers.py
@cache
def is_psutil_available() -> bool:
    """
    Check if psutil is installed and available for import.

    Returns:
        bool: True if psutil is available, False otherwise
    """
    try:
        return find_spec("psutil") is not None
    except ImportError:
        return False

is_procfs_available cached #

is_procfs_available()

Check if procfs is available on the system.

Returns:

Name Type Description
bool bool

True if procfs is available, False otherwise

Source code in resource_tracker/helpers.py
@cache
def is_procfs_available() -> bool:
    """
    Check if procfs is available on the system.

    Returns:
        bool: True if procfs is available, False otherwise
    """
    return os.path.isdir("/proc") and os.access("/proc", os.R_OK)
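
Both availability checks are cheap and cached after the first call, so they can be used freely when picking a stats source; a minimal sketch:

from resource_tracker.helpers import is_procfs_available, is_psutil_available

if is_psutil_available():
    print("psutil is importable")
elif is_procfs_available():
    print("falling back to /proc")
else:
    print("no supported stats source on this system")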

get_tracker_implementation cached #

get_tracker_implementation()

Determine which tracker implementation to use based on available system resources.

Returns:

Name Type Description
tuple tuple[Callable, Callable]

A tuple containing (get_process_stats, get_system_stats) functions from the appropriate implementation module.

Raises:

Type Description
ImportError

If no suitable implementation is available.

Source code in resource_tracker/helpers.py
@cache
def get_tracker_implementation() -> tuple[Callable, Callable]:
    """
    Determine which tracker implementation to use based on available system resources.

    Returns:
        tuple: A tuple containing (get_process_stats, get_system_stats) functions from the appropriate implementation module.

    Raises:
        ImportError: If no suitable implementation is available.
    """
    if is_psutil_available():
        from .tracker_psutil import get_process_stats, get_system_stats
    elif is_procfs_available():
        from .tracker_procfs import get_process_stats, get_system_stats
    else:
        raise ImportError(
            "No tracker implementation available - install psutil or use a Linux system with procfs."
        )
    return get_process_stats, get_system_stats
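
A usage sketch; the two returned callables come from whichever backend module (tracker_psutil or tracker_procfs) is usable on the current system:

from resource_tracker.helpers import get_tracker_implementation

try:
    get_process_stats, get_system_stats = get_tracker_implementation()
except ImportError as exc:
    raise SystemExit(f"no tracker backend available: {exc}")

# the exact return shape is defined by the selected backend
print(get_system_stats())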

get_zfs_pools_space #

get_zfs_pools_space()

Get the total, used, and free space of ZFS pools.

Source code in resource_tracker/helpers.py
def get_zfs_pools_space() -> Dict[str, Dict[str, int]]:
    """
    Get the total, used, and free space of ZFS pools.
    """
    disks = {}
    with suppress(FileNotFoundError, OSError):
        zpool_process = Popen(
            ["zpool", "list", "-Hp", "-o", "name,size,allocated,free"],
            stdout=PIPE,
        )
        try:
            stdout, _ = zpool_process.communicate(timeout=0.25)
            if zpool_process.returncode == 0:
                for line in stdout.splitlines():
                    parts = line.decode().split("\t")
                    if len(parts) >= 4:
                        disks[f"zfs:{parts[0]}"] = {
                            "total": int(parts[1]),
                            "used": int(parts[2]),
                            "free": int(parts[3]),
                        }
        except TimeoutExpired:
            zpool_process.kill()
        except Exception:
            pass
    return disks
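
On hosts without the zpool CLI (or when the call fails or times out), the function quietly returns an empty dict, so callers can merge the result unconditionally. For example:

from resource_tracker.helpers import get_zfs_pools_space

pools = get_zfs_pools_space()  # {} on systems without ZFS
for name, space in pools.items():
    # keys are prefixed with 'zfs:'; values are byte counts
    print(f"{name}: {space['used']}/{space['total']} bytes used, {space['free']} free")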

cleanup_files #

cleanup_files(files)

Clean up files.

Parameters:

Name Type Description Default
files List[str]

List of file paths to clean up.

required
Source code in resource_tracker/helpers.py
def cleanup_files(files: List[str]):
    """Cleanup files.

    Args:
        files: List of file paths to clean up.
    """
    for f in files:
        with suppress(Exception):
            unlink(f)
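
Because every error is suppressed, missing or already-deleted paths are safe to pass, which makes this suitable as a best-effort cleanup step (the paths below are hypothetical):

from resource_tracker.helpers import cleanup_files

# nonexistent paths are silently ignored
cleanup_files(["/tmp/pid_tracker.csv", "/tmp/system_tracker.csv"])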

cleanup_processes #

cleanup_processes(processes)

Gracefully, then forcefully if needed, terminate and close processes.

Parameters:

Name Type Description Default
processes List[Process]

List of multiprocessing.Process objects to clean up.

required
Source code in resource_tracker/helpers.py
def cleanup_processes(processes: List[Process]):
    """Gracefully, then if needed forcefully terminate and close processes.

    Args:
        processes: List of `multiprocessing.Process` objects to clean up.
    """
    for process in processes:
        with suppress(Exception):
            if process.is_alive():
                process.terminate()
                process.join(timeout=1.0)
        with suppress(Exception):
            if process.is_alive():
                process.kill()
                process.join(timeout=1.0)
        with suppress(Exception):
            process.close()
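
A shutdown sketch using a trivial worker (the worker function is illustrative, not part of the library):

from multiprocessing import Process
from time import sleep

from resource_tracker.helpers import cleanup_processes

def worker():
    while True:
        sleep(1)

if __name__ == "__main__":
    procs = [Process(target=worker) for _ in range(2)]
    for p in procs:
        p.start()
    # terminate(), then kill() if still alive, then close() the handles
    cleanup_processes(procs)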

aggregate_stats #

aggregate_stats(stats)

Aggregate statistics from multiple sources.

This function combines statistics from multiple runs or sources generated by resource_tracker.ResourceTracker.stats, handling different aggregation types appropriately:

  • For mean and duration values: computes the average value across all sources.
  • For max and sum values: takes the maximum value across all sources.

Parameters:

Name Type Description Default
stats List[Dict[str, Dict[str, Any]]]

A list of dictionaries containing statistics. Each dictionary should have column names as keys and dictionaries of aggregation types as values.

required

Returns:

Type Description
Dict[str, Dict[str, Any]]

A dictionary with the same structure as the input dictionaries, but with aggregated values.

Example:

>>> stats1 = {'cpu_usage': {'mean': 1.5, 'max': 2.0}, 'memory': {'mean': 100, 'max': 150}}
>>> stats2 = {'cpu_usage': {'mean': 2.5, 'max': 3.0}, 'memory': {'mean': 200, 'max': 250}}
>>> aggregate_stats([stats1, stats2])
{'cpu_usage': {'max': 3.0, 'mean': 2.0}, 'memory': {'max': 250, 'mean': 150.0}}
Source code in resource_tracker/helpers.py
def aggregate_stats(
    stats: List[Dict[str, Dict[str, Any]]],
) -> Dict[str, Dict[str, Any]]:
    """Aggregate statistics from multiple sources.

    This function combines statistics from multiple runs or sources generated by
    [resource_tracker.ResourceTracker.stats][], handling different aggregation
    types appropriately:

    - For `mean` and `duration` values: computes the average value across all sources.
    - For `max` and `sum` values: takes the maximum value across all sources.

    Args:
        stats: A list of dictionaries containing statistics. Each dictionary should
               have column names as keys and dictionaries of aggregation types as values.

    Returns:
        A dictionary with the same structure as the input dictionaries, but with
        aggregated values.

    Example:

        >>> stats1 = {'cpu_usage': {'mean': 1.5, 'max': 2.0}, 'memory': {'mean': 100, 'max': 150}}
        >>> stats2 = {'cpu_usage': {'mean': 2.5, 'max': 3.0}, 'memory': {'mean': 200, 'max': 250}}
        >>> aggregate_stats([stats1, stats2])
        {'cpu_usage': {'max': 3.0, 'mean': 2.0}, 'memory': {'max': 250, 'mean': 150.0}}
    """
    if not stats:
        return {}

    result = {}
    for stats_dict in stats:
        for col, values in stats_dict.items():
            if col not in result:
                result[col] = {}
            for agg_type, value in values.items():
                # collect values in a temp list to aggregate later
                if agg_type in ["mean", "duration"]:
                    if f"{agg_type}_values" not in result[col]:
                        result[col][f"{agg_type}_values"] = []
                    result[col][f"{agg_type}_values"].append(value)
                # keep the largest value
                elif agg_type in ["max", "sum"]:
                    if agg_type not in result[col] or value > result[col][agg_type]:
                        result[col][agg_type] = value
                else:
                    raise ValueError(f"Unsupported aggregation type: {agg_type}")

    # compute averages for mean and duration values
    for col, values in result.items():
        for agg_type in ["mean", "duration"]:
            temp_key = f"{agg_type}_values"
            if temp_key in values:
                values[agg_type] = sum(values[temp_key]) / len(values[temp_key])
                del values[temp_key]

    return result
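
The duration and sum aggregation types follow the same rules (durations are averaged, sums take the maximum across sources); a sketch with hypothetical column names:

>>> run1 = {'timestamp': {'duration': 10.0}, 'net_sent': {'sum': 512}}
>>> run2 = {'timestamp': {'duration': 20.0}, 'net_sent': {'sum': 2048}}
>>> aggregate_stats([run1, run2])
{'timestamp': {'duration': 15.0}, 'net_sent': {'sum': 2048}}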

render_csv_row #

render_csv_row(row, quoting=QUOTE_NONNUMERIC)

Format a single CSV row in memory and return it as UTF-8 bytes.

Parameters:

Name Type Description Default
row Iterable[Union[str, float, int, None]]

A list or iterable of values to write (strings, numbers, or None) with csv.writer.

required
quoting int

Quoting strategy for the CSV writer. Defaults to QUOTE_NONNUMERIC.

QUOTE_NONNUMERIC

Returns:

Type Description
bytes

A bytes object representing the full CSV-formatted row (including newline).

Source code in resource_tracker/helpers.py
def render_csv_row(
    row: Iterable[Union[str, float, int, None]], quoting: int = QUOTE_NONNUMERIC
) -> bytes:
    """
    Format a single CSV row in memory and return it as UTF-8 bytes.

    Args:
        row: A list or iterable of values to write (strings, numbers, or None) with [csv.writer][].
        quoting: Quoting strategy for the CSV writer. Defaults to QUOTE_NONNUMERIC.

    Returns:
        A bytes object representing the full CSV-formatted row (including newline).
    """
    buffer = StringIO(newline="")
    writer = csv_writer(buffer, quoting=quoting, lineterminator="\n")
    writer.writerow(row)
    return buffer.getvalue().encode("utf-8")
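
A short usage sketch (the field values are made up); with QUOTE_NONNUMERIC, strings are quoted, numbers are left bare, and None becomes an empty quoted field:

>>> from resource_tracker.helpers import render_csv_row
>>> render_csv_row(["timestamp", 1712345678.9, 42, None])
b'"timestamp",1712345678.9,42,""\n'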