Changes from 1 commit
7 changes: 2 additions & 5 deletions alf/algorithms/algorithm.py
@@ -1426,9 +1426,7 @@ def train_from_unroll(self, experience, train_info):
         return shape[0] * shape[1]
 
     @common.mark_replay
-    def train_from_replay_buffer(self,
-                                 effective_unroll_steps,
-                                 update_global_counter=False):
+    def train_from_replay_buffer(self, update_global_counter=False):
         """This function can be called by any algorithm that has its own
         replay buffer configured. There are several parameters specified in
         ``self._config`` that will affect how the training is performed:
@@ -1482,8 +1480,7 @@ def train_from_replay_buffer(self,
         # training is not started yet, ``_replay_buffer`` will be None since it
         # is only lazily created later when online RL training started.
         if (self._replay_buffer and self._replay_buffer.total_size
-                < config.initial_collect_steps) or (effective_unroll_steps
-                == 0):
+                < config.initial_collect_steps):
             assert (
                 self._replay_buffer.num_environments *
                 self._replay_buffer.max_length >= config.initial_collect_steps
36 changes: 20 additions & 16 deletions alf/algorithms/rl_algorithm.py
@@ -187,8 +187,10 @@ def __init__(self,
                 during deployment. In this case, the algorithm do not need to
                 create certain components such as value_network for ActorCriticAlgorithm,
                 critic_networks for SacAlgorithm.
-            episodic_annotation: if True, annotate the episode before being observed by the
-                replay buffer.
+            episodic_annotation: episodic annotation is an operation that annotates an
+                episode after it has been collected; the annotated episode is then
+                observed by the replay buffer. If True, each episode is annotated
+                before being observed by the replay buffer. Otherwise, it is not applied.
             overwrite_policy_output (bool): if True, overwrite the policy output
                 with next_step.prev_action. This option can be used in some
                 cases such as data collection.
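
As a reading aid (not part of the diff): a minimal sketch of how a derived algorithm might opt in to this flag and restrict annotation to episode boundaries. The `episodic_annotation` keyword and the `should_post_process_episode` hook come from this PR; the choice of `SacAlgorithm` as a base and the assumption that it forwards extra kwargs to `RLAlgorithm.__init__` are illustrative only.

```python
# Illustrative sketch only -- assumes SacAlgorithm forwards extra kwargs up to
# RLAlgorithm.__init__, which accepts the new ``episodic_annotation`` flag.
from alf.algorithms.sac_algorithm import SacAlgorithm
from alf.data_structures import StepType


class EpisodeAnnotatingSac(SacAlgorithm):
    def __init__(self, *args, **kwargs):
        # Route collected experience through the annotation hooks before the
        # replay buffer observes it.
        super().__init__(*args, episodic_annotation=True, **kwargs)

    def should_post_process_episode(self, rollout_info, step_type: StepType):
        # Annotate only once a full episode has been cached (i.e. at a LAST
        # step). A batched environment would need a per-env decision; this
        # scalar comparison assumes batch_size == 1.
        return step_type == StepType.LAST
```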
@@ -244,9 +246,6 @@ def __init__(self,
             replay_buffer_length = adjust_replay_buffer_length(
                 config, self._num_earliest_frames_ignored)
 
-            if self._episodic_annotation:
-                assert self._env.batch_size == 1, "only support non-batched environment"
-
             if config.whole_replay_buffer_training and config.clear_replay_buffer:
                 # For whole replay buffer training, we would like to be sure
                 # that the replay buffer have enough samples in it to perform
@@ -608,21 +607,27 @@ def _async_unroll(self, unroll_length: int):
     def should_post_process_episode(self, rollout_info, step_type: StepType):
         """A function that determines whether the ``post_process_episode`` function should
         be applied to the current list of experiences.
+        Users can customize this function in the derived class.
+        By default, it returns True for all time steps. When this is combined with
+        ``post_process_episode``, which simply returns the input unmodified (as the
+        default implementation in this class does), it is a dummy version of episodic
+        annotation with logic equivalent to the case of episodic_annotation=False.
         """
-        return False
+        return True
 
     def post_process_episode(self, experiences: List[Experience]):
         """A function for postprocessing a list of experience. It is called when
         ``should_post_process_episode`` is True.
-        It can be used to create a number of useful features such as 'hindsight relabeling'
-        of a trajectory etc.
+        By default, it returns the input unmodified.
+        Users can customize this function in the derived class, to create a number of
+        useful features such as 'hindsight relabeling' of a trajectory etc.
 
         Args:
             experiences: a list of experience, containing the experience starting from the
                 initial time when ``should_post_process_episode`` is False to the step where
                 ``should_post_process_episode`` is True.
         """
-        return None
+        return experiences
 
     def _process_unroll_step(self, policy_step, action, time_step,
                              transformed_time_step, policy_state,
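
Again as an aside (not part of the diff), a hedged sketch of a `post_process_episode` override that does more than relabeling, namely excluding data, one of the use cases this interface is intended to support. The hook signature is from this PR; the thresholds and the mix-in shape of the class are made up for illustration.

```python
# Illustrative sketch only: drop episodes that are too short and truncate the
# rest. Whatever is returned here is what ``observe_for_replay`` is called on.
from typing import List

from alf.data_structures import Experience


class EpisodeFilterMixin:
    # Hypothetical thresholds; a real algorithm would expose them as config.
    min_episode_len = 10
    max_episode_len = 1000

    def post_process_episode(self, experiences: List[Experience]):
        # Exclude whole episodes that carry too little signal.
        if len(experiences) < self.min_episode_len:
            return []
        # Otherwise keep at most ``max_episode_len`` steps unchanged.
        return experiences[:self.max_episode_len]
```

In practice the override would live directly on the `RLAlgorithm` subclass, next to `should_post_process_episode`.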
@@ -633,6 +638,7 @@ def _process_unroll_step(self, policy_step, action, time_step,
                             alf.layers.to_float32(policy_state))
         effective_number_of_unroll_steps = 1
         if self._episodic_annotation:
+            assert not self.on_policy, "only support episodic annotation for off policy training"

Contributor:

Maybe assert this in the __init__ function?

             store_exp_time = 0
             # if last step, annotate
             rollout_info = policy_step.info
@@ -645,11 +651,10 @@ def _process_unroll_step(self, policy_step, action, time_step,
                     self._cached_exp)
                 effective_number_of_unroll_steps = len(annotated_exp_list)
                 # 2) observe
-                if not self.on_policy:
-                    t0 = time.time()
-                    for exp in annotated_exp_list:
-                        self.observe_for_replay(exp)
-                    store_exp_time = time.time() - t0
+                t0 = time.time()
+                for exp in annotated_exp_list:
+                    self.observe_for_replay(exp)
+                store_exp_time = time.time() - t0
                 # clean up the exp cache
                 self._cached_exp = []

Contributor:

This seems to assume that all envs end on the same step? What if some envs are LAST, some are MID? cached_exp will be cleared even for those with MID steps?

Even when doing this for an env with batch_size 1, this annotation mode will delay experience from being stored into the replay buffer.

Ok to submit the change as is, but may need to do two things:

  1. rename the feature to something like store_experience_on_episode_end, and document its behavior clearly in the docstr.
    experience relabel should be done when reading data out of replay buffer as in hindsight relabel.

  2. assert that batch_size is 1 when enabled.

Also, delaying train_step because of delayed experience storage can have unexpected side effects, e.g. if episodes are 100 steps long, and unroll once per train iter, then summary will only happen every 100 train iters. It will also shift the distribution of the data training sees due to the delay.

Overall I think doing this episode level relabeling at the DataTransformer stage, after reading from replay_buffer is perhaps a better way, and a cleaner way as well (less scattered code). That would require the replay buffer to keep track of episode begin and end, which I think it already does.

Contributor Author (@Haichao-Zhang), May 23, 2025:

> This seems to assume that all envs end on the same step? What if some envs are LAST, some are MID? cached_exp will be cleared even for those with MID steps?

There is no such assumption. It is totally up to the users to inject their own assumptions.
By default, the behavior is the same as before.
Sorry that the function names are a bit misleading and their role has been extended to handle the per-step case as well. Changed the function names and added more comments.

> Even when doing this for an env with batch_size 1, this annotation mode will delay experience from being stored into the replay buffer.

No, it won't. By default, the behavior is the same as before.

> Ok to submit the change as is, but may need to do two things:
>
> 1. rename the feature to something like store_experience_on_episode_end, and document its behavior clearly in the docstr.

The suggested name is not appropriate.

> experience relabel should be done when reading data out of replay buffer as in hindsight relabel.

Different use cases. This is an alternative interface that can support more than pure relabeling (e.g. excluding data), which is not directly supported by the replay buffer hindsight relabel.

> 2. assert that batch_size is 1 when enabled.

There is no such assumption in the current PR. It is up to the user.

> Also, delaying train_step because of delayed experience storage can have unexpected side effects, e.g. if episodes are 100 steps long, and unroll once per train iter, then summary will only happen every 100 train iters. It will also shift the distribution of the data training sees due to the delay.

There is no delay.

> Overall I think doing this episode level relabeling at the DataTransformer stage, after reading from replay_buffer is perhaps a better way, and a cleaner way as well (less scattered code). That would require the replay buffer to keep track of episode begin and end, which I think it already does.

As explained, it is more than pure relabeling.

        else:
@@ -903,8 +908,7 @@ def _train_iter_off_policy(self):
                 self.train()
         steps = 0
         for i in range(effective_unroll_steps):

Contributor:

unroll_steps is the wrong name? It should be called unroll_iterations to indicate training iterations, not env steps?

Also rename effective_number_of_unroll_steps to effective_unroll_iters to be consistent (i.e., remove "number_of_").


Contributor Author (@Haichao-Zhang):

Thanks for the comments. Changed.

-            steps += self.train_from_replay_buffer(effective_unroll_steps=1,
-                                                   update_global_counter=True)
+            steps += self.train_from_replay_buffer(update_global_counter=True)
         if unrolled:
             with record_time("time/after_train_iter"):
                 self.after_train_iter(root_inputs, rollout_info)
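
A closing aside (not part of the diff): how the annotation hooks feed this loop. The diff shows the per-step values (`1` normally, `len(annotated_exp_list)` when a cached episode is stored) and the training loop, but not how they are aggregated into `effective_unroll_steps`; the toy below assumes a simple sum, so treat it as an illustration of the bookkeeping rather than the actual ALF code.

```python
# Toy, self-contained illustration under the summation assumption stated above.
def train_calls_per_iter(stored_counts):
    """stored_counts[i] = experiences stored by unroll step i this iteration."""
    effective_unroll_steps = sum(stored_counts)
    # The loop above runs one train_from_replay_buffer() call per effective step.
    return effective_unroll_steps


# Default hooks (annotate every step, return input unchanged): one store per step.
assert train_calls_per_iter([1, 1, 1, 1]) == 4
# Episode-end annotation (assuming caching-only steps report 0): the LAST step
# stores all four at once, so training calls shift in time but none are dropped.
assert train_calls_per_iter([0, 0, 0, 4]) == 4
```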