
# performance_monitor.py
import time
import psutil
import threading
import logging
from typing importDict, List, Any, Optional
from dataclasses import dataclass, asdict
from collections import deque
import json
import statistics
from datetime import datetime, timedelta
@dataclass
class PerformanceMetrics:
    """A single point-in-time sample of system and request-level metrics."""

    timestamp: float            # unix time the sample was taken
    cpu_percent: float
    memory_percent: float
    memory_used_mb: float
    disk_io_read_mb: float      # MB/s since the previous sample
    disk_io_write_mb: float     # MB/s since the previous sample
    network_sent_mb: float      # MB/s since the previous sample
    network_recv_mb: float      # MB/s since the previous sample
    active_threads: int
    requests_per_second: float
    response_time_avg: float    # seconds
    error_rate: float           # errors / requests, in [0, 1]
    queue_size: int = 0

    def to_dict(self) -> Dict[str, Any]:
        """Return the sample as a plain dict (e.g. for JSON export)."""
        return asdict(self)
class SystemMonitor:
    """System performance monitor.

    A background thread samples CPU, memory, disk/network throughput,
    thread count and request statistics every ``collection_interval``
    seconds, keeping the most recent ``max_history`` samples in a ring
    buffer.

    NOTE(review): record_request() is not synchronized; counters may
    drift slightly under heavy multi-threaded use — confirm acceptable.
    """

    def __init__(self, collection_interval: float = 1.0, max_history: int = 3600):
        self.collection_interval = collection_interval
        self.max_history = max_history
        self.metrics_history = deque(maxlen=max_history)
        self.running = False
        self.monitor_thread = None
        self.logger = logging.getLogger(__name__)
        # Baselines used to turn cumulative IO counters into per-second rates.
        self.last_net_io = psutil.net_io_counters()
        self.last_disk_io = psutil.disk_io_counters()
        self.last_check_time = time.time()
        # Request statistics (reset roughly once a minute by collect_metrics).
        self.request_count = 0
        self.response_times = deque(maxlen=1000)
        self.error_count = 0
        self.last_request_reset = time.time()

    def record_request(self, response_time: float, is_error: bool = False):
        """Record one handled request and its response time (seconds)."""
        self.request_count += 1
        self.response_times.append(response_time)
        if is_error:
            self.error_count += 1

    def collect_metrics(self) -> PerformanceMetrics:
        """Take one sample of all tracked metrics and roll baselines forward."""
        current_time = time.time()
        mb = 1024 * 1024

        # CPU and memory.
        cpu_percent = psutil.cpu_percent()
        memory = psutil.virtual_memory()
        memory_percent = memory.percent
        memory_used_mb = memory.used / mb

        # Disk IO rates (MB/s) since the previous sample.
        current_disk_io = psutil.disk_io_counters()
        time_delta = current_time - self.last_check_time
        if time_delta > 0:
            disk_read_mb = (current_disk_io.read_bytes - self.last_disk_io.read_bytes) / mb / time_delta
            disk_write_mb = (current_disk_io.write_bytes - self.last_disk_io.write_bytes) / mb / time_delta
        else:
            disk_read_mb = disk_write_mb = 0

        # Network IO rates (MB/s) since the previous sample.
        current_net_io = psutil.net_io_counters()
        if time_delta > 0:
            net_sent_mb = (current_net_io.bytes_sent - self.last_net_io.bytes_sent) / mb / time_delta
            net_recv_mb = (current_net_io.bytes_recv - self.last_net_io.bytes_recv) / mb / time_delta
        else:
            net_sent_mb = net_recv_mb = 0

        # Thread count.
        active_threads = threading.active_count()

        # Request throughput since the last reset.
        request_time_delta = current_time - self.last_request_reset
        if request_time_delta > 0:
            requests_per_second = self.request_count / request_time_delta
        else:
            requests_per_second = 0

        # Average response time over the last (up to) 1000 requests.
        response_time_avg = statistics.mean(self.response_times) if self.response_times else 0
        # Error rate over the current reset window.
        error_rate = (self.error_count / self.request_count) if self.request_count > 0 else 0

        # Roll the IO baselines forward.
        self.last_disk_io = current_disk_io
        self.last_net_io = current_net_io
        self.last_check_time = current_time

        # Reset the request statistics window roughly once per minute.
        if request_time_delta >= 60:
            self.request_count = 0
            self.error_count = 0
            self.last_request_reset = current_time

        return PerformanceMetrics(
            timestamp=current_time,
            cpu_percent=cpu_percent,
            memory_percent=memory_percent,
            memory_used_mb=memory_used_mb,
            disk_io_read_mb=disk_read_mb,
            disk_io_write_mb=disk_write_mb,
            network_sent_mb=net_sent_mb,
            network_recv_mb=net_recv_mb,
            active_threads=active_threads,
            requests_per_second=requests_per_second,
            response_time_avg=response_time_avg,
            error_rate=error_rate
        )

    def monitor_loop(self):
        """Background sampling loop; runs until ``self.running`` is cleared."""
        while self.running:
            try:
                metrics = self.collect_metrics()
                self.metrics_history.append(metrics)
                # Emit warnings for any threshold breaches.
                self.check_alerts(metrics)
                time.sleep(self.collection_interval)
            except Exception as e:
                # Keep the monitor alive even if one sample fails.
                self.logger.error(f"监控循环错误: {e}")
                time.sleep(self.collection_interval)

    def check_alerts(self, metrics: PerformanceMetrics):
        """Log a warning for every alert threshold the sample exceeds."""
        alerts = []
        if metrics.cpu_percent > 80:
            alerts.append(f"CPU使用率过高: {metrics.cpu_percent:.1f}%")
        if metrics.memory_percent > 85:
            alerts.append(f"内存使用率过高: {metrics.memory_percent:.1f}%")
        if metrics.response_time_avg > 5.0:
            alerts.append(f"平均响应时间过长: {metrics.response_time_avg:.2f}s")
        if metrics.error_rate > 0.1:
            alerts.append(f"错误率过高: {metrics.error_rate:.2%}")
        if metrics.active_threads > 100:
            alerts.append(f"活跃线程数过多: {metrics.active_threads}")
        for alert in alerts:
            self.logger.warning(f"性能告警: {alert}")

    def start(self):
        """Start the background monitoring thread (no-op if already running)."""
        if self.running:
            return
        self.running = True
        self.monitor_thread = threading.Thread(target=self.monitor_loop)
        self.monitor_thread.start()
        self.logger.info("性能监控已启动")

    def stop(self):
        """Stop the monitoring thread and wait for it to exit."""
        self.running = False
        if self.monitor_thread:
            self.monitor_thread.join()
        self.logger.info("性能监控已停止")

    def get_current_metrics(self) -> Optional[PerformanceMetrics]:
        """Return the most recent sample, or None if nothing collected yet."""
        if self.metrics_history:
            return self.metrics_history[-1]
        return None

    def get_metrics_summary(self, minutes: int = 10) -> Dict[str, Any]:
        """Summarize (avg/max/min) the samples from the last ``minutes`` minutes.

        Returns an empty dict when no samples fall in the window.
        """
        if not self.metrics_history:
            return {}

        cutoff_time = time.time() - (minutes * 60)
        recent_metrics = [m for m in self.metrics_history if m.timestamp >= cutoff_time]
        if not recent_metrics:
            return {}

        cpu_values = [m.cpu_percent for m in recent_metrics]
        memory_values = [m.memory_percent for m in recent_metrics]
        # Zero averages mean "no requests observed"; exclude them here.
        response_times = [m.response_time_avg for m in recent_metrics if m.response_time_avg > 0]
        error_rates = [m.error_rate for m in recent_metrics]

        return {
            'time_range_minutes': minutes,
            'sample_count': len(recent_metrics),
            'cpu': {
                'avg': statistics.mean(cpu_values),
                'max': max(cpu_values),
                'min': min(cpu_values)
            },
            'memory': {
                'avg': statistics.mean(memory_values),
                'max': max(memory_values),
                'min': min(memory_values)
            },
            'response_time': {
                'avg': statistics.mean(response_times) if response_times else 0,
                'max': max(response_times) if response_times else 0,
                'min': min(response_times) if response_times else 0
            },
            'error_rate': {
                'avg': statistics.mean(error_rates),
                'max': max(error_rates),
                'min': min(error_rates)
            }
        }

    def export_metrics(self, filename: str, hours: int = 1):
        """Dump the samples from the last ``hours`` hours to a JSON file."""
        cutoff_time = time.time() - (hours * 3600)
        export_data = [
            m.to_dict() for m in self.metrics_history
            if m.timestamp >= cutoff_time
        ]
        with open(filename, 'w', encoding='utf-8') as f:
            json.dump(export_data, f, indent=2)
        # The filename placeholder in this message had been corrupted; restored.
        self.logger.info(f"导出了 {len(export_data)} 条指标数据到 {filename}")
class BottleneckAnalyzer:
    """Analyzes recent SystemMonitor data for performance bottlenecks."""

    def __init__(self, monitor: SystemMonitor):
        self.monitor = monitor
        self.logger = logging.getLogger(__name__)

    def analyze_bottlenecks(self, analysis_minutes: int = 30) -> Dict[str, Any]:
        """Score the last ``analysis_minutes`` of metrics and list bottlenecks.

        Returns a report dict with a 0-100 score, a letter grade, the
        detected bottlenecks and textual recommendations; returns an
        ``{'error': ...}`` dict when no monitoring data is available.
        """
        summary = self.monitor.get_metrics_summary(analysis_minutes)
        if not summary:
            return {'error': '没有足够的监控数据'}

        bottlenecks = []
        recommendations = []

        # CPU bottleneck.
        if summary['cpu']['avg'] > 70:
            bottlenecks.append({
                'type': 'CPU',
                'severity': 'high' if summary['cpu']['avg'] > 85 else 'medium',
                'description': f"CPU平均使用率 {summary['cpu']['avg']:.1f}%",
                'max_value': summary['cpu']['max']
            })
            recommendations.extend([
                "考虑增加并发控制,减少同时运行的任务数",
                "优化CPU密集型操作,如数据解析和处理",
                "使用多进程而非多线程处理CPU密集型任务"
            ])

        # Memory bottleneck.
        if summary['memory']['avg'] > 75:
            bottlenecks.append({
                'type': 'Memory',
                'severity': 'high' if summary['memory']['avg'] > 90 else 'medium',
                'description': f"内存平均使用率 {summary['memory']['avg']:.1f}%",
                'max_value': summary['memory']['max']
            })
            recommendations.extend([
                "实现数据流式处理,避免大量数据同时加载到内存",
                "优化数据结构,减少内存占用",
                "增加内存回收机制,及时释放不用的对象"
            ])

        # Response-time bottleneck.
        if summary['response_time']['avg'] > 3.0:
            bottlenecks.append({
                'type': 'Response Time',
                'severity': 'high' if summary['response_time']['avg'] > 5.0 else 'medium',
                'description': f"平均响应时间 {summary['response_time']['avg']:.2f}s",
                'max_value': summary['response_time']['max']
            })
            recommendations.extend([
                "优化网络请求,使用连接池和Keep-Alive",
                "实现请求缓存,避免重复请求",
                "调整超时设置,避免长时间等待"
            ])

        # Error-rate bottleneck.
        if summary['error_rate']['avg'] > 0.05:
            bottlenecks.append({
                'type': 'Error Rate',
                'severity': 'high' if summary['error_rate']['avg'] > 0.1 else 'medium',
                'description': f"平均错误率 {summary['error_rate']['avg']:.2%}",
                'max_value': summary['error_rate']['max']
            })
            recommendations.extend([
                "增强错误处理和重试机制",
                "分析错误原因,优化请求策略",
                "实现智能退避算法,避免频繁失败"
            ])

        # Aggregate score: -25 per high, -15 per medium bottleneck.
        total_score = 100
        for bottleneck in bottlenecks:
            if bottleneck['severity'] == 'high':
                total_score -= 25
            elif bottleneck['severity'] == 'medium':
                total_score -= 15

        performance_grade = 'A' if total_score >= 90 else 'B' if total_score >= 75 else 'C' if total_score >= 60 else 'D'

        return {
            'analysis_time': datetime.now().isoformat(),
            'analysis_period_minutes': analysis_minutes,
            'performance_score': total_score,
            'performance_grade': performance_grade,
            'bottlenecks': bottlenecks,
            'recommendations': recommendations,
            'summary': summary
        }

    def generate_optimization_plan(self, analysis_result: Dict[str, Any]) -> Dict[str, Any]:
        """Turn an analyze_bottlenecks() report into a prioritized task plan."""
        bottlenecks = analysis_result.get('bottlenecks', [])

        # Split by severity.
        high_priority = [b for b in bottlenecks if b['severity'] == 'high']
        medium_priority = [b for b in bottlenecks if b['severity'] == 'medium']

        optimization_tasks = []

        # High-priority tasks.
        for bottleneck in high_priority:
            if bottleneck['type'] == 'CPU':
                optimization_tasks.append({
                    'priority': 'high',
                    'task': 'CPU优化',
                    'actions': [
                        '实现智能并发控制',
                        '优化数据处理算法',
                        '使用异步IO替代同步操作'
                    ],
                    'expected_improvement': '20-30% CPU使用率降低'
                })
            elif bottleneck['type'] == 'Memory':
                optimization_tasks.append({
                    'priority': 'high',
                    'task': '内存优化',
                    'actions': [
                        '实现流式数据处理',
                        '优化对象生命周期管理',
                        '增加内存监控和回收'
                    ],
                    'expected_improvement': '15-25% 内存使用率降低'
                })

        # Medium-priority tasks.
        for bottleneck in medium_priority:
            if bottleneck['type'] == 'Response Time':
                optimization_tasks.append({
                    'priority': 'medium',
                    'task': '响应时间优化',
                    'actions': [
                        '实现请求缓存机制',
                        '优化网络连接配置',
                        '增加CDN和代理支持'
                    ],
                    'expected_improvement': '30-50% 响应时间改善'
                })

        return {
            'plan_created': datetime.now().isoformat(),
            'total_tasks': len(optimization_tasks),
            'high_priority_tasks': len([t for t in optimization_tasks if t['priority'] == 'high']),
            'medium_priority_tasks': len([t for t in optimization_tasks if t['priority'] == 'medium']),
            'optimization_tasks': optimization_tasks,
            'estimated_completion_time': '1-2 weeks'
        }
# Usage example
if __name__ == "__main__":
    import random

    logging.basicConfig(level=logging.INFO)

    # Start the system monitor.
    monitor = SystemMonitor(collection_interval=1.0)
    monitor.start()

    # Simulate some requests with random latency and a ~5% error rate.
    for i in range(100):
        response_time = random.uniform(0.5, 3.0)
        is_error = random.random() < 0.05
        monitor.record_request(response_time, is_error)
        time.sleep(0.1)

    # Let the monitor collect a few samples.
    time.sleep(10)

    # Analyze bottlenecks over the collected window.
    analyzer = BottleneckAnalyzer(monitor)
    analysis = analyzer.analyze_bottlenecks(analysis_minutes=1)
    print(f"瓶颈分析结果: {json.dumps(analysis, indent=2, ensure_ascii=False)}")

    # Generate an optimization plan from the analysis.
    plan = analyzer.generate_optimization_plan(analysis)
    print(f"优化计划: {json.dumps(plan, indent=2, ensure_ascii=False)}")

    # Export the collected metrics and shut down.
    monitor.export_metrics("performance_metrics.json", hours=1)
    monitor.stop()
import cProfile
import pstats
import io
import time
import functools
import threading
from typing importDict, Any, Callable, Optional
import logging
from contextlib import contextmanager
class CodeProfiler:
    """cProfile-based profiler for functions and code blocks.

    Captured reports are kept in ``self.profiles`` keyed by function or
    block name, as formatted pstats text sorted by cumulative time.
    """

    def __init__(self):
        self.profiles = {}
        self.logger = logging.getLogger(__name__)

    def profile_function(self, func_name: str = None):
        """Decorator: profile each call and store the latest report.

        ``func_name`` defaults to ``module.qualname`` of the wrapped function.
        """
        def decorator(func: Callable) -> Callable:
            name = func_name or f"{func.__module__}.{func.__name__}"

            @functools.wraps(func)
            def wrapper(*args, **kwargs):
                profiler = cProfile.Profile()
                profiler.enable()
                try:
                    return func(*args, **kwargs)
                finally:
                    profiler.disable()
                    # Render and store the stats even if the call raised.
                    s = io.StringIO()
                    ps = pstats.Stats(profiler, stream=s)
                    ps.sort_stats('cumulative')
                    ps.print_stats()
                    self.profiles[name] = {
                        'timestamp': time.time(),
                        'stats': s.getvalue(),
                        'function': name
                    }
            return wrapper
        return decorator

    @contextmanager
    def profile_block(self, block_name: str):
        """Context manager: profile the enclosed block and store the report."""
        profiler = cProfile.Profile()
        profiler.enable()
        try:
            yield
        finally:
            profiler.disable()
            s = io.StringIO()
            ps = pstats.Stats(profiler, stream=s)
            ps.sort_stats('cumulative')
            ps.print_stats()
            self.profiles[block_name] = {
                'timestamp': time.time(),
                'stats': s.getvalue(),
                'block': block_name
            }

    def get_profile_report(self, name: str) -> Optional[str]:
        """Return the stored pstats text for ``name``, or None if unknown."""
        if name in self.profiles:
            return self.profiles[name]['stats']
        return None

    def get_all_profiles(self) -> Dict[str, Any]:
        """Return the mapping of all captured profiles."""
        return self.profiles

    def clear_profiles(self):
        """Discard every captured profile."""
        self.profiles.clear()
classTimingProfiler:
"""时间性能分析器"""
def__init__(self):
self.timings = {}
self.lock = threading.Lock()
self.logger = logging.getLogger(__name__)
deftime_function(self, func_name: str = None):
"""函数计时装饰器"""
defdecorator(func: Callable) -> Callable:
name = func_name orf"{func.__module__}.{func.__name__}"
@functools.wraps(func)
defwrapper(*args, **kwargs):
start_time = time.time()
try:
result = func(*args, **kwargs)
return result
finally:
end_time = time.time()
duration = end_time - start_time
withself.lock:
if name notinself.timings:
self.timings[name] = []
self.timings[name].append({
'duration': duration,
'timestamp': start_time,
'args_count': len(args),
'kwargs_count': len(kwargs)
})
return wrapper
return decorator
@contextmanager
deftime_block(self, block_name: str):
"""代码块计时上下文管理器"""
start_time = time.time()
try:
yield
finally:
end_time = time.time()
duration = end_time - start_time
withself.lock:
if block_name notinself.timings:
self.timings[block_name] = []
self.timings[block_name].append({
'duration': duration,
'timestamp': start_time
})
defget_timing_stats(self, name: str) -> Optional[Dict[str, Any]]:
"""获取计时统计"""
withself.lock:
if name notinself.timings:
returnNone
durations = [t['duration'] for t inself.timings[name]]
ifnot durations:
returnNone
return {
'count': len(durations),
'total_time': sum(durations),
'avg_time': sum(durations) / len(durations),
'min_time': min(durations),
'max_time': max(durations),
'last_call': max(t['timestamp'] for t inself.timings[name])
}
defget_all_stats(self) -> Dict[str, Dict[str, Any]]:
"""获取所有计时统计"""
stats = {}
withself.lock:
for name inself.timings:
stats[name] = self.get_timing_stats(name)
return stats
defgenerate_report(self) -> str:
"""生成性能报告"""
stats = self.get_all_stats()
ifnot stats:
return"没有性能数据"
report_lines = ["性能分析报告", "=" * 50]
# 按平均时间排序
sorted_stats = sorted(
stats.items(),
key=lambda x: x[1]['avg_time'] if x[1] else0,
reverse=True
)
for name, stat in sorted_stats:
if stat:
report_lines.extend([
f"\n函数/代码块: {name}",
f" 调用次数: {stat['count']}",
f" 总时间: {stat['total_time']:.4f}s",
f" 平均时间: {stat['avg_time']:.4f}s",
f" 最短时间: {stat['min_time']:.4f}s",
f" 最长时间: {stat['max_time']:.4f}s"
])
return"\n".join(report_lines)
# Usage example: module-level profilers shared by the demo functions below.
profiler = CodeProfiler()
timer = TimingProfiler()


@profiler.profile_function("example_function")
@timer.time_function("example_function")
def example_function(n: int) -> int:
    """Sum of squares 0..n-1 — a CPU-bound demo workload."""
    total = 0
    for i in range(n):
        total += i * i
    return total


@timer.time_function("slow_function")
def slow_function():
    """Sleeps 100 ms to simulate a slow call."""
    time.sleep(0.1)
    return "done"
if __name__ == "__main__":
    # Exercise the decorated demo functions.
    for i in range(10):
        example_function(1000)
        slow_function()

    # Profile an inline block with both profilers.
    with profiler.profile_block("data_processing"):
        with timer.time_block("data_processing"):
            data = [i ** 2 for i in range(10000)]
            result = sum(data)

    # Timing report.
    print(timer.generate_report())

    # Detailed cProfile reports (truncated for readability).
    print("\n详细性能分析:")
    for name in profiler.get_all_profiles():
        print(f"\n{name}:")
        print(profiler.get_profile_report(name)[:500] + "...")
import gc
import sys
import weakref
import threading
import time
import logging
from typing importDict, Any, List, Optional, Generator
from collections import defaultdict
import tracemalloc
from dataclasses import dataclass
@dataclass
class MemorySnapshot:
    """Point-in-time snapshot of process memory usage."""

    timestamp: float             # unix time the snapshot was taken
    current_mb: float            # resident set size, MB
    peak_mb: float               # peak RSS where available, else current_mb
    traced_mb: float             # tracemalloc-traced allocations, MB
    object_count: int            # len(gc.get_objects())
    gc_collections: Dict[int, int]   # gc.get_count() per generation
class MemoryOptimizer:
    """Tracks process memory over time and performs GC-based optimization."""

    def __init__(self, enable_tracing: bool = True):
        self.enable_tracing = enable_tracing
        self.snapshots = []
        self.object_tracker = defaultdict(int)
        self.weak_refs = []
        self.logger = logging.getLogger(__name__)

        if enable_tracing:
            tracemalloc.start()

        # Background monitoring state (see start_monitoring()).
        self.monitoring = False
        self.monitor_thread = None

    def take_snapshot(self) -> MemorySnapshot:
        """Capture a MemorySnapshot of the current process and store it."""
        # psutil is imported lazily so module import works without it.
        import psutil
        process = psutil.Process()
        memory_info = process.memory_info()
        mb = 1024 * 1024
        current_mb = memory_info.rss / mb

        # psutil exposes the peak working set as 'peak_wset' (Windows only);
        # the original spelled it 'peak_wss', which never exists, so the
        # fallback always fired. On other platforms we fall back to current.
        peak_mb = memory_info.peak_wset / mb if hasattr(memory_info, 'peak_wset') else current_mb

        # tracemalloc-traced allocation size, if tracing is on.
        traced_mb = 0
        if self.enable_tracing:
            current, peak = tracemalloc.get_traced_memory()
            traced_mb = current / mb

        object_count = len(gc.get_objects())

        # Per-generation GC counters.
        gc_stats = {}
        for i in range(3):
            gc_stats[i] = gc.get_count()[i]

        snapshot = MemorySnapshot(
            timestamp=time.time(),
            current_mb=current_mb,
            peak_mb=peak_mb,
            traced_mb=traced_mb,
            object_count=object_count,
            gc_collections=gc_stats
        )
        self.snapshots.append(snapshot)
        return snapshot

    def analyze_memory_growth(self, window_size: int = 10) -> Dict[str, Any]:
        """Estimate memory growth rate over the last ``window_size`` snapshots."""
        if len(self.snapshots) < 2:
            return {'error': '快照数据不足'}

        recent_snapshots = self.snapshots[-window_size:]
        first_snapshot = recent_snapshots[0]
        last_snapshot = recent_snapshots[-1]

        time_delta = last_snapshot.timestamp - first_snapshot.timestamp
        memory_delta = last_snapshot.current_mb - first_snapshot.current_mb
        growth_rate = memory_delta / time_delta if time_delta > 0 else 0

        # Heuristic: sustained growth above 1 MB/s suggests a leak.
        is_leaking = growth_rate > 1.0

        return {
            'window_size': len(recent_snapshots),
            'time_span_seconds': time_delta,
            'memory_growth_mb': memory_delta,
            'growth_rate_mb_per_second': growth_rate,
            'is_potential_leak': is_leaking,
            'current_memory_mb': last_snapshot.current_mb,
            'object_count_change': last_snapshot.object_count - first_snapshot.object_count
        }

    def get_top_memory_objects(self, limit: int = 10) -> List[Dict[str, Any]]:
        """Return the top allocation sites by size (empty if tracing is off)."""
        if not self.enable_tracing:
            return []

        snapshot = tracemalloc.take_snapshot()
        top_stats = snapshot.statistics('lineno')

        result = []
        for index, stat in enumerate(top_stats[:limit]):
            result.append({
                'rank': index + 1,
                'filename': stat.traceback.format()[0] if stat.traceback.format() else 'unknown',
                'size_mb': stat.size / (1024 * 1024),
                'count': stat.count
            })
        return result

    def force_garbage_collection(self) -> Dict[str, int]:
        """Run a full GC pass over all generations and report what was freed."""
        before_objects = len(gc.get_objects())

        collected = {}
        for generation in range(3):
            collected[generation] = gc.collect(generation)

        after_objects = len(gc.get_objects())
        freed_objects = before_objects - after_objects

        self.logger.info(f"垃圾回收完成,释放了 {freed_objects} 个对象")
        return {
            'freed_objects': freed_objects,
            'collections_by_generation': collected,
            'remaining_objects': after_objects
        }

    def optimize_memory_usage(self) -> Dict[str, Any]:
        """Run GC, prune dead weak references, and report the memory delta."""
        before_snapshot = self.take_snapshot()

        # 1. Force a full garbage collection.
        gc_result = self.force_garbage_collection()

        # 2. Drop weak references whose referents have been collected.
        #    Iterate over a copy since we mutate the list.
        cleaned_refs = 0
        for ref in self.weak_refs[:]:
            if ref() is None:
                self.weak_refs.remove(ref)
                cleaned_refs += 1

        # 3. Hook point for application-specific cache cleanup.

        after_snapshot = self.take_snapshot()
        memory_saved = before_snapshot.current_mb - after_snapshot.current_mb

        return {
            'memory_before_mb': before_snapshot.current_mb,
            'memory_after_mb': after_snapshot.current_mb,
            'memory_saved_mb': memory_saved,
            'gc_result': gc_result,
            'cleaned_weak_refs': cleaned_refs,
            'optimization_effective': memory_saved > 0
        }

    def start_monitoring(self, interval: float = 30.0):
        """Start a background thread that snapshots memory every ``interval`` s
        and auto-optimizes when growth looks like a leak."""
        if self.monitoring:
            return
        self.monitoring = True

        def monitor_loop():
            while self.monitoring:
                try:
                    snapshot = self.take_snapshot()
                    # Need a few snapshots before growth analysis is meaningful.
                    if len(self.snapshots) >= 5:
                        analysis = self.analyze_memory_growth(5)
                        if analysis.get('is_potential_leak', False):
                            self.logger.warning(f"检测到潜在内存泄漏: {analysis['growth_rate_mb_per_second']:.2f} MB/s")
                            opt_result = self.optimize_memory_usage()
                            self.logger.info(f"自动内存优化,节省 {opt_result['memory_saved_mb']:.2f} MB")
                    time.sleep(interval)
                except Exception as e:
                    # Keep the monitor alive on transient failures.
                    self.logger.error(f"内存监控错误: {e}")
                    time.sleep(interval)

        self.monitor_thread = threading.Thread(target=monitor_loop)
        self.monitor_thread.start()
        self.logger.info("内存监控已启动")

    def stop_monitoring(self):
        """Stop the background monitor thread and wait for it to exit."""
        self.monitoring = False
        if self.monitor_thread:
            self.monitor_thread.join()
        self.logger.info("内存监控已停止")

    def register_object(self, obj: Any, name: str = None):
        """Track ``obj`` by type (optionally tagged with ``name``) via weakref.

        The per-type counter is decremented automatically when the object
        is garbage-collected. Returns the weak reference.
        """
        obj_type = type(obj).__name__
        if name:
            obj_type = f"{obj_type}({name})"
        self.object_tracker[obj_type] += 1

        def cleanup_callback(ref):
            self.object_tracker[obj_type] -= 1

        weak_ref = weakref.ref(obj, cleanup_callback)
        self.weak_refs.append(weak_ref)
        return weak_ref
class StreamProcessor:
    """Chunked stream processor — a memory-friendly processing example."""

    def __init__(self, chunk_size: int = 1000):
        self.chunk_size = chunk_size
        self.memory_optimizer = MemoryOptimizer()
        self.logger = logging.getLogger(__name__)

    def process_large_dataset(self, data_generator: Generator) -> Generator:
        """Lazily process items in chunks of ``chunk_size``, yielding results.

        Every ``chunk_size * 10`` items a memory snapshot is taken and an
        optimization pass runs if RSS exceeds 500 MB.
        """
        chunk = []
        items_seen = 0
        for item in data_generator:
            chunk.append(item)
            items_seen += 1

            if len(chunk) >= self.chunk_size:
                # Process and emit the current chunk, then release it.
                for result in self._process_chunk(chunk):
                    yield result
                chunk.clear()

            # Periodic memory check. The original tested
            # `len(chunk) % (chunk_size * 10) == 0`, which is true after
            # every cleared chunk (len == 0); counting items fires it once
            # per chunk_size*10 items as intended.
            if items_seen % (self.chunk_size * 10) == 0:
                snapshot = self.memory_optimizer.take_snapshot()
                if snapshot.current_mb > 500:  # optimize above 500 MB RSS
                    self.memory_optimizer.optimize_memory_usage()

        # Flush any trailing partial chunk.
        if chunk:
            for result in self._process_chunk(chunk):
                yield result

    def _process_chunk(self, chunk: List[Any]) -> List[Any]:
        """Process one chunk of items."""
        return [self._process_item(item) for item in chunk]

    def _process_item(self, item: Any) -> Any:
        """Process a single item (example: uppercase its string form)."""
        return str(item).upper()
# Usage example
if __name__ == "__main__":
    import random

    logging.basicConfig(level=logging.INFO)

    # Start background memory monitoring.
    optimizer = MemoryOptimizer()
    optimizer.start_monitoring(interval=5.0)

    try:
        # Simulate memory pressure.
        large_data = []
        for i in range(10000):
            large_data.append([random.random() for _ in range(100)])
            if i % 1000 == 0:
                snapshot = optimizer.take_snapshot()
                print(f"当前内存使用: {snapshot.current_mb:.2f} MB")

        # Growth analysis.
        analysis = optimizer.analyze_memory_growth()
        print(f"内存增长分析: {analysis}")

        # Largest allocation sites.
        top_objects = optimizer.get_top_memory_objects()
        print(f"内存占用最多的对象: {top_objects}")

        # One explicit optimization pass.
        opt_result = optimizer.optimize_memory_usage()
        print(f"内存优化结果: {opt_result}")

        # Streaming-processing demo.
        def data_generator():
            for i in range(100000):
                yield f"data_item_{i}"

        processor = StreamProcessor(chunk_size=1000)
        processed_count = 0
        for processed_item in processor.process_large_dataset(data_generator()):
            processed_count += 1
            if processed_count % 10000 == 0:
                print(f"已处理 {processed_count} 个项目")
        print(f"流式处理完成,总共处理 {processed_count} 个项目")
    finally:
        optimizer.stop_monitoring()
import asyncio
import aiohttp
import time
import threading
import logging
from typing importDict, Any, Optional, List
from dataclasses import dataclass, field
from collections import deque
import weakref
import ssl
@dataclass
class ConnectionStats:
    """Counters describing connection-pool usage."""

    total_created: int = 0
    total_closed: int = 0
    active_connections: int = 0
    pool_hits: int = 0        # requests served from the pool
    pool_misses: int = 0      # requests that required a new connection
    average_lifetime: float = 0.0
    peak_connections: int = 0

    def hit_rate(self) -> float:
        """Fraction of session requests served from the pool (0.0 if none)."""
        total_requests = self.pool_hits + self.pool_misses
        return self.pool_hits / total_requests if total_requests > 0 else 0.0
@dataclass
class PooledConnection:
    """A pooled aiohttp session with bookkeeping for reuse and expiry."""

    session: aiohttp.ClientSession
    created_time: float = field(default_factory=time.time)
    last_used: float = field(default_factory=time.time)
    use_count: int = 0
    is_active: bool = True

    def mark_used(self):
        """Refresh the last-used timestamp and bump the use counter."""
        self.last_used = time.time()
        self.use_count += 1

    def is_expired(self, max_lifetime: float, max_idle: float) -> bool:
        """True when the connection exceeded its total lifetime or idle time."""
        now = time.time()
        lifetime_expired = (now - self.created_time) > max_lifetime
        idle_expired = (now - self.last_used) > max_idle
        return lifetime_expired or idle_expired
class OptimizedConnectionPool:
    """Connection pool that reuses aiohttp sessions per scheme://host."""

    def __init__(
        self,
        max_connections: int = 100,
        max_connections_per_host: int = 30,
        max_lifetime: float = 3600.0,    # total lifetime: 1 hour
        max_idle_time: float = 300.0,    # idle expiry: 5 minutes
        cleanup_interval: float = 60.0,  # cleanup sweep every minute
        enable_ssl: bool = True
    ):
        self.max_connections = max_connections
        self.max_connections_per_host = max_connections_per_host
        self.max_lifetime = max_lifetime
        self.max_idle_time = max_idle_time
        self.cleanup_interval = cleanup_interval
        self.enable_ssl = enable_ssl

        # Per-host pools of PooledConnection, guarded by self.lock.
        self.pools: Dict[str, deque] = {}
        self.stats = ConnectionStats()
        self.lock = asyncio.Lock()

        # Background cleanup task state.
        self.cleanup_task = None
        self.running = False

        # Shared SSL context (verification disabled — see _create_ssl_context).
        self.ssl_context = self._create_ssl_context() if enable_ssl else None
        self.logger = logging.getLogger(__name__)

    def _create_ssl_context(self) -> ssl.SSLContext:
        """Create the shared SSL context.

        WARNING: certificate verification is disabled, which permits
        man-in-the-middle attacks; only suitable for trusted/testing targets.
        """
        context = ssl.create_default_context()
        context.check_hostname = False
        context.verify_mode = ssl.CERT_NONE
        return context

    def _get_pool_key(self, url: str) -> str:
        """Map a URL to its pool key: ``scheme://netloc``."""
        from urllib.parse import urlparse
        parsed = urlparse(url)
        return f"{parsed.scheme}://{parsed.netloc}"

    async def get_session(self, url: str) -> aiohttp.ClientSession:
        """Return a pooled session for ``url``'s host, creating one if needed."""
        pool_key = self._get_pool_key(url)

        async with self.lock:
            # Drain expired entries until we find a live pooled connection.
            # (The original only inspected the first entry and created a new
            # session even when later pooled connections were still fresh.)
            pool = self.pools.get(pool_key)
            while pool:
                connection = pool.popleft()
                if not connection.is_expired(self.max_lifetime, self.max_idle_time):
                    connection.mark_used()
                    self.stats.pool_hits += 1
                    return connection.session
                # Expired: close it and keep looking.
                await connection.session.close()
                self.stats.total_closed += 1
                self.stats.active_connections -= 1

            # Nothing reusable — create a new session.
            self.stats.pool_misses += 1
            return await self._create_new_session(pool_key)

    async def _create_new_session(self, pool_key: str) -> aiohttp.ClientSession:
        """Create a new session; caller must hold ``self.lock``.

        Raises when the pool is at capacity even after pruning expired
        connections.
        """
        if self.stats.active_connections >= self.max_connections:
            # Try to make room by pruning expired connections.
            await self._cleanup_expired_connections()
            if self.stats.active_connections >= self.max_connections:
                raise Exception("连接池已满,无法创建新连接")

        connector = aiohttp.TCPConnector(
            limit=self.max_connections,
            limit_per_host=self.max_connections_per_host,
            ttl_dns_cache=300,
            use_dns_cache=True,
            ssl=self.ssl_context,
            enable_cleanup_closed=True
        )

        timeout = aiohttp.ClientTimeout(total=30, connect=10)
        session = aiohttp.ClientSession(
            connector=connector,
            timeout=timeout,
            headers={
                'User-Agent': 'OptimizedCrawler/1.0',
                'Accept-Encoding': 'gzip, deflate',
                'Connection': 'keep-alive'
            }
        )

        self.stats.total_created += 1
        self.stats.active_connections += 1
        self.stats.peak_connections = max(self.stats.peak_connections, self.stats.active_connections)

        self.logger.debug(f"创建新连接: {pool_key}")
        return session

    async def return_session(self, url: str, session: aiohttp.ClientSession):
        """Return a session to its host pool, closing it if the pool is full."""
        pool_key = self._get_pool_key(url)

        async with self.lock:
            if pool_key not in self.pools:
                self.pools[pool_key] = deque()

            if len(self.pools[pool_key]) < self.max_connections_per_host:
                connection = PooledConnection(session=session)
                self.pools[pool_key].append(connection)
            else:
                # Per-host pool full — close instead of pooling.
                await session.close()
                self.stats.total_closed += 1
                self.stats.active_connections -= 1

    async def _cleanup_expired_connections(self):
        """Close and drop expired pooled connections; caller holds ``self.lock``."""
        cleaned_count = 0
        for pool_key, pool in self.pools.items():
            expired_connections = [
                connection for connection in pool
                if connection.is_expired(self.max_lifetime, self.max_idle_time)
            ]
            for connection in expired_connections:
                pool.remove(connection)
                await connection.session.close()
                cleaned_count += 1
                self.stats.total_closed += 1
                self.stats.active_connections -= 1

        if cleaned_count > 0:
            self.logger.info(f"清理了 {cleaned_count} 个过期连接")

    async def _cleanup_loop(self):
        """Periodic cleanup sweep; runs until stop() cancels it."""
        while self.running:
            try:
                # Take the lock here: the original swept self.pools without
                # it, racing with get_session()/return_session().
                async with self.lock:
                    await self._cleanup_expired_connections()
                await asyncio.sleep(self.cleanup_interval)
            except Exception as e:
                self.logger.error(f"连接清理错误: {e}")
                await asyncio.sleep(self.cleanup_interval)

    async def start(self):
        """Start the background cleanup task (no-op if already running)."""
        if self.running:
            return
        self.running = True
        self.cleanup_task = asyncio.create_task(self._cleanup_loop())
        self.logger.info("连接池已启动")

    async def stop(self):
        """Cancel cleanup and close every pooled connection."""
        self.running = False

        if self.cleanup_task:
            self.cleanup_task.cancel()
            try:
                await self.cleanup_task
            except asyncio.CancelledError:
                pass

        async with self.lock:
            for pool in self.pools.values():
                for connection in pool:
                    await connection.session.close()
            self.pools.clear()
            self.stats.active_connections = 0

        self.logger.info("连接池已停止")

    def get_stats(self) -> ConnectionStats:
        """Return the live stats object (shared, not a copy)."""
        return self.stats

    async def health_check(self) -> Dict[str, Any]:
        """Return a snapshot of pool health for diagnostics."""
        async with self.lock:
            total_pooled = sum(len(pool) for pool in self.pools.values())
            return {
                'running': self.running,
                'total_pools': len(self.pools),
                'total_pooled_connections': total_pooled,
                'active_connections': self.stats.active_connections,
                'hit_rate': self.stats.hit_rate(),
                'peak_connections': self.stats.peak_connections,
                'memory_usage_mb': self._estimate_memory_usage()
            }

    def _estimate_memory_usage(self) -> float:
        """Rough estimate: ~1 MB per active connection."""
        return self.stats.active_connections * 1.0
class PooledHttpClient:
    """Thin HTTP client that borrows/returns sessions from a connection pool.

    NOTE(review): the session is returned to the pool before the response
    body is consumed; this works because pooling keeps the session open,
    but confirm callers read the body before the pool closes it.
    """

    def __init__(self, connection_pool: OptimizedConnectionPool):
        self.pool = connection_pool
        self.logger = logging.getLogger(__name__)

    async def get(self, url: str, **kwargs) -> aiohttp.ClientResponse:
        """Issue a GET through a pooled session."""
        session = await self.pool.get_session(url)
        try:
            return await session.get(url, **kwargs)
        finally:
            # Always hand the session back, even on error.
            await self.pool.return_session(url, session)

    async def post(self, url: str, **kwargs) -> aiohttp.ClientResponse:
        """Issue a POST through a pooled session."""
        session = await self.pool.get_session(url)
        try:
            return await session.post(url, **kwargs)
        finally:
            await self.pool.return_session(url, session)

    async def request(self, method: str, url: str, **kwargs) -> aiohttp.ClientResponse:
        """Issue a request with an arbitrary HTTP method through the pool."""
        session = await self.pool.get_session(url)
        try:
            return await session.request(method, url, **kwargs)
        finally:
            await self.pool.return_session(url, session)
# Usage example
async def main():
    """Demo: start the pool, fire concurrent requests, report stats."""
    logging.basicConfig(level=logging.INFO)

    pool = OptimizedConnectionPool(
        max_connections=50,
        max_connections_per_host=10,
        max_lifetime=1800.0,    # 30 minutes
        max_idle_time=300.0,    # 5 minutes
        cleanup_interval=60.0   # sweep once a minute
    )
    await pool.start()

    client = PooledHttpClient(pool)
    try:
        test_urls = [
            "https://httpbin.org/get",
            "https://httpbin.org/json",
            "https://httpbin.org/delay/1",
            "https://httpbin.org/status/200"
        ]

        # Fire 20 concurrent requests, cycling through the test URLs.
        tasks = []
        for i in range(20):
            url = test_urls[i % len(test_urls)]
            tasks.append(asyncio.create_task(client.get(url)))

        responses = await asyncio.gather(*tasks, return_exceptions=True)
        success_count = sum(1 for r in responses if not isinstance(r, Exception))
        print(f"成功请求: {success_count}/{len(responses)}")

        # Pool statistics and health.
        stats = pool.get_stats()
        print(f"连接池统计: 命中率={stats.hit_rate():.2%}, 活跃连接={stats.active_connections}")
        health = await pool.health_check()
        print(f"健康检查: {health}")
    finally:
        await pool.stop()


if __name__ == "__main__":
    asyncio.run(main())
import asyncio
import aiohttp
import time
import json
import logging
from typing importDict, Any, List, Optional
from dataclasses import dataclass, field
import hashlib
from urllib.parse import urljoin, urlparse
import re
from bs4 import BeautifulSoup
@dataclass
class NewsItem:
    """A single crawled news article and its extraction metadata."""
    url: str
    title: str = ""
    content: str = ""
    author: str = ""
    publish_time: str = ""
    category: str = ""
    tags: List[str] = field(default_factory=list)
    source: str = ""
    # wall-clock time the item was crawled, set at construction
    crawl_time: float = field(default_factory=time.time)

    def to_dict(self) -> Dict[str, Any]:
        """Return a plain-dict representation suitable for JSON serialization."""
        return {
            'url': self.url,
            'title': self.title,
            'content': self.content,
            'author': self.author,
            'publish_time': self.publish_time,
            'category': self.category,
            'tags': self.tags,
            'source': self.source,
            'crawl_time': self.crawl_time
        }
class NewsExtractor:
    """Extracts title/content/author/time from news HTML via per-site CSS selectors."""

    def __init__(self):
        self.logger = logging.getLogger(__name__)
        # Selector configurations for common news sites; 'default' is the fallback
        self.site_configs = {
            'default': {
                'title': ['h1', '.title', '.headline', '[class*="title"]'],
                'content': ['.content', '.article-content', '.post-content', '[class*="content"]'],
                'author': ['.author', '.byline', '[class*="author"]'],
                'time': ['.time', '.date', '.publish-time', '[class*="time"]', '[class*="date"]']
            },
            'sina.com.cn': {
                'title': ['.main-title', 'h1'],
                'content': ['.article', '.content'],
                'author': ['.author'],
                'time': ['.time']
            },
            'sohu.com': {
                'title': ['.text-title', 'h1'],
                'content': ['.text'],
                'author': ['.author-name'],
                'time': ['.time']
            }
        }

    def get_site_config(self, url: str) -> Dict[str, List[str]]:
        """Return the selector config whose key is a substring of the URL's domain,
        falling back to 'default'."""
        domain = urlparse(url).netloc
        for site, config in self.site_configs.items():
            if site in domain:
                return config
        return self.site_configs['default']

    def extract_text_by_selectors(self, soup: "BeautifulSoup", selectors: List[str]) -> str:
        """Return text of the first selector that matches; "" when none do.

        Invalid selectors are skipped rather than raised.
        """
        for selector in selectors:
            try:
                element = soup.select_one(selector)
                if element:
                    return element.get_text(strip=True)
            except Exception:
                continue
        return ""

    def extract_news(self, html: str, url: str) -> "NewsItem":
        """Parse ``html`` and build a NewsItem; on any failure returns a bare
        NewsItem carrying only the URL."""
        try:
            soup = BeautifulSoup(html, 'html.parser')
            config = self.get_site_config(url)
            # Title: configured selectors first, then <title> as last resort
            title = self.extract_text_by_selectors(soup, config['title'])
            if not title:
                title = soup.title.get_text(strip=True) if soup.title else ""
            content = self.extract_text_by_selectors(soup, config['content'])
            author = self.extract_text_by_selectors(soup, config['author'])
            publish_time = self.extract_text_by_selectors(soup, config['time'])
            # Tags: collect text of every tag-like element
            tags = []
            for tag_elem in soup.select('.tag, .tags, [class*="tag"]'):
                tag_text = tag_elem.get_text(strip=True)
                if tag_text:
                    tags.append(tag_text)
            # Source is the article's domain
            source = urlparse(url).netloc
            news_item = NewsItem(
                url=url,
                title=title,
                content=content,
                author=author,
                publish_time=publish_time,
                tags=tags,
                source=source
            )
            self.logger.debug(f"提取新闻成功: {title[:50]}...")
            return news_item
        except Exception as e:
            self.logger.error(f"提取新闻失败 {url}: {e}")
            return NewsItem(url=url)
class PerformanceOptimizedCrawler:
    """Throughput-oriented async news crawler with built-in request statistics."""

    def __init__(self,
                 max_concurrent: int = 100,
                 request_delay: float = 0.1,
                 timeout: float = 30.0):
        self.max_concurrent = max_concurrent
        self.request_delay = request_delay
        self.timeout = timeout
        self.session = None
        # Caps the number of in-flight requests
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.extractor = NewsExtractor()
        self.logger = logging.getLogger(__name__)
        # Cumulative performance counters
        self.stats = {
            'total_requests': 0,
            'successful_requests': 0,
            'failed_requests': 0,
            'total_time': 0.0,
            'avg_response_time': 0.0
        }

    async def create_session(self):
        """Open the shared aiohttp session with pooling tuned for crawling."""
        connector = aiohttp.TCPConnector(
            limit=self.max_concurrent,
            limit_per_host=20,
            ttl_dns_cache=300,
            use_dns_cache=True,
            keepalive_timeout=30,
            enable_cleanup_closed=True
        )
        timeout = aiohttp.ClientTimeout(total=self.timeout)
        self.session = aiohttp.ClientSession(
            connector=connector,
            timeout=timeout,
            headers={
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
            }
        )

    async def close_session(self):
        """Close the shared HTTP session, if open."""
        if self.session:
            await self.session.close()

    async def fetch_url(self, url: str) -> Optional[str]:
        """Fetch one URL; returns the body text on HTTP 200, otherwise None.

        Updates ``self.stats`` in every case (success, non-200, exception).
        """
        async with self.semaphore:
            start_time = time.time()
            try:
                # Polite fixed delay before each request
                await asyncio.sleep(self.request_delay)
                async with self.session.get(url) as response:
                    if response.status == 200:
                        content = await response.text()
                        response_time = time.time() - start_time
                        self.stats['successful_requests'] += 1
                        self.stats['total_time'] += response_time
                        return content
                    self.logger.warning(f"HTTP {response.status}: {url}")
                    self.stats['failed_requests'] += 1
                    return None
            except Exception as e:
                self.logger.error(f"请求失败 {url}: {e}")
                self.stats['failed_requests'] += 1
                return None
            finally:
                # Totals and the rolling average are maintained on every path
                self.stats['total_requests'] += 1
                if self.stats['successful_requests'] > 0:
                    self.stats['avg_response_time'] = (
                        self.stats['total_time'] / self.stats['successful_requests']
                    )

    async def crawl_news(self, url: str) -> Optional["NewsItem"]:
        """Fetch one URL and extract it into a NewsItem; None on fetch failure."""
        html = await self.fetch_url(url)
        if html:
            return self.extractor.extract_news(html, url)
        return None

    async def crawl_news_batch(self, urls: List[str]) -> List["NewsItem"]:
        """Crawl many URLs concurrently; exceptions are logged and dropped."""
        tasks = [self.crawl_news(url) for url in urls]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        news_items = []
        for result in results:
            if isinstance(result, NewsItem):
                news_items.append(result)
            elif isinstance(result, Exception):
                self.logger.error(f"爬取异常: {result}")
        return news_items

    def get_performance_stats(self) -> Dict[str, Any]:
        """Return a snapshot of request counters plus derived rates."""
        success_rate = 0.0
        if self.stats['total_requests'] > 0:
            success_rate = self.stats['successful_requests'] / self.stats['total_requests']
        return {
            'total_requests': self.stats['total_requests'],
            'successful_requests': self.stats['successful_requests'],
            'failed_requests': self.stats['failed_requests'],
            'success_rate': success_rate,
            'avg_response_time': self.stats['avg_response_time'],
            'requests_per_second': (
                self.stats['successful_requests'] / self.stats['total_time']
                if self.stats['total_time'] > 0 else 0
            )
        }
class NewsMonitoringSystem:
    """Collects crawl-session, performance and error history for a crawler."""

    def __init__(self, crawler: "PerformanceOptimizedCrawler"):
        self.crawler = crawler
        self.monitoring_data = {
            'start_time': time.time(),
            'crawl_sessions': [],
            'performance_history': [],
            'error_log': []
        }
        self.logger = logging.getLogger(__name__)

    def log_crawl_session(self, session_data: Dict[str, Any]):
        """Append one crawl session record, keeping only the most recent 100."""
        session_data['timestamp'] = time.time()
        self.monitoring_data['crawl_sessions'].append(session_data)
        if len(self.monitoring_data['crawl_sessions']) > 100:
            self.monitoring_data['crawl_sessions'] = self.monitoring_data['crawl_sessions'][-100:]

    def log_performance(self):
        """Snapshot the crawler's current stats, keeping the most recent 1000 points."""
        perf_data = self.crawler.get_performance_stats()
        perf_data['timestamp'] = time.time()
        self.monitoring_data['performance_history'].append(perf_data)
        if len(self.monitoring_data['performance_history']) > 1000:
            self.monitoring_data['performance_history'] = self.monitoring_data['performance_history'][-1000:]

    def log_error(self, error_msg: str, error_type: str = "general"):
        """Append one error record, keeping only the most recent 500."""
        error_data = {
            'timestamp': time.time(),
            'type': error_type,
            'message': error_msg
        }
        self.monitoring_data['error_log'].append(error_data)
        if len(self.monitoring_data['error_log']) > 500:
            self.monitoring_data['error_log'] = self.monitoring_data['error_log'][-500:]

    def get_monitoring_report(self) -> Dict[str, Any]:
        """Build a summary report from the recorded history."""
        current_time = time.time()
        uptime = current_time - self.monitoring_data['start_time']
        # Averages over the 10 most recent performance snapshots
        recent_perf = self.monitoring_data['performance_history'][-10:] if self.monitoring_data['performance_history'] else []
        avg_success_rate = sum(p['success_rate'] for p in recent_perf) / len(recent_perf) if recent_perf else 0
        avg_response_time = sum(p['avg_response_time'] for p in recent_perf) / len(recent_perf) if recent_perf else 0
        # Errors within the last hour
        recent_errors = [e for e in self.monitoring_data['error_log'] if current_time - e['timestamp'] < 3600]
        return {
            'uptime_seconds': uptime,
            'total_sessions': len(self.monitoring_data['crawl_sessions']),
            'recent_avg_success_rate': avg_success_rate,
            'recent_avg_response_time': avg_response_time,
            'recent_errors_count': len(recent_errors),
            'current_performance': self.crawler.get_performance_stats(),
            'error_types': self._get_error_type_stats(recent_errors)
        }

    def _get_error_type_stats(self, errors: List[Dict[str, Any]]) -> Dict[str, int]:
        """Count errors grouped by their 'type' field."""
        error_types = {}
        for error in errors:
            error_type = error['type']
            error_types[error_type] = error_types.get(error_type, 0) + 1
        return error_types
class HighPerformanceNewsSystem:
    """Facade tying together the crawler, its monitoring, and in-memory storage."""

    def __init__(self,
                 max_concurrent: int = 100,
                 request_delay: float = 0.1):
        self.crawler = PerformanceOptimizedCrawler(
            max_concurrent=max_concurrent,
            request_delay=request_delay
        )
        self.monitoring = NewsMonitoringSystem(self.crawler)
        self.news_storage = []  # all valid NewsItems crawled so far
        self.logger = logging.getLogger(__name__)

    async def start(self):
        """Start the system (opens the crawler's HTTP session)."""
        await self.crawler.create_session()
        self.logger.info("高性能新闻爬虫系统已启动")

    async def stop(self):
        """Stop the system (closes the crawler's HTTP session)."""
        await self.crawler.close_session()
        self.logger.info("高性能新闻爬虫系统已停止")

    async def crawl_news_sites(self, news_urls: List[str]) -> List["NewsItem"]:
        """Crawl a batch of URLs, keep items that have both title and content,
        record monitoring data, and return the valid items ([] on failure)."""
        start_time = time.time()
        try:
            news_items = await self.crawler.crawl_news_batch(news_urls)
            # Only items with both a title and content count as valid
            valid_news = [item for item in news_items if item.title and item.content]
            self.news_storage.extend(valid_news)
            session_data = {
                'urls_count': len(news_urls),
                'valid_news_count': len(valid_news),
                'duration': time.time() - start_time,
                'performance': self.crawler.get_performance_stats()
            }
            self.monitoring.log_crawl_session(session_data)
            self.monitoring.log_performance()
            self.logger.info(f"爬取完成: {len(valid_news)}/{len(news_urls)} 条有效新闻")
            return valid_news
        except Exception as e:
            error_msg = f"爬取新闻失败: {e}"
            self.logger.error(error_msg)
            self.monitoring.log_error(error_msg, "crawl_error")
            return []

    def get_system_status(self) -> Dict[str, Any]:
        """Return storage size plus monitoring and crawler statistics."""
        return {
            'news_count': len(self.news_storage),
            'monitoring_report': self.monitoring.get_monitoring_report(),
            'crawler_stats': self.crawler.get_performance_stats()
        }

    def export_news(self, format_type: str = "json") -> str:
        """Serialize stored news; only 'json' is supported.

        Raises:
            ValueError: for any unsupported ``format_type``.
        """
        if format_type == "json":
            return json.dumps([item.to_dict() for item in self.news_storage],
                              ensure_ascii=False, indent=2)
        raise ValueError(f"不支持的格式: {format_type}")
# 使用示例
asyncdefmain():
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
# 创建高性能新闻系统
news_system = HighPerformanceNewsSystem(
max_concurrent=50,
request_delay=0.05
)
await news_system.start()
try:
# 示例新闻URL列表
news_urls = [
"https://news.sina.com.cn/c/2023-01-01/doc-example1.shtml",
"https://news.sohu.com/20230101/example2.shtml",
"https://www.163.com/news/article/example3.html",
# 添加更多新闻URL...
]
# 爬取新闻
news_items = await news_system.crawl_news_sites(news_urls)
# 显示系统状态
status = news_system.get_system_status()
print(f"系统状态: {json.dumps(status, indent=2, ensure_ascii=False)}")
# 导出新闻数据
if news_items:
news_data = news_system.export_news()
print(f"导出新闻数据: {len(news_items)} 条")
# 保存到文件
withopen("news_data.json", "w", encoding="utf-8") as f:
f.write(news_data)
finally:
await news_system.stop()
if __name__ == "__main__":
    asyncio.run(main())

## 性能分析练习
使用 SystemMonitor 监控你的爬虫程序
内存优化练习
使用 MemoryOptimizer 监控内存使用
连接池优化练习
监控系统搭建
高可用架构设计
性能压测
高性能电商价格监控系统
新闻聚合与分析平台
社交媒体监控系统
问题1:爬虫速度慢
问题2:内存使用过高
问题3:CPU使用率高
问题1:监控数据不准确
问题2:告警过于频繁
问题3:监控系统性能影响
问题1:节点故障处理
问题2:数据一致性
问题3:负载均衡效果差
本课程详细介绍了爬虫性能优化与监控的各个方面:
性能瓶颈分析:学会识别和分析性能问题
系统监控:实现全面的监控和告警系统
资源优化:优化内存、CPU和网络资源使用
高可用架构:设计可靠的分布式爬虫系统
通过这些技术,你可以构建高性能、高可用的爬虫系统,满足大规模数据采集的需求。
下一课我们将学习**"爬虫部署与运维"**,包括:
这将帮助你将爬虫系统成功部署到生产环境并进行有效运维。
## 3. 系统监控与告警
### 3.1 实时监控系统
```python
# monitoring_system.py
import asyncio
import time
import json
import logging
import threading
from typing import Dict, Any, List, Optional, Callable
from dataclasses import dataclass, asdict
from collections import deque, defaultdict
from datetime import datetime, timedelta
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
import requests
@dataclass
class Alert:
    """An alert event raised when an AlertRule's condition holds long enough."""
    id: str
    level: str  # info, warning, error, critical
    title: str
    message: str
    timestamp: float
    source: str
    tags: Dict[str, str]
    resolved: bool = False
    # set by AlertManager.resolve_alert when the alert is closed
    resolved_time: Optional[float] = None

    def to_dict(self) -> Dict[str, Any]:
        """Serialize to a plain dict (dataclasses.asdict)."""
        return asdict(self)
@dataclass
class MetricPoint:
    """One timestamped sample of a named metric."""
    name: str
    value: float
    timestamp: float
    tags: Dict[str, str]

    def to_dict(self) -> Dict[str, Any]:
        """Serialize to a plain dict (dataclasses.asdict)."""
        return asdict(self)
class AlertRule:
    """Threshold rule: fires an Alert when a metric satisfies `condition threshold`
    continuously for `duration` seconds (re-alerting at most every 5 minutes)."""

    # Supported comparison operators for `condition`
    _OPS = {
        ">": lambda v, t: v > t,
        "<": lambda v, t: v < t,
        ">=": lambda v, t: v >= t,
        "<=": lambda v, t: v <= t,
        "==": lambda v, t: v == t,
        "!=": lambda v, t: v != t,
    }

    def __init__(
        self,
        name: str,
        metric_name: str,
        condition: str,  # one of >, <, >=, <=, ==, !=
        threshold: float,
        duration: float = 60.0,  # how long the condition must hold (seconds)
        level: str = "warning",
        message_template: str = None
    ):
        self.name = name
        self.metric_name = metric_name
        self.condition = condition
        self.threshold = threshold
        self.duration = duration
        self.level = level
        self.message_template = message_template or f"{metric_name} {condition} {threshold}"
        # State tracking
        self.triggered_time = None   # when the condition first became true
        self.last_alert_time = None  # when we last emitted an Alert
        self.alert_count = 0

    def check_condition(self, value: float) -> bool:
        """Return True when ``value`` satisfies the rule; unknown operators never match."""
        op = self._OPS.get(self.condition)
        return op(value, self.threshold) if op is not None else False

    def evaluate(self, metric_point: "MetricPoint") -> Optional["Alert"]:
        """Evaluate one sample; return an Alert when the rule fires, else None."""
        current_time = time.time()
        condition_met = self.check_condition(metric_point.value)
        if condition_met:
            if self.triggered_time is None:
                self.triggered_time = current_time
            # Fire only after the condition has held for `duration`
            if current_time - self.triggered_time >= self.duration:
                # Suppress repeats within 5 minutes
                if (self.last_alert_time is None or
                        current_time - self.last_alert_time >= 300):
                    alert = Alert(
                        id=f"{self.name}_{int(current_time)}",
                        level=self.level,
                        title=f"告警: {self.name}",
                        message=self.message_template.format(
                            value=metric_point.value,
                            threshold=self.threshold
                        ),
                        timestamp=current_time,
                        source=self.metric_name,
                        tags=metric_point.tags.copy()
                    )
                    self.last_alert_time = current_time
                    self.alert_count += 1
                    return alert
        else:
            # Condition cleared: reset the hold timer
            self.triggered_time = None
        return None
class MetricsCollector:
    """Thread-safe in-memory store of recent MetricPoints, bounded per metric."""

    def __init__(self, max_points: int = 10000):
        self.max_points = max_points
        # metric name -> bounded deque of MetricPoints (oldest evicted first)
        self.metrics: Dict[str, deque] = defaultdict(lambda: deque(maxlen=max_points))
        self.lock = threading.Lock()
        self.logger = logging.getLogger(__name__)

    def record_metric(self, name: str, value: float, tags: Dict[str, str] = None):
        """Append one sample for ``name`` at the current time."""
        if tags is None:
            tags = {}
        point = MetricPoint(
            name=name,
            value=value,
            timestamp=time.time(),
            tags=tags
        )
        with self.lock:
            self.metrics[name].append(point)

    def get_latest_metric(self, name: str) -> Optional["MetricPoint"]:
        """Return the most recent sample for ``name``, or None if none recorded."""
        with self.lock:
            if name in self.metrics and self.metrics[name]:
                return self.metrics[name][-1]
        return None

    def get_metric_history(self, name: str, duration: float = 3600) -> List["MetricPoint"]:
        """Return samples for ``name`` no older than ``duration`` seconds."""
        cutoff_time = time.time() - duration
        with self.lock:
            if name in self.metrics:
                return [
                    point for point in self.metrics[name]
                    if point.timestamp >= cutoff_time
                ]
        return []

    def get_metric_stats(self, name: str, duration: float = 3600) -> Dict[str, float]:
        """Return count/avg/min/max/latest over the recent window; {} if empty."""
        history = self.get_metric_history(name, duration)
        if not history:
            return {}
        values = [point.value for point in history]
        return {
            'count': len(values),
            'avg': sum(values) / len(values),
            'min': min(values),
            'max': max(values),
            'latest': values[-1]
        }
class AlertManager:
    """Evaluates alert rules against incoming metrics and dispatches alerts."""

    def __init__(self):
        self.rules: List["AlertRule"] = []
        self.alerts: deque = deque(maxlen=1000)  # recent alerts, oldest evicted
        self.handlers: List[Callable[["Alert"], None]] = []
        self.logger = logging.getLogger(__name__)

    def add_rule(self, rule: "AlertRule"):
        """Register an alert rule."""
        self.rules.append(rule)
        self.logger.info(f"添加告警规则: {rule.name}")

    def add_handler(self, handler: Callable[["Alert"], None]):
        """Register a callable invoked for every triggered alert."""
        self.handlers.append(handler)

    def process_metric(self, metric_point: "MetricPoint"):
        """Run every rule that watches this metric; trigger any resulting alerts."""
        for rule in self.rules:
            if rule.metric_name == metric_point.name:
                alert = rule.evaluate(metric_point)
                if alert:
                    self.trigger_alert(alert)

    def trigger_alert(self, alert: "Alert"):
        """Store the alert and fan it out to all handlers (handler errors are logged)."""
        self.alerts.append(alert)
        self.logger.warning(f"触发告警: {alert.title} - {alert.message}")
        for handler in self.handlers:
            try:
                handler(alert)
            except Exception as e:
                self.logger.error(f"告警处理器错误: {e}")

    def get_active_alerts(self) -> List["Alert"]:
        """Return all alerts not yet resolved."""
        return [alert for alert in self.alerts if not alert.resolved]

    def resolve_alert(self, alert_id: str):
        """Mark the first unresolved alert with ``alert_id`` as resolved."""
        for alert in self.alerts:
            if alert.id == alert_id and not alert.resolved:
                alert.resolved = True
                alert.resolved_time = time.time()
                self.logger.info(f"告警已解决: {alert_id}")
                break
class EmailAlertHandler:
    """Alert handler that emails alert details via SMTP (STARTTLS)."""

    def __init__(self, smtp_server: str, smtp_port: int, username: str, password: str, recipients: List[str]):
        self.smtp_server = smtp_server
        self.smtp_port = smtp_port
        self.username = username
        self.password = password
        self.recipients = recipients
        self.logger = logging.getLogger(__name__)

    def __call__(self, alert: "Alert"):
        """Send one alert as an email; failures are logged, never raised."""
        try:
            msg = MIMEMultipart()
            msg['From'] = self.username
            msg['To'] = ', '.join(self.recipients)
            msg['Subject'] = f"[{alert.level.upper()}] {alert.title}"
            body = f"""
            告警详情:
            - 级别: {alert.level}
            - 时间: {datetime.fromtimestamp(alert.timestamp)}
            - 来源: {alert.source}
            - 消息: {alert.message}
            - 标签: {alert.tags}
            """
            msg.attach(MIMEText(body, 'plain', 'utf-8'))
            with smtplib.SMTP(self.smtp_server, self.smtp_port) as server:
                server.starttls()
                server.login(self.username, self.password)
                server.send_message(msg)
            self.logger.info(f"邮件告警已发送: {alert.id}")
        except Exception as e:
            self.logger.error(f"发送邮件告警失败: {e}")
class WebhookAlertHandler:
    """Alert handler that POSTs alert details as JSON to a webhook URL."""

    def __init__(self, webhook_url: str, timeout: float = 10.0):
        self.webhook_url = webhook_url
        self.timeout = timeout
        self.logger = logging.getLogger(__name__)

    def __call__(self, alert: "Alert"):
        """Send one alert via HTTP POST; failures are logged, never raised."""
        try:
            payload = {
                'alert': alert.to_dict(),
                'timestamp': alert.timestamp,
                'level': alert.level,
                'title': alert.title,
                'message': alert.message
            }
            response = requests.post(
                self.webhook_url,
                json=payload,
                timeout=self.timeout,
                headers={'Content-Type': 'application/json'}
            )
            response.raise_for_status()
            self.logger.info(f"Webhook告警已发送: {alert.id}")
        except Exception as e:
            self.logger.error(f"发送Webhook告警失败: {e}")
class MonitoringSystem:
    """Combines a MetricsCollector and AlertManager with background samplers."""

    def __init__(self):
        self.metrics_collector = MetricsCollector()
        self.alert_manager = AlertManager()
        self.running = False
        self.monitor_tasks = []  # asyncio tasks started by start()
        self.logger = logging.getLogger(__name__)

    def add_alert_rule(self, rule: "AlertRule"):
        """Register an alert rule."""
        self.alert_manager.add_rule(rule)

    def add_alert_handler(self, handler: Callable[["Alert"], None]):
        """Register an alert handler callable."""
        self.alert_manager.add_handler(handler)

    def record_metric(self, name: str, value: float, tags: Dict[str, str] = None):
        """Store one sample and evaluate alert rules against it."""
        metric_point = MetricPoint(
            name=name,
            value=value,
            timestamp=time.time(),
            tags=tags or {}
        )
        # NOTE(review): the collector builds its own MetricPoint internally,
        # so the sample object is constructed twice — harmless but redundant.
        self.metrics_collector.record_metric(name, value, tags)
        self.alert_manager.process_metric(metric_point)

    async def start_system_monitoring(self, interval: float = 30.0):
        """Periodically sample host CPU/memory/disk/network via psutil."""
        import psutil
        while self.running:
            try:
                # CPU utilisation (1-second blocking sample)
                cpu_percent = psutil.cpu_percent(interval=1)
                self.record_metric("system.cpu.percent", cpu_percent)
                # Memory
                memory = psutil.virtual_memory()
                self.record_metric("system.memory.percent", memory.percent)
                self.record_metric("system.memory.used_mb", memory.used / (1024 * 1024))
                # Disk usage of the root filesystem
                disk = psutil.disk_usage('/')
                disk_percent = (disk.used / disk.total) * 100
                self.record_metric("system.disk.percent", disk_percent)
                # Network IO counters (cumulative since boot)
                net_io = psutil.net_io_counters()
                self.record_metric("system.network.bytes_sent", net_io.bytes_sent)
                self.record_metric("system.network.bytes_recv", net_io.bytes_recv)
                await asyncio.sleep(interval)
            except Exception as e:
                self.logger.error(f"系统监控错误: {e}")
                await asyncio.sleep(interval)

    async def start_crawler_monitoring(self, crawler_stats: Dict[str, Any], interval: float = 60.0):
        """Periodically republish crawler statistics as metrics.

        ``crawler_stats`` is read live each cycle, so a shared mutable dict works.
        """
        while self.running:
            try:
                # Request counters
                self.record_metric("crawler.requests_total", crawler_stats.get('requests_total', 0))
                self.record_metric("crawler.requests_success", crawler_stats.get('requests_success', 0))
                self.record_metric("crawler.requests_failed", crawler_stats.get('requests_failed', 0))
                # Average response time
                self.record_metric("crawler.response_time_avg", crawler_stats.get('response_time_avg', 0))
                # Queue size
                self.record_metric("crawler.queue_size", crawler_stats.get('queue_size', 0))
                # Derived error rate
                total_requests = crawler_stats.get('requests_total', 0)
                failed_requests = crawler_stats.get('requests_failed', 0)
                error_rate = (failed_requests / total_requests) if total_requests > 0 else 0
                self.record_metric("crawler.error_rate", error_rate)
                await asyncio.sleep(interval)
            except Exception as e:
                self.logger.error(f"爬虫监控错误: {e}")
                await asyncio.sleep(interval)

    async def start(self):
        """Start the background system-metrics sampler (idempotent)."""
        if self.running:
            return
        self.running = True
        system_task = asyncio.create_task(self.start_system_monitoring())
        self.monitor_tasks.append(system_task)
        self.logger.info("监控系统已启动")

    async def stop(self):
        """Cancel and await all monitoring tasks."""
        self.running = False
        for task in self.monitor_tasks:
            task.cancel()
            try:
                await task
            except asyncio.CancelledError:
                pass
        self.monitor_tasks.clear()
        self.logger.info("监控系统已停止")

    def get_dashboard_data(self) -> Dict[str, Any]:
        """Return the latest system/crawler metrics plus active alerts."""
        current_time = time.time()
        # Latest system metrics (only those that have at least one sample)
        system_metrics = {}
        for metric_name in ['system.cpu.percent', 'system.memory.percent', 'system.disk.percent']:
            latest = self.metrics_collector.get_latest_metric(metric_name)
            if latest:
                system_metrics[metric_name] = latest.value
        # Latest crawler metrics
        crawler_metrics = {}
        for metric_name in ['crawler.requests_total', 'crawler.error_rate', 'crawler.queue_size']:
            latest = self.metrics_collector.get_latest_metric(metric_name)
            if latest:
                crawler_metrics[metric_name] = latest.value
        active_alerts = self.alert_manager.get_active_alerts()
        return {
            'timestamp': current_time,
            'system_metrics': system_metrics,
            'crawler_metrics': crawler_metrics,
            'active_alerts': [alert.to_dict() for alert in active_alerts],
            'alert_count': len(active_alerts)
        }
# Usage example
async def main():
    """Demo: wire up rules/handlers, feed random metrics, print dashboard data."""
    # Configure logging
    logging.basicConfig(level=logging.INFO)
    # Create the monitoring system
    monitoring = MonitoringSystem()
    # Alert rules
    monitoring.add_alert_rule(AlertRule(
        name="高CPU使用率",
        metric_name="system.cpu.percent",
        condition=">",
        threshold=80.0,
        duration=60.0,
        level="warning",
        message_template="CPU使用率过高: {value:.1f}% (阈值: {threshold}%)"
    ))
    monitoring.add_alert_rule(AlertRule(
        name="高内存使用率",
        metric_name="system.memory.percent",
        condition=">",
        threshold=85.0,
        duration=120.0,
        level="error",
        message_template="内存使用率过高: {value:.1f}% (阈值: {threshold}%)"
    ))
    monitoring.add_alert_rule(AlertRule(
        name="高错误率",
        metric_name="crawler.error_rate",
        condition=">",
        threshold=0.1,
        duration=300.0,
        level="critical",
        message_template="爬虫错误率过高: {value:.2%} (阈值: {threshold:.2%})"
    ))
    # Alert handler (replace with a real webhook URL)
    webhook_handler = WebhookAlertHandler("https://hooks.slack.com/services/YOUR/WEBHOOK/URL")
    monitoring.add_alert_handler(webhook_handler)
    # Start background monitoring
    await monitoring.start()
    try:
        # Simulated run: feed random metric values
        import random
        for i in range(100):
            monitoring.record_metric("system.cpu.percent", random.uniform(60, 90))
            monitoring.record_metric("system.memory.percent", random.uniform(70, 95))
            monitoring.record_metric("crawler.error_rate", random.uniform(0.05, 0.15))
            # Dump dashboard data every 10 iterations
            if i % 10 == 0:
                dashboard = monitoring.get_dashboard_data()
                print(f"仪表板数据: {json.dumps(dashboard, indent=2, ensure_ascii=False)}")
            await asyncio.sleep(1)
    finally:
        await monitoring.stop()


if __name__ == "__main__":
    asyncio.run(main())
# monitoring_dashboard.py
import json
import threading
import time
from datetime import datetime, timedelta
from typing import Any, Dict, List

from flask import Flask, jsonify, render_template, request
class MonitoringDashboard:
    """Flask web dashboard exposing a MonitoringSystem over HTTP/JSON."""

    def __init__(self, monitoring_system, host: str = "0.0.0.0", port: int = 8080):
        self.monitoring_system = monitoring_system
        self.host = host
        self.port = port
        self.app = Flask(__name__)
        self.setup_routes()

    def setup_routes(self):
        """Register all HTTP routes on the Flask app."""
        @self.app.route('/')
        def dashboard():
            """Main dashboard page."""
            return render_template('dashboard.html')

        @self.app.route('/api/metrics')
        def get_metrics():
            """Current dashboard snapshot (metrics + active alerts)."""
            return jsonify(self.monitoring_system.get_dashboard_data())

        @self.app.route('/api/metrics/<metric_name>')
        def get_metric_history(metric_name):
            """Historical samples of one metric; ?duration= seconds (default 3600)."""
            duration = request.args.get('duration', 3600, type=int)
            history = self.monitoring_system.metrics_collector.get_metric_history(metric_name, duration)
            return jsonify({
                'metric_name': metric_name,
                'duration': duration,
                'data': [point.to_dict() for point in history]
            })

        @self.app.route('/api/alerts')
        def get_alerts():
            """All currently active (unresolved) alerts."""
            active_alerts = self.monitoring_system.alert_manager.get_active_alerts()
            return jsonify([alert.to_dict() for alert in active_alerts])

        @self.app.route('/api/alerts/<alert_id>/resolve', methods=['POST'])
        def resolve_alert(alert_id):
            """Mark one alert as resolved."""
            self.monitoring_system.alert_manager.resolve_alert(alert_id)
            return jsonify({'status': 'success', 'message': f'告警 {alert_id} 已解决'})

        @self.app.route('/api/stats')
        def get_stats():
            """Aggregate statistics for the key system and crawler metrics."""
            stats = {}
            for metric_name in ['system.cpu.percent', 'system.memory.percent', 'system.disk.percent']:
                metric_stats = self.monitoring_system.metrics_collector.get_metric_stats(metric_name)
                if metric_stats:
                    stats[metric_name] = metric_stats
            for metric_name in ['crawler.requests_total', 'crawler.error_rate', 'crawler.response_time_avg']:
                metric_stats = self.monitoring_system.metrics_collector.get_metric_stats(metric_name)
                if metric_stats:
                    stats[metric_name] = metric_stats
            return jsonify(stats)

    def run(self, debug: bool = False):
        """Run the Flask development server (blocking)."""
        self.app.run(host=self.host, port=self.port, debug=debug, threaded=True)

    def run_in_thread(self):
        """Run the dashboard in a daemon thread; returns the Thread."""
        thread = threading.Thread(target=self.run, kwargs={'debug': False})
        thread.daemon = True
        thread.start()
        return thread
# HTML模板
DASHBOARD_TEMPLATE = '''
<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>爬虫监控仪表板</title>
<script src="https://cdn.jsdelivr.net/npm/chart.js"></script>
<style>
body {
font-family: Arial, sans-serif;
margin: 0;
padding: 20px;
background-color: #f5f5f5;
}
.container {
max-width: 1200px;
margin: 0 auto;
}
.header {
background: white;
padding: 20px;
border-radius: 8px;
margin-bottom: 20px;
box-shadow: 0 2px 4px rgba(0,0,0,0.1);
}
.metrics-grid {
display: grid;
grid-template-columns: repeat(auto-fit, minmax(300px, 1fr));
gap: 20px;
margin-bottom: 20px;
}
.metric-card {
background: white;
padding: 20px;
border-radius: 8px;
box-shadow: 0 2px 4px rgba(0,0,0,0.1);
}
.metric-value {
font-size: 2em;
font-weight: bold;
color: #333;
}
.metric-label {
color: #666;
margin-bottom: 10px;
}
.chart-container {
background: white;
padding: 20px;
border-radius: 8px;
box-shadow: 0 2px 4px rgba(0,0,0,0.1);
margin-bottom: 20px;
}
.alerts-container {
background: white;
padding: 20px;
border-radius: 8px;
box-shadow: 0 2px 4px rgba(0,0,0,0.1);
}
.alert {
padding: 10px;
margin: 10px 0;
border-radius: 4px;
border-left: 4px solid;
}
.alert.warning {
background-color: #fff3cd;
border-color: #ffc107;
}
.alert.error {
background-color: #f8d7da;
border-color: #dc3545;
}
.alert.critical {
background-color: #f5c6cb;
border-color: #721c24;
}
.status-indicator {
display: inline-block;
width: 12px;
height: 12px;
border-radius: 50%;
margin-right: 8px;
}
.status-good { background-color: #28a745; }
.status-warning { background-color: #ffc107; }
.status-error { background-color: #dc3545; }
</style>
</head>
<body>
<div class="container">
<div class="header">
<h1>爬虫监控仪表板</h1>
<p>实时监控系统性能和爬虫状态</p>
<div id="last-update">最后更新: --</div>
</div>
<div class="metrics-grid">
<div class="metric-card">
<div class="metric-label">
<span class="status-indicator" id="cpu-status"></span>
CPU使用率
</div>
<div class="metric-value" id="cpu-value">--%</div>
</div>
<div class="metric-card">
<div class="metric-label">
<span class="status-indicator" id="memory-status"></span>
内存使用率
</div>
<div class="metric-value" id="memory-value">--%</div>
</div>
<div class="metric-card">
<div class="metric-label">
<span class="status-indicator" id="error-status"></span>
错误率
</div>
<div class="metric-value" id="error-value">--%</div>
</div>
<div class="metric-card">
<div class="metric-label">
<span class="status-indicator" id="queue-status"></span>
队列大小
</div>
<div class="metric-value" id="queue-value">--</div>
</div>
</div>
<div class="chart-container">
<h3>系统性能趋势</h3>
<canvas id="performance-chart" width="400" height="200"></canvas>
</div>
<div class="alerts-container">
<h3>活跃告警</h3>
<div id="alerts-list">
<p>暂无告警</p>
</div>
</div>
</div>
<script>
// 图表配置
const ctx = document.getElementById('performance-chart').getContext('2d');
const chart = new Chart(ctx, {
type: 'line',
data: {
labels: [],
datasets: [
{
label: 'CPU使用率 (%)',
data: [],
borderColor: 'rgb(255, 99, 132)',
backgroundColor: 'rgba(255, 99, 132, 0.2)',
tension: 0.1
},
{
label: '内存使用率 (%)',
data: [],
borderColor: 'rgb(54, 162, 235)',
backgroundColor: 'rgba(54, 162, 235, 0.2)',
tension: 0.1
}
]
},
options: {
responsive: true,
scales: {
y: {
beginAtZero: true,
max: 100
}
}
}
});
// 更新指标显示
function updateMetrics(data) {
const systemMetrics = data.system_metrics || {};
const crawlerMetrics = data.crawler_metrics || {};
// CPU
const cpuValue = systemMetrics['system.cpu.percent'] || 0;
document.getElementById('cpu-value').textContent = cpuValue.toFixed(1) + '%';
document.getElementById('cpu-status').className =
'status-indicator ' + (cpuValue > 80 ? 'status-error' : cpuValue > 60 ? 'status-warning' : 'status-good');
// 内存
const memoryValue = systemMetrics['system.memory.percent'] || 0;
document.getElementById('memory-value').textContent = memoryValue.toFixed(1) + '%';
document.getElementById('memory-status').className =
'status-indicator ' + (memoryValue > 85 ? 'status-error' : memoryValue > 70 ? 'status-warning' : 'status-good');
// 错误率
const errorValue = (crawlerMetrics['crawler.error_rate'] || 0) * 100;
document.getElementById('error-value').textContent = errorValue.toFixed(2) + '%';
document.getElementById('error-status').className =
'status-indicator ' + (errorValue > 10 ? 'status-error' : errorValue > 5 ? 'status-warning' : 'status-good');
// 队列大小
const queueValue = crawlerMetrics['crawler.queue_size'] || 0;
document.getElementById('queue-value').textContent = queueValue.toString();
document.getElementById('queue-status').className =
'status-indicator ' + (queueValue > 1000 ? 'status-error' : queueValue > 500 ? 'status-warning' : 'status-good');
// 更新时间
document.getElementById('last-update').textContent =
'最后更新: ' + new Date().toLocaleString();
}
// 更新图表
function updateChart(cpuHistory, memoryHistory) {
const labels = cpuHistory.map(point =>
new Date(point.timestamp * 1000).toLocaleTimeString()
).slice(-20);
chart.data.labels = labels;
chart.data.datasets[0].data = cpuHistory.map(point => point.value).slice(-20);
chart.data.datasets[1].data = memoryHistory.map(point => point.value).slice(-20);
chart.update();
}
// 更新告警
function updateAlerts(alerts) {
const alertsList = document.getElementById('alerts-list');
if (alerts.length === 0) {
alertsList.innerHTML = '<p>暂无告警</p>';
return;
}
alertsList.innerHTML = alerts.map(alert => `
<div class="alert ${alert.level}">
<strong>${alert.title}</strong><br>
${alert.message}<br>
<small>时间: ${new Date(alert.timestamp * 1000).toLocaleString()}</small>
</div>
`).join('');
}
// 获取数据
async function fetchData() {
try {
// 获取当前指标
const metricsResponse = await fetch('/api/metrics');
const metricsData = await metricsResponse.json();
updateMetrics(metricsData);
// 获取历史数据
const cpuResponse = await fetch('/api/metrics/system.cpu.percent?duration=1200');
const cpuData = await cpuResponse.json();
const memoryResponse = await fetch('/api/metrics/system.memory.percent?duration=1200');
const memoryData = await memoryResponse.json();
updateChart(cpuData.data, memoryData.data);
// 获取告警
const alertsResponse = await fetch('/api/alerts');
const alertsData = await alertsResponse.json();
updateAlerts(alertsData);
} catch (error) {
console.error('获取数据失败:', error);
}
}
// 定期更新
fetchData();
setInterval(fetchData, 5000);
</script>
</body>
</html>
'''
# Create the dashboard template file on disk.
import os


def create_dashboard_template():
    """Write the dashboard HTML to templates/dashboard.html.

    Creates the ``templates`` directory when missing. Relies on the
    module-level ``DASHBOARD_TEMPLATE`` string defined earlier in this file.
    """
    templates_dir = 'templates'
    # exist_ok avoids the check-then-create race of the exists()/makedirs() pair.
    os.makedirs(templates_dir, exist_ok=True)
    with open(os.path.join(templates_dir, 'dashboard.html'), 'w', encoding='utf-8') as f:
        f.write(DASHBOARD_TEMPLATE)
if __name__ == "__main__":
    # Generate the template file. A live deployment would then wire the
    # dashboard to an actual monitoring system instance:
    # dashboard = MonitoringDashboard(monitoring_system)
    # dashboard.run(debug=True)
    create_dashboard_template()
    print("仪表板模板已创建,请配合监控系统使用")
# high_availability.py
# Imports for the high-availability module, grouped stdlib / third-party (PEP 8).
import asyncio
import hashlib
import json
import logging
import random
import time
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Callable, Dict, List, Optional

import aiohttp
class NodeStatus(Enum):
    """Lifecycle state of a crawler node as seen by the load balancer."""

    HEALTHY = "healthy"
    UNHEALTHY = "unhealthy"
    MAINTENANCE = "maintenance"
@dataclass
class CrawlerNode:
    """A single crawler worker node tracked by the load balancer."""

    id: str
    host: str
    port: int
    weight: int = 1                      # relative share for weighted selection
    status: NodeStatus = NodeStatus.HEALTHY
    last_check: float = field(default_factory=time.time)
    response_time: float = 0.0           # latest health-check latency, seconds
    error_count: int = 0
    success_count: int = 0
    current_load: int = 0                # in-flight tasks on this node
    max_load: int = 100

    @property
    def url(self) -> str:
        """Base HTTP URL of the node."""
        return f"http://{self.host}:{self.port}"

    @property
    def health_score(self) -> float:
        """Composite score in [0, 1]: success rate x free capacity x latency.

        Non-HEALTHY nodes score 0; a node with no recorded requests scores 1.
        """
        if self.status != NodeStatus.HEALTHY:
            return 0.0
        total_requests = self.success_count + self.error_count
        if total_requests == 0:
            return 1.0
        success_rate = self.success_count / total_requests
        load_factor = 1.0 - (self.current_load / self.max_load)
        # 5 seconds is treated as the worst acceptable latency; floor at 0.1
        # so a slow-but-alive node is not zeroed out entirely.
        response_factor = max(0.1, 1.0 - (self.response_time / 5.0))
        return success_rate * load_factor * response_factor

    def to_dict(self) -> Dict[str, Any]:
        """Serialize for stats/JSON output (includes the computed score)."""
        return {
            'id': self.id,
            'host': self.host,
            'port': self.port,
            'weight': self.weight,
            'status': self.status.value,
            'response_time': self.response_time,
            'error_count': self.error_count,
            'success_count': self.success_count,
            'current_load': self.current_load,
            'max_load': self.max_load,
            'health_score': self.health_score
        }
class LoadBalancer:
    """Distributes crawl work across CrawlerNode instances.

    Maintains the node pool, runs a periodic async health-check loop, and
    offers several selection strategies (round-robin, weighted random,
    least-connections, health-score based).
    """

    def __init__(self, health_check_interval: float = 30.0):
        self.nodes: List[CrawlerNode] = []
        self.health_check_interval = health_check_interval
        self.running = False
        self.logger = logging.getLogger(__name__)
        self._health_check_task = None
        # Round-robin cursor; previously created lazily via hasattr().
        self._round_robin_index = 0

    def add_node(self, node: CrawlerNode):
        """Register a node with the pool."""
        self.nodes.append(node)
        self.logger.info(f"添加节点: {node.id} ({node.url})")

    def remove_node(self, node_id: str):
        """Drop the node with the given id (no-op if absent)."""
        self.nodes = [node for node in self.nodes if node.id != node_id]
        self.logger.info(f"移除节点: {node_id}")

    def get_healthy_nodes(self) -> List[CrawlerNode]:
        """Return nodes currently marked HEALTHY."""
        return [node for node in self.nodes if node.status == NodeStatus.HEALTHY]

    def select_node_round_robin(self) -> Optional[CrawlerNode]:
        """Rotate through healthy nodes; None when none are healthy."""
        healthy_nodes = self.get_healthy_nodes()
        if not healthy_nodes:
            return None
        node = healthy_nodes[self._round_robin_index % len(healthy_nodes)]
        self._round_robin_index += 1
        return node

    def select_node_weighted(self) -> Optional[CrawlerNode]:
        """Weighted-random pick, proportional to each node's weight."""
        healthy_nodes = self.get_healthy_nodes()
        if not healthy_nodes:
            return None
        total_weight = sum(node.weight for node in healthy_nodes)
        if total_weight == 0:
            return random.choice(healthy_nodes)
        rand_weight = random.uniform(0, total_weight)
        current_weight = 0
        for node in healthy_nodes:
            current_weight += node.weight
            if current_weight >= rand_weight:
                return node
        # Float rounding can overshoot the loop; fall back to the last node.
        return healthy_nodes[-1]

    def select_node_least_connections(self) -> Optional[CrawlerNode]:
        """Pick the healthy node with the fewest in-flight tasks."""
        healthy_nodes = self.get_healthy_nodes()
        if not healthy_nodes:
            return None
        return min(healthy_nodes, key=lambda node: node.current_load)

    def select_node_health_based(self) -> Optional[CrawlerNode]:
        """Pick randomly among the top-3 nodes ranked by health_score."""
        healthy_nodes = self.get_healthy_nodes()
        if not healthy_nodes:
            return None
        sorted_nodes = sorted(healthy_nodes, key=lambda node: node.health_score, reverse=True)
        # Random choice among the best few spreads load instead of hammering
        # a single top node.
        top_nodes = sorted_nodes[:min(3, len(sorted_nodes))]
        return random.choice(top_nodes)

    async def health_check(self, node: CrawlerNode) -> bool:
        """GET {node.url}/health and update the node's stats and status.

        Returns True when the node answered 200; any error or non-200
        marks the node UNHEALTHY.
        """
        try:
            start_time = time.time()
            async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=5)) as session:
                async with session.get(f"{node.url}/health") as response:
                    node.response_time = time.time() - start_time
                    node.last_check = time.time()
                    if response.status == 200:
                        data = await response.json()
                        node.current_load = data.get('current_load', 0)
                        node.status = NodeStatus.HEALTHY
                        node.success_count += 1
                        return True
                    node.error_count += 1
                    node.status = NodeStatus.UNHEALTHY
                    return False
        except Exception as e:
            self.logger.warning(f"节点 {node.id} 健康检查失败: {e}")
            node.error_count += 1
            node.status = NodeStatus.UNHEALTHY
            node.last_check = time.time()
            return False

    async def start_health_checks(self):
        """Loop: concurrently health-check all nodes every interval."""
        self.running = True
        while self.running:
            try:
                tasks = [self.health_check(node) for node in self.nodes]
                if tasks:
                    # return_exceptions keeps one failing probe from killing
                    # the whole round.
                    await asyncio.gather(*tasks, return_exceptions=True)
                await asyncio.sleep(self.health_check_interval)
            except Exception as e:
                self.logger.error(f"健康检查错误: {e}")
                await asyncio.sleep(self.health_check_interval)

    async def start(self):
        """Start the background health-check task (idempotent)."""
        if self.running:
            return
        # Set the flag before scheduling so a second start() racing this one
        # cannot spawn a duplicate health-check loop.
        self.running = True
        self._health_check_task = asyncio.create_task(self.start_health_checks())
        self.logger.info("负载均衡器已启动")

    async def stop(self):
        """Cancel the health-check task and wait for it to wind down."""
        self.running = False
        if self._health_check_task:
            self._health_check_task.cancel()
            try:
                await self._health_check_task
            except asyncio.CancelledError:
                pass
        self.logger.info("负载均衡器已停止")

    def get_stats(self) -> Dict[str, Any]:
        """Summarize node counts plus per-node details."""
        healthy_count = len(self.get_healthy_nodes())
        total_count = len(self.nodes)
        return {
            'total_nodes': total_count,
            'healthy_nodes': healthy_count,
            'unhealthy_nodes': total_count - healthy_count,
            'nodes': [node.to_dict() for node in self.nodes]
        }
class FailoverManager:
    """Retries a task on freshly-selected nodes when execution fails."""

    def __init__(self, load_balancer: LoadBalancer, max_retries: int = 3):
        self.load_balancer = load_balancer
        self.max_retries = max_retries
        self.logger = logging.getLogger(__name__)

    async def execute_with_failover(self,
                                    task_func: Callable,
                                    *args,
                                    **kwargs) -> Any:
        """Run ``task_func(node, *args, **kwargs)`` with per-attempt failover.

        A new node is chosen (health-score based) for each attempt. Raises
        when no healthy node is available or all attempts fail.
        """
        last_exception = None
        for attempt in range(self.max_retries):
            node = self.load_balancer.select_node_health_based()
            if not node:
                raise Exception("没有可用的健康节点")
            try:
                # Reserve capacity on the node for this attempt.
                node.current_load += 1
                result = await task_func(node, *args, **kwargs)
                node.success_count += 1
                return result
            except Exception as e:
                last_exception = e
                node.error_count += 1
                self.logger.warning(
                    f"节点 {node.id} 执行失败 (尝试 {attempt + 1}/{self.max_retries}): {e}"
                )
                # Once we have a meaningful sample (>10 requests), demote the
                # node if more than half of its requests failed.
                total_requests = node.success_count + node.error_count
                if total_requests > 10:
                    error_rate = node.error_count / total_requests
                    if error_rate > 0.5:
                        node.status = NodeStatus.UNHEALTHY
                        self.logger.warning(f"节点 {node.id} 错误率过高,标记为不健康")
            finally:
                # Always release the reserved slot, success or failure.
                node.current_load = max(0, node.current_load - 1)
        raise Exception(f"所有节点都失败了,最后错误: {last_exception}")
class DistributedCrawlerManager:
    """Coordinates distributed crawling: task queue, workers, balancing, failover."""

    def __init__(self):
        self.load_balancer = LoadBalancer()
        self.failover_manager = FailoverManager(self.load_balancer)
        self.task_queue = asyncio.Queue()
        self.results = []
        self.running = False
        self.worker_tasks = []
        self.logger = logging.getLogger(__name__)

    def add_crawler_node(self, node_id: str, host: str, port: int, weight: int = 1):
        """Create a CrawlerNode and register it with the load balancer."""
        node = CrawlerNode(
            id=node_id,
            host=host,
            port=port,
            weight=weight
        )
        self.load_balancer.add_node(node)

    async def submit_task(self, url: str, task_data: Dict[str, Any] = None):
        """Queue a crawl task.

        The task id is md5(url + timestamp) — a cheap unique id, not a
        security measure.
        """
        task = {
            'id': hashlib.md5(f"{url}{time.time()}".encode()).hexdigest(),
            'url': url,
            'data': task_data or {},
            'timestamp': time.time()
        }
        await self.task_queue.put(task)

    async def execute_crawl_task(self, node: CrawlerNode, task: Dict[str, Any]) -> Dict[str, Any]:
        """POST the task to the node's /crawl endpoint; raise on non-200."""
        async with aiohttp.ClientSession() as session:
            async with session.post(
                f"{node.url}/crawl",
                json=task,
                timeout=aiohttp.ClientTimeout(total=30)
            ) as response:
                if response.status == 200:
                    result = await response.json()
                    return {
                        'task_id': task['id'],
                        'node_id': node.id,
                        'status': 'success',
                        'result': result,
                        'timestamp': time.time()
                    }
                raise Exception(f"HTTP {response.status}: {await response.text()}")

    async def worker(self):
        """Worker coroutine: pull queued tasks and run them with failover."""
        while self.running:
            try:
                # 1s timeout so shutdown (running=False) is noticed promptly.
                task = await asyncio.wait_for(self.task_queue.get(), timeout=1.0)
                result = await self.failover_manager.execute_with_failover(
                    self.execute_crawl_task, task
                )
                self.results.append(result)
                self.logger.info(f"任务 {task['id']} 完成,节点: {result['node_id']}")
            except asyncio.TimeoutError:
                continue
            except Exception as e:
                # NOTE(review): a permanently-failing task is dropped here
                # without a result record — confirm that is intended.
                self.logger.error(f"工作协程错误: {e}")

    async def start(self, num_workers: int = 5):
        """Start the load balancer and ``num_workers`` worker coroutines."""
        if self.running:
            return
        self.running = True
        await self.load_balancer.start()
        for i in range(num_workers):
            task = asyncio.create_task(self.worker())
            self.worker_tasks.append(task)
        self.logger.info(f"分布式爬虫管理器已启动,工作协程数: {num_workers}")

    async def stop(self):
        """Stop the balancer, cancel all workers, and wait for them."""
        self.running = False
        await self.load_balancer.stop()
        for task in self.worker_tasks:
            task.cancel()
        if self.worker_tasks:
            await asyncio.gather(*self.worker_tasks, return_exceptions=True)
        self.worker_tasks.clear()
        self.logger.info("分布式爬虫管理器已停止")

    def get_status(self) -> Dict[str, Any]:
        """Snapshot: running flag, queue depth, completed count, node stats."""
        return {
            'running': self.running,
            'queue_size': self.task_queue.qsize(),
            'completed_tasks': len(self.results),
            'load_balancer': self.load_balancer.get_stats()
        }
# Usage example
async def main():
    """Demo: three local nodes, five crawl tasks, periodic status printing."""
    logging.basicConfig(level=logging.INFO)
    manager = DistributedCrawlerManager()
    manager.add_crawler_node("node1", "localhost", 8001, weight=2)
    manager.add_crawler_node("node2", "localhost", 8002, weight=1)
    manager.add_crawler_node("node3", "localhost", 8003, weight=1)
    await manager.start(num_workers=3)
    try:
        urls = [
            "https://example.com/page1",
            "https://example.com/page2",
            "https://example.com/page3",
            "https://example.com/page4",
            "https://example.com/page5",
        ]
        for url in urls:
            await manager.submit_task(url, {'timeout': 10})
        # Poll until the queue drains and every task has produced a result.
        # NOTE(review): if tasks fail permanently this loop never exits —
        # consider a deadline for production use.
        while manager.task_queue.qsize() > 0 or len(manager.results) < len(urls):
            status = manager.get_status()
            print(f"状态: {json.dumps(status, indent=2, ensure_ascii=False)}")
            await asyncio.sleep(2)
        print(f"所有任务完成,结果数量: {len(manager.results)}")
    finally:
        await manager.stop()
if __name__ == "__main__":
    asyncio.run(main())
# data_consistency.py
# Imports for the data-consistency module, grouped stdlib / third-party (PEP 8).
import asyncio
import hashlib
import json
import logging
import time
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Dict, List, Optional, Set

# NOTE(review): aioredis is deprecated upstream in favor of redis.asyncio —
# confirm before upgrading the dependency.
import aioredis
class TransactionStatus(Enum):
    """Lifecycle state of a distributed transaction."""

    PENDING = "pending"
    COMMITTED = "committed"
    ABORTED = "aborted"
@dataclass
class Transaction:
    """A distributed transaction: its ordered operations and participants."""

    id: str
    operations: List[Dict[str, Any]] = field(default_factory=list)
    status: TransactionStatus = TransactionStatus.PENDING
    timestamp: float = field(default_factory=time.time)
    participants: Set[str] = field(default_factory=set)

    def to_dict(self) -> Dict[str, Any]:
        """Serialize to a JSON-friendly dict (participants set becomes a list)."""
        return {
            'id': self.id,
            'operations': self.operations,
            'status': self.status.value,
            'timestamp': self.timestamp,
            'participants': list(self.participants)
        }
class DistributedLock:
    """Redis-backed distributed lock with an owner token.

    The token guards release so one client cannot delete another client's
    lock. NOTE(review): md5(time + id) is only a uniqueness trick, not a
    security measure — uuid4 would be the conventional choice.
    """

    def __init__(self, redis_client, key: str, timeout: float = 30.0):
        self.redis = redis_client
        self.key = f"lock:{key}"
        self.timeout = timeout
        self.identifier = hashlib.md5(f"{time.time()}{id(self)}".encode()).hexdigest()
        self.logger = logging.getLogger(__name__)

    async def acquire(self, blocking: bool = True, timeout: Optional[float] = None) -> bool:
        """Try to take the lock.

        Non-blocking mode returns after one attempt; blocking mode polls
        every 100 ms until ``timeout`` (defaults to the lock's TTL) elapses.
        """
        end_time = time.time() + (timeout or self.timeout)
        while True:
            # SET NX EX: atomically create the key only if absent, with a TTL
            # so a crashed holder cannot leave the lock stuck forever.
            result = await self.redis.set(
                self.key,
                self.identifier,
                ex=int(self.timeout),
                nx=True
            )
            if result:
                self.logger.debug(f"获取锁成功: {self.key}")
                return True
            if not blocking or time.time() > end_time:
                return False
            await asyncio.sleep(0.1)

    async def release(self) -> bool:
        """Release the lock only if we still own it (atomic check via Lua)."""
        lua_script = """
        if redis.call("GET", KEYS[1]) == ARGV[1] then
            return redis.call("DEL", KEYS[1])
        else
            return 0
        end
        """
        result = await self.redis.eval(lua_script, 1, self.key, self.identifier)
        if result:
            self.logger.debug(f"释放锁成功: {self.key}")
            return True
        self.logger.warning(f"释放锁失败: {self.key}")
        return False

    async def __aenter__(self):
        """Acquire on entry; raise when the lock cannot be obtained."""
        if not await self.acquire():
            raise Exception(f"无法获取锁: {self.key}")
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Always release on exit, even when the body raised."""
        await self.release()
class ConsistencyManager:
    """Data-consistency manager: distributed locks plus two-phase commit.

    Transactions are mirrored into the Redis hash ``transactions`` so their
    state survives this process. Participant calls are currently simulated
    (see the ``_send_*`` helpers).
    """

    def __init__(self, redis_url: str = "redis://localhost:6379"):
        self.redis_url = redis_url
        self.redis = None  # set by connect()
        self.transactions: Dict[str, Transaction] = {}
        self.logger = logging.getLogger(__name__)

    async def connect(self):
        """Open the Redis connection."""
        self.redis = await aioredis.from_url(self.redis_url)
        self.logger.info("连接到Redis成功")

    async def disconnect(self):
        """Close the Redis connection if one is open."""
        if self.redis:
            await self.redis.close()
            self.logger.info("断开Redis连接")

    def create_lock(self, key: str, timeout: float = 30.0) -> DistributedLock:
        """Build a DistributedLock bound to this manager's Redis client."""
        return DistributedLock(self.redis, key, timeout)

    async def begin_transaction(self, transaction_id: str) -> Transaction:
        """Create a PENDING transaction and persist it to Redis."""
        transaction = Transaction(id=transaction_id)
        self.transactions[transaction_id] = transaction
        await self.redis.hset(
            "transactions",
            transaction_id,
            json.dumps(transaction.to_dict())
        )
        self.logger.info(f"开始事务: {transaction_id}")
        return transaction

    async def add_operation(self,
                            transaction_id: str,
                            operation_type: str,
                            data: Dict[str, Any],
                            participant: str):
        """Append an operation to a PENDING transaction and persist it.

        Raises when the transaction is unknown or no longer PENDING.
        """
        if transaction_id not in self.transactions:
            raise Exception(f"事务不存在: {transaction_id}")
        transaction = self.transactions[transaction_id]
        if transaction.status != TransactionStatus.PENDING:
            raise Exception(f"事务状态不正确: {transaction.status}")
        operation = {
            'type': operation_type,
            'data': data,
            'participant': participant,
            'timestamp': time.time()
        }
        transaction.operations.append(operation)
        transaction.participants.add(participant)
        await self.redis.hset(
            "transactions",
            transaction_id,
            json.dumps(transaction.to_dict())
        )
        self.logger.debug(f"添加操作到事务 {transaction_id}: {operation_type}")

    async def prepare_transaction(self, transaction_id: str) -> bool:
        """Phase 1 of two-phase commit: ask every participant to prepare.

        Returns True only when all participants voted yes.
        """
        if transaction_id not in self.transactions:
            return False
        transaction = self.transactions[transaction_id]
        prepare_results = []
        for participant in transaction.participants:
            try:
                # Simulated participant call; a failure counts as a "no" vote.
                result = await self._send_prepare_request(participant, transaction)
                prepare_results.append(result)
            except Exception as e:
                self.logger.error(f"参与者 {participant} 准备失败: {e}")
                prepare_results.append(False)
        can_commit = all(prepare_results)
        if can_commit:
            self.logger.info(f"事务 {transaction_id} 准备成功")
        else:
            self.logger.warning(f"事务 {transaction_id} 准备失败")
        return can_commit

    async def commit_transaction(self, transaction_id: str) -> bool:
        """Phase 2: tell every participant to commit; abort on any error."""
        if transaction_id not in self.transactions:
            return False
        transaction = self.transactions[transaction_id]
        try:
            for participant in transaction.participants:
                await self._send_commit_request(participant, transaction)
            transaction.status = TransactionStatus.COMMITTED
            await self.redis.hset(
                "transactions",
                transaction_id,
                json.dumps(transaction.to_dict())
            )
            self.logger.info(f"事务 {transaction_id} 提交成功")
            return True
        except Exception as e:
            self.logger.error(f"事务 {transaction_id} 提交失败: {e}")
            await self.abort_transaction(transaction_id)
            return False

    async def abort_transaction(self, transaction_id: str) -> bool:
        """Abort: tell every participant to roll back, mark ABORTED."""
        if transaction_id not in self.transactions:
            return False
        transaction = self.transactions[transaction_id]
        try:
            for participant in transaction.participants:
                await self._send_abort_request(participant, transaction)
            transaction.status = TransactionStatus.ABORTED
            await self.redis.hset(
                "transactions",
                transaction_id,
                json.dumps(transaction.to_dict())
            )
            self.logger.info(f"事务 {transaction_id} 已中止")
            return True
        except Exception as e:
            self.logger.error(f"事务 {transaction_id} 中止失败: {e}")
            return False

    async def _send_prepare_request(self, participant: str, transaction: Transaction) -> bool:
        """Send a prepare request (simulated: always succeeds after a delay)."""
        await asyncio.sleep(0.1)  # simulate network latency
        return True

    async def _send_commit_request(self, participant: str, transaction: Transaction):
        """Send a commit request (simulated)."""
        await asyncio.sleep(0.1)  # simulate network latency

    async def _send_abort_request(self, participant: str, transaction: Transaction):
        """Send an abort request (simulated)."""
        await asyncio.sleep(0.1)  # simulate network latency

    async def execute_with_consistency(self,
                                       operations: List[Dict[str, Any]],
                                       transaction_id: Optional[str] = None) -> bool:
        """Run a batch of operations under two-phase commit.

        Generates a transaction id when none is supplied; any failure path
        aborts the transaction and returns False.
        """
        if not transaction_id:
            transaction_id = hashlib.md5(f"{time.time()}{len(operations)}".encode()).hexdigest()
        transaction = await self.begin_transaction(transaction_id)
        try:
            for operation in operations:
                await self.add_operation(
                    transaction_id,
                    operation['type'],
                    operation['data'],
                    operation['participant']
                )
            if await self.prepare_transaction(transaction_id):
                return await self.commit_transaction(transaction_id)
            await self.abort_transaction(transaction_id)
            return False
        except Exception as e:
            self.logger.error(f"执行一致性操作失败: {e}")
            await self.abort_transaction(transaction_id)
            return False

    async def ensure_data_consistency(self,
                                      key: str,
                                      update_func: callable,
                                      *args, **kwargs) -> Any:
        """Run ``update_func`` while holding the distributed lock for ``key``."""
        async with self.create_lock(key):
            return await update_func(*args, **kwargs)
# Usage example
async def main():
    """Demo: exercise the distributed lock and a two-operation transaction."""
    logging.basicConfig(level=logging.INFO)
    consistency_manager = ConsistencyManager()
    await consistency_manager.connect()
    try:
        # Distributed lock demo.
        async with consistency_manager.create_lock("test_resource"):
            print("获取锁成功,执行关键操作...")
            await asyncio.sleep(1)
            print("关键操作完成")
        # Distributed transaction demo.
        operations = [
            {
                'type': 'insert',
                'data': {'url': 'https://example.com/1', 'status': 'pending'},
                'participant': 'node1'
            },
            {
                'type': 'update',
                'data': {'url': 'https://example.com/1', 'status': 'processing'},
                'participant': 'node2'
            }
        ]
        success = await consistency_manager.execute_with_consistency(operations)
        print(f"事务执行结果: {success}")
    finally:
        await consistency_manager.disconnect()
if __name__ == "__main__":
    asyncio.run(main())