Admin Policy Enforcement#

SkyPilot provides an admin policy mechanism that admins can use to enforce certain policies on users’ SkyPilot usage. An admin policy applies custom validation and mutation logic to a user’s tasks and SkyPilot config.

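Example usage:

  - Reject all tasks
  - Add labels for all tasks on Kubernetes
  - Always disable public IP for AWS tasks
  - Use spot for all GPU tasks
  - Enforce autostop for all tasks
  - Dynamically update Kubernetes contexts to use
  - Use local GCP credentials for all tasks
  - Add volumes to all tasks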

Overview#

SkyPilot has a client-server architecture, where a centralized API server can be deployed and users can interact with the server through a client.

To deploy an admin policy, follow these steps:

  1. Implement the policy in a Python package or host the policy as a RESTful server.

  2. Deploy the policy at the server-side and/or the client-side.

Hint

If an admin policy is set at both the client-side and the server-side, the client-side policy is applied first; the mutated task and SkyPilot config are then processed by the server-side policy.
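The ordering described in this hint can be pictured with a minimal sketch. It does not use SkyPilot at all: requests are plain dicts, and `client_policy`, `server_policy`, and `apply_in_order` are hypothetical names chosen for illustration only.

```python
from typing import Any, Callable, Dict, List

# Hypothetical stand-ins: a "request" is a plain dict and a "policy" is any
# function that returns a (possibly mutated) request. Not SkyPilot types.
Request = Dict[str, Any]
Policy = Callable[[Request], Request]


def client_policy(request: Request) -> Request:
    """Illustrative client-side policy: tags the task with a team label."""
    task = dict(request["task"], labels={"team": "research"})
    return {**request, "task": task}


def server_policy(request: Request) -> Request:
    """Illustrative server-side policy: validates the client-mutated task."""
    if "team" not in request["task"].get("labels", {}):
        raise RuntimeError("Every task must carry a team label.")
    return request


def apply_in_order(request: Request, policies: List[Policy]) -> Request:
    """Feeds the output of each policy into the next one."""
    for policy in policies:
        request = policy(request)
    return request


original = {"task": {"name": "sky-cmd"}, "skypilot_config": {}}
# Client-side first, then server-side, mirroring the order in the hint.
final = apply_in_order(original, [client_policy, server_policy])
print(final["task"]["labels"])
```

In this sketch the server-side check only passes because the client-side mutation ran first; applying `server_policy` directly to `original` would raise.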

Client-side#

You can get the policy applied at the client-side by following the steps below.

If the policy is hosted as a RESTful server, set the admin_policy field in the SkyPilot config to the hosted policy URL:

admin_policy: https://example.com/policy

If the policy is implemented as a Python package, first install the package:

pip install mypackage.subpackage

Then, set the admin_policy field in the SkyPilot config to the import path of the policy class:

admin_policy: mypackage.subpackage.MyPolicy

Optionally, you can also apply different policies in different projects by leveraging the layered config, e.g. set a different policy in $pwd/.sky.yaml for the current project:

admin_policy: mypackage.subpackage.AnotherPolicy

Hint

SkyPilot loads the policy from the given package in the same Python environment. You can test the existence of the policy by running:

python -c "from mypackage.subpackage import MyPolicy"
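The same check can be done from a dotted path string like the one used in the admin_policy field. The helper below is a hypothetical sketch built on the standard library's `importlib`; it is not SkyPilot's own loader, and `resolve_dotted_path` is a name invented here.

```python
import importlib
from typing import Any


def resolve_dotted_path(path: str) -> Any:
    """Imports 'package.module.Name' and returns the object called Name."""
    module_path, _, attr_name = path.rpartition(".")
    if not module_path:
        raise ValueError(f"Expected 'module.ClassName', got {path!r}")
    module = importlib.import_module(module_path)
    try:
        return getattr(module, attr_name)
    except AttributeError as e:
        raise ImportError(
            f"Module {module_path!r} has no attribute {attr_name!r}") from e


# Demonstrated with a standard-library class so the snippet runs anywhere;
# replace the string with your own policy path, e.g.
# 'mypackage.subpackage.MyPolicy'.
print(resolve_dotted_path("collections.OrderedDict"))
```

If the import fails here, it will most likely also fail when the policy is loaded, since both run in the same Python environment.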

Server-side#

If you have a centralized API server deployed, you can enforce a policy for all users by setting it at the server-side.

If the policy is hosted as a RESTful server, open the server’s dashboard, go to the server’s SkyPilot config, and set the admin_policy field to the URL of the policy server:

admin_policy: https://example.com/policy

If the policy is implemented as a Python package, first install the package on the API server host:

pip install mypackage.subpackage

For Helm deployments, refer to “Setting an admin policy” for how to install the policy package.

Then, open the server’s dashboard, go to the server’s SkyPilot config, and set the admin_policy field to the import path of the policy class:

admin_policy: mypackage.subpackage.MyPolicy

Host admin policy as a RESTful server#

You can host an admin policy as a RESTful API server and configure SkyPilot to call its URL to apply the policy.

It is recommended to inherit your implementation from the AdminPolicy interface to ensure the request and response body are correctly typed. Here is an example of implementing a policy server using Python and FastAPI:

Example Policy Server
#!/usr/bin/env python3
"""Example RESTful admin policy server for SkyPilot."""

import argparse
from typing import List

import example_policy
from fastapi import FastAPI
from fastapi import Request
from fastapi.responses import JSONResponse
import uvicorn

import sky

app = FastAPI(title="Example Admin Policy Server", version="1.0.0")


class DoNothingPolicy(sky.AdminPolicy):
    """Example policy: do nothing."""

    @classmethod
    def validate_and_mutate(
            cls, user_request: sky.UserRequest) -> sky.MutatedUserRequest:
        """Returns the user request unchanged."""
        return sky.MutatedUserRequest(user_request.task,
                                      user_request.skypilot_config)


@app.post('/')
async def apply_policy(request: Request) -> JSONResponse:
    """Apply an admin policy loaded from external package to a user request"""
    # Decode
    json_data = await request.json()
    user_request = sky.UserRequest.decode(json_data)
    # Example: change the following list to apply different policies.
    policies: List[sky.AdminPolicy] = [
        # Example: policy implemented in the server package.
        DoNothingPolicy,
        # Example: policy from third party packages.
        example_policy.UseSpotForGpuPolicy,
    ]
    try:
        for policy in policies:
            mutated_request = policy.validate_and_mutate(user_request)
            user_request.task = mutated_request.task
            user_request.skypilot_config = mutated_request.skypilot_config
    except Exception as e:  # pylint: disable=broad-except
        return JSONResponse(content=str(e), status_code=400)

    return JSONResponse(content=mutated_request.encode())


if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('--host',
                        default='0.0.0.0',
                        help='Host to bind to (default: 0.0.0.0)')
    parser.add_argument('--port',
                        type=int,
                        default=8080,
                        help='Port to bind to (default: 8080)')
    args = parser.parse_args()
    uvicorn.run(app,
                workers=1,
                host=args.host,
                port=args.port,
                log_level="info")

Note

Since the policy server is typically centralized, hosting admin policies that contain local operations (e.g. Use local GCP credentials for all tasks) on the policy server may lead to unexpected behavior and is not recommended.

Optionally, the server can also be implemented in other languages as long as it follows the API convention:

The Admin Policy API

POST /<api-path>

Request body is a marshalled sky.UserRequest in JSON format:

{
  "task": {
    "name": "sky-cmd",
    "resources": {
      "cpus": "1+"
    },
    "num_nodes": 1
  },
  "skypilot_config": {},
  "request_options": {
    "cluster_name": "test",
    "idle_minutes_to_autostop": null,
    "down": false,
    "dryrun": false
  },
  "at_client_side": false
}

Response body is a marshalled sky.MutatedUserRequest in JSON format:

{
  "task": {
    "name": "sky-cmd",
    "resources": {
      "cpus": "1+"
    },
    "num_nodes": 1
  },
  "skypilot_config": {}
}
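To make the request/response contract concrete, here is a minimal sketch of the logic a do-nothing policy endpoint would run on the raw body. It assumes the body has exactly the shape shown above and leaves out the HTTP layer entirely; `do_nothing_policy` is a hypothetical name, and a server in any language could implement the same transformation.

```python
import json
from typing import Any, Dict


def do_nothing_policy(request_body: str) -> str:
    """Maps a request body to a response body without changing the task."""
    request: Dict[str, Any] = json.loads(request_body)
    # The response only carries these two fields, so both must be present.
    for field in ("task", "skypilot_config"):
        if field not in request:
            raise ValueError(f"Missing required field: {field}")
    response = {
        "task": request["task"],
        "skypilot_config": request["skypilot_config"],
    }
    return json.dumps(response)


example_body = json.dumps({
    "task": {"name": "sky-cmd", "resources": {"cpus": "1+"}, "num_nodes": 1},
    "skypilot_config": {},
    "request_options": {
        "cluster_name": "test",
        "idle_minutes_to_autostop": None,
        "down": False,
        "dryrun": False,
    },
    "at_client_side": False,
})
print(do_nothing_policy(example_body))
```

Note that request_options and at_client_side are inputs only; they do not appear in the response.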

Implement an admin policy package#

Admin policies are implemented by extending the sky.AdminPolicy interface:

class AdminPolicy(PolicyInterface):
    """Abstract interface of an admin-defined policy for all user requests.

    Admins can implement a subclass of AdminPolicy with the following signature:

        import sky

        class SkyPilotPolicyV1(sky.AdminPolicy):
            def validate_and_mutate(user_request: UserRequest) -> MutatedUserRequest:
                ...
                return MutatedUserRequest(task=..., skypilot_config=...)

    The policy can mutate both task and skypilot_config. Admins then distribute
    a simple module that contains this implementation, installable in a way
    that it can be imported by users from the same Python environment where
    SkyPilot is running.

    Users can register a subclass of AdminPolicy in the SkyPilot config file
    under the key 'admin_policy', e.g.

        admin_policy: my_package.SkyPilotPolicyV1
    """

    @classmethod
    @abc.abstractmethod
    def validate_and_mutate(cls,
                            user_request: UserRequest) -> MutatedUserRequest:
        """Validates and mutates the user request and returns mutated request.

        Args:
            user_request: The user request to validate and mutate.
                UserRequest contains (sky.Task, sky.Config)

        Returns:
            MutatedUserRequest: The mutated user request.

        Raises:
            Exception to throw if the user request failed the validation.
        """
        raise NotImplementedError(
            'Your policy must implement validate_and_mutate')

    def apply(self, user_request: UserRequest) -> MutatedUserRequest:
        return self.validate_and_mutate(user_request)

Your custom admin policy should look like this:

import sky

class MyPolicy(sky.AdminPolicy):
    @classmethod
    def validate_and_mutate(cls, user_request: sky.UserRequest) -> sky.MutatedUserRequest:
        # Logic to validate and mutate the user request.
        ...
        return sky.MutatedUserRequest(user_request.task,
                                      user_request.skypilot_config)

UserRequest and MutatedUserRequest are defined as follows (see source code for more details):

@dataclasses.dataclass
class UserRequest:
    """A user request.

    A "user request" is defined as a `sky launch / exec` command or its API
    equivalent.

    `sky jobs launch / serve up` involves multiple launch requests, including
    the launch of controller and clusters for a job (which can have multiple
    tasks if it is a pipeline) or service replicas. Each launch is a separate
    request.

    This class wraps the underlying task, the global skypilot config used to run
    a task, and the request options.

    Args:
        task: User specified task.
        skypilot_config: Global skypilot config to be used in this request.
        request_options: Request options. It is None for jobs and services.
        at_client_side: Is the request intercepted by the policy at client-side?
    """
    task: 'sky.Task'
    skypilot_config: 'sky.Config'
    request_options: Optional['RequestOptions'] = None
    at_client_side: bool = False

    def encode(self) -> str:
        return _UserRequestBody(
            task=common_utils.dump_yaml_str(self.task.to_yaml_config()),
            skypilot_config=common_utils.dump_yaml_str(
                dict(self.skypilot_config)),
            request_options=self.request_options,
            at_client_side=self.at_client_side,
        ).model_dump_json()

    @classmethod
    def decode(cls, body: str) -> 'UserRequest':
        user_request_body = _UserRequestBody.model_validate_json(body)
        return cls(
            task=sky.Task.from_yaml_config(
                common_utils.read_yaml_all_str(user_request_body.task)[0]),
            skypilot_config=config_utils.Config.from_dict(
                common_utils.read_yaml_all_str(
                    user_request_body.skypilot_config)[0]),
            request_options=user_request_body.request_options,
            at_client_side=user_request_body.at_client_side,
        )


@dataclasses.dataclass
class MutatedUserRequest:
    """Mutated user request."""

    task: 'sky.Task'
    skypilot_config: 'sky.Config'

    def encode(self) -> str:
        return _MutatedUserRequestBody(
            task=common_utils.dump_yaml_str(self.task.to_yaml_config()),
            skypilot_config=common_utils.dump_yaml_str(
                dict(self.skypilot_config),)).model_dump_json()

    @classmethod
    def decode(cls, mutated_user_request_body: str) -> 'MutatedUserRequest':
        mutated_user_request_body = _MutatedUserRequestBody.model_validate_json(
            mutated_user_request_body)
        return cls(task=sky.Task.from_yaml_config(
            common_utils.read_yaml_all_str(mutated_user_request_body.task)[0]),
                   skypilot_config=config_utils.Config.from_dict(
                       common_utils.read_yaml_all_str(
                           mutated_user_request_body.skypilot_config)[0],))

In other words, an AdminPolicy can mutate any field of a user request, including the task and the SkyPilot config for that specific request, giving admins a lot of flexibility to control users’ SkyPilot usage.

An AdminPolicy can be used to both validate and mutate user requests. If a request should be rejected, the policy should raise an exception.
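The validate-then-mutate pattern can be sketched without SkyPilot installed. In the example below, `FakeRequest` is a hypothetical stand-in for the request types (its task is a plain dict rather than a sky.Task), and the node limit and default name are invented for illustration.

```python
import dataclasses
from typing import Any, Dict


@dataclasses.dataclass
class FakeRequest:
    """Hypothetical stand-in for a user request; not a SkyPilot class."""
    task: Dict[str, Any]
    skypilot_config: Dict[str, Any]


MAX_NODES = 4  # Assumed organization limit, for illustration only.


def validate_and_mutate(request: FakeRequest) -> FakeRequest:
    # Validation: reject the request by raising an exception.
    num_nodes = request.task.get("num_nodes", 1)
    if num_nodes > MAX_NODES:
        raise RuntimeError(
            f"Tasks may use at most {MAX_NODES} nodes, got {num_nodes}.")
    # Mutation: return a modified copy instead of editing the input in place.
    task = dict(request.task)
    task.setdefault("name", "unnamed-task")
    return FakeRequest(task=task, skypilot_config=request.skypilot_config)


accepted = validate_and_mutate(FakeRequest({"num_nodes": 2}, {}))
print(accepted.task)

try:
    validate_and_mutate(FakeRequest({"num_nodes": 16}, {}))
except RuntimeError as e:
    print(f"Rejected: {e}")
```

A real policy would put the same two steps inside the validate_and_mutate classmethod of a sky.AdminPolicy subclass.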

The sky.Config and sky.RequestOptions classes are defined as follows:

class Config(Dict[str, Any]):
    """SkyPilot config that supports setting/getting values with nested keys."""

    def get_nested(
        self,
        keys: Tuple[str, ...],
        default_value: Any,
        override_configs: Optional[Dict[str, Any]] = None,
        allowed_override_keys: Optional[List[Tuple[str, ...]]] = None,
        disallowed_override_keys: Optional[List[Tuple[str,
                                                      ...]]] = None) -> Any:
        """Gets a nested key.

        If any key is not found, or any intermediate key does not point to a
        dict value, returns 'default_value'.

        Args:
            keys: A tuple of strings representing the nested keys.
            default_value: The default value to return if the key is not found.
            override_configs: A dict of override configs with the same schema as
                the config file, but only containing the keys to override.
            allowed_override_keys: A list of keys that are allowed to be
                overridden.
            disallowed_override_keys: A list of keys that are disallowed to be
                overridden.

        Returns:
            The value of the nested key, or 'default_value' if not found.
        """
        config = copy.deepcopy(self)
        if override_configs is not None:
            config = _recursive_update(config, override_configs,
                                       allowed_override_keys,
                                       disallowed_override_keys)
        return _get_nested(config, keys, default_value, pop=False)

    def set_nested(self, keys: Tuple[str, ...], value: Any) -> None:
        """In-place sets a nested key to value.

        Like get_nested(), if any key is not found, this will not raise an
        error.
        """
        override = {}
        for i, key in enumerate(reversed(keys)):
            if i == 0:
                override = {key: value}
            else:
                override = {key: override}
        _recursive_update(self, override)

    def pop_nested(self, keys: Tuple[str, ...], default_value: Any) -> Any:
        """Pops a nested key."""
        return _get_nested(self, keys, default_value, pop=True)

    @classmethod
    def from_dict(cls, config: Optional[Dict[str, Any]]) -> 'Config':
        if config is None:
            return cls()
        return cls(**config)


class RequestOptions(pydantic.BaseModel):
    """Request options for admin policy.

    Args:
        cluster_name: Name of the cluster to create/reuse. It is None if not
            specified by the user.
        idle_minutes_to_autostop: Autostop setting requested by a user. The
            cluster will be set to autostop after this many minutes of idleness.
        down: If true, use autodown rather than autostop.
        dryrun: Is the request a dryrun?
    """
    cluster_name: Optional[str]
    # Keep these two fields for backward compatibility. The values are copied
    # from task.resources.autostop_config, so that legacy admin policy plugins
    # can still read the correct autostop config from request options before
    # we drop the compatibility.
    # TODO(aylei): remove these fields after 0.12.0
    idle_minutes_to_autostop: Optional[int]
    down: bool
    dryrun: bool
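One detail of Config worth noting: get_nested reads from a deep copy of the config, so changing the returned value does not change the config until it is written back with set_nested. The sketch below illustrates this read-modify-write pattern on plain dicts. The two helpers are simplified stand-ins written for this example (no override handling, and `set_nested` here replaces the value rather than merging), not the SkyPilot implementation.

```python
import copy
from typing import Any, Dict, Tuple


def get_nested(config: Dict[str, Any], keys: Tuple[str, ...],
               default: Any) -> Any:
    """Returns a deep copy of the nested value, or default if not found."""
    current: Any = config
    for key in keys:
        if not isinstance(current, dict) or key not in current:
            return default
        current = current[key]
    return copy.deepcopy(current)


def set_nested(config: Dict[str, Any], keys: Tuple[str, ...],
               value: Any) -> None:
    """Sets a nested key in place, creating intermediate dicts as needed."""
    current = config
    for key in keys[:-1]:
        child = current.get(key)
        if not isinstance(child, dict):
            child = {}
            current[key] = child
        current = child
    current[keys[-1]] = value


config = {"kubernetes": {"custom_metadata": {"labels": {"team": "ml"}}}}
path = ("kubernetes", "custom_metadata", "labels")

labels = get_nested(config, path, {})
labels["app"] = "skypilot"
# The config is unchanged so far, because labels is a copy.
unchanged = get_nested(config, path, {})
# Writing the value back makes the change visible.
set_nested(config, path, labels)
print(get_nested(config, path, {}))
```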

Example policies#

We have provided a few example policies in examples/admin_policy/example_policy. You can test these policies by installing the example policy package in your Python environment.

git clone https://github.com/skypilot-org/skypilot.git
cd skypilot
pip install examples/admin_policy/example_policy

Reject all tasks#

class RejectAllPolicy(sky.AdminPolicy):
    """Example policy: rejects all user requests."""

    @classmethod
    def validate_and_mutate(
            cls, user_request: sky.UserRequest) -> sky.MutatedUserRequest:
        """Rejects all user requests."""
        raise RuntimeError('Reject all policy')

To enable this policy, set the following in the SkyPilot config:

admin_policy: example_policy.RejectAllPolicy

Add labels for all tasks on Kubernetes#

class AddLabelsPolicy(sky.AdminPolicy):
    """Example policy: adds a kubernetes label for skypilot_config."""

    @classmethod
    def validate_and_mutate(
            cls, user_request: sky.UserRequest) -> sky.MutatedUserRequest:
        config = user_request.skypilot_config
        labels = config.get_nested(('kubernetes', 'custom_metadata', 'labels'),
                                   {})
        labels['app'] = 'skypilot'
        config.set_nested(('kubernetes', 'custom_metadata', 'labels'), labels)
        return sky.MutatedUserRequest(user_request.task, config)

To enable this policy, set the following in the SkyPilot config:

admin_policy: example_policy.AddLabelsPolicy

Always disable public IP for AWS tasks#

class DisablePublicIpPolicy(sky.AdminPolicy):
    """Example policy: disables public IP for all AWS tasks."""

    @classmethod
    def validate_and_mutate(
            cls, user_request: sky.UserRequest) -> sky.MutatedUserRequest:
        config = user_request.skypilot_config
        config.set_nested(('aws', 'use_internal_ip'), True)
        if config.get_nested(('aws', 'vpc_name'), None) is None:
            # If no VPC name is specified, it is likely a mistake. We should
            # reject the request
            raise RuntimeError('VPC name should be set. Check organization '
                               'wiki for more information.')
        return sky.MutatedUserRequest(user_request.task, config)

To enable this policy, set the following in the SkyPilot config:

admin_policy: example_policy.DisablePublicIpPolicy

Use spot for all GPU tasks#

class UseSpotForGpuPolicy(sky.AdminPolicy):
    """Example policy: use spot instances for all GPU tasks."""

    @classmethod
    def validate_and_mutate(
            cls, user_request: sky.UserRequest) -> sky.MutatedUserRequest:
        """Sets use_spot to True for all GPU tasks."""
        task = user_request.task
        new_resources = []
        for r in task.resources:
            if r.accelerators:
                new_resources.append(r.copy(use_spot=True))
            else:
                new_resources.append(r)

        task.set_resources(type(task.resources)(new_resources))

        return sky.MutatedUserRequest(
            task=task, skypilot_config=user_request.skypilot_config)

To enable this policy, set the following in the SkyPilot config:

admin_policy: example_policy.UseSpotForGpuPolicy

Enforce autostop for all tasks#

class EnforceAutostopPolicy(sky.AdminPolicy):
    """Example policy: enforce autostop for all tasks."""

    @classmethod
    def validate_and_mutate(
            cls, user_request: sky.UserRequest) -> sky.MutatedUserRequest:
        """Enforces autostop for all tasks.

        Note that with this policy enforced, users can still change the autostop
        setting for an existing cluster by using `sky autostop`.

        Since we refresh the cluster status with `sky.status` whenever this
        policy is applied, we should expect a few seconds latency when a user
        runs a request.
        """
        request_options = user_request.request_options

        # Request options is None when a task is executed with `jobs launch` or
        # `sky serve up`.
        if request_options is None:
            return sky.MutatedUserRequest(
                task=user_request.task,
                skypilot_config=user_request.skypilot_config)

        # Get the cluster record to operate on.
        cluster_name = request_options.cluster_name
        cluster_records = []
        if cluster_name is not None:
            cluster_records = sky.status(cluster_name,
                                         refresh=True,
                                         all_users=True)

        # Check if the user request should specify autostop settings.
        need_autostop = False
        if not cluster_records:
            # Cluster does not exist
            need_autostop = True
        elif cluster_records[0]['status'] == sky.ClusterStatus.STOPPED:
            # Cluster is stopped
            need_autostop = True
        elif cluster_records[0]['autostop'] < 0:
            # Cluster is running but autostop is not set
            need_autostop = True

        # Check if the user request is setting autostop settings.
        is_setting_autostop = False
        idle_minutes_to_autostop = request_options.idle_minutes_to_autostop
        is_setting_autostop = (idle_minutes_to_autostop is not None and
                               idle_minutes_to_autostop >= 0)

        # If the cluster requires autostop but the user request is not setting
        # autostop settings, raise an error.
        if need_autostop and not is_setting_autostop:
            raise RuntimeError('Autostop/down must be set for all clusters.')

        return sky.MutatedUserRequest(
            task=user_request.task,
            skypilot_config=user_request.skypilot_config)

To enable this policy, set the following in the SkyPilot config:

admin_policy: example_policy.EnforceAutostopPolicy
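The decision logic of this policy can be isolated into pure functions, which makes it easier to reason about and to unit test. The sketch below mirrors the three conditions in the policy above but is not part of the example package: the function names are hypothetical, cluster records are plain dicts, and the status is a plain string standing in for sky.ClusterStatus.

```python
from typing import Any, Dict, Optional


def cluster_needs_autostop(record: Optional[Dict[str, Any]]) -> bool:
    """True if the request must carry an autostop setting."""
    if record is None:
        return True  # Cluster does not exist yet.
    if record["status"] == "STOPPED":
        return True  # Cluster is stopped and will be restarted.
    return record["autostop"] < 0  # Running, but autostop is not set.


def request_sets_autostop(idle_minutes: Optional[int]) -> bool:
    """True if the request specifies a non-negative autostop value."""
    return idle_minutes is not None and idle_minutes >= 0


def should_reject(record: Optional[Dict[str, Any]],
                  idle_minutes: Optional[int]) -> bool:
    return (cluster_needs_autostop(record) and
            not request_sets_autostop(idle_minutes))


# New cluster without autostop: rejected.
print(should_reject(None, None))
# Running cluster that already has a 10-minute autostop: accepted.
print(should_reject({"status": "UP", "autostop": 10}, None))
```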

Dynamically update Kubernetes contexts to use#

class DynamicKubernetesContextsUpdatePolicy(sky.AdminPolicy):
    """Example policy: update the kubernetes context to use."""

    @classmethod
    def validate_and_mutate(
            cls, user_request: sky.UserRequest) -> sky.MutatedUserRequest:
        """Updates the kubernetes context to use."""
        # Append any new kubernetes clusters in local kubeconfig. An example
        # implementation of this method can be:
        #  1. Query an organization's internal Kubernetes cluster registry,
        #     which can be some internal API, or a secret vault.
        #  2. Append the new credentials to the local kubeconfig.
        update_current_kubernetes_clusters_from_registry()
        # Get the allowed contexts for the user. Similarly, it can retrieve
        # the latest allowed contexts from an organization's internal API.
        allowed_contexts = get_allowed_contexts()

        # Update the kubernetes allowed contexts in skypilot config.
        config = user_request.skypilot_config
        config.set_nested(('kubernetes', 'allowed_contexts'), allowed_contexts)
        return sky.MutatedUserRequest(task=user_request.task,
                                      skypilot_config=config)

To enable this policy, set the following in the SkyPilot config:

admin_policy: example_policy.DynamicKubernetesContextsUpdatePolicy

Use local GCP credentials for all tasks#

class UseLocalGcpCredentialsPolicy(sky.AdminPolicy):
    """Example policy: use local GCP credentials in the task."""

    @classmethod
    def validate_and_mutate(
            cls, user_request: sky.UserRequest) -> sky.MutatedUserRequest:
        # Only apply the policy at client-side.
        if not user_request.at_client_side:
            return sky.MutatedUserRequest(user_request.task,
                                          user_request.skypilot_config)

        task = user_request.task
        if task.file_mounts is None:
            task.file_mounts = {}
        # Use the env var to detect whether an explicit credential path is
        # specified.
        cred_path = os.environ.get(_GOOGLE_APPLICATION_CREDENTIALS_ENV)

        if cred_path is not None:
            task.file_mounts[_GOOGLE_APPLICATION_CREDENTIALS_PATH] = cred_path
            activate_cmd = (f'gcloud auth activate-service-account --key-file '
                            f'{_GOOGLE_APPLICATION_CREDENTIALS_PATH}')
            if task.run is None:
                task.run = activate_cmd
            elif isinstance(task.run, str):
                task.run = f'{activate_cmd} && {task.run}'
            else:
                # Impossible according to current code base, but just in case.
                logger.warning('The task run command is not a string, '
                               f'so the local {cred_path} will not be used.')
        else:
            # Otherwise upload the entire default credential directory to get
            # consistent identity in the task and the local environment.
            task.file_mounts['~/.config/gcloud'] = '~/.config/gcloud'
        return sky.MutatedUserRequest(task, user_request.skypilot_config)

To enable this policy, set the following in the SkyPilot config:

admin_policy: example_policy.UseLocalGcpCredentialsPolicy

Note

This policy only takes effect when applied at the client-side. Using it at the server-side is a no-op.

Add volumes to all tasks#

class AddVolumesPolicy(sky.AdminPolicy):
    """Example policy: add volumes to the task."""

    @classmethod
    def validate_and_mutate(
            cls, user_request: sky.UserRequest) -> sky.MutatedUserRequest:
        task = user_request.task
        # Use `task.set_volumes` to set the volumes.
        # Or use `task.update_volumes` to update in-place
        # instead of overwriting.
        task.set_volumes({'/mnt/data0': 'pvc0'})
        return sky.MutatedUserRequest(task, user_request.skypilot_config)

To enable this policy, set the following in the SkyPilot config:

admin_policy: example_policy.AddVolumesPolicy