-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathfilling_db.py
99 lines (73 loc) · 3.45 KB
/
filling_db.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
from pydantic import EmailStr
from sqlalchemy import Result, select
from sqlalchemy.ext.asyncio import AsyncSession
import asyncio
from src.database import db
from src.auth import utils as auth_utils
from src.videoprocessor.schemas import TypeAnnotation
from src.datasets.schemas import TypeDatasetCreate, TypeDatasetRead
from src.users.schemas import UserCreateSuperuser, UserLogin, UserRead
from src.users.models import User
from src.datasets.models import TypeDataset
async def get_user_by_username(username: str, session: AsyncSession) -> UserLogin | None:
stmt = select(User).filter(User.username == username)
result: Result = await session.execute(stmt)
user = result.scalar()
return user
async def get_user_by_email(email: EmailStr, session: AsyncSession) -> UserLogin | None:
stmt = select(User).filter(User.email == email)
result = await session.execute(stmt)
user = result.scalar()
return user
async def create_user(user: UserCreateSuperuser, session: AsyncSession) -> UserRead:
email_exist = await get_user_by_email(user.email, session)
username_exist = await get_user_by_username(user.username, session)
if email_exist is not None or username_exist is not None:
print(f'такой пользователь есть: {user.username}')
return
user.hashed_password = auth_utils.hash_password(user.hashed_password).decode()
add_user = User(**user.model_dump())
session.add(add_user)
await session.commit()
return add_user
async def get_types(session: AsyncSession) -> list[TypeDatasetRead]:
stmt = select(TypeDataset)
result: Result = await session.execute(stmt)
types = result.scalars().all()
return types
async def get_type_by_name(name: str, session: AsyncSession) -> TypeDatasetRead | None:
stmt = select(TypeDataset).filter(TypeDataset.name == name)
result: Result = await session.execute(stmt)
type_data = result.scalar()
return type_data
async def create_types(names: list[TypeDatasetCreate], session: AsyncSession):
model_names_types = [TypeDataset(**name.model_dump()) for name in names]
session.add_all(model_names_types)
await session.commit()
print(model_names_types)
async def main(user):
async with db.async_session() as session:
await create_user(user, session)
enum_names_types = [typ.name for typ in TypeAnnotation]
database_names_types = await get_types(session)
enum_names_types_copy =enum_names_types.copy()
is_in_type = False
for name in database_names_types:
if name.name not in enum_names_types:
is_in_type = True
print(f'нету этого типа в Enum: {name.name}')
if name.name in enum_names_types_copy:
enum_names_types_copy.remove(name.name)
if len(database_names_types) == len(enum_names_types) or is_in_type:
print('обновление не требуется')
return
types = [TypeDatasetCreate(name=name) for name in enum_names_types]
await create_types(types, session)
if __name__ == '__main__':
user = UserCreateSuperuser(
username='admin',
email='[email protected]',
hashed_password='admin',
is_superuser=True
)
asyncio.run(main(user))