-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathStorage_box_RPi4.py
238 lines (191 loc) · 6.25 KB
/
Storage_box_RPi4.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
"""
Storage box for sensor saving.
Created on Sun Jan 31 13:49:29 2021
@author: fredborg
"""
import json
import threading
import types
class Storage_Box(dict):
"""
Stores sensor data, can give outstrings for sending at will.
Can recive sensor data form different threads and save them, cann then send
the data to a new thread as raw data or a string parsed as the project
parsing.
the class is has added tags and thread safety to a normal dictonay class.
"""
def __init__(self, origin):
dict.__init__(self)
self.__json_data = dict(self)
self.origin = origin
self.send_tags = ["time", "depth_in_M", "temprature", "latitude",
"north_south", "longitude", "north_south", "speed",
"heading", "bias"]
self.lock = threading.RLock()
def update(self, data):
"""
Update the dictonary with new values while beeing threadsafe.
Parameters.
----------
data : TYPE
DESCRIPTION.
Returns
-------
None.
"""
with self.lock:
if type(data) is not dict:
if not isinstance(data, type(None)):
raise ValueError("%s %s" %
("data is not dict but",
type(data)))
return
for keys, values in data.items():
self.__json_data[keys] = values
def get_sensor(self, category):
"""
Parameters.
----------
category : TYPE
DESCRIPTION.
Returns
-------
TYPE
DESCRIPTION.
"""
with self.lock:
return self.__json_data[category]
def __get_value(self, value_name, sensor):
return sensor[value_name]
def get_sensor_old(self, category, t=None):
"""
Parameters.
----------
category : TYPE
DESCRIPTION.
t : TYPE, optional
DESCRIPTION. The default is None.
Returns
-------
TYPE
DESCRIPTION.
"""
with self.lock:
sens = self.get_sensor(category)
return_list = []
if isinstance(sens, dict):
for key, value in sens.items():
return_list.append("<%s_%s:%s>" % (category, key, value))
else:
return_list.append("<%s:%s>" % (category, sens))
return return_list
def get_in_old_style(self):
"""
Send the data using the previous data structure.
Returns.
-------
ret_lst : TYPE
DESCRIPTION.
"""
with self.lock:
ret_lst = []
ret_lst.append("ekkolodd")
for k in self.keys():
ret_lst.extend(self.get_sensor_old(k))
return json.dumps(ret_lst)
def __get_all(self):
"""
Return all data saved in the storage box, parsed for sending.
Returns
-------
TYPE
DESCRIPTION.
"""
ret_dict = {}
ret_dict["payload_type"] = "sensor_data"
ret_dict["payload_data"] = self.__build_dict()
return ret_dict
def get_full_string(self):
"""
Return a string verison of the fill string, parsed forsending.
Returns
-------
TYPE
DESCRIPTION.
"""
with self.lock:
return json.dumps(self.__get_all())
def get_reduced_string(self):
"""
Return a string with all data that contains the relevant tags.
Returns
-------
TYPE: String
A json string parsed as determined by the project.
"""
with self.lock:
ret_dict = {}
ret_dict["payload_type"] = "sensor_data"
ret_dict["payload_data"] = self.__build_sub_dict(self.send_tags)
return json.dumps(ret_dict)
def __get_sentence(self):
[[{"name": k, "value": v} for k, v in self.__json_data.iteritems]]
return {"payload_type": "sensor_data",
"payload_data": self.__json_data}
def keys(self):
"""
Get the keys in the storage box.
The keys consists of all the sensors saved in the system.
Returns
-------
TYPE: List
a list off all the sensors in the system..
"""
return self.__json_data.keys()
def __build_dict(self):
payload_list = []
for keys in self.keys():
sensor = self.get_sensor(keys)
payload_list.append(self.__add_sensor(keys, sensor))
return payload_list
def __build_sub_dict(self, tags):
payload_list = []
for keys in self.keys():
sensor = self.get_sensor(keys)
if isinstance(sensor, dict):
if any(tag in keys.lower() for tag in tags) or any(tag in sensor.keys() for tag in tags):
iter_sensor = sensor.copy()
for sub_keys in iter_sensor.keys():
if not any(sub_keys in tag or tag in sub_keys
for tag in tags):
del sensor[sub_keys]
payload_list.append(self.__add_sensor(keys, sensor))
else:
if any(tag in sensor or sensor in tag for tag in tags):
payload_list.append(self.__add_sensor(keys, sensor))
return payload_list
def __add_sensor(self, name, sensor):
print(name,sensor)
sensor_dict = {}
sensor_dict["name"] = name
sensor_dict["value"] = None
if isinstance(sensor, dict):
if len(sensor) > 1:
value_dict = {}
for key, value in sensor.items():
value_dict[key] = value
sensor_dict["value"] = value_dict
else:
for key,value in sensor.items():
sensor_dict["value"] = value
else:
sensor_dict["value"] = sensor
return sensor_dict
def clear(self):
"""
Clear the data from the storage box.
Returns
-------
None.
"""
self.__json_data.clear()