Skip to content

Commit

Permalink
socket_manager: add feature to take over another server
Browse files Browse the repository at this point in the history
Another process can take over UDP/TCP sockets without downtime.

    server = ServerEngine::SocketManager::Server.take_over_another_server(path)

This starts a new server that has all UDP/TCP sockets of the
existing server.
It receives the sockets from the existing server and stops it
after starts a new server.

This may not be the primary use case assumed by ServerEngine, but
we need this feature to replace both the server and the workers
with a new process without downtime.
Currently, ServerEngine does not provide this feature for
network servers.

At the moment, I assume that the application side uses this
feature ad hoc, but, in the future, this could be used to support
live reload for entire network servers.

ref: fluent/fluentd#4622

Signed-off-by: Daijiro Fukuda <[email protected]>
  • Loading branch information
daipom authored and ashie committed Sep 3, 2024
1 parent 5e9d11e commit 4a5b1a4
Show file tree
Hide file tree
Showing 4 changed files with 124 additions and 65 deletions.
8 changes: 7 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -413,8 +413,14 @@ se = ServerEngine.create(MyServer, MyWorker, {
se.run
```

See also [examples](https://github.com/fluent/serverengine/tree/master/examples).
Other features:

- `socket_manager_server = SocketManager::Server.take_over_another_server(path)`
- It starts a new manager server that has all UDP/TCP sockets of the existing manager.
- It receives the sockets and stops the existing manager after starts a new manager.
- It means that another process can take over UDP/TCP sockets without downtime.

See also [examples](https://github.com/fluent/serverengine/tree/master/examples).

## Module API

Expand Down
15 changes: 11 additions & 4 deletions lib/serverengine/socket_manager.rb
Original file line number Diff line number Diff line change
Expand Up @@ -96,11 +96,18 @@ def self.open(path = nil)
end
end

def initialize(path)
def self.take_over_another_server(path)
raise NotImplementedError, "Not supported on Windows." if ServerEngine.windows?
server = new(path, start: false)
server.take_over_another_server
server
end

def initialize(path, start: true)
@tcp_sockets = {}
@udp_sockets = {}
@mutex = Mutex.new
@path = start_server(path)
@path = start ? start_server(path) : path
end

attr_reader :path
Expand Down Expand Up @@ -159,9 +166,9 @@ def process_peer(peer)
res = SocketManager.recv_peer(peer)
return if res.nil?

pid, method, bind, port = *res
pid, method, *opts = res
begin
send_socket(peer, pid, method, bind, port)
send_socket(peer, pid, method, *opts)
rescue => e
SocketManager.send_peer(peer, e)
end
Expand Down
126 changes: 86 additions & 40 deletions lib/serverengine/socket_manager_unix.rb
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,63 @@ def recv_udp(family, peer, sent)
end

module ServerModule
def start_server(path)
# return absolute path so that client can connect to this path
# when client changed working directory
path = File.expand_path(path)

begin
old_umask = File.umask(0077) # Protect unix socket from other users
@server = UNIXServer.new(path)
ensure
File.umask(old_umask)
end

@thread = Thread.new do
begin
while peer = @server.accept
Thread.new(peer, &method(:process_peer)) # process_peer calls send_socket
end
rescue => e
unless @server.closed?
ServerEngine.dump_uncaught_error(e)
end
end
end

return path
end

def take_over_another_server
another_server = UNIXSocket.new(@path)
begin
idx = 0
while true
SocketManager.send_peer(another_server, [Process.pid, :get_listening_tcp, idx])
key = SocketManager.recv_peer(another_server)
break if key.nil?
@tcp_sockets[key] = another_server.recv_io TCPServer
idx += 1
end

idx = 0
while true
SocketManager.send_peer(another_server, [Process.pid, :get_listening_udp, idx])
key = SocketManager.recv_peer(another_server)
break if key.nil?
@udp_sockets[key] = another_server.recv_io UDPSocket
idx += 1
end

FileUtils.rm_f(@path)
start_server(@path)

SocketManager.send_peer(another_server, [Process.pid, :stop_with_socket_alive])
ensure
another_server.close
end
end

private

def listen_tcp_new(bind_ip, port)
Expand Down Expand Up @@ -76,33 +133,6 @@ def listen_udp_new(bind_ip, port)
UDPSocket.for_fd(usock.fileno)
end

def start_server(path)
# return absolute path so that client can connect to this path
# when client changed working directory
path = File.expand_path(path)

begin
old_umask = File.umask(0077) # Protect unix socket from other users
@server = UNIXServer.new(path)
ensure
File.umask(old_umask)
end

@thread = Thread.new do
begin
while peer = @server.accept
Thread.new(peer, &method(:process_peer)) # process_peer calls send_socket
end
rescue => e
unless @server.closed?
ServerEngine.dump_uncaught_error(e)
end
end
end

return path
end

def stop_server
@tcp_sockets.reject! {|key,lsock| lsock.close; true }
@udp_sockets.reject! {|key,usock| usock.close; true }
Expand All @@ -111,19 +141,35 @@ def stop_server
@thread.join if RUBY_VERSION >= "2.2"
end

def send_socket(peer, pid, method, bind, port)
sock = case method
when :listen_tcp
listen_tcp(bind, port)
when :listen_udp
listen_udp(bind, port)
else
raise ArgumentError, "Unknown method: #{method.inspect}"
end

SocketManager.send_peer(peer, nil)

peer.send_io sock
def send_socket(peer, pid, method, *opts)
case method
when :listen_tcp
bind, port = opts
sock = listen_tcp(bind, port)
SocketManager.send_peer(peer, nil)
peer.send_io sock
when :listen_udp
bind, port = opts
sock = listen_udp(bind, port)
SocketManager.send_peer(peer, nil)
peer.send_io sock
when :get_listening_tcp
idx, = opts
key = @tcp_sockets.keys[idx]
SocketManager.send_peer(peer, key)
peer.send_io(@tcp_sockets.values[idx]) if key
when :get_listening_udp
idx, = opts
key = @udp_sockets.keys[idx]
SocketManager.send_peer(peer, key)
peer.send_io(@udp_sockets.values[idx]) if key
when :stop_with_socket_alive
@tcp_sockets.clear
@udp_sockets.clear
stop_server
else
raise ArgumentError, "Unknown method: #{method.inspect}"
end
end
end

Expand Down
40 changes: 20 additions & 20 deletions lib/serverengine/socket_manager_win.rb
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,26 @@ def recv_udp(family, peer, sent)
end

module ServerModule
def start_server(addr)
# We need to take care about selecting an available port.
# By passing `nil` or `0` as `addr`, an available port is automatically selected.
# However, we should consider using NamedPipe instead of TCPServer.
@server = TCPServer.new("127.0.0.1", addr)
@thread = Thread.new do
begin
while peer = @server.accept
Thread.new(peer, &method(:process_peer)) # process_peer calls send_socket
end
rescue => e
unless @server.closed?
ServerEngine.dump_uncaught_error(e)
end
end
end

return @server.addr[1]
end

private

TCP_OPTIONS = [Socket::SOCK_STREAM, Socket::IPPROTO_TCP, TCPServer, true]
Expand Down Expand Up @@ -107,26 +127,6 @@ def htons(h)
[h].pack("S").unpack("n")[0]
end

def start_server(addr)
# We need to take care about selecting an available port.
# By passing `nil` or `0` as `addr`, an available port is automatically selected.
# However, we should consider using NamedPipe instead of TCPServer.
@server = TCPServer.new("127.0.0.1", addr)
@thread = Thread.new do
begin
while peer = @server.accept
Thread.new(peer, &method(:process_peer)) # process_peer calls send_socket
end
rescue => e
unless @server.closed?
ServerEngine.dump_uncaught_error(e)
end
end
end

return @server.addr[1]
end

def stop_server
@tcp_sockets.reject! {|key,lsock| lsock.close; true }
@udp_sockets.reject! {|key,usock| usock.close; true }
Expand Down

0 comments on commit 4a5b1a4

Please sign in to comment.