-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathclient.py
233 lines (200 loc) · 8.62 KB
/
client.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
## Client to run the app
# UPA Project 1
# Author: Vojtech Kronika <xkroni01>, Vojtech Giesl <xgiesl00>, Vojtech Fiala <xfiala61>
import argparse
from mongo import *
from datetime import timedelta, datetime
from dateutil import tz
from getData import Downloader
import traceback
from xml_parser import trainStopsInStation, parse_xml_dir
# Collection handles shared by the query helpers below. Both stay None until
# setup_db() assigns them from the database returned by get_database().
collection_trains = None
collection_stations = None
def setup_db():
    """Bind the module-level collection handles.

    Fetches the database object once via ``get_database()`` and stores its
    ``"trains"`` and ``"stations"`` collections in the ``collection_trains``
    and ``collection_stations`` globals.
    """
    global collection_trains, collection_stations
    database = get_database()
    collection_trains = database["trains"]
    collection_stations = database["stations"]
def find_common(odkud, kam):
    """Return the values shared by the ``pa`` arrays of two station documents.

    The result is what the caller passes to ``get_route`` as the list of
    candidate train identifiers.

    Args:
        odkud: ``_id`` of the departure station document.
        kam: ``_id`` of the destination station document.

    Returns:
        A list with the intersection of both ``pa`` arrays, or an empty list
        when either station is missing or nothing is shared.
    """
    # Aggregation adapted from https://stackoverflow.com/a/42302818/13279982
    pipeline = [
        # Select the documents of the two stations.
        {"$match": {"_id": {"$in": [odkud, kam]}}},
        # Fold them into one document: first/last "pa" array plus how many
        # station documents were actually matched.
        {"$group": {
            "_id": 0,
            "first": {"$first": "$pa"},
            "second": {"$last": "$pa"},
            "matched": {"$sum": 1},
        }},
        # Keep only the values present in both arrays.
        {"$project": {
            "cores": {"$setIntersection": ["$first", "$second"]},
            "matched": 1,
            "_id": 0,
        }},
    ]
    rows = list(collection_stations.aggregate(pipeline))
    if not rows:
        return []

    summary = rows[0]
    # With a single matched document "$first" and "$last" are the same array,
    # so the intersection would be that station's whole list. That is only a
    # meaningful answer when both arguments name the same station.
    if summary.get("matched", 0) < 2 and odkud != kam:
        return []

    # "$setIntersection" yields null when an input array is missing.
    return summary.get("cores") or []
def find_similar(station):
    """Look up a station whose ``_id`` starts with the given text.

    The match is case-insensitive and anchored at the start of ``_id``. The
    text is treated literally: regex metacharacters in it (for example the
    dots in an abbreviated station name) are escaped before querying.

    Args:
        station: Prefix of the station name to search for.

    Returns:
        The ``_id`` of the first matching station document, or an empty list
        when no document matches (the original "not found" value, kept for
        backward compatibility).
    """
    import re  # local import so this block does not depend on file-level changes

    query = {"_id": {"$regex": "^" + re.escape(station), "$options": "si"}}
    match = collection_stations.find_one(query)
    if match is None:
        return []
    return match["_id"]
def exist_changed_plan(train, from_station, to_station, dt) -> dict:
    """Placeholder with no implementation yet.

    The arguments are accepted but ignored, and the call currently yields
    ``None`` despite the ``dict`` return annotation.
    """
    return None
def get_route(trains:list, from_station, to_station, dt:datetime):
    """Pick the earliest matching train from ``from_station`` on the given day.

    Candidate documents are fetched from ``collection_trains``: their first
    planned transport identifier ``Core`` must be in ``trains``, they must
    list ``from_station`` as a location, and they must have some timing at or
    after the requested time. Each candidate is then checked against its
    calendar bitmap and walked location by location.

    Args:
        trains: Identifiers to match against the ``Core`` field.
        from_station: Primary location name of the departure station.
        to_station: Primary location name of the destination station.
        dt: Offset-aware datetime of the requested departure. It is
            subtracted from the offset-aware calendar start, so a naive
            value raises ``TypeError``.

    Returns:
        The ``CZPTTCISMessage`` dict of the selected train, or ``None`` when
        no candidate qualifies.
    """
    # Times are compared as HHMM integers, e.g. 14:05 -> 1405.
    time_int = dt.hour * 100 + dt.minute
    pipeline = [
        {'$match': {'CZPTTCISMessage.Identifiers.PlannedTransportIdentifiers.0.Core': {"$in": trains}}},
        {'$match': {'$and': [
            {'CZPTTCISMessage.CZPTTInformation.CZPTTLocation.Location.PrimaryLocationName': f'{from_station}'},
            {'CZPTTCISMessage.CZPTTInformation.CZPTTLocation.TimingAtLocation.Timing.time_int': {'$gte': time_int}},
        ]}},
    ]

    best_time = 3000  # sentinel larger than any HHMM value
    best_message = None
    for document in collection_trains.aggregate(pipeline):
        message = document["CZPTTCISMessage"]
        information = message["CZPTTInformation"]
        calendar = information["PlannedCalendar"]
        day_bitmap = calendar["BitmapDays"]
        validity_start = datetime.fromisoformat(calendar["ValidityPeriod"]["StartDateTime"] + "-00:00")

        # One bitmap character per day, counted from the validity start.
        day_index = (dt - validity_start).days
        if not 0 <= day_index < len(day_bitmap):
            continue  # requested day lies outside the calendar
        if day_bitmap[day_index] == "0":
            continue  # not running that day, try another train

        from_time = 0
        to_time = 2500
        for location in information["CZPTTLocation"]:
            station = location["Location"]["PrimaryLocationName"]
            if station == to_station:
                if not trainStopsInStation(location):
                    break
                to_time = _timings_of(location)[0]['time_int']
            if station == from_station:
                if not trainStopsInStation(location):
                    break
                timings = _timings_of(location)
                from_time = timings[0]['time_int']
                for timing in timings:
                    # NOTE(review): "ALD" is presumably the departure
                    # qualifier - confirm against the data format.
                    if timing["@TimingQualifierCode"] != "ALD":
                        continue
                    departure = timing["time_int"]
                    # from_time < to_time rejects trains that reached the
                    # destination before the departure station.
                    if time_int <= departure < best_time and from_time < to_time:
                        best_time = departure
                        best_message = message
            if station == to_station:  # destination reached, stop scanning
                break
    return best_message


def _timings_of(location):
    """Return the location's timing entries as a list, wrapping a lone entry."""
    timings = location["TimingAtLocation"]["Timing"]
    return timings if isinstance(timings, list) else [timings]
def print_route(CZPTTCISMessage:dict, from_station, to_station):
    """Print the stops of a route between two stations to stdout.

    A header and a separator line are always printed. When
    ``CZPTTCISMessage`` is ``None`` a "no route" message follows and the
    function returns. Otherwise the locations are walked in order: printing
    starts at ``from_station`` and the walk ends at ``to_station``. For each
    location where the train stops, every timing with qualifier ``ALD`` is
    printed; at ``to_station`` the first non-``ALD`` timing is printed as
    well and ends that location's output.
    """
    separator = "-------------------------------"
    print(f"Route from {from_station} to {to_station} ")
    print(separator)
    if CZPTTCISMessage is None:
        print("No route in selected day")
        return

    started = False
    for location in CZPTTCISMessage["CZPTTInformation"]["CZPTTLocation"]:
        station = location["Location"]["PrimaryLocationName"]
        started = started or station == from_station
        if started and trainStopsInStation(location):
            entries = location["TimingAtLocation"]["Timing"]
            if type(entries) is not list:
                entries = [entries]
            for entry in entries:
                is_ald = entry["@TimingQualifierCode"] == "ALD"
                if not is_ald and station != to_station:
                    continue
                print(f'{entry["Time"][:8]} (GMT{entry["Time"][-6:]}) - {station}')
                if not is_ald:
                    # A terminating train only has an arrival time here.
                    break
        if station == to_station:
            break
    print(separator)
def iso_converter(day, month, year, time):
    """Build a datetime from optional date parts and an ``HH:MM`` string.

    Any argument left as ``None`` falls back to the corresponding part of the
    current local date and time.

    Args:
        day: Day of month (string or int) or ``None``.
        month: Month number (string or int) or ``None``.
        year: Year (string or int) or ``None``.
        time: Time of day as ``"HH:MM"`` (a single-digit hour such as
            ``"9:30"`` is accepted) or ``None``.

    Returns:
        A ``datetime`` with seconds and microseconds set to zero.

    Raises:
        ValueError: If a part is not numeric, out of range, or ``time`` does
            not have the ``HH:MM`` shape.
    """
    now = datetime.now()
    if year is None:
        year = now.year
    if month is None:
        month = now.month
    if day is None:
        day = now.day

    if time is None:
        # Read the fields directly; slicing str(now) breaks when the
        # microsecond part happens to be zero and is omitted.
        hour, minute = now.hour, now.minute
    else:
        # Split on the colon instead of fixed offsets so "9:30" is not
        # read as minute 0.
        hour_text, _, minute_text = time.partition(":")
        hour, minute = int(hour_text), int(minute_text)

    # NOTE(review): this zone name contains spaces and is probably not
    # resolved by dateutil, which would leave the result naive. The script's
    # client mode appends "+00:00" to isoformat() of this value, so correct
    # the name only together with that caller - confirm.
    zone = tz.gettz('Europe / Berlin')
    return datetime(
        year=int(year),
        month=int(month),
        day=int(day),
        hour=hour,
        minute=minute,
        tzinfo=zone,
    )
if __name__ == '__main__':
    # Command-line entry point with three subcommands:
    #   download - fetch the data files, or unzip them with -u
    #   client   - look up and print a route between two stations
    #   parser   - load XML files into the database collections
    from datetime import timezone  # only needed by the client branch below

    setup_db()

    parser = argparse.ArgumentParser(prog='CeskeDrahyFinder')
    # Record the chosen subcommand by name instead of inferring the mode from
    # how many options the subparser happens to define.
    subs = parser.add_subparsers(dest='command')

    download_parser = subs.add_parser('download')
    download_parser.add_argument('-u', help='unzip downloaded files. Files must be downloaded before this command.', action='store_true')
    download_parser.add_argument('-v', help='verbose mode', action='store_true')

    client_parser = subs.add_parser('client')
    client_parser.add_argument('--day', help='day of departure')
    client_parser.add_argument('--month', help='month of departure')
    client_parser.add_argument('--year', help='year of departure')
    client_parser.add_argument('--time', help='departure time, time format HH:MM')
    client_parser.add_argument('--from', help='which station you depart from')
    client_parser.add_argument('--to', help='your destination station')

    xml_parser = subs.add_parser('parser')
    xml_parser.add_argument('--path', help='path to directory with xml files')

    args = vars(parser.parse_args())
    command = args['command']

    if command == 'client':
        try:
            date = iso_converter(args["day"], args["month"], args["year"], args["time"])
            from_station = args["from"]
            to_station = args["to"]
            trains = find_common(from_station, to_station)
        except Exception:
            # Show what went wrong before the usage text instead of hiding it.
            traceback.print_exc()
            parser.print_help()
        else:
            try:
                print(date.strftime('Departure at %d. %b %Y Time: %H:%M'))
                # Read the wall-clock fields as UTC. This matches appending
                # "+00:00" to the ISO string of a naive value, but does not
                # fail when the value already carries an offset.
                dt = date.replace(tzinfo=timezone.utc)
                route = get_route(trains, from_station, to_station, dt)
                print_route(route, from_station, to_station)
            except Exception:
                traceback.print_exc()
    elif command == 'download':
        try:
            downloader = Downloader(args['v'])
            if args["u"]:
                downloader.unzipFolders()
                print("files unzipped successfully")
            else:
                downloader.getFiles()
                print("files download successfully")
        except Exception:
            traceback.print_exc()
            parser.print_help()
    elif command == 'parser':
        try:
            path = args["path"]
            if path is None:
                parse_xml_dir(collection_trains, collection_stations)
            else:
                parse_xml_dir(collection_trains, collection_stations, path)
            print("XMLS parsed successfully")
        except Exception:
            traceback.print_exc()
            parser.print_help()
    else:
        # No subcommand given: explain the usage instead of exiting silently.
        parser.print_help()