
SQL is the core language for interacting with databases, and its importance is hard to overstate. However, different database systems each speak their own SQL dialect, which creates real challenges for cross-database application development, data migration, and system integration. sqlglot, a powerful Python library developed and open-sourced by Tobias Mao, was built for exactly this problem: parsing, transpiling, and optimizing SQL.

sqlglot's core value lies in its support for more than 20 mainstream SQL dialects, including MySQL, PostgreSQL, BigQuery, Snowflake, and Spark SQL. Whether you are migrating between databases, building an application that must support several of them, or analyzing and optimizing SQL queries, sqlglot provides solid tooling.
Installing sqlglot is straightforward with pip:

```shell
pip install sqlglot
```

After installation, verify that it succeeded:

```python
import sqlglot
print(sqlglot.__version__)
```
If a version number prints, the installation worked. sqlglot is implemented in pure Python and requires no additional dependencies, which greatly simplifies deployment and integration.
One of sqlglot's most powerful features is seamless conversion between SQL dialects. When moving an application from one database to another, rewriting SQL by hand is both time-consuming and error-prone. sqlglot reads the source dialect and emits the target dialect automatically, handling the syntactic differences between databases.
```python
import sqlglot

# Convert Oracle SQL to MySQL
oracle_sql = "SELECT employee_id, last_name, hire_date FROM employees WHERE ROWNUM <= 5"
mysql_sql = sqlglot.transpile(oracle_sql, read="oracle", write="mysql")[0]
print(f"Converted MySQL statement: {mysql_sql}")

# Handle dialect-specific constructs such as T-SQL's TOP
tsql_sql = "SELECT TOP 10 * FROM orders ORDER BY order_date DESC"
postgres_sql = sqlglot.transpile(tsql_sql, read="tsql", write="postgres")[0]
print(f"Converted PostgreSQL statement: {postgres_sql}")
```

Output:

```
Converted MySQL statement: SELECT employee_id, last_name, hire_date FROM employees LIMIT 5
Converted PostgreSQL statement: SELECT * FROM orders ORDER BY order_date DESC LIMIT 10
```
sqlglot can parse a SQL statement into an abstract syntax tree (AST), a structured representation of the query. With the AST, developers can conveniently traverse, modify, and analyze every part of the statement.
```python
from sqlglot import parse_one, expressions as exp

# Parse a complex SQL statement
complex_sql = """
SELECT
    c.customer_name,
    SUM(o.order_amount) AS total_spent,
    COUNT(o.order_id) AS order_count
FROM customers c
LEFT JOIN orders o ON c.customer_id = o.customer_id
WHERE o.order_date >= '2024-01-01'
GROUP BY c.customer_id, c.customer_name
HAVING SUM(o.order_amount) > 1000
ORDER BY total_spent DESC
"""
ast = parse_one(complex_sql)

# Extract all table names
tables = [table.name for table in ast.find_all(exp.Table)]
print(f"Tables referenced: {tables}")

# Extract all aggregate functions
aggregates = [agg.sql() for agg in ast.find_all(exp.AggFunc)]
print(f"Aggregate functions: {aggregates}")

# Modify the AST: turn LEFT JOIN into INNER JOIN.
# Note: in sqlglot, LEFT/RIGHT/FULL is stored in the "side" arg,
# while INNER/OUTER/CROSS is stored in the "kind" arg.
for join in ast.find_all(exp.Join):
    if join.side == "LEFT":
        join.set("side", None)
        join.set("kind", "INNER")
print(f"Rewritten SQL: {ast.sql(pretty=True)}")
```

Output:

```
Tables referenced: ['customers', 'orders']
Aggregate functions: ['SUM(o.order_amount)', 'COUNT(o.order_id)']
Rewritten SQL: SELECT c.customer_name, SUM(o.order_amount) AS total_spent, COUNT(o.order_id) AS order_count
FROM customers AS c
INNER JOIN orders AS o ON c.customer_id = o.customer_id
WHERE o.order_date >= '2024-01-01'
GROUP BY c.customer_id, c.customer_name
HAVING SUM(o.order_amount) > 1000
ORDER BY total_spent DESC
```
sqlglot also offers powerful SQL formatting, turning messy or hard-to-read statements into a standardized layout.
```python
import sqlglot

# A hard-to-read one-liner
ugly_sql = "select a.id,a.name,b.order_date,b.amount from customers a join orders b on a.id=b.customer_id where b.amount>500 and b.status='completed' order by b.order_date desc"

# Basic pretty-printing
formatted = sqlglot.parse_one(ugly_sql).sql(pretty=True)
print("Basic formatting:")
print(formatted)

# Custom generator options
custom_formatted = sqlglot.parse_one(ugly_sql).sql(
    pretty=True,
    indent=2,        # 2-space indentation
    normalize=True,  # normalize keyword casing
    identify=True,   # quote identifiers
    pad=2,           # padding around operators
)
print("Custom formatting:")
print(custom_formatted)
```

Output:

```
Basic formatting:
SELECT a.id, a.name, b.order_date, b.amount
FROM customers AS a
JOIN orders AS b ON a.id = b.customer_id
WHERE b.amount > 500 AND b.status = 'completed'
ORDER BY b.order_date DESC
Custom formatting:
SELECT "a"."id", "a"."name", "b"."order_date", "b"."amount"
FROM "customers" AS "a"
JOIN "orders" AS "b" ON "a"."id" = "b"."customer_id"
WHERE "b"."amount" > 500 AND "b"."status" = 'completed'
ORDER BY "b"."order_date" DESC
```
sqlglot ships with a built-in SQL optimizer. It analyzes the structure of a statement and applies a series of rewrite rules.
```python
from sqlglot import parse_one
from sqlglot.optimizer import optimize

# SQL that could use some cleanup
suboptimal_sql = """
SELECT e.employee_id, e.first_name, e.last_name, d.department_name
FROM employees e
INNER JOIN departments d ON e.department_id = d.department_id
WHERE e.salary > (
    SELECT AVG(salary) FROM employees WHERE department_id = e.department_id
)
AND e.hire_date > '2020-01-01'
ORDER BY e.last_name, e.first_name
"""

# Parse, then optimize
parsed = parse_one(suboptimal_sql)
optimized = optimize(parsed)

print("Original SQL:")
print(parsed.sql(pretty=True))
print("Optimized SQL:")
print(optimized.sql(pretty=True))

# Compare the two versions
print(f"Characters before: {len(parsed.sql())}")
print(f"Characters after: {len(optimized.sql())}")
```

Output:

```
Original SQL:
SELECT e.employee_id, e.first_name, e.last_name, d.department_name
FROM employees AS e
INNER JOIN departments AS d ON e.department_id = d.department_id
WHERE e.salary > (
  SELECT AVG(salary) FROM employees WHERE department_id = e.department_id
) AND e.hire_date > '2020-01-01'
ORDER BY e.last_name, e.first_name
Optimized SQL:
SELECT e.employee_id, e.first_name, e.last_name, d.department_name
FROM employees AS e
INNER JOIN departments AS d ON e.department_id = d.department_id
WHERE e.salary > (
  SELECT AVG(salary) FROM employees AS employees WHERE department_id = e.department_id
) AND e.hire_date > '2020-01-01'
ORDER BY e.last_name, e.first_name
Characters before: 300
Characters after: 320
```
sqlglot can validate SQL syntax and report detailed error messages, helping developers catch and fix problems early.
```python
import sqlglot
from sqlglot.errors import ParseError

# A mix of valid and invalid statements
test_queries = [
    # Valid SQL
    "SELECT product_name, price FROM products WHERE category = 'Electronics'",
    # Syntax error: missing FROM
    "SELECT product_name, price WHERE category = 'Electronics'",
    # Syntax error: JOIN without the ON keyword
    "SELECT * FROM orders JOIN customers orders.customer_id = customers.id",
    # Valid complex SQL
    """
    SELECT p.product_name, c.category_name, SUM(oi.quantity) AS total_sold
    FROM products p
    JOIN categories c ON p.category_id = c.category_id
    LEFT JOIN order_items oi ON p.product_id = oi.product_id
    GROUP BY p.product_id, p.product_name, c.category_name
    HAVING SUM(oi.quantity) > 100
    """,
]

for i, query in enumerate(test_queries, 1):
    try:
        ast = sqlglot.parse_one(query)
        print(f"Query {i}: syntax OK")
        print(f"  Tables: {[t.name for t in ast.find_all(sqlglot.exp.Table)]}")
    except ParseError as e:
        print(f"Query {i}: syntax error - {e}")
```

Output:

```
Query 1: syntax OK
  Tables: ['products']
Query 2: syntax error - Expecting FROM. Line 1, Column: 27.
Query 3: syntax error - Expecting ON. Line 1, Column: 39.
Query 4: syntax OK
  Tables: ['products', 'categories', 'order_items']
```
```python
import sqlglot


class DatabaseMigrationTool:
    def __init__(self):
        self.supported_dialects = ['mysql', 'postgres', 'oracle', 'sqlite', 'bigquery', 'snowflake']

    def analyze_sql_file(self, filepath, dialect='mysql'):
        """Analyze the dependencies in a SQL file."""
        with open(filepath, 'r', encoding='utf-8') as f:
            content = f.read()
        analysis = {
            'total_statements': 0,
            'tables': set(),
            'views': set(),
            'functions': set(),
            'dialect_specific_features': []
        }
        # Split into individual statements
        statements = [stmt.strip() for stmt in content.split(';') if stmt.strip()]
        analysis['total_statements'] = len(statements)
        for stmt in statements:
            try:
                ast = sqlglot.parse_one(stmt, read=dialect)
                # Collect table names
                for table in ast.find_all(sqlglot.exp.Table):
                    analysis['tables'].add(table.name)
                # Detect views
                if "CREATE VIEW" in stmt.upper():
                    view_name = stmt.split()[2] if len(stmt.split()) > 2 else "unknown"
                    analysis['views'].add(view_name)
                # Detect stored procedures and functions
                if any(keyword in stmt.upper() for keyword in ['CREATE FUNCTION', 'CREATE PROCEDURE']):
                    analysis['functions'].add(stmt.split()[2] if len(stmt.split()) > 2 else "unknown")
                # Detect dialect-specific features
                dialect_features = self._detect_dialect_features(stmt, dialect)
                if dialect_features:
                    analysis['dialect_specific_features'].extend(dialect_features)
            except Exception as e:
                print(f"Failed to analyze statement: {stmt[:50]}... - {e}")
        return analysis

    def _detect_dialect_features(self, sql, dialect):
        """Detect dialect-specific features in a SQL statement."""
        features = []
        sql_upper = sql.upper()
        if dialect == 'mysql':
            if 'LIMIT' in sql_upper:
                features.append('LIMIT clause')
            if 'ON DUPLICATE KEY UPDATE' in sql_upper:
                features.append('ON DUPLICATE KEY UPDATE')
        elif dialect == 'oracle':
            if 'ROWNUM' in sql_upper:
                features.append('ROWNUM pseudo-column')
            if 'CONNECT BY' in sql_upper:
                features.append('hierarchical query')
        elif dialect == 'postgres':
            if 'ILIKE' in sql_upper:
                features.append('ILIKE operator')
            if 'DISTINCT ON' in sql_upper:
                features.append('DISTINCT ON clause')
        return features

    def migrate_database(self, source_sql, source_dialect, target_dialect):
        """Run the migration."""
        print(f"Starting migration from {source_dialect} to {target_dialect}")
        print("=" * 50)
        # Analyze the source SQL
        analysis = self.analyze_sql_file(source_sql, source_dialect)
        print("Analysis:")
        print(f"  Total statements: {analysis['total_statements']}")
        print(f"  Tables: {', '.join(list(analysis['tables'])[:5])}...")
        print(f"  Dialect-specific features: {analysis['dialect_specific_features']}")
        # Transpile statement by statement
        migrated_content = []
        with open(source_sql, 'r', encoding='utf-8') as f:
            content = f.read()
        statements = [stmt.strip() for stmt in content.split(';') if stmt.strip()]
        for i, stmt in enumerate(statements, 1):
            try:
                migrated = sqlglot.transpile(stmt, read=source_dialect, write=target_dialect)[0]
                migrated_content.append(migrated)
                print(f"✓ Statement {i} migrated")
            except Exception as e:
                print(f"✗ Statement {i} failed: {e}")
                migrated_content.append(f"-- original statement that failed to migrate: {stmt}")
        # Write out the migrated SQL
        output_file = f"migrated_{source_dialect}_to_{target_dialect}.sql"
        with open(output_file, 'w', encoding='utf-8') as f:
            f.write(';\n'.join(migrated_content))
        print(f"Migration complete; results saved to: {output_file}")
        return output_file


# Usage example
migrator = DatabaseMigrationTool()

# Create a sample SQL file
sample_sql = """
CREATE TABLE users (
    id INT PRIMARY KEY,
    username VARCHAR(50) NOT NULL,
    email VARCHAR(100) UNIQUE,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE INDEX idx_username ON users(username);
INSERT INTO users (id, username, email) VALUES (1, 'john_doe', 'john@example.com');
INSERT INTO users (id, username, email) VALUES (2, 'jane_smith', 'jane@example.com');
SELECT * FROM users LIMIT 10;
CREATE VIEW active_users AS SELECT * FROM users WHERE created_at > '2024-01-01';
"""
with open('sample_mysql.sql', 'w', encoding='utf-8') as f:
    f.write(sample_sql)

# Run the migration
result_file = migrator.migrate_database('sample_mysql.sql', 'mysql', 'postgres')
```

Output:

```
Starting migration from mysql to postgres
==================================================
Analysis:
  Total statements: 6
  Tables: users...
  Dialect-specific features: ['LIMIT clause']
✓ Statement 1 migrated
✓ Statement 2 migrated
✓ Statement 3 migrated
✓ Statement 4 migrated
✓ Statement 5 migrated
✓ Statement 6 migrated
Migration complete; results saved to: migrated_mysql_to_postgres.sql
```
Contents of the generated migration file:

```sql
CREATE TABLE users (
  id INT PRIMARY KEY,
  username VARCHAR(50) NOT NULL,
  email VARCHAR(100) UNIQUE,
  created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE INDEX idx_username ON users (username);
INSERT INTO users (id, username, email) VALUES (1, 'john_doe', 'john@example.com');
INSERT INTO users (id, username, email) VALUES (2, 'jane_smith', 'jane@example.com');
SELECT * FROM users LIMIT 10;
CREATE VIEW active_users AS SELECT * FROM users WHERE created_at > '2024-01-01';
```
```python
import sqlglot
from sqlglot.optimizer import optimize
from datetime import datetime


class SQLQueryAnalyzer:
    def __init__(self):
        self.query_history = []

    def analyze_query(self, sql, dialect='mysql'):
        """Run a deep analysis of a SQL query."""
        try:
            ast = sqlglot.parse_one(sql, read=dialect)
            analysis = {
                'timestamp': datetime.now().isoformat(),
                'original_sql': sql,
                'ast_structure': self._extract_ast_structure(ast),
                'performance_metrics': self._calculate_performance_metrics(ast),
                'optimization_suggestions': [],
                'security_issues': []
            }
            # Performance analysis
            analysis['optimization_suggestions'] = self._generate_optimization_suggestions(ast)
            # Security analysis
            analysis['security_issues'] = self._detect_security_issues(ast)
            # Attempt optimization
            try:
                optimized_ast = optimize(ast)
                analysis['optimized_sql'] = optimized_ast.sql(pretty=True)
                analysis['optimization_ratio'] = self._calculate_optimization_ratio(sql, optimized_ast.sql())
            except Exception:
                analysis['optimized_sql'] = "optimization failed"
                analysis['optimization_ratio'] = 0
            self.query_history.append(analysis)
            return analysis
        except Exception as e:
            return {'error': str(e), 'timestamp': datetime.now().isoformat()}

    def _extract_ast_structure(self, ast):
        """Extract structural information from the AST."""
        structure = {
            'query_type': self._get_query_type(ast),
            'tables': [],
            'columns': [],
            'joins': [],
            'conditions': [],
            'aggregations': [],
            'subqueries': []
        }
        # Tables
        for table in ast.find_all(sqlglot.exp.Table):
            structure['tables'].append({'name': table.name, 'alias': table.alias_or_name})
        # Columns
        for column in ast.find_all(sqlglot.exp.Column):
            structure['columns'].append({'name': column.name, 'table': column.table})
        # JOIN conditions
        for join in ast.find_all(sqlglot.exp.Join):
            on_clause = join.args.get("on")
            if on_clause:
                structure['joins'].append(on_clause.sql())
        # Subqueries
        for subquery in ast.find_all(sqlglot.exp.Subquery):
            structure['subqueries'].append(subquery.sql())
        return structure

    def _get_query_type(self, ast):
        """Classify the statement type."""
        if ast.key == 'select':
            return 'SELECT'
        elif ast.key == 'insert':
            return 'INSERT'
        elif ast.key == 'update':
            return 'UPDATE'
        elif ast.key == 'delete':
            return 'DELETE'
        else:
            return 'UNKNOWN'

    def _calculate_performance_metrics(self, ast):
        """Compute simple performance metrics."""
        metrics = {
            'join_count': len(list(ast.find_all(sqlglot.exp.Join))),
            'subquery_count': len(list(ast.find_all(sqlglot.exp.Subquery))),
            'table_count': len(list(ast.find_all(sqlglot.exp.Table))),
            'condition_complexity': self._calculate_condition_complexity(ast),
            'estimated_cost': 0
        }
        # Rough cost estimate
        metrics['estimated_cost'] = (
            metrics['join_count'] * 10
            + metrics['subquery_count'] * 20
            + metrics['table_count'] * 5
            + metrics['condition_complexity'] * 2
        )
        return metrics

    def _calculate_condition_complexity(self, ast):
        """Estimate WHERE-clause complexity."""
        complexity = 0
        for condition in ast.find_all(sqlglot.exp.Where):
            # Rough estimate: each AND/OR adds complexity
            condition_sql = condition.sql().upper()
            complexity += condition_sql.count(' AND ')
            complexity += condition_sql.count(' OR ')
            complexity += condition_sql.count('(') * 0.5
        return complexity

    def _generate_optimization_suggestions(self, ast):
        """Generate optimization suggestions."""
        suggestions = []
        metrics = self._calculate_performance_metrics(ast)
        if metrics['join_count'] > 3:
            suggestions.append("Query contains multiple JOINs; check whether every JOIN is necessary")
        if metrics['subquery_count'] > 2:
            suggestions.append("Query contains multiple subqueries; consider JOINs or CTEs instead")
        # Check for operators that defeat indexes
        for condition in ast.find_all(sqlglot.exp.Where):
            condition_sql = condition.sql()
            if any(op in condition_sql for op in ['LIKE', 'REGEXP', 'NOT LIKE']):
                suggestions.append("WHERE clause uses a LIKE operator; check whether an index can still be used")
        return suggestions

    def _detect_security_issues(self, ast):
        """Detect potential security issues."""
        issues = []
        # Flag string literals as possible injection points
        for literal in ast.find_all(sqlglot.exp.Literal):
            if "'" in str(literal):
                issues.append("Query contains string literals; watch for SQL injection")
        # Flag dangerous operations (find() instead of find_all(): a generator is always truthy)
        if ast.key == 'delete' and ast.find(sqlglot.exp.Where) is None:
            issues.append("DELETE statement has no WHERE clause and may wipe the whole table")
        return issues

    def _calculate_optimization_ratio(self, original, optimized):
        """Compute the size-reduction percentage."""
        if optimized == "optimization failed":
            return 0
        original_len = len(original)
        optimized_len = len(optimized)
        if original_len == 0:
            return 0
        reduction = (original_len - optimized_len) / original_len * 100
        return max(0, min(100, reduction))

    def generate_report(self, analysis):
        """Render the analysis as a human-readable report."""
        report = f"""SQL Query Analysis Report
Generated at: {analysis['timestamp']}
{'=' * 50}
1. Query basics
   Query type: {analysis['ast_structure']['query_type']}
   Tables involved: {len(analysis['ast_structure']['tables'])}
   Columns involved: {len(analysis['ast_structure']['columns'])}
2. Performance metrics
   JOIN count: {analysis['performance_metrics']['join_count']}
   Subquery count: {analysis['performance_metrics']['subquery_count']}
   Estimated cost: {analysis['performance_metrics']['estimated_cost']}
3. Optimization suggestions
"""
        for suggestion in analysis['optimization_suggestions']:
            report += f"   • {suggestion}\n"
        if analysis.get('optimized_sql') and analysis['optimized_sql'] != "optimization failed":
            report += f"""4. Optimization result
   Reduction: {analysis.get('optimization_ratio', 0):.1f}%
   Optimized SQL:
{analysis['optimized_sql']}
"""
        if analysis['security_issues']:
            report += "5. Security warnings\n"
            for issue in analysis['security_issues']:
                report += f"   ⚠ {issue}\n"
        return report


# Usage example
analyzer = SQLQueryAnalyzer()

# Analyze a complex query
complex_query = """
SELECT
    c.customer_id,
    c.customer_name,
    COUNT(DISTINCT o.order_id) AS order_count,
    SUM(oi.quantity * oi.unit_price) AS total_spent,
    AVG(oi.quantity) AS avg_quantity
FROM customers c
LEFT JOIN orders o ON c.customer_id = o.customer_id
LEFT JOIN order_items oi ON o.order_id = oi.order_id
WHERE c.country = 'China'
  AND o.order_date BETWEEN '2024-01-01' AND '2024-12-31'
  AND o.status IN ('completed', 'shipped')
GROUP BY c.customer_id, c.customer_name
HAVING COUNT(DISTINCT o.order_id) > 5
   AND SUM(oi.quantity * oi.unit_price) > 10000
ORDER BY total_spent DESC
LIMIT 20
"""
result = analyzer.analyze_query(complex_query, 'mysql')
print(analyzer.generate_report(result))
```

Output:

```
SQL Query Analysis Report
Generated at: 2026-03-26T15:06:30.699752
==================================================
1. Query basics
   Query type: SELECT
   Tables involved: 3
   Columns involved: 8
2. Performance metrics
   JOIN count: 2
   Subquery count: 0
   Estimated cost: 35
3. Optimization suggestions
   • Query contains multiple JOINs; check whether every JOIN is necessary
   • WHERE clause uses a LIKE operator; check whether an index can still be used
4. Optimization result
   Reduction: 0.5%
   Optimized SQL:
SELECT c.customer_id, c.customer_name, COUNT(DISTINCT o.order_id) AS order_count, SUM(oi.quantity * oi.unit_price) AS total_spent, AVG(oi.quantity) AS avg_quantity
FROM customers AS c
LEFT JOIN orders AS o ON c.customer_id = o.customer_id
LEFT JOIN order_items AS oi ON o.order_id = oi.order_id
WHERE c.country = 'China' AND o.order_date BETWEEN '2024-01-01' AND '2024-12-31' AND o.status IN ('completed', 'shipped')
GROUP BY c.customer_id, c.customer_name
HAVING COUNT(DISTINCT o.order_id) > 5 AND SUM(oi.quantity * oi.unit_price) > 10000
ORDER BY total_spent DESC
LIMIT 20
```
As a comprehensive SQL-processing library, sqlglot gives Python developers a powerful toolkit. Whether you are migrating databases, optimizing queries, or building applications that must support multiple databases, it can significantly boost development efficiency. Its zero-dependency design keeps integration simple, and its rich API keeps usage flexible.