from causal_world.task_generators.base_task import BaseTask
from causal_world.utils.rotation_utils import euler_to_quaternion
import numpy as np
import math
[docs]class GeneralGeneratorTask(BaseTask):
[docs] def __init__(self, variables_space='space_a_b',
fractional_reward_weight=1,
dense_reward_weights=np.array([]),
activate_sparse_reward=False,
tool_block_mass=0.08,
joint_positions=None,
tool_block_size=0.05,
nums_objects=5):
"""
This task generator generates a general/ random configuration of the
blocks by dropping random blocks from the air and waiting till it comes
to a rest position and then this becomes the new shape/goal that the
actor needs to achieve.
:param variables_space: (str) space to be used either 'space_a' or
'space_b' or 'space_a_b'
:param fractional_reward_weight: (float) weight multiplied by the
fractional volumetric
overlap in the reward.
:param dense_reward_weights: (list float) specifies the reward weights
for all the other reward
terms calculated in the
calculate_dense_rewards
function.
:param activate_sparse_reward: (bool) specified if you want to
sparsify the reward by having
+1 or 0 if the volumetric
fraction overlap more than 90%.
:param tool_block_mass: (float) specifies the blocks mass.
:param joint_positions: (nd.array) specifies the joints position to start
the episode with. None if the default
to be used.
:param tool_block_size: (float) specifies the blocks size.
:param nums_objects: (int) specifies the number of objects to be dropped
from the air.
"""
super().__init__(task_name="general",
variables_space=variables_space,
fractional_reward_weight=fractional_reward_weight,
dense_reward_weights=dense_reward_weights,
activate_sparse_reward=activate_sparse_reward)
self._task_robot_observation_keys = ["time_left_for_task",
"joint_positions",
"joint_velocities",
"end_effector_positions"]
self._task_params["tool_block_mass"] = tool_block_mass
self._task_params["joint_positions"] = joint_positions
self._task_params["nums_objects"] = nums_objects
self._task_params["tool_block_size"] = tool_block_size
self.default_drop_positions = [[0.1, 0.1, 0.2],
[0, 0, 0.2],
[0.05, 0.05, 0.3],
[-0.05, -0.05, 0.1],
[-0.12, -0.12, 0.2],
[-0.12, 0.12, 0.2],
[0.12, -0.10, 0.3],
[0.09, -0.08, 0.1]]
self.tool_mass = self._task_params["tool_block_mass"]
self.nums_objects = self._task_params["nums_objects"]
self.tool_block_size = np.array(self._task_params["tool_block_size"])
[docs] def get_description(self):
"""
:return: (str) returns the description of the task itself.
"""
return "Task where the goal is to rearrange " \
"available objects into a target configuration"
def _set_up_stage_arena(self):
"""
:return:
"""
self._generate_goal_configuration_with_objects(default_bool=True)
return
def _set_intervention_space_a(self):
"""
:return:
"""
super(GeneralGeneratorTask, self)._set_intervention_space_a()
for visual_object in self._stage.get_visual_objects():
del self._intervention_space_a[visual_object]
for rigid_object in self._stage.get_rigid_objects():
del self._intervention_space_a[rigid_object]['size']
self._intervention_space_a['nums_objects'] = \
np.array([1, 5])
self._intervention_space_a['blocks_mass'] = \
np.array([0.02, 0.06])
self._intervention_space_a['tool_block_size'] = \
np.array([0.05, 0.07])
return
def _set_intervention_space_b(self):
"""
:return:
"""
super(GeneralGeneratorTask, self)._set_intervention_space_b()
for visual_object in self._stage.get_visual_objects():
del self._intervention_space_b[visual_object]
for rigid_object in self._stage.get_rigid_objects():
del self._intervention_space_b[rigid_object]['size']
self._intervention_space_b['nums_objects'] = \
np.array([6, 9])
self._intervention_space_b['blocks_mass'] = \
np.array([0.06, 0.08])
self._intervention_space_b['tool_block_size'] = \
np.array([0.04, 0.05])
return
[docs] def sample_new_goal(self, level=None):
"""
Used to sample new goal from the corresponding shape families.
:param level: (int) specifying the level - not used for now.
:return: (dict) the corresponding interventions dict that could then
be applied to get a new sampled goal.
"""
intervention_dict = dict()
if self._task_params['variables_space'] == 'space_a':
intervention_space = self._intervention_space_a
elif self._task_params['variables_space'] == 'space_b':
intervention_space = self._intervention_space_b
elif self._task_params['variables_space'] == 'space_a_b':
intervention_space = self._intervention_space_a_b
intervention_dict['nums_objects'] = np. \
random.randint(intervention_space['nums_objects'][0],
intervention_space['nums_objects'][1])
intervention_dict['blocks_mass'] = np. \
random.uniform(intervention_space['blocks_mass'][0],
intervention_space['blocks_mass'][1])
intervention_dict['tool_block_size'] = np. \
random.uniform(intervention_space['tool_block_size'][0],
intervention_space['tool_block_size'][1])
return intervention_dict
[docs] def get_task_generator_variables_values(self):
"""
:return: (dict) specifying the variables belonging to the task itself.
"""
return {
'nums_objects': self.nums_objects,
'blocks_mass': self.tool_mass,
'tool_block_size': self.tool_block_size
}
[docs] def apply_task_generator_interventions(self, interventions_dict):
"""
:param interventions_dict: (dict) variables and their corresponding
intervention value.
:return: (tuple) first position if the intervention was successful or
not, and second position indicates if
observation_space needs to be reset.
"""
if len(interventions_dict) == 0:
return True, False
reset_observation_space = True
if "nums_objects" in interventions_dict:
self.nums_objects = interventions_dict["nums_objects"]
if "tool_block_size" in interventions_dict:
self.tool_block_size = interventions_dict["tool_block_size"]
if "blocks_mass" in interventions_dict:
self.tool_mass = interventions_dict["blocks_mass"]
if "nums_objects" in interventions_dict or "tool_block_size" in \
interventions_dict:
self._generate_goal_configuration_with_objects(default_bool=False)
elif "blocks_mass" in interventions_dict:
new_interventions_dict = dict()
for rigid_object in self._stage.get_rigid_objects():
if self._stage.get_rigid_objects()[rigid_object].is_not_fixed():
new_interventions_dict[rigid_object] = dict()
new_interventions_dict[rigid_object]['mass'] = \
self.tool_mass
self._stage.apply_interventions(new_interventions_dict)
else:
raise Exception("this task generator variable "
"is not yet defined")
self._set_intervention_space_b()
self._set_intervention_space_a()
self._set_intervention_space_a_b()
self._stage.finalize_stage()
return True, reset_observation_space
def _generate_goal_configuration_with_objects(self, default_bool):
"""
:param default_bool:
:return:
"""
self._stage.remove_everything()
stage_low_bound = np.array(self._stage.get_arena_bb()[0])
stage_low_bound[:2] += 0.04
stage_upper_bound = np.array(self._stage.get_arena_bb()[1])
stage_upper_bound[:2] -= 0.04
stage_upper_bound[2] -= 0.08
self._task_stage_observation_keys = []
joint_positions = self._robot.get_joint_positions_raised()
self._robot.reset_state(joint_positions=joint_positions,
joint_velocities=np.zeros(9))
for object_num in range(self.nums_objects):
if default_bool:
dropping_position = self.default_drop_positions[
object_num % len(self.default_drop_positions)]
dropping_orientation = [0, 0, 0, 1]
else:
dropping_position = np.random.uniform(
stage_low_bound,
stage_upper_bound)
dropping_orientation = euler_to_quaternion(
np.random.uniform(low=0, high=2 * math.pi, size=3))
creation_dict = {
'name': "tool_" + str(object_num),
'shape': "cube",
'initial_position': dropping_position,
'initial_orientation': dropping_orientation,
'mass': self.tool_mass,
'size': np.repeat(self.tool_block_size, 3)
}
self._stage.add_rigid_general_object(**creation_dict)
self._task_stage_observation_keys.append("tool_" + str(object_num) +
'_type')
self._task_stage_observation_keys.append("tool_" + str(object_num) +
'_size')
self._task_stage_observation_keys.append("tool_" + str(object_num) +
'_cartesian_position')
self._task_stage_observation_keys.append("tool_" + str(object_num) +
'_orientation')
self._task_stage_observation_keys.append("tool_" + str(object_num) +
'_linear_velocity')
self._task_stage_observation_keys.append("tool_" + str(object_num) +
'_angular_velocity')
self._robot.forward_simulation(time=0.8)
for rigid_object in self._stage._rigid_objects:
creation_dict = {
'name':
rigid_object.replace('tool', 'goal'),
'shape':
"cube",
'position':
self._stage.get_object_state(rigid_object,
'cartesian_position'),
'orientation':
self._stage.get_object_state(rigid_object, 'orientation'),
'size':
np.repeat(self.tool_block_size, 3)
}
self._stage.add_silhoutte_general_object(**creation_dict)
self._task_stage_observation_keys.append(
rigid_object.replace('tool', 'goal') + '_type')
self._task_stage_observation_keys.append(
rigid_object.replace('tool', 'goal') + '_size')
self._task_stage_observation_keys.append(
rigid_object.replace('tool', 'goal') + '_cartesian_position')
self._task_stage_observation_keys.append(
rigid_object.replace('tool', 'goal') + '_orientation')
trial_index = 1
block_position = self._stage.random_position(
height_limits=[self.tool_block_size/2.0, 0.15])
block_orientation = euler_to_quaternion(
[0, 0, np.random.uniform(-np.pi, np.pi)])
self._stage.set_objects_pose(names=[rigid_object],
positions=[block_position],
orientations=[block_orientation])
self._robot.step_simulation()
while not self._stage.check_feasiblity_of_stage() and \
trial_index < 10:
block_position = self._stage.random_position(
height_limits=[self.tool_block_size/2.0, 0.15])
block_orientation = euler_to_quaternion(
[0, 0, np.random.uniform(-np.pi, np.pi)])
self._stage.set_objects_pose(names=[rigid_object],
positions=[block_position],
orientations=[block_orientation])
self._robot.step_simulation()
trial_index += 1
self._robot.reset_state(joint_positions=joint_positions,
joint_velocities=np.zeros([
9,
]))
self._robot.update_latest_full_state()
return