Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bump aiida-pythonjob to 0.1.4 #373

Merged
merged 2 commits into from
Dec 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion aiida_workgraph/engine/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ def prepare_for_python_task(task: dict, kwargs: dict, var_kwargs: dict) -> dict:
function_outputs.append(output)

inputs = prepare_pythonjob_inputs(
pickled_function=executor,
function_data=executor,
function_inputs=function_inputs,
function_outputs=function_outputs,
code=code,
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ dependencies = [
"aiida-core>=2.3",
"cloudpickle",
"aiida-shell~=0.8",
"aiida-pythonjob==0.1.3",
"aiida-pythonjob==0.1.4",
"fastapi",
"uvicorn",
"pydantic_settings",
Expand Down
275 changes: 0 additions & 275 deletions tests/test_pythonjob.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,23 +48,6 @@ def multiply(x: Any, y: Any) -> Any:
assert wg.tasks["add1"].node.label == "add1"


def test_importable_function(fixture_localhost, python_executable_path):
"""Test importable function."""
from aiida_workgraph.executors.test import add_pythonjob

wg = WorkGraph("test_importable_function")
wg.add_task(
add_pythonjob,
name="add",
x=1,
y=2,
computer="localhost",
command_info={"label": python_executable_path},
)
wg.run()
assert wg.tasks["add"].outputs["result"].value.value == 3


def test_PythonJob_kwargs(fixture_localhost, python_executable_path):
"""Test function with kwargs."""

Expand Down Expand Up @@ -95,113 +78,6 @@ def add(x, y=1, **kwargs):
assert wg.tasks["add"].inputs["kwargs"].value == {"m": 2, "n": 3}


def test_PythonJob_typing():
"""Test function with typing."""
from numpy import array
from ase import Atoms
from aiida_workgraph.orm.function_data import PickledFunction
from typing import List

def generate_structures(
structures: List[Atoms],
strain_lst: list,
data: array,
strain_lst1: list = None,
data1: array = None,
structure1: Atoms = None,
) -> list[Atoms]:
pass

def generate_structures_2(
structure1: Atoms,
strain_lst1: list = None,
data1: str = "",
) -> list[Atoms]:
pass

modules = PickledFunction.get_required_imports(generate_structures)
assert modules == {
"ase.atoms": {"Atoms"},
"typing": {"List"},
"builtins": {"list"},
"numpy": {"array"},
}
modules = PickledFunction.get_required_imports(generate_structures_2)
assert modules == {"ase.atoms": {"Atoms"}, "builtins": {"list", "str"}}


def test_PythonJob_outputs(fixture_localhost, python_executable_path):
"""Test function with multiple outputs."""

@task(
outputs=[
{"identifier": "workgraph.any", "name": "sum"},
{"identifier": "workgraph.any", "name": "diff"},
]
)
def add(x, y):
return {"sum": x + y, "diff": x - y}

wg = WorkGraph("test_PythonJob_outputs")
wg.add_task(
"PythonJob",
function=add,
name="add",
x=1,
y=2,
# code=code,
computer="localhost",
command_info={"label": python_executable_path},
)
wg.run()
assert wg.tasks["add"].outputs["sum"].value.value == 3
assert wg.tasks["add"].outputs["diff"].value.value == -1


@pytest.mark.usefixtures("started_daemon_client")
def test_PythonJob_namespace_output(fixture_localhost, python_executable_path):
"""Test function with namespace output and input."""

# output namespace
@task(
outputs=[
{
"name": "add_multiply",
"identifier": "workgraph.namespace",
},
{
"name": "add_multiply.add",
"identifier": "workgraph.namespace",
},
{"name": "minus"},
]
)
def myfunc(x, y):
add = {"order1": x + y, "order2": x * x + y * y}
return {
"add_multiply": {"add": add, "multiply": x * y},
"minus": x - y,
}

wg = WorkGraph("test_namespace_outputs")
wg.add_task("PythonJob", function=myfunc, name="myfunc")

wg.submit(
wait=True,
inputs={
"myfunc": {
"x": 1.0,
"y": 2.0,
"computer": "localhost",
"command_info": {"label": python_executable_path},
}
},
)
assert wg.tasks["myfunc"].outputs["add_multiply"].value.add.order1.value == 3
assert wg.tasks["myfunc"].outputs["add_multiply"].value.add.order2.value == 5
assert wg.tasks["myfunc"].outputs["add_multiply"].value.multiply.value == 2


def test_PythonJob_namespace_output_input(fixture_localhost, python_executable_path):
"""Test function with namespace output and input."""

Expand Down Expand Up @@ -271,96 +147,6 @@ def myfunc3(x, y):
assert wg.tasks["myfunc3"].outputs["result"].value.value == 7


def test_PythonJob_parent_folder(fixture_localhost, python_executable_path):
"""Test function with parent folder."""

def add(x, y):
z = x + y
with open("result.txt", "w") as f:
f.write(str(z))
return x + y

def multiply(x, y):
with open("parent_folder/result.txt", "r") as f:
z = int(f.read())
return x * y + z

wg = WorkGraph("test_PythonJob_parent_folder")
wg.add_task("PythonJob", function=add, name="add")
wg.add_task(
"PythonJob",
function=multiply,
name="multiply",
parent_folder=wg.tasks["add"].outputs["remote_folder"],
)

# ------------------------- Submit the calculation -------------------
wg.submit(
inputs={
"add": {
"x": 2,
"y": 3,
"computer": "localhost",
"command_info": {"label": python_executable_path},
},
"multiply": {
"x": 3,
"y": 4,
"computer": "localhost",
"command_info": {"label": python_executable_path},
},
},
wait=True,
)
assert wg.tasks["multiply"].outputs["result"].value.value == 17


def test_PythonJob_upload_files(fixture_localhost, python_executable_path):
"""Test function with upload files."""

# create a temporary file "input.txt" in the current directory
with open("input.txt", "w") as f:
f.write("2")

# create a temporary folder "inputs_folder" in the current directory
# and add a file "another_input.txt" in the folder
import os

os.makedirs("inputs_folder", exist_ok=True)
with open("inputs_folder/another_input.txt", "w") as f:
f.write("3")

def add():
with open("input.txt", "r") as f:
a = int(f.read())
with open("inputs_folder/another_input.txt", "r") as f:
b = int(f.read())
return a + b

wg = WorkGraph("test_PythonJob_upload_files")
wg.add_task("PythonJob", function=add, name="add")

# ------------------------- Submit the calculation -------------------
# we need use full path to the file
input_file = os.path.abspath("input.txt")
input_folder = os.path.abspath("inputs_folder")

wg.run(
inputs={
"add": {
"computer": "localhost",
"command_info": {"label": python_executable_path},
"upload_files": {
"input.txt": input_file,
"inputs_folder": input_folder,
},
},
},
)
# wait=True)
assert wg.tasks["add"].outputs["result"].value.value == 5


def test_PythonJob_copy_files(fixture_localhost, python_executable_path):
"""Test function with copy files."""

Expand Down Expand Up @@ -424,67 +210,6 @@ def multiply(x_folder_name, y_folder_name):
assert wg.tasks["multiply"].outputs["result"].value.value == 25


def test_PythonJob_retrieve_files(fixture_localhost, python_executable_path):
"""Test retrieve files."""

def add(x, y):
z = x + y
with open("result.txt", "w") as f:
f.write(str(z))
return x + y

wg = WorkGraph("test_PythonJob_retrieve_files")
wg.add_task("PythonJob", function=add, name="add")
# ------------------------- Submit the calculation -------------------
wg.run(
inputs={
"add": {
"x": 2,
"y": 3,
"computer": "localhost",
"command_info": {"label": python_executable_path},
"metadata": {
"options": {
"additional_retrieve_list": ["result.txt"],
}
},
},
},
)
assert (
"result.txt" in wg.tasks["add"].outputs["retrieved"].value.list_object_names()
)


def test_data_serializer(fixture_localhost, python_executable_path):
from ase import Atoms
from ase.build import bulk

@task()
def make_supercell(atoms: Atoms, dim: int) -> Atoms:
"""Scale the structure by the given scales."""
return atoms * (dim, dim, dim)

atoms = bulk("Si")

wg = WorkGraph("test_PythonJob_retrieve_files")
wg.add_task(
"PythonJob",
function=make_supercell,
atoms=atoms,
dim=2,
name="make_supercell",
computer="localhost",
command_info={"label": python_executable_path},
)
# ------------------------- Submit the calculation -------------------
wg.submit(wait=True)
assert (
wg.tasks["make_supercell"].outputs["result"].value.value.get_chemical_formula()
== "Si16"
)


def test_load_pythonjob(fixture_localhost, python_executable_path):
"""Test function with typing."""

Expand Down
Loading