Don't jump straight into cursor.execute(): get these basics right first
Most people's first attempt at connecting Python to MySQL looks something like this:
import pymysql

conn = pymysql.connect(
    host="127.0.0.1",
    port=3306,
    user="root",
    password="123456",
    database="test"
)
cursor = conn.cursor()
cursor.execute("select * from user")
print(cursor.fetchall())
It runs. It really does.
But drop this kind of code into a real project and a few problems show up fast: transactions never committed, exceptions never rolled back, connections left open, unsafe SQL string concatenation, query results that are awkward to work with. None of this is obvious in a throwaway script; wire it into a business endpoint and the pitfalls surface immediately.
I'm not going to dwell on abstract concepts here. Instead, let's walk through the actions you perform most often in day-to-day development: connecting, querying, inserting/updating/deleting, transactions, batch writes, and connection pooling. By the end you should be able to wire up a small service.
Install the library first
There are two common routes for working with MySQL from Python today:
pymysql: pure Python, lightweight, very common in scripts
sqlalchemy: a more engineering-oriented option; handles both ORM and raw SQL
Let's start with pymysql, the most direct option, because many debugging sessions, one-off scripts, and admin backends still end up relying on it.
Install it:
pip install pymysql
Many people only check whether the connection works at all, but there are two connection parameters that are easy to overlook.
I suggest starting like this:
import pymysql

conn = pymysql.connect(
    host="127.0.0.1",
    port=3306,
    user="app_user",
    password="app_pwd",
    database="demo",
    charset="utf8mb4",
    autocommit=False
)
I always set charset to utf8mb4 explicitly. Otherwise, the first time someone stores an emoji or certain special characters, you're in for an annoying debugging session.
I also recommend writing autocommit=False explicitly. Once you have update operations, you want to control transaction boundaries yourself rather than guess at the default behavior.
Let's create a simple table first. Suppose we have a user table like this:
CREATE TABLE user_account (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    username VARCHAR(64) NOT NULL,
    status TINYINT NOT NULL DEFAULT 1,
    balance DECIMAL(10,2) NOT NULL DEFAULT 0.00,
    created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
    KEY idx_status_created (status, created_at)
);
Query a single row
import pymysql

conn = pymysql.connect(
    host="127.0.0.1",
    user="app_user",
    password="app_pwd",
    database="demo",
    charset="utf8mb4",
    cursorclass=pymysql.cursors.DictCursor
)

with conn.cursor() as cursor:
    sql = """
        select id, username, status, balance
        from user_account
        where id = %s
    """
    cursor.execute(sql, (1001,))
    row = cursor.fetchone()
    print(row)
Two small points here:
First, cursorclass=DictCursor is genuinely useful. The default cursor returns tuples, so a row comes back as (1001, 'dongge', 1, Decimal('88.00')). Tolerable in a throwaway script, but poor for readability in business code. Dict results are much more convenient.
Second, never build SQL by hand-concatenating parameters.
Don't write this:
user_id = "1001 or 1=1"
sql = f"select * from user_account where id = {user_id}"
Even if this is just an internal script and never exposed externally, concatenating like this will bite you eventually. Parameterization is the least-worry option.
Query multiple rows
with conn.cursor() as cursor:
    sql = """
        select id, username, balance
        from user_account
        where status = %s
        order by id desc
        limit %s
    """
    cursor.execute(sql, (1, 20))
    rows = cursor.fetchall()
    for row in rows:
        print(row)
Insert, update, delete

conn = pymysql.connect(
    host="127.0.0.1",
    user="app_user",
    password="app_pwd",
    database="demo",
    charset="utf8mb4",
    autocommit=False
)
try:
    with conn.cursor() as cursor:
        sql = """
            insert into user_account(username, status, balance)
            values(%s, %s, %s)
        """
        cursor.execute(sql, ("zhangsan", 1, 200.50))
        new_id = cursor.lastrowid
    conn.commit()
    print("new user id:", new_id)
except Exception:
    conn.rollback()
    raise
finally:
    conn.close()
This is really just the most basic sequence of actions: run the SQL, commit on success, roll back on exception, close the connection in finally.
A fair share of production data inconsistencies aren't caused by anything sophisticated; they come down to missing exactly these few lines.
For example, adding money to a balance:
try:
    with conn.cursor() as cursor:
        sql = """
            update user_account
            set balance = balance + %s
            where id = %s and status = 1
        """
        affected = cursor.execute(sql, (50.00, 1001))
        if affected != 1:
            raise ValueError(f"update failed, affected rows={affected}")
    conn.commit()
except Exception:
    conn.rollback()
    raise
I make a habit of checking affected here.
Business updates usually carry conditions: status = 1, version = ?, deleted = 0, and so on. When the affected row count is 0, often the SQL isn't wrong; your condition simply no longer matches.
The scariest thing in production debugging isn't an error. It's SQL that reports no error but never touched any data.
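The version = ? case mentioned above deserves a concrete sketch. Below is a hypothetical optimistic-locking update; the version column and the helper's name are my own additions, not part of the user_account table defined earlier:

```python
# Hypothetical optimistic-locking update. The `version` column is an
# assumption: it is NOT in the user_account table defined earlier.
UPDATE_SQL = """
update user_account
set balance = %s, version = version + 1
where id = %s and version = %s
"""

def update_balance_versioned(cursor, user_id, new_balance, expected_version):
    # In PyMySQL, cursor.execute() returns the number of affected rows.
    affected = cursor.execute(UPDATE_SQL, (new_balance, user_id, expected_version))
    if affected != 1:
        # 0 rows means another writer bumped the version first:
        # the SQL did not fail, it simply matched nothing.
        raise RuntimeError("concurrent modification: version mismatch")
    return affected
```

On a version mismatch, the caller re-reads the row to get the fresh version and retries, or surfaces the conflict to the user.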
A hard delete:
with conn.cursor() as cursor:
    sql = "delete from user_account where id = %s"
    affected = cursor.execute(sql, (1001,))
    conn.commit()
In most business code I prefer a soft delete instead, e.g. adding a deleted column:
ALTER TABLE user_account ADD COLUMN deleted TINYINT NOT NULL DEFAULT 0;
CREATE INDEX idx_status_deleted ON user_account(status, deleted);
Then the code becomes:
with conn.cursor() as cursor:
    sql = """
        update user_account
        set deleted = 1
        where id = %s and deleted = 0
    """
    cursor.execute(sql, (1001,))
    conn.commit()
That makes later troubleshooting, backfilling, and data recovery a bit easier.
Transactions: don't stop at commit. Two-step updates must succeed together or roll back together
A classic example is a transfer:
def transfer(conn, from_user_id: int, to_user_id: int, amount: float):
    try:
        with conn.cursor() as cursor:
            sql1 = """
                update user_account
                set balance = balance - %s
                where id = %s and balance >= %s and status = 1
            """
            n1 = cursor.execute(sql1, (amount, from_user_id, amount))
            if n1 != 1:
                raise ValueError("debit failed: insufficient balance or abnormal account status")

            sql2 = """
                update user_account
                set balance = balance + %s
                where id = %s and status = 1
            """
            n2 = cursor.execute(sql2, (amount, to_user_id))
            if n2 != 1:
                raise ValueError("credit failed: target account missing or abnormal status")

        conn.commit()
    except Exception:
        conn.rollback()
        raise
This is not a place to cut corners.
If the debit succeeds and the credit fails, and you don't roll back, the books no longer balance.
Batch writes
First, the version many people write without thinking:
for name in ["u1", "u2", "u3"]:
    cursor.execute(
        "insert into user_account(username, status, balance) values(%s, %s, %s)",
        (name, 1, 0)
    )
It works, but it gets slow as volume grows.
Switch to executemany:
data = [
    ("u1", 1, 0),
    ("u2", 1, 10),
    ("u3", 1, 20),
]
try:
    with conn.cursor() as cursor:
        sql = """
            insert into user_account(username, status, balance)
            values(%s, %s, %s)
        """
        cursor.executemany(sql, data)
    conn.commit()
except Exception:
    conn.rollback()
    raise
For thousands or tens of thousands of rows and beyond, I split the writes into batches rather than pushing everything in at once.
Like this:
def batch_insert_users(conn, rows, batch_size=500):
    sql = """
        insert into user_account(username, status, balance)
        values(%s, %s, %s)
    """
    try:
        with conn.cursor() as cursor:
            for i in range(0, len(rows), batch_size):
                batch = rows[i:i + batch_size]
                cursor.executemany(sql, batch)
        conn.commit()
    except Exception:
        conn.rollback()
        raise
This chunked-processing pattern shows up all over: database writes, message publishing, bulk updates. The code is simple, but it saves you from plenty of trouble.
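The slicing loop above can also be factored into a tiny generic helper. This sketch is my own addition (the name chunked is arbitrary) and works for any list of rows:

```python
def chunked(rows, batch_size=500):
    """Yield consecutive slices of `rows`, each at most `batch_size` long."""
    for i in range(0, len(rows), batch_size):
        yield rows[i:i + batch_size]

# With the batch insert above, usage would look like:
#   for batch in chunked(all_rows, 500):
#       cursor.executemany(sql, batch)
```

Keeping the slicing logic in one place means the batch size lives in exactly one argument instead of being re-derived in every loop you write.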
Sometimes you don't want to repeat the same boilerplate everywhere; a thin wrapper helps:
import pymysql
from contextlib import contextmanager

@contextmanager
def get_conn():
    conn = pymysql.connect(
        host="127.0.0.1",
        port=3306,
        user="app_user",
        password="app_pwd",
        database="demo",
        charset="utf8mb4",
        autocommit=False,
        cursorclass=pymysql.cursors.DictCursor
    )
    try:
        yield conn
    finally:
        conn.close()

def query_one(sql, params=None):
    with get_conn() as conn:
        with conn.cursor() as cursor:
            cursor.execute(sql, params or ())
            return cursor.fetchone()

def execute(sql, params=None):
    with get_conn() as conn:
        try:
            with conn.cursor() as cursor:
                affected = cursor.execute(sql, params or ())
            conn.commit()
            return affected
        except Exception:
            conn.rollback()
            raise
Call sites become cleaner:
user = query_one(
    "select id, username, balance from user_account where id = %s",
    (1001,)
)
print(user)

affected = execute(
    "update user_account set status = %s where id = %s",
    (0, 1001)
)
print("affected rows:", affected)
Connection pooling
An ad-hoc connect() in a script is fine. In a web service, scheduled job, or consumer process, opening a new connection per request carries real overhead.
At that point I'd reach for SQLAlchemy's connection pooling. Even if you skip the ORM, using it purely for pool management is worth it.
Install it:
pip install sqlalchemy pymysql
Executing raw SQL with SQLAlchemy
from sqlalchemy import create_engine, text

engine = create_engine(
    "mysql+pymysql://app_user:app_pwd@127.0.0.1:3306/demo?charset=utf8mb4",
    pool_size=10,
    max_overflow=20,
    pool_recycle=1800,
    pool_pre_ping=True
)

with engine.connect() as conn:
    result = conn.execute(
        text("""
            select id, username, balance
            from user_account
            where status = :status
            limit 10
        """),
        {"status": 1}
    )
    for row in result.mappings():
        print(dict(row))
pool_pre_ping=True is quite practical here: it reduces the "connection is dead but the pool still thinks it's usable" class of problems.
Transactional writes
from sqlalchemy import text

with engine.begin() as conn:
    conn.execute(
        text("""
            update user_account
            set balance = balance - :amount
            where id = :uid and balance >= :amount
        """),
        {"amount": 20, "uid": 1001}
    )
    conn.execute(
        text("""
            update user_account
            set balance = balance + :amount
            where id = :uid
        """),
        {"amount": 20, "uid": 1002}
    )
engine.begin() is a convenient pattern: the block commits on success and rolls back automatically on an exception.
When chasing slow SQL, look at the Python code too
People love to pin everything on MySQL, but the Python side can drag the database down just as well.
Take a pagination endpoint:
def list_users(conn, page_no: int, page_size: int):
    offset = (page_no - 1) * page_size
    with conn.cursor() as cursor:
        sql = """
            select id, username, balance, created_at
            from user_account
            where status = %s
            order by created_at desc
            limit %s, %s
        """
        cursor.execute(sql, (1, offset, page_size))
        return cursor.fetchall()
The code looks fine, but with a large page_no, limit offset, size itself can be slow.
Don't stare only at the Python code. Pull the SQL out and check its execution plan in the database:
EXPLAIN
select id, username, balance, created_at
from user_account
where status = 1
order by created_at desc
limit 100000, 20;
If the plan shows a huge number of rows scanned, the problem isn't that cursor.execute() is slow; the pagination scheme itself is wrong. You'll likely need to switch to keyset (cursor-based) pagination, or add a more suitable index.
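As a sketch of what keyset pagination could look like for this table: instead of an offset, the client passes back the last row's (created_at, id), and the query seeks past it. The helper below only builds the SQL and parameters; its name and the cursor-tuple convention are my own assumptions:

```python
def build_keyset_page_query(last_created_at=None, last_id=None, page_size=20):
    """Build a keyset-pagination query for user_account.

    First page: pass no cursor values. Later pages: pass back the last
    row's created_at and id; the row-value comparison seeks directly to
    the next rows instead of scanning and discarding an offset.
    """
    if last_created_at is None:
        sql = """
            select id, username, balance, created_at
            from user_account
            where status = %s
            order by created_at desc, id desc
            limit %s
        """
        params = (1, page_size)
    else:
        sql = """
            select id, username, balance, created_at
            from user_account
            where status = %s
              and (created_at, id) < (%s, %s)
            order by created_at desc, id desc
            limit %s
        """
        params = (1, last_created_at, last_id, page_size)
    return sql, params
```

You'd run it as cursor.execute(*build_keyset_page_query(...)). MySQL accepts the row constructor (created_at, id) < (%s, %s), but how well the optimizer uses an index for it varies by version, so verify with EXPLAIN before relying on it.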
In other words, working with MySQL from Python takes more than "knowing how to connect and query". When endpoints time out, tasks pile up, or CPU spikes, you end up going back to the SQL itself.
The details that are easiest to miss
1) Never concatenate parameters by hand
# don't do this
sql = f"select * from user_account where username = '{username}'"
Parameterize, plainly and simply:
sql = "select * from user_account where username = %s"
cursor.execute(sql, (username,))
2) Check the affected row count after updates
Many "update succeeded" log lines are self-consolation; the database didn't change a single row.
3) Remember to roll back on exceptions
Especially when a group of SQL statements runs back to back.
4) Remember to close connections
An occasional script is forgivable. A service that leaks connections will eventually max out the connection count, and business threads will sit there queuing.
5) Do batch operations in chunks
Don't shove a hundred thousand rows in at once and then start doubting the database.
Finally, a small repository class that ties these habits together:

import pymysql
from pymysql.cursors import DictCursor

class UserRepo:
    def __init__(self):
        self.conn = pymysql.connect(
            host="127.0.0.1",
            port=3306,
            user="app_user",
            password="app_pwd",
            database="demo",
            charset="utf8mb4",
            autocommit=False,
            cursorclass=DictCursor
        )

    def close(self):
        self.conn.close()

    def get_by_id(self, user_id: int):
        with self.conn.cursor() as cursor:
            cursor.execute(
                """
                select id, username, status, balance, created_at
                from user_account
                where id = %s and deleted = 0
                """,
                (user_id,)
            )
            return cursor.fetchone()

    def create(self, username: str, balance: float = 0):
        try:
            with self.conn.cursor() as cursor:
                cursor.execute(
                    """
                    insert into user_account(username, status, balance, deleted)
                    values(%s, 1, %s, 0)
                    """,
                    (username, balance)
                )
                user_id = cursor.lastrowid
            self.conn.commit()
            return user_id
        except Exception:
            self.conn.rollback()
            raise

    def update_balance(self, user_id: int, delta: float):
        try:
            with self.conn.cursor() as cursor:
                affected = cursor.execute(
                    """
                    update user_account
                    set balance = balance + %s
                    where id = %s and deleted = 0
                    """,
                    (delta, user_id)
                )
                if affected != 1:
                    raise ValueError("user not found or already deleted")
            self.conn.commit()
        except Exception:
            self.conn.rollback()
            raise

    def soft_delete(self, user_id: int):
        try:
            with self.conn.cursor() as cursor:
                cursor.execute(
                    """
                    update user_account
                    set deleted = 1
                    where id = %s and deleted = 0
                    """,
                    (user_id,)
                )
            self.conn.commit()
        except Exception:
            self.conn.rollback()
            raise
This class is nothing fancy, but it's plenty for everyday small projects, admin backends, and utility tools.
On the surface, Python + MySQL is a few lines of execute(). What actually separates people is not "can you connect to a database", but whether your code rolls back on exceptions, whether heavy queries drag down your connections, whether batch writes bloat a transaction, and whether, when a request turns slow, you instinctively pull the SQL out and run it on its own.
Start by getting comfortable with pymysql; that's enough at first. Once your project accumulates connections, transactions, and complex query logic, layer in connection pooling and SQLAlchemy, and things will go much more smoothly.