Admin Policy Enforcement#
SkyPilot provides an admin policy mechanism that admins can use to enforce certain policies on users’ SkyPilot usage. An admin policy applies custom validation and mutation logic to a user’s tasks and SkyPilot config.
Example usage:
Overview#
SkyPilot has a client-server architecture, where a centralized API server can be deployed and users can interact with the server through a client.
To deploy an admin policy, follow these steps:
Implement the policy in a Python package or host the policy as a RESTful server.
Deploy the policy at the server-side and/or the client-side.
Hint
If an admin policy is set at both the client-side and the server-side, the client-side policy is applied first. The mutated task and SkyPilot config are then processed by the server-side policy.
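The ordering described in this hint can be modeled with a short sketch. It is illustrative only: `Request`, `client_policy`, `server_policy`, and `apply_in_order` are hypothetical stand-ins rather than SkyPilot APIs, plain dicts stand in for the task and config, and the `seen_team` key is made up for the demonstration.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List


@dataclass
class Request:
    """Hypothetical stand-in for a user request (task + SkyPilot config)."""
    task: Dict[str, Any] = field(default_factory=dict)
    skypilot_config: Dict[str, Any] = field(default_factory=dict)


def client_policy(req: Request) -> Request:
    # Hypothetical client-side policy: tag the task with a team label.
    req.task.setdefault('labels', {})['team'] = 'ml'
    return req


def server_policy(req: Request) -> Request:
    # Hypothetical server-side policy: it receives the request *after* the
    # client-side mutation, so the label added above is already visible here.
    req.skypilot_config['seen_team'] = req.task.get('labels', {}).get('team')
    return req


def apply_in_order(req: Request,
                   policies: List[Callable[[Request], Request]]) -> Request:
    # Each policy receives the output of the previous one.
    for policy in policies:
        req = policy(req)
    return req


result = apply_in_order(Request(task={'name': 'train'}),
                        [client_policy, server_policy])
```

Because the server-side policy runs last, it has the final say over whatever the client-side policy produced.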
Client-side#
You can apply the policy at the client-side in one of two ways, depending on how the policy is implemented.
If the policy is hosted as a RESTful server, set the admin_policy field in the SkyPilot config to the hosted policy URL:
admin_policy: https://example.com/policy
If the policy is implemented as a Python package, first install the package:
pip install mypackage.subpackage
Then, set the admin_policy field in the SkyPilot config to the path of the Python package that implements the policy.
admin_policy: mypackage.subpackage.MyPolicy
Optionally, you can apply different policies in different projects by leveraging the layered config, e.g. set a different policy in $pwd/.sky.yaml for the current project:
admin_policy: mypackage.subpackage.AnotherPolicy
Hint
SkyPilot loads the policy from the given package in the same Python environment. You can test the existence of the policy by running:
python -c "from mypackage.subpackage import MyPolicy"
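The same check can be done programmatically for any dotted path. The sketch below is a generic standard-library approach and is not necessarily how SkyPilot loads policies; `resolve_dotted_path` is a hypothetical helper name.

```python
import importlib
from typing import Any


def resolve_dotted_path(path: str) -> Any:
    """Import `pkg.module.Name` and return the `Name` attribute."""
    module_path, _, attr_name = path.rpartition('.')
    if not module_path:
        raise ValueError(
            f'Expected a dotted path like pkg.module.Name, got {path!r}')
    module = importlib.import_module(module_path)
    try:
        return getattr(module, attr_name)
    except AttributeError as e:
        raise ImportError(
            f'{attr_name!r} not found in module {module_path!r}') from e


# Demonstrated with a standard-library class so the snippet runs anywhere;
# substitute your own path, e.g. 'mypackage.subpackage.MyPolicy'.
policy_cls = resolve_dotted_path('collections.OrderedDict')
```

An `ImportError` here usually means the package is not installed in the Python environment that SkyPilot runs in.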
Server-side#
If you have a centralized API server deployed, you can enforce a policy for all users by setting it at the server-side.
If the policy is hosted as a RESTful server, open the server’s dashboard, go to the server’s SkyPilot config, and set the admin_policy field to the URL of the policy server:
admin_policy: https://example.com/policy
If the policy is implemented as a Python package, first install the package on the API server host:
pip install mypackage.subpackage
For Helm deployments, refer to Setting an admin policy to install the policy package.
Then, open the server’s dashboard, go to the server’s SkyPilot config, and set the admin_policy field to the path of the Python package that implements the policy.
admin_policy: mypackage.subpackage.MyPolicy
Host admin policy as a RESTful server#
You can host an admin policy as a RESTful API server and configure SkyPilot to call its URL to apply the policy.
We recommend deriving your implementation from the AdminPolicy interface so that the request and response bodies are correctly typed. Here is an example of implementing a policy server using Python and FastAPI:
Example Policy Server
#!/usr/bin/env python3
"""Example RESTful admin policy server for SkyPilot."""
import argparse
from typing import List

import example_policy
from fastapi import FastAPI
from fastapi import Request
from fastapi.responses import JSONResponse
import uvicorn

import sky

app = FastAPI(title="Example Admin Policy Server", version="1.0.0")


class DoNothingPolicy(sky.AdminPolicy):
    """Example policy: do nothing."""

    @classmethod
    def validate_and_mutate(
            cls, user_request: sky.UserRequest) -> sky.MutatedUserRequest:
        """Returns the user request unchanged."""
        return sky.MutatedUserRequest(user_request.task,
                                      user_request.skypilot_config)


@app.post('/')
async def apply_policy(request: Request) -> JSONResponse:
    """Apply an admin policy loaded from external package to a user request"""
    # Decode
    json_data = await request.json()
    user_request = sky.UserRequest.decode(json_data)
    # Example: change the following list to apply different policies.
    policies: List[sky.AdminPolicy] = [
        # Example: policy implemented in the server package.
        DoNothingPolicy,
        # Example: policy from third party packages.
        example_policy.UseSpotForGpuPolicy,
    ]
    try:
        for policy in policies:
            mutated_request = policy.validate_and_mutate(user_request)
            user_request.task = mutated_request.task
            user_request.skypilot_config = mutated_request.skypilot_config
    except Exception as e:  # pylint: disable=broad-except
        return JSONResponse(content=str(e), status_code=400)
    return JSONResponse(content=mutated_request.encode())


if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('--host',
                        default='0.0.0.0',
                        help='Host to bind to (default: 0.0.0.0)')
    parser.add_argument('--port',
                        type=int,
                        default=8080,
                        help='Port to bind to (default: 8080)')
    args = parser.parse_args()
    uvicorn.run(app,
                workers=1,
                host=args.host,
                port=args.port,
                log_level="info")
Note
Since the policy server is typically centralized, hosting admin policies that contain local operations (e.g. the "Use local GCP credentials for all tasks" policy) on the policy server may lead to unexpected behavior and is therefore not recommended.
Optionally, the server can also be implemented in other languages as long as it follows the API convention:
The Admin Policy API
POST /<api-path>
Request body is a marshalled sky.UserRequest in JSON format:
{
  "task": {
    "name": "sky-cmd",
    "resources": {
      "cpus": "1+"
    },
    "num_nodes": 1
  },
  "skypilot_config": {},
  "request_options": {
    "cluster_name": "test",
    "idle_minutes_to_autostop": null,
    "down": false,
    "dryrun": false
  },
  "at_client_side": false
}
Response body is a marshalled sky.MutatedUserRequest in JSON format:
{
  "task": {
    "name": "sky-cmd",
    "resources": {
      "cpus": "1+"
    },
    "num_nodes": 1
  },
  "skypilot_config": {}
}
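To make the request/response contract concrete, here is a minimal, framework-free sketch of the transformation a policy endpoint performs. It follows the JSON shapes shown above; the exact wire format may differ between SkyPilot versions, so treat it as an assumption to verify. `handle_policy_request` and the node-count rule are hypothetical.

```python
import json


def handle_policy_request(body: str) -> str:
    """Take a JSON request body and return a JSON response body."""
    request = json.loads(body)
    task = request['task']
    config = request.get('skypilot_config') or {}

    # Hypothetical validation rule: cap the number of nodes per task.
    if task.get('num_nodes', 1) > 4:
        raise ValueError('num_nodes must not exceed 4')

    # The response carries only the (possibly mutated) task and config;
    # request_options and at_client_side are inputs only.
    return json.dumps({'task': task, 'skypilot_config': config})


example_request = json.dumps({
    'task': {
        'name': 'sky-cmd',
        'resources': {'cpus': '1+'},
        'num_nodes': 1,
    },
    'skypilot_config': {},
    'request_options': {
        'cluster_name': 'test',
        'idle_minutes_to_autostop': None,
        'down': False,
        'dryrun': False,
    },
    'at_client_side': False,
})
response = json.loads(handle_policy_request(example_request))
```

In a real server, the exception would be translated into an error response, as the FastAPI example does with status code 400.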
Implement an admin policy package#
Admin policies are implemented by extending the sky.AdminPolicy interface:
class AdminPolicy(PolicyInterface):
    """Abstract interface of an admin-defined policy for all user requests.

    Admins can implement a subclass of AdminPolicy with the following signature:

        import sky

        class SkyPilotPolicyV1(sky.AdminPolicy):
            def validate_and_mutate(user_request: UserRequest) -> MutatedUserRequest:
                ...
                return MutatedUserRequest(task=..., skypilot_config=...)

    The policy can mutate both task and skypilot_config. Admins then distribute
    a simple module that contains this implementation, installable in a way
    that it can be imported by users from the same Python environment where
    SkyPilot is running.

    Users can register a subclass of AdminPolicy in the SkyPilot config file
    under the key 'admin_policy', e.g.

        admin_policy: my_package.SkyPilotPolicyV1
    """

    @classmethod
    @abc.abstractmethod
    def validate_and_mutate(cls,
                            user_request: UserRequest) -> MutatedUserRequest:
        """Validates and mutates the user request and returns mutated request.

        Args:
            user_request: The user request to validate and mutate.
                UserRequest contains (sky.Task, sky.Config)

        Returns:
            MutatedUserRequest: The mutated user request.

        Raises:
            Exception to throw if the user request failed the validation.
        """
        raise NotImplementedError(
            'Your policy must implement validate_and_mutate')

    def apply(self, user_request: UserRequest) -> MutatedUserRequest:
        return self.validate_and_mutate(user_request)
Your custom admin policy should look like this:
import sky


class MyPolicy(sky.AdminPolicy):

    @classmethod
    def validate_and_mutate(cls, user_request: sky.UserRequest) -> sky.MutatedUserRequest:
        # Logic for validate and modify user requests.
        ...
        return sky.MutatedUserRequest(user_request.task,
                                      user_request.skypilot_config)
UserRequest and MutatedUserRequest are defined as follows (see source code for more details):
@dataclasses.dataclass
class UserRequest:
    """A user request.

    A "user request" is defined as a `sky launch / exec` command or its API
    equivalent.

    `sky jobs launch / serve up` involves multiple launch requests, including
    the launch of controller and clusters for a job (which can have multiple
    tasks if it is a pipeline) or service replicas. Each launch is a separate
    request.

    This class wraps the underlying task, the global skypilot config used to run
    a task, and the request options.

    Args:
        task: User specified task.
        skypilot_config: Global skypilot config to be used in this request.
        request_options: Request options. It is None for jobs and services.
        at_client_side: Is the request intercepted by the policy at client-side?
    """
    task: 'sky.Task'
    skypilot_config: 'sky.Config'
    request_options: Optional['RequestOptions'] = None
    at_client_side: bool = False

    def encode(self) -> str:
        return _UserRequestBody(
            task=common_utils.dump_yaml_str(self.task.to_yaml_config()),
            skypilot_config=common_utils.dump_yaml_str(
                dict(self.skypilot_config)),
            request_options=self.request_options,
            at_client_side=self.at_client_side,
        ).model_dump_json()

    @classmethod
    def decode(cls, body: str) -> 'UserRequest':
        user_request_body = _UserRequestBody.model_validate_json(body)
        return cls(
            task=sky.Task.from_yaml_config(
                common_utils.read_yaml_all_str(user_request_body.task)[0]),
            skypilot_config=config_utils.Config.from_dict(
                common_utils.read_yaml_all_str(
                    user_request_body.skypilot_config)[0]),
            request_options=user_request_body.request_options,
            at_client_side=user_request_body.at_client_side,
        )


@dataclasses.dataclass
class MutatedUserRequest:
    """Mutated user request."""
    task: 'sky.Task'
    skypilot_config: 'sky.Config'

    def encode(self) -> str:
        return _MutatedUserRequestBody(
            task=common_utils.dump_yaml_str(self.task.to_yaml_config()),
            skypilot_config=common_utils.dump_yaml_str(
                dict(self.skypilot_config),)).model_dump_json()

    @classmethod
    def decode(cls, mutated_user_request_body: str) -> 'MutatedUserRequest':
        mutated_user_request_body = _MutatedUserRequestBody.model_validate_json(
            mutated_user_request_body)
        return cls(task=sky.Task.from_yaml_config(
            common_utils.read_yaml_all_str(mutated_user_request_body.task)[0]),
                   skypilot_config=config_utils.Config.from_dict(
                       common_utils.read_yaml_all_str(
                           mutated_user_request_body.skypilot_config)[0],))
In other words, an AdminPolicy can mutate any field of a user request, including the task and the SkyPilot config for that specific request, giving admins a lot of flexibility to control users’ SkyPilot usage.
An AdminPolicy can be used to both validate and mutate user requests. If a request should be rejected, the policy should raise an exception.
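The validate-then-mutate pattern can be exercised without a cluster. The sketch below is a simplified model, not SkyPilot code: `FakeRequest`, `require_name_and_default_nodes`, and `try_apply` are hypothetical names, and a plain dict stands in for sky.Task.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, Optional, Tuple


@dataclass
class FakeRequest:
    """Hypothetical stand-in for a user request."""
    task: Dict[str, Any]
    skypilot_config: Dict[str, Any] = field(default_factory=dict)


def require_name_and_default_nodes(req: FakeRequest) -> FakeRequest:
    # Validation: reject the request by raising an exception.
    if not req.task.get('name'):
        raise ValueError('Every task must have a name.')
    # Mutation: fill in a default without overriding the user's choice.
    req.task.setdefault('num_nodes', 1)
    return req


def try_apply(req: FakeRequest) -> Tuple[Optional[FakeRequest], Optional[str]]:
    """Return (mutated request, None) on success or (None, reason) on rejection."""
    try:
        return require_name_and_default_nodes(req), None
    except ValueError as e:
        return None, str(e)


accepted, accept_error = try_apply(FakeRequest(task={'name': 'train'}))
rejected, reject_error = try_apply(FakeRequest(task={}))
```

Keeping the exception message actionable helps, since it is what the user sees when the request is rejected.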
The sky.Config and sky.RequestOptions classes are defined as follows:
class Config(Dict[str, Any]):
    """SkyPilot config that supports setting/getting values with nested keys."""

    def get_nested(
            self,
            keys: Tuple[str, ...],
            default_value: Any,
            override_configs: Optional[Dict[str, Any]] = None,
            allowed_override_keys: Optional[List[Tuple[str, ...]]] = None,
            disallowed_override_keys: Optional[List[Tuple[str,
                                                          ...]]] = None) -> Any:
        """Gets a nested key.

        If any key is not found, or any intermediate key does not point to a
        dict value, returns 'default_value'.

        Args:
            keys: A tuple of strings representing the nested keys.
            default_value: The default value to return if the key is not found.
            override_configs: A dict of override configs with the same schema as
                the config file, but only containing the keys to override.
            allowed_override_keys: A list of keys that are allowed to be
                overridden.
            disallowed_override_keys: A list of keys that are disallowed to be
                overridden.

        Returns:
            The value of the nested key, or 'default_value' if not found.
        """
        config = copy.deepcopy(self)
        if override_configs is not None:
            config = _recursive_update(config, override_configs,
                                       allowed_override_keys,
                                       disallowed_override_keys)
        return _get_nested(config, keys, default_value, pop=False)

    def set_nested(self, keys: Tuple[str, ...], value: Any) -> None:
        """In-place sets a nested key to value.

        Like get_nested(), if any key is not found, this will not raise an
        error.
        """
        override = {}
        for i, key in enumerate(reversed(keys)):
            if i == 0:
                override = {key: value}
            else:
                override = {key: override}
        _recursive_update(self, override)

    def pop_nested(self, keys: Tuple[str, ...], default_value: Any) -> Any:
        """Pops a nested key."""
        return _get_nested(self, keys, default_value, pop=True)

    @classmethod
    def from_dict(cls, config: Optional[Dict[str, Any]]) -> 'Config':
        if config is None:
            return cls()
        return cls(**config)
class RequestOptions(pydantic.BaseModel):
    """Request options for admin policy.

    Args:
        cluster_name: Name of the cluster to create/reuse. It is None if not
            specified by the user.
        idle_minutes_to_autostop: Autostop setting requested by a user. The
            cluster will be set to autostop after this many minutes of idleness.
        down: If true, use autodown rather than autostop.
        dryrun: Is the request a dryrun?
    """
    cluster_name: Optional[str]
    # Keep these two fields for backward compatibility. The values are copied
    # from task.resources.autostop_config, so that legacy admin policy plugins
    # can still read the correct autostop config from request options before
    # we drop the compatibility.
    # TODO(aylei): remove these fields after 0.12.0
    idle_minutes_to_autostop: Optional[int]
    down: bool
    dryrun: bool
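The nested-key behavior of Config can be illustrated on plain dicts. The helpers below are a simplified re-implementation for illustration only and are not the SkyPilot functions: they ignore override configs, and this `set_nested` replaces the target value outright, whereas the real method goes through a recursive update.

```python
from typing import Any, Dict, Tuple


def get_nested(config: Dict[str, Any], keys: Tuple[str, ...],
               default: Any) -> Any:
    """Return the value at the nested key path, or `default` if missing."""
    current: Any = config
    for key in keys:
        # A missing key or a non-dict intermediate value yields the default.
        if not isinstance(current, dict) or key not in current:
            return default
        current = current[key]
    return current


def set_nested(config: Dict[str, Any], keys: Tuple[str, ...],
               value: Any) -> None:
    """Set the value at the nested key path, creating intermediate dicts."""
    current = config
    for key in keys[:-1]:
        child = current.get(key)
        if not isinstance(child, dict):
            child = {}
            current[key] = child
        current = child
    current[keys[-1]] = value


config: Dict[str, Any] = {'aws': {'vpc_name': 'my-vpc'}}
label_keys = ('kubernetes', 'custom_metadata', 'labels')

missing = get_nested(config, label_keys, {})          # path absent: default
set_nested(config, label_keys, {'app': 'skypilot'})   # creates the path
present = get_nested(config, label_keys, {})
too_deep = get_nested(config, ('aws', 'vpc_name', 'id'), None)
```

The tuple-of-keys form is what the example policies pass, e.g. `('aws', 'use_internal_ip')`.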
Example policies#
We have provided a few example policies in examples/admin_policy/example_policy. You can test these policies by installing the example policy package in your Python environment.
git clone https://github.com/skypilot-org/skypilot.git
cd skypilot
pip install examples/admin_policy/example_policy
Reject all tasks#
class RejectAllPolicy(sky.AdminPolicy):
    """Example policy: rejects all user requests."""

    @classmethod
    def validate_and_mutate(
            cls, user_request: sky.UserRequest) -> sky.MutatedUserRequest:
        """Rejects all user requests."""
        raise RuntimeError('Reject all policy')
admin_policy: example_policy.RejectAllPolicy
Add labels for all tasks on Kubernetes#
class AddLabelsPolicy(sky.AdminPolicy):
    """Example policy: adds a kubernetes label for skypilot_config."""

    @classmethod
    def validate_and_mutate(
            cls, user_request: sky.UserRequest) -> sky.MutatedUserRequest:
        config = user_request.skypilot_config
        labels = config.get_nested(('kubernetes', 'custom_metadata', 'labels'),
                                   {})
        labels['app'] = 'skypilot'
        config.set_nested(('kubernetes', 'custom_metadata', 'labels'), labels)
        return sky.MutatedUserRequest(user_request.task, config)
admin_policy: example_policy.AddLabelsPolicy
Always disable public IP for AWS tasks#
class DisablePublicIpPolicy(sky.AdminPolicy):
    """Example policy: disables public IP for all AWS tasks."""

    @classmethod
    def validate_and_mutate(
            cls, user_request: sky.UserRequest) -> sky.MutatedUserRequest:
        config = user_request.skypilot_config
        config.set_nested(('aws', 'use_internal_ip'), True)
        if config.get_nested(('aws', 'vpc_name'), None) is None:
            # If no VPC name is specified, it is likely a mistake. We should
            # reject the request
            raise RuntimeError('VPC name should be set. Check organization '
                               'wiki for more information.')
        return sky.MutatedUserRequest(user_request.task, config)
admin_policy: example_policy.DisablePublicIpPolicy
Use spot for all GPU tasks#
class UseSpotForGpuPolicy(sky.AdminPolicy):
    """Example policy: use spot instances for all GPU tasks."""

    @classmethod
    def validate_and_mutate(
            cls, user_request: sky.UserRequest) -> sky.MutatedUserRequest:
        """Sets use_spot to True for all GPU tasks."""
        task = user_request.task
        new_resources = []
        for r in task.resources:
            if r.accelerators:
                new_resources.append(r.copy(use_spot=True))
            else:
                new_resources.append(r)
        task.set_resources(type(task.resources)(new_resources))
        return sky.MutatedUserRequest(
            task=task, skypilot_config=user_request.skypilot_config)
admin_policy: example_policy.UseSpotForGpuPolicy
Enforce autostop for all tasks#
class EnforceAutostopPolicy(sky.AdminPolicy):
    """Example policy: enforce autostop for all tasks."""

    @classmethod
    def validate_and_mutate(
            cls, user_request: sky.UserRequest) -> sky.MutatedUserRequest:
        """Enforces autostop for all tasks.

        Note that with this policy enforced, users can still change the autostop
        setting for an existing cluster by using `sky autostop`.

        Since we refresh the cluster status with `sky.status` whenever this
        policy is applied, we should expect a few seconds latency when a user
        runs a request.
        """
        request_options = user_request.request_options
        # Request options is None when a task is executed with `jobs launch` or
        # `sky serve up`.
        if request_options is None:
            return sky.MutatedUserRequest(
                task=user_request.task,
                skypilot_config=user_request.skypilot_config)

        # Get the cluster record to operate on.
        cluster_name = request_options.cluster_name
        cluster_records = []
        if cluster_name is not None:
            cluster_records = sky.status(cluster_name,
                                         refresh=True,
                                         all_users=True)

        # Check if the user request should specify autostop settings.
        need_autostop = False
        if not cluster_records:
            # Cluster does not exist
            need_autostop = True
        elif cluster_records[0]['status'] == sky.ClusterStatus.STOPPED:
            # Cluster is stopped
            need_autostop = True
        elif cluster_records[0]['autostop'] < 0:
            # Cluster is running but autostop is not set
            need_autostop = True

        # Check if the user request is setting autostop settings.
        is_setting_autostop = False
        idle_minutes_to_autostop = request_options.idle_minutes_to_autostop
        is_setting_autostop = (idle_minutes_to_autostop is not None and
                               idle_minutes_to_autostop >= 0)

        # If the cluster requires autostop but the user request is not setting
        # autostop settings, raise an error.
        if need_autostop and not is_setting_autostop:
            raise RuntimeError('Autostop/down must be set for all clusters.')

        return sky.MutatedUserRequest(
            task=user_request.task,
            skypilot_config=user_request.skypilot_config)
admin_policy: example_policy.EnforceAutostopPolicy
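The decision logic of this policy can be isolated into pure functions, which makes the accepted and rejected cases easy to enumerate. This is a simplified sketch, not part of the example package: the function names are hypothetical, a plain dict stands in for a cluster record, and the string 'STOPPED' stands in for sky.ClusterStatus.STOPPED.

```python
from typing import Any, Dict, Optional


def cluster_needs_autostop(record: Optional[Dict[str, Any]]) -> bool:
    """True if a request targeting this cluster must set autostop."""
    if record is None:
        return True  # Cluster does not exist yet.
    if record['status'] == 'STOPPED':
        return True  # Cluster is stopped and would be restarted.
    return record['autostop'] < 0  # Running, but autostop is not set.


def request_sets_autostop(idle_minutes_to_autostop: Optional[int]) -> bool:
    """True if the request carries a non-negative autostop setting."""
    return (idle_minutes_to_autostop is not None and
            idle_minutes_to_autostop >= 0)


def is_rejected(record: Optional[Dict[str, Any]],
                idle_minutes_to_autostop: Optional[int]) -> bool:
    """Mirror the final check: autostop is required but not provided."""
    return (cluster_needs_autostop(record) and
            not request_sets_autostop(idle_minutes_to_autostop))


new_cluster_without_autostop = is_rejected(None, None)
new_cluster_with_autostop = is_rejected(None, 10)
running_with_autostop = is_rejected({'status': 'UP', 'autostop': 30}, None)
running_without_autostop = is_rejected({'status': 'UP', 'autostop': -1}, None)
```

Only requests that would leave a cluster without any autostop setting are rejected; a running cluster that already has autostop configured passes even if the new request omits it.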
Dynamically update Kubernetes contexts to use#
class DynamicKubernetesContextsUpdatePolicy(sky.AdminPolicy):
    """Example policy: update the kubernetes context to use."""

    @classmethod
    def validate_and_mutate(
            cls, user_request: sky.UserRequest) -> sky.MutatedUserRequest:
        """Updates the kubernetes context to use."""
        # Append any new kubernetes clusters in local kubeconfig. An example
        # implementation of this method can be:
        #  1. Query an organization's internal Kubernetes cluster registry,
        #     which can be some internal API, or a secret vault.
        #  2. Append the new credentials to the local kubeconfig.
        update_current_kubernetes_clusters_from_registry()
        # Get the allowed contexts for the user. Similarly, it can retrieve
        # the latest allowed contexts from an organization's internal API.
        allowed_contexts = get_allowed_contexts()
        # Update the kubernetes allowed contexts in skypilot config.
        config = user_request.skypilot_config
        config.set_nested(('kubernetes', 'allowed_contexts'), allowed_contexts)
        return sky.MutatedUserRequest(task=user_request.task,
                                      skypilot_config=config)
admin_policy: example_policy.DynamicKubernetesContextsUpdatePolicy
Use local GCP credentials for all tasks#
class UseLocalGcpCredentialsPolicy(sky.AdminPolicy):
    """Example policy: use local GCP credentials in the task."""

    @classmethod
    def validate_and_mutate(
            cls, user_request: sky.UserRequest) -> sky.MutatedUserRequest:
        # Only apply the policy at client-side.
        if not user_request.at_client_side:
            return sky.MutatedUserRequest(user_request.task,
                                          user_request.skypilot_config)
        task = user_request.task
        if task.file_mounts is None:
            task.file_mounts = {}
        # Use the env var to detect whether an explicit credential path is
        # specified.
        cred_path = os.environ.get(_GOOGLE_APPLICATION_CREDENTIALS_ENV)
        if cred_path is not None:
            task.file_mounts[_GOOGLE_APPLICATION_CREDENTIALS_PATH] = cred_path
            activate_cmd = (f'gcloud auth activate-service-account --key-file '
                            f'{_GOOGLE_APPLICATION_CREDENTIALS_PATH}')
            if task.run is None:
                task.run = activate_cmd
            elif isinstance(task.run, str):
                task.run = f'{activate_cmd} && {task.run}'
            else:
                # Impossible according to current code base, but just in case.
                logger.warning('The task run command is not a string, '
                               f'so the local {cred_path} will not be used.')
        else:
            # Otherwise upload the entire default credential directory to get
            # consistent identity in the task and the local environment.
            task.file_mounts['~/.config/gcloud'] = '~/.config/gcloud'
        return sky.MutatedUserRequest(task, user_request.skypilot_config)
admin_policy: example_policy.UseLocalGcpCredentialsPolicy
Note
This policy only takes effect when applied at the client-side. Using this policy at the server-side is a no-op.
Add volumes to all tasks#
class AddVolumesPolicy(sky.AdminPolicy):
    """Example policy: add volumes to the task."""

    @classmethod
    def validate_and_mutate(
            cls, user_request: sky.UserRequest) -> sky.MutatedUserRequest:
        task = user_request.task
        # Use `task.set_volumes` to set the volumes.
        # Or use `task.update_volumes` to update in-place
        # instead of overwriting.
        task.set_volumes({'/mnt/data0': 'pvc0'})
        return sky.MutatedUserRequest(task, user_request.skypilot_config)
admin_policy: example_policy.AddVolumesPolicy