百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术分析 > 正文

FastAPI 和数据库:使用 FastAPI 设置 SQLAlchemy

liebian365 2024-11-13 13:25 5 浏览 0 评论

FastAPI 是一个现代、快速(高性能)的 Web 框架,用于基于标准 Python 类型提示使用 Python 3.7+ 构建 API。构建 API 时最常见的任务之一是将其连接到数据库。在这篇博文中,我们将探讨如何使用 FastAPI 设置 SQLAlchemy 以与关系数据库交互。我们还将提供一些演示示例来帮助您入门。

为什么选择 SQLAlchemy?

SQLAlchemy 是一个功能强大的 SQL 工具包和 Python 对象关系映射 (ORM) 库。它提供了一整套众所周知的企业级持久性模式,旨在实现高效、高性能的数据库访问。

先决条件

在开始之前,请确保您已安装以下内容:

Python 3.7+

FastAPI

SQLAlchemy

数据库(例如,为简单起见,使用 SQLite)

您可以使用 pip 安装 FastAPI 和 SQLAlchemy:

pip install fastapi
pip install sqlalchemy
pip install uvicorn
pip install psycopg2-binary  # For PostgreSQL, replace with your database's driver

使用 FastAPI 设置 SQLAlchemy

1. 创建 FastAPI 项目结构

首先,让我们创建一个基本的项目结构:

fastapi_sqlalchemy_example/
    ├── app/
    │   ├── __init__.py
    │   ├── main.py
    │   ├── models.py
    │   ├── database.py
    │   ├── schemas.py
    │   └── crud.py
    └── requirements.txt

2. 定义数据库 URL

在app/database.py,我们将定义数据库连接:

from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker


SQLALCHEMY_DATABASE_URL = "sqlite:///./test.db"  # Replace with your database URL


engine = create_engine(
    SQLALCHEMY_DATABASE_URL, connect_args={"check_same_thread": False}
)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)


Base = declarative_base()

3. 创建模型

接下来,让我们在 app/models.py 中定义我们的数据库模型:

from sqlalchemy import Column, Integer, String
from .database import Base


class User(Base):
    __tablename__ = "users"


    id = Column(Integer, primary_key=True, index=True)
    name = Column(String, index=True)
    email = Column(String, unique=True, index=True)
    hashed_password = Column(String)

4. 创建 CRUD 操作

在 app/crud.py 中,我们将定义与数据库交互的函数:

from sqlalchemy.orm import Session
from . import models, schemas


def get_user(db: Session, user_id: int):
    return db.query(models.User).filter(models.User.id == user_id).first()


def get_user_by_email(db: Session, email: str):
    return db.query(models.User).filter(models.User.email == email).first()


def get_users(db: Session, skip: int = 0, limit: int = 10):
    return db.query(models.User).offset(skip).limit(limit).all()


def create_user(db: Session, user: schemas.UserCreate):
    fake_hashed_password = user.password + "notreallyhashed"
    db_user = models.User(name=user.name, email=user.email, hashed_password=fake_hashed_password)
    db.add(db_user)
    db.commit()
    db.refresh(db_user)
    return db_user

5. 定义模式

在 app/schemas.py 中,为数据定义 Pydantic 模型:

from pydantic import BaseModel


class UserBase(BaseModel):
    name: str
    email: str


class UserCreate(UserBase):
    password: str


class User(UserBase):
    id: int


    class Config:
        orm_mode = True

6. 创建依赖项

在 app/main.py 中,我们创建数据库会话依赖项:

from fastapi import Depends, FastAPI, HTTPException
from sqlalchemy.orm import Session
from . import crud, models, schemas
from .database import SessionLocal, engine


models.Base.metadata.create_all(bind=engine)


app = FastAPI()


# Dependency
def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()

7. 创建 API 端点

最后,让我们在 app/main.py 中创建 API 端点:

@app.post("/users/", response_model=schemas.User)
def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)):
    db_user = crud.get_user_by_email(db, email=user.email)
    if db_user:
        raise HTTPException(status_code=400, detail="Email already registered")
    return crud.create_user(db=db, user=user)


@app.get("/users/{user_id}", response_model=schemas.User)
def read_user(user_id: int, db: Session = Depends(get_db)):
    db_user = crud.get_user(db, user_id=user_id)
    if db_user is None:
        raise HTTPException(status_code=404, detail="User not found")
    return db_user


@app.get("/users/", response_model=List[schemas.User])
def read_users(skip: int = 0, limit: int = 10, db: Session = Depends(get_db)):
    users = crud.get_users(db, skip=skip, limit=limit)
    return users

8. 运行应用程序

要运行 FastAPI 应用程序,请使用 Uvicorn:

uvicorn app.main:app --reload

以下是一些基于上述基本设置的更高级的演示,展示了附加功能和最佳实践。

演示 1:添加模型之间的关系

让我们添加一个名为 Item 的新模型,该模型与 User 模型有关系。

步骤 1:更新 models.py

from sqlalchemy import Column, ForeignKey, Integer, String
from sqlalchemy.orm import relationship
from .database import Base


class User(Base):
    __tablename__ = "users"


    id = Column(Integer, primary_key=True, index=True)
    name = Column(String, index=True)
    email = Column(String, unique=True, index=True)
    hashed_password = Column(String)


    items = relationship("Item", back_populates="owner")




class Item(Base):
    __tablename__ = "items"


    id = Column(Integer, primary_key=True, index=True)
    title = Column(String, index=True)
    description = Column(String, index=True)
    owner_id = Column(Integer, ForeignKey("users.id"))


    owner = relationship("User", back_populates="items")

步骤 2:更新 schemas.py

from typing import List, Optional
from pydantic import BaseModel


class ItemBase(BaseModel):
    title: str
    description: Optional[str] = None


class ItemCreate(ItemBase):
    pass


class Item(ItemBase):
    id: int
    owner_id: int


    class Config:
        orm_mode = True




class UserBase(BaseModel):
    name: str
    email: str


class UserCreate(UserBase):
    password: str


class User(UserBase):
    id: int
    items: List[Item] = []


    class Config:
        orm_mode = True

步骤 3:更新 crud.py

from sqlalchemy.orm import Session
from . import models, schemas


# Existing CRUD functions...


def create_user_item(db: Session, item: schemas.ItemCreate, user_id: int):
    db_item = models.Item(**item.dict(), owner_id=user_id)
    db.add(db_item)
    db.commit()
    db.refresh(db_item)
    return db_item


def get_items(db: Session, skip: int = 0, limit: int = 10):
    return db.query(models.Item).offset(skip).limit(limit).all()

步骤 4:使用新端点更新 main.py

from typing import List


@app.post("/users/{user_id}/items/", response_model=schemas.Item)
def create_item_for_user(
    user_id: int, item: schemas.ItemCreate, db: Session = Depends(get_db)
):
    return crud.create_user_item(db=db, item=item, user_id=user_id)


@app.get("/items/", response_model=List[schemas.Item])
def read_items(skip: int = 0, limit: int = 10, db: Session = Depends(get_db)):
    items = crud.get_items(db, skip=skip, limit=limit)
    return items

演示 2:使用 OAuth2 添加身份验证

步骤 1:安装依赖项

pip install python-jose[cryptography]
pip install passlib[bcrypt]

步骤 2:为安全实用程序创建 auth.py

from datetime import datetime, timedelta
from typing import Optional
from jose import JWTError, jwt
from passlib.context import CryptContext
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from sqlalchemy.orm import Session


from . import crud, models, schemas
from .database import SessionLocal


# to get a string like this run:
# openssl rand -hex 32
SECRET_KEY = "your-secret-key"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30


pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")


def verify_password(plain_password, hashed_password):
    return pwd_context.verify(plain_password, hashed_password)


def get_password_hash(password):
    return pwd_context.hash(password)


def authenticate_user(db: Session, email: str, password: str):
    user = crud.get_user_by_email(db, email=email)
    if not user:
        return False
    if not verify_password(password, user.hashed_password):
        return False
    return user


def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
    to_encode = data.copy()
    if expires_delta:
        expire = datetime.utcnow() + expires_delta
    else:
        expire = datetime.utcnow() + timedelta(minutes=15)
    to_encode.update({"exp": expire})
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt


async def get_current_user(db: Session = Depends(get_db), token: str = Depends(oauth2_scheme)):
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        email: str = payload.get("sub")
        if email is None:
            raise credentials_exception
    except JWTError:
        raise credentials_exception
    user = crud.get_user_by_email(db, email=email)
    if user is None:
        raise credentials_exception
    return user

步骤 3:更新 crud.py 以使用散列密码

from .auth import get_password_hash


def create_user(db: Session, user: schemas.UserCreate):
    hashed_password = get_password_hash(user.password)
    db_user = models.User(name=user.name, email=user.email, hashed_password=hashed_password)
    db.add(db_user)
    db.commit()
    db.refresh(db_user)
    return db_user

步骤 4:在 main.py 中添加令牌端点和安全路由

from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from datetime import timedelta
from .auth import authenticate_user, create_access_token, get_current_user


@app.post("/token", response_model=schemas.Token)
async def login_for_access_token(db: Session = Depends(get_db), form_data: OAuth2PasswordRequestForm = Depends()):
    user = authenticate_user(db, form_data.username, form_data.password)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": user.email}, expires_delta=access_token_expires
    )
    return {"access_token": access_token, "token_type": "bearer"}


@app.get("/users/me/", response_model=schemas.User)
async def read_users_me(current_user: schemas.User = Depends(get_current_user)):
    return current_user

步骤 5:更新 schemas.py 以支持 OAuth2

from pydantic import BaseModel


class Token(BaseModel):
    access_token: str
    token_type: str

演示 3:分页和过滤

步骤 1:在 crud.py 中为 CRUD 操作添加分页

def get_users(db: Session, skip: int = 0, limit: int = 10, search: str = None):
    query = db.query(models.User)
    if search:
        query = query.filter(models.User.name.contains(search))
    return query.offset(skip).limit(limit).all()

步骤 2:在 main.py 中为端点添加分页和过滤

@app.get("/users/", response_model=List[schemas.User])
def read_users(skip: int = 0, limit: int = 10, search: str = None, db: Session = Depends(get_db)):
    users = crud.get_users(db, skip=skip, limit=limit, search=search)
    return users

这些额外的演示展示了更高级的用例,例如在模型之间添加关系、使用 OAuth2 实现身份验证以及向端点添加分页和过滤。这些功能中的每一个都增强了 FastAPI 应用程序的功能和可用性。您可以随意进一步定制和扩展这些示例以满足您的特定需求。

在这篇博文中,我们用 SQLAlchemy 设置了一个 FastAPI 应用程序来与关系数据库交互。我们介绍了如何定义模型、创建 CRUD 操作以及设置 API 端点。这只是一个起点,您也可以模仿上述添加更复杂的查询、处理模型之间的关系以及实现更高级的功能来扩展此设置。

相关推荐

快递查询教程,批量查询物流,一键管理快递

作为商家,每天需要查询许许多多的快递单号,面对不同的快递公司,有没有简单一点的物流查询方法呢?小编的回答当然是有的,下面随小编一起来试试这个新技巧。需要哪些工具?安装一个快递批量查询高手快递单号怎么快...

一键自动查询所有快递的物流信息 支持圆通、韵达等多家快递

对于各位商家来说拥有一个好的快递软件,能够有效的提高自己的工作效率,在管理快递单号的时候都需要对单号进行表格整理,那怎么样能够快速的查询所有单号信息,并自动生成表格呢?1、其实方法很简单,我们不需要一...

快递查询单号查询,怎么查物流到哪了

输入单号怎么查快递到哪里去了呢?今天小编给大家分享一个新的技巧,它支持多家快递,一次能查询多个单号物流,还可对查询到的物流进行分析、筛选以及导出,下面一起来试试。需要哪些工具?安装一个快递批量查询高手...

3分钟查询物流,教你一键批量查询全部物流信息

很多朋友在问,如何在短时间内把单号的物流信息查询出来,查询完成后筛选已签收件、筛选未签收件,今天小编就分享一款物流查询神器,感兴趣的朋友接着往下看。第一步,运行【快递批量查询高手】在主界面中点击【添...

快递单号查询,一次性查询全部物流信息

现在各种快递的查询方式,各有各的好,各有各的劣,总的来说,还是有比较方便的。今天小编就给大家分享一个新的技巧,支持多家快递,一次能查询多个单号的物流,还能对查询到的物流进行分析、筛选以及导出,下面一起...

快递查询工具,批量查询多个快递快递单号的物流状态、签收时间

最近有朋友在问,怎么快速查询单号的物流信息呢?除了官网,还有没有更简单的方法呢?小编的回答当然是有的,下面一起来看看。需要哪些工具?安装一个快递批量查询高手多个京东的快递单号怎么快速查询?进入快递批量...

快递查询软件,自动识别查询快递单号查询方法

当你拥有多个快递单号的时候,该如何快速查询物流信息?比如单号没有快递公司时,又该如何自动识别再去查询呢?不知道如何操作的宝贝们,下面随小编一起来试试。需要哪些工具?安装一个快递批量查询高手快递单号若干...

教你怎样查询快递查询单号并保存物流信息

商家发货,快递揽收后,一般会直接手动复制到官网上一个个查询物流,那么久而久之,就会觉得查询变得特别繁琐,今天小编给大家分享一个新的技巧,下面一起来试试。教程之前,我们来预览一下用快递批量查询高手...

简单几步骤查询所有快递物流信息

在高峰期订单量大的时候,可能需要一双手当十双手去查询快递物流,但是由于逐一去查询,效率极低,追踪困难。那么今天小编给大家分享一个新的技巧,一次能查询多个快递单号的物流,下面一起来学习一下,希望能给大家...

物流单号查询,如何查询快递信息,按最后更新时间搜索需要的单号

最近有很多朋友在问,如何通过快递单号查询物流信息,并按最后更新时间搜索出需要的单号呢?下面随小编一起来试试吧。需要哪些工具?安装一个快递批量查询高手快递单号若干怎么快速查询?运行【快递批量查询高手】...

连续保存新单号功能解析,导入单号查询并自动识别批量查快递信息

快递查询已经成为我们日常生活中不可或缺的一部分。然而,面对海量的快递单号,如何高效、准确地查询每一个快递的物流信息,成为了许多人头疼的问题。幸运的是,随着科技的进步,一款名为“快递批量查询高手”的软件...

快递查询教程,快递单号查询,筛选更新量为1的单号

最近有很多朋友在问,怎么快速查询快递单号的物流,并筛选出更新量为1的单号呢?今天小编给大家分享一个新方法,一起来试试吧。需要哪些工具?安装一个快递批量查询高手多个快递单号怎么快速查询?运行【快递批量查...

掌握批量查询快递动态的技巧,一键查找无信息记录的两种方法解析

在快节奏的商业环境中,高效的物流查询是确保业务顺畅运行的关键。作为快递查询达人,我深知时间的宝贵,因此,今天我将向大家介绍一款强大的工具——快递批量查询高手软件。这款软件能够帮助你批量查询快递动态,一...

从复杂到简单的单号查询,一键清除单号中的符号并批量查快递信息

在繁忙的商务与日常生活中,快递查询已成为不可或缺的一环。然而,面对海量的单号,逐一查询不仅耗时费力,还容易出错。现在,有了快递批量查询高手软件,一切变得简单明了。只需一键,即可搞定单号查询,一键处理单...

物流单号查询,在哪里查询快递

如果在快递单号多的情况,你还在一个个复制粘贴到官网上手动查询,是一件非常麻烦的事情。于是乎今天小编给大家分享一个新的技巧,下面一起来试试。需要哪些工具?安装一个快递批量查询高手快递单号怎么快速查询?...

取消回复欢迎 发表评论: