Source code for sky.execution

"""Execution layer.

See `Stage` for a Task's life cycle.
"""
import enum
import os
from typing import List, Optional, Tuple, Union

import colorama

import sky
from sky import backends
from sky import clouds
from sky import global_user_state
from sky import optimizer
from sky import sky_logging
from sky.backends import backend_utils
from sky.usage import usage_lib
from sky.utils import controller_utils
from sky.utils import dag_utils
from sky.utils import env_options
from sky.utils import rich_utils
from sky.utils import subprocess_utils
from sky.utils import timeline
from sky.utils import ux_utils

logger = sky_logging.init_logger(__name__)


class Stage(enum.Enum):
    """Stages for a run of a sky.Task.

    Members are declared in the order `_execute` walks through them. When
    `_execute` is called without an explicit `stages` list, every member is
    run (`list(Stage)`), so declaration order is the life-cycle order.
    """
    # TODO: rename actual methods to be consistent.
    # Optionally clone the disk of an existing cluster into the task's image.
    CLONE_DISK = 1
    # Run the optimizer to pick `task.best_resources`.
    OPTIMIZE = 2
    # Provision (or reuse) the cluster through the backend.
    PROVISION = 3
    # Sync the task's workdir to the cluster.
    SYNC_WORKDIR = 4
    # Sync file mounts and storage mounts to the cluster.
    SYNC_FILE_MOUNTS = 5
    # Run the task's setup commands.
    SETUP = 6
    # Pre-execution hooks; `_execute` configures autostop here.
    PRE_EXEC = 7
    # Submit/execute the task on the cluster.
    EXEC = 8
    # Tear down the cluster (and ephemeral storage) when `down` is requested.
    DOWN = 9


def _maybe_clone_disk_from_cluster(clone_disk_from: Optional[str],
                                   cluster_name: Optional[str],
                                   task: 'sky.Task') -> 'sky.Task':
    """Rewrite `task` to boot from an image cloned off an existing cluster.

    Args:
      clone_disk_from: name of the source cluster whose disk is cloned. If
        None, no cloning happens and `task` is returned as-is.
      cluster_name: name of the target cluster; forwarded to
        `backend_utils.check_can_clone_disk_and_override_task`.
      task: the task whose resources get the new image ID.

    Returns:
      The task to continue with. When cloning happened, its resources carry
      the newly created image ID and `best_resources` is reset to None.
    """
    if clone_disk_from is None:
        # Nothing to clone; hand the task back untouched.
        return task

    task, source_handle = (
        backend_utils.check_can_clone_disk_and_override_task(
            clone_disk_from, cluster_name, task))
    source_resources = source_handle.launched_resources
    cloud = source_resources.cloud
    assert cloud is not None, source_resources
    target_resources = list(task.resources)[0]

    status_message = ('Creating image from source cluster '
                      f'{clone_disk_from!r}')
    with rich_utils.safe_status(status_message):
        # Step 1: snapshot the source cluster's disk into an image.
        image_id = cloud.create_image_from_cluster(
            clone_disk_from,
            source_handle.cluster_name_on_cloud,
            region=source_resources.region,
            zone=source_resources.zone,
        )

        # Step 2: make that image available where the task will run.
        rich_utils.force_update_status(
            f'Migrating image {image_id} to target region '
            f'{target_resources.region}...')
        source_region = source_resources.region
        target_region = target_resources.region
        assert source_region is not None, source_resources
        assert target_region is not None, target_resources

        image_id = cloud.maybe_move_image(
            image_id,
            source_region=source_region,
            target_region=target_region,
            source_zone=source_resources.zone,
            target_zone=target_resources.zone,
        )

    green = colorama.Fore.GREEN
    reset = colorama.Style.RESET_ALL
    logger.info(
        f'{green}'
        f'Successfully created image {image_id!r} for {clone_disk_from!r} '
        f'on {cloud}.{reset}\n'
        'Overriding task\'s image_id.')

    task.set_resources(
        target_resources.copy(image_id=image_id, _is_image_managed=True))
    # Clearing best_resources forces a re-optimization, so that the
    # overridden resources (with the new image) are the ones used.
    task.best_resources = None
    logger.debug(f'Overridden task resources: {task.resources}')
    return task


def _execute(
    entrypoint: Union['sky.Task', 'sky.Dag'],
    dryrun: bool = False,
    down: bool = False,
    stream_logs: bool = True,
    handle: Optional[backends.ResourceHandle] = None,
    backend: Optional[backends.Backend] = None,
    retry_until_up: bool = False,
    optimize_target: optimizer.OptimizeTarget = optimizer.OptimizeTarget.COST,
    stages: Optional[List[Stage]] = None,
    cluster_name: Optional[str] = None,
    detach_setup: bool = False,
    detach_run: bool = False,
    idle_minutes_to_autostop: Optional[int] = None,
    no_setup: bool = False,
    clone_disk_from: Optional[str] = None,
    # Internal only:
    # pylint: disable=invalid-name
    _is_launched_by_jobs_controller: bool = False,
    _is_launched_by_sky_serve_controller: bool = False,
) -> Tuple[Optional[int], Optional[backends.ResourceHandle]]:
    """Execute an entrypoint.

    If sky.Task is given or DAG has not been optimized yet, this will call
    sky.optimize() for the caller.

    Args:
      entrypoint: sky.Task or sky.Dag.
      dryrun: bool; if True, only print the provision info (e.g., cluster
        yaml).
      down: bool; whether to tear down the launched resources after all jobs
        finish (successfully or abnormally). If idle_minutes_to_autostop is
        also set, the cluster will be torn down after the specified idle time.
        Note that if errors occur during provisioning/data syncing/setting up,
        the cluster will not be torn down for debugging purposes.
      stream_logs: bool; whether to stream all tasks' outputs to the client.
      handle: Optional[backends.ResourceHandle]; if provided, execution will use
        an existing backend cluster handle instead of provisioning a new one.
      backend: Backend; backend to use for executing the tasks. Defaults to
        CloudVmRayBackend()
      retry_until_up: bool; whether to retry the provisioning until the cluster
        is up.
      optimize_target: OptimizeTarget; the dag optimization metric, e.g.
        OptimizeTarget.COST.
      stages: List of stages to run.  If None, run the whole life cycle of
        execution; otherwise, just the specified stages.  Used for `sky exec`
        skipping all setup steps.  The list passed in is never modified.
      cluster_name: Name of the cluster to create/reuse.  If None,
        auto-generate a name.
      detach_setup: If True, run setup in non-interactive mode as part of the
        job itself. You can safely ctrl-c to detach from logging, and it will
        not interrupt the setup process. To see the logs again after detaching,
        use `sky logs`. To cancel setup, cancel the job via `sky cancel`.
      detach_run: If True, as soon as a job is submitted, return from this
        function and do not stream execution logs.
      idle_minutes_to_autostop: int; if provided, the cluster will be set to
        autostop after this many minutes of idleness.
      no_setup: bool; whether to skip setup commands or not when (re-)launching.
      clone_disk_from: Optional[str]; if set (and Stage.CLONE_DISK is run),
        clone the disk of this cluster and use it as the task's image.
      _is_launched_by_jobs_controller: internal; suppresses the unmanaged-spot
        hint.
      _is_launched_by_sky_serve_controller: internal; suppresses the
        unmanaged-spot hint and the trailing `sky status` printout.

    Returns:
      job_id: Optional[int]; the job ID of the submitted job. None if the
        backend is not CloudVmRayBackend, or no job is submitted to
        the cluster.
      handle: Optional[backends.ResourceHandle]; the handle to the cluster. None
        if dryrun.

    Raises:
      ValueError: if the task specifies job recovery (use `sky jobs launch`
        instead), or if autostop is requested on a backend that is not
        CloudVmRayBackend.
    """
    dag = dag_utils.convert_entrypoint_to_dag(entrypoint)
    assert len(dag) == 1, f'We support 1 task for now. {dag}'
    task = dag.tasks[0]

    if any(r.job_recovery is not None for r in task.resources):
        with ux_utils.print_exception_no_traceback():
            raise ValueError(
                'Job recovery is specified in the task. To launch a '
                'managed job, please use: sky jobs launch')

    cluster_exists = False
    if cluster_name is not None:
        existing_handle = global_user_state.get_handle_from_cluster_name(
            cluster_name)
        cluster_exists = existing_handle is not None
        # TODO(woosuk): If the cluster exists, print a warning that
        # `cpus` and `memory` are not used as a job scheduling constraint,
        # unlike `gpus`.

    # Always work on a private copy: Stage.DOWN may be removed below, and the
    # caller's list must not be mutated as a side effect.
    stages = list(stages) if stages is not None else list(Stage)

    # Requested features that some clouds support and others don't.
    requested_features = set()

    if controller_utils.Controllers.from_name(cluster_name) is not None:
        requested_features.add(
            clouds.CloudImplementationFeatures.HOST_CONTROLLERS)

    # Add requested features from the task
    requested_features |= task.get_required_cloud_features()

    backend = backend if backend is not None else backends.CloudVmRayBackend()
    if isinstance(backend, backends.CloudVmRayBackend):
        if down and idle_minutes_to_autostop is None:
            # Use auto{stop,down} to terminate the cluster after the task is
            # done.
            idle_minutes_to_autostop = 0
        if idle_minutes_to_autostop is not None:
            if idle_minutes_to_autostop == 0:
                # idle_minutes_to_autostop=0 can cause the following problem:
                # After we set the autostop in the PRE_EXEC stage with -i 0,
                # it could be possible that the cluster immediately found
                # itself have no task running and start the auto{stop,down}
                # process, before the task is submitted in the EXEC stage.
                verb = 'torn down' if down else 'stopped'
                logger.info(f'{colorama.Style.DIM}The cluster will '
                            f'be {verb} after 1 minutes of idleness '
                            '(after all jobs finish).'
                            f'{colorama.Style.RESET_ALL}')
                idle_minutes_to_autostop = 1
            # Autostop/autodown takes over the teardown, so skip the explicit
            # DOWN stage. The stage may legitimately be absent when the caller
            # passed a partial stage list (e.g. sky.exec), so guard the
            # removal instead of letting list.remove() raise ValueError.
            if Stage.DOWN in stages:
                stages.remove(Stage.DOWN)
            if idle_minutes_to_autostop >= 0:
                requested_features.add(
                    clouds.CloudImplementationFeatures.AUTO_TERMINATE)
                if not down:
                    requested_features.add(
                        clouds.CloudImplementationFeatures.STOP)
        # NOTE: in general we may not have sufficiently specified info
        # (cloud/resource) to check STOP_SPOT_INSTANCE here. This is checked in
        # the backend.

    elif idle_minutes_to_autostop is not None:
        # TODO(zhwu): Autostop is not supported for non-CloudVmRayBackend.
        with ux_utils.print_exception_no_traceback():
            raise ValueError(
                f'Backend {backend.NAME} does not support autostop, please try'
                f' {backends.CloudVmRayBackend.NAME}')

    if Stage.CLONE_DISK in stages:
        task = _maybe_clone_disk_from_cluster(clone_disk_from, cluster_name,
                                              task)

    if not cluster_exists:
        # If spot is launched on serve or jobs controller, we don't need to
        # print out the hint.
        if (Stage.PROVISION in stages and task.use_spot and
                not _is_launched_by_jobs_controller and
                not _is_launched_by_sky_serve_controller):
            yellow = colorama.Fore.YELLOW
            bold = colorama.Style.BRIGHT
            reset = colorama.Style.RESET_ALL
            logger.info(
                f'{yellow}Launching an unmanaged spot task, which does not '
                f'automatically recover from preemptions.{reset}\n{yellow}To '
                'get automatic recovery, use managed job instead: '
                f'{reset}{bold}sky jobs launch{reset} {yellow}or{reset} '
                f'{bold}sky.jobs.launch(){reset}.')

        if Stage.OPTIMIZE in stages:
            if task.best_resources is None:
                # TODO: fix this for the situation where number of requested
                # accelerators is not an integer.
                if isinstance(backend, backends.CloudVmRayBackend):
                    # TODO: adding this check because docker backend on a
                    # no-credential machine should not enter optimize(), which
                    # would directly error out ('No cloud is enabled...').  Fix
                    # by moving `sky check` checks out of optimize()?
                    dag = sky.optimize(dag, minimize=optimize_target)
                    task = dag.tasks[0]  # Keep: dag may have been deep-copied.
                    assert task.best_resources is not None, task

    backend.register_info(dag=dag,
                          optimize_target=optimize_target,
                          requested_features=requested_features)

    if task.storage_mounts is not None:
        # Optimizer should eventually choose where to store bucket
        task.sync_storage_mounts()

    # Stays None unless the EXEC stage actually submits a job; without this
    # default, running with a stage list that omits EXEC would hit an
    # UnboundLocalError at the final return.
    job_id: Optional[int] = None
    try:
        if Stage.PROVISION in stages:
            if handle is None:
                handle = backend.provision(task,
                                           task.best_resources,
                                           dryrun=dryrun,
                                           stream_logs=stream_logs,
                                           cluster_name=cluster_name,
                                           retry_until_up=retry_until_up)

        if handle is None:
            assert dryrun, ('If not dryrun, handle must be set or '
                            'Stage.PROVISION must be included in stages.')
            logger.info('Dryrun finished.')
            return None, None

        if Stage.SYNC_WORKDIR in stages and not dryrun:
            if task.workdir is not None:
                backend.sync_workdir(handle, task.workdir)

        if Stage.SYNC_FILE_MOUNTS in stages and not dryrun:
            backend.sync_file_mounts(handle, task.file_mounts,
                                     task.storage_mounts)

        if no_setup:
            logger.info('Setup commands skipped.')
        elif Stage.SETUP in stages and not dryrun:
            backend.setup(handle, task, detach_setup=detach_setup)

        if Stage.PRE_EXEC in stages and not dryrun:
            if idle_minutes_to_autostop is not None:
                assert isinstance(backend, backends.CloudVmRayBackend)
                assert isinstance(handle, backends.CloudVmRayResourceHandle)
                backend.set_autostop(handle,
                                     idle_minutes_to_autostop,
                                     down=down)

        if Stage.EXEC in stages:
            try:
                global_user_state.update_last_use(handle.get_cluster_name())
                job_id = backend.execute(handle,
                                         task,
                                         detach_run,
                                         dryrun=dryrun)
            finally:
                # Enables post_execute() to be run after KeyboardInterrupt.
                backend.post_execute(handle, down)

        if Stage.DOWN in stages and not dryrun:
            if down and idle_minutes_to_autostop is None:
                backend.teardown_ephemeral_storage(task)
                backend.teardown(handle, terminate=True)
    finally:
        controller = controller_utils.Controllers.from_name(cluster_name)
        if controller is None and not _is_launched_by_sky_serve_controller:
            # UX: print live clusters to make users aware (to save costs).
            #
            # Don't print if this job is launched by the jobs controller,
            # because managed jobs are serverless, there can be many of them,
            # and users tend to continuously monitor managed jobs using `sky
            # job queue`. Also don't print if this job is a skyserve controller
            # job or launched by a skyserve controller job, because the
            # redirect for this subprocess.run won't success and it will
            # pollute the controller logs.
            #
            # NOTE(review): the condition above does not consult
            # `_is_launched_by_jobs_controller`, although the comment mentions
            # the jobs controller -- confirm this is intended.
            #
            # Disable the usage collection for this status command.
            env = dict(os.environ,
                       **{env_options.Options.DISABLE_LOGGING.value: '1'})
            subprocess_utils.run(
                'sky status --no-show-managed-jobs --no-show-services', env=env)
        print()
        print('\x1b[?25h', end='')  # Show cursor.
    return job_id, handle


@timeline.event
@usage_lib.entrypoint
def launch(
    task: Union['sky.Task', 'sky.Dag'],
    cluster_name: Optional[str] = None,
    retry_until_up: bool = False,
    idle_minutes_to_autostop: Optional[int] = None,
    dryrun: bool = False,
    down: bool = False,
    stream_logs: bool = True,
    backend: Optional[backends.Backend] = None,
    optimize_target: optimizer.OptimizeTarget = optimizer.OptimizeTarget.COST,
    detach_setup: bool = False,
    detach_run: bool = False,
    no_setup: bool = False,
    clone_disk_from: Optional[str] = None,
    # Internal only:
    # pylint: disable=invalid-name
    _is_launched_by_jobs_controller: bool = False,
    _is_launched_by_sky_serve_controller: bool = False,
    _disable_controller_check: bool = False,
) -> Tuple[Optional[int], Optional[backends.ResourceHandle]]:
    # NOTE(dev): Keep the docstring consistent between the Python API and CLI.
    """Launch a cluster or task.

    The task's setup and run commands are executed under the task's workdir
    (when specified, it is synced to remote cluster).  The task undergoes job
    queue scheduling on the cluster.

    Currently, the first argument must be a sky.Task, or (EXPERIMENTAL advanced
    usage) a sky.Dag. In the latter case, currently it must contain a single
    task; support for pipelines/general DAGs is in experimental branches.

    Args:
        task: sky.Task, or sky.Dag (experimental; 1-task only) to launch.
        cluster_name: name of the cluster to create/reuse.  If None,
            auto-generate a name.
        retry_until_up: whether to retry launching the cluster until it is
            up.
        idle_minutes_to_autostop: automatically stop the cluster after this
            many minutes of idleness, i.e., no running or pending jobs in the
            cluster's job queue. Idleness gets reset whenever setting-up/
            running/pending jobs are found in the job queue. Setting this
            flag is equivalent to running
            ``sky.launch(..., detach_run=True, ...)`` and then
            ``sky.autostop(idle_minutes=<minutes>)``. If not set, the cluster
            will not be autostopped.
        down: Tear down the cluster after all jobs finish (successfully or
            abnormally). If --idle-minutes-to-autostop is also set, the
            cluster will be torn down after the specified idle time.
            Note that if errors occur during provisioning/data syncing/setting
            up, the cluster will not be torn down for debugging purposes.
        dryrun: if True, do not actually launch the cluster.
        stream_logs: if True, show the logs in the terminal.
        backend: backend to use.  If None, use the default backend
            (CloudVMRayBackend).
        optimize_target: target to optimize for. Choices: OptimizeTarget.COST,
            OptimizeTarget.TIME.
        detach_setup: If True, run setup in non-interactive mode as part of the
            job itself. You can safely ctrl-c to detach from logging, and it
            will not interrupt the setup process. To see the logs again after
            detaching, use `sky logs`. To cancel setup, cancel the job via
            `sky cancel`. Useful for long-running setup commands.
        detach_run: If True, as soon as a job is submitted, return from this
            function and do not stream execution logs.
        no_setup: if True, do not re-run setup commands.
        clone_disk_from: [Experimental] if set, clone the disk from the
            specified cluster. This is useful to migrate the cluster to a
            different availability zone or region.

    Example:
        .. code-block:: python

            import sky
            task = sky.Task(run='echo hello SkyPilot')
            task.set_resources(
                sky.Resources(cloud=sky.AWS(), accelerators='V100:4'))
            sky.launch(task, cluster_name='my-cluster')

    Raises:
        exceptions.ClusterOwnerIdentityMismatchError: if the cluster is
            owned by another user.
        exceptions.InvalidClusterNameError: if the cluster name is invalid.
        exceptions.ResourcesMismatchError: if the requested resources
            do not match the existing cluster.
        exceptions.NotSupportedError: if required features are not supported
            by the backend/cloud/cluster.
        exceptions.ResourcesUnavailableError: if the requested resources
            cannot be satisfied. The failover_history of the exception
            will be set as:

                1. Empty: iff the first-ever sky.optimize() fails to
                   find a feasible resource; no pre-check or actual launch is
                   attempted.

                2. Non-empty: iff at least 1 exception from either
                   our pre-checks (e.g., cluster name invalid) or a region/zone
                   throwing resource unavailability.

        exceptions.CommandError: any ssh command error.
        exceptions.NoCloudAccessError: if all clouds are disabled.

    Other exceptions may be raised depending on the backend.

    Returns:
      job_id: Optional[int]; the job ID of the submitted job. None if the
        backend is not CloudVmRayBackend, or no job is submitted to
        the cluster.
      handle: Optional[backends.ResourceHandle]; the handle to the cluster. None
        if dryrun.
    """
    # Unless explicitly disabled by an internal caller, validate the cluster
    # name against the controller names before doing anything else.
    if not _disable_controller_check:
        controller_utils.check_cluster_name_not_controller(
            cluster_name, operation_str='sky.launch')

    # No `stages` argument: run the full life cycle, and always provision
    # through the backend (no pre-existing handle).
    return _execute(
        entrypoint=task,
        cluster_name=cluster_name,
        handle=None,
        backend=backend,
        dryrun=dryrun,
        down=down,
        stream_logs=stream_logs,
        retry_until_up=retry_until_up,
        optimize_target=optimize_target,
        idle_minutes_to_autostop=idle_minutes_to_autostop,
        detach_setup=detach_setup,
        detach_run=detach_run,
        no_setup=no_setup,
        clone_disk_from=clone_disk_from,
        _is_launched_by_jobs_controller=_is_launched_by_jobs_controller,
        _is_launched_by_sky_serve_controller=
        _is_launched_by_sky_serve_controller,
    )


@usage_lib.entrypoint
def exec(  # pylint: disable=redefined-builtin
    task: Union['sky.Task', 'sky.Dag'],
    cluster_name: str,
    dryrun: bool = False,
    down: bool = False,
    stream_logs: bool = True,
    backend: Optional[backends.Backend] = None,
    detach_run: bool = False,
) -> Tuple[Optional[int], Optional[backends.ResourceHandle]]:
    # NOTE(dev): Keep the docstring consistent between the Python API and CLI.
    """Execute a task on an existing cluster.

    This function performs two actions:

    (1) workdir syncing, if the task has a workdir specified;
    (2) executing the task's ``run`` commands.

    All other steps (provisioning, setup commands, file mounts syncing) are
    skipped.  If any of those specifications changed in the task, this function
    will not reflect those changes.  To ensure a cluster's setup is up to date,
    use ``sky.launch()`` instead.

    Execution and scheduling behavior:

    - The task will undergo job queue scheduling, respecting any specified
      resource requirement. It can be executed on any node of the cluster with
      enough resources.
    - The task is run under the workdir (if specified).
    - The task is run non-interactively (without a pseudo-terminal or
      pty), so interactive commands such as ``htop`` do not work.
      Use ``ssh my_cluster`` instead.

    Args:
        task: sky.Task, or sky.Dag (experimental; 1-task only) containing the
            task to execute.
        cluster_name: name of an existing cluster to execute the task.
        down: Tear down the cluster after all jobs finish (successfully or
            abnormally). If --idle-minutes-to-autostop is also set, the
            cluster will be torn down after the specified idle time.
            Note that if errors occur during provisioning/data syncing/setting
            up, the cluster will not be torn down for debugging purposes.
        dryrun: if True, do not actually execute the task.
        stream_logs: if True, show the logs in the terminal.
        backend: backend to use.  If None, use the default backend
            (CloudVMRayBackend).
        detach_run: if True, detach from logging once the task has been
            submitted.

    Raises:
        ValueError: if the specified cluster does not exist or is not in UP
            status.
        sky.exceptions.NotSupportedError: if the specified cluster is a
            controller that does not support this operation.

    Returns:
      job_id: Optional[int]; the job ID of the submitted job. None if the
        backend is not CloudVmRayBackend, or no job is submitted to
        the cluster.
      handle: Optional[backends.ResourceHandle]; the handle to the cluster. None
        if dryrun.
    """
    if isinstance(task, sky.Dag):
        deprecation_notice = (
            f'{colorama.Fore.YELLOW}Passing a sky.Dag to sky.exec() is '
            'deprecated. Pass sky.Task instead.'
            f'{colorama.Style.RESET_ALL}')
        logger.warning(deprecation_notice)

    controller_utils.check_cluster_name_not_controller(cluster_name,
                                                       operation_str='sky.exec')

    # Look up the existing cluster's handle; the result is passed to
    # _execute() so the task runs on that cluster.
    cluster_handle = backend_utils.check_cluster_available(
        cluster_name,
        operation='executing tasks',
        check_cloud_vm_ray_backend=False,
        dryrun=dryrun)

    # Only the workdir sync and the run itself are performed.
    exec_stages = [Stage.SYNC_WORKDIR, Stage.EXEC]
    return _execute(
        entrypoint=task,
        cluster_name=cluster_name,
        handle=cluster_handle,
        backend=backend,
        stages=exec_stages,
        dryrun=dryrun,
        down=down,
        stream_logs=stream_logs,
        detach_run=detach_run,
    )