Skip to content

resource_tracker.sentinel_api #

HTTP client for the Spare Cores Sentinel metrics ingestion API.

Classes:

Name Description
SentinelAPIError

Raised when the Sentinel API returns a non-2xx response.

Functions:

Name Description
register_run

Register the start of a new Run with the Sentinel API.

refresh_credentials

Refresh the temporary upload credentials for an existing run.

finish_run

Signal that a run has finished and submit final data.

SentinelAPIError #

Bases: Exception

Raised when the Sentinel API returns a non-2xx response.

Attributes:

Name Type Description
status_code

The HTTP status code.

body

The decoded response body (best-effort).

Source code in resource_tracker/sentinel_api.py
class SentinelAPIError(Exception):
    """Error signalling an unsuccessful (non-2xx) Sentinel API response.

    The exception message combines the status code and the body, and both
    values also stay available as attributes for programmatic handling.

    Attributes:
        status_code: HTTP status code carried by the failed response.
        body: Response body, decoded on a best-effort basis.
    """

    def __init__(self, status_code: int, body: str):
        # Build the human-readable message first, then keep the raw parts.
        message = f"Sentinel API error {status_code}: {body}"
        super().__init__(message)
        self.status_code, self.body = status_code, body

register_run #

register_run(token, metadata=None, *, host_info=None, cloud_info=None)

Register the start of a new Run with the Sentinel API.

Parameters:

Name Type Description Default
token str

Bearer token for authentication.

required
metadata Optional[Dict[str, Any]]

Optional run metadata. Recognised keys include project_name, job_name, stage_name, task_name, external_run_id, pid, container_image, command (a List[str] or a plain shell str; both are serialized to a JSON-array string on the wire, e.g. '["python", "train.py", "--epochs", "10"]'), env, language, orchestrator, executor, team, and tags (an arbitrary key-value dict).

None
host_info Optional[Dict[str, Any]]

Optional dict of host_* fields (e.g. host_vcpus, host_memory_mib, host_gpu_model, etc.).

None
cloud_info Optional[Dict[str, Any]]

Optional dict of cloud_* fields (e.g. cloud_vendor_id, cloud_region_id, cloud_instance_type, etc.).

None

Returns:

Type Description
dict

A dict containing at least:

  • run_id (str): Unique identifier for this run.
  • upload_uri_prefix (str): S3 URI prefix for uploading metric files.
  • upload_credentials (dict): Temporary AWS STS credentials with keys access_key, secret_key, session_token, expiration, and region.

Raises:

Type Description
SentinelAPIError

On non-2xx responses.

Source code in resource_tracker/sentinel_api.py
def register_run(
    token: str,
    metadata: Optional[Dict[str, Any]] = None,
    *,
    host_info: Optional[Dict[str, Any]] = None,
    cloud_info: Optional[Dict[str, Any]] = None,
) -> dict:
    """Announce a new Run to the Sentinel API and return its registration.

    The request body is a single flat mapping built from ``metadata``,
    ``host_info`` and ``cloud_info`` (in that order, so later sources win on
    key clashes). Entries whose value is ``None`` are left out.

    Args:
        token: Bearer token for authentication.
        metadata: Optional run metadata. Recognised keys include
            ``project_name``, ``job_name``, ``stage_name``, ``task_name``,
            ``external_run_id``, ``pid``, ``container_image``,
            ``command`` (a ``List[str]`` or a plain shell ``str``; both are
            serialized to a JSON-array string on the wire, e.g.
            ``'["python", "train.py", "--epochs", "10"]'``),
            ``env``, ``language``, ``orchestrator``, ``executor``, ``team``,
            and ``tags`` (an arbitrary key-value dict).
        host_info: Optional dict of ``host_*`` fields (e.g. ``host_vcpus``,
            ``host_memory_mib``, ``host_gpu_model``, etc.).
        cloud_info: Optional dict of ``cloud_*`` fields (e.g.
            ``cloud_vendor_id``, ``cloud_region_id``,
            ``cloud_instance_type``, etc.).

    Returns:
        A dict containing at least:

        - ``run_id`` (str): Unique identifier for this run.
        - ``upload_uri_prefix`` (str): S3 URI prefix for uploading metric files.
        - ``upload_credentials`` (dict): Temporary AWS STS credentials with keys
          ``access_key``, ``secret_key``, ``session_token``, ``expiration``,
          and ``region``.

    Raises:
        SentinelAPIError: On non-2xx responses.
    """
    payload = {}
    # Merge the three optional sources in a fixed order, skipping None values.
    for source in (metadata, host_info, cloud_info):
        if source:
            payload.update(
                (key, value) for key, value in source.items() if value is not None
            )

    if "command" in payload:
        raw_command = payload["command"]
        # A shell string is tokenized first so the wire format is always the
        # JSON encoding of an argument list.
        argv = shlex_split(raw_command) if isinstance(raw_command, str) else raw_command
        payload["command"] = json_dumps(argv)

    logger.info("Registering run with Sentinel API")
    return _request("POST", "/runs", token=token, payload=payload)

refresh_credentials #

refresh_credentials(token, run_id)

Refresh the temporary upload credentials for an existing run.

Parameters:

Name Type Description Default
token str

Bearer token for authentication.

required
run_id str

The run identifier returned by register_run.

required

Returns:

Type Description
dict

A dict with refreshed upload_credentials (same structure as in register_run).

Raises:

Type Description
SentinelAPIError

On non-2xx responses.

Source code in resource_tracker/sentinel_api.py
def refresh_credentials(
    token: str,
    run_id: str,
) -> dict:
    """Request a fresh set of temporary upload credentials for a run.

    Issues a ``POST`` to the run's ``refresh-credentials`` endpoint without a
    request payload.

    Args:
        token: Bearer token for authentication.
        run_id: The run identifier returned by :func:`register_run`.

    Returns:
        A dict with refreshed ``upload_credentials``, structured like the
        credentials returned by :func:`register_run`.

    Raises:
        SentinelAPIError: On non-2xx responses.
    """
    logger.info("Refreshing credentials for run %s", run_id)
    endpoint = f"/runs/{run_id}/refresh-credentials"
    return _request("POST", endpoint, token=token)

finish_run #

finish_run(token, run_id, *, exit_code=0, run_status=finished, data_source=s3, data_uris=None, data_csv=None)

Signal that a run has finished and submit final data.

Parameters:

Name Type Description Default
token str

Bearer token for authentication.

required
run_id str

The run identifier returned by register_run.

required
exit_code int

The exit code of the monitored process.

0
run_status RunStatus

Run outcome, either "finished" or "failed".

finished
data_source DataSource

Either "s3" (uploaded CSV objects) or "inline" (plain CSV string sent inline; the request body is gzip-compressed by _request so no pre-encoding is needed).

s3
data_uris Optional[List[str]]

List of S3 URIs of uploaded gzipped CSV files. Required when data_source="s3".

None
data_csv Optional[str]

Plain CSV string to submit inline. Required when data_source="inline".

None

Returns:

Type Description
dict

A dict with backend-computed statistics for the run.

Raises:

Type Description
SentinelAPIError

On non-2xx responses.

Source code in resource_tracker/sentinel_api.py
def finish_run(
    token: str,
    run_id: str,
    *,
    exit_code: int = 0,
    run_status: RunStatus = RunStatus.finished,
    data_source: DataSource = DataSource.s3,
    data_uris: Optional[List[str]] = None,
    data_csv: Optional[str] = None,
) -> dict:
    """Signal that a run has finished and submit final data.

    Args:
        token: Bearer token for authentication.
        run_id: The run identifier returned by :func:`register_run`.
        exit_code: The exit code of the monitored process.
        run_status: Run outcome, either ``"finished"`` or ``"failed"``.
        data_source: Either ``"s3"`` (uploaded CSV objects) or ``"inline"``
            (plain CSV string sent inline; the request body is gzip-compressed
            by :func:`_request` so no pre-encoding is needed).
        data_uris: List of S3 URIs of uploaded gzipped CSV files.
            Required when ``data_source="s3"``.
        data_csv: Plain CSV string to submit inline.
            Required when ``data_source="inline"``.

    Returns:
        A dict with backend-computed statistics for the run.

    Raises:
        ValueError: If the data argument required by ``data_source`` is
            missing or empty (``data_uris`` for ``"s3"``, ``data_csv``
            otherwise). No request is sent in that case.
        SentinelAPIError: On non-2xx responses.
    """
    # Validate the argument that the selected data source actually needs.
    # Checking only that *either* one is set would let a mismatched call
    # (e.g. data_source="s3" with only data_csv) send a null data field.
    if data_source == DataSource.s3:
        if not data_uris:
            raise ValueError(
                "'data_uris' must be provided when data_source='s3'."
            )
        data_key, data_value = "data_uris", data_uris
    else:
        if not data_csv:
            raise ValueError(
                "'data_csv' must be provided when data_source='inline'."
            )
        data_key, data_value = "data_csv", data_csv

    payload: Dict[str, Any] = {
        "exit_code": exit_code,
        "run_status": run_status.value,
        "data_source": data_source.value,
        data_key: data_value,
    }

    logger.info(
        "Finishing run %s (status=%s, exit_code=%d)", run_id, run_status, exit_code
    )
    return _request(
        "POST",
        f"/runs/{run_id}/finish",
        token=token,
        payload=payload,
        # Only inline CSV submissions are compressed.
        compress=data_source == DataSource.inline,
    )