# Tracker.py
import numpy as np
import pandas as pd
# import matplotlib.pyplot as plt # for plotting price data
# from pandas.errors import EmptyDataError # error produced if an empty csv is parsed
from bs4 import BeautifulSoup
import requests # fetches the html content of a website (replaces urllib2 from Python 2)
from urllib.request import urlopen
from urllib.error import HTTPError # HTTPError and URLError live in urllib.error, not urllib.request
from urllib.error import URLError
import ast # for ast.literal_eval, a safe alternative to eval when parsing saved item files
import time # for sleep function
from datetime import datetime # for timestamp
import os # for creation of directories
import re # for regular expressions
class Item:
def __init__(self, nickname=None, description=None, url=None, asin=None, price=None, currency=None, last_updated=None, in_stock=None, created=None):
self.Nickname = nickname
self.Description = description
self.Asin = asin
self.Url = url
self.Price = price
self.Currency = currency
self.Created = created
self.Last_updated = last_updated
self.In_stock = in_stock
self.Price_log = {"timestamp": [last_updated], "price": [price]}
        self.DatetimeFormatStr = "%H:%M, %m/%d/%Y" # format used for all stored timestamps
def __str__(self):
return str({
"Nickname": self.Nickname,
"Description": self.Description,
"Asin": self.Asin,
"Url": self.Url,
"Price": self.Price,
"Currency": self.Currency,
"In_stock":self.In_stock,
"Created": self.Created.strftime(self.DatetimeFormatStr),
"Last_updated": self.Last_updated.strftime(self.DatetimeFormatStr)
})
    def from_txt(self, file):
        with open(file, "r") as f:
            class_attrs = ast.literal_eval(f.readline()) # literal_eval only accepts Python literals, unlike eval
            self.Price_log = ast.literal_eval(f.readline())
        for index, (timestamp, price) in enumerate(zip(self.Price_log["timestamp"], self.Price_log["price"])):
            self.Price_log["timestamp"][index] = datetime.strptime(timestamp, self.DatetimeFormatStr)
            self.Price_log["price"][index] = float(price)
        self.Nickname = class_attrs["Nickname"]
        self.Description = class_attrs["Description"]
        self.Asin = class_attrs["Asin"]
        self.Url = class_attrs["Url"]
        self.Price = float(class_attrs["Price"])
        self.Currency = class_attrs["Currency"]
        self.Created = datetime.strptime(class_attrs["Created"], self.DatetimeFormatStr) # stored as a string by to_txt
        self.Last_updated = datetime.strptime(class_attrs["Last_updated"], self.DatetimeFormatStr)
def __reformat_date(self, date):
return datetime.strftime(date, self.DatetimeFormatStr)
def to_txt(self, path="./"):
with open(path + self.Nickname + ".txt", "w") as f:
f.write(self.__str__() + "\n")
price_log = self.Price_log.copy()
price_log["timestamp"] = list(map(self.__reformat_date, price_log["timestamp"]))
f.write(str(price_log)) # temporary solution
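# A minimal usage sketch for Item (hypothetical values, not a live product):
# write an item to disk with to_txt and rebuild it with from_txt.
#
#   item = Item(nickname="headphones", url="https://www.amazon.de/gp/product/B000EXAMPLE",
#               asin="b000example", price=59.99, currency="€",
#               last_updated=datetime.now(), created=datetime.now())
#   item.to_txt("./")                       # writes ./headphones.txt
#   restored = Item()
#   restored.from_txt("./headphones.txt")   # restores attributes and Price_log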
class Scraper:
def __init__(self):
self.Online = False
def webpage2soup(self, url, parser="lxml"):
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/41.0.2228.0 Safari/537.36',
}
res = requests.get(url, headers=headers)
res.raise_for_status()
soup = BeautifulSoup(res.text, parser)
return soup
def test_connection(self, url='http://216.58.192.142'):
try:
urlopen(url, timeout=1)
self.Online = True
        except URLError:
self.Online = False
return self.Online
    def ping_AmazonDE(self):
        return self.test_connection("https://amazon.de") # needs self. and a full url with scheme
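# Quick check of the Scraper on its own (assumes network access; the ASIN below
# is a hypothetical placeholder):
#
#   scraper = Scraper()
#   if scraper.test_connection("https://amazon.de"):
#       soup = scraper.webpage2soup("https://www.amazon.de/gp/product/B000EXAMPLE")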
class Parser:
def __init__(self):
        self.Template_Amazon_Url = r"(https://)?(www\.)?([a-z_-]+)\.([a-z]+)/([a-z0-9-_]+)/([a-z0-9-_]+)/([a-z0-9-_]+)" # Amazon url regex; dots escaped, optional parts use ? instead of *
self.Template_Amazon_Description = r"(<span\s(class=\"a-size-large\"\s)*(id=\"productTitle\")(\sclass=\"a-size-large\")*>\n\s+(.+)\n\s+</span>)"
self.Template_Amazon_Price = r"([0-9,]+)\s(.)"
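        # e.g. "59,99 €" -> groups ("59,99", "€"); amazon.de formats prices with a decimal comma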
    def __groupbytemplate(self, string, re_template):
        regex = re.compile(re_template)
        m = regex.search(string)
        if m is None:
            raise ValueError("no match for template: " + re_template) # fail with a clear message instead of AttributeError on None
        return m.groups()
def find_attributes(self, html):
attributes = {"description": "",
"currency": "",
"price": ""}
# find product description
description = self.find_description(html)
attributes["description"] = description
# find price and currency
price, currency = self.find_price(html)
attributes["price"] = float(price)
attributes["currency"] = currency
return attributes
def parse_url(self,url):
regex = re.compile(self.Template_Amazon_Url)
m = regex.search(url.lower())
url_slices = m.groups()
topleveldomain = "." + url_slices[3]
if url_slices[3] == "de":
if url_slices[4] == "gp":
asin = url_slices[6]
else:
asin = url_slices[5]
elif url_slices[3] == "com":
asin = url_slices[6]
        else:
            raise ValueError("so far only .com and .de urls are supported") # otherwise asin would be unbound
return asin, topleveldomain
def find_price(self, html):
price_str = str(html.select("span#priceblock_ourprice"))
groups = self.__groupbytemplate(price_str, self.Template_Amazon_Price)
price = groups[0].replace(",", ".")
currency = groups[1]
return price, currency
def find_description(self, html):
title_str = "" # why do I have to reference this var before?
for element in html.find_all("span"):
if "productTitle" in str(element):
title_str = str(element)
break
groups = self.__groupbytemplate(title_str, self.Template_Amazon_Description)
description = groups[4]
return description
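# How parse_url splits a product url (hypothetical ASIN; the url is lowercased
# before matching, so the returned asin is lowercase too):
#
#   parser = Parser()
#   parser.parse_url("https://www.amazon.de/gp/product/B000EXAMPLE")
#   # -> ("b000example", ".de")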
class Notifier:
def __init__(self, path="./", logfile="events"):
        self.Last_event = np.array(["timestamp", "event", "status"]) # placeholder triple: [timestamp, event, status]
        open(path + logfile + ".log", "a").close() # touch the logfile so it exists; close the handle immediately
self.Log_path = path
self.Logfile_name = logfile
self.Log = ""
def prompt(self, event="", end_char=" ", kind="event", status="ongoing"):
if kind == "event":
timestamp = datetime.now()
print(str(timestamp) + " -- " + event, end=end_char)
self.Last_event = np.array([timestamp, event, status])
if kind == "response":
print(event + "!")
timestamp = datetime.now()
self.Last_event[0] = timestamp
self.Last_event[2] = event
return timestamp, event
def log(self, event="", end_char=" ", kind="event", status="ongoing"):
timestamp, event = self.prompt(event, end_char, kind, status)
self.Log = self.Log + str(timestamp) + " -- " + event + end_char
with open(self.Log_path + self.Logfile_name + ".log", "a") as f:
if kind == "event":
f.write(str(timestamp) + " -- " + event + end_char)
if kind == "response":
f.write(" " + event + "!\n")
def send_email(self):
pass
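# send_email is still a stub. A minimal sketch of one way to fill it in with the
# standard library; the host, credentials and recipient below are hypothetical
# placeholders, not part of the original code:
#
#   import smtplib
#   from email.message import EmailMessage
#
#   def send_email(self, subject, body, host="smtp.example.com",
#                  sender="me@example.com", password="app-password",
#                  recipient="me@example.com"):
#       msg = EmailMessage()
#       msg["Subject"] = subject
#       msg["From"] = sender
#       msg["To"] = recipient
#       msg.set_content(body)
#       with smtplib.SMTP_SSL(host) as smtp: # TLS from the first byte
#           smtp.login(sender, password)
#           smtp.send_message(msg)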
class Tracker(Item, Scraper, Notifier, Parser):
def __init__(self, name="default_tracker", path="./", load=False):
self.Path = path + name + "/"
self.Name = name
self.Items = []
Scraper.__init__(self)
Parser.__init__(self)
if load:
Notifier.__init__(self, self.Path)
self.load(self.Path)
else:
try:
os.mkdir(self.Path)
Notifier.__init__(self, self.Path)
self.log(self.Name + " created.", end_char="\n", status="success")
except FileExistsError:
response = input("A tracker with this name already exists, do you want to load it? [Yes/No]: ")
                if response.lower().startswith("y"): # startswith avoids an IndexError on empty input
Notifier.__init__(self, self.Path)
self.load(self.Path)
                else:
                    Notifier.__init__(self, self.Path) # must run before the first self.log call
                    self.log(self.Name + " initialised as blank.", end_char="\n", status="success")
def __asin(self, item):
return item.Asin
def add_item(self, nickname=None, description=None, url=None, asin=None, price=None, currency=None, last_updated=None, in_stock=None, created=None, save=False):
        if asin.lower() not in list(map(self.__asin, self.Items)): # self.Items, not a global tracker
            self.log("Adding " + nickname + " to list of tracked items...")
item = Item(nickname, description, url, asin, price, currency, last_updated, in_stock, created)
self.Items.append(item)
self.log("success", kind="response")
if save:
self.log("Saving " + nickname + "...")
item.to_txt(self.Path)
self.log("success", kind="response")
else:
self.log("ASIN matches an item that is already being tracḱed.", end_char="\n")
def add_item_by_url(self, alias, url, save=False):
self.log("Parsing " + url + "...")
asin, _ = self.parse_url(url)
self.log("success", kind="response")
if asin not in list(map(self.__asin,self.Items)):
self.log("Fetching webpage for " + alias + "...")
html = self.webpage2soup(url)
self.log("success", kind="response")
self.log("Fetching attributes for " + alias + "...")
attributes = self.find_attributes(html)
self.log("success", kind="response")
nickname = alias
description = attributes["description"]
price = attributes["price"]
currency = attributes["currency"]
created = datetime.now()
            in_stock = None # set to None for now; the stock-check mechanic is not implemented yet
self.add_item(nickname, description, url, asin, price, currency, created, in_stock, created, save)
else:
self.log("ASIN matches an item that is already being tracḱed.", end_char="\n")
def add_items_via_input(self):
while True:
try:
url = input("input url for an item to be added for tracking: ")
if url == "":
break
nickname = input("input a name for the item: ")
self.add_item_by_url(nickname, url)
ipt = input("Do you want to add another item to the list? [Yes/No]: ")
if "n" in ipt.lower():
break
            except Exception: # a bare except would also swallow KeyboardInterrupt
                print("Something seems to have gone wrong. Maybe the url was faulty. Please retry.")
def list_items(self):
for item in self.Items:
print(item.Nickname)
    def fetch_price(self, item): # lowercase parameter so it no longer shadows the Item class
        html = self.webpage2soup(item.Url)
        self.log("Fetching price and currency for " + item.Nickname + "...")
        price, currency = self.find_price(html)
        self.log("success", kind="response")
        return price, currency
    def update_prices(self, timeb4nextfetch=0):
        now = datetime.now()
        if len(self.Items) > 0:
            for item in self.Items:
                try:
                    price, _ = self.fetch_price(item)
                    price = float(price) # find_price returns the price as a string
                except Exception:
                    price = np.nan # keep price defined so the log below stays aligned
                    self.log("failed", kind="response")
                item.Price = price
                item.Last_updated = now
                item.Price_log["timestamp"].append(now)
                item.Price_log["price"].append(price)
                time.sleep(timeb4nextfetch)
        else:
            self.log("No items are being tracked. Add an item first.", end_char="\n")
else:
self.log("No items are being tracked. Add an item first.", end_char="\n")
def deploy(self):
self.log(self.Name + " has been deployed.", end_char="\n")
        while True:
self.log("Pinging Amazon.de...")
if self.test_connection(url="https://amazon.de"):
self.log("success", kind="response")
self.update_prices(5)
self.log("All prices have been updated.", status="success", end_char="\n")
self.save()
self.history_to_csv(True)
self.log("Waiting 12 hours for next update...", end_char="\n")
time.sleep(60*60*12)
else:
self.log("failed", kind="response")
self.log("Waiting 10min before trying again...")
time.sleep(60*10)
self.log("finished waiting", kind="response")
def load(self, path):
self.Path = path
regex = re.compile(r"/([a-zA-Z0-9-_]+)/$")
m = regex.search(path)
self.Name = m.groups()[0]
files_in_dir = [f for f in os.listdir(self.Path) if os.path.isfile(os.path.join(self.Path, f))]
for file in files_in_dir:
            if file.endswith(".txt"):
                item = Item()
                self.log("Importing " + file[:-4] + "...")
                item.from_txt(self.Path + file)
                if item.Asin not in list(map(self.__asin, self.Items)): # compare by asin; a fresh Item object is never "in" the list
                    self.Items.append(item)
                    self.log("success", kind="response")
                else:
                    self.log("was already imported", kind="response")
self.log("Loading logfile...")
with open(self.Log_path + self.Logfile_name + ".log") as f:
self.Log = f.read()
self.log("success", kind="response")
def save(self):
self.log("Saving current state...")
for item in self.Items:
item.to_txt(self.Path)
self.log("success", kind="response")
def __reformat_date(self, date):
return datetime.strftime(date, Item().DatetimeFormatStr)
def history_to_csv(self, save=False):
df = pd.DataFrame({})
self.log("Creating .csv from archived prices...")
for item in self.Items:
dct = {item.Nickname: []}
timestamps, prices = item.Price_log.values()
for timestamp, price in zip(timestamps, prices):
dct[item.Nickname].append(price)
            df_item = pd.DataFrame(dct, index=list(map(self.__reformat_date, timestamps))) # a plain list, not [list], to avoid a one-level MultiIndex
df = pd.concat([df, df_item], axis=1)
df.index.name = "timestamp"
self.log("success", kind="response")
if save:
self.log("Saving price history to .csv...")
df.to_csv(self.Path + "price_hist.csv")
self.log("success", kind="response")
return df
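# A minimal end-to-end sketch; "my_tracker" is a hypothetical name, and
# deploy() is left commented out because it loops forever.
if __name__ == "__main__":
    tracker = Tracker(name="my_tracker")
    tracker.add_items_via_input() # interactively enter urls and nicknames
    tracker.update_prices(timeb4nextfetch=5)
    tracker.history_to_csv(save=True) # wide table: one column per item
    # tracker.deploy() # update -> save -> sleep 12 h, forever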