diff --git a/reflex/app.py b/reflex/app.py index d290b8f49f..16d20eb4a2 100644 --- a/reflex/app.py +++ b/reflex/app.py @@ -11,12 +11,11 @@ import inspect import io import json -import multiprocessing -import platform import sys import traceback from datetime import datetime from pathlib import Path +from timeit import default_timer as timer from types import SimpleNamespace from typing import ( TYPE_CHECKING, @@ -72,7 +71,7 @@ from reflex.components.core.sticky import sticky from reflex.components.core.upload import Upload, get_upload_dir from reflex.components.radix import themes -from reflex.config import environment, get_config +from reflex.config import ExecutorType, environment, get_config from reflex.event import ( _EVENT_FIELDS, Event, @@ -1061,10 +1060,23 @@ def memoized_toast_provider(): app_wrappers[(1, "ToasterProvider")] = toast_provider with console.timing("Evaluate Pages (Frontend)"): + performance_metrics: list[tuple[str, float]] = [] for route in self._unevaluated_pages: console.debug(f"Evaluating page: {route}") + start = timer() self._compile_page(route, save_page=should_compile) + end = timer() + performance_metrics.append((route, end - start)) progress.advance(task) + console.debug( + "Slowest pages:\n" + + "\n".join( + f"{route}: {time * 1000:.1f}ms" + for route, time in sorted( + performance_metrics, key=lambda x: x[1], reverse=True + )[:10] + ) + ) # Add the optional endpoints (_upload) self._add_optional_endpoints() @@ -1077,7 +1089,7 @@ def memoized_toast_provider(): progress.advance(task) # Store the compile results. - compile_results = [] + compile_results: list[tuple[str, str]] = [] progress.advance(task) @@ -1156,33 +1168,19 @@ def memoized_toast_provider(): ), ) - # Use a forking process pool, if possible. Much faster, especially for large sites. - # Fallback to ThreadPoolExecutor as something that will always work. - executor = None - if ( - platform.system() in ("Linux", "Darwin") - and (number_of_processes := environment.REFLEX_COMPILE_PROCESSES.get()) - is not None - ): - executor = concurrent.futures.ProcessPoolExecutor( - max_workers=number_of_processes or None, - mp_context=multiprocessing.get_context("fork"), - ) - else: - executor = concurrent.futures.ThreadPoolExecutor( - max_workers=environment.REFLEX_COMPILE_THREADS.get() or None - ) + executor = ExecutorType.get_executor_from_environment() for route, component in zip(self._pages, page_components, strict=True): ExecutorSafeFunctions.COMPONENTS[route] = component ExecutorSafeFunctions.STATE = self._state - with executor: - result_futures = [] + with console.timing("Compile to Javascript"), executor as executor: + result_futures: list[concurrent.futures.Future[tuple[str, str]]] = [] - def _submit_work(fn: Callable, *args, **kwargs): + def _submit_work(fn: Callable[..., tuple[str, str]], *args, **kwargs): f = executor.submit(fn, *args, **kwargs) + f.add_done_callback(lambda _: progress.advance(task)) result_futures.append(f) # Compile the pre-compiled pages. @@ -1208,10 +1206,10 @@ def _submit_work(fn: Callable, *args, **kwargs): _submit_work(compiler.remove_tailwind_from_postcss) # Wait for all compilation tasks to complete. - with console.timing("Compile to Javascript"): - for future in concurrent.futures.as_completed(result_futures): - compile_results.append(future.result()) - progress.advance(task) + compile_results.extend( + future.result() + for future in concurrent.futures.as_completed(result_futures) + ) app_root = self._app_root(app_wrappers=app_wrappers) @@ -1236,10 +1234,12 @@ def _submit_work(fn: Callable, *args, **kwargs): progress.advance(task) # Compile custom components. - *custom_components_result, custom_components_imports = ( - compiler.compile_components(custom_components) - ) - compile_results.append(custom_components_result) + ( + custom_components_output, + custom_components_result, + custom_components_imports, + ) = compiler.compile_components(custom_components) + compile_results.append((custom_components_output, custom_components_result)) all_imports.update(custom_components_imports) progress.advance(task) diff --git a/reflex/compiler/compiler.py b/reflex/compiler/compiler.py index 7cd87fb71a..577d533a05 100644 --- a/reflex/compiler/compiler.py +++ b/reflex/compiler/compiler.py @@ -508,7 +508,7 @@ def compile_tailwind( The compiled Tailwind config. """ # Get the path for the output file. - output_path = get_web_dir() / constants.Tailwind.CONFIG + output_path = str((get_web_dir() / constants.Tailwind.CONFIG).absolute()) # Compile the config. code = _compile_tailwind(config) diff --git a/reflex/config.py b/reflex/config.py index 21614b9b1a..abe46bdd54 100644 --- a/reflex/config.py +++ b/reflex/config.py @@ -2,11 +2,14 @@ from __future__ import annotations +import concurrent.futures import dataclasses import enum import importlib import inspect +import multiprocessing import os +import platform import sys import threading import urllib.parse @@ -16,6 +19,7 @@ from typing import ( TYPE_CHECKING, Any, + Callable, Dict, Generic, List, @@ -481,6 +485,95 @@ class PerformanceMode(enum.Enum): OFF = "off" +class ExecutorType(enum.Enum): + """Executor for compiling the frontend.""" + + THREAD = "thread" + PROCESS = "process" + MAIN_THREAD = "main_thread" + + @classmethod + def get_executor_from_environment(cls): + """Get the executor based on the environment variables. + + Returns: + The executor. + """ + executor_type = environment.REFLEX_COMPILE_EXECUTOR.get() + + reflex_compile_processes = environment.REFLEX_COMPILE_PROCESSES.get() + reflex_compile_threads = environment.REFLEX_COMPILE_THREADS.get() + # By default, use the main thread. Unless the user has specified a different executor. + # Using a process pool is much faster, but not supported on all platforms. It's gated behind a flag. + if executor_type is None: + if ( + platform.system() not in ("Linux", "Darwin") + and reflex_compile_processes is not None + ): + console.warn("Multiprocessing is only supported on Linux and MacOS.") + + if ( + platform.system() in ("Linux", "Darwin") + and reflex_compile_processes is not None + ): + if reflex_compile_processes == 0: + console.warn( + "Number of processes must be greater than 0. If you want to use the default number of processes, set REFLEX_COMPILE_EXECUTOR to 'process'. Defaulting to None." + ) + reflex_compile_processes = None + elif reflex_compile_processes < 0: + console.warn( + "Number of processes must be greater than 0. Defaulting to None." + ) + reflex_compile_processes = None + executor_type = ExecutorType.PROCESS + elif reflex_compile_threads is not None: + if reflex_compile_threads == 0: + console.warn( + "Number of threads must be greater than 0. If you want to use the default number of threads, set REFLEX_COMPILE_EXECUTOR to 'thread'. Defaulting to None." + ) + reflex_compile_threads = None + elif reflex_compile_threads < 0: + console.warn( + "Number of threads must be greater than 0. Defaulting to None." + ) + reflex_compile_threads = None + executor_type = ExecutorType.THREAD + else: + executor_type = ExecutorType.MAIN_THREAD + + match executor_type: + case ExecutorType.PROCESS: + executor = concurrent.futures.ProcessPoolExecutor( + max_workers=reflex_compile_processes, + mp_context=multiprocessing.get_context("fork"), + ) + case ExecutorType.THREAD: + executor = concurrent.futures.ThreadPoolExecutor( + max_workers=reflex_compile_threads + ) + case ExecutorType.MAIN_THREAD: + FUTURE_RESULT_TYPE = TypeVar("FUTURE_RESULT_TYPE") + + class MainThreadExecutor: + def __enter__(self): + return self + + def __exit__(self, *args): + pass + + def submit( + self, fn: Callable[..., FUTURE_RESULT_TYPE], *args, **kwargs + ) -> concurrent.futures.Future[FUTURE_RESULT_TYPE]: + future_job = concurrent.futures.Future() + future_job.set_result(fn(*args, **kwargs)) + return future_job + + executor = MainThreadExecutor() + + return executor + + class EnvironmentVariables: """Environment variables class to instantiate environment variables.""" @@ -522,6 +615,8 @@ class EnvironmentVariables: Path(constants.Dirs.UPLOADED_FILES) ) + REFLEX_COMPILE_EXECUTOR: EnvVar[Optional[ExecutorType]] = env_var(None) + # Whether to use separate processes to compile the frontend and how many. If not set, defaults to thread executor. REFLEX_COMPILE_PROCESSES: EnvVar[Optional[int]] = env_var(None)