Source code for causal_world.task_generators.picking

from causal_world.task_generators.base_task import BaseTask
import numpy as np


class PickingTaskGenerator(BaseTask):
    def __init__(self,
                 variables_space='space_a_b',
                 fractional_reward_weight=1,
                 dense_reward_weights=np.array(
                     [250, 0, 125, 0, 750, 0, 0, 0.005]),
                 activate_sparse_reward=False,
                 tool_block_mass=0.02,
                 joint_positions=None,
                 tool_block_position=np.array([0, 0, 0.0325]),
                 tool_block_orientation=np.array([0, 0, 0, 1]),
                 goal_height=0.15):
        """
        This task generator generates a task for picking an object up
        in the air.

        :param variables_space: (str) space to be used, either 'space_a',
                                      'space_b' or 'space_a_b'.
        :param fractional_reward_weight: (float) weight multiplied by the
                                                 fractional volumetric overlap
                                                 in the reward.
        :param dense_reward_weights: (list float) specifies the reward weights
                                                  for all the other reward
                                                  terms calculated in the
                                                  _calculate_dense_rewards
                                                  function.
        :param activate_sparse_reward: (bool) specifies whether to sparsify
                                              the reward: +1 if the volumetric
                                              fractional overlap is more than
                                              90%, 0 otherwise.
        :param tool_block_mass: (float) specifies the block's mass.
        :param joint_positions: (nd.array) specifies the joint positions to
                                           start the episode with. None if the
                                           default is to be used.
        :param tool_block_position: (nd.array) specifies the cartesian
                                               position of the tool block,
                                               x, y, z.
        :param tool_block_orientation: (nd.array) specifies the orientation of
                                                  the tool block as a
                                                  quaternion, x, y, z, w.
        :param goal_height: (float) specifies the goal height that needs
                                    to be reached.
        """
        super().__init__(task_name="picking",
                         variables_space=variables_space,
                         fractional_reward_weight=fractional_reward_weight,
                         dense_reward_weights=dense_reward_weights,
                         activate_sparse_reward=activate_sparse_reward)
        self._task_robot_observation_keys = [
            "time_left_for_task", "joint_positions", "joint_velocities",
            "end_effector_positions"
        ]
        self._task_params["goal_height"] = goal_height
        self._task_params["tool_block_mass"] = tool_block_mass
        self._task_params["joint_positions"] = joint_positions
        self._task_params["tool_block_position"] = tool_block_position
        self._task_params["tool_block_orientation"] = tool_block_orientation
        self.previous_object_position = None
        self.previous_end_effector_positions = None
        self.previous_joint_velocities = None
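    # Editorial note on dense_reward_weights: the eight entries weight, in
    # order, the eight dense reward terms computed in
    # _calculate_dense_rewards below (delta and absolute block-to-goal
    # height, delta and absolute block-to-center distance, delta and
    # absolute fingers-to-block distance, the two-closest-fingers grasp
    # term, and the joint-velocity smoothness term). The defaults reward
    # the progress terms (indices 0, 2, 4) and lightly penalize jerky
    # motion (index 7).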
    def get_description(self):
        """
        :return: (str) returns the description of the task itself.
        """
        return "Task where the goal is to pick a " \
               "cube up towards a goal height"
    def _set_up_stage_arena(self):
        """
        :return:
        """
        creation_dict = {
            'name': "tool_block",
            'shape': "cube",
            'initial_position': self._task_params["tool_block_position"],
            'initial_orientation': self._task_params["tool_block_orientation"],
            'mass': self._task_params["tool_block_mass"]
        }
        self._stage.add_rigid_general_object(**creation_dict)
        goal_block_position = np.array(
            self._task_params["tool_block_position"])
        goal_block_position[-1] = self._task_params["goal_height"]
        creation_dict = {
            'name': "goal_block",
            'shape': "cube",
            'position': goal_block_position,
            'orientation': self._task_params["tool_block_orientation"]
        }
        self._stage.add_silhoutte_general_object(**creation_dict)
        self._task_stage_observation_keys = [
            "tool_block_type", "tool_block_size",
            "tool_block_cartesian_position", "tool_block_orientation",
            "tool_block_linear_velocity", "tool_block_angular_velocity",
            "goal_block_type", "goal_block_size",
            "goal_block_cartesian_position", "goal_block_orientation"
        ]
        return

    def _set_intervention_space_a(self):
        """
        :return:
        """
        super(PickingTaskGenerator, self)._set_intervention_space_a()
        for visual_object in self._stage.get_visual_objects():
            self._intervention_space_a[visual_object][
                'cylindrical_position'][0][-1] = 0.08
            self._intervention_space_a[visual_object][
                'cylindrical_position'][1][-1] = 0.20
        return

    def _set_intervention_space_b(self):
        """
        :return:
        """
        super(PickingTaskGenerator, self)._set_intervention_space_b()
        for visual_object in self._stage.get_visual_objects():
            self._intervention_space_b[visual_object][
                'cylindrical_position'][0][-1] = 0.20
            self._intervention_space_b[visual_object][
                'cylindrical_position'][1][-1] = 0.25
        return

    def _calculate_dense_rewards(self, desired_goal, achieved_goal):
        """
        :param desired_goal:
        :param achieved_goal:
        :return:
        """
        # rewards order:
        # 1) delta of how much closer the block got to the goal height
        # 2) absolute distance of the block to the goal height
        # 3) delta of how much closer the block got to the center
        # 4) absolute distance of the block to the center
        # 5) delta of how much closer the fingers got to the block
        # 6) absolute distance of the fingers to the block
        # 7) mean distance of the closest two fingers outside the
        #    bounding ellipsoid
        # 8) delta in joint velocities
        rewards = list()
        block_position = self._stage.get_object_state(
            'tool_block', 'cartesian_position')
        target_height = self._stage.get_object_state(
            'goal_block', 'cartesian_position')[-1]
        joint_velocities = self._robot.get_latest_full_state()['velocities']
        previous_block_to_goal = abs(self.previous_object_position[2] -
                                     target_height)
        current_block_to_goal = abs(block_position[2] - target_height)
        rewards.append(previous_block_to_goal - current_block_to_goal)
        rewards.append(-current_block_to_goal)
        previous_block_to_center = np.sqrt(
            (self.previous_object_position[0]**2 +
             self.previous_object_position[1]**2))
        current_block_to_center = np.sqrt(
            (block_position[0]**2 + block_position[1]**2))
        rewards.append(previous_block_to_center - current_block_to_center)
        rewards.append(-current_block_to_center)
        end_effector_positions = \
            self._robot.get_latest_full_state()['end_effector_positions']
        end_effector_positions = end_effector_positions.reshape(-1, 3)
        current_distance_from_block = np.linalg.norm(end_effector_positions -
                                                     block_position)
        previous_distance_from_block = np.linalg.norm(
            self.previous_end_effector_positions -
            self.previous_object_position)
        rewards.append(previous_distance_from_block -
                       current_distance_from_block)
        rewards.append(-current_distance_from_block)
        # check for each finger whether it is inside the block's
        # bounding volume or not
        object_size = self._stage.get_object_state('tool_block', 'size')
        dist_outside_bounding_ellipsoid = np.copy(
            np.abs(end_effector_positions - block_position))
        dist_outside_bounding_ellipsoid[
            dist_outside_bounding_ellipsoid < object_size] = 0
        dist_outside_bounding_ellipsoid = \
            np.mean(dist_outside_bounding_ellipsoid, axis=1)
        dist_outside_bounding_ellipsoid.sort()
        rewards.append(-np.sum(dist_outside_bounding_ellipsoid[:2]))
        rewards.append(-np.linalg.norm(joint_velocities -
                                       self.previous_joint_velocities))
        update_task_info = {
            'current_end_effector_positions': end_effector_positions,
            'current_tool_block_position': block_position,
            'current_velocity': joint_velocities
        }
        return rewards, update_task_info

    def _update_task_state(self, update_task_info):
        """
        :param update_task_info:
        :return:
        """
        self.previous_end_effector_positions = \
            update_task_info['current_end_effector_positions']
        self.previous_object_position = \
            update_task_info['current_tool_block_position']
        self.previous_joint_velocities = \
            update_task_info['current_velocity']
        return

    def _set_task_state(self):
        """
        :return:
        """
        self.previous_end_effector_positions = \
            self._robot.get_latest_full_state()['end_effector_positions']
        self.previous_end_effector_positions = \
            self.previous_end_effector_positions.reshape(-1, 3)
        self.previous_object_position = \
            self._stage.get_object_state('tool_block', 'cartesian_position')
        self.previous_joint_velocities = \
            self._robot.get_latest_full_state()['velocities']
        return

    def _handle_contradictory_interventions(self, interventions_dict):
        """
        :param interventions_dict:
        :return:
        """
        # for example, a size intervention on the goal or tool block
        # should be propagated to the other one
        if 'goal_block' in interventions_dict:
            if 'size' in interventions_dict['goal_block']:
                if 'tool_block' not in interventions_dict:
                    interventions_dict['tool_block'] = dict()
                interventions_dict['tool_block']['size'] = \
                    interventions_dict['goal_block']['size']
            if 'cylindrical_position' in interventions_dict['goal_block']:
                interventions_dict['goal_block']['cylindrical_position'][0] = 0
                interventions_dict['goal_block']['cylindrical_position'][1] = 0
        elif 'tool_block' in interventions_dict:
            if 'size' in interventions_dict['tool_block']:
                if 'goal_block' not in interventions_dict:
                    interventions_dict['goal_block'] = dict()
                interventions_dict['goal_block']['size'] = \
                    interventions_dict['tool_block']['size']
        return interventions_dict
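    # For illustration (a hypothetical interventions dict, not part of the
    # class): passing {'goal_block': {'size': some_size_array}} through
    # _handle_contradictory_interventions returns a dict that also sets the
    # same 'size' under 'tool_block', so the silhouette and the rigid block
    # never end up with mismatching sizes. A goal position intervention is
    # likewise snapped onto the arena's central axis by zeroing the first
    # two cylindrical coordinates.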
    def sample_new_goal(self, level=None):
        """
        Used to sample a new goal from the corresponding shape families.

        :param level: (int) specifying the level - not used for now.
        :return: (dict) the corresponding interventions dict that could then
                        be applied to get a new sampled goal.
        """
        intervention_dict = dict()
        intervention_dict['goal_block'] = dict()
        if self._task_params['variables_space'] == 'space_a':
            intervention_space = self._intervention_space_a
        elif self._task_params['variables_space'] == 'space_b':
            intervention_space = self._intervention_space_b
        elif self._task_params['variables_space'] == 'space_a_b':
            intervention_space = self._intervention_space_a_b
        intervention_dict['goal_block']['cylindrical_position'] = \
            np.array([
                0, 0,
                np.random.uniform(
                    intervention_space['goal_block']['cylindrical_position']
                    [0][-1],
                    intervention_space['goal_block']['cylindrical_position']
                    [1][-1])
            ])
        return intervention_dict
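    # Example (a minimal sketch, assuming the CausalWorld environment
    # wrapper exposes do_intervention on a wrapped task, as elsewhere in
    # the package):
    #
    #   goal_intervention = task.sample_new_goal()
    #   success_signal, obs = env.do_intervention(goal_intervention)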
    def _adjust_variable_spaces_after_intervention(self, interventions_dict):
        spaces = [
            self._intervention_space_a, self._intervention_space_b,
            self._intervention_space_a_b
        ]
        if 'tool_block' in interventions_dict:
            if 'size' in interventions_dict['tool_block']:
                for variable_space in spaces:
                    variable_space['tool_block']['cylindrical_position'][0][
                        -1] = self._stage.get_object_state(
                            'tool_block', 'size')[-1] / 2.0
                    variable_space['goal_block']['cylindrical_position'][0][
                        -1] = self._stage.get_object_state(
                            'goal_block', 'size')[-1] / 2.0
        return
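
# A minimal usage sketch (not part of the original module): it assumes the
# CausalWorld environment wrapper from causal_world.envs, which accepts a
# task instance, and runs a short random-action episode.
if __name__ == '__main__':
    from causal_world.envs import CausalWorld

    # non-default goal height and block mass, for illustration only
    task = PickingTaskGenerator(goal_height=0.2, tool_block_mass=0.05)
    env = CausalWorld(task=task, enable_visualization=False)
    obs = env.reset()
    for _ in range(100):
        # sample a random action; a trained policy would go here instead
        obs, reward, done, info = env.step(env.action_space.sample())
    env.close()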