Source code for sky.volumes.client.sdk

"""SDK functions for managed jobs."""
import json
import typing
from typing import List

from sky import exceptions
from sky import sky_logging
from sky.adaptors import common as adaptors_common
from sky.schemas.api import responses
from sky.server import common as server_common
from sky.server import versions
from sky.server.requests import payloads
from sky.usage import usage_lib
from sky.utils import annotations
from sky.utils import context
from sky.utils import ux_utils
from sky.volumes import volume as volume_lib

if typing.TYPE_CHECKING:
    import requests
else:
    requests = adaptors_common.LazyImport('requests')

logger = sky_logging.init_logger(__name__)


[docs]@context.contextual @usage_lib.entrypoint @server_common.check_server_healthy_or_start @annotations.client_api def apply(volume: volume_lib.Volume) -> server_common.RequestId[None]: """Creates or registers a volume. Example: .. code-block:: python import sky.volumes cfg = { 'name': 'pvc', 'type': 'k8s-pvc', 'size': '100GB', 'labels': { 'key': 'value', }, } vol = sky.volumes.Volume.from_yaml_config(cfg) request_id = sky.volumes.apply(vol) sky.get(request_id) or import sky.volumes vol = sky.volumes.Volume( name='vol', type='runpod-network-volume', infra='runpod/ca/CA-MTL-1', size='100GB', ) request_id = sky.volumes.apply(vol) sky.get(request_id) Args: volume: The volume to apply. Returns: The request ID of the apply request. """ body = payloads.VolumeApplyBody( name=volume.name, volume_type=volume.type, cloud=volume.cloud, region=volume.region, zone=volume.zone, size=volume.size, config=volume.config, labels=volume.labels, ) response = server_common.make_authenticated_request( 'POST', '/volumes/apply', json=json.loads(body.model_dump_json())) return server_common.get_request_id(response)
@context.contextual @usage_lib.entrypoint @server_common.check_server_healthy_or_start @annotations.client_api @versions.minimal_api_version(20) def validate(volume: volume_lib.Volume) -> None: """Validates the volume. All validation is done on the server side. Args: volume: The volume to validate. Raises: ValueError: If the volume is invalid. """ body = payloads.VolumeValidateBody( name=volume.name, volume_type=volume.type, infra=volume.infra, resource_name=volume.resource_name, size=volume.size, config=volume.config, labels=volume.labels, ) response = server_common.make_authenticated_request( 'POST', '/volumes/validate', json=json.loads(body.model_dump_json())) if response.status_code == 400: with ux_utils.print_exception_no_traceback(): raise exceptions.deserialize_exception( response.json().get('detail'))
[docs]@context.contextual @usage_lib.entrypoint @server_common.check_server_healthy_or_start @annotations.client_api def ls() -> server_common.RequestId[List[responses.VolumeRecord]]: """Lists all volumes. Returns: The request ID of the list request. """ response = server_common.make_authenticated_request( 'GET', '/volumes', ) return server_common.get_request_id(response)
[docs]@context.contextual @usage_lib.entrypoint @server_common.check_server_healthy_or_start @annotations.client_api def delete(names: List[str]) -> server_common.RequestId[None]: """Deletes volumes. Args: names: List of volume names to delete. Returns: The request ID of the delete request. """ body = payloads.VolumeDeleteBody(names=names) response = server_common.make_authenticated_request( 'POST', '/volumes/delete', json=json.loads(body.model_dump_json())) return server_common.get_request_id(response)