-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.py
207 lines (176 loc) · 6.96 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
# main.py
from fastapi import FastAPI, HTTPException, Depends, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from typing import List, Optional
from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, Session
from passlib.context import CryptContext
from datetime import datetime, timedelta
from jose import JWTError, jwt
import os
from dotenv import load_dotenv
# JWT configuration
# load_dotenv() runs first so that the os.getenv() lookup below can also see
# values supplied through a .env file.
load_dotenv() # This loads variables from a .env file
# Key handed to jwt.encode / jwt.decode when tokens are issued and checked.
# NOTE(review): os.getenv returns None when SECRET_KEY is unset and nothing here
# checks for that — confirm every deployment provides the variable.
SECRET_KEY = os.getenv("SECRET_KEY")
# Algorithm used by jwt.encode and the only one accepted by jwt.decode.
ALGORITHM = "HS256"
# Lifetime, in minutes, of the tokens issued by the /token endpoint.
ACCESS_TOKEN_EXPIRE_MINUTES = 30
# Application instance that the route decorators in this file register against.
app = FastAPI()
# CORS configuration
# Every origin, method and header is allowed, with credentials enabled.
# NOTE(review): a wildcard origin together with allow_credentials=True is very
# permissive — confirm this is intended outside local development.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# SQLite Database setup
# The database lives in the file test.db, given as a relative path.
SQLALCHEMY_DATABASE_URL = "sqlite:///./test.db"
# check_same_thread=False is forwarded to the sqlite3 driver and disables its
# check that a connection is only used by the thread that created it.
engine = create_engine(SQLALCHEMY_DATABASE_URL, connect_args={"check_same_thread": False})
# Session factory bound to the engine; autocommit and autoflush are both off,
# so the handlers below call commit() explicitly.
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
# Declarative base class that the ORM models in this file inherit from.
Base = declarative_base()
class UserDB(Base):
    """ORM model mapped to the ``users`` table."""

    __tablename__ = "users"
    # Integer primary key.
    id = Column(Integer, primary_key=True, index=True)
    # Login name; declared unique and indexed.
    username = Column(String, unique=True, index=True)
    # Value produced by get_password_hash(); register() stores only this hash.
    hashed_password = Column(String)
class ItemDB(Base):
    """ORM model mapped to the ``items`` table."""

    __tablename__ = "items"
    # Integer primary key.
    id = Column(Integer, primary_key=True, index=True)
    name = Column(String, index=True)
    description = Column(String)
    # Set to the creating user's id by create_item(); the item endpoints filter on it.
    # NOTE(review): declared as a plain Integer with no ForeignKey to users.id —
    # confirm the missing database-level constraint is intentional.
    owner_id = Column(Integer)
# Runs at import time: issue CREATE TABLE for the models declared on Base
# (create_all skips tables that already exist by default).
Base.metadata.create_all(bind=engine)
# Pydantic models
# Request body accepted by POST /register.
class UserCreate(BaseModel):
    username: str
    # Password as submitted by the client; register() hashes it before storing.
    password: str
# User as returned to clients (response model of POST /register); it has no
# password field.
class User(BaseModel):
    id: int
    username: str

    class Config:
        # Allows the model to be built from ORM object attributes, which is how
        # the handlers return UserDB rows through this schema.
        orm_mode = True
# Request body accepted by POST /items/ and PUT /items/{item_id}.
class ItemCreate(BaseModel):
    name: str
    description: str
# Item as returned to clients: the ItemCreate fields plus the stored id and
# the owning user's id.
class Item(ItemCreate):
    id: int
    owner_id: int

    class Config:
        # Allows the model to be built from ORM object attributes, which is how
        # the handlers return ItemDB rows through this schema.
        orm_mode = True
# Response body of POST /token.
class Token(BaseModel):
    # Encoded JWT returned by create_access_token().
    access_token: str
    # login() always sets this to "bearer".
    token_type: str
# Password hashing
# passlib context with bcrypt as its only configured scheme; used by
# verify_password() and get_password_hash().
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
# Dependency that supplies the request's bearer token to get_current_user();
# tokenUrl names the login route defined below ("/token").
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
# Dependency to get database session
def get_db():
    """Yield a fresh database session and close it afterwards.

    Written as a generator so it can be used with ``Depends``: the session is
    created from ``SessionLocal``, handed to the consumer, and closed in the
    ``finally`` clause whether or not the consumer raised.
    """
    session = SessionLocal()
    try:
        yield session
    finally:
        session.close()
# Helper functions
def verify_password(plain_password, hashed_password):
    """Check ``plain_password`` against ``hashed_password``.

    Delegates to ``pwd_context.verify`` and returns its result.
    """
    is_match = pwd_context.verify(plain_password, hashed_password)
    return is_match
def get_password_hash(password):
    """Hash ``password`` with ``pwd_context`` and return the resulting hash."""
    digest = pwd_context.hash(password)
    return digest
def authenticate_user(db: Session, username: str, password: str):
    """Look up ``username`` and check ``password`` against its stored hash.

    Returns:
        The matching ``UserDB`` row, or ``False`` when no user has that
        username or the password does not verify.
    """
    account = db.query(UserDB).filter(UserDB.username == username).first()
    # Guard clauses: a missing user short-circuits before any hash check.
    if not account:
        return False
    if not verify_password(password, account.hashed_password):
        return False
    return account
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
    """Return a signed JWT carrying ``data`` plus an ``exp`` claim.

    Args:
        data: Claims to embed. The dict is copied, so the caller's object is
            not modified.
        expires_delta: Token lifetime. ``None`` selects a 15-minute default;
            any other value, including ``timedelta(0)``, is used as given.

    Returns:
        The token produced by ``jwt.encode`` using ``SECRET_KEY`` and
        ``ALGORITHM``.
    """
    from datetime import timezone  # local import: only this function needs it

    # Compare against None rather than truthiness: timedelta(0) is falsy and
    # would otherwise be silently replaced by the default lifetime.
    if expires_delta is None:
        expires_delta = timedelta(minutes=15)
    # Timezone-aware replacement for datetime.utcnow(), which is deprecated
    # since Python 3.12; the instant represented is the same.
    expire = datetime.now(timezone.utc) + expires_delta
    to_encode = {**data, "exp": expire}
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
async def get_current_user(token: str = Depends(oauth2_scheme), db: Session = Depends(get_db)):
    """Resolve the bearer ``token`` to the ``UserDB`` row it identifies.

    The token is decoded with ``SECRET_KEY``/``ALGORITHM`` and its ``sub``
    claim is used as the username to look up.

    Raises:
        HTTPException: 401 "Could not validate credentials" when the token
            fails to decode, has no ``sub`` claim, or names a user that is
            not in the database.
    """
    unauthorized = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        claims = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except JWTError:
        raise unauthorized
    subject = claims.get("sub")
    if subject is None:
        raise unauthorized
    matched = db.query(UserDB).filter(UserDB.username == subject).first()
    if matched is None:
        raise unauthorized
    return matched
# Authentication endpoints
# Create a user account from the submitted username and password.
#
# The password is hashed with get_password_hash() before it is stored. Responds
# with the new row (id and username), or with 400 "Username already registered"
# when the username is taken.
@app.post("/register", response_model=User)
def register(user: UserCreate, db: Session = Depends(get_db)):
    from sqlalchemy.exc import IntegrityError  # local import: only needed here

    username_taken = HTTPException(status_code=400, detail="Username already registered")
    if db.query(UserDB).filter(UserDB.username == user.username).first():
        raise username_taken
    db_user = UserDB(
        username=user.username,
        hashed_password=get_password_hash(user.password),
    )
    db.add(db_user)
    try:
        db.commit()
    except IntegrityError:
        # A concurrent request can insert the same username between the lookup
        # above and this commit; the unique constraint on users.username then
        # rejects the row. Roll back so the session stays usable and report it
        # like any other duplicate instead of surfacing a 500.
        db.rollback()
        raise username_taken from None
    db.refresh(db_user)
    return db_user
# Exchange form-encoded username/password credentials for a bearer token.
#
# On success the token's "sub" claim is the username and its lifetime is
# ACCESS_TOKEN_EXPIRE_MINUTES; bad credentials produce a 401.
@app.post("/token", response_model=Token)
async def login(form_data: OAuth2PasswordRequestForm = Depends(), db: Session = Depends(get_db)):
    account = authenticate_user(db, form_data.username, form_data.password)
    if account:
        lifetime = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
        token = create_access_token(data={"sub": account.username}, expires_delta=lifetime)
        return {"access_token": token, "token_type": "bearer"}
    raise HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Incorrect username or password",
        headers={"WWW-Authenticate": "Bearer"},
    )
# Protected item endpoints
# Store a new item owned by the authenticated user and return the saved row.
@app.post("/items/", response_model=Item)
def create_item(item: ItemCreate, current_user: User = Depends(get_current_user), db: Session = Depends(get_db)):
    new_row = ItemDB(
        name=item.name,
        description=item.description,
        owner_id=current_user.id,
    )
    db.add(new_row)
    db.commit()
    # Reload the row so the generated id is populated before it is returned.
    db.refresh(new_row)
    return new_row
# List the authenticated user's items, paginated with `skip` (rows to skip)
# and `limit` (maximum rows returned).
@app.get("/items/", response_model=List[Item])
def read_items(skip: int = 0, limit: int = 100, current_user: User = Depends(get_current_user), db: Session = Depends(get_db)):
    owned = db.query(ItemDB).filter(ItemDB.owner_id == current_user.id)
    # OFFSET/LIMIT without ORDER BY leaves row order up to the database, so
    # consecutive pages could overlap or miss rows; ordering by primary key
    # makes pagination stable.
    return owned.order_by(ItemDB.id).offset(skip).limit(limit).all()
# Return one item by id. Items owned by another user are treated exactly like
# missing ones: both produce 404 "Item not found".
@app.get("/items/{item_id}", response_model=Item)
def read_item(item_id: int, current_user: User = Depends(get_current_user), db: Session = Depends(get_db)):
    owned_match = (
        db.query(ItemDB)
        .filter(ItemDB.id == item_id, ItemDB.owner_id == current_user.id)
        .first()
    )
    if owned_match is not None:
        return owned_match
    raise HTTPException(status_code=404, detail="Item not found")
# Overwrite the name and description of one of the authenticated user's items
# and return the updated row; 404 "Item not found" when the id does not match
# an item owned by that user.
@app.put("/items/{item_id}", response_model=Item)
def update_item(item_id: int, item: ItemCreate, current_user: User = Depends(get_current_user), db: Session = Depends(get_db)):
    owned_items = db.query(ItemDB).filter(
        ItemDB.id == item_id,
        ItemDB.owner_id == current_user.id,
    )
    target = owned_items.first()
    if target is None:
        raise HTTPException(status_code=404, detail="Item not found")
    target.name, target.description = item.name, item.description
    db.commit()
    db.refresh(target)
    return target
# Delete one of the authenticated user's items and return a confirmation
# message; 404 "Item not found" when the id does not match an item owned by
# that user.
@app.delete("/items/{item_id}")
def delete_item(item_id: int, current_user: User = Depends(get_current_user), db: Session = Depends(get_db)):
    doomed = (
        db.query(ItemDB)
        .filter(ItemDB.id == item_id, ItemDB.owner_id == current_user.id)
        .first()
    )
    if doomed is None:
        raise HTTPException(status_code=404, detail="Item not found")
    db.delete(doomed)
    db.commit()
    return {"message": "Item deleted successfully"}
# Start a uvicorn server for `app` when this file is executed directly.
if __name__ == "__main__":
    # Imported here, so uvicorn is only needed when the file is run as a script.
    import uvicorn
    # host 0.0.0.0 listens on all network interfaces; port 8000.
    uvicorn.run(app, host="0.0.0.0", port=8000)