From 830283005f0bcc8cd915866b7485a721ead1989a Mon Sep 17 00:00:00 2001
From: "david.buniatyan@gmail.com"
Date: Wed, 22 Sep 2021 16:25:35 -0700
Subject: [PATCH 1/9] data folder ignore

---
 .gitignore | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/.gitignore b/.gitignore
index b6e4761..1418e97 100644
--- a/.gitignore
+++ b/.gitignore
@@ -3,6 +3,9 @@ __pycache__/
 *.py[cod]
 *$py.class
 
+
+./data/
+
 # C extensions
 *.so
 

From 28816bf386cfed17595d7909beda0ef2e2879556 Mon Sep 17 00:00:00 2001
From: "david.buniatyan@gmail.com"
Date: Wed, 22 Sep 2021 16:52:09 -0700
Subject: [PATCH 2/9] hub+ray basic example

---
 .gitignore                    |   2 +-
 examples/ray/README.md        |  25 +++++
 examples/ray/cluster.yaml     | 173 ++++++++++++++++++++++++++++++++++
 examples/ray/requirements.txt |   1 +
 examples/ray/transform.py     |  62 ++++++++++++
 5 files changed, 262 insertions(+), 1 deletion(-)
 create mode 100644 examples/ray/README.md
 create mode 100644 examples/ray/cluster.yaml
 create mode 100644 examples/ray/requirements.txt
 create mode 100644 examples/ray/transform.py

diff --git a/.gitignore b/.gitignore
index 1418e97..ce512a6 100644
--- a/.gitignore
+++ b/.gitignore
@@ -4,7 +4,7 @@ __pycache__/
 *$py.class
 
 
-./data/
+data
 
 # C extensions
 *.so

diff --git a/examples/ray/README.md b/examples/ray/README.md
new file mode 100644
index 0000000..c1e5c61
--- /dev/null
+++ b/examples/ray/README.md
@@ -0,0 +1,25 @@
+## Distributed processing with Ray
+
+1. Install ray on your local machine
+```
+pip3 install ray==1.6
+```
+
+2. Start a ray cluster (Optional)
+* Optional, if you skip it will start a local ray cluster
+
+Start the cluster and attach to it
+```
+ray up ./cluster.yaml
+ray attach ./cluster.yaml
+```
+
+3. Execute the code
+```
+python3 transform.py
+```
+
+4. Once you are done please shut down the cluster.
+```
+ray down ./cluster.yaml
+```
\ No newline at end of file

diff --git a/examples/ray/cluster.yaml b/examples/ray/cluster.yaml
new file mode 100644
index 0000000..85349ce
--- /dev/null
+++ b/examples/ray/cluster.yaml
@@ -0,0 +1,173 @@
+
+# A unique identifier for the head node and workers of this cluster.
+cluster_name: hub
+
+# The maximum number of worker nodes to launch in addition to the head
+# node.
+max_workers: 2
+
+# The autoscaler will scale up the cluster faster with higher upscaling speed.
+# E.g., if the task requires adding more nodes then autoscaler will gradually
+# scale up the cluster in chunks of upscaling_speed*currently_running_nodes.
+# This number should be > 0.
+upscaling_speed: 1.0
+
+# This executes all commands on all nodes in the docker container,
+# and opens all the necessary ports to support the Ray cluster.
+# Empty string means disabled.
+docker:
+    image: "rayproject/ray-ml:latest-gpu" # You can change this to latest-cpu if you don't need GPU support and want a faster startup
+    # image: rayproject/ray:latest-gpu # use this one if you don't need ML dependencies, it's faster to pull
+    container_name: "ray_container"
+    # If true, pulls latest version of image. Otherwise, `docker run` will only pull the image
+    # if no cached version is present.
+    pull_before_run: True
+    run_options: # Extra options to pass into "docker run"
+        - --ulimit nofile=65536:65536
+
+    # Example of running a GPU head with CPU workers
+    # head_image: "rayproject/ray-ml:latest-gpu"
+    # Allow Ray to automatically detect GPUs
+
+    # worker_image: "rayproject/ray-ml:latest-cpu"
+    # worker_run_options: []
+
+# If a node is idle for this many minutes, it will be removed.
+idle_timeout_minutes: 5
+
+# Cloud-provider specific configuration.
+provider:
+    type: aws
+    region: us-west-2
+    # Availability zone(s), comma-separated, that nodes may be launched in.
+    # Nodes are currently spread between zones by a round-robin approach,
+    # however this implementation detail should not be relied upon.
+    availability_zone: us-west-2a,us-west-2b
+    # Whether to allow node reuse. If set to False, nodes will be terminated
+    # instead of stopped.
+    cache_stopped_nodes: True # If not present, the default is True.
+
+# How Ray will authenticate with newly launched nodes.
+auth:
+    ssh_user: ubuntu
+
+# If a node is idle for this many minutes, it will be removed.
+idle_timeout_minutes: 5
+
+
+# How Ray will authenticate with newly launched nodes.
+auth:
+    ssh_user: ubuntu
+
+# By default Ray creates a new private keypair, but you can also use your own.
+# If you do so, make sure to also set "KeyName" in the head and worker node
+# configurations below.
+# ssh_private_key: /path/to/your/key.pem
+
+# Tell the autoscaler the allowed node types and the resources they provide.
+# The key is the name of the node type, which is just for debugging purposes.
+# The node config specifies the launch config and physical instance type.
+available_node_types:
+    ray.head.default:
+        # The node type's CPU and GPU resources are auto-detected based on AWS instance type.
+        # If desired, you can override the autodetected CPU and GPU resources advertised to the autoscaler.
+        # You can also set custom resources.
+        # For example, to mark a node type as having 1 CPU, 1 GPU, and 5 units of a resource called "custom", set
+        # resources: {"CPU": 1, "GPU": 1, "custom": 5}
+        resources: {}
+        # Provider-specific config for this node type, e.g. instance type. By default
+        # Ray will auto-configure unspecified fields such as SubnetId and KeyName.
+        # For more documentation on available fields, see:
+        # http://boto3.readthedocs.io/en/latest/reference/services/ec2.html#EC2.ServiceResource.create_instances
+        node_config:
+            InstanceType: m5.large
+            ImageId: ami-0a2363a9cff180a64 # Deep Learning AMI (Ubuntu) Version 30
+            # You can provision additional disk space with a conf as follows
+            BlockDeviceMappings:
+                - DeviceName: /dev/sda1
+                  Ebs:
+                      VolumeSize: 100
+            # Additional options in the boto docs.
+    ray.worker.default:
+        # The minimum number of worker nodes of this type to launch.
+        # This number should be >= 0.
+        min_workers: 0
+        # The maximum number of worker nodes of this type to launch.
+        # This takes precedence over min_workers.
+        max_workers: 2
+        # The node type's CPU and GPU resources are auto-detected based on AWS instance type.
+        # If desired, you can override the autodetected CPU and GPU resources advertised to the autoscaler.
+        # You can also set custom resources.
+        # For example, to mark a node type as having 1 CPU, 1 GPU, and 5 units of a resource called "custom", set
+        # resources: {"CPU": 1, "GPU": 1, "custom": 5}
+        resources: {}
+        # Provider-specific config for this node type, e.g. instance type. By default
+        # Ray will auto-configure unspecified fields such as SubnetId and KeyName.
+        # For more documentation on available fields, see:
+        # http://boto3.readthedocs.io/en/latest/reference/services/ec2.html#EC2.ServiceResource.create_instances
+        node_config:
+            InstanceType: m5.large
+            ImageId: ami-0a2363a9cff180a64 # Deep Learning AMI (Ubuntu) Version 30
+            # Run workers on spot by default. Comment this out to use on-demand.
+            InstanceMarketOptions:
+                MarketType: spot
+            # Additional options can be found in the boto docs, e.g.
+            #   SpotOptions:
+            #       MaxPrice: MAX_HOURLY_PRICE
+            # Additional options in the boto docs.
+
+
+# Specify the node type of the head node (as configured above).
+head_node_type: ray.head.default
+
+# Files or directories to copy to the head and worker nodes. The format is a
+# dictionary from REMOTE_PATH: LOCAL_PATH, e.g.
+file_mounts: {
+    "/root/workspace": ".",
+}
+
+# Files or directories to copy from the head node to the worker nodes. The format is a
+# list of paths. The same path on the head node will be copied to the worker node.
+# This behavior is a subset of the file_mounts behavior. In the vast majority of cases
+# you should just use file_mounts. Only use this if you know what you're doing!
+cluster_synced_files: []
+
+# Whether changes to directories in file_mounts or cluster_synced_files in the head node
+# should sync to the worker node continuously
+file_mounts_sync_continuously: True
+
+# Patterns for files to exclude when running rsync up or rsync down
+rsync_exclude:
+    - "**/.git"
+    - "**/.git/**"
+
+# Pattern files to use for filtering out files when running rsync up or rsync down. The file is searched for
+# in the source directory and recursively through all subdirectories. For example, if .gitignore is provided
+# as a value, the behavior will match git's behavior for finding and using .gitignore files.
+rsync_filter:
+    - ".gitignore"
+
+# List of commands that will be run before `setup_commands`. If docker is
+# enabled, these commands will run outside the container and before docker
+# is setup.
+initialization_commands: []
+
+# List of shell commands to run to set up nodes.
+setup_commands:
+    - pip3 install -r requirements.txt
+
+# Custom commands that will be run on the head node after common setup.
+head_setup_commands: []
+
+# Custom commands that will be run on worker nodes after common setup.
+worker_setup_commands: []
+
+# Command to start ray on the head node. You don't need to change this.
+head_start_ray_commands:
+    - ray stop
+    - ulimit -n 65536; ray start --head --port=6379 --lru-evict --object-manager-port=8076 --autoscaling-config=~/ray_bootstrap_config.yaml
+
+# Command to start ray on worker nodes. You don't need to change this.
+worker_start_ray_commands:
+    - ray stop
+    - ulimit -n 65536; ray start --address=$RAY_HEAD_IP:6379 --object-manager-port=8076

diff --git a/examples/ray/requirements.txt b/examples/ray/requirements.txt
new file mode 100644
index 0000000..3dfa106
--- /dev/null
+++ b/examples/ray/requirements.txt
@@ -0,0 +1 @@
+hub==git+https://github.com/activeloopai/Hub.git@ray_compute
\ No newline at end of file

diff --git a/examples/ray/transform.py b/examples/ray/transform.py
new file mode 100644
index 0000000..be58746
--- /dev/null
+++ b/examples/ray/transform.py
@@ -0,0 +1,62 @@
+import time
+import argparse
+import tqdm
+import numpy as np
+from PIL import Image
+
+import hub
+
+SAMPLES = 500
+DS_OUT_PATH = "./data/cars_out" # "s3://snark-test/testing"
+
+parser = argparse.ArgumentParser(description='PyTorch RPC Batch RL example')
+parser.add_argument('--samples', type=int, default=SAMPLES, metavar='S',
+                    help='how many samples dataset should have')
+parser.add_argument('--ds_out', type=str, default=DS_OUT_PATH, metavar='O',
+                    help='dataset path to be transformed into')
+
+args = parser.parse_args()
+
+
+def define_dataset(path: str, n_samples: int = 100) -> hub.Dataset:
+    """ Define the dataset """
+    ds = hub.empty(path, overwrite=True)
+    
+    ds.create_tensor("labels", htype="class_label")
+    ds.create_tensor("images", htype="image", sample_compression="jpeg")
+    ds.create_tensor("images_downsampled")
+    
+    # Define tensor with customized htype, compression and chunk_size
+    ds['images_downsampled'].meta.htype = ds['images'].meta.htype
+    ds['images_downsampled'].meta.sample_compression = ds['images'].meta.sample_compression
+    ds['images_downsampled'].meta.max_chunk_size = 1 * 1024 * 1024
+    
+    return ds
+
+# Define the remote compute
+@hub.compute
+def downsample(index, samples_out):
+    """ Takes image from a sample_in, downsamples it and pushes to a new tensor """
+    array = (255*np.random.random((100,100,3))).astype(np.uint8)
+    img = Image.fromarray(array)
+    max_d = max(img.size[0], img.size[1])
+    min_s = min(100, max_d)
+    ratio = max_d // min_s
+    img_downsampled = img.resize((img.size[0] // ratio, img.size[1] // ratio))
+    array_downsampled = np.array(img_downsampled)
+
+    samples_out.images.append(array)
+    samples_out.images_downsampled.append(array_downsampled)
+    samples_out.labels.append(index)
+
+
+if __name__ == "__main__":
+
+    # Define a dataset and fill in random images
+    ds_out = define_dataset(args.ds_out, args.samples)
+
+    # Run the distributed computation
+    t1 = time.time()
+    downsample().eval(list(range(args.samples)), ds_out, num_workers=12, scheduler="ray")
+    t2 = time.time()
+    print(f"The processing took {t2-t1}")
\ No newline at end of file
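The patch above sends `downsample().eval(...)` straight to the Ray scheduler. Before provisioning anything, the same pipeline can be smoke-tested on one machine with Hub's default threaded scheduler. A minimal sketch — it assumes `transform.py` from this patch is importable from the working directory (importing it evaluates the argparse defaults, which is harmless for a plain run):

```python
# Sketch: smoke-test the pipeline from this patch without a Ray cluster.
from transform import define_dataset, downsample

ds_out = define_dataset("./data/cars_smoke")
# A handful of samples on Hub's default "threaded" scheduler
downsample().eval(list(range(10)), ds_out, num_workers=2, scheduler="threaded")
print(len(ds_out), ds_out.images_downsampled[0].numpy().shape)
```

If this runs cleanly, switching `scheduler` to `"ray"` is the only change the distributed run needs.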
From 28f7e0a8c3b5eec28518f56c2bcf0b707e5fb970 Mon Sep 17 00:00:00 2001
From: Davit Buniatyan
Date: Wed, 22 Sep 2021 16:57:45 -0700
Subject: [PATCH 3/9] Update README.md

---
 examples/ray/README.md | 15 +++++++--------
 1 file changed, 7 insertions(+), 8 deletions(-)

diff --git a/examples/ray/README.md b/examples/ray/README.md
index c1e5c61..e53caee 100644
--- a/examples/ray/README.md
+++ b/examples/ray/README.md
@@ -1,25 +1,24 @@
 ## Distributed processing with Ray
 
-1. Install ray on your local machine
+### 1. Install ray on your local machine
 ```
 pip3 install ray==1.6
 ```
 
-2. Start a ray cluster (Optional)
-* Optional, if you skip it will start a local ray cluster
-
-Start the cluster and attach to it
+### 2. Start a ray cluster and attach to it (Optional)
+Requires AWS credentials. If you skip this step, it will start a ray cluster on your machine.
 ```
 ray up ./cluster.yaml
 ray attach ./cluster.yaml
 ```
+You can configure further
 
-3. Execute the code
+### 3. Execute the code
 ```
 python3 transform.py
 ```
 
-4. Once you are done please shut down the cluster.
+### 4. Once you are done please shut down the cluster.
 ```
 ray down ./cluster.yaml
-```
\ No newline at end of file
+```

From 535ae614124f81af29aca4bbb0d1ff93b0f8d241 Mon Sep 17 00:00:00 2001
From: Davit Buniatyan
Date: Wed, 22 Sep 2021 17:00:24 -0700
Subject: [PATCH 4/9] Update README.md

---
 examples/ray/README.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/examples/ray/README.md b/examples/ray/README.md
index e53caee..5eafa4c 100644
--- a/examples/ray/README.md
+++ b/examples/ray/README.md
@@ -11,7 +11,7 @@ Requires AWS credentials. If you skip this step, it will start a ray cluster on
 ray up ./cluster.yaml
 ray attach ./cluster.yaml
 ```
-You can configure further
+You can further modify the cluster in cluster.yaml
 
 ### 3. Execute the code
 ```

From b014d3d453cae74b18b4b6772db5923563950dc3 Mon Sep 17 00:00:00 2001
From: "david.buniatyan@gmail.com"
Date: Wed, 22 Sep 2021 17:46:52 -0700
Subject: [PATCH 5/9] fix ray example

---
 README.md                     |  2 +-
 examples/ray/README.md        | 36 +++++++++++++++++++++++++++++------
 examples/ray/cluster.yaml     | 34 ++++++++++++++++-----------------
 examples/ray/requirements.txt |  3 ++-
 examples/ray/transform.py     | 15 +++++++++------
 5 files changed, 58 insertions(+), 32 deletions(-)

diff --git a/README.md b/README.md
index 20ec6a1..ca06456 100644
--- a/README.md
+++ b/README.md
@@ -6,7 +6,7 @@
 A repository showcasing examples of using [Hub](https://github.com/activeloopai/Hub)
 
-  - [Examples go here](example)
+  - [Distributed processing with Ray](examples/ray)
 
 ## Getting Started with Hub 🚀

diff --git a/examples/ray/README.md b/examples/ray/README.md
index c1e5c61..8770cd9 100644
--- a/examples/ray/README.md
+++ b/examples/ray/README.md
@@ -5,10 +5,8 @@
 pip3 install ray==1.6
 ```
 
-2. Start a ray cluster (Optional)
-* Optional, if you skip it will start a local ray cluster
-
-Start the cluster and attach to it
+2. Start the cluster and attach to it (Optional)
+* Optional, if you skip it will start a local ray cluster (you will need to wait until the whole cluster is created)
 ```
 ray up ./cluster.yaml
 ray attach ./cluster.yaml
@@ -16,10 +14,36 @@ ray attach ./cluster.yaml
 ```
 
 3. Execute the code
 ```
-python3 transform.py
+python3 ~/hub/transform.py --num_workers 2
+```
+
+or to store it on S3
+```
+python3 ~/hub/transform.py --num_workers 2 --ds_out s3://bucket/dataset
 ```
+Change the number of workers to 6 once all workers are up.
+
 
 4. Once you are done please shut down the cluster.
 ```
+exit
 ray down ./cluster.yaml
-```
\ No newline at end of file
+```
+
+Notes
+
+* To monitor workers, utilization, and jobs, use this command
+```
+ray dashboard ./cluster.yaml
+```
+
+* Update locally the code and sync the cluster
+
+```
+ray rsync-up ./cluster.yaml
+```
+
+* Directly execute code on the cluster from local machine
+```
+ray exec ./cluster.yaml "python3 ~/hub/transform.py --num_workers 2"
+```

diff --git a/examples/ray/cluster.yaml b/examples/ray/cluster.yaml
index 85349ce..34f0600 100644
--- a/examples/ray/cluster.yaml
+++ b/examples/ray/cluster.yaml
@@ -15,15 +15,15 @@ upscaling_speed: 1.0
 # This executes all commands on all nodes in the docker container,
 # and opens all the necessary ports to support the Ray cluster.
 # Empty string means disabled.
-docker:
-    image: "rayproject/ray-ml:latest-gpu" # You can change this to latest-cpu if you don't need GPU support and want a faster startup
+# docker:
+#     image: "rayproject/ray-ml:latest-gpu" # You can change this to latest-cpu if you don't need GPU support and want a faster startup
     # image: rayproject/ray:latest-gpu # use this one if you don't need ML dependencies, it's faster to pull
-    container_name: "ray_container"
+#     container_name: "ray_container"
     # If true, pulls latest version of image. Otherwise, `docker run` will only pull the image
     # if no cached version is present.
-    pull_before_run: True
-    run_options: # Extra options to pass into "docker run"
-        - --ulimit nofile=65536:65536
+#     pull_before_run: True
+#     run_options: # Extra options to pass into "docker run"
+#         - --ulimit nofile=65536:65536
 
     # Example of running a GPU head with CPU workers
     # head_image: "rayproject/ray-ml:latest-gpu"
@@ -54,11 +54,6 @@ auth:
 # If a node is idle for this many minutes, it will be removed.
 idle_timeout_minutes: 5
 
-
-# How Ray will authenticate with newly launched nodes.
-auth:
-    ssh_user: ubuntu
-
 # By default Ray creates a new private keypair, but you can also use your own.
 # If you do so, make sure to also set "KeyName" in the head and worker node
 # configurations below.
@@ -91,7 +86,7 @@ available_node_types:
     ray.worker.default:
         # The minimum number of worker nodes of this type to launch.
         # This number should be >= 0.
-        min_workers: 0
+        min_workers: 2
         # The maximum number of worker nodes of this type to launch.
         # This takes precedence over min_workers.
         max_workers: 2
@@ -109,8 +104,8 @@ available_node_types:
             InstanceType: m5.large
             ImageId: ami-0a2363a9cff180a64 # Deep Learning AMI (Ubuntu) Version 30
             # Run workers on spot by default. Comment this out to use on-demand.
-            InstanceMarketOptions:
-                MarketType: spot
+            #InstanceMarketOptions:
+            #    MarketType: spot
             # Additional options can be found in the boto docs, e.g.
             #   SpotOptions:
             #       MaxPrice: MAX_HOURLY_PRICE
@@ -123,7 +118,7 @@ head_node_type: ray.head.default
 # Files or directories to copy to the head and worker nodes. The format is a
 # dictionary from REMOTE_PATH: LOCAL_PATH, e.g.
 file_mounts: {
-    "/root/workspace": ".",
+    "~/hub": ".",
 }
 
 # Files or directories to copy from the head node to the worker nodes. The format is a
@@ -154,7 +149,7 @@ initialization_commands: []
 
 # List of shell commands to run to set up nodes.
 setup_commands:
-    - pip3 install -r requirements.txt
+    - cd ~/hub && pip3 install -r requirements.txt
 
 # Custom commands that will be run on the head node after common setup.
 head_setup_commands: []
@@ -165,9 +160,12 @@ worker_setup_commands: []
 # Command to start ray on the head node. You don't need to change this.
 head_start_ray_commands:
     - ray stop
-    - ulimit -n 65536; ray start --head --port=6379 --lru-evict --object-manager-port=8076 --autoscaling-config=~/ray_bootstrap_config.yaml
+    - ray start --head --port=6379 --object-manager-port=8076 --autoscaling-config=~/ray_bootstrap_config.yaml
 
 # Command to start ray on worker nodes. You don't need to change this.
 worker_start_ray_commands:
     - ray stop
-    - ulimit -n 65536; ray start --address=$RAY_HEAD_IP:6379 --object-manager-port=8076
+    - ray start --address=$RAY_HEAD_IP:6379 --object-manager-port=8076
+
+head_node: {}
+worker_nodes: {}
\ No newline at end of file

diff --git a/examples/ray/requirements.txt b/examples/ray/requirements.txt
index 3dfa106..5bd8a89 100644
--- a/examples/ray/requirements.txt
+++ b/examples/ray/requirements.txt
@@ -1 +1,2 @@
-hub==git+https://github.com/activeloopai/Hub.git@ray_compute
\ No newline at end of file
+ray[default]==1.6
+git+https://github.com/activeloopai/Hub.git@ray_compute
\ No newline at end of file

diff --git a/examples/ray/transform.py b/examples/ray/transform.py
index be58746..5555f8e 100644
--- a/examples/ray/transform.py
+++ b/examples/ray/transform.py
@@ -6,19 +6,22 @@
 
 import hub
 
-SAMPLES = 500
-DS_OUT_PATH = "./data/cars_out" # "s3://snark-test/testing"
+NUM_SAMPLES = 500
+NUM_WORKERS = 2
+DS_OUT_PATH = "~/data/cars_out" # "s3://bucket/dataset_name"
 
 parser = argparse.ArgumentParser(description='PyTorch RPC Batch RL example')
-parser.add_argument('--samples', type=int, default=SAMPLES, metavar='S',
+parser.add_argument('--num_samples', type=int, default=NUM_SAMPLES, metavar='S',
                     help='how many samples dataset should have')
 parser.add_argument('--ds_out', type=str, default=DS_OUT_PATH, metavar='O',
                     help='dataset path to be transformed into')
+parser.add_argument('--num_workers', type=int, default=NUM_WORKERS, metavar='O',
+                    help='number of workers to allocate')
 
 args = parser.parse_args()
 
 
-def define_dataset(path: str, n_samples: int = 100) -> hub.Dataset:
+def define_dataset(path: str) -> hub.Dataset:
     """ Define the dataset """
     ds = hub.empty(path, overwrite=True)
 
@@ -53,10 +56,10 @@ def downsample(index, samples_out):
 if __name__ == "__main__":
 
     # Define a dataset and fill in random images
-    ds_out = define_dataset(args.ds_out, args.samples)
+    ds_out = define_dataset(args.ds_out)
 
     # Run the distributed computation
     t1 = time.time()
-    downsample().eval(list(range(args.samples)), ds_out, num_workers=12, scheduler="ray")
+    downsample().eval(list(range(args.num_samples)), ds_out, num_workers=args.num_workers, scheduler="ray")
     t2 = time.time()
     print(f"The processing took {t2-t1}")
\ No newline at end of file
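Patch 5 turns the worker count into a `--num_workers` flag, and the README advises raising it once the workers are up. Whether they are up can be checked from the head node with stock Ray 1.6 calls; the sketch below assumes it runs on a node of the cluster started by `ray up` (hence `address="auto"`):

```python
# Sketch: run on the head node (e.g. after `ray attach ./cluster.yaml`)
# to confirm workers have joined before raising --num_workers.
import ray

ray.init(address="auto")  # connect to the already-running cluster
print(ray.cluster_resources())           # aggregate CPUs/GPUs/memory
print(len(ray.nodes()), "nodes are up")  # head node plus joined workers
```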
From 2dcebef8a4a97208977701faa281b9be343d6f35 Mon Sep 17 00:00:00 2001
From: Davit Buniatyan
Date: Wed, 22 Sep 2021 18:00:35 -0700
Subject: [PATCH 6/9] Update README.md

---
 examples/ray/README.md | 34 ++++++++++++++++------------------
 1 file changed, 16 insertions(+), 18 deletions(-)

diff --git a/examples/ray/README.md b/examples/ray/README.md
index 48a77d0..ae79745 100644
--- a/examples/ray/README.md
+++ b/examples/ray/README.md
@@ -1,33 +1,30 @@
-## Distributed processing with Ray
+# Distributed processing with Ray
 
-### 1. Install ray on your local machine
-```
-pip3 install ray==1.6
-```
 
-### 2. Start a ray cluster and attach to it (Optional)
-Requires AWS credentials. If you skip this step, it will start a ray cluster on your machine.
+## Execute locally
+Install hub+ray and run the script locally to create a dataset of size `num_samples`.
 ```
-ray up ./cluster.yaml
-ray attach ./cluster.yaml
+pip3 install -r requirements.txt
+python3 transform.py --num_workers 2 --ds_out ./tmp/cars --num_samples 1000
 ```
-You can further modify the cluster in cluster.yaml
 
-### 3. Execute the code
+
+## Execute on a cluster
+#### 1. Start the cluster
+Requires AWS credentials. If you skip this step, it will start a ray cluster on your machine.
 You can further modify the cluster in cluster.yaml
 ```
-python3 ~/hub/transform.py --num_workers 2
+ray up ./cluster.yaml
 ```
 
-or to store it on S3
+#### 2. Execute the code (the dataset is created on the head node)
 ```
-python3 ~/hub/transform.py --num_workers 2 --ds_out s3://bucket/dataset
+ray exec ./cluster.yaml "python3 ~/hub/transform.py --num_workers 2 --ds_out s3://bucket/dataset_name --num_samples 1000"
 ```
 Change the number of workers to 6 once all workers are up.
 
-### 4. Once you are done please shut down the cluster.
+#### 3. Shut down the cluster
 ```
-exit
 ray down ./cluster.yaml
 ```
 
@@ -35,16 +32,17 @@ Notes
 
 * To monitor workers, utilization, and jobs, use this command
 ```
 ray dashboard ./cluster.yaml
 ```
 
 * Update locally the code and sync the cluster
 
 ```
 ray rsync-up ./cluster.yaml
 ```
 
-* Directly execute code on the cluster from local machine
+* Attach to the cluster and execute interactively
 ```
-ray exec ./cluster.yaml "python3 ~/hub/transform.py --num_workers 2"
+ray attach ./cluster.yaml
+> python3 ~/hub/transform.py --num_workers 2
 ```

From 70e2e74783dfcc795d1afbfd7ea84f81795e2997 Mon Sep 17 00:00:00 2001
From: Davit Buniatyan
Date: Wed, 22 Sep 2021 18:01:32 -0700
Subject: [PATCH 7/9] Update README.md

---
 examples/ray/README.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/examples/ray/README.md b/examples/ray/README.md
index ae79745..913980a 100644
--- a/examples/ray/README.md
+++ b/examples/ray/README.md
@@ -1,4 +1,4 @@
-# Distributed processing with Ray
+# Distributed Processing with Ray
 
 
 ## Execute locally

From 21dfa5e9981f417cd8e4910d307449c76d5557bc Mon Sep 17 00:00:00 2001
From: Davit Buniatyan
Date: Wed, 22 Sep 2021 18:03:20 -0700
Subject: [PATCH 8/9] Update README.md

---
 examples/ray/README.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/examples/ray/README.md b/examples/ray/README.md
index 913980a..d8c0efb 100644
--- a/examples/ray/README.md
+++ b/examples/ray/README.md
@@ -35,7 +35,7 @@
 ray dashboard ./cluster.yaml
 ```
 
-* Update locally the code and sync the cluster
+* Update the code locally and sync it to the cluster
 
 ```
 ray rsync-up ./cluster.yaml

From 753eaf9192cf00a84e22344bead595ebf4fd73a2 Mon Sep 17 00:00:00 2001
From: "david.buniatyan@gmail.com"
Date: Fri, 15 Oct 2021 15:06:42 -0700
Subject: [PATCH 9/9] clean up

---
 examples/ray/cluster.yaml |  2 +-
 examples/ray/transform.py | 66 ++++++++++++++++++++++++++-------------
 2 files changed, 45 insertions(+), 23 deletions(-)

diff --git a/examples/ray/cluster.yaml b/examples/ray/cluster.yaml
index 34f0600..4283873 100644
--- a/examples/ray/cluster.yaml
+++ b/examples/ray/cluster.yaml
@@ -42,7 +42,7 @@ provider:
     # Availability zone(s), comma-separated, that nodes may be launched in.
     # Nodes are currently spread between zones by a round-robin approach,
     # however this implementation detail should not be relied upon.
-    availability_zone: us-west-2a,us-west-2b
+    # availability_zone: us-west-2a,us-west-2b
     # Whether to allow node reuse. If set to False, nodes will be terminated
     # instead of stopped.
     cache_stopped_nodes: True # If not present, the default is True.
diff --git a/examples/ray/transform.py b/examples/ray/transform.py
index 5555f8e..6b1e04a 100644
--- a/examples/ray/transform.py
+++ b/examples/ray/transform.py
@@ -8,58 +8,80 @@
 
 NUM_SAMPLES = 500
 NUM_WORKERS = 2
-DS_OUT_PATH = "~/data/cars_out" # "s3://bucket/dataset_name"
+DS_OUT_PATH = "~/data/cars_out"  # "s3://bucket/dataset_name"
 
-parser = argparse.ArgumentParser(description='PyTorch RPC Batch RL example')
-parser.add_argument('--num_samples', type=int, default=NUM_SAMPLES, metavar='S',
-                    help='how many samples dataset should have')
-parser.add_argument('--ds_out', type=str, default=DS_OUT_PATH, metavar='O',
-                    help='dataset path to be transformed into')
-parser.add_argument('--num_workers', type=int, default=NUM_WORKERS, metavar='O',
-                    help='number of workers to allocate')
+parser = argparse.ArgumentParser(description="Hub + Ray transform example")
+parser.add_argument(
+    "--num_samples",
+    type=int,
+    default=NUM_SAMPLES,
+    metavar="S",
+    help="how many samples dataset should have",
+)
+parser.add_argument(
+    "--ds_out",
+    type=str,
+    default=DS_OUT_PATH,
+    metavar="O",
+    help="dataset path to be transformed into",
+)
+parser.add_argument(
+    "--num_workers",
+    type=int,
+    default=NUM_WORKERS,
+    metavar="W",
+    help="number of workers to allocate",
+)
 
 args = parser.parse_args()
 
 
 def define_dataset(path: str) -> hub.Dataset:
-    """ Define the dataset """
+    """Define the dataset"""
     ds = hub.empty(path, overwrite=True)
-    
+
     ds.create_tensor("labels", htype="class_label")
     ds.create_tensor("images", htype="image", sample_compression="jpeg")
     ds.create_tensor("images_downsampled")
-    
+
     # Define tensor with customized htype, compression and chunk_size
-    ds['images_downsampled'].meta.htype = ds['images'].meta.htype
-    ds['images_downsampled'].meta.sample_compression = ds['images'].meta.sample_compression
-    ds['images_downsampled'].meta.max_chunk_size = 1 * 1024 * 1024
-    
+    ds["images_downsampled"].meta.htype = ds["images"].meta.htype
+    ds["images_downsampled"].meta.sample_compression = ds[
+        "images"
+    ].meta.sample_compression
+    ds["images_downsampled"].meta.max_chunk_size = 1 * 1024 * 1024
+
     return ds
 
+
 # Define the remote compute
 @hub.compute
 def downsample(index, samples_out):
-    """ Takes image from a sample_in, downsamples it and pushes to a new tensor """
-    array = (255*np.random.random((100,100,3))).astype(np.uint8)
+    """Generate a random image, downsample it, and push both to the output tensors"""
+    array = (255 * np.random.random((100, 100, 3))).astype(np.uint8)
     img = Image.fromarray(array)
     max_d = max(img.size[0], img.size[1])
     min_s = min(100, max_d)
     ratio = max_d // min_s
     img_downsampled = img.resize((img.size[0] // ratio, img.size[1] // ratio))
     array_downsampled = np.array(img_downsampled)
-
     samples_out.images.append(array)
     samples_out.images_downsampled.append(array_downsampled)
     samples_out.labels.append(index)
-    
+
 
 if __name__ == "__main__":
-    
+
     # Define a dataset and fill in random images
     ds_out = define_dataset(args.ds_out)
 
     # Run the distributed computation
     t1 = time.time()
-    downsample().eval(list(range(args.num_samples)), ds_out, num_workers=args.num_workers, scheduler="ray")
+    downsample().eval(
+        list(range(args.num_samples)),
+        ds_out,
+        num_workers=args.num_workers,
+        scheduler="ray",
+    )
     t2 = time.time()
-    print(f"The processing took {t2-t1}")
\ No newline at end of file
+    print(f"The processing took {t2 - t1:.2f} seconds")
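With the series applied, the output of `transform.py` can be sanity-checked by loading the dataset back. A minimal sketch, assuming the script's default local output path and the tensor names from `define_dataset`:

```python
# Sketch: read back the dataset written by transform.py.
import hub

ds = hub.load("~/data/cars_out")
print(len(ds))                                 # should equal --num_samples
print(ds.images[0].numpy().shape)              # random source image, (100, 100, 3)
print(ds.images_downsampled[0].numpy().shape)  # its downsampled counterpart
print(ds.labels[0].numpy())                    # the sample index stored as the label
```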