Merge branch 'main' into eddie-changes
TCSo authored Apr 16, 2024
2 parents 14e6cbe + c5000d5 commit df2013b
Showing 7 changed files with 308 additions and 40 deletions.
2 changes: 1 addition & 1 deletion README.md
@@ -20,7 +20,7 @@ import fogx as fox

# 🦊 Dataset Creation
# from distributed dataset storage
dataset = fox.Dataset(load_from = ["/tmp/rtx", "s3://fox_stroage/"])
dataset = fox.Dataset(load_from = ["/tmp/rtx", "s3://fox_storage/"])

# 🦊 Data collection:
# create a new trajectory
9 changes: 9 additions & 0 deletions docs/Reference.md
@@ -0,0 +1,9 @@
# API Reference

## Dataset
::: fog_x.dataset.Dataset

-------

## Episode
::: fog_x.episode.Episode
24 changes: 12 additions & 12 deletions docs/index.md
@@ -1,17 +1,17 @@
# Welcome to MkDocs
# 🦊 Fog-X Documentation

For full documentation visit [mkdocs.org](https://www.mkdocs.org).
**Fog-X is an efficient and scalable data collection and management framework for robotics learning.**
Supports datasets from [Open-X-Embodiment](https://robotics-transformer-x.github.io/) and 🤗[HuggingFace](https://huggingface.co/).
Fog-X considers both speed 🚀 and memory efficiency 📈 with active metadata and lazily-loaded trajectory data. It supports flexible and distributed dataset partitioning.

## Commands
## Installation

* `mkdocs new [dir-name]` - Create a new project.
* `mkdocs serve` - Start the live-reloading docs server.
* `mkdocs build` - Build the documentation site.
* `mkdocs -h` - Print help message and exit.
```bash
pip install fogx
```

## Project layout
## Usage

mkdocs.yml # The configuration file.
docs/
index.md # The documentation homepage.
... # Other markdown pages, images and other files.
See [Usage Guide](./usage.md) for an overview of how to use Fog-X.

You can also view [working examples on GitHub](https://github.com/KeplerC/fog_x/tree/main/examples).
140 changes: 140 additions & 0 deletions docs/usage.md
@@ -0,0 +1,140 @@
# Usage Guide
The code examples below will assume the following import:
```py
import fog_x as fox
```

## Definitions
- **episode**: one robot trajectory or action, consisting of multiple pieces of step data.
- **step data**: data representing a snapshot of the robot action at a certain point in time.
- **metadata**: information that is consistent across a certain episode, e.g. the language instruction associated with the robot action, the name of the person collecting the data, or any other tags/labels.
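
To make these definitions concrete, here is a minimal sketch of how they map onto the API described in the rest of this guide (the path, feature name, and metadata values are illustrative):
```py
import fog_x as fox

dataset = fox.Dataset(name="demo", path="/tmp/demo_dataset")

# metadata stays constant for the whole episode
episode = dataset.new_episode(metadata={"collector": "Alice"})

# each add() call records one piece of step data
episode.add(feature="arm_view", value="image1.jpg")

episode.close()
```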

## The Fog-X Dataset
To start, create a `Dataset` object. Any data that is collected, loaded, or exported
will be saved to the provided path.
If Fog-X data already exists at the path, it is picked up as well, so you can continue
right where you left off.
```py
dataset = fox.Dataset(name="my_fogx_dataset", path="/local/path/my_fogx_dataset")
```

## Collecting Robot Data

```py
# create a new trajectory
episode = dataset.new_episode()

# run robot and collect data
while robot_is_running:
    # at each step, add data to the episode
    episode.add(feature = "arm_view", value = "image1.jpg")

# Automatically time-aligns and saves the trajectory
episode.close()
```

## Exporting Data
By default, the exported data will be located under the `/export` directory within
the initialized `dataset.path`.
Currently, the supported data formats are `rtx`, `open-x`, and `rlds`.

```py
# Export and share the dataset as standard Open-X-Embodiment format
dataset.export(desired_episodes, format="rtx")
```
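
The same call works for the other supported formats. The docstring added to `Dataset.export` in this commit also documents an optional `export_path` for overriding the default location; the keyword usage and the custom path below are assumptions, not confirmed by this diff.
```py
# export as RLDS instead of RT-X (a sketch; desired_episodes as above)
dataset.export(desired_episodes, format="rlds")

# assumed keyword usage of the optional export_path documented in fog_x/dataset.py
dataset.export(desired_episodes, export_path="/tmp/fogx_exports", format="rlds")
```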

### PyTorch
```py
import torch

metadata = dataset.get_episode_info()
metadata = metadata.filter(metadata["feature1"] == "value1")
pytorch_ds = dataset.pytorch_dataset_builder(metadata=metadata)

# get samples from the dataset
for data in torch.utils.data.DataLoader(
    pytorch_ds,
    batch_size=2,
    collate_fn=lambda x: x,
    sampler=torch.utils.data.RandomSampler(pytorch_ds),
):
    print(data)
```

### HuggingFace
WIP: Currently there is limited support for HuggingFace.

```py
huggingface_ds = dataset.get_as_huggingface_dataset()
```
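
Under the hood this wraps `datasets.load_dataset('parquet', ...)` (see `get_as_huggingface_dataset` in this commit), so the result is a standard `DatasetDict`. A brief sketch of inspecting it, assuming the default `train` split that the parquet loader produces:
```py
# show available splits and features
print(huggingface_ds)

# the parquet loader places everything under "train" by default
first_row = huggingface_ds["train"][0]
print(first_row.keys())
```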


## Loading Data from Existing Datasets

### RT-X / Tensorflow Datasets
Load any RT-X robotics data available at [Tensorflow Datasets](https://www.tensorflow.org/datasets/catalog/).
You can also find a preview of all the RT-X datasets [here](https://dibyaghosh.com/rtx_viz/).

When loading the episodes, you can optionally specify `additional_metadata` to be associated with them.
You can also load a specific portion of train or test data with the `split` parameter. See the [Tensorflow Split API](https://www.tensorflow.org/datasets/splits) for specifics.

```py
# load all berkeley_autolab_ur5 data
dataset.load_rtx_episodes(name="berkeley_autolab_ur5")

# load 75% of berkeley_autolab_ur5 train data, tagged with additional metadata my_label = "train1"
dataset.load_rtx_episodes(
    name="berkeley_autolab_ur5",
    split="train[:75%]",
    additional_metadata={"my_label": "train1"},
)
```

## Data Management

### Episode Metadata
You can retrieve episode-level information (metadata) using `dataset.get_episode_info()`.
This is a [pandas DataFrame](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html),
meaning you have access to pandas data management methods including `filter`, `map`, `aggregate`, `groupby`, etc.
After processing the metadata, you can use it to obtain your
desired episodes with `dataset.read_by(desired_metadata)`.

```py
# Retrieve episode-level data as a pandas DataFrame
episode_info = dataset.get_episode_info()

# Use pandas DataFrame filter to select episodes
desired_episode_metadata = episode_info.filter(episode_info["natural_language_instruction"] == "open door")

# Obtain the actual episodes containing their step data
episodes = dataset.read_by(desired_episode_metadata)
```
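
Since `episode_info` is a plain pandas DataFrame, the other pandas tools mentioned above work as well; for instance, a quick `groupby` summary of the collected instructions (a sketch; the column name follows the examples in this guide):
```py
# count episodes per language instruction
counts = episode_info.groupby("natural_language_instruction").size()
print(counts.sort_values(ascending=False).head())
```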

### Step Data
Step data is stored as a [Polars LazyFrame](https://docs.pola.rs/py-polars/html/reference/lazyframe/index.html).
Lazy loading with Polars results in speedups of 10 to 100 times compared to pandas.
```py
# Retrieve Fog-X dataset as a Polars LazyFrame
step_data = dataset.get_step_data()

# select only the episode_id and natural_language_instruction
lazy_id_to_language = step_data.select("episode_id", "natural_language_instruction")

# the frame is lazily evaluated in memory when we call collect(), which returns a Polars DataFrame
id_to_language = lazy_id_to_language.collect()

# drop rows with duplicate natural_language_instruction to see unique instructions
id_to_language.unique(subset=["natural_language_instruction"], maintain_order=True)
```

Polars also allows chaining methods:
```py
# Same as above example, but chained
id_to_language = (
    dataset.get_step_data()
    .select("episode_id", "natural_language_instruction")
    .collect()
    .unique(subset=["natural_language_instruction"], maintain_order=True)
)
```
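
Because the step data is a LazyFrame, row filters can also be applied before anything is materialized; a short sketch, reusing the column names from the examples above:
```py
import polars as pl

# only rows matching the predicate are materialized by collect()
door_steps = (
    dataset.get_step_data()
    .filter(pl.col("natural_language_instruction") == "open door")
    .collect()
)
```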

117 changes: 91 additions & 26 deletions fog_x/dataset.py
@@ -4,6 +4,8 @@
from typing import Any, Dict, List, Optional, Tuple

import numpy as np
import polars
import pandas

from fog_x.database import (
DatabaseConnector,
@@ -36,6 +38,22 @@ def __init__(
step_data_connector: DatabaseConnector = None,
storage: Optional[str] = None,
) -> None:
"""
Args:
name (str): Name of this dataset. Used as the directory name when exporting.
path (str): Required. Local path of where this dataset should be stored.
features (optional Dict[str, FeatureType]): features to store in this dataset and their types.
enable_feature_inference (bool): enable inferring additional FeatureTypes
Example:
```
>>> dataset = fog_x.Dataset('my_dataset', path='~/fog_x/my_dataset')
```
TODO:
* is replace_existing actually used anywhere?
"""
self.name = name
path = os.path.expanduser(path)
self.path = path
@@ -55,23 +73,24 @@ def __init__(
if not os.path.exists(f"{path}/{name}"):
os.makedirs(f"{path}/{name}")
step_data_connector = LazyFrameConnector(f"{path}/{name}")
self.db_manager = DatabaseManager(
episode_info_connector, step_data_connector
)
self.db_manager = DatabaseManager(episode_info_connector, step_data_connector)
self.db_manager.initialize_dataset(self.name, features)

self.storage = storage
self.obs_keys = []
self.act_keys = []
self.step_keys = []

def new_episode(
self, metadata: Optional[Dict[str, Any]] = None
) -> Episode:
def new_episode(self, metadata: Optional[Dict[str, Any]] = None) -> Episode:
"""
Create a new episode / trajectory.
TODO #1: support multiple processes writing to the same episode
TODO #2: close the previous episode if not closed
Returns:
Episode
TODO:
* support multiple processes writing to the same episode
* close the previous episode if not closed
"""
return Episode(
metadata=metadata,
@@ -113,6 +132,10 @@ def export(
) -> None:
"""
Export the dataset.
Args:
export_path (optional str): location of exported data. Uses dataset.path/export by default.
format (str): Supported formats are `rtx`, `open-x`, and `rlds`.
"""
if format == "rtx" or format == "open-x" or format == "rlds":
self.export_rtx(export_path, max_episodes_per_file, version, obs_keys, act_keys, step_keys)
@@ -274,7 +297,18 @@ def load_rtx_episodes(
additional_metadata: Optional[Dict[str, Any]] = None,
):
"""
Load the dataset.
Load robot data from Tensorflow Datasets.
Args:
name (str): Name of RT-X episodes, which can be found at [Tensorflow Datasets](https://www.tensorflow.org/datasets/catalog) under the Robotics category
split (optional str): the portion of data to load, see [Tensorflow Split API](https://www.tensorflow.org/datasets/splits)
additional_metadata (optional Dict[str, Any]): additional metadata to be associated with the loaded episodes
Example:
```
>>> dataset.load_rtx_episodes(name="berkeley_autolab_ur5")
>>> dataset.load_rtx_episodes(name="berkeley_autolab_ur5", split="train[:10]", additional_metadata={"data_collector": "Alice", "custom_tag": "sample"})
```
"""

# this is only required if rtx format is used
@@ -334,26 +368,36 @@ def load_rtx_episodes(
fog_episode.add(
feature=str(k),
value=v.numpy(),
feature_type=FeatureType(
tf_feature_spec=data_type[k]
),
feature_type=FeatureType(tf_feature_spec=data_type[k]),
)
self.step_keys.append(k)
fog_episode.close()

def get_episode_info(self):
def get_episode_info(self) -> pandas.DataFrame:
"""
Return the metadata as a pandas DataFrame.
Returns:
metadata of all episodes as `pandas.DataFrame`
"""
return self.db_manager.get_episode_info_table()

def get_step_data(self):
def get_step_data(self) -> polars.LazyFrame:
"""
Return all step data as a lazy dataframe.
Returns:
step data of all episodes
"""
return self.db_manager.get_step_table_all()

def get_step_data_by_episode_ids(self, episode_ids: List[int], as_lazy_frame = True):
def get_step_data_by_episode_ids(
self, episode_ids: List[int], as_lazy_frame=True
) -> List[polars.LazyFrame] | List[polars.DataFrame]:
"""
Args:
episode_ids (List[int]): list of episode ids
as_lazy_frame (bool): whether to return polars.LazyFrame or polars.DataFrame
Returns:
step data of each episode
"""
episodes = []
for episode_id in episode_ids:
if episode_id == None:
@@ -363,8 +407,17 @@ def get_step_data_by_episode_ids(self, episode_ids: List[int], as_lazy_frame = True):
else:
episodes.append(self.db_manager.get_step_table(episode_id).collect())
return episodes

def read_by(self, episode_info: Any = None):

def read_by(self, episode_info: Any = None) -> List[polars.LazyFrame]:
"""
To be used with `Dataset.get_episode_info`.
Args:
episode_info (pandas.DataFrame): episode metadata information to determine which episodes to read
Returns:
episodes filtered by `episode_info`
"""
episode_ids = list(episode_info["episode_id"])
logger.info(f"Reading episodes as order: {episode_ids}")
episodes = []
@@ -384,6 +437,11 @@ def get_episodes_from_metadata(self, metadata: Any = None):
return episodes

def pytorch_dataset_builder(self, metadata=None, **kwargs):
"""
Used for loading current dataset as a PyTorch dataset.
To be used with `torch.utils.data.DataLoader`.
"""

import torch
from torch.utils.data import Dataset
episodes = self.get_episodes_from_metadata(metadata)
@@ -394,17 +452,24 @@ def pytorch_dataset_builder(self, metadata=None, **kwargs):
return pytorch_dataset

def get_as_huggingface_dataset(self):
"""
Load current dataset as a HuggingFace dataset.
TODO:
* currently the support for Hugging Face datasets is limited.
it only shows the capability of easily returning an HF dataset
* add features from the episode metadata
* allow selecting episodes based on queries.
doing so requires creating a new copy of the dataset on disk
"""
import datasets

# TODO: currently the support for huggingg face dataset is limited
# it only shows its capability of easily returning a hf dataset
# TODO #1: add features from the episode metadata
# TODO #2: allow selecting episodes based on queries
# doing so requires creating a new copy of the dataset on disk
dataset_path = self.path + "/" + self.name
parquet_files = [os.path.join(dataset_path, f) for f in os.listdir(dataset_path)]
parquet_files = [
os.path.join(dataset_path, f) for f in os.listdir(dataset_path)
]

hf_dataset = datasets.load_dataset('parquet', data_files=parquet_files)
hf_dataset = datasets.load_dataset("parquet", data_files=parquet_files)
return hf_dataset

class PyTorchDataset(Dataset):