Skip to content

Commit

Permalink
Add support for asynchronous socket:accept/2 API
Browse files Browse the repository at this point in the history
Signed-off-by: Paul Guyot <[email protected]>
  • Loading branch information
pguyot committed Jan 12, 2025
1 parent 469b424 commit 07a0595
Show file tree
Hide file tree
Showing 4 changed files with 113 additions and 18 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Added `atomvm:subprocess/4` to perform pipe/fork/execve on POSIX platforms
- Added `externalterm_to_term_with_roots` to efficiently preserve roots when allocating memory for external terms.
- Added `erl_epmd` client implementation to epmd using `socket` module
- Added support for socket asynchronous API for `recv` and `recvfrom`.
- Added support for socket asynchronous API for `recv`, `recvfrom` and `accept`.

### Changed

Expand Down
53 changes: 42 additions & 11 deletions libs/estdlib/src/socket.erl
Original file line number Diff line number Diff line change
Expand Up @@ -229,46 +229,77 @@ accept(Socket) ->
%% be set to listen for connections.
%%
%% Note that this function will block until a connection is made
%% from a client. Typically, users will spawn a call to `accept'
%% in a separate process.
%% from a client, unless `nowait' or a reference is passed as `Timeout'.
%% Typically, users will spawn a call to `accept' in a separate process.
%%
%% Example:
%%
%% `{ok, ConnectedSocket} = socket:accept(ListeningSocket)'
%% @end
%%-----------------------------------------------------------------------------
-spec accept(Socket :: socket(), Timeout :: timeout()) ->
{ok, Connection :: socket()} | {error, Reason :: term()}.
-spec accept(Socket :: socket(), Timeout :: timeout() | nowait | reference()) ->
{ok, Connection :: socket()}
| {select, {select_info, accept, reference()}}
| {error, Reason :: term()}.
accept(Socket, 0) ->
accept0_noselect(Socket);
accept(Socket, nowait) ->
accept0_nowait(Socket, erlang:make_ref());
accept(Socket, Ref) when is_reference(Ref) ->
accept0_nowait(Socket, Ref);
accept(Socket, Timeout) ->
accept0(Socket, Timeout).

accept0_noselect(Socket) ->
case ?MODULE:nif_accept(Socket) of
{error, _} = E ->
E;
{ok, _Socket} = Reply ->
Reply
end.

accept0(Socket, Timeout) ->
Ref = erlang:make_ref(),
?TRACE("select read for accept. self=~p ref=~p~n", [self(), Ref]),
case ?MODULE:nif_select_read(Socket, Ref) of
ok ->
receive
{'$socket', Socket, select, Ref} ->
case ?MODULE:nif_accept(Socket) of
{error, closed} = E ->
{error, _} = E ->
?MODULE:nif_select_stop(Socket),
E;
R ->
R
{ok, _Socket} = Reply ->
Reply
end;
{'$socket', Socket, abort, {Ref, closed}} ->
% socket was closed by another process
% TODO: we need to handle:
% (a) SELECT_STOP being scheduled
% (b) flush of messages as we can have both in the
% queue
{error, closed};
Other ->
{error, {accept, unexpected, Other, {'$socket', Socket, select, Ref}}}
{error, closed}
after Timeout ->
{error, timeout}
end;
{error, _Reason} = Error ->
Error
end.

accept0_nowait(Socket, Ref) ->
case ?MODULE:nif_accept(Socket) of
{error, eagain} ->
case ?MODULE:nif_select_read(Socket, Ref) of
ok ->
{select, {select_info, accept, Ref}};
{error, _} = SelectError ->
SelectError
end;
{error, _} = RecvError ->
RecvError;
{ok, _Socket} = Reply ->
Reply
end.

%%-----------------------------------------------------------------------------
%% @equiv socket:recv(Socket, 0)
%% @end
Expand Down
4 changes: 3 additions & 1 deletion src/libAtomVM/otp_socket.c
Original file line number Diff line number Diff line change
Expand Up @@ -1743,8 +1743,10 @@ static term nif_socket_accept(Context *ctx, int argc, term argv[])
int fd = accept(rsrc_obj->fd, (struct sockaddr *) &clientaddr, &clientlen);
SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock);
if (UNLIKELY(fd == -1 || fd == CLOSED_FD)) {
AVM_LOGE(TAG, "Unable to accept on socket %i.", rsrc_obj->fd);
int err = errno;
if (err != EAGAIN) {
AVM_LOGI(TAG, "Unable to accept on socket %i. errno=%i", rsrc_obj->fd, (int) err);
}
term reason = (err == ECONNABORTED) ? CLOSED_ATOM : posix_errno_to_term(err, global);
return make_error_tuple(reason, ctx);
} else {
Expand Down
72 changes: 67 additions & 5 deletions tests/libs/estdlib/test_tcp_socket.erl
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ test() ->
ok = test_close_by_another_process(),
ok = test_buf_size(),
ok = test_timeout(),
ok = test_nowait(),
ok = test_recv_nowait(),
ok = test_accept_nowait(),
ok = test_setopt_getopt(),
case get_otp_version() of
atomvm ->
Expand Down Expand Up @@ -430,12 +431,12 @@ test_timeout() ->
ok = close_client_socket(Socket),
ok = close_listen_socket(ListenSocket).

test_nowait() ->
ok = test_nowait(fun receive_loop_nowait/2),
ok = test_nowait(fun receive_loop_nowait_ref/2),
test_recv_nowait() ->
ok = test_recv_nowait(fun receive_loop_nowait/2),
ok = test_recv_nowait(fun receive_loop_nowait_ref/2),
ok.

test_nowait(ReceiveFun) ->
test_recv_nowait(ReceiveFun) ->
etest:flush_msg_queue(),

Port = 44404,
Expand All @@ -460,6 +461,67 @@ test_nowait(ReceiveFun) ->

ok = close_listen_socket(ListenSocket).

test_accept_nowait() ->
OTPVersion = get_otp_version(),
ok = test_accept_nowait(nowait, OTPVersion),
ok = test_accept_nowait(make_ref(), OTPVersion),
ok.

% actually since 22.1, but let's simplify here.
test_accept_nowait(_NoWaitRef, Version) when Version =/= atomvm andalso Version < 23 -> ok;
test_accept_nowait(Ref, Version) when
is_reference(Ref) andalso Version =/= atomvm andalso Version < 24
->
ok;
test_accept_nowait(NoWaitRef, _Version) ->
etest:flush_msg_queue(),

Port = 44404,
{ok, Socket} = socket:open(inet, stream, tcp),
ok = socket:setopt(Socket, {socket, reuseaddr}, true),
ok = socket:setopt(Socket, {socket, linger}, #{onoff => true, linger => 0}),

ok = socket:bind(Socket, #{
family => inet, addr => loopback, port => Port
}),

ok = socket:listen(Socket),

Parent = self(),
{Child, MonitorRef} = spawn_opt(
fun() ->
{select, {select_info, accept, Ref}} = socket:accept(Socket, NoWaitRef),
Parent ! {self(), got_nowait},
receive
{'$socket', Socket, select, Ref} ->
{ok, ConnSocket} = socket:accept(Socket, 0),
socket:send(ConnSocket, <<"hello">>),
socket:close(ConnSocket)
after 5000 ->
exit(timeout)
end
end,
[link, monitor]
),
ok =
receive
{Child, got_nowait} -> ok
after 5000 -> timeout
end,
{ok, ClientSocket} = socket:open(inet, stream, tcp),
ok = socket:connect(ClientSocket, #{family => inet, addr => loopback, port => Port}),
{ok, <<"hello">>} = socket:recv(ClientSocket, 5),

socket:close(ClientSocket),
ok =
receive
{'DOWN', MonitorRef, process, Child, normal} -> ok
after 5000 ->
timeout
end,
socket:close(Socket),
ok.

test_setopt_getopt() ->
{ok, Socket} = socket:open(inet, stream, tcp),
{ok, stream} = socket:getopt(Socket, {socket, type}),
Expand Down

0 comments on commit 07a0595

Please sign in to comment.