Skip to content

Commit

Permalink
Added basic security prompts
Browse files Browse the repository at this point in the history
Working on #177
  • Loading branch information
Vardan2009 committed Oct 27, 2024
1 parent 34da0c0 commit 05d5f4f
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 0 deletions.
15 changes: 15 additions & 0 deletions core/builtin_classes/file_object.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from core.errors import RTError
from core.parser import Context, RTResult
from core.tokens import Position
from core import prompt


class FileObject(BuiltInObject):
Expand All @@ -14,6 +15,8 @@ class FileObject(BuiltInObject):
@operator("__constructor__")
@check([String, String], [None, String("r")])
def constructor(self, path: String, mode: String) -> RTResult[Value]:
prompt.security_prompt("disk_read")

allowed_modes = [None, "r", "w", "a", "r+", "w+", "a+"] # Allowed modes for opening files
res = RTResult[Value]()
if mode.value not in allowed_modes:
Expand All @@ -29,6 +32,8 @@ def constructor(self, path: String, mode: String) -> RTResult[Value]:
@args(["count"], [Number(-1)])
@method
def read(self, ctx: Context) -> RTResult[Value]:
prompt.security_prompt("disk_read")

res = RTResult[Value]()
count = ctx.symbol_table.get("count")
assert count is not None
Expand All @@ -49,6 +54,8 @@ def read(self, ctx: Context) -> RTResult[Value]:
@args([])
@method
def readline(self, ctx: Context) -> RTResult[Value]:
prompt.security_prompt("disk_read")

res = RTResult[Value]()
try:
value = self.file.readline()
Expand All @@ -60,6 +67,8 @@ def readline(self, ctx: Context) -> RTResult[Value]:
@args([])
@method
def readlines(self, ctx: Context) -> RTResult[Value]:
prompt.security_prompt("disk_read")

res = RTResult[Value]()
try:
value = self.file.readlines()
Expand All @@ -71,6 +80,8 @@ def readlines(self, ctx: Context) -> RTResult[Value]:
@args(["data"])
@method
def write(self, ctx: Context) -> RTResult[Value]:
prompt.security_prompt("disk_read")

res = RTResult[Value]()
data = ctx.symbol_table.get("data")
assert data is not None
Expand All @@ -88,12 +99,16 @@ def write(self, ctx: Context) -> RTResult[Value]:
@args([])
@method
def close(self, _ctx: Context) -> RTResult[Value]:
prompt.security_prompt("disk_read")

res = RTResult[Value]()
self.file.close()
return res.success(Null.null())

@args([])
@method
def is_closed(self, _ctx: Context) -> RTResult[Value]:
prompt.security_prompt("disk_read")

res = RTResult[Value]()
return res.success(Boolean(self.file.closed))
11 changes: 11 additions & 0 deletions core/builtin_classes/requests_object.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from core.datatypes import HashMap, Null, String, Value, deradonify, radonify
from core.errors import RTError
from core.parser import Context, RTResult
from core import prompt


class RequestsObject(BuiltInObject):
Expand All @@ -18,6 +19,8 @@ def constructor(self) -> RTResult[Value]:
@args(["url", "headers"], [None, HashMap({})])
@method
def get(self, ctx: Context) -> RTResult[Value]:
prompt.security_prompt("web_requests")

res = RTResult[Value]()
url = ctx.symbol_table.get("url")
assert url is not None
Expand All @@ -38,6 +41,8 @@ def get(self, ctx: Context) -> RTResult[Value]:
@args(["url", "data", "headers"], [None, HashMap({}), HashMap({})])
@method
def post(self, ctx: Context) -> RTResult[Value]:
prompt.security_prompt("web_requests")

res = RTResult[Value]()
url = ctx.symbol_table.get("url")
assert url is not None
Expand Down Expand Up @@ -66,6 +71,8 @@ def post(self, ctx: Context) -> RTResult[Value]:
@args(["url", "data", "headers"], [None, HashMap({}), HashMap({})])
@method
def put(self, ctx: Context) -> RTResult[Value]:
prompt.security_prompt("web_requests")

res = RTResult[Value]()
url = ctx.symbol_table.get("url")
assert url is not None
Expand Down Expand Up @@ -93,6 +100,8 @@ def put(self, ctx: Context) -> RTResult[Value]:
@args(["url", "headers"], [None, HashMap({})])
@method
def delete(self, ctx: Context) -> RTResult[Value]:
prompt.security_prompt("web_requests")

res = RTResult[Value]()
url = ctx.symbol_table.get("url")
assert url is not None
Expand All @@ -113,6 +122,8 @@ def delete(self, ctx: Context) -> RTResult[Value]:
@args(["url", "data", "headers"], [None, HashMap({}), HashMap({})])
@method
def patch(self, ctx: Context) -> RTResult[Value]:
prompt.security_prompt("web_requests")

res = RTResult[Value]()
url = ctx.symbol_table.get("url")
assert url is not None
Expand Down
3 changes: 3 additions & 0 deletions core/builtin_funcs.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import os
from sys import stdout
from typing import Callable, Generic, NoReturn, Optional, ParamSpec, Protocol, Sequence, Union, cast
from core import prompt

from core.datatypes import (
Array,
Expand Down Expand Up @@ -427,6 +428,8 @@ def execute_type(self, exec_ctx: Context) -> RTResult[Value]:

@args(["code", "ns"])
def execute_pyapi(self, exec_ctx: Context) -> RTResult[Value]:
prompt.security_prompt("unsafe_code")

res = RTResult[Value]()

code = exec_ctx.symbol_table.get("code")
Expand Down
31 changes: 31 additions & 0 deletions core/prompt.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
from typing import Literal
from core.colortools import Log

# Define all types of security prompts
SecurityPromptType = Literal["unsafe_code", "disk_read", "web_requests"]
type_messages: dict[str, str] = {
"unsafe_code": "This program is attempting to execute potentially unsafe python code",
"disk_read": "This program is attempting to access the filesystem",
"web_requests": "This program is attempting to invoke web requests",
}

# List of allowed actions (used during code execution)
allowed: dict[str, bool] = {}


def security_prompt(type: SecurityPromptType) -> None:
# If action already allowed, continue
if type in allowed:
return
# Log the message and get a y/n prompt by user
print(f"{Log.deep_warning(f"[{type.upper()}]")} {Log.deep_info(type_messages[type], True)}. Continue execution?")
print(f"{Log.deep_purple("[Y/n] -> ")}", end="")
# If user agreed
if input().lower() == "y":
# Add action to allowed list
allowed[type] = True
return
# Exit program
print("Permission denied by user.")
exit(1)
return

0 comments on commit 05d5f4f

Please sign in to comment.