From 786326e597c3dd104b70fab1fb272c334dfd66d5 Mon Sep 17 00:00:00 2001 From: hhaensel <31985040+hhaensel@users.noreply.github.com> Date: Wed, 7 Feb 2024 07:29:26 +0100 Subject: [PATCH] implement message queue for WebChannels (#703) * implement message queue for WebChannels * Removed println debug --------- Co-authored-by: Adrian Salceanu --- src/WebChannels.jl | 58 ++++++++++++++++++++++++++++++++++++++++------ 1 file changed, 51 insertions(+), 7 deletions(-) diff --git a/src/WebChannels.jl b/src/WebChannels.jl index 9b620594d..65114ad5f 100755 --- a/src/WebChannels.jl +++ b/src/WebChannels.jl @@ -8,6 +8,10 @@ import Genie, Genie.Renderer const ClientId = UInt # web socket hash const ChannelName = String +const MESSAGE_QUEUE = Dict{UInt, Tuple{ + Channel{Tuple{String, Channel{Nothing}}}, + Task} +}() struct ChannelNotFoundException <: Exception name::ChannelName @@ -105,10 +109,14 @@ end Unsubscribes a web socket client `ws` from `channel`. """ function unsubscribe(ws::HTTP.WebSockets.WebSocket, channel::ChannelName) :: ChannelClientsCollection - haskey(CLIENTS, id(ws)) && deleteat!(CLIENTS[id(ws)].channels, CLIENTS[id(ws)].channels .== channel) - pop_subscription(id(ws), channel) + client = id(ws) + + haskey(CLIENTS, client) && deleteat!(CLIENTS[client].channels, CLIENTS[client].channels .== channel) + pop_subscription(client, channel) + delete_queue!(MESSAGE_QUEUE, client) - @debug "Unsubscribed: $(id(ws)) ($(Dates.now()))" + + @debug "Unsubscribed: $(client) ($(Dates.now()))" CLIENTS end function unsubscribe(channel_client::ChannelClient, channel::ChannelName) :: ChannelClientsCollection @@ -285,16 +293,52 @@ end """ Writes `msg` to web socket for `client`. """ +function message(client::ClientId, msg::String) + ws = Genie.WebChannels.CLIENTS[client].client + # setup a reply channel + myfuture = Channel{Nothing}(1) + + # retrieve the message queue or set it up if not present + q, _ = get!(MESSAGE_QUEUE, client) do + queue = Channel{Tuple{String, Channel{Nothing}}}(10) + handler = @async while true + message, future = take!(queue) + try + Sockets.send(ws, message) + finally + put!(future, nothing) + end + end + queue, handler + end + + put!(q, (msg, myfuture)) + + take!(myfuture) # Wait until the message is processed +end +function message(client::ChannelClient, msg::String) :: Int + message(client.client, msg) +end function message(ws::HTTP.WebSockets.WebSocket, msg::String) :: Int + message(id(ws), msg) +end + +function message_unsafe(ws::HTTP.WebSockets.WebSocket, msg::String) :: Int Sockets.send(ws, msg) end -function message(client::ClientId, msg::String) :: Int - message(CLIENTS[client].client, msg) +function message_unsafe(client::ClientId, msg::String) :: Int + message_unsafe(CLIENTS[client].client, msg) end -function message(client::ChannelClient, msg::String) :: Int - message(client.client, msg) +function message_unsafe(client::ChannelClient, msg::String) :: Int + message_unsafe(client.client, msg) end +function delete_queue!(d::Dict, client::UInt) + queue, handler = pop!(MESSAGE_QUEUE, client, (nothing, nothing)) + if queue !== nothing + @async Base.throwto(handler, InterruptException()) + end +end """ Encodes `msg` in Base64 and tags it with `Genie.config.webchannels_base64_marker`.