Add more logging, fix benchmarking
MsRandom committed Nov 28, 2024
1 parent 5d11c70 commit d86d906
Showing 8 changed files with 73 additions and 55 deletions.
README.md: 8 changes (2 additions, 6 deletions)
@@ -129,9 +129,7 @@ uv run submit_model \
     --netuid {netuid} \
     --subtensor.network finney \
     --wallet.name {wallet} \
-    --wallet.hotkey {hotkey} \
-    --logging.trace \
-    --logging.debug
+    --wallet.hotkey {hotkey}
 ```
 5. Follow the interactive prompts to submit the repository link, revision, and contest to participate in
 6. Optionally, benchmark your submission locally before submitting (make sure you have the right hardware e.g. NVIDIA GeForce RTX 4090). uv and huggingface are required for benchmarking:
@@ -150,7 +148,7 @@ If your hardware is not accessed within a container(as in, can use Docker), then

 To get started, go to the `validator`, and create a `.env` file with the following contents:
 ```
-VALIDATOR_ARGS=--netuid {netuid} --subtensor.network {network} --wallet.name {wallet} --wallet.hotkey {hotkey} --logging.trace --logging.debug
+VALIDATOR_ARGS=--netuid {netuid} --subtensor.network {network} --wallet.name {wallet} --wallet.hotkey {hotkey}
 VALIDATOR_HOTKEY_SS58_ADDRESS={ss58-address}
 ```

@@ -201,8 +199,6 @@ In the another pod/container without a GPU, to run the scoring validator, clone
     --subtensor.network {network} \
     --wallet.name {wallet} \
     --wallet.hotkey {hotkey} \
-    --logging.trace \
-    --logging.debug \
     --benchmarker_api {API component routes, space separated if multiple}
 ```
base/base/submissions.py: 16 changes (8 additions, 8 deletions)
@@ -137,29 +137,29 @@ def get_submissions(


 def deduplicate_submissions(submissions: Submissions) -> Submissions:
-    existing_repositories: dict[str, Submission] = {}
-    existing_revisions: dict[str, Submission] = {}
+    existing_repositories: dict[str, tuple[Key, Submission]] = {}
+    existing_revisions: dict[str, tuple[Key, Submission]] = {}
     to_remove: set[Key] = set()

     for key, submission in submissions.items():
         url = submission.repository_info.url
         revision = submission.repository_info.revision
         block = submission.block

-        existing_repository = existing_repositories.get(url)
-        existing_revision = existing_revisions.get(revision)
+        existing_repository_key, existing_repository = existing_repositories.get(url, (None, None))
+        existing_revision_key, existing_revision = existing_revisions.get(revision, (None, None))

         if (existing_repository and existing_repository.block < block) or (existing_revision and existing_revision.block < block):
             to_remove.add(key)
             continue

         if existing_repository:
-            to_remove.add(existing_repository)
+            to_remove.add(existing_repository_key)
         if existing_revision:
-            to_remove.add(existing_revision)
+            to_remove.add(existing_revision_key)

-        existing_repositories[url] = submission
-        existing_revisions[revision] = submission
+        existing_repositories[url] = key, submission
+        existing_revisions[revision] = key, submission

     for key in to_remove:
         submissions.pop(key)
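The deduplication fix above tracks the submitting key alongside each cached submission: previously the Submission object itself was added to `to_remove`, which is a set of keys, so the stale duplicate was never actually popped. A self-contained sketch of the corrected idea, using simplified stand-ins for the project's `Key` and `Submission` types:

```python
from dataclasses import dataclass

# Simplified stand-ins; the real project defines its own Key, Submission, and Submissions types.
Key = str


@dataclass
class Submission:
    url: str
    revision: str
    block: int


def deduplicate_submissions(submissions: dict[Key, Submission]) -> dict[Key, Submission]:
    # Remember which key produced each repository URL so duplicates can be removed by key.
    existing_repositories: dict[str, tuple[Key, Submission]] = {}
    to_remove: set[Key] = set()

    for key, submission in submissions.items():
        existing_key, existing = existing_repositories.get(submission.url, (None, None))

        # The earlier submission (lower block) wins; the later duplicate is dropped.
        if existing and existing.block < submission.block:
            to_remove.add(key)
            continue

        if existing_key is not None:
            to_remove.add(existing_key)

        existing_repositories[submission.url] = key, submission

    for key in to_remove:
        submissions.pop(key)

    return submissions


if __name__ == "__main__":
    pool = {
        "hotkey-a": Submission("https://example.com/repo", "rev1", block=10),
        "hotkey-b": Submission("https://example.com/repo", "rev2", block=20),
    }
    print(deduplicate_submissions(pool))  # only hotkey-a remains
```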
base/testing/benchmarker.py: 12 changes (6 additions, 6 deletions)
@@ -85,7 +85,7 @@ def _benchmark_baseline(self, contest: Contest, inputs: list[TextToImageRequest]
         logger.info("Benchmarking baseline")
         while not self.baseline and not self._stop_flag.is_set():
             try:
-                self._benchmark_submission(contest, inputs, contest.baseline_repository)
+                self.baseline = self._benchmark_submission(contest, inputs, contest.baseline_repository)
             except CancelledError:
                 logger.warning("Benchmarking was canceled while testing the baseline")
                 return
@@ -121,10 +121,10 @@ def benchmark_submissions(self, contest: Contest, submissions: dict[Key, Reposit
             finally:
                 self.submission_times.append(perf_counter() - start_time)

-            average_benchmark_time = self.get_average_benchmark_time()
-            if average_benchmark_time:
-                eta = (len(submissions) - len(self.benchmarks)) * average_benchmark_time
-                logger.info(f"Average benchmark time: {average_benchmark_time}, ETA: {timedelta(seconds=eta)}")
+            average_benchmarking_time = self.get_average_benchmarking_time()
+            if average_benchmarking_time:
+                eta = (len(submissions) - len(self.benchmarks)) * average_benchmarking_time
+                logger.info(f"Average benchmark time: {average_benchmarking_time}, ETA: {timedelta(seconds=eta)}")

         if self._is_done(submissions):
             logger.info("Benchmarking complete")
@@ -133,7 +133,7 @@
             logger.warning("Benchmarking canceled")
             self.state = BenchmarkState.NOT_STARTED

-    def get_average_benchmark_time(self) -> float | None:
+    def get_average_benchmarking_time(self) -> float | None:
         return (
             sum(self.submission_times) / len(self.submission_times)
             if self.submission_times
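The rename to `get_average_benchmarking_time` keeps the method in line with the `average_benchmarking_time` field used by the validator API, and the ETA it logs is just the number of submissions left multiplied by the running average time per submission. A rough, self-contained sketch of that bookkeeping (the class and method names here are illustrative, not the project's exact API):

```python
from datetime import timedelta
from time import perf_counter


class BenchmarkTimer:
    """Tracks per-submission wall time and estimates time remaining."""

    def __init__(self) -> None:
        self.submission_times: list[float] = []

    def record(self, start_time: float) -> None:
        # Store how long the submission that started at start_time took.
        self.submission_times.append(perf_counter() - start_time)

    def get_average_benchmarking_time(self) -> float | None:
        if not self.submission_times:
            return None
        return sum(self.submission_times) / len(self.submission_times)

    def eta(self, total_submissions: int, finished: int) -> timedelta | None:
        average = self.get_average_benchmarking_time()
        if average is None:
            return None
        return timedelta(seconds=(total_submissions - finished) * average)
```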
base/testing/inference_sandbox.py: 66 changes (36 additions, 30 deletions)
@@ -91,8 +91,10 @@ def _run(self, script: str, args: list[str]):
             encoding='utf-8',
             cwd=self._sandbox_directory.absolute(),
         )
-        logger.info(process.stdout)
-        logger.info(process.stderr)
+        if process.stdout.strip():
+            logger.info(process.stdout)
+        if process.stderr.strip():
+            logger.info(process.stderr)
         if process.returncode:
             raise InvalidSubmissionError(f"Failed to run {script}")

@@ -200,34 +202,38 @@ def benchmark(self) -> BenchmarkOutput:
             text=True,
             bufsize=1,
         ) as process:
-            load_time = self.wait_for_socket(process)
-            with Client(abspath(self._socket_path)) as client:
-                logger.info(f"Benchmarking {len(self._inputs)} samples")
-                for i, request in enumerate(self._inputs):
-                    logger.info(f"Sample {i + 1}/{len(self._inputs)}")
-                    start_joules = self._contest.device.get_joules()
-                    vram_monitor = VRamMonitor(self._contest)
-
-                    client.send(request.model_dump_json().encode("utf-8"))
-
-                    start = perf_counter()
-
-                    output = client.recv_bytes()
-
-                    generation_time = perf_counter() - start
-                    joules_used = self._contest.device.get_joules() - start_joules
-                    watts_used = joules_used / generation_time
-                    vram_used = vram_monitor.complete()
-
-                    metrics.append(Metrics(
-                        generation_time=generation_time,
-                        size=size,
-                        vram_used=vram_used,
-                        watts_used=watts_used,
-                        load_time=load_time,
-                    ))
-                    outputs.append(output)
-            check_process(process)
+            try:
+                load_time = self.wait_for_socket(process)
+                with Client(abspath(self._socket_path)) as client:
+                    logger.info(f"Benchmarking {len(self._inputs)} samples")
+                    for i, request in enumerate(self._inputs):
+                        logger.info(f"Sample {i + 1}/{len(self._inputs)}")
+                        start_joules = self._contest.device.get_joules()
+                        vram_monitor = VRamMonitor(self._contest)
+
+                        data = request.model_dump_json().encode("utf-8")
+                        logger.debug(data)
+                        client.send_bytes(data)
+
+                        start = perf_counter()
+                        output = client.recv_bytes()
+
+                        generation_time = perf_counter() - start
+                        joules_used = self._contest.device.get_joules() - start_joules
+                        watts_used = joules_used / generation_time
+                        vram_used = vram_monitor.complete()
+
+                        metrics.append(Metrics(
+                            generation_time=generation_time,
+                            size=size,
+                            vram_used=vram_used,
+                            watts_used=watts_used,
+                            load_time=load_time,
+                        ))
+                        outputs.append(output)
+                check_process(process)
+            finally:
+                log_process(process)

         average_generation_time = sum(metric.generation_time for metric in metrics) / len(metrics)
         vram_used = max(metric.vram_used for metric in metrics) - start_vram
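The larger change above wraps the sandbox benchmarking loop in `try`/`finally` so that the child process's output is always logged, even when a sample fails, while `_run` now logs only non-empty output streams. A rough sketch of that pattern using the standard library; `log_process` here is a hypothetical stand-in, not the repository's actual helper:

```python
import logging
import subprocess

logger = logging.getLogger(__name__)


def log_process(process: subprocess.Popen) -> None:
    # Hypothetical helper: surface whatever the child wrote, skipping empty streams.
    stdout, stderr = process.communicate()
    if stdout and stdout.strip():
        logger.info(stdout)
    if stderr and stderr.strip():
        logger.info(stderr)


def run_benchmark(command: list[str]) -> None:
    with subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) as process:
        try:
            ...  # send requests over the sandbox socket and collect metrics here
        finally:
            # Always log the child's output, even if benchmarking raised.
            log_process(process)
```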
validator/base_validator/api_data.py: 2 changes (1 addition, 1 deletion)
@@ -14,7 +14,7 @@ class BenchmarkingResults(BaseModel):
     benchmarks: Benchmarks
     invalid_submissions: set[Key]
     baseline: Metrics | None
-    average_benchmark_time: float | None
+    average_benchmarking_time: float | None


 class BenchmarkingStartRequest(BaseModel):
validator/submission_tester/api.py: 2 changes (1 addition, 1 deletion)
@@ -117,7 +117,7 @@ def state(request: Request) -> BenchmarkingResults:
         benchmarks=benchmarker.benchmarks,
         invalid_submissions=benchmarker.invalid_submissions,
         baseline=benchmarker.baseline.metrics if benchmarker.baseline else None,
-        average_benchmark_time=benchmarker.get_average_benchmark_time(),
+        average_benchmarking_time=benchmarker.get_average_benchmarking_time(),
     )


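`average_benchmark_time` is renamed on a Pydantic model that travels between the benchmarking API and the scoring validator, so both sides presumably need to be deployed together for the field to round-trip. A minimal sketch of the new payload shape, with simplified stand-ins for the project's Benchmarks, Key, and Metrics types:

```python
from pydantic import BaseModel


class BenchmarkingResults(BaseModel):
    # Simplified field types; the real model uses the project's Benchmarks, Key, and Metrics.
    benchmarks: dict[str, float]
    invalid_submissions: set[str]
    baseline: float | None
    average_benchmarking_time: float | None


results = BenchmarkingResults(
    benchmarks={"hotkey-a": 3.2},
    invalid_submissions=set(),
    baseline=None,
    average_benchmarking_time=12.5,
)
print(results.model_dump_json())
```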
validator/weight_setting/state_manager.py: 2 changes (1 addition, 1 deletion)
@@ -41,7 +41,7 @@ def load_state(self) -> ContestState | None:
             return None

     def save_state(self, state: ContestState):
-        logger.info(f"Saving state")
+        logger.debug(f"Saving state")

         with self._state_file.open("wb") as file:
             file.write(state.model_dump_json(indent=4).encode())
validator/weight_setting/validator.py: 20 changes (18 additions, 2 deletions)
@@ -1,3 +1,4 @@
+from datetime import timedelta
 from importlib.metadata import version
 from signal import signal, SIGINT, SIGHUP, SIGTERM
 from threading import Event
@@ -164,13 +165,28 @@ def update_benchmarks(self, benchmarking_results: list[BenchmarkingResults]):
         if not self.contest_state:
             return

-        self.contest_state.baseline = benchmarking_results[0].baseline
-        self.contest_state.average_benchmarking_time = benchmarking_results[0].average_benchmark_time
+        baseline = benchmarking_results[0].baseline
+        average_benchmarking_time = benchmarking_results[0].average_benchmarking_time
+
+        if self.contest_state.baseline != baseline:
+            logger.info(f"Updating baseline to {baseline}")
+
+        self.contest_state.baseline = baseline
+        self.contest_state.average_benchmarking_time = average_benchmarking_time

         for result in benchmarking_results:
+            for key in result.benchmarks.keys() - self.contest_state.benchmarks.keys():
+                logger.info(f"Updating benchmarks for {key}")
+            for key in result.invalid_submissions - self.contest_state.invalid_submissions:
+                logger.info(f"Marking submission from {key} as invalid")
+
             self.contest_state.benchmarks.update(result.benchmarks)
             self.contest_state.invalid_submissions.update(result.invalid_submissions)

+        if average_benchmarking_time:
+            eta = (len(self.contest_state.submissions) - len(self.contest_state.benchmarks)) * average_benchmarking_time
+            logger.info(f"Average benchmark time: {average_benchmarking_time}, ETA: {timedelta(seconds=eta)}")
+
     def step(self):
         return self.contest_state.step if self.contest_state else 0

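The validator now logs only what actually changed by diffing incoming results against the stored contest state (a changed baseline, new benchmark keys, newly invalid submissions) before merging them. A compact sketch of that set-difference logging, with plain dict and set stand-ins for the contest state:

```python
import logging

logger = logging.getLogger(__name__)


def merge_results(
    known_benchmarks: dict[str, float],
    known_invalid: set[str],
    new_benchmarks: dict[str, float],
    new_invalid: set[str],
) -> None:
    # Announce only keys that were not present before merging.
    for key in new_benchmarks.keys() - known_benchmarks.keys():
        logger.info("Updating benchmarks for %s", key)
    for key in new_invalid - known_invalid:
        logger.info("Marking submission from %s as invalid", key)

    known_benchmarks.update(new_benchmarks)
    known_invalid.update(new_invalid)
```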
