Source code for causal_world.task_generators.base_task

import numpy as np
import math
import copy
from causal_world.utils.state_utils import get_bounding_box_volume
from causal_world.utils.state_utils import get_intersection
from causal_world.utils.rotation_utils import cart2cyl
from causal_world.utils.task_utils import combine_intervention_spaces
import pybullet
import logging


class BaseTask(object):
    def __init__(self,
                 task_name,
                 variables_space,
                 fractional_reward_weight=1,
                 dense_reward_weights=np.array([]),
                 activate_sparse_reward=False):
        """
        This class represents the base task generator which includes all
        the common functionalities of the task generators.

        :param task_name: (str) the task name.
        :param variables_space: (str) the space to be used, either 'space_a',
                                      'space_b' or 'space_a_b'.
        :param fractional_reward_weight: (float) weight multiplied by the
                                                 fractional volumetric
                                                 overlap in the reward.
        :param dense_reward_weights: (list float) specifies the reward weights
                                                  for all the other reward
                                                  terms calculated in the
                                                  _calculate_dense_rewards
                                                  function.
        :param activate_sparse_reward: (bool) specifies whether to sparsify
                                              the reward to +1 or 0 depending
                                              on whether the volumetric
                                              fractional overlap exceeds 90%.
        """
        self._robot = None
        self._stage = None
        self._task_solved = False
        self._task_name = task_name
        self._task_robot_observation_keys = []
        self._task_stage_observation_keys = []
        # the helper keys are observations that are not included in the
        # task observations but are needed in the reward calculation or
        # in calculating new observations
        self._robot_observation_helper_keys = []
        self._stage_observation_helper_keys = []
        self._non_default_robot_observation_funcs = dict()
        self._non_default_stage_observation_funcs = dict()
        self._current_full_observations_dict = dict()
        self._task_params = dict()
        self._task_params["task_name"] = self._task_name
        self._task_params["fractional_reward_weight"] = \
            fractional_reward_weight
        self._task_params["dense_reward_weights"] = dense_reward_weights
        self._task_params['activate_sparse_reward'] = activate_sparse_reward
        self._intervention_space_a = dict()
        self._intervention_space_b = dict()
        self._intervention_space_a_b = dict()
        self._task_params['variables_space'] = variables_space
        self._task_params["joint_positions"] = None
        self._current_starting_state = dict()
        self._default_starting_state = dict()
        self._empty_stage = None
        self._recreation_time = 0
        # TODO: this should be increased! there might be a bug from pybullet.
        self._period_to_clear_memory = 1
        self._current_desired_goal = None
        self._current_achieved_goal = None
        self._current_goal_reward = None
        self._max_episode_length = None
        self._create_world_func = None
        self._is_partial_solution_exposed = False
        self._is_ground_truth_state_exposed = False
        return
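    # A minimal sketch (hypothetical, not part of CausalWorld) of how a
    # concrete task generator would call this constructor; the class name
    # 'MyPushingTask' and the weight values below are illustrative
    # assumptions only:
    #
    #   class MyPushingTask(BaseTask):
    #       def __init__(self, variables_space='space_a'):
    #           super(MyPushingTask, self).__init__(
    #               task_name='my_pushing',
    #               variables_space=variables_space,
    #               fractional_reward_weight=1,
    #               dense_reward_weights=np.array([10, 0.1]),
    #               activate_sparse_reward=False)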
    def set_intervention_space(self, variables_space):
        """
        :param variables_space: (str) "space_a", "space_b" or "space_a_b".

        :return:
        """
        self._task_params['variables_space'] = variables_space
        return
    def _adjust_variable_spaces_after_intervention(self, interventions_dict):
        """
        Used to adjust the variable spaces after an intervention that
        affects them, like a size intervention for instance.

        :param interventions_dict: (dict) specifies the intervention variables
                                          and their corresponding values.

        :return:
        """
        return

    def _save_pybullet_state(self):
        """
        :return: (dict) specifies the pybullet states of the different
                        clients running.
        """
        pybullet_state = dict()
        if self._stage._pybullet_client_full_id is not None:
            pybullet_state['full'] = pybullet.saveState(
                physicsClientId=self._stage._pybullet_client_full_id)
        if self._stage._pybullet_client_w_goal_id is not None:
            pybullet_state['w_goal'] = pybullet.saveState(
                physicsClientId=self._stage._pybullet_client_w_goal_id)
        if self._stage._pybullet_client_w_o_goal_id is not None:
            pybullet_state['w_o_goal'] = pybullet.saveState(
                physicsClientId=self._stage._pybullet_client_w_o_goal_id)
        pybullet_state['latest_full_state'] = copy.deepcopy(
            self._robot.get_latest_full_state())
        return pybullet_state

    def _restore_pybullet_state(self, pybullet_state):
        """
        :param pybullet_state: (dict) specifies the pybullet states of the
                                      different clients running.

        :return:
        """
        if self._stage._pybullet_client_full_id is not None:
            pybullet.restoreState(
                pybullet_state['full'],
                physicsClientId=self._stage._pybullet_client_full_id)
        if self._stage._pybullet_client_w_goal_id is not None:
            pybullet.restoreState(
                pybullet_state['w_goal'],
                physicsClientId=self._stage._pybullet_client_w_goal_id)
        if self._stage._pybullet_client_w_o_goal_id is not None:
            pybullet.restoreState(
                pybullet_state['w_o_goal'],
                physicsClientId=self._stage._pybullet_client_w_o_goal_id)
        self._robot._latest_full_state = copy.deepcopy(
            pybullet_state['latest_full_state'])
        return

    def _remove_pybullet_state(self, pybullet_state):
        """
        Removes the saved pybullet states from memory.

        :param pybullet_state: (dict) specifies the pybullet states of the
                                      different clients running.

        :return:
        """
        if self._stage._pybullet_client_full_id is not None:
            pybullet.removeState(
                pybullet_state['full'],
                physicsClientId=self._stage._pybullet_client_full_id)
        if self._stage._pybullet_client_w_goal_id is not None:
            pybullet.removeState(
                pybullet_state['w_goal'],
                physicsClientId=self._stage._pybullet_client_w_goal_id)
        if self._stage._pybullet_client_w_o_goal_id is not None:
            pybullet.removeState(
                pybullet_state['w_o_goal'],
                physicsClientId=self._stage._pybullet_client_w_o_goal_id)
        return
    def save_state(self):
        """
        :return: (dict) specifies all variables and their corresponding
                        values in the environment.
        """
        state = dict()
        # state['pybullet_states'] = \
        #     self._save_pybullet_state()
        state['stage_object_state'] = \
            self._stage.get_full_env_state()
        state['robot_object_state'] = \
            self._robot.get_full_env_state()
        state['latest_full_state'] = copy.deepcopy(
            self._robot.get_latest_full_state())
        state['task_observations'] = \
            copy.deepcopy(self._task_stage_observation_keys)
        return state
    def restore_state(self, state_dict, avoid_reloading_urdf=False):
        """
        :param state_dict: (dict) specifies all variables and their
                                  corresponding values in the environment.
        :param avoid_reloading_urdf: (bool) true if reloading the urdf is
                                            to be avoided.

        :return: (bool) true if the observation space needs to be reset.
        """
        old_number_of_rigid_objects = len(self._stage.get_rigid_objects())
        old_number_of_visual_objects = len(self._stage.get_visual_objects())
        reset_observation_space = False
        self._stage.remove_everything()
        if not avoid_reloading_urdf and \
                self._recreation_time != 0 and \
                self._recreation_time % self._period_to_clear_memory == 0:
            self._create_world_func()
            self._robot._disable_velocity_control()
            self._robot.set_full_env_state(state_dict['robot_object_state'])
            self._remove_pybullet_state(self._empty_stage)
            self._empty_stage = self._save_pybullet_state()
        else:
            self._restore_pybullet_state(self._empty_stage)
            self._robot.set_full_env_state(state_dict['robot_object_state'])
        self._stage.set_full_env_state(state_dict['stage_object_state'])
        self._recreation_time += 1
        new_number_of_rigid_objects = len(self._stage.get_rigid_objects())
        new_number_of_visual_objects = len(self._stage.get_visual_objects())
        if old_number_of_rigid_objects != new_number_of_rigid_objects:
            reset_observation_space = True
        if old_number_of_visual_objects != new_number_of_visual_objects:
            reset_observation_space = True
        self._task_stage_observation_keys = copy.deepcopy(
            state_dict['task_observations'])
        self._robot._latest_full_state = copy.deepcopy(
            state_dict['latest_full_state'])
        # self._restore_pybullet_state(state_dict['pybullet_states'])
        return reset_observation_space
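    # Usage sketch (hypothetical `task` instance): snapshot the environment,
    # perturb it, then roll back; restore_state reports whether the
    # observation space needs to be rebuilt:
    #
    #   snapshot = task.save_state()
    #   task.apply_interventions({'floor_friction': 0.5})
    #   reset_obs_space = task.restore_state(snapshot)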
    def get_variable_space_used(self):
        """
        :return: (dict) returns the variables and their corresponding spaces
                        used in the current environment.
        """
        if self._task_params['variables_space'] == 'space_a':
            intervention_space = self._intervention_space_a
        elif self._task_params['variables_space'] == 'space_b':
            intervention_space = self._intervention_space_b
        elif self._task_params['variables_space'] == 'space_a_b':
            intervention_space = self._intervention_space_a_b
        return intervention_space
    def activate_sparse_reward(self):
        """
        Activate the sparse reward in the environment.

        :return:
        """
        self._task_params['activate_sparse_reward'] = True
        return
    def get_description(self):
        """
        :return: (str) returns the description of the task itself.
        """
        return
    def _set_task_state(self):
        """
        Used to set the task state at the beginning.

        :return:
        """
        return

    def _handle_contradictory_interventions(self, interventions_dict):
        """
        Handles contradictory interventions that change each other, for
        instance objects and silhouettes or the other way around.

        :param interventions_dict: (dict) the current interventions dict to
                                          be performed.

        :return: (dict) the newly adapted interventions dict after handling
                        the contradictory interventions.
        """
        return interventions_dict

    def _set_up_stage_arena(self):
        """
        Sets up the stage arena.

        :return:
        """
        return

    def _set_up_non_default_observations(self):
        """
        Sets up the non default observations.

        :return:
        """
        return
    def get_task_generator_variables_values(self):
        """
        :return: (dict) specifying the variables belonging to the task
                        itself.
        """
        return {}
    def apply_task_generator_interventions(self, interventions_dict):
        """
        :param interventions_dict: (dict) variables and their corresponding
                                          intervention values.

        :return: (tuple) the first position specifies whether the
                         intervention was successful or not, and the second
                         position indicates whether the observation_space
                         needs to be reset.
        """
        return True, False
    def get_info(self):
        """
        :return: (dict) returns the info dictionary after every step of the
                        environment.
        """
        info = dict()
        info['desired_goal'] = self._current_desired_goal
        info['achieved_goal'] = self._current_achieved_goal
        info['success'] = self._task_solved
        if self._is_ground_truth_state_exposed:
            info['ground_truth_current_state_varibales'] = \
                self.get_current_variable_values()
        if self._is_partial_solution_exposed:
            info['possible_solution_intervention'] = dict()
            for rigid_object in self._stage._rigid_objects:
                # check if there is an equivalent corresponding visual object
                possible_corresponding_goal = rigid_object.replace(
                    'tool', 'goal')
                if possible_corresponding_goal in \
                        self._stage.get_visual_objects():
                    info['possible_solution_intervention'][rigid_object] = \
                        dict()
                    info['possible_solution_intervention'][rigid_object]\
                        ['cartesian_position'] = \
                        self._stage.get_object_state(
                            possible_corresponding_goal,
                            'cartesian_position')
                    info['possible_solution_intervention'][rigid_object]\
                        ['orientation'] = \
                        self._stage.get_object_state(
                            possible_corresponding_goal, 'orientation')
        info['fractional_success'] = self._current_goal_reward
        return info
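    # Sketch of reading this dict after a step (hypothetical `env` wrapping
    # this task; key names as set above):
    #
    #   obs, reward, done, info = env.step(env.action_space.sample())
    #   print(info['success'], info['fractional_success'])
    #   # 'possible_solution_intervention' and the ground truth key only
    #   # appear after calling expose_potential_partial_solution() /
    #   # add_ground_truth_state_to_info() below.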
    def expose_potential_partial_solution(self):
        """
        Specified to add the potential partial solution to the info dict,
        which can be used as privileged information afterwards.

        :return:
        """
        self._is_partial_solution_exposed = True
        return
    def add_ground_truth_state_to_info(self):
        """
        Specified to add the full ground truth state to the info dict.

        :return:
        """
        self._is_ground_truth_state_exposed = True
        return
    def _update_task_state(self, update_task_state_dict):
        """
        :param update_task_state_dict: (dict) specifies variables and values
                                              that could be used in updating
                                              the task state.

        :return:
        """
        return

    def _calculate_dense_rewards(self, desired_goal, achieved_goal):
        """
        Specified by the task generator itself.

        :param desired_goal: (nd.array) specifies the bounding boxes of the
                                        desired goal.
        :param achieved_goal: (nd.array) specifies the bounding boxes of the
                                         achieved goal.

        :return: (tuple) the first place specifies the list of reward terms
                         calculated, and the second place specifies the
                         update_task_state_dict to be used by the
                         _update_task_state function.
        """
        return np.array([]), None
    def sample_new_goal(self, level=None):
        """
        Used to sample a new goal from the corresponding shape families.

        :param level: (int) specifies the level - not used for now.

        :return: (dict) the corresponding interventions dict that could then
                        be applied to get a new sampled goal.
        """
        intervention_dict = dict()
        if self._task_params['variables_space'] == 'space_a':
            intervention_space = self._intervention_space_a
        elif self._task_params['variables_space'] == 'space_b':
            intervention_space = self._intervention_space_b
        elif self._task_params['variables_space'] == 'space_a_b':
            intervention_space = self._intervention_space_a_b
        for visual_object in self._stage.get_visual_objects():
            if visual_object in intervention_space:
                intervention_dict[visual_object] = dict()
                if 'cylindrical_position' in intervention_space[visual_object]:
                    intervention_dict[visual_object]['cylindrical_position'] = \
                        cart2cyl(self._stage.random_position(
                            height_limits=intervention_space[visual_object]
                            ['cylindrical_position'][:, 2],
                            radius_limits=intervention_space[visual_object]
                            ['cylindrical_position'][:, 0],
                            angle_limits=intervention_space[visual_object]
                            ['cylindrical_position'][:, 1]))
                if 'euler_orientation' in intervention_space[visual_object]:
                    intervention_dict[visual_object]['euler_orientation'] = \
                        np.random.uniform(
                            intervention_space[visual_object]
                            ['euler_orientation'][0],
                            intervention_space[visual_object]
                            ['euler_orientation'][1])
        return intervention_dict
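    # Usage sketch (hypothetical `task` instance): the sampled dict maps each
    # visual goal shape to new pose values and can be fed straight back into
    # apply_interventions (defined further below):
    #
    #   goal_interventions = task.sample_new_goal()
    #   success, info, reset_obs_space = task.apply_interventions(
    #       goal_interventions, check_bounds=False)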
    def reset_default_state(self):
        """
        Resets the environment task to the default task setting of the
        corresponding shape family as it was first initialized, discarding
        the interventions performed afterwards.

        :return:
        """
        self.restore_state(self._default_starting_state)
        self._task_solved = False
        self._set_task_state()
        self._current_starting_state = copy.deepcopy(
            self._default_starting_state)
        return
    def _set_intervention_space_a(self):
        """
        Used to set the space A limits for all the variables.

        :return:
        """
        # you can override these easily
        self._intervention_space_a = dict()
        self._intervention_space_a['joint_positions'] = \
            np.array([[-1.57, -1.2, -3.0] * 3,
                      [-0.69, 0, 0] * 3])
        # TODO: add the rest of the exposed variables here
        for rigid_object in self._stage.get_rigid_objects():
            self._intervention_space_a[rigid_object] = dict()
            if self._stage.get_rigid_objects()[rigid_object].is_not_fixed():
                height = self._stage.get_object_state(rigid_object,
                                                      'size')[-1]
                self._intervention_space_a[rigid_object]\
                    ['cylindrical_position'] = \
                    np.array([[0.0, -math.pi, height / 2.0],
                              [0.11, math.pi, 0.15]])
                self._intervention_space_a[rigid_object]\
                    ['euler_orientation'] = \
                    np.array([[0, 0, -math.pi], [0, 0, math.pi]])
            if self._stage.get_rigid_objects()[
                    rigid_object].__class__.__name__ == 'Cuboid':
                self._intervention_space_a[rigid_object]['size'] = \
                    np.array([[0.055, 0.055, 0.055], [0.075, 0.075, 0.075]])
            self._intervention_space_a[rigid_object]['color'] = \
                np.array([[0.5, 0.5, 0.5], [1, 1, 1]])
            if self._stage.get_rigid_objects()[rigid_object].is_not_fixed():
                self._intervention_space_a[rigid_object]['mass'] = \
                    np.array([0.015, 0.045])
        for visual_object in self._stage._visual_objects:
            self._intervention_space_a[visual_object] = dict()
            height = self._stage.get_object_state(visual_object, 'size')[-1]
            self._intervention_space_a[visual_object]\
                ['cylindrical_position'] = \
                np.array([[0.0, -math.pi, height / 2.0],
                          [0.11, math.pi, 0.15]])
            self._intervention_space_a[visual_object]['euler_orientation'] = \
                np.array([[0, 0, -math.pi], [0, 0, math.pi]])
            if self._stage.get_visual_objects()[
                    visual_object].__class__.__name__ == 'SCuboid':
                self._intervention_space_a[visual_object]['size'] = \
                    np.array([[0.055, 0.055, 0.055], [0.075, 0.075, 0.075]])
            self._intervention_space_a[visual_object]['color'] = \
                np.array([[0.5, 0.5, 0.5], [1, 1, 1]])
        self._intervention_space_a['floor_color'] = \
            np.array([[0.5, 0.5, 0.5], [1, 1, 1]])
        self._intervention_space_a['stage_color'] = \
            np.array([[0, 0, 0], [0.5, 0.5, 0.5]])
        self._intervention_space_a['floor_friction'] = \
            np.array([0.3, 0.6])
        self._intervention_space_a['stage_friction'] = \
            np.array([0.3, 0.6])
        for link in self._robot.get_link_names():
            self._intervention_space_a[link] = dict()
            self._intervention_space_a[link]['color'] = \
                np.array([[0, 0, 0], [0.5, 0.5, 0.5]])
            self._intervention_space_a[link]['mass'] = \
                np.array([0.015, 0.045])
        return

    def _set_intervention_space_b(self):
        """
        Used to set the space B limits for all the variables.

        :return:
        """
        # you can override these easily
        self._intervention_space_b = dict()
        self._intervention_space_b['joint_positions'] = \
            np.array([[-0.69, 0, 0] * 3,
                      [1.0, 1.57, 3.0] * 3])
        # TODO: add the rest of the exposed variables here
        for rigid_object in self._stage.get_rigid_objects():
            self._intervention_space_b[rigid_object] = dict()
            if self._stage.get_rigid_objects()[rigid_object].is_not_fixed():
                height = self._stage.get_object_state(rigid_object,
                                                      'size')[-1]
                self._intervention_space_b[rigid_object]\
                    ['cylindrical_position'] = \
                    np.array([[0.11, -math.pi, height / 2.0],
                              [0.15, math.pi, 0.3]])
                self._intervention_space_b[rigid_object]\
                    ['euler_orientation'] = \
                    np.array([[0, 0, -math.pi], [0, 0, math.pi]])
            if self._stage.get_rigid_objects()[
                    rigid_object].__class__.__name__ == 'Cuboid':
                self._intervention_space_b[rigid_object]['size'] = \
                    np.array([[0.075, 0.075, 0.075], [0.095, 0.095, 0.095]])
            self._intervention_space_b[rigid_object]['color'] = \
                np.array([[0, 0, 0], [0.5, 0.5, 0.5]])
            if self._stage.get_rigid_objects()[rigid_object].is_not_fixed():
                self._intervention_space_b[rigid_object]['mass'] = \
                    np.array([0.045, 0.1])
        for visual_object in self._stage.get_visual_objects():
            self._intervention_space_b[visual_object] = dict()
            height = self._stage.get_object_state(visual_object, 'size')[-1]
            self._intervention_space_b[visual_object]\
                ['cylindrical_position'] = \
                np.array([[0.11, -math.pi, height / 2.0],
                          [0.15, math.pi, 0.3]])
            self._intervention_space_b[visual_object]['euler_orientation'] = \
                np.array([[0, 0, -math.pi], [0, 0, math.pi]])
            if self._stage.get_visual_objects()[
                    visual_object].__class__.__name__ == 'SCuboid':
                self._intervention_space_b[visual_object]['size'] = \
                    np.array([[0.075, 0.075, 0.075], [0.095, 0.095, 0.095]])
            self._intervention_space_b[visual_object]['color'] = \
                np.array([[0, 0, 0], [0.5, 0.5, 0.5]])
        self._intervention_space_b['floor_color'] = \
            np.array([[0, 0, 0], [0.5, 0.5, 0.5]])
        self._intervention_space_b['stage_color'] = \
            np.array([[0.5, 0.5, 0.5], [1, 1, 1]])
        self._intervention_space_b['floor_friction'] = \
            np.array([0.6, 0.8])
        self._intervention_space_b['stage_friction'] = \
            np.array([0.6, 0.8])
        for link in self._robot.get_link_names():
            self._intervention_space_b[link] = dict()
            self._intervention_space_b[link]['color'] = \
                np.array([[0.5, 0.5, 0.5], [1, 1, 1]])
            self._intervention_space_b[link]['mass'] = \
                np.array([0.045, 0.1])
        return

    def _set_intervention_space_a_b(self):
        """
        Used to set the space A_B limits for all the variables.

        :return:
        """
        # you can override these easily
        # iterate over intervention space a and space b and concatenate them
        self._intervention_space_a_b = dict()
        for variable in self._intervention_space_a:
            if isinstance(self._intervention_space_a[variable], dict):
                self._intervention_space_a_b[variable] = dict()
                for subvariable_name in self._intervention_space_a[variable]:
                    self._intervention_space_a_b[variable][subvariable_name] = \
                        combine_intervention_spaces(
                            self._intervention_space_a[variable]
                            [subvariable_name],
                            self._intervention_space_b[variable]
                            [subvariable_name])
            else:
                self._intervention_space_a_b[variable] = \
                    combine_intervention_spaces(
                        self._intervention_space_a[variable],
                        self._intervention_space_b[variable])
        return
    def get_desired_goal(self):
        """
        :return: (nd.array) specifies the desired goal as bounding boxes of
                            goal shapes by default.
        """
        desired_goal = []
        for visual_goal in self._stage.get_visual_objects():
            desired_goal.append(
                self._stage.get_visual_objects()[visual_goal]
                .get_bounding_box())
        return np.array(desired_goal)
    def get_achieved_goal(self):
        """
        :return: (nd.array) specifies the achieved goal as bounding boxes of
                            objects by default.
        """
        achieved_goal = []
        for rigid_object in self._stage.get_rigid_objects():
            if self._stage.get_rigid_objects()[rigid_object].is_not_fixed():
                achieved_goal.append(
                    self._stage.get_rigid_objects()[rigid_object]
                    .get_bounding_box())
        return np.array(achieved_goal)
    def _goal_reward(self, achieved_goal, desired_goal):
        """
        :param achieved_goal: (nd.array) specifies the achieved goal as
                                         bounding boxes of objects by default.
        :param desired_goal: (nd.array) specifies the desired goal as bounding
                                        boxes of goal shapes by default.

        :return: (float) returns the fractional overlap of visual objects
                         and rigid objects.
        """
        # intersection volumes / union volume of all visual_objects
        achieved_goal = np.reshape(achieved_goal, [-1, 2, 3])
        desired_goal = np.reshape(desired_goal, [-1, 2, 3])
        intersection_area = 0
        # under the assumption that the visual objects don't intersect
        union_area = 0
        for desired_subgoal_bb in desired_goal:
            union_area += get_bounding_box_volume(desired_subgoal_bb)
            for rigid_object_bb in achieved_goal:
                intersection_area += get_intersection(desired_subgoal_bb,
                                                      rigid_object_bb)
        if union_area > 0:
            sparse_reward = intersection_area / float(union_area)
        else:
            sparse_reward = 1
        return sparse_reward

    def _update_success(self, goal_reward):
        """
        Used to update if the task is considered solved or not.

        :param goal_reward: (float) the fractional overlap of visual objects
                                    and rigid objects.

        :return:
        """
        preliminary_success = self._check_preliminary_success(goal_reward)
        if preliminary_success:
            self._task_solved = True
        else:
            self._task_solved = False
        return

    def _check_preliminary_success(self, goal_reward):
        """
        Used to check if the task is considered solved or not.

        :param goal_reward: (float) the fractional overlap of visual objects
                                    and rigid objects.

        :return: (bool) true if the task is considered solved. False
                        otherwise.
        """
        if goal_reward > 0.9:
            return True
        else:
            return False
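    # Worked example of the fractional overlap in _goal_reward above, with
    # made-up boxes given as [[min_x, min_y, min_z], [max_x, max_y, max_z]]
    # and assuming get_intersection returns the intersection volume: a
    # desired box of volume 1.0 and an achieved box shifted by 0.5 along x
    # overlap in a 0.5 x 1 x 1 region, so the reward is 0.5 / 1.0 = 0.5:
    #
    #   desired = np.array([[[0., 0., 0.], [1., 1., 1.]]])
    #   achieved = np.array([[[0.5, 0., 0.], [1.5, 1., 1.]]])
    #   task._goal_reward(achieved, desired)  # -> 0.5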
    def get_reward(self):
        """
        Used to calculate the final reward for the last action executed in
        the system.

        :return: (float) the final reward, which can be a mix of dense
                         rewards and the sparse reward calculated by default
                         using the fractional overlap of visual objects and
                         rigid objects.
        """
        self._current_desired_goal = self.get_desired_goal()
        self._current_achieved_goal = self.get_achieved_goal()
        self._current_goal_reward = self._goal_reward(
            desired_goal=self._current_desired_goal,
            achieved_goal=self._current_achieved_goal)
        goal_reward = self._current_goal_reward
        self._update_success(self._current_goal_reward)
        if self._task_params['activate_sparse_reward']:
            if self._task_solved:
                goal_reward = 1
            else:
                goal_reward = 0
            return goal_reward
        else:
            dense_rewards, update_task_state_dict = \
                self._calculate_dense_rewards(
                    achieved_goal=self._current_achieved_goal,
                    desired_goal=self._current_desired_goal)
            reward = np.sum(
                np.array(dense_rewards) *
                self._task_params["dense_reward_weights"]) + \
                goal_reward * self._task_params["fractional_reward_weight"]
            self._update_task_state(update_task_state_dict)
            return reward
    def compute_reward(self, achieved_goal, desired_goal, info):
        """
        Used to calculate the reward given a hypothetical situation that
        could be used in hindsight experience replay algorithm variants.
        Can only be used in the sparse reward setting; for the other setting
        it can be tricky here.

        :param achieved_goal: (nd.array) specifies the achieved goal as
                                         bounding boxes of objects by default.
        :param desired_goal: (nd.array) specifies the desired goal as bounding
                                        boxes of goal shapes by default.
        :param info: (dict) not used for now.

        :return: (float) the final reward achieved given the hypothetical
                         situation.
        """
        goal_reward = self._goal_reward(desired_goal=desired_goal,
                                        achieved_goal=achieved_goal)
        if self._task_params['activate_sparse_reward']:
            if self._check_preliminary_success(goal_reward):
                goal_reward = 1
            else:
                goal_reward = 0
            return goal_reward
        else:
            reward = goal_reward * \
                self._task_params["fractional_reward_weight"]
            return reward
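    # HER-style relabeling sketch (hypothetical goals taken from the info
    # dict of an earlier step, with activate_sparse_reward=True):
    #
    #   relabeled_reward = task.compute_reward(
    #       achieved_goal=info['achieved_goal'],
    #       desired_goal=info['achieved_goal'],  # pretend we reached the goal
    #       info=info)  # -> 1, since a box's overlap with itself exceeds 0.9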
    def init_task(self, robot, stage, max_episode_length, create_world_func):
        """
        :param robot: (causal_world.envs.Robot) robot object of the
                                                environment.
        :param stage: (causal_world.envs.Stage) stage object of the
                                                environment.
        :param max_episode_length: (int) specifies the maximum episode length
                                         of the task.
        :param create_world_func: (func) the function used to create the
                                         world around the robot.

        :return:
        """
        self._create_world_func = create_world_func
        self._robot = robot
        self._stage = stage
        if self._task_params["joint_positions"] is not None:
            self._robot.reset_state(
                joint_positions=np.array(
                    self._task_params["joint_positions"]),
                joint_velocities=np.zeros([9]))
        else:
            self._robot.reset_state(
                joint_positions=self._robot.get_rest_pose()[0],
                joint_velocities=np.zeros([9]))
        self._empty_stage = self._save_pybullet_state()
        self._set_up_stage_arena()
        self._default_starting_state = \
            self.save_state()
        self._current_starting_state = \
            self.save_state()
        self._stage.finalize_stage()
        if max_episode_length is None:
            self._max_episode_length = self.get_default_max_episode_length()
        else:
            self._max_episode_length = max_episode_length
        self._setup_non_default_robot_observation_key(
            'time_left_for_task',
            self._calculate_time_left,
            lower_bound=np.array([0]),
            upper_bound=np.array([self._max_episode_length]))
        self._set_up_non_default_observations()
        self._set_intervention_space_a()
        self._set_intervention_space_b()
        self._set_intervention_space_a_b()
        self._set_task_state()
        return
    def _calculate_time_left(self):
        """
        :return: (float) returns the time left to achieve the task in the
                         episode.
        """
        current_control_index = self._robot.get_control_index()
        time_spent = (current_control_index + 1) * self._robot.get_dt()
        return self._max_episode_length - time_spent

    def _setup_non_default_robot_observation_key(self, observation_key,
                                                 observation_function,
                                                 lower_bound, upper_bound):
        """
        Used to set up non default robot observation keys.

        :param observation_key: (str) observation key to be added.
        :param observation_function: (func) function used to calculate the
                                            observation, taking no parameters
                                            since it can use the
                                            self._current_full_observations_dict
                                            property in the task.
        :param lower_bound: (nd.array) specifies the lower bound of this
                                       observation for normalization.
        :param upper_bound: (nd.array) specifies the upper bound of this
                                       observation for normalization.

        :return:
        """
        self._robot.add_observation(observation_key,
                                    lower_bound=lower_bound,
                                    upper_bound=upper_bound)
        self._non_default_robot_observation_funcs[observation_key] = \
            observation_function
        return

    def _setup_non_default_stage_observation_key(self, observation_key,
                                                 observation_function,
                                                 lower_bound, upper_bound):
        """
        Used to set up non default stage observation keys.

        :param observation_key: (str) observation key to be added.
        :param observation_function: (func) function used to calculate the
                                            observation, taking no parameters
                                            since it can use the
                                            self._current_full_observations_dict
                                            property in the task.
        :param lower_bound: (nd.array) specifies the lower bound of this
                                       observation for normalization.
        :param upper_bound: (nd.array) specifies the upper bound of this
                                       observation for normalization.

        :return:
        """
        self._stage.add_observation(observation_key,
                                    lower_bound=lower_bound,
                                    upper_bound=upper_bound)
        self._non_default_stage_observation_funcs[observation_key] = \
            observation_function
        return
    def reset_task(self, interventions_dict=None, check_bounds=True):
        """
        :param interventions_dict: (dict) intervention dict to be specified
                                          if an intervention is to be latched
                                          as the new starting state of the
                                          environment.
        :param check_bounds: (bool) specified when not in train mode and a
                                    check is needed for whether the
                                    intervention is allowed or not.

        :return: (tuple): success_signal specifying if the intervention is
                          successful or not, interventions_info specifying
                          number of interventions and other info,
                          reset_observation_space_signal a bool specifying if
                          the observation space needs to be changed or not.
        """
        self._robot.clear()
        reset_observation_space_signal = \
            self.restore_state(self._current_starting_state)
        self._task_solved = False
        success_signal = None
        interventions_info = None
        if interventions_dict is not None:
            success_signal, interventions_info, \
                reset_observation_space_signal = \
                self.apply_interventions(interventions_dict,
                                         check_bounds=check_bounds)
            if success_signal:
                self._current_starting_state = self.save_state()
        self._set_task_state()
        return success_signal, interventions_info, \
            reset_observation_space_signal
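    # Usage sketch (hypothetical `task` instance): latch a newly sampled
    # goal as the starting state of all subsequent episodes:
    #
    #   success, info, reset_obs_space = task.reset_task(
    #       interventions_dict=task.sample_new_goal(), check_bounds=True)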
    def filter_structured_observations(self):
        """
        :return: (np.array) returns the structured observations as set up
                            by the corresponding task generator.
        """
        robot_observations_dict = self._robot.get_current_observations(
            self._robot_observation_helper_keys)
        stage_observations_dict = self._stage.get_current_observations(
            self._stage_observation_helper_keys)
        self._current_full_observations_dict = dict(robot_observations_dict)
        self._current_full_observations_dict.update(stage_observations_dict)
        observations_filtered = np.array([])
        for key in self._task_robot_observation_keys:
            if key in self._non_default_robot_observation_funcs:
                if self._robot._normalize_observations:
                    normalized_observations = \
                        self._robot.normalize_observation_for_key(
                            key=key,
                            observation=
                            self._non_default_robot_observation_funcs[key]())
                    observations_filtered = \
                        np.append(observations_filtered,
                                  normalized_observations)
                else:
                    observations_filtered = \
                        np.append(
                            observations_filtered,
                            self._non_default_robot_observation_funcs[key]())
            else:
                observations_filtered = \
                    np.append(
                        observations_filtered,
                        np.array(self._current_full_observations_dict[key]))
        for key in self._task_stage_observation_keys:
            if key in self._non_default_stage_observation_funcs:
                if self._stage._normalize_observations:
                    normalized_observations = \
                        self._stage.normalize_observation_for_key(
                            key=key,
                            observation=
                            self._non_default_stage_observation_funcs[key]())
                    observations_filtered = \
                        np.append(observations_filtered,
                                  normalized_observations)
                else:
                    # query the stage (not robot) observation functions here
                    observations_filtered = \
                        np.append(
                            observations_filtered,
                            self._non_default_stage_observation_funcs[key]())
            else:
                observations_filtered = \
                    np.append(
                        observations_filtered,
                        np.array(self._current_full_observations_dict[key]))
        return observations_filtered
    def get_task_params(self):
        """
        :return: (dict) specifying all variables belonging to the task
                        generator and their values.
        """
        return self._task_params
    def is_done(self):
        """
        :return: (bool) specifying if the task is achieved or not - not used
                        for now.
        """
        return False
    def do_single_random_intervention(self):
        """
        Does a single random intervention on one of the variables in the
        environment.

        :return: (tuple): success_signal specifying if the intervention is
                          successful or not, interventions_info specifying
                          number of interventions and other info,
                          interventions_dict specifying the intervention
                          performed, reset_observation_space_signal a bool
                          specifying if the observation space needs to be
                          changed or not.
        """
        interventions_dict = dict()
        if self._task_params['variables_space'] == 'space_a':
            intervention_space = self._intervention_space_a
        elif self._task_params['variables_space'] == 'space_b':
            intervention_space = self._intervention_space_b
        elif self._task_params['variables_space'] == 'space_a_b':
            intervention_space = self._intervention_space_a_b
        # choose a random variable, one intervention only, and intervene
        if len(intervention_space) == 0:
            return False, {}, {}, False
        variable_name = np.random.choice(list(intervention_space))
        variable_space = intervention_space[variable_name]
        sub_variable_name = None
        if isinstance(variable_space, dict):
            sub_variable_name = np.random.choice(list(variable_space.keys()))
            variable_space = variable_space[sub_variable_name]
        chosen_intervention = np.random.uniform(variable_space[0],
                                                variable_space[1])
        if sub_variable_name is None:
            interventions_dict[variable_name] = \
                chosen_intervention
        else:
            interventions_dict[variable_name] = dict()
            interventions_dict[variable_name][sub_variable_name] = \
                chosen_intervention
        success_signal, interventions_info, \
            reset_observation_space_signal = \
            self.apply_interventions(interventions_dict, check_bounds=False)
        return success_signal, interventions_info, interventions_dict, \
            reset_observation_space_signal
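    # Usage sketch (hypothetical `task` instance): perform one random
    # intervention and inspect which variable was touched:
    #
    #   success, info, interventions_dict, reset_obs_space = \
    #       task.do_single_random_intervention()
    #   print(interventions_dict)  # e.g. {'floor_friction': 0.41}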
    def get_intervention_space_a(self):
        """
        :return: (dict) specifies the variables and their corresponding
                        bounds in space A.
        """
        return self._intervention_space_a

    def get_intervention_space_b(self):
        """
        :return: (dict) specifies the variables and their corresponding
                        bounds in space B.
        """
        return self._intervention_space_b

    def get_intervention_space_a_b(self):
        """
        :return: (dict) specifies the variables and their corresponding
                        bounds in space A_B.
        """
        return self._intervention_space_a_b
    def get_current_variable_values(self):
        """
        :return: (dict) returns all the exposed variables in the environment
                        and their corresponding values.
        """
        variable_params = dict()
        variable_params.update(self._robot.get_current_variable_values())
        variable_params.update(self._stage.get_current_variable_values())
        variable_params.update(self.get_task_generator_variables_values())
        return variable_params
    def is_intervention_in_bounds(self, interventions_dict):
        """
        :param interventions_dict: (dict) specifies the variable names and
                                          their corresponding values.

        :return: (bool) true if the intervention values are in the operating
                        intervention space. False otherwise.
        """
        if self._task_params['variables_space'] == 'space_a':
            intervention_space = self._intervention_space_a
        elif self._task_params['variables_space'] == 'space_b':
            intervention_space = self._intervention_space_b
        elif self._task_params['variables_space'] == 'space_a_b':
            intervention_space = self._intervention_space_a_b
        for intervention in interventions_dict:
            if intervention in intervention_space:
                if not isinstance(interventions_dict[intervention], dict):
                    if ((intervention_space[intervention][0] >
                         interventions_dict[intervention]).any() or
                            (intervention_space[intervention][1] <
                             interventions_dict[intervention]).any()):
                        logging.warning(
                            "lower bound is: " +
                            str(intervention_space[intervention][0]))
                        logging.warning(
                            "applied: " +
                            str(interventions_dict[intervention]))
                        logging.warning(
                            "upper bound is: " +
                            str(intervention_space[intervention][1]))
                        return False
                else:
                    for sub_variable_name in interventions_dict[intervention]:
                        if sub_variable_name in \
                                intervention_space[intervention] and \
                                ((intervention_space[intervention]
                                  [sub_variable_name][0] >
                                  interventions_dict[intervention]
                                  [sub_variable_name]).any() or
                                 (intervention_space[intervention]
                                  [sub_variable_name][1] <
                                  interventions_dict[intervention]
                                  [sub_variable_name]).any()):
                            logging.warning(
                                "lower bound is: " +
                                str(intervention_space[intervention]
                                    [sub_variable_name][0]))
                            logging.warning(
                                "applied: " +
                                str(interventions_dict[intervention]
                                    [sub_variable_name]))
                            logging.warning(
                                "upper bound is: " +
                                str(intervention_space[intervention]
                                    [sub_variable_name][1]))
                            return False
                        elif sub_variable_name not in \
                                intervention_space[intervention]:
                            logging.warning(
                                "{} not in intervention_space".format(
                                    sub_variable_name))
                            return False
            else:
                logging.warning(
                    "{} not in intervention_space".format(intervention))
                return False
        return True
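    # Example of the nested dict this check consumes (hypothetical values;
    # 'tool_block' is an illustrative object name, not guaranteed to exist
    # in every task):
    #
    #   interventions_dict = {
    #       'floor_friction': 0.45,
    #       'tool_block': {'mass': 0.02},
    #   }
    #   task.is_intervention_in_bounds(interventions_dict)  # -> True/False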
    def divide_intervention_dict(self, interventions_dict):
        """
        Divides the interventions into three dicts for the robot, the stage
        and the task specific interventions.

        :param interventions_dict: (dict) specifies the variable names and
                                          their corresponding values.

        :return: (tuple) robot_interventions_dict, stage_interventions_dict,
                         task_generator_interventions_dict
        """
        robot_intervention_keys = \
            self._robot.get_current_variable_values().keys()
        stage_intervention_keys = \
            self._stage.get_current_variable_values().keys()
        task_generator_intervention_keys = \
            self.get_task_generator_variables_values().keys()
        robot_interventions_dict = dict()
        stage_interventions_dict = dict()
        task_generator_interventions_dict = dict()
        for intervention in interventions_dict:
            if intervention in robot_intervention_keys:
                robot_interventions_dict[intervention] = \
                    interventions_dict[intervention]
            elif intervention in stage_intervention_keys:
                stage_interventions_dict[intervention] = \
                    interventions_dict[intervention]
            elif intervention in task_generator_intervention_keys:
                task_generator_interventions_dict[intervention] = \
                    interventions_dict[intervention]
        return robot_interventions_dict, \
            stage_interventions_dict, \
            task_generator_interventions_dict
    def apply_interventions(self, interventions_dict, check_bounds=False):
        """
        :param interventions_dict: (dict) specifies the variable names and
                                          their corresponding values.
        :param check_bounds: (bool) true to check if variables and their
                                    corresponding values exist in the
                                    operating space.

        :return: (tuple): success_signal specifying if the intervention is
                          successful or not, interventions_info specifying
                          number of interventions and other info,
                          reset_observation_space_signal a bool specifying if
                          the observation space needs to be changed or not.
        """
        interventions_info = {
            'out_bounds': False,
            'robot_infeasible': None,
            'stage_infeasible': None,
            'task_generator_infeasible': None
        }
        stage_infeasible = False
        robot_infeasible = False
        if check_bounds and not self.is_intervention_in_bounds(
                interventions_dict):
            interventions_info['out_bounds'] = True
            logging.warning("Applying intervention is out of bounds " +
                            str(interventions_dict))
            return False, interventions_info, False
        interventions_dict = \
            dict(self._handle_contradictory_interventions(interventions_dict))
        robot_interventions_dict, stage_interventions_dict, \
            task_generator_interventions_dict = \
            self.divide_intervention_dict(interventions_dict)
        current_state = self.save_state()
        self._robot.apply_interventions(robot_interventions_dict)
        self._stage.apply_interventions(stage_interventions_dict)
        task_generator_intervention_success_signal, \
            reset_observation_space_signal = \
            self.apply_task_generator_interventions(
                task_generator_interventions_dict)
        # this is a hack for now to not check feasibility when
        # adding/removing objects, since the stage state is quite different
        # afterwards and it will be hard to restore its exact state -
        # we don't handle this
        if len(task_generator_interventions_dict) == 0:
            pre_contact_check_state = self._save_pybullet_state()
            self._robot.step_simulation()
            if not self._stage.check_feasiblity_of_stage():
                stage_infeasible = True
                logging.warning(
                    "Applying intervention led to infeasibility of the stage")
            if not self._robot.check_feasibility_of_robot_state():
                robot_infeasible = True
                logging.warning(
                    "Applying intervention led to infeasibility of the robot")
            if stage_infeasible or robot_infeasible:
                self.restore_state(current_state, avoid_reloading_urdf=True)
            else:
                self._restore_pybullet_state(pre_contact_check_state)
            self._remove_pybullet_state(pre_contact_check_state)
        interventions_info['robot_infeasible'] = robot_infeasible
        interventions_info['stage_infeasible'] = stage_infeasible
        interventions_info['task_generator_infeasible'] = \
            not task_generator_intervention_success_signal
        success = not robot_infeasible and \
            not stage_infeasible and \
            task_generator_intervention_success_signal
        if success:
            self._adjust_variable_spaces_after_intervention(
                interventions_dict)
        return success, \
            interventions_info, reset_observation_space_signal
    def get_default_max_episode_length(self):
        """
        :return: (int) returns the default maximum episode length.
        """
        if self._task_params["task_name"] == 'reaching':
            episode_length = 5
        else:
            episode_length = len(self._stage.get_rigid_objects()) * 10
        return episode_length
    def get_task_name(self):
        """
        :return: (str) specifies the name of the goal shape family generator.
        """
        return self._task_name