当前位置:首页>python>Python+PostgreSQL数据库交互详解:从基础连接到高级ORM

Python+PostgreSQL数据库交互详解:从基础连接到高级ORM

  • 2026-04-13 18:54:47
Python+PostgreSQL数据库交互详解:从基础连接到高级ORM

关注+星标,每天学习Python新技能

因公众号更改推送规则,请点“在看”并加“星标”第一时间获取精彩技术分享

来源于网络,侵删

我们将从以下几个方面进行详细演示:

  1. 环境准备:安装必要的库和PostgreSQL数据库

  2. 连接PostgreSQL数据库

  3. 创建表

  4. 插入数据

  5. 查询数据

  6. 更新数据

  7. 删除数据

  8. 使用事务

  9. 使用连接池

  10. 使用ORM(SQLAlchemy)示例

注意:本教程假设你已经安装了PostgreSQL数据库,并且已经创建了一个数据库和用户。

步骤1:环境准备安装必要的Python库:psycopg2(用于连接PostgreSQL)和SQLAlchemy(ORM示例)如果你还没有安装,可以使用以下命令安装:pip install psycopg2-binary sqlalchemy

步骤2:连接数据库我们将演示如何使用psycopg2连接数据库。

目录

  1. 环境准备

  2. 基础连接与操作

  3. CRUD操作详解

  4. 事务管理

  5. 连接池管理

  6. 使用SQLAlchemy ORM

  7. 异步操作

  8. 最佳实践

  9. 完整示例项目

环境准备

1. 安装PostgreSQL

Windows/Mac:

# 下载PostgreSQL安装包https://www.postgresql.org/download/# 或使用Dockerdocker run --name postgresql -e POSTGRES_PASSWORD=password -p 5432:5432 -d postgres
Ubuntu/Debian:
sudo apt updatesudo apt install postgresql postgresql-contribsudo systemctl start postgresql

2. 创建数据库和用户

-- 登录PostgreSQLsudo -u postgres psql-- 创建数据库CREATE DATABASE tutorial_db;-- 创建用户CREATE USER tutorial_user WITH PASSWORD 'secure_password';-- 授权GRANT ALL PRIVILEGES ON DATABASE tutorial_db TO tutorial_user;-- 查看数据库\l-- 连接到数据库\c tutorial_db

3. 安装Python库

pip install psycopg2-binary  # PostgreSQL适配器pip install sqlalchemy       # ORM框架pip install asyncpg          # 异步PostgreSQL驱动pip install psycopg2-pool    # 连接池

基础连接与操作

1. 基础连接示例

python

import psycopg2from psycopg2 import sqlfrom psycopg2.extras import RealDictCursorimport logging# 配置日志logging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__)class PostgreSQLConnector:    def __init__(self, host='localhost', port=5432                 database='tutorial_db', user='tutorial_user'                 password='secure_password'):        self.connection_params = {            'host': host,            'port': port,            'database': database,            'user': user,            'password': password,            'client_encoding''utf8'        }        self.connection = None    def connect(self):        """建立数据库连接"""        try:            self.connection = psycopg2.connect(**self.connection_params)            logger.info("成功连接到PostgreSQL数据库")            return True        except Exception as e:            logger.error(f"连接失败: {e}")            return False    def close(self):        """关闭连接"""        if self.connection:            self.connection.close()            logger.info("数据库连接已关闭")    def execute_query(self, query, params=None, fetch=False):        """执行SQL查询"""        cursor = None        try:            cursor = self.connection.cursor(cursor_factory=RealDictCursor)            cursor.execute(query, params or ())            if fetch:                if query.strip().upper().startswith('SELECT'):                    result = cursor.fetchall()                    return result                else:                    self.connection.commit()                    return cursor.rowcount            else:                self.connection.commit()                return cursor.rowcount        except Exception as e:            self.connection.rollback()            logger.error(f"查询执行失败: {e}")            raise        finally:            if cursor:                cursor.close()    def test_connection(self):        """测试连接"""        try:            result = self.execute_query("SELECT version();", fetch=True)            logger.info(f"PostgreSQL版本: {result[0]['version']}")            return True        except Exception as e:            logger.error(f"测试连接失败: {e}")            return False# 使用示例if __name__ == "__main__":    # 创建连接器实例    db = PostgreSQLConnector()    # 连接数据库    if db.connect():        # 测试连接        db.test_connection()        # 关闭连接        db.close()

2. 使用上下文管理器

python

import psycopg2from contextlib import contextmanagerfrom typing import Generator, AnyTuple@contextmanagerdef get_db_connection() -> Generator[psycopg2.extensions.connection, NoneNone]:    """数据库连接上下文管理器"""    conn = None    try:        conn = psycopg2.connect(            host="localhost",            database="tutorial_db",            user="tutorial_user",            password="secure_password"        )        yield conn        conn.commit()    except Exception as e:        if conn:            conn.rollback()        raise e    finally:        if conn:            conn.close()@contextmanagerdef get_db_cursor() -> Generator[Tuple[psycopg2.extensions.connection,                                        psycopg2.extensions.cursor], NoneNone]:    """游标上下文管理器"""    with get_db_connection() as conn:        cursor = conn.cursor()        try:            yield conn, cursor        finally:            cursor.close()# 使用示例def get_all_users():    with get_db_cursor() as (conn, cursor):        cursor.execute("SELECT * FROM users;")        return cursor.fetchall()

CRUD操作详解

1. 创建表结构

class TableManager:    def __init__(self, connector):        self.db = connector    def create_tables(self):        """创建示例表"""        # 用户表        users_table = """        CREATE TABLE IF NOT EXISTS users (            id SERIAL PRIMARY KEY,            username VARCHAR(50) UNIQUE NOT NULL,            email VARCHAR(100) UNIQUE NOT NULL,            password_hash VARCHAR(255) NOT NULL,            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,            updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,            is_active BOOLEAN DEFAULT TRUE,            CONSTRAINT email_check CHECK (email ~* '^[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\\.[A-Za-z]{2,}$')        );        """        # 产品表        products_table = """        CREATE TABLE IF NOT EXISTS products (            id SERIAL PRIMARY KEY,            name VARCHAR(100) NOT NULL,            description TEXT,            price DECIMAL(10, 2) CHECK (price >= 0),            stock_quantity INTEGER DEFAULT 0 CHECK (stock_quantity >= 0),            category VARCHAR(50),            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,            updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,            INDEX idx_category (category),            INDEX idx_price (price)        );        """        # 订单表        orders_table = """        CREATE TABLE IF NOT EXISTS orders (            id SERIAL PRIMARY KEY,            user_id INTEGER REFERENCES users(id) ON DELETE CASCADE,            total_amount DECIMAL(10, 2) CHECK (total_amount >= 0),            status VARCHAR(20) DEFAULT 'pending',            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,            updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,            CONSTRAINT fk_user FOREIGN KEY (user_id) REFERENCES users(id),            INDEX idx_user_id (user_id),            INDEX idx_status (status)        );        """        # 订单详情表        order_items_table = """        CREATE TABLE IF NOT EXISTS order_items (            id SERIAL PRIMARY KEY,            order_id INTEGER REFERENCES orders(id) ON DELETE CASCADE,            product_id INTEGER REFERENCES products(id) ON DELETE SET NULL,            quantity INTEGER CHECK (quantity > 0),            unit_price DECIMAL(10, 2) CHECK (unit_price >= 0),            subtotal DECIMAL(10, 2) GENERATED ALWAYS AS (quantity * unit_price) STORED,            CONSTRAINT fk_order FOREIGN KEY (order_id) REFERENCES orders(id),            CONSTRAINT fk_product FOREIGN KEY (product_id) REFERENCES products(id),            UNIQUE (order_id, product_id)        );        """        tables = [users_table, products_table, orders_table, order_items_table]        for i, table_sql in enumerate(tables, 1):            try:                self.db.execute_query(table_sql)                logger.info(f"表 {i} 创建成功")            except Exception as e:                logger.error(f"创建表失败: {e}")                raise    def create_indexes(self):        """创建额外索引"""        indexes = [            "CREATE INDEX IF NOT EXISTS idx_users_email ON users(email);",            "CREATE INDEX IF NOT EXISTS idx_products_name ON products(name);",            "CREATE INDEX IF NOT EXISTS idx_orders_created ON orders(created_at);"        ]        for index_sql in indexes:            try:                self.db.execute_query(index_sql)            except Exception as e:                logger.warning(f"创建索引失败: {e}")    def create_triggers(self):        """创建触发器"""        trigger_sql = """        CREATE OR REPLACE FUNCTION update_updated_at_column()        RETURNS TRIGGER AS $$        BEGIN            NEW.updated_at = CURRENT_TIMESTAMP;            RETURN NEW;        END;        $$ language 'plpgsql';        DO $$         DECLARE            tbl text;        BEGIN            FOR tbl IN                 SELECT table_name                 FROM information_schema.tables                 WHERE table_schema = 'public'                 AND table_name IN ('users', 'products', 'orders')            LOOP                EXECUTE format('                    DROP TRIGGER IF EXISTS update_%s_updated_at ON %s;                    CREATE TRIGGER update_%s_updated_at                    BEFORE UPDATE ON %s                    FOR EACH ROW                    EXECUTE FUNCTION update_updated_at_column();                ', tbl, tbl, tbl, tbl);            END LOOP;        END $$;        """        try:            self.db.execute_query(trigger_sql)            logger.info("触发器创建成功")        except Exception as e:            logger.error(f"创建触发器失败: {e}")

2. 插入数据

class DataOperations:    def __init__(self, connector):        self.db = connector    def insert_user(self, username, email, password_hash):        """插入用户数据"""        query = """        INSERT INTO users (username, email, password_hash)        VALUES (%s, %s, %s)        RETURNING id, username, email, created_at;        """        try:            result = self.db.execute_query(                query,                 (username, email, password_hash),                 fetch=True            )            logger.info(f"用户创建成功: {result[0]['username']}")            return result[0]        except psycopg2.IntegrityError as e:            logger.error(f"用户已存在或数据不合法: {e}")            raise        except Exception as e:            logger.error(f"插入用户失败: {e}")            raise    def batch_insert_products(self, products):        """批量插入产品数据"""        query = """        INSERT INTO products (name, description, price, stock_quantity, category)        VALUES %s        RETURNING id;        """        try:            # 使用psycopg2的批量插入            with self.db.connection.cursor() as cursor:                # 使用execute_values进行高效批量插入                from psycopg2.extras import execute_values                execute_values(                    cursor,                    query,                    products,                    template="(%s, %s, %s, %s, %s)",                    page_size=100                )                self.db.connection.commit()                inserted_ids = cursor.fetchall()                logger.info(f"批量插入 {len(inserted_ids)} 个产品成功")                return inserted_ids        except Exception as e:            self.db.connection.rollback()            logger.error(f"批量插入失败: {e}")            raise    def upsert_product(self, name, description, price, stock_quantity, category):        """插入或更新产品"""        query = """        INSERT INTO products (name, description, price, stock_quantity, category)        VALUES (%s, %s, %s, %s, %s)        ON CONFLICT (name)         DO UPDATE SET            description = EXCLUDED.description,            price = EXCLUDED.price,            stock_quantity = products.stock_quantity + EXCLUDED.stock_quantity,            updated_at = CURRENT_TIMESTAMP        RETURNING id, name, stock_quantity;        """        try:            result = self.db.execute_query(                query,                (name, description, price, stock_quantity, category),                fetch=True            )            return result[0]        except Exception as e:            logger.error(f"Upsert失败: {e}")            raise

3. 查询数据

class QueryOperations:    def __init__(self, connector):        self.db = connector    def get_user_by_id(self, user_id):        """根据ID查询用户"""        query = """        SELECT id, username, email, created_at, is_active        FROM users        WHERE id = %s;        """        result = self.db.execute_query(query, (user_id,), fetch=True)        return result[0if result else None    def search_products(self, keyword=None, min_price=0, max_price=None                       category=None, limit=10, offset=0):        """搜索产品(带参数)"""        base_query = """        SELECT id, name, description, price, stock_quantity, category        FROM products        WHERE price >= %s        """        params = [min_price]        # 动态构建查询条件        conditions = []        if max_price:            conditions.append("price <= %s")            params.append(max_price)        if keyword:            conditions.append("(name ILIKE %s OR description ILIKE %s)")            params.extend([f"%{keyword}%"f"%{keyword}%"])        if category:            conditions.append("category = %s")            params.append(category)        if conditions:            base_query += " AND " + " AND ".join(conditions)        # 添加排序和分页        base_query += """        ORDER BY price ASC, name ASC        LIMIT %s OFFSET %s;        """        params.extend([limit, offset])        # 执行查询        return self.db.execute_query(base_query, tuple(params), fetch=True)    def get_user_orders(self, user_id):        """获取用户订单(连接查询)"""        query = """        SELECT             o.id as order_id,            o.total_amount,            o.status,            o.created_at,            COUNT(oi.id) as item_count,            SUM(oi.subtotal) as items_total        FROM orders o        LEFT JOIN order_items oi ON o.id = oi.order_id        WHERE o.user_id = %s        GROUP BY o.id, o.total_amount, o.status, o.created_at        ORDER BY o.created_at DESC;        """        return self.db.execute_query(query, (user_id,), fetch=True)    def get_order_details(self, order_id):        """获取订单详情(复杂连接查询)"""        query = """        SELECT             o.id as order_id,            o.status,            o.total_amount,            o.created_at,            u.username,            u.email,            json_agg(                json_build_object(                    'product_id', p.id,                    'product_name', p.name,                    'quantity', oi.quantity,                    'unit_price', oi.unit_price,                    'subtotal', oi.subtotal                )            ) as items        FROM orders o        JOIN users u ON o.user_id = u.id        JOIN order_items oi ON o.id = oi.order_id        JOIN products p ON oi.product_id = p.id        WHERE o.id = %s        GROUP BY o.id, u.username, u.email;        """        result = self.db.execute_query(query, (order_id,), fetch=True)        return result[0if result else None    def get_paginated_data(self, table_name, page=1, per_page=10, filters=None):        """通用分页查询"""        # 构建WHERE条件        where_conditions = []        params = []        if filters:            for key, value in filters.items():                if value is not None:                    where_conditions.append(f"{key} = %s")                    params.append(value)        where_clause = ""        if where_conditions:            where_clause = "WHERE " + " AND ".join(where_conditions)        # 计算偏移量        offset = (page - 1) * per_page        # 查询数据        data_query = f"""        SELECT * FROM {table_name}        {where_clause}        LIMIT %s OFFSET %s;        """        params.extend([per_page, offset])        # 查询总数        count_query = f"""        SELECT COUNT(*) as total FROM {table_name}        {where_clause};        """        data = self.db.execute_query(data_query, tuple(params), fetch=True)        total_result = self.db.execute_query(count_query, tuple(params[:len(params)-2]), fetch=True)        return {            'data': data,            'total': total_result[0]['total'if total_result else 0,            'page': page,            'per_page': per_page,            'total_pages': (total_result[0]['total'] + per_page - 1) // per_page if total_result else 0        }

4. 更新数据

class UpdateOperations:    def __init__(self, connector):        self.db = connector    def update_user(self, user_id, **kwargs):        """更新用户信息"""        if not kwargs:            return False        set_clauses = []        params = []        allowed_fields = ['username''email''password_hash''is_active']        for field, value in kwargs.items():            if field in allowed_fields and value is not None:                set_clauses.append(f"{field} = %s")                params.append(value)        if not set_clauses:            return False        params.append(user_id)        query = f"""        UPDATE users        SET {', '.join(set_clauses)}        WHERE id = %s        RETURNING id, username, email, updated_at;        """        try:            result = self.db.execute_query(query, tuple(params), fetch=True)            if result:                logger.info(f"用户 {user_id} 更新成功")                return result[0]            return None        except psycopg2.IntegrityError as e:            logger.error(f"更新失败(数据冲突): {e}")            raise    def update_product_stock(self, product_id, quantity_change):        """更新产品库存(原子操作)"""        query = """        UPDATE products        SET stock_quantity = GREATEST(0, stock_quantity + %s)        WHERE id = %s        RETURNING id, name, stock_quantity;        """        try:            result = self.db.execute_query(                query,                 (quantity_change, product_id),                 fetch=True            )            if result:                logger.info(f"产品 {product_id} 库存更新: {quantity_change}")                return result[0]            return None        except Exception as e:            logger.error(f"更新库存失败: {e}")            raise    def bulk_update_status(self, table_name, ids, status):        """批量更新状态"""        query = f"""        UPDATE {table_name}        SET status = %s        WHERE id = ANY(%s)        RETURNING id;        """        try:            result = self.db.execute_query(                query,                 (status, ids),                 fetch=True            )            logger.info(f"批量更新 {len(result)} 条记录状态为 {status}")            return result        except Exception as e:            logger.error(f"批量更新失败: {e}")            raise

5. 删除数据

class DeleteOperations:    def __init__(self, connector):        self.db = connector    def soft_delete_user(self, user_id):        """软删除用户(标记为不活跃)"""        query = """        UPDATE users        SET is_active = FALSE        WHERE id = %s        RETURNING id, username;        """        try:            result = self.db.execute_query(query, (user_id,), fetch=True)            if result:                logger.info(f"用户 {user_id} 已软删除")                return result[0]            return None        except Exception as e:            logger.error(f"软删除失败: {e}")            raise    def delete_product(self, product_id):        """删除产品"""        query = """        DELETE FROM products        WHERE id = %s        RETURNING id, name;        """        try:            result = self.db.execute_query(query, (product_id,), fetch=True)            if result:                logger.info(f"产品 {product_id} 已删除")                return result[0]            return None        except Exception as e:            logger.error(f"删除失败: {e}")            raise    def cleanup_old_records(self, table_name, days=30):        """清理旧记录"""        query = f"""        DELETE FROM {table_name}        WHERE created_at < CURRENT_TIMESTAMP - INTERVAL '%s days'        RETURNING COUNT(*) as deleted_count;        """        try:            result = self.db.execute_query(query, (days,), fetch=True)            deleted_count = result[0]['deleted_count'if result else 0            logger.info(f"清理了 {deleted_count} 条旧记录")            return deleted_count        except Exception as e:            logger.error(f"清理失败: {e}")            raise

事务管理

class TransactionManager:    def __init__(self, connector):        self.db = connector    def create_order_with_items(self, user_id, items):        """创建订单(事务示例)"""        try:            # 开始事务            self.db.connection.autocommit = False            # 计算订单总额            total_amount = 0            for item in items:                # 检查库存                check_query = """                SELECT price, stock_quantity                 FROM products                 WHERE id = %s FOR UPDATE;                """                result = self.db.execute_query(                    check_query,                     (item['product_id'],),                     fetch=True                )                if not result or result[0]['stock_quantity'] < item['quantity']:                    raise Exception(f"产品 {item['product_id']} 库存不足")                total_amount += result[0]['price'] * item['quantity']            # 创建订单            order_query = """            INSERT INTO orders (user_id, total_amount)            VALUES (%s, %s)            RETURNING id;            """            order_result = self.db.execute_query(                order_query,                 (user_id, total_amount),                 fetch=True            )            order_id = order_result[0]['id']            # 添加订单项并更新库存            for item in items:                # 插入订单项                item_query = """                INSERT INTO order_items (order_id, product_id, quantity, unit_price)                VALUES (%s, %s, %s,                     (SELECT price FROM products WHERE id = %s)                );                """                self.db.execute_query(                    item_query,                    (order_id, item['product_id'], item['quantity'], item['product_id'])                )                # 更新库存                self.db.execute_query(                    "UPDATE products SET stock_quantity = stock_quantity - %s WHERE id = %s;",                    (item['quantity'], item['product_id'])                )            # 提交事务            self.db.connection.commit()            logger.info(f"订单 {order_id} 创建成功")            return order_id        except Exception as e:            # 回滚事务            self.db.connection.rollback()            logger.error(f"创建订单失败: {e}")            raise        finally:            # 恢复自动提交            self.db.connection.autocommit = True    def transfer_stock(self, from_product_id, to_product_id, quantity):        """库存转移(事务示例)"""        try:            self.db.connection.autocommit = False            # 检查源产品库存            check_query = """            SELECT stock_quantity FROM products WHERE id = %s FOR UPDATE;            """            from_stock = self.db.execute_query(                check_query,                 (from_product_id,),                 fetch=True            )[0]['stock_quantity']            if from_stock < quantity:                raise Exception("库存不足")            # 减少源产品库存            self.db.execute_query(                "UPDATE products SET stock_quantity = stock_quantity - %s WHERE id = %s;",                (quantity, from_product_id)            )            # 增加目标产品库存            self.db.execute_query(                "UPDATE products SET stock_quantity = stock_quantity + %s WHERE id = %s;",                (quantity, to_product_id)            )            # 记录转移日志            log_query = """            INSERT INTO stock_transfers (from_product_id, to_product_id, quantity)            VALUES (%s, %s, %s);            """            self.db.execute_query(log_query, (from_product_id, to_product_id, quantity))            self.db.connection.commit()            logger.info(f"库存转移成功: {quantity} 个产品")        except Exception as e:            self.db.connection.rollback()            logger.error(f"库存转移失败: {e}")            raise        finally:            self.db.connection.autocommit = True

连接池管理

import psycopg2.poolfrom threading import Lockimport timeclass ConnectionPoolManager:    def __init__(self, min_conn=1, max_conn=10, **kwargs):        self.pool_params = kwargs        self.pool = None        self.lock = Lock()        self.min_conn = min_conn        self.max_conn = max_conn    def create_pool(self):        """创建连接池"""        with self.lock:            if not self.pool:                try:                    self.pool = psycopg2.pool.ThreadedConnectionPool(                        minconn=self.min_conn,                        maxconn=self.max_conn,                        **self.pool_params                    )                    logger.info("连接池创建成功")                except Exception as e:                    logger.error(f"创建连接池失败: {e}")                    raise    def get_connection(self):        """从连接池获取连接"""        if not self.pool:            self.create_pool()        try:            conn = self.pool.getconn()            return conn        except Exception as e:            logger.error(f"获取连接失败: {e}")            raise    def return_connection(self, conn):        """归还连接到连接池"""        try:            self.pool.putconn(conn)        except Exception as e:            logger.error(f"归还连接失败: {e}")    def close_all_connections(self):        """关闭所有连接"""        with self.lock:            if self.pool:                self.pool.closeall()                logger.info("所有连接已关闭")    def get_pool_stats(self):        """获取连接池统计信息"""        if not self.pool:            return {}        return {            'min_connections'self.min_conn,            'max_connections'self.max_conn,            'available'len(self.pool._used),            'in_use'len(self.pool._rused)        }class PooledDatabase:    def __init__(self, pool_manager):        self.pool = pool_manager    @contextmanager    def get_cursor_from_pool(self):        """从连接池获取游标的上下文管理器"""        conn = None        cursor = None        try:            conn = self.pool.get_connection()            cursor = conn.cursor(cursor_factory=RealDictCursor)            yield cursor            conn.commit()        except Exception as e:            if conn:                conn.rollback()            raise e        finally:            if cursor:                cursor.close()            if conn:                self.pool.return_connection(conn)    def execute_with_pool(self, query, params=None):        """使用连接池执行查询"""        with self.get_cursor_from_pool() as cursor:            cursor.execute(query, params or ())            if query.strip().upper().startswith('SELECT'):                return cursor.fetchall()            else:                return cursor.rowcount# 使用连接池的示例def setup_database_pool():    """设置数据库连接池"""    pool_manager = ConnectionPoolManager(        min_conn=2,        max_conn=10,        host="localhost",        database="tutorial_db",        user="tutorial_user",        password="secure_password",        connect_timeout=10    )    return PooledDatabase(pool_manager)

使用SQLAlchemy ORM

1. 定义模型

python

from sqlalchemy import create_engine, Column, Integer, String, Float, DateTime, Boolean, Text, ForeignKey, CheckConstraint, Index, UniqueConstraintfrom sqlalchemy.ext.declarative import declarative_basefrom sqlalchemy.orm import sessionmaker, relationshipfrom sqlalchemy.sql import funcfrom datetime import datetimeimport jsonBase = declarative_base()class User(Base):    __tablename__ = 'users'    id = Column(Integer, primary_key=True)    username = Column(String(50), unique=True, nullable=False)    email = Column(String(100), unique=True, nullable=False)    password_hash = Column(String(255), nullable=False)    created_at = Column(DateTime, default=func.now())    updated_at = Column(DateTime, default=func.now(), onupdate=func.now())    is_active = Column(Boolean, default=True)    # 关系    orders = relationship("Order", back_populates="user", cascade="all, delete-orphan")    # 约束    __table_args__ = (        CheckConstraint("email ~* '^[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\\.[A-Za-z]{2,}$'", name="email_check"),        Index('idx_users_email', email),    )    def to_dict(self):        """转换为字典"""        return {            'id'self.id,            'username'self.username,            'email'self.email,            'created_at'self.created_at.isoformat() if self.created_at else None,            'is_active'self.is_active        }class Product(Base):    __tablename__ = 'products'    id = Column(Integer, primary_key=True)    name = Column(String(100), nullable=False)    description = Column(Text)    price = Column(Float, CheckConstraint("price >= 0"), nullable=False)    stock_quantity = Column(Integer, default=0, CheckConstraint("stock_quantity >= 0"))    category = Column(String(50))    created_at = Column(DateTime, default=func.now())    updated_at = Column(DateTime, default=func.now(), onupdate=func.now())    # 关系    order_items = relationship("OrderItem", back_populates="product")    # 索引    __table_args__ = (        Index('idx_products_category', category),        Index('idx_products_price', price),        Index('idx_products_name', name),    )    def to_dict(self):        return {            'id'self.id,            'name'self.name,            'description'self.description,            'price'self.price,            'stock_quantity'self.stock_quantity,            'category'self.category        }class Order(Base):    __tablename__ = 'orders'    id = Column(Integer, primary_key=True)    user_id = Column(Integer, ForeignKey('users.id', ondelete='CASCADE'), nullable=False)    total_amount = Column(Float, CheckConstraint("total_amount >= 0"))    status = Column(String(20), default='pending')    created_at = Column(DateTime, default=func.now())    updated_at = Column(DateTime, default=func.now(), onupdate=func.now())    # 关系    user = relationship("User", back_populates="orders")    items = relationship("OrderItem", back_populates="order", cascade="all, delete-orphan")    # 索引    __table_args__ = (        Index('idx_orders_user_id', user_id),        Index('idx_orders_status', status),        Index('idx_orders_created', created_at),    )    def to_dict(self):        return {            'id'self.id,            'user_id'self.user_id,            'total_amount'self.total_amount,            'status'self.status,            'created_at'self.created_at.isoformat() if self.created_at else None,            'items': [item.to_dict() for item in self.items]        }class OrderItem(Base):    __tablename__ = 'order_items'    id = Column(Integer, primary_key=True)    order_id = Column(Integer, ForeignKey('orders.id', ondelete='CASCADE'), nullable=False)    product_id = Column(Integer, ForeignKey('products.id', ondelete='SET NULL'))    quantity = Column(Integer, CheckConstraint("quantity > 0"), nullable=False)    unit_price = Column(Float, CheckConstraint("unit_price >= 0"), nullable=False)    # 关系    order = relationship("Order", back_populates="items")    product = relationship("Product", back_populates="order_items")    # 复合唯一约束    __table_args__ = (        UniqueConstraint('order_id''product_id', name='uq_order_product'),    )    @property    def subtotal(self):        return self.quantity * self.unit_price    def to_dict(self):        return {            'id'self.id,            'product_id'self.product_id,            'product_name'self.product.name if self.product else None,            'quantity'self.quantity,            'unit_price'self.unit_price,            'subtotal'self.subtotal        }

2. SQLAlchemy数据库操作类

python

class SQLAlchemyDatabase:    def __init__(self, connection_string):        self.engine = create_engine(            connection_string,            pool_size=10,            max_overflow=20,            pool_pre_ping=True,            echo=False,  # 设为True可以查看SQL日志            json_serializer=lambda obj: json.dumps(obj, ensure_ascii=False)        )        self.SessionLocal = sessionmaker(            autocommit=False,            autoflush=False,            bind=self.engine        )    def init_db(self):        """初始化数据库"""        Base.metadata.create_all(bind=self.engine)        logger.info("数据库表创建成功")    def drop_db(self):        """删除所有表"""        Base.metadata.drop_all(bind=self.engine)        logger.info("数据库表已删除")    def get_session(self):        """获取数据库会话"""        return self.SessionLocal()    @contextmanager    def session_scope(self):        """会话上下文管理器"""        session = self.get_session()        try:            yield session            session.commit()        except Exception as e:            session.rollback()            logger.error(f"数据库操作失败: {e}")            raise        finally:            session.close()    # CRUD操作示例    def create_user(self, username, email, password_hash):        """创建用户"""        with self.session_scope() as session:            user = User(                username=username,                email=email,                password_hash=password_hash            )            session.add(user)            session.flush()  # 获取生成的ID            return user.to_dict()    def get_user(self, user_id=None, username=None, email=None):        """获取用户"""        with self.session_scope() as session:            query = session.query(User)            if user_id:                query = query.filter(User.id == user_id)            if username:                query = query.filter(User.username == username)            if email:                query = query.filter(User.email == email)            user = query.first()            return user.to_dict() if user else None    def update_user(self, user_id, **kwargs):        """更新用户"""        with self.session_scope() as session:            user = session.query(User).filter(User.id == user_id).first()            if not user:                return None            for key, value in kwargs.items():                if hasattr(user, key):                    setattr(user, key, value)            return user.to_dict()    def delete_user(self, user_id):        """删除用户"""        with self.session_scope() as session:            user = session.query(User).filter(User.id == user_id).first()            if user:                session.delete(user)                return True            return False    # 复杂查询示例    def search_products(self, **filters):        """搜索产品"""        with self.session_scope() as session:            query = session.query(Product)            if filters.get('keyword'):                keyword = f"%{filters['keyword']}%"                query = query.filter(                    Product.name.ilike(keyword) |                     Product.description.ilike(keyword)                )            if filters.get('min_price'):                query = query.filter(Product.price >= filters['min_price'])            if filters.get('max_price'):                query = query.filter(Product.price <= filters['max_price'])            if filters.get('category'):                query = query.filter(Product.category == filters['category'])            # 排序            sort_by = filters.get('sort_by''price')            sort_order = filters.get('sort_order''asc')            if sort_order == 'desc':                query = query.order_by(getattr(Product, sort_by).desc())            else:                query = query.order_by(getattr(Product, sort_by))            # 分页            page = filters.get('page'1)            per_page = filters.get('per_page'10)            offset = (page - 1) * per_page            total = query.count()            products = query.offset(offset).limit(per_page).all()            return {                'data': [p.to_dict() for p in products],                'total': total,                'page': page,                'per_page': per_page            }    def create_order(self, user_id, items):        """创建订单(使用事务)"""        with self.session_scope() as session:            try:                # 计算总额并检查库存                total_amount = 0                for item_data in items:                    product = session.query(Product).filter(                        Product.id == item_data['product_id']                    ).with_for_update().first()  # 行级锁                    if not product or product.stock_quantity < item_data['quantity']:                        raise Exception(f"产品 {item_data['product_id']} 库存不足")                    total_amount += product.price * item_data['quantity']                # 创建订单                order = Order(                    user_id=user_id,                    total_amount=total_amount                )                session.add(order)                session.flush()  # 获取订单ID                # 添加订单项并更新库存                for item_data in items:                    product = session.query(Product).filter(                        Product.id == item_data['product_id']                    ).first()                    order_item = OrderItem(                        order_id=order.id,                        product_id=item_data['product_id'],                        quantity=item_data['quantity'],                        unit_price=product.price                    )                    session.add(order_item)                    # 更新库存                    product.stock_quantity -= item_data['quantity']                return order.to_dict()            except Exception as e:                session.rollback()                logger.error(f"创建订单失败: {e}")                raise    # 批量操作    def bulk_insert_products(self, products_data):        """批量插入产品"""        with self.session_scope() as session:            products = [                Product(**product_data)                 for product_data in products_data            ]            session.bulk_save_objects(products)    # 原生SQL查询    def execute_raw_sql(self, sql, params=None):        """执行原生SQL"""        with self.session_scope() as session:            result = session.execute(sql, params or {})            if sql.strip().upper().startswith('SELECT'):                columns = result.keys()                return [dict(zip(columns, row)) for row in result.fetchall()]            else:                return result.rowcount# 使用示例def setup_sqlalchemy_db():    """设置SQLAlchemy数据库"""    connection_string = "postgresql://tutorial_user:secure_password@localhost/tutorial_db"    db = SQLAlchemyDatabase(connection_string)    # 初始化数据库表    db.init_db()    return db

异步操作

import asyncioimport asyncpgfrom datetime import datetimefrom typing import ListDictAnyOptionalclass AsyncPostgreSQL:    def __init__(self, dsn: str, min_size: int = 5, max_size: int = 20):        self.dsn = dsn        self.min_size = min_size        self.max_size = max_size        self.pool = None    async def connect(self):        """创建连接池"""        self.pool = await asyncpg.create_pool(            dsn=self.dsn,            min_size=self.min_size,            max_size=self.max_size,            command_timeout=60,            max_inactive_connection_lifetime=300        )        logger.info("异步连接池创建成功")    async def close(self):        """关闭连接池"""        if self.pool:            await self.pool.close()            logger.info("异步连接池已关闭")    @contextmanager    async def get_connection(self):        """获取连接"""        if not self.pool:            await self.connect()        conn = await self.pool.acquire()        try:            yield conn        finally:            await self.pool.release(conn)    async def execute(self, query: str, *args) -> str:        """执行SQL语句"""        async with self.get_connection() as conn:            return await conn.execute(query, *args)    async def fetch(self, query: str, *args) -> List[Dict]:        """查询多行数据"""        async with self.get_connection() as conn:            rows = await conn.fetch(query, *args)            return [dict(row) for row in rows]    async def fetchrow(self, query: str, *args) -> Optional[Dict]:        """查询单行数据"""        async with self.get_connection() as conn:            row = await conn.fetchrow(query, *args)            return dict(row) if row else None    async def fetchval(self, query: str, *args) -> Any:        """查询单个值"""        async with self.get_connection() as conn:            return await conn.fetchval(query, *args)    async def transaction(self, callback):        """事务处理"""        async with self.get_connection() as conn:            async with conn.transaction():                return await callback(conn)    # 业务方法示例    async def get_user_with_orders(self, user_id: int) -> Dict:        """获取用户及其订单"""        query = """        SELECT             u.*,            COALESCE(                json_agg(                    json_build_object(                        'id', o.id,                        'total_amount', o.total_amount,                        'status', o.status,                        'created_at', o.created_at                    )                ) FILTER (WHERE o.id IS NOT NULL),                '[]'            ) as orders        FROM users u        LEFT JOIN orders o ON u.id = o.user_id        WHERE u.id = $1        GROUP BY u.id;        """        return await self.fetchrow(query, user_id)    async def bulk_insert_users(self, users: List[Dict]) -> List[int]:        """批量插入用户"""        query = """        INSERT INTO users (username, email, password_hash)        VALUES ($1, $2, $3)        RETURNING id;        """        async with self.get_connection() as conn:            stmt = await conn.prepare(query)            ids = []            for user in users:                row = await stmt.fetchrow(                    user['username'],                    user['email'],                    user['password_hash']                )                ids.append(row['id'])            return ids# 异步操作示例async def async_demo():    """异步操作演示"""    dsn = "postgresql://tutorial_user:secure_password@localhost/tutorial_db"    db = AsyncPostgreSQL(dsn)    await db.connect()    try:        # 测试连接        version = await db.fetchval("SELECT version();")        logger.info(f"PostgreSQL版本: {version}")        # 查询示例        users = await db.fetch("SELECT * FROM users LIMIT 5;")        logger.info(f"前5个用户: {len(users)}")        # 事务示例        async def transfer_money(from_user, to_user, amount):            """转账事务"""            await db.execute(                "UPDATE accounts SET balance = balance - $1 WHERE user_id = $2;",                amount, from_user            )            await db.execute(                "UPDATE accounts SET balance = balance + $1 WHERE user_id = $2;",                amount, to_user            )            return True        # 执行事务        success = await db.transaction(            lambda conn: transfer_money(12100.00)        )    finally:        await db.close()# 运行异步示例# asyncio.run(async_demo())

最佳实践

1. 配置管理

python

import osfrom dataclasses import dataclassfrom typing import Optionalfrom dotenv import load_dotenvload_dotenv()  # 加载环境变量@dataclassclass DatabaseConfig:    """数据库配置类"""    host: str = os.getenv('DB_HOST''localhost')    port: int = int(os.getenv('DB_PORT''5432'))    database: str = os.getenv('DB_NAME''tutorial_db')    user: str = os.getenv('DB_USER''tutorial_user')    password: str = os.getenv('DB_PASSWORD''secure_password')    pool_min: int = int(os.getenv('DB_POOL_MIN''2'))    pool_max: int = int(os.getenv('DB_POOL_MAX''10'))    timeout: int = int(os.getenv('DB_TIMEOUT''30'))    @property    def connection_string(self):        """生成连接字符串"""        return f"postgresql://{self.user}:{self.password}@{self.host}:{self.port}/{self.database}"    @property    def async_connection_string(self):        """生成异步连接字符串"""        return f"postgresql://{self.user}:{self.password}@{self.host}:{self.port}/{self.database}"    def validate(self):        """验证配置"""        required = ['host''database''user''password']        for field in required:            if not getattr(self, field):                raise ValueError(f"数据库配置缺失: {field}")        return True

2. 健康检查

class DatabaseHealthChecker:    """数据库健康检查"""    def __init__(self, connector):        self.db = connector    def check_connection(self) -> dict:        """检查连接"""        try:            start_time = time.time()            self.db.execute_query("SELECT 1;")            latency = (time.time() - start_time) * 1000  # 毫秒            return {                'status''healthy',                'latency_ms'round(latency, 2),                'timestamp': datetime.now().isoformat()            }        except Exception as e:            return {                'status''unhealthy',                'error'str(e),                'timestamp': datetime.now().isoformat()            }    def check_disk_space(self) -> dict:        """检查磁盘空间"""        query = """        SELECT             datname as database,            pg_database_size(datname) as size_bytes,            pg_size_pretty(pg_database_size(datname)) as size_pretty        FROM pg_database         WHERE datname = current_database();        """        try:            result = self.db.execute_query(query, fetch=True)            return {                'database': result[0]['database'],                'size_bytes': result[0]['size_bytes'],                'size_human': result[0]['size_pretty'],                'status''healthy'            }        except Exception as e:            return {                'status''unhealthy',                'error'str(e)            }    def check_active_connections(self) -> dict:        """检查活动连接"""        query = """        SELECT             count(*) as total,            count(*) filter (where state = 'active') as active,            count(*) filter (where state = 'idle') as idle,            count(*) filter (where state = 'idle in transaction') as idle_in_transaction        FROM pg_stat_activity         WHERE datname = current_database();        """        try:            result = self.db.execute_query(query, fetch=True)            return {                'total': result[0]['total'],                'active': result[0]['active'],                'idle': result[0]['idle'],                'idle_in_transaction': result[0]['idle_in_transaction'],                'status''healthy'            }        except Exception as e:            return {                'status''unhealthy',                'error'str(e)            }    def full_health_check(self) -> dict:        """完整健康检查"""        checks = {            'connection'self.check_connection(),            'disk_space'self.check_disk_space(),            'connections'self.check_active_connections()        }        overall_status = 'healthy'        for check_name, result in checks.items():            if result.get('status') != 'healthy':                overall_status = 'unhealthy'                break        return {            'status': overall_status,            'checks': checks,            'timestamp': datetime.now().isoformat()        }

3. 错误处理装饰器

from functools import wrapsimport psycopg2from psycopg2 import OperationalError, IntegrityErrordef handle_db_errors(max_retries=3, retry_delay=1):    """数据库错误处理装饰器"""    def decorator(func):        @wraps(func)        def wrapper(*args, **kwargs):            retries = 0            while retries < max_retries:                try:                    return func(*args, **kwargs)                except OperationalError as e:                    retries += 1                    if retries == max_retries:                        logger.error(f"数据库操作失败(连接错误): {e}")                        raise                    logger.warning(f"连接错误,重试 {retries}/{max_retries}{e}")                    time.sleep(retry_delay)                except IntegrityError as e:                    logger.error(f"数据完整性错误: {e}")                    raise                except psycopg2.Error as e:                    logger.error(f"数据库错误: {e}")                    raise                except Exception as e:                    logger.error(f"未知错误: {e}")                    raise            return None        return wrapper    return decorator# 使用示例class SafeDatabaseOperations:    def __init__(self, connector):        self.db = connector    @handle_db_errors(max_retries=3)    def safe_create_user(self, username, email, password_hash):        """安全的用户创建方法"""        return self.db.execute_query(            "INSERT INTO users (username, email, password_hash) VALUES (%s, %s, %s) RETURNING id;",            (username, email, password_hash),            fetch=True        )

4. 性能优化

class DatabaseOptimizer:    """数据库性能优化工具"""    def __init__(self, connector):        self.db = connector    def analyze_query_plan(self, query, params=None):        """分析查询计划"""        explain_query = f"EXPLAIN ANALYZE {query}"        try:            result = self.db.execute_query(explain_query, params, fetch=True)            return "\n".join([row['QUERY PLAN'for row in result])        except Exception as e:            logger.error(f"分析查询计划失败: {e}")            return None    def vacuum_table(self, table_name):        """清理表空间"""        try:            self.db.execute_query(f"VACUUM ANALYZE {table_name};")            logger.info(f"表 {table_name} 清理完成")            return True        except Exception as e:            logger.error(f"清理表失败: {e}")            return False    def reindex_table(self, table_name):        """重建索引"""        try:            self.db.execute_query(f"REINDEX TABLE {table_name};")            logger.info(f"表 {table_name} 索引重建完成")            return True        except Exception as e:            logger.error(f"重建索引失败: {e}")            return False    def get_table_statistics(self, table_name):        """获取表统计信息"""        query = """        SELECT             schemaname,            tablename,            n_live_tup as live_rows,            n_dead_tup as dead_rows,            pg_size_pretty(pg_total_relation_size(schemaname || '.' || tablename)) as total_size,            pg_size_pretty(pg_relation_size(schemaname || '.' || tablename)) as table_size,            pg_size_pretty(pg_total_relation_size(schemaname || '.' || tablename) -                           pg_relation_size(schemaname || '.' || tablename)) as index_size,            last_vacuum,            last_autovacuum,            last_analyze,            last_autoanalyze        FROM pg_stat_user_tables         WHERE tablename = %s;        """        try:            result = self.db.execute_query(query, (table_name,), fetch=True)            return result[0if result else None        except Exception as e:            logger.error(f"获取统计信息失败: {e}")            return None    def optimize_queries(self):        """执行优化操作"""        # 获取所有用户表        tables_query = """        SELECT tablename         FROM pg_tables         WHERE schemaname = 'public';        """        tables = self.db.execute_query(tables_query, fetch=True)        for table in tables:            table_name = table['tablename']            # 清理表            self.vacuum_table(table_name)            # 更新统计信息            self.db.execute_query(f"ANALYZE {table_name};")            logger.info(f"表 {table_name} 优化完成")

完整示例项目

项目结构

text

postgresql-tutorial/├── config/│   ├── __init__.py│   └── settings.py├── database/│   ├── __init__.py│   ├── connection.py│   ├── models.py│   ├── operations.py│   ├── repositories.py│   └── migrations/├── services/│   ├── __init__.py│   ├── user_service.py│   ├── product_service.py│   └── order_service.py├── api/│   ├── __init__.py│   ├── routes.py│   └── schemas.py├── utils/│   ├── __init__.py│   ├── logger.py│   ├── validators.py│   └── helpers.py├── tests/│   ├── __init__.py│   ├── test_database.py│   ├── test_services.py│   └── test_api.py├── requirements.txt├── docker-compose.yml├── .env.example└── README.md

主应用程序

python

# main.pyimport uvicornfrom fastapi import FastAPI, Depends, HTTPExceptionfrom contextlib import asynccontextmanagerfrom database.connection import get_db, DatabaseHealthCheckerfrom database.models import Base, enginefrom api.routes import router as api_routerimport logging# 配置日志logging.basicConfig(    level=logging.INFO,    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')logger = logging.getLogger(__name__)@asynccontextmanagerasync def lifespan(app: FastAPI):    """应用生命周期管理"""    # 启动时    logger.info("应用启动中...")    # 创建数据库表    Base.metadata.create_all(bind=engine)    logger.info("数据库表初始化完成")    # 运行健康检查    db = get_db()    health_checker = DatabaseHealthChecker(db)    health_status = health_checker.full_health_check()    if health_status['status'] == 'healthy':        logger.info("数据库健康检查通过")    else:        logger.error("数据库健康检查失败")    yield    # 关闭时    logger.info("应用关闭中...")    db.close()# 创建FastAPI应用app = FastAPI(    title="PostgreSQL教程API",    description="PostgreSQL与Python集成示例",    version="1.0.0",    lifespan=lifespan)# 注册路由app.include_router(api_router, prefix="/api/v1")@app.get("/")async def root():    """根路由"""    return {        "message""PostgreSQL教程API",        "version""1.0.0",        "docs""/docs",        "health""/health"    }@app.get("/health")async def health_check():    """健康检查端点"""    db = get_db()    health_checker = DatabaseHealthChecker(db)    return health_checker.full_health_check()if __name__ == "__main__":    uvicorn.run(        "main:app",        host="0.0.0.0",        port=8000,        reload=True,        log_level="info"    )

Docker配置

yaml

# docker-compose.ymlversion: '3.8'services:  postgres:    image: postgres:15-alpine    container_name: postgres_tutorial    environment:      POSTGRES_DB: tutorial_db      POSTGRES_USER: tutorial_user      POSTGRES_PASSWORD: secure_password    ports:      - "5432:5432"    volumes:      - postgres_data:/var/lib/postgresql/data      - ./init.sql:/docker-entrypoint-initdb.d/init.sql    networks:      - app_network    healthcheck:      test: ["CMD-SHELL""pg_isready -U tutorial_user -d tutorial_db"]      interval: 10s      timeout: 5s      retries: 5  pgadmin:    image: dpage/pgadmin4:latest    container_name: pgadmin_tutorial    environment:      PGADMIN_DEFAULT_EMAIL: admin@example.com      PGADMIN_DEFAULT_PASSWORD: admin_password    ports:      - "5050:80"    depends_on:      - postgres    networks:      - app_network  app:    build: .    container_name: python_app    environment:      DB_HOST: postgres      DB_PORT: 5432      DB_NAME: tutorial_db      DB_USER: tutorial_user      DB_PASSWORD: secure_password    ports:      - "8000:8000"    depends_on:      postgres:        condition: service_healthy    networks:      - app_network    volumes:      - .:/app    command: uvicorn main:app --host 0.0.0.0 --port 8000 --reloadnetworks:  app_network:    driver: bridgevolumes:  postgres_data:

初始化SQL

sql

-- init.sql-- 创建扩展CREATE EXTENSION IF NOT EXISTS "uuid-ossp";CREATE EXTENSION IF NOT EXISTS "pg_trgm";-- 创建函数CREATE OR REPLACE FUNCTION update_updated_at_column()RETURNS TRIGGER AS $$BEGIN    NEW.updated_at = CURRENT_TIMESTAMP;    RETURN NEW;END;$$ language 'plpgsql';

总结

本教程涵盖了Python与PostgreSQL集成的各个方面:

  1. 基础连接:使用psycopg2建立连接,使用上下文管理器管理资源

  2. CRUD操作:完整的增删改查操作,包括批量操作和复杂查询

  3. 事务管理:确保数据一致性的重要机制

  4. 连接池:提高应用性能的连接管理

  5. ORM框架:使用SQLAlchemy简化数据库操作

  6. 异步操作:使用asyncpg进行高性能异步操作

  7. 最佳实践:配置管理、错误处理、性能优化等

  8. 完整项目:包含Docker配置的完整应用示例

关键要点:

  • 始终使用参数化查询防止SQL注入

  • 合理使用事务确保数据一致性

  • 使用连接池管理数据库连接

  • 实现适当的错误处理和重试机制

  • 定期进行数据库维护和优化

  • 在生产环境中使用环境变量管理配置

后续学习建议:

  1. 学习PostgreSQL高级特性:窗口函数、CTE、JSONB等

  2. 深入了解数据库索引优化

  3. 学习数据库分片和复制

  4. 探索更多ORM框架(如Django ORM、Peewee等)

  5. 学习数据库迁移工具(Alembic)

这个教程提供了全面的Python与PostgreSQL集成指南,适合从初学者到中级开发者的学习。建议按照步骤实践每个部分,以加深理解。

点击关注下方公众号,免费获取 Python公开课和大佬打包整理的几百G的学习资料,内容包含但不限于Python电子书、教程、项目接单、源码等等

点击关注-免费领取

推荐阅读

Python 4.0 要来了!这 5 个变化你必须知道

拯救我崩溃代码的 5 个 Python 库

12 分钟介绍所有主流 (Web) 框架

全球爆火的OpenClaw(小龙虾)到底是什么?

点击 阅读原文

最新文章

随机文章

基本 文件 流程 错误 SQL 调试
  1. 请求信息 : 2026-04-14 22:08:49 HTTP/2.0 GET : https://f.mffb.com.cn/a/486308.html
  2. 运行时间 : 0.091815s [ 吞吐率:10.89req/s ] 内存消耗:4,845.16kb 文件加载:140
  3. 缓存信息 : 0 reads,0 writes
  4. 会话信息 : SESSION_ID=1bb658a435e86ca0f4b00eddc23fc0e5
  1. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/public/index.php ( 0.79 KB )
  2. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/autoload.php ( 0.17 KB )
  3. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/composer/autoload_real.php ( 2.49 KB )
  4. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/composer/platform_check.php ( 0.90 KB )
  5. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/composer/ClassLoader.php ( 14.03 KB )
  6. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/composer/autoload_static.php ( 4.90 KB )
  7. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-helper/src/helper.php ( 8.34 KB )
  8. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-validate/src/helper.php ( 2.19 KB )
  9. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/helper.php ( 1.47 KB )
  10. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/stubs/load_stubs.php ( 0.16 KB )
  11. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/Exception.php ( 1.69 KB )
  12. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-container/src/Facade.php ( 2.71 KB )
  13. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/symfony/deprecation-contracts/function.php ( 0.99 KB )
  14. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/symfony/polyfill-mbstring/bootstrap.php ( 8.26 KB )
  15. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/symfony/polyfill-mbstring/bootstrap80.php ( 9.78 KB )
  16. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/symfony/var-dumper/Resources/functions/dump.php ( 1.49 KB )
  17. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-dumper/src/helper.php ( 0.18 KB )
  18. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/symfony/var-dumper/VarDumper.php ( 4.30 KB )
  19. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/App.php ( 15.30 KB )
  20. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-container/src/Container.php ( 15.76 KB )
  21. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/psr/container/src/ContainerInterface.php ( 1.02 KB )
  22. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/app/provider.php ( 0.19 KB )
  23. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/Http.php ( 6.04 KB )
  24. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-helper/src/helper/Str.php ( 7.29 KB )
  25. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/Env.php ( 4.68 KB )
  26. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/app/common.php ( 0.03 KB )
  27. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/helper.php ( 18.78 KB )
  28. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/Config.php ( 5.54 KB )
  29. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/config/app.php ( 0.95 KB )
  30. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/config/cache.php ( 0.78 KB )
  31. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/config/console.php ( 0.23 KB )
  32. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/config/cookie.php ( 0.56 KB )
  33. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/config/database.php ( 2.48 KB )
  34. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/facade/Env.php ( 1.67 KB )
  35. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/config/filesystem.php ( 0.61 KB )
  36. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/config/lang.php ( 0.91 KB )
  37. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/config/log.php ( 1.35 KB )
  38. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/config/middleware.php ( 0.19 KB )
  39. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/config/route.php ( 1.89 KB )
  40. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/config/session.php ( 0.57 KB )
  41. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/config/trace.php ( 0.34 KB )
  42. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/config/view.php ( 0.82 KB )
  43. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/app/event.php ( 0.25 KB )
  44. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/Event.php ( 7.67 KB )
  45. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/app/service.php ( 0.13 KB )
  46. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/app/AppService.php ( 0.26 KB )
  47. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/Service.php ( 1.64 KB )
  48. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/Lang.php ( 7.35 KB )
  49. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/lang/zh-cn.php ( 13.70 KB )
  50. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/initializer/Error.php ( 3.31 KB )
  51. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/initializer/RegisterService.php ( 1.33 KB )
  52. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/services.php ( 0.14 KB )
  53. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/service/PaginatorService.php ( 1.52 KB )
  54. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/service/ValidateService.php ( 0.99 KB )
  55. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/service/ModelService.php ( 2.04 KB )
  56. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-trace/src/Service.php ( 0.77 KB )
  57. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/Middleware.php ( 6.72 KB )
  58. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/initializer/BootService.php ( 0.77 KB )
  59. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/Paginator.php ( 11.86 KB )
  60. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-validate/src/Validate.php ( 63.20 KB )
  61. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/Model.php ( 23.55 KB )
  62. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/model/concern/Attribute.php ( 21.05 KB )
  63. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/model/concern/AutoWriteData.php ( 4.21 KB )
  64. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/model/concern/Conversion.php ( 6.44 KB )
  65. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/model/concern/DbConnect.php ( 5.16 KB )
  66. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/model/concern/ModelEvent.php ( 2.33 KB )
  67. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/model/concern/RelationShip.php ( 28.29 KB )
  68. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-helper/src/contract/Arrayable.php ( 0.09 KB )
  69. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-helper/src/contract/Jsonable.php ( 0.13 KB )
  70. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/model/contract/Modelable.php ( 0.09 KB )
  71. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/Db.php ( 2.88 KB )
  72. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/DbManager.php ( 8.52 KB )
  73. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/Log.php ( 6.28 KB )
  74. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/Manager.php ( 3.92 KB )
  75. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/psr/log/src/LoggerTrait.php ( 2.69 KB )
  76. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/psr/log/src/LoggerInterface.php ( 2.71 KB )
  77. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/Cache.php ( 4.92 KB )
  78. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/psr/simple-cache/src/CacheInterface.php ( 4.71 KB )
  79. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-helper/src/helper/Arr.php ( 16.63 KB )
  80. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/cache/driver/File.php ( 7.84 KB )
  81. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/cache/Driver.php ( 9.03 KB )
  82. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/contract/CacheHandlerInterface.php ( 1.99 KB )
  83. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/app/Request.php ( 0.09 KB )
  84. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/Request.php ( 55.78 KB )
  85. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/app/middleware.php ( 0.25 KB )
  86. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/Pipeline.php ( 2.61 KB )
  87. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-trace/src/TraceDebug.php ( 3.40 KB )
  88. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/middleware/SessionInit.php ( 1.94 KB )
  89. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/Session.php ( 1.80 KB )
  90. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/session/driver/File.php ( 6.27 KB )
  91. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/contract/SessionHandlerInterface.php ( 0.87 KB )
  92. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/session/Store.php ( 7.12 KB )
  93. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/Route.php ( 23.73 KB )
  94. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/route/RuleName.php ( 5.75 KB )
  95. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/route/Domain.php ( 2.53 KB )
  96. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/route/RuleGroup.php ( 22.43 KB )
  97. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/route/Rule.php ( 26.95 KB )
  98. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/route/RuleItem.php ( 9.78 KB )
  99. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/route/app.php ( 1.72 KB )
  100. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/facade/Route.php ( 4.70 KB )
  101. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/route/dispatch/Controller.php ( 4.74 KB )
  102. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/route/Dispatch.php ( 10.44 KB )
  103. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/app/controller/Index.php ( 4.81 KB )
  104. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/app/BaseController.php ( 2.05 KB )
  105. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/facade/Db.php ( 0.93 KB )
  106. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/db/connector/Mysql.php ( 5.44 KB )
  107. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/db/PDOConnection.php ( 52.47 KB )
  108. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/db/Connection.php ( 8.39 KB )
  109. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/db/ConnectionInterface.php ( 4.57 KB )
  110. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/db/builder/Mysql.php ( 16.58 KB )
  111. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/db/Builder.php ( 24.06 KB )
  112. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/db/BaseBuilder.php ( 27.50 KB )
  113. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/db/Query.php ( 15.71 KB )
  114. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/db/BaseQuery.php ( 45.13 KB )
  115. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/db/concern/TimeFieldQuery.php ( 7.43 KB )
  116. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/db/concern/AggregateQuery.php ( 3.26 KB )
  117. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/db/concern/ModelRelationQuery.php ( 20.07 KB )
  118. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/db/concern/ParamsBind.php ( 3.66 KB )
  119. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/db/concern/ResultOperation.php ( 7.01 KB )
  120. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/db/concern/WhereQuery.php ( 19.37 KB )
  121. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/db/concern/JoinAndViewQuery.php ( 7.11 KB )
  122. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/db/concern/TableFieldInfo.php ( 2.63 KB )
  123. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/db/concern/Transaction.php ( 2.77 KB )
  124. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/log/driver/File.php ( 5.96 KB )
  125. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/contract/LogHandlerInterface.php ( 0.86 KB )
  126. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/log/Channel.php ( 3.89 KB )
  127. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/event/LogRecord.php ( 1.02 KB )
  128. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-helper/src/Collection.php ( 16.47 KB )
  129. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/facade/View.php ( 1.70 KB )
  130. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/View.php ( 4.39 KB )
  131. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/Response.php ( 8.81 KB )
  132. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/response/View.php ( 3.29 KB )
  133. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/Cookie.php ( 6.06 KB )
  134. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-view/src/Think.php ( 8.38 KB )
  135. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/contract/TemplateHandlerInterface.php ( 1.60 KB )
  136. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-template/src/Template.php ( 46.61 KB )
  137. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-template/src/template/driver/File.php ( 2.41 KB )
  138. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-template/src/template/contract/DriverInterface.php ( 0.86 KB )
  139. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/runtime/temp/067d451b9a0c665040f3f1bdd3293d68.php ( 11.98 KB )
  140. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-trace/src/Html.php ( 4.42 KB )
  1. CONNECT:[ UseTime:0.000643s ] mysql:host=127.0.0.1;port=3306;dbname=f_mffb;charset=utf8mb4
  2. SHOW FULL COLUMNS FROM `fenlei` [ RunTime:0.000708s ]
  3. SELECT * FROM `fenlei` WHERE `fid` = 0 [ RunTime:0.000254s ]
  4. SELECT * FROM `fenlei` WHERE `fid` = 63 [ RunTime:0.000275s ]
  5. SHOW FULL COLUMNS FROM `set` [ RunTime:0.000468s ]
  6. SELECT * FROM `set` [ RunTime:0.000203s ]
  7. SHOW FULL COLUMNS FROM `article` [ RunTime:0.000565s ]
  8. SELECT * FROM `article` WHERE `id` = 486308 LIMIT 1 [ RunTime:0.000604s ]
  9. UPDATE `article` SET `lasttime` = 1776175729 WHERE `id` = 486308 [ RunTime:0.000612s ]
  10. SELECT * FROM `fenlei` WHERE `id` = 66 LIMIT 1 [ RunTime:0.000208s ]
  11. SELECT * FROM `article` WHERE `id` < 486308 ORDER BY `id` DESC LIMIT 1 [ RunTime:0.001233s ]
  12. SELECT * FROM `article` WHERE `id` > 486308 ORDER BY `id` ASC LIMIT 1 [ RunTime:0.000393s ]
  13. SELECT * FROM `article` WHERE `id` < 486308 ORDER BY `id` DESC LIMIT 10 [ RunTime:0.011974s ]
  14. SELECT * FROM `article` WHERE `id` < 486308 ORDER BY `id` DESC LIMIT 10,10 [ RunTime:0.000726s ]
  15. SELECT * FROM `article` WHERE `id` < 486308 ORDER BY `id` DESC LIMIT 20,10 [ RunTime:0.002875s ]
0.093428s