Skip to content

inspector

sc_crawler.inspector #

inspector_data_path cached #

inspector_data_path()

Download current inspector data into a temp folder.

Source code in sc_crawler/inspector.py
@cache
def inspector_data_path() -> str | PathLike:
    """Download current inspector data into a temp folder.

    Fetches the ``main`` branch archive of the ``sc-inspector-data``
    repository, extracts it into a freshly created temporary directory and
    removes the downloaded archive afterwards. The result is memoized by
    ``@cache``, so the download happens only on the first successful call.

    Returns:
        Path of the extracted ``sc-inspector-data-main/data`` folder.

    Raises:
        Exception: Whatever the HTTP client raises for a failed request or a
            non-success status code, or what ``ZipFile`` raises for an
            invalid archive.
    """
    temp_dir = mkdtemp()
    # hand the folder over to the registered cleanup hook
    # NOTE(review): `register` is presumably `atexit.register` -- confirm
    register(rmtree, temp_dir)
    response = get(
        "https://github.com/SpareCores/sc-inspector-data/archive/refs/heads/main.zip"
    )
    # fail early on HTTP errors instead of writing an error page to disk and
    # hitting an opaque zip error later
    # NOTE(review): assumes `get` is `requests.get` -- confirm
    response.raise_for_status()
    zip_path = path.join(temp_dir, "downloaded.zip")
    with open(zip_path, "wb") as f:
        f.write(response.content)
    with ZipFile(zip_path, "r") as zip_ref:
        zip_ref.extractall(temp_dir)
    # only the extracted content is needed from here on
    remove(zip_path)
    return path.join(temp_dir, "sc-inspector-data-main", "data")

inspect_server_benchmarks #

inspect_server_benchmarks(server)

Generate a list of BenchmarkScore-like dicts for the Server.

Source code in sc_crawler/inspector.py
def inspect_server_benchmarks(server: "Server") -> List[dict]:
    """Generate a list of BenchmarkScore-like dicts for the Server.

    Walks through the known benchmark frameworks one by one (bogomips,
    bw_mem, compression_text, geekbench, passmark, openssl, stress_ng and
    the frameworks listed in ``SERVER_CLIENT_FRAMEWORK_MAPS``), parses the
    related inspector output of the server and collects the scores.

    Each framework is processed in its own ``try`` block: a failure is
    reported via ``_log_cannot_load_benchmarks`` and does not prevent the
    remaining frameworks from being loaded.

    Args:
        server: The server to look up in the inspector data. Its ``vcpus``
            attribute is read directly; everything else is accessed through
            the private helper functions.

    Returns:
        List of dicts, each starting with the fields returned by
        ``_benchmark_metafields`` and extended with ``score`` and optionally
        ``config`` and ``note``.
    """
    benchmarks = []

    framework = "bogomips"
    try:
        benchmarks.append(
            {
                **_benchmark_metafields(
                    server, framework="lscpu", benchmark_id=framework
                ),
                "score": round(float(_server_lscpu_field(server, "BogoMIPS:"))),
            }
        )
    except Exception as e:
        _log_cannot_load_benchmarks(server, framework, e)

    framework = "bw_mem"
    try:
        with open(_server_framework_stdout_path(server, framework), "r") as lines:
            for line in lines:
                # filter out error messages
                if match(r"^(rd|wr|rdwr) \d+(\.\d+) \d+(\.\d+)$", line):
                    # operation, size, score
                    row = line.strip().split()
                    benchmarks.append(
                        {
                            **_benchmark_metafields(server, framework=framework),
                            "config": {"operation": row[0], "size": float(row[1])},
                            "score": float(row[2]),
                        }
                    )
    except Exception as e:
        _log_cannot_load_benchmarks(server, framework, e)

    framework = "compression_text"
    try:
        # nested as algo -> compression level -> list of measurements
        algos = _server_framework_stdout_from_json(server, framework)
        for algo, levels in algos.items():
            for level, datas in levels.items():
                for data in datas:
                    config = {
                        "algo": algo,
                        "compression_level": None if level == "null" else int(level),
                        "threads": data["threads"],
                    }
                    if data.get("extra_args", {}).get("block_size"):
                        config["block_size"] = data["extra_args"]["block_size"]
                    for measurement in ["ratio", "compress", "decompress"]:
                        # skip missing/falsy measurements
                        if data[measurement]:
                            benchmarks.append(
                                {
                                    **_benchmark_metafields(
                                        server,
                                        benchmark_id=":".join([framework, measurement]),
                                    ),
                                    "config": config,
                                    "score": float(data[measurement]),
                                }
                            )
    except Exception as e:
        _log_cannot_load_benchmarks(server, framework, e, True)

    framework = "geekbench"
    try:
        with open(_server_framework_path(server, framework, "results.json"), "r") as fp:
            scores = json.load(fp)
        geekbench_version = _server_framework_meta(server, framework)["version"]
        for cores, workloads in scores.items():
            for workload, values in workloads.items():
                workload_fields = {
                    "config": {"cores": cores, "framework_version": geekbench_version},
                    "score": float(values["score"]),
                }
                if values.get("description"):
                    workload_fields["note"] = values["description"]
                benchmarks.append(
                    {
                        **_benchmark_metafields(
                            server,
                            # workload name lowercased, non-word chars -> "_"
                            benchmark_id=":".join(
                                [framework, sub(r"\W+", "_", workload.lower())]
                            ),
                        ),
                        **workload_fields,
                    }
                )
    except Exception as e:
        _log_cannot_load_benchmarks(server, framework, e, True)

    framework = "passmark"
    try:
        with open(_server_framework_stdout_path(server, framework), "r") as fp:
            scores = yaml_safe_load(fp)
        passmark_version = ".".join(
            [str(scores["Version"][i]) for i in ["Major", "Minor", "Build"]]
        )
        for key, name in PASSMARK_MAPS.items():
            benchmarks.append(
                {
                    **_benchmark_metafields(
                        server,
                        benchmark_id=":".join(
                            [framework, sub(r"\W+", "_", name.lower())]
                        ),
                    ),
                    "config": {"framework_version": passmark_version},
                    "score": float(scores["Results"][key]),
                }
            )
    except Exception as e:
        _log_cannot_load_benchmarks(server, framework, e, True)

    framework = "openssl"
    try:
        with open(_server_framework_path(server, framework, "parsed.json"), "r") as fp:
            workloads = json.load(fp)
        openssl_version = _server_framework_meta(server, framework)["version"]
        for workload in workloads:
            benchmarks.append(
                {
                    **_benchmark_metafields(server, framework=framework),
                    "config": {
                        "algo": workload["algo"],
                        "block_size": workload["block_size"],
                        "framework_version": openssl_version,
                    },
                    "score": float(workload["speed"]),
                }
            )
    except Exception as e:
        _log_cannot_load_benchmarks(server, framework, e, True)

    framework = "stress_ng"
    # TODO deprecate
    try:
        cores_per_path = {"stressng": server.vcpus, "stressngsinglecore": 1}
        # Stage the scores and only commit them when both runs were parsed:
        # otherwise a failure on the second run would leave a partial result
        # in `benchmarks` on top of what the backfill below adds.
        legacy_scores = []
        for cores_path, cores in cores_per_path.items():
            stressng_version = _server_framework_meta(server, cores_path)["version"]
            line = _extract_line_from_file(
                _server_framework_stderr_path(server, cores_path),
                "bogo-ops-per-second-real-time",
            )
            legacy_scores.append(
                {
                    **_benchmark_metafields(
                        server,
                        framework=cores_path,
                        benchmark_id=":".join([framework, "cpu_all"]),
                    ),
                    "config": {
                        "cores": cores,
                        "framework_version": stressng_version,
                    },
                    "score": float(line.split(": ")[1]),
                }
            )
        benchmarks.extend(legacy_scores)
    except Exception:
        # backfill with newer method - can be dropped once we deprecate stress_ng:cpu_all
        try:
            with open(
                _server_framework_stdout_path(server, "stressngfull"), newline=""
            ) as f:
                # QUOTE_NONNUMERIC converts unquoted fields to float
                records = list(csv.reader(f, quoting=csv.QUOTE_NONNUMERIC))
            stressng_version = _server_framework_meta(server, "stressngfull")[
                "version"
            ]
            # first and last rows; a single-row file must not be reported twice
            indices = [0] if len(records) == 1 else [0, len(records) - 1]
            for i in indices:
                benchmarks.append(
                    {
                        **_benchmark_metafields(
                            server,
                            framework="stressngfull",
                            benchmark_id=":".join([framework, "cpu_all"]),
                        ),
                        "config": {
                            "cores": records[i][0],
                            "framework_version": stressng_version,
                        },
                        "score": records[i][1],
                    }
                )
        except Exception as e:
            _log_cannot_load_benchmarks(server, framework, e, True)

    # NOTE `framework` is still "stress_ng" here, so the benchmark ids below
    # become "stress_ng:div16", "stress_ng:best1" and "stress_ng:bestn"
    workload = "div16"
    try:
        with open(
            _server_framework_stdout_path(server, "stressngfull"), newline=""
        ) as f:
            # rows are used as (cores, score) pairs below
            records = list(csv.reader(f, quoting=csv.QUOTE_NONNUMERIC))
        # same for all records, so look it up only once
        stressng_version = _server_framework_meta(server, "stressngfull")["version"]
        for record in records:
            benchmarks.append(
                {
                    **_benchmark_metafields(
                        server,
                        framework="stressngfull",
                        benchmark_id=":".join([framework, workload]),
                    ),
                    "config": {
                        "cores": record[0],
                        "framework_version": stressng_version,
                    },
                    "score": record[1],
                }
            )
        # best single and multi core performance
        bests = {"best1": records[0][1], "bestn": max(r[1] for r in records)}
        for k, v in bests.items():
            benchmarks.append(
                {
                    **_benchmark_metafields(
                        server,
                        framework="stressngfull",
                        benchmark_id=":".join([framework, k]),
                    ),
                    "config": {
                        "framework_version": stressng_version,
                    },
                    "score": v,
                }
            )
    except Exception as e:
        _log_cannot_load_benchmarks(server, framework, e, True)

    for framework in SERVER_CLIENT_FRAMEWORK_MAPS.keys():
        try:
            versions = _server_framework_meta(server, framework)["version"]
            # drop the build number at the end of the redis server version
            if framework == "redis":
                versions = sub(r" build=[a-zA-Z0-9]+", "", versions)

            records = []
            with open(
                _server_framework_stdout_path(server, framework), newline=""
            ) as f:
                rows = csv.DictReader(f, quoting=csv.QUOTE_NONNUMERIC)
                for row in rows:
                    if "connections" in row:
                        row["connections_per_vcpus"] = row["connections"] / server.vcpus
                    records.append(row)

            framework_config = SERVER_CLIENT_FRAMEWORK_MAPS[framework]
            keys = framework_config["keys"]
            measurements = framework_config["measurements"]

            # don't care about threads, keep the records with the highest rps
            # (sort by the config keys then by descending rps, so the first
            # item of each group is the best one)
            records = sorted(records, key=lambda x: (*[x[k] for k in keys], -x["rps"]))
            records = groupby(records, key=itemgetter(*keys))
            records = [next(group) for _, group in records]

            for record in records:
                for measurement in measurements:
                    # the part before the optional "-extrapolated" suffix
                    # names the record field, except that throughput is
                    # computed from rps
                    score_field = measurement.split("-")[0]
                    if score_field == "throughput":
                        score_field = "rps"
                    score = record[score_field]
                    server_usrsys = record["server_usr"] + record["server_sys"]
                    client_usrsys = record["client_usr"] + record["client_sys"]
                    note = (
                        "CPU usage (server/client usr+sys): "
                        f"{round(server_usrsys, 4)}/{round(client_usrsys, 4)}."
                    )
                    if measurement.endswith("-extrapolated"):
                        note += f" Original RPS: {score}."
                        # scale up as if the client's CPU share was also
                        # available for the server
                        score = round(
                            score / server_usrsys * (server_usrsys + client_usrsys), 2
                        )
                    if measurement.startswith("throughput"):
                        # drop the "k" suffix and multiply by 1024
                        size = int(record["size"][:-1]) * 1024
                        score = score * size
                    benchmarks.append(
                        {
                            **_benchmark_metafields(
                                server,
                                framework=framework,
                                benchmark_id=":".join([framework, measurement]),
                            ),
                            "config": {
                                **{k: record[k] for k in keys},
                                "framework_version": versions,
                            },
                            "score": score,
                            "note": note,
                        }
                    )
        except Exception as e:
            _log_cannot_load_benchmarks(server, framework, e, True)

    return benchmarks

inspect_update_server_dict #

inspect_update_server_dict(server)

Update a Server-like dict based on inspector data.

Source code in sc_crawler/inspector.py
def inspect_update_server_dict(server: dict) -> dict:
    """Update a Server-like dict based on inspector data.

    Loads the raw inspector outputs (dmidecode, lscpu, nvidia-smi) of the
    server, then derives CPU, memory and GPU fields from them. Each lookup
    and each field is evaluated independently: a failing field is reported
    via ``_log_cannot_update_server`` and the rest is still processed.

    Args:
        server: Server-like dict, validated with ``ServerBase.validate``.
            It is updated in place.

    Returns:
        The same ``server`` dict, with the fields that could be extracted
        from the inspector data set or overwritten.
    """
    server_obj = ServerBase.validate(server)

    # lazy loaders of the raw inspector data, replaced by their result (or by
    # an Exception instance on failure) in the loop below
    lookups = {
        "dmidecode_cpu": lambda: _server_dmidecode_section(
            server_obj, "Processor Information"
        ),
        "dmidecode_cpus": lambda: _server_dmidecode_sections(
            server_obj, "Processor Information"
        ),
        "dmidecode_memory": lambda: _server_dmidecode_section(
            server_obj, "Memory Device"
        ),
        "lscpu": lambda: _server_lscpu(server_obj),
        "nvidiasmi": lambda: _server_nvidiasmi(server_obj),
        # these two rely on "nvidiasmi" being already resolved by the time
        # they are called, which holds due to the dict's insertion order
        "gpu": lambda: lookups["nvidiasmi"].find("gpu"),
        "gpus": lambda: lookups["nvidiasmi"].findall("gpu"),
    }
    for k, f in lookups.items():
        try:
            lookups[k] = f()
        except Exception as e:
            lookups[k] = Exception(str(e))

    def lscpu_lookup(field: str):
        return _listsearch(lookups["lscpu"], "field", field)["data"]

    # field name -> lazy extractor; order matters, as the GPU summary fields
    # read the `server["gpus"]` value set by the "gpus" item
    mappings = {
        "cpu_cores": lambda: (
            int(lscpu_lookup("Core(s) per socket:")) * int(lscpu_lookup("Socket(s):"))
        ),
        # use 1st CPU's speed, convert to GHz
        "cpu_speed": lambda: lookups["dmidecode_cpu"]["Max Speed"] / 1e9,
        "cpu_manufacturer": lambda: _standardize_manufacturer(
            lookups["dmidecode_cpu"]["Manufacturer"]
        ),
        "cpu_family": lambda: _standardize_cpu_family(
            lookups["dmidecode_cpu"]["Family"]
        ),
        "cpu_model": lambda: _standardize_cpu_model(
            lookups["dmidecode_cpu"]["Version"]
        ),
        "cpu_l1_cache": lambda: _l123_cache(lookups["lscpu"], 1),
        "cpu_l2_cache": lambda: _l123_cache(lookups["lscpu"], 2),
        "cpu_l3_cache": lambda: _l123_cache(lookups["lscpu"], 3),
        "cpu_flags": lambda: lscpu_lookup("Flags:").split(" "),
        "memory_generation": lambda: DdrGeneration[lookups["dmidecode_memory"]["Type"]],
        # convert to MHz
        "memory_speed": lambda: int(lookups["dmidecode_memory"]["Speed"]) / 1e6,
        "gpus": lambda: _gpus_details(lookups["gpus"]),
        "gpu_manufacturer": lambda: _gpu_most_common(server["gpus"], "manufacturer"),
        "gpu_family": lambda: _gpu_most_common(server["gpus"], "family"),
        "gpu_model": lambda: _gpu_most_common(server["gpus"], "model"),
        "gpu_memory_min": lambda: min([gpu["fb_memory"] for gpu in server["gpus"]]),
        "gpu_memory_total": lambda: sum([gpu["fb_memory"] for gpu in server["gpus"]]),
    }
    for k, f in mappings.items():
        try:
            newval = f()
            # keep the current value when nothing useful was extracted
            if newval:
                server[k] = newval
        except Exception as e:
            _log_cannot_update_server(server_obj, k, e)

    # backfill CPU model from alternative sources when not provided by DMI decode
    if not isinstance(lookups["lscpu"], BaseException):
        cpu_model = lscpu_lookup("Model name:")
        # CPU speed seems to be unreliable as reported by dmidecode,
        # e.g. it's 2 GHz in GCP for all instances
        speed = search(r" @ ([0-9]+(?:\.[0-9]+)?)GHz$", cpu_model)
        if speed:
            # store as a number (like the dmidecode-based value above), so
            # that numeric comparisons on the field keep working
            server["cpu_speed"] = float(speed.group(1))
        # manufacturer data might be more likely to present in lscpu (unstructured)
        for manufacturer in ["Intel", "AMD"]:
            if manufacturer in cpu_model:
                server["cpu_manufacturer"] = manufacturer
        for family in ["Xeon", "EPYC"]:
            if family in cpu_model:
                server["cpu_family"] = family
        if server.get("cpu_model") is None:
            server["cpu_model"] = _standardize_cpu_model(cpu_model)

    # 2 GHz CPU speed at Google is a lie
    if server["vendor_id"] == "gcp" and server.get("cpu_speed") == 2:
        server["cpu_speed"] = None

    # standardize GPU model
    if server.get("gpu_model"):
        server["gpu_model"] = _standardize_gpu_model(server["gpu_model"], server)
        server["gpu_family"] = _standardize_gpu_family(server)
        if not server.get("gpu_manufacturer") and server["gpu_model"] == "A100":
            server["gpu_manufacturer"] = "NVIDIA"

    return server