Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

socket_manager: add feature to share sockets with another server #150

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -413,8 +413,15 @@ se = ServerEngine.create(MyServer, MyWorker, {
se.run
```

See also [examples](https://github.com/fluent/serverengine/tree/master/examples).
Other features:

- `socket_manager_server = SocketManager::Server.share_sockets_with_another_server(path)`
- It starts a new manager server that shares all UDP/TCP sockets with the existing manager.
- We can use this for live restart for network servers.
- The old process should stop without removing the file for the socket after the new process starts.
- Limitation: This feature would not work well if the process opens new TCP ports frequently.

See also [examples](https://github.com/fluent/serverengine/tree/master/examples).

## Module API

Expand Down
21 changes: 17 additions & 4 deletions lib/serverengine/socket_manager.rb
Original file line number Diff line number Diff line change
Expand Up @@ -96,19 +96,32 @@ def self.open(path = nil)
end
end

def initialize(path)
def self.share_sockets_with_another_server(path)
raise NotImplementedError, "Not supported on Windows." if ServerEngine.windows?
server = new(path, start: false)
server.share_sockets_with_another_server
server
end

def initialize(path, start: true)
@tcp_sockets = {}
@udp_sockets = {}
@mutex = Mutex.new
@path = start_server(path)
@path = start ? start_server(path) : path
end

attr_reader :path
attr_reader :tcp_sockets, :udp_sockets # for tests

def new_client
Client.new(@path)
end

def start
start_server(path)
nil
end

def close
stop_server
nil
Expand Down Expand Up @@ -159,9 +172,9 @@ def process_peer(peer)
res = SocketManager.recv_peer(peer)
return if res.nil?

pid, method, bind, port = *res
pid, method, *opts = res
begin
send_socket(peer, pid, method, bind, port)
send_socket(peer, pid, method, *opts)
rescue => e
SocketManager.send_peer(peer, e)
end
Expand Down
91 changes: 70 additions & 21 deletions lib/serverengine/socket_manager_unix.rb
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,38 @@ def recv_udp(family, peer, sent)
end

module ServerModule
def share_sockets_with_another_server
another_server = UNIXSocket.new(@path)
begin
idx = 0
while true
SocketManager.send_peer(another_server, [Process.pid, :share_udp, idx])
key = SocketManager.recv_peer(another_server)
break if key.nil?
@udp_sockets[key] = another_server.recv_io UDPSocket
idx += 1
end

idx = 0
while true
SocketManager.send_peer(another_server, [Process.pid, :share_tcp, idx])
key = SocketManager.recv_peer(another_server)
break if key.nil?
@tcp_sockets[key] = another_server.recv_io TCPServer
idx += 1
end

SocketManager.send_peer(another_server, [Process.pid, :share_unix])
res = SocketManager.recv_peer(another_server)
raise res if res.is_a?(Exception)
@server = another_server.recv_io UNIXServer

start_server(@path)
ensure
another_server.close
end
end

private

def listen_tcp_new(bind_ip, port)
Expand Down Expand Up @@ -77,15 +109,17 @@ def listen_udp_new(bind_ip, port)
end

def start_server(path)
# return absolute path so that client can connect to this path
# when client changed working directory
path = File.expand_path(path)
unless @server
# return absolute path so that client can connect to this path
# when client changed working directory
path = File.expand_path(path)

begin
old_umask = File.umask(0077) # Protect unix socket from other users
@server = UNIXServer.new(path)
ensure
File.umask(old_umask)
begin
old_umask = File.umask(0077) # Protect unix socket from other users
@server = UNIXServer.new(path)
ensure
File.umask(old_umask)
end
end

@thread = Thread.new do
Expand All @@ -111,19 +145,34 @@ def stop_server
@thread.join if RUBY_VERSION >= "2.2"
end

def send_socket(peer, pid, method, bind, port)
sock = case method
when :listen_tcp
listen_tcp(bind, port)
when :listen_udp
listen_udp(bind, port)
else
raise ArgumentError, "Unknown method: #{method.inspect}"
end

SocketManager.send_peer(peer, nil)

peer.send_io sock
def send_socket(peer, pid, method, *opts)
case method
when :listen_tcp
bind, port = opts
sock = listen_tcp(bind, port)
SocketManager.send_peer(peer, nil)
peer.send_io sock
when :listen_udp
bind, port = opts
sock = listen_udp(bind, port)
SocketManager.send_peer(peer, nil)
peer.send_io sock
when :share_tcp
idx, = opts
key = @tcp_sockets.keys[idx]
SocketManager.send_peer(peer, key)
peer.send_io(@tcp_sockets.values[idx]) if key
when :share_udp
idx, = opts
key = @udp_sockets.keys[idx]
SocketManager.send_peer(peer, key)
peer.send_io(@udp_sockets.values[idx]) if key
when :share_unix
SocketManager.send_peer(peer, nil)
peer.send_io @server
else
raise ArgumentError, "Unknown method: #{method.inspect}"
end
end
end

Expand Down
168 changes: 168 additions & 0 deletions spec/socket_manager_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,15 @@
expect(server.path).to be_between(49152, 65535)
end
end

context 'Server.share_sockets_with_another_server' do
it 'not supported' do
server = SocketManager::Server.open(server_path)
expect { SocketManager::Server.share_sockets_with_another_server(server_path) }.to raise_error(NotImplementedError)
ensure
server.close
end
end
else
context 'Server.generate_path' do
it 'returns socket path under /tmp' do
Expand All @@ -76,6 +85,165 @@
expect(server.path).to include('/tmp/SERVERENGINE_SOCKETMANAGER_')
end
end

context 'Server.share_sockets_with_another_server' do
it 'shares listen sockets to another server' do
server = SocketManager::Server.open(server_path)

client = SocketManager::Client.new(server_path)
tcp1 = client.listen_tcp('127.0.0.1', 55551)
udp1 = client.listen_udp('127.0.0.1', 55561)
udp2 = client.listen_udp('127.0.0.1', 55562)

another_server = SocketManager::Server.share_sockets_with_another_server(server_path)

expect([
another_server.tcp_sockets.keys,
another_server.tcp_sockets.values.map(&:addr),
another_server.udp_sockets.keys,
another_server.udp_sockets.values.map(&:addr),
]).to eq([
server.tcp_sockets.keys,
server.tcp_sockets.values.map(&:addr),
server.udp_sockets.keys,
server.udp_sockets.values.map(&:addr),
])
ensure
tcp1&.close
udp1&.close
udp2&.close
server&.close
another_server&.close
end

it 'takes over TCP sockets without downtime' do
manager_server = SocketManager::Server.open(server_path)
manager_client = SocketManager::Client.new(server_path)

has_server_started = false
# The old server starts listening
thread_server = Thread.new do
server = manager_client.listen_tcp('127.0.0.1', test_port)
has_server_started = true
while socket = server.accept
incr_test_state(:count)
socket.close
end
ensure
server&.close
end

sleep 0.1 until has_server_started

# The client starts sending data
thread_client = Thread.new do
100.times do |i|
socket = TCPSocket.new('127.0.0.1', test_port)
begin
socket.write("Hello #{i}\n")
ensure
socket.close
end
sleep 0.01
end
end

sleep 0.5

# The new server shares the sockets and starts listening in parallel with the old one
thread_new_server = Thread.new do
new_manager_server = SocketManager::Server.share_sockets_with_another_server(server_path)
server = manager_client.listen_tcp('127.0.0.1', test_port)
while socket = server.accept
incr_test_state(:count)
socket.close
end
ensure
new_manager_server&.close
server&.close
end

# Stop the old server
sleep 0.1
thread_server.kill
thread_server.join

thread_client.join
wait_for_stop

# Confirm that server switching was completed without data loss
expect(test_state(:count)).to eq(100)
ensure
manager_server&.close
thread_server&.kill
thread_new_server&.kill
thread_server&.join
thread_new_server&.join
end

it 'takes over UDP sockets without downtime' do
manager_server = SocketManager::Server.open(server_path)
manager_client = SocketManager::Client.new(server_path)

has_server_started = false
# The old server starts listening
thread_server = Thread.new do
server = manager_client.listen_udp('127.0.0.1', test_port)
has_server_started = true
while server.recv(10)
incr_test_state(:count)
end
ensure
server&.close
end

sleep 0.1 until has_server_started

# The client starts sending data
thread_client = Thread.new do
100.times do |i|
socket = UDPSocket.new
begin
socket.send("Hello #{i}\n", 0, "127.0.0.1", test_port)
ensure
socket.close
end
sleep 0.01
end
end

sleep 0.5

# The new server shares the sockets and starts listening in parallel with the old one
thread_new_server = Thread.new do
new_manager_server = SocketManager::Server.share_sockets_with_another_server(server_path)
server = manager_client.listen_udp('127.0.0.1', test_port)
while server.recv(10)
incr_test_state(:count)
end
ensure
new_manager_server&.close
server&.close
end

# Stop the old server
sleep 0.1
thread_server.kill
thread_server.join

thread_client.join
wait_for_stop

# Confirm that server switching was completed without data loss
expect(test_state(:count)).to eq(100)
ensure
manager_server&.close
thread_server&.kill
thread_new_server&.kill
thread_server&.join
thread_new_server&.join
end
end
end

context 'with thread' do
Expand Down
Loading