-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy path02-chia-decorators.py
85 lines (63 loc) · 2.62 KB
/
02-chia-decorators.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
import functools
from pathlib import Path

import aiosqlite
from chia.rpc.full_node_rpc_client import FullNodeRpcClient
from chia.rpc.wallet_rpc_client import WalletRpcClient
from chia.simulator.simulator_full_node_rpc_client import SimulatorFullNodeRpcClient
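# Decorator factories that inject a managed resource (a DB connection or an
# RPC client) as the first argument of the wrapped coroutine.
# Background reading on the patterns used below: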
# https://medium.com/opex-analytics/database-connections-in-python-extensible-reusable-and-secure-56ebcf9c67fe
# https://blog.miguelgrinberg.com/post/the-ultimate-guide-to-python-decorators-part-iii-decorators-with-arguments
def with_db_connection(db):
    """Build a decorator that supplies an open SQLite connection.

    The decorated coroutine function is called with an ``aiosqlite``
    connection as its first positional argument, followed by whatever
    arguments the caller passed. A fresh connection is opened on every call.

    On success the transaction is committed; if the wrapped coroutine raises
    an ``Exception`` the transaction is rolled back and the exception is
    re-raised. The connection is closed in either case.

    Args:
        db: Location of the database file; converted with ``Path(db)``.

    Returns:
        A decorator for ``async def f(conn, *args, **kwargs)`` functions. The
        decorated function returns whatever ``f`` returns.
    """
    db_path = Path(db)

    def _with_db_connection(f):
        # Keep the wrapped coroutine's __name__/__doc__ so tracebacks and
        # introspection show the real function, not the wrapper.
        @functools.wraps(f)
        async def with_connection(*args, **kwargs):
            conn = await aiosqlite.connect(db_path)
            try:
                rv = await f(conn, *args, **kwargs)
            except Exception:
                # Discard partial writes before letting the error propagate.
                await conn.rollback()
                raise
            else:
                await conn.commit()
            finally:
                await conn.close()
            return rv

        return with_connection

    return _with_db_connection
async def run_rpc(rpc_client, f, *args, **kwargs):
    """Await ``f(rpc_client, *args, **kwargs)`` and always shut the client down.

    Args:
        rpc_client: Object exposing a synchronous ``close()`` and an awaitable
            ``await_closed()``; it is passed to ``f`` as the first argument.
        f: Coroutine function to run with the client.
        *args: Extra positional arguments forwarded to ``f``.
        **kwargs: Extra keyword arguments forwarded to ``f``.

    Returns:
        Whatever ``f`` returns.

    Raises:
        Any exception raised by ``f`` propagates unchanged, after the client
        has been closed.
    """
    try:
        return await f(rpc_client, *args, **kwargs)
    finally:
        # Runs on both success and failure so the client is never left open.
        rpc_client.close()
        await rpc_client.await_closed()
def with_full_node_rpc_client(self_hostname, rpc_port, chia_root, chia_config):
    """Build a decorator that supplies a ``FullNodeRpcClient``.

    Each call of the decorated coroutine creates a new client with
    ``FullNodeRpcClient.create(self_hostname, rpc_port, chia_root,
    chia_config)``, passes it to the wrapped coroutine as the first
    positional argument, and hands execution to ``run_rpc``, which closes
    the client afterwards.

    Args:
        self_hostname: Forwarded unchanged to ``FullNodeRpcClient.create``.
        rpc_port: Forwarded unchanged to ``FullNodeRpcClient.create``.
        chia_root: Forwarded unchanged to ``FullNodeRpcClient.create``.
        chia_config: Forwarded unchanged to ``FullNodeRpcClient.create``.

    Returns:
        A decorator for ``async def f(rpc_client, *args, **kwargs)``
        functions; the decorated function returns whatever ``f`` returns.
    """

    def _with_full_node_rpc_client(f):
        # Preserve the wrapped coroutine's metadata (__name__, __doc__, ...).
        @functools.wraps(f)
        async def with_rpc_client(*args, **kwargs):
            rpc_client = await FullNodeRpcClient.create(
                self_hostname, rpc_port, chia_root, chia_config
            )
            return await run_rpc(rpc_client, f, *args, **kwargs)

        return with_rpc_client

    return _with_full_node_rpc_client
def with_full_node_sim_rpc_client(self_hostname, rpc_port, chia_root, chia_config):
    """Build a decorator that supplies a ``SimulatorFullNodeRpcClient``.

    Each call of the decorated coroutine creates a new client with
    ``SimulatorFullNodeRpcClient.create(self_hostname, rpc_port, chia_root,
    chia_config)``, passes it to the wrapped coroutine as the first
    positional argument, and hands execution to ``run_rpc``, which closes
    the client afterwards.

    Args:
        self_hostname: Forwarded unchanged to ``SimulatorFullNodeRpcClient.create``.
        rpc_port: Forwarded unchanged to ``SimulatorFullNodeRpcClient.create``.
        chia_root: Forwarded unchanged to ``SimulatorFullNodeRpcClient.create``.
        chia_config: Forwarded unchanged to ``SimulatorFullNodeRpcClient.create``.

    Returns:
        A decorator for ``async def f(rpc_client, *args, **kwargs)``
        functions; the decorated function returns whatever ``f`` returns.
    """

    def _with_full_node_sim_rpc_client(f):
        # Preserve the wrapped coroutine's metadata (__name__, __doc__, ...).
        @functools.wraps(f)
        async def with_rpc_client(*args, **kwargs):
            rpc_client: SimulatorFullNodeRpcClient = (
                await SimulatorFullNodeRpcClient.create(
                    self_hostname, rpc_port, chia_root, chia_config
                )
            )
            return await run_rpc(rpc_client, f, *args, **kwargs)

        return with_rpc_client

    return _with_full_node_sim_rpc_client
def with_wallet_rpc_client(self_hostname, rpc_port, chia_root, chia_config):
    """Build a decorator that supplies a ``WalletRpcClient``.

    Each call of the decorated coroutine creates a new client with
    ``WalletRpcClient.create(self_hostname, rpc_port, chia_root,
    chia_config)``, passes it to the wrapped coroutine as the first
    positional argument, and hands execution to ``run_rpc``, which closes
    the client afterwards.

    Args:
        self_hostname: Forwarded unchanged to ``WalletRpcClient.create``.
        rpc_port: Forwarded unchanged to ``WalletRpcClient.create``.
        chia_root: Forwarded unchanged to ``WalletRpcClient.create``.
        chia_config: Forwarded unchanged to ``WalletRpcClient.create``.

    Returns:
        A decorator for ``async def f(rpc_client, *args, **kwargs)``
        functions; the decorated function returns whatever ``f`` returns.
    """

    def _with_wallet_rpc_client(f):
        # Preserve the wrapped coroutine's metadata (__name__, __doc__, ...).
        @functools.wraps(f)
        async def with_rpc_client(*args, **kwargs):
            rpc_client = await WalletRpcClient.create(
                self_hostname, rpc_port, chia_root, chia_config
            )
            return await run_rpc(rpc_client, f, *args, **kwargs)

        return with_rpc_client

    return _with_wallet_rpc_client