Skip to content

Commit

Permalink
Implement support for script execution kill and timeout
Browse files Browse the repository at this point in the history
Signed-off-by: Ricardo Dias <[email protected]>
  • Loading branch information
rjd15372 committed Jan 17, 2025
1 parent 42e3fa2 commit 7ebf8d9
Show file tree
Hide file tree
Showing 9 changed files with 160 additions and 64 deletions.
53 changes: 25 additions & 28 deletions src/eval.c
Original file line number Diff line number Diff line change
Expand Up @@ -156,29 +156,27 @@ void freeEvalScriptsSync(dict *scripts, list *scripts_lru_list, list *engine_cal
}
}

static int resetEngineEvalEnvCallback(scriptingEngine *engine, void *context) {
static void resetEngineEvalEnvCallback(scriptingEngine *engine, void *context) {
int async = context != NULL;
callableLazyEvalReset *callback = scriptingEngineCallResetEvalEnvFunc(engine, async);

if (async) {
list *callbacks = context;
listAddNodeTail(callbacks, callback);
}

return 1;
}

/* Release resources related to Lua scripting.
* This function is used in order to reset the scripting environment. */
void evalRelease(int async) {
if (async) {
list *engine_callbacks = listCreate();
engineManagerForEachEngine(resetEngineEvalEnvCallback, engine_callbacks);
scriptingEngineManagerForEachEngine(resetEngineEvalEnvCallback, engine_callbacks);
freeEvalScriptsAsync(evalCtx.scripts, evalCtx.scripts_lru_list, engine_callbacks);

} else {
freeEvalScriptsSync(evalCtx.scripts, evalCtx.scripts_lru_list, NULL);
engineManagerForEachEngine(resetEngineEvalEnvCallback, NULL);
scriptingEngineManagerForEachEngine(resetEngineEvalEnvCallback, NULL);
}
}

Expand Down Expand Up @@ -319,7 +317,7 @@ static void evalDeleteScript(client *c, sds sha) {
scriptHolder *sh = dictGetVal(de);

/* Delete the script from the engine. */
engineCallFreeFunction(sh->engine, VMSE_EVAL, sh->script);
scriptingEngineCallFreeFunction(sh->engine, VMSE_EVAL, sh->script);

evalCtx.scripts_mem -= sdsAllocSize(sha) + getStringObjectSdsUsedMemory(sh->body);
dictFreeUnlinkedEntry(evalCtx.scripts, de);
Expand Down Expand Up @@ -397,7 +395,7 @@ static int evalRegisterNewScript(client *c, robj *body, char **sha) {
}

serverAssert(engine_name != NULL);
scriptingEngine *engine = engineManagerFind(engine_name);
scriptingEngine *engine = scriptingEngineManagerFind(engine_name);
if (!engine) {
if (c != NULL) {
addReplyErrorFormat(c, "Could not find scripting engine '%s'", engine_name);
Expand All @@ -414,12 +412,12 @@ static int evalRegisterNewScript(client *c, robj *body, char **sha) {
robj *_err = NULL;
size_t num_compiled_functions = 0;
compiledFunction **functions =
engineCallCompileCode(engine,
VMSE_EVAL,
(sds)body->ptr + shebang_len,
0,
&num_compiled_functions,
&_err);
scriptingEngineCallCompileCode(engine,
VMSE_EVAL,
(sds)body->ptr + shebang_len,
0,
&num_compiled_functions,
&_err);
if (functions == NULL) {
serverAssert(_err != NULL);
if (c != NULL) {
Expand Down Expand Up @@ -449,7 +447,7 @@ static int evalRegisterNewScript(client *c, robj *body, char **sha) {
}
sh->body = body;
int retval = dictAdd(evalCtx.scripts, _sha, sh);
serverAssertWithInfo(c ? c : engineGetClient(engine), NULL, retval == DICT_OK);
serverAssertWithInfo(c ? c : scriptingEngineGetClient(engine), NULL, retval == DICT_OK);
evalCtx.scripts_mem += sdsAllocSize(_sha) + getStringObjectSdsUsedMemory(body);
incrRefCount(body);
zfree(functions);
Expand Down Expand Up @@ -501,21 +499,21 @@ static void evalGenericCommand(client *c, int evalsha) {
int ro = c->cmd->proc == evalRoCommand || c->cmd->proc == evalShaRoCommand;

scriptRunCtx rctx;
if (scriptPrepareForRun(&rctx, engineGetClient(sh->engine), c, sha, sh->flags, ro) != C_OK) {
if (scriptPrepareForRun(&rctx, scriptingEngineGetClient(sh->engine), c, sha, sh->flags, ro) != C_OK) {
return;
}
rctx.flags |= SCRIPT_EVAL_MODE; /* mark the current run as EVAL (as opposed to FCALL) so we'll
get appropriate error messages and logs */

engineCallFunction(sh->engine,
&rctx,
c,
sh->script,
VMSE_EVAL,
c->argv + 3,
numkeys,
c->argv + 3 + numkeys,
c->argc - 3 - numkeys);
scriptingEngineCallFunction(sh->engine,
&rctx,
c,
sh->script,
VMSE_EVAL,
c->argv + 3,
numkeys,
c->argv + 3 + numkeys,
c->argc - 3 - numkeys);
scriptResetRun(&rctx);

if (sh->node) {
Expand Down Expand Up @@ -654,16 +652,15 @@ void scriptCommand(client *c) {
}
}

static int getEngineUsedMemory(scriptingEngine *engine, void *context) {
static void getEngineUsedMemory(scriptingEngine *engine, void *context) {
size_t *sum = (size_t *)context;
engineMemoryInfo mem_info = engineCallGetMemoryInfo(engine, VMSE_EVAL);
engineMemoryInfo mem_info = scriptingEngineCallGetMemoryInfo(engine, VMSE_EVAL);
*sum += mem_info.used_memory;
return 1;
}

unsigned long evalMemory(void) {
size_t memory = 0;
engineManagerForEachEngine(getEngineUsedMemory, &memory);
scriptingEngineManagerForEachEngine(getEngineUsedMemory, &memory);
return memory;
}

Expand Down
8 changes: 4 additions & 4 deletions src/lua/engine_lua.c
Original file line number Diff line number Diff line change
Expand Up @@ -380,8 +380,8 @@ int luaEngineInitEngine(void) {
.get_memory_info = luaEngineGetMemoryInfo,
};

return scriptingEngineManagerRegisterEngine(LUA_ENGINE_NAME,
NULL,
createEngineContext(),
&methods);
return scriptingEngineManagerRegister(LUA_ENGINE_NAME,
NULL,
createEngineContext(),
&methods);
}
16 changes: 16 additions & 0 deletions src/module.c
Original file line number Diff line number Diff line change
Expand Up @@ -13191,6 +13191,21 @@ int VM_UnregisterScriptingEngine(ValkeyModuleCtx *ctx, const char *engine_name)
return VALKEYMODULE_OK;
}

/* Returns the state of the current function being executed by the scripting
* engine.
*
* `server_ctx` is the server runtime context.
*
* It will return VMSE_STATE_KILLED if the function was already killed either by
* a `SCRIPT KILL`, or `FUNCTION KILL`.
*/
ValkeyModuleScriptingEngineExecutionState VM_GetFunctionExecutionState(
ValkeyModuleScriptingEngineServerRuntimeCtx *server_ctx) {
int ret = scriptInterrupt(server_ctx);
serverAssert(ret == SCRIPT_CONTINUE || ret == SCRIPT_KILL);
return ret == SCRIPT_CONTINUE ? VMSE_STATE_EXECUTING : VMSE_STATE_KILLED;
}

/* MODULE command.
*
* MODULE LIST
Expand Down Expand Up @@ -14063,4 +14078,5 @@ void moduleRegisterCoreAPI(void) {
REGISTER_API(RdbSave);
REGISTER_API(RegisterScriptingEngine);
REGISTER_API(UnregisterScriptingEngine);
REGISTER_API(GetFunctionExecutionState);
}
20 changes: 10 additions & 10 deletions src/scripting_engine.c
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ int scriptingEngineManagerUnregister(const char *engine_name) {

functionsRemoveLibFromEngine(e);

engineMemoryInfo mem_info = scriptingEngineCallGetMemoryInfo(e);
engineMemoryInfo mem_info = scriptingEngineCallGetMemoryInfo(e, VMSE_ALL);
engineMgr.total_memory_overhead -= zmalloc_size(e) +
sdsAllocSize(e->name) +
mem_info.engine_memory_overhead;
Expand Down Expand Up @@ -215,17 +215,17 @@ static void engineTeardownModuleCtx(scriptingEngine *e) {
}
}

compiledFunction **scriptinEngineCallCompileCode(scriptingEngine *engine,
subsystemType type,
const char *code,
size_t timeout,
size_t *out_num_compiled_functions,
robj **err) {
compiledFunction **scriptingEngineCallCompileCode(scriptingEngine *engine,
subsystemType type,
const char *code,
size_t timeout,
size_t *out_num_compiled_functions,
robj **err) {
serverAssert(type == VMSE_EVAL || type == VMSE_FUNCTION);

engineSetupModuleCtx(engine, NULL);

compiledFunction **functions = engine->impl->methods.compile_code(
compiledFunction **functions = engine->impl.methods.compile_code(
engine->module_ctx,
engine->impl.ctx,
type,
Expand Down Expand Up @@ -290,11 +290,11 @@ size_t scriptingEngineCallGetFunctionMemoryOverhead(scriptingEngine *engine,
}

callableLazyEvalReset *scriptingEngineCallResetEvalEnvFunc(scriptingEngine *engine,
int async) {
int async) {
engineSetupModuleCtx(engine, NULL);
callableLazyEvalReset *callback = engine->impl.methods.reset_eval_env(
engine->module_ctx,
engine->impl->ctx,
engine->impl.ctx,
async);
engineTeardownModuleCtx(engine);
return callback;
Expand Down
12 changes: 6 additions & 6 deletions src/scripting_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,14 @@ typedef void (*engineIterCallback)(scriptingEngine *engine, void *context);
* Engine manager API functions.
*/
int scriptingEngineManagerInit(void);
size_t scriptingEngineManagerGetCacheMemory(void);
size_t scriptingEngineManagerGetTotalMemoryOverhead(void);
size_t scriptingEngineManagerGetNumEngines(void);
size_t scriptingEngineManagerGetMemoryUsage(void);
int scriptingEngineManagerRegisterEngine(const char *engine_name,
ValkeyModule *engine_module,
engineCtx *engine_ctx,
engineMethods *engine_methods);
int scriptingEngineManagerUnregisterEngine(const char *engine_name);
int scriptingEngineManagerRegister(const char *engine_name,
ValkeyModule *engine_module,
engineCtx *engine_ctx,
engineMethods *engine_methods);
int scriptingEngineManagerUnregister(const char *engine_name);
scriptingEngine *scriptingEngineManagerFind(const char *engine_name);
void scriptingEngineManagerForEachEngine(engineIterCallback callback, void *context);

Expand Down
7 changes: 3 additions & 4 deletions src/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -1369,11 +1369,10 @@ void checkChildrenDone(void) {
}
}

static int sumEngineUsedMemory(scriptingEngine *engine, void *context) {
static void sumEngineUsedMemory(scriptingEngine *engine, void *context) {
size_t *total_memory = (size_t *)context;
engineMemoryInfo mem_info = engineCallGetMemoryInfo(engine, VMSE_ALL);
engineMemoryInfo mem_info = scriptingEngineCallGetMemoryInfo(engine, VMSE_ALL);
*total_memory += mem_info.used_memory;
return 1;
}

/* Called from serverCron and cronUpdateMemoryStats to update cached memory metrics. */
Expand Down Expand Up @@ -1402,7 +1401,7 @@ void cronUpdateMemoryStats(void) {
* so we must deduct it in order to be able to calculate correct
* "allocator fragmentation" ratio */
size_t engines_memory = 0;
engineManagerForEachEngine(sumEngineUsedMemory, &engines_memory);
scriptingEngineManagerForEachEngine(sumEngineUsedMemory, &engines_memory);
server.cron_malloc_stats.allocator_resident = server.cron_malloc_stats.process_rss - engines_memory;
}
if (!server.cron_malloc_stats.allocator_active)
Expand Down
8 changes: 8 additions & 0 deletions src/valkeymodule.h
Original file line number Diff line number Diff line change
Expand Up @@ -837,6 +837,11 @@ typedef enum ValkeyModuleScriptingEngineSubsystemType {
VMSE_ALL
} ValkeyModuleScriptingEngineSubsystemType;

typedef enum ValkeyModuleScriptingEngineExecutionState {
VMSE_STATE_EXECUTING,
VMSE_STATE_KILLED,
} ValkeyModuleScriptingEngineExecutionState;

typedef struct ValkeyModuleScriptingEngineCallableLazyEvalReset {
void *context;

Expand Down Expand Up @@ -1868,6 +1873,8 @@ VALKEYMODULE_API int (*ValkeyModule_RegisterScriptingEngine)(ValkeyModuleCtx *mo
VALKEYMODULE_API int (*ValkeyModule_UnregisterScriptingEngine)(ValkeyModuleCtx *module_ctx,
const char *engine_name) VALKEYMODULE_ATTR;

VALKEYMODULE_API ValkeyModuleScriptingEngineExecutionState (*ValkeyModule_GetFunctionExecutionState)(ValkeyModuleScriptingEngineServerRuntimeCtx *server_ctx) VALKEYMODULE_ATTR;

#define ValkeyModule_IsAOFClient(id) ((id) == UINT64_MAX)

/* This is included inline inside each Valkey module. */
Expand Down Expand Up @@ -2237,6 +2244,7 @@ static int ValkeyModule_Init(ValkeyModuleCtx *ctx, const char *name, int ver, in
VALKEYMODULE_GET_API(RdbSave);
VALKEYMODULE_GET_API(RegisterScriptingEngine);
VALKEYMODULE_GET_API(UnregisterScriptingEngine);
VALKEYMODULE_GET_API(GetFunctionExecutionState);

if (ValkeyModule_IsModuleNameBusy && ValkeyModule_IsModuleNameBusy(name)) return VALKEYMODULE_ERR;
ValkeyModule_SetModuleAttribs(ctx, name, ver, apiver);
Expand Down
Loading

0 comments on commit 7ebf8d9

Please sign in to comment.