-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathduneservice.py
137 lines (110 loc) · 4.42 KB
/
duneservice.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
from requests import get, post
from web3 import Web3
import time
from datamodels import APIConfig
# Root of the Dune v1 REST API. URLs are built by appending path segments
# straight onto this string, so the trailing slash is significant.
BASE_URL = "https://api.dune.com/api/v1/"

# Dune query IDs (kept as strings), keyed by a short descriptive name.
QUERIES = {"erc1155-single-transfer": "1532614"}
class DuneService:
    """Small client that executes a saved Dune query and polls for its result.

    The service submits the query registered under
    ``QUERIES["erc1155-single-transfer"]`` with parameters derived from the
    configured contract address, waits for the execution to finish, and
    reduces the result to a single summary dict.
    """

    # States in which an execution is still in flight and worth polling again.
    # Anything else (completed, failed, cancelled, expired, or a payload with
    # no state at all) ends the polling loop so it can never spin forever.
    _IN_FLIGHT_STATES = frozenset({"QUERY_STATE_PENDING", "QUERY_STATE_EXECUTING"})
    _COMPLETED_STATE = "QUERY_STATE_COMPLETED"

    def __init__(self, api_config: "APIConfig", user_config, silent=False):
        """Store credentials and the target contract.

        Args:
            api_config: Object exposing ``dune_api_key`` and
                ``alchemy_polygon_url`` attributes.
            user_config: Mapping that must contain ``'contract_address'``.
            silent: Accepted for interface compatibility; currently unused.
        """
        self.api_key = api_config.dune_api_key
        self.w3 = Web3(Web3.HTTPProvider(api_config.alchemy_polygon_url))
        self.contract = user_config['contract_address']
        # TODO: read 'min_transfers' from user_config once the front end sends it.

    def get_headers(self):
        """Return the HTTP headers that authenticate a request to Dune."""
        return {"x-dune-api-key": self.api_key}

    def gen_query_params(self):
        """Return the parameter dict submitted with the query.

        TODO: the block range and ``min_transfers`` are hardcoded for now and
        need to be passed in from the front end (for example, derived from
        ``get_latest_block_num()``).
        """
        return {
            "contract": self.contract,
            "start_block": "260000",
            "end_block": "35284517",
            "min_transfers": "0",
        }

    def get_latest_block_num(self):
        """Return the latest block number reported by the Web3 provider."""
        return self.w3.eth.block_number

    def make_api_url(self, module, action, ID):
        """Build the URL ``BASE_URL + module/ID/action`` for an API call."""
        return "{}{}/{}/{}".format(BASE_URL, module, ID, action)

    @staticmethod
    def _extract_execution_id(payload):
        """Return ``payload['execution_id']`` or raise with the API's error.

        Raises:
            RuntimeError: If the payload carries no ``execution_id``. The
                message includes the payload's ``error`` field when present,
                otherwise the whole payload.
        """
        if isinstance(payload, dict):
            if 'execution_id' in payload:
                return payload['execution_id']
            detail = payload.get('error', payload)
        else:
            detail = payload
        # The error field is plain data, not an exception instance, so it has
        # to be wrapped in a real exception before it can be raised.
        raise RuntimeError(
            "Dune query execution request failed: {}".format(detail)
        )

    def execute_query(self, query_id, params=None):
        """Ask the API to execute a query and return its execution ID.

        Args:
            query_id: ID of the saved Dune query.
            params: Optional dict sent as ``query_parameters``.

        Raises:
            RuntimeError: If the response does not contain an execution ID.
        """
        url = self.make_api_url("query", "execute", query_id)
        headers = self.get_headers()
        if params is None:
            response = post(url, headers=headers)
        else:
            response = post(url, headers=headers, json={"query_parameters": params})
        # Parse the body once and let the helper decide between id and error.
        return self._extract_execution_id(response.json())

    def get_query_status(self, execution_id):
        """Fetch the execution's status and return the raw response object."""
        url = self.make_api_url("execution", "status", execution_id)
        return get(url, headers=self.get_headers())

    def get_query_results(self, execution_id):
        """Fetch the execution's results and return the raw response object."""
        url = self.make_api_url("execution", "results", execution_id)
        return get(url, headers=self.get_headers())

    def cancel_query_execution(self, execution_id):
        """Cancel an ongoing execution and return the raw response object."""
        url = self.make_api_url("execution", "cancel", execution_id)
        # Cancelling is a POST in the Dune API, and the headers come from
        # get_headers() like every other call in this class.
        return post(url, headers=self.get_headers())

    def post_process_query_result(self, response, start_block, end_block):
        """Reduce a results payload to a summary of its first row.

        Args:
            response: Parsed results payload containing ``['result']['rows']``.
            start_block: Start of the queried block range, echoed back.
            end_block: End of the queried block range, echoed back.

        Returns:
            ``{}`` when there are no rows; otherwise a dict with
            ``start_block``, ``end_block``, ``wallet`` (the row's ``from``)
            and ``transfers``.
        """
        rows = response['result']['rows']
        if not rows:
            return {}
        # Only the first row is reported, even if the query returned several.
        first = rows[0]
        return {
            'start_block': start_block,
            'end_block': end_block,
            'wallet': first['from'],
            'transfers': first['transfers'],
        }

    def run_query_loop(self):
        """Execute the ERC-1155 transfer query and wait for its outcome.

        Polls the execution status every 5 seconds while it is pending or
        executing.

        Returns:
            The dict produced by ``post_process_query_result`` when the
            execution completes, or the string ``"failed"`` when it ends in
            any other state.
        """
        query_id = QUERIES["erc1155-single-transfer"]
        query_params = self.gen_query_params()
        print(query_params, 'q')
        execution_id = self.execute_query(query_id, params=query_params)

        status = self.get_query_status(execution_id).json()
        state = status.get('state')
        while state in self._IN_FLIGHT_STATES:
            print('query {}, sleeping 5s...'.format(state))
            time.sleep(5.0)
            status = self.get_query_status(execution_id).json()
            state = status.get('state')

        # Decide success from the status payload: it is the one that was
        # polled, and a results payload is only meaningful after completion.
        if state != self._COMPLETED_STATE:
            print("Dune query failed!")
            print(status)
            return "failed"

        response = self.get_query_results(execution_id).json()
        print("FULL RESPONSE")
        print(response)
        return self.post_process_query_result(
            response, query_params['start_block'], query_params['end_block']
        )