diff --git a/shell.nix b/shell.nix index 64c0ac3c2..f7a7fa20a 100644 --- a/shell.nix +++ b/shell.nix @@ -2,11 +2,14 @@ let packageOverrides = pkgs.callPackage ./python-packages.nix {}; + skipCheckTests = drv: drv.overridePythonAttrs (old: { doCheck = false; }); python = pkgs.python3.override { inherit packageOverrides; }; pythonWithPackages = python.withPackages (ps: with ps; [ cosmpy schedule python-dotenv + (skipCheckTests aiohttp) + (skipCheckTests aiodns) ]); in pkgs.mkShell { diff --git a/src/strategies/naive.py b/src/strategies/naive.py index 537cb8589..488f546c2 100644 --- a/src/strategies/naive.py +++ b/src/strategies/naive.py @@ -3,6 +3,7 @@ of hops using all available providers. """ +import asyncio import random import threading from queue import Queue @@ -72,18 +73,16 @@ def __load_poolfile( with open(ctx.cli_args["pool_file"], "r", encoding="utf-8") as f: poolfile_cts = json.load(f) - def poolfile_ent_to_leg( - ent: dict[str, Any] - ) -> Leg: + def poolfile_ent_to_leg(ent: dict[str, Any]) -> Leg: backend: Optional[Union[PoolProvider, AuctionProvider]] = None if "osmosis" in ent: backend = OsmosisPoolProvider( - endpoints["osmosis"], - ent["osmosis"]["address"], - ent["osmosis"]["pool_id"], - (ent["osmosis"]["asset_a"], ent["osmosis"]["asset_b"]), - ) + endpoints["osmosis"], + ent["osmosis"]["address"], + ent["osmosis"]["pool_id"], + (ent["osmosis"]["asset_a"], ent["osmosis"]["asset_b"]), + ) if "neutron_astroport" in ent: backend = NeutronAstroportPoolProvider( @@ -113,9 +112,17 @@ def poolfile_ent_to_leg( if backend: return Leg( - backend.asset_a if ent["in_asset"] == backend.asset_a() else backend.asset_b, - backend.asset_a if ent["out_asset"] == backend.asset_a() else backend.asset_b, - backend + ( + backend.asset_a + if ent["in_asset"] == backend.asset_a() + else backend.asset_b + ), + ( + backend.asset_a + if ent["out_asset"] == backend.asset_a() + else backend.asset_b + ), + backend, ) raise ValueError("Invalid route leg type.") @@ -148,7 +155,7 @@ def poll( endpoints: dict[str, dict[str, list[str]]] = { "neutron": { "http": ["https://neutron-rest.publicnode.com"], - "grpc": ["grpc+https://neutron-grpc.publicnode.com:443"] + "grpc": ["grpc+https://neutron-grpc.publicnode.com:443"], }, "osmosis": { "http": ["https://lcd.osmosis.zone"], @@ -186,9 +193,7 @@ def poll( ) self.last_discovered = datetime.now() - def dump_leg( - leg: Leg - ) -> dict[str, Any]: + def dump_leg(leg: Leg) -> dict[str, Any]: if isinstance(leg.backend, AuctionProvider): return { "auction": { @@ -392,8 +397,12 @@ def exec_arb( prev_asset_info: Optional[DenomChainInfo] = None if prev_leg: - prev_asset_info = denom_info_on_chain( - prev_leg.backend.chain_id, prev_leg.out_asset(), leg.backend.chain_id + prev_asset_info = asyncio.run( + denom_info_on_chain( + prev_leg.backend.chain_id, + prev_leg.out_asset(), + leg.backend.chain_id, + ) ) to_swap = ( @@ -402,7 +411,9 @@ def exec_arb( else try_multiple_clients_fatal( ctx.clients[prev_leg.backend.chain_id.split("-")[0]], lambda client: client.query_bank_balance( - Address(ctx.wallet.public_key(), prefix=prev_leg.backend.chain_prefix), + Address( + ctx.wallet.public_key(), prefix=prev_leg.backend.chain_prefix + ), prev_leg.out_asset(), ), ) @@ -484,7 +495,9 @@ def exec_arb( ) if isinstance(leg.backend, NeutronAstroportPoolProvider): - logger.info("Submitting arb to contract: %s", leg.backend.contract_info.address) + logger.info( + "Submitting arb to contract: %s", leg.backend.contract_info.address + ) if isinstance(leg.backend, OsmosisPoolProvider): logger.info("Submitting arb to pool: %d", leg.backend.pool_id) @@ -503,7 +516,10 @@ def exec_arb( to_receive, ).wait_to_complete() - if isinstance(leg.backend, AuctionProvider) and leg.in_asset == leg.backend.asset_a: + if ( + isinstance(leg.backend, AuctionProvider) + and leg.in_asset == leg.backend.asset_a + ): tx = leg.backend.swap_asset_a(ctx.wallet, to_swap).wait_to_complete() if tx: @@ -532,10 +548,12 @@ def transfer( succeeded. """ - denom_info = denom_info_on_chain( - src_chain=prev_leg.backend.chain_id, - src_denom=denom, - dest_chain=leg.backend.chain_id, + denom_info = asyncio.run( + denom_info_on_chain( + src_chain=prev_leg.backend.chain_id, + src_denom=denom, + dest_chain=leg.backend.chain_id, + ) ) if not denom_info: @@ -689,7 +707,7 @@ def listen_routes_with_depth_dfs( ) -> None: denom_cache: dict[str, dict[str, str]] = {} - def next_legs(path: list[Leg]) -> None: + async def next_legs(path: list[Leg]) -> None: nonlocal denom_cache nonlocal routes @@ -739,7 +757,7 @@ def next_legs(path: list[Leg]) -> None: if not end in denom_cache: denom_cache[end] = { info.chain_id: info.denom - for info in denom_info(prev_pool.backend.chain_id, end) + for info in await denom_info(prev_pool.backend.chain_id, end) + [ DenomChainInfo( denom=end, @@ -772,25 +790,29 @@ def next_legs(path: list[Leg]) -> None: ] random.shuffle(next_pools) - for pool in next_pools: - next_legs( - path - + [ - Leg( - ( - pool.asset_a - if pool.asset_a() in denom_cache[end].values() - else pool.asset_b - ), - ( - pool.asset_b - if pool.asset_a() in denom_cache[end].values() - else pool.asset_a - ), - pool, - ) - ] - ) + await asyncio.gather( + *[ + next_legs( + path + + [ + Leg( + ( + pool.asset_a + if pool.asset_a() in denom_cache[end].values() + else pool.asset_b + ), + ( + pool.asset_b + if pool.asset_a() in denom_cache[end].values() + else pool.asset_a + ), + pool, + ) + ] + ) + for pool in next_pools + ] + ) start_pools: list[Union[AuctionProvider, PoolProvider]] = [ *auctions.get(src, {}).values(), @@ -798,13 +820,19 @@ def next_legs(path: list[Leg]) -> None: ] random.shuffle(start_pools) - for pool in start_pools: - next_legs( - [ - Leg( - pool.asset_a if pool.asset_a() == src else pool.asset_b, - pool.asset_b if pool.asset_a() == src else pool.asset_a, - pool, + asyncio.run( + asyncio.gather( + *[ + next_legs( + [ + Leg( + pool.asset_a if pool.asset_a() == src else pool.asset_b, + pool.asset_b if pool.asset_a() == src else pool.asset_a, + pool, + ) + ] ) + for pool in start_pools ] ) + ) diff --git a/src/util.py b/src/util.py index ebbf64431..1719ba436 100644 --- a/src/util.py +++ b/src/util.py @@ -8,6 +8,7 @@ from functools import cached_property from dataclasses import dataclass import urllib3 +import aiohttp import grpc from cosmpy.aerial.client import NetworkConfig, LedgerClient # type: ignore from cosmpy.aerial.contract import LedgerContract # type: ignore @@ -217,91 +218,91 @@ class DenomChainInfo: chain_id: Optional[str] -def denom_info(src_chain: str, src_denom: str) -> list[DenomChainInfo]: +async def denom_info(src_chain: str, src_denom: str) -> list[DenomChainInfo]: """ Gets a denom's denom and channel on/to other chains. """ - client = urllib3.PoolManager() - - resp = client.request( - "POST", - "https://api.skip.money/v1/fungible/assets_from_source", - headers={"accept": "application/json", "content-type": "application/json"}, - json={ - "allow_multi_tx": False, - "include_cw20_assets": True, - "source_asset_denom": src_denom, - "source_asset_chain_id": src_chain, - "client_id": "timewave-arb-bot", - }, - ) - - if resp.status != 200: - return [] - - dests = resp.json()["dest_assets"] - - def chain_info(chain_id: str, info: dict[str, Any]) -> DenomChainInfo: - info = info["assets"][0] - - if info["trace"] != "": - parts = info["trace"].split("/") - port, channel = parts[0], parts[1] - - return DenomChainInfo( - denom=info["denom"], port=port, channel=channel, chain_id=chain_id - ) - - return DenomChainInfo( - denom=info["denom"], port=None, channel=None, chain_id=chain_id - ) - - return [chain_info(chain_id, info) for chain_id, info in dests.items()] - - -def denom_info_on_chain( + async with aiohttp.ClientSession() as session: + async with session.post( + "https://api.skip.money/v1/fungible/assets_from_source", + headers={"accept": "application/json", "content-type": "application/json"}, + json={ + "allow_multi_tx": False, + "include_cw20_assets": True, + "source_asset_denom": src_denom, + "source_asset_chain_id": src_chain, + "client_id": "timewave-arb-bot", + }, + ) as resp: + if resp.status != 200: + return [] + + dests = await resp.json()["dest_assets"] + + def chain_info(chain_id: str, info: dict[str, Any]) -> DenomChainInfo: + info = info["assets"][0] + + if info["trace"] != "": + parts = info["trace"].split("/") + port, channel = parts[0], parts[1] + + return DenomChainInfo( + denom=info["denom"], + port=port, + channel=channel, + chain_id=chain_id, + ) + + return DenomChainInfo( + denom=info["denom"], port=None, channel=None, chain_id=chain_id + ) + + return [chain_info(chain_id, info) for chain_id, info in dests.items()] + + +async def denom_info_on_chain( src_chain: str, src_denom: str, dest_chain: str ) -> Optional[DenomChainInfo]: """ Gets a neutron denom's denom and channel on/to another chain. """ - client = urllib3.PoolManager() - - resp = client.request( - "POST", - "https://api.skip.money/v1/fungible/assets_from_source", - headers={"accept": "application/json", "content-type": "application/json"}, - json={ - "allow_multi_tx": False, - "include_cw20_assets": True, - "source_asset_denom": src_denom, - "source_asset_chain_id": src_chain, - "client_id": "timewave-arb-bot", - }, - ) - - if resp.status != 200: - return None - - dests = resp.json()["dest_assets"] - - if dest_chain in dests: - info = dests[dest_chain]["assets"][0] - - if info["trace"] != "": - port, channel = info["trace"].split("/") - - return DenomChainInfo( - denom=info["denom"], port=port, channel=channel, chain_id=dest_chain - ) - - return DenomChainInfo( - denom=info["denom"], port=None, channel=None, chain_id=dest_chain - ) - - return None + async with aiohttp.ClientSession() as session: + async with session.post( + "https://api.skip.money/v1/fungible/assets_from_source", + headers={"accept": "application/json", "content-type": "application/json"}, + json={ + "allow_multi_tx": False, + "include_cw20_assets": True, + "source_asset_denom": src_denom, + "source_asset_chain_id": src_chain, + "client_id": "timewave-arb-bot", + }, + ) as resp: + if resp.status != 200: + return None + + dests = await resp.json()["dest_assets"] + + if dest_chain in dests: + info = dests[dest_chain]["assets"][0] + + if info["trace"] != "": + port, channel = info["trace"].split("/") + + return DenomChainInfo( + denom=info["denom"], + port=port, + channel=channel, + chain_id=dest_chain, + ) + + return DenomChainInfo( + denom=info["denom"], port=None, channel=None, chain_id=dest_chain + ) + + return None @dataclass