Skip to content

Commit

Permalink
feat(sdk): use transaction log to cap memory usage (wandb#4724)
Browse files Browse the repository at this point in the history
  • Loading branch information
raubitsj authored Jan 6, 2023
1 parent f047efc commit f87c083
Show file tree
Hide file tree
Showing 50 changed files with 2,724 additions and 664 deletions.
3 changes: 0 additions & 3 deletions mypy.ini
Original file line number Diff line number Diff line change
Expand Up @@ -301,9 +301,6 @@ disallow_untyped_calls = False
disallow_untyped_defs = False
disallow_untyped_calls = False

[mypy-wandb.sdk.internal.writer]
ignore_errors = True

[mypy-wandb.sdk.internal.datastore]
ignore_errors = True

Expand Down
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,4 @@ setproctitle
setuptools
appdirs>=1.4.3
dataclasses; python_version < '3.7'
typing_extensions; python_version < '3.10'
11 changes: 11 additions & 0 deletions requirements_test.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
pytest
pytest-cov
pytest-xdist
pytest-flask
pytest-split
pytest-mock
pytest-timeout
pytest-openfiles
pytest-flakefinder
pytest-rerunfailures
parameterized
16 changes: 16 additions & 0 deletions tests/functional_tests/t0_main/grpc/t4_status.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
#!/usr/bin/env python

import os

import wandb

# Temporary switch used only for testing: select the grpc service transport
# through the environment before the run is initialised.
os.environ["WANDB_SERVICE_TRANSPORT"] = "grpc"

run = wandb.init()
print("somedata")

# Two separate log calls, one metric each.
wandb.log({"m1": 1})
wandb.log({"m2": 2})

# Query the run status and echo it.
status = run.status()
print("STATUS", status)

wandb.finish()
17 changes: 17 additions & 0 deletions tests/functional_tests/t0_main/grpc/t4_status.yea
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
plugin:
- wandb
tag:
shard: grpc
assert:
- :wandb:runs_len: 1
- :wandb:runs[0][config]: {}
- :wandb:runs[0][summary]:
m1: 1
m2: 2
- :wandb:runs[0][exitcode]: 0
- :op:contains:
- :wandb:runs[0][telemetry][3] # feature
- 23 # service
- :op:contains:
- :wandb:runs[0][telemetry][3] # feature
- 6 # grpc
22 changes: 22 additions & 0 deletions tests/standalone_tests/mitm_tests/mem_pressure.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
#!/usr/bin/env python
import time

import wandb
import yea


def main():
    """Start a run, log twenty ``num`` values 0.1 s apart, then finish the run."""
    run = wandb.init()
    total_steps = 20
    for step in range(total_steps):
        # Print a progress marker on every tenth step (0, 10, ...).
        if not step % 10:
            print(step)
        run.log({"num": step})
        time.sleep(0.1)
    print("done")
    run.finish()


if __name__ == "__main__":
    # Set up yea before running the workload.
    yea.setup()
    main()
24 changes: 24 additions & 0 deletions tests/standalone_tests/mitm_tests/t1_mem_pressure_init.yea
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
plugin:
- wandb
tag:
shards:
- standalone-mitm
option:
- :wandb:mitm
command:
program: mem_pressure.py
assert:
- :wandb:runs_len: 1
trigger:
- :wandb:init:
command: pause
service: graphql
time: 20
- :wandb:log:
command: pause
service: file_stream
count: 1
skip: 10
- :wandb:finish:
command: unpause
service: file_stream
70 changes: 67 additions & 3 deletions tests/unit_tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
from wandb.sdk.internal.handler import HandleManager
from wandb.sdk.internal.sender import SendManager
from wandb.sdk.internal.settings_static import SettingsStatic
from wandb.sdk.internal.writer import WriteManager
from wandb.sdk.lib import filesystem, runid
from wandb.sdk.lib.git import GitRepo
from wandb.sdk.lib.mailbox import Mailbox
Expand Down Expand Up @@ -526,7 +527,6 @@ def internal_hm(
runner,
internal_record_q,
internal_result_q,
internal_sender_q,
internal_writer_q,
_internal_sender,
stopped_event,
Expand All @@ -539,7 +539,6 @@ def helper(settings):
record_q=internal_record_q,
result_q=internal_result_q,
stopped=stopped_event,
sender_q=internal_sender_q,
writer_q=internal_writer_q,
interface=_internal_sender,
context_keeper=_internal_context_keeper,
Expand All @@ -549,6 +548,38 @@ def helper(settings):
yield helper


@pytest.fixture()
def internal_wm(
    runner,
    internal_writer_q,
    internal_result_q,
    internal_sender_q,
    _internal_sender,
    stopped_event,
    _internal_context_keeper,
):
    """Yield a factory that builds a ``WriteManager`` wired to the test queues.

    The yielded ``helper(settings)`` makes sure the parent directory of
    ``settings.sync_file`` exists and returns a ``WriteManager`` constructed
    with ``internal_writer_q`` as its record queue, ``internal_result_q`` as
    its result queue and ``internal_sender_q`` as its sender queue.

    ``stopped_event`` is not used in the body; it stays in the signature so
    the fixture's dependencies are unchanged.
    """

    def helper(settings):
        with runner.isolated_filesystem():
            wandb_file = settings.sync_file

            # Hacky: the run directory is not guaranteed to be clean, so it
            # may or may not exist already. exist_ok=True keeps this from
            # raising FileExistsError when the directory is already there.
            run_dir = Path(wandb_file).parent
            os.makedirs(run_dir, exist_ok=True)

            wm = WriteManager(
                settings=SettingsStatic(settings.make_static()),
                record_q=internal_writer_q,
                result_q=internal_result_q,
                sender_q=internal_sender_q,
                interface=_internal_sender,
                context_keeper=_internal_context_keeper,
            )
            return wm

    yield helper


@pytest.fixture()
def internal_get_record():
def _get_record(input_q, timeout=None):
Expand Down Expand Up @@ -590,6 +621,35 @@ def target():
stopped_event.set()


@pytest.fixture()
def start_write_thread(
    internal_writer_q, internal_get_record, stopped_event, internal_process
):
    """Yield a function that starts a daemon thread feeding a write manager.

    The thread polls ``internal_writer_q`` through ``internal_get_record``
    with a 0.1 s timeout and passes each truthy record to
    ``write_manager.write``. It exits once no record is available and
    ``stopped_event`` is set. On any exception it sets ``stopped_event`` and
    marks ``internal_process`` as not alive. ``stopped_event`` is also set
    after the ``yield`` (fixture teardown).
    """

    def start_write(write_manager):
        def pump_records():
            try:
                while True:
                    record = internal_get_record(
                        input_q=internal_writer_q, timeout=0.1
                    )
                    if record:
                        write_manager.write(record)
                        continue
                    # Nothing to write: stop only if shutdown was requested.
                    if stopped_event.is_set():
                        break
            except Exception:
                stopped_event.set()
                internal_process._alive = False

        writer = threading.Thread(
            target=pump_records, name="testing-writer", daemon=True
        )
        writer.start()
        return writer

    yield start_write
    stopped_event.set()


@pytest.fixture()
def start_handle_thread(internal_record_q, internal_get_record, stopped_event):
def start_handle(handle_manager):
Expand All @@ -615,20 +675,24 @@ def target():
# Yields a function that builds the handle, write and send managers from a
# run's settings and starts one thread for each of them.
# NOTE(review): this definition is shown as a diff hunk; whatever precedes the
# `def` line (e.g. a decorator) is outside the visible context — confirm.
def _start_backend(
internal_hm,
internal_sm,
internal_wm,
_internal_sender,
start_handle_thread,
start_write_thread,
start_send_thread,
):
# NOTE(review): `run` defaults to None but `run.settings` is dereferenced
# unconditionally below, so callers must always pass a run.
def start_backend_func(run=None, initial_run=True, initial_start=True):
# One manager of each kind, all built from the same run settings.
ihm = internal_hm(run.settings)
iwm = internal_wm(run.settings)
ism = internal_sm(run.settings)
# Start the handler, writer and sender threads, in that order.
ht = start_handle_thread(ihm)
wt = start_write_thread(iwm)
st = start_send_thread(ism)
# Optionally communicate the run through the interface.
if initial_run:
_run = _internal_sender.communicate_run(run)
# NOTE(review): `_run` is only bound when initial_run is true; presumably
# this check is nested under the `if initial_run:` above — confirm.
if initial_start:
_internal_sender.communicate_run_start(_run.run)
# NOTE(review): diff residue — per the hunk header (20 -> 24 lines) the next
# line is the removed pre-change return and the one after is its replacement.
return ht, st
return ht, wt, st

yield start_backend_func

Expand Down
2 changes: 1 addition & 1 deletion tests/unit_tests/test_datastore.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ def test_data_write_fill(with_datastore):
"""Leave just room for 1 more byte, then write a 1 byte, followed by another 1 byte."""
sizes = tuple([32768 - 7 - 7 - 8, 1, 1])
records = 3
lengths = (7, 32753, 0, 0), (32760, 8, 0, 0), (32768, 8, 0, 0)
lengths = (7, 32753 + 7, 0), (32760, 8 + 32760, 0), (32768, 8 + 32768, 0)
check(
with_datastore,
chunk_sizes=sizes,
Expand Down
Loading

0 comments on commit f87c083

Please sign in to comment.