Skip to content

Commit

Permalink
Distribution: add support for rpc from other nodes
Browse files Browse the repository at this point in the history
Signed-off-by: Paul Guyot <[email protected]>
  • Loading branch information
pguyot committed Jan 12, 2025
1 parent eb06dc0 commit 115800c
Show file tree
Hide file tree
Showing 9 changed files with 264 additions and 30 deletions.
1 change: 1 addition & 0 deletions libs/estdlib/src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ set(ERLANG_MODULES
crypto
dist_util
erl_epmd
erpc
erts_debug
ets
gen_event
Expand Down
59 changes: 59 additions & 0 deletions libs/estdlib/src/erpc.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
%
% This file is part of AtomVM.
%
% Copyright 2025 Paul Guyot <[email protected]>
%
% Licensed under the Apache License, Version 2.0 (the "License");
% you may not use this file except in compliance with the License.
% You may obtain a copy of the License at
%
% http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS,
% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
% See the License for the specific language governing permissions and
% limitations under the License.
%
% SPDX-License-Identifier: Apache-2.0 OR LGPL-2.1-or-later
%

%%-----------------------------------------------------------------------------
%% @doc An implementation of the Erlang/OTP erpc interface.
%%
%% This module implements a strict subset of the Erlang/OTP erpc
%% interface.
%% @end
%%-----------------------------------------------------------------------------
-module(erpc).

% api
-export([
execute_call/4
]).

%%-----------------------------------------------------------------------------
%% @param Reference reference of the request, passed in exit tuple
%% @param Module module to call
%% @param Func function to call
%% @param Args argument of the call
%% @doc Execute a call locally, exiting with the result.
%% This function is called from rpc on other nodes using spawn_request BIF.
%% @end
%%-----------------------------------------------------------------------------
-spec execute_call(Reference :: reference(), Module :: module(), Func :: atom(), Args :: [any()]) ->
no_return().
execute_call(Reference, Module, Func, Args) ->
Reply =
try
Result = apply(Module, Func, Args),
{Reference, return, Result}
catch
throw:Reason ->
{Reference, throw, Reason};
exit:Reason ->
{Reference, exit, Reason};
error:Reason:Stack ->
{Reference, error, Reason, Stack}
end,
exit(Reply).
1 change: 1 addition & 0 deletions src/libAtomVM/defaultatoms.def
Original file line number Diff line number Diff line change
Expand Up @@ -178,3 +178,4 @@ X(INET_ATOM, "\x4", "inet")
X(TIMEOUT_ATOM, "\x7", "timeout")

X(DIST_DATA_ATOM, "\x9", "dist_data")
X(REQUEST_ATOM, "\x7", "request")
128 changes: 102 additions & 26 deletions src/libAtomVM/dist_nifs.c
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,12 @@ enum
OPERATION_ALIAS_SEND_TT = 34,
};

enum
{
SPAWN_REPLY_FLAGS_LINK_CREATED = 1,
SPAWN_REPLY_FLAGS_MONITOR_CREATED = 2,
};

struct DistributionPacket
{
struct ListHead head;
Expand Down Expand Up @@ -339,6 +345,37 @@ static term nif_erlang_dist_ctrl_get_data(Context *ctx, int argc, term argv[])
return result;
}

term dist_monitor(struct DistConnection *conn_obj, term from_pid, term target_proc, term monitor_ref, Context *ctx)
{
int target_process_id = 0;
if (term_is_local_pid(target_proc)) {
target_process_id = term_to_local_process_id(target_proc);
} else if (term_is_atom(target_proc)) {
target_process_id = globalcontext_get_registered_process(ctx->global, term_to_atom_index(target_proc));
} else {
RAISE_ERROR(BADARG_ATOM);
}
struct RemoteMonitor *monitor = malloc(sizeof(struct RemoteMonitor));
monitor->target_proc = target_proc;
monitor->pid_number = term_get_external_pid_process_id(from_pid);
monitor->pid_serial = term_get_external_pid_serial(from_pid);
monitor->ref_len = term_get_external_reference_len(monitor_ref);
memcpy(monitor->ref_words, term_get_external_reference_words(monitor_ref), sizeof(uint32_t) * monitor->ref_len);
if (target_process_id) {
synclist_append(&conn_obj->remote_monitors, &monitor->head);
ErlNifPid target_process_pid = target_process_id;
if (UNLIKELY(enif_monitor_process(erl_nif_env_from_context(ctx), conn_obj, &target_process_pid, &monitor->process_monitor) != 0)) {
synclist_remove(&conn_obj->remote_monitors, &monitor->head);
dist_enqueue_monitor_exit_message(monitor, NOPROC_ATOM, conn_obj, ctx->global);
free(monitor);
}
} else {
dist_enqueue_monitor_exit_message(monitor, NOPROC_ATOM, conn_obj, ctx->global);
free(monitor);
}
return OK_ATOM;
}

static term nif_erlang_dist_ctrl_put_data(Context *ctx, int argc, term argv[])
{
UNUSED(argc);
Expand Down Expand Up @@ -390,32 +427,7 @@ static term nif_erlang_dist_ctrl_put_data(Context *ctx, int argc, term argv[])
term from_pid = term_get_tuple_element(control, 1);
term target_proc = term_get_tuple_element(control, 2);
term monitor_ref = term_get_tuple_element(control, 3);
int target_process_id = 0;
if (term_is_local_pid(target_proc)) {
target_process_id = term_to_local_process_id(target_proc);
} else if (term_is_atom(target_proc)) {
target_process_id = globalcontext_get_registered_process(ctx->global, term_to_atom_index(target_proc));
} else {
RAISE_ERROR(BADARG_ATOM);
}
struct RemoteMonitor *monitor = malloc(sizeof(struct RemoteMonitor));
monitor->target_proc = target_proc;
monitor->pid_number = term_get_external_pid_process_id(from_pid);
monitor->pid_serial = term_get_external_pid_serial(from_pid);
monitor->ref_len = term_get_external_reference_len(monitor_ref);
memcpy(monitor->ref_words, term_get_external_reference_words(monitor_ref), sizeof(uint32_t) * monitor->ref_len);
if (target_process_id) {
synclist_append(&conn_obj->remote_monitors, &monitor->head);
ErlNifPid target_process_pid = target_process_id;
if (UNLIKELY(enif_monitor_process(erl_nif_env_from_context(ctx), conn_obj, &target_process_pid, &monitor->process_monitor) != 0)) {
synclist_remove(&conn_obj->remote_monitors, &monitor->head);
dist_enqueue_monitor_exit_message(monitor, NOPROC_ATOM, conn_obj, ctx->global);
free(monitor);
}
} else {
dist_enqueue_monitor_exit_message(monitor, NOPROC_ATOM, conn_obj, ctx->global);
free(monitor);
}
dist_monitor(conn_obj, from_pid, target_proc, monitor_ref, ctx);

break;
}
Expand All @@ -442,6 +454,53 @@ static term nif_erlang_dist_ctrl_put_data(Context *ctx, int argc, term argv[])
synclist_unlock(&conn_obj->remote_monitors);
break;
}
case OPERATION_SPAWN_REQUEST: {
if (UNLIKELY(arity != 6)) {
RAISE_ERROR(BADARG_ATOM);
}
term roots[4];
roots[0] = argv[0];
roots[1] = argv[1];
roots[2] = control;
roots[3] = externalterm_to_term_with_roots(data + 1 + bytes_read, binary_len - 1 - bytes_read, ctx, ExternalTermCopy, &bytes_read, 3, roots);
if (UNLIKELY(memory_ensure_free_with_roots(ctx, LIST_SIZE(1, TUPLE_SIZE(2) + TUPLE_SIZE(4)), 4, roots, MEMORY_CAN_SHRINK) != MEMORY_GC_OK)) {
RAISE_ERROR(OUT_OF_MEMORY_ATOM);
}
control = roots[2];
term arglist = roots[3];
term mfa = term_get_tuple_element(control, 4);
if (UNLIKELY(!term_is_tuple(mfa) || term_get_tuple_arity(mfa) != 3)) {
RAISE_ERROR(BADARG_ATOM);
}
if (UNLIKELY(!term_is_list(arglist))) {
RAISE_ERROR(BADARG_ATOM);
}
term reqid = term_get_tuple_element(control, 1);
term from = term_get_tuple_element(control, 2);
if (UNLIKELY(!term_is_pid(from))) {
RAISE_ERROR(BADARG_ATOM);
}
// term groupleader = term_get_tuple_element(control, 3);
term options = term_get_tuple_element(control, 5);

term request_tuple = term_alloc_tuple(4, &ctx->heap);
term_put_tuple_element(request_tuple, 0, roots[0]);
term_put_tuple_element(request_tuple, 1, reqid);
term_put_tuple_element(request_tuple, 2, from);
term_put_tuple_element(request_tuple, 3, options);
term request_opt = term_alloc_tuple(2, &ctx->heap);
term_put_tuple_element(request_opt, 0, REQUEST_ATOM);
term_put_tuple_element(request_opt, 1, request_tuple);
term spawn_opts = term_list_prepend(request_opt, term_nil(), &ctx->heap);

// reuse roots for args
roots[0] = term_get_tuple_element(mfa, 0);
roots[1] = term_get_tuple_element(mfa, 1);
roots[2] = arglist;
roots[3] = spawn_opts;
nif_erlang_spawn_opt(ctx, 4, roots);
break;
}
default:
printf("Unknown distribution protocol operation id %d\n", (int) term_to_int(operation));
RAISE_ERROR(BADARG_ATOM);
Expand All @@ -467,6 +526,23 @@ void dist_send_message(term external_pid, term payload, Context *ctx)
synclist_unlock(&ctx->global->dist_connections);
}

void dist_spawn_reply(term req_id, term to_pid, bool link, bool monitor, term result, struct DistConnection *connection, GlobalContext *global)
{
int flags = (link ? SPAWN_REPLY_FLAGS_LINK_CREATED : 0)
| (monitor ? SPAWN_REPLY_FLAGS_MONITOR_CREATED : 0);
// allocate tuple
BEGIN_WITH_STACK_HEAP(TUPLE_SIZE(5), heap)
term control_message = term_alloc_tuple(5, &heap);
term_put_tuple_element(control_message, 0, term_from_int(OPERATION_SPAWN_REPLY));
term_put_tuple_element(control_message, 1, req_id);
term_put_tuple_element(control_message, 2, to_pid);
term_put_tuple_element(control_message, 3, term_from_int(flags));
term_put_tuple_element(control_message, 4, result);

dist_enqueue_message(control_message, term_invalid_term(), connection, global);
END_WITH_STACK_HEAP(heap, global)
}

const struct Nif setnode_3_nif = {
.base.type = NIFFunctionType,
.nif_ptr = nif_erlang_setnode_3
Expand Down
27 changes: 27 additions & 0 deletions src/libAtomVM/dist_nifs.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,35 @@ extern const struct Nif dist_ctrl_get_data_notification_nif;
extern const struct Nif dist_ctrl_get_data_nif;
extern const struct Nif dist_ctrl_put_data_nif;

struct DistConnection;

void dist_send_message(term external_pid, term payload, Context *ctx);

/**
* @doc Setup a monitor on a local process for a distributed process.
* @end
* @param conn_obj object of the connection
* @param from_pid remote pid setting up the monitor
* @param target_proc atom (for registered process) or pid of the local
* process to monitor
* @param monitor_ref reference used for monitor
* @param ctx context for memory allocation
*/
term dist_monitor(struct DistConnection *conn_obj, term from_pid, term target_proc, term monitor_ref, Context *ctx);

/**
* @doc Send a spawn reply signal to a node
* @end
* @param conn_obj object of the connection
* @param req_id reference identifying the request
* @param to_pid (remote) process id identifying the caller
* @param link if a link was created
* @param monitor if a monitor was created
* @param result pid of the spawned process or atom for an error
* @param ctx context for memory allocation
*/
void dist_spawn_reply(term req_id, term to_pid, bool link, bool monitor, term result, struct DistConnection *connection, GlobalContext *global);

#ifdef __cplusplus
}
#endif
Expand Down
33 changes: 31 additions & 2 deletions src/libAtomVM/nifs.c
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
#include "defaultatoms.h"
#include "dictionary.h"
#include "dist_nifs.h"
#include "erl_nif_priv.h"
#include "ets.h"
#include "externalterm.h"
#include "globalcontext.h"
Expand Down Expand Up @@ -134,7 +135,7 @@ static term nif_erlang_register_2(Context *ctx, int argc, term argv[]);
static term nif_erlang_unregister_1(Context *ctx, int argc, term argv[]);
static term nif_erlang_send_2(Context *ctx, int argc, term argv[]);
static term nif_erlang_setelement_3(Context *ctx, int argc, term argv[]);
static term nif_erlang_spawn_opt(Context *ctx, int argc, term argv[]);
// static term nif_erlang_spawn_opt(Context *ctx, int argc, term argv[]);
static term nif_erlang_spawn_fun_opt(Context *ctx, int argc, term argv[]);
static term nif_erlang_whereis_1(Context *ctx, int argc, term argv[]);
static term nif_erlang_system_time_1(Context *ctx, int argc, term argv[]);
Expand Down Expand Up @@ -1222,6 +1223,7 @@ static term do_spawn(Context *ctx, Context *new_ctx, term opts_term)
term link_term = interop_proplist_get_value(opts_term, LINK_ATOM);
term monitor_term = interop_proplist_get_value(opts_term, MONITOR_ATOM);
term heap_growth_strategy = interop_proplist_get_value_default(opts_term, ATOMVM_HEAP_GROWTH_ATOM, BOUNDED_FREE_ATOM);
term request_term = interop_proplist_get_value(opts_term, REQUEST_ATOM);

if (min_heap_size_term != term_nil()) {
if (UNLIKELY(!term_is_integer(min_heap_size_term))) {
Expand Down Expand Up @@ -1303,6 +1305,33 @@ static term do_spawn(Context *ctx, Context *new_ctx, term opts_term)
term_put_tuple_element(pid_ref_tuple, 1, ref);

return pid_ref_tuple;
} else if (UNLIKELY(request_term != term_nil())) {
// Handling of spawn_request
// spawn_request requires that the reply is enqueued before
// any message from the spawned process

term dhandle = term_get_tuple_element(request_term, 0);
term request_ref = term_get_tuple_element(request_term, 1);
term request_from = term_get_tuple_element(request_term, 2);
term request_opts = term_get_tuple_element(request_term, 3);
monitor_term = interop_proplist_get_value(request_opts, MONITOR_ATOM);
// link_term = interop_proplist_get_value(request_opts, LINK_ATOM);

void *rsrc_obj_ptr;
if (UNLIKELY(!enif_get_resource(erl_nif_env_from_context(ctx), dhandle, ctx->global->dist_connection_resource_type, &rsrc_obj_ptr))) {
RAISE_ERROR(BADARG_ATOM);
}
struct DistConnection *conn_obj = (struct DistConnection *) rsrc_obj_ptr;

dist_spawn_reply(request_ref, request_from, false, monitor_term != term_nil(), new_pid, conn_obj, ctx->global);

// Also setup monitor, if any.
if (monitor_term != term_nil()) {
dist_monitor(conn_obj, request_from, new_pid, request_ref, ctx);
}

scheduler_init_ready(new_ctx);
return new_pid;
} else {
scheduler_init_ready(new_ctx);
return new_pid;
Expand Down Expand Up @@ -1360,7 +1389,7 @@ static term nif_erlang_spawn_fun_opt(Context *ctx, int argc, term argv[])
return do_spawn(ctx, new_ctx, opts_term);
}

static term nif_erlang_spawn_opt(Context *ctx, int argc, term argv[])
term nif_erlang_spawn_opt(Context *ctx, int argc, term argv[])
{
UNUSED(argc);

Expand Down
2 changes: 2 additions & 0 deletions src/libAtomVM/nifs.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ extern "C" {

const struct Nif *nifs_get(AtomString module, AtomString function, int arity);

// spawn opt is used by distribution nifs
term nif_erlang_spawn_opt(Context *ctx, int argc, term argv[]);
#ifdef __cplusplus
}
#endif
Expand Down
14 changes: 12 additions & 2 deletions src/libAtomVM/otp_net.c
Original file line number Diff line number Diff line change
Expand Up @@ -344,8 +344,18 @@ static term nif_net_gethostname(Context *ctx, int argc, term argv[])
}
return make_error_tuple(posix_errno_to_term(errno, ctx->global), ctx);
}

size_t len = strlen(buf);
// Truncate name to first dot
char *end_str = buf;
while (1) {
char c = *end_str++;
if (c == 0) {
break;
}
if (c == '.') {
break;
}
}
size_t len = end_str - buf;
if (UNLIKELY(memory_ensure_free_opt(ctx, TUPLE_SIZE(2) + LIST_SIZE(len, 1), MEMORY_CAN_SHRINK) != MEMORY_GC_OK)) {
RAISE_ERROR(OUT_OF_MEMORY_ATOM);
}
Expand Down
Loading

0 comments on commit 115800c

Please sign in to comment.