Skip to content

Commit

Permalink
chore: simplefied stream reading
Browse files Browse the repository at this point in the history
  • Loading branch information
MSchmoecker committed Feb 2, 2024
1 parent a4a6f4a commit c85a012
Showing 1 changed file with 41 additions and 84 deletions.
125 changes: 41 additions & 84 deletions controller/app/models/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ class State(BaseModel):
tags: List[models.Tag]
devices: List[models.Device]

__q: List
__stdout: bytearray
__stderr: bytearray

def write_nix(self, path: os.PathLike):
path = pathlib.Path(path)
# write a flake.nix
Expand Down Expand Up @@ -110,9 +114,28 @@ def save(self, path: os.PathLike = "./"):
def repo_dir(self):
return REPO_PATH

def update_status(self, status: str):
self.__q[0] = {
"status": status,
"stdout": self.__stdout.decode("utf-8"),
"stderr": self.__stderr.decode("utf-8"),
}

async def stream_reader(self, stream: asyncio.StreamReader | None, out: bytearray):
while True:
line = await stream.readline()
if not line:
break
out.extend(line)
self.update_status("building")

async def build_nix(self, q: List):
await terminate_other_procs()
q[0] = {"status": "started building"}
self.__q = q
self.__stdout = bytearray()
self.__stderr = bytearray()
self.update_status("started building")

# runs a nix command to build the flake
# async run commands using asyncio.subprocess
# we will run
Expand All @@ -125,54 +148,23 @@ async def build_nix(self, q: List):
stderr=asyncio.subprocess.PIPE,
)

stdout = bytearray()
stderr = bytearray()

async def stdout_reader():
while True:
line = await proc.stdout.readline()
if not line:
break
stdout.extend(line)
q[0] = {
"status": "building",
"stdout": stdout.decode("utf-8"),
"stderr": stderr.decode("utf-8"),
}

async def stderr_reader():
while True:
line = await proc.stderr.readline()
if not line:
break
stderr.extend(line)
q[0] = {
"status": "building",
"stdout": stdout.decode("utf-8"),
"stderr": stderr.decode("utf-8"),
}

other_procs.append(proc)
asyncio.create_task(stdout_reader())
asyncio.create_task(stderr_reader())
asyncio.create_task(self.stream_reader(proc.stdout, self.__stdout))
asyncio.create_task(self.stream_reader(proc.stderr, self.__stderr))

r = await proc.wait()
if r != 0:
q[0] = {
"status": "failed",
"stdout": stdout.decode("utf-8"),
"stderr": stderr.decode("utf-8"),
}
return

q[0] = {
"status": "success",
"stdout": stdout.decode("utf-8"),
"stderr": stderr.decode("utf-8"),
}
self.update_status("failed")
else:
self.update_status("success")

async def deploy(self, q: List):
await terminate_other_procs()
q[0] = {"status": "started deploying"}
self.__q = q
self.__stdout = bytearray()
self.__stderr = bytearray()
self.update_status("started deploying")

# for each device in the state
# runs a command to deploy the flake

Expand All @@ -186,47 +178,12 @@ async def deploy(self, q: List):
stderr=asyncio.subprocess.PIPE,
)

stdout = bytearray()
stderr = bytearray()

async def stdout_reader():
while True:
line = await proc.stdout.readline()
if not line:
break
stdout.extend(line)
q[0] = {
"status": "deploying",
"stdout": stdout.decode("utf-8"),
"stderr": stderr.decode("utf-8"),
}

async def stderr_reader():
while True:
line = await proc.stderr.readline()
if not line:
break
stderr.extend(line)
q[0] = {
"status": "deploying",
"stdout": stdout.decode("utf-8"),
"stderr": stderr.decode("utf-8"),
}

other_procs.append(proc)
asyncio.create_task(stdout_reader())
asyncio.create_task(stderr_reader())
asyncio.create_task(self.stream_reader(proc.stdout, self.__stdout))
asyncio.create_task(self.stream_reader(proc.stderr, self.__stderr))

r = await proc.wait()
if r != 0:
q[0] = {
"status": "failed",
"stdout": stdout.decode("utf-8"),
"stderr": stderr.decode("utf-8"),
}
return

q[0] = {
"status": "success",
"stdout": stdout.decode("utf-8"),
"stderr": stderr.decode("utf-8"),
}
self.update_status("failed")
else:
self.update_status("success")

0 comments on commit c85a012

Please sign in to comment.