-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathboundaries.py
165 lines (132 loc) · 6.24 KB
/
boundaries.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
import numpy as np
from math import sqrt
from scipy.stats import norm
from Config import Config
from threading import Thread
import sys
import os
from utils import expectation_radius
class BrownianParticle():
def __init__(self, Config, xpos, ypos):
#print( xpos)
#print( ypos)
self.Config = Config
self.x = xpos
self.y = ypos
self.curr_iter = 0
self.n_iters = int(Config.N)
self.stuck_hist = np.zeros((int(Config.N),1))
self.pos_hist = np.zeros((int(Config.N),2))
self.exp_r = np.zeros(Config.n_steps)
self.avg_pos = np.zeros((Config.n_steps,2))
self.pos_hist[0,:] = np.array([xpos,ypos])
def run(self):
# sys.stdout.write('[%s] running ... process id: %s\n'
# % (self.name, os.getpid()))
for i in range(Config.n_steps):
self.step(Config.dt,self.n_iters)
self.avg_pos[i] = np.average(self.pos_hist, axis = 0)
self.exp_r[i] = expectation_radius(self.pos_hist, center=[.1,.1])
self.reset()
def step(self, dt, n):
self.dt = dt
x0 = self.pos_hist[self.curr_iter,:]
# For each element of x0, generate a sample of n numbers from a
# normal distribution.
r = norm.rvs(size=x0.shape + (n,), scale = Config.sigma)
#r = r.T
#alpha is a drag coefficient I guess, freefall when 1
drift = self.Config.alpha*(self.Config.g * dt**2)/2
r[1,:] = r[1,:] + drift
out = np.empty(r.shape)
out += np.expand_dims(x0, axis=-1)
out = self.applyValues(r,out)
self.pos_hist[self.curr_iter:self.curr_iter+n,:] = out.T
self.curr_iter += n
self.x = out[0,-1]
self.y = out[1,-1]
def step_interrupt(self, curr_iter, out):
# if there is a sticking event we need to leave the particle there for some time then recalculate the walk
x0 = self.pos_hist
x0[curr_iter:curr_iter + self.sticking_time,:] = x0[curr_iter-1,:]
# For each element of x0, generate a sample of n numbers from a
# normal distribution.
r = norm.rvs(size=x0.shape, scale = Config.sigma)
#r = r.T
#alpha is a drag coefficient I guess, freefall when 1
drift = self.Config.alpha*(self.Config.g * self.dt**2)/2
r += drift
out = np.empty(r.shape)
out[:curr_iter,:] = x0[:curr_iter,:]
out[curr_iter + self.sticking_time:,:] += r[curr_iter+self.sticking_time:,:]
self.pos_hist = out
return out.T
def reset(self):
self.pos_hist = np.zeros_like(self.pos_hist)
self.pos_hist[0,:] = np.array([self.x,self.y])
self.curr_iter = 0
def stick_fnc(self, y, offset=15, skew=2):
# calculates probability of a sticking event using the sigmoid function
# offset : determines where 50% probability of sticking will be
# skeW : determines the ramp
# y : Particle vertical displacement
# return : True if a uniform sampling event is less than or equal to the sticking probability
p = 1 / (1 + np.exp(-y/skew + offset))
stickResult = np.random.rand() <= p
#stickResult = np.random.rand() <= 0.25
#print('stick_func = ' + str(stickResult) + ' p = ' + str(p))
return stickResult
def applyValues(self,r,out):
bounds = self.Config.env_tuple
out[0,0] = self.x
out[1,0] = self.y
sticking_time = self.Config.sticking_time
for i in range(1, self.pos_hist.shape[0]):
# proper bounce
stuck = 0
sticking_time = min(sticking_time,self.pos_hist.shape[0]-i)
if (r[0,i] + out[0,i-1])<bounds[0]:
if self.stick_fnc(out[1,i]) and self.Config.sticky and out[0,i-1]!=bounds[0]:
#if r[0,i] + out[0,i-1]<bounds[0]:
# print('1 out[0,i-1]='+str(out[0,i-1])+' r[0,i]='+str(r[0,i]))
r[0,i]= (bounds[0] - out[0,i-1])
r[0,i+1:i+sticking_time]= 0
r[1,i+1:i+sticking_time]= 0
self.stuck_hist[i+1:i+sticking_time]= 1
stuck = sticking_time
#if r[0,i] + out[0,i-1]<bounds[0]:
# print('2 out[0,i-1]='+str(out[0,i-1])+' r[0,i]='+str(r[0,i]))
else:
r[0,i]= -(out[0,i-1] - bounds[0] + (r[0,i] + out[0,i-1] - bounds[0]))
if (r[0,i] + out[0,i-1])>bounds[1]:
if self.stick_fnc(out[1,i]) and self.Config.sticky and out[0,i-1]!=bounds[1]:
r[0,i]= (bounds[1] - out[0,i-1])
r[0,i+1:i+sticking_time]= 0
r[1,i+1:i+sticking_time]= 0
self.stuck_hist[i+1:i+sticking_time]= 1
stuck = sticking_time
else:
r[0,i]= -(out[0,i-1] - bounds[1] + (r[0,i] + out[0,i-1] - bounds[1]))
if (r[1,i] + out[1,i-1])<bounds[2]:
if self.stick_fnc(out[1,i]) and self.Config.sticky and out[0,i-1]!=bounds[2]:
r[1,i]= (bounds[2] - out[1,i-1] )
r[0,i+1:i+sticking_time]= 0
r[1,i+1:i+sticking_time]= 0
self.stuck_hist[i+1:i+sticking_time]= 1
stuck = sticking_time
else:
r[1,i]= -(out[1,i-1] - bounds[2] + (r[1,i] + out[1,i-1] - bounds[2]))
if (r[1,i] + out[1,i-1])>bounds[3]:
if self.stick_fnc(out[1,i]) and self.Config.sticky and out[0,i-1]!=bounds[3]:
r[1,i]= (bounds[3] - out[1,i-1])
r[0,i+1:i+sticking_time]= 0
r[1,i+1:i+sticking_time]= 0
self.stuck_hist[i+1:i+sticking_time]= 1
stuck = sticking_time
else:
r[1,i]= -(out[1,i-1] - bounds[3] + (r[1,i] + out[1,i-1] - bounds[3]))
out[0,i] = r[0,i] + out[0,i-1]
out[1,i] = r[1,i] + out[1,i-1]
if stuck:
stuck = stuck - 1
return out