Source code for causal_world.wrappers.planning_wrappers

from causal_world.intervention_actors.base_actor import \
    BaseInterventionActorPolicy
from causal_world.utils.rotation_utils import quaternion_to_euler, \
    euler_to_quaternion
import gym
import numpy as np


class ObjectSelectorActorPolicy(BaseInterventionActorPolicy):

    def __init__(self):
        """

        """
        super(ObjectSelectorActorPolicy, self).__init__()
        self.low_joint_positions = None
        self.current_action = None
        self.selected_object = None

    def initialize_actor(self, env):
        """

        :param env:
        :return:
        """
        self.low_joint_positions = env.get_joint_positions_raised()
        return

    def add_action(self, action, selected_object):
        """

        :param action:
        :param selected_object:
        :return:
        """
        self.current_action = action
        self.selected_object = selected_object

    def _act(self, variables_dict):
        """

        :param variables_dict:
        :return:
        """
        # action 1 is [0, 1, 2, 3, 4, 5, 6] 0 do nothing, 1 go
        #  up 2 down, 3 right, 4 left, 5, front, 6 back
        # action 2 is [0, 1, 2, 3, 4, 5, 6] 0 do nothing, 1 yaw clockwise,
        # 2 yaw anticlockwise, 3 roll clockwise,
        # 4 roll anticlockwise, 5 pitch clockwise, 6 pitch anticlockwise
        interventions_dict = dict()
        # interventions_dict['joint_positions'] = self.low_joint_positions
        interventions_dict[self.selected_object] = dict()
        if self.current_action[1] != 0:
            # copy the current position so the in-place edits below do not
            # mutate the environment's own state variables
            interventions_dict[self.selected_object]['cartesian_position'] = \
                np.array(variables_dict[self.selected_object]
                         ['cartesian_position'])
            if self.current_action[1] == 1:
                interventions_dict[
                    self.selected_object]['cartesian_position'][-1] += 0.002
            elif self.current_action[1] == 2:
                interventions_dict[
                    self.selected_object]['cartesian_position'][-1] -= 0.002
            elif self.current_action[1] == 3:
                interventions_dict[
                    self.selected_object]['cartesian_position'][0] += 0.002
            elif self.current_action[1] == 4:
                interventions_dict[
                    self.selected_object]['cartesian_position'][0] -= 0.002
            elif self.current_action[1] == 5:
                interventions_dict[
                    self.selected_object]['cartesian_position'][1] += 0.002
            elif self.current_action[1] == 6:
                interventions_dict[
                    self.selected_object]['cartesian_position'][1] -= 0.002
            else:
                raise Exception("The passed action mode is not supported")
        if self.current_action[2] != 0:
            euler_orientation = \
                quaternion_to_euler(variables_dict
                                    [self.selected_object]['orientation'])
            if self.current_action[2] == 1:
                euler_orientation[-1] += 0.1
            elif self.current_action[2] == 2:
                euler_orientation[-1] -= 0.1
            elif self.current_action[2] == 3:
                euler_orientation[1] += 0.1
            elif self.current_action[2] == 4:
                euler_orientation[1] -= 0.1
            elif self.current_action[2] == 5:
                euler_orientation[0] += 0.1
            elif self.current_action[2] == 6:
                euler_orientation[0] -= 0.1
            else:
                raise ValueError("The passed orientation action is not supported")
            interventions_dict[self.selected_object]['orientation'] = \
                euler_to_quaternion(euler_orientation)
        return interventions_dict
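

def _demo_object_selector_policy():
    """
    Illustrative sketch (not part of the original module): shows how the
    discrete action codes translate into an interventions dict. The object
    name 'tool_block' and the state values below are assumed placeholders,
    not values read from a real environment.
    """
    policy = ObjectSelectorActorPolicy()
    # action = (object index, position code, orientation code);
    # position code 1 moves the selected object up by 0.002 m.
    policy.add_action((0, 1, 0), 'tool_block')
    state = {
        'tool_block': {
            'cartesian_position': np.array([0.0, 0.0, 0.0325]),
            'orientation': np.array([0.0, 0.0, 0.0, 1.0])
        }
    }
    interventions = policy._act(state)
    # interventions['tool_block']['cartesian_position'][-1] is now 0.0345
    return interventions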


class ObjectSelectorWrapper(gym.Wrapper):
    def __init__(self, env):
        """
        :param env: (causal_world.CausalWorld) the environment to convert.
        """
        super(ObjectSelectorWrapper, self).__init__(env)
        self.env = env
        self.env.set_skip_frame(1)
        self.intervention_actor = ObjectSelectorActorPolicy()
        self.intervention_actor.initialize_actor(self.env)
        # expose only the observation entries after the robot's own state
        # (the first 28 entries of the structured observation)
        self.observation_space = gym.spaces.Box(
            self.env.observation_space.low[28:],
            self.env.observation_space.high[28:],
            dtype=np.float64)
        self.objects_order = list(
            self.env.get_stage().get_rigid_objects().keys())
        self.objects_order.sort()
        number_of_objects = len(self.objects_order)
        # action = (object index, position code, orientation code)
        self.action_space = gym.spaces.Tuple(
            (gym.spaces.Discrete(number_of_objects), gym.spaces.Discrete(7),
             gym.spaces.Discrete(7)))
        self.env.add_wrapper_info({'object_selector': dict()})

    def step(self, action):
        """
        Used to step through the environment.

        :param action: (tuple) the discrete action triple
                       (object index, position code, orientation code);
                       see ObjectSelectorActorPolicy._act for the codes.

        :return: (nd.array) specifies the observations returned after
                 stepping through the environment. Again, it follows the
                 observation_mode specified.
        """
        self.intervention_actor.add_action(action,
                                           self.objects_order[action[0]])
        intervention_dict = self.intervention_actor.act(
            self.env.get_current_state_variables())
        self.env.do_intervention(intervention_dict, check_bounds=False)
        # step the underlying environment with a fixed robot action so only
        # the intervention changes the scene
        obs, reward, done, info = self.env.step(self.env.action_space.low)
        obs = obs[28:]
        return obs, reward, done, info

    def reset(self):
        """
        Resets the environment to its current starting state.

        :return: (nd.array) specifies the observations returned after
                 resetting the environment. Again, it follows the
                 observation_mode specified.
        """
        result = self.env.reset()
        # raise the robot arm out of the way before objects are selected
        interventions_dict = dict()
        interventions_dict['joint_positions'] = \
            self.intervention_actor.low_joint_positions
        self.env.do_intervention(interventions_dict, check_bounds=False)
        # slice to match the wrapped observation space, as in step
        return result[28:]
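

def _demo_object_selector_wrapper():
    """
    Illustrative usage sketch (not part of the original module), assuming
    the usual CausalWorld entry points; the 'pushing' task id is just an
    example.
    """
    from causal_world.task_generators.task import generate_task
    from causal_world.envs.causalworld import CausalWorld

    task = generate_task(task_generator_id='pushing')
    env = ObjectSelectorWrapper(CausalWorld(task=task))
    obs = env.reset()
    # select object 0, move it up (position code 1), keep orientation (0)
    obs, reward, done, info = env.step((0, 1, 0))
    env.close()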