Running KFP with Codeflare SDK #131

Open
blublinsky opened this issue May 26, 2023 · 2 comments
blublinsky commented May 26, 2023

We finally made it work, but I do not think this approach is sustainable for the wider population. Here is what we had to do:

  1. Because the Codeflare SDK relies on the `oc` CLI, we had to create a new image for KFP execution:

```dockerfile
FROM python:3.8.16-slim-bullseye

RUN apt update && apt install -y wget

# install oc
RUN mkdir /opt/oc
RUN wget -O /opt/oc/release.tar.gz https://mirror.openshift.com/pub/openshift-v4/x86_64/clients/ocp/stable-4.11/openshift-client-linux-4.11.40.tar.gz
RUN tar -xzvf /opt/oc/release.tar.gz -C /opt/oc/ && \
    mv /opt/oc/oc /usr/bin/ && \
    rm -rf /opt/oc

# install libraries
RUN pip install --upgrade pip && pip install codeflare-sdk
RUN pip install "ray[default]"==2.1.0

# Allow writes for YAML files
RUN chmod -R 777 /tmp
```

Note that we also had to create a writable directory for saving intermediate YAML files.
  2. Because the Codeflare SDK directly manipulates MCAD, RayCluster, and OpenShift Route resources, we had to add additional permissions to the `pipeline-runner` service account, which should eventually go into the KFDef. Here are the files:

```yaml
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: kfp-openshift-route
  namespace: odh-applications
rules:
  - apiGroups: ["route.openshift.io"]
    resources: ["routes", "routes/custom-host"]
    verbs: ["create", "get", "list", "patch", "delete"]
---
kind: RoleBinding
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: pipeline-runner-binding-mcad
  namespace: odh-applications
subjects:
  - kind: ServiceAccount
    name: pipeline-runner
    namespace: odh-applications
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: mcad-mcad-controller-role
---
kind: RoleBinding
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: pipeline-runner-binding-ray
  namespace: odh-applications
subjects:
  - kind: ServiceAccount
    name: pipeline-runner
    namespace: odh-applications
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: mcad-controller-ray-clusterrole
---
kind: RoleBinding
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: pipeline-runner-binding-route
  namespace: odh-applications
subjects:
  - kind: ServiceAccount
    name: pipeline-runner
    namespace: odh-applications
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: Role
  name: kfp-openshift-route
```
  3. To make sure we have permission to write files, we had to redirect the working directory. Here is the complete code:
```python
import kfp.components as comp
from kfp_tekton.compiler import TektonCompiler
import kfp.dsl as dsl
from kubernetes import client as k8s_client

# execute ray pipeline
def execute_ray_pipeline(token: str,                 # token to authenticate to cluster
                         name: str,                  # name of Ray cluster
                         min_worker: str,            # min number of workers
                         max_worker: str,            # max number of workers
                         min_cpus: str,              # min cpus per worker
                         max_cpus: str,              # max cpus per worker
                         min_memory: str,            # min memory per worker
                         max_memory: str,            # max memory per worker
                         image: str = "ghcr.io/foundation-model-stack/base:ray2.1.0-py38-gpu-pytorch1.12.0cu116-20221213-193103"
                         ):
    # Ray code - basically hello world
    import ray

    @ray.remote
    class Counter:
        def __init__(self):
            self.counter = 0

        def inc(self):
            self.counter += 1

        def get_counter(self):
            return self.counter

    # Import pieces from codeflare-sdk
    from codeflare_sdk.cluster.cluster import Cluster, ClusterConfiguration
    from codeflare_sdk.cluster.auth import TokenAuthentication
    import os

    # get current namespace
    ns = os.getenv('NAMESPACE', 'default')
    # change the current directory to ensure that we can write
    os.chdir("/tmp")

    print(f"Executing in namespace {ns}, current working directory is {os.getcwd()}")

    # Create authentication object for oc user permissions
    auth = TokenAuthentication(
        token=token,
        server="https://kubernetes.default:443",
        skip_tls=True
    )
    try:
        auth.login()
    except Exception as e:
        print(f"Failed to log into OpenShift cluster, error {e}. Please check token/server values provided")
        os.abort()
    print("successfully logged in")

    # Create and configure our cluster object (and appwrapper)
    cluster = Cluster(ClusterConfiguration(
        name=name,
        namespace=ns,
        min_worker=int(min_worker),
        max_worker=int(max_worker),
        min_cpus=int(min_cpus),
        max_cpus=int(max_cpus),
        min_memory=int(min_memory),
        max_memory=int(max_memory),
        gpu=0,
        image=image,
        instascale=False
    ))
    print(f"Configuration for Ray cluster {name} in namespace {ns} is created")

    # bring up the cluster
    cluster.up()
    print(f"Creating Ray cluster {name} in namespace {ns}...")

    # and wait for it to be up
    cluster.wait_ready()
    rc = cluster.details(print_to_console=False)
    print("Ray cluster is ready")
    print(rc)

    # Get cluster connection points
    ray_cluster_uri = cluster.cluster_uri()
    print(f"Ray cluster is at {ray_cluster_uri}")

    # Connect to the cluster
    try:
        ray.init(address=f'{ray_cluster_uri}', ignore_reinit_error=True)
    except Exception as e:
        print(f"Failed to connect to Ray cluster, error {e}")
        os.abort()
    print("connected to Ray cluster")

    # execute Ray function
    print("Running Hello world")
    counter = Counter.remote()

    for _ in range(5):
        ray.get(counter.inc.remote())
        print(ray.get(counter.get_counter.remote()))

    # delete cluster
    print("All done. Cleaning up")
    cluster.down()

# components
ray_pipeline_op = comp.func_to_container_op(
    func=execute_ray_pipeline,
    base_image="blublinsky1/kfp-oc:0.0.2"
)

# Pipeline to invoke execution on remote resource
@dsl.pipeline(
    name='simple-ray-pipeline',
    description='Pipeline to show how to use codeflare sdk to create Ray cluster and run jobs'
)
def simple_ray_pipeline(token: str,                 # token to authenticate to cluster
                        name: str = "kfp-ray",      # name of Ray cluster
                        min_worker: str = "2",      # min number of workers
                        max_worker: str = "2",      # max number of workers
                        min_cpus: str = "2",        # min cpus per worker
                        max_cpus: str = "2",        # max cpus per worker
                        min_memory: str = "4",      # min memory per worker
                        max_memory: str = "4",      # max memory per worker
                        image: str = "ghcr.io/foundation-model-stack/base:ray2.1.0-py38-gpu-pytorch1.12.0cu116-20221213-193103"
                        ):

    # invoke pipeline
    pipeline = ray_pipeline_op(token, name, min_worker, max_worker, min_cpus, max_cpus, min_memory,
                               max_memory, image)
    pipeline.execution_options.caching_strategy.max_cache_staleness = "P0D"
    pipeline.add_env_variable(k8s_client.V1EnvVar(
        name='NAMESPACE',
        value_from=k8s_client.V1EnvVarSource(
            field_ref=k8s_client.V1ObjectFieldSelector(field_path="metadata.namespace"))))


if __name__ == '__main__':
    # Compiling the pipeline
    TektonCompiler().compile(simple_ray_pipeline, __file__.replace('.py', '.yaml'))
```

After all this, the workflow works correctly.
We also need to add an implementation based on the Ray Job Submission REST API: https://docs.ray.io/en/latest/cluster/running-applications/job-submission/rest.html
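A minimal sketch of what that REST-based path could look like, using only the standard library. This is not the implementation, just an illustration of the documented `POST /api/jobs/` endpoint; `dashboard_url` and the helper names here are hypothetical and would be wired to the Ray dashboard route of the cluster created above.

```python
import json
import urllib.request

def build_job_payload(entrypoint, runtime_env=None):
    """Build the JSON body for POST /api/jobs/ (Ray Job Submission REST API)."""
    payload = {"entrypoint": entrypoint}
    if runtime_env:
        payload["runtime_env"] = runtime_env
    return payload

def submit_job(dashboard_url, entrypoint, runtime_env=None):
    """POST a job to the Ray dashboard; returns the parsed JSON response."""
    body = json.dumps(build_job_payload(entrypoint, runtime_env)).encode()
    req = urllib.request.Request(
        f"{dashboard_url}/api/jobs/",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(req) as resp:
        return json.load(resp)

if __name__ == "__main__":
    # Only build (don't send) a payload here, since no cluster is assumed.
    payload = build_job_payload("python my_ray_script.py")
    print(payload)
```

This would remove the need for the pipeline container to run `ray.init()` against the cluster directly; job status could then be polled via `GET /api/jobs/{submission_id}` on the same dashboard route.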

@yuanchi2807

Since a component is already running in the cluster, do we still need TokenAuthentication?
Can one invoke load_incluster_config instead?

```python
from kubernetes import client, config
loadedconf = config.load_incluster_config()
```

If TokenAuthentication is required, how does one update a token in a scheduled operational environment? Does it need to be compiled in also?

```python
# Create authentication object for oc user permissions
auth = TokenAuthentication(
    token=token,
    server="https://kubernetes.default:443",
    skip_tls=True
)
try:
    auth.login()
```


roytman commented Jun 24, 2023

The following works:

```python
# Create authentication object using the pod's service account token
with open("/var/run/secrets/kubernetes.io/serviceaccount/token", "r") as file:
    token = file.read().rstrip()
auth = TokenAuthentication(token=token, server="https://kubernetes.default:443", skip_tls=True)
try:
    auth.login()
```
