import numpy as np
from causal_world.actors.base_policy import BaseActorPolicy
class GraspingPolicy(BaseActorPolicy):
    """
    Hand-designed pick-and-place policy driven by a small state machine.

    This policy is expected to run at 25 Hz. It was hand designed for
    picking and placing blocks of a specific size (6.5 cm) weighing
    20 grams, which gave the best results when tried.
    The policy outputs desired normalized end-effector positions.

    Description of phases:

    - Phase 0: Move finger-center above the cube center of the current
      instruction.
    - Phase 1: Lower finger-center down to encircle the target cube, and
      close the grip during the second half of the phase.
    - Phase 2: Move finger-center up again, keeping the grip tight
      (lifting the block).
    - Phase 3: Smoothly move the finger-center toward the goal xy, keeping
      the height constant.
    - Phase 4: Move finger-center vertically toward the goal height
      (keeping the relative difference of the finger heights given by
      h0), with the grip still closed.
    - Phase 5: Open the grip (finger distances go back to d1) and move the
      finger-center up again.
    - Phase 6: Smoothly move the finger-center from the goal xy toward the
      xy of the next cube in the program, keeping the height constant.

    Other variables and values:

    - alpha: interpolation value between two positions.
    - ds: distances of finger tips to the grip center.
    - t: time between 0 and 1 in the current phase.
    - phase: every instruction has 7 phases (described above).
    - program_counter: the index of the current instruction in the overall
      program. It is incremented once the policy has gone through all
      phases of the instruction.

    Hyperparameters:

    - phase_velocity_k: the speed at which phase "k" in the state machine
      progresses (amount added to t on every call of ``act``).
    - d0_r, d0_gb: distance of finger tips from grip center while gripping
      the object.
    - gb_angle_spread: angle between green and blue finger tips along the
      "grip circle".
    - d1_r, d1_gb: distance of finger tips from grip center while not
      gripping.
    - h1_r, h1_gb: height of grip center while moving around.
    - h0_r, h0_gb: height of grip center to which it is lowered while
      grasping.
    - fall_trigger_h: if the box is detected below this height during
      phase 3 (when it is supposed to be gripped), try grasping it again
      (reset phase to 0).
    """

    def __init__(self, tool_blocks_order):
        """
        :param tool_blocks_order: (nd.array) specifies the program, where
                                  the indices range from 0 to the number
                                  of blocks available in the arena.
        """
        super(GraspingPolicy, self).__init__(identifier="grasping_policy")
        # State of the state machine.
        self._program_counter = 0
        self._program = tool_blocks_order
        self._phase = 0
        self._t = 0
        # Heights of the grip center: h0 while grasping, h1 while moving.
        self._h0_r = -0.98
        self._h1_r = -0.4
        self._h0_gb = -0.98
        self._h1_gb = -0.4
        # Finger-tip distances from the grip center: d0 closed, d1 open.
        self._d0_r = 0.038
        self._d0_gb = 0.038
        self._d1_r = 0.1
        self._d1_gb = 0.1
        # Angles of the three finger tips on the "grip circle"; the green
        # and blue tips are spread symmetrically around 3*pi/2.
        self._a1 = np.pi / 2
        self._gb_angle_spread = 0.8 * np.pi
        self._a2 = 3 * np.pi / 2 + (self._gb_angle_spread / 2)
        self._a3 = 3 * np.pi / 2 - (self._gb_angle_spread / 2)
        self._fall_trigger_h = -0.7
        # One velocity per phase (7 phases).
        self._phase_velocities = [0.008, 0.01, 0.02,
                                  0.005, 0.005, 0.01,
                                  0.01]
        # xy of the cube to grasp, latched while in phase 0.
        self.current_target_x = None
        self.current_target_y = None

    def act(self, obs):
        """
        The function is called for the agent to act in the world.

        :param obs: (nd.array) defines the observations received by the agent
                    at time step t
        :return: (nd.array) defines the action to be executed at time step t
        """
        if self._program_counter == len(self._program):
            # Program finished: echo back a slice of the observation.
            # NOTE(review): presumably obs[19:28] holds the current
            # end-effector positions -- confirm against the obs layout.
            return obs[19:28]

        block_idx = self._program[self._program_counter]
        # The program length is used as the number of blocks in the arena.
        number_of_blocks = len(self._program)
        # Offsets into the flat observation vector: a 17-wide slot per
        # block starting at 28, followed by an 11-wide slot per block.
        # NOTE(review): the 11-wide slots are presumably the goal
        # descriptions -- confirm against the environment's obs layout.
        cube_offset = 28 + (block_idx * 17)
        goal_offset = 28 + (number_of_blocks * 17) + (block_idx * 11)
        target_x = obs[goal_offset + 4]
        target_y = obs[goal_offset + 5]
        target_height = obs[goal_offset + 6]

        # Detect falling cube. This runs before the target is latched so
        # that a restart immediately aims at where the cube is now rather
        # than at the stale pick position.
        if self._phase == 3 and obs[cube_offset + 6] < self._fall_trigger_h:
            self._phase = 0
            self._t = 0

        # Only set target for cube when phase is 0, otherwise the robot moving
        # the target cube creates a runaway effect due to a moving target
        if self._phase == 0:
            self.current_target_x = obs[cube_offset + 4]
            self.current_target_y = obs[cube_offset + 5]

        # The cube of the following instruction (or of the last one when
        # there is no following instruction) is where phase 6 heads to.
        if self._program_counter < len(self._program) - 1:
            next_block_idx = self._program[self._program_counter + 1]
        else:
            next_block_idx = self._program[-1]
        next_cube_x = obs[28 + (next_block_idx * 17) + 4]
        next_cube_y = obs[28 + (next_block_idx * 17) + 5]

        # Calculate the target of the grip center
        interpolated_xy = self._get_interpolated_xy(target_x,
                                                    target_y,
                                                    self.current_target_x,
                                                    self.current_target_y,
                                                    next_cube_x,
                                                    next_cube_y)
        # Target heights of fingertips
        target_h_r, target_h_g, target_h_b = self._get_target_hs(target_height)
        # Target-distance of fingertips from the grip center
        d_r, d_g, d_b = self._get_ds()
        # Construct full target positions for each fingertip
        pos_r = np.array([interpolated_xy[0] + d_r * np.cos(self._a1),
                          interpolated_xy[1] + d_r * np.sin(self._a1),
                          target_h_r])
        pos_g = np.array([interpolated_xy[0] + d_g * np.cos(self._a2),
                          interpolated_xy[1] + d_g * np.sin(self._a2),
                          target_h_g])
        pos_b = np.array([interpolated_xy[0] + d_b * np.cos(self._a3),
                          interpolated_xy[1] + d_b * np.sin(self._a3),
                          target_h_b])

        # Advance the state machine: t runs from 0 to 1 within a phase;
        # after the last phase move on to the next instruction.
        self._t += self._phase_velocities[self._phase]
        if self._t >= 1.0:
            self._phase += 1
            self._t -= 1.0
            if self._phase >= 7:
                self._phase = 0
                self._program_counter += 1
                self._t = 0
        return np.concatenate((pos_r, pos_g, pos_b), axis=0)

    def _get_ds(self):
        """
        :return: distances of finger tips to grip center, ordered as
                 [red, green, blue] (green and blue share one value).
        """
        if self._phase == 0:
            d_r = self._d1_r
            d_gb = self._d1_gb
        elif self._phase == 1:
            # Keep the grip open for the first half of the phase, then
            # close it smoothly during the second half.
            a = self._mix_sin(max(0, 2 * (self._t - 0.5)))
            d_r = self._combine_convex(self._d1_r, self._d0_r, a)
            d_gb = self._combine_convex(self._d1_gb, self._d0_gb, a)
        elif self._phase in [2, 3, 4]:
            # Grip stays closed while lifting, carrying and lowering.
            d_r = self._d0_r
            d_gb = self._d0_gb
        elif self._phase in [5, 6]:
            d_r = self._d1_r
            d_gb = self._d1_gb
        else:
            raise ValueError("unknown phase: {}".format(self._phase))
        return [d_r, d_gb, d_gb]

    def _get_interpolated_xy(self, target_x,
                             target_y, current_cube_x,
                             current_cube_y,
                             next_cube_x,
                             next_cube_y):
        """
        :param target_x: target x of the grip center.
        :param target_y: target y of the grip center.
        :param current_cube_x: x of current cube to be gripped.
        :param current_cube_y: y of current cube to be gripped.
        :param next_cube_x: x of next cube to be gripped.
        :param next_cube_y: y of next cube to be gripped.
        :return: (nd.array) xy of the grip center, interpolated between
                 the cube position (current cube before phase 4, next cube
                 from phase 4 on) and the target position.
        """
        if self._phase < 4:
            current_x = current_cube_x
            current_y = current_cube_y
        else:
            current_x = next_cube_x
            current_y = next_cube_y
        alpha = self._get_alpha()
        xy_target = (1 - alpha) * np.array([current_x, current_y]) + \
            alpha * np.array([target_x, target_y])
        return xy_target

    def _get_alpha(self):
        """
        :return: alpha for interpolation depending on the phase: 0 means
                 at the cube position, 1 means at the target position.
        """
        if self._phase < 3:
            return 0
        elif self._phase == 3:
            return self._mix_sin(self._t)
        elif self._phase in [4, 5]:
            return 1.0
        elif self._phase == 6:
            return 1 - self._mix_sin(self._t)
        else:
            raise ValueError("unknown phase: {}".format(self._phase))

    def _get_target_hs(self, target_height):
        """
        :param target_height: target height to be reached.
        :return: (nd.array) target heights for all the end effectors,
                 ordered as [red, green, blue] (green and blue share one
                 value).
        """
        if self._phase == 0:
            h_r = self._h1_r
            h_gb = self._h1_gb
        elif self._phase == 1:
            a = self._mix_sin(max(0, self._t))
            h_r = self._combine_convex(self._h1_r, self._h0_r, a)
            h_gb = self._combine_convex(self._h1_gb, self._h0_gb, a)
        elif self._phase == 2:
            a = self._mix_sin(max(0, self._t))
            h_r = self._combine_convex(self._h0_r, self._h1_r, a)
            h_gb = self._combine_convex(self._h0_gb, self._h1_gb, a)
        elif self._phase == 3:
            h_r = self._h1_r
            h_gb = self._h1_gb
        elif self._phase == 4:
            h_target_r = target_height
            # Preserve the height offset between the red finger and the
            # green/blue fingers that is used while grasping.
            h_target_gb = h_target_r + (self._h0_gb - self._h0_r)
            a = self._mix_sin(self._t)
            # The red finger descends to its own target height so that
            # phase 5 starts exactly where this phase ends.
            h_r = self._combine_convex(self._h1_r, h_target_r, a)
            h_gb = self._combine_convex(self._h1_gb, h_target_gb, a)
        elif self._phase == 5:
            h_target_r = target_height
            h_target_gb = h_target_r + (self._h0_gb - self._h0_r)
            a = self._mix_sin(self._t)
            h_r = self._combine_convex(h_target_r, self._h1_r, a)
            h_gb = self._combine_convex(h_target_gb, self._h1_gb, a)
        elif self._phase == 6:
            h_r = self._h1_r
            h_gb = self._h1_gb
        else:
            raise ValueError("unknown phase: {}".format(self._phase))
        return np.array([h_r, h_gb, h_gb])

    def reset(self):
        """
        Resets the controller to the first phase of the first instruction.

        :return:
        """
        self._phase = 0
        self._t = 0
        self._program_counter = 0

    def _mix_sin(self, t):
        """
        :param t: time ranging from 0 to 1.
        :return: smooth ramp 0.5 * (1 - cos(pi * t)), which goes from 0 at
                 t=0 to 1 at t=1.
        """
        return 0.5 * (1 - np.cos(t * np.pi))

    def _combine_convex(self, a, b, alpha):
        """
        :param a: start
        :param b: end
        :param alpha: interpolation weight (0 gives a, 1 gives b)
        :return: convex combination.
        """
        return (1 - alpha) * a + alpha * b