diff --git a/utils/mm-python-template/modify_base_template.py b/utils/mm-python-template/modify_base_template.py index 9511f442..7dece5a3 100644 --- a/utils/mm-python-template/modify_base_template.py +++ b/utils/mm-python-template/modify_base_template.py @@ -85,15 +85,10 @@ def add_function_arguments_dynamically(cookiecutter_json: dict) -> None: # create definition string for item in dict_items: for key, value in cookiecutter_json[item].items(): - # use filepattern in __main__ to pass multiple files form inpdir if key == "inpdir": continue if "python_type" in value: def_string += f"{key} : {value['python_type']}, " - elif "_pattern" in key: - original_key = key.replace("_pattern", "") - new_python_type = "List[Path]" - def_string += f"{original_key}: {new_python_type}, " def_string = def_string[:-2] + ")" # now add the docstring docstring = f" '''{cookiecutter_json['plugin_name']}.\n\n Args:\n" @@ -103,9 +98,6 @@ def add_function_arguments_dynamically(cookiecutter_json: dict) -> None: continue if "python_type" in value: docstring += f" {key}: {value['description']}\n" - elif "_pattern" in key: - original_key = key.replace("_pattern", "") - docstring += f" {original_key}: {value['description']}\n" path = ( Path("{{cookiecutter.container_name}}") / "src" @@ -211,26 +203,149 @@ def add_ict_inputs_outputs_dynamically(cookiecutter_json: dict) -> None: updated_ict_file.write(f" type: {value['type']}\n") -# need to handle cases where base_command is just python3 -# calling some script already in container for example -ADD_SOURCE = False -base_command = cookiecutter_data["base_command"] -if "arguments" in cookiecutter_data: - arguments = cookiecutter_data["arguments"] - # combine base_command and arguments need to determine if source is needed - base_command = base_command + " " + arguments -# look for .py file in base_command -for item in base_command.split(): - if ".py" in item or "python" in item: - ADD_SOURCE = True - break - +def generate_pydantic_models(cookiecutter_json: dict) -> str: + """Generate Pydantic models based on cookiecutter data.""" + + models_code = "" + + if all(value['python_type'] == 'Path' for value in cookiecutter_json["inputs"].values()): + pass + else: + # Input Model (if there are other inputs) + models_code += "class InputModel(BaseModel):\n" + for input_name, input_data in cookiecutter_json["inputs"].items(): + # Check if the python_type is Path, and change it to UploadFile for file inputs + if input_data['python_type'] == 'Path': + python_type = 'UploadFile' # For file uploads, use UploadFile + models_code += f" {input_name}: {python_type} = File(..., description=\"{input_data['description']}\")\n" + else: + python_type = input_data['python_type'] # Use the provided type for other fields + models_code += f" {input_name}: {python_type} = Field(..., description=\"{input_data['description']}\")\n" + models_code += "\n" + + # Output Model + models_code += "class OutputModel(BaseModel):\n" + for output_name, output_data in cookiecutter_json["outputs"].items(): + # If it's a file path (Path), use Path directly + if output_data['python_type'] == 'Path': + python_type = 'Path' # Outputs are file paths, so we use Path + models_code += f" {output_name}: {python_type} = Field(..., description=\"{output_data['description']}\")\n" + else: + python_type = output_data['python_type'] # Use the specified type for other output fields + models_code += f" {output_name}: {python_type} = Field(..., description=\"{output_data['description']}\")\n" + models_code += "\n" + + return models_code + + +def generate_post_function(cookiecutter_json: dict) -> str: + """Generate the POST function using FastAPI, dynamically calling the specified function.""" + + # Extract function name from cookiecutter JSON data + function_name = cookiecutter_json["plugin_package"].split(".")[-1] + + # Start building the function signature for the POST request + post_code = f"@app.post(\"/process\", response_model=OutputModel)\n" + post_code += f"async def process_data(" + + # Add arguments dynamically based on input data + input_fields = cookiecutter_json["inputs"] + + # Loop through the inputs to determine whether they are files or other types + for input_name, input_data in input_fields.items(): + if input_data['python_type'] == 'Path': + # If the input type is Path, use UploadFile in the function signature + post_code += f"{input_name}: UploadFile = File(..., description=\"{input_data['description']}\")\n " + else: + # For other types, use Field to define them + post_code += f"{input_name}: {input_data['python_type']} = Field(..., description=\"{input_data['description']}\")\n " + + post_code = post_code.strip() # Remove the extra last newline + + # Closing the function definition + post_code += f"""):\n \"\"\"Process the input data and return the results.\"\"\"\n\n""" + + # Add logic to handle file processing + post_code += """ + # Handle the uploaded files (save them to disk) + input_data_dict = {} +""" + for input_name, input_data in input_fields.items(): + if input_data['python_type'] == 'Path': + # Handle saving the uploaded file to disk + post_code += f""" + if {input_name}: + file_content = await {input_name}.read() + file_path = Path.cwd() / {input_name}.filename + with open(file_path, 'wb') as f: + f.write(file_content) + input_data_dict["{input_name}"] = file_path +""" + + # Add the plugin function call, unpacking the input data + post_code += f""" + # Dynamically call the plugin function with unpacked input data + result = {function_name}(**input_data_dict) + + # Get the file extension of the input file + input_file_extension = {input_name}.filename.split('.')[-1] + + # Use glob to find files with the same extension as the input file + output_files = list(Path.cwd().rglob(f"*.{{input_file_extension}}")) + + # Use the first file found with the matching extension + output_file_path = output_files[0] + + # Get the MIME type of the output file + mime_type, _ = mimetypes.guess_type(output_file_path) + + # Serve the file as a downloadable file with proper headers + return FileResponse( + output_file_path, + media_type=mime_type or "application/octet-stream", # Fallback MIME type + headers={{"Content-Disposition": f"attachment; filename={{output_file_path.name}}"}} + ) + """ + + return post_code + + +def update_server_template(cookiecutter_json: dict) -> None: + """Update the server template with dynamically generated Pydantic models and POST function.""" + # Path to the template server.py file + server_template_path = Path("{{cookiecutter.container_name}}") / "src" / "{{cookiecutter.package_folders}}" / "server.py" + + # Generate the models and post function code + models_code = generate_pydantic_models(cookiecutter_json) + post_code = generate_post_function(cookiecutter_json) + + # Uvicorn startup code + uvicorn_code = """ +if __name__ == "__main__": + uvicorn.run("server:app", host="0.0.0.0", port=8000, reload=True) +""" + + # Open the server template and append the generated code + with server_template_path.open("a") as server_file: + server_file.write("\n\n# Generated Pydantic models\n") + server_file.write(models_code) + server_file.write("\n# Generated POST function\n") + server_file.write(post_code) + server_file.write("\n# Uvicorn startup code\n") + server_file.write(uvicorn_code) + + + +ADD_SOURCE = True +# no general way to determine from .cwl file if their is src code to be included +# or its just in some container on the web to be executed add_readme_options_dynamically(cookiecutter_data) add_ict_inputs_outputs_dynamically(cookiecutter_data) add_test_content(cookiecutter_data) if ADD_SOURCE: add_function_arguments_dynamically(cookiecutter_data) add_main_function_dynamically(cookiecutter_data) + update_server_template(cookiecutter_data) cookiecutter_data[ "base_command" ] = f"python3 -m {cookiecutter_data['plugin_package']}" diff --git a/utils/mm-python-template/read_cwl_inputs_outputs.py b/utils/mm-python-template/read_cwl_inputs_outputs.py index 675c17ae..c0f88f3c 100644 --- a/utils/mm-python-template/read_cwl_inputs_outputs.py +++ b/utils/mm-python-template/read_cwl_inputs_outputs.py @@ -155,7 +155,8 @@ def insert_inputs_outputs_cookiecutter( keys (List[str]): The keys to insert into the cookiecutter dictionary. """ # add base_command to cookiecutter - cookiecutter["base_command"] = transformed["base_command"] + if "base_command" in transformed: + cookiecutter["base_command"] = transformed["base_command"] # add arguments to cookiecutter if "arguments" in transformed: cookiecutter["arguments"] = transformed["arguments"] diff --git a/utils/mm-python-template/{{cookiecutter.container_name}}/src/{{cookiecutter.package_folders}}/server.py b/utils/mm-python-template/{{cookiecutter.container_name}}/src/{{cookiecutter.package_folders}}/server.py new file mode 100644 index 00000000..99ffd32c --- /dev/null +++ b/utils/mm-python-template/{{cookiecutter.container_name}}/src/{{cookiecutter.package_folders}}/server.py @@ -0,0 +1,13 @@ +from fastapi import FastAPI, UploadFile, File +from fastapi.responses import FileResponse +from pydantic import BaseModel, Field +from pathlib import Path +import uvicorn +import mimetypes + +from {{cookiecutter.plugin_package}}.{{cookiecutter.package_name}} import ( + {{cookiecutter.package_name}}, +) + +# Initialize the FastAPI app +app = FastAPI()