Skip to content

Commit

Permalink
Make smoothing optional
Browse files Browse the repository at this point in the history
  • Loading branch information
AlexanderFabisch committed Nov 18, 2023
1 parent 060c0b6 commit 041eb89
Show file tree
Hide file tree
Showing 2 changed files with 96 additions and 35 deletions.
80 changes: 60 additions & 20 deletions movement_primitives/dmp/_dmp.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ def dmp_step_rk4(
last_t, t, current_y, current_yd, goal_y, goal_yd, goal_ydd, start_y,
start_yd, start_ydd, goal_t, start_t, alpha_y, beta_y, forcing_term,
coupling_term=None, coupling_term_precomputed=None, int_dt=0.001,
p_gain=0.0, tracking_error=0.0):
p_gain=0.0, tracking_error=0.0, smooth_scaling=False):
"""Integrate regular DMP for one step with RK4 integration.
Parameters
Expand Down Expand Up @@ -75,6 +75,11 @@ def dmp_step_rk4(
tracking_error : float, optional (default: 0)
Tracking error from last step.
smooth_scaling : bool, optional (default: False)
Avoids jumps during the beginning of DMP execution when the goal
is changed and the trajectory is scaled by interpolating between
the old and new scaling of the trajectory.
"""
if coupling_term is None:
cd, cdd = np.zeros_like(current_y), np.zeros_like(current_y)
Expand All @@ -100,19 +105,23 @@ def dmp_step_rk4(
C0 = current_yd
K0 = _dmp_acc(
current_y, C0, cdd, alpha_y, beta_y, goal_y, goal_yd, goal_ydd,
start_y, Z[0], execution_time, F[:, 0], coupling_term, tdd)
start_y, Z[0], execution_time, F[:, 0], coupling_term, tdd,
smooth_scaling)
C1 = current_yd + dt_2 * K0
K1 = _dmp_acc(
current_y + dt_2 * C0, C1, cdd, alpha_y, beta_y, goal_y, goal_yd,
goal_ydd, start_y, Z[1], execution_time, F[:, 1], coupling_term, tdd)
goal_ydd, start_y, Z[1], execution_time, F[:, 1], coupling_term, tdd,
smooth_scaling)
C2 = current_yd + dt_2 * K1
K2 = _dmp_acc(
current_y + dt_2 * C1, C2, cdd, alpha_y, beta_y, goal_y, goal_yd,
goal_ydd, start_y, Z[1], execution_time, F[:, 1], coupling_term, tdd)
goal_ydd, start_y, Z[1], execution_time, F[:, 1], coupling_term, tdd,
smooth_scaling)
C3 = current_yd + dt * K2
K3 = _dmp_acc(
current_y + dt * C2, C3, cdd, alpha_y, beta_y, goal_y, goal_yd,
goal_ydd, start_y, Z[2], execution_time, F[:, 2], coupling_term, tdd)
goal_ydd, start_y, Z[2], execution_time, F[:, 2], coupling_term, tdd,
smooth_scaling)

current_y += dt * (current_yd + dt / 6.0 * (K0 + K1 + K2))
current_yd += dt / 6.0 * (K0 + 2 * K1 + 2 * K2 + K3)
Expand All @@ -123,7 +132,8 @@ def dmp_step_rk4(


def _dmp_acc(Y, V, cdd, alpha_y, beta_y, goal_y, goal_yd, goal_ydd,
start_y, z, execution_time, f, coupling_term, tdd):
start_y, z, execution_time, f, coupling_term, tdd,
smooth_scaling):
"""DMP acceleration.
Parameters
Expand Down Expand Up @@ -172,19 +182,28 @@ def _dmp_acc(Y, V, cdd, alpha_y, beta_y, goal_y, goal_yd, goal_ydd,
tdd : array, shape (n_dims,)
Acceleration correction from tracking error controller.
smooth_scaling : bool, optional (default: False)
Avoids jumps during the beginning of DMP execution when the goal
is changed and the trajectory is scaled by interpolating between
the old and new scaling of the trajectory.
Returns
-------
ydd : array, shape (n_dims,)
Resulting acceleration.
"""
if coupling_term is not None:
_, cdd = coupling_term.coupling(Y, V)
if smooth_scaling:
smoothing = beta_y * (goal_y - start_y) * z
else:
smoothing = 0.0
return (
(
alpha_y * (
beta_y * (goal_y - Y)
+ execution_time * (goal_yd - V)
- beta_y * (goal_y - start_y) * z
- smoothing
)
+ f
+ cdd
Expand All @@ -198,7 +217,7 @@ def dmp_step_euler(
last_t, t, current_y, current_yd, goal_y, goal_yd, goal_ydd, start_y,
start_yd, start_ydd, goal_t, start_t, alpha_y, beta_y, forcing_term,
coupling_term=None, coupling_term_precomputed=None, int_dt=0.001,
p_gain=0.0, tracking_error=0.0):
p_gain=0.0, tracking_error=0.0, smooth_scaling=False):
"""Integrate regular DMP for one step with Euler integration.
Parameters
Expand Down Expand Up @@ -264,6 +283,11 @@ def dmp_step_euler(
tracking_error : float, optional (default: 0)
Tracking error from last step.
smooth_scaling : bool, optional (default: False)
Avoids jumps during the beginning of DMP execution when the goal
is changed and the trajectory is scaled by interpolating between
the old and new scaling of the trajectory.
"""
if start_t >= goal_t:
raise ValueError("Goal must be chronologically after start!")
Expand All @@ -290,19 +314,21 @@ def dmp_step_euler(

z = forcing_term.phase(current_t, int_dt)
f = forcing_term.forcing_term(z).squeeze()
if smooth_scaling:
smoothing = beta_y * (goal_y - start_y) * z
else:
smoothing = 0.0

coupling_sum = cdd + p_gain * tracking_error / dt
ydd = (
alpha_y * (
beta_y * (goal_y - current_y)
+ execution_time * goal_yd
- execution_time * current_yd
- beta_y * (goal_y - start_y) * z
+ execution_time * (goal_yd - current_yd)
- smoothing
)
+ goal_ydd * execution_time ** 2
+ f
+ coupling_sum
) / execution_time ** 2
) / execution_time ** 2 + goal_ydd
current_yd += dt * ydd + cd / execution_time
current_y += dt * current_yd

Expand Down Expand Up @@ -332,7 +358,7 @@ class DMP(WeightParametersMixin, DMPBase):
Behaviors (2013), Neural Computation 25(2), pp. 328-373, doi:
10.1162/NECO_a_00393, https://ieeexplore.ieee.org/document/6797340
with modification of scaling proposed by
(if smooth scaling is activated) with modification of scaling proposed by
P. Pastor, H. Hoffmann, T. Asfour, S. Schaal:
Learning and Generalization of Motor Skills by Learning from Demonstration,
Expand Down Expand Up @@ -360,6 +386,11 @@ class DMP(WeightParametersMixin, DMPBase):
Gain for proportional controller of DMP tracking error.
The domain is [0, execution_time**2/dt].
smooth_scaling : bool, optional (default: False)
Avoids jumps during the beginning of DMP execution when the goal
is changed and the trajectory is scaled by interpolating between
the old and new scaling of the trajectory.
Attributes
----------
execution_time_ : float
Expand All @@ -370,13 +401,15 @@ class DMP(WeightParametersMixin, DMPBase):
the frequency.
"""
def __init__(self, n_dims, execution_time=1.0, dt=0.01,
n_weights_per_dim=10, int_dt=0.001, p_gain=0.0):
n_weights_per_dim=10, int_dt=0.001, p_gain=0.0,
smooth_scaling=False):
super(DMP, self).__init__(n_dims, n_dims)
self._execution_time = execution_time
self.dt_ = dt
self.n_weights_per_dim = n_weights_per_dim
self.int_dt = int_dt
self.p_gain = p_gain
self.smooth_scaling = smooth_scaling

self._init_forcing_term()

Expand Down Expand Up @@ -458,7 +491,8 @@ def step(self, last_y, last_yd, coupling_term=None,
coupling_term=coupling_term,
int_dt=self.int_dt,
p_gain=self.p_gain,
tracking_error=tracking_error)
tracking_error=tracking_error,
smooth_scaling=smooth_scaling)
return np.copy(self.current_y), np.copy(self.current_yd)

def open_loop(self, run_t=None, coupling_term=None,
Expand Down Expand Up @@ -503,7 +537,8 @@ def open_loop(self, run_t=None, coupling_term=None,
self.forcing_term,
coupling_term,
run_t, self.int_dt,
step_function)
step_function,
smooth_scaling=smooth_scaling)

def imitate(self, T, Y, regularization_coefficient=0.0,
allow_final_velocity=False):
Expand Down Expand Up @@ -728,8 +763,8 @@ def ridge_regression(X, Y, regularization_coefficient):
def dmp_open_loop(
goal_t, start_t, dt, start_y, goal_y, alpha_y, beta_y, forcing_term,
coupling_term=None, run_t=None, int_dt=0.001,
step_function=dmp_step_rk4, start_yd=None, start_ydd=None,
goal_yd=None, goal_ydd=None):
step_function=dmp_step_rk4, smooth_scaling=False,
start_yd=None, start_ydd=None, goal_yd=None, goal_ydd=None):
"""Run DMP without external feedback.
Parameters
Expand Down Expand Up @@ -771,6 +806,11 @@ def dmp_open_loop(
step_function : callable, optional (default: dmp_step_rk4)
DMP integration function.
smooth_scaling : bool, optional (default: False)
Avoids jumps during the beginning of DMP execution when the goal
is changed and the trajectory is scaled by interpolating between
the old and new scaling of the trajectory.
start_yd : shape (n_dims,)
Start state derivative (velocity).
Expand Down Expand Up @@ -818,7 +858,7 @@ def dmp_open_loop(
goal_t=goal_t, start_t=start_t,
alpha_y=alpha_y, beta_y=beta_y,
forcing_term=forcing_term, coupling_term=coupling_term,
int_dt=int_dt)
int_dt=int_dt, smooth_scaling=smooth_scaling)
Y[i] = current_y

return T, Y
51 changes: 36 additions & 15 deletions movement_primitives/dmp_fast.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ cpdef dmp_step(
object forcing_term, object coupling_term=None,
tuple coupling_term_precomputed=None,
double int_dt=0.001, double p_gain=0.0,
np.ndarray tracking_error=None):
np.ndarray tracking_error=None,
bool smooth_scaling=False):
"""Integrate regular DMP for one step with Euler integration.
Parameters
Expand Down Expand Up @@ -130,6 +131,11 @@ cpdef dmp_step(
tracking_error : float, optional (default: 0)
Tracking error from last step.
smooth_scaling : bool, optional (default: False)
Avoids jumps during the beginning of DMP execution when the goal
is changed and the trajectory is scaled by interpolating between
the old and new scaling of the trajectory.
"""
if start_t >= goal_t:
raise ValueError("Goal must be chronologically after start!")
Expand All @@ -153,6 +159,7 @@ cpdef dmp_step(
cdef double current_t
cdef double dt
cdef double z
cdef double smoothing

current_t = last_t
while current_t < t:
Expand All @@ -178,17 +185,20 @@ cpdef dmp_step(
s = phase(current_t, forcing_term.alpha_z, goal_t, start_t, int_dt=int_dt)

for d in range(n_dims):
if smooth_scaling:
smoothing = beta_y * (goal_y[d] - start_y[d]) * z
else:
smoothing = 0.0

current_ydd[d] = (
alpha_y * (
beta_y * (goal_y[d] - current_y[d])
+ execution_time * goal_yd[d]
- execution_time * current_yd[d]
- beta_y * (goal_y[d] - start_y[d]) * z
+ execution_time * (goal_yd[d] - current_yd[d])
- smoothing
)
+ goal_ydd[d] * execution_time ** 2
+ f[d]
+ cdd[d]
) / execution_time ** 2
) / execution_time ** 2 + goal_ydd[d]
current_yd[d] += dt * current_ydd[d] + cd[d] / execution_time
current_y[d] += dt * current_yd[d]

Expand All @@ -206,7 +216,8 @@ cpdef dmp_step_rk4(
object forcing_term, object coupling_term=None,
tuple coupling_term_precomputed=None,
double int_dt=0.001, double p_gain=0.0,
np.ndarray tracking_error=None):
np.ndarray tracking_error=None,
bool smooth_scaling=False):
"""Integrate regular DMP for one step with RK4 integration.
Parameters
Expand Down Expand Up @@ -272,6 +283,11 @@ cpdef dmp_step_rk4(
tracking_error : float, optional (default: 0)
Tracking error from last step.
smooth_scaling : bool, optional (default: False)
Avoids jumps during the beginning of DMP execution when the goal
is changed and the trajectory is scaled by interpolating between
the old and new scaling of the trajectory.
"""
cdef int n_dims = current_y.shape[0]

Expand All @@ -297,22 +313,22 @@ cpdef dmp_step_rk4(
cdef np.ndarray[double, ndim=1] K0 = _dmp_acc(
current_y, current_yd, cdd, dt, alpha_y, beta_y, goal_y, goal_yd,
goal_ydd, start_y, Z[0], execution_time, F[:, 0], coupling_term,
p_gain, tdd)
p_gain, tdd, smooth_scaling)
cdef np.ndarray[double, ndim=1] C1 = current_yd + dt_2 * K0
cdef np.ndarray[double, ndim=1] K1 = _dmp_acc(
current_y + dt_2 * current_yd, C1, cdd, dt, alpha_y, beta_y, goal_y, goal_yd,
goal_ydd, start_y, Z[1], execution_time, F[:, 1], coupling_term,
p_gain, tdd)
p_gain, tdd, smooth_scaling)
cdef np.ndarray[double, ndim=1] C2 = current_yd + dt_2 * K1
cdef np.ndarray[double, ndim=1] K2 = _dmp_acc(
current_y + dt_2 * C1, C2, cdd, dt, alpha_y, beta_y, goal_y, goal_yd,
goal_ydd, start_y, Z[1], execution_time, F[:, 1], coupling_term,
p_gain, tdd)
p_gain, tdd, smooth_scaling)
cdef np.ndarray[double, ndim=1] C3 = current_yd + dt * K2
cdef np.ndarray[double, ndim=1] K3 = _dmp_acc(
current_y + dt * C2, C3, cdd, dt, alpha_y, beta_y, goal_y, goal_yd,
goal_ydd, start_y, Z[2], execution_time, F[:, 2], coupling_term,
p_gain, tdd)
p_gain, tdd, smooth_scaling)

cdef int i
for i in range(n_dims):
Expand All @@ -330,21 +346,26 @@ cdef _dmp_acc(
np.ndarray[double, ndim=1] goal_y, np.ndarray[double, ndim=1] goal_yd,
np.ndarray[double, ndim=1] goal_ydd, np.ndarray[double, ndim=1] start_y,
double z, double execution_time, np.ndarray[double, ndim=1] f,
object coupling_term, double p_gain, np.ndarray[double, ndim=1] tdd):
object coupling_term, double p_gain, np.ndarray[double, ndim=1] tdd,
bool smooth_scaling):
if coupling_term is not None:
_, cdd[:] = coupling_term.coupling(Y, V)

cdef int n_dims = Y.shape[0]
cdef np.ndarray[double, ndim=1] Ydd = np.empty(n_dims, dtype=np.float64)
cdef int d
cdef double smoothing
for d in range(n_dims):
if smooth_scaling:
smoothing = beta_y * (goal_y[d] - start_y[d]) * z
else:
smoothing = 0.0
Ydd[d] = (
(
alpha_y * (
beta_y * (goal_y[d] - Y[d])
+ execution_time * goal_yd[d]
- execution_time * V[d]
- beta_y * (goal_y[d] - start_y[d]) * z
+ execution_time * (goal_yd[d] - V[d])
- smoothing
)
+ f[d]
+ cdd[d]
Expand Down

0 comments on commit 041eb89

Please sign in to comment.