Skip to content

Commit

Permalink
Merge branch 'testnet' into staging
Browse files Browse the repository at this point in the history
  • Loading branch information
mikecot committed Jun 24, 2024
2 parents a989224 + bd3601f commit b7884a3
Show file tree
Hide file tree
Showing 5 changed files with 102 additions and 28 deletions.
94 changes: 71 additions & 23 deletions lavapProviderHealth/run.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
#!/usr/bin/env python3

import os, requests, subprocess, json, threading, shlex, string, time, random, queue, psycopg2
import os, requests, subprocess, json, threading, shlex, string, time, random, queue, psycopg2, traceback
from datetime import datetime
from datetime import timezone
from dateutil.parser import parse as parse_date
from dateutil.relativedelta import relativedelta
from http.server import BaseHTTPRequestHandler, HTTPServer
from typing import Dict, List, Optional, Any
from pathlib import Path

def log(function: str, content: str) -> None:
timestamp: str = datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S')
Expand All @@ -21,6 +22,21 @@ def get_env_var(name, default):
log("env_vars", f"{name} is set to {value}")
return value

def parse_dotenv_for_var(var_name):
current_dir = Path(__file__).parent
parent_dir = current_dir.parent

dotenv_path = current_dir / '.env'
if not dotenv_path.exists():
dotenv_path = parent_dir / '.env'

if dotenv_path.exists():
with open(dotenv_path, 'r') as file:
for line in file:
if line.startswith(var_name):
return line.strip().split('=', 1)[1]
return None

# Constants
POSTGRES_URL = os.environ.get('JSINFO_HEALTHPROBEJOB_POSTGRESQL_URL', 'postgres://jsinfo:secret@localhost:5432/jsinfo')
PROVIDERS_URL: str = get_env_var('JSINFO_HEALTHPROBEJOB_PROVIDERS_URL', "https://jsinfo.lavanet.xyz/providers")
Expand All @@ -30,10 +46,20 @@ def get_env_var(name, default):
HPLAWNS_QUERY_INTERVAL = relativedelta(days=int(get_env_var('JSINFO_HEALTHPROBEJOB_HPLAWNS_QUERY_INTERVAL', 1)))
DEBUG_PRINT_ACCOUNT_INFO_STDOUT: bool = get_env_var('JSINFO_HEALTHPROBEJOB_DEBUG_PRINT_ACCOUNT_INFO_STDOUT', 'False') == 'True'
HTTP_SERVER_ADDRESS: tuple[str, int] = tuple(json.loads(get_env_var('JSINFO_HEALTHPROBEJOB_HTTP_SERVER_ADDRESS', json.dumps(('127.0.0.1', 6500)))))
GEO_LOCATION: bool = get_env_var('JSINFO_HEALTHPROBEJOB_GEO_LOCATION', 'US')
GEO_LOCATION: bool = get_env_var('JSINFO_HEALTHPROBEJOB_GEO_LOCATION', 'EU')
CD_ON_START: str = get_env_var('JSINFO_HEALTHPROBEJOB_CD_ON_START', "~/Documents/lava_projects/lava/config/health_examples")
BATCH_AMOUNT: int = get_env_var('JSINFO_HEALTHPROBEJOB_BATCH_AMOUNT', 8)

# Parse some vars from the .env file
env_var_value = parse_dotenv_for_var('JSINFO_HEALTHPROBEJOB_POSTGRESQL_URL')
if env_var_value:
POSTGRES_URL = env_var_value
log("env", "The JSINFO_HEALTHPROBEJOB_POSTGRESQL_URL env file was loaded from disk.")
env_var_value = parse_dotenv_for_var('JSINFO_HEALTHPROBEJOB_NODE_URL')
if env_var_value:
NODE_URL = env_var_value
log("env", "The JSINFO_HEALTHPROBEJOB_NODE_URL env file was loaded from disk:" + env_var_value)

HEALTH_RESULTS_GUID: str = ''.join(random.choices(string.ascii_letters + string.digits, k=32))

def update_guid() -> None:
Expand Down Expand Up @@ -134,14 +160,14 @@ def is_status_better(old_status: str, new_status: str, old_data: str, new_data:
elif old_data_json != None and new_data_json == None:
return False

old_block = old_data_json.get('Block', 0)
new_block = new_data_json.get('Block', 0)
old_block = old_data_json.get('block', 0)
new_block = new_data_json.get('block', 0)

if new_block > old_block:
return True

old_others = old_data_json.get('Others', 0)
new_others = new_data_json.get('Others', 0)
old_others = old_data_json.get('others', 0)
new_others = new_data_json.get('others', 0)

if new_others > old_others:
return True
Expand Down Expand Up @@ -221,20 +247,32 @@ def replace_for_compare(data):
def db_worker_work_accountinfo(data):
log("db_worker_work_accountinfo", f"Processing item: {data}")
execute_db_operation("""
SELECT data FROM provider_accountinfo WHERE provider = %s ORDER BY timestamp DESC LIMIT 1
SELECT data, timestamp FROM provider_accountinfo WHERE provider = %s ORDER BY timestamp DESC LIMIT 1
""", (data['provider_id'],))
result = db_cur.fetchone()
if result is not None:
existing_data = result[0].replace(" ", "").replace("\t", "").replace("\n", "").lower()
new_data = data['data'].replace(" ", "").replace("\t", "").replace("\n", "").lower()
if result is None or replace_for_compare(existing_data) != replace_for_compare(new_data):
if replace_for_compare(existing_data) == replace_for_compare(new_data):
# If data is the same, update the timestamp of the existing record
execute_db_operation("""
UPDATE provider_accountinfo SET timestamp = %s WHERE provider = %s
""", (data['timestamp'], data['provider_id']))
log("db_worker_work_accountinfo", "Data is the same - timestamp updated for existing record")
else:
# If data is different, insert a new record
execute_db_operation("""
INSERT INTO provider_accountinfo (provider, timestamp, data)
VALUES (%s, %s, %s)
""", (data['provider_id'], data['timestamp'], data['data']))
log("db_worker_work_accountinfo", "New record inserted")
else:
# If no existing record, insert a new one
execute_db_operation("""
INSERT INTO provider_accountinfo (provider, timestamp, data)
VALUES (%s, %s, %s)
""", (data['provider_id'], data['timestamp'], data['data']))
log("db_worker_work_accountinfo", "New record inserted")
else:
log("db_worker_work_accountinfo", "No new record inserted, data is the same")
log("db_worker_work_accountinfo", "New record inserted because no existing data was found")

def db_worker_work_health(data):
log("db_worker_work_health", f"Processing item: {data}")
Expand Down Expand Up @@ -380,7 +418,6 @@ def replace_archive(input_string: str) -> str:
return input_string

def run_accountinfo_command(address: str) -> Optional[Dict[str, Any]]:
# log('run_accountinfo_command', f'Starting pairing command for address: {address}')
command = f"lavad q pairing account-info {address} --output json --node {NODE_URL}"
output = run_command(command, print_stdout=DEBUG_PRINT_ACCOUNT_INFO_STDOUT)
log('run_accountinfo_command', 'Pairing command completed.')
Expand All @@ -401,16 +438,18 @@ def parse_accountinfo_spec(result: Dict[str, Dict[str, List[str]]], key: str, pr
result[key][chain] = []
for interface in api_interfaces:
if interface not in result[key][chain]:
result[key][chain].append(interface)
result[key][chain].append((interface, ""))

# from the docs:
# https://github.com/lavanet/lava/blob/6249399121690effe2b12cc3adc1d099c343235c/x/pairing/README.md#L220
# if I have a provider with jails > 2 and jail_end_time < date.now() provider status should be frozen with a message of run to unfreeze: lavad tx pairing unfreeze CHAINID
# otherwise if jails > 0 || jail_end_time I consider the provider jailed
jail_end_time = datetime.fromtimestamp(int(provider.get("jail_end_time", "0")))
jails = datetime.fromtimestamp(int(provider.get("jails", "0")))
if jail_end_time != 0 or jails != 0:
if jail_end_time > datetime.now(timezone.utc) and jails > 2:
jail_end_time = provider.get("jail_end_time", "0")
if jail_end_time == "1970-01-01 00:00:00":
jail_end_time = "0"
jails = int(provider.get("jails", "0"))
if jail_end_time != "0" or jails != 0:
if parse_date_to_utc(jail_end_time) > datetime.now(timezone.utc) and jails > 2:
if chain not in result["frozen"]:
result["frozen"][chain] = []
result["frozen"][chain].append((interface, {"message": "run to unfreeze: lavad tx pairing unfreeze " + chain}))
Expand Down Expand Up @@ -441,6 +480,15 @@ def run_health_command(address: str, single_provider_specs_interfaces_data: Opti
run_command(command)
log('run_health_command', 'Health command completed.')

def parse_date_to_utc(dt):
if isinstance(dt, int) or (isinstance(dt, str) and dt.isdigit()):
dt = datetime.fromtimestamp(int(dt), timezone.utc)
elif isinstance(dt, str):
dt = parse_date(dt)
if dt.tzinfo is None or dt.tzinfo.utcoffset(dt) is None:
dt = dt.replace(tzinfo=timezone.utc)
return dt

def get_provider_addresses() -> List[str]:
log('get_provider_addresses', 'Fetching provider addresses...')
response = requests.get(PROVIDERS_URL)
Expand All @@ -450,7 +498,7 @@ def get_provider_addresses() -> List[str]:
for provider in providers:
address: str = provider['address']
next_query_time: Optional[str] = next_query_times.get(address, None)
if next_query_time is None or parse_date(next_query_time) <= datetime.now(timezone.utc):
if next_query_time is None or parse_date_to_utc(next_query_time) <= datetime.now(timezone.utc):
addresses.append(address)
log('get_provider_addresses', f'Fetched {len(addresses)} provider addresses.')
return addresses
Expand All @@ -475,12 +523,12 @@ def process_lava_id_address(address: str) -> None:
run_health_command(address, single_provider_specs_interfaces_data = info_command_parsed_json["healthy"])

for spec, api_interfaces in info_command_parsed_json["frozen"].items():
for api_interface in api_interfaces:
db_add_provider_health_data(HEALTH_RESULTS_GUID, address, spec, api_interface, "frozen", "")
for api_interface, data in api_interfaces:
db_add_provider_health_data(HEALTH_RESULTS_GUID, address, spec, api_interface, "frozen", data)

for spec, api_interfaces in info_command_parsed_json["unstaked"].items():
for api_interface in api_interfaces:
db_add_provider_health_data(HEALTH_RESULTS_GUID, address, spec, api_interface, "unstaked", "")
for api_interface, data in api_interfaces:
db_add_provider_health_data(HEALTH_RESULTS_GUID, address, spec, api_interface, "unstaked", data)

for spec, api_interfaces_and_data in info_command_parsed_json["jailed"].items():
for api_interface, data in api_interfaces_and_data:
Expand All @@ -497,7 +545,7 @@ def process_batch(batch):
process_lava_id_address(address)
log("process_batch", f"Successfully processed address: {address}")
except Exception as e:
log("process_batch", f"Error processing address: {address}. Error: {str(e)}")
log("process_batch", f"Error processing address: {address}. Error: {str(e)}\nStack Trace: {traceback.format_exc()}")
log("process_batch", "Finished loop")

def main() -> None:
Expand Down
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
"async-retry": "^1.3.3",
"bun-types": "^1.0.17",
"date-fns": "^3.6.0",
"date-fns-tz": "^3.1.3",
"dotenv": "^16.3.1",
"drizzle-orm": "^0.29.3",
"fastify": "^4.23.2",
Expand Down
10 changes: 7 additions & 3 deletions src/query/handlers/providerHealthHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,11 @@ import { QueryCheckJsinfoReadDbInstance, QueryGetJsinfoReadDbInstance } from '..
import * as JsinfoSchema from '../../schemas/jsinfoSchema';
import { eq, desc } from "drizzle-orm";
import { Pagination } from '../utils/queryPagination';
import { CSVEscape, GetAndValidateProviderAddressFromRequest, GetDataLength, GetDataLengthForPrints, IsNotNullAndNotZero, SafeSlice } from '../utils/queryUtils';
import { CSVEscape, GetAndValidateProviderAddressFromRequest, GetDataLength, IsNotNullAndNotZero, ParseDateToUtc, SafeSlice } from '../utils/queryUtils';
import { CompareValues } from '../utils/queryUtils';
import path from 'path';
import { JSINFO_QUERY_DEFAULT_ITEMS_PER_PAGE, JSINFO_QUERY_TOTAL_ITEM_LIMIT_FOR_PAGINATION } from '../queryConsts';
import { CachedDiskDbDataFetcher } from '../classes/CachedDiskDbDataFetcher';

export interface HealthReportEntry {
message: string | null;
block: number | null;
Expand Down Expand Up @@ -306,6 +305,8 @@ const GetHealthV2Data = async (addr: string): Promise<HealthReportEntry[]> => {
return healthReportEntries
}



const ParseMessageFromHealthV2 = (data: string | null): string => {
if (!data) return "";
try {
Expand All @@ -316,7 +317,10 @@ const ParseMessageFromHealthV2 = (data: string | null): string => {
}

if (parsedData.jail_end_time && parsedData.jails) {
const date = new Date(parsedData.jail_end_time * 1000);
const date = ParseDateToUtc(parsedData.jail_end_time);
// bad db data
const is1970Included = `${parsedData.jail_end_time}${parsedData.jails}${date}`.includes("1970-01-01");
if (is1970Included) return "";
let formattedDate = `${date.getDate().toString().padStart(2, '0')}/${(date.getMonth() + 1).toString().padStart(2, '0')}/${date.getFullYear()} ${date.getHours().toString().padStart(2, '0')}:${date.getMinutes().toString().padStart(2, '0')}:${date.getSeconds().toString().padStart(2, '0')}`;
return `Jail end time: ${formattedDate}, Jails: ${parsedData.jails}`;
}
Expand Down
7 changes: 5 additions & 2 deletions src/query/handlers/providerHealthLatestHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import { FastifyRequest, FastifyReply, RouteShorthandOptions } from 'fastify';
import { QueryGetJsinfoReadDbInstance } from '../queryDb';
import { eq, and, gte, desc } from "drizzle-orm";
import { providerHealth } from '../../schemas/jsinfoSchema';
import { GetAndValidateProviderAddressFromRequest } from '../utils/queryUtils';
import { GetAndValidateProviderAddressFromRequest, ParseDateToUtc } from '../utils/queryUtils';
import { WriteErrorToFastifyReply } from '../utils/queryServerUtils';

type HealthRecord = {
Expand Down Expand Up @@ -98,7 +98,10 @@ const ParseMessageFromHealthV2 = (data: any | null): string => {
}

if (parsedData.jail_end_time && parsedData.jails) {
const date = new Date(parsedData.jail_end_time * 1000);
const date = ParseDateToUtc(parsedData.jail_end_time);
// bad db data
const is1970Included = `${parsedData.jail_end_time}${parsedData.jails}${date}`.includes("1970-01-01");
if (is1970Included) return "";
let formattedDate = `${date.getDate().toString().padStart(2, '0')}/${(date.getMonth() + 1).toString().padStart(2, '0')}/${date.getFullYear()} ${date.getHours().toString().padStart(2, '0')}:${date.getMinutes().toString().padStart(2, '0')}:${date.getSeconds().toString().padStart(2, '0')}`;
return `End Time:${formattedDate}, Jails:${parsedData.jails}`;
}
Expand Down
18 changes: 18 additions & 0 deletions src/query/utils/queryUtils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { QueryCheckJsinfoReadDbInstance, QueryGetJsinfoReadDbInstance } from "..
import * as JsinfoSchema from '../../schemas/jsinfoSchema';
import { eq } from "drizzle-orm";
import { WriteErrorToFastifyReply } from "./queryServerUtils";
import { parseISO } from 'date-fns';

export function CompareValues(aValue: string | number | null, bValue: string | number | null, direction: 'ascending' | 'descending') {
// Check if direction is 'ascending' or 'descending'
Expand Down Expand Up @@ -149,4 +150,21 @@ export function SafeSlice<T>(data: T[], start: number, end: number, defaultSize:
}

return data.slice(start, end);
}

export function ParseDateToUtc(dt: string | number): Date {
let date: Date;

if (typeof dt === 'number' || (typeof dt === 'string' && /^\d+$/.test(dt))) {
// Convert Unix timestamp to milliseconds and create a Date object
date = new Date(typeof dt === 'string' ? parseInt(dt, 10) * 1000 : dt * 1000);
} else if (typeof dt === 'string') {
// Parse ISO string to Date
date = parseISO(dt);
} else {
throw new Error('Unsupported date type');
}

// Convert to UTC by creating a new Date object using the UTC values from the original date
return new Date(Date.UTC(date.getFullYear(), date.getMonth(), date.getDate(), date.getHours(), date.getMinutes(), date.getSeconds()));
}

0 comments on commit b7884a3

Please sign in to comment.