
Conversation

@Haichao-Zhang (Contributor) commented May 7, 2025

Post Process Experience
This PR adds the preprocess_unroll_experience feature, which is useful in a number of scenarios. For example:

  1. many kinds of trajectory labeling (e.g. hindsight success relabeling)
  2. trajectory filtering (e.g. excluding some trajectories from being stored into the buffer).

In some cases, it is also a necessary component when we want to use in-algorithm procedures to overwrite quantities in the timestep before recording it into the buffer (e.g. the step type), so that all the other logic in ALF is fully respected (e.g. masking out the loss for the LAST step based on the step type).
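As a rough, hypothetical sketch (not code from this PR's diff), a per-step relabeling hook could look like the following. The method name and the convention that it returns the list of experiences to store follow the snippets quoted later in this thread; the StepTypeRelabeler class, the _should_mark_last predicate, and the assumption that exp is a namedtuple-style experience exposing step_type are illustrative only.

```python
import torch

from alf.data_structures import StepType


class StepTypeRelabeler:
    """Hypothetical mixin; in the PR the hook lives on the RL algorithm class."""

    def preprocess_unroll_experience(self, rollout_info, step_type, exp):
        # Overwrite step_type before the experience is stored, so that ALF's
        # existing logic (e.g. masking the loss of LAST steps) sees the
        # relabeled value rather than the one produced by the environment.
        if self._should_mark_last(rollout_info):  # hypothetical predicate
            exp = exp._replace(
                step_type=torch.full_like(step_type, int(StepType.LAST)))
        # The hook returns the list of experiences to store for this step;
        # returning [] would drop the step entirely.
        return [exp]

    def _should_mark_last(self, rollout_info):
        # Placeholder; a real subclass would inspect rollout_info here.
        return False
```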

Synced Training
Note that although one may think step-based filtering (e.g. excluding tasks) could also be done on the replay buffer side, the training dynamics are not the same.
This PR ensures synced training, meaning we do not run a train step for the invalid/excluded steps.
In contrast, replay-buffer-based filtering cannot ensure synced training.

Customizable Modes
The behavior can be customized by the user. Some examples (a sketch follows this list):
(1) per-step saving without delay: save each step of unroll experience into the replay buffer as soon as we get it.
(2) all-step saving with delay: save all the steps of unroll experience into the replay buffer with a delay. This is useful when we want to annotate a trajectory based on quantities that are not immediately available at the current step (e.g. task success/failure).
(3) selective saving: exclude some of the unroll experiences and only save the rest. This is useful when some transitions are irrelevant to training (e.g. in the multi-task case, where we want to exclude data from certain subtasks).
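A similarly hypothetical sketch of modes (2) and (3) is below (mode (1) is simply returning [exp] immediately, as in the sketch above). The self._cached_exp list mirrors the one that appears in the diff snippets later in the thread; the predicates and the annotation step are placeholders, and for simplicity the sketch assumes a single (non-batched) environment.

```python
import torch

from alf.data_structures import StepType


class EpisodeEndAnnotator:
    """Hypothetical mixin; in the PR the hook lives on the RL algorithm class."""

    def __init__(self):
        super().__init__()
        # Per-step cache, flushed when the episode ends.
        self._cached_exp = []

    def preprocess_unroll_experience(self, rollout_info, step_type, exp):
        # Mode (3): selective saving -- drop transitions we never train on.
        if self._is_irrelevant_subtask(rollout_info):  # hypothetical predicate
            return []
        # Mode (2): all-step saving with delay -- cache until the episode ends,
        # then annotate the whole trajectory with end-of-episode information.
        # For simplicity, this assumes a single environment (batch_size 1).
        self._cached_exp.append(exp)
        if not bool((step_type == int(StepType.LAST)).any()):
            return []  # episode still in progress; nothing stored yet
        annotated = [self._annotate(e) for e in self._cached_exp]
        self._cached_exp = []
        return annotated

    def _is_irrelevant_subtask(self, rollout_info):
        return False  # placeholder

    def _annotate(self, exp):
        return exp  # placeholder: e.g. write task success/failure into exp
```

The length of the returned list is what drives the effective_unroll_steps bookkeeping discussed in the review comments below.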

@common.mark_replay
def train_from_replay_buffer(self, update_global_counter=False):
def train_from_replay_buffer(self,
effective_unroll_steps,
Contributor:

Add docstring for this arg?

Contributor Author:

This arg is now removed

config, self._num_earliest_frames_ignored)

if self._episodic_annotation:
    assert self._env.batch_size == 1, "only support non-batched environment"
Contributor:

Add this to the docstring of episodic_annotation?

Contributor Author:

The assertion is not necessary here, so it is removed as well.

Comment on lines 609 to 611
"""A function that determines whether the ``post_process_episode`` function should
be applied to the current list of experiences.
"""
Contributor @runjerry commented May 9, 2025:

This is an interface mainly used for subclasses? Maybe mention this. Same for post_process_episode.

Contributor Author:

Good point. Added comments. Also for post_process_episode

self._cached_exp)
effective_number_of_unroll_steps = len(annotated_exp_list)
# 2) observe
if not self.on_policy:
Contributor:

Maybe this condition check should be performed earlier, since it seems a waste to do all the post_process_episode if self.on_policy?

Contributor Author:

Updated

Comment on lines 1485 to 1486
< config.initial_collect_steps) or (effective_unroll_steps
== 0):
Contributor:

Is there any situation that train_from_replay_buffer will be called with effective_unroll_steps=0?

Contributor Author:

effective_unroll_steps is now removed from this function

Comment on lines 905 to 910
for i in range(effective_unroll_steps):
    steps += self.train_from_replay_buffer(effective_unroll_steps=1,
                                            update_global_counter=True)
if unrolled:
    with record_time("time/after_train_iter"):
        self.after_train_iter(root_inputs, rollout_info)
Contributor @runjerry commented May 9, 2025:

I feel that this update fundamentally changes the off-policy update logic w.r.t. its actual unroll in the env. Previously, between every call of self._unroll_iter_off_policy, the policy gets an "update" from self.train_from_replay_buffer. Now if self._episodic_annotation, policy training only happens after each episode, though the UTD stays the same. I feel that the episodic annotation function should be configurable independently of the choice of such unroll/update logic. Ideally, we may want to keep the previous version here while achieving the same effect of the change of above lines by configuring unroll_length and num_updates_per_train_iter.

Contributor Author:

If self._episodic_annotation is False, everything is the same as before.
If self._episodic_annotation is True, by default (with the new commit) it also reduces to the original logic, so everything is still the same as before (policy training happens after each time step, not only after each episode).

In the derived class, it is up to the user to determine what kind of annotation function they want to implement and use.

alf.layers.to_float32(policy_state))
effective_number_of_unroll_steps = 1
if self._episodic_annotation:
    assert not self.on_policy, "only support episodic annotation for off policy training"
Contributor:

Maybe assert this in the __init__ function?

@runjerry (Contributor) commented May 10, 2025

Thank you Haichao for addressing all my comments. Just one more minor question.

@le-horizon (Contributor) left a comment:

Some high- and low-level comments, if they make sense.

self.observe_for_replay(exp)
store_exp_time = time.time() - t0
# clean up the exp cache
self._cached_exp = []
Contributor:

This seems to assume that all envs end on the same step? What if some envs are LAST, some are MID? cached_exp will be cleared even for those with MID steps?

Even when doing this for an env with batch_size 1, this annotation mode will delay experience from being stored into the replay buffer.

Ok to submit the change as is, but may need to do two things:

  1. rename the feature to something like store_experience_on_episode_end, and document its behavior clearly in the docstr.
    experience relabel should be done when reading data out of replay buffer as in hindsight relabel.

  2. assert that batch_size is 1 when enabled.

Also, delaying train_step because of delayed experience storage can have unexpected side effects, e.g. if episodes are 100 steps long, and unroll once per train iter, then summary will only happen every 100 train iters. It will also shift the distribution of the data training sees due to the delay.

Overall I think doing this episode level relabeling at the DataTransformer stage, after reading from replay_buffer is perhaps a better way, and a cleaner way as well (less scattered code). That would require the replay buffer to keep track of episode begin and end, which I think it already does.

Contributor Author @Haichao-Zhang commented May 23, 2025:

> This seems to assume that all envs end on the same step? What if some envs are LAST, some are MID? cached_exp will be cleared even for those with MID steps?

There is no such assumption; it is entirely up to the users to inject their own assumptions. By default, the behavior is the same as before.
Sorry that the function names are a bit misleading; their role has been extended to handle the per-step case as well. Changed the function names and added more comments.

> Even when doing this for an env with batch_size 1, this annotation mode will delay experience from being stored into the replay buffer.

No, it won't. By default, the behavior is the same as before.

> Ok to submit the change as is, but may need to do two things:
>
>   1. rename the feature to something like store_experience_on_episode_end, and document its behavior clearly in the docstr.

The suggested name is not appropriate.

> experience relabel should be done when reading data out of replay buffer as in hindsight relabel.

Different use cases. This is an alternative interface that can support more than pure relabeling (e.g. excluding data), which is not directly supported by the replay buffer hindsight relabel.

>   2. assert that batch_size is 1 when enabled.

There is no such assumption in the current PR. It is up to the user.

> Also, delaying train_step because of delayed experience storage can have unexpected side effects, e.g. if episodes are 100 steps long, and unroll once per train iter, then summary will only happen every 100 train iters. It will also shift the distribution of the data training sees due to the delay.

There is no delay.

> Overall I think doing this episode level relabeling at the DataTransformer stage, after reading from replay_buffer is perhaps a better way, and a cleaner way as well (less scattered code). That would require the replay buffer to keep track of episode begin and end, which I think it already does.

As explained, it is more than pure relabeling.

with record_time("time/after_train_iter"):
    self.after_train_iter(root_inputs, rollout_info)
steps = 0
for i in range(effective_unroll_steps):
Contributor:

unroll_steps is the wrong name? It should be called unroll_iterations to indicate training iterations, not env steps?

Also rename effective_number_of_unroll_steps to effective_unroll_iters to be consistent (i.e. remove "number_of_").

Contributor Author:

Thanks for the comments. Changed.

experience, batch_info = self._replay_buffer.gather_all(
ignore_earliest_frames=True)
num_updates = config.num_updates_per_train_iter
num_updates = effective_num_updates_per_train_iter
Contributor:

why do you need to make this change?

Contributor Author @Haichao-Zhang commented May 23, 2025:

not necessary anymore. removed

effective_unroll_iters = effective_unroll_steps // unroll_length
return experience, effective_unroll_iters

def should_post_process_experience(self, rollout_info,
Contributor:

This is unnecessary. We can always call post_process_experience

Contributor Author:

Correct. Removed this function.

As another example, task filtering can be simply achieved by returning ``[]``
in ``post_process_experience`` for that particular task.
- per-episode processing: ``should_post_process_experience`` returns True on episode
end and ``post_process_experience`` can return a list of cached and processed
Contributor:

no need to mention "cached". It will confuse the user.

Contributor Author:

Updated the docstring.

@Haichao-Zhang force-pushed the PR_episodic_annotation branch from 45c8321 to 9cfe6a5 on May 23, 2025 19:12
@Haichao-Zhang force-pushed the PR_episodic_annotation branch from d89cb3e to 26ab09a on May 23, 2025 19:33
@Haichao-Zhang changed the title from "Episodic annotation and synced training" to "Post Process Experience" on May 23, 2025
@Haichao-Zhang changed the title from "Post Process Experience" to "Post Process Experience and Synced Training" on May 23, 2025
with record_time("time/after_train_iter"):
    self.after_train_iter(root_inputs, rollout_info)
steps = 0
for i in range(effective_unroll_iters):
Contributor:

It is possible that effective_unroll_iters is always smaller than 1 when num_envs > 1.

Contributor Author @Haichao-Zhang commented May 23, 2025:

Good point. Now also handles the fractional unroll case.

# 1) process
post_processed_exp_list = self.post_process_experience(
rollout_info, time_step.step_type, exp)
effective_unroll_steps = len(post_processed_exp_list)
Contributor:

In order to really get this right, we need to do: sum(exp.step_type.shape[0] for exp in post_processed_exp_list) / exp.step_type.shape[0]

Contributor Author:

Updated.
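To make the suggested counting concrete, here is a tiny self-contained example with made-up numbers; Exp is just a stand-in for ALF's experience structure.

```python
from collections import namedtuple

import torch

# Minimal stand-in for ALF's experience; only step_type is needed here.
Exp = namedtuple("Exp", ["step_type"])

batch_size = 4  # hypothetical number of parallel environments
# Suppose the hook kept 2 of the unrolled steps, each carrying a batch of 4.
post_processed_exp_list = [
    Exp(step_type=torch.zeros(batch_size, dtype=torch.int32)) for _ in range(2)
]

# Count environment steps actually kept, normalized by the batch size,
# following the formula suggested in the review comment above.
effective_unroll_steps = (
    sum(e.step_type.shape[0] for e in post_processed_exp_list) / batch_size)
print(effective_unroll_steps)  # 2.0
```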

@Haichao-Zhang (Contributor Author) commented:

Closing for now, since this is mostly used in a side project. Will reopen if this becomes a general feature for other use cases.

@Haichao-Zhang (Contributor Author) commented:

Reopening, as it seems other people are also trying to use this feature.

@Haichao-Zhang reopened this on Jun 5, 2025
@Haichao-Zhang (Contributor Author) commented:

Pushed a commit 94a50bf to give the user the flexibility to customize effective_unroll_steps for achieving different effects.

@Haichao-Zhang requested a review from hnyu on June 5, 2025 18:04
effective_unroll_iters = 1 if unroll_length == 0 else effective_unroll_steps // unroll_length
return experience, effective_unroll_iters

def post_process_experience(self, rollout_info, step_type: StepType,
Collaborator:

This name is confusing given the existing function preprocess_experience: "post" might suggest that this happens after that, but in fact this happens before training.

Contributor Author:

changed to preprocess_unroll_experience

store_exp_time = 0.
step_time = 0.
max_step_time = 0.
effective_unroll_steps = 0
Collaborator:

I think we lack a formal definition of "effective" in the code document.

Contributor Author:

Added more comments with examples, especially in preprocess_unroll_experience

return experience
# if the input unroll_length is 0 (e.g. fractional unroll), then it is treated as
# an effective unroll iter
effective_unroll_iters = 1 if unroll_length == 0 else effective_unroll_steps // unroll_length
Collaborator:

It's strange to call an unroll an "iter"? The original definition is that each training iter has one unroll. So what does "unroll iters" mean in this context?

Contributor Author:

Added comments. One effective_unroll_iter refers to unroll_length calls of rollout_step during the unroll phase.
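A small worked example of the quoted expression, with made-up numbers:

```python
# Hypothetical numbers, purely to illustrate the quoted expression.
unroll_length = 4            # rollout_step calls per unroll phase
effective_unroll_steps = 10  # steps kept by preprocess_unroll_experience

# unroll_length == 0 denotes fractional unroll and counts as one effective iter.
effective_unroll_iters = (
    1 if unroll_length == 0 else effective_unroll_steps // unroll_length)
print(effective_unroll_iters)  # 2 -> train_from_replay_buffer runs twice
```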

@hnyu (Collaborator) commented Jun 6, 2025

> This PR ensures synced training, meaning we won't do train step for those invalid/to be excluded steps.

I think the name "synced training" is somewhat confusing to me. The second half of the sentence doesn't relate to synchronization?

@Haichao-Zhang force-pushed the PR_episodic_annotation branch from 788dfb4 to c837043 on June 6, 2025 19:45
@Haichao-Zhang changed the title from "Post Process Experience and Synced Training" to "Post Process Experience and Corresponding train/unroll Ratio Adjustment" on Jun 6, 2025
@Haichao-Zhang force-pushed the PR_episodic_annotation branch from c837043 to 8fc3ff2 on June 6, 2025 19:49
@Haichao-Zhang changed the title from "Post Process Experience and Corresponding train/unroll Ratio Adjustment" to "Post Process Experience and Customizable Modes" on Jun 6, 2025
@Haichao-Zhang changed the title from "Post Process Experience and Customizable Modes" to "Post Process Experience with Customizable Modes" on Jun 6, 2025
@Haichao-Zhang (Contributor Author) commented:

> > This PR ensures synced training, meaning we won't do train step for those invalid/to be excluded steps.
>
> I think the name "synced training" is somewhat confusing to me. The second half of the sentence doesn't relate to synchronization?

Yeah, this part of the description is outdated. Updated with a new one.
