Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Orders] avoid portfolio desync after filled order #1162

Merged
merged 1 commit into from
Jan 10, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions octobot_trading/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@
INSTANT_FILLED_LIMIT_ORDER_PRICE_DELTA = decimal.Decimal("0.005")
CREATED_ORDER_FORCED_UPDATE_PERIOD = 5

# Portfolio
MAX_PORTFOLIO_SYNC_ATTEMPTS = 1
SYNC_ATTEMPTS_INTERVAL = 20
GuillaumeDSM marked this conversation as resolved.
Show resolved Hide resolved

# Tentacles
TRADING_MODE_REQUIRED_STRATEGIES = "required_strategies"
TRADING_MODE_REQUIRED_STRATEGIES_MIN_COUNT = "required_strategies_min_count"
Expand Down
5 changes: 3 additions & 2 deletions octobot_trading/personal_data/exchange_personal_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,11 +84,12 @@ async def handle_portfolio_update(self, balance, should_notify: bool = True, is_
return False

async def handle_portfolio_and_position_update_from_order(
self, order, require_exchange_update: bool = True, should_notify: bool = True
self, order, require_exchange_update: bool = True, expect_filled_order_update: bool = False,
should_notify: bool = True
) -> bool:
try:
changed: bool = await self.portfolio_manager.handle_balance_update_from_order(
order, require_exchange_update
order, require_exchange_update, expect_filled_order_update
)
if self.exchange_manager.is_future:
changed = await self.positions_manager.handle_position_update_from_order(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ async def terminate(self):
async with self.order.exchange_manager.exchange_personal_data.portfolio_manager.portfolio.lock:
self.ensure_not_cleared(self.order)
await self.order.exchange_manager.exchange_personal_data.\
handle_portfolio_and_position_update_from_order(self.order, False)
handle_portfolio_and_position_update_from_order(self.order, require_exchange_update=False)

# notify order cancelled
await self.order.exchange_manager.exchange_personal_data.handle_order_update_notification(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ async def terminate(self):
async with self.order.exchange_manager.exchange_personal_data.portfolio_manager.portfolio.lock:
self.ensure_not_cleared(self.order)
await self.order.exchange_manager.exchange_personal_data.\
handle_portfolio_and_position_update_from_order(self.order)
handle_portfolio_and_position_update_from_order(self.order, expect_filled_order_update=True)

# notify order filled
await self.order.exchange_manager.exchange_personal_data.handle_order_update_notification(
Expand Down
44 changes: 42 additions & 2 deletions octobot_trading/personal_data/portfolios/portfolio_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,14 @@
# License along with this library.
import contextlib
import copy
import decimal
import time
import asyncio

import octobot_commons.logging as logging
import octobot_commons.constants as commons_constants
import octobot_commons.tree as commons_tree
import octobot_commons.symbols as commons_symbols
import octobot_commons.enums as commons_enums

import octobot_trading.exchange_channel as exchange_channel
Expand Down Expand Up @@ -77,7 +81,9 @@ def handle_balance_update(self, balance, is_diff_update=False):
self._is_initialized_event_set = True
return changed

async def handle_balance_update_from_order(self, order, require_exchange_update: bool) -> bool:
async def handle_balance_update_from_order(
self, order, require_exchange_update: bool, expect_filled_order_update: bool
) -> bool:
"""
Handle a balance update from an order update
:param order: the order
Expand All @@ -91,9 +97,43 @@ async def handle_balance_update_from_order(self, order, require_exchange_update:
return self._refresh_simulated_trader_portfolio_from_order(order)
# on real trading only:
# reload portfolio to ensure portfolio sync
return await self._refresh_real_trader_portfolio()
return await self._refresh_real_trader_portfolio_until_order_update_applied(
order, expect_filled_order_update
)
return False

async def _refresh_real_trader_portfolio_until_order_update_applied(self, order, expect_filled_order_update):
t0 = time.time()
refreshed = await self._refresh_real_trader_portfolio()
if not expect_filled_order_update:
return refreshed
if self._has_filled_order_been_applied(order):
return refreshed
for i in range(constants.MAX_PORTFOLIO_SYNC_ATTEMPTS):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a reason to use a loop here as it's only 1 loop?

await asyncio.sleep(constants.SYNC_ATTEMPTS_INTERVAL)
refreshed = await self._refresh_real_trader_portfolio()
# + 2 since 1st attempt is before looping
self.logger.info(f"Updated portfolio fetched after {i + 2} attempts")
if self._has_filled_order_been_applied(order):
return refreshed
self.logger.error(f"Filled order has not be counted in portfolio after {time.time() - t0} seconds")
return refreshed

def _has_filled_order_been_applied(self, order):
if self.exchange_manager.is_future:
# implement futures if necessary
return True
# in spot, consider order fill if the portfolio contains at least the order's executed amount
base, quote = commons_symbols.parse_symbol(order.symbol).base_and_quote()
checked_asset = base if order.side == enums.TradeOrderSide.BUY else quote
checked_amount = (
order.origin_quantity
if order.side == enums.TradeOrderSide.BUY else (order.origin_quantity * order.origin_price)
) * decimal.Decimal("0.95") # consider a 5% error margin
available_amount = self.portfolio.get_currency_portfolio(checked_asset).available
# if available_amount > checked_amount, then the order fill is most likely taken into account in portfolio
return available_amount > checked_amount

async def handle_balance_update_from_funding(self, position, funding_rate, require_exchange_update: bool) -> bool:
"""
Handle a balance update from a funding update
Expand Down
116 changes: 98 additions & 18 deletions tests/personal_data/portfolios/test_portfolio_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,15 @@
# License along with this library.
import decimal
import os
import asyncio

import mock
import pytest
from mock import patch, Mock, AsyncMock
import octobot_commons.constants as commons_constants
import octobot_trading.personal_data as personal_data
import octobot_trading.constants as constants
from octobot_trading.personal_data import SellLimitOrder
from octobot_trading.personal_data.orders import BuyMarketOrder, BuyLimitOrder

from tests.exchanges import backtesting_trader, backtesting_config, backtesting_exchange_manager, fake_backtesting
Expand Down Expand Up @@ -63,46 +66,123 @@ async def test_handle_balance_update_from_order(backtesting_trader):
trader.simulate = False
order = BuyMarketOrder(trader)

with patch.object(portfolio_manager, '_refresh_real_trader_portfolio',
new=AsyncMock()) as _refresh_real_trader_portfolio_mock, \
with patch.object(portfolio_manager, '_refresh_real_trader_portfolio_until_order_update_applied',
new=AsyncMock()) as _refresh_real_trader_portfolio_until_order_update_applied_mock, \
patch.object(portfolio_manager, '_refresh_simulated_trader_portfolio_from_order',
new=Mock()) as _refresh_simulated_trader_portfolio_from_order_mock:
_refresh_real_trader_portfolio_mock.assert_not_called()
await portfolio_manager.handle_balance_update_from_order(order, True)
_refresh_real_trader_portfolio_mock.assert_called_once()
_refresh_real_trader_portfolio_until_order_update_applied_mock.assert_not_called()
await portfolio_manager.handle_balance_update_from_order(order, True, False)
_refresh_real_trader_portfolio_until_order_update_applied_mock.assert_called_once_with(order, False)
_refresh_simulated_trader_portfolio_from_order_mock.assert_not_called()
_refresh_real_trader_portfolio_until_order_update_applied_mock.reset_mock()
await portfolio_manager.handle_balance_update_from_order(order, True, True)
_refresh_real_trader_portfolio_until_order_update_applied_mock.assert_called_once_with(order, True)
_refresh_simulated_trader_portfolio_from_order_mock.assert_not_called()
_refresh_real_trader_portfolio_mock.reset_mock()
_refresh_real_trader_portfolio_until_order_update_applied_mock.reset_mock()
with portfolio_manager.disabled_portfolio_update_from_order():
await portfolio_manager.handle_balance_update_from_order(order, False)
_refresh_real_trader_portfolio_mock.assert_not_called()
await portfolio_manager.handle_balance_update_from_order(order, False, False)
_refresh_real_trader_portfolio_until_order_update_applied_mock.assert_not_called()
_refresh_simulated_trader_portfolio_from_order_mock.assert_not_called()
await portfolio_manager.handle_balance_update_from_order(order, True)
_refresh_real_trader_portfolio_mock.assert_not_called()
await portfolio_manager.handle_balance_update_from_order(order, True, False)
_refresh_real_trader_portfolio_until_order_update_applied_mock.assert_not_called()
_refresh_simulated_trader_portfolio_from_order_mock.assert_not_called()
await portfolio_manager.handle_balance_update_from_order(order, False)
_refresh_real_trader_portfolio_mock.assert_not_called()
await portfolio_manager.handle_balance_update_from_order(order, False, False)
_refresh_real_trader_portfolio_until_order_update_applied_mock.assert_not_called()
_refresh_simulated_trader_portfolio_from_order_mock.assert_called_once()

trader.simulate = True
with patch.object(portfolio_manager, '_refresh_simulated_trader_portfolio_from_order',
new=Mock()) as _refresh_simulated_trader_portfolio_from_order_mock:
_refresh_simulated_trader_portfolio_from_order_mock.assert_not_called()
await portfolio_manager.handle_balance_update_from_order(order, True)
await portfolio_manager.handle_balance_update_from_order(order, True, False)
_refresh_simulated_trader_portfolio_from_order_mock.assert_called_once()
_refresh_simulated_trader_portfolio_from_order_mock.reset_mock()
with portfolio_manager.disabled_portfolio_update_from_order():
await portfolio_manager.handle_balance_update_from_order(order, True)
await portfolio_manager.handle_balance_update_from_order(order, True, False)
_refresh_simulated_trader_portfolio_from_order_mock.assert_not_called()
await portfolio_manager.handle_balance_update_from_order(order, False)
await portfolio_manager.handle_balance_update_from_order(order, False, False)
_refresh_simulated_trader_portfolio_from_order_mock.assert_not_called()
# ensure no side effect with require_exchange_update param
await portfolio_manager.handle_balance_update_from_order(order, False)
await portfolio_manager.handle_balance_update_from_order(order, False, False)
_refresh_simulated_trader_portfolio_from_order_mock.assert_called_once()

trader.is_enabled = False
trader.simulate = False
assert not await portfolio_manager.handle_balance_update_from_order(order, True)
assert not await portfolio_manager.handle_balance_update_from_order(order, False)
assert not await portfolio_manager.handle_balance_update_from_order(order, True, False)
assert not await portfolio_manager.handle_balance_update_from_order(order, False, False)


async def test_refresh_real_trader_portfolio_until_order_update_applied(backtesting_trader):
config, exchange_manager, trader = backtesting_trader
portfolio_manager = exchange_manager.exchange_personal_data.portfolio_manager

trader.simulate = False
order = BuyMarketOrder(trader)
with patch.object(portfolio_manager, '_refresh_real_trader_portfolio',
new=AsyncMock(return_value=True)) as _refresh_real_trader_portfolio_mock, \
patch.object(asyncio, 'sleep', AsyncMock()) as sleep_mock:
with patch.object(portfolio_manager, '_has_filled_order_been_applied',
new=Mock(return_value=True)) as _has_filled_order_been_applied_mock:
assert await portfolio_manager._refresh_real_trader_portfolio_until_order_update_applied(order, False) is True
_has_filled_order_been_applied_mock.assert_not_called()
_refresh_real_trader_portfolio_mock.assert_called_once()
sleep_mock.assert_not_called()
_refresh_real_trader_portfolio_mock.reset_mock()
assert await portfolio_manager._refresh_real_trader_portfolio_until_order_update_applied(order, True) is True
_has_filled_order_been_applied_mock.assert_called_once_with(order)
sleep_mock.assert_not_called()
_refresh_real_trader_portfolio_mock.reset_mock()
with patch.object(portfolio_manager, '_has_filled_order_been_applied',
new=Mock(return_value=False)) as _has_filled_order_been_applied_mock:
assert await portfolio_manager._refresh_real_trader_portfolio_until_order_update_applied(order, True) is True
assert _has_filled_order_been_applied_mock.call_count == 1 + constants.MAX_PORTFOLIO_SYNC_ATTEMPTS
assert _refresh_real_trader_portfolio_mock.call_count == 1 + constants.MAX_PORTFOLIO_SYNC_ATTEMPTS
assert sleep_mock.call_count == constants.MAX_PORTFOLIO_SYNC_ATTEMPTS
_has_filled_order_been_applied_mock.reset_mock()
_refresh_real_trader_portfolio_mock.reset_mock()
sleep_mock.reset_mock()
calls = []

# retry once
def _has_filled_order_been_applied(*args):
if calls:
return True
calls.append(1)
return False
with patch.object(portfolio_manager, '_has_filled_order_been_applied',
new=Mock(side_effect=_has_filled_order_been_applied)) as _has_filled_order_been_applied_mock:
assert await portfolio_manager._refresh_real_trader_portfolio_until_order_update_applied(order, True) is True
assert _has_filled_order_been_applied_mock.call_count == 2
assert _refresh_real_trader_portfolio_mock.call_count == 2
assert sleep_mock.call_count == 1


async def test_has_filled_order_been_applied(backtesting_trader):
config, exchange_manager, trader = backtesting_trader
portfolio_manager = exchange_manager.exchange_personal_data.portfolio_manager

trader.simulate = False
order = BuyMarketOrder(trader)
order.update("BTC/USDT", quantity=decimal.Decimal(10), price=decimal.Decimal(100))

# 10 BTC in PF
assert portfolio_manager._has_filled_order_been_applied(order) is True

order.update("BTC/USDT", quantity=decimal.Decimal(15), price=decimal.Decimal(100))
assert portfolio_manager._has_filled_order_been_applied(order) is False

order = SellLimitOrder(trader)
order.update("BTC/USDT", quantity=decimal.Decimal(10), price=decimal.Decimal(100))
# 1000 USDT in PF
assert portfolio_manager._has_filled_order_been_applied(order) is True

order.update("BTC/USDT", quantity=decimal.Decimal(10), price=decimal.Decimal(200))
assert portfolio_manager._has_filled_order_been_applied(order) is False

trader.exchange_manager.is_future = True
# always True on futures
assert portfolio_manager._has_filled_order_been_applied(order) is True
trader.exchange_manager.is_future = False


async def test_refresh_simulated_trader_portfolio_from_order(backtesting_trader):
Expand Down
Loading