-
Notifications
You must be signed in to change notification settings - Fork 16
/
Copy pathconvert_to_robomimic_hdf5.py
65 lines (56 loc) · 2.32 KB
/
convert_to_robomimic_hdf5.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
# Author: Jimmy Wu
# Date: October 2024
#
# References:
# - https://github.com/ARISE-Initiative/robomimic/blob/master/robomimic/scripts/dataset_states_to_obs.py
import argparse
from pathlib import Path
import cv2 as cv
import h5py
import numpy as np
from scipy.spatial.transform import Rotation
from tqdm import tqdm
from constants import POLICY_IMAGE_WIDTH, POLICY_IMAGE_HEIGHT
from episode_storage import EpisodeReader
def main(input_dir, output_path):
    """Convert a directory of recorded episodes into a single HDF5 file.

    Every immediate subdirectory of ``input_dir`` is treated as one episode
    and loaded with ``EpisodeReader``. Episodes are processed in sorted path
    order and stored under ``data/demo_<index>``. Each demo group receives
    one ``obs/<key>`` dataset per observation key and one ``actions`` dataset.

    Args:
        input_dir: Directory whose immediate subdirectories are episodes.
        output_path: Destination HDF5 file. It is opened in ``'w'`` mode, so
            an existing file at this path is overwritten.
    """
    # Only directories count as episodes; sort for a stable demo numbering
    episode_paths = sorted(entry for entry in Path(input_dir).iterdir() if entry.is_dir())

    # Target size handed to cv.resize, in (width, height) order
    image_size = (POLICY_IMAGE_WIDTH, POLICY_IMAGE_HEIGHT)

    with h5py.File(output_path, 'w') as hdf5_file:
        demos = hdf5_file.create_group('data')

        for demo_idx, episode_path in enumerate(tqdm(episode_paths)):
            episode = EpisodeReader(episode_path)

            # Gather per-step observations into one list per key
            obs_by_key = {}
            for step_obs in episode.observations:
                for name, value in step_obs.items():
                    # 3-D values are presumably images (TODO confirm); they are resized
                    resized = cv.resize(value, image_size) if value.ndim == 3 else value
                    obs_by_key.setdefault(name, []).append(resized)

            # Flatten each action dict into one vector:
            # base pose, arm position, arm rotation (rotation vector), gripper position
            action_vectors = []
            for step_action in episode.actions:
                arm_rotvec = Rotation.from_quat(step_action['arm_quat']).as_rotvec()
                action_vectors.append(np.concatenate([
                    step_action['base_pose'],
                    step_action['arm_pos'],
                    arm_rotvec,
                    step_action['gripper_pos'],
                ]))

            # Store this episode as data/demo_<idx>
            demo_group = demos.create_group(f'demo_{demo_idx}')
            for name, frames in obs_by_key.items():
                demo_group.create_dataset(f'obs/{name}', data=np.array(frames))
            demo_group.create_dataset('actions', data=np.array(action_vectors))
if __name__ == '__main__':
    # Command-line entry point; both options are optional and have defaults
    cli = argparse.ArgumentParser()
    for flag, fallback in (
        ('--input-dir', 'data/demos'),
        ('--output-path', 'data/demos.hdf5'),
    ):
        cli.add_argument(flag, default=fallback)
    options = cli.parse_args()
    main(options.input_dir, options.output_path)