pip install mysql-connector-python pymysql

数据库链接

创建src目录,里面创建db.py 代码如下:

# 导入mysql.connector模块,该模块提供了与MySQL数据库进行连接和交互的功能。  
import mysql.connector  
  
# 定义一个函数get_db_connection,该函数用于获取与MySQL数据库的连接。  
def get_db_connection():  
    # 使用mysql.connector.connect()方法创建一个数据库连接对象。  
    # 这个方法需要几个关键参数来指定如何连接到数据库。  
    connection = mysql.connector.connect(  
        # host参数指定数据库服务器的地址,这里使用'localhost'表示数据库服务器运行在本机上。  
        host='localhost',  
          
        # port参数指定数据库服务器监听的端口号,MySQL默认端口是3306。  
        port=3306,  
          
        # user参数指定用于连接到数据库的用户名,这里使用的是"root"用户。  
        user="root",  
          
        # password参数指定连接数据库的密码,出于安全考虑,实际使用时密码不应明文写在代码中。  
        password="XXXXXXX",  
          
        # database参数指定要连接到的数据库名称,这里使用的是名为"database"的数据库。  
        database="database"  
    )  
      
    # 返回创建的数据库连接对象,以供后续操作使用。  
    return connection


def get_db():
    connection = get_db_connection()
    db = connection.cursor()

    try:
        yield db
    finally:
        db.close()
        connection.close()

main.py代码

# 导入FastAPI框架及相关依赖  
from fastapi import FastAPI, Depends, Header, HTTPException  
# 导入uvicorn,用于运行FastAPI应用  
import uvicorn  
# 从mysql.connector导入cursor,但这里可能存在问题,通常我们会导入connector然后从中获取cursor  
from mysql.connector import cursor  
# 导入自定义的数据库连接函数  
from src.db import get_db  
  
# 初始化FastAPI应用实例  
app = FastAPI()  
  
# 定义根路由,返回"Hello World"  
@app.get("/")  
async def root():  
    return {"message": "Hello World"}  
  
# 定义带参数的路由,返回带有输入名字的问候语  
@app.get("/hello/{name}")  
async def say_hello(name: str):  
    return {"message": f"Hello {name}"}  
  
# 定义关于信息的路由,返回应用名称和版本  
@app.get("/about")  
async def about():  
    """  
    项目信息  
    :return: 返回包含应用名称和版本信息的字典  
    """  
    return {  
        "app_name": "人工智能识别",  
        "app_version": "v0.0.1"  
    }  
  
# 定义获取所有用户的路由,从数据库中查询并返回用户列表  
@app.get("/users/")  
async def get_users(db: cursor.MySQLCursor = Depends(get_db)):  
    query = "SELECT * FROM view_set"  
    db.execute(query)  
    result = db.fetchall()  
    if result:  
        return {"users": result}  
    else:  
        return {"error": "User not found"}  
  
# 定义根据用户ID获取单个用户的路由  
@app.get("/users/{user_id}")  
async def get_user(user_id: int,  
                   db: cursor.MySQLCursor = Depends(get_db)):  
    query = "SELECT * FROM view_set WHERE id = %s"  
    db.execute(query, (user_id,))  
    result = db.fetchall()  
    if result:  
        return {"user_id": result[0][0], "username": result[0][1]}  
    else:  
        return {"error": "User not found"}  
  
# 定义插入用户的路由,但路由名称可能应该为/add_user/或其他更有意义的名称  
@app.get("/user_name/{user_name}")  
async def insert_user(user_name: str,  
                      db: cursor.MySQLCursor = Depends(get_db)):  
    query = "INSERT INTO view_set (name) VALUES (%s)"  
    db.execute(query, (user_name,))  
    # 注意:通常在执行INSERT操作后,我们不需要调用fetchone(),因为INSERT不返回结果集  
    # result = db.fetchone()  # 这行代码是多余的  
    db.execute("COMMIT")  # 注意:这里可能存在问题,通常我们不会在每个查询后都执行COMMIT,而是在适当的时候  
    return {"user_name": user_name}  
  
# 主程序入口  
if __name__ == "__main__":  
    # 使用uvicorn运行FastAPI应用  
    uvicorn.run(app, host="0.0.0.0", port=8000)  
    # 另一种运行方式,使用命令行参数--reload实现热重载  
    # uvicorn main:app --reload


02-06 16:07