Source code for causal_world.wrappers.action_wrappers

from causal_world.wrappers.policy_wrappers import \
    MovingAverageActionWrapperActorPolicy
from causal_world.actors.dummy_policy import DummyActorPolicy
import gym


class DeltaActionEnvWrapper(gym.ActionWrapper):
    def __init__(self, env):
        """
        An action wrapper that turns each action into a delta with respect
        to the previously executed action.

        :param env: (causal_world.CausalWorld) the environment to convert.
        """
        super(DeltaActionEnvWrapper, self).__init__(env)
        self.env.add_wrapper_info({'delta_action': dict()})
    def action(self, action):
        """
        Processes the action to transform to a delta action.

        :param action: (nd.array) the raw action to be processed.

        :return: (nd.array) the delta action.
        """
        if self.env.get_action_mode() == "joint_positions":
            offset = self.env.get_robot().get_last_applied_joint_positions()
        elif self.env.get_action_mode() == "joint_torques":
            offset = self.env.get_robot().get_latest_full_state()['torques']
        elif self.env.get_action_mode() == "end_effector_positions":
            offset = self.env.get_robot().get_latest_full_state(
            )['end_effector_positions']
        else:
            raise Exception("action mode is not known")
        if self.env.are_actions_normalized():
            offset = self.env.get_robot().normalize_observation_for_key(
                observation=offset, key=self.env.get_action_mode())
        return action + offset
    def reverse_action(self, action):
        """
        Reverses processing the action to transform to a raw action again.

        :param action: (nd.array) the delta action.

        :return: (nd.array) the raw action before processing.
        """
        if self.env.get_action_mode() == "joint_positions":
            offset = self.env.get_robot().get_last_applied_joint_positions()
        elif self.env.get_action_mode() == "joint_torques":
            offset = self.env.get_robot().get_latest_full_state()['torques']
        elif self.env.get_action_mode() == "end_effector_positions":
            offset = self.env.get_robot().get_latest_full_state(
            )['end_effector_positions']
        else:
            raise Exception("action mode is not known")
        if self.env.are_actions_normalized():
            offset = self.env.get_robot().normalize_observation_for_key(
                observation=offset, key=self.env.get_action_mode())
        return action - offset
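

# Usage sketch: wrapping a CausalWorld environment so the policy outputs
# deltas instead of absolute targets. The `CausalWorld` / `generate_task`
# import paths and the 'reaching' task id are assumptions based on the
# library's documented examples and may vary between versions.
#
#   import numpy as np
#   from causal_world.envs import CausalWorld
#   from causal_world.task_generators import generate_task
#
#   env = DeltaActionEnvWrapper(
#       CausalWorld(task=generate_task(task_generator_id='reaching')))
#   obs = env.reset()
#   # a zero delta repeats the previously applied joint positions
#   obs, reward, done, info = env.step(np.zeros(env.action_space.shape))
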
class MovingAverageActionEnvWrapper(gym.ActionWrapper):
    def __init__(self, env, widow_size=8, initial_value=0):
        """
        An action wrapper that smooths the executed actions with a moving
        average over a window of previous actions.

        :param env: (causal_world.CausalWorld) the environment to convert.
        :param widow_size: (int) the window size for averaging and smoothing
                                 the actions.
        :param initial_value: (float) initial value to fill the window with.
        """
        super(MovingAverageActionEnvWrapper, self).__init__(env)
        self._policy = DummyActorPolicy()
        self._policy = MovingAverageActionWrapperActorPolicy(
            self._policy,
            widow_size=widow_size,
            initial_value=initial_value)
        self.env.add_wrapper_info({
            'moving_average_action': {
                'widow_size': widow_size,
                'initial_value': initial_value
            }
        })
        return
    def action(self, action):
        """
        Processes the action to transform to a smoothed action.

        :param action: (nd.array) the raw action to be processed.

        :return: (nd.array) the smoothed action.
        """
        self._policy.policy.add_action(action)
        return self._policy.act(obs=None)
    def reverse_action(self, action):
        """
        Reverses processing the action to transform to a raw action again.

        :param action: (nd.array) the smoothed action.

        :return: (nd.array) the raw action before processing.
        """
        raise Exception("not implemented yet")
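

# A minimal smoke-test sketch for MovingAverageActionEnvWrapper, assuming the
# documented CausalWorld / generate_task entry points (exact import paths may
# vary between library versions).
if __name__ == '__main__':
    from causal_world.envs import CausalWorld
    from causal_world.task_generators import generate_task

    env = MovingAverageActionEnvWrapper(
        CausalWorld(task=generate_task(task_generator_id='reaching')),
        widow_size=8,
        initial_value=0)
    obs = env.reset()
    action = env.action_space.sample()
    # repeating the same raw action lets the applied (smoothed) action ramp
    # towards it as the moving-average window fills up
    for _ in range(10):
        obs, reward, done, info = env.step(action)
    env.close()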