diff --git a/.gitignore b/.gitignore
index b6e4761..ce512a6 100644
--- a/.gitignore
+++ b/.gitignore
@@ -3,6 +3,9 @@
 __pycache__/
 *.py[cod]
 *$py.class
+
+data
+
 # C extensions
 *.so
diff --git a/README.md b/README.md
index 20ec6a1..ca06456 100644
--- a/README.md
+++ b/README.md
@@ -6,7 +6,7 @@
 A repository showcasing examples of using [Hub](https://github.com/pytorch/pytorch)
 
-  - [Examples go here](example)
+  - [Distributed processing with Ray](examples/ray)
 
 ## Getting Started with Hub 🚀
diff --git a/examples/ray/README.md b/examples/ray/README.md
new file mode 100644
index 0000000..d8c0efb
--- /dev/null
+++ b/examples/ray/README.md
@@ -0,0 +1,48 @@
+# Distributed Processing with Ray
+
+
+## Execute locally
+Install Hub and Ray, then run the script locally to create a dataset with `num_samples` samples:
+```
+pip3 install -r requirements.txt
+python3 transform.py --num_workers 2 --ds_out ./tmp/cars --num_samples 1000
+```
+
+
+## Execute on a cluster
+#### 1. Start the cluster
+This requires AWS credentials. If you skip this step, the script will start a Ray cluster on your local machine instead. The cluster can be further customized in `cluster.yaml`.
+```
+ray up ./cluster.yaml
+```
+
+#### 2. Execute the code on the head node
+```
+ray exec ./cluster.yaml "python3 ~/hub/transform.py --num_workers 2 --ds_out s3://bucket/dataset_name --num_samples 1000"
+```
+Increase `--num_workers` to 6 once all worker nodes are up.
+
+
+#### 3. Shut down the cluster
+```
+ray down ./cluster.yaml
+```
+
+## Notes
+
+* To monitor workers, utilization, and jobs, use the dashboard:
+```
+ray dashboard ./cluster.yaml
+```
+
+* To update the code locally and sync it to the cluster:
+
+```
+ray rsync-up ./cluster.yaml
+```
+
+* To attach to the head node and run the script manually:
+```
+ray attach ./cluster.yaml
+> python3 ~/hub/transform.py --num_workers 2
+```
diff --git a/examples/ray/cluster.yaml b/examples/ray/cluster.yaml
new file mode 100644
index 0000000..4283873
--- /dev/null
+++ b/examples/ray/cluster.yaml
@@ -0,0 +1,171 @@
+
+# A unique identifier for the head node and workers of this cluster.
+cluster_name: hub
+
+# The maximum number of worker nodes to launch in addition to the head
+# node.
+max_workers: 2
+
+# The autoscaler will scale up the cluster faster with higher upscaling speed.
+# E.g., if the task requires adding more nodes then autoscaler will gradually
+# scale up the cluster in chunks of upscaling_speed*currently_running_nodes.
+# This number should be > 0.
+upscaling_speed: 1.0
+
+# This executes all commands on all nodes in the docker container,
+# and opens all the necessary ports to support the Ray cluster.
+# Empty string means disabled.
+# docker:
+#     image: "rayproject/ray-ml:latest-gpu" # You can change this to latest-cpu if you don't need GPU support and want a faster startup
+    # image: rayproject/ray:latest-gpu # use this one if you don't need ML dependencies, it's faster to pull
+#     container_name: "ray_container"
+    # If true, pulls latest version of image. Otherwise, `docker run` will only pull the image
+    # if no cached version is present.
+#     pull_before_run: True
+#     run_options:  # Extra options to pass into "docker run"
+#         - --ulimit nofile=65536:65536
+
+    # Example of running a GPU head with CPU workers
+    # head_image: "rayproject/ray-ml:latest-gpu"
+    # Allow Ray to automatically detect GPUs
+
+    # worker_image: "rayproject/ray-ml:latest-cpu"
+    # worker_run_options: []
+
+# If a node is idle for this many minutes, it will be removed.
+idle_timeout_minutes: 5
+
+# Cloud-provider specific configuration.
+provider:
+    type: aws
+    region: us-west-2
+    # Availability zone(s), comma-separated, that nodes may be launched in.
+    # Nodes are currently spread between zones by a round-robin approach,
+    # however this implementation detail should not be relied upon.
+    # availability_zone: us-west-2a,us-west-2b
+    # Whether to allow node reuse. If set to False, nodes will be terminated
+    # instead of stopped.
+    cache_stopped_nodes: True # If not present, the default is True.
+
+# How Ray will authenticate with newly launched nodes.
+auth:
+    ssh_user: ubuntu
+
+# By default Ray creates a new private keypair, but you can also use your own.
+# If you do so, make sure to also set "KeyName" in the head and worker node
+# configurations below.
+# ssh_private_key: /path/to/your/key.pem
+
+# Tell the autoscaler the allowed node types and the resources they provide.
+# The key is the name of the node type, which is just for debugging purposes.
+# The node config specifies the launch config and physical instance type.
+available_node_types:
+    ray.head.default:
+        # The node type's CPU and GPU resources are auto-detected based on AWS instance type.
+        # If desired, you can override the autodetected CPU and GPU resources advertised to the autoscaler.
+        # You can also set custom resources.
+        # For example, to mark a node type as having 1 CPU, 1 GPU, and 5 units of a resource called "custom", set
+        # resources: {"CPU": 1, "GPU": 1, "custom": 5}
+        resources: {}
+        # Provider-specific config for this node type, e.g. instance type. By default
+        # Ray will auto-configure unspecified fields such as SubnetId and KeyName.
+        # For more documentation on available fields, see:
+        # http://boto3.readthedocs.io/en/latest/reference/services/ec2.html#EC2.ServiceResource.create_instances
+        node_config:
+            InstanceType: m5.large
+            ImageId: ami-0a2363a9cff180a64 # Deep Learning AMI (Ubuntu) Version 30
+            # You can provision additional disk space with a conf as follows
+            BlockDeviceMappings:
+                - DeviceName: /dev/sda1
+                  Ebs:
+                      VolumeSize: 100
+            # Additional options in the boto docs.
+    ray.worker.default:
+        # The minimum number of worker nodes of this type to launch.
+        # This number should be >= 0.
+        min_workers: 2
+        # The maximum number of worker nodes of this type to launch.
+        # This takes precedence over min_workers.
+        max_workers: 2
+        # The node type's CPU and GPU resources are auto-detected based on AWS instance type.
+        # If desired, you can override the autodetected CPU and GPU resources advertised to the autoscaler.
+        # You can also set custom resources.
+        # For example, to mark a node type as having 1 CPU, 1 GPU, and 5 units of a resource called "custom", set
+        # resources: {"CPU": 1, "GPU": 1, "custom": 5}
+        resources: {}
+        # Provider-specific config for this node type, e.g. instance type. By default
+        # Ray will auto-configure unspecified fields such as SubnetId and KeyName.
+        # For more documentation on available fields, see:
+        # http://boto3.readthedocs.io/en/latest/reference/services/ec2.html#EC2.ServiceResource.create_instances
+        node_config:
+            InstanceType: m5.large
+            ImageId: ami-0a2363a9cff180a64 # Deep Learning AMI (Ubuntu) Version 30
+            # Run workers on spot by default. Comment this out to use on-demand.
+            # InstanceMarketOptions:
+            #     MarketType: spot
+            # Additional options can be found in the boto docs, e.g.
+            #     SpotOptions:
+            #         MaxPrice: MAX_HOURLY_PRICE
+            # Additional options in the boto docs.
+ + +# Specify the node type of the head node (as configured above). +head_node_type: ray.head.default + +# Files or directories to copy to the head and worker nodes. The format is a +# dictionary from REMOTE_PATH: LOCAL_PATH, e.g. +file_mounts: { + "~/hub": ".", +} + +# Files or directories to copy from the head node to the worker nodes. The format is a +# list of paths. The same path on the head node will be copied to the worker node. +# This behavior is a subset of the file_mounts behavior. In the vast majority of cases +# you should just use file_mounts. Only use this if you know what you're doing! +cluster_synced_files: [] + +# Whether changes to directories in file_mounts or cluster_synced_files in the head node +# should sync to the worker node continuously +file_mounts_sync_continuously: True + +# Patterns for files to exclude when running rsync up or rsync down +rsync_exclude: + - "**/.git" + - "**/.git/**" + +# Pattern files to use for filtering out files when running rsync up or rsync down. The file is searched for +# in the source directory and recursively through all subdirectories. For example, if .gitignore is provided +# as a value, the behavior will match git's behavior for finding and using .gitignore files. +rsync_filter: + - ".gitignore" + +# List of commands that will be run before `setup_commands`. If docker is +# enabled, these commands will run outside the container and before docker +# is setup. +initialization_commands: [] + +# List of shell commands to run to set up nodes. +setup_commands: + - cd ~/hub && pip3 install -r requirements.txt + +# Custom commands that will be run on the head node after common setup. +head_setup_commands: [] + +# Custom commands that will be run on worker nodes after common setup. +worker_setup_commands: [] + +# Command to start ray on the head node. You don't need to change this. +head_start_ray_commands: + - ray stop + - ray start --head --port=6379 --object-manager-port=8076 --autoscaling-config=~/ray_bootstrap_config.yaml + +# Command to start ray on worker nodes. You don't need to change this. 
+worker_start_ray_commands:
+    - ray stop
+    - ray start --address=$RAY_HEAD_IP:6379 --object-manager-port=8076
+
+head_node: {}
+worker_nodes: {}
\ No newline at end of file
diff --git a/examples/ray/requirements.txt b/examples/ray/requirements.txt
new file mode 100644
index 0000000..5bd8a89
--- /dev/null
+++ b/examples/ray/requirements.txt
@@ -0,0 +1,2 @@
+ray[default]==1.6
+git+https://github.com/activeloopai/Hub.git@ray_compute
\ No newline at end of file
diff --git a/examples/ray/transform.py b/examples/ray/transform.py
new file mode 100644
index 0000000..6b1e04a
--- /dev/null
+++ b/examples/ray/transform.py
@@ -0,0 +1,87 @@
+import time
+import argparse
+import tqdm
+import numpy as np
+from PIL import Image
+
+import hub
+
+NUM_SAMPLES = 500
+NUM_WORKERS = 2
+DS_OUT_PATH = "~/data/cars_out"  # "s3://bucket/dataset_name"
+
+parser = argparse.ArgumentParser(description="Create a Hub dataset with a Ray-distributed transform")
+parser.add_argument(
+    "--num_samples",
+    type=int,
+    default=NUM_SAMPLES,
+    metavar="S",
+    help="number of samples the dataset should have",
+)
+parser.add_argument(
+    "--ds_out",
+    type=str,
+    default=DS_OUT_PATH,
+    metavar="O",
+    help="path where the output dataset is created",
+)
+parser.add_argument(
+    "--num_workers",
+    type=int,
+    default=NUM_WORKERS,
+    metavar="W",
+    help="number of workers to allocate",
+)
+
+args = parser.parse_args()
+
+
+def define_dataset(path: str) -> hub.Dataset:
+    """Define the dataset"""
+    ds = hub.empty(path, overwrite=True)
+
+    ds.create_tensor("labels", htype="class_label")
+    ds.create_tensor("images", htype="image", sample_compression="jpeg")
+    ds.create_tensor("images_downsampled")
+
+    # Reuse the htype and compression of the images tensor and set a smaller max chunk size
+    ds["images_downsampled"].meta.htype = ds["images"].meta.htype
+    ds["images_downsampled"].meta.sample_compression = ds[
+        "images"
+    ].meta.sample_compression
+    ds["images_downsampled"].meta.max_chunk_size = 1 * 1024 * 1024
+
+    return ds
+
+
+# Define the remote compute
+@hub.compute
+def downsample(index, samples_out):
+    """Generate a random image, downsample it and append image, downsampled image and label to samples_out"""
+    array = (255 * np.random.random((100, 100, 3))).astype(np.uint8)
+    img = Image.fromarray(array)
+    max_d = max(img.size[0], img.size[1])
+    min_s = min(100, max_d)
+    ratio = max_d // min_s
+    img_downsampled = img.resize((img.size[0] // ratio, img.size[1] // ratio))
+    array_downsampled = np.array(img_downsampled)
+    samples_out.images.append(array)
+    samples_out.images_downsampled.append(array_downsampled)
+    samples_out.labels.append(index)
+
+
+if __name__ == "__main__":
+
+    # Create the empty output dataset (filled with random images by the transform below)
+    ds_out = define_dataset(args.ds_out)
+
+    # Run the distributed computation
+    t1 = time.time()
+    downsample().eval(
+        list(range(args.num_samples)),
+        ds_out,
+        num_workers=args.num_workers,
+        scheduler="ray",
+    )
+    t2 = time.time()
+    print(f"The processing took {t2 - t1:.2f} seconds")
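
After the transform finishes, it is worth spot-checking that the dataset was written correctly before shutting the cluster down. The snippet below is a minimal, hypothetical verification sketch and is not part of the diff above; it assumes the Hub 2.x API that `transform.py` already uses, and the `--ds` argument is whichever path was passed as `--ds_out` (the local default or the `s3://` path).

```python
# verify.py -- hypothetical helper, not included in the diff above.
# Spot-checks a dataset produced by transform.py (assumes the Hub 2.x API).
import argparse

import hub

parser = argparse.ArgumentParser(description="Spot-check a dataset created by transform.py")
parser.add_argument(
    "--ds",
    type=str,
    default="~/data/cars_out",
    help="dataset path, e.g. ./tmp/cars or s3://bucket/dataset_name",
)
args = parser.parse_args()

# Load the dataset and print a few basic facts about it.
ds = hub.load(args.ds)
print(f"samples: {len(ds)}")
print(f"tensors: {list(ds.tensors)}")
print(f"first image shape: {ds.images[0].numpy().shape}")
print(f"first downsampled shape: {ds.images_downsampled[0].numpy().shape}")
print(f"first label: {ds.labels[0].numpy()}")
```

On a cluster, the same check can be run from the head node, for example via `ray attach ./cluster.yaml` followed by `python3 verify.py --ds s3://bucket/dataset_name`.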