diff --git a/movement_primitives/dmp/_dmp.py b/movement_primitives/dmp/_dmp.py index 3254d0d..79d96f4 100644 --- a/movement_primitives/dmp/_dmp.py +++ b/movement_primitives/dmp/_dmp.py @@ -9,7 +9,7 @@ def dmp_step_rk4( last_t, t, current_y, current_yd, goal_y, goal_yd, goal_ydd, start_y, start_yd, start_ydd, goal_t, start_t, alpha_y, beta_y, forcing_term, coupling_term=None, coupling_term_precomputed=None, int_dt=0.001, - p_gain=0.0, tracking_error=0.0): + p_gain=0.0, tracking_error=0.0, smooth_scaling=False): """Integrate regular DMP for one step with RK4 integration. Parameters @@ -75,6 +75,11 @@ def dmp_step_rk4( tracking_error : float, optional (default: 0) Tracking error from last step. + + smooth_scaling : bool, optional (default: False) + Avoids jumps during the beginning of DMP execution when the goal + is changed and the trajectory is scaled by interpolating between + the old and new scaling of the trajectory. """ if coupling_term is None: cd, cdd = np.zeros_like(current_y), np.zeros_like(current_y) @@ -100,19 +105,23 @@ def dmp_step_rk4( C0 = current_yd K0 = _dmp_acc( current_y, C0, cdd, alpha_y, beta_y, goal_y, goal_yd, goal_ydd, - start_y, Z[0], execution_time, F[:, 0], coupling_term, tdd) + start_y, Z[0], execution_time, F[:, 0], coupling_term, tdd, + smooth_scaling) C1 = current_yd + dt_2 * K0 K1 = _dmp_acc( current_y + dt_2 * C0, C1, cdd, alpha_y, beta_y, goal_y, goal_yd, - goal_ydd, start_y, Z[1], execution_time, F[:, 1], coupling_term, tdd) + goal_ydd, start_y, Z[1], execution_time, F[:, 1], coupling_term, tdd, + smooth_scaling) C2 = current_yd + dt_2 * K1 K2 = _dmp_acc( current_y + dt_2 * C1, C2, cdd, alpha_y, beta_y, goal_y, goal_yd, - goal_ydd, start_y, Z[1], execution_time, F[:, 1], coupling_term, tdd) + goal_ydd, start_y, Z[1], execution_time, F[:, 1], coupling_term, tdd, + smooth_scaling) C3 = current_yd + dt * K2 K3 = _dmp_acc( current_y + dt * C2, C3, cdd, alpha_y, beta_y, goal_y, goal_yd, - goal_ydd, start_y, Z[2], execution_time, F[:, 2], coupling_term, tdd) + goal_ydd, start_y, Z[2], execution_time, F[:, 2], coupling_term, tdd, + smooth_scaling) current_y += dt * (current_yd + dt / 6.0 * (K0 + K1 + K2)) current_yd += dt / 6.0 * (K0 + 2 * K1 + 2 * K2 + K3) @@ -123,7 +132,8 @@ def dmp_step_rk4( def _dmp_acc(Y, V, cdd, alpha_y, beta_y, goal_y, goal_yd, goal_ydd, - start_y, z, execution_time, f, coupling_term, tdd): + start_y, z, execution_time, f, coupling_term, tdd, + smooth_scaling): """DMP acceleration. Parameters @@ -172,6 +182,11 @@ def _dmp_acc(Y, V, cdd, alpha_y, beta_y, goal_y, goal_yd, goal_ydd, tdd : array, shape (n_dims,) Acceleration correction from tracking error controller. + smooth_scaling : bool, optional (default: False) + Avoids jumps during the beginning of DMP execution when the goal + is changed and the trajectory is scaled by interpolating between + the old and new scaling of the trajectory. + Returns ------- ydd : array, shape (n_dims,) @@ -179,12 +194,16 @@ def _dmp_acc(Y, V, cdd, alpha_y, beta_y, goal_y, goal_yd, goal_ydd, """ if coupling_term is not None: _, cdd = coupling_term.coupling(Y, V) + if smooth_scaling: + smoothing = beta_y * (goal_y - start_y) * z + else: + smoothing = 0.0 return ( ( alpha_y * ( beta_y * (goal_y - Y) + execution_time * (goal_yd - V) - - beta_y * (goal_y - start_y) * z + - smoothing ) + f + cdd @@ -198,7 +217,7 @@ def dmp_step_euler( last_t, t, current_y, current_yd, goal_y, goal_yd, goal_ydd, start_y, start_yd, start_ydd, goal_t, start_t, alpha_y, beta_y, forcing_term, coupling_term=None, coupling_term_precomputed=None, int_dt=0.001, - p_gain=0.0, tracking_error=0.0): + p_gain=0.0, tracking_error=0.0, smooth_scaling=False): """Integrate regular DMP for one step with Euler integration. Parameters @@ -264,6 +283,11 @@ def dmp_step_euler( tracking_error : float, optional (default: 0) Tracking error from last step. + + smooth_scaling : bool, optional (default: False) + Avoids jumps during the beginning of DMP execution when the goal + is changed and the trajectory is scaled by interpolating between + the old and new scaling of the trajectory. """ if start_t >= goal_t: raise ValueError("Goal must be chronologically after start!") @@ -290,19 +314,21 @@ def dmp_step_euler( z = forcing_term.phase(current_t, int_dt) f = forcing_term.forcing_term(z).squeeze() + if smooth_scaling: + smoothing = beta_y * (goal_y - start_y) * z + else: + smoothing = 0.0 coupling_sum = cdd + p_gain * tracking_error / dt ydd = ( alpha_y * ( beta_y * (goal_y - current_y) - + execution_time * goal_yd - - execution_time * current_yd - - beta_y * (goal_y - start_y) * z + + execution_time * (goal_yd - current_yd) + - smoothing ) - + goal_ydd * execution_time ** 2 + f + coupling_sum - ) / execution_time ** 2 + ) / execution_time ** 2 + goal_ydd current_yd += dt * ydd + cd / execution_time current_y += dt * current_yd @@ -332,7 +358,7 @@ class DMP(WeightParametersMixin, DMPBase): Behaviors (2013), Neural Computation 25(2), pp. 328-373, doi: 10.1162/NECO_a_00393, https://ieeexplore.ieee.org/document/6797340 - with modification of scaling proposed by + (if smooth scaling is activated) with modification of scaling proposed by P. Pastor, H. Hoffmann, T. Asfour, S. Schaal: Learning and Generalization of Motor Skills by Learning from Demonstration, @@ -360,6 +386,11 @@ class DMP(WeightParametersMixin, DMPBase): Gain for proportional controller of DMP tracking error. The domain is [0, execution_time**2/dt]. + smooth_scaling : bool, optional (default: False) + Avoids jumps during the beginning of DMP execution when the goal + is changed and the trajectory is scaled by interpolating between + the old and new scaling of the trajectory. + Attributes ---------- execution_time_ : float @@ -370,13 +401,15 @@ class DMP(WeightParametersMixin, DMPBase): the frequency. """ def __init__(self, n_dims, execution_time=1.0, dt=0.01, - n_weights_per_dim=10, int_dt=0.001, p_gain=0.0): + n_weights_per_dim=10, int_dt=0.001, p_gain=0.0, + smooth_scaling=False): super(DMP, self).__init__(n_dims, n_dims) self._execution_time = execution_time self.dt_ = dt self.n_weights_per_dim = n_weights_per_dim self.int_dt = int_dt self.p_gain = p_gain + self.smooth_scaling = smooth_scaling self._init_forcing_term() @@ -458,7 +491,8 @@ def step(self, last_y, last_yd, coupling_term=None, coupling_term=coupling_term, int_dt=self.int_dt, p_gain=self.p_gain, - tracking_error=tracking_error) + tracking_error=tracking_error, + smooth_scaling=smooth_scaling) return np.copy(self.current_y), np.copy(self.current_yd) def open_loop(self, run_t=None, coupling_term=None, @@ -503,7 +537,8 @@ def open_loop(self, run_t=None, coupling_term=None, self.forcing_term, coupling_term, run_t, self.int_dt, - step_function) + step_function, + smooth_scaling=smooth_scaling) def imitate(self, T, Y, regularization_coefficient=0.0, allow_final_velocity=False): @@ -728,8 +763,8 @@ def ridge_regression(X, Y, regularization_coefficient): def dmp_open_loop( goal_t, start_t, dt, start_y, goal_y, alpha_y, beta_y, forcing_term, coupling_term=None, run_t=None, int_dt=0.001, - step_function=dmp_step_rk4, start_yd=None, start_ydd=None, - goal_yd=None, goal_ydd=None): + step_function=dmp_step_rk4, smooth_scaling=False, + start_yd=None, start_ydd=None, goal_yd=None, goal_ydd=None): """Run DMP without external feedback. Parameters @@ -771,6 +806,11 @@ def dmp_open_loop( step_function : callable, optional (default: dmp_step_rk4) DMP integration function. + smooth_scaling : bool, optional (default: False) + Avoids jumps during the beginning of DMP execution when the goal + is changed and the trajectory is scaled by interpolating between + the old and new scaling of the trajectory. + start_yd : shape (n_dims,) Start state derivative (velocity). @@ -818,7 +858,7 @@ def dmp_open_loop( goal_t=goal_t, start_t=start_t, alpha_y=alpha_y, beta_y=beta_y, forcing_term=forcing_term, coupling_term=coupling_term, - int_dt=int_dt) + int_dt=int_dt, smooth_scaling=smooth_scaling) Y[i] = current_y return T, Y diff --git a/movement_primitives/dmp_fast.pyx b/movement_primitives/dmp_fast.pyx index 5fac112..2c99374 100644 --- a/movement_primitives/dmp_fast.pyx +++ b/movement_primitives/dmp_fast.pyx @@ -64,7 +64,8 @@ cpdef dmp_step( object forcing_term, object coupling_term=None, tuple coupling_term_precomputed=None, double int_dt=0.001, double p_gain=0.0, - np.ndarray tracking_error=None): + np.ndarray tracking_error=None, + bool smooth_scaling=False): """Integrate regular DMP for one step with Euler integration. Parameters @@ -130,6 +131,11 @@ cpdef dmp_step( tracking_error : float, optional (default: 0) Tracking error from last step. + + smooth_scaling : bool, optional (default: False) + Avoids jumps during the beginning of DMP execution when the goal + is changed and the trajectory is scaled by interpolating between + the old and new scaling of the trajectory. """ if start_t >= goal_t: raise ValueError("Goal must be chronologically after start!") @@ -153,6 +159,7 @@ cpdef dmp_step( cdef double current_t cdef double dt cdef double z + cdef double smoothing current_t = last_t while current_t < t: @@ -178,17 +185,20 @@ cpdef dmp_step( s = phase(current_t, forcing_term.alpha_z, goal_t, start_t, int_dt=int_dt) for d in range(n_dims): + if smooth_scaling: + smoothing = beta_y * (goal_y[d] - start_y[d]) * z + else: + smoothing = 0.0 + current_ydd[d] = ( alpha_y * ( beta_y * (goal_y[d] - current_y[d]) - + execution_time * goal_yd[d] - - execution_time * current_yd[d] - - beta_y * (goal_y[d] - start_y[d]) * z + + execution_time * (goal_yd[d] - current_yd[d]) + - smoothing ) - + goal_ydd[d] * execution_time ** 2 + f[d] + cdd[d] - ) / execution_time ** 2 + ) / execution_time ** 2 + goal_ydd[d] current_yd[d] += dt * current_ydd[d] + cd[d] / execution_time current_y[d] += dt * current_yd[d] @@ -206,7 +216,8 @@ cpdef dmp_step_rk4( object forcing_term, object coupling_term=None, tuple coupling_term_precomputed=None, double int_dt=0.001, double p_gain=0.0, - np.ndarray tracking_error=None): + np.ndarray tracking_error=None, + bool smooth_scaling=False): """Integrate regular DMP for one step with RK4 integration. Parameters @@ -272,6 +283,11 @@ cpdef dmp_step_rk4( tracking_error : float, optional (default: 0) Tracking error from last step. + + smooth_scaling : bool, optional (default: False) + Avoids jumps during the beginning of DMP execution when the goal + is changed and the trajectory is scaled by interpolating between + the old and new scaling of the trajectory. """ cdef int n_dims = current_y.shape[0] @@ -297,22 +313,22 @@ cpdef dmp_step_rk4( cdef np.ndarray[double, ndim=1] K0 = _dmp_acc( current_y, current_yd, cdd, dt, alpha_y, beta_y, goal_y, goal_yd, goal_ydd, start_y, Z[0], execution_time, F[:, 0], coupling_term, - p_gain, tdd) + p_gain, tdd, smooth_scaling) cdef np.ndarray[double, ndim=1] C1 = current_yd + dt_2 * K0 cdef np.ndarray[double, ndim=1] K1 = _dmp_acc( current_y + dt_2 * current_yd, C1, cdd, dt, alpha_y, beta_y, goal_y, goal_yd, goal_ydd, start_y, Z[1], execution_time, F[:, 1], coupling_term, - p_gain, tdd) + p_gain, tdd, smooth_scaling) cdef np.ndarray[double, ndim=1] C2 = current_yd + dt_2 * K1 cdef np.ndarray[double, ndim=1] K2 = _dmp_acc( current_y + dt_2 * C1, C2, cdd, dt, alpha_y, beta_y, goal_y, goal_yd, goal_ydd, start_y, Z[1], execution_time, F[:, 1], coupling_term, - p_gain, tdd) + p_gain, tdd, smooth_scaling) cdef np.ndarray[double, ndim=1] C3 = current_yd + dt * K2 cdef np.ndarray[double, ndim=1] K3 = _dmp_acc( current_y + dt * C2, C3, cdd, dt, alpha_y, beta_y, goal_y, goal_yd, goal_ydd, start_y, Z[2], execution_time, F[:, 2], coupling_term, - p_gain, tdd) + p_gain, tdd, smooth_scaling) cdef int i for i in range(n_dims): @@ -330,21 +346,26 @@ cdef _dmp_acc( np.ndarray[double, ndim=1] goal_y, np.ndarray[double, ndim=1] goal_yd, np.ndarray[double, ndim=1] goal_ydd, np.ndarray[double, ndim=1] start_y, double z, double execution_time, np.ndarray[double, ndim=1] f, - object coupling_term, double p_gain, np.ndarray[double, ndim=1] tdd): + object coupling_term, double p_gain, np.ndarray[double, ndim=1] tdd, + bool smooth_scaling): if coupling_term is not None: _, cdd[:] = coupling_term.coupling(Y, V) cdef int n_dims = Y.shape[0] cdef np.ndarray[double, ndim=1] Ydd = np.empty(n_dims, dtype=np.float64) cdef int d + cdef double smoothing for d in range(n_dims): + if smooth_scaling: + smoothing = beta_y * (goal_y[d] - start_y[d]) * z + else: + smoothing = 0.0 Ydd[d] = ( ( alpha_y * ( beta_y * (goal_y[d] - Y[d]) - + execution_time * goal_yd[d] - - execution_time * V[d] - - beta_y * (goal_y[d] - start_y[d]) * z + + execution_time * (goal_yd[d] - V[d]) + - smoothing ) + f[d] + cdd[d]