Skip to content

tracker

resource_tracker.tracker #

Track resource usage of a process and/or the system.

To start tracker(s) in the background as spawned or forked process(es), use the resource_tracker.ResourceTracker class. Starting this will not block the main process and will allow you to access the collected data via the pid_tracker and system_tracker properties of the instance in real-time, or after stopping the resource tracker(s).

For more custom use cases, you can also use the resource_tracker.PidTracker and resource_tracker.SystemTracker classes directly logging either to the standard output or a file, and handle putting those into a background thread/process yourself.

Classes:

Name Description
PidTracker

Track resource usage of a process and optionally its children.

SystemTracker

Track system-wide resource usage.

ResourceTracker

Track resource usage of processes and the system in a non-blocking way.

PidTracker #

Track resource usage of a process and optionally its children.

This class monitors system resources like CPU times and usage, memory usage, GPU and VRAM utilization, I/O operations for a given process ID and optionally its child processes.

Data is collected every interval seconds and written to the stdout or output_file (if provided) as CSV. Currently, the following columns are tracked:

  • timestamp (float): The current timestamp.
  • pid (int): The monitored process ID.
  • children (int | None): The current number of child processes.
  • utime (int): The total user+nice mode CPU time in seconds.
  • stime (int): The total system mode CPU time in seconds.
  • cpu_usage (float): The current CPU usage between 0 and number of CPUs.
  • memory (int): The current memory usage in kB. Implementation depends on the operating system, and it is preferably PSS (Proportional Set Size) on Linux, USS (Unique Set Size) on macOS and Windows, and RSS (Resident Set Size) on Windows.
  • read_bytes (int): The total number of bytes read from disk.
  • write_bytes (int): The total number of bytes written to disk.
  • gpu_usage (float): The current GPU utilization between 0 and GPU count.
  • gpu_vram (float): The current GPU memory used in MiB.
  • gpu_utilized (int): The number of GPUs with utilization > 0.

Parameters:

Name Type Description Default
pid int

Process ID to track. Defaults to current process ID.

getpid()
interval float

Sampling interval in seconds. Defaults to 1.

1
children bool

Whether to track child processes. Defaults to True.

True
autostart bool

Whether to start tracking immediately. Defaults to True.

True
output_file str

File to write the output to. Defaults to None, print to stdout.

None

Methods:

Name Description
__call__

Dummy method to make this class callable.

diff_stats

Calculate stats since last call.

start_tracking

Start an infinite loop tracking resource usage of the process until it exits.

Source code in resource_tracker/tracker.py
class PidTracker:
    """Track resource usage of a process and optionally its children.

    This class monitors system resources like CPU times and usage, memory usage,
    GPU and VRAM utilization, I/O operations for a given process ID and
    optionally its child processes.

    Data is collected every `interval` seconds and written to the stdout or
    `output_file` (if provided) as CSV. Currently, the following columns are
    tracked:

    - timestamp (float): The current timestamp.
    - pid (int): The monitored process ID.
    - children (int | None): The current number of child processes.
    - utime (int): The total user+nice mode CPU time in seconds.
    - stime (int): The total system mode CPU time in seconds.
    - cpu_usage (float): The current CPU usage between 0 and number of CPUs.
    - memory (int): The current memory usage in kB. Implementation depends on the
      operating system, and it is preferably PSS (Proportional Set Size) on Linux,
      USS (Unique Set Size) on macOS and Windows, and RSS (Resident Set Size) on
      Windows.
    - read_bytes (int): The total number of bytes read from disk.
    - write_bytes (int): The total number of bytes written to disk.
    - gpu_usage (float): The current GPU utilization between 0 and GPU count.
    - gpu_vram (float): The current GPU memory used in MiB.
    - gpu_utilized (int): The number of GPUs with utilization > 0.

    Args:
        pid (int, optional): Process ID to track. Defaults to current process ID.
        interval (float, optional): Sampling interval in seconds. Defaults to 1.
        children (bool, optional): Whether to track child processes. Defaults to True.
        autostart (bool, optional): Whether to start tracking immediately. Defaults to True.
        output_file (str, optional): File to write the output to. Defaults to None, print to stdout.
    """

    def __init__(
        self,
        pid: int = getpid(),
        interval: float = 1,
        children: bool = True,
        autostart: bool = True,
        output_file: str = None,
    ):
        self.get_pid_stats, _ = get_tracker_implementation()

        self.pid = pid
        self.status = "running"
        self.interval = interval
        self.cycle = 0
        self.children = children
        self.start_time = time()
        self.stats = self.get_pid_stats(pid, children)
        if autostart:
            self.start_tracking(output_file)

    def __call__(self):
        """Dummy method to make this class callable."""
        pass

    def diff_stats(self):
        """Calculate stats since last call."""
        last_stats = self.stats
        self.stats = self.get_pid_stats(self.pid, self.children)
        self.cycle += 1

        return {
            "timestamp": self.stats["timestamp"],
            "pid": self.pid,
            "children": self.stats["children"],
            "utime": max(0, self.stats["utime"] - last_stats["utime"]),
            "stime": max(0, self.stats["stime"] - last_stats["stime"]),
            "cpu_usage": round(
                max(
                    0,
                    (
                        (self.stats["utime"] + self.stats["stime"])
                        - (last_stats["utime"] + last_stats["stime"])
                    )
                    / (self.stats["timestamp"] - last_stats["timestamp"]),
                ),
                4,
            ),
            "memory": self.stats["memory"],
            "read_bytes": max(0, self.stats["read_bytes"] - last_stats["read_bytes"]),
            "write_bytes": max(
                0, self.stats["write_bytes"] - last_stats["write_bytes"]
            ),
            "gpu_usage": self.stats["gpu_usage"],
            "gpu_vram": self.stats["gpu_vram"],
            "gpu_utilized": self.stats["gpu_utilized"],
        }

    def start_tracking(
        self, output_file: Optional[str] = None, print_header: bool = True
    ):
        """Start an infinite loop tracking resource usage of the process until it exits.

        A CSV line is written every `interval` seconds.

        Args:
            output_file: File to write the output to. Defaults to None, printing to stdout.
            print_header: Whether to print the header of the CSV. Defaults to True.
        """
        file_handle = open(output_file, "w") if output_file else stdout
        file_writer = csv_writer(file_handle, quoting=QUOTE_NONNUMERIC)
        try:
            while True:
                current_time = time()
                current_stats = self.diff_stats()
                if current_stats["memory"] == 0:
                    # the process has exited
                    self.status = "exited"
                    break
                if self.cycle == 1 and print_header:
                    file_writer.writerow(current_stats.keys())
                else:
                    file_writer.writerow(current_stats.values())
                if output_file:
                    file_handle.flush()
                sleep(max(0, self.interval - (time() - current_time)))
        finally:
            if output_file and not file_handle.closed:
                file_handle.close()

__call__ #

__call__()

Dummy method to make this class callable.

Source code in resource_tracker/tracker.py
def __call__(self):
    """Dummy method to make this class callable."""
    pass

diff_stats #

diff_stats()

Calculate stats since last call.

Source code in resource_tracker/tracker.py
def diff_stats(self):
    """Calculate stats since last call."""
    last_stats = self.stats
    self.stats = self.get_pid_stats(self.pid, self.children)
    self.cycle += 1

    return {
        "timestamp": self.stats["timestamp"],
        "pid": self.pid,
        "children": self.stats["children"],
        "utime": max(0, self.stats["utime"] - last_stats["utime"]),
        "stime": max(0, self.stats["stime"] - last_stats["stime"]),
        "cpu_usage": round(
            max(
                0,
                (
                    (self.stats["utime"] + self.stats["stime"])
                    - (last_stats["utime"] + last_stats["stime"])
                )
                / (self.stats["timestamp"] - last_stats["timestamp"]),
            ),
            4,
        ),
        "memory": self.stats["memory"],
        "read_bytes": max(0, self.stats["read_bytes"] - last_stats["read_bytes"]),
        "write_bytes": max(
            0, self.stats["write_bytes"] - last_stats["write_bytes"]
        ),
        "gpu_usage": self.stats["gpu_usage"],
        "gpu_vram": self.stats["gpu_vram"],
        "gpu_utilized": self.stats["gpu_utilized"],
    }

start_tracking #

start_tracking(output_file=None, print_header=True)

Start an infinite loop tracking resource usage of the process until it exits.

A CSV line is written every interval seconds.

Parameters:

Name Type Description Default
output_file Optional[str]

File to write the output to. Defaults to None, printing to stdout.

None
print_header bool

Whether to print the header of the CSV. Defaults to True.

True
Source code in resource_tracker/tracker.py
def start_tracking(
    self, output_file: Optional[str] = None, print_header: bool = True
):
    """Start an infinite loop tracking resource usage of the process until it exits.

    A CSV line is written every `interval` seconds.

    Args:
        output_file: File to write the output to. Defaults to None, printing to stdout.
        print_header: Whether to print the header of the CSV. Defaults to True.
    """
    file_handle = open(output_file, "w") if output_file else stdout
    file_writer = csv_writer(file_handle, quoting=QUOTE_NONNUMERIC)
    try:
        while True:
            current_time = time()
            current_stats = self.diff_stats()
            if current_stats["memory"] == 0:
                # the process has exited
                self.status = "exited"
                break
            if self.cycle == 1 and print_header:
                file_writer.writerow(current_stats.keys())
            else:
                file_writer.writerow(current_stats.values())
            if output_file:
                file_handle.flush()
            sleep(max(0, self.interval - (time() - current_time)))
    finally:
        if output_file and not file_handle.closed:
            file_handle.close()

SystemTracker #

Track system-wide resource usage.

This class monitors system resources like CPU times and usage, memory usage, GPU and VRAM utilization, disk I/O, and network traffic for the entire system.

Data is collected every interval seconds and written to the stdout or output_file (if provided) as CSV. Currently, the following columns are tracked:

  • timestamp (float): The current timestamp.
  • processes (int): The number of running processes.
  • utime (int): The total user+nice mode CPU time in seconds.
  • stime (int): The total system mode CPU time in seconds.
  • cpu_usage (float): The current CPU usage between 0 and number of CPUs.
  • memory_free (int): The amount of free memory in kB.
  • memory_used (int): The amount of used memory in kB.
  • memory_buffers (int): The amount of memory used for buffers in kB.
  • memory_cached (int): The amount of memory used for caching in kB.
  • memory_active (int): The amount of memory used for active pages in kB.
  • memory_inactive (int): The amount of memory used for inactive pages in kB.
  • disk_read_bytes (int): The total number of bytes read from disk.
  • disk_write_bytes (int): The total number of bytes written to disk.
  • disk_space_total_gb (float): The total disk space in GB.
  • disk_space_used_gb (float): The used disk space in GB.
  • disk_space_free_gb (float): The free disk space in GB.
  • net_recv_bytes (int): The total number of bytes received over network.
  • net_sent_bytes (int): The total number of bytes sent over network.
  • gpu_usage (float): The current GPU utilization between 0 and GPU count.
  • gpu_vram (float): The current GPU memory used in MiB.
  • gpu_utilized (int): The number of GPUs with utilization > 0.

Parameters:

Name Type Description Default
interval float

Sampling interval in seconds. Defaults to 1.

1
autostart bool

Whether to start tracking immediately. Defaults to True.

True
output_file str

File to write the output to. Defaults to None, print to stdout.

None

Methods:

Name Description
__call__

Dummy method to make this class callable.

diff_stats

Calculate stats since last call.

start_tracking

Start an infinite loop tracking system resource usage.

Source code in resource_tracker/tracker.py
class SystemTracker:
    """Track system-wide resource usage.

    This class monitors system resources like CPU times and usage, memory usage,
    GPU and VRAM utilization, disk I/O, and network traffic for the entire system.

    Data is collected every `interval` seconds and written to the stdout or
    `output_file` (if provided) as CSV. Currently, the following columns are
    tracked:

    - timestamp (float): The current timestamp.
    - processes (int): The number of running processes.
    - utime (int): The total user+nice mode CPU time in seconds.
    - stime (int): The total system mode CPU time in seconds.
    - cpu_usage (float): The current CPU usage between 0 and number of CPUs.
    - memory_free (int): The amount of free memory in kB.
    - memory_used (int): The amount of used memory in kB.
    - memory_buffers (int): The amount of memory used for buffers in kB.
    - memory_cached (int): The amount of memory used for caching in kB.
    - memory_active (int): The amount of memory used for active pages in kB.
    - memory_inactive (int): The amount of memory used for inactive pages in kB.
    - disk_read_bytes (int): The total number of bytes read from disk.
    - disk_write_bytes (int): The total number of bytes written to disk.
    - disk_space_total_gb (float): The total disk space in GB.
    - disk_space_used_gb (float): The used disk space in GB.
    - disk_space_free_gb (float): The free disk space in GB.
    - net_recv_bytes (int): The total number of bytes received over network.
    - net_sent_bytes (int): The total number of bytes sent over network.
    - gpu_usage (float): The current GPU utilization between 0 and GPU count.
    - gpu_vram (float): The current GPU memory used in MiB.
    - gpu_utilized (int): The number of GPUs with utilization > 0.

    Args:
        interval: Sampling interval in seconds. Defaults to 1.
        autostart: Whether to start tracking immediately. Defaults to True.
        output_file: File to write the output to. Defaults to None, print to stdout.
    """

    def __init__(
        self,
        interval: float = 1,
        autostart: bool = True,
        output_file: str = None,
    ):
        _, self.get_system_stats = get_tracker_implementation()

        self.status = "running"
        self.interval = interval
        self.cycle = 0
        self.start_time = time()

        self.stats = self.get_system_stats()
        if autostart:
            self.start_tracking(output_file)

    def __call__(self):
        """Dummy method to make this class callable."""
        pass

    def diff_stats(self):
        """Calculate stats since last call."""
        last_stats = self.stats
        self.stats = self.get_system_stats()
        self.cycle += 1

        time_diff = self.stats["timestamp"] - last_stats["timestamp"]

        total_read_bytes = 0
        total_write_bytes = 0
        for disk_name in set(self.stats["disk_stats"]) & set(last_stats["disk_stats"]):
            read_bytes = max(
                0,
                self.stats["disk_stats"][disk_name]["read_bytes"]
                - last_stats["disk_stats"][disk_name]["read_bytes"],
            )
            write_bytes = max(
                0,
                self.stats["disk_stats"][disk_name]["write_bytes"]
                - last_stats["disk_stats"][disk_name]["write_bytes"],
            )
            total_read_bytes += read_bytes
            total_write_bytes += write_bytes

        disk_space_total = 0
        disk_space_used = 0
        disk_space_free = 0
        for disk_space in self.stats["disk_spaces"].values():
            disk_space_total += disk_space["total"]
            disk_space_used += disk_space["used"]
            disk_space_free += disk_space["free"]

        return {
            "timestamp": self.stats["timestamp"],
            "processes": self.stats["processes"],
            "utime": max(0, self.stats["utime"] - last_stats["utime"]),
            "stime": max(0, self.stats["stime"] - last_stats["stime"]),
            "cpu_usage": round(
                max(
                    0,
                    (
                        (self.stats["utime"] + self.stats["stime"])
                        - (last_stats["utime"] + last_stats["stime"])
                    )
                    / time_diff,
                ),
                4,
            ),
            "memory_free": self.stats["memory_free"],
            "memory_used": self.stats["memory_used"],
            "memory_buffers": self.stats["memory_buffers"],
            "memory_cached": self.stats["memory_cached"],
            "memory_active": self.stats["memory_active"],
            "memory_inactive": self.stats["memory_inactive"],
            "disk_read_bytes": total_read_bytes,
            "disk_write_bytes": total_write_bytes,
            "disk_space_total_gb": round(disk_space_total / (1024**3), 2),
            "disk_space_used_gb": round(disk_space_used / (1024**3), 2),
            "disk_space_free_gb": round(disk_space_free / (1024**3), 2),
            "net_recv_bytes": max(
                0, self.stats["net_recv_bytes"] - last_stats["net_recv_bytes"]
            ),
            "net_sent_bytes": max(
                0, self.stats["net_sent_bytes"] - last_stats["net_sent_bytes"]
            ),
            "gpu_usage": self.stats["gpu_usage"],
            "gpu_vram": self.stats["gpu_vram"],
            "gpu_utilized": self.stats["gpu_utilized"],
        }

    def start_tracking(
        self, output_file: Optional[str] = None, print_header: bool = True
    ):
        """Start an infinite loop tracking system resource usage.

        A CSV line is written every `interval` seconds.

        Args:
            output_file: File to write the output to. Defaults to None, printing to stdout.
            print_header: Whether to print the header of the CSV. Defaults to True.
        """
        file_handle = open(output_file, "w") if output_file else stdout
        file_writer = csv_writer(file_handle, quoting=QUOTE_NONNUMERIC)
        try:
            while True:
                current_time = time()
                current_stats = self.diff_stats()
                if self.cycle == 1 and print_header:
                    file_writer.writerow(current_stats.keys())
                else:
                    file_writer.writerow(current_stats.values())
                if output_file:
                    file_handle.flush()
                sleep(max(0, self.interval - (time() - current_time)))
        finally:
            if output_file and not file_handle.closed:
                file_handle.close()

__call__ #

__call__()

Dummy method to make this class callable.

Source code in resource_tracker/tracker.py
def __call__(self):
    """Dummy method to make this class callable."""
    pass

diff_stats #

diff_stats()

Calculate stats since last call.

Source code in resource_tracker/tracker.py
def diff_stats(self):
    """Calculate stats since last call."""
    last_stats = self.stats
    self.stats = self.get_system_stats()
    self.cycle += 1

    time_diff = self.stats["timestamp"] - last_stats["timestamp"]

    total_read_bytes = 0
    total_write_bytes = 0
    for disk_name in set(self.stats["disk_stats"]) & set(last_stats["disk_stats"]):
        read_bytes = max(
            0,
            self.stats["disk_stats"][disk_name]["read_bytes"]
            - last_stats["disk_stats"][disk_name]["read_bytes"],
        )
        write_bytes = max(
            0,
            self.stats["disk_stats"][disk_name]["write_bytes"]
            - last_stats["disk_stats"][disk_name]["write_bytes"],
        )
        total_read_bytes += read_bytes
        total_write_bytes += write_bytes

    disk_space_total = 0
    disk_space_used = 0
    disk_space_free = 0
    for disk_space in self.stats["disk_spaces"].values():
        disk_space_total += disk_space["total"]
        disk_space_used += disk_space["used"]
        disk_space_free += disk_space["free"]

    return {
        "timestamp": self.stats["timestamp"],
        "processes": self.stats["processes"],
        "utime": max(0, self.stats["utime"] - last_stats["utime"]),
        "stime": max(0, self.stats["stime"] - last_stats["stime"]),
        "cpu_usage": round(
            max(
                0,
                (
                    (self.stats["utime"] + self.stats["stime"])
                    - (last_stats["utime"] + last_stats["stime"])
                )
                / time_diff,
            ),
            4,
        ),
        "memory_free": self.stats["memory_free"],
        "memory_used": self.stats["memory_used"],
        "memory_buffers": self.stats["memory_buffers"],
        "memory_cached": self.stats["memory_cached"],
        "memory_active": self.stats["memory_active"],
        "memory_inactive": self.stats["memory_inactive"],
        "disk_read_bytes": total_read_bytes,
        "disk_write_bytes": total_write_bytes,
        "disk_space_total_gb": round(disk_space_total / (1024**3), 2),
        "disk_space_used_gb": round(disk_space_used / (1024**3), 2),
        "disk_space_free_gb": round(disk_space_free / (1024**3), 2),
        "net_recv_bytes": max(
            0, self.stats["net_recv_bytes"] - last_stats["net_recv_bytes"]
        ),
        "net_sent_bytes": max(
            0, self.stats["net_sent_bytes"] - last_stats["net_sent_bytes"]
        ),
        "gpu_usage": self.stats["gpu_usage"],
        "gpu_vram": self.stats["gpu_vram"],
        "gpu_utilized": self.stats["gpu_utilized"],
    }

start_tracking #

start_tracking(output_file=None, print_header=True)

Start an infinite loop tracking system resource usage.

A CSV line is written every interval seconds.

Parameters:

Name Type Description Default
output_file Optional[str]

File to write the output to. Defaults to None, printing to stdout.

None
print_header bool

Whether to print the header of the CSV. Defaults to True.

True
Source code in resource_tracker/tracker.py
def start_tracking(
    self, output_file: Optional[str] = None, print_header: bool = True
):
    """Start an infinite loop tracking system resource usage.

    A CSV line is written every `interval` seconds.

    Args:
        output_file: File to write the output to. Defaults to None, printing to stdout.
        print_header: Whether to print the header of the CSV. Defaults to True.
    """
    file_handle = open(output_file, "w") if output_file else stdout
    file_writer = csv_writer(file_handle, quoting=QUOTE_NONNUMERIC)
    try:
        while True:
            current_time = time()
            current_stats = self.diff_stats()
            if self.cycle == 1 and print_header:
                file_writer.writerow(current_stats.keys())
            else:
                file_writer.writerow(current_stats.values())
            if output_file:
                file_handle.flush()
            sleep(max(0, self.interval - (time() - current_time)))
    finally:
        if output_file and not file_handle.closed:
            file_handle.close()

ResourceTracker #

Track resource usage of processes and the system in a non-blocking way.

Start a resource_tracker.PidTracker and/or a resource_tracker.SystemTracker in the background as spawned or forked process(es), and make the collected data available easily in the main process via the pid_tracker and system_tracker properties.

Parameters:

Name Type Description Default
pid int

Process ID to track. Defaults to current process ID.

getpid()
children bool

Whether to track child processes. Defaults to True.

True
interval float

Sampling interval in seconds. Defaults to 1.

1
method Optional[str]

Multiprocessing method. Defaults to None, which tries to fork on Linux and macOS, and spawn on Windows.

None
autostart bool

Whether to start tracking immediately. Defaults to True.

True
track_processes bool

Whether to track resource usage at the process level. Defaults to True.

True
track_system bool

Whether to track system-wide resource usage. Defaults to True.

True

Methods:

Name Description
start

Start the selected resource trackers in the background as subprocess(es).

stop

Stop the previously started resource trackers' background processes.

Attributes:

Name Type Description
pid_tracker Union[TinyDataFrame, List]

Collected data from the resource_tracker.PidTracker.

system_tracker Union[TinyDataFrame, List]

Collected data from the resource_tracker.SystemTracker.

Source code in resource_tracker/tracker.py
class ResourceTracker:
    """Track resource usage of processes and the system in a non-blocking way.

    Start a [resource_tracker.PidTracker][] and/or a [resource_tracker.SystemTracker][] in the background as spawned
    or forked process(es), and make the collected data available easily in the
    main process via the `pid_tracker` and `system_tracker` properties.

    Args:
        pid: Process ID to track. Defaults to current process ID.
        children: Whether to track child processes. Defaults to True.
        interval: Sampling interval in seconds. Defaults to 1.
        method: Multiprocessing method. Defaults to None, which tries to fork on
            Linux and macOS, and spawn on Windows.
        autostart: Whether to start tracking immediately. Defaults to True.
        track_processes: Whether to track resource usage at the process level.
            Defaults to True.
        track_system: Whether to track system-wide resource usage. Defaults to True.
    """

    def __init__(
        self,
        pid: int = getpid(),
        children: bool = True,
        interval: float = 1,
        method: Optional[str] = None,
        autostart: bool = True,
        track_processes: bool = True,
        track_system: bool = True,
    ):
        self.pid = pid
        self.children = children
        self.interval = interval
        self.method = method
        self.autostart = autostart
        self.trackers = []
        if track_processes:
            self.trackers.append("pid_tracker")
        if track_system:
            self.trackers.append("system_tracker")

        if method is None:
            # try to fork when possible due to leaked semaphores on older Python versions
            # see e.g. https://github.com/python/cpython/issues/90549
            if platform in ["linux", "darwin"]:
                self.mpc = get_context("fork")
            else:
                self.mpc = get_context("spawn")
        else:
            self.mpc = get_context(method)

        # error details from subprocesses
        self.error_queue = SimpleQueue()

        # create temporary CSV file(s) for the tracker(s), and record only the file path(s)
        # to be passed later to subprocess(es) avoiding pickling the file object(s)
        for tracker_name in self.trackers:
            temp_file = NamedTemporaryFile(delete=False)
            setattr(self, f"{tracker_name}_filepath", temp_file.name)
            temp_file.close()
        # make sure to cleanup the temp file(s)
        finalize(
            self,
            cleanup_files,
            [
                getattr(self, f"{tracker_name}_filepath")
                for tracker_name in self.trackers
            ],
        )

        if autostart:
            self.start()

    def start(self):
        """Start the selected resource trackers in the background as subprocess(es)."""
        self.start_time = time()

        if "pid_tracker" in self.trackers:
            self.pid_tracker_process = self.mpc.Process(
                target=_run_tracker,
                args=("pid", self.error_queue),
                kwargs={
                    "pid": self.pid,
                    "interval": self.interval,
                    "children": self.children,
                    "output_file": self.pid_tracker_filepath,
                },
                daemon=True,
            )
            self.pid_tracker_process.start()

        if "system_tracker" in self.trackers:
            self.system_tracker_process = self.mpc.Process(
                target=_run_tracker,
                args=("system", self.error_queue),
                kwargs={
                    "interval": self.interval,
                    "output_file": self.system_tracker_filepath,
                },
                daemon=True,
            )
            self.system_tracker_process.start()

        # make sure to cleanup the started subprocess(es)
        finalize(
            self,
            cleanup_processes,
            [
                getattr(self, f"{tracker_name}_process")
                for tracker_name in self.trackers
            ],
        )

    def stop(self):
        """Stop the previously started resource trackers' background processes."""
        self.stop_time = time()
        # check for errors in the subprocesses
        if not self.error_queue.empty():
            error_data = self.error_queue.get()
            logger.warning(
                "Resource tracker subprocess failed!\n"
                f"Error type: {error_data['name']} (from module {error_data['module']})\n"
                f"Error message: {error_data['message']}\n"
                f"Original traceback:\n{error_data['traceback']}"
            )
        # terminate tracker processes
        for tracker_name in self.trackers:
            process_attr = f"{tracker_name}_process"
            if hasattr(self, process_attr):
                cleanup_processes([getattr(self, process_attr)])
        self.error_queue.close()
        logger.debug(
            "Resource tracker stopped after %s seconds, logging %d process-level and %d system-wide records",
            self.stop_time - self.start_time,
            len(self.pid_tracker),
            len(self.system_tracker),
        )

    @property
    def pid_tracker(self) -> Union[TinyDataFrame, List]:
        """Collected data from the [resource_tracker.PidTracker][].

        Returns:
            A [resource_tracker.TinyDataFrame][] object containing the collected data or an empty list if the [resource_tracker.PidTracker][] is not running.
        """
        try:
            return TinyDataFrame(
                csv_file_path=self.pid_tracker_filepath,
            )
        except Exception:
            return []

    @property
    def system_tracker(self) -> Union[TinyDataFrame, List]:
        """Collected data from the [resource_tracker.SystemTracker][].

        Returns:
            A [resource_tracker.TinyDataFrame][] object containing the collected data or an empty list if the [resource_tracker.SystemTracker][] is not running.
        """
        try:
            return TinyDataFrame(
                csv_file_path=self.system_tracker_filepath,
            )
        except Exception:
            return []

start #

start()

Start the selected resource trackers in the background as subprocess(es).

Source code in resource_tracker/tracker.py
def start(self):
    """Start the selected resource trackers in the background as subprocess(es)."""
    self.start_time = time()

    if "pid_tracker" in self.trackers:
        self.pid_tracker_process = self.mpc.Process(
            target=_run_tracker,
            args=("pid", self.error_queue),
            kwargs={
                "pid": self.pid,
                "interval": self.interval,
                "children": self.children,
                "output_file": self.pid_tracker_filepath,
            },
            daemon=True,
        )
        self.pid_tracker_process.start()

    if "system_tracker" in self.trackers:
        self.system_tracker_process = self.mpc.Process(
            target=_run_tracker,
            args=("system", self.error_queue),
            kwargs={
                "interval": self.interval,
                "output_file": self.system_tracker_filepath,
            },
            daemon=True,
        )
        self.system_tracker_process.start()

    # make sure to cleanup the started subprocess(es)
    finalize(
        self,
        cleanup_processes,
        [
            getattr(self, f"{tracker_name}_process")
            for tracker_name in self.trackers
        ],
    )

stop #

stop()

Stop the previously started resource trackers' background processes.

Source code in resource_tracker/tracker.py
def stop(self):
    """Stop the previously started resource trackers' background processes."""
    self.stop_time = time()
    # check for errors in the subprocesses
    if not self.error_queue.empty():
        error_data = self.error_queue.get()
        logger.warning(
            "Resource tracker subprocess failed!\n"
            f"Error type: {error_data['name']} (from module {error_data['module']})\n"
            f"Error message: {error_data['message']}\n"
            f"Original traceback:\n{error_data['traceback']}"
        )
    # terminate tracker processes
    for tracker_name in self.trackers:
        process_attr = f"{tracker_name}_process"
        if hasattr(self, process_attr):
            cleanup_processes([getattr(self, process_attr)])
    self.error_queue.close()
    logger.debug(
        "Resource tracker stopped after %s seconds, logging %d process-level and %d system-wide records",
        self.stop_time - self.start_time,
        len(self.pid_tracker),
        len(self.system_tracker),
    )

pid_tracker property #

pid_tracker

Collected data from the resource_tracker.PidTracker.

Returns:

Type Description
Union[TinyDataFrame, List]

A resource_tracker.TinyDataFrame object containing the collected data or an empty list if the resource_tracker.PidTracker is not running.

system_tracker property #

system_tracker

Collected data from the resource_tracker.SystemTracker.

Returns:

Type Description
Union[TinyDataFrame, List]

A resource_tracker.TinyDataFrame object containing the collected data or an empty list if the resource_tracker.SystemTracker is not running.