Skip to content

_alicloud

sc_crawler.vendors._alicloud #

Functions:

Name Description
inventory_compliance_frameworks

Manual list of compliance frameworks known for Alibaba Cloud.

inventory_regions

List all available Alibaba Cloud regions.

inventory_zones

List all availability zones.

inventory_servers

List all server types at Alibaba Cloud using the DescribeInstanceTypes API endpoint.

inventory_server_prices

Fetch server pricing and regional availability using the QuerySkuPriceListRequest API endpoint.

inventory_server_prices_spot

Fetch spot instance pricing by time-based sampling of on-demand instances per region.

inventory_storages

List all block storage offerings.

inventory_storage_prices

Fetch server prices using the QuerySkuPriceListRequest API endpoint.

inventory_traffic_prices

Collect inbound and outbound traffic prices of Alibaba Cloud regions.

inventory_ipv4_prices

Static IPv4 pricing of Alibaba Cloud regions.

inventory_compliance_frameworks #

inventory_compliance_frameworks(vendor)

Manual list of compliance frameworks known for Alibaba Cloud.

Resources: https://www.alibabacloud.com/en/trust-center/compliance

Source code in sc_crawler/vendors/_alicloud.py
def inventory_compliance_frameworks(vendor):
    """Manual list of compliance frameworks known for Alibaba Cloud.

    Resources: <https://www.alibabacloud.com/en/trust-center/compliance>
    """
    return map_compliance_frameworks_to_vendor(
        vendor.vendor_id, ["hipaa", "soc2t2", "iso27001"]
    )

inventory_regions #

inventory_regions(vendor)

List all available Alibaba Cloud regions.

Data sources:

Source code in sc_crawler/vendors/_alicloud.py
def inventory_regions(vendor):
    """
    List all available Alibaba Cloud regions.

    Data sources:

    - <https://api.alibabacloud.com/document/Ecs/2014-05-26/DescribeRegions>
    - Foundation year collected from <https://www.alibabacloud.com/en/global-locations?_p_lc=1>
    - Aliases (old region names) collected from <https://help.aliyun.com/zh/user-center/product-overview/regional-name-change-announcement>
    """
    request = DescribeRegionsRequest(accept_language="en-US")
    response = _ecs_client().describe_regions(request)
    regions = [region.to_map() for region in response.body.regions.region]

    items = []
    for region in regions:
        location = locations[region.get("RegionId")]
        items.append(
            {
                "vendor_id": vendor.vendor_id,
                "region_id": region.get("RegionId"),
                "name": region.get("LocalName"),
                "api_reference": region.get("RegionId"),
                "display_name": f"{location['city']} ({location['country_id']})",
                "aliases": location.get("alias", []),
                "country_id": location.get("country_id"),
                "state": None,  # not available
                "city": location.get("city"),
                "address_line": None,  # not available
                "zip_code": None,  # not available
                "lon": location.get("lon"),
                "lat": location.get("lat"),
                "founding_year": location.get("founding_year"),
                # "Clean electricity accounted for 56.0% of the total electricity consumption at Alibaba Cloud's self-built data centers"
                # https://www.alibabagroup.com/en-US/esg?spm=a3c0i.28208492.4078276800.1.3ee123b78lagGT
                "green_energy": None,
            }
        )
    return items

inventory_zones #

inventory_zones(vendor)

List all availability zones.

Source code in sc_crawler/vendors/_alicloud.py
def inventory_zones(vendor):
    """List all availability zones."""
    vendor.progress_tracker.start_task(
        name="Scanning region(s) for zone(s)", total=len(vendor.regions)
    )
    clients = _ecs_clients(vendor)

    @cachier(hash_func=jsoned_hash, separate_files=True)
    def fetch_zones_for_region(region_id):
        """Worker function to fetch zones for a single region."""
        request = DescribeZonesRequest(region_id=region_id, accept_language="en-US")
        try:
            response = clients[region_id].describe_zones(request)
            zone_items = []
            for zone in response.body.to_map()["Zones"]["Zone"]:
                zone_items.append(
                    {
                        "vendor_id": vendor.vendor_id,
                        "region_id": region_id,
                        "zone_id": zone.get("ZoneId"),
                        "name": zone.get("LocalName"),
                        "api_reference": zone.get("ZoneId"),
                        "display_name": zone.get("LocalName"),
                    }
                )
            return zone_items
        except Exception as e:
            logger.error(f"Failed to get zones for region {region_id}: {e}")
            return []
        finally:
            vendor.progress_tracker.advance_task()

    with ThreadPoolExecutor(max_workers=8) as executor:
        items = executor.map(
            fetch_zones_for_region, [r.region_id for r in vendor.regions]
        )
    items = list(chain.from_iterable(items))
    vendor.progress_tracker.hide_task()
    return items

inventory_servers #

inventory_servers(vendor)

List all server types at Alibaba Cloud using the DescribeInstanceTypes API endpoint.

Source code in sc_crawler/vendors/_alicloud.py
def inventory_servers(vendor):
    """List all server types at Alibaba Cloud using the `DescribeInstanceTypes` API endpoint."""
    client = _ecs_client()
    request = DescribeInstanceTypesRequest(max_results=1000)
    response = client.describe_instance_types(request)
    instance_types = [
        instance_type.to_map()
        for instance_type in response.body.instance_types.instance_type
    ]
    while response.body.next_token:
        request = DescribeInstanceTypesRequest(
            max_results=1000, next_token=response.body.next_token
        )
        response = client.describe_instance_types(request)
        for instance_type in response.body.instance_types.instance_type:
            instance_types.append(instance_type.to_map())

    region_availability_info: dict[str, list[dict]] = _get_region_availability_info(
        vendor
    )

    CPU_ARCH_MAP = {"X86": CpuArchitecture.X86_64, "ARM": CpuArchitecture.ARM64}
    STORAGE_CATEGORY_MAP = {
        "": None,
        "local_ssd_pro": StorageType.SSD,
        "local_hdd_pro": StorageType.HDD,
    }

    def drop_zero_value(x):
        return None if x == 0 else x

    items = []
    for instance_type in instance_types:
        family = instance_type.get("InstanceTypeFamily")
        vcpus = instance_type.get("CpuCoreCount")
        cpu_model = instance_type.get("PhysicalProcessorModel")
        memory_size_mb = int((instance_type.get("MemorySize") * 1024))
        memory_size_gb = (
            memory_size_mb // 1024
            if memory_size_mb >= 1024
            else round(memory_size_mb / 1024, 2)
        )
        storage_size = int(
            instance_type.get("LocalStorageAmount", 0)
            * instance_type.get("LocalStorageCapacity", 0)
            # convert GiB to GB
            * 1024**3
            / 1000**3
        )
        storage_type = STORAGE_CATEGORY_MAP[instance_type["LocalStorageCategory"]]
        gpu_count = _standardize_gpu_count(
            instance_type["GPUSpec"], instance_type.get("GPUAmount", 0)
        )
        gpu_memory_per_gpu = instance_type.get("GPUMemorySize", 0) * 1024  # GiB -> MiB
        # GPUMemorySize contains total memory for fractional or single GPUs, but per-GPU memory for multiple GPUs
        gpu_memory_total = (
            gpu_count * gpu_memory_per_gpu if gpu_count >= 1 else gpu_memory_per_gpu
        )
        gpu_model = _standardize_gpu_model(instance_type["GPUSpec"])
        description_parts = [
            f"{vcpus} vCPUs",
            f"{memory_size_gb} GiB RAM",
            f"{storage_size} GB {storage_type.value if storage_type else ''} storage",
            (
                f"{gpu_count}x{gpu_model} {gpu_memory_per_gpu} GiB VRAM"
                if gpu_count and gpu_model
                else None
            ),
        ]
        description = f"{family} family ({', '.join(filter(None, description_parts))})"
        server_id = instance_type.get("InstanceTypeId")
        status = (
            Status.ACTIVE
            if any(
                _is_resource_available(
                    region_availability_info,
                    region_id,
                    zone_info.get("ZoneId"),
                    server_id,
                )
                for region_id, zones in region_availability_info.items()
                for zone_info in zones
            )
            else Status.INACTIVE
        )

        items.append(
            {
                "vendor_id": vendor.vendor_id,
                "server_id": server_id,
                "name": server_id,
                "api_reference": server_id,
                "display_name": server_id,
                "description": description,
                "family": family,
                "vcpus": vcpus,
                "hypervisor": "KVM",
                "cpu_allocation": _determine_cpu_allocation_type(instance_type),
                "cpu_cores": instance_type.get("CpuCoreCount", 0),
                "cpu_speed": drop_zero_value(instance_type.get("CpuSpeedFrequency")),
                "cpu_architecture": CPU_ARCH_MAP[instance_type.get("CpuArchitecture")],
                "cpu_manufacturer": _extract_manufacturer(cpu_model),
                "cpu_family": _extract_family(cpu_model),
                "cpu_model": _standardize_cpu_model(cpu_model),
                "cpu_l1_cache": None,
                "cpu_l2_cache": None,
                "cpu_l3_cache": None,
                "cpu_flags": [],
                "cpus": [],
                "memory_amount": memory_size_mb,
                "memory_generation": None,
                "memory_speed": None,
                "memory_ecc": None,
                "gpu_count": gpu_count,
                "gpu_memory_min": drop_zero_value(int(gpu_memory_per_gpu)),
                "gpu_memory_total": drop_zero_value(int(gpu_memory_total)),
                # TODO fill in from GPUSpec? or just let the inspector fill it in?
                "gpu_manufacturer": None,
                "gpu_family": None,
                "gpu_model": gpu_model,
                "gpus": [],
                "storage_size": storage_size,
                "storage_type": storage_type,
                "storages": [],
                "network_speed": drop_zero_value(
                    instance_type.get("InstanceBandwidthRx", 0) / 1024 / 1000
                ),
                "inbound_traffic": 0,
                "outbound_traffic": 0,
                "ipv4": 0,
                "status": status,
            }
        )

    return items

inventory_server_prices #

inventory_server_prices(vendor)

Fetch server pricing and regional availability using the QuerySkuPriceListRequest API endpoint.

Alternative approach could be looking at https://g.alicdn.com/aliyun/ecs-price-info-intl/2.0.375/price/download/instancePrice.json.

Source code in sc_crawler/vendors/_alicloud.py
def inventory_server_prices(vendor):
    """Fetch server pricing and regional availability using the `QuerySkuPriceListRequest` API endpoint.

    Alternative approach could be looking at <https://g.alicdn.com/aliyun/ecs-price-info-intl/2.0.375/price/download/instancePrice.json>.
    """
    skus = _get_sku_prices(
        sku_type="server",
        extra_request_params={
            "price_entity_code": "instance_type",
            # filter for Linux prices only for now
            "price_factor_condition_map": {"vm_os_kind": ["linux"]},
        },
        vendor=vendor,
    )

    items = []
    unsupported_regions = set()
    region_availability_info: dict[str, list[dict]] = _get_region_availability_info(
        vendor
    )

    for sku in skus:
        sku_region_id = sku["SkuFactorMap"]["vm_region_no"]
        region = get_region_by_id(sku_region_id, vendor)
        if not region:
            unsupported_regions.add(sku_region_id)
            continue
        for zone in region.zones:
            server_id = sku.get("SkuFactorMap", {}).get("instance_type")
            status = (
                Status.ACTIVE
                if _is_resource_available(
                    region_availability_info, region.region_id, zone.zone_id, server_id
                )
                else Status.INACTIVE
            )

            items.append(
                {
                    "vendor_id": vendor.vendor_id,
                    "region_id": region.region_id,
                    "zone_id": zone.zone_id,
                    "server_id": server_id,
                    "operating_system": sku.get("SkuFactorMap").get("vm_os_kind"),
                    "allocation": Allocation.ONDEMAND,
                    "unit": PriceUnit.HOUR,
                    "price": float(sku.get("CskuPriceList")[0].get("Price")),
                    "price_upfront": 0,
                    "price_tiered": [],
                    "currency": sku.get("CskuPriceList")[0].get("Currency"),
                    "status": status,
                }
            )
    for unsupported_region in unsupported_regions:
        vendor.log(f"Found non-supported region: {unsupported_region}", level=WARN)
    return items

inventory_server_prices_spot #

inventory_server_prices_spot(vendor)

Fetch spot instance pricing by time-based sampling of on-demand instances per region.

Each region worker fetches spot prices for a random sample of instances within sample_time, adapting to different response times, parallelized across regions.

Source code in sc_crawler/vendors/_alicloud.py
def inventory_server_prices_spot(vendor):
    """Fetch spot instance pricing by time-based sampling of on-demand instances per region.

    Each region worker fetches spot prices for a random sample of instances within sample_time,
    adapting to different response times, parallelized across regions.
    """
    sample_time = 120  # seconds

    ecs_clients: dict[str, EcsClient] = _ecs_clients(vendor)

    ondemand_instances = defaultdict(list)
    for server_price in vendor.server_prices:
        if (
            server_price.allocation == Allocation.ONDEMAND
            and server_price.status == Status.ACTIVE
        ):
            ondemand_instances[server_price.region_id].append(
                (server_price.zone_id, server_price.server_id)
            )

    if not ondemand_instances:
        logger.error("No active ondemand instances found")
        return []

    for region_id in ondemand_instances:
        shuffle(ondemand_instances[region_id])

    def fetch_spot_instance_price(
        region_id: str, zone_instance_list: list[tuple[str, str]], client: EcsClient
    ):
        spot_instances = []
        start_time = time()

        for zone_id, instance_type in zone_instance_list:
            try:
                # Check if time limit exceeded
                elapsed = time() - start_time
                if elapsed >= sample_time:
                    break

                price_response_body: DescribePriceResponseBody = _get_instance_price(
                    region_id=region_id,
                    zone_id=zone_id,
                    instance_type=instance_type,
                    client=client,
                    spot_strategy="SpotAsPriceGo",
                )

                if not price_response_body:
                    continue

                if not next(
                    (
                        r
                        for r in price_response_body.price_info.rules.rule
                        if r.description == "Preemptible Instance discount"
                    ),
                    None,
                ):
                    continue

                trade_price = next(
                    (
                        p.trade_price
                        for p in price_response_body.price_info.price.detail_infos.detail_info
                        if p.resource == "instanceType"
                    ),
                    None,
                )

                if not trade_price:
                    continue

                spot_instances.append(
                    {
                        "vendor_id": vendor.vendor_id,
                        "region_id": region_id,
                        "zone_id": zone_id,
                        "server_id": instance_type,
                        "operating_system": "linux",
                        "allocation": Allocation.SPOT,
                        "unit": PriceUnit.HOUR,
                        "price": float(trade_price),
                        "price_upfront": 0,
                        "price_tiered": [],
                        "currency": price_response_body.price_info.price.currency,
                        "status": Status.ACTIVE,
                    }
                )
            except AttributeError:
                continue
            except Exception as e:
                logger.error(
                    f"Failed to get spot price for {instance_type} in {region_id}/{zone_id}: {e}"
                )
                continue
            finally:
                vendor.progress_tracker.advance_task()

        if not spot_instances:
            logger.info(f"No spot prices found in region {region_id}")

        return spot_instances

    vendor.progress_tracker.start_task(
        name=f"Fetching spot instance prices for {sample_time} second(s)",
        total=sum(len(zil) for zil in ondemand_instances.values()),
    )

    with ThreadPoolExecutor(max_workers=len(ondemand_instances)) as executor:
        items = executor.map(
            lambda args: fetch_spot_instance_price(*args),
            [
                (region_id, zone_instance_list, ecs_clients[region_id])
                for region_id, zone_instance_list in ondemand_instances.items()
            ],
        )
    items = list(chain.from_iterable(items))

    vendor.progress_tracker.hide_task()

    vendor.set_table_rows_active(
        ServerPrice,
        ServerPrice.allocation == Allocation.SPOT,
        ServerPrice.observed_at >= datetime.now() - timedelta(days=30),
    )

    return items

inventory_storages #

inventory_storages(vendor)

List all block storage offerings.

Data sources:

Source code in sc_crawler/vendors/_alicloud.py
def inventory_storages(vendor):
    """List all block storage offerings.

    Data sources:

    - <https://www.alibabacloud.com/help/en/ecs/user-guide/essds>
    - <https://www.alibabacloud.com/help/en/ecs/developer-reference/api-ecs-2014-05-26-createdisk>
    """
    disk_info = [
        # NOTE there's only a single `cloud_essd` ID at Alibaba Cloud,
        # but we suffix with the performance level (PL0, PL1, PL2, PL3)
        # to differentiate them as these are products with very different characteristics
        {
            "name": "cloud_essd-pl0",
            "min_size": 1,
            "max_size": 65536,
            "max_iops": 10000,
            "max_tp": 1440,
            "info": "Enterprise SSD with performance level 0.",
        },
        {
            "name": "cloud_essd-pl1",
            "min_size": 20,
            "max_size": 65536,
            "max_iops": 50000,
            "max_tp": 2800,
            "info": "Enterprise SSD with performance level 1.",
        },
        {
            "name": "cloud_essd-pl2",
            "min_size": 461,
            "max_size": 65536,
            "max_iops": 100000,
            "max_tp": 6000,
            "info": "Enterprise SSD with performance level 2.",
        },
        {
            "name": "cloud_essd-pl3",
            "min_size": 1261,
            "max_size": 65536,
            "max_iops": 1000000,
            "max_tp": 32000,
            "info": "Enterprise SSD with performance level 3.",
        },
        {
            "name": "cloud_ssd",
            "min_size": 20,
            "max_size": 32768,
            "max_iops": 20000,
            "max_tp": 256,
            "info": "Standard SSD.",
        },
        {
            "name": "cloud_efficiency",
            "min_size": 20,
            "max_size": 32768,
            "max_iops": 3000,
            "max_tp": 80,
            "info": "Ultra Disk, older generation.",
        },
        {
            "name": "cloud",
            "min_size": 5,
            "max_size": 2000,
            "max_iops": 300,
            "max_tp": 40,
            "info": "Lowest cost HDD.",
        },
    ]

    items = []
    for disk in disk_info:
        items.append(
            {
                "storage_id": disk.get("name"),
                "vendor_id": vendor.vendor_id,
                "name": disk.get("name"),
                "description": disk.get("info"),
                "storage_type": (
                    StorageType.HDD if disk.get("name") == "cloud" else StorageType.SSD
                ),
                "max_iops": disk.get("max_iops"),
                "max_throughput": disk.get("max_tp"),
                "min_size": disk.get("min_size"),
                "max_size": disk.get("max_size"),
            }
        )
    return items

inventory_storage_prices #

inventory_storage_prices(vendor)

Fetch server prices using the QuerySkuPriceListRequest API endpoint.

Source code in sc_crawler/vendors/_alicloud.py
def inventory_storage_prices(vendor):
    """Fetch server prices using the `QuerySkuPriceListRequest` API endpoint."""
    skus = _get_sku_prices(
        sku_type="storage",
        extra_request_params={"price_entity_code": "datadisk"},
        vendor=vendor,
    )

    items = []
    unsupported_regions = set()
    for sku in skus:
        storage_id = sku["SkuFactorMap"]["datadisk_category"]
        pl = sku["SkuFactorMap"]["datadisk_performance_level"]
        if storage_id in ["cloud", "cloud_ssd", "cloud_efficiency"]:
            # no diff in performance levels, pick one
            if pl != "PL1":
                continue
        else:
            # keep the 4 performance levels
            if pl not in ["PL0", "PL1", "PL2", "PL3"]:
                continue
            storage_id = storage_id + "-" + pl.lower()
        region_id = sku["SkuFactorMap"]["vm_region_no"]
        region = get_region_by_id(region_id, vendor)
        if not region:
            unsupported_regions.add(region_id)
            continue
        items.append(
            {
                "vendor_id": vendor.vendor_id,
                "region_id": region.region_id,
                "storage_id": storage_id,
                "unit": PriceUnit.GB_MONTH,
                "price": float(sku["CskuPriceList"][0]["Price"]),
                "currency": sku["CskuPriceList"][0]["Currency"],
            }
        )
    for unsupported_region in unsupported_regions:
        vendor.log(f"Found non-supported region: {unsupported_region}", level=WARN)
    return items

inventory_traffic_prices #

inventory_traffic_prices(vendor)

Collect inbound and outbound traffic prices of Alibaba Cloud regions.

Inbound is free as per https://www.alibabacloud.com/help/en/ecs/public-bandwidth. Outbound traffic pricing collected from the QuerySkuPriceListRequest API endpoint.

Account level tiering information can be found at https://www.alibabacloud.com/help/en/cdt/internet-data-transfers/#4a98c9ee8eemn.

Source code in sc_crawler/vendors/_alicloud.py
def inventory_traffic_prices(vendor):
    """Collect inbound and outbound traffic prices of Alibaba Cloud regions.

    Inbound is free as per <https://www.alibabacloud.com/help/en/ecs/public-bandwidth>.
    Outbound traffic pricing collected from the `QuerySkuPriceListRequest` API endpoint.

    Account level tiering information can be found at <https://www.alibabacloud.com/help/en/cdt/internet-data-transfers/#4a98c9ee8eemn>.
    """
    items = []
    skus = _get_sku_prices(
        sku_type="traffic",
        extra_request_params={"price_entity_code": "vm_flow_out"},
        # vendor=vendor,
    )
    unsupported_regions = set()
    for sku in skus:
        region_id = sku["SkuFactorMap"]["vm_region_no"]
        region = get_region_by_id(region_id, vendor)
        if not region:
            unsupported_regions.add(region_id)
            continue
        price = next(p for p in sku["CskuPriceList"] if float(p["Price"]) > 0)
        items.append(
            {
                "vendor_id": vendor.vendor_id,
                "region_id": region.region_id,
                "price": float(price["Price"]),
                "price_tiered": [],
                "currency": price["Currency"],
                "unit": PriceUnit.GB_MONTH,
                "direction": TrafficDirection.OUT,
            }
        )
        # incoming traffic is free
        items.append(
            {
                "vendor_id": vendor.vendor_id,
                "region_id": region.region_id,
                "price": 0,
                "price_tiered": [],
                "currency": price["Currency"],
                "unit": PriceUnit.GB_MONTH,
                "direction": TrafficDirection.IN,
            }
        )
    for unsupported_region in unsupported_regions:
        vendor.log(f"Found non-supported region: {unsupported_region}", level=WARN)
    return items

inventory_ipv4_prices #

inventory_ipv4_prices(vendor)

Static IPv4 pricing of Alibaba Cloud regions.

Static (not Elastic) IP addresses are free, you only pay for bandwidth or traffic as per https://www.alibabacloud.com/help/en/ecs/user-guide/public-ip-address?spm=a2c63.p38356.0.i1#52c0fa8bbcee6.

Source code in sc_crawler/vendors/_alicloud.py
def inventory_ipv4_prices(vendor):
    """Static IPv4 pricing of Alibaba Cloud regions.

    Static (not Elastic) IP addresses are free, you only pay for bandwidth or traffic
    as per <https://www.alibabacloud.com/help/en/ecs/user-guide/public-ip-address?spm=a2c63.p38356.0.i1#52c0fa8bbcee6>.
    """
    items = []
    for region in vendor.regions:
        items.append(
            {
                "vendor_id": vendor.vendor_id,
                "region_id": region.region_id,
                "price": 0,
                "currency": "USD",
                "unit": PriceUnit.MONTH,
            }
        )
    return items