diff --git a/src/evaluation/Run.jl b/src/evaluation/Run.jl index 783dc7a434..81be9f736b 100644 --- a/src/evaluation/Run.jl +++ b/src/evaluation/Run.jl @@ -1,5 +1,6 @@ import REPL: ends_with_semicolon import .Configuration +import .Throttled import ExpressionExplorer: is_joined_funcname """ @@ -119,10 +120,13 @@ function run_reactive_core!( # Save the notebook. In most cases, this is the only time that we save the notebook, so any state changes that influence the file contents (like `depends_on_disabled_cells`) should be behind this point. (More saves might happen if a macro expansion or package using happens.) save && save_notebook(session, notebook) - + # Send intermediate updates to the clients at most 20 times / second during a reactive run. (The effective speed of a slider is still unbounded, because the last update is not throttled.) # flush_send_notebook_changes_throttled, - send_notebook_changes_throttled, flush_notebook_changes = throttled(1.0 / 20; runtime_multiplier=2.0) do + send_notebook_changes_throttled = Throttled.throttled(1.0 / 20; runtime_multiplier=2.0) do + # We will do a state sync now, so that means that we can delay the status_tree state sync loop, see https://github.com/fonsp/Pluto.jl/issues/2978 + Throttled.force_throttle_without_run(notebook.status_tree.update_listener_ref[]) + # State sync: send_notebook_changes!(ClientRequest(; session, notebook)) end send_notebook_changes_throttled() @@ -229,8 +233,8 @@ function run_reactive_core!( end notebook.wants_to_interrupt = false - flush_notebook_changes() Status.report_business_finished!(run_status) + flush(send_notebook_changes_throttled) return new_order end diff --git a/src/evaluation/Throttled.jl b/src/evaluation/Throttled.jl index 71a96d9980..43c49aa447 100644 --- a/src/evaluation/Throttled.jl +++ b/src/evaluation/Throttled.jl @@ -1,5 +1,55 @@ +module Throttled + import Base.Threads + +struct ThrottledFunction + f::Function + timeout::Real + runtime_multiplier::Float64 + tlock::ReentrantLock + iscoolnow::Ref{Bool} + run_later::Ref{Bool} + last_runtime::Ref{Float64} +end + +"Run the function now" +function Base.flush(tf::ThrottledFunction) + lock(tf.tlock) do + tf.run_later[] = false + tf.last_runtime[] = @elapsed result = tf.f() + result + end +end + +"Start the cooldown period. If at the end, a run_later[] is set, then we run the function and schedule the next cooldown period." +function schedule(tf::ThrottledFunction) + # if the last runtime was quite long, increase the sleep period to match. + Timer(tf.timeout + tf.last_runtime[] * tf.runtime_multiplier) do _t + if tf.run_later[] + flush(tf) + schedule(tf) + else + tf.iscoolnow[] = true + end + end +end + +function (tf::ThrottledFunction)() + if tf.iscoolnow[] + tf.iscoolnow[] = false + flush(tf) + schedule(tf) + else + tf.run_later[] = true + end + nothing +end + + + + + """ throttled(f::Function, timeout::Real) @@ -19,41 +69,30 @@ function throttled(f::Function, timeout::Real; runtime_multiplier::Float64=0.0) run_later = Ref(false) last_runtime = Ref(0.0) - function flush() - lock(tlock) do - run_later[] = false - last_runtime[] = @elapsed result = f() - result - end - end - - function schedule() - # if the last runtime was quite long, increase the sleep period to match. - Timer(timeout + last_runtime[] * runtime_multiplier) do _t - if run_later[] - flush() - schedule() - else - iscoolnow[] = true - end - end - end + tf = ThrottledFunction(f, timeout, runtime_multiplier, tlock, iscoolnow, run_later, last_runtime) + # we initialize hot, and start the cooldown period immediately - schedule() + schedule(tf) + + return tf +end - function throttled_f() - if iscoolnow[] - iscoolnow[] = false - flush() - schedule() - else - run_later[] = true - end - end +""" +Given a throttled function, skip any pending run if hot (but let the cooldown period continue), or start the cooldown period if cool. This forces the throttled function to not fire for a little while. - return throttled_f, flush +Argument should be the first function returned by `throttled`. +""" +function force_throttle_without_run(tf::ThrottledFunction) + # (we can access variables from the function closure hihi) + tf.run_later[] = false + if tf.iscoolnow[] + tf.iscoolnow[] = false + schedule(tf) + end end +force_throttle_without_run(::Function) = nothing + """ simple_leading_throttle(f, delay::Real) @@ -74,4 +113,6 @@ function simple_leading_throttle(f, delay::Real) f(args...;kwargs...) end end +end + end \ No newline at end of file diff --git a/src/evaluation/WorkspaceManager.jl b/src/evaluation/WorkspaceManager.jl index 9183fc55cc..3308fbf7e8 100644 --- a/src/evaluation/WorkspaceManager.jl +++ b/src/evaluation/WorkspaceManager.jl @@ -181,7 +181,7 @@ function start_relaying_self_updates((session, notebook)::SN, run_channel) end function start_relaying_logs((session, notebook)::SN, log_channel) - update_throttled, flush_throttled = Pluto.throttled(0.1) do + update_throttled = Pluto.Throttled.throttled(0.1) do Pluto.send_notebook_changes!(Pluto.ClientRequest(; session, notebook)) end diff --git a/src/webserver/Authentication.jl b/src/webserver/Authentication.jl index 45644703d5..146ce2fea1 100644 --- a/src/webserver/Authentication.jl +++ b/src/webserver/Authentication.jl @@ -1,3 +1,4 @@ +import .Throttled """ Return whether the `request` was authenticated in one of two ways: @@ -29,7 +30,7 @@ function is_authenticated(session::ServerSession, request::HTTP.Request) end # Function to log the url with secret on the Julia CLI when a request comes to the server without the secret. Executes at most once every 5 seconds -const log_secret_throttled = simple_leading_throttle(5) do session::ServerSession, request::HTTP.Request +const log_secret_throttled = Throttled.simple_leading_throttle(5) do session::ServerSession, request::HTTP.Request host = HTTP.header(request, "Host") target = request.target url = Text(string(HTTP.URI(HTTP.URI("http://$host/"); query=Dict("secret" => session.secret)))) diff --git a/src/webserver/Dynamic.jl b/src/webserver/Dynamic.jl index a0ef4bbcd1..fbaf9ba8b5 100644 --- a/src/webserver/Dynamic.jl +++ b/src/webserver/Dynamic.jl @@ -414,7 +414,7 @@ end responses[:reset_shared_state] = function response_reset_shared_state(🙋::ClientRequest) delete!(current_state_for_clients, 🙋.initiator.client) - send_notebook_changes!(🙋; commentary=Dict(:from_reset => true)) + send_notebook_changes!(🙋; commentary=Dict(:from_reset => true)) end responses[:run_multiple_cells] = function response_run_multiple_cells(🙋::ClientRequest) diff --git a/src/webserver/SessionActions.jl b/src/webserver/SessionActions.jl index 75f35d1702..b3d01b89bc 100644 --- a/src/webserver/SessionActions.jl +++ b/src/webserver/SessionActions.jl @@ -1,6 +1,6 @@ module SessionActions -import ..Pluto: Pluto, Status, ServerSession, Notebook, Cell, emptynotebook, tamepath, new_notebooks_directory, without_pluto_file_extension, numbered_until_new, cutename, readwrite, update_save_run!, update_nbpkg_cache!, update_from_file, wait_until_file_unchanged, putnotebookupdates!, putplutoupdates!, load_notebook, clientupdate_notebook_list, WorkspaceManager, try_event_call, NewNotebookEvent, OpenNotebookEvent, ShutdownNotebookEvent, @asynclog, ProcessStatus, maybe_convert_path_to_wsl, move_notebook!, throttled +import ..Pluto: Pluto, Status, ServerSession, Notebook, Cell, emptynotebook, tamepath, new_notebooks_directory, without_pluto_file_extension, numbered_until_new, cutename, readwrite, update_save_run!, update_nbpkg_cache!, update_from_file, wait_until_file_unchanged, putnotebookupdates!, putplutoupdates!, load_notebook, clientupdate_notebook_list, WorkspaceManager, try_event_call, NewNotebookEvent, OpenNotebookEvent, ShutdownNotebookEvent, @asynclog, ProcessStatus, maybe_convert_path_to_wsl, move_notebook!, Throttled using FileWatching import ..Pluto.DownloadCool: download_cool import HTTP @@ -186,10 +186,9 @@ function add(session::ServerSession, notebook::Notebook; run_async::Bool=true) end end - notebook.status_tree.update_listener_ref[] = first(throttled(1.0 / 5; runtime_multiplier=2.0) do - # TODO: this throttle should be trailing + notebook.status_tree.update_listener_ref[] = Throttled.throttled(1.0 / 8; runtime_multiplier=4.0) do Pluto.send_notebook_changes!(Pluto.ClientRequest(; session, notebook)) - end) + end return notebook end diff --git a/src/webserver/Status.jl b/src/webserver/Status.jl index 4c37c15de8..72bfc023c5 100644 --- a/src/webserver/Status.jl +++ b/src/webserver/Status.jl @@ -8,7 +8,7 @@ Base.@kwdef mutable struct Business started_at::Union{Nothing,Float64}=nothing finished_at::Union{Nothing,Float64}=nothing subtasks::Dict{Symbol,Business}=Dict{Symbol,Business}() - update_listener_ref::Ref{Function}=Ref{Function}(_default_update_listener) + update_listener_ref::Ref{Any}=Ref{Any}(_default_update_listener) lock::Threads.SpinLock=Threads.SpinLock() end diff --git a/test/Throttled.jl b/test/Throttled.jl index 17d0e604b6..b04c9e2732 100644 --- a/test/Throttled.jl +++ b/test/Throttled.jl @@ -1,4 +1,4 @@ -import Pluto:throttled +import Pluto: Throttled using Pluto.WorkspaceManager: poll @testset "Throttled" begin @@ -12,7 +12,7 @@ using Pluto.WorkspaceManager: poll @test x[] == 1 dt = 4 / 100 - ft, flush = throttled(f, dt) + ft = Throttled.throttled(f, dt) for x in 1:10 ft() @@ -82,7 +82,7 @@ using Pluto.WorkspaceManager: poll ft() ft() @test x[] == 12 - flush() + flush(ft) @test x[] == 13 sleep(2dt) @test x[] == 13 @@ -108,6 +108,31 @@ using Pluto.WorkspaceManager: poll sleep(2dt) #### + x[] = 0 + Throttled.force_throttle_without_run(ft) + @test x[] == 0 + ft() + @test x[] == 0 + sleep(.1dt) + @test x[] == 0 + sleep(2dt) + @test x[] == 1 + ft() + @test x[] == 2 + sleep(.1dt) + ft() + Throttled.force_throttle_without_run(ft) + @test x[] == 2 + sleep(2dt) + @test x[] == 2 + + + ft() + @test x[] == 3 + sleep(2dt) + + #### + end \ No newline at end of file