Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add probability masking to space.sample #1296

Closed
wants to merge 13 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,4 @@ pygame
sphinx_github_changelog
ale_py
tabulate
mujoco-py<2.2,>=2.1
7 changes: 6 additions & 1 deletion gymnasium/spaces/box.py
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,7 @@ def is_bounded(self, manner: str = "both") -> bool:
f"manner is not in {{'below', 'above', 'both'}}, actual value: {manner}"
)

def sample(self, mask: None = None) -> NDArray[Any]:
def sample(self, mask: None = None, probability: None = None) -> NDArray[Any]:
r"""Generates a single random sample inside the Box.

In creating a sample of the box, each coordinate is sampled (independently) from a distribution
Expand All @@ -355,6 +355,7 @@ def sample(self, mask: None = None) -> NDArray[Any]:

Args:
mask: A mask for sampling values from the Box space, currently unsupported.
probability: A probability mask for sampling values from the Box space, currently unsupported.

Returns:
A sampled value from the Box
Expand All @@ -363,6 +364,10 @@ def sample(self, mask: None = None) -> NDArray[Any]:
raise gym.error.Error(
f"Box.sample cannot be provided a mask, actual value: {mask}"
)
elif probability is not None:
raise gym.error.Error(
f"Box.sample cannot be provided a probability mask, actual value: {probability}"
)

high = self.high if self.dtype.kind == "f" else self.high.astype("int64") + 1
sample = np.empty(self.shape)
Expand Down
30 changes: 24 additions & 6 deletions gymnasium/spaces/dict.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,25 +149,43 @@ def seed(self, seed: int | dict[str, Any] | None = None) -> dict[str, int]:
f"Expected seed type: dict, int or None, actual type: {type(seed)}"
)

def sample(self, mask: dict[str, Any] | None = None) -> dict[str, Any]:
def _verify_mask(self, mask: dict[str, Any], mask_name: str) -> None:
"""Check the validity of the mask."""
assert isinstance(
mask, dict
), f"Expected {mask_name} to be a dict, actual type: {type(mask)}"
assert (
mask.keys() == self.spaces.keys()
), f"Expected {mask_name} keys to be same as space keys, mask keys: {mask.keys()}, space keys: {self.spaces.keys()}"

def sample(
self,
mask: dict[str, Any] | None = None,
probability: dict[str, Any] | None = None,
) -> dict[str, Any]:
"""Generates a single random sample from this space.

The sample is an ordered dictionary of independent samples from the constituent spaces.

Args:
mask: An optional mask for each of the subspaces, expects the same keys as the space
probability: An optional probability mask for each of the subspaces, expects the same keys as the space

Returns:
A dictionary with the same key and sampled values from :attr:`self.spaces`
"""
if mask is not None:
assert isinstance(
mask, dict
), f"Expects mask to be a dict, actual type: {type(mask)}"
assert (
mask.keys() == self.spaces.keys()
), f"Expect mask keys to be same as space keys, mask keys: {mask.keys()}, space keys: {self.spaces.keys()}"
probability is None
), "Only one of `mask` or `probability` can be provided"
self._verify_mask(mask, "mask")
return {k: space.sample(mask=mask[k]) for k, space in self.spaces.items()}
elif probability is not None:
self._verify_mask(probability, "probability")
return {
k: space.sample(probability=probability[k])
for k, space in self.spaces.items()
}

return {k: space.sample() for k, space in self.spaces.items()}

Expand Down
98 changes: 78 additions & 20 deletions gymnasium/spaces/discrete.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,12 @@ class Discrete(Space[np.int64]):
>>> observation_space = Discrete(3, start=-1, seed=42) # {-1, 0, 1}
>>> observation_space.sample()
np.int64(-1)
>>> observation_space.sample(mask=np.array([0,0,1], dtype=np.int8))
np.int64(1)
>>> observation_space.sample(probability=np.array([0,0,1], dtype=np.float64))
np.int64(1)
>>> observation_space.sample(probability=np.array([0,0.3,0.7], dtype=np.float64))
np.int64(1)
"""

def __init__(
Expand Down Expand Up @@ -56,42 +62,94 @@ def is_np_flattenable(self):
"""Checks whether this space can be flattened to a :class:`spaces.Box`."""
return True

def sample(self, mask: MaskNDArray | None = None) -> np.int64:
def sample(
self, mask: MaskNDArray | None = None, probability: MaskNDArray | None = None
) -> np.int64:
"""Generates a single random sample from this space.

A sample will be chosen uniformly at random with the mask if provided
A sample will be chosen uniformly at random with the mask if provided, or it will be chosen according to a specified probability distribution if the probability mask is provided.

Args:
mask: An optional mask for if an action can be selected.
Expected `np.ndarray` of shape ``(n,)`` and dtype ``np.int8`` where ``1`` represents valid actions and ``0`` invalid / infeasible actions.
If there are no possible actions (i.e. ``np.all(mask == 0)``) then ``space.start`` will be returned.
probability: An optional probability mask describing the probability of each action being selected.
Expected `np.ndarray` of shape ``(n,)`` and dtype ``np.float64`` where each value is in the range ``[0, 1]`` and the sum of all values is 1.
If the values do not sum to 1, an exception will be thrown.

Returns:
A sampled integer from the space
"""
if mask is not None:
assert isinstance(
mask, np.ndarray
), f"The expected type of the mask is np.ndarray, actual type: {type(mask)}"
assert (
mask.dtype == np.int8
), f"The expected dtype of the mask is np.int8, actual dtype: {mask.dtype}"
assert mask.shape == (
self.n,
), f"The expected shape of the mask is {(self.n,)}, actual shape: {mask.shape}"
valid_action_mask = mask == 1
assert np.all(
np.logical_or(mask == 0, valid_action_mask)
), f"All values of a mask should be 0 or 1, actual values: {mask}"
if np.any(valid_action_mask):
if mask is not None and probability is not None:
raise ValueError("Only one of `mask` or `probability` can be provided")

mask_type = (
"mask"
if mask is not None
else "probability" if probability is not None else None
)
chosen_mask = mask if mask is not None else probability

if chosen_mask is not None:
self._validate_mask(
chosen_mask,
(self.n,),
np.int8 if mask is not None else np.float64,
mask_type,
)
valid_action_mask = self._get_valid_action_mask(chosen_mask, mask_type)
if mask_type == "mask":
if np.any(valid_action_mask):
return self.start + self.np_random.choice(
np.where(valid_action_mask)[0]
)
return self.start
elif mask_type == "probability":
normalized_probability = probability / np.sum(
probability, dtype=float
) # as recommended by the numpy.random.Generator.choice documentation
return self.start + self.np_random.choice(
np.where(valid_action_mask)[0]
np.where(valid_action_mask)[0],
p=normalized_probability[valid_action_mask],
)
else:
return self.start

return self.start + self.np_random.integers(self.n)

def _validate_mask(
self,
mask: MaskNDArray,
expected_shape: tuple[int],
expected_dtype: type,
mask_type: str,
):
"""Validates the type, shape, and dtype of a mask."""
assert isinstance(
mask, np.ndarray
), f"The expected type of `{mask_type}` is np.ndarray, actual type: {type(mask)}"
assert (
mask.shape == expected_shape
), f"The expected shape of `{mask_type}` is {expected_shape}, actual shape: {mask.shape}"
assert (
mask.dtype == expected_dtype
), f"The expected dtype of `{mask_type}` is {expected_dtype}, actual dtype: {mask.dtype}"

def _get_valid_action_mask(self, mask: MaskNDArray, mask_type: str) -> MaskNDArray:
"""Returns a valid action mask based on the mask type."""
if mask_type == "mask":
valid_action_mask = mask == 1
assert np.all(
np.logical_or(mask == 0, valid_action_mask)
), f"All values of `mask` should be 0 or 1, actual values: {mask}"
elif mask_type == "probability":
valid_action_mask = np.logical_and(mask > 0, mask <= 1)
assert np.all(
np.logical_or(mask == 0, valid_action_mask)
), f"All values of `probability mask` should be 0, 1, or in between, actual values: {mask}"
assert np.isclose(
np.sum(mask), 1
), f"The sum of all values of `probability mask` should be 1, actual sum: {np.sum(mask)}"
return valid_action_mask

def contains(self, x: Any) -> bool:
"""Return boolean specifying if x is a valid member of this space."""
if isinstance(x, int):
Expand Down
43 changes: 41 additions & 2 deletions gymnasium/spaces/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,12 @@ def sample(
NDArray[Any] | tuple[Any, ...] | None,
]
) = None,
probability: None | (
tuple[
NDArray[Any] | tuple[Any, ...] | None,
NDArray[Any] | tuple[Any, ...] | None,
]
) = None,
num_nodes: int = 10,
num_edges: int | None = None,
) -> GraphInstance:
Expand All @@ -192,6 +198,9 @@ def sample(
mask: An optional tuple of optional node and edge mask that is only possible with Discrete spaces
(Box spaces don't support sample masks).
If no ``num_edges`` is provided then the ``edge_mask`` is multiplied by the number of edges
probability: An optional tuple of optional node and edge probability mask that is only possible with Discrete spaces
(Box spaces don't support sample probability masks).
If no ``num_edges`` is provided then the ``edge_mask`` is multiplied by the number of edges
num_nodes: The number of nodes that will be sampled, the default is `10` nodes
num_edges: An optional number of edges, otherwise, a random number between `0` and :math:`num_nodes^2`

Expand All @@ -202,11 +211,31 @@ def sample(
num_nodes > 0
), f"The number of nodes is expected to be greater than 0, actual value: {num_nodes}"

mask_type = None
if mask is not None:
assert (
probability is None
), "Only one of `mask` or `probability` can be provided"
node_space_mask, edge_space_mask = mask
mask_type = "mask"
elif probability is not None:
node_space_mask, edge_space_mask = probability
mask_type = "probability"
else:
node_space_mask, edge_space_mask = None, None

return self._sample(
node_space_mask, edge_space_mask, num_nodes, num_edges, mask_type
)

def _sample(
self,
node_space_mask: NDArray[Any] | tuple[Any, ...] | None,
edge_space_mask: NDArray[Any] | tuple[Any, ...] | None,
num_nodes: int,
num_edges: int | None,
mask_type: str,
) -> GraphInstance:
# we only have edges when we have at least 2 nodes
if num_edges is None:
if num_nodes > 1:
Expand All @@ -231,9 +260,19 @@ def sample(
sampled_edge_space = self._generate_sample_space(self.edge_space, num_edges)

assert sampled_node_space is not None
sampled_nodes = sampled_node_space.sample(node_space_mask)
node_sample_kwargs = (
{"probability": node_space_mask}
if mask_type == "probability"
else {"mask": node_space_mask}
)
edge_sample_kwargs = (
{"probability": edge_space_mask}
if mask_type == "probability"
else {"mask": edge_space_mask}
)
sampled_nodes = sampled_node_space.sample(**node_sample_kwargs)
sampled_edges = (
sampled_edge_space.sample(edge_space_mask)
sampled_edge_space.sample(**edge_sample_kwargs)
if sampled_edge_space is not None
else None
)
Expand Down
13 changes: 12 additions & 1 deletion gymnasium/spaces/multi_binary.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import numpy as np
from numpy.typing import NDArray

import gymnasium as gym
from gymnasium.spaces.space import MaskNDArray, Space


Expand Down Expand Up @@ -59,7 +60,9 @@ def is_np_flattenable(self):
"""Checks whether this space can be flattened to a :class:`spaces.Box`."""
return True

def sample(self, mask: MaskNDArray | None = None) -> NDArray[np.int8]:
def sample(
self, mask: MaskNDArray | None = None, probability: None = None
) -> NDArray[np.int8]:
"""Generates a single random sample from this space.

A sample is drawn by independent, fair coin tosses (one toss per binary variable of the space).
Expand All @@ -68,11 +71,15 @@ def sample(self, mask: MaskNDArray | None = None) -> NDArray[np.int8]:
mask: An optional np.ndarray to mask samples with expected shape of ``space.shape``.
Where ``mask == 0`` the samples will be ``0``, and where ``mask == 1`` random samples will be generated.
The expected mask shape is the space shape and mask dtype is ``np.int8``.
probability: A probability mask for sampling values from the MultiBinary space, currently unsupported.

Returns:
Sampled values from space
"""
if mask is not None:
assert (
probability is None
), "Only one of `mask` or `probability` can be provided, and `probability` is currently unsupported"
assert isinstance(
mask, np.ndarray
), f"The expected type of the mask is np.ndarray, actual type: {type(mask)}"
Expand All @@ -91,6 +98,10 @@ def sample(self, mask: MaskNDArray | None = None) -> NDArray[np.int8]:
self.np_random.integers(low=0, high=2, size=self.n, dtype=self.dtype),
mask.astype(self.dtype),
)
elif probability is not None:
raise gym.error.Error(

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this unsupported currently? I'm happy to add this if you wish.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It wasn't obvious to me how to do it at the time, and I decided to move on. It honestly would be a huge help if you did! I'm pretty overwhelmed with classes and other commitments I have this semester.

f"MultiBinary.sample cannot be provided a probability, actual value: {probability}"
)

return self.np_random.integers(low=0, high=2, size=self.n, dtype=self.dtype)

Expand Down
Loading
Loading