关注+星标,每天学习Python新技能
因公众号更改推送规则,请点“在看”并加“星标”第一时间获取精彩技术分享
来源于网络,侵删
我们将从以下几个方面进行详细演示:
环境准备:安装必要的库和PostgreSQL数据库
连接PostgreSQL数据库
创建表
插入数据
查询数据
更新数据
删除数据
使用事务
使用连接池
使用ORM(SQLAlchemy)示例
注意:本教程假设你已经安装了PostgreSQL数据库,并且已经创建了一个数据库和用户。
步骤1:环境准备。安装必要的Python库:psycopg2(用于连接PostgreSQL)和SQLAlchemy(用于ORM示例)。如果你还没有安装,可以使用以下命令安装:pip install psycopg2-binary sqlalchemy
步骤2:连接数据库我们将演示如何使用psycopg2连接数据库。
目录
环境准备
基础连接与操作
CRUD操作详解
事务管理
连接池管理
使用SQLAlchemy ORM
异步操作
最佳实践
完整示例项目
环境准备
1. 安装PostgreSQL
Windows/Mac:
# Windows/Mac: download the PostgreSQL installer
# https://www.postgresql.org/download/

# Or run PostgreSQL in Docker
docker run --name postgresql -e POSTGRES_PASSWORD=password -p 5432:5432 -d postgres

# Ubuntu/Debian: install from apt and start the service
sudo apt update
sudo apt install postgresql postgresql-contrib
sudo systemctl start postgresql
2. 创建数据库和用户
-- 登录PostgreSQL
sudo -u postgres psql

-- 创建数据库
CREATE DATABASE tutorial_db;

-- 创建用户
CREATE USER tutorial_user WITH PASSWORD 'secure_password';

-- 授权
GRANT ALL PRIVILEGES ON DATABASE tutorial_db TO tutorial_user;

-- 查看数据库
\l

-- 连接到数据库
\c tutorial_db
3. 安装Python库
pip install psycopg2-binary # PostgreSQL适配器pip install sqlalchemy # ORM框架pip install asyncpg # 异步PostgreSQL驱动pip install psycopg2-pool # 连接池
基础连接与操作
1. 基础连接示例
python
import psycopg2from psycopg2 import sqlfrom psycopg2.extras import RealDictCursorimport logging# 配置日志logging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__)class PostgreSQLConnector: def __init__(self, host='localhost', port=5432, database='tutorial_db', user='tutorial_user', password='secure_password'): self.connection_params = { 'host': host, 'port': port, 'database': database, 'user': user, 'password': password, 'client_encoding': 'utf8' } self.connection = None def connect(self): """建立数据库连接""" try: self.connection = psycopg2.connect(**self.connection_params) logger.info("成功连接到PostgreSQL数据库") return True except Exception as e: logger.error(f"连接失败: {e}") return False def close(self): """关闭连接""" if self.connection: self.connection.close() logger.info("数据库连接已关闭") def execute_query(self, query, params=None, fetch=False): """执行SQL查询""" cursor = None try: cursor = self.connection.cursor(cursor_factory=RealDictCursor) cursor.execute(query, params or ()) if fetch: if query.strip().upper().startswith('SELECT'): result = cursor.fetchall() return result else: self.connection.commit() return cursor.rowcount else: self.connection.commit() return cursor.rowcount except Exception as e: self.connection.rollback() logger.error(f"查询执行失败: {e}") raise finally: if cursor: cursor.close() def test_connection(self): """测试连接""" try: result = self.execute_query("SELECT version();", fetch=True) logger.info(f"PostgreSQL版本: {result[0]['version']}") return True except Exception as e: logger.error(f"测试连接失败: {e}") return False# 使用示例if __name__ == "__main__": # 创建连接器实例 db = PostgreSQLConnector() # 连接数据库 if db.connect(): # 测试连接 db.test_connection() # 关闭连接 db.close()
2. 使用上下文管理器
python
import psycopg2from contextlib import contextmanagerfrom typing import Generator, Any, Tuple@contextmanagerdef get_db_connection() -> Generator[psycopg2.extensions.connection, None, None]: """数据库连接上下文管理器""" conn = None try: conn = psycopg2.connect( host="localhost", database="tutorial_db", user="tutorial_user", password="secure_password" ) yield conn conn.commit() except Exception as e: if conn: conn.rollback() raise e finally: if conn: conn.close()@contextmanagerdef get_db_cursor() -> Generator[Tuple[psycopg2.extensions.connection, psycopg2.extensions.cursor], None, None]: """游标上下文管理器""" with get_db_connection() as conn: cursor = conn.cursor() try: yield conn, cursor finally: cursor.close()# 使用示例def get_all_users(): with get_db_cursor() as (conn, cursor): cursor.execute("SELECT * FROM users;") return cursor.fetchall()
CRUD操作详解
1. 创建表结构
class TableManager:
    """Create tables, indexes and triggers for the tutorial schema."""

    def __init__(self, connector):
        self.db = connector

    def create_tables(self):
        """Create the example tables.

        BUGFIX: the original embedded MySQL-style ``INDEX idx_x (col)``
        clauses inside CREATE TABLE, which PostgreSQL rejects — those
        indexes are now created in :meth:`create_indexes`. Redundant named
        FOREIGN KEY constraints duplicating the inline ``REFERENCES``
        clauses were also removed.
        """
        users_table = """
        CREATE TABLE IF NOT EXISTS users (
            id SERIAL PRIMARY KEY,
            username VARCHAR(50) UNIQUE NOT NULL,
            email VARCHAR(100) UNIQUE NOT NULL,
            password_hash VARCHAR(255) NOT NULL,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            is_active BOOLEAN DEFAULT TRUE,
            CONSTRAINT email_check CHECK (email ~* '^[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\\.[A-Za-z]{2,}$')
        );
        """
        products_table = """
        CREATE TABLE IF NOT EXISTS products (
            id SERIAL PRIMARY KEY,
            name VARCHAR(100) NOT NULL,
            description TEXT,
            price DECIMAL(10, 2) CHECK (price >= 0),
            stock_quantity INTEGER DEFAULT 0 CHECK (stock_quantity >= 0),
            category VARCHAR(50),
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        );
        """
        orders_table = """
        CREATE TABLE IF NOT EXISTS orders (
            id SERIAL PRIMARY KEY,
            user_id INTEGER REFERENCES users(id) ON DELETE CASCADE,
            total_amount DECIMAL(10, 2) CHECK (total_amount >= 0),
            status VARCHAR(20) DEFAULT 'pending',
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        );
        """
        order_items_table = """
        CREATE TABLE IF NOT EXISTS order_items (
            id SERIAL PRIMARY KEY,
            order_id INTEGER REFERENCES orders(id) ON DELETE CASCADE,
            product_id INTEGER REFERENCES products(id) ON DELETE SET NULL,
            quantity INTEGER CHECK (quantity > 0),
            unit_price DECIMAL(10, 2) CHECK (unit_price >= 0),
            subtotal DECIMAL(10, 2) GENERATED ALWAYS AS (quantity * unit_price) STORED,
            UNIQUE (order_id, product_id)
        );
        """
        tables = [users_table, products_table, orders_table, order_items_table]
        for i, table_sql in enumerate(tables, 1):
            try:
                self.db.execute_query(table_sql)
                logger.info(f"表 {i} 创建成功")
            except Exception as e:
                logger.error(f"创建表失败: {e}")
                raise

    def create_indexes(self):
        """Create secondary indexes (includes those moved out of CREATE TABLE)."""
        indexes = [
            "CREATE INDEX IF NOT EXISTS idx_users_email ON users(email);",
            "CREATE INDEX IF NOT EXISTS idx_products_name ON products(name);",
            "CREATE INDEX IF NOT EXISTS idx_products_category ON products(category);",
            "CREATE INDEX IF NOT EXISTS idx_products_price ON products(price);",
            "CREATE INDEX IF NOT EXISTS idx_orders_user_id ON orders(user_id);",
            "CREATE INDEX IF NOT EXISTS idx_orders_status ON orders(status);",
            "CREATE INDEX IF NOT EXISTS idx_orders_created ON orders(created_at);",
        ]
        for index_sql in indexes:
            try:
                self.db.execute_query(index_sql)
            except Exception as e:
                logger.warning(f"创建索引失败: {e}")

    def create_triggers(self):
        """Install an updated_at-maintaining trigger on the main tables."""
        trigger_sql = """
        CREATE OR REPLACE FUNCTION update_updated_at_column()
        RETURNS TRIGGER AS $$
        BEGIN
            NEW.updated_at = CURRENT_TIMESTAMP;
            RETURN NEW;
        END;
        $$ language 'plpgsql';

        DO $$
        DECLARE
            tbl text;
        BEGIN
            FOR tbl IN
                SELECT table_name FROM information_schema.tables
                WHERE table_schema = 'public'
                  AND table_name IN ('users', 'products', 'orders')
            LOOP
                EXECUTE format('
                    DROP TRIGGER IF EXISTS update_%s_updated_at ON %s;
                    CREATE TRIGGER update_%s_updated_at
                    BEFORE UPDATE ON %s
                    FOR EACH ROW EXECUTE FUNCTION update_updated_at_column();
                ', tbl, tbl, tbl, tbl);
            END LOOP;
        END $$;
        """
        try:
            self.db.execute_query(trigger_sql)
            logger.info("触发器创建成功")
        except Exception as e:
            logger.error(f"创建触发器失败: {e}")
2. 插入数据
class DataOperations: def __init__(self, connector): self.db = connector def insert_user(self, username, email, password_hash): """插入用户数据""" query = """ INSERT INTO users (username, email, password_hash) VALUES (%s, %s, %s) RETURNING id, username, email, created_at; """ try: result = self.db.execute_query( query, (username, email, password_hash), fetch=True ) logger.info(f"用户创建成功: {result[0]['username']}") return result[0] except psycopg2.IntegrityError as e: logger.error(f"用户已存在或数据不合法: {e}") raise except Exception as e: logger.error(f"插入用户失败: {e}") raise def batch_insert_products(self, products): """批量插入产品数据""" query = """ INSERT INTO products (name, description, price, stock_quantity, category) VALUES %s RETURNING id; """ try: # 使用psycopg2的批量插入 with self.db.connection.cursor() as cursor: # 使用execute_values进行高效批量插入 from psycopg2.extras import execute_values execute_values( cursor, query, products, template="(%s, %s, %s, %s, %s)", page_size=100 ) self.db.connection.commit() inserted_ids = cursor.fetchall() logger.info(f"批量插入 {len(inserted_ids)} 个产品成功") return inserted_ids except Exception as e: self.db.connection.rollback() logger.error(f"批量插入失败: {e}") raise def upsert_product(self, name, description, price, stock_quantity, category): """插入或更新产品""" query = """ INSERT INTO products (name, description, price, stock_quantity, category) VALUES (%s, %s, %s, %s, %s) ON CONFLICT (name) DO UPDATE SET description = EXCLUDED.description, price = EXCLUDED.price, stock_quantity = products.stock_quantity + EXCLUDED.stock_quantity, updated_at = CURRENT_TIMESTAMP RETURNING id, name, stock_quantity; """ try: result = self.db.execute_query( query, (name, description, price, stock_quantity, category), fetch=True ) return result[0] except Exception as e: logger.error(f"Upsert失败: {e}") raise
3. 查询数据
class QueryOperations:
    """Read-side query helpers."""

    def __init__(self, connector):
        self.db = connector

    def get_user_by_id(self, user_id):
        """Return one user row (dict) or None."""
        query = """
        SELECT id, username, email, created_at, is_active
        FROM users
        WHERE id = %s;
        """
        result = self.db.execute_query(query, (user_id,), fetch=True)
        return result[0] if result else None

    def search_products(self, keyword=None, min_price=0, max_price=None,
                        category=None, limit=10, offset=0):
        """Search products with optional filters, ordered by price then name."""
        base_query = """
        SELECT id, name, description, price, stock_quantity, category
        FROM products
        WHERE price >= %s
        """
        params = [min_price]
        conditions = []
        # BUGFIX: compare against None so max_price=0 is not silently ignored.
        if max_price is not None:
            conditions.append("price <= %s")
            params.append(max_price)
        if keyword:
            conditions.append("(name ILIKE %s OR description ILIKE %s)")
            params.extend([f"%{keyword}%", f"%{keyword}%"])
        if category:
            conditions.append("category = %s")
            params.append(category)
        if conditions:
            base_query += " AND " + " AND ".join(conditions)
        base_query += """
        ORDER BY price ASC, name ASC
        LIMIT %s OFFSET %s;
        """
        params.extend([limit, offset])
        return self.db.execute_query(base_query, tuple(params), fetch=True)

    def get_user_orders(self, user_id):
        """List a user's orders with item counts (newest first)."""
        query = """
        SELECT o.id as order_id, o.total_amount, o.status, o.created_at,
               COUNT(oi.id) as item_count,
               SUM(oi.subtotal) as items_total
        FROM orders o
        LEFT JOIN order_items oi ON o.id = oi.order_id
        WHERE o.user_id = %s
        GROUP BY o.id, o.total_amount, o.status, o.created_at
        ORDER BY o.created_at DESC;
        """
        return self.db.execute_query(query, (user_id,), fetch=True)

    def get_order_details(self, order_id):
        """Return one order with buyer info and a JSON array of its items."""
        query = """
        SELECT o.id as order_id, o.status, o.total_amount, o.created_at,
               u.username, u.email,
               json_agg(
                   json_build_object(
                       'product_id', p.id,
                       'product_name', p.name,
                       'quantity', oi.quantity,
                       'unit_price', oi.unit_price,
                       'subtotal', oi.subtotal
                   )
               ) as items
        FROM orders o
        JOIN users u ON o.user_id = u.id
        JOIN order_items oi ON o.id = oi.order_id
        JOIN products p ON oi.product_id = p.id
        WHERE o.id = %s
        GROUP BY o.id, u.username, u.email;
        """
        result = self.db.execute_query(query, (order_id,), fetch=True)
        return result[0] if result else None

    def get_paginated_data(self, table_name, page=1, per_page=10, filters=None):
        """Generic equality-filtered pagination over a table.

        SECURITY: table_name and filter keys are interpolated into the SQL
        text, so they are restricted to valid Python identifiers to block
        SQL injection; filter *values* remain bound parameters.
        """
        if not table_name.isidentifier():
            raise ValueError(f"非法表名: {table_name}")
        where_conditions = []
        params = []
        if filters:
            for key, value in filters.items():
                if not key.isidentifier():
                    raise ValueError(f"非法字段名: {key}")
                if value is not None:
                    where_conditions.append(f"{key} = %s")
                    params.append(value)
        where_clause = ""
        if where_conditions:
            where_clause = "WHERE " + " AND ".join(where_conditions)
        offset = (page - 1) * per_page
        data_query = f"""
        SELECT * FROM {table_name}
        {where_clause}
        LIMIT %s OFFSET %s;
        """
        params.extend([per_page, offset])
        count_query = f"""
        SELECT COUNT(*) as total FROM {table_name}
        {where_clause};
        """
        data = self.db.execute_query(data_query, tuple(params), fetch=True)
        # The count query takes the same filter params minus LIMIT/OFFSET.
        total_result = self.db.execute_query(
            count_query, tuple(params[:len(params) - 2]), fetch=True
        )
        total = total_result[0]['total'] if total_result else 0
        return {
            'data': data,
            'total': total,
            'page': page,
            'per_page': per_page,
            'total_pages': (total + per_page - 1) // per_page if total_result else 0,
        }
4. 更新数据
class UpdateOperations: def __init__(self, connector): self.db = connector def update_user(self, user_id, **kwargs): """更新用户信息""" if not kwargs: return False set_clauses = [] params = [] allowed_fields = ['username', 'email', 'password_hash', 'is_active'] for field, value in kwargs.items(): if field in allowed_fields and value is not None: set_clauses.append(f"{field} = %s") params.append(value) if not set_clauses: return False params.append(user_id) query = f""" UPDATE users SET {', '.join(set_clauses)} WHERE id = %s RETURNING id, username, email, updated_at; """ try: result = self.db.execute_query(query, tuple(params), fetch=True) if result: logger.info(f"用户 {user_id} 更新成功") return result[0] return None except psycopg2.IntegrityError as e: logger.error(f"更新失败(数据冲突): {e}") raise def update_product_stock(self, product_id, quantity_change): """更新产品库存(原子操作)""" query = """ UPDATE products SET stock_quantity = GREATEST(0, stock_quantity + %s) WHERE id = %s RETURNING id, name, stock_quantity; """ try: result = self.db.execute_query( query, (quantity_change, product_id), fetch=True ) if result: logger.info(f"产品 {product_id} 库存更新: {quantity_change}") return result[0] return None except Exception as e: logger.error(f"更新库存失败: {e}") raise def bulk_update_status(self, table_name, ids, status): """批量更新状态""" query = f""" UPDATE {table_name} SET status = %s WHERE id = ANY(%s) RETURNING id; """ try: result = self.db.execute_query( query, (status, ids), fetch=True ) logger.info(f"批量更新 {len(result)} 条记录状态为 {status}") return result except Exception as e: logger.error(f"批量更新失败: {e}") raise
5. 删除数据
class DeleteOperations: def __init__(self, connector): self.db = connector def soft_delete_user(self, user_id): """软删除用户(标记为不活跃)""" query = """ UPDATE users SET is_active = FALSE WHERE id = %s RETURNING id, username; """ try: result = self.db.execute_query(query, (user_id,), fetch=True) if result: logger.info(f"用户 {user_id} 已软删除") return result[0] return None except Exception as e: logger.error(f"软删除失败: {e}") raise def delete_product(self, product_id): """删除产品""" query = """ DELETE FROM products WHERE id = %s RETURNING id, name; """ try: result = self.db.execute_query(query, (product_id,), fetch=True) if result: logger.info(f"产品 {product_id} 已删除") return result[0] return None except Exception as e: logger.error(f"删除失败: {e}") raise def cleanup_old_records(self, table_name, days=30): """清理旧记录""" query = f""" DELETE FROM {table_name} WHERE created_at < CURRENT_TIMESTAMP - INTERVAL '%s days' RETURNING COUNT(*) as deleted_count; """ try: result = self.db.execute_query(query, (days,), fetch=True) deleted_count = result[0]['deleted_count'] if result else 0 logger.info(f"清理了 {deleted_count} 条旧记录") return deleted_count except Exception as e: logger.error(f"清理失败: {e}") raise
事务管理
class TransactionManager:
    """Run multi-statement operations atomically on the shared connection.

    BUGFIX: the original routed every statement through
    connector.execute_query(), which commits after each statement — the
    "transactions" were never atomic, and the autocommit toggling around
    them had no effect. This version drives a single cursor directly and
    commits (or rolls back) exactly once.
    """

    def __init__(self, connector):
        self.db = connector

    def create_order_with_items(self, user_id, items):
        """Create an order plus its items and decrement stock, atomically.

        items: list of dicts with 'product_id' and 'quantity'.
        Returns the new order id; raises (after rollback) on any failure,
        including insufficient stock.
        """
        conn = self.db.connection
        cursor = conn.cursor(cursor_factory=RealDictCursor)
        try:
            # Lock each product row and verify stock before writing anything.
            total_amount = 0
            for item in items:
                cursor.execute(
                    "SELECT price, stock_quantity FROM products "
                    "WHERE id = %s FOR UPDATE;",
                    (item['product_id'],)
                )
                row = cursor.fetchone()
                if not row or row['stock_quantity'] < item['quantity']:
                    raise Exception(f"产品 {item['product_id']} 库存不足")
                total_amount += row['price'] * item['quantity']

            cursor.execute(
                "INSERT INTO orders (user_id, total_amount) "
                "VALUES (%s, %s) RETURNING id;",
                (user_id, total_amount)
            )
            order_id = cursor.fetchone()['id']

            for item in items:
                cursor.execute(
                    """
                    INSERT INTO order_items (order_id, product_id, quantity, unit_price)
                    VALUES (%s, %s, %s,
                        (SELECT price FROM products WHERE id = %s)
                    );
                    """,
                    (order_id, item['product_id'], item['quantity'],
                     item['product_id'])
                )
                cursor.execute(
                    "UPDATE products SET stock_quantity = stock_quantity - %s "
                    "WHERE id = %s;",
                    (item['quantity'], item['product_id'])
                )

            conn.commit()
            logger.info(f"订单 {order_id} 创建成功")
            return order_id
        except Exception as e:
            conn.rollback()
            logger.error(f"创建订单失败: {e}")
            raise
        finally:
            cursor.close()

    def transfer_stock(self, from_product_id, to_product_id, quantity):
        """Move stock between two products atomically and log the transfer."""
        conn = self.db.connection
        cursor = conn.cursor(cursor_factory=RealDictCursor)
        try:
            # Lock the source row so concurrent transfers cannot oversell.
            cursor.execute(
                "SELECT stock_quantity FROM products WHERE id = %s FOR UPDATE;",
                (from_product_id,)
            )
            from_stock = cursor.fetchone()['stock_quantity']
            if from_stock < quantity:
                raise Exception("库存不足")

            cursor.execute(
                "UPDATE products SET stock_quantity = stock_quantity - %s "
                "WHERE id = %s;",
                (quantity, from_product_id)
            )
            cursor.execute(
                "UPDATE products SET stock_quantity = stock_quantity + %s "
                "WHERE id = %s;",
                (quantity, to_product_id)
            )
            cursor.execute(
                """
                INSERT INTO stock_transfers (from_product_id, to_product_id, quantity)
                VALUES (%s, %s, %s);
                """,
                (from_product_id, to_product_id, quantity)
            )

            conn.commit()
            logger.info(f"库存转移成功: {quantity} 个产品")
        except Exception as e:
            conn.rollback()
            logger.error(f"库存转移失败: {e}")
            raise
        finally:
            cursor.close()
连接池管理
import psycopg2.pool
from threading import Lock
import time


class ConnectionPoolManager:
    """Thread-safe wrapper around psycopg2's ThreadedConnectionPool."""

    def __init__(self, min_conn=1, max_conn=10, **kwargs):
        self.pool_params = kwargs   # forwarded to psycopg2.connect
        self.pool = None
        self.lock = Lock()          # guards lazy pool creation / teardown
        self.min_conn = min_conn
        self.max_conn = max_conn

    def create_pool(self):
        """Create the pool once (idempotent, thread-safe)."""
        with self.lock:
            if not self.pool:
                try:
                    self.pool = psycopg2.pool.ThreadedConnectionPool(
                        minconn=self.min_conn,
                        maxconn=self.max_conn,
                        **self.pool_params
                    )
                    logger.info("连接池创建成功")
                except Exception as e:
                    logger.error(f"创建连接池失败: {e}")
                    raise

    def get_connection(self):
        """Check a connection out of the pool (creating the pool lazily)."""
        if not self.pool:
            self.create_pool()
        try:
            return self.pool.getconn()
        except Exception as e:
            logger.error(f"获取连接失败: {e}")
            raise

    def return_connection(self, conn):
        """Return a connection to the pool."""
        try:
            self.pool.putconn(conn)
        except Exception as e:
            logger.error(f"归还连接失败: {e}")

    def close_all_connections(self):
        """Close every pooled connection."""
        with self.lock:
            if self.pool:
                self.pool.closeall()
                logger.info("所有连接已关闭")

    def get_pool_stats(self):
        """Best-effort pool statistics.

        NOTE: _pool/_used are private psycopg2 attributes and may change
        between versions.
        BUGFIX: 'available' previously counted the in-use set (_used);
        free connections live in _pool.
        """
        if not self.pool:
            return {}
        return {
            'min_connections': self.min_conn,
            'max_connections': self.max_conn,
            'available': len(self.pool._pool),
            'in_use': len(self.pool._used),
        }


class PooledDatabase:
    """Query helper that borrows connections from a ConnectionPoolManager."""

    def __init__(self, pool_manager):
        self.pool = pool_manager

    @contextmanager
    def get_cursor_from_pool(self):
        """Yield a dict-row cursor; commit on success, rollback on error,
        and always return the connection to the pool."""
        conn = None
        cursor = None
        try:
            conn = self.pool.get_connection()
            cursor = conn.cursor(cursor_factory=RealDictCursor)
            yield cursor
            conn.commit()
        except Exception as e:
            if conn:
                conn.rollback()
            raise e
        finally:
            if cursor:
                cursor.close()
            if conn:
                self.pool.return_connection(conn)

    def execute_with_pool(self, query, params=None):
        """Execute one statement on a pooled connection.

        SELECTs return all rows; other statements return the rowcount.
        """
        with self.get_cursor_from_pool() as cursor:
            cursor.execute(query, params or ())
            if query.strip().upper().startswith('SELECT'):
                return cursor.fetchall()
            return cursor.rowcount


# Pool usage example
def setup_database_pool():
    """Build a pooled database helper with the tutorial credentials."""
    pool_manager = ConnectionPoolManager(
        min_conn=2,
        max_conn=10,
        host="localhost",
        database="tutorial_db",
        user="tutorial_user",
        password="secure_password",
        connect_timeout=10
    )
    return PooledDatabase(pool_manager)
使用SQLAlchemy ORM
1. 定义模型
python
from sqlalchemy import (create_engine, Column, Integer, String, Float, DateTime,
                        Boolean, Text, ForeignKey, CheckConstraint, Index,
                        UniqueConstraint)
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, relationship
from sqlalchemy.sql import func
from datetime import datetime
import json

Base = declarative_base()


class User(Base):
    """ORM model for the users table."""
    __tablename__ = 'users'

    id = Column(Integer, primary_key=True)
    username = Column(String(50), unique=True, nullable=False)
    email = Column(String(100), unique=True, nullable=False)
    password_hash = Column(String(255), nullable=False)
    created_at = Column(DateTime, default=func.now())
    updated_at = Column(DateTime, default=func.now(), onupdate=func.now())
    is_active = Column(Boolean, default=True)

    # Relationships
    orders = relationship("Order", back_populates="user",
                          cascade="all, delete-orphan")

    # Constraints / indexes
    __table_args__ = (
        CheckConstraint(
            "email ~* '^[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\\.[A-Za-z]{2,}$'",
            name="email_check"),
        Index('idx_users_email', email),
    )

    def to_dict(self):
        """Serialize to a plain dict (password hash intentionally omitted)."""
        return {
            'id': self.id,
            'username': self.username,
            'email': self.email,
            'created_at': self.created_at.isoformat() if self.created_at else None,
            'is_active': self.is_active,
        }


class Product(Base):
    """ORM model for the products table."""
    __tablename__ = 'products'

    id = Column(Integer, primary_key=True)
    name = Column(String(100), nullable=False)
    description = Column(Text)
    price = Column(Float, CheckConstraint("price >= 0"), nullable=False)
    # BUGFIX: the positional CheckConstraint must precede keyword args —
    # the original Column(Integer, default=0, CheckConstraint(...)) is a
    # Python SyntaxError.
    stock_quantity = Column(Integer, CheckConstraint("stock_quantity >= 0"),
                            default=0)
    category = Column(String(50))
    created_at = Column(DateTime, default=func.now())
    updated_at = Column(DateTime, default=func.now(), onupdate=func.now())

    # Relationships
    order_items = relationship("OrderItem", back_populates="product")

    # Indexes
    __table_args__ = (
        Index('idx_products_category', category),
        Index('idx_products_price', price),
        Index('idx_products_name', name),
    )

    def to_dict(self):
        return {
            'id': self.id,
            'name': self.name,
            'description': self.description,
            'price': self.price,
            'stock_quantity': self.stock_quantity,
            'category': self.category,
        }


class Order(Base):
    """ORM model for the orders table."""
    __tablename__ = 'orders'

    id = Column(Integer, primary_key=True)
    user_id = Column(Integer, ForeignKey('users.id', ondelete='CASCADE'),
                     nullable=False)
    total_amount = Column(Float, CheckConstraint("total_amount >= 0"))
    status = Column(String(20), default='pending')
    created_at = Column(DateTime, default=func.now())
    updated_at = Column(DateTime, default=func.now(), onupdate=func.now())

    # Relationships
    user = relationship("User", back_populates="orders")
    items = relationship("OrderItem", back_populates="order",
                         cascade="all, delete-orphan")

    # Indexes
    __table_args__ = (
        Index('idx_orders_user_id', user_id),
        Index('idx_orders_status', status),
        Index('idx_orders_created', created_at),
    )

    def to_dict(self):
        return {
            'id': self.id,
            'user_id': self.user_id,
            'total_amount': self.total_amount,
            'status': self.status,
            'created_at': self.created_at.isoformat() if self.created_at else None,
            'items': [item.to_dict() for item in self.items],
        }


class OrderItem(Base):
    """ORM model for order_items (one product line within an order)."""
    __tablename__ = 'order_items'

    id = Column(Integer, primary_key=True)
    order_id = Column(Integer, ForeignKey('orders.id', ondelete='CASCADE'),
                      nullable=False)
    product_id = Column(Integer, ForeignKey('products.id', ondelete='SET NULL'))
    quantity = Column(Integer, CheckConstraint("quantity > 0"), nullable=False)
    unit_price = Column(Float, CheckConstraint("unit_price >= 0"),
                        nullable=False)

    # Relationships
    order = relationship("Order", back_populates="items")
    product = relationship("Product", back_populates="order_items")

    # One row per (order, product) pair.
    __table_args__ = (
        UniqueConstraint('order_id', 'product_id', name='uq_order_product'),
    )

    @property
    def subtotal(self):
        """Computed line total (not stored by this ORM variant)."""
        return self.quantity * self.unit_price

    def to_dict(self):
        return {
            'id': self.id,
            'product_id': self.product_id,
            'product_name': self.product.name if self.product else None,
            'quantity': self.quantity,
            'unit_price': self.unit_price,
            'subtotal': self.subtotal,
        }
2. SQLAlchemy数据库操作类
python
class SQLAlchemyDatabase:
    """SQLAlchemy-based data-access layer for the tutorial schema."""

    def __init__(self, connection_string):
        self.engine = create_engine(
            connection_string,
            pool_size=10,
            max_overflow=20,
            pool_pre_ping=True,   # validate connections before use
            echo=False,           # set True to log emitted SQL
            json_serializer=lambda obj: json.dumps(obj, ensure_ascii=False)
        )
        self.SessionLocal = sessionmaker(
            autocommit=False, autoflush=False, bind=self.engine
        )

    def init_db(self):
        """Create every table declared on Base."""
        Base.metadata.create_all(bind=self.engine)
        logger.info("数据库表创建成功")

    def drop_db(self):
        """Drop every table declared on Base."""
        Base.metadata.drop_all(bind=self.engine)
        logger.info("数据库表已删除")

    def get_session(self):
        """Return a new session."""
        return self.SessionLocal()

    @contextmanager
    def session_scope(self):
        """Session context manager: commit on success, rollback on error."""
        session = self.get_session()
        try:
            yield session
            session.commit()
        except Exception as e:
            session.rollback()
            logger.error(f"数据库操作失败: {e}")
            raise
        finally:
            session.close()

    # ---- CRUD ----

    def create_user(self, username, email, password_hash):
        """Create a user and return its dict representation."""
        with self.session_scope() as session:
            user = User(username=username, email=email,
                        password_hash=password_hash)
            session.add(user)
            session.flush()  # populate the generated primary key
            return user.to_dict()

    def get_user(self, user_id=None, username=None, email=None):
        """Look a user up by any combination of id/username/email."""
        with self.session_scope() as session:
            query = session.query(User)
            if user_id:
                query = query.filter(User.id == user_id)
            if username:
                query = query.filter(User.username == username)
            if email:
                query = query.filter(User.email == email)
            user = query.first()
            return user.to_dict() if user else None

    def update_user(self, user_id, **kwargs):
        """Update matching attributes on a user; returns dict or None."""
        with self.session_scope() as session:
            user = session.query(User).filter(User.id == user_id).first()
            if not user:
                return None
            for key, value in kwargs.items():
                if hasattr(user, key):
                    setattr(user, key, value)
            return user.to_dict()

    def delete_user(self, user_id):
        """Delete a user; returns True if a row was removed."""
        with self.session_scope() as session:
            user = session.query(User).filter(User.id == user_id).first()
            if user:
                session.delete(user)
                return True
            return False

    # ---- queries ----

    def search_products(self, **filters):
        """Filtered, sorted, paginated product search."""
        with self.session_scope() as session:
            query = session.query(Product)
            if filters.get('keyword'):
                keyword = f"%{filters['keyword']}%"
                query = query.filter(
                    Product.name.ilike(keyword) |
                    Product.description.ilike(keyword)
                )
            if filters.get('min_price'):
                query = query.filter(Product.price >= filters['min_price'])
            if filters.get('max_price'):
                query = query.filter(Product.price <= filters['max_price'])
            if filters.get('category'):
                query = query.filter(Product.category == filters['category'])

            sort_by = filters.get('sort_by', 'price')
            sort_order = filters.get('sort_order', 'asc')
            if sort_order == 'desc':
                query = query.order_by(getattr(Product, sort_by).desc())
            else:
                query = query.order_by(getattr(Product, sort_by))

            page = filters.get('page', 1)
            per_page = filters.get('per_page', 10)
            offset = (page - 1) * per_page
            total = query.count()
            products = query.offset(offset).limit(per_page).all()
            return {
                'data': [p.to_dict() for p in products],
                'total': total,
                'page': page,
                'per_page': per_page,
            }

    def create_order(self, user_id, items):
        """Create an order atomically: lock products, verify stock, decrement."""
        with self.session_scope() as session:
            try:
                total_amount = 0
                for item_data in items:
                    product = session.query(Product).filter(
                        Product.id == item_data['product_id']
                    ).with_for_update().first()  # row lock until commit
                    if not product or product.stock_quantity < item_data['quantity']:
                        raise Exception(f"产品 {item_data['product_id']} 库存不足")
                    total_amount += product.price * item_data['quantity']

                order = Order(user_id=user_id, total_amount=total_amount)
                session.add(order)
                session.flush()  # obtain the order id

                for item_data in items:
                    product = session.query(Product).filter(
                        Product.id == item_data['product_id']
                    ).first()
                    session.add(OrderItem(
                        order_id=order.id,
                        product_id=item_data['product_id'],
                        quantity=item_data['quantity'],
                        unit_price=product.price,
                    ))
                    product.stock_quantity -= item_data['quantity']

                return order.to_dict()
            except Exception as e:
                # session_scope performs the rollback; just record and re-raise.
                logger.error(f"创建订单失败: {e}")
                raise

    # ---- bulk / raw ----

    def bulk_insert_products(self, products_data):
        """Bulk-insert product dicts."""
        with self.session_scope() as session:
            session.bulk_save_objects(
                [Product(**product_data) for product_data in products_data]
            )

    def execute_raw_sql(self, sql, params=None):
        """Execute raw SQL.

        BUGFIX: SQLAlchemy 1.4+ requires textual SQL to be wrapped in
        text(); a bare string raises ArgumentError.
        """
        from sqlalchemy import text  # local import; only needed here
        with self.session_scope() as session:
            result = session.execute(text(sql), params or {})
            if sql.strip().upper().startswith('SELECT'):
                columns = result.keys()
                return [dict(zip(columns, row)) for row in result.fetchall()]
            return result.rowcount


# Usage example
def setup_sqlalchemy_db():
    """Build the SQLAlchemy layer and create the tables."""
    connection_string = "postgresql://tutorial_user:secure_password@localhost/tutorial_db"
    db = SQLAlchemyDatabase(connection_string)
    db.init_db()
    return db
异步操作
import asyncio
import asyncpg
from contextlib import asynccontextmanager
from datetime import datetime
from typing import List, Dict, Any, Optional


class AsyncPostgreSQL:
    """asyncpg-based async data-access helper sharing one connection pool."""

    def __init__(self, dsn: str, min_size: int = 5, max_size: int = 20):
        self.dsn = dsn
        self.min_size = min_size
        self.max_size = max_size
        self.pool = None

    async def connect(self):
        """Create the connection pool."""
        self.pool = await asyncpg.create_pool(
            dsn=self.dsn,
            min_size=self.min_size,
            max_size=self.max_size,
            command_timeout=60,
            max_inactive_connection_lifetime=300,
        )
        logger.info("异步连接池创建成功")

    async def close(self):
        """Close the pool."""
        if self.pool:
            await self.pool.close()
            logger.info("异步连接池已关闭")

    # BUGFIX: the original decorated this async generator with
    # @contextmanager, which does not support `async with`; it must be
    # contextlib.asynccontextmanager.
    @asynccontextmanager
    async def get_connection(self):
        """Acquire a pooled connection and always release it."""
        if not self.pool:
            await self.connect()
        conn = await self.pool.acquire()
        try:
            yield conn
        finally:
            await self.pool.release(conn)

    async def execute(self, query: str, *args) -> str:
        """Run a statement; returns the asyncpg status string."""
        async with self.get_connection() as conn:
            return await conn.execute(query, *args)

    async def fetch(self, query: str, *args) -> List[Dict]:
        """Fetch all rows as dicts."""
        async with self.get_connection() as conn:
            rows = await conn.fetch(query, *args)
            return [dict(row) for row in rows]

    async def fetchrow(self, query: str, *args) -> Optional[Dict]:
        """Fetch one row as a dict, or None."""
        async with self.get_connection() as conn:
            row = await conn.fetchrow(query, *args)
            return dict(row) if row else None

    async def fetchval(self, query: str, *args) -> Any:
        """Fetch a single scalar value."""
        async with self.get_connection() as conn:
            return await conn.fetchval(query, *args)

    async def transaction(self, callback):
        """Run `await callback(conn)` inside a transaction on one connection."""
        async with self.get_connection() as conn:
            async with conn.transaction():
                return await callback(conn)

    # Business-method examples

    async def get_user_with_orders(self, user_id: int) -> Dict:
        """Return a user row plus an aggregated JSON array of their orders."""
        query = """
        SELECT u.*,
               COALESCE(
                   json_agg(
                       json_build_object(
                           'id', o.id,
                           'total_amount', o.total_amount,
                           'status', o.status,
                           'created_at', o.created_at
                       )
                   ) FILTER (WHERE o.id IS NOT NULL),
                   '[]'
               ) as orders
        FROM users u
        LEFT JOIN orders o ON u.id = o.user_id
        WHERE u.id = $1
        GROUP BY u.id;
        """
        return await self.fetchrow(query, user_id)

    async def bulk_insert_users(self, users: List[Dict]) -> List[int]:
        """Insert users via a prepared statement; returns the new ids."""
        query = """
        INSERT INTO users (username, email, password_hash)
        VALUES ($1, $2, $3)
        RETURNING id;
        """
        async with self.get_connection() as conn:
            stmt = await conn.prepare(query)
            ids = []
            for user in users:
                row = await stmt.fetchrow(
                    user['username'], user['email'], user['password_hash']
                )
                ids.append(row['id'])
            return ids


# Async usage demo
async def async_demo():
    """Demonstrate the async helper end to end."""
    dsn = "postgresql://tutorial_user:secure_password@localhost/tutorial_db"
    db = AsyncPostgreSQL(dsn)
    await db.connect()
    try:
        version = await db.fetchval("SELECT version();")
        logger.info(f"PostgreSQL版本: {version}")

        users = await db.fetch("SELECT * FROM users LIMIT 5;")
        logger.info(f"前5个用户: {len(users)}")

        async def transfer_money(from_user, to_user, amount):
            """Move funds between two accounts (transaction demo)."""
            # NOTE(review): these statements go through db.execute, which
            # acquires its own connections — they do NOT run inside the
            # transaction opened on `conn` below; a correct implementation
            # would issue them on the passed-in conn. Flagged, not changed.
            await db.execute(
                "UPDATE accounts SET balance = balance - $1 WHERE user_id = $2;",
                amount, from_user
            )
            await db.execute(
                "UPDATE accounts SET balance = balance + $1 WHERE user_id = $2;",
                amount, to_user
            )
            return True

        success = await db.transaction(lambda conn: transfer_money(1, 2, 100.00))
    finally:
        await db.close()

# Run with: asyncio.run(async_demo())
最佳实践
1. 配置管理
python
import os
from dataclasses import dataclass, field
from typing import Optional

try:
    # python-dotenv is optional: without it we simply use the process env.
    from dotenv import load_dotenv
    load_dotenv()  # merge .env file values into os.environ
except ImportError:  # pragma: no cover
    pass


def _env(name: str, default: str) -> str:
    """Read an environment variable at call time (not import time)."""
    return os.getenv(name, default)


@dataclass
class DatabaseConfig:
    """Database connection settings sourced from environment variables.

    BUG FIX: the original evaluated ``os.getenv`` in the class body, so the
    environment was read ONCE at import time and later changes (tests,
    container entrypoints) were silently ignored. ``default_factory`` defers
    the read to instance creation while keeping the same field defaults.
    """
    host: str = field(default_factory=lambda: _env('DB_HOST', 'localhost'))
    port: int = field(default_factory=lambda: int(_env('DB_PORT', '5432')))
    database: str = field(default_factory=lambda: _env('DB_NAME', 'tutorial_db'))
    user: str = field(default_factory=lambda: _env('DB_USER', 'tutorial_user'))
    password: str = field(default_factory=lambda: _env('DB_PASSWORD', 'secure_password'))
    pool_min: int = field(default_factory=lambda: int(_env('DB_POOL_MIN', '2')))
    pool_max: int = field(default_factory=lambda: int(_env('DB_POOL_MAX', '10')))
    timeout: int = field(default_factory=lambda: int(_env('DB_TIMEOUT', '30')))

    @property
    def connection_string(self) -> str:
        """Synchronous (psycopg2-style) connection URL."""
        return f"postgresql://{self.user}:{self.password}@{self.host}:{self.port}/{self.database}"

    @property
    def async_connection_string(self) -> str:
        """Async (asyncpg) connection URL — same scheme, kept as a separate
        property so callers can diverge later without an interface change."""
        return f"postgresql://{self.user}:{self.password}@{self.host}:{self.port}/{self.database}"

    def validate(self) -> bool:
        """Raise ValueError if any required field is empty; return True otherwise."""
        required = ['host', 'database', 'user', 'password']
        for field_name in required:
            if not getattr(self, field_name):
                raise ValueError(f"数据库配置缺失: {field_name}")
        return True
2. 健康检查
class DatabaseHealthChecker:
    """Lightweight health probes for a database connector.

    The connector must expose ``execute_query(query, params=None, fetch=False)``.
    Every probe returns a dict with a ``status`` key ('healthy'/'unhealthy')
    and never raises — failures are folded into the result.
    """

    def __init__(self, connector):
        self.db = connector

    def check_connection(self) -> dict:
        """Round-trip a trivial query and report latency in milliseconds."""
        try:
            started = time.time()
            self.db.execute_query("SELECT 1;")
            elapsed_ms = (time.time() - started) * 1000  # 毫秒
            return {
                'status': 'healthy',
                'latency_ms': round(elapsed_ms, 2),
                'timestamp': datetime.now().isoformat()
            }
        except Exception as exc:
            return {
                'status': 'unhealthy',
                'error': str(exc),
                'timestamp': datetime.now().isoformat()
            }

    def check_disk_space(self) -> dict:
        """Report the current database's on-disk size."""
        query = """
        SELECT
            datname as database,
            pg_database_size(datname) as size_bytes,
            pg_size_pretty(pg_database_size(datname)) as size_pretty
        FROM pg_database
        WHERE datname = current_database();
        """
        try:
            rows = self.db.execute_query(query, fetch=True)
            first = rows[0]
            return {
                'database': first['database'],
                'size_bytes': first['size_bytes'],
                'size_human': first['size_pretty'],
                'status': 'healthy'
            }
        except Exception as exc:
            return {
                'status': 'unhealthy',
                'error': str(exc)
            }

    def check_active_connections(self) -> dict:
        """Break down sessions on this database by state."""
        query = """
        SELECT
            count(*) as total,
            count(*) filter (where state = 'active') as active,
            count(*) filter (where state = 'idle') as idle,
            count(*) filter (where state = 'idle in transaction') as idle_in_transaction
        FROM pg_stat_activity
        WHERE datname = current_database();
        """
        try:
            rows = self.db.execute_query(query, fetch=True)
            first = rows[0]
            return {
                'total': first['total'],
                'active': first['active'],
                'idle': first['idle'],
                'idle_in_transaction': first['idle_in_transaction'],
                'status': 'healthy'
            }
        except Exception as exc:
            return {
                'status': 'unhealthy',
                'error': str(exc)
            }

    def full_health_check(self) -> dict:
        """Run every probe; overall status is healthy only if all pass."""
        checks = {
            'connection': self.check_connection(),
            'disk_space': self.check_disk_space(),
            'connections': self.check_active_connections()
        }
        all_ok = all(
            outcome.get('status') == 'healthy' for outcome in checks.values()
        )
        return {
            'status': 'healthy' if all_ok else 'unhealthy',
            'checks': checks,
            'timestamp': datetime.now().isoformat()
        }
3. 错误处理装饰器
import logging
import time  # BUG FIX: the original called time.sleep() without importing time
from functools import wraps

import psycopg2
from psycopg2 import OperationalError, IntegrityError

# Snippet-local logger so this example runs standalone.
logger = logging.getLogger(__name__)


def handle_db_errors(max_retries=3, retry_delay=1):
    """Decorator: retry transient DB errors, log-and-reraise everything else.

    OperationalError (lost connection, server restart, ...) is retried up
    to ``max_retries`` times with ``retry_delay`` seconds between attempts;
    the final failure is re-raised. IntegrityError and other psycopg2
    errors are never retried — retrying a constraint violation cannot
    succeed.
    """
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            retries = 0
            while retries < max_retries:
                try:
                    return func(*args, **kwargs)
                except OperationalError as e:
                    retries += 1
                    if retries == max_retries:
                        logger.error(f"数据库操作失败(连接错误): {e}")
                        raise
                    logger.warning(f"连接错误,重试 {retries}/{max_retries}: {e}")
                    time.sleep(retry_delay)
                except IntegrityError as e:
                    # constraint violations are permanent — do not retry
                    logger.error(f"数据完整性错误: {e}")
                    raise
                except psycopg2.Error as e:
                    logger.error(f"数据库错误: {e}")
                    raise
                except Exception as e:
                    logger.error(f"未知错误: {e}")
                    raise
            # only reachable when max_retries <= 0
            return None
        return wrapper
    return decorator


# 使用示例
class SafeDatabaseOperations:
    """Example of wrapping connector calls with the retry decorator."""

    def __init__(self, connector):
        # connector must expose execute_query(query, params=None, fetch=False)
        self.db = connector

    @handle_db_errors(max_retries=3)
    def safe_create_user(self, username, email, password_hash):
        """Create a user; transient connection errors are retried."""
        return self.db.execute_query(
            "INSERT INTO users (username, email, password_hash) VALUES (%s, %s, %s) RETURNING id;",
            (username, email, password_hash),
            fetch=True
        )
4. 性能优化
import logging

# Snippet-local logger so this example runs standalone.
logger = logging.getLogger(__name__)


class DatabaseOptimizer:
    """Database maintenance helpers: EXPLAIN, VACUUM, REINDEX, statistics.

    The connector must expose ``execute_query(query, params=None, fetch=False)``.
    VACUUM/REINDEX/ANALYZE cannot take bound parameters, so table names are
    interpolated into the SQL text; every such name is validated first
    (SECURITY FIX — the original interpolated caller-supplied names
    unchecked, an SQL-injection vector).
    """

    def __init__(self, connector):
        self.db = connector

    @staticmethod
    def _safe_identifier(table_name) -> bool:
        """True if table_name is a plain identifier safe to interpolate."""
        return isinstance(table_name, str) and table_name.isidentifier()

    def analyze_query_plan(self, query, params=None):
        """Run EXPLAIN ANALYZE and return the plan text, or None on failure.

        NOTE: EXPLAIN ANALYZE actually EXECUTES the query — do not pass
        data-modifying statements unless that is intended.
        """
        explain_query = f"EXPLAIN ANALYZE {query}"
        try:
            result = self.db.execute_query(explain_query, params, fetch=True)
            return "\n".join(row['QUERY PLAN'] for row in result)
        except Exception as e:
            logger.error(f"分析查询计划失败: {e}")
            return None

    def vacuum_table(self, table_name):
        """VACUUM ANALYZE one table; returns True on success."""
        if not self._safe_identifier(table_name):
            logger.error(f"清理表失败: 非法表名 {table_name!r}")
            return False
        try:
            self.db.execute_query(f"VACUUM ANALYZE {table_name};")
            logger.info(f"表 {table_name} 清理完成")
            return True
        except Exception as e:
            logger.error(f"清理表失败: {e}")
            return False

    def reindex_table(self, table_name):
        """Rebuild all indexes on one table; returns True on success."""
        if not self._safe_identifier(table_name):
            logger.error(f"重建索引失败: 非法表名 {table_name!r}")
            return False
        try:
            self.db.execute_query(f"REINDEX TABLE {table_name};")
            logger.info(f"表 {table_name} 索引重建完成")
            return True
        except Exception as e:
            logger.error(f"重建索引失败: {e}")
            return False

    def get_table_statistics(self, table_name):
        """Return pg_stat_user_tables row for one table, or None.

        This query is parameterized (%s), so no name validation is needed.
        """
        query = """
        SELECT
            schemaname,
            tablename,
            n_live_tup as live_rows,
            n_dead_tup as dead_rows,
            pg_size_pretty(pg_total_relation_size(schemaname || '.' || tablename)) as total_size,
            pg_size_pretty(pg_relation_size(schemaname || '.' || tablename)) as table_size,
            pg_size_pretty(pg_total_relation_size(schemaname || '.' || tablename) -
                           pg_relation_size(schemaname || '.' || tablename)) as index_size,
            last_vacuum,
            last_autovacuum,
            last_analyze,
            last_autoanalyze
        FROM pg_stat_user_tables
        WHERE tablename = %s;
        """
        try:
            result = self.db.execute_query(query, (table_name,), fetch=True)
            return result[0] if result else None
        except Exception as e:
            logger.error(f"获取统计信息失败: {e}")
            return None

    def optimize_queries(self):
        """VACUUM + ANALYZE every table in the public schema."""
        tables_query = """
        SELECT tablename
        FROM pg_tables
        WHERE schemaname = 'public';
        """
        tables = self.db.execute_query(tables_query, fetch=True)
        for table in tables:
            table_name = table['tablename']
            # defensive: skip anything that is not a plain identifier
            if not self._safe_identifier(table_name):
                logger.warning(f"跳过非法表名: {table_name!r}")
                continue
            # reclaim dead tuples
            self.vacuum_table(table_name)
            # refresh planner statistics
            self.db.execute_query(f"ANALYZE {table_name};")
            logger.info(f"表 {table_name} 优化完成")
完整示例项目
项目结构
text
postgresql-tutorial/
│
├── config/
│   ├── __init__.py
│   └── settings.py
│
├── database/
│   ├── __init__.py
│   ├── connection.py
│   ├── models.py
│   ├── operations.py
│   ├── repositories.py
│   └── migrations/
│
├── services/
│   ├── __init__.py
│   ├── user_service.py
│   ├── product_service.py
│   └── order_service.py
│
├── api/
│   ├── __init__.py
│   ├── routes.py
│   └── schemas.py
│
├── utils/
│   ├── __init__.py
│   ├── logger.py
│   ├── validators.py
│   └── helpers.py
│
├── tests/
│   ├── __init__.py
│   ├── test_database.py
│   ├── test_services.py
│   └── test_api.py
│
├── requirements.txt
├── docker-compose.yml
├── .env.example
└── README.md
主应用程序
python
# main.pyimport uvicornfrom fastapi import FastAPI, Depends, HTTPExceptionfrom contextlib import asynccontextmanagerfrom database.connection import get_db, DatabaseHealthCheckerfrom database.models import Base, enginefrom api.routes import router as api_routerimport logging# 配置日志logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')logger = logging.getLogger(__name__)@asynccontextmanagerasync def lifespan(app: FastAPI): """应用生命周期管理""" # 启动时 logger.info("应用启动中...") # 创建数据库表 Base.metadata.create_all(bind=engine) logger.info("数据库表初始化完成") # 运行健康检查 db = get_db() health_checker = DatabaseHealthChecker(db) health_status = health_checker.full_health_check() if health_status['status'] == 'healthy': logger.info("数据库健康检查通过") else: logger.error("数据库健康检查失败") yield # 关闭时 logger.info("应用关闭中...") db.close()# 创建FastAPI应用app = FastAPI( title="PostgreSQL教程API", description="PostgreSQL与Python集成示例", version="1.0.0", lifespan=lifespan)# 注册路由app.include_router(api_router, prefix="/api/v1")@app.get("/")async def root(): """根路由""" return { "message": "PostgreSQL教程API", "version": "1.0.0", "docs": "/docs", "health": "/health" }@app.get("/health")async def health_check(): """健康检查端点""" db = get_db() health_checker = DatabaseHealthChecker(db) return health_checker.full_health_check()if __name__ == "__main__": uvicorn.run( "main:app", host="0.0.0.0", port=8000, reload=True, log_level="info" )
Docker配置
yaml
# docker-compose.yml
# Three services: the PostgreSQL server, a pgAdmin UI, and the Python app.
version: '3.8'

services:
  postgres:
    image: postgres:15-alpine
    container_name: postgres_tutorial
    environment:
      POSTGRES_DB: tutorial_db
      POSTGRES_USER: tutorial_user
      POSTGRES_PASSWORD: secure_password
    ports:
      - "5432:5432"
    volumes:
      # persist data across container restarts
      - postgres_data:/var/lib/postgresql/data
      # init.sql runs automatically on first startup
      - ./init.sql:/docker-entrypoint-initdb.d/init.sql
    networks:
      - app_network
    healthcheck:
      # app startup waits on this via depends_on: service_healthy
      test: ["CMD-SHELL", "pg_isready -U tutorial_user -d tutorial_db"]
      interval: 10s
      timeout: 5s
      retries: 5

  pgadmin:
    # web UI for inspecting the database, served on host port 5050
    image: dpage/pgadmin4:latest
    container_name: pgadmin_tutorial
    environment:
      PGADMIN_DEFAULT_EMAIL: admin@example.com
      PGADMIN_DEFAULT_PASSWORD: admin_password
    ports:
      - "5050:80"
    depends_on:
      - postgres
    networks:
      - app_network

  app:
    build: .
    container_name: python_app
    environment:
      # DB_HOST is the service name — containers resolve it via the bridge network
      DB_HOST: postgres
      DB_PORT: 5432
      DB_NAME: tutorial_db
      DB_USER: tutorial_user
      DB_PASSWORD: secure_password
    ports:
      - "8000:8000"
    depends_on:
      postgres:
        condition: service_healthy
    networks:
      - app_network
    volumes:
      # bind-mount the source tree so --reload picks up edits
      - .:/app
    command: uvicorn main:app --host 0.0.0.0 --port 8000 --reload

networks:
  app_network:
    driver: bridge

volumes:
  postgres_data:
初始化SQL
sql
-- init.sql
-- Executed automatically on the Postgres container's FIRST start
-- (mounted into /docker-entrypoint-initdb.d/ by docker-compose).

-- Extensions:
--   uuid-ossp : uuid_generate_v4() etc. for UUID primary keys
--   pg_trgm   : trigram indexes for fast LIKE / similarity search
CREATE EXTENSION IF NOT EXISTS "uuid-ossp";
CREATE EXTENSION IF NOT EXISTS "pg_trgm";

-- Trigger function: keeps an updated_at column current on every UPDATE.
-- Attach per table with:
--   CREATE TRIGGER ... BEFORE UPDATE ON <table>
--   FOR EACH ROW EXECUTE FUNCTION update_updated_at_column();
CREATE OR REPLACE FUNCTION update_updated_at_column()
RETURNS TRIGGER AS $$
BEGIN
    NEW.updated_at = CURRENT_TIMESTAMP;
    RETURN NEW;
END;
$$ language 'plpgsql';
总结
本教程涵盖了Python与PostgreSQL集成的各个方面:
基础连接:使用psycopg2建立连接,使用上下文管理器管理资源
CRUD操作:完整的增删改查操作,包括批量操作和复杂查询
事务管理:确保数据一致性的重要机制
连接池:提高应用性能的连接管理
ORM框架:使用SQLAlchemy简化数据库操作
异步操作:使用asyncpg进行高性能异步操作
最佳实践:配置管理、错误处理、性能优化等
完整项目:包含Docker配置的完整应用示例
关键要点:
始终使用参数化查询防止SQL注入
合理使用事务确保数据一致性
使用连接池管理数据库连接
实现适当的错误处理和重试机制
定期进行数据库维护和优化
在生产环境中使用环境变量管理配置
后续学习建议:
学习PostgreSQL高级特性:窗口函数、CTE、JSONB等
深入了解数据库索引优化
学习数据库分片和复制
探索更多ORM框架(如Django ORM、Peewee等)
学习数据库迁移工具(Alembic)
这个教程提供了全面的Python与PostgreSQL集成指南,适合从初学者到中级开发者的学习。建议按照步骤实践每个部分,以加深理解。