Skip to content

Commit

Permalink
Make searcher concurrent.
Browse files Browse the repository at this point in the history
  • Loading branch information
dowlandaiello committed May 21, 2024
1 parent bb63dfc commit 3c6e551
Show file tree
Hide file tree
Showing 3 changed files with 159 additions and 127 deletions.
3 changes: 3 additions & 0 deletions shell.nix
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,14 @@

let
packageOverrides = pkgs.callPackage ./python-packages.nix {};
skipCheckTests = drv: drv.overridePythonAttrs (old: { doCheck = false; });
python = pkgs.python3.override { inherit packageOverrides; };
pythonWithPackages = python.withPackages (ps: with ps; [
cosmpy
schedule
python-dotenv
(skipCheckTests aiohttp)
(skipCheckTests aiodns)
]);
in
pkgs.mkShell {
Expand Down
132 changes: 80 additions & 52 deletions src/strategies/naive.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
of hops using all available providers.
"""

import asyncio
import random
import threading
from queue import Queue
Expand Down Expand Up @@ -72,18 +73,16 @@ def __load_poolfile(
with open(ctx.cli_args["pool_file"], "r", encoding="utf-8") as f:
poolfile_cts = json.load(f)

def poolfile_ent_to_leg(
ent: dict[str, Any]
) -> Leg:
def poolfile_ent_to_leg(ent: dict[str, Any]) -> Leg:
backend: Optional[Union[PoolProvider, AuctionProvider]] = None

if "osmosis" in ent:
backend = OsmosisPoolProvider(
endpoints["osmosis"],
ent["osmosis"]["address"],
ent["osmosis"]["pool_id"],
(ent["osmosis"]["asset_a"], ent["osmosis"]["asset_b"]),
)
endpoints["osmosis"],
ent["osmosis"]["address"],
ent["osmosis"]["pool_id"],
(ent["osmosis"]["asset_a"], ent["osmosis"]["asset_b"]),
)

if "neutron_astroport" in ent:
backend = NeutronAstroportPoolProvider(
Expand Down Expand Up @@ -113,9 +112,17 @@ def poolfile_ent_to_leg(

if backend:
return Leg(
backend.asset_a if ent["in_asset"] == backend.asset_a() else backend.asset_b,
backend.asset_a if ent["out_asset"] == backend.asset_a() else backend.asset_b,
backend
(
backend.asset_a
if ent["in_asset"] == backend.asset_a()
else backend.asset_b
),
(
backend.asset_a
if ent["out_asset"] == backend.asset_a()
else backend.asset_b
),
backend,
)

raise ValueError("Invalid route leg type.")
Expand Down Expand Up @@ -148,7 +155,7 @@ def poll(
endpoints: dict[str, dict[str, list[str]]] = {
"neutron": {
"http": ["https://neutron-rest.publicnode.com"],
"grpc": ["grpc+https://neutron-grpc.publicnode.com:443"]
"grpc": ["grpc+https://neutron-grpc.publicnode.com:443"],
},
"osmosis": {
"http": ["https://lcd.osmosis.zone"],
Expand Down Expand Up @@ -186,9 +193,7 @@ def poll(
)
self.last_discovered = datetime.now()

def dump_leg(
leg: Leg
) -> dict[str, Any]:
def dump_leg(leg: Leg) -> dict[str, Any]:
if isinstance(leg.backend, AuctionProvider):
return {
"auction": {
Expand Down Expand Up @@ -392,8 +397,12 @@ def exec_arb(
prev_asset_info: Optional[DenomChainInfo] = None

if prev_leg:
prev_asset_info = denom_info_on_chain(
prev_leg.backend.chain_id, prev_leg.out_asset(), leg.backend.chain_id
prev_asset_info = asyncio.run(
denom_info_on_chain(
prev_leg.backend.chain_id,
prev_leg.out_asset(),
leg.backend.chain_id,
)
)

to_swap = (
Expand All @@ -402,7 +411,9 @@ def exec_arb(
else try_multiple_clients_fatal(
ctx.clients[prev_leg.backend.chain_id.split("-")[0]],
lambda client: client.query_bank_balance(
Address(ctx.wallet.public_key(), prefix=prev_leg.backend.chain_prefix),
Address(
ctx.wallet.public_key(), prefix=prev_leg.backend.chain_prefix
),
prev_leg.out_asset(),
),
)
Expand Down Expand Up @@ -484,7 +495,9 @@ def exec_arb(
)

if isinstance(leg.backend, NeutronAstroportPoolProvider):
logger.info("Submitting arb to contract: %s", leg.backend.contract_info.address)
logger.info(
"Submitting arb to contract: %s", leg.backend.contract_info.address
)

if isinstance(leg.backend, OsmosisPoolProvider):
logger.info("Submitting arb to pool: %d", leg.backend.pool_id)
Expand All @@ -503,7 +516,10 @@ def exec_arb(
to_receive,
).wait_to_complete()

if isinstance(leg.backend, AuctionProvider) and leg.in_asset == leg.backend.asset_a:
if (
isinstance(leg.backend, AuctionProvider)
and leg.in_asset == leg.backend.asset_a
):
tx = leg.backend.swap_asset_a(ctx.wallet, to_swap).wait_to_complete()

if tx:
Expand Down Expand Up @@ -532,10 +548,12 @@ def transfer(
succeeded.
"""

denom_info = denom_info_on_chain(
src_chain=prev_leg.backend.chain_id,
src_denom=denom,
dest_chain=leg.backend.chain_id,
denom_info = asyncio.run(
denom_info_on_chain(
src_chain=prev_leg.backend.chain_id,
src_denom=denom,
dest_chain=leg.backend.chain_id,
)
)

if not denom_info:
Expand Down Expand Up @@ -689,7 +707,7 @@ def listen_routes_with_depth_dfs(
) -> None:
denom_cache: dict[str, dict[str, str]] = {}

def next_legs(path: list[Leg]) -> None:
async def next_legs(path: list[Leg]) -> None:
nonlocal denom_cache
nonlocal routes

Expand Down Expand Up @@ -739,7 +757,7 @@ def next_legs(path: list[Leg]) -> None:
if not end in denom_cache:
denom_cache[end] = {
info.chain_id: info.denom
for info in denom_info(prev_pool.backend.chain_id, end)
for info in await denom_info(prev_pool.backend.chain_id, end)
+ [
DenomChainInfo(
denom=end,
Expand Down Expand Up @@ -772,39 +790,49 @@ def next_legs(path: list[Leg]) -> None:
]
random.shuffle(next_pools)

for pool in next_pools:
next_legs(
path
+ [
Leg(
(
pool.asset_a
if pool.asset_a() in denom_cache[end].values()
else pool.asset_b
),
(
pool.asset_b
if pool.asset_a() in denom_cache[end].values()
else pool.asset_a
),
pool,
)
]
)
await asyncio.gather(
*[
next_legs(
path
+ [
Leg(
(
pool.asset_a
if pool.asset_a() in denom_cache[end].values()
else pool.asset_b
),
(
pool.asset_b
if pool.asset_a() in denom_cache[end].values()
else pool.asset_a
),
pool,
)
]
)
for pool in next_pools
]
)

start_pools: list[Union[AuctionProvider, PoolProvider]] = [
*auctions.get(src, {}).values(),
*(pool for pool_set in pools.get(src, {}).values() for pool in pool_set),
]
random.shuffle(start_pools)

for pool in start_pools:
next_legs(
[
Leg(
pool.asset_a if pool.asset_a() == src else pool.asset_b,
pool.asset_b if pool.asset_a() == src else pool.asset_a,
pool,
asyncio.run(
asyncio.gather(
*[
next_legs(
[
Leg(
pool.asset_a if pool.asset_a() == src else pool.asset_b,
pool.asset_b if pool.asset_a() == src else pool.asset_a,
pool,
)
]
)
for pool in start_pools
]
)
)
Loading

0 comments on commit 3c6e551

Please sign in to comment.