-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathfiltered.py
698 lines (618 loc) · 30.4 KB
/
filtered.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
#!/usr/bin/env python
from __future__ import division
from __future__ import print_function
from __future__ import absolute_import
"""
Filtering functionality used by various other modules.
"""
# This file is part of Owner Credit
#
# Owner Credit is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Owner Credit is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Owner Credit. If not, see <http://www.gnu.org/licenses/>.
__author__ = "Perry Kundert"
__email__ = "[email protected]"
__copyright__ = "Copyright (c) 2006 Perry Kundert"
__license__ = "Dual License: GPLv3 (or later) and Commercial (see LICENSE)"
import math
import collections
from . import misc
#
# averaged -- Simple average over total specified time period
# weighted -- Weighted average of individual sample/time periods
# weighted_linear -- Simple linear weighted average of all samples within period
#
# Evaluates and returns the new average, optionally weighted by the current value and the
# elapsed time.
#
# Handles an intial value (or None, representing no samples yet). Also handles samples of NaN,
# indicating that a problem occured and there are no new samples available; in this case, when the
# averaging interval expires, the result will revert to NaN. Otherwise, it will retain the last
# known value.
class averaged( misc.value ):
"""
Acts like an integer or float, but returns a simple average of values over a time period.
Samples with identical value/timestamp are ignored. Using no timestamp (or None) causes the
class to use the current real time. Values persist indefinitely 'til replaced by new value(s)
that are within the time interval window.
"""
__slots__ = [ 'interval', 'history' ]
def __init__( self,
interval,
value = 0,
now = None,
lock = misc.value.NoOpRLock()):
self.interval = interval
self.history = collections.deque()
# Initial sample
misc.value.__init__( self, value=value, now=now, lock=lock )
def purge( self,
now = None ):
"""
Discard outdated samples, leaving one that is exactly at or outside the interval window.
The timestamp of the last value in self.history defines the duration used in computing the
average, if less than self.interval. Entries must be in ascending timestamp order.
"""
if now is None:
now = self.now
with self.lock:
deadline = now - self.interval
while len( self.history ) > 1 and self.history[-2][1] <= deadline:
# Second-last value is still at or outside window; discard the last one
self.history.pop()
def compute( self,
now = None ):
"""Return simple average of samples. Recomputes value if history is not empty; it will never
be, so long as a sample has been added. Returns value (without recomputing if history is
empty of relevant values.)
Does not attempt to retain an integer average, even if all samples are integer.
Simple average uses the exclusive range, to retain the idea of an integer interval value
only containing up to its own number of samples; for example, with interval=10 and a now=10,
and with samples at time stamps 10, 9, ... 2, 1, 0, the computed average would only reflect
the latest 9 of the historical values: 10, 9, ..., 2. Note that our 'purge' method may
retain unnecessary samples, for more complex (derived) averaging methods.
"""
with self.lock:
if now is None:
now = self.now
value = 0
count = 0
for v,t in self.history:
if t > now - self.interval:
# sample is within (now, now-interval]; use it.
value += v
#print " --> + " + str( v ),
else:
break
count += 1
if count:
#print " == " + str( value ) + " / " + str( count ),
value /= count
else:
# No relevant history...
if not self.history or math.isnan( self.value ):
# No history at all, or last computed value has been set to NaN; retain.
#print " (no history) "
value = self.value
else:
# We do have history, and a valid computed value. Retain historical value.
#print " (historical) "
value = self.history[0][0]
#print " == " + str( value )
return value
def sample( self,
value = None,
now = None ):
"""
Add sample, and re-compute value (simple average, only values within interval). Should be
usable without change for derived classes that use this history mechanism, if derived
classes implement appropriate purge and/or compute methods. Returns newly computed result.
If None value provided, uses last value (if no samples, raises IndexError exception).
"""
if isinstance( value, misc.value ):
# Another misc.value; we'll compute its current value relative to the timestamp we're
# given (if None; obtain from other value, holding its lock for consistency)
with value.lock:
if now is None:
now = value.now
value = value.compute( now=now )
# Lock required to ensure consistent multi-step update. Updating with a None/NaN will
# update our time (and remember the non-value), but will not contaminate our history. In
# other words, it will indicate a problem with the value, but when corrected, correct
# computation of values will resume.
with self.lock:
# Now that we have a lock, we can ensure that we'll always get a time ahead of self.now,
# unless someone went out of their way to manually use a future time!
if value is None:
value = self.history[0][0]
if now is None:
now = misc.timer()
# We cannot allow revision of history, but multiple samples at the same instant is OK
if now < self.now:
raise ZeroDivisionError( "Invalid sample; attempting to use out-of-order 'now' time value" )
# Reject simple duplicates, (eg. so py.test works; calls multiple times on assertion failures,
# expects no side effects). No lock required; self.history is not allowed to disappear, and
# tuples are immutable
if self.history and self.history[0] == ( value, now ):
return self.value
self.purge( now=now )
if value is None or misc.isnan( value ):
# A non-numeric, but allowed value. Remember it; we may use it or overwrite it, if
# valid history remains to compute a more appropriate value..
self.value = value
else:
# Otherwise, encode the sample in history.
self.history.appendleft( ( value, now ) )
# We expect compute to *retain* non-values (ie. None/NaN) if no sample within interval.
# Otherwise, compute an appropriate value. This is subtle; we need to remember
# non-values, but any legal value is basically ignored; it is just a flag that indicates
# (to 'compute') that it should go ahead and compute a value, perhaps based on ancient
# historical data.
self.value = self.compute( now=now )
self.now = now
return self.value
class weighted( averaged ):
"""
Acts like an integer or float in most situations, but returns a time-weighted average of the
samples presented to it. Results of adding zero or negative time-interval samples is undefined.
Each pair of values defines minimum and maximum value for the area between them. Therefore,
each newly added value contributes to the result immediately, using a simple average of the two
values, weighted by the time between the two values vs. the total duration.
Note that this will make the output non-continuous, over time -- adding a new sample will result
in large jumps in output value, because the new value influences the effective value over the
entire range of time between the current and previous sample!
"""
__slots__ = []
def __init__( self,
interval,
value = 0,
now = None,
lock = averaged.NoOpRLock()):
averaged.__init__( self, interval=interval, value=value, now=now, lock=lock )
def compute( self,
now = None ):
"""Time-weighted average. Oldest value (outside interval window) only used for portion of
interval where no in-window value is available.
"""
# Determine either the end of the interval, or the oldest supplied value, whichever
# is latest (highest timestamp). Remember the oldest known value (may be outside
# of interval, and hence just clamped its timestamp 'then' to the end of interval).
#
#
# v0 v1 v2 v3
# |---------- interval ------------|
# then == ^
# last == v3
#
# If no history is available (no samples) or no time has passed since last sample, then
# returns a non-value self.value unchanged. This should be None/NaN, if that was what was
# supplied at initialization. Otherwise, compute the fresh value.
with self.lock:
if now is None:
now = self.now
elif now < self.now:
# We cannot allow recomputation of history
raise ZeroDivisionError(
"Invalid compute; attempting to use out-of-order 'now=%s' time value (vs. %s)" % (
str( now ), str( self.now )))
if not self.history or now - self.interval > self.history[0][1]:
if self.value is None or math.isnan( self.value ):
# No history, or expired, and our last sample is NaN/None; retain value
return self.value
if not ( now > self.now ):
# No time has passed since last sample; use last computed value
return self.value
if ( self.interval <= 0
or not( now >= self.history[0][1]
and now > self.history[-1][1] )):
# No interval, or no net offset between now and first/last historical value; we only
# have single usable historical value.
return self.history[0][0]
# We have at least one non-empty sample period; clip off the portion of the difference
# "outside" interval.
start = now
last = self.history[0][0]
offset = 0 # First value at 0 offset; will *always* end up > 0!
then = offset
value = 0
for v,t in self.history:
offset = start - t
vclip = v
if offset > self.interval:
# Clip to self.interval, linearly last --> v
outside = offset - self.interval
offset = self.interval
vclip -= last
vclip = vclip * self.interval / offset
vclip += last
dt = offset - then
vavg = ( last + vclip ) / 2
if dt >= 0:
# This value is not in reverse time order; use it
#print " --> " + str( vavg ) + "(" + str( vclip ) + ") * " + str( dt ),
value += vavg * dt
last = v
then = offset
#print " == " + str( value ) + " / " + str( offset ),
value /= offset
#print " == " + str( value )
return value
class weighted_linear( averaged ):
"""
Acts like an integer or float in most situations, but returns a time-weighted linear average of the
samples presented to it. Results of adding zero or negative time-interval samples is undefined.
Each value is considered to persist linearly, until a new value is added with a later timestamp.
Therefore, (perhaps surprisingly) each newly added value doesn't actually influence the result
'til time has "passed" (eg. time is advanced when the next value is added).
This form of weighted average is more what is normally expected; over the duration of the
interval, the result will gradually reflect more of the new sample, and less of the old.
"""
__slots__ = []
def __init__( self,
interval,
value = 0,
now = None,
lock = averaged.NoOpRLock()):
averaged.__init__( self, interval=interval, value=value, now=now, lock=lock )
def compute( self,
now = None ):
"""
Time-weighted linear average. Oldest value (outside interval window) only used for portion
of interval where no in-window value is available; values are considered to persist 'til a
newer value is supplied. If now is supplied, computes relative to that instant, instead of
self.now.
By considering the initial sample to be now and the most recent historical value, we can
avoid specially handling the first history entry.
Does not alter self.value (or any other attribute), but takes the lock to ensure consistency.
"""
# Determine either the end of the interval, or the oldest supplied value, whichever
# is latest (highest timestamp). Remember the oldest known value (may be outside
# of interval, and hence just clamped its timestamp 'then' to the end of interval).
#
#
# .history entries: final
# v--dt--v------3
# |
# ------1 |
# -----------------2 |
# 0 |
# |---------- interval ------------|
# ^---dt--^ then == ^
# now B now A last == 3
#
# If now (A) is coincident with history[0], that value will not be represented (dt == 0 in
# initial loop). If now is future (B), then the initial dt will reflect the 0'th value.
#
# Handles no history, returning value unchanged.
with self.lock:
if now is None:
now = self.now
elif now < self.now:
# We cannot allow recomputation of history
raise ZeroDivisionError(
"Invalid compute; attempting to use out-of-order 'now=%s' time value (vs. %s)" % (
str( now ), str( self.now )))
if not self.history or now - self.interval > self.history[0][1]:
if self.value is None or math.isnan( self.value ):
# No history, or expired, and our last sample is NaN/None; retain value
return self.value
if not ( now > self.now ):
# No time has passed since last sample; use last computed value
return self.value
if ( self.interval <= 0
or not( now >= self.history[0][1]
and now > self.history[-1][1] )):
# Good value and history, but no net time offset between now and first/last
# historical value; we only have single usable historical value.
return self.history[0][0]
# We have at least 2 samples and a non-empty range within self.interval; clip off the
# portion of the difference "outside" interval.
start = now
offset = 0 # First value at 0 offset; must *always* end up > 0!
then = offset
value = 0
for v,t in self.history:
offset = start - t
if offset > self.interval: # Clip to self.interval
offset = self.interval
dt = offset - then
if dt >= 0:
# This value is not in reverse time order; and is at least partially within the
# interval; use it
#print " --> " + str( v ) + " * " + str( dt ),
value += v * dt
then = offset
#print " == " + str( value ) + " / " + str( offset ),
value /= offset
#print " == " + str( value )
return value
class level( misc.value ):
r"""
Filter the incoming values into levels.
The minimal configuration requires a normal value (and no hysteresis). Incoming values will be
either normal (0) or lo (-1) state; somewhat oddly, though, a value exactly at the limit
(eg. offset [0] from normal, the default) will drive us into the "lo" level! Remember, we must
only "meet" the limit when moving away from normal, but must "exceed" it when moving toward
normal.
normal
o
normal 0.0 ---------------o---
o
lo o o
o o ^
^ |
| +-- normal
+-------- lo
Here is a 5-state alarm example:
normal = 0.0
levels = [
-3.0,
-1.0,
1.0,
3.0
]
hysteresis = .1
hi hi
3.0 -------------------
2.9 . . . . . . . . . .
hi
1.0 -----------------o-
.9 . . . . . . . . o .
normal 0.0 o
-.9 . \ . . . . o o
-1.0 ---\-------o-o ^
lo \ o ^ |
\ o | |
-2.9 . . . o o . . | |
-3.0 -------v----- | |
/ lo lo | |
levels ^ ^ ^ | |
| | | | |
| | | | +- hi
| | | +--- normal - must exceed hysteresis toward normal!
| | +--------- lo - must exceed hysteresis toward normal!
| +----------- lo lo
+--------------- lo
"""
__slots__ = [ 'normal', 'limits', 'hysteresis', 'interval', 'state' ]
def __init__( self,
normal = 0, # Normal value ==> 2 states (1 hi, -1 lo)
hysteresis = 0, # Value hysteresis; must exceed toward normal state
limits = None, # Each level adds a state (0 normal, -2 lo lo, 2 hi hi, ...)
interval = None, # Time hysteresis; state change even within hysteresis
value = 0, # Initial value
now = None,
lock = misc.value.NoOpRLock()):
self.normal = normal # The value considered in "normal" level
self.hysteresis = hysteresis
self.limits = limits or [0] # The default with no limits is "normal" and "lo"
self.interval = interval
self.state = 0
# Invokes the initial sample(...)
misc.value.__init__( self, value=value, now=now, lock=lock )
def level( self ):
return self.state
def name( self ):
lvl = self.level()
if lvl == 0:
return "normal"
return ' '.join( [ lvl < 0 and 'lo' or 'hi' ] * abs( lvl ))
def sample( self,
value = None,
now = None ):
# Compute the limits, for going upwards and downwards
#
# Yes, these will skip normal iff hysteresis > than the distance between the two adjacent
# states!
#
# self.limits: [-1, 1]
# self.hysteresis: .25
#
# up: [-.75, 1.0]
# dn: [-1.0, .75]
#
# hi_sta: 1
# lo_sta: -1
if isinstance( value, misc.value ):
with value.lock:
if now is None:
now = value.now
value = value.compute( now=now )
else:
if value is None:
value = self.value
if now is None:
now = misc.timer()
with self.lock:
state = self.state
limits = sorted( self.limits )
up = [ self.normal + lim + ( lim <= 0 and self.hysteresis or 0 )
for lim in limits ]
dn = [ self.normal + lim - ( lim > 0 and self.hysteresis or 0 )
for lim in limits ]
lo_sta = -len( [ lim for lim in limits
if lim <= 0 ] )
hi_sta = lo_sta + len( limits )
state = misc.clamp( state, ( lo_sta, hi_sta ))
'''
print "state == ", state
print "lo_sta == ", lo_sta
print "hi_sta == ", hi_sta
print up
print dn
'''
# Did we exit our state upwards?
while state < hi_sta:
lim = up[state - lo_sta]
if state < 0:
# Must exceed limit towards normal
if value > lim:
state += 1
#print "Value %s exceeds %s, moves us up, to state %s" % (
# value, lim, state )
else:
break
else:
# ... only meet it away from normal
if value >= lim:
state += 1
#print "Value %s meets %s, moves us up, to state %s" % (
# value, lim, state )
else:
break
# ... or downwards?
while state > lo_sta:
lim = dn[state - lo_sta - 1]
if state > 0:
if value < lim:
state -= 1
#print "Value %s exceeds %s, moves down, to state %s" % (
# value, lim, state )
else:
break
else:
if value <= lim:
state -= 1
#print "Value %s meets %s, moves down, to state %s" % (
# value, lim, state )
else:
break
if ( state != self.state
or value > self.value + self.hysteresis
or value < self.value - self.hysteresis ):
self.value = value
self.state = state
self.now = now
return self.value
#
# filter -- filter values over time
#
# Takes the current value and last average, and returns the new
# average, weighted by the current value and the elapsed time.
#
# Optionally specify time-weighted averages by specifying a
# non-'nan' value for weighted, which will also serve as the initial
# interval's value.
#
# The relevant time-range of values is between now and
# now-interval, exclusive: (now, now-interval]
#
# In other words, if interval == 10, and you insert 4 values, that
# last at time now == 100:
#
# [ ( 3, 100 ), ( 2, 94 ), ( 1, 91 ), ( 0, 90 ) ]
#
# then the latest three values 3, 2 and 1 will be included in the
# simple filter with equal weight, while the oldest 3 values 2, 1 and
# 0 will be included in the time-weighted average, with weights 0 * 1,
# 1 * 3, and 2 * 6. The most recent value discarded ( 0 ) becomes the
# new self.weighted, so that its value "persists" until replaced by
# the next value within the interval.
#
# WARNING
#
# Simple filtering (scalar window) treats each value as if it was
# an average value across an identical period of time with each other
# value, thus rejecting the oldest value that just fell out of the
# time window, and including the latest. Time-weighted averaging
# (interval,value tuple window) treats each value as if it were the
# value for the duration beginning at its timestamp, 'til the next
# value. Therefore, the oldest value discarded is remembered (as its
# value extends into the end of the window), and the newest value is
# not used (as it has a 0. duration).
#
# Therefore, the latest value added will *not* influence the
# time-weighted average, but the oldest (just discarded) value
# *will*.. This may be surprising to most users, but makes sense when
# you consider it. If a value has a certain known persistence, you
# may want to enter it twice -- first with a timestamp indicating when
# it took effect, and once more with the current timestamp, to get its
# effect on the output value.
#
# Basically, if you use identically spaced samples, then simple
# filtering treats the samples as the value over the period *ending*
# at the given timestamp, while time-weighted filtering treats the
# samples as the value at the *beginning* at the given timestamp.
#
# Result of get() indeterminate 'til first sample added...
#
# SUMMARY
#
#
# Overall, too complex (mixes simple/time average), and must be
# explicitly supported by the target. Use 'averaged' and derived
# classes, which act like integers or floats (and can often be used as
# such), but implement similar filtering features.
#
class filter( object ):
def __init__( self,
interval, # May be a scalar interval, or tuple/list of interval, initial value
now = None ):
if now is None:
now = misc.timer()
try: # Changing will take effect after next 'add'
self.interval = interval[0] # The filter window interval
self.weighted = interval[1] # Latest value to pass beyond time interval window
except:
self.interval = interval
self.weighted = math.nan
self.now = now
self.history = collections.deque()
if not self.interval and not( math.isnan( self.weighted )):
# Zero timed weighting w/initial value; could be non-zero later, but make it work initially
self.history.appendleft( ( self.weighted, self.now ) )
self.sum = 0.
def get( self ):
if math.isnan( self.weighted ):
return self.sum / len( self.history )
if self.interval: # time-weighted...
return self.sum / self.interval
return self.history[0][0] # but interval set to 0.! Return instaneous value.
def add( self,
value,
now = None ):
if now is None:
now = misc.timer()
# Reject simple duplicates, so py.test works (calls multiple
# times on assertion failures, expects no side effects)
if len( self.history ):
if self.history[0] == ( value, now ):
return self.get();
# Purge dead values. The oldest one discarded becomes the
# current self.weighted (if non-'nan'). As soon as a value
# reaches the end of the window, it is discarded.
dead = now - self.interval
while len( self.history ) and self.history[-1][1] <= dead:
if not math.isnan( self.weighted ):
if not math.isnan( self.history[-1][0] ):
self.weighted = self.history[-1][0]
self.history.pop()
# Save new value
self.history.appendleft( ( value, now ) )
# Compute time-weighted or simple average of remaining values
self.sum = 0.
if math.isnan( self.weighted ):
# Simple average
for v,t in self.history:
self.sum += v
else:
# Time-weighted. If multiple values at same time, latest is used.
# Out-of-order values discarded.
then = self.history[0][1] - self.interval
last = self.weighted
for v,t in reversed( self.history ):
dt = t - then
if dt >= 0:
self.sum += last * dt
last = v
then = t
return self.get()