
Allow using a custom Process class #80

Open
ShakedDovrat opened this issue Jan 5, 2022 · 4 comments
Labels
enhancement New feature or request

Comments

@ShakedDovrat

Thank you for creating this great package.

I would like to create a pipeline where some of the stages use PyTorch (with GPU usage). PyTorch cannot access the GPU from inside a multiprocessing.Process subprocess. For that reason PyTorch includes a torch.multiprocessing.Process class which has the same API as multiprocessing.Process.

I would like the ability to use a custom Process class instead of the default multiprocessing.Process, so I can use PyTorch in the pipeline. Without it I'm afraid pypeln is unusable to me.

For instance, add an optional process_class argument to map (and other functions), with a default value of multiprocessing.Process.

Alternatively, maybe there's a workaround for what I need that I'm unaware of. If so, please let me know.

@ShakedDovrat ShakedDovrat added the enhancement New feature or request label Jan 5, 2022
@cgarciae
Owner

cgarciae commented Jan 5, 2022

Hey @ShakedDovrat! I do believe we can expose a config option to let users specify which Process class they want to use. Currently there is a use_threads flag which swaps multiprocessing.Process for threading.Thread, since the two follow the same API; if we add a worker_class option we could make this more general. I see two ways of doing this:
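As a quick aside, here is a minimal stdlib-only demonstration of why that swap is possible: multiprocessing.Process and threading.Thread share the same target/args/start/join constructor API, so the same launcher code works with either class (the helper names here are just for illustration).

```python
import multiprocessing
import threading

def double(x, q):
    # worker body: compute and report the result through a queue,
    # which works for both threads and processes
    q.put(x * 2)

def run(worker_class):
    """Run `double` on the given worker class; the call is identical for
    Thread and Process because they follow the same API."""
    q = multiprocessing.SimpleQueue()
    w = worker_class(target=double, args=(21, q))
    w.start()
    w.join()
    return q.get()

if __name__ == "__main__":
    print(run(threading.Thread))         # 42
    print(run(multiprocessing.Process))  # 42
```

This shared interface is exactly what a worker_class option would rely on: any class with the same constructor signature and start/join methods (such as torch.multiprocessing.Process) can be dropped in.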

1. Add the option to all API functions

The workers are initialized here:

def start_workers(

Then this is called by the start method here:
def start(self):

To pass this information through, you need to add a worker_class field here:
process_fn: ProcessFn
index: int
timeout: float
stage_params: StageParams
main_queue: IterableQueue
on_start: tp.Optional[tp.Callable[..., Kwargs]]
on_done: tp.Optional[tp.Callable[..., Kwargs]]
use_threads: bool
f_args: tp.List[str]
namespace: utils.Namespace = field(
default_factory=lambda: utils.Namespace(done=False, task_start_time=None)
)
process: tp.Optional[tp.Union[multiprocessing.Process, threading.Thread]] = None

and here:
process_fn: ProcessFn
workers: int
maxsize: int
total_sources: int
timeout: float
dependencies: tp.List["Stage"]
on_start: tp.Optional[tp.Callable[..., Kwargs]]
on_done: tp.Optional[tp.Callable[..., Kwargs]]
use_threads: bool
f_args: tp.List[str]

After that you would have to add this option to all public functions that want to expose it.
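A minimal sketch of what option 1 could look like, assuming the Worker dataclass fields quoted above; the worker_class field and the class-selection logic are hypothetical, not pypeln's current code.

```python
from dataclasses import dataclass
import multiprocessing
import threading
import typing as tp

WorkerClass = tp.Type[tp.Union[multiprocessing.Process, threading.Thread]]

@dataclass
class Worker:
    use_threads: bool
    # hypothetical new field: None means "derive the class from use_threads"
    worker_class: tp.Optional[WorkerClass] = None
    process: tp.Optional[tp.Union[multiprocessing.Process, threading.Thread]] = None

    def start_workers(self, target: tp.Callable, args: tuple = ()) -> None:
        # an explicit worker_class wins; otherwise fall back to the existing flag
        cls = self.worker_class or (
            threading.Thread if self.use_threads else multiprocessing.Process
        )
        self.process = cls(target=target, args=args)
        self.process.start()
```

A stage would then forward its own worker_class value into each Worker it creates, which is why the option has to be threaded through every public function.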

2. Add a context manager

This simplifies things a lot: during start_workers you would just check whether a global variable is set and use it. The API could look like this:

with pl.process.config(worker_class=torch.multiprocessing.Process):
    # run your pipeline here
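A minimal sketch of how such a config context manager could be implemented, using a module-level ContextVar so the setting is scoped to the with block; config and get_worker_class are hypothetical names, not part of pypeln's current API.

```python
import contextlib
import contextvars
import multiprocessing
import threading
import typing as tp

# module-level setting with the current default as fallback
_worker_class: contextvars.ContextVar = contextvars.ContextVar(
    "worker_class", default=multiprocessing.Process
)

@contextlib.contextmanager
def config(worker_class: tp.Type) -> tp.Iterator[None]:
    # override the worker class for the duration of the with block
    token = _worker_class.set(worker_class)
    try:
        yield
    finally:
        _worker_class.reset(token)

def get_worker_class() -> tp.Type:
    # start_workers would call this instead of hard-coding multiprocessing.Process
    return _worker_class.get()
```

Using a ContextVar rather than a plain global also keeps the override safe if pipelines are launched concurrently from different threads or async tasks.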

@cgarciae
Owner

cgarciae commented Jan 5, 2022

Option 2 sounds way easier to implement, but it sets all workers to the same class (which is probably what you want 99% of the time); the other method is more general but requires the user to specify the class per stage, which can be tedious.

@ShakedDovrat
Author

Thank you @cgarciae! I will look into it.

@cgarciae
Owner

cgarciae commented Jan 6, 2022

BTW: you might want to check https://github.com/pytorch/data
