大家好,今天我们来聊聊Python操作MySQL数据库这个话题。别看这玩意儿简单,但新手朋友经常在这上面摔跟头。
今天我就手把手教你,从零开始学习使用PyMySQL操作MySQL数据库,让你一次掌握,从此告别"Python连接数据库总出错"的尴尬!
为什么选择PyMySQL?
在开始学习之前,先聊聊为什么我们要选择PyMySQL:
- 纯Python实现:完全用Python编写,不需要额外的C库依赖
- 兼容性好:支持Python 3.6+和MySQL 5.6+
安装PyMySQL
环境要求
在安装之前,确保你已经:
安装步骤
# 使用pip安装PyMySQLpip install PyMySQL# 验证安装是否成功python -c "import pymysql; print(pymysql.__version__)"
如果看到版本号输出,说明安装成功。
建立数据库连接
基本连接方式
import pymysql# 建立数据库连接connection = pymysql.connect( host='localhost', # 数据库主机地址 port=3306, # 数据库端口 user='root', # 用户名 password='your_password', # 密码 database='test_db', # 数据库名 charset='utf8mb4'# 字符集)# 关闭连接connection.close()
连接参数详解
import pymysql# 完整的连接参数配置connection = pymysql.connect( host='localhost', # 主机地址 port=3306, # 端口号 user='your_username', # 用户名 password='your_password', # 密码 database='your_database', # 数据库名 charset='utf8mb4', # 字符集 autocommit=False, # 是否自动提交 connect_timeout=10, # 连接超时时间 read_timeout=10, # 读取超时时间 write_timeout=10, # 写入超时时间 cursorclass=pymysql.cursors.DictCursor # 游标类型)
使用上下文管理器
推荐使用with语句管理连接,确保连接正确关闭:
import pymysql# 使用with语句自动管理连接with pymysql.connect( host='localhost', user='root', password='your_password', database='test_db', charset='utf8mb4') as connection:# 在这里执行数据库操作pass# 连接会自动关闭
创建游标对象
游标对象用于执行SQL语句和获取结果:
import pymysql# 建立连接connection = pymysql.connect( host='localhost', user='root', password='your_password', database='test_db', charset='utf8mb4')# 创建游标对象cursor = connection.cursor()# 执行SQL语句cursor.execute("SELECT VERSION()")# 获取结果result = cursor.fetchone()print(f"MySQL版本: {result[0]}")# 关闭游标和连接cursor.close()connection.close()
不同类型的游标
import pymysql.cursors# 默认游标 - 返回元组with pymysql.connect(...) as connection: cursor = connection.cursor() cursor.execute("SELECT * FROM users") result = cursor.fetchall() # ((1, 'Alice', 25), (2, 'Bob', 30))# 字典游标 - 返回字典with pymysql.connect( cursorclass=pymysql.cursors.DictCursor) as connection: cursor = connection.cursor() cursor.execute("SELECT * FROM users") result = cursor.fetchall() # [{'id': 1, 'name': 'Alice', 'age': 25}, ...]# 字典游标的优势是可以通过键名访问字段print(result[0]['name']) # 输出: Alice
数据库基本操作
创建数据库和表
import pymysql# 连接MySQL服务器(不指定数据库)connection = pymysql.connect( host='localhost', user='root', password='your_password', charset='utf8mb4')try:with connection.cursor() as cursor:# 创建数据库 cursor.execute("CREATE DATABASE IF NOT EXISTS myapp")# 选择数据库 cursor.execute("USE myapp")# 创建用户表 create_table_sql = """ CREATE TABLE IF NOT EXISTS users ( id INT AUTO_INCREMENT PRIMARY KEY, username VARCHAR(50) NOT NULL UNIQUE, email VARCHAR(100) NOT NULL UNIQUE, age INT, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 """ cursor.execute(create_table_sql)# 提交事务 connection.commit()finally: connection.close()
插入数据
import pymysqlfrom datetime import datetime# 插入单条数据definsert_user(username, email, age): connection = pymysql.connect( host='localhost', user='root', password='your_password', database='myapp', charset='utf8mb4' )try:with connection.cursor() as cursor:# 使用参数化查询防止SQL注入 sql = "INSERT INTO users (username, email, age) VALUES (%s, %s, %s)" cursor.execute(sql, (username, email, age))# 提交事务 connection.commit() print("用户插入成功")except Exception as e:# 回滚事务 connection.rollback() print(f"插入失败: {e}")finally: connection.close()# 调用函数insert_user("张三", "zhangsan@example.com", 25)
批量插入数据
import pymysql# 批量插入数据defbatch_insert_users(users_data): connection = pymysql.connect( host='localhost', user='root', password='your_password', database='myapp', charset='utf8mb4' )try:with connection.cursor() as cursor: sql = "INSERT INTO users (username, email, age) VALUES (%s, %s, %s)"# executemany用于批量插入 cursor.executemany(sql, users_data) connection.commit() print(f"成功插入{len(users_data)}条用户数据")except Exception as e: connection.rollback() print(f"批量插入失败: {e}")finally: connection.close()# 准备批量数据users = [ ("李四", "lisi@example.com", 28), ("王五", "wangwu@example.com", 32), ("赵六", "zhaoliu@example.com", 29)]# 执行批量插入batch_insert_users(users)
查询数据
import pymysql# 查询单条数据defget_user_by_id(user_id): connection = pymysql.connect( host='localhost', user='root', password='your_password', database='myapp', charset='utf8mb4', cursorclass=pymysql.cursors.DictCursor )try:with connection.cursor() as cursor: sql = "SELECT * FROM users WHERE id = %s" cursor.execute(sql, (user_id,)) result = cursor.fetchone()return resultfinally: connection.close()# 查询多条数据defget_users_by_age(min_age): connection = pymysql.connect( host='localhost', user='root', password='your_password', database='myapp', charset='utf8mb4', cursorclass=pymysql.cursors.DictCursor )try:with connection.cursor() as cursor: sql = "SELECT * FROM users WHERE age >= %s ORDER BY age" cursor.execute(sql, (min_age,)) results = cursor.fetchall()return resultsfinally: connection.close()# 使用示例user = get_user_by_id(1)if user: print(f"用户信息: {user}")users = get_users_by_age(25)print("年龄大于等于25的用户:")for user in users: print(f" {user['username']}, {user['age']}岁")
更新数据
import pymysql# 更新用户信息defupdate_user(user_id, username=None, email=None, age=None): connection = pymysql.connect( host='localhost', user='root', password='your_password', database='myapp', charset='utf8mb4' )try:with connection.cursor() as cursor:# 动态构建更新语句 updates = [] params = []if username: updates.append("username = %s") params.append(username)if email: updates.append("email = %s") params.append(email)if age isnotNone: updates.append("age = %s") params.append(age)ifnot updates: print("没有需要更新的字段")return# 添加WHERE条件参数 params.append(user_id) sql = f"UPDATE users SET {', '.join(updates)} WHERE id = %s" cursor.execute(sql, params) connection.commit() print("用户信息更新成功")except Exception as e: connection.rollback() print(f"更新失败: {e}")finally: connection.close()# 使用示例update_user(1, age=26, email="newemail@example.com")
删除数据
import pymysql# 删除用户defdelete_user(user_id): connection = pymysql.connect( host='localhost', user='root', password='your_password', database='myapp', charset='utf8mb4' )try:with connection.cursor() as cursor: sql = "DELETE FROM users WHERE id = %s" cursor.execute(sql, (user_id,)) connection.commit() print("用户删除成功")except Exception as e: connection.rollback() print(f"删除失败: {e}")finally: connection.close()# 使用示例delete_user(1)
事务处理
基本事务操作
import pymysql# 事务示例:转账操作deftransfer_money(from_user_id, to_user_id, amount): connection = pymysql.connect( host='localhost', user='root', password='your_password', database='myapp', charset='utf8mb4' )try:with connection.cursor() as cursor:# 开始事务 connection.begin()# 检查转出用户余额 cursor.execute("SELECT balance FROM accounts WHERE user_id = %s", (from_user_id,)) from_balance = cursor.fetchone()ifnot from_balance or from_balance[0] < amount:raise Exception("余额不足")# 扣除转出用户余额 cursor.execute("UPDATE accounts SET balance = balance - %s WHERE user_id = %s", (amount, from_user_id))# 增加转入用户余额 cursor.execute("UPDATE accounts SET balance = balance + %s WHERE user_id = %s", (amount, to_user_id))# 记录转账日志 cursor.execute(""" INSERT INTO transfer_logs (from_user_id, to_user_id, amount, transfer_time) VALUES (%s, %s, %s, NOW()) """, (from_user_id, to_user_id, amount))# 提交事务 connection.commit() print("转账成功")except Exception as e:# 回滚事务 connection.rollback() print(f"转账失败: {e}")finally: connection.close()# 使用示例transfer_money(1, 2, 100.00)
高级查询技巧
分页查询
import pymysql# 分页查询用户列表defget_users_paginated(page, page_size): connection = pymysql.connect( host='localhost', user='root', password='your_password', database='myapp', charset='utf8mb4', cursorclass=pymysql.cursors.DictCursor )try:with connection.cursor() as cursor: offset = (page - 1) * page_size sql = "SELECT * FROM users LIMIT %s OFFSET %s" cursor.execute(sql, (page_size, offset)) users = cursor.fetchall()# 获取总记录数 cursor.execute("SELECT COUNT(*) as total FROM users") total = cursor.fetchone()['total']return {'users': users,'total': total,'page': page,'page_size': page_size,'total_pages': (total + page_size - 1) // page_size }finally: connection.close()# 使用示例result = get_users_paginated(1, 10)print(f"第{result['page']}页,共{result['total_pages']}页")for user in result['users']: print(f" {user['username']}")
条件查询
import pymysql# 复杂条件查询defsearch_users(username=None, min_age=None, max_age=None, email_domain=None): connection = pymysql.connect( host='localhost', user='root', password='your_password', database='myapp', charset='utf8mb4', cursorclass=pymysql.cursors.DictCursor )try:with connection.cursor() as cursor:# 动态构建查询条件 conditions = [] params = []if username: conditions.append("username LIKE %s") params.append(f"%{username}%")if min_age isnotNone: conditions.append("age >= %s") params.append(min_age)if max_age isnotNone: conditions.append("age <= %s") params.append(max_age)if email_domain: conditions.append("email LIKE %s") params.append(f"%@{email_domain}")# 构建SQL语句if conditions: sql = f"SELECT * FROM users WHERE {' AND '.join(conditions)} ORDER BY created_at DESC"else: sql = "SELECT * FROM users ORDER BY created_at DESC" cursor.execute(sql, params) results = cursor.fetchall()return resultsfinally: connection.close()# 使用示例users = search_users(username="张", min_age=20, max_age=30)print("搜索结果:")for user in users: print(f" {user['username']}, {user['age']}岁, {user['email']}")
错误处理和最佳实践
异常处理
import pymysqlfrom pymysql import IntegrityError, OperationalError# 完整的错误处理示例defsafe_database_operation(): connection = Nonetry:# 建立连接 connection = pymysql.connect( host='localhost', user='root', password='your_password', database='myapp', charset='utf8mb4', autocommit=False, connect_timeout=5 )with connection.cursor() as cursor:# 执行数据库操作 sql = "INSERT INTO users (username, email, age) VALUES (%s, %s, %s)" cursor.execute(sql, ("test_user", "test@example.com", 25))# 提交事务 connection.commit() print("操作成功")except IntegrityError as e:# 处理数据完整性错误(如唯一约束冲突)if connection: connection.rollback() print(f"数据完整性错误: {e}")except OperationalError as e:# 处理操作错误(如连接失败) print(f"数据库操作错误: {e}")except Exception as e:# 处理其他异常if connection: connection.rollback() print(f"未知错误: {e}")finally:# 确保连接关闭if connection: connection.close()# 调用函数safe_database_operation()
连接池使用
import pymysqlfrom dbutils.pooled_db import PooledDB# 创建连接池pool = PooledDB( creator=pymysql, host='localhost', port=3306, user='root', password='your_password', database='myapp', charset='utf8mb4', maxconnections=20, # 最大连接数 mincached=2, # 最小缓存连接数 maxcached=5, # 最大缓存连接数 maxshared=3, # 最大共享连接数 blocking=True, # 连接池满时是否阻塞等待 maxusage=None, # 单个连接最大复用次数 setsession=[], # 开始会话前执行的命令列表 ping=0# ping MySQL服务端)# 使用连接池defget_user_with_pool(user_id):# 从连接池获取连接 connection = pool.connection()try:with connection.cursor() as cursor: cursor.execute("SELECT * FROM users WHERE id = %s", (user_id,)) result = cursor.fetchone()return resultfinally:# 关闭连接(归还到连接池) connection.close()# 使用示例user = get_user_with_pool(1)if user: print(f"用户信息: {user}")
安全最佳实践
防止SQL注入
import pymysql# 正确的做法:使用参数化查询defsafe_query(username): connection = pymysql.connect( host='localhost', user='root', password='your_password', database='myapp', charset='utf8mb4' )try:with connection.cursor() as cursor:# 使用参数化查询,防止SQL注入 sql = "SELECT * FROM users WHERE username = %s" cursor.execute(sql, (username,)) result = cursor.fetchall()return resultfinally: connection.close()# 错误的做法:字符串拼接(容易被SQL注入)defunsafe_query(username): connection = pymysql.connect( host='localhost', user='root', password='your_password', database='myapp', charset='utf8mb4' )try:with connection.cursor() as cursor:# 危险!容易被SQL注入攻击 sql = f"SELECT * FROM users WHERE username = '{username}'" cursor.execute(sql) # 不要这样做! result = cursor.fetchall()return resultfinally: connection.close()# 安全查询示例users = safe_query("admin'; DROP TABLE users; --")print(users) # 只会查找用户名为 "admin'; DROP TABLE users; --" 的用户
密码安全管理
import pymysqlimport osfrom dotenv import load_dotenv# 加载环境变量load_dotenv()# 从环境变量获取数据库配置defget_db_config():return {'host': os.getenv('DB_HOST', 'localhost'),'port': int(os.getenv('DB_PORT', 3306)),'user': os.getenv('DB_USER', 'root'),'password': os.getenv('DB_PASSWORD'),'database': os.getenv('DB_NAME', 'myapp'),'charset': 'utf8mb4' }# 使用环境变量连接数据库defconnect_with_env(): config = get_db_config() connection = pymysql.connect(**config)return connection# 创建.env文件示例:"""DB_HOST=localhostDB_PORT=3306DB_USER=myuserDB_PASSWORD=mypasswordDB_NAME=myapp"""
性能优化技巧
批量操作优化
import pymysql# 高效的批量插入defefficient_batch_insert(data_list): connection = pymysql.connect( host='localhost', user='root', password='your_password', database='myapp', charset='utf8mb4' )try:with connection.cursor() as cursor:# 使用事务和批量插入提高性能 connection.begin() sql = "INSERT INTO users (username, email, age) VALUES (%s, %s, %s)" cursor.executemany(sql, data_list) connection.commit() print(f"批量插入{len(data_list)}条记录成功")except Exception as e: connection.rollback() print(f"批量插入失败: {e}")finally: connection.close()# 准备大量数据进行测试large_data = [ (f"user_{i}", f"user_{i}@example.com", 20 + (i % 50))for i in range(1000)]# 执行批量插入efficient_batch_insert(large_data)
查询优化
import pymysql# 使用索引优化查询defoptimized_query(): connection = pymysql.connect( host='localhost', user='root', password='your_password', database='myapp', charset='utf8mb4', cursorclass=pymysql.cursors.DictCursor )try:with connection.cursor() as cursor:# 为经常查询的字段创建索引 cursor.execute("CREATE INDEX idx_username ON users(username)") cursor.execute("CREATE INDEX idx_email ON users(email)") cursor.execute("CREATE INDEX idx_age ON users(age)")# 使用EXPLAIN分析查询性能 cursor.execute("EXPLAIN SELECT * FROM users WHERE username = 'admin'") explain_result = cursor.fetchall() print("查询执行计划:")for row in explain_result: print(row)finally: connection.close()
结语
到这里,PyMySQL操作数据库的全面学习就完成了!从安装配置、基本操作到高级技巧,每一步都详细讲解了。
记住几个关键点:
PyMySQL是Python操作MySQL数据库的强大工具,掌握它对于后端开发人员来说至关重要。后续我们还会分享更多数据库操作的实战技巧,记得关注我们的公众号"服务端技术精选"!
觉得这篇文章对你有帮助吗?欢迎点赞、在看、转发三连,你的支持是我们持续创作的最大动力!
服务端技术精选 | 专注分享实用的后端技术干货