淄博市网站建设_网站建设公司_网站制作_seo优化
2026/3/2 0:32:04 网站建设 项目流程

异步数据库

1.创建数据库引擎

async_DataBase_Engine_url = "mysql+aiomysql://xxx:xxx@ip:3306/heima?charset=utf8mb4"
async_engine=create_async_engine(async_DataBase_Engine_url,
echo=True, #日志打印
pool_size=10, #连接池大小
max_overflow=20, #最大连接数
)
  1. 定义模型类
from sqlalchemy.orm import DeclarativeBase,Mapped,mapped_column
from sqlalchemy import Integer,String,DateTime
from datetime import datetimeclass Base(DeclarativeBase):
create_time: Mapped[datetime] = mapped_column(DateTime, default=datetime.now)
update_time: Mapped[datetime] = mapped_column(DateTime, default=datetime.now, onupdate=datetime.now)
passclass Dict(Base):
__tablename__ = "dict"
id: Mapped[int] = mapped_column(Integer, primary_key=True)
key: Mapped[str] = mapped_column(String(255), nullable=False)
label: Mapped[str] = mapped_column(String(255), nullable=False)
value: Mapped[str] = mapped_column(String(255), nullable=False)
  1. 建表: 定义函数建表= >fastapi去启动的时候建表
@asynccontextmanager
async def lifespan(app: FastAPI):
# 启动时执行 - 建表操作
async with async_engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
yield
# 关闭时执行(可选)await async_engine.dispose()
app = FastAPI(lifespan=lifespan)

在路由中使用ORM

创建依赖项,使用Depends 注入
创建会话工厂,创建会话函数,注入数据库会话

  1. 创建引擎
from sqlalchemy.ext.asyncio import create_async_engineasync_engine = create_async_engine("mysql+aiomysql://user:pass@host/db",pool_size=10,      # 连接池大小max_overflow=20,   # 最大溢出连接
)
  1. 创建会话工厂
from sqlalchemy.ext.asyncio import async_sessionmaker, AsyncSessionAsyncSessionLocal = async_sessionmaker(bind=async_engine,       # 绑定引擎class_=AsyncSession,     # 指定会话类型expire_on_commit=False,  # 提交后对象仍可访问
)
  1. 依赖注入函数
async def get_db():async with AsyncSessionLocal() as session:try:yield session        # ← 返回 session 给路由except Exception:await session.rollback()  # 异常时回滚raise# async with 自动关闭 session
  1. 路由中使用
from fastapi import Depends
from database import get_db@router.get("/items")
async def get_items(db: AsyncSession = Depends(get_db)):result = await db.execute(select(Item))return result.scalars().all()

查询

select

相当于select 语句

  • select(DictORM)
  • select(DictORM.id)
  • select(DictORM.id, DictORM.label)
  • select(func.avg(DictORM.id))
  • select(func.count())

# 返回 所有
result = await db.execute(select(DictORM))
return result.scalars().all()

查询单条

result = await db.get(DictORM, id)

条件查询 where

result = await db.execute(select(DictORM).where(DictORM.label==label))

  • == 等于
  • '>=' 大于等于
  • '>' 大于

模糊查询 like

result = await db.execute(select(DictORM).where(DictORM.label.like(f'%{label}%')))
book=result.scalars().all()

  • % 模糊匹配任意符号 f'%{label}%' 匹配前/后/任意字符
  • _ 匹配单个字符
多个条件 & |

result = await db.execute(select(DictORM).where((DictORM.label.like(f'%{label}%'))&(DictORM.key.like(f'%{key}%'))))

in_ 包含

id=[1,2,3]
result = await db.execute(select(DictORM).where((DictORM.id.in_(id))))

聚合查询 func

from sqlalchemy import func

  • count 统计行数
  • avg 平均值
  • max 求最大值
  • min 求最小值
  • sum 求和
    result = await db.execute(select(func.avg(DictORM.id)))

分页查询

data=await db.execute(select(DictORM).limit(10).offset(0))

新增

async def get_dict(dict:DictModel, db: AsyncSession = Depends(get_db)):
# 返回orm对象
data=DictORM(**dict.__dict__)
db.add(data)
await db.commit()
return {
"msg": f'{dict.id}添加成功'}

修改

查询- > 重新赋值,

result=await db.get(DictORM, dict.id)
if not result:
raise HTTPException(status_code=404, detail="Dict not found")
db.label=dict.label
db.value=dict.value
db.commit()

commit() 提交事务 把暂存区的操作真正写入数据库

删除

查询> 删除 提交事务

# 返回orm对象
result=await db.get(DictORM, dict.id)
if not result:
raise HTTPException(status_code=404, detail="Dict not found")
db.delete(result)
await db.commit()

获取对象

scalars()

拆包 - 只要第一列

mappings()

变字典 - 列名做 key

all

获取所有

总结:

  • 创建异步引擎
  • 定义基类,和模型类
  • 在创建app之前建表

需要专业的网站建设服务?

联系我们获取免费的网站建设咨询和方案报价,让我们帮助您实现业务目标

立即咨询