Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

completely rewrite GameCache #28

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 48 additions & 36 deletions src/backend_steam_network.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
from steam_network.caches.friends_cache import FriendsCache
from steam_network.caches.games_cache import GamesCache
from steam_network.caches.local_machine_cache import LocalMachineCache
from steam_network.caches.stats_cache import StatsCache
from steam_network.caches.stats_cache import StatsCache, GameStats
from steam_network.caches.times_cache import TimesCache
from steam_network.caches.user_info_cache import UserInfoCache

Expand Down Expand Up @@ -121,13 +121,16 @@ async def user_presence_update_handler(user_id: str, proto_user_info: ProtoUserI

self._update_owned_games_task : Task[None] = asyncio.create_task(asyncio.sleep(0))
self._owned_games_parsed : bool = False

self._load_persistent_cache()

def _load_persistent_cache(self):
if "games" in self._persistent_cache:
self._games_cache.loads(self._persistent_cache["games"])

if "stats" in self._persistent_cache:
self._stats_cache.loads(self._persistent_cache["stats"])

async def shutdown(self):
await self._websocket_client.close()
await self._websocket_client.wait_closed()
Expand All @@ -145,7 +148,7 @@ async def _cancel_task(self, task):
# periodic tasks

async def _update_owned_games(self):
new_games = self._games_cache.consume_added_games()
new_games = self._games_cache.get_apps_to_import_into_galaxy()
if not new_games:
return

Expand All @@ -154,12 +157,13 @@ async def _update_owned_games(self):

for i, game in enumerate(new_games):
dlcs: List[Dlc] = list()
for app in self._games_cache.get_dlcs_for_game(int(game.appid)):
for dlc in self._games_cache.get_dlcs_for_game(int(game.appid)):
dlcs.append(Dlc(
dlc_id=app.appid,
dlc_title=app.title,
dlc_id=dlc.appid,
dlc_title=dlc.title,
license_info=LicenseInfo(LicenseType.SinglePurchase)
))
self._games_cache.mark_app_as_sent_to_galaxy(dlc.appid)

self._add_game(
Game(
Expand All @@ -169,6 +173,7 @@ async def _update_owned_games(self):
license_info=LicenseInfo(LicenseType.SinglePurchase),
)
)
self._games_cache.mark_app_as_sent_to_galaxy(game.appid)
if i % 50 == 49:
await asyncio.sleep(5) # give Galaxy a breath in case of adding thousands games

Expand All @@ -179,6 +184,10 @@ def tick(self):
if self._user_info_cache.changed:
self._store_credentials(self._user_info_cache.to_dict())

if self._stats_cache.dirty:
self._persistent_cache["stats"] = self._stats_cache.dump()
self._persistent_storage_state.modified = True

# authentication

async def _get_websocket_auth_step(self) -> UserActionRequired:
Expand Down Expand Up @@ -369,20 +378,20 @@ async def get_owned_games(self) -> List[Game]:
raise AuthenticationRequired()

await self._games_cache.wait_ready(GAME_CACHE_IS_READY_TIMEOUT)
self._games_cache.add_game_lever = True

owned_games = []
owned_witcher_3_dlcs = set()

try:
async for app in self._games_cache.get_owned_games():
async for app in self._games_cache.get_apps(type_="game", shared=False):
dlcs: List[Dlc] = list()
for dlc in self._games_cache.get_dlcs_for_game(int(app.appid)):
dlcs.append(Dlc(
dlc_id=dlc.appid,
dlc_title=dlc.title,
license_info=LicenseInfo(LicenseType.SinglePurchase)
))
self._games_cache.mark_app_as_sent_to_galaxy(dlc.appid)

owned_games.append(
Game(
Expand All @@ -392,6 +401,8 @@ async def get_owned_games(self) -> List[Game]:
LicenseInfo(LicenseType.SinglePurchase, None),
)
)
self._games_cache.mark_app_as_sent_to_galaxy(app.appid)

if app.appid in WITCHER_3_DLCS_APP_IDS:
owned_witcher_3_dlcs.add(app.appid)

Expand All @@ -413,6 +424,7 @@ async def get_owned_games(self) -> List[Game]:
self._owned_games_parsed = True

self._persistent_cache["games"] = self._games_cache.dump()
self._persistent_cache["stats"] = self._stats_cache.dump()
self._persistent_storage_state.modified = True

return owned_games
Expand All @@ -421,7 +433,7 @@ async def get_subscriptions(self) -> List[Subscription]:
if not self._owned_games_parsed:
await self._games_cache.wait_ready(90)
any_shared_game = False
async for _ in self._games_cache.get_shared_games():
async for _ in self._games_cache.get_apps(type_="game", shared=True):
any_shared_game = True
break
return [
Expand All @@ -435,42 +447,42 @@ async def get_subscriptions(self) -> List[Subscription]:

async def get_subscription_games(self, subscription_name: str, context: Any):
games = []
async for game in self._games_cache.get_shared_games():
async for game in self._games_cache.get_apps(type_="game", shared=True):
games.append(SubscriptionGame(game_id=str(game.appid), game_title=game.title))
self._games_cache.mark_app_as_sent_to_galaxy(game.appid)
yield games

async def prepare_achievements_context(self, game_ids: List[str]) -> Any:
if self._user_info_cache.steam_id is None:
raise AuthenticationRequired()

if not self._stats_cache.import_in_progress:
await self._websocket_client.refresh_game_stats(game_ids.copy())
else:
logger.info("Game stats import already in progress")
await self._stats_cache.wait_ready(
10 * 60
) # Don't block future imports in case we somehow don't receive one of the responses
logger.info("Finished achievements context prepare")
await self._websocket_client.refresh_game_stats([int(x) for x in game_ids])

logger.info(f"Finished achievements context prepare: {game_ids}")

async def get_unlocked_achievements(self, game_id: str, context: Any) -> List[Achievement]:
logger.info(f"Asked for achievs for {game_id}")
game_stats = self._stats_cache.get(game_id)
achievements = []
if game_stats and "achievements" in game_stats:
for achievement in game_stats["achievements"]:
# Fix for trailing whitespace in some achievement names which resulted in achievements not matching with website data
achievement_name = achievement["name"]
achievement_name = achievement_name.strip()
if not achievement_name:
achievement_name = achievement["name"]

achievements.append(
Achievement(
achievement["unlock_time"],
achievement_id=None,
achievement_name=achievement_name,
)
logger.info(f"Asked for achievements for {game_id}")

if not self._stats_cache.ready:
logger.debug(f"waiting for import to finish to get achievements for {game_id}")
await self._stats_cache.wait_ready()

game_stats: GameStats = self._stats_cache.get(int(game_id))
if not game_stats:
logger.warning(f"game stats for {game_id} not found in stats cache")
return []

achievements: List[Achievement] = []

for achievement in game_stats.achievements:
achievements.append(
Achievement(
unlock_time=achievement.unlock_time,
# achievement_id=achievement.id_, # Galaxy doesn't like us sending an achievement id - so we won't
achievement_name=achievement.name.strip(),
)
)

return achievements

async def prepare_game_times_context(self, game_ids: List[str]) -> Any:
Expand Down
Loading