Skip to content

resource_tracker.s3_upload #

Upload data to S3 using temporary STS credentials.

Minimal, zero-dependency implementation (stdlib only) of AWS Signature V4 authenticated PUT requests, designed for uploading gzipped CSV metric files via the Sentinel API's temporary STS credentials.

Functions:

Name Description
put_bytes_with_sts

Upload raw bytes to S3 using temporary STS credentials.

put_object_with_sts

Upload a local file to S3 using temporary STS credentials.

put_bytes_with_sts #

put_bytes_with_sts(*, s3_uri, body, access_key, secret_key, session_token, region=None, timeout=DEFAULT_UPLOAD_TIMEOUT)

Upload raw bytes to S3 using temporary STS credentials.

This is the core upload primitive used by the streaming module to push gzipped CSV data without writing a temporary file.

Parameters:

Name Type Description Default
s3_uri str

Target S3 URI (s3://bucket/path/to/object).

required
body bytes

Raw bytes to upload.

required
access_key str

AWS STS access key ID.

required
secret_key str

AWS STS secret access key.

required
session_token str

AWS STS session token.

required
region str | None

AWS region of the target bucket.

None
timeout int

HTTP request timeout in seconds.

DEFAULT_UPLOAD_TIMEOUT

Returns:

Type Description
str

The S3 URI that was written to (same as s3_uri).

Raises:

Type Description
RuntimeError

If the upload fails.

Source code in resource_tracker/s3_upload.py
def put_bytes_with_sts(
    *,
    s3_uri: str,
    body: bytes,
    access_key: str,
    secret_key: str,
    session_token: str,
    region: str | None = None,
    timeout: int = DEFAULT_UPLOAD_TIMEOUT,
) -> str:
    """Upload raw bytes to S3 using temporary STS credentials.

    This is the core upload primitive used by the streaming module to push
    gzipped CSV data without writing a temporary file.

    Args:
        s3_uri: Target S3 URI (``s3://bucket/path/to/object``).
        body: Raw bytes to upload.
        access_key: AWS STS access key ID.
        secret_key: AWS STS secret access key.
        session_token: AWS STS session token.
        region: AWS region of the target bucket. Looked up from the bucket
            when not provided.
        timeout: HTTP request timeout in seconds.

    Returns:
        The S3 URI that was written to (same as *s3_uri*).

    Raises:
        RuntimeError: If the upload fails (HTTP error response or
            network-level failure).
    """
    # Local imports keep the module's import surface unchanged; both are stdlib.
    from urllib.error import URLError
    from urllib.parse import quote

    bucket, key = _parse_s3_uri(s3_uri)
    payload_hash = _sha256_hex(body)

    now = datetime.now(timezone.utc)
    amz_date = now.strftime("%Y%m%dT%H%M%SZ")
    date_stamp = now.strftime("%Y%m%d")

    if not region:
        region = _get_bucket_region(bucket, timeout=timeout)

    host = f"{bucket}.s3.{region}.amazonaws.com"
    # SigV4 requires the canonical URI to be percent-encoded ("/" preserved);
    # an unencoded key containing spaces or special characters would yield a
    # signature mismatch or a malformed request URL.
    canonical_uri = "/" + quote(key, safe="/")
    endpoint = f"https://{host}{canonical_uri}"

    # Canonical headers: lowercase names, sorted, each terminated by "\n".
    canonical_headers = (
        f"host:{host}\n"
        f"x-amz-content-sha256:{payload_hash}\n"
        f"x-amz-date:{amz_date}\n"
        f"x-amz-security-token:{session_token}\n"
    )
    signed_headers = "host;x-amz-content-sha256;x-amz-date;x-amz-security-token"
    # The empty line after the URI is the (absent) canonical query string.
    canonical_request = (
        f"PUT\n{canonical_uri}\n\n{canonical_headers}\n{signed_headers}\n{payload_hash}"
    )
    canonical_request_hash = _sha256_hex(canonical_request.encode("utf-8"))

    credential_scope = f"{date_stamp}/{region}/s3/aws4_request"
    string_to_sign = (
        f"AWS4-HMAC-SHA256\n{amz_date}\n{credential_scope}\n{canonical_request_hash}"
    )
    signing_key = _derive_signing_key(secret_key, date_stamp, region)
    signature = hmac.new(
        signing_key, string_to_sign.encode("utf-8"), hashlib.sha256
    ).hexdigest()

    authorization = (
        "AWS4-HMAC-SHA256 "
        f"Credential={access_key}/{credential_scope}, "
        f"SignedHeaders={signed_headers}, "
        f"Signature={signature}"
    )

    req = Request(endpoint, data=body, method="PUT")
    req.add_header("Host", host)
    req.add_header("Authorization", authorization)
    req.add_header("x-amz-date", amz_date)
    req.add_header("x-amz-security-token", session_token)
    req.add_header("x-amz-content-sha256", payload_hash)
    req.add_header("Content-Length", str(len(body)))

    logger.debug("S3 PUT %s (%d bytes)", s3_uri, len(body))

    try:
        with urlopen(req, timeout=timeout) as resp:
            if resp.status not in (200, 201):
                raise RuntimeError(f"S3 upload failed with status {resp.status}")
    except HTTPError as exc:
        # HTTPError must be handled before URLError (it is a subclass).
        error_body = exc.read().decode("utf-8", errors="replace")
        raise RuntimeError(
            f"S3 upload failed with status {exc.code}: {error_body}"
        ) from exc
    except URLError as exc:
        # Network-level failures (DNS, refused connection, timeout) also
        # surface as RuntimeError, per the documented contract.
        raise RuntimeError(f"S3 upload failed: {exc.reason}") from exc

    logger.debug("S3 PUT %s completed", s3_uri)
    return s3_uri

put_object_with_sts #

put_object_with_sts(*, s3_uri, file_path, access_key, secret_key, session_token, region, timeout=DEFAULT_UPLOAD_TIMEOUT)

Upload a local file to S3 using temporary STS credentials.

Convenience wrapper around put_bytes_with_sts that reads the file contents first.

Parameters:

Name Type Description Default
s3_uri str

Target S3 URI (s3://bucket/path/to/object).

required
file_path Path

Path to the local file to upload.

required
access_key str

AWS STS access key ID.

required
secret_key str

AWS STS secret access key.

required
session_token str

AWS STS session token.

required
region str

AWS region of the target bucket.

required
timeout int

HTTP request timeout in seconds.

DEFAULT_UPLOAD_TIMEOUT

Returns:

Type Description
str

The S3 URI that was written to (same as s3_uri).

Raises:

Type Description
RuntimeError

If the upload fails.

Source code in resource_tracker/s3_upload.py
def put_object_with_sts(
    *,
    s3_uri: str,
    file_path: Path,
    access_key: str,
    secret_key: str,
    session_token: str,
    region: str | None = None,
    timeout: int = DEFAULT_UPLOAD_TIMEOUT,
) -> str:
    """Upload a local file to S3 using temporary STS credentials.

    Convenience wrapper around :func:`put_bytes_with_sts` that reads the file
    contents first.

    Args:
        s3_uri: Target S3 URI (``s3://bucket/path/to/object``).
        file_path: Path to the local file to upload.
        access_key: AWS STS access key ID.
        secret_key: AWS STS secret access key.
        session_token: AWS STS session token.
        region: AWS region of the target bucket. Resolved automatically by
            :func:`put_bytes_with_sts` when not provided, matching that
            function's signature.
        timeout: HTTP request timeout in seconds.

    Returns:
        The S3 URI that was written to (same as *s3_uri*).

    Raises:
        RuntimeError: If the upload fails.
        OSError: If the local file cannot be read.
    """
    logger.debug("Reading file %s for upload to %s", file_path, s3_uri)
    return put_bytes_with_sts(
        s3_uri=s3_uri,
        body=file_path.read_bytes(),
        access_key=access_key,
        secret_key=secret_key,
        session_token=session_token,
        region=region,
        timeout=timeout,
    )