Skip to content

Commit

Permalink
Optimize submission collection, improve contest initialization (#96)
Browse files Browse the repository at this point in the history
  • Loading branch information
MsRandom authored Nov 18, 2024
1 parent 71f4587 commit 9c58252
Show file tree
Hide file tree
Showing 6 changed files with 105 additions and 126 deletions.
2 changes: 1 addition & 1 deletion miner/miner/submit.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
BASELINE_CACHE_JSON = Path("baseline_cache.json")

logging.basicConfig(
level=logging.DEBUG,
level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(filename)s: %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)
Expand Down
79 changes: 50 additions & 29 deletions neuron/neuron/submissions.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from fiber.chain.commitments import publish_raw_commitment, get_raw_commitment
from fiber.chain.commitments import publish_raw_commitment, _deserialize_commitment_field
from fiber.logging_utils import get_logger
from substrateinterface import SubstrateInterface, Keypair
from substrateinterface.storage import StorageKey

from .checkpoint import CheckpointSubmission, SPEC_VERSION, Key, MinerModelInfo
from .contest import CURRENT_CONTEST, ModelRepositoryInfo
Expand Down Expand Up @@ -34,41 +35,61 @@ def make_submission(
)


def get_submission(
def get_submissions(
substrate: SubstrateInterface,
hotkeys: list[Key],
netuid: int,
hotkey: Key,
) -> MinerModelInfo | None:
try:
commitment = get_raw_commitment(substrate, netuid, hotkey)

if not commitment:
return None

decoder = Decoder(commitment.data)
block: int
) -> list[MinerModelInfo | None]:
submissions: list[MinerModelInfo | None] = [None] * len(hotkeys)

storage_keys: list[StorageKey] = []
for hotkey in hotkeys:
storage_keys.append(substrate.create_storage_key(
"Commitments",
"CommitmentOf",
[netuid, hotkey]
))

commitments = substrate.query_multi(
storage_keys=storage_keys,
block_hash=substrate.get_block_hash(block),
)

spec_version = decoder.read_uint16()
for storage, commitment in commitments:
hotkey = storage.params[1]
try:
if not commitment or not commitment.value:
continue

if spec_version != SPEC_VERSION:
return None
fields = commitment.value["info"]["fields"]
if not fields:
continue

while not decoder.eof:
info = CheckpointSubmission.decode(decoder)
repository_url = info.get_repo_link()
field = _deserialize_commitment_field(fields[0])
if field is None:
continue

if (
info.contest != CURRENT_CONTEST.id or
repository_url == CURRENT_CONTEST.baseline_repository.url or
info.revision == CURRENT_CONTEST.baseline_repository.revision
):
decoder = Decoder(field[1])
spec_version = decoder.read_uint16()
if spec_version != SPEC_VERSION:
continue

repository = ModelRepositoryInfo(url=repository_url, revision=info.revision)
while not decoder.eof:
info = CheckpointSubmission.decode(decoder)
repository_url = info.get_repo_link()

if (
info.contest != CURRENT_CONTEST.id or
repository_url == CURRENT_CONTEST.baseline_repository.url
):
continue

return MinerModelInfo(repository, commitment.block)
repository = ModelRepositoryInfo(url=repository_url, revision=info.revision)
submissions[hotkeys.index(hotkey)] = MinerModelInfo(repository, block)
except Exception as e:
logger.error(f"Failed to get submission from miner {hotkey}")
logger.debug(f"Submission parsing error", exc_info=e)
continue

return None
except Exception as e:
logger.error(f"Failed to get submission from miner {hotkey}")
logger.debug(f"Submission parsing error", exc_info=e)
return None
return submissions
7 changes: 3 additions & 4 deletions validator/base_validator/auto_updater.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ class AutoUpdater:

def __init__(self):
self._stop_flag = Event()
self._thread = Thread(target=self._monitor)
self._thread = Thread(target=self._monitor, daemon=True)
self._check_for_updates()
self._thread.start()

Expand All @@ -41,8 +41,7 @@ def _check_for_updates(self):
repo = git.Repo(search_parent_directories=True)
current_version = repo.head.commit.hexsha

with repo.git.custom_environment(GIT_AUTO_STASH="1"):
repo.remotes.origin.pull("main")
repo.remotes.origin.pull("main")

new_version = repo.head.commit.hexsha

Expand All @@ -55,5 +54,5 @@ def _check_for_updates(self):

def _restart(self):
self._stop_flag.set()
time.sleep(1)
time.sleep(5)
os.kill(os.getpid(), signal.SIGTERM)
2 changes: 2 additions & 0 deletions validator/submission_tester/start.sh
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
#!/bin/bash

set -e

./submission_tester/update.sh
sudo -u api /home/api/.local/bin/uv run uvicorn "$@"
3 changes: 3 additions & 0 deletions validator/submission_tester/update.sh
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ echo "api ALL = (sandbox) NOPASSWD: ALL" >> /etc/sudoers
echo "Defaults env_keep += \"VALIDATOR_HOTKEY_SS58_ADDRESS VALIDATOR_DEBUG CUDA_VISIBLE_DEVICES\"" >> /etc/sudoers

git config --system advice.detachedHead false
git config --system rebase.autostash true
git config --system rebase.autosquash true
git config --system pull.autostash true

sudo -u api pipx ensurepath
sudo -u api pipx install uv
Expand Down
138 changes: 46 additions & 92 deletions validator/weight_setting/validator.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import random
from argparse import ArgumentParser
from asyncio import sleep
from dataclasses import dataclass
from datetime import date, datetime, timedelta, time
from itertools import islice
from json import JSONDecodeError
Expand All @@ -25,7 +26,6 @@
from fiber.logging_utils import get_logger
from substrateinterface.exceptions import SubstrateRequestException
from substrateinterface import SubstrateInterface, Keypair
from tqdm import tqdm
from wandb.sdk.wandb_run import Run

from neuron import (
Expand All @@ -42,7 +42,7 @@
TIMEZONE,
ModelRepositoryInfo,
SPEC_VERSION,
get_submission,
get_submissions,
BENCHMARKS_VERSION,
)
from neuron.submission_tester import (
Expand All @@ -53,7 +53,7 @@
from .wandb_args import add_wandb_args
from .winner_selection import get_scores, get_contestant_scores, get_tiers, get_contestant_tier

VALIDATOR_VERSION: tuple[int, int, int] = (5, 2, 7)
VALIDATOR_VERSION: tuple[int, int, int] = (5, 2, 8)
VALIDATOR_VERSION_STRING = ".".join(map(str, VALIDATOR_VERSION))

WEIGHTS_VERSION = (
Expand All @@ -67,6 +67,7 @@
logger = get_logger(__name__)


@dataclass
class ContestState:
id: ContestId
miner_score_version: int
Expand All @@ -80,16 +81,9 @@ def __init__(
):
self.id = contest_id
self.miner_score_version = BENCHMARKS_VERSION
self.submission_spec_version = COLLECTED_SUBMISSIONS_VERSION
self.miner_info = miner_info

def __setstate__(self, state):
self.miner_score_version = state.get("miner_score_version", 0)
self.submission_spec_version = state.get("submission_spec_version", 0)
self.__dict__.update(state)

def __repr__(self):
return f"ContestState(id={self.id}, miner_score_version={self.miner_score_version}, miner_info={self.miner_info})"


class Validator:
auto_updater: AutoUpdater
Expand Down Expand Up @@ -570,55 +564,16 @@ def is_blacklisted(blacklisted_keys: dict, hotkey: str, coldkey: str):
return hotkey in blacklisted_keys["hotkeys"] or coldkey in blacklisted_keys["coldkeys"]

def get_miner_submissions(self) -> list[MinerModelInfo | None]:
visited_repositories: dict[str, tuple[Uid, int]] = {}
visited_revisions: dict[str, tuple[Uid, int]] = {}
blacklisted_keys = self.get_blacklisted_keys()

miner_info: list[MinerModelInfo | None] = []

for hotkey, node in tqdm(self.metagraph.nodes.items()):
if self.is_blacklisted(blacklisted_keys, hotkey, node.coldkey):
miner_info.append(None)
continue

logger.info(f"Getting submission for hotkey {hotkey}")

info = get_submission(
self.substrate,
self.metagraph.netuid,
hotkey,
)

if not info:
miner_info.append(None)
continue

existing_repository_submission = visited_repositories.get(info.repository.url)
existing_revision_submission = visited_revisions.get(info.repository.revision)
hotkeys = [hotkey for hotkey, node in self.metagraph.nodes.items() if not self.is_blacklisted(blacklisted_keys, hotkey, node.coldkey)]

if existing_repository_submission and existing_revision_submission:
existing_submission = min(
existing_repository_submission, existing_revision_submission, key=itemgetter(1)
)
else:
existing_submission = existing_repository_submission or existing_revision_submission

if existing_submission:
existing_uid, existing_block = existing_submission

if info.block > existing_block:
miner_info.append(None)
continue

miner_info[existing_uid] = None

miner_info.append(info)
visited_repositories[info.repository.url] = node.node_id, info.block
visited_revisions[info.repository.revision] = node.node_id, info.block

sleep(0.2)

return miner_info
return get_submissions(
substrate=self.substrate,
hotkeys=hotkeys,
netuid=self.metagraph.netuid,
block=self.block,
)

async def send_submissions_to_api(self, apis: list[BenchmarkingApi], submissions: dict[Key, ModelRepositoryInfo]):
iterator = iter(submissions.items())
Expand Down Expand Up @@ -653,52 +608,45 @@ def non_tested_miners(self):
}
)

async def do_step(self, block: int):
now = self.current_time()

if (not self.last_day or self.last_day < now.date()) and now.hour >= 12:
# Past noon, should start collecting submissions
logger.info("Collecting all submissions")

miner_info = self.get_miner_submissions()
def initialize_contest(self, now: datetime):
logger.info("Collecting all submissions")

logger.info(f"Got {len([info for info in miner_info if info])} submissions")
miner_info = self.get_miner_submissions()

nodes = self.metagraph_nodes()
logger.info(f"Got {len([info for info in miner_info if info])} submissions")

logger.info(f"Working on contest {self.contest.id.name} today's submissions")
logger.info(f"Working on contest {self.contest.id.name}")

submissions = {
nodes[uid].hotkey: submission.repository
for uid, submission in enumerate(miner_info)
if submission
}
if not self.contest_state or self.contest_state.id != CURRENT_CONTEST.id:
# New contest, restart
self.contest = CURRENT_CONTEST
self.contest_state = ContestState(self.contest.id, miner_info)
else:
self.contest_state.miner_info = miner_info

await self.start_benchmarking(submissions)
self.average_benchmarking_time = None
self.benchmarking_state = BenchmarkState.NOT_STARTED

if not self.contest_state or self.contest_state.id != CURRENT_CONTEST.id:
# New contest, restart
self.contest = CURRENT_CONTEST
self.contest_state = ContestState(self.contest.id, miner_info)
else:
self.contest_state.miner_info = miner_info
logger.info(f"Setting updated benchmarks")
self.last_benchmarks = self.benchmarks

self.average_benchmarking_time = None
self.benchmarking_state = BenchmarkState.NOT_STARTED
self.benchmarks = self.clear_benchmarks()
self.invalid.clear()

logger.info(f"Setting updated benchmarks")
self.last_benchmarks = self.benchmarks
self.last_day = now.date()

self.benchmarks = self.clear_benchmarks()
self.invalid.clear()
self.start_wandb_run()

self.last_day = now.date()
self.benchmarking = False

self.start_wandb_run()
self.step += 1

self.benchmarking = True
async def do_step(self, block: int):
now = self.current_time()

self.step += 1
if (not self.last_day or self.last_day < now.date()) and now.hour >= 12:
# Past noon, should start collecting submissions
self.initialize_contest(now)
return

last_update = self.metagraph.nodes[self.keypair.ss58_address].last_updated
Expand Down Expand Up @@ -731,7 +679,13 @@ async def do_step(self, block: int):
for uid in remaining
}

await self.start_benchmarking(submissions)
try:
await self.start_benchmarking(submissions)
except Exception as e:
logger.error(f"Failed to start benchmarking, retrying in 60 seconds", exc_info=e)
await sleep(60)
return

self.benchmarking = True

self.save_state()
Expand Down Expand Up @@ -872,7 +826,7 @@ async def sleep_for_blocks(self, now: datetime, blocks: int, reason: str):
next_noon = datetime.combine(now.date() + timedelta(days=int(now.hour >= 12)), time(12), tzinfo=TIMEZONE)
blocks_to_sleep = min(blocks, ceil((next_noon - now).total_seconds() / 12))
logger.info(f"{reason}, sleeping for {blocks_to_sleep} blocks")
await asyncio.sleep(blocks_to_sleep * 12)
await sleep(blocks_to_sleep * 12)

@property
def block(self):
Expand Down

0 comments on commit 9c58252

Please sign in to comment.