diff --git a/core/builtin_classes/file_object.py b/core/builtin_classes/file_object.py index 42c2a26..b3b2c69 100644 --- a/core/builtin_classes/file_object.py +++ b/core/builtin_classes/file_object.py @@ -6,6 +6,7 @@ from core.errors import RTError from core.parser import Context, RTResult from core.tokens import Position +from core import security class FileObject(BuiltInObject): @@ -14,6 +15,8 @@ class FileObject(BuiltInObject): @operator("__constructor__") @check([String, String], [None, String("r")]) def constructor(self, path: String, mode: String) -> RTResult[Value]: + security.security_prompt("disk_access") + allowed_modes = [None, "r", "w", "a", "r+", "w+", "a+"] # Allowed modes for opening files res = RTResult[Value]() if mode.value not in allowed_modes: @@ -29,6 +32,8 @@ def constructor(self, path: String, mode: String) -> RTResult[Value]: @args(["count"], [Number(-1)]) @method def read(self, ctx: Context) -> RTResult[Value]: + security.security_prompt("disk_access") + res = RTResult[Value]() count = ctx.symbol_table.get("count") assert count is not None @@ -49,6 +54,8 @@ def read(self, ctx: Context) -> RTResult[Value]: @args([]) @method def readline(self, ctx: Context) -> RTResult[Value]: + security.security_prompt("disk_access") + res = RTResult[Value]() try: value = self.file.readline() @@ -60,6 +67,8 @@ def readline(self, ctx: Context) -> RTResult[Value]: @args([]) @method def readlines(self, ctx: Context) -> RTResult[Value]: + security.security_prompt("disk_access") + res = RTResult[Value]() try: value = self.file.readlines() @@ -71,6 +80,8 @@ def readlines(self, ctx: Context) -> RTResult[Value]: @args(["data"]) @method def write(self, ctx: Context) -> RTResult[Value]: + security.security_prompt("disk_access") + res = RTResult[Value]() data = ctx.symbol_table.get("data") assert data is not None @@ -88,6 +99,8 @@ def write(self, ctx: Context) -> RTResult[Value]: @args([]) @method def close(self, _ctx: Context) -> RTResult[Value]: + security.security_prompt("disk_access") + res = RTResult[Value]() self.file.close() return res.success(Null.null()) @@ -95,5 +108,7 @@ def close(self, _ctx: Context) -> RTResult[Value]: @args([]) @method def is_closed(self, _ctx: Context) -> RTResult[Value]: + security.security_prompt("disk_access") + res = RTResult[Value]() return res.success(Boolean(self.file.closed)) diff --git a/core/builtin_classes/requests_object.py b/core/builtin_classes/requests_object.py index e3693b9..35a9c4c 100644 --- a/core/builtin_classes/requests_object.py +++ b/core/builtin_classes/requests_object.py @@ -7,6 +7,7 @@ from core.datatypes import HashMap, Null, String, Value, deradonify, radonify from core.errors import RTError from core.parser import Context, RTResult +from core import security class RequestsObject(BuiltInObject): @@ -18,6 +19,8 @@ def constructor(self) -> RTResult[Value]: @args(["url", "headers"], [None, HashMap({})]) @method def get(self, ctx: Context) -> RTResult[Value]: + security.security_prompt("network_access") + res = RTResult[Value]() url = ctx.symbol_table.get("url") assert url is not None @@ -38,6 +41,8 @@ def get(self, ctx: Context) -> RTResult[Value]: @args(["url", "data", "headers"], [None, HashMap({}), HashMap({})]) @method def post(self, ctx: Context) -> RTResult[Value]: + security.security_prompt("network_access") + res = RTResult[Value]() url = ctx.symbol_table.get("url") assert url is not None @@ -66,6 +71,8 @@ def post(self, ctx: Context) -> RTResult[Value]: @args(["url", "data", "headers"], [None, HashMap({}), HashMap({})]) @method def put(self, ctx: Context) -> RTResult[Value]: + security.security_prompt("network_access") + res = RTResult[Value]() url = ctx.symbol_table.get("url") assert url is not None @@ -93,6 +100,8 @@ def put(self, ctx: Context) -> RTResult[Value]: @args(["url", "headers"], [None, HashMap({})]) @method def delete(self, ctx: Context) -> RTResult[Value]: + security.security_prompt("network_access") + res = RTResult[Value]() url = ctx.symbol_table.get("url") assert url is not None @@ -113,6 +122,8 @@ def delete(self, ctx: Context) -> RTResult[Value]: @args(["url", "data", "headers"], [None, HashMap({}), HashMap({})]) @method def patch(self, ctx: Context) -> RTResult[Value]: + security.security_prompt("network_access") + res = RTResult[Value]() url = ctx.symbol_table.get("url") assert url is not None diff --git a/core/builtin_funcs.py b/core/builtin_funcs.py index 3f2a27c..1a7f569 100755 --- a/core/builtin_funcs.py +++ b/core/builtin_funcs.py @@ -3,6 +3,7 @@ import os from sys import stdout from typing import Callable, Generic, NoReturn, Optional, ParamSpec, Protocol, Sequence, Union, cast +from core import security from core.datatypes import ( Array, @@ -427,6 +428,8 @@ def execute_type(self, exec_ctx: Context) -> RTResult[Value]: @args(["code", "ns"]) def execute_pyapi(self, exec_ctx: Context) -> RTResult[Value]: + security.security_prompt("pyapi_access") + res = RTResult[Value]() code = exec_ctx.symbol_table.get("code") diff --git a/core/security.py b/core/security.py new file mode 100644 index 0000000..5eaf7fa --- /dev/null +++ b/core/security.py @@ -0,0 +1,38 @@ +from typing import Literal +from core.colortools import Log + +# Define all types of security prompts +SecurityPromptType = Literal["pyapi_access", "disk_access", "network_access"] +type_messages: dict[str, str] = { + "pyapi_access": "This program is attempting to use the Python API", + "disk_access": "This program is attempting to access the disk", + "network_access": "This program is attempting to access the network", +} + +# List of allowed actions (used during code execution) +allowed: dict[str, bool] = {} + + +# !!! Only used for tests !!! +def allow_all_permissions() -> None: + allowed["pyapi_access"] = True + allowed["disk_access"] = True + allowed["network_access"] = True + + +def security_prompt(type: SecurityPromptType) -> None: + # If action already allowed, continue + if type in allowed: + return + # Log the message and get a y/n prompt by user + print(f"{Log.deep_warning(f"[{type.upper()}]")} {Log.deep_info(type_messages[type], True)}. Continue execution?") + print(f"{Log.deep_purple("[Y/n] -> ")}", end="") + # If user agreed + if input().lower() == "y": + # Add action to allowed list + allowed[type] = True + return + # Exit program + print("Permission denied by user.") + exit(1) + return diff --git a/radon.py b/radon.py index 9c04f0e..529cb97 100755 --- a/radon.py +++ b/radon.py @@ -121,8 +121,7 @@ def main(argv: list[str]) -> None: usage(program_name, sys.stderr) print(f"ERROR: {arg} requires an argument", file=sys.stderr) exit(1) - source_file = argv[0] - break # allow program to use remaining args + source_file = argv.pop(0) case "--version" | "-v": print(base_core.__version__) exit(0) @@ -131,8 +130,16 @@ def main(argv: list[str]) -> None: usage(program_name, sys.stderr) print(f"ERROR: {arg} requires an argument", file=sys.stderr) exit(1) - command = argv[0] - break # allow program to use remaining args + command = argv.pop(0) + # These flags starting with --allow should only be used for testing, and not be allowed to be set by a user + case "--allow-all" | "-A": + base_core.security.allow_all_permissions() + case "--allow-disk" | "-D": + base_core.security.allowed["disk_access"] = True + case "--allow-py" | "-P": + base_core.security.allowed["pyapi_access"] = True + case "--allow-network" | "-W": + base_core.security.allowed["network_access"] = True case _: usage(program_name, sys.stderr) print(f"ERROR: Unknown argument '{arg}'", file=sys.stderr) diff --git a/test.py b/test.py index 43670aa..571fe0d 100755 --- a/test.py +++ b/test.py @@ -26,7 +26,9 @@ def dump(self, path: str) -> None: def run_test(test: str) -> Output: - proc = subprocess.run([sys.executable, "radon.py", "-s", test], stdout=subprocess.PIPE, stderr=subprocess.PIPE) + proc = subprocess.run( + [sys.executable, "radon.py", "-s", test, "-A"], stdout=subprocess.PIPE, stderr=subprocess.PIPE + ) return Output( proc.returncode, proc.stdout.decode("utf-8").replace("\r\n", "\n"),