from causal_world.task_generators.base_task import BaseTask
import numpy as np
import copy
class Stacking2TaskGenerator(BaseTask):
    def __init__(self, variables_space='space_a_b',
fractional_reward_weight=1,
dense_reward_weights=np.array([750, 250,
250, 125,
0.005]),
activate_sparse_reward=False,
tool_block_mass=0.02,
tool_block_size=0.065,
joint_positions=None,
tool_block_1_position=np.array([0, 0, 0.0325]),
tool_block_1_orientation=np.array([0, 0, 0, 1]),
tool_block_2_position=np.array([0.01, 0.08, 0.0325]),
tool_block_2_orientation=np.array([0, 0, 0, 1]),
goal_position=np.array([-0.06, -0.06, 0.0325]),
goal_orientation=np.array([0, 0, 0, 1])):
"""
This task generates a task for stacking 2 blocks above each other.
Note: it belongs to the same shape family of towers, we only provide a
specific task generator for it to be able to do reward engineering
and to reproduce the baselines for it in an easy way.
:param variables_space: (str) space to be used either 'space_a' or
'space_b' or 'space_a_b'
:param fractional_reward_weight: (float) weight multiplied by the
fractional volumetric
overlap in the reward.
:param dense_reward_weights: (list float) specifies the reward weights
for all the other reward
terms calculated in the
calculate_dense_rewards
function.
:param activate_sparse_reward: (bool) specified if you want to
sparsify the reward by having
+1 or 0 if the volumetric
fraction overlap more than 90%.
:param tool_block_mass: (float) specifies the blocks mass.
:param joint_positions: (nd.array) specifies the joints position to start
the episode with. None if the default
to be used.
:param tool_block_1_position: (nd.array) specifies the cartesian position
of the first tool block, x, y, z.
:param tool_block_1_orientation: (nd.array) specifies the euler orientation
of the first tool block, yaw, roll, pitch.
:param tool_block_2_position: (nd.array) specifies the cartesian position
of the second tool block, x, y, z.
:param tool_block_2_orientation: (nd.array) specifies the euler orientation
of the second tool block, yaw, roll, pitch.
:param goal_position: (nd.array) specifies the cartesian position
of the goal stack, x, y, z.
:param goal_orientation: (nd.array) specifies the euler orientation
of the goal stack, yaw, roll, pitch.
"""
super().__init__(task_name="stacking2",
variables_space=variables_space,
fractional_reward_weight=fractional_reward_weight,
dense_reward_weights=dense_reward_weights,
activate_sparse_reward=activate_sparse_reward)
self._task_robot_observation_keys = ["time_left_for_task",
"joint_positions",
"joint_velocities",
"end_effector_positions"]
self._task_params["tool_block_mass"] = tool_block_mass
self._task_params["joint_positions"] = joint_positions
self._task_params["tool_block_1_position"] = tool_block_1_position
self._task_params["tool_block_1_orientation"] = tool_block_1_orientation
self._task_params["tool_block_2_position"] = tool_block_2_position
self._task_params["tool_block_2_orientation"] = tool_block_2_orientation
self._task_params["goal_position"] = goal_position
self._task_params["goal_orientation"] = goal_orientation
self._task_params["tool_block_size"] = tool_block_size
self.previous_tool_block_1_position = None
self.previous_tool_block_2_position = None
self.previous_end_effector_positions = None
self.previous_joint_velocities = None
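
    # Illustrative construction (a sketch only, values chosen arbitrarily):
    # the generator can be parameterized with custom block and goal poses,
    # e.g.
    #
    #   task = Stacking2TaskGenerator(
    #       tool_block_mass=0.05,
    #       goal_position=np.array([0.0, 0.05, 0.0325]))
    #
    # See the usage sketch at the end of this module for plugging the task
    # into an environment.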
    def get_description(self):
"""
:return: (str) returns the description of the task itself.
"""
return "Task where the goal shape is a tower of two blocks"
def _set_up_stage_arena(self):
"""
:return:
"""
creation_dict = {
'name': "tool_block_1",
'shape': "cube",
'initial_position': self._task_params["tool_block_1_position"],
'initial_orientation': self._task_params["tool_block_1_orientation"],
'mass': self._task_params["tool_block_mass"]
}
self._stage.add_rigid_general_object(**creation_dict)
creation_dict = {
'name': "tool_block_2",
'shape': "cube",
'initial_position': self._task_params["tool_block_2_position"],
'initial_orientation': self._task_params["tool_block_2_orientation"],
'mass': self._task_params["tool_block_mass"]
}
self._stage.add_rigid_general_object(**creation_dict)
creation_dict = {
'name': "goal_block_1",
'shape': "cube",
'position': self._task_params["goal_position"],
'orientation': self._task_params["goal_orientation"]
}
self._stage.add_silhoutte_general_object(**creation_dict)
goal_block_2_position = copy.deepcopy(np.array(self._task_params["goal_position"]))
goal_block_2_position[2] += self._task_params["tool_block_size"]
creation_dict = {
'name': "goal_block_2",
'shape': "cube",
'position': goal_block_2_position,
'orientation': self._task_params["goal_orientation"]
}
self._stage.add_silhoutte_general_object(**creation_dict)
self._task_stage_observation_keys = [
"tool_block_1_type", "tool_block_1_size",
"tool_block_1_cartesian_position", "tool_block_1_orientation",
"tool_block_1_linear_velocity", "tool_block_1_angular_velocity",
"tool_block_2_type", "tool_block_2_size",
"tool_block_2_cartesian_position", "tool_block_2_orientation",
"tool_block_2_linear_velocity", "tool_block_2_angular_velocity",
"goal_block_1_type", "goal_block_1_size",
"goal_block_1_cartesian_position", "goal_block_1_orientation",
"goal_block_2_type", "goal_block_2_size",
"goal_block_2_cartesian_position", "goal_block_2_orientation"
]
return
def _calculate_dense_rewards(self, desired_goal, achieved_goal):
"""
:param desired_goal:
:param achieved_goal:
:return:
"""
rewards = [0.0] * 5
block_position_1 = self._stage.get_object_state('tool_block_1',
'cartesian_position')
block_position_2 = self._stage.get_object_state('tool_block_2',
'cartesian_position')
goal_block_1_position = self._stage.get_object_state('goal_block_1',
'cartesian_position')
goal_block_2_position = self._stage.get_object_state('goal_block_2',
'cartesian_position')
joint_velocities = self._robot.get_latest_full_state()['velocities']
end_effector_positions = \
self._robot.get_latest_full_state()['end_effector_positions']
end_effector_positions = end_effector_positions.reshape(-1, 3)
lower_block_positioned = False
if np.linalg.norm(block_position_1 - goal_block_1_position) < 0.02:
lower_block_positioned = True
if not lower_block_positioned:
current_distance_from_block = np.linalg.norm(end_effector_positions -
block_position_1)
previous_distance_from_block = np.linalg.norm(
self.previous_end_effector_positions -
self.previous_tool_block_1_position)
rewards[0] = previous_distance_from_block - current_distance_from_block
previous_dist_to_goal = np.linalg.norm(goal_block_1_position -
self.previous_tool_block_1_position)
current_dist_to_goal = np.linalg.norm(goal_block_1_position - block_position_1)
rewards[1] = previous_dist_to_goal - current_dist_to_goal
else:
current_distance_from_block = np.linalg.norm(end_effector_positions -
block_position_2)
previous_distance_from_block = np.linalg.norm(
self.previous_end_effector_positions -
self.previous_tool_block_2_position)
rewards[0] = previous_distance_from_block - current_distance_from_block
block_2_above_block_1 = False
if np.linalg.norm(block_position_1[:2] - block_position_2[:2]) < 0.005:
block_2_above_block_1 = True
previous_block_to_goal_height = abs(self.previous_tool_block_2_position[2] -
goal_block_2_position[2])
current_block_to_goal_height = abs(block_position_2[2] - goal_block_2_position[2])
if not block_2_above_block_1:
rewards[2] = (previous_block_to_goal_height -
current_block_to_goal_height)
else:
rewards[2] = 0.0
if block_position_2[2] > goal_block_2_position[2]:
# if block 2 high enough activate horizontal reward
previous_block_1_to_block_2 = np.linalg.norm(
self.previous_tool_block_1_position[:2] -
self.previous_tool_block_2_position[:2])
current_block_1_to_block_2 = np.linalg.norm(
block_position_1[:2] -
block_position_2[:2])
rewards[3] = previous_block_1_to_block_2 - current_block_1_to_block_2
else:
rewards[3] = 0.0
rewards[4] = -np.linalg.norm(joint_velocities -
self.previous_joint_velocities)
update_task_info = {
'current_end_effector_positions': end_effector_positions,
'current_tool_block_1_position': block_position_1,
'current_tool_block_2_position': block_position_2,
'current_velocity': joint_velocities
}
return rewards, update_task_info
def _update_task_state(self, update_task_info):
"""
:param update_task_info:
:return:
"""
self.previous_end_effector_positions = \
update_task_info['current_end_effector_positions']
self.previous_tool_block_1_position = \
update_task_info['current_tool_block_1_position']
self.previous_tool_block_2_position = \
update_task_info['current_tool_block_2_position']
self.previous_joint_velocities = \
update_task_info['current_velocity']
return
def _set_task_state(self):
"""
:return:
"""
self.previous_end_effector_positions = \
self._robot.get_latest_full_state()['end_effector_positions']
self.previous_end_effector_positions = \
self.previous_end_effector_positions.reshape(-1, 3)
self.previous_tool_block_1_position = \
self._stage.get_object_state('tool_block_1', 'cartesian_position')
self.previous_tool_block_2_position = \
self._stage.get_object_state('tool_block_2', 'cartesian_position')
self.previous_joint_velocities = \
self._robot.get_latest_full_state()['velocities']
return
def _set_intervention_space_a(self):
"""
:return:
"""
super(Stacking2TaskGenerator, self)._set_intervention_space_a()
self._intervention_space_a['goal_tower'] = dict()
self._intervention_space_a['goal_tower']['cylindrical_position'] = \
copy.deepcopy(self._intervention_space_a['goal_block_1']
['cylindrical_position'])
        self._intervention_space_a['goal_tower']['cylindrical_position'][0][-1] = \
            self._task_params["goal_position"][-1] * 2.0
        self._intervention_space_a['goal_tower']['cylindrical_position'][1][-1] = \
            self._task_params["goal_position"][-1] * 2.0
self._intervention_space_a['goal_tower']['euler_orientation'] = \
copy.deepcopy(self._intervention_space_a['goal_block_1']
['euler_orientation'])
for visual_object in self._stage.get_visual_objects():
del self._intervention_space_a[visual_object]['size']
del self._intervention_space_a[visual_object]['euler_orientation']
del self._intervention_space_a[visual_object]['cylindrical_position']
for rigid_object in self._stage.get_rigid_objects():
del self._intervention_space_a[rigid_object]['size']
return
def _set_intervention_space_b(self):
"""
:return:
"""
super(Stacking2TaskGenerator, self)._set_intervention_space_b()
self._intervention_space_b['goal_tower'] = dict()
self._intervention_space_b['goal_tower']['cylindrical_position'] = \
copy.deepcopy(self._intervention_space_b['goal_block_1']
['cylindrical_position'])
self._intervention_space_b['goal_tower']['cylindrical_position'][0][-1] = \
self._task_params["goal_position"][-1] * 2.0
self._intervention_space_b['goal_tower']['cylindrical_position'][1][-1] = \
self._task_params["goal_position"][-1] * 2.0
self._intervention_space_b['goal_tower']['euler_orientation'] = \
copy.deepcopy(self._intervention_space_b['goal_block_1']
['euler_orientation'])
for visual_object in self._stage.get_visual_objects():
del self._intervention_space_b[visual_object]['size']
del self._intervention_space_b[visual_object]['euler_orientation']
            del self._intervention_space_b[visual_object]['cylindrical_position']
for rigid_object in self._stage.get_rigid_objects():
del self._intervention_space_b[rigid_object]['size']
return
    def get_task_generator_variables_values(self):
"""
:return: (dict) specifying the variables belonging to the task itself.
"""
return {
'goal_tower': {'cylindrical_position':
self._stage.get_object_state('goal_block_1',
'cylindrical_position'),
'euler_orientation':
self._stage.get_object_state('goal_block_1',
'euler_orientation')
}
}
    def apply_task_generator_interventions(self, interventions_dict):
        """
        :param interventions_dict: (dict) variables and their corresponding
                                          intervention value.

        :return: (tuple) the first position indicates whether the intervention
                         was successful or not, the second position indicates
                         whether the observation space needs to be reset.
        """
if len(interventions_dict) == 0:
return True, False
reset_observation_space = False
if 'goal_tower' in interventions_dict:
new_interventions_dict = dict()
new_interventions_dict['goal_block_1'] = dict()
new_interventions_dict['goal_block_2'] = dict()
            if 'cylindrical_position' in interventions_dict['goal_tower']:
                new_interventions_dict['goal_block_1']['cylindrical_position'] = \
                    copy.deepcopy(
                        interventions_dict['goal_tower']['cylindrical_position'])
                new_interventions_dict['goal_block_2']['cylindrical_position'] = \
                    copy.deepcopy(
                        interventions_dict['goal_tower']['cylindrical_position'])
                new_interventions_dict['goal_block_1']['cylindrical_position'][-1] = \
                    interventions_dict['goal_tower']['cylindrical_position'][-1] / 2.0
                new_interventions_dict['goal_block_2']['cylindrical_position'][-1] = \
                    interventions_dict['goal_tower']['cylindrical_position'][-1] * (3 / 2.0)
            elif 'euler_orientation' in interventions_dict['goal_tower']:
                new_interventions_dict['goal_block_1']['euler_orientation'] = \
                    copy.deepcopy(
                        interventions_dict['goal_tower']['euler_orientation'])
                new_interventions_dict['goal_block_2']['euler_orientation'] = \
                    copy.deepcopy(
                        interventions_dict['goal_tower']['euler_orientation'])
else:
raise Exception("this task generator variable "
"is not yet defined")
self._stage.apply_interventions(new_interventions_dict)
else:
raise Exception("this task generator variable "
"is not yet defined")
return True, reset_observation_space
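
    # Shape of the interventions dict handled above (illustrative values, a
    # sketch only). Note the elif: only one of the two keys is applied per
    # call, and the tower height coordinate is split between the two goal
    # silhouettes.
    #
    #   interventions_dict = {
    #       'goal_tower': {
    #           'cylindrical_position': np.array([0.1, 1.2, 0.065])
    #       }
    #   }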
    def sample_new_goal(self, level=None):
        """
        Used to sample a new goal from the corresponding shape families.

        :param level: (int) specifying the level - not used for now.

        :return: (dict) the corresponding interventions dict that could then
                        be applied to get a new sampled goal.
        """
intervention_space = self.get_variable_space_used()
intervention_dict = dict()
intervention_dict['goal_tower'] = dict()
intervention_dict['goal_tower']['cylindrical_position'] = \
np.random.uniform(
intervention_space['goal_tower']['cylindrical_position'][0],
intervention_space['goal_tower']['cylindrical_position'][1])
intervention_dict['goal_tower']['euler_orientation'] = \
np.random.uniform(
intervention_space['goal_tower']['euler_orientation'][0],
intervention_space['goal_tower']['euler_orientation'][1])
return intervention_dict