forked from harpalnain/ZerodhaAtom
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathStockDataLogger.py
executable file
·129 lines (105 loc) · 4.28 KB
/
StockDataLogger.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Wed Apr 15 23:49:28 2020
@author: harpal
"""
import os
import threading
import datetime
import queue
import time
import pandas as pd
class TickLogger():
    """Buffer ticks for a single instrument and append them to one CSV file.

    Ticks are plain dicts.  Before a tick is buffered, its ``symbol`` and
    ``exchange`` keys are dropped and, when present, its ``bid_table`` and
    ``offer_table`` values are replaced by ``str(value.to_dict())`` so that
    they fit into a single CSV cell.

    The buffered rows live in ``self.pd`` (a ``pandas.DataFrame``) until
    ``save()`` appends them to ``csv_file_name`` and empties the buffer.

    NOTE(review): rows are appended to the file without a header, so ticks
    whose key sets differ from the first tick written to the file will not
    line up with the header columns -- confirm the feed always sends the
    same keys.
    """

    # Keys that are never written to the CSV file.
    _DROPPED_KEYS = ('symbol', 'exchange')
    # Keys whose values expose ``to_dict()`` and are stored as strings.
    _TABLE_KEYS = ('bid_table', 'offer_table')

    def __init__(self, csv_file_name, tick):
        """Create the logger and record the first ``tick``.

        Args:
            csv_file_name: Path of the CSV file to write to.
            tick: First tick (dict) for this instrument.  It is not modified.

        If ``csv_file_name`` does not exist yet, it is created immediately
        with a header row plus this first tick, and the buffer is emptied.
        If it already exists, the tick stays buffered until ``save()``.
        """
        self.csv_file_name = csv_file_name
        self.pd = pd.DataFrame()
        self._buffer(tick)
        # Only a brand-new file gets the header row.
        if not os.path.exists(csv_file_name):
            self.pd.to_csv(self.csv_file_name, index=False)
            # Keep the columns (and their order) but drop the written row.
            self.pd = self.pd[0:0]

    @classmethod
    def _clean(cls, tick):
        """Return a storable copy of ``tick``; the caller's dict is untouched."""
        row = {key: value for key, value in tick.items()
               if key not in cls._DROPPED_KEYS}
        for key in cls._TABLE_KEYS:
            if key in row:
                row[key] = str(row[key].to_dict())
        return row

    def _buffer(self, tick):
        """Add one cleaned tick as a new row at the end of ``self.pd``."""
        row = pd.DataFrame([self._clean(tick)])
        if len(self.pd) == 0:
            # The buffer has no rows (fresh, or just flushed).  Re-use its
            # column order so header-less appends stay aligned, and avoid
            # concatenating with an empty frame.
            known = list(self.pd.columns)
            extra = [col for col in row.columns if col not in known]
            self.pd = row.reindex(columns=known + extra)
        else:
            # DataFrame.append() no longer exists in pandas >= 2.0.
            self.pd = pd.concat([self.pd, row], ignore_index=True, sort=False)

    def append(self, tick):
        """Buffer another ``tick`` (dict).  The caller's dict is not modified."""
        self._buffer(tick)

    def save(self):
        """Append all buffered rows to the CSV file and empty the buffer."""
        if len(self.pd) == 0:
            return  # nothing buffered, skip the file write
        self.pd.to_csv(self.csv_file_name, mode='a', header=False, index=False)
        self.pd = self.pd[0:0]
class StockLogger(threading.Thread):
    """Background thread that writes queued tick batches to per-symbol CSV files.

    Producers hand over batches (iterables of tick dicts) with
    ``log_ticks()``.  The thread drains the queue, routes every tick to a
    ``TickLogger`` keyed by the tick's ``exchange`` and ``symbol``, and
    flushes all loggers to disk after every ``chunk_size`` batches.

    Files are laid out as ``<base_path>/<exchange>/<symbol>/<date>.csv``
    where ``<date>`` is the date on which this object was constructed; it
    is not updated while the thread runs.
    """

    def __init__(self, base_path='/home/harpal/Desktop/StockHistorical/data', chunk_size = 10, poll_interval = 10):
        """Set up the queue and bookkeeping; the thread is not started here.

        Args:
            base_path: Root directory under which the CSV files are created.
            chunk_size: Number of processed batches after which all loggers
                are flushed to disk.
            poll_interval: Seconds to wait between two drains of the queue.
        """
        # Call the Thread class's init function
        threading.Thread.__init__(self)
        self.date = datetime.datetime.now().date()
        self.base_path = base_path
        # Bounded queue: log_ticks() drops batches instead of blocking.
        self.ticks_queue = queue.Queue(500)
        # exchange -> {symbol -> TickLogger}; further exchanges are added
        # on demand when a tick names them.
        self.__logger_dict = {'NSE':{},'BSE':{}}
        self.stop_flag = False
        self.chunk_size = chunk_size
        self.poll_interval = poll_interval
        self.count = 0
        self.last_ticks={}
        # Set by stop() so run() wakes up without waiting out poll_interval.
        self._wake_event = threading.Event()

    def __log_into_panda(self,ticks):
        """Route every tick of one batch to its logger, creating it if needed.

        Each tick must provide the keys ``'symbol'`` and ``'exchange'``
        (``KeyError`` otherwise).  One call counts as one batch towards
        ``chunk_size``.
        """
        self.count += 1
        for tick in ticks:
            # Read the routing keys before handing the tick on.
            symbol = tick['symbol']
            exchange = tick['exchange']
            # Unknown exchanges get their own bucket instead of raising.
            loggers = self.__logger_dict.setdefault(exchange, {})
            tick_logger = loggers.get(symbol)
            if tick_logger is not None:
                tick_logger.append(tick)
                continue
            log_path = os.path.join(self.base_path, exchange, symbol)
            os.makedirs(log_path, exist_ok=True)
            log_file = os.path.join(log_path, str(self.date) + '.csv')
            loggers[symbol] = TickLogger(csv_file_name = log_file, tick=tick)

    def save_to_files(self):
        """Flush the buffered rows of every logger to its CSV file."""
        for loggers in self.__logger_dict.values():
            for tick_logger in loggers.values():
                tick_logger.save()

    def stop(self):
        """Ask the thread to drain the queue, flush, and terminate."""
        self.stop_flag = True
        self._wake_event.set()

    def log_ticks(self,ticks):
        """Queue one batch of ticks without blocking.

        If the queue is full the batch is dropped and a message is printed.
        """
        try:
            self.ticks_queue.put_nowait(ticks)
        except queue.Full:
            print('Hs Logger Queue is Full')

    def run(self):
        """Thread body: drain, flush per ``chunk_size``, repeat until stopped."""
        while True:
            # Sample the flag BEFORE draining so that every batch queued
            # before stop() was called is still processed and saved.
            stopping = self.stop_flag
            while True:
                try:
                    ticks = self.ticks_queue.get_nowait()
                except queue.Empty:
                    break
                self.__log_into_panda(ticks)
                if self.count >= self.chunk_size:
                    self.count = 0
                    self.save_to_files()
            if stopping:
                # Final flush of whatever is still buffered.
                self.save_to_files()
                break
            # Sleeps up to poll_interval seconds; returns early on stop().
            self._wake_event.wait(self.poll_interval)