From 5a688eb3b39a3d9b6d29328eb8b0393e2c2d9290 Mon Sep 17 00:00:00 2001 From: S Anand Date: Wed, 26 Apr 2023 16:59:53 +0530 Subject: [PATCH 1/2] ENH: scaffolding --- gramex/gramex.yaml | 14 ++++++ gramex/task.py | 107 +++++++++++++++++++++++++++++++++++++++++++++ pyproject.toml | 1 + 3 files changed, 122 insertions(+) create mode 100644 gramex/task.py diff --git a/gramex/gramex.yaml b/gramex/gramex.yaml index 46d7dae1..298d5202 100644 --- a/gramex/gramex.yaml +++ b/gramex/gramex.yaml @@ -371,6 +371,20 @@ storelocations: start: TEXT # ISO8601 encoded (YYYY-MM-DD HH:MM:SS.SSS UTC) end: TEXT # ISO8601 encoded (YYYY-MM-DD HH:MM:SS.SSS UTC) error: TEXT # Error stage + traceback + task: + url: sqlite:///$GRAMEXDATA/task.db + table: task + columns: + # ID as Time of submission + timestamp: + type: TEXT # ISO8601 encoded (YYYY-MM-DD HH:MM:SS.SSS UTC) + primary_key: true + queue: TEXT # name of task queue + type: TEXT # shell|... (default: shell) + task: TEXT # JSON configuration for the task + status: TEXT # pending|running|done|error (default: pending) + info: TEXT # JSON that holds server & process info + output: TEXT # output of the process # The `schedule:` section defines when specific code is to run. schedule: diff --git a/gramex/task.py b/gramex/task.py new file mode 100644 index 00000000..e6418705 --- /dev/null +++ b/gramex/task.py @@ -0,0 +1,107 @@ +'''Task management utilities''' +from typing import Union + + +def add(queue: str, task: Union[str, list, dict], type: str = 'shell', **kwargs): + import datetime + import gramex.data + import json + + return gramex.data.insert( + **gramex.service.storelocations.task, + args={ + "timestamp": [datetime.datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S.%fZ')], + "queue": [queue], + "type": [type], + "task": [json.dumps(task)], + "status": ["pending"], + "info": [""], + "output": [""], + }, + id=["timestamp"], + ) + + +def schedule(queue: str, max_tasks: int = 8, **kwargs) -> None: + '''To execute tasks in a queue, add this to `gramex.yaml`: + + ```yaml + schedule: + gramex-task-$*: + function: gramex.task.schedule(queue='my-queue-name', max_tasks=10) + every: 2 minutes + ``` + + Now any task added via `gramex.task.add(...)` will be auto-executed. + + Parameters: + + queue: task queue name to execute. fnmatch-style wildcards are allowed. + E.g. `"x.*"` matches x.a, x.b, etc. + max_tasks: number of tasks allowed concurrently. Tasks queue up until a slot is available. + ''' + import gramex + import gramex.data + + # TODO: If processes that ought to be running in this system are not, make them pending + # running = gramex.data.filter( + # **gramex.service.storelocations.task, args={"status": ["running"], "_sort": "timestamp"} + # ) + # TODO: Count running processes. Exit if >= max_tasks + # TODO: While running processes < max_tasks, get next pending process and run it + + +def run(timestamp: str, queue, type: str, task: str): + import sys + + if type == 'shell': + import subprocess + + process = subprocess.Popen( + [sys.executable, '-timestamp', timestamp, '-queue', queue, '-task', task, '-table': ..., '-url': ...], + # TODO: Detach subprocess on Linux/Mac: https://stackoverflow.com/a/64145368/100904 + # TODO: Detach subprocess on Windows: https://stackoverflow.com/a/52450172/100904 + ) + pid = process.pid + # TODO: update the "info" and "status" in the DB + # __update(db, params={"pid": pid, "status": "running"}, condition=f"id={job2run['id']}") + else: + raise ValueError(f'Unknown task type: {queue}.{type}') + + +def commandline(args=None): + import json + import gramex + import subprocess + import sys + + args = gramex.parse_command_line(sys.argv[1:] if args is None else args) + task = json.parse(args.task) + if isinstance(task, str): + args, kwargs = [task], {'shell': True} + elif isinstance(task, list): + args, kwargs = task, {} + elif isinstance(task, dict): + args, kwargs = task['args'], task['kwargs'] + else: + raise ValueError(f'Task must be str, list, dict. Not {type!r}') + + kwargs['capture_output'] = True + # TODO: Re-implement using subprocess.run. + process = subprocess.run(*args, **kwargs) + stdout, stderr = process.communicate() + return_code = process.returncode + status = "success" if return_code == 0 else "error" + output = json.dumps( + { + "stdout": stdout.decode("utf-8"), + "stderr": stderr.decode("utf-8"), + "return_code": returncode, + } + ) + gramex.data.update( + url=args.url, + table=args.table, + params={"pid": 0, "status": status, "output": output}, + condition=f"timestamp={args.timestamp}", + ) diff --git a/pyproject.toml b/pyproject.toml index 7ed6c697..90b164dc 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -83,6 +83,7 @@ Stackoverflow = "https://stackoverflow.com/questions/tagged/gramex" [project.scripts] gramex = "gramex:commandline" +gramextask = "gramex.task:commandline" secrets = "gramex.secrets:commandline" slidesense = "gramex.pptgen2:commandline" From 80a6b0ab4a47c95789fe11063b84af3f04932f5a Mon Sep 17 00:00:00 2001 From: shraddheya Date: Tue, 2 May 2023 09:53:37 +0530 Subject: [PATCH 2/2] ENH: Use cli 'gramex task' to run the tasks --- gramex/__init__.py | 1 + gramex/install.py | 7 ++ gramex/task.py | 200 +++++++++++++++++++++++++++++++++++---------- 3 files changed, 164 insertions(+), 44 deletions(-) diff --git a/gramex/__init__.py b/gramex/__init__.py index 27c976cf..02ddbf7b 100644 --- a/gramex/__init__.py +++ b/gramex/__init__.py @@ -154,6 +154,7 @@ def commandline(args: List[str] = None): 'license', 'features', 'complexity', + 'task', }: import gramex.install diff --git a/gramex/install.py b/gramex/install.py index 09220ef8..50349d4d 100644 --- a/gramex/install.py +++ b/gramex/install.py @@ -603,6 +603,13 @@ def walk(node: dict, parents: tuple = ()): ) +def task(args, kwargs) -> dict: + import gramex.task + # kkk = " ".join(f"""-{key}='{value.strftime("%Y-%m-%d %H:%M:%S.%f")}'""" if isinstance(value, datetime.datetime) else f'-{key}="{value}"' for key, value in kwargs.items()) + # gramex.task.commandline(kkk) + gramex.task.commandline(kwargs) + return {"output": None, "args": args, "kwargs": kwargs} + class TryAgainError(Exception): '''If shutil.rmtree fails, and we've fixed the problem, raise this to try again''' diff --git a/gramex/task.py b/gramex/task.py index e6418705..346c4d73 100644 --- a/gramex/task.py +++ b/gramex/task.py @@ -1,19 +1,60 @@ -'''Task management utilities''' +"""Task management utilities""" from typing import Union +import gramex -def add(queue: str, task: Union[str, list, dict], type: str = 'shell', **kwargs): +def __update(db, table, params, condition): + query = ( + f"UPDATE {table} SET " + f"""{', '.join([f"{k}='{v}'" for k, v in params.items()])} """ + f"WHERE {condition} " + ) + print("QUERY: ", query) + import sqlalchemy as sa + with sa.create_engine(db).connect() as conn: + conn.execute(query) + + +def __set_status(db, status: str, timestamp: str): + __update( + db["url"], + db["table"], + params={"status": status}, + condition=f"timestamp='{timestamp}'", + ) + # gramex.data.update( + # **gramex.service.storelocations.task, + # args={"status": [status], timestamp: [timestamp]}, + # timestamp="timestamp", + # ) + + +def __get_next_enqueued(db) -> dict or None: + failed = gramex.data.filter(**db, args={"status": ["error"], "_sort": "id", "_limit": [1]}) + if len(failed) > 0: + return failed.iloc[0] + + pending = gramex.data.filter(**db, args={"status": ["pending"], "_sort": "id", "_limit": [1]}) + if len(pending) > 0: + return pending.iloc[0] + + return None + + +def add(queue: str, task: Union[str, list, dict], type: str = "shell", **kwargs): import datetime import gramex.data + import gramex.services import json - + gramex.app_log.warning("runnign something, i dont know what") + print(gramex.service.storelocations.task) return gramex.data.insert( **gramex.service.storelocations.task, args={ - "timestamp": [datetime.datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S.%fZ')], + "timestamp": [datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S.%fZ")], "queue": [queue], "type": [type], - "task": [json.dumps(task)], + "task": [task], "status": ["pending"], "info": [""], "output": [""], @@ -23,7 +64,7 @@ def add(queue: str, task: Union[str, list, dict], type: str = 'shell', **kwargs) def schedule(queue: str, max_tasks: int = 8, **kwargs) -> None: - '''To execute tasks in a queue, add this to `gramex.yaml`: + """To execute tasks in a queue, add this to `gramex.yaml`: ```yaml schedule: @@ -39,69 +80,140 @@ def schedule(queue: str, max_tasks: int = 8, **kwargs) -> None: queue: task queue name to execute. fnmatch-style wildcards are allowed. E.g. `"x.*"` matches x.a, x.b, etc. max_tasks: number of tasks allowed concurrently. Tasks queue up until a slot is available. - ''' + """ import gramex import gramex.data + import json + import os - # TODO: If processes that ought to be running in this system are not, make them pending - # running = gramex.data.filter( - # **gramex.service.storelocations.task, args={"status": ["running"], "_sort": "timestamp"} - # ) - # TODO: Count running processes. Exit if >= max_tasks - # TODO: While running processes < max_tasks, get next pending process and run it - - -def run(timestamp: str, queue, type: str, task: str): + running = gramex.data.filter( + **gramex.service.storelocations.task, args={"status": ["running"], "_sort": "timestamp"} + ) + running_count = 0 + for _, job in running.iterrows(): + print(job) + info = json.loads(job["info"]) + pid = info.get("pid", 0) + if not os.path.exists(os.path.join(f"/proc/{pid}/status")): + __set_status(gramex.service.storelocations.task, "pending", job.timestamp) + continue + with open(f"/proc/{pid}/status", "r") as f: + if job["task"] not in f.read(): + __set_status(gramex.service.storelocations.task, "pending", job.timestamp) + continue + + running_count += 1 + running.drop(job.timestamp, inplace=True) + + if running_count >= max_tasks: + return + + next_job = __get_next_enqueued(gramex.service.storelocations.task) + if next_job is not None: + __run(next_job.timestamp, next_job.queue, next_job.type, next_job.task) + + +def __run(timestamp: str, queue, type: str, task: str): import sys + import os + import subprocess + import json - if type == 'shell': - import subprocess - - process = subprocess.Popen( - [sys.executable, '-timestamp', timestamp, '-queue', queue, '-task', task, '-table': ..., '-url': ...], - # TODO: Detach subprocess on Linux/Mac: https://stackoverflow.com/a/64145368/100904 - # TODO: Detach subprocess on Windows: https://stackoverflow.com/a/52450172/100904 - ) - pid = process.pid - # TODO: update the "info" and "status" in the DB - # __update(db, params={"pid": pid, "status": "running"}, condition=f"id={job2run['id']}") + if type not in ["shell"]: + raise ValueError(f"Unknown task type: {queue}.{type}") + + cmd = [ + 'gramex', + 'task', + '-timestamp', + f'"{timestamp}"', + '-queue', + f'"{queue}"', + '-task', + task, + '-url', + f'''"{gramex.service.storelocations.task['url']}"''', + '-table', + f'''"{gramex.service.storelocations.task['table']}"''', + ] + print("cmd : "," ".join(cmd)) + # TODO: Detach subprocess on Linux/Mac: https://stackoverflow.com/a/64145368/100904 + # TODO: Detach subprocess on Windows: https://stackoverflow.com/a/52450172/100904 + osname = os.name + if osname == "nt": + flags = 0 + flags |= 0x00000008 # DETACHED_PROCESS + flags |= 0x00000200 # CREATE_NEW_PROCESS_GROUP + flags |= 0x08000000 # CREATE_NO_WINDOW + pkwargs = { + "close_fds": True, # close stdin/stdout/stderr on child + "creationflags": flags, + } + elif osname == "posix": + pkwargs = {"start_new_session": True} else: - raise ValueError(f'Unknown task type: {queue}.{type}') + raise ValueError(f"Unsupported OS: {osname}") + + process = subprocess.Popen(cmd, **pkwargs) + pid = process.pid + __update( + gramex.service.storelocations.task["url"], + gramex.service.storelocations.task["table"], + params={"info": json.dumps({"pid": pid}), "status": "running"}, + condition=f"timestamp='{timestamp}'", + ) + # gramex.data.update( + # **gramex.service.storelocations.task, + # args={"timestamp": [timestamp], "status": ["running"]}, + # id=timestamp, + # ) def commandline(args=None): + with open("somesomesome.txt", "a") as f: + f.write("running task") import json import gramex import subprocess import sys - args = gramex.parse_command_line(sys.argv[1:] if args is None else args) - task = json.parse(args.task) + # args = gramex.parse_command_line(sys.argv[1:] if args is None else args) + task = args['task'] + url = args['url'] + table = args['table'] + timestamp = args['timestamp'] + # timestamp = args['timestamp'].strftime("%Y-%m-%d %H:%M:%S.%fZ") if isinstance(task, str): - args, kwargs = [task], {'shell': True} + args, kwargs = [task], {"shell": True} elif isinstance(task, list): args, kwargs = task, {} elif isinstance(task, dict): - args, kwargs = task['args'], task['kwargs'] + args, kwargs = task["args"], task["kwargs"] else: - raise ValueError(f'Task must be str, list, dict. Not {type!r}') + with open("somsomsomsom.txt", "w") as f: + f.write(f"Task must be str, list, dict. Not {type!r}") + raise ValueError(f"Task must be str, list, dict. Not {type!r}") - kwargs['capture_output'] = True - # TODO: Re-implement using subprocess.run. + kwargs["capture_output"] = True process = subprocess.run(*args, **kwargs) - stdout, stderr = process.communicate() return_code = process.returncode status = "success" if return_code == 0 else "error" output = json.dumps( { - "stdout": stdout.decode("utf-8"), - "stderr": stderr.decode("utf-8"), - "return_code": returncode, + "stdout": process.stdout.decode("utf-8"), + "stderr": process.stderr.decode("utf-8"), + "return_code": return_code, } ) - gramex.data.update( - url=args.url, - table=args.table, - params={"pid": 0, "status": status, "output": output}, - condition=f"timestamp={args.timestamp}", + __update( + url, + table, + {"status": status, "output": output, "info": json.dumps({"pid": 0})}, + f"timestamp='{timestamp}'", ) + # gramex.data.update( + # url=args.url, + # table=args.table, + # params={"pid": 0, "status": status, "output": output}, + # condition=f"timestamp={args.timestamp}", + # )