Skip to content

Commit

Permalink
Create heartbeat and context in init.jl
Browse files Browse the repository at this point in the history
  • Loading branch information
halleysfifthinc committed Dec 18, 2024
1 parent f5b8fbe commit 3285719
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 11 deletions.
2 changes: 1 addition & 1 deletion src/handlers.jl
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ end

function shutdown_request(socket, msg)
# stop heartbeat thread by closing the context
close(zmq_proxy_context[])
close(heartbeat_context[])

send_ipython(requests[], msg_reply(msg, "shutdown_reply",
msg.content))
Expand Down
14 changes: 5 additions & 9 deletions src/heartbeat.jl
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,9 @@
import Libdl

const threadid = zeros(Int, 128) # sizeof(uv_thread_t) <= 8 on Linux, OSX, Win
const zmq_proxy_context = Ref{Context}()

# entry point for new thread
function heartbeat_thread(heartbeat_addr::Cstring)
zmq_proxy_context[] = Context()
heartbeat = Socket(zmq_proxy_context[], ROUTER)
GC.@preserve heartbeat_addr bind(heartbeat, unsafe_string(heartbeat_addr))
function heartbeat_thread(heartbeat::Ptr{Cvoid})
@static if VERSION v"1.9.0-DEV.1588" # julia#46609
# julia automatically "adopts" this thread because
# we entered a Julia cfunction. We then have to enable
Expand All @@ -30,8 +26,8 @@ function heartbeat_thread(heartbeat_addr::Cstring)
return ret
end

function start_heartbeat(heartbeat_addr)
heartbeat_c = @cfunction(heartbeat_thread, Cint, (Cstring,))
ccall(:uv_thread_create, Cint, (Ptr{Int}, Ptr{Cvoid}, Cstring),
threadid, heartbeat_c, heartbeat_addr)
function start_heartbeat(heartbeat)
heartbeat_c = @cfunction(heartbeat_thread, Cint, (Ptr{Cvoid},))
ccall(:uv_thread_create, Cint, (Ptr{Int}, Ptr{Cvoid}, Ptr{Cvoid}),
threadid, heartbeat_c, heartbeat)
end
7 changes: 6 additions & 1 deletion src/init.jl
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ const publish = Ref{Socket}()
const raw_input = Ref{Socket}()
const requests = Ref{Socket}()
const control = Ref{Socket}()
const heartbeat = Ref{Socket}()
const heartbeat_context = Ref{Context}()
const profile = Dict{String,Any}()
const read_stdout = Ref{Base.PipeEndpoint}()
const read_stderr = Ref{Base.PipeEndpoint}()
Expand Down Expand Up @@ -86,19 +88,22 @@ function init(args)
raw_input[] = Socket(ROUTER)
requests[] = Socket(ROUTER)
control[] = Socket(ROUTER)
heartbeat_context[] = Context()
heartbeat = Socket(heartbeat_context[], ROUTER)
sep = profile["transport"]=="ipc" ? "-" : ":"
bind(publish[], "$(profile["transport"])://$(profile["ip"])$(sep)$(profile["iopub_port"])")
bind(requests[], "$(profile["transport"])://$(profile["ip"])$(sep)$(profile["shell_port"])")
bind(control[], "$(profile["transport"])://$(profile["ip"])$(sep)$(profile["control_port"])")
bind(raw_input[], "$(profile["transport"])://$(profile["ip"])$(sep)$(profile["stdin_port"])")
start_heartbeat("$(profile["transport"])://$(profile["ip"])$(sep)$(profile["hb_port"])")
bind(heartbeat[], "$(profile["transport"])://$(profile["ip"])$(sep)$(profile["hb_port"])")

# associate a lock with each socket so that multi-part messages
# on a given socket don't get inter-mingled between tasks.
for s in (publish[], raw_input[], requests[], control[])
socket_locks[s] = ReentrantLock()
end

start_heartbeat(heartbeat[])
if capture_stdout
read_stdout[], = redirect_stdout()
redirect_stdout(IJuliaStdio(stdout,"stdout"))
Expand Down

0 comments on commit 3285719

Please sign in to comment.