diff --git a/README.md b/README.md index 3edf48b..6031c42 100644 --- a/README.md +++ b/README.md @@ -413,8 +413,14 @@ se = ServerEngine.create(MyServer, MyWorker, { se.run ``` -See also [examples](https://github.com/fluent/serverengine/tree/master/examples). +Other features: + +- `socket_manager_server = SocketManager::Server.take_over_another_server(path)` + - It starts a new manager server that has all UDP/TCP sockets of the existing manager. + - It means that another process can take over UDP/TCP sockets without downtime. + - The old process should stop without removing the file for the socket after the new process starts. +See also [examples](https://github.com/fluent/serverengine/tree/master/examples). ## Module API diff --git a/lib/serverengine/socket_manager.rb b/lib/serverengine/socket_manager.rb index fe0e484..83d5f42 100644 --- a/lib/serverengine/socket_manager.rb +++ b/lib/serverengine/socket_manager.rb @@ -96,14 +96,22 @@ def self.open(path = nil) end end - def initialize(path) + def self.take_over_another_server(path) + raise NotImplementedError, "Not supported on Windows." if ServerEngine.windows? + server = new(path, start: false) + server.take_over_another_server + server + end + + def initialize(path, start: true) @tcp_sockets = {} @udp_sockets = {} @mutex = Mutex.new - @path = start_server(path) + @path = start ? start_server(path) : path end attr_reader :path + attr_reader :tcp_sockets, :udp_sockets # for tests def new_client Client.new(@path) @@ -159,9 +167,9 @@ def process_peer(peer) res = SocketManager.recv_peer(peer) return if res.nil? - pid, method, bind, port = *res + pid, method, *opts = res begin - send_socket(peer, pid, method, bind, port) + send_socket(peer, pid, method, *opts) rescue => e SocketManager.send_peer(peer, e) end diff --git a/lib/serverengine/socket_manager_unix.rb b/lib/serverengine/socket_manager_unix.rb index 625a831..addfb70 100644 --- a/lib/serverengine/socket_manager_unix.rb +++ b/lib/serverengine/socket_manager_unix.rb @@ -47,6 +47,67 @@ def recv_udp(family, peer, sent) end module ServerModule + def start_server(path) + unless @server + # return absolute path so that client can connect to this path + # when client changed working directory + path = File.expand_path(path) + + begin + old_umask = File.umask(0077) # Protect unix socket from other users + @server = UNIXServer.new(path) + ensure + File.umask(old_umask) + end + end + + @thread = Thread.new do + begin + while peer = @server.accept + Thread.new(peer, &method(:process_peer)) # process_peer calls send_socket + end + rescue => e + unless @server.closed? + ServerEngine.dump_uncaught_error(e) + end + end + end + + return path + end + + def take_over_another_server + another_server = UNIXSocket.new(@path) + begin + idx = 0 + while true + SocketManager.send_peer(another_server, [Process.pid, :get_listening_tcp, idx]) + key = SocketManager.recv_peer(another_server) + break if key.nil? + @tcp_sockets[key] = another_server.recv_io TCPServer + idx += 1 + end + + idx = 0 + while true + SocketManager.send_peer(another_server, [Process.pid, :get_listening_udp, idx]) + key = SocketManager.recv_peer(another_server) + break if key.nil? + @udp_sockets[key] = another_server.recv_io UDPSocket + idx += 1 + end + + SocketManager.send_peer(another_server, [Process.pid, :get_unix]) + res = SocketManager.recv_peer(another_server) + raise res if res.is_a?(Exception) + @server = another_server.recv_io UNIXServer + + start_server(@path) + ensure + another_server.close + end + end + private def listen_tcp_new(bind_ip, port) @@ -76,33 +137,6 @@ def listen_udp_new(bind_ip, port) UDPSocket.for_fd(usock.fileno) end - def start_server(path) - # return absolute path so that client can connect to this path - # when client changed working directory - path = File.expand_path(path) - - begin - old_umask = File.umask(0077) # Protect unix socket from other users - @server = UNIXServer.new(path) - ensure - File.umask(old_umask) - end - - @thread = Thread.new do - begin - while peer = @server.accept - Thread.new(peer, &method(:process_peer)) # process_peer calls send_socket - end - rescue => e - unless @server.closed? - ServerEngine.dump_uncaught_error(e) - end - end - end - - return path - end - def stop_server @tcp_sockets.reject! {|key,lsock| lsock.close; true } @udp_sockets.reject! {|key,usock| usock.close; true } @@ -111,19 +145,34 @@ def stop_server @thread.join if RUBY_VERSION >= "2.2" end - def send_socket(peer, pid, method, bind, port) - sock = case method - when :listen_tcp - listen_tcp(bind, port) - when :listen_udp - listen_udp(bind, port) - else - raise ArgumentError, "Unknown method: #{method.inspect}" - end - - SocketManager.send_peer(peer, nil) - - peer.send_io sock + def send_socket(peer, pid, method, *opts) + case method + when :listen_tcp + bind, port = opts + sock = listen_tcp(bind, port) + SocketManager.send_peer(peer, nil) + peer.send_io sock + when :listen_udp + bind, port = opts + sock = listen_udp(bind, port) + SocketManager.send_peer(peer, nil) + peer.send_io sock + when :get_listening_tcp + idx, = opts + key = @tcp_sockets.keys[idx] + SocketManager.send_peer(peer, key) + peer.send_io(@tcp_sockets.values[idx]) if key + when :get_listening_udp + idx, = opts + key = @udp_sockets.keys[idx] + SocketManager.send_peer(peer, key) + peer.send_io(@udp_sockets.values[idx]) if key + when :get_unix + SocketManager.send_peer(peer, nil) + peer.send_io @server + else + raise ArgumentError, "Unknown method: #{method.inspect}" + end end end diff --git a/lib/serverengine/socket_manager_win.rb b/lib/serverengine/socket_manager_win.rb index f7a7e26..42acaa6 100644 --- a/lib/serverengine/socket_manager_win.rb +++ b/lib/serverengine/socket_manager_win.rb @@ -58,6 +58,26 @@ def recv_udp(family, peer, sent) end module ServerModule + def start_server(addr) + # We need to take care about selecting an available port. + # By passing `nil` or `0` as `addr`, an available port is automatically selected. + # However, we should consider using NamedPipe instead of TCPServer. + @server = TCPServer.new("127.0.0.1", addr) + @thread = Thread.new do + begin + while peer = @server.accept + Thread.new(peer, &method(:process_peer)) # process_peer calls send_socket + end + rescue => e + unless @server.closed? + ServerEngine.dump_uncaught_error(e) + end + end + end + + return @server.addr[1] + end + private TCP_OPTIONS = [Socket::SOCK_STREAM, Socket::IPPROTO_TCP, TCPServer, true] @@ -107,26 +127,6 @@ def htons(h) [h].pack("S").unpack("n")[0] end - def start_server(addr) - # We need to take care about selecting an available port. - # By passing `nil` or `0` as `addr`, an available port is automatically selected. - # However, we should consider using NamedPipe instead of TCPServer. - @server = TCPServer.new("127.0.0.1", addr) - @thread = Thread.new do - begin - while peer = @server.accept - Thread.new(peer, &method(:process_peer)) # process_peer calls send_socket - end - rescue => e - unless @server.closed? - ServerEngine.dump_uncaught_error(e) - end - end - end - - return @server.addr[1] - end - def stop_server @tcp_sockets.reject! {|key,lsock| lsock.close; true } @udp_sockets.reject! {|key,usock| usock.close; true } diff --git a/spec/socket_manager_spec.rb b/spec/socket_manager_spec.rb index c74e877..4ff554c 100644 --- a/spec/socket_manager_spec.rb +++ b/spec/socket_manager_spec.rb @@ -55,6 +55,15 @@ expect(server.path).to be_between(49152, 65535) end end + + context 'Server.take_over_another_server' do + it 'not supported' do + server = SocketManager::Server.open(server_path) + expect { SocketManager::Server.take_over_another_server(server_path) }.to raise_error(NotImplementedError) + ensure + server.close + end + end else context 'Server.generate_path' do it 'returns socket path under /tmp' do @@ -76,6 +85,146 @@ expect(server.path).to include('/tmp/SERVERENGINE_SOCKETMANAGER_') end end + + context 'Server.take_over_another_server' do + it 'takes over listen sockets to another server' do + server = SocketManager::Server.open(server_path) + + client = SocketManager::Client.new(server_path) + tcp1 = client.listen_tcp('127.0.0.1', 55551) + udp1 = client.listen_udp('127.0.0.1', 55561) + udp2 = client.listen_udp('127.0.0.1', 55562) + + another_server = SocketManager::Server.take_over_another_server(server_path) + + expect(another_server.tcp_sockets.size).to eq(1) + expect(another_server.tcp_sockets['localhost:55551'].addr).to eq(['AF_INET', 55551, '127.0.0.1', '127.0.0.1']) + + expect(another_server.udp_sockets.size).to eq(2) + expect(another_server.udp_sockets['localhost:55561'].addr).to eq(['AF_INET', 55561, '127.0.0.1', '127.0.0.1']) + expect(another_server.udp_sockets['localhost:55562'].addr).to eq(['AF_INET', 55562, '127.0.0.1', '127.0.0.1']) + ensure + tcp1&.close + udp1&.close + udp2&.close + server&.close + another_server&.close + end + + it 'takes over TCP sockets without downtime' do + manager_server = SocketManager::Server.open(server_path) + manager_client = SocketManager::Client.new(server_path) + + has_server_started = false + thread_server = Thread.new do + server = manager_client.listen_tcp('127.0.0.1', test_port) + has_server_started = true + while socket = server.accept + incr_test_state(:count) + socket.close + end + ensure + server&.close + end + + sleep 0.1 until has_server_started + + thread_client = Thread.new do + 100.times do |i| + socket = TCPSocket.new('127.0.0.1', test_port) + begin + socket.write("Hello #{i}\n") + ensure + socket.close + end + sleep 0.01 + end + end + + sleep 0.5 + + child_pid = fork do + new_manager_server = SocketManager::Server.take_over_another_server(server_path) + server = manager_client.listen_tcp('127.0.0.1', test_port) + while socket = server.accept + incr_test_state(:count) + socket.close + end + ensure + new_manager_server&.close + server&.close + end + + thread_client.join + wait_for_stop + + expect(test_state(:count)).to eq(100) + ensure + if child_pid + Process.kill :TERM, child_pid + Process.waitpid(child_pid) + end + manager_server&.close + thread_server&.kill + thread_server&.join + end + + it 'takes over UDP sockets without downtime' do + manager_server = SocketManager::Server.open(server_path) + manager_client = SocketManager::Client.new(server_path) + + has_server_started = false + thread_server = Thread.new do + server = manager_client.listen_udp('127.0.0.1', test_port) + has_server_started = true + while server.recv(10) + incr_test_state(:count) + end + ensure + server&.close + end + + sleep 0.1 until has_server_started + + thread_client = Thread.new do + 100.times do |i| + socket = UDPSocket.new + begin + socket.send("Hello #{i}\n", 0, "127.0.0.1", test_port) + ensure + socket.close + end + sleep 0.01 + end + end + + sleep 0.5 + + child_pid = fork do + new_manager_server = SocketManager::Server.take_over_another_server(server_path) + server = manager_client.listen_udp('127.0.0.1', test_port) + while server.recv(10) + incr_test_state(:count) + end + ensure + new_manager_server&.close + server&.close + end + + thread_client.join + wait_for_stop + + expect(test_state(:count)).to eq(100) + ensure + if child_pid + Process.kill :TERM, child_pid + Process.waitpid(child_pid) + end + manager_server&.close + thread_server&.kill + thread_server&.join + end + end end context 'with thread' do