Admin Policy Enforcement#
SkyPilot provides an admin policy mechanism that admins can use to enforce certain policies on users’ SkyPilot usage. An admin policy applies custom validation and mutation logic to a user’s tasks and SkyPilot config.
Example usage:
To implement and use an admin policy:
Admins writes a simple Python package with a policy class that implements SkyPilot’s
sky.AdminPolicy
interface;Admins distributes this package to users;
Users simply set the
admin_policy
field in the SkyPilot config file~/.sky/config.yaml
for the policy to go into effect.
Overview#
User-Side#
To apply the policy, a user needs to set the admin_policy
field in the SkyPilot config
~/.sky/config.yaml
to the path of the Python package that implements the policy.
For example:
admin_policy: mypackage.subpackage.MyPolicy
Hint
SkyPilot loads the policy from the given package in the same Python environment. You can test the existence of the policy by running:
python -c "from mypackage.subpackage import MyPolicy"
Admin-Side#
An admin can distribute the Python package to users with a pre-defined policy. The
policy should implement the sky.AdminPolicy
interface:
class AdminPolicy:
"""Abstract interface of an admin-defined policy for all user requests.
Admins can implement a subclass of AdminPolicy with the following signature:
import sky
class SkyPilotPolicyV1(sky.AdminPolicy):
def validate_and_mutate(user_request: UserRequest) -> MutatedUserRequest:
...
return MutatedUserRequest(task=..., skypilot_config=...)
The policy can mutate both task and skypilot_config. Admins then distribute
a simple module that contains this implementation, installable in a way
that it can be imported by users from the same Python environment where
SkyPilot is running.
Users can register a subclass of AdminPolicy in the SkyPilot config file
under the key 'admin_policy', e.g.
admin_policy: my_package.SkyPilotPolicyV1
"""
@classmethod
@abc.abstractmethod
def validate_and_mutate(cls,
user_request: UserRequest) -> MutatedUserRequest:
"""Validates and mutates the user request and returns mutated request.
Args:
user_request: The user request to validate and mutate.
UserRequest contains (sky.Task, sky.Config)
Returns:
MutatedUserRequest: The mutated user request.
Raises:
Exception to throw if the user request failed the validation.
"""
raise NotImplementedError(
'Your policy must implement validate_and_mutate')
Your custom admin policy should look like this:
import sky
class MyPolicy(sky.AdminPolicy):
@classmethod
def validate_and_mutate(cls, user_request: sky.UserRequest) -> sky.MutatedUserRequest:
# Logic for validate and modify user requests.
...
return sky.MutatedUserRequest(user_request.task,
user_request.skypilot_config)
UserRequest
and MutatedUserRequest
are defined as follows (see source code for more details):
@dataclasses.dataclass
class UserRequest:
"""A user request.
A "user request" is defined as a `sky launch / exec` command or its API
equivalent.
`sky jobs launch / serve up` involves multiple launch requests, including
the launch of controller and clusters for a job (which can have multiple
tasks if it is a pipeline) or service replicas. Each launch is a separate
request.
This class wraps the underlying task, the global skypilot config used to run
a task, and the request options.
Args:
task: User specified task.
skypilot_config: Global skypilot config to be used in this request.
request_options: Request options. It is None for jobs and services.
"""
task: 'sky.Task'
skypilot_config: 'sky.Config'
request_options: Optional['RequestOptions'] = None
@dataclasses.dataclass
class MutatedUserRequest:
task: 'sky.Task'
skypilot_config: 'sky.Config'
In other words, an AdminPolicy
can mutate any fields of a user request, including
the task and the global skypilot config,
giving admins a lot of flexibility to control user’s SkyPilot usage.
An AdminPolicy
can be used to both validate and mutate user requests. If
a request should be rejected, the policy should raise an exception.
The sky.Config
and sky.RequestOptions
classes are defined as follows:
class Config(Dict[str, Any]):
"""SkyPilot config that supports setting/getting values with nested keys."""
def get_nested(self,
keys: Tuple[str, ...],
default_value: Any,
override_configs: Optional[Dict[str, Any]] = None) -> Any:
"""Gets a nested key.
If any key is not found, or any intermediate key does not point to a
dict value, returns 'default_value'.
Args:
keys: A tuple of strings representing the nested keys.
default_value: The default value to return if the key is not found.
override_configs: A dict of override configs with the same schema as
the config file, but only containing the keys to override.
Returns:
The value of the nested key, or 'default_value' if not found.
"""
config = copy.deepcopy(self)
if override_configs is not None:
config = _recursive_update(config, override_configs)
return _get_nested(config, keys, default_value)
def set_nested(self, keys: Tuple[str, ...], value: Any) -> None:
"""In-place sets a nested key to value.
Like get_nested(), if any key is not found, this will not raise an
error.
"""
override = {}
for i, key in enumerate(reversed(keys)):
if i == 0:
override = {key: value}
else:
override = {key: override}
_recursive_update(self, override)
@classmethod
def from_dict(cls, config: Optional[Dict[str, Any]]) -> 'Config':
if config is None:
return cls()
return cls(**config)
@dataclasses.dataclass
class RequestOptions:
"""Request options for admin policy.
Args:
cluster_name: Name of the cluster to create/reuse. It is None if not
specified by the user.
idle_minutes_to_autostop: Autostop setting requested by a user. The
cluster will be set to autostop after this many minutes of idleness.
down: If true, use autodown rather than autostop.
dryrun: Is the request a dryrun?
"""
cluster_name: Optional[str]
idle_minutes_to_autostop: Optional[int]
down: bool
dryrun: bool
Note
The sky.AdminPolicy
should be idempotent. In other words, it should be safe to apply the policy multiple times to the same user request.
Example Policies#
We have provided a few example policies in examples/admin_policy/example_policy. You can test these policies by installing the example policy package in your Python environment.
git clone https://github.com/skypilot-org/skypilot.git
cd skypilot
pip install examples/admin_policy/example_policy
Reject All#
class RejectAllPolicy(sky.AdminPolicy):
"""Example policy: rejects all user requests."""
@classmethod
def validate_and_mutate(
cls, user_request: sky.UserRequest) -> sky.MutatedUserRequest:
"""Rejects all user requests."""
raise RuntimeError('Reject all policy')
admin_policy: example_policy.RejectAllPolicy
Add Labels for all Tasks on Kubernetes#
class AddLabelsPolicy(sky.AdminPolicy):
"""Example policy: adds a kubernetes label for skypilot_config."""
@classmethod
def validate_and_mutate(
cls, user_request: sky.UserRequest) -> sky.MutatedUserRequest:
config = user_request.skypilot_config
labels = config.get_nested(('kubernetes', 'custom_metadata', 'labels'),
{})
labels['app'] = 'skypilot'
config.set_nested(('kubernetes', 'custom_metadata', 'labels'), labels)
return sky.MutatedUserRequest(user_request.task, config)
admin_policy: example_policy.AddLabelsPolicy
Always Disable Public IP for AWS Tasks#
class DisablePublicIpPolicy(sky.AdminPolicy):
"""Example policy: disables public IP for all AWS tasks."""
@classmethod
def validate_and_mutate(
cls, user_request: sky.UserRequest) -> sky.MutatedUserRequest:
config = user_request.skypilot_config
config.set_nested(('aws', 'use_internal_ip'), True)
if config.get_nested(('aws', 'vpc_name'), None) is None:
# If no VPC name is specified, it is likely a mistake. We should
# reject the request
raise RuntimeError('VPC name should be set. Check organization '
'wiki for more information.')
return sky.MutatedUserRequest(user_request.task, config)
admin_policy: example_policy.DisablePublicIpPolicy
Use Spot for all GPU Tasks#
class UseSpotForGpuPolicy(sky.AdminPolicy):
"""Example policy: use spot instances for all GPU tasks."""
@classmethod
def validate_and_mutate(
cls, user_request: sky.UserRequest) -> sky.MutatedUserRequest:
"""Sets use_spot to True for all GPU tasks."""
task = user_request.task
new_resources = []
for r in task.resources:
if r.accelerators:
new_resources.append(r.copy(use_spot=True))
else:
new_resources.append(r)
task.set_resources(type(task.resources)(new_resources))
return sky.MutatedUserRequest(
task=task, skypilot_config=user_request.skypilot_config)
admin_policy: example_policy.UseSpotForGpuPolicy
Enforce Autostop for all Tasks#
class EnforceAutostopPolicy(sky.AdminPolicy):
"""Example policy: enforce autostop for all tasks."""
@classmethod
def validate_and_mutate(
cls, user_request: sky.UserRequest) -> sky.MutatedUserRequest:
"""Enforces autostop for all tasks.
Note that with this policy enforced, users can still change the autostop
setting for an existing cluster by using `sky autostop`.
Since we refresh the cluster status with `sky.status` whenever this
policy is applied, we should expect a few seconds latency when a user
run a request.
"""
request_options = user_request.request_options
# Request options is None when a task is executed with `jobs launch` or
# `sky serve up`.
if request_options is None:
return sky.MutatedUserRequest(
task=user_request.task,
skypilot_config=user_request.skypilot_config)
# Get the cluster record to operate on.
cluster_name = request_options.cluster_name
cluster_records = []
if cluster_name is not None:
cluster_records = sky.status(cluster_name, refresh=True)
# Check if the user request should specify autostop settings.
need_autostop = False
if not cluster_records:
# Cluster does not exist
need_autostop = True
elif cluster_records[0]['status'] == sky.ClusterStatus.STOPPED:
# Cluster is stopped
need_autostop = True
elif cluster_records[0]['autostop'] < 0:
# Cluster is running but autostop is not set
need_autostop = True
# Check if the user request is setting autostop settings.
is_setting_autostop = False
idle_minutes_to_autostop = request_options.idle_minutes_to_autostop
is_setting_autostop = (idle_minutes_to_autostop is not None and
idle_minutes_to_autostop >= 0)
# If the cluster requires autostop but the user request is not setting
# autostop settings, raise an error.
if need_autostop and not is_setting_autostop:
raise RuntimeError('Autostop/down must be set for all clusters.')
return sky.MutatedUserRequest(
task=user_request.task,
skypilot_config=user_request.skypilot_config)
admin_policy: example_policy.EnforceAutostopPolicy
Dynamically Update Kubernetes Contexts to Use#
class DynamicKubernetesContextsUpdatePolicy(sky.AdminPolicy):
"""Example policy: update the kubernetes context to use."""
@classmethod
def validate_and_mutate(
cls, user_request: sky.UserRequest) -> sky.MutatedUserRequest:
"""Updates the kubernetes context to use."""
# Append any new kubernetes clusters in local kubeconfig. An example
# implementation of this method can be:
# 1. Query an organization's internal Kubernetes cluster registry,
# which can be some internal API, or a secret vault.
# 2. Append the new credentials to the local kubeconfig.
update_current_kubernetes_clusters_from_registry()
# Get the allowed contexts for the user. Similarly, it can retrieve
# the latest allowed contexts from an organization's internal API.
allowed_contexts = get_allowed_contexts()
# Update the kubernetes allowed contexts in skypilot config.
config = user_request.skypilot_config
config.set_nested(('kubernetes', 'allowed_contexts'), allowed_contexts)
return sky.MutatedUserRequest(task=user_request.task,
skypilot_config=config)
admin_policy: example_policy.DynamicKubernetesContextsUpdatePolicy