diff --git a/quantumion/backend/client.py b/quantumion/backend/client.py deleted file mode 100644 index 41cfdda..0000000 --- a/quantumion/backend/client.py +++ /dev/null @@ -1,137 +0,0 @@ -from typing import Literal - -import requests - -######################################################################################## - -from quantumion.backend.provider import Provider -from quantumion.backend.task import Task -from quantumion.server.model import Job - -######################################################################################## - - -class Client: - def __init__(self): - self._jobs = {} - - @property - def jobs(self): - return self._jobs - - def __len__(self): - return len(self.jobs) - - @property - def pending(self): - return self.status_report["queued"]["count"] > 0 - - @property - def status_report(self): - _status_report = dict( - queued=dict(count=0, jobs=[]), - finished=dict(count=0, jobs=[]), - failed=dict(count=0, jobs=[]), - stopped=dict(count=0, jobs=[]), - canceled=dict(count=0, jobs=[]), - ) - for job in self.jobs.values(): - _status_report[job.status]["count"] += 1 - _status_report[job.status]["jobs"].append(job) - return _status_report - - @property - def provider(self): - if hasattr(self, "_provider"): - return self._provider - raise ConnectionError("Missing provider") - - @property - def token(self): - if hasattr(self, "_token"): - return self._token - raise ConnectionError("Missing token") - - @property - def authorization_header(self): - return dict( - Authorization="{} {}".format( - self.token["token_type"], self.token["access_token"] - ) - ) - - def connect(self, provider: Provider): - self._provider = provider - - username = input("Enter username: ") - password = input("Enter password: ") - login = dict(username=username, password=password) - - response = requests.post( - provider.login_url, - data=login, - headers={"Content-Type": "application/x-www-form-urlencoded"}, - ) - - if response.status_code == 200: - self._token = response.json() - return - - raise response.raise_for_status() - - def reconnect(self): - self.connect(self, self.provider) - pass - - def submit_job(self, task: Task, backend: Literal["qutip", "tensorcircuit"]): - - response = requests.post( - self.provider.job_submission_url(backend=backend), - json=task.model_dump(), - headers=self.authorization_header, - ) - job = Job.model_validate(response.json()) - - if response.status_code == 200: - self._jobs[job.job_id] = job - return self.jobs[job.job_id] - - raise response.raise_for_status() - - def retrieve_job(self, job_id): - response = requests.get( - self.provider.job_retrieval_url(job_id=job_id), - headers=self.authorization_header, - ) - job = Job.model_validate(response.json()) - - if response.status_code == 200: - self._jobs[job_id] = job - return self.jobs[job_id] - - raise response.raise_for_status() - - def status_update(self): - for job_id in self.jobs.keys(): - self.retrieve_job(job_id) - pass - - def resubmit_job(self, job_id): - return self.submit_job( - task=Task.model_validate_json(self.jobs[job_id].task), - backend=self.jobs[job_id].backend, - ) - - def cancel_job(self, job_id): - response = requests.delete( - self.provider.job_cancellation_url(job_id=job_id), - headers=self.authorization_header, - ) - job = Job.model_validate(response.json()) - print(job.status) - - if response.status_code == 200: - self._jobs[job_id] = job - return self.jobs[job_id] - - raise response.raise_for_status() diff --git a/quantumion/backend/metric.py b/quantumion/backend/metric.py index b734232..5c851af 100644 --- a/quantumion/backend/metric.py +++ b/quantumion/backend/metric.py @@ -1,7 +1,6 @@ from typing import List, Union from quantumion.interface.base import VisitableBaseModel from pydantic.types import NonNegativeInt -import qutip as qt ######################################################################################## diff --git a/quantumion/backend/provider.py b/quantumion/backend/provider.py deleted file mode 100644 index 58e9291..0000000 --- a/quantumion/backend/provider.py +++ /dev/null @@ -1,33 +0,0 @@ -import requests - -######################################################################################## - - -class Provider: - def __init__(self, url: str = "http://localhost:8000"): - self.url = url - - @property - def available_backends(self): - if hasattr(self, "_available_backends"): - return self._available_backends - else: - return ["qutip", "tensorcircuit"] - - @property - def registration_url(self): - return self.url + "/auth/register" - - @property - def login_url(self): - return self.url + "/auth/token" - - def job_submission_url(self, backend): - assert backend in self.available_backends, "Unavailable backend" - return self.url + "/submit/{}".format(backend) - - def job_retrieval_url(self, job_id): - return self.url + "/retrieve/{}".format(job_id) - - def job_cancellation_url(self, job_id): - return self.url + "/cancel/{}".format(job_id) diff --git a/quantumion/server/__init__.py b/quantumion/server/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/quantumion/server/app.py b/quantumion/server/app.py deleted file mode 100644 index 10b692f..0000000 --- a/quantumion/server/app.py +++ /dev/null @@ -1,28 +0,0 @@ -from fastapi import FastAPI - -from contextlib import asynccontextmanager - -######################################################################################## - -from quantumion.server.database import engine, Base -from quantumion.server.route import user_router, auth_router, job_router - -######################################################################################## - - -@asynccontextmanager -async def create_db(app: FastAPI): - async with engine.begin() as conn: - await conn.run_sync(Base.metadata.create_all) - yield - - -app = FastAPI(lifespan=create_db) -app.include_router(user_router) -app.include_router(auth_router) -app.include_router(job_router) - - -async def create_db(): - async with engine.begin() as conn: - await conn.run_sync(Base.metadata.create_all) diff --git a/quantumion/server/database.py b/quantumion/server/database.py deleted file mode 100644 index 84ae6e8..0000000 --- a/quantumion/server/database.py +++ /dev/null @@ -1,70 +0,0 @@ -import os - -from typing import Annotated, Optional, List - -from datetime import datetime - -from uuid import uuid4 - -from fastapi import Depends - -from sqlalchemy import ForeignKey -from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship -from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker, AsyncSession - -######################################################################################## - -POSTGRES_HOST = os.environ["POSTGRES_HOST"] -POSTGRES_DB = os.environ["POSTGRES_DB"] -POSTGRES_USER = os.environ["POSTGRES_USER"] -POSTGRES_PASSWORD = os.environ["POSTGRES_PASSWORD"] -POSTGRES_URL = f"postgresql+asyncpg://{POSTGRES_USER}:{POSTGRES_PASSWORD}@{POSTGRES_HOST}/{POSTGRES_DB}" - -engine = create_async_engine(POSTGRES_URL) -SessionLocal = async_sessionmaker(autocommit=False, autoflush=False, bind=engine) - -######################################################################################## - - -class Base(DeclarativeBase): - pass - - -class UserInDB(Base): - __tablename__ = "users" - - user_id: Mapped[str] = mapped_column( - primary_key=True, index=True, default=lambda: str(uuid4()) - ) - username: Mapped[str] = mapped_column(unique=True, nullable=False) - email: Mapped[str] - hashed_password: Mapped[str] - created_at: Mapped[datetime] = mapped_column(default=datetime.utcnow) - disabled: Mapped[bool] = mapped_column(default=False) - jobs: Mapped[List["JobInDB"]] = relationship(back_populates="user") - - -class JobInDB(Base): - __tablename__ = "jobs" - - job_id: Mapped[str] = mapped_column(primary_key=True, index=True) - task: Mapped[str] - backend: Mapped[str] - status: Mapped[str] - result: Mapped[Optional[str]] - user_id: Mapped[int] = mapped_column(ForeignKey("users.user_id")) - user: Mapped["UserInDB"] = relationship(back_populates="jobs") - - -######################################################################################## - - -async def get_db(): - db = SessionLocal() - try: - yield db - finally: - await db.close() - - -db_dependency = Annotated[AsyncSession, Depends(get_db)] diff --git a/quantumion/server/jobqueue.py b/quantumion/server/jobqueue.py deleted file mode 100644 index 8e2624f..0000000 --- a/quantumion/server/jobqueue.py +++ /dev/null @@ -1,74 +0,0 @@ -import os - -from redis import Redis -from rq import Queue - -from sqlalchemy import select - -import asyncio - -from contextlib import asynccontextmanager - -######################################################################################## - -from quantumion.server.database import get_db, JobInDB - -######################################################################################## - -REDIS_HOST = os.environ["REDIS_HOST"] -REDIS_PASSWORD = os.environ["REDIS_PASSWORD"] - -redis_client = Redis( - host=REDIS_HOST, password=REDIS_PASSWORD, port=6379, decode_responses=False -) -queue = Queue(connection=redis_client) - -######################################################################################## - - -async def _report_success(job, connection, result, *args, **kwargs): - async with asynccontextmanager(get_db)() as db: - status_update = dict(status="finished", result=result.model_dump_json()) - query = await db.execute(select(JobInDB).filter(JobInDB.job_id == job.id)) - job_in_db = query.scalars().first() - for k, v in status_update.items(): - setattr(job_in_db, k, v) - await db.commit() - - -def report_success(job, connection, result, *args, **kwargs): - return asyncio.get_event_loop().run_until_complete( - _report_success(job, connection, result, *args, **kwargs) - ) - - -async def _report_failure(job, connection, result, *args, **kwargs): - async with asynccontextmanager(get_db)() as db: - status_update = dict(status="failed") - query = await db.execute(select(JobInDB).filter(JobInDB.job_id == job.id)) - job_in_db = query.scalars().first() - for k, v in status_update.items(): - setattr(job_in_db, k, v) - await db.commit() - - -def report_failure(job, connection, result, *args, **kwargs): - return asyncio.get_event_loop().run_until_complete( - _report_failure(job, connection, result, *args, **kwargs) - ) - - -async def _report_stopped(job, connection, result, *args, **kwargs): - async with asynccontextmanager(get_db)() as db: - status_update = dict(status="stopped") - query = await db.execute(select(JobInDB).filter(JobInDB.job_id == job.id)) - job_in_db = query.scalars().first() - for k, v in status_update.items(): - setattr(job_in_db, k, v) - await db.commit() - - -def report_stopped(job, connection, result, *args, **kwargs): - return asyncio.get_event_loop().run_until_complete( - _report_stopped(job, connection, result, *args, **kwargs) - ) diff --git a/quantumion/server/main.py b/quantumion/server/main.py deleted file mode 100644 index 4fa0e9b..0000000 --- a/quantumion/server/main.py +++ /dev/null @@ -1,4 +0,0 @@ -if __name__ == "__main__": - import uvicorn - - uvicorn.run("app:app", host="0.0.0.0", port=8000) diff --git a/quantumion/server/model.py b/quantumion/server/model.py deleted file mode 100644 index 9309c0f..0000000 --- a/quantumion/server/model.py +++ /dev/null @@ -1,32 +0,0 @@ -from typing import Optional - -from pydantic import BaseModel, ConfigDict - -######################################################################################## - - -class UserRegistrationForm(BaseModel): - username: str - email: str - password: str - - -class Token(BaseModel): - access_token: str - token_type: str - - -class User(BaseModel): - user_id: str - username: str - - -class Job(BaseModel): - model_config = ConfigDict(from_attributes=True) - - job_id: str - task: str - backend: str - status: str - result: Optional[str] = None - user_id: str diff --git a/quantumion/server/route/__init__.py b/quantumion/server/route/__init__.py deleted file mode 100644 index 080b5ca..0000000 --- a/quantumion/server/route/__init__.py +++ /dev/null @@ -1,3 +0,0 @@ -from .user import user_router -from .auth import auth_router -from .job import job_router diff --git a/quantumion/server/route/auth.py b/quantumion/server/route/auth.py deleted file mode 100644 index 2d393a7..0000000 --- a/quantumion/server/route/auth.py +++ /dev/null @@ -1,78 +0,0 @@ -import os - -from datetime import datetime, timedelta - -from typing import Annotated - -from fastapi import APIRouter, Depends, HTTPException, status -from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm - -from jose import JWTError, jwt - -from passlib.context import CryptContext - -from sqlalchemy import select - -######################################################################################## - -from quantumion.server.model import Token, User - -from quantumion.server.database import UserInDB, db_dependency - -######################################################################################## - -auth_router = APIRouter(prefix="/auth", tags=["Auth"]) - -pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") - -JWT_SECRET_KEY = os.environ["JWT_SECRET_KEY"] -JWT_ALGORITHM = os.environ["JWT_ALGORITHM"] -JWT_ACCESS_TOKEN_EXPIRE_MINUTES = os.environ["JWT_ACCESS_TOKEN_EXPIRE_MINUTES"] -oauth2_scheme = OAuth2PasswordBearer(tokenUrl="auth/token") - -######################################################################################## - - -def generate_token(username, user_id): - expires = datetime.utcnow() + timedelta( - minutes=int(JWT_ACCESS_TOKEN_EXPIRE_MINUTES) - ) - encode = {"id": user_id, "sub": username, "exp": expires} - return jwt.encode(encode, JWT_SECRET_KEY, algorithm=JWT_ALGORITHM) - - -async def current_user(token: Annotated[str, Depends(oauth2_scheme)]): - try: - payload = jwt.decode(token, JWT_SECRET_KEY, algorithms=[JWT_ALGORITHM]) - username = payload.get("sub") - user_id = payload.get("id") - if not username is None and not user_id is None: - return User(username=username, user_id=user_id) - raise JWTError - - except JWTError: - raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED) - - -user_dependency = Annotated[User, Depends(current_user)] - -# ######################################################################################## - - -@auth_router.post("/token") -async def request_token( - form_data: Annotated[OAuth2PasswordRequestForm, Depends()], db: db_dependency -): - query = await db.execute( - select(UserInDB).filter(UserInDB.username == form_data.username) - ) - user_in_db = query.scalars().first() - if ( - user_in_db - and pwd_context.verify(form_data.password, user_in_db.hashed_password) - and not user_in_db.disabled - ): - token = generate_token(user_in_db.username, user_in_db.user_id) - return Token(access_token=token, token_type="bearer") - - raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED) diff --git a/quantumion/server/route/job.py b/quantumion/server/route/job.py deleted file mode 100644 index f01335c..0000000 --- a/quantumion/server/route/job.py +++ /dev/null @@ -1,98 +0,0 @@ -from typing import Literal - -from fastapi import APIRouter, HTTPException -from fastapi import status as http_status - -from rq.job import Callback -from rq.job import Job as RQJob - -from sqlalchemy import select - -######################################################################################## - -from quantumion.backend.analog.python.qutip import QutipBackend -from quantumion.backend.digital.python.tc import TensorCircuitBackend -from quantumion.backend.task import Task - -from quantumion.server.route.auth import user_dependency -from quantumion.server.database import db_dependency, JobInDB -from quantumion.server.jobqueue import ( - redis_client, - queue, - report_success, - report_failure, - report_stopped, -) -from quantumion.server.model import Job - -######################################################################################## - -job_router = APIRouter(tags=["Job"]) - - -@job_router.post("/submit/{backend}", tags=["Job"]) -async def submit_job( - task: Task, - backend: Literal["qutip", "tensorcircuit"], - user: user_dependency, - db: db_dependency, -): - print(f"Queueing {task} on server {backend} backend. {len(queue)} jobs in queue.") - - backends = {"qutip": QutipBackend(), "tensorcircuit": TensorCircuitBackend()} - job = queue.enqueue( - backends[backend].run, - task, - on_success=Callback(report_success), - on_failure=Callback(report_failure), - on_stopped=Callback(report_stopped), - ) - - job_in_db = JobInDB( - job_id=job.id, - task=task.model_dump_json(), - backend=backend, - status=job.get_status(), - result=None, - user_id=user.user_id, - ) - db.add(job_in_db) - await db.commit() - await db.refresh(job_in_db) - - return Job.model_validate(job_in_db) - - -@job_router.get("/retrieve/{job_id}", tags=["Job"]) -async def retrieve_job(job_id: str, user: user_dependency, db: db_dependency): - query = await db.execute( - select(JobInDB).filter( - JobInDB.job_id == job_id, - JobInDB.user_id == user.user_id, - ) - ) - job_in_db = query.scalars().first() - if job_in_db: - return Job.model_validate(job_in_db) - - raise HTTPException(status_code=http_status.HTTP_401_UNAUTHORIZED) - - -@job_router.delete("/cancel/{job_id}", tags=["Job"]) -async def cancel_job(job_id: str, user: user_dependency, db: db_dependency): - query = await db.execute( - select(JobInDB).filter( - JobInDB.job_id == job_id, - JobInDB.user_id == user.user_id, - ) - ) - job_in_db = query.scalars().first() - if job_in_db: - job = RQJob.fetch(id=job_id, connection=redis_client) - job.cancel() - setattr(job_in_db, "status", "canceled") - await db.commit() - await db.refresh(job_in_db) - return Job.model_validate(job_in_db) - - raise HTTPException(status_code=http_status.HTTP_401_UNAUTHORIZED) diff --git a/quantumion/server/route/user.py b/quantumion/server/route/user.py deleted file mode 100644 index 596f7e0..0000000 --- a/quantumion/server/route/user.py +++ /dev/null @@ -1,68 +0,0 @@ -from fastapi import APIRouter, HTTPException -from fastapi import status as http_status - -from rq.job import Job - -from sqlalchemy import select - -######################################################################################## - -from quantumion.server.route.auth import user_dependency, pwd_context - -from quantumion.server.model import UserRegistrationForm, Job - -from quantumion.server.database import UserInDB, JobInDB, db_dependency - -######################################################################################## - -user_router = APIRouter(prefix="/user", tags=["User"]) - -######################################################################################## - - -async def available_user(user, db): - query = await db.execute( - select(UserInDB).filter(UserInDB.username == user.username) - ) - user_in_db = query.scalars().first() - if not user_in_db: - return user - - raise HTTPException(status_code=http_status.HTTP_409_CONFLICT) - - -######################################################################################## - - -@user_router.post( - "/register", - status_code=http_status.HTTP_201_CREATED, -) -async def register_user(create_user_form: UserRegistrationForm, db: db_dependency): - user = await available_user(create_user_form, db) - if user: - user_in_db = UserInDB( - username=user.username, - email=user.email, - hashed_password=pwd_context.hash(user.password), - ) - - db.add(user_in_db) - await db.commit() - return {"status": "success"} - - raise HTTPException(status_code=http_status.HTTP_401_UNAUTHORIZED) - - -@user_router.get("/jobs", tags=["Job"]) -async def user_jobs(user: user_dependency, db: db_dependency): - query = await db.execute( - select(JobInDB).filter( - JobInDB.user_id == user.user_id, - ) - ) - jobs_in_db = query.scalars().all() - if jobs_in_db: - return [Job.model_validate(job) for job in jobs_in_db] - - raise HTTPException(status_code=http_status.HTTP_401_UNAUTHORIZED) diff --git a/quantumion/utils/io.py b/quantumion/utils/io.py deleted file mode 100644 index e8e9c7c..0000000 --- a/quantumion/utils/io.py +++ /dev/null @@ -1,312 +0,0 @@ -import pathlib -import warnings -import os -import copy -import datetime -import json -import string -import random -import yaml - -from numpy import ndarray - -from matplotlib.figure import Figure - -import pandas as pd -from pandas import DataFrame - -from typing import Optional, Any - - -def current_time(): - """ - Returns current date and time in a consistent format, used for monitoring long-running measurements - - Returns: - (str): current date and time - """ - return datetime.datetime.now().strftime("%d/%m/%Y, %H:%M:%S") - - -class IO: - r""" - The IO class encapsulates all saving/loading features of data, figures, etc. - """ - - default_path = ( - pathlib.Path(os.path.expanduser("~")).joinpath("data").joinpath("oqd") - ) - - def __init__( - self, - path=None, - folder="", - include_date=False, - include_time=False, - include_id=False, - verbose=True, - ): - """ - Args: - path (Optional[str]): The parent folder. - folder (str): The main, descriptive folder name. - include_date (bool): If True, add the date to the front of the path. Otherwise, do not add the date - include_time (bool): If True, add the time to the front of the path. Otherwise, do not add the time - include_id (bool): If True, add a random string of characters to the end of the path. Otherwise, do not - verbose (bool): If True, will print out the path of each saved/loaded file. - - Returns: - (IO): A new IO class instance - - """ - if path is None: - path = self.default_path - - if type(path) is str: - path = pathlib.Path(path) - - date = datetime.date.today().isoformat() - time = datetime.datetime.now().strftime("%H-%M-%S") - if not folder: # if empty string - warnings.warn( - "No folder entered. Saving to a folder with a unique identifier" - ) - include_data, include_id, verbose = True, True, True - - # build the full folder name with date, time, and uuid, if selected - _str = "" - if include_date: - _str = _str + date + "_" - if include_time: - _str = _str + time + "_" - - _str = _str + folder - - if include_id: - _str = ( - _str + "_" + "".join(random.choice(string.hexdigits) for _ in range(4)) - ) - - self.path = path.joinpath(_str) - self.verbose = verbose - return - - def subpath(self, subfolder: str): - cls = copy.deepcopy(self) - cls.path = cls.path.joinpath(subfolder) - return cls - - def save_json(self, variable, filename): - """ - Save serialized python object into a json file at filename - - Args: - variable (object): the object to save - filename (str): name of the file to which variable should be saved - """ - full_path = self.path.joinpath(filename) - os.makedirs(full_path.parent, exist_ok=True) - self._save_json(variable, full_path) - if self.verbose: - print(f"{current_time()} | Saved to {full_path} successfully.") - - def load_json(self, filename): - """ - Load serialized python object from json - - Args: - filename (str): name of the file from which we are loading the object - - Returns: - (Any): the loaded object data - """ - full_path = self.path.joinpath(filename) - file = self._load_json(full_path) - if self.verbose: - print(f"{current_time()} | Loaded from {full_path} successfully.") - return file - - def save_txt(self, variable, filename): - """ - Save serialized python object into a text file at filename - - Args: - variable (object): the object to save - filename (str): name of the file to which variable should be saved - """ - full_path = self.path.joinpath(filename) - os.makedirs(full_path.parent, exist_ok=True) - self._save_txt(variable, full_path) - if self.verbose: - print(f"{current_time()} | Saved to {full_path} successfully.") - - def load_txt(self, filename): - """ - Load serialized python object from text file - - Args: - filename (str): name of the file from which we are loading the object - - Returns: - (Any): the loaded object data - """ - full_path = self.path.joinpath(filename) - file = self._load_txt(full_path) - if self.verbose: - print(f"{current_time()} | Loaded from {full_path} successfully.") - return file - - def save_dataframe(self, df, filename): - """ - Save a panda dataframe object to pkl - - Args: - df (DataFrame): data contained in a dataframe - filename (str): file to which data should be saved - """ - ext = ".pkl" - full_path = self.path.joinpath(filename + ext) - os.makedirs(full_path.parent, exist_ok=True) - # df.to_csv(str(full_path), sep=",", index=False, header=True) - df.to_pickle(str(full_path)) - if self.verbose: - print(f"{current_time()} | Saved to {full_path} successfully.") - - def load_dataframe(self, filename): - """ - Load panda dataframe object from pkl - - Args: - filename (str): name of the file from which data should be loaded - - Returns: - (DataFrame): dataframe data - """ - import pandas as pd - - ext = ".pkl" - full_path = self.path.joinpath(filename + ext) - # df = pd.read_csv(str(full_path), sep=",", header=0) - df = pd.read_pickle(str(full_path)) - if self.verbose: - print(f"{current_time()} | Loaded from {full_path} successfully.") - return df - - def save_figure(self, fig, filename): - """ - Save a figure (image datatype can be specified as part of filename) - - Args: - fig (Figure): the figure containing the figure to save - filename (str): the filename to which we save a figure - """ - full_path = self.path.joinpath(filename) - os.makedirs(full_path.parent, exist_ok=True) - fig.savefig(full_path, dpi=300, bbox_inches="tight") - if self.verbose: - print(f"{current_time()} | Saved figure to {full_path} successfully.") - - def save_np_array(self, np_arr, filename): - """ - Save numpy array to a text document - - Args: - np_arr (ndarray): the array which we are saving - filename (str): name of the text file to which we want to save the numpy array - """ - import numpy as np - - full_path = self.path.joinpath(filename) - os.makedirs(full_path.parent, exist_ok=True) - np.savetxt(str(full_path), np_arr) - if self.verbose: - print(f"{current_time()} | Saved to {full_path} successfully.") - - def load_np_array(self, filename, complex_vals=False): - """ - Loads numpy array from a text document - - Args: - filename (str): name of the text file from which we want to load the numpy array - complex_vals (bool): True if we expect the numpy array to be complex, False otherwise - - Returns: - (ndarray): numpy array of data - """ - import numpy as np - - full_path = self.path.joinpath(filename) - file = np.loadtxt( - str(full_path), dtype=np.complex if complex_vals else np.float - ) - if self.verbose: - print(f"{current_time()} | Loaded from {full_path} successfully.") - return file - - def save_csv(self, df, filename): - """ - Save a panda dataframe object to csv - - Args: - df (DataFrame): data contained in a dataframe - filename (str): file to which data should be saved - """ - ext = ".csv" - full_path = self.path.joinpath(filename + ext) - os.makedirs(full_path.parent, exist_ok=True) - df.to_csv(str(full_path), sep=",", index=False, header=True) - if self.verbose: - print(f"{current_time()} | Saved to {full_path} successfully.") - - def load_csv(self, filename): - """ - Load panda dataframe object from csv - - Args: - filename (str): name of the file from which data should be loaded - - Returns: - (DataFrame): dataframe data - """ - full_path = self.path.joinpath(filename) - df = pd.read_csv(str(full_path), sep=",", header=0) - if self.verbose: - print(f"{current_time()} | Loaded from {full_path} successfully.") - return df - - @staticmethod - def _save_json(variable, path): - """ - Helper method for saving to json files - """ - with open(path, "w+") as json_file: - json.dump(variable, json_file, indent=4) - - @staticmethod - def _load_json(path): - """ - Helper method for loading from json files - """ - with open(path) as json_file: - data = json.load(json_file) - return data - - @staticmethod - def _save_txt(variable, path): - """ - Helper method for saving to text files - """ - with open(path, "w") as txt_file: - txt_file.write(variable) - - @staticmethod - def _load_txt(path): - """ - Helper method for loading from text files - """ - # with open(path) as json_file: - # data = json.load(json_file) - # return data - with open(path) as txt_file: - txt_str = txt_file.read() - return txt_str