diff --git a/embed-proplet/src/mqtt_client.c b/embed-proplet/src/mqtt_client.c index 8f27263..4e3cba2 100644 --- a/embed-proplet/src/mqtt_client.c +++ b/embed-proplet/src/mqtt_client.c @@ -7,7 +7,6 @@ #include #include #include -#include #include #include "wasm_handler.h" @@ -17,75 +16,98 @@ LOG_MODULE_REGISTER(mqtt_client); #define TX_BUFFER_SIZE 256 #define MQTT_BROKER_HOSTNAME "192.168.88.179" /* Replace with your broker's IP */ -#define MQTT_BROKER_PORT 1883 - -#define REGISTRY_ACK_TOPIC_TEMPLATE "channels/%s/messages/control/manager/registry" -#define ALIVE_TOPIC_TEMPLATE "channels/%s/messages/control/proplet/alive" -#define DISCOVERY_TOPIC_TEMPLATE "channels/%s/messages/control/proplet/create" -#define START_TOPIC_TEMPLATE "channels/%s/messages/control/manager/start" -#define STOP_TOPIC_TEMPLATE "channels/%s/messages/control/manager/stop" -#define REGISTRY_RESPONSE_TOPIC "channels/%s/messages/registry/server" +#define MQTT_BROKER_PORT 1883 + +#define REGISTRY_ACK_TOPIC_TEMPLATE "channels/%s/messages/control/manager/registry" +#define ALIVE_TOPIC_TEMPLATE "channels/%s/messages/control/proplet/alive" +#define DISCOVERY_TOPIC_TEMPLATE "channels/%s/messages/control/proplet/create" +#define START_TOPIC_TEMPLATE "channels/%s/messages/control/manager/start" +#define STOP_TOPIC_TEMPLATE "channels/%s/messages/control/manager/stop" +#define REGISTRY_RESPONSE_TOPIC "channels/%s/messages/registry/server" #define FETCH_REQUEST_TOPIC_TEMPLATE "channels/%s/messages/registry/proplet" -#define RESULTS_TOPIC_TEMPLATE "channels/%s/messages/control/proplet/results" +#define RESULTS_TOPIC_TEMPLATE "channels/%s/messages/control/proplet/results" #define WILL_MESSAGE_TEMPLATE "{\"status\":\"offline\",\"proplet_id\":\"%s\",\"mg_channel_id\":\"%s\"}" -#define WILL_QOS MQTT_QOS_1_AT_LEAST_ONCE -#define WILL_RETAIN 1 +#define WILL_QOS MQTT_QOS_1_AT_LEAST_ONCE +#define WILL_RETAIN 1 #define CLIENT_ID "proplet-esp32s3" -/* Buffers for MQTT client */ +#define MAX_ID_LEN 64 +#define MAX_NAME_LEN 64 +#define MAX_STATE_LEN 16 +#define MAX_URL_LEN 256 +#define MAX_TIMESTAMP_LEN 32 +#define MAX_BASE64_LEN 256 +#define MAX_INPUTS 16 +#define MAX_RESULTS 16 + +/* + * Keep the most recent "start" Task here, so if + * we fetch the WASM from the registry, we can call + * the WASM with the same inputs. + * + * If you support multiple tasks in parallel, you'll need + * a more robust approach than a single global. + */ +static struct task g_current_task; + static uint8_t rx_buffer[RX_BUFFER_SIZE]; static uint8_t tx_buffer[TX_BUFFER_SIZE]; -/* MQTT client context */ static struct mqtt_client client_ctx; static struct sockaddr_storage broker_addr; -/* Socket descriptor */ static struct zsock_pollfd fds[1]; static int nfds; bool mqtt_connected = false; -struct start_command { - char task_id[50]; - char app_name[50]; - char wasm_file[100]; -}; -struct stop_command { - char task_id[50]; +struct task { + char id[MAX_ID_LEN]; + char name[MAX_NAME_LEN]; + char state[MAX_STATE_LEN]; + char image_url[MAX_URL_LEN]; + + char file[MAX_BASE64_LEN]; + size_t file_len; + + uint64_t inputs[MAX_INPUTS]; + size_t inputs_count; + uint64_t results[MAX_RESULTS]; + size_t results_count; + + char start_time[MAX_TIMESTAMP_LEN]; + char finish_time[MAX_TIMESTAMP_LEN]; + char created_at[MAX_TIMESTAMP_LEN]; + char updated_at[MAX_TIMESTAMP_LEN]; }; struct registry_response { - char app_name[50]; - int chunk_idx; - int total_chunks; - char data[256]; + char app_name[64]; + char data[MAX_BASE64_LEN]; }; -struct chunk_tracker { - int total_chunks; - int received_chunks; - bool *chunk_received; - uint8_t *file_data; -}; +static const struct json_obj_descr task_descr[] = { + JSON_OBJ_DESCR_PRIM_NAMED(struct task, "id", id, JSON_TOK_STRING), + JSON_OBJ_DESCR_PRIM_NAMED(struct task, "name", name, JSON_TOK_STRING), + JSON_OBJ_DESCR_PRIM_NAMED(struct task, "state", state, JSON_TOK_STRING), + JSON_OBJ_DESCR_PRIM_NAMED(struct task, "image_url", image_url, JSON_TOK_STRING), -static const struct json_obj_descr start_command_descr[] = { - JSON_OBJ_DESCR_PRIM(struct start_command, task_id, JSON_TOK_STRING), - JSON_OBJ_DESCR_PRIM(struct start_command, app_name, JSON_TOK_STRING), - JSON_OBJ_DESCR_PRIM(struct start_command, wasm_file, JSON_TOK_STRING), -}; + JSON_OBJ_DESCR_PRIM_NAMED(struct task, "file", file, JSON_TOK_STRING), + + JSON_OBJ_DESCR_ARRAY_NAMED(struct task, "inputs", inputs, MAX_INPUTS, inputs_count, JSON_TOK_NUMBER), + JSON_OBJ_DESCR_ARRAY_NAMED(struct task, "results", results, MAX_RESULTS, results_count, JSON_TOK_NUMBER), -static const struct json_obj_descr stop_command_descr[] = { - JSON_OBJ_DESCR_PRIM(struct stop_command, task_id, JSON_TOK_STRING), + JSON_OBJ_DESCR_PRIM_NAMED(struct task, "start_time", start_time, JSON_TOK_STRING), + JSON_OBJ_DESCR_PRIM_NAMED(struct task, "finish_time", finish_time, JSON_TOK_STRING), + JSON_OBJ_DESCR_PRIM_NAMED(struct task, "created_at", created_at, JSON_TOK_STRING), + JSON_OBJ_DESCR_PRIM_NAMED(struct task, "updated_at", updated_at, JSON_TOK_STRING), }; static const struct json_obj_descr registry_response_descr[] = { - JSON_OBJ_DESCR_PRIM(struct registry_response, app_name, JSON_TOK_STRING), - JSON_OBJ_DESCR_PRIM(struct registry_response, chunk_idx, JSON_TOK_NUMBER), - JSON_OBJ_DESCR_PRIM(struct registry_response, total_chunks, JSON_TOK_NUMBER), - JSON_OBJ_DESCR_PRIM(struct registry_response, data, JSON_TOK_STRING), + JSON_OBJ_DESCR_PRIM_NAMED(struct registry_response, "app_name", app_name, JSON_TOK_STRING), + JSON_OBJ_DESCR_PRIM_NAMED(struct registry_response, "data", data, JSON_TOK_STRING), }; static void prepare_fds(struct mqtt_client *client) @@ -110,7 +132,6 @@ static void clear_fds(void) static int poll_mqtt_socket(struct mqtt_client *client, int timeout) { prepare_fds(client); - if (nfds <= 0) { return -EINVAL; } @@ -119,7 +140,6 @@ static int poll_mqtt_socket(struct mqtt_client *client, int timeout) if (rc < 0) { LOG_ERR("Socket poll error [%d]", rc); } - return rc; } @@ -141,45 +161,49 @@ static void mqtt_event_handler(struct mqtt_client *client, const struct mqtt_evt LOG_INF("Disconnected from MQTT broker"); break; - case MQTT_EVT_PUBLISH: - { - const struct mqtt_publish_param *pub = &evt->param.publish; - char payload[PAYLOAD_BUFFER_SIZE]; - char start_topic[128]; - char stop_topic[128]; - char registry_response_topic[128]; - int ret; - - extern const char *channel_id; - - snprintf(start_topic, sizeof(start_topic), START_TOPIC_TEMPLATE, channel_id); - snprintf(stop_topic, sizeof(stop_topic), STOP_TOPIC_TEMPLATE, channel_id); - snprintf(registry_response_topic, sizeof(registry_response_topic), REGISTRY_RESPONSE_TOPIC, channel_id); - - LOG_INF("Message received on topic: %s", pub->message.topic.topic.utf8); - - ret = mqtt_read_publish_payload(&client_ctx, payload, MIN(pub->message.payload.len, PAYLOAD_BUFFER_SIZE - 1)); - if (ret < 0) { - LOG_ERR("Failed to read payload [%d]", ret); - return; - } - - // Null-terminate the payload - payload[ret] = '\0'; - - LOG_INF("Payload: %s", payload); - - if (strncmp(pub->message.topic.topic.utf8, start_topic, pub->message.topic.topic.size) == 0) { - handle_start_command(payload); - } else if (strncmp(pub->message.topic.topic.utf8, stop_topic, pub->message.topic.topic.size) == 0) { - handle_stop_command(payload); - } else if (strncmp(pub->message.topic.topic.utf8, registry_response_topic, pub->message.topic.topic.size) == 0) { - handle_registry_response(payload); - } else { - LOG_WRN("Unknown topic"); - } - break; + case MQTT_EVT_PUBLISH: { + const struct mqtt_publish_param *pub = &evt->param.publish; + char payload[PAYLOAD_BUFFER_SIZE]; + int ret; + + extern const char *channel_id; + + char start_topic[128]; + char stop_topic[128]; + char registry_response_topic[128]; + + snprintf(start_topic, sizeof(start_topic), START_TOPIC_TEMPLATE, channel_id); + snprintf(stop_topic, sizeof(stop_topic), STOP_TOPIC_TEMPLATE, channel_id); + snprintf(registry_response_topic, sizeof(registry_response_topic), REGISTRY_RESPONSE_TOPIC, channel_id); + + LOG_INF("Message received on topic: %s", pub->message.topic.topic.utf8); + + ret = mqtt_read_publish_payload( + &client_ctx, + payload, + MIN(pub->message.payload.len, PAYLOAD_BUFFER_SIZE - 1) + ); + if (ret < 0) { + LOG_ERR("Failed to read payload [%d]", ret); + return; } + payload[ret] = '\0'; /* Null-terminate */ + LOG_INF("Payload: %s", payload); + + if (strncmp(pub->message.topic.topic.utf8, start_topic, pub->message.topic.topic.size) == 0) { + handle_start_command(payload); + } + else if (strncmp(pub->message.topic.topic.utf8, stop_topic, pub->message.topic.topic.size) == 0) { + handle_stop_command(payload); + } + else if (strncmp(pub->message.topic.topic.utf8, registry_response_topic, pub->message.topic.topic.size) == 0) { + handle_registry_response(payload); + } + else { + LOG_WRN("Unknown topic"); + } + break; + } case MQTT_EVT_SUBACK: LOG_INF("Subscribed successfully"); @@ -192,19 +216,15 @@ static void mqtt_event_handler(struct mqtt_client *client, const struct mqtt_evt case MQTT_EVT_PUBREC: LOG_INF("QoS 2 publish received"); break; - case MQTT_EVT_PUBREL: LOG_INF("QoS 2 publish released"); break; - case MQTT_EVT_PUBCOMP: LOG_INF("QoS 2 publish complete"); break; - case MQTT_EVT_UNSUBACK: LOG_INF("Unsubscribed successfully"); break; - case MQTT_EVT_PINGRESP: LOG_INF("Ping response received from broker"); break; @@ -215,35 +235,36 @@ static void mqtt_event_handler(struct mqtt_client *client, const struct mqtt_evt } } -int publish(const char *channel_id, const char *topic_template, const char *payload) +static void prepare_publish_param(struct mqtt_publish_param *param, + const char *topic_str, + const char *payload) { - char topic[128]; + memset(param, 0, sizeof(*param)); - snprintf(topic, sizeof(topic), topic_template, channel_id); + param->message.topic.topic.utf8 = topic_str; + param->message.topic.topic.size = strlen(topic_str); + param->message.topic.qos = MQTT_QOS_1_AT_LEAST_ONCE; + + param->message.payload.data = (uint8_t *)payload; + param->message.payload.len = strlen(payload); + param->message_id = sys_rand32_get() & 0xFFFF; + param->dup_flag = 0; + param->retain_flag= 0; +} + +int publish(const char *channel_id, const char *topic_template, const char *payload) +{ if (!mqtt_connected) { - LOG_ERR("MQTT client is not connected. Cannot publish to topic: %s", topic); + LOG_ERR("MQTT client is not connected. Cannot publish."); return -ENOTCONN; } - struct mqtt_publish_param param = { - .message = { - .topic = { - .topic = { - .utf8 = topic, - .size = strlen(topic), - }, - .qos = MQTT_QOS_1_AT_LEAST_ONCE, - }, - .payload = { - .data = (uint8_t *)payload, - .len = strlen(payload), - }, - }, - .message_id = sys_rand32_get() & 0xFFFF, - .dup_flag = 0, - .retain_flag = 0 - }; + char topic[128]; + snprintf(topic, sizeof(topic), topic_template, channel_id); + + struct mqtt_publish_param param; + prepare_publish_param(¶m, topic, payload); int ret = mqtt_publish(&client_ctx, ¶m); if (ret != 0) { @@ -255,29 +276,14 @@ int publish(const char *channel_id, const char *topic_template, const char *payl return 0; } -void publish_alive_message(const char *channel_id) -{ - char payload[128]; - snprintf(payload, sizeof(payload), - "{\"status\":\"alive\",\"proplet_id\":\"%s\",\"mg_channel_id\":\"%s\"}", - CLIENT_ID, channel_id); - publish(channel_id, ALIVE_TOPIC_TEMPLATE, payload); -} - -void publish_registry_request(const char *channel_id, const char *app_name) -{ - char payload[128]; - snprintf(payload, sizeof(payload), "{\"app_name\":\"%s\"}", app_name); - publish(channel_id, FETCH_REQUEST_TOPIC_TEMPLATE, payload); -} - int mqtt_client_connect(const char *proplet_id, const char *channel_id) { int ret; - struct sockaddr_in *broker = (struct sockaddr_in *)&broker_addr; + broker->sin_family = AF_INET; - broker->sin_port = htons(MQTT_BROKER_PORT); + broker->sin_port = htons(MQTT_BROKER_PORT); + ret = net_addr_pton(AF_INET, MQTT_BROKER_HOSTNAME, &broker->sin_addr); if (ret != 0) { LOG_ERR("Failed to resolve broker address, ret=%d", ret); @@ -290,7 +296,8 @@ int mqtt_client_connect(const char *proplet_id, const char *channel_id) snprintf(will_topic_str, sizeof(will_topic_str), ALIVE_TOPIC_TEMPLATE, channel_id); char will_message_str[256]; - snprintf(will_message_str, sizeof(will_message_str), WILL_MESSAGE_TEMPLATE, proplet_id, channel_id); + snprintf(will_message_str, sizeof(will_message_str), + WILL_MESSAGE_TEMPLATE, proplet_id, channel_id); struct mqtt_utf8 will_message = { .utf8 = (const uint8_t *)will_message_str, @@ -305,21 +312,21 @@ int mqtt_client_connect(const char *proplet_id, const char *channel_id) .qos = WILL_QOS, }; - client_ctx.broker = &broker_addr; - client_ctx.evt_cb = mqtt_event_handler; - client_ctx.client_id.utf8 = CLIENT_ID; - client_ctx.client_id.size = strlen(CLIENT_ID); - client_ctx.protocol_version = MQTT_VERSION_3_1_1; - client_ctx.transport.type = MQTT_TRANSPORT_NON_SECURE; + client_ctx.broker = &broker_addr; + client_ctx.evt_cb = mqtt_event_handler; + client_ctx.client_id.utf8 = CLIENT_ID; + client_ctx.client_id.size = strlen(CLIENT_ID); + client_ctx.protocol_version= MQTT_VERSION_3_1_1; + client_ctx.transport.type = MQTT_TRANSPORT_NON_SECURE; - client_ctx.rx_buf = rx_buffer; - client_ctx.rx_buf_size = RX_BUFFER_SIZE; - client_ctx.tx_buf = tx_buffer; - client_ctx.tx_buf_size = TX_BUFFER_SIZE; + client_ctx.rx_buf = rx_buffer; + client_ctx.rx_buf_size = RX_BUFFER_SIZE; + client_ctx.tx_buf = tx_buffer; + client_ctx.tx_buf_size = TX_BUFFER_SIZE; - client_ctx.will_topic = &will_topic; - client_ctx.will_message = &will_message; - client_ctx.will_retain = WILL_RETAIN; + client_ctx.will_topic = &will_topic; + client_ctx.will_message = &will_message; + client_ctx.will_retain = WILL_RETAIN; while (!mqtt_connected) { LOG_INF("Attempting to connect to the MQTT broker..."); @@ -338,7 +345,8 @@ int mqtt_client_connect(const char *proplet_id, const char *channel_id) mqtt_abort(&client_ctx); k_sleep(K_SECONDS(5)); continue; - } else if (ret == 0) { + } + else if (ret == 0) { LOG_ERR("Poll timed out waiting for CONNACK. Retrying in 5 seconds..."); mqtt_abort(&client_ctx); k_sleep(K_SECONDS(5)); @@ -357,66 +365,15 @@ int mqtt_client_connect(const char *proplet_id, const char *channel_id) return 0; } -int publish_discovery(const char *proplet_id, const char *channel_id) -{ - char topic[128]; - char payload[128]; - - snprintf(topic, sizeof(topic), DISCOVERY_TOPIC_TEMPLATE, channel_id); - snprintf(payload, sizeof(payload), - "{\"proplet_id\":\"%s\",\"mg_channel_id\":\"%s\"}", - proplet_id, channel_id); - - if (strlen(topic) >= sizeof(topic) || strlen(payload) >= sizeof(payload)) { - LOG_ERR("Discovery topic or payload size exceeds maximum allowable size."); - return -EINVAL; - } - - if (!mqtt_connected) { - LOG_ERR("MQTT client is not connected. Discovery announcement aborted."); - return -ENOTCONN; - } - - LOG_INF("Publishing discovery announcement for Proplet ID: %s, Channel ID: %s", proplet_id, channel_id); - - struct mqtt_publish_param param = { - .message = { - .topic = { - .topic = { - .utf8 = (uint8_t *)topic, - .size = strlen(topic), - }, - .qos = MQTT_QOS_1_AT_LEAST_ONCE, - }, - .payload = { - .data = (uint8_t *)payload, - .len = strlen(payload), - }, - }, - .message_id = sys_rand32_get() & 0xFFFF, - .dup_flag = 0, - .retain_flag = 0 - }; - - int ret = mqtt_publish(&client_ctx, ¶m); - if (ret != 0) { - LOG_ERR("Failed to publish discovery announcement. Error code: %d", ret); - return ret; - } - - LOG_INF("Discovery announcement published successfully to topic: %s", topic); - return 0; -} - int subscribe(const char *channel_id) { char start_topic[128]; char stop_topic[128]; char registry_response_topic[128]; - snprintf(start_topic, sizeof(start_topic), START_TOPIC_TEMPLATE, channel_id); - snprintf(stop_topic, sizeof(stop_topic), STOP_TOPIC_TEMPLATE, channel_id); - snprintf(registry_response_topic, sizeof(registry_response_topic), REGISTRY_RESPONSE_TOPIC, channel_id); + snprintf(start_topic, sizeof(start_topic), START_TOPIC_TEMPLATE, channel_id); + snprintf(stop_topic, sizeof(stop_topic), STOP_TOPIC_TEMPLATE, channel_id); + snprintf(registry_response_topic, sizeof(registry_response_topic), REGISTRY_RESPONSE_TOPIC, channel_id); struct mqtt_topic topics[] = { { @@ -443,7 +400,7 @@ int subscribe(const char *channel_id) }; struct mqtt_subscription_list sub_list = { - .list = topics, + .list = topics, .list_count = ARRAY_SIZE(topics), .message_id = 1, }; @@ -452,7 +409,7 @@ int subscribe(const char *channel_id) int ret = mqtt_subscribe(&client_ctx, &sub_list); if (ret != 0) { - LOG_ERR("Failed to subscribe to topics for channel ID: %s. Error code: %d", channel_id, ret); + LOG_ERR("Failed to subscribe to topics for channel ID: %s. Error: %d", channel_id, ret); } else { LOG_INF("Successfully subscribed to topics for channel ID: %s", channel_id); } @@ -460,113 +417,193 @@ int subscribe(const char *channel_id) return ret; } -void handle_start_command(const char *payload) { - struct start_command cmd; - int ret; - - ret = json_obj_parse((char *)payload, strlen(payload), start_command_descr, ARRAY_SIZE(start_command_descr), &cmd); +void handle_start_command(const char *payload) +{ + struct task t; + memset(&t, 0, sizeof(t)); + int ret = json_obj_parse(payload, strlen(payload), + task_descr, + ARRAY_SIZE(task_descr), + &t); if (ret < 0) { - LOG_ERR("Failed to parse start command payload, error: %d", ret); + LOG_ERR("Failed to parse START task payload, error: %d", ret); return; } - LOG_INF("Starting task:"); - LOG_INF("Task ID: %s", cmd.task_id); - LOG_INF("Function: %s", cmd.app_name); - LOG_INF("Wasm File: %s", cmd.wasm_file); + LOG_INF("Starting task: ID=%s, Name=%s, State=%s", t.id, t.name, t.state); + LOG_INF("image_url=%s, file-len(b64)=%zu", t.image_url, strlen(t.file)); + LOG_INF("inputs_count=%zu", t.inputs_count); - // TODO: Use WAMR runtime to start the task - // Example: - // wamr_start_app(cmd.wasm_file, cmd.app_name); -} + memcpy(&g_current_task, &t, sizeof(struct task)); -void handle_stop_command(const char *payload) { - struct stop_command cmd; - int ret; + if (strlen(t.file) > 0) { + static uint8_t wasm_binary[MAX_BASE64_LEN]; + size_t wasm_decoded_len = 0; - ret = json_obj_parse((char *)payload, strlen(payload), stop_command_descr, ARRAY_SIZE(stop_command_descr), &cmd); - if (ret < 0) { - LOG_ERR("Failed to parse stop command payload, error: %d", ret); - return; + ret = base64_decode(wasm_binary, sizeof(wasm_binary), + &wasm_decoded_len, + (const uint8_t *)t.file, + strlen(t.file)); + if (ret < 0) { + LOG_ERR("Failed to decode base64 WASM (task.file). Err=%d", ret); + return; + } + g_current_task.file_len = wasm_decoded_len; + LOG_INF("Decoded WASM size: %zu", g_current_task.file_len); + + execute_wasm_module(g_current_task.id, + wasm_binary, + g_current_task.file_len, + g_current_task.inputs, + g_current_task.inputs_count); } - LOG_INF("Stopping task:"); - LOG_INF("Task ID: %s", cmd.task_id); - - // TODO: Use WAMR runtime to stop the task - // Example: - // wamr_stop_app(cmd.task_id); -} - -void request_registry_file(const char *channel_id, const char *app_name) -{ - char registry_payload[128]; - - snprintf(registry_payload, sizeof(registry_payload), - "{\"app_name\":\"%s\"}", - app_name); - - if (publish(channel_id, FETCH_REQUEST_TOPIC_TEMPLATE, registry_payload) != 0) { - LOG_ERR("Failed to request registry file"); - } else { - LOG_INF("Requested registry file for app: %s", app_name); + else if (strlen(t.image_url) > 0) { + LOG_INF("Requesting WASM from registry: %s", t.image_url); + extern const char *channel_id; + publish_registry_request(channel_id, t.image_url); + } + else { + LOG_WRN("No file or image_url specified; cannot start WASM task!"); } } -void publish_results(const char *channel_id, const char *task_id, const char *results) +void handle_stop_command(const char *payload) { - char results_payload[256]; - - snprintf(results_payload, sizeof(results_payload), - "{\"task_id\":\"%s\",\"results\":\"%s\"}", - task_id, results); + struct task t; + memset(&t, 0, sizeof(t)); - if (publish(channel_id, RESULTS_TOPIC_TEMPLATE, results_payload) != 0) { - LOG_ERR("Failed to publish results"); - } else { - LOG_INF("Published results for task: %s", task_id); + int ret = json_obj_parse(payload, strlen(payload), + task_descr, + ARRAY_SIZE(task_descr), + &t); + if (ret < 0) { + LOG_ERR("Failed to parse STOP task payload, error: %d", ret); + return; } + + LOG_INF("Stopping task: ID=%s, Name=%s, State=%s", t.id, t.name, t.state); + stop_wasm_app(t.id); } -int handle_registry_response(const char *payload) { +/** + * We receive a single chunk "data" field with the full base64 WASM. + */ +int handle_registry_response(const char *payload) +{ struct registry_response resp; - int ret; + memset(&resp, 0, sizeof(resp)); - ret = json_obj_parse((char *)payload, strlen(payload), registry_response_descr, ARRAY_SIZE(registry_response_descr), &resp); + int ret = json_obj_parse(payload, strlen(payload), + registry_response_descr, + ARRAY_SIZE(registry_response_descr), + &resp); if (ret < 0) { LOG_ERR("Failed to parse registry response, error: %d", ret); return -1; } - LOG_INF("Registry response received:"); - LOG_INF("App Name: %s", resp.app_name); + LOG_INF("Single-chunk registry response for app: %s", resp.app_name); - size_t decoded_len = (strlen(resp.data) * 3) / 4; + size_t encoded_len = strlen(resp.data); + size_t decoded_len = (encoded_len * 3) / 4; uint8_t *binary_data = malloc(decoded_len); if (!binary_data) { LOG_ERR("Failed to allocate memory for decoded binary"); return -1; } - size_t binary_data_len = decoded_len; - ret = base64_decode(binary_data, decoded_len, &binary_data_len, (const uint8_t *)resp.data, strlen(resp.data)); + size_t actual_decoded_len = decoded_len; + ret = base64_decode(binary_data, + decoded_len, + &actual_decoded_len, + (const uint8_t *)resp.data, + encoded_len); if (ret < 0) { - LOG_ERR("Failed to decode Base64 data, error: %d", ret); + LOG_ERR("Failed to decode base64 single-chunk, err=%d", ret); free(binary_data); return -1; } - LOG_INF("Successfully decoded Wasm binary. Executing now..."); + LOG_INF("Decoded single-chunk WASM size: %zu. Executing now...", actual_decoded_len); + + execute_wasm_module(g_current_task.id, + binary_data, + actual_decoded_len, + g_current_task.inputs, + g_current_task.inputs_count); - execute_wasm_module(binary_data, binary_data_len); free(binary_data); - LOG_INF("Wasm binary executed"); + LOG_INF("WASM binary executed from single-chunk registry response."); + return 0; +} +void publish_alive_message(const char *channel_id) +{ + char payload[128]; + snprintf(payload, sizeof(payload), + "{\"status\":\"alive\",\"proplet_id\":\"%s\",\"mg_channel_id\":\"%s\"}", + CLIENT_ID, channel_id); + publish(channel_id, ALIVE_TOPIC_TEMPLATE, payload); +} + +int publish_discovery(const char *proplet_id, const char *channel_id) +{ + char topic[128]; + char payload[128]; + + snprintf(topic, sizeof(topic), DISCOVERY_TOPIC_TEMPLATE, channel_id); + snprintf(payload, sizeof(payload), + "{\"proplet_id\":\"%s\",\"mg_channel_id\":\"%s\"}", + proplet_id, channel_id); + + if (!mqtt_connected) { + LOG_ERR("MQTT client is not connected. Discovery aborted."); + return -ENOTCONN; + } + + struct mqtt_publish_param param; + prepare_publish_param(¶m, topic, payload); + + int ret = mqtt_publish(&client_ctx, ¶m); + if (ret != 0) { + LOG_ERR("Failed to publish discovery. Error: %d", ret); + return ret; + } + + LOG_INF("Discovery published successfully to topic: %s", topic); return 0; } +void publish_registry_request(const char *channel_id, const char *app_name) +{ + char payload[128]; + snprintf(payload, sizeof(payload), "{\"app_name\":\"%s\"}", app_name); + + if (publish(channel_id, FETCH_REQUEST_TOPIC_TEMPLATE, payload) != 0) { + LOG_ERR("Failed to request registry file"); + } else { + LOG_INF("Requested registry file for: %s", app_name); + } +} + +void publish_results(const char *channel_id, const char *task_id, const char *results) +{ + char results_payload[256]; + + snprintf(results_payload, sizeof(results_payload), + "{\"task_id\":\"%s\",\"results\":\"%s\"}", + task_id, results); + + if (publish(channel_id, RESULTS_TOPIC_TEMPLATE, results_payload) != 0) { + LOG_ERR("Failed to publish results"); + } else { + LOG_INF("Published results for task: %s", task_id); + } +} + void mqtt_client_process(void) { if (mqtt_connected) { diff --git a/embed-proplet/src/mqtt_client.h b/embed-proplet/src/mqtt_client.h index 3a52d82..375836e 100644 --- a/embed-proplet/src/mqtt_client.h +++ b/embed-proplet/src/mqtt_client.h @@ -24,7 +24,6 @@ int mqtt_client_connect(const char *proplet_id, const char *channel_id); */ int mqtt_client_connect(const char *proplet_id, const char *channel_id); - /** * @brief Subscribe to topics for a specific channel. * diff --git a/embed-proplet/src/wasm_handler.c b/embed-proplet/src/wasm_handler.c index 5854ca6..8a35599 100644 --- a/embed-proplet/src/wasm_handler.c +++ b/embed-proplet/src/wasm_handler.c @@ -1,46 +1,194 @@ #include "wasm_handler.h" #include #include +#include +#include LOG_MODULE_REGISTER(wasm_handler); -void execute_wasm_module(const uint8_t *wasm_data, size_t wasm_size) +#define MAX_WASM_APPS 10 /* How many different Wasm tasks we can track simultaneously */ +#define MAX_ID_LEN 64 +#define MAX_INPUTS 16 /* Max 32-bit arguments we’ll pass to WASM main */ + +/* A record of a "running" (loaded & instantiated) Wasm module. */ +typedef struct { + bool in_use; + char id[MAX_ID_LEN]; + wasm_module_t module; + wasm_module_inst_t module_inst; +} wasm_app_t; + +/* We'll maintain a small global array of possible Wasm apps. */ +static wasm_app_t g_wasm_apps[MAX_WASM_APPS]; + +/* Keep track of whether we've called wasm_runtime_full_init() yet. */ +static bool g_wamr_initialized = false; + +/* Forward declarations for some helper functions. */ +static void maybe_init_wamr_runtime(void); +static int find_free_slot(void); +static int find_app_by_id(const char *task_id); + +/*-------------------------------------------------------------------------*/ +/* Public API: execute_wasm_module(...) */ +/*-------------------------------------------------------------------------*/ +void execute_wasm_module(const char *task_id, + const uint8_t *wasm_data, + size_t wasm_size, + const uint64_t *inputs, + size_t inputs_count) { - RuntimeInitArgs init_args = { .mem_alloc_type = Alloc_With_System_Allocator }; - if (!wasm_runtime_full_init(&init_args)) { - LOG_ERR("Failed to initialize WAMR runtime."); + /* Make sure the WAMR runtime is initialized once. */ + maybe_init_wamr_runtime(); + if (!g_wamr_initialized) { + LOG_ERR("WAMR runtime not available, cannot execute WASM"); return; } + /* If a Wasm app with this ID is already running, stop it first. */ + int existing_idx = find_app_by_id(task_id); + if (existing_idx >= 0) { + LOG_WRN("WASM app with ID %s is already running. Stopping it first...", task_id); + stop_wasm_app(task_id); + } + + /* Find a free slot in the global array. */ + int slot = find_free_slot(); + if (slot < 0) { + LOG_ERR("No free slot to store new WASM app instance (increase MAX_WASM_APPS)."); + return; + } + + /* Load the module from memory. */ char error_buf[128]; - wasm_module_t module = wasm_runtime_load(wasm_data, wasm_size, error_buf, sizeof(error_buf)); + wasm_module_t module = wasm_runtime_load(wasm_data, wasm_size, + error_buf, sizeof(error_buf)); if (!module) { - LOG_ERR("Failed to load Wasm module: %s", error_buf); - wasm_runtime_destroy(); + LOG_ERR("Failed to load WASM module: %s", error_buf); return; } - wasm_module_inst_t module_inst = wasm_runtime_instantiate(module, 1024, 1024, error_buf, sizeof(error_buf)); + /* Instantiate the module. Increase stack/heap if needed. */ + wasm_module_inst_t module_inst = wasm_runtime_instantiate(module, + 4 * 1024, /* stack size */ + 4 * 1024, /* heap size */ + error_buf, + sizeof(error_buf)); if (!module_inst) { - LOG_ERR("Failed to instantiate Wasm module: %s", error_buf); + LOG_ERR("Failed to instantiate WASM module: %s", error_buf); wasm_runtime_unload(module); - wasm_runtime_destroy(); return; } + /* Store references in the global array, so we can stop it later. */ + g_wasm_apps[slot].in_use = true; + strncpy(g_wasm_apps[slot].id, task_id, MAX_ID_LEN - 1); + g_wasm_apps[slot].id[MAX_ID_LEN - 1] = '\0'; + g_wasm_apps[slot].module = module; + g_wasm_apps[slot].module_inst = module_inst; + + /* + * Optionally call "main" right away. If your Wasm module is meant to run + * a single function and then exit, this is enough. If it's a long-running + * module (e.g., with timers or state), you'd skip the immediate call. + */ wasm_function_inst_t func = wasm_runtime_lookup_function(module_inst, "main"); - if (func) { - LOG_INF("Executing Wasm application..."); - if (!wasm_runtime_call_wasm(module_inst, func, 0, NULL)) { - LOG_ERR("Error invoking Wasm function."); - } else { - LOG_INF("Wasm application executed successfully."); - } + if (!func) { + LOG_WRN("Function 'main' not found in WASM module. No entry point to call."); + return; + } + + LOG_INF("Executing 'main' in WASM module with ID=%s", task_id); + + /* Convert 64-bit inputs to 32-bit if your "main" expects 32-bit args. */ + uint32_t arg_buf[MAX_INPUTS]; + memset(arg_buf, 0, sizeof(arg_buf)); + size_t n_args = (inputs_count > MAX_INPUTS) ? MAX_INPUTS : inputs_count; + for (size_t i = 0; i < n_args; i++) { + arg_buf[i] = (uint32_t)(inputs[i] & 0xFFFFFFFFu); + } + + if (!wasm_runtime_call_wasm(module_inst, func, n_args, arg_buf)) { + LOG_ERR("Error invoking WASM function 'main'"); } else { - LOG_ERR("Function 'main' not found in Wasm module."); + LOG_INF("WASM 'main' executed successfully."); + } + + /* + * NOTE: We do NOT call wasm_runtime_deinstantiate() or wasm_runtime_unload() + * here, so the module remains loaded. That’s what allows stop_wasm_app() + * to do meaningful cleanup later. + */ +} + +/*-------------------------------------------------------------------------*/ +/* Public API: stop_wasm_app(...) */ +/*-------------------------------------------------------------------------*/ +void stop_wasm_app(const char *task_id) +{ + int idx = find_app_by_id(task_id); + if (idx < 0) { + LOG_WRN("No running WASM app found with ID=%s", task_id); + return; } - wasm_runtime_deinstantiate(module_inst); - wasm_runtime_unload(module); - wasm_runtime_destroy(); + wasm_app_t *app = &g_wasm_apps[idx]; + LOG_INF("Stopping WASM app with ID=%s", app->id); + + /* Properly deinstantiate and unload the module. */ + wasm_runtime_deinstantiate(app->module_inst); + wasm_runtime_unload(app->module); + + /* Clear our record for re-use. */ + app->in_use = false; + memset(app->id, 0, sizeof(app->id)); + + LOG_INF("WASM app [%s] has been stopped and unloaded.", task_id); +} + +/*-------------------------------------------------------------------------*/ +/* Internal Helpers */ +/*-------------------------------------------------------------------------*/ + +/** One-time WAMR runtime initialization. */ +static void maybe_init_wamr_runtime(void) +{ + if (g_wamr_initialized) { + return; /* Already inited */ + } + + RuntimeInitArgs init_args; + memset(&init_args, 0, sizeof(init_args)); + init_args.mem_alloc_type = Alloc_With_System_Allocator; + + if (!wasm_runtime_full_init(&init_args)) { + LOG_ERR("Failed to initialize WAMR runtime."); + return; + } + + g_wamr_initialized = true; + LOG_INF("WAMR runtime initialized successfully."); +} + +/** Finds the first free slot in the global g_wasm_apps array, or -1 if full. */ +static int find_free_slot(void) +{ + for (int i = 0; i < MAX_WASM_APPS; i++) { + if (!g_wasm_apps[i].in_use) { + return i; + } + } + return -1; +} + +/** Looks up a loaded module by task ID. Returns array index or -1 if not found. */ +static int find_app_by_id(const char *task_id) +{ + for (int i = 0; i < MAX_WASM_APPS; i++) { + if (g_wasm_apps[i].in_use && + (strcmp(g_wasm_apps[i].id, task_id) == 0)) { + return i; + } + } + return -1; } diff --git a/embed-proplet/src/wasm_handler.h b/embed-proplet/src/wasm_handler.h index 298de1e..b453dee 100644 --- a/embed-proplet/src/wasm_handler.h +++ b/embed-proplet/src/wasm_handler.h @@ -1,14 +1,38 @@ #ifndef WASM_HANDLER_H #define WASM_HANDLER_H -#include +#include +#include + +#ifdef __cplusplus +extern "C" { +#endif + +/** + * Loads and instantiates a Wasm module, then invokes its "main" function (if present). + * The module remains loaded in memory so that it can be stopped later by ID. + * + * @param task_id Unique identifier (string) for this Wasm "task." + * @param wasm_data Pointer to the Wasm file data in memory. + * @param wasm_size Size of the Wasm file data in bytes. + * @param inputs Array of 64-bit inputs that the Wasm main function might consume. + * @param inputs_count Number of elements in the 'inputs' array. + */ +void execute_wasm_module(const char *task_id, + const uint8_t *wasm_data, + size_t wasm_size, + const uint64_t *inputs, + size_t inputs_count); /** - * Execute a Wasm application from the provided memory buffer. + * Stops the Wasm module with the given task_id by deinstantiating and unloading it from memory. * - * @param wasm_data Pointer to the Wasm file data in memory. - * @param wasm_size Size of the Wasm file data in bytes. + * @param task_id The unique string ID assigned to the Wasm module at start time. */ -void execute_wasm_module(const uint8_t *wasm_data, size_t wasm_size); +void stop_wasm_app(const char *task_id); + +#ifdef __cplusplus +} +#endif #endif /* WASM_HANDLER_H */