From 05d5f4f46debc88eb03ade26a0b28c31592d1114 Mon Sep 17 00:00:00 2001 From: Vardan2009 <70532109+Vardan2009@users.noreply.github.com> Date: Sun, 27 Oct 2024 14:40:50 +0400 Subject: [PATCH] Added basic security prompts Working on #177 --- core/builtin_classes/file_object.py | 15 ++++++++++++ core/builtin_classes/requests_object.py | 11 +++++++++ core/builtin_funcs.py | 3 +++ core/prompt.py | 31 +++++++++++++++++++++++++ 4 files changed, 60 insertions(+) create mode 100644 core/prompt.py diff --git a/core/builtin_classes/file_object.py b/core/builtin_classes/file_object.py index 42c2a26..41caf42 100644 --- a/core/builtin_classes/file_object.py +++ b/core/builtin_classes/file_object.py @@ -6,6 +6,7 @@ from core.errors import RTError from core.parser import Context, RTResult from core.tokens import Position +from core import prompt class FileObject(BuiltInObject): @@ -14,6 +15,8 @@ class FileObject(BuiltInObject): @operator("__constructor__") @check([String, String], [None, String("r")]) def constructor(self, path: String, mode: String) -> RTResult[Value]: + prompt.security_prompt("disk_read") + allowed_modes = [None, "r", "w", "a", "r+", "w+", "a+"] # Allowed modes for opening files res = RTResult[Value]() if mode.value not in allowed_modes: @@ -29,6 +32,8 @@ def constructor(self, path: String, mode: String) -> RTResult[Value]: @args(["count"], [Number(-1)]) @method def read(self, ctx: Context) -> RTResult[Value]: + prompt.security_prompt("disk_read") + res = RTResult[Value]() count = ctx.symbol_table.get("count") assert count is not None @@ -49,6 +54,8 @@ def read(self, ctx: Context) -> RTResult[Value]: @args([]) @method def readline(self, ctx: Context) -> RTResult[Value]: + prompt.security_prompt("disk_read") + res = RTResult[Value]() try: value = self.file.readline() @@ -60,6 +67,8 @@ def readline(self, ctx: Context) -> RTResult[Value]: @args([]) @method def readlines(self, ctx: Context) -> RTResult[Value]: + prompt.security_prompt("disk_read") + res = RTResult[Value]() try: value = self.file.readlines() @@ -71,6 +80,8 @@ def readlines(self, ctx: Context) -> RTResult[Value]: @args(["data"]) @method def write(self, ctx: Context) -> RTResult[Value]: + prompt.security_prompt("disk_read") + res = RTResult[Value]() data = ctx.symbol_table.get("data") assert data is not None @@ -88,6 +99,8 @@ def write(self, ctx: Context) -> RTResult[Value]: @args([]) @method def close(self, _ctx: Context) -> RTResult[Value]: + prompt.security_prompt("disk_read") + res = RTResult[Value]() self.file.close() return res.success(Null.null()) @@ -95,5 +108,7 @@ def close(self, _ctx: Context) -> RTResult[Value]: @args([]) @method def is_closed(self, _ctx: Context) -> RTResult[Value]: + prompt.security_prompt("disk_read") + res = RTResult[Value]() return res.success(Boolean(self.file.closed)) diff --git a/core/builtin_classes/requests_object.py b/core/builtin_classes/requests_object.py index e3693b9..403f086 100644 --- a/core/builtin_classes/requests_object.py +++ b/core/builtin_classes/requests_object.py @@ -7,6 +7,7 @@ from core.datatypes import HashMap, Null, String, Value, deradonify, radonify from core.errors import RTError from core.parser import Context, RTResult +from core import prompt class RequestsObject(BuiltInObject): @@ -18,6 +19,8 @@ def constructor(self) -> RTResult[Value]: @args(["url", "headers"], [None, HashMap({})]) @method def get(self, ctx: Context) -> RTResult[Value]: + prompt.security_prompt("web_requests") + res = RTResult[Value]() url = ctx.symbol_table.get("url") assert url is not None @@ -38,6 +41,8 @@ def get(self, ctx: Context) -> RTResult[Value]: @args(["url", "data", "headers"], [None, HashMap({}), HashMap({})]) @method def post(self, ctx: Context) -> RTResult[Value]: + prompt.security_prompt("web_requests") + res = RTResult[Value]() url = ctx.symbol_table.get("url") assert url is not None @@ -66,6 +71,8 @@ def post(self, ctx: Context) -> RTResult[Value]: @args(["url", "data", "headers"], [None, HashMap({}), HashMap({})]) @method def put(self, ctx: Context) -> RTResult[Value]: + prompt.security_prompt("web_requests") + res = RTResult[Value]() url = ctx.symbol_table.get("url") assert url is not None @@ -93,6 +100,8 @@ def put(self, ctx: Context) -> RTResult[Value]: @args(["url", "headers"], [None, HashMap({})]) @method def delete(self, ctx: Context) -> RTResult[Value]: + prompt.security_prompt("web_requests") + res = RTResult[Value]() url = ctx.symbol_table.get("url") assert url is not None @@ -113,6 +122,8 @@ def delete(self, ctx: Context) -> RTResult[Value]: @args(["url", "data", "headers"], [None, HashMap({}), HashMap({})]) @method def patch(self, ctx: Context) -> RTResult[Value]: + prompt.security_prompt("web_requests") + res = RTResult[Value]() url = ctx.symbol_table.get("url") assert url is not None diff --git a/core/builtin_funcs.py b/core/builtin_funcs.py index 3f2a27c..122207d 100755 --- a/core/builtin_funcs.py +++ b/core/builtin_funcs.py @@ -3,6 +3,7 @@ import os from sys import stdout from typing import Callable, Generic, NoReturn, Optional, ParamSpec, Protocol, Sequence, Union, cast +from core import prompt from core.datatypes import ( Array, @@ -427,6 +428,8 @@ def execute_type(self, exec_ctx: Context) -> RTResult[Value]: @args(["code", "ns"]) def execute_pyapi(self, exec_ctx: Context) -> RTResult[Value]: + prompt.security_prompt("unsafe_code") + res = RTResult[Value]() code = exec_ctx.symbol_table.get("code") diff --git a/core/prompt.py b/core/prompt.py new file mode 100644 index 0000000..48487c9 --- /dev/null +++ b/core/prompt.py @@ -0,0 +1,31 @@ +from typing import Literal +from core.colortools import Log + +# Define all types of security prompts +SecurityPromptType = Literal["unsafe_code", "disk_read", "web_requests"] +type_messages: dict[str, str] = { + "unsafe_code": "This program is attempting to execute potentially unsafe python code", + "disk_read": "This program is attempting to access the filesystem", + "web_requests": "This program is attempting to invoke web requests", +} + +# List of allowed actions (used during code execution) +allowed: dict[str, bool] = {} + + +def security_prompt(type: SecurityPromptType) -> None: + # If action already allowed, continue + if type in allowed: + return + # Log the message and get a y/n prompt by user + print(f"{Log.deep_warning(f"[{type.upper()}]")} {Log.deep_info(type_messages[type], True)}. Continue execution?") + print(f"{Log.deep_purple("[Y/n] -> ")}", end="") + # If user agreed + if input().lower() == "y": + # Add action to allowed list + allowed[type] = True + return + # Exit program + print("Permission denied by user.") + exit(1) + return