Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

give option to only use main thread #4809

Merged
merged 6 commits into from
Feb 14, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 31 additions & 31 deletions reflex/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,11 @@
import inspect
import io
import json
import multiprocessing
import platform
import sys
import traceback
from datetime import datetime
from pathlib import Path
from timeit import default_timer as timer
from types import SimpleNamespace
from typing import (
TYPE_CHECKING,
Expand Down Expand Up @@ -72,7 +71,7 @@
from reflex.components.core.sticky import sticky
from reflex.components.core.upload import Upload, get_upload_dir
from reflex.components.radix import themes
from reflex.config import environment, get_config
from reflex.config import ExecutorType, environment, get_config
from reflex.event import (
_EVENT_FIELDS,
Event,
Expand Down Expand Up @@ -1061,10 +1060,23 @@ def memoized_toast_provider():
app_wrappers[(1, "ToasterProvider")] = toast_provider

with console.timing("Evaluate Pages (Frontend)"):
performance_metrics: list[tuple[str, float]] = []
for route in self._unevaluated_pages:
console.debug(f"Evaluating page: {route}")
start = timer()
self._compile_page(route, save_page=should_compile)
end = timer()
performance_metrics.append((route, end - start))
progress.advance(task)
console.debug(
"Slowest pages:\n"
+ "\n".join(
f"{route}: {time * 1000:.1f}ms"
for route, time in sorted(
performance_metrics, key=lambda x: x[1], reverse=True
)[:10]
)
)

# Add the optional endpoints (_upload)
self._add_optional_endpoints()
Expand All @@ -1077,7 +1089,7 @@ def memoized_toast_provider():
progress.advance(task)

# Store the compile results.
compile_results = []
compile_results: list[tuple[str, str]] = []

progress.advance(task)

Expand Down Expand Up @@ -1156,33 +1168,19 @@ def memoized_toast_provider():
),
)

# Use a forking process pool, if possible. Much faster, especially for large sites.
# Fallback to ThreadPoolExecutor as something that will always work.
executor = None
if (
platform.system() in ("Linux", "Darwin")
and (number_of_processes := environment.REFLEX_COMPILE_PROCESSES.get())
is not None
):
executor = concurrent.futures.ProcessPoolExecutor(
max_workers=number_of_processes or None,
mp_context=multiprocessing.get_context("fork"),
)
else:
executor = concurrent.futures.ThreadPoolExecutor(
max_workers=environment.REFLEX_COMPILE_THREADS.get() or None
)
executor = ExecutorType.get_executor_from_environment()

for route, component in zip(self._pages, page_components, strict=True):
ExecutorSafeFunctions.COMPONENTS[route] = component

ExecutorSafeFunctions.STATE = self._state

with executor:
result_futures = []
with console.timing("Compile to Javascript"), executor as executor:
result_futures: list[concurrent.futures.Future[tuple[str, str]]] = []

def _submit_work(fn: Callable, *args, **kwargs):
def _submit_work(fn: Callable[..., tuple[str, str]], *args, **kwargs):
f = executor.submit(fn, *args, **kwargs)
f.add_done_callback(lambda _: progress.advance(task))
result_futures.append(f)

# Compile the pre-compiled pages.
Expand All @@ -1208,10 +1206,10 @@ def _submit_work(fn: Callable, *args, **kwargs):
_submit_work(compiler.remove_tailwind_from_postcss)

# Wait for all compilation tasks to complete.
with console.timing("Compile to Javascript"):
for future in concurrent.futures.as_completed(result_futures):
compile_results.append(future.result())
progress.advance(task)
compile_results.extend(
future.result()
for future in concurrent.futures.as_completed(result_futures)
)

app_root = self._app_root(app_wrappers=app_wrappers)

Expand All @@ -1236,10 +1234,12 @@ def _submit_work(fn: Callable, *args, **kwargs):
progress.advance(task)

# Compile custom components.
*custom_components_result, custom_components_imports = (
compiler.compile_components(custom_components)
)
compile_results.append(custom_components_result)
(
custom_components_output,
custom_components_result,
custom_components_imports,
) = compiler.compile_components(custom_components)
compile_results.append((custom_components_output, custom_components_result))
all_imports.update(custom_components_imports)

progress.advance(task)
Expand Down
2 changes: 1 addition & 1 deletion reflex/compiler/compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -508,7 +508,7 @@ def compile_tailwind(
The compiled Tailwind config.
"""
# Get the path for the output file.
output_path = get_web_dir() / constants.Tailwind.CONFIG
output_path = str((get_web_dir() / constants.Tailwind.CONFIG).absolute())

# Compile the config.
code = _compile_tailwind(config)
Expand Down
95 changes: 95 additions & 0 deletions reflex/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,14 @@

from __future__ import annotations

import concurrent.futures
import dataclasses
import enum
import importlib
import inspect
import multiprocessing
import os
import platform
import sys
import threading
import urllib.parse
Expand All @@ -16,6 +19,7 @@
from typing import (
TYPE_CHECKING,
Any,
Callable,
Dict,
Generic,
List,
Expand Down Expand Up @@ -481,6 +485,95 @@ class PerformanceMode(enum.Enum):
OFF = "off"


class ExecutorType(enum.Enum):
"""Executor for compiling the frontend."""

THREAD = "thread"
PROCESS = "process"
MAIN_THREAD = "main_thread"

@classmethod
def get_executor_from_environment(cls):
"""Get the executor based on the environment variables.

Returns:
The executor.
"""
executor_type = environment.REFLEX_COMPILE_EXECUTOR.get()

reflex_compile_processes = environment.REFLEX_COMPILE_PROCESSES.get()
reflex_compile_threads = environment.REFLEX_COMPILE_THREADS.get()
# By default, use the main thread. Unless the user has specified a different executor.
# Using a process pool is much faster, but not supported on all platforms. It's gated behind a flag.
if executor_type is None:
if (
platform.system() not in ("Linux", "Darwin")
and reflex_compile_processes is not None
):
console.warn("Multiprocessing is only supported on Linux and MacOS.")

if (
platform.system() in ("Linux", "Darwin")
and reflex_compile_processes is not None
):
if reflex_compile_processes == 0:
console.warn(
"Number of processes must be greater than 0. If you want to use the default number of processes, set REFLEX_COMPILE_EXECUTOR to 'process'. Defaulting to None."
)
reflex_compile_processes = None
elif reflex_compile_processes < 0:
console.warn(
"Number of processes must be greater than 0. Defaulting to None."
)
reflex_compile_processes = None
executor_type = ExecutorType.PROCESS
elif reflex_compile_threads is not None:
if reflex_compile_threads == 0:
console.warn(
"Number of threads must be greater than 0. If you want to use the default number of threads, set REFLEX_COMPILE_EXECUTOR to 'thread'. Defaulting to None."
)
reflex_compile_threads = None
elif reflex_compile_threads < 0:
console.warn(
"Number of threads must be greater than 0. Defaulting to None."
)
reflex_compile_threads = None
executor_type = ExecutorType.THREAD
else:
executor_type = ExecutorType.MAIN_THREAD

match executor_type:
case ExecutorType.PROCESS:
executor = concurrent.futures.ProcessPoolExecutor(
max_workers=reflex_compile_processes,
mp_context=multiprocessing.get_context("fork"),
)
case ExecutorType.THREAD:
executor = concurrent.futures.ThreadPoolExecutor(
max_workers=reflex_compile_threads
)
case ExecutorType.MAIN_THREAD:
FUTURE_RESULT_TYPE = TypeVar("FUTURE_RESULT_TYPE")

class MainThreadExecutor:
def __enter__(self):
return self

def __exit__(self, *args):
pass

def submit(
self, fn: Callable[..., FUTURE_RESULT_TYPE], *args, **kwargs
) -> concurrent.futures.Future[FUTURE_RESULT_TYPE]:
future_job = concurrent.futures.Future()
future_job.set_result(fn(*args, **kwargs))
return future_job

executor = MainThreadExecutor()

return executor


class EnvironmentVariables:
"""Environment variables class to instantiate environment variables."""

Expand Down Expand Up @@ -522,6 +615,8 @@ class EnvironmentVariables:
Path(constants.Dirs.UPLOADED_FILES)
)

REFLEX_COMPILE_EXECUTOR: EnvVar[Optional[ExecutorType]] = env_var(None)

# Whether to use separate processes to compile the frontend and how many. If not set, defaults to thread executor.
REFLEX_COMPILE_PROCESSES: EnvVar[Optional[int]] = env_var(None)

Expand Down
Loading