Source code for causal_world.task_generators.reaching

from causal_world.task_generators.base_task import BaseTask
import numpy as np
from causal_world.configs.world_constants import WorldConstants


class ReachingTaskGenerator(BaseTask):
    def __init__(self,
                 variables_space='space_a_b',
                 fractional_reward_weight=1,
                 dense_reward_weights=np.array([100000, 0, 0, 0]),
                 default_goal_60=np.array([0, 0, 0.10]),
                 default_goal_120=np.array([0, 0, 0.13]),
                 default_goal_300=np.array([0, 0, 0.16]),
                 joint_positions=None,
                 activate_sparse_reward=False):
        """
        This task generator generates a reaching task.

        :param variables_space: (str) space to be used, either 'space_a',
                                      'space_b' or 'space_a_b'.
        :param fractional_reward_weight: (float) weight multiplied by the
                                                 fractional volumetric overlap
                                                 in the reward.
        :param dense_reward_weights: (list float) specifies the reward weights
                                                  for all the other reward
                                                  terms calculated in the
                                                  calculate_dense_rewards
                                                  function.
        :param default_goal_60: (nd.array) the x, y, z position of the goal
                                           for the first finger.
        :param default_goal_120: (nd.array) the x, y, z position of the goal
                                            for the second finger.
        :param default_goal_300: (nd.array) the x, y, z position of the goal
                                            for the third finger.
        :param joint_positions: (nd.array) specifies the joint positions to
                                           start the episode with; None if the
                                           default is to be used.
        :param activate_sparse_reward: (bool) specifies whether to sparsify
                                              the reward to +1 or 0 depending
                                              on whether the mean distance
                                              from the goal is < 0.01.
        """
        super().__init__(task_name="reaching",
                         variables_space=variables_space,
                         fractional_reward_weight=fractional_reward_weight,
                         dense_reward_weights=dense_reward_weights,
                         activate_sparse_reward=activate_sparse_reward)
        self._task_robot_observation_keys = [
            "time_left_for_task", "joint_positions", "joint_velocities",
            "end_effector_positions"
        ]
        self._task_params['default_goal_60'] = default_goal_60
        self._task_params['default_goal_120'] = default_goal_120
        self._task_params['default_goal_300'] = default_goal_300
        self._task_params["joint_positions"] = joint_positions
        self.previous_end_effector_positions = None
        self.previous_joint_velocities = None
        self.current_number_of_obstacles = 0

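    # A minimal construction sketch (the goal positions below are
    # illustrative values, not library defaults): each goal can be placed
    # anywhere reachable by the corresponding finger, e.g.
    #
    #     task = ReachingTaskGenerator(
    #         default_goal_60=np.array([0.05, 0.05, 0.10]),
    #         default_goal_120=np.array([-0.05, 0.05, 0.13]),
    #         default_goal_300=np.array([0.0, -0.07, 0.16]),
    #         activate_sparse_reward=True)
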
    def _set_up_stage_arena(self):
        """
        Adds the three goal silhouettes (one sphere per finger) to the stage
        and registers their cartesian positions as stage observations.

        :return:
        """
        creation_dict = {
            'name': "goal_60",
            'shape': "sphere",
            'color': np.array([1, 0, 0]),
            'position': self._task_params['default_goal_60']
        }
        self._stage.add_silhoutte_general_object(**creation_dict)
        creation_dict = {
            'name': "goal_120",
            'shape': "sphere",
            'color': np.array([0, 1, 0]),
            'position': self._task_params['default_goal_120']
        }
        self._stage.add_silhoutte_general_object(**creation_dict)
        creation_dict = {
            'name': "goal_300",
            'shape': "sphere",
            'color': np.array([0, 0, 1]),
            'position': self._task_params['default_goal_300']
        }
        self._stage.add_silhoutte_general_object(**creation_dict)
        self._task_stage_observation_keys = [
            "goal_60_cartesian_position", "goal_120_cartesian_position",
            "goal_300_cartesian_position"
        ]
        return

    def get_description(self):
        """
        :return: (str) returns the description of the task itself.
        """
        return \
            "Task where the goal is to reach a " \
            "goal point for each finger"

    def _calculate_dense_rewards(self, desired_goal, achieved_goal):
        """
        Calculates the dense reward terms for the current step.

        :param desired_goal:
        :param achieved_goal:

        :return:
        """
        end_effector_positions_goal = desired_goal
        current_end_effector_positions = achieved_goal
        previous_dist_to_goal = np.linalg.norm(
            end_effector_positions_goal - self.previous_end_effector_positions)
        current_dist_to_goal = np.linalg.norm(end_effector_positions_goal -
                                              current_end_effector_positions)
        rewards = list()
        # progress made towards the goal since the previous step
        rewards.append(previous_dist_to_goal - current_dist_to_goal)
        # negative distance to the goal
        rewards.append(-current_dist_to_goal)
        # penalty on the joint torques
        rewards.append(
            -np.linalg.norm(self._robot.get_latest_full_state()['torques']))
        # penalty on the change in joint velocities
        rewards.append(-np.linalg.norm(
            np.abs(self._robot.get_latest_full_state()['velocities'] -
                   self.previous_joint_velocities),
            ord=2))
        update_task_info = {
            'current_end_effector_positions': current_end_effector_positions,
            'current_velocity':
                self._robot.get_latest_full_state()['velocities']
        }
        return rewards, update_task_info

    def _update_task_state(self, update_task_info):
        """
        Stores the end-effector positions and joint velocities of the current
        step for use in the next reward calculation.

        :param update_task_info:

        :return:
        """
        self.previous_end_effector_positions = \
            update_task_info['current_end_effector_positions']
        self.previous_joint_velocities = \
            update_task_info['current_velocity']
        return

    def _set_task_state(self):
        """
        Initializes the previous end-effector positions and joint velocities
        from the latest robot state.

        :return:
        """
        self.previous_end_effector_positions = \
            self._robot.get_latest_full_state()['end_effector_positions']
        self.previous_joint_velocities = \
            self._robot.get_latest_full_state()['velocities']
        return

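    # The four dense reward terms above are weighted by dense_reward_weights
    # from the constructor; with the default np.array([100000, 0, 0, 0]) only
    # the progress term contributes, while the distance, torque and velocity
    # penalties stay switched off unless non-zero weights are passed in.
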
    def get_desired_goal(self):
        """
        :return: (nd.array) specifies the desired goal as an array of all
                            three positions of the finger goals.
        """
        desired_goal = np.array([])
        desired_goal = np.append(
            desired_goal,
            self._stage.get_object_state('goal_60', 'cartesian_position'))
        desired_goal = np.append(
            desired_goal,
            self._stage.get_object_state('goal_120', 'cartesian_position'))
        desired_goal = np.append(
            desired_goal,
            self._stage.get_object_state('goal_300', 'cartesian_position'))
        return desired_goal

    def get_achieved_goal(self):
        """
        :return: (nd.array) specifies the achieved goal as concatenated
                            end-effector positions.
        """
        achieved_goal = self._robot.get_latest_full_state()[
            'end_effector_positions']
        return np.array(achieved_goal)

    def _goal_reward(self, achieved_goal, desired_goal):
        """
        Returns the mean absolute per-coordinate distance between the
        end-effector positions and their goals.

        :param achieved_goal:
        :param desired_goal:

        :return:
        """
        current_end_effector_positions = achieved_goal
        current_dist_to_goal = np.abs(desired_goal -
                                      current_end_effector_positions)
        current_dist_to_goal_mean = np.mean(current_dist_to_goal)
        return np.array(current_dist_to_goal_mean)

    def _check_preliminary_success(self, goal_reward):
        """
        Considers the task preliminarily solved when the mean distance to the
        goal is below 0.01.

        :param goal_reward:

        :return:
        """
        if goal_reward < 0.01:
            return True
        else:
            return False

    def _calculate_fractional_success(self, goal_reward):
        """
        Maps the mean distance to the goal linearly to a fractional success
        in [0, 1], saturating at 0.01 and 0.03.

        :param goal_reward:

        :return:
        """
        clipped_distance = np.clip(goal_reward, 0.01, 0.03)
        distance_from_success = clipped_distance - 0.01
        fractional_success = 1 - (distance_from_success / 0.02)
        return fractional_success

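    # Worked example for _calculate_fractional_success (illustrative values):
    # a mean distance of 0.02 is clipped to 0.02, leaving 0.01 above the
    # success threshold, so fractional_success = 1 - 0.01 / 0.02 = 0.5;
    # distances <= 0.01 map to 1.0 and distances >= 0.03 map to 0.0.
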
    def get_info(self):
        """
        :return: (dict) returns the info dictionary after every step of the
                        environment.
        """
        info = dict()
        info['desired_goal'] = self._current_desired_goal
        info['achieved_goal'] = self._current_achieved_goal
        info['success'] = self._task_solved
        if self._is_ground_truth_state_exposed:
            info['ground_truth_current_state_varibales'] = \
                self.get_current_variable_values()
        if self._is_partial_solution_exposed:
            info['possible_solution_intervention'] = dict()
            info['possible_solution_intervention']['joint_positions'] = \
                self._robot.get_joint_positions_from_tip_positions(
                    self._current_desired_goal,
                    self._robot.get_latest_full_state()['positions'])
        info['fractional_success'] = \
            self._calculate_fractional_success(self._current_goal_reward)
        return info

    def _set_intervention_space_a(self):
        """
        Extends intervention space A with the allowed range for the number of
        obstacles.

        :return:
        """
        super(ReachingTaskGenerator, self)._set_intervention_space_a()
        self._intervention_space_a['number_of_obstacles'] = \
            np.array([1, 5])
        return

    def _set_intervention_space_b(self):
        """
        Extends intervention space B with the allowed range for the number of
        obstacles.

        :return:
        """
        super(ReachingTaskGenerator, self)._set_intervention_space_b()
        self._intervention_space_b['number_of_obstacles'] = \
            np.array([1, 5])
        return

    def get_task_generator_variables_values(self):
        """
        :return: (dict) specifying the variables belonging to the task itself.
        """
        task_generator_variables = dict()
        task_generator_variables['number_of_obstacles'] = \
            self.current_number_of_obstacles
        return task_generator_variables

    def apply_task_generator_interventions(self, interventions_dict):
        """
        :param interventions_dict: (dict) variables and their corresponding
                                          intervention values.

        :return: (tuple) the first position indicates whether the intervention
                         was successful, the second position indicates whether
                         the observation space needs to be reset.
        """
        if len(interventions_dict) == 0:
            return True, False
        reset_observation_space = False
        if "number_of_obstacles" in interventions_dict:
            if int(interventions_dict["number_of_obstacles"]) > \
                    self.current_number_of_obstacles:
                reset_observation_space = True
                for i in range(self.current_number_of_obstacles,
                               int(interventions_dict["number_of_obstacles"])):
                    self._stage.add_rigid_general_object(
                        name="obstacle_" + str(i),
                        shape="static_cube",
                        size=np.array([0.01, 0.01, 0.01]),
                        color=np.array([0, 0, 0]),
                        position=np.random.uniform(WorldConstants.ARENA_BB[0],
                                                   WorldConstants.ARENA_BB[1]))
                    self.current_number_of_obstacles += 1
                    self._task_stage_observation_keys.append("obstacle_" +
                                                             str(i) + "_type")
                    self._task_stage_observation_keys.append("obstacle_" +
                                                             str(i) + "_size")
                    self._task_stage_observation_keys.append(
                        "obstacle_" + str(i) + "_cartesian_position")
                    self._task_stage_observation_keys.append(
                        "obstacle_" + str(i) + "_orientation")
            else:
                return True, reset_observation_space
        else:
            raise Exception("this task generator variable "
                            "is not yet defined")
        self._set_intervention_space_b()
        self._set_intervention_space_a()
        self._set_intervention_space_a_b()
        self._stage.finalize_stage()
        return True, reset_observation_space
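

# A minimal usage sketch of this task generator inside a CausalWorld
# environment. The CausalWorld import path and the do_intervention call
# pattern below are assumptions based on the package layout and the
# interventions API shown above; adjust them to the installed version.
if __name__ == '__main__':
    from causal_world.envs import CausalWorld  # assumed import path

    task = ReachingTaskGenerator()
    env = CausalWorld(task=task, enable_visualization=False)
    obs = env.reset()
    for _ in range(100):
        # random actions, just to exercise the dense reward pipeline
        obs, reward, done, info = env.step(env.action_space.sample())
    # fractional success in [0, 1] is reported in the info dict
    print(info['fractional_success'])
    # (assumed call) intervene on the task-generator variable defined above,
    # adding obstacles within the allowed range [1, 5]
    env.do_intervention({'number_of_obstacles': 3})
    env.close()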