Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
essenciary committed Feb 7, 2024
2 parents d4db5e7 + 786326e commit 5c395dc
Showing 1 changed file with 51 additions and 7 deletions.
58 changes: 51 additions & 7 deletions src/WebChannels.jl
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ import Genie, Genie.Renderer

const ClientId = UInt # web socket hash
const ChannelName = String
const MESSAGE_QUEUE = Dict{UInt, Tuple{
Channel{Tuple{String, Channel{Nothing}}},
Task}
}()

struct ChannelNotFoundException <: Exception
name::ChannelName
Expand Down Expand Up @@ -105,10 +109,14 @@ end
Unsubscribes a web socket client `ws` from `channel`.
"""
function unsubscribe(ws::HTTP.WebSockets.WebSocket, channel::ChannelName) :: ChannelClientsCollection
haskey(CLIENTS, id(ws)) && deleteat!(CLIENTS[id(ws)].channels, CLIENTS[id(ws)].channels .== channel)
pop_subscription(id(ws), channel)
client = id(ws)

haskey(CLIENTS, client) && deleteat!(CLIENTS[client].channels, CLIENTS[client].channels .== channel)
pop_subscription(client, channel)
delete_queue!(MESSAGE_QUEUE, client)

@debug "Unsubscribed: $(id(ws)) ($(Dates.now()))"

@debug "Unsubscribed: $(client) ($(Dates.now()))"
CLIENTS
end
function unsubscribe(channel_client::ChannelClient, channel::ChannelName) :: ChannelClientsCollection
Expand Down Expand Up @@ -285,16 +293,52 @@ end
"""
Writes `msg` to web socket for `client`.
"""
function message(client::ClientId, msg::String)
ws = Genie.WebChannels.CLIENTS[client].client
# setup a reply channel
myfuture = Channel{Nothing}(1)

# retrieve the message queue or set it up if not present
q, _ = get!(MESSAGE_QUEUE, client) do
queue = Channel{Tuple{String, Channel{Nothing}}}(10)
handler = @async while true
message, future = take!(queue)
try
Sockets.send(ws, message)
finally
put!(future, nothing)
end
end
queue, handler
end

put!(q, (msg, myfuture))

take!(myfuture) # Wait until the message is processed
end
function message(client::ChannelClient, msg::String) :: Int
message(client.client, msg)
end
function message(ws::HTTP.WebSockets.WebSocket, msg::String) :: Int
message(id(ws), msg)
end

function message_unsafe(ws::HTTP.WebSockets.WebSocket, msg::String) :: Int
Sockets.send(ws, msg)
end
function message(client::ClientId, msg::String) :: Int
message(CLIENTS[client].client, msg)
function message_unsafe(client::ClientId, msg::String) :: Int
message_unsafe(CLIENTS[client].client, msg)
end
function message(client::ChannelClient, msg::String) :: Int
message(client.client, msg)
function message_unsafe(client::ChannelClient, msg::String) :: Int
message_unsafe(client.client, msg)
end

function delete_queue!(d::Dict, client::UInt)
queue, handler = pop!(MESSAGE_QUEUE, client, (nothing, nothing))
if queue !== nothing
@async Base.throwto(handler, InterruptException())
end
end

"""
Encodes `msg` in Base64 and tags it with `Genie.config.webchannels_base64_marker`.
Expand Down

0 comments on commit 5c395dc

Please sign in to comment.