普通查询
在 MySQLdb 库中,默认的客户端游标是 MySQLdb.cursors.Cursor(也常被称为 DictCursor 之外的 “基础游标” 或 “元组游标”),它属于客户端游标(Client-side Cursor) 范畴。
客户端游标的特性:
- 数据加载方式:执行
cursor.execute() 后,MySQL 服务器会将所有查询结果一次性返回并加载到客户端内存中,之后 fetchone()/fetchall() 都是从本地内存读取数据。 - 返回格式:默认返回元组(tuple),例如
(1, '张三', 25),也可以通过 DictCursor 改为返回字典({'id':1, 'name':'张三', 'age':25})。 - 适用场景:小数据量查询(结果集在内存可承受范围内),优点是本地读取速度快。
代码示例:使用默认游标
import MySQLdbfrom MySQLdb.cursors import Cursor # 默认游标,可省略(导入即默认)# 建立连接conn = MySQLdb.connect( host='localhost', user='gmroot', password='password', db='db_name', charset='utf8')# 创建默认游标(不指定 cursorclass 即为 Cursor)cursor = conn.cursor() # 等价于 conn.cursor(Cursor)# 执行查询(此时所有结果已加载到本地内存)cursor.execute("select * from mock_users limit 100")# 逐行读取(从本地内存读取)row = cursor.fetchone()while row:print(row) row = cursor.fetchone()# 关闭资源cursor.close()conn.close()
除了基础的 Cursor, MySQLdb还提供了常用的游标子类:
DictCursor:默认游标的变体,返回字典格式(键为列名),仍属于客户端游标;SSDictCursor:SSCursor 的变体,返回字典格式的服务器端游标。
流式查询
流式查询(Streaming Query)的核心是避免一次性将所有查询结果加载到客户端内存,而是通过「服务器端游标」逐行从 MySQL 服务器读取数据,仅在本地缓存当前行数据,从而大幅降低内存占用。
Python 实现MySQL 流式查询的两种方式
| | |
|---|
| PyMySQL | | SSCursor |
| mysqlclient(MySQLdb) | | SSCursor |
** 以MySQLdb使用SSCursor
import MySQLdbimport loggingfrom contextlib import suppress# 初始化日志logger = logging.getLogger(__name__)logging.basicConfig(level=logging.INFO)classMySQLStreamQuery:def__init__(self, host, user, password, db, charset="utf8mb4"): self.host = host self.user = user self.password = password self.db = db self.charset = charset self.conn = None# 保存连接对象,方便关闭defget_connection(self):"""创建/复用数据库连接"""if not self.conn or not self.conn.open: self.conn = MySQLdb.connect( host=self.host, user=self.user, password=self.password, database=self.db, charset=self.charset ) self.conn.autocommit(True) # 开启自动提交,避免事务阻塞return self.conndefclose(self):"""统一关闭数据库连接"""with suppress(Exception):if self.conn and self.conn.open: self.conn.close() self.conn = None# 重置连接对象defstream_query( self, sql, parameters=None, limit_num=0, close_conn=True, **kwargs ):""" 流式查询核心方法(封装生成器版本) :param sql: 查询SQL :param parameters: SQL参数(防止注入) :param limit_num: 返回行数限制(0=无限制) :param close_conn: 迭代结束后是否关闭连接 :return: 生成器(逐行返回数据) """ result = {"rows": None,"error": None,"column_list": [],"column_type": [] } max_execution_time = kwargs.get("max_execution_time", 0) cursor = Nonetry:# 1. 获取连接并创建服务器端游标(SSCursor) conn = self.get_connection() cursor = conn.cursor(MySQLdb.cursors.SSCursor)# 2. 设置会话超时参数(兼容低版本MySQL)with suppress(MySQLdb.OperationalError): cursor.execute(f"set session max_execution_time={max_execution_time};") cursor.execute("set session net_write_timeout=3600;") cursor.execute("set session net_read_timeout=3600;")# 3. 执行SQL查询 cursor.execute(sql, parameters or ())# 4. 获取列元数据(可选,按需保留)if cursor.description: result["column_list"] = [field[0] for field in cursor.description]# 若有列类型映射,可补充:result["column_type"] = [column_types_map.get(field[1], "") for field in cursor.description]# 5. 定义核心生成器函数(你指定的格式)defrow_generator(): count = 0try:while True:# 行数限制判断:达到限制则停止if limit_num > 0and count >= limit_num:break# 逐行读取数据(流式核心:每次只取一行,不占内存) row = cursor.fetchone()if not row: # 无数据时终止循环breakyield row count += 1# 每1万行打印进度日志if count % 10000 == 0: logger.info(f"Query progress: {count} rows fetched")except Exception as e: logger.error(f"Error in row generator: {e}", exc_info=True)raise# 抛出异常,让调用方感知finally:# 生成器结束/异常时,自动释放资源with suppress(Exception):if cursor: cursor.close() # 关闭游标if close_conn: self.close() # 关闭连接# 6. 赋值生成器到结果中 result["rows"] = row_generator()except Exception as e: logger.error(f"Query initialization failed: {e}", exc_info=True) result["error"] = str(e)# 异常时清理资源with suppress(Exception):if cursor: cursor.close()if close_conn: self.close()return result# ---------------------- 测试使用示例 ----------------------if __name__ == "__main__":# 初始化查询对象 query_client = MySQLStreamQuery( host='localhost', user='root', password='password', db='db', )# 执行流式查询 sql = "select * from mock_users" params = None query_result = query_client.stream_query( sql=sql, parameters=params, limit_num=100000, # 最多读取10万行 close_conn=True )# 检查是否有初始化异常if query_result["error"]: logger.error(f"Query error: {query_result['error']}")else:# 迭代生成器,逐行处理数据(核心:此时才真正读取数据)try:for row in query_result["rows"]:# 处理单条数据(示例:打印id) print(f"Process row: id={row[0]}")# 你的业务逻辑...except Exception as e: logger.error(f"Data processing error: {e}")
这段代码实现了一个高效的MySQL流式查询功能:
- 使用
SSCursor(服务器端游标)替代默认游标,实现数据流式读取,适合处理大数据量查询 - 通过生成器(generator)逐行返回查询结果,降低内存占用
- 支持设置查询超时时间、网络读写超时,以及结果行数限制
两者比较
PS: 常用的游标变体 除了基础的 Cursor,MySQLdb 还提供了常用的游标子类:
DictCursor:默认游标的变体,返回字典格式(键为列名),仍属于客户端游标;SSDictCursor:SSCursor 的变体,返回字典格式的服务器端游标。
总结
- MySQLdb 的默认客户端游标是
MySQLdb.cursors.Cursor,核心特点是一次性加载所有结果到客户端内存; - 默认游标适合小数据量查询,
SSCursor 是服务器端游标,专为大数据量流式查询设计; - 两者的核心区别在于数据存储位置和加载时机,这直接决定了内存占用和连接使用方式。