From e6fc0a3c6a6954c86bdcb8b2bd6c83949bdafd2d Mon Sep 17 00:00:00 2001 From: Phillip Jensen Date: Sun, 15 Dec 2024 15:51:16 +0100 Subject: [PATCH] Comparison fixes --- stats-backend/api2/scanner.py | 87 +++++++++++++------ .../examples/low-level-api/list-offers.py | 7 +- .../examples/low-level-api/v2/list-offers.py | 7 +- 3 files changed, 72 insertions(+), 29 deletions(-) diff --git a/stats-backend/api2/scanner.py b/stats-backend/api2/scanner.py index 94b26a6..e68ce2c 100644 --- a/stats-backend/api2/scanner.py +++ b/stats-backend/api2/scanner.py @@ -31,6 +31,14 @@ r = redis.Redis(connection_pool=pool) +def normalize_properties(data): + """Normalize the properties dictionary for consistent comparison.""" + # Sort lists within the dictionary + for key, value in data.items(): + if isinstance(value, list): + data[key] = sorted(value) + return dict(sorted(data.items())) # Return sorted dictionary + @app.task def update_providers_info(node_props): now = timezone.now() @@ -67,7 +75,6 @@ def update_providers_info(node_props): new_nodes.append(node) existing_nodes_dict[node.node_id] = node - # Update existing_nodes_dict with newly created nodes updated_nodes = Node.objects.filter(node_id__in=new_provider_ids) for node in updated_nodes: @@ -85,10 +92,12 @@ def update_providers_info(node_props): # Get existing Offers existing_offers = Offer.objects.filter( provider__node_id__in=provider_ids, - runtime__in=[data["golem.runtime.name"] for _, data in provider_data_list] + runtime__in=[data["golem.runtime.name"] + for _, data in provider_data_list] ).select_related('provider') - existing_offers_dict = {(offer.provider.node_id, offer.runtime): offer for offer in existing_offers} + existing_offers_dict = { + (offer.provider.node_id, offer.runtime): offer for offer in existing_offers} # Find which offers are new existing_offer_keys = set(existing_offers_dict.keys()) @@ -110,10 +119,11 @@ def update_providers_info(node_props): new_offers.append(offer) 
existing_offers_dict[(provider_id, runtime)] = offer - # Update existing_offers_dict with newly created offers - updated_offers = Offer.objects.filter(provider__node_id__in=provider_ids).select_related('provider') - existing_offers_dict.update({(offer.provider.node_id, offer.runtime): offer for offer in updated_offers}) + updated_offers = Offer.objects.filter( + provider__node_id__in=provider_ids).select_related('provider') + existing_offers_dict.update( + {(offer.provider.node_id, offer.runtime): offer for offer in updated_offers}) # Now process and update offers offers_to_update = [] # list of offers to bulk update @@ -137,9 +147,12 @@ def update_providers_info(node_props): if not monthly_pricing: print(f"Monthly price is {monthly_pricing}") offer.monthly_price_glm = min(monthly_pricing, MAX_PRICE_CAP_VALUE) - offer.monthly_price_usd = min(monthly_pricing * glm_usd_value.current_price, MAX_PRICE_CAP_VALUE) - offer.hourly_price_glm = min(monthly_pricing / hours_in_current_month, MAX_PRICE_CAP_VALUE) - offer.hourly_price_usd = min(offer.monthly_price_usd / hours_in_current_month, MAX_PRICE_CAP_VALUE) + offer.monthly_price_usd = min( + monthly_pricing * glm_usd_value.current_price, MAX_PRICE_CAP_VALUE) + offer.hourly_price_glm = min( + monthly_pricing / hours_in_current_month, MAX_PRICE_CAP_VALUE) + offer.hourly_price_usd = min( + offer.monthly_price_usd / hours_in_current_month, MAX_PRICE_CAP_VALUE) vcpu_needed = data.get("golem.inf.cpu.threads", 0) memory_needed = data.get("golem.inf.mem.gib", 0.0) @@ -150,25 +163,37 @@ def update_providers_info(node_props): ).order_by("cpu_diff", "memory_diff", "price_usd").first() if closest_ec2 and monthly_pricing: - offer_price_usd = min(monthly_pricing * glm_usd_value.current_price, MAX_PRICE_CAP_VALUE) - ec2_monthly_price = min(closest_ec2.price_usd * 730, MAX_PRICE_CAP_VALUE) + offer_price_usd = min( + monthly_pricing * glm_usd_value.current_price, MAX_PRICE_CAP_VALUE) + ec2_monthly_price = min( + closest_ec2.price_usd * 
730, MAX_PRICE_CAP_VALUE) if ec2_monthly_price != 0: offer_is_more_expensive = offer_price_usd > ec2_monthly_price offer_is_cheaper = offer_price_usd < ec2_monthly_price offer.is_overpriced = offer_is_more_expensive offer.overpriced_compared_to = closest_ec2 if offer_is_more_expensive else None - offer.times_more_expensive = offer_price_usd / float(ec2_monthly_price) if offer_is_more_expensive else None - offer.suggest_env_per_hour_price = float(closest_ec2.price_usd) / glm_usd_value.current_price + offer.times_more_expensive = offer_price_usd / \ + float(ec2_monthly_price) if offer_is_more_expensive else None + offer.suggest_env_per_hour_price = float( + closest_ec2.price_usd) / glm_usd_value.current_price offer.cheaper_than = closest_ec2 if offer_is_cheaper else None - offer.times_cheaper = float(ec2_monthly_price) / offer_price_usd if offer_is_cheaper else None + offer.times_cheaper = float( + ec2_monthly_price) / offer_price_usd if offer_is_cheaper else None else: print("EC2 monthly price is zero, cannot compare offer prices.") # Always update the offer if any properties have changed # Compare existing properties with new data - if offer.properties != data: + normalized_existing = normalize_properties(offer.properties.copy()) + normalized_new = normalize_properties(data.copy()) + + if normalized_existing != normalized_new: + print(f"DETECTED CHANGE Updating offer {offer.id}") offer.properties = data offers_to_update.append(offer) + else: + print(f"No changes in offer {offer.id}") + # Bulk update offers if any if offers_to_update: @@ -186,12 +211,22 @@ def update_providers_info(node_props): nodes_to_update = [] for provider_id, data in provider_data_list: node = existing_nodes_dict[provider_id] - node.wallet = data.get("wallet") - node.network = data.get('network', 'mainnet') - node.type = "provider" - nodes_to_update.append(node) + # Check if any field has changed before adding to update list + if (node.wallet != data.get("wallet") or + node.network != 
data.get('network', 'mainnet') or + node.type != "provider"): + + node.wallet = data.get("wallet") + node.network = data.get('network', 'mainnet') + node.type = "provider" + node.updated_at = timezone.now() # Explicitly set updated_at + nodes_to_update.append(node) + if nodes_to_update: - Node.objects.bulk_update(nodes_to_update, ['wallet', 'network', 'updated_at', 'type']) + Node.objects.bulk_update( + nodes_to_update, + ['wallet', 'network', 'type', 'updated_at'] # Include updated_at in the fields list + ) print(f"Done updating {len(provider_ids)} providers") @@ -201,15 +236,14 @@ def update_providers_info(node_props): from .yapapi_utils import build_parser, print_env_info, format_usage # noqa: E402 - - async def list_offers( conf: Configuration, subnet_tag: str, current_scan_providers, node_props ): async with conf.market() as client: market_api = Market(client) dbuild = DemandBuilder() - dbuild.add(yp.NodeInfo(name="some scanning node", subnet_tag=subnet_tag)) + dbuild.add(yp.NodeInfo( + name="some scanning node", subnet_tag=subnet_tag)) dbuild.add(yp.Activity(expiration=datetime.now(timezone.utc))) async with market_api.subscribe( @@ -234,7 +268,9 @@ async def monitor_nodes_status(subnet_tag: str = "public"): try: await asyncio.wait_for( list_offers( - Configuration(api_config=ApiConfig()), + Configuration(api_config=ApiConfig( + app_key="stats" + )), subnet_tag=subnet_tag, node_props=node_props, current_scan_providers=current_scan_providers, @@ -243,7 +279,8 @@ async def monitor_nodes_status(subnet_tag: str = "public"): ) except asyncio.TimeoutError: print("Scan timeout reached") - print(f"In the current scan, we found {len(current_scan_providers)} providers") + print( + f"In the current scan, we found {len(current_scan_providers)} providers") # Delay update_nodes_data call using Celery update_providers_info.delay(node_props) diff --git a/stats-backend/yapapi/examples/low-level-api/list-offers.py b/stats-backend/yapapi/examples/low-level-api/list-offers.py 
index da11854..735a236 100755 --- a/stats-backend/yapapi/examples/low-level-api/list-offers.py +++ b/stats-backend/yapapi/examples/low-level-api/list-offers.py @@ -22,7 +22,8 @@ async def list_offers(conf: Configuration, subnet_tag: str): async with conf.market() as client: market_api = Market(client) dbuild = DemandBuilder() - dbuild.add(yp.NodeInfo(name="Golem Stats Indexer", subnet_tag=subnet_tag)) + dbuild.add(yp.NodeInfo( + name="Golem Stats Indexer", subnet_tag=subnet_tag)) dbuild.add(yp.Activity(expiration=datetime.now(timezone.utc))) async with market_api.subscribe( dbuild.properties, dbuild.constraints @@ -132,7 +133,9 @@ def main(): asyncio.get_event_loop().run_until_complete( asyncio.wait_for( list_offers( - Configuration(api_config=ApiConfig()), # YAGNA_APPKEY will be loaded from env + Configuration(api_config=ApiConfig( + app_key="stats" + )), # app key hardcoded to "stats"; YAGNA_APPKEY env is no longer used subnet_tag="public", ), timeout=60, diff --git a/stats-backend/yapapi/examples/low-level-api/v2/list-offers.py b/stats-backend/yapapi/examples/low-level-api/v2/list-offers.py index 9cb06de..73edc14 100755 --- a/stats-backend/yapapi/examples/low-level-api/v2/list-offers.py +++ b/stats-backend/yapapi/examples/low-level-api/v2/list-offers.py @@ -22,7 +22,8 @@ async def list_offers(conf: Configuration, subnet_tag: str): async with conf.market() as client: market_api = Market(client) dbuild = DemandBuilder() - dbuild.add(yp.NodeInfo(name="Golem Stats Indexer", subnet_tag=subnet_tag)) + dbuild.add(yp.NodeInfo( + name="Golem Stats Indexer", subnet_tag=subnet_tag)) dbuild.add(yp.Activity(expiration=datetime.now(timezone.utc))) async with market_api.subscribe( dbuild.properties, dbuild.constraints @@ -104,7 +105,9 @@ def main(): asyncio.get_event_loop().run_until_complete( asyncio.wait_for( list_offers( - Configuration(api_config=ApiConfig()), + Configuration(api_config=ApiConfig( + app_key="stats" + )), subnet_tag="public", ), timeout=15,