当前位置:首页>python>Python爬虫第13课:爬虫性能优化与监控

Python爬虫第13课:爬虫性能优化与监控

  • 2026-03-07 03:49:19
Python爬虫第13课:爬虫性能优化与监控

Python爬虫第13课:爬虫性能优化与监控

课程目标

  • 掌握爬虫性能瓶颈分析方法
  • 学习系统监控与告警机制
  • 实现资源使用优化策略
  • 构建高可用爬虫架构

1. 性能瓶颈分析

1.1 性能指标监控

# performance_monitor.py
import time
import psutil
import threading
import logging
from typing importDictListAnyOptional
from dataclasses import dataclass, asdict
from collections import deque
import json
import statistics
from datetime import datetime, timedelta

@dataclass
classPerformanceMetrics:
"""性能指标"""
    timestamp: float
    cpu_percent: float
    memory_percent: float
    memory_used_mb: float
    disk_io_read_mb: float
    disk_io_write_mb: float
    network_sent_mb: float
    network_recv_mb: float
    active_threads: int
    requests_per_second: float
    response_time_avg: float
    error_rate: float
    queue_size: int = 0

defto_dict(self) -> Dict[strAny]:
return asdict(self)

classSystemMonitor:
"""系统性能监控器"""

def__init__(self, collection_interval: float = 1.0, max_history: int = 3600):
self.collection_interval = collection_interval
self.max_history = max_history
self.metrics_history = deque(maxlen=max_history)
self.running = False
self.monitor_thread = None
self.logger = logging.getLogger(__name__)

# 网络和磁盘IO基准值
self.last_net_io = psutil.net_io_counters()
self.last_disk_io = psutil.disk_io_counters()
self.last_check_time = time.time()

# 请求统计
self.request_count = 0
self.response_times = deque(maxlen=1000)
self.error_count = 0
self.last_request_reset = time.time()

defrecord_request(self, response_time: float, is_error: bool = False):
"""记录请求"""
self.request_count += 1
self.response_times.append(response_time)
if is_error:
self.error_count += 1

defcollect_metrics(self) -> PerformanceMetrics:
"""收集性能指标"""
        current_time = time.time()

# CPU和内存
        cpu_percent = psutil.cpu_percent()
        memory = psutil.virtual_memory()
        memory_percent = memory.percent
        memory_used_mb = memory.used / (1024 * 1024)

# 磁盘IO
        current_disk_io = psutil.disk_io_counters()
        time_delta = current_time - self.last_check_time

if time_delta > 0:
            disk_read_mb = (current_disk_io.read_bytes - self.last_disk_io.read_bytes) / (1024 * 1024) / time_delta
            disk_write_mb = (current_disk_io.write_bytes - self.last_disk_io.write_bytes) / (1024 * 1024) / time_delta
else:
            disk_read_mb = disk_write_mb = 0

# 网络IO
        current_net_io = psutil.net_io_counters()
if time_delta > 0:
            net_sent_mb = (current_net_io.bytes_sent - self.last_net_io.bytes_sent) / (1024 * 1024) / time_delta
            net_recv_mb = (current_net_io.bytes_recv - self.last_net_io.bytes_recv) / (1024 * 1024) / time_delta
else:
            net_sent_mb = net_recv_mb = 0

# 线程数
        active_threads = threading.active_count()

# 请求统计
        request_time_delta = current_time - self.last_request_reset
if request_time_delta > 0:
            requests_per_second = self.request_count / request_time_delta
else:
            requests_per_second = 0

# 响应时间
        response_time_avg = statistics.mean(self.response_times) ifself.response_times else0

# 错误率
        error_rate = (self.error_count / self.request_count) ifself.request_count > 0else0

# 更新基准值
self.last_disk_io = current_disk_io
self.last_net_io = current_net_io
self.last_check_time = current_time

# 重置请求统计(每分钟重置一次)
if request_time_delta >= 60:
self.request_count = 0
self.error_count = 0
self.last_request_reset = current_time

return PerformanceMetrics(
            timestamp=current_time,
            cpu_percent=cpu_percent,
            memory_percent=memory_percent,
            memory_used_mb=memory_used_mb,
            disk_io_read_mb=disk_read_mb,
            disk_io_write_mb=disk_write_mb,
            network_sent_mb=net_sent_mb,
            network_recv_mb=net_recv_mb,
            active_threads=active_threads,
            requests_per_second=requests_per_second,
            response_time_avg=response_time_avg,
            error_rate=error_rate
        )

defmonitor_loop(self):
"""监控循环"""
whileself.running:
try:
                metrics = self.collect_metrics()
self.metrics_history.append(metrics)

# 检查异常情况
self.check_alerts(metrics)

                time.sleep(self.collection_interval)

except Exception as e:
self.logger.error(f"监控循环错误: {e}")
                time.sleep(self.collection_interval)

defcheck_alerts(self, metrics: PerformanceMetrics):
"""检查告警条件"""
        alerts = []

# CPU使用率告警
if metrics.cpu_percent > 80:
            alerts.append(f"CPU使用率过高: {metrics.cpu_percent:.1f}%")

# 内存使用率告警
if metrics.memory_percent > 85:
            alerts.append(f"内存使用率过高: {metrics.memory_percent:.1f}%")

# 响应时间告警
if metrics.response_time_avg > 5.0:
            alerts.append(f"平均响应时间过长: {metrics.response_time_avg:.2f}s")

# 错误率告警
if metrics.error_rate > 0.1:
            alerts.append(f"错误率过高: {metrics.error_rate:.2%}")

# 线程数告警
if metrics.active_threads > 100:
            alerts.append(f"活跃线程数过多: {metrics.active_threads}")

for alert in alerts:
self.logger.warning(f"性能告警: {alert}")

defstart(self):
"""启动监控"""
ifself.running:
return

self.running = True
self.monitor_thread = threading.Thread(target=self.monitor_loop)
self.monitor_thread.start()
self.logger.info("性能监控已启动")

defstop(self):
"""停止监控"""
self.running = False
ifself.monitor_thread:
self.monitor_thread.join()
self.logger.info("性能监控已停止")

defget_current_metrics(self) -> Optional[PerformanceMetrics]:
"""获取当前指标"""
ifself.metrics_history:
returnself.metrics_history[-1]
returnNone

defget_metrics_summary(self, minutes: int = 10) -> Dict[strAny]:
"""获取指标摘要"""
ifnotself.metrics_history:
return {}

# 获取最近N分钟的数据
        cutoff_time = time.time() - (minutes * 60)
        recent_metrics = [m for m inself.metrics_history if m.timestamp >= cutoff_time]

ifnot recent_metrics:
return {}

# 计算统计信息
        cpu_values = [m.cpu_percent for m in recent_metrics]
        memory_values = [m.memory_percent for m in recent_metrics]
        response_times = [m.response_time_avg for m in recent_metrics if m.response_time_avg > 0]
        error_rates = [m.error_rate for m in recent_metrics]

return {
'time_range_minutes': minutes,
'sample_count'len(recent_metrics),
'cpu': {
'avg': statistics.mean(cpu_values),
'max'max(cpu_values),
'min'min(cpu_values)
            },
'memory': {
'avg': statistics.mean(memory_values),
'max'max(memory_values),
'min'min(memory_values)
            },
'response_time': {
'avg': statistics.mean(response_times) if response_times else0,
'max'max(response_times) if response_times else0,
'min'min(response_times) if response_times else0
            },
'error_rate': {
'avg': statistics.mean(error_rates),
'max'max(error_rates),
'min'min(error_rates)
            }
        }

defexport_metrics(self, filename: str, hours: int = 1):
"""导出指标数据"""
        cutoff_time = time.time() - (hours * 3600)
        export_data = [
            m.to_dict() for m inself.metrics_history 
if m.timestamp >= cutoff_time
        ]

withopen(filename, 'w', encoding='utf-8'as f:
            json.dump(export_data, f, indent=2)

self.logger.info(f"导出了 {len(export_data)} 条指标数据到 {filename}")

classBottleneckAnalyzer:
"""性能瓶颈分析器"""

def__init__(self, monitor: SystemMonitor):
self.monitor = monitor
self.logger = logging.getLogger(__name__)

defanalyze_bottlenecks(self, analysis_minutes: int = 30) -> Dict[strAny]:
"""分析性能瓶颈"""
        summary = self.monitor.get_metrics_summary(analysis_minutes)

ifnot summary:
return {'error''没有足够的监控数据'}

        bottlenecks = []
        recommendations = []

# CPU瓶颈分析
if summary['cpu']['avg'] > 70:
            bottlenecks.append({
'type''CPU',
'severity''high'if summary['cpu']['avg'] > 85else'medium',
'description'f"CPU平均使用率 {summary['cpu']['avg']:.1f}%",
'max_value': summary['cpu']['max']
            })
            recommendations.extend([
"考虑增加并发控制,减少同时运行的任务数",
"优化CPU密集型操作,如数据解析和处理",
"使用多进程而非多线程处理CPU密集型任务"
            ])

# 内存瓶颈分析
if summary['memory']['avg'] > 75:
            bottlenecks.append({
'type''Memory',
'severity''high'if summary['memory']['avg'] > 90else'medium',
'description'f"内存平均使用率 {summary['memory']['avg']:.1f}%",
'max_value': summary['memory']['max']
            })
            recommendations.extend([
"实现数据流式处理,避免大量数据同时加载到内存",
"优化数据结构,减少内存占用",
"增加内存回收机制,及时释放不用的对象"
            ])

# 响应时间瓶颈分析
if summary['response_time']['avg'] > 3.0:
            bottlenecks.append({
'type''Response Time',
'severity''high'if summary['response_time']['avg'] > 5.0else'medium',
'description'f"平均响应时间 {summary['response_time']['avg']:.2f}s",
'max_value': summary['response_time']['max']
            })
            recommendations.extend([
"优化网络请求,使用连接池和Keep-Alive",
"实现请求缓存,避免重复请求",
"调整超时设置,避免长时间等待"
            ])

# 错误率瓶颈分析
if summary['error_rate']['avg'] > 0.05:
            bottlenecks.append({
'type''Error Rate',
'severity''high'if summary['error_rate']['avg'] > 0.1else'medium',
'description'f"平均错误率 {summary['error_rate']['avg']:.2%}",
'max_value': summary['error_rate']['max']
            })
            recommendations.extend([
"增强错误处理和重试机制",
"分析错误原因,优化请求策略",
"实现智能退避算法,避免频繁失败"
            ])

# 综合评分
        total_score = 100
for bottleneck in bottlenecks:
if bottleneck['severity'] == 'high':
                total_score -= 25
elif bottleneck['severity'] == 'medium':
                total_score -= 15

        performance_grade = 'A'if total_score >= 90else'B'if total_score >= 75else'C'if total_score >= 60else'D'

return {
'analysis_time': datetime.now().isoformat(),
'analysis_period_minutes': analysis_minutes,
'performance_score': total_score,
'performance_grade': performance_grade,
'bottlenecks': bottlenecks,
'recommendations': recommendations,
'summary': summary
        }

defgenerate_optimization_plan(self, analysis_result: Dict[strAny]) -> Dict[strAny]:
"""生成优化计划"""
        bottlenecks = analysis_result.get('bottlenecks', [])

# 按严重程度排序
        high_priority = [b for b in bottlenecks if b['severity'] == 'high']
        medium_priority = [b for b in bottlenecks if b['severity'] == 'medium']

        optimization_tasks = []

# 高优先级任务
for bottleneck in high_priority:
if bottleneck['type'] == 'CPU':
                optimization_tasks.append({
'priority''high',
'task''CPU优化',
'actions': [
'实现智能并发控制',
'优化数据处理算法',
'使用异步IO替代同步操作'
                    ],
'expected_improvement''20-30% CPU使用率降低'
                })

elif bottleneck['type'] == 'Memory':
                optimization_tasks.append({
'priority''high',
'task''内存优化',
'actions': [
'实现流式数据处理',
'优化对象生命周期管理',
'增加内存监控和回收'
                    ],
'expected_improvement''15-25% 内存使用率降低'
                })

# 中等优先级任务
for bottleneck in medium_priority:
if bottleneck['type'] == 'Response Time':
                optimization_tasks.append({
'priority''medium',
'task''响应时间优化',
'actions': [
'实现请求缓存机制',
'优化网络连接配置',
'增加CDN和代理支持'
                    ],
'expected_improvement''30-50% 响应时间改善'
                })

return {
'plan_created': datetime.now().isoformat(),
'total_tasks'len(optimization_tasks),
'high_priority_tasks'len([t for t in optimization_tasks if t['priority'] == 'high']),
'medium_priority_tasks'len([t for t in optimization_tasks if t['priority'] == 'medium']),
'optimization_tasks': optimization_tasks,
'estimated_completion_time''1-2 weeks'
        }

# 使用示例
if __name__ == "__main__":
import random

# 配置日志
    logging.basicConfig(level=logging.INFO)

# 创建系统监控器
    monitor = SystemMonitor(collection_interval=1.0)
    monitor.start()

# 模拟一些请求
for i inrange(100):
        response_time = random.uniform(0.53.0)
        is_error = random.random() < 0.05
        monitor.record_request(response_time, is_error)
        time.sleep(0.1)

# 等待收集一些数据
    time.sleep(10)

# 创建瓶颈分析器
    analyzer = BottleneckAnalyzer(monitor)

# 分析瓶颈
    analysis = analyzer.analyze_bottlenecks(analysis_minutes=1)
print(f"瓶颈分析结果: {json.dumps(analysis, indent=2, ensure_ascii=False)}")

# 生成优化计划
    plan = analyzer.generate_optimization_plan(analysis)
print(f"优化计划: {json.dumps(plan, indent=2, ensure_ascii=False)}")

# 导出指标
    monitor.export_metrics("performance_metrics.json", hours=1)

# 停止监控
    monitor.stop()

1.2 代码性能分析

# code_profiler.py
import cProfile
import pstats
import io
import time
import functools
import threading
from typing importDictAnyCallableOptional
import logging
from contextlib import contextmanager

classCodeProfiler:
"""代码性能分析器"""

def__init__(self):
self.profiles = {}
self.logger = logging.getLogger(__name__)

defprofile_function(self, func_name: str = None):
"""函数性能分析装饰器"""
defdecorator(func: Callable) -> Callable:
            name = func_name orf"{func.__module__}.{func.__name__}"

            @functools.wraps(func)
defwrapper(*args, **kwargs):
                profiler = cProfile.Profile()
                profiler.enable()

try:
                    result = func(*args, **kwargs)
return result
finally:
                    profiler.disable()

# 保存分析结果
                    s = io.StringIO()
                    ps = pstats.Stats(profiler, stream=s)
                    ps.sort_stats('cumulative')
                    ps.print_stats()

self.profiles[name] = {
'timestamp': time.time(),
'stats': s.getvalue(),
'function': name
                    }

return wrapper
return decorator

    @contextmanager
defprofile_block(self, block_name: str):
"""代码块性能分析上下文管理器"""
        profiler = cProfile.Profile()
        profiler.enable()

try:
yield
finally:
            profiler.disable()

            s = io.StringIO()
            ps = pstats.Stats(profiler, stream=s)
            ps.sort_stats('cumulative')
            ps.print_stats()

self.profiles[block_name] = {
'timestamp': time.time(),
'stats': s.getvalue(),
'block': block_name
            }

defget_profile_report(self, name: str) -> Optional[str]:
"""获取性能分析报告"""
if name inself.profiles:
returnself.profiles[name]['stats']
returnNone

defget_all_profiles(self) -> Dict[strAny]:
"""获取所有性能分析结果"""
returnself.profiles

defclear_profiles(self):
"""清除所有分析结果"""
self.profiles.clear()

classTimingProfiler:
"""时间性能分析器"""

def__init__(self):
self.timings = {}
self.lock = threading.Lock()
self.logger = logging.getLogger(__name__)

deftime_function(self, func_name: str = None):
"""函数计时装饰器"""
defdecorator(func: Callable) -> Callable:
            name = func_name orf"{func.__module__}.{func.__name__}"

            @functools.wraps(func)
defwrapper(*args, **kwargs):
                start_time = time.time()

try:
                    result = func(*args, **kwargs)
return result
finally:
                    end_time = time.time()
                    duration = end_time - start_time

withself.lock:
if name notinself.timings:
self.timings[name] = []
self.timings[name].append({
'duration': duration,
'timestamp': start_time,
'args_count'len(args),
'kwargs_count'len(kwargs)
                        })

return wrapper
return decorator

    @contextmanager
deftime_block(self, block_name: str):
"""代码块计时上下文管理器"""
        start_time = time.time()

try:
yield
finally:
            end_time = time.time()
            duration = end_time - start_time

withself.lock:
if block_name notinself.timings:
self.timings[block_name] = []
self.timings[block_name].append({
'duration': duration,
'timestamp': start_time
                })

defget_timing_stats(self, name: str) -> Optional[Dict[strAny]]:
"""获取计时统计"""
withself.lock:
if name notinself.timings:
returnNone

            durations = [t['duration'for t inself.timings[name]]

ifnot durations:
returnNone

return {
'count'len(durations),
'total_time'sum(durations),
'avg_time'sum(durations) / len(durations),
'min_time'min(durations),
'max_time'max(durations),
'last_call'max(t['timestamp'for t inself.timings[name])
            }

defget_all_stats(self) -> Dict[strDict[strAny]]:
"""获取所有计时统计"""
        stats = {}
withself.lock:
for name inself.timings:
                stats[name] = self.get_timing_stats(name)
return stats

defgenerate_report(self) -> str:
"""生成性能报告"""
        stats = self.get_all_stats()

ifnot stats:
return"没有性能数据"

        report_lines = ["性能分析报告""=" * 50]

# 按平均时间排序
        sorted_stats = sorted(
            stats.items(), 
            key=lambda x: x[1]['avg_time'if x[1else0
            reverse=True
        )

for name, stat in sorted_stats:
if stat:
                report_lines.extend([
f"\n函数/代码块: {name}",
f"  调用次数: {stat['count']}",
f"  总时间: {stat['total_time']:.4f}s",
f"  平均时间: {stat['avg_time']:.4f}s",
f"  最短时间: {stat['min_time']:.4f}s",
f"  最长时间: {stat['max_time']:.4f}s"
                ])

return"\n".join(report_lines)

# 使用示例
profiler = CodeProfiler()
timer = TimingProfiler()

@profiler.profile_function("example_function")
@timer.time_function("example_function")
defexample_function(n: int) -> int:
"""示例函数"""
    total = 0
for i inrange(n):
        total += i * i
return total

@timer.time_function("slow_function")
defslow_function():
"""慢函数示例"""
    time.sleep(0.1)
return"done"

if __name__ == "__main__":
# 测试函数
for i inrange(10):
        example_function(1000)
        slow_function()

# 使用代码块分析
with profiler.profile_block("data_processing"):
with timer.time_block("data_processing"):
            data = [i ** 2for i inrange(10000)]
            result = sum(data)

# 生成报告
print(timer.generate_report())

# 获取详细分析
print("\n详细性能分析:")
for name in profiler.get_all_profiles():
print(f"\n{name}:")
print(profiler.get_profile_report(name)[:500] + "...")

2. 资源使用优化

2.1 内存优化策略

# memory_optimizer.py
import gc
import sys
import weakref
import threading
import time
import logging
from typing importDictAnyListOptional, Generator
from collections import defaultdict
import tracemalloc
from dataclasses import dataclass

@dataclass
classMemorySnapshot:
"""内存快照"""
    timestamp: float
    current_mb: float
    peak_mb: float
    traced_mb: float
    object_count: int
    gc_collections: Dict[intint]

classMemoryOptimizer:
"""内存优化器"""

def__init__(self, enable_tracing: bool = True):
self.enable_tracing = enable_tracing
self.snapshots = []
self.object_tracker = defaultdict(int)
self.weak_refs = []
self.logger = logging.getLogger(__name__)

if enable_tracing:
            tracemalloc.start()

# 启动内存监控
self.monitoring = False
self.monitor_thread = None

deftake_snapshot(self) -> MemorySnapshot:
"""拍摄内存快照"""
# 基本内存信息
import psutil
        process = psutil.Process()
        memory_info = process.memory_info()
        current_mb = memory_info.rss / (1024 * 1024)

# 获取峰值内存
        peak_mb = process.memory_info().peak_wss / (1024 * 1024ifhasattr(process.memory_info(), 'peak_wss'else current_mb

# tracemalloc信息
        traced_mb = 0
ifself.enable_tracing:
            current, peak = tracemalloc.get_traced_memory()
            traced_mb = current / (1024 * 1024)

# 对象计数
        object_count = len(gc.get_objects())

# GC统计
        gc_stats = {}
for i inrange(3):
            gc_stats[i] = gc.get_count()[i]

        snapshot = MemorySnapshot(
            timestamp=time.time(),
            current_mb=current_mb,
            peak_mb=peak_mb,
            traced_mb=traced_mb,
            object_count=object_count,
            gc_collections=gc_stats
        )

self.snapshots.append(snapshot)
return snapshot

defanalyze_memory_growth(self, window_size: int = 10) -> Dict[strAny]:
"""分析内存增长趋势"""
iflen(self.snapshots) < 2:
return {'error''快照数据不足'}

        recent_snapshots = self.snapshots[-window_size:]

# 计算增长率
        first_snapshot = recent_snapshots[0]
        last_snapshot = recent_snapshots[-1]

        time_delta = last_snapshot.timestamp - first_snapshot.timestamp
        memory_delta = last_snapshot.current_mb - first_snapshot.current_mb

        growth_rate = memory_delta / time_delta if time_delta > 0else0

# 检测内存泄漏
        is_leaking = growth_rate > 1.0# 每秒增长超过1MB

return {
'window_size'len(recent_snapshots),
'time_span_seconds': time_delta,
'memory_growth_mb': memory_delta,
'growth_rate_mb_per_second': growth_rate,
'is_potential_leak': is_leaking,
'current_memory_mb': last_snapshot.current_mb,
'object_count_change': last_snapshot.object_count - first_snapshot.object_count
        }

defget_top_memory_objects(self, limit: int = 10) -> List[Dict[strAny]]:
"""获取占用内存最多的对象类型"""
ifnotself.enable_tracing:
return []

        snapshot = tracemalloc.take_snapshot()
        top_stats = snapshot.statistics('lineno')

        result = []
for index, stat inenumerate(top_stats[:limit]):
            result.append({
'rank': index + 1,
'filename': stat.traceback.format()[0if stat.traceback.format() else'unknown',
'size_mb': stat.size / (1024 * 1024),
'count': stat.count
            })

return result

defforce_garbage_collection(self) -> Dict[strint]:
"""强制垃圾回收"""
        before_objects = len(gc.get_objects())

# 执行垃圾回收
        collected = {}
for generation inrange(3):
            collected[generation] = gc.collect(generation)

        after_objects = len(gc.get_objects())
        freed_objects = before_objects - after_objects

self.logger.info(f"垃圾回收完成,释放了 {freed_objects} 个对象")

return {
'freed_objects': freed_objects,
'collections_by_generation': collected,
'remaining_objects': after_objects
        }

defoptimize_memory_usage(self) -> Dict[strAny]:
"""优化内存使用"""
        before_snapshot = self.take_snapshot()

# 1. 强制垃圾回收
        gc_result = self.force_garbage_collection()

# 2. 清理弱引用
        cleaned_refs = 0
for ref inself.weak_refs[:]:
if ref() isNone:
self.weak_refs.remove(ref)
                cleaned_refs += 1

# 3. 清理缓存(如果有的话)
# 这里可以添加特定的缓存清理逻辑

        after_snapshot = self.take_snapshot()

        memory_saved = before_snapshot.current_mb - after_snapshot.current_mb

return {
'memory_before_mb': before_snapshot.current_mb,
'memory_after_mb': after_snapshot.current_mb,
'memory_saved_mb': memory_saved,
'gc_result': gc_result,
'cleaned_weak_refs': cleaned_refs,
'optimization_effective': memory_saved > 0
        }

defstart_monitoring(self, interval: float = 30.0):
"""启动内存监控"""
ifself.monitoring:
return

self.monitoring = True

defmonitor_loop():
whileself.monitoring:
try:
                    snapshot = self.take_snapshot()

# 检查内存增长
iflen(self.snapshots) >= 5:
                        analysis = self.analyze_memory_growth(5)
if analysis.get('is_potential_leak'False):
self.logger.warning(f"检测到潜在内存泄漏: {analysis['growth_rate_mb_per_second']:.2f} MB/s")

# 自动优化
                            opt_result = self.optimize_memory_usage()
self.logger.info(f"自动内存优化,节省 {opt_result['memory_saved_mb']:.2f} MB")

                    time.sleep(interval)

except Exception as e:
self.logger.error(f"内存监控错误: {e}")
                    time.sleep(interval)

self.monitor_thread = threading.Thread(target=monitor_loop)
self.monitor_thread.start()
self.logger.info("内存监控已启动")

defstop_monitoring(self):
"""停止内存监控"""
self.monitoring = False
ifself.monitor_thread:
self.monitor_thread.join()
self.logger.info("内存监控已停止")

defregister_object(self, obj: Any, name: str = None):
"""注册对象用于跟踪"""
        obj_type = type(obj).__name__
if name:
            obj_type = f"{obj_type}({name})"

self.object_tracker[obj_type] += 1

# 创建弱引用
defcleanup_callback(ref):
self.object_tracker[obj_type] -= 1

        weak_ref = weakref.ref(obj, cleanup_callback)
self.weak_refs.append(weak_ref)

return weak_ref

classStreamProcessor:
"""流式数据处理器 - 内存优化示例"""

def__init__(self, chunk_size: int = 1000):
self.chunk_size = chunk_size
self.memory_optimizer = MemoryOptimizer()
self.logger = logging.getLogger(__name__)

defprocess_large_dataset(self, data_generator: Generator) -> Generator:
"""流式处理大数据集"""
        chunk = []

for item in data_generator:
            chunk.append(item)

iflen(chunk) >= self.chunk_size:
# 处理当前块
                processed_chunk = self._process_chunk(chunk)

# 返回处理结果
for result in processed_chunk:
yield result

# 清理内存
                chunk.clear()

# 定期检查内存使用
iflen(chunk) % (self.chunk_size * 10) == 0:
                    snapshot = self.memory_optimizer.take_snapshot()
if snapshot.current_mb > 500:  # 超过500MB时优化
self.memory_optimizer.optimize_memory_usage()

# 处理剩余数据
if chunk:
            processed_chunk = self._process_chunk(chunk)
for result in processed_chunk:
yield result

def_process_chunk(self, chunk: List[Any]) -> List[Any]:
"""处理数据块"""
# 这里实现具体的数据处理逻辑
return [self._process_item(item) for item in chunk]

def_process_item(self, item: Any) -> Any:
"""处理单个数据项"""
# 示例处理逻辑
returnstr(item).upper()

# 使用示例
if __name__ == "__main__":
import random

# 配置日志
    logging.basicConfig(level=logging.INFO)

# 创建内存优化器
    optimizer = MemoryOptimizer()
    optimizer.start_monitoring(interval=5.0)

try:
# 模拟内存使用
        large_data = []
for i inrange(10000):
            large_data.append([random.random() for _ inrange(100)])

if i % 1000 == 0:
                snapshot = optimizer.take_snapshot()
print(f"当前内存使用: {snapshot.current_mb:.2f} MB")

# 分析内存增长
        analysis = optimizer.analyze_memory_growth()
print(f"内存增长分析: {analysis}")

# 获取内存占用最多的对象
        top_objects = optimizer.get_top_memory_objects()
print(f"内存占用最多的对象: {top_objects}")

# 优化内存
        opt_result = optimizer.optimize_memory_usage()
print(f"内存优化结果: {opt_result}")

# 测试流式处理
defdata_generator():
for i inrange(100000):
yieldf"data_item_{i}"

        processor = StreamProcessor(chunk_size=1000)
        processed_count = 0

for processed_item in processor.process_large_dataset(data_generator()):
            processed_count += 1
if processed_count % 10000 == 0:
print(f"已处理 {processed_count} 个项目")

print(f"流式处理完成,总共处理 {processed_count} 个项目")

finally:
        optimizer.stop_monitoring()

2.2 连接池优化

# connection_pool.py
import asyncio
import aiohttp
import time
import threading
import logging
from typing importDictAnyOptionalList
from dataclasses import dataclass, field
from collections import deque
import weakref
import ssl

@dataclass
classConnectionStats:
"""连接统计信息"""
    total_created: int = 0
    total_closed: int = 0
    active_connections: int = 0
    pool_hits: int = 0
    pool_misses: int = 0
    average_lifetime: float = 0.0
    peak_connections: int = 0

defhit_rate(self) -> float:
        total_requests = self.pool_hits + self.pool_misses
returnself.pool_hits / total_requests if total_requests > 0else0.0

@dataclass
classPooledConnection:
"""池化连接"""
    session: aiohttp.ClientSession
    created_time: float = field(default_factory=time.time)
    last_used: float = field(default_factory=time.time)
    use_count: int = 0
    is_active: bool = True

defmark_used(self):
"""标记连接被使用"""
self.last_used = time.time()
self.use_count += 1

defis_expired(self, max_lifetime: float, max_idle: float) -> bool:
"""检查连接是否过期"""
        now = time.time()
        lifetime_expired = (now - self.created_time) > max_lifetime
        idle_expired = (now - self.last_used) > max_idle
return lifetime_expired or idle_expired

classOptimizedConnectionPool:
"""优化的连接池"""

def__init__(
        self,
        max_connections: int = 100,
        max_connections_per_host: int = 30,
        max_lifetime: float = 3600.0,  # 1小时
        max_idle_time: float = 300.0,  # 5分钟
        cleanup_interval: float = 60.0,  # 1分钟清理一次
        enable_ssl: bool = True
):
self.max_connections = max_connections
self.max_connections_per_host = max_connections_per_host
self.max_lifetime = max_lifetime
self.max_idle_time = max_idle_time
self.cleanup_interval = cleanup_interval
self.enable_ssl = enable_ssl

# 连接池
self.pools: Dict[str, deque] = {}
self.stats = ConnectionStats()
self.lock = asyncio.Lock()

# 清理任务
self.cleanup_task = None
self.running = False

# SSL配置
self.ssl_context = self._create_ssl_context() if enable_ssl elseNone

self.logger = logging.getLogger(__name__)

def_create_ssl_context(self) -> ssl.SSLContext:
"""创建SSL上下文"""
        context = ssl.create_default_context()
        context.check_hostname = False
        context.verify_mode = ssl.CERT_NONE
return context

def_get_pool_key(self, url: str) -> str:
"""获取连接池键"""
from urllib.parse import urlparse
        parsed = urlparse(url)
returnf"{parsed.scheme}://{parsed.netloc}"

asyncdefget_session(self, url: str) -> aiohttp.ClientSession:
"""获取会话连接"""
        pool_key = self._get_pool_key(url)

asyncwithself.lock:
# 检查是否有可用连接
if pool_key inself.pools andself.pools[pool_key]:
                connection = self.pools[pool_key].popleft()

# 检查连接是否过期
ifnot connection.is_expired(self.max_lifetime, self.max_idle_time):
                    connection.mark_used()
self.stats.pool_hits += 1
return connection.session
else:
# 连接过期,关闭它
await connection.session.close()
self.stats.total_closed += 1
self.stats.active_connections -= 1

# 创建新连接
self.stats.pool_misses += 1
returnawaitself._create_new_session(pool_key)

asyncdef_create_new_session(self, pool_key: str) -> aiohttp.ClientSession:
"""创建新的会话连接"""
# 检查连接数限制
ifself.stats.active_connections >= self.max_connections:
# 清理过期连接
awaitself._cleanup_expired_connections()

# 如果仍然超过限制,等待
ifself.stats.active_connections >= self.max_connections:
raise Exception("连接池已满,无法创建新连接")

# 创建连接器
        connector = aiohttp.TCPConnector(
            limit=self.max_connections,
            limit_per_host=self.max_connections_per_host,
            ttl_dns_cache=300,
            use_dns_cache=True,
            ssl=self.ssl_context,
            enable_cleanup_closed=True
        )

# 创建会话
        timeout = aiohttp.ClientTimeout(total=30, connect=10)
        session = aiohttp.ClientSession(
            connector=connector,
            timeout=timeout,
            headers={
'User-Agent''OptimizedCrawler/1.0',
'Accept-Encoding''gzip, deflate',
'Connection''keep-alive'
            }
        )

self.stats.total_created += 1
self.stats.active_connections += 1
self.stats.peak_connections = max(self.stats.peak_connections, self.stats.active_connections)

self.logger.debug(f"创建新连接: {pool_key}")
return session

asyncdefreturn_session(self, url: str, session: aiohttp.ClientSession):
"""归还会话连接"""
        pool_key = self._get_pool_key(url)

asyncwithself.lock:
if pool_key notinself.pools:
self.pools[pool_key] = deque()

# 检查池大小限制
iflen(self.pools[pool_key]) < self.max_connections_per_host:
                connection = PooledConnection(session=session)
self.pools[pool_key].append(connection)
else:
# 池已满,关闭连接
await session.close()
self.stats.total_closed += 1
self.stats.active_connections -= 1

asyncdef_cleanup_expired_connections(self):
"""清理过期连接"""
        cleaned_count = 0

for pool_key, pool inself.pools.items():
            expired_connections = []

# 找出过期连接
for connection in pool:
if connection.is_expired(self.max_lifetime, self.max_idle_time):
                    expired_connections.append(connection)

# 移除并关闭过期连接
for connection in expired_connections:
                pool.remove(connection)
await connection.session.close()
                cleaned_count += 1
self.stats.total_closed += 1
self.stats.active_connections -= 1

if cleaned_count > 0:
self.logger.info(f"清理了 {cleaned_count} 个过期连接")

asyncdef_cleanup_loop(self):
"""清理循环"""
whileself.running:
try:
awaitself._cleanup_expired_connections()
await asyncio.sleep(self.cleanup_interval)
except Exception as e:
self.logger.error(f"连接清理错误: {e}")
await asyncio.sleep(self.cleanup_interval)

asyncdefstart(self):
"""启动连接池"""
ifself.running:
return

self.running = True
self.cleanup_task = asyncio.create_task(self._cleanup_loop())
self.logger.info("连接池已启动")

asyncdefstop(self):
"""停止连接池"""
self.running = False

ifself.cleanup_task:
self.cleanup_task.cancel()
try:
awaitself.cleanup_task
except asyncio.CancelledError:
pass

# 关闭所有连接
asyncwithself.lock:
for pool inself.pools.values():
for connection in pool:
await connection.session.close()

self.pools.clear()
self.stats.active_connections = 0

self.logger.info("连接池已停止")

defget_stats(self) -> ConnectionStats:
"""获取连接池统计信息"""
returnself.stats

asyncdefhealth_check(self) -> Dict[strAny]:
"""健康检查"""
asyncwithself.lock:
            total_pooled = sum(len(pool) for pool inself.pools.values())

return {
'running'self.running,
'total_pools'len(self.pools),
'total_pooled_connections': total_pooled,
'active_connections'self.stats.active_connections,
'hit_rate'self.stats.hit_rate(),
'peak_connections'self.stats.peak_connections,
'memory_usage_mb'self._estimate_memory_usage()
            }

def_estimate_memory_usage(self) -> float:
"""估算内存使用量"""
# 粗略估算每个连接占用约1MB内存
returnself.stats.active_connections * 1.0

classPooledHttpClient:
"""使用连接池的HTTP客户端"""

def__init__(self, connection_pool: OptimizedConnectionPool):
self.pool = connection_pool
self.logger = logging.getLogger(__name__)

asyncdefget(self, url: str, **kwargs) -> aiohttp.ClientResponse:
"""GET请求"""
        session = awaitself.pool.get_session(url)

try:
            response = await session.get(url, **kwargs)
return response
finally:
awaitself.pool.return_session(url, session)

asyncdefpost(self, url: str, **kwargs) -> aiohttp.ClientResponse:
"""POST请求"""
        session = awaitself.pool.get_session(url)

try:
            response = await session.post(url, **kwargs)
return response
finally:
awaitself.pool.return_session(url, session)

asyncdefrequest(self, method: str, url: str, **kwargs) -> aiohttp.ClientResponse:
"""通用请求方法"""
        session = awaitself.pool.get_session(url)

try:
            response = await session.request(method, url, **kwargs)
return response
finally:
awaitself.pool.return_session(url, session)

# 使用示例
asyncdefmain():
# 配置日志
    logging.basicConfig(level=logging.INFO)

# 创建连接池
    pool = OptimizedConnectionPool(
        max_connections=50,
        max_connections_per_host=10,
        max_lifetime=1800.0,  # 30分钟
        max_idle_time=300.0,  # 5分钟
        cleanup_interval=60.0# 1分钟清理一次
    )

# 启动连接池
await pool.start()

# 创建HTTP客户端
    client = PooledHttpClient(pool)

try:
# 测试请求
        test_urls = [
"https://httpbin.org/get",
"https://httpbin.org/json",
"https://httpbin.org/delay/1",
"https://httpbin.org/status/200"
        ]

# 并发请求测试
        tasks = []
for i inrange(20):
            url = test_urls[i % len(test_urls)]
            task = asyncio.create_task(client.get(url))
            tasks.append(task)

        responses = await asyncio.gather(*tasks, return_exceptions=True)

        success_count = sum(1for r in responses ifnotisinstance(r, Exception))
print(f"成功请求: {success_count}/{len(responses)}")

# 获取连接池统计
        stats = pool.get_stats()
print(f"连接池统计: 命中率={stats.hit_rate():.2%}, 活跃连接={stats.active_connections}")

# 健康检查
        health = await pool.health_check()
print(f"健康检查: {health}")

finally:
# 停止连接池
await pool.stop()

if __name__ == "__main__":
    asyncio.run(main())

5. 实战案例:高性能新闻爬虫系统

5.1 系统架构设计

# news_crawler_system.py
import asyncio
import aiohttp
import time
import json
import logging
from typing importDictAnyListOptional
from dataclasses import dataclass, field
import hashlib
from urllib.parse import urljoin, urlparse
import re
from bs4 import BeautifulSoup

@dataclass
classNewsItem:
"""新闻条目"""
    url: str
    title: str = ""
    content: str = ""
    author: str = ""
    publish_time: str = ""
    category: str = ""
    tags: List[str] = field(default_factory=list)
    source: str = ""
    crawl_time: float = field(default_factory=time.time)

defto_dict(self) -> Dict[strAny]:
return {
'url'self.url,
'title'self.title,
'content'self.content,
'author'self.author,
'publish_time'self.publish_time,
'category'self.category,
'tags'self.tags,
'source'self.source,
'crawl_time'self.crawl_time
        }

classNewsExtractor:
"""新闻内容提取器"""

def__init__(self):
self.logger = logging.getLogger(__name__)

# 常见新闻网站的选择器配置
self.site_configs = {
'default': {
'title': ['h1''.title''.headline''[class*="title"]'],
'content': ['.content''.article-content''.post-content''[class*="content"]'],
'author': ['.author''.byline''[class*="author"]'],
'time': ['.time''.date''.publish-time''[class*="time"]''[class*="date"]']
            },
'sina.com.cn': {
'title': ['.main-title''h1'],
'content': ['.article''.content'],
'author': ['.author'],
'time': ['.time']
            },
'sohu.com': {
'title': ['.text-title''h1'],
'content': ['.text'],
'author': ['.author-name'],
'time': ['.time']
            }
        }

defget_site_config(self, url: str) -> Dict[strList[str]]:
"""获取网站配置"""
        domain = urlparse(url).netloc
for site, config inself.site_configs.items():
if site in domain:
return config
returnself.site_configs['default']

defextract_text_by_selectors(self, soup: BeautifulSoup, selectors: List[str]) -> str:
"""通过选择器提取文本"""
for selector in selectors:
try:
                element = soup.select_one(selector)
if element:
return element.get_text(strip=True)
except Exception:
continue
return""

defextract_news(self, html: str, url: str) -> NewsItem:
"""提取新闻内容"""
try:
            soup = BeautifulSoup(html, 'html.parser')
            config = self.get_site_config(url)

# 提取标题
            title = self.extract_text_by_selectors(soup, config['title'])
ifnot title:
                title = soup.title.get_text(strip=Trueif soup.title else""

# 提取内容
            content = self.extract_text_by_selectors(soup, config['content'])

# 提取作者
            author = self.extract_text_by_selectors(soup, config['author'])

# 提取时间
            publish_time = self.extract_text_by_selectors(soup, config['time'])

# 提取标签
            tags = []
            tag_elements = soup.select('.tag, .tags, [class*="tag"]')
for tag_elem in tag_elements:
                tag_text = tag_elem.get_text(strip=True)
if tag_text:
                    tags.append(tag_text)

# 确定来源
            source = urlparse(url).netloc

            news_item = NewsItem(
                url=url,
                title=title,
                content=content,
                author=author,
                publish_time=publish_time,
                tags=tags,
                source=source
            )

self.logger.debug(f"提取新闻成功: {title[:50]}...")
return news_item

except Exception as e:
self.logger.error(f"提取新闻失败 {url}{e}")
return NewsItem(url=url)

classPerformanceOptimizedCrawler:
"""性能优化的爬虫"""

def__init__(self, 
                 max_concurrent: int = 100,
                 request_delay: float = 0.1,
                 timeout: float = 30.0
):
self.max_concurrent = max_concurrent
self.request_delay = request_delay
self.timeout = timeout
self.session = None
self.semaphore = asyncio.Semaphore(max_concurrent)
self.extractor = NewsExtractor()
self.logger = logging.getLogger(__name__)

# 性能统计
self.stats = {
'total_requests'0,
'successful_requests'0,
'failed_requests'0,
'total_time'0.0,
'avg_response_time'0.0
        }

asyncdefcreate_session(self):
"""创建HTTP会话"""
        connector = aiohttp.TCPConnector(
            limit=self.max_concurrent,
            limit_per_host=20,
            ttl_dns_cache=300,
            use_dns_cache=True,
            keepalive_timeout=30,
            enable_cleanup_closed=True
        )

        timeout = aiohttp.ClientTimeout(total=self.timeout)

self.session = aiohttp.ClientSession(
            connector=connector,
            timeout=timeout,
            headers={
'User-Agent''Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
            }
        )

asyncdefclose_session(self):
"""关闭HTTP会话"""
ifself.session:
awaitself.session.close()

asyncdeffetch_url(self, url: str) -> Optional[str]:
"""获取URL内容"""
asyncwithself.semaphore:
            start_time = time.time()

try:
await asyncio.sleep(self.request_delay)

asyncwithself.session.get(url) as response:
if response.status == 200:
                        content = await response.text()

# 更新统计
                        response_time = time.time() - start_time
self.stats['successful_requests'] += 1
self.stats['total_time'] += response_time

return content
else:
self.logger.warning(f"HTTP {response.status}{url}")
self.stats['failed_requests'] += 1
returnNone

except Exception as e:
self.logger.error(f"请求失败 {url}{e}")
self.stats['failed_requests'] += 1
returnNone

finally:
self.stats['total_requests'] += 1
ifself.stats['successful_requests'] > 0:
self.stats['avg_response_time'] = (
self.stats['total_time'] / self.stats['successful_requests']
                    )

asyncdefcrawl_news(self, url: str) -> Optional[NewsItem]:
"""爬取单个新闻"""
        html = awaitself.fetch_url(url)
if html:
returnself.extractor.extract_news(html, url)
returnNone

asyncdefcrawl_news_batch(self, urls: List[str]) -> List[NewsItem]:
"""批量爬取新闻"""
        tasks = [self.crawl_news(url) for url in urls]
        results = await asyncio.gather(*tasks, return_exceptions=True)

        news_items = []
for result in results:
ifisinstance(result, NewsItem):
                news_items.append(result)
elifisinstance(result, Exception):
self.logger.error(f"爬取异常: {result}")

return news_items

defget_performance_stats(self) -> Dict[strAny]:
"""获取性能统计"""
        success_rate = 0.0
ifself.stats['total_requests'] > 0:
            success_rate = self.stats['successful_requests'] / self.stats['total_requests']

return {
'total_requests'self.stats['total_requests'],
'successful_requests'self.stats['successful_requests'],
'failed_requests'self.stats['failed_requests'],
'success_rate': success_rate,
'avg_response_time'self.stats['avg_response_time'],
'requests_per_second': (
self.stats['successful_requests'] / self.stats['total_time']
ifself.stats['total_time'] > 0else0
            )
        }

classNewsMonitoringSystem:
"""新闻监控系统"""

def__init__(self, crawler: PerformanceOptimizedCrawler):
self.crawler = crawler
self.monitoring_data = {
'start_time': time.time(),
'crawl_sessions': [],
'performance_history': [],
'error_log': []
        }
self.logger = logging.getLogger(__name__)

deflog_crawl_session(self, session_data: Dict[strAny]):
"""记录爬取会话"""
        session_data['timestamp'] = time.time()
self.monitoring_data['crawl_sessions'].append(session_data)

# 保留最近100次会话
iflen(self.monitoring_data['crawl_sessions']) > 100:
self.monitoring_data['crawl_sessions'] = self.monitoring_data['crawl_sessions'][-100:]

deflog_performance(self):
"""记录性能数据"""
        perf_data = self.crawler.get_performance_stats()
        perf_data['timestamp'] = time.time()
self.monitoring_data['performance_history'].append(perf_data)

# 保留最近1000个数据点
iflen(self.monitoring_data['performance_history']) > 1000:
self.monitoring_data['performance_history'] = self.monitoring_data['performance_history'][-1000:]

deflog_error(self, error_msg: str, error_type: str = "general"):
"""记录错误"""
        error_data = {
'timestamp': time.time(),
'type': error_type,
'message': error_msg
        }
self.monitoring_data['error_log'].append(error_data)

# 保留最近500个错误
iflen(self.monitoring_data['error_log']) > 500:
self.monitoring_data['error_log'] = self.monitoring_data['error_log'][-500:]

defget_monitoring_report(self) -> Dict[strAny]:
"""生成监控报告"""
        current_time = time.time()
        uptime = current_time - self.monitoring_data['start_time']

# 计算最近性能
        recent_perf = self.monitoring_data['performance_history'][-10:] ifself.monitoring_data['performance_history'else []
        avg_success_rate = sum(p['success_rate'for p in recent_perf) / len(recent_perf) if recent_perf else0
        avg_response_time = sum(p['avg_response_time'for p in recent_perf) / len(recent_perf) if recent_perf else0

# 计算错误统计
        recent_errors = [e for e inself.monitoring_data['error_log'if current_time - e['timestamp'] < 3600]  # 最近1小时

return {
'uptime_seconds': uptime,
'total_sessions'len(self.monitoring_data['crawl_sessions']),
'recent_avg_success_rate': avg_success_rate,
'recent_avg_response_time': avg_response_time,
'recent_errors_count'len(recent_errors),
'current_performance'self.crawler.get_performance_stats(),
'error_types'self._get_error_type_stats(recent_errors)
        }

def_get_error_type_stats(self, errors: List[Dict[strAny]]) -> Dict[strint]:
"""获取错误类型统计"""
        error_types = {}
for error in errors:
            error_type = error['type']
            error_types[error_type] = error_types.get(error_type, 0) + 1
return error_types

classHighPerformanceNewsSystem:
"""高性能新闻爬虫系统"""

def__init__(self, 
                 max_concurrent: int = 100,
                 request_delay: float = 0.1
):
self.crawler = PerformanceOptimizedCrawler(
            max_concurrent=max_concurrent,
            request_delay=request_delay
        )
self.monitoring = NewsMonitoringSystem(self.crawler)
self.news_storage = []
self.logger = logging.getLogger(__name__)

asyncdefstart(self):
"""启动系统"""
awaitself.crawler.create_session()
self.logger.info("高性能新闻爬虫系统已启动")

asyncdefstop(self):
"""停止系统"""
awaitself.crawler.close_session()
self.logger.info("高性能新闻爬虫系统已停止")

asyncdefcrawl_news_sites(self, news_urls: List[str]) -> List[NewsItem]:
"""爬取新闻网站"""
        start_time = time.time()

try:
# 批量爬取
            news_items = awaitself.crawler.crawl_news_batch(news_urls)

# 过滤有效新闻
            valid_news = [item for item in news_items if item.title and item.content]

# 存储新闻
self.news_storage.extend(valid_news)

# 记录会话
            session_data = {
'urls_count'len(news_urls),
'valid_news_count'len(valid_news),
'duration': time.time() - start_time,
'performance'self.crawler.get_performance_stats()
            }
self.monitoring.log_crawl_session(session_data)
self.monitoring.log_performance()

self.logger.info(f"爬取完成: {len(valid_news)}/{len(news_urls)} 条有效新闻")
return valid_news

except Exception as e:
            error_msg = f"爬取新闻失败: {e}"
self.logger.error(error_msg)
self.monitoring.log_error(error_msg, "crawl_error")
return []

defget_system_status(self) -> Dict[strAny]:
"""获取系统状态"""
return {
'news_count'len(self.news_storage),
'monitoring_report'self.monitoring.get_monitoring_report(),
'crawler_stats'self.crawler.get_performance_stats()
        }

defexport_news(self, format_type: str = "json") -> str:
"""导出新闻数据"""
if format_type == "json":
return json.dumps([item.to_dict() for item inself.news_storage], 
                            ensure_ascii=False, indent=2)
else:
raise ValueError(f"不支持的格式: {format_type}")

# 使用示例
asyncdefmain():
# 配置日志
    logging.basicConfig(
        level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    )

# 创建高性能新闻系统
    news_system = HighPerformanceNewsSystem(
        max_concurrent=50,
        request_delay=0.05
    )

await news_system.start()

try:
# 示例新闻URL列表
        news_urls = [
"https://news.sina.com.cn/c/2023-01-01/doc-example1.shtml",
"https://news.sohu.com/20230101/example2.shtml",
"https://www.163.com/news/article/example3.html",
# 添加更多新闻URL...
        ]

# 爬取新闻
        news_items = await news_system.crawl_news_sites(news_urls)

# 显示系统状态
        status = news_system.get_system_status()
print(f"系统状态: {json.dumps(status, indent=2, ensure_ascii=False)}")

# 导出新闻数据
if news_items:
            news_data = news_system.export_news()
print(f"导出新闻数据: {len(news_items)} 条")

# 保存到文件
withopen("news_data.json""w", encoding="utf-8"as f:
                f.write(news_data)

finally:
await news_system.stop()

if __name__ == "__main__":
    asyncio.run(main())

6. 练习与实战

6.1 基础练习

  1. 性能分析练习

    • 使用 SystemMonitor 监控你的爬虫程序

    • 识别性能瓶颈并提出优化方案
    • 比较优化前后的性能指标
  2. 内存优化练习

    • 实现一个大文件处理的爬虫
    • 使用 MemoryOptimizer 监控内存使用

    • 优化内存使用,避免内存泄漏
  3. 连接池优化练习

    • 实现一个使用连接池的爬虫
    • 测试不同连接池大小对性能的影响
    • 找到最优的连接池配置

6.2 高级练习

  1. 监控系统搭建

    • 搭建完整的爬虫监控系统
    • 实现实时监控和告警功能
    • 创建监控仪表板
  2. 高可用架构设计

    • 设计一个分布式爬虫系统
    • 实现负载均衡和故障转移
    • 确保数据一致性
  3. 性能压测

    • 对爬虫系统进行压力测试
    • 找到系统的性能极限
    • 优化系统以提高吞吐量

6.3 实战项目

  1. 高性能电商价格监控系统

    • 监控多个电商平台的商品价格
    • 实现实时价格变化告警
    • 支持大规模商品监控
  2. 新闻聚合与分析平台

    • 爬取多个新闻网站
    • 实现新闻去重和分类
    • 提供实时新闻分析
  3. 社交媒体监控系统

    • 监控社交媒体平台内容
    • 实现情感分析和热点发现
    • 提供数据可视化

7. 常见问题与解决方案

7.1 性能问题

问题1:爬虫速度慢

  • 检查网络连接和带宽
  • 优化并发数和请求间隔
  • 使用连接池和会话复用
  • 考虑使用代理池

问题2:内存使用过高

  • 实现流式处理
  • 及时释放不需要的对象
  • 使用生成器代替列表
  • 定期进行垃圾回收

问题3:CPU使用率高

  • 优化数据处理算法
  • 使用多进程处理CPU密集型任务
  • 减少不必要的计算
  • 使用缓存避免重复计算

7.2 监控问题

问题1:监控数据不准确

  • 确保监控代码的正确性
  • 检查时间同步问题
  • 验证数据收集逻辑
  • 定期校准监控指标

问题2:告警过于频繁

  • 调整告警阈值
  • 实现告警去重
  • 设置告警冷却期
  • 优化告警规则

问题3:监控系统性能影响

  • 使用异步监控
  • 减少监控频率
  • 优化监控代码
  • 考虑使用采样监控

7.3 高可用问题

问题1:节点故障处理

  • 实现健康检查
  • 自动故障转移
  • 节点状态管理
  • 故障恢复机制

问题2:数据一致性

  • 使用分布式锁
  • 实现事务机制
  • 数据版本控制
  • 冲突解决策略

问题3:负载均衡效果差

  • 调整负载均衡算法
  • 监控节点性能
  • 动态调整权重
  • 优化任务分配

8. 总结

本课程详细介绍了爬虫性能优化与监控的各个方面:

  1. 性能瓶颈分析:学会识别和分析性能问题

  2. 系统监控:实现全面的监控和告警系统

  3. 资源优化:优化内存、CPU和网络资源使用

  4. 高可用架构:设计可靠的分布式爬虫系统

通过这些技术,你可以构建高性能、高可用的爬虫系统,满足大规模数据采集的需求。

9. 下节预告

下一课我们将学习**"爬虫部署与运维"**,包括:

  • Docker容器化部署
  • Kubernetes集群管理
  • CI/CD自动化部署
  • 生产环境运维监控
  • 日志管理与分析

这将帮助你将爬虫系统成功部署到生产环境并进行有效运维。


## 3. 系统监控与告警

### 3.1 实时监控系统
```python
# monitoring_system.py
import asyncio
import time
import json
import logging
import threading
from typing import Dict, Any, List, Optional, Callable
from dataclasses import dataclass, asdict
from collections import deque, defaultdict
from datetime import datetime, timedelta
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
import requests

@dataclass
class Alert:
    """告警信息"""
    id: str
    level: str  # info, warning, error, critical
    title: str
    message: str
    timestamp: float
    source: str
    tags: Dict[str, str]
    resolved: bool = False
    resolved_time: Optional[float] = None

    def to_dict(self) -> Dict[str, Any]:
        return asdict(self)

@dataclass
class MetricPoint:
    """指标数据点"""
    name: str
    value: float
    timestamp: float
    tags: Dict[str, str]

    def to_dict(self) -> Dict[str, Any]:
        return asdict(self)

class AlertRule:
    """告警规则"""

    def __init__(
        self,
        name: str,
        metric_name: str,
        condition: str,  # >, <, >=, <=, ==, !=
        threshold: float,
        duration: float = 60.0,  # 持续时间(秒)
        level: str = "warning",
        message_template: str = None
    ):
        self.name = name
        self.metric_name = metric_name
        self.condition = condition
        self.threshold = threshold
        self.duration = duration
        self.level = level
        self.message_template = message_template or f"{metric_name} {condition} {threshold}"

        # 状态跟踪
        self.triggered_time = None
        self.last_alert_time = None
        self.alert_count = 0

    def check_condition(self, value: float) -> bool:
        """检查条件是否满足"""
        if self.condition == ">":
            return value > self.threshold
        elif self.condition == "<":
            return value < self.threshold
        elif self.condition == ">=":
            return value >= self.threshold
        elif self.condition == "<=":
            return value <= self.threshold
        elif self.condition == "==":
            return value == self.threshold
        elif self.condition == "!=":
            return value != self.threshold
        return False

    def evaluate(self, metric_point: MetricPoint) -> Optional[Alert]:
        """评估指标并生成告警"""
        current_time = time.time()
        condition_met = self.check_condition(metric_point.value)

        if condition_met:
            if self.triggered_time is None:
                self.triggered_time = current_time

            # 检查是否达到持续时间
            if current_time - self.triggered_time >= self.duration:
                # 避免重复告警(5分钟内不重复)
                if (self.last_alert_time is None or 
                    current_time - self.last_alert_time >= 300):

                    alert = Alert(
                        id=f"{self.name}_{int(current_time)}",
                        level=self.level,
                        title=f"告警: {self.name}",
                        message=self.message_template.format(
                            value=metric_point.value,
                            threshold=self.threshold
                        ),
                        timestamp=current_time,
                        source=self.metric_name,
                        tags=metric_point.tags.copy()
                    )

                    self.last_alert_time = current_time
                    self.alert_count += 1
                    return alert
        else:
            # 条件不满足,重置触发时间
            self.triggered_time = None

        return None

class MetricsCollector:
    """指标收集器"""

    def __init__(self, max_points: int = 10000):
        self.max_points = max_points
        self.metrics: Dict[str, deque] = defaultdict(lambda: deque(maxlen=max_points))
        self.lock = threading.Lock()
        self.logger = logging.getLogger(__name__)

    def record_metric(self, name: str, value: float, tags: Dict[str, str] = None):
        """记录指标"""
        if tags is None:
            tags = {}

        point = MetricPoint(
            name=name,
            value=value,
            timestamp=time.time(),
            tags=tags
        )

        with self.lock:
            self.metrics[name].append(point)

    def get_latest_metric(self, name: str) -> Optional[MetricPoint]:
        """获取最新指标值"""
        with self.lock:
            if name in self.metrics and self.metrics[name]:
                return self.metrics[name][-1]
        return None

    def get_metric_history(self, name: str, duration: float = 3600) -> List[MetricPoint]:
        """获取指标历史数据"""
        cutoff_time = time.time() - duration

        with self.lock:
            if name in self.metrics:
                return [
                    point for point in self.metrics[name]
                    if point.timestamp >= cutoff_time
                ]
        return []

    def get_metric_stats(self, name: str, duration: float = 3600) -> Dict[str, float]:
        """获取指标统计信息"""
        history = self.get_metric_history(name, duration)

        if not history:
            return {}

        values = [point.value for point in history]

        return {
            'count': len(values),
            'avg': sum(values) / len(values),
            'min': min(values),
            'max': max(values),
            'latest': values[-1] if values else 0
        }

class AlertManager:
    """告警管理器"""

    def __init__(self):
        self.rules: List[AlertRule] = []
        self.alerts: deque = deque(maxlen=1000)
        self.handlers: List[Callable[[Alert], None]] = []
        self.logger = logging.getLogger(__name__)

    def add_rule(self, rule: AlertRule):
        """添加告警规则"""
        self.rules.append(rule)
        self.logger.info(f"添加告警规则: {rule.name}")

    def add_handler(self, handler: Callable[[Alert], None]):
        """添加告警处理器"""
        self.handlers.append(handler)

    def process_metric(self, metric_point: MetricPoint):
        """处理指标并检查告警"""
        for rule in self.rules:
            if rule.metric_name == metric_point.name:
                alert = rule.evaluate(metric_point)
                if alert:
                    self.trigger_alert(alert)

    def trigger_alert(self, alert: Alert):
        """触发告警"""
        self.alerts.append(alert)
        self.logger.warning(f"触发告警: {alert.title} - {alert.message}")

        # 调用所有处理器
        for handler in self.handlers:
            try:
                handler(alert)
            except Exception as e:
                self.logger.error(f"告警处理器错误: {e}")

    def get_active_alerts(self) -> List[Alert]:
        """获取活跃告警"""
        return [alert for alert in self.alerts if not alert.resolved]

    def resolve_alert(self, alert_id: str):
        """解决告警"""
        for alert in self.alerts:
            if alert.id == alert_id and not alert.resolved:
                alert.resolved = True
                alert.resolved_time = time.time()
                self.logger.info(f"告警已解决: {alert_id}")
                break

class EmailAlertHandler:
    """邮件告警处理器"""

    def __init__(self, smtp_server: str, smtp_port: int, username: str, password: str, recipients: List[str]):
        self.smtp_server = smtp_server
        self.smtp_port = smtp_port
        self.username = username
        self.password = password
        self.recipients = recipients
        self.logger = logging.getLogger(__name__)

    def __call__(self, alert: Alert):
        """发送邮件告警"""
        try:
            msg = MIMEMultipart()
            msg['From'] = self.username
            msg['To'] = ', '.join(self.recipients)
            msg['Subject'] = f"[{alert.level.upper()}] {alert.title}"

            body = f"""
告警详情:
- 级别: {alert.level}
- 时间: {datetime.fromtimestamp(alert.timestamp)}
- 来源: {alert.source}
- 消息: {alert.message}
- 标签: {alert.tags}
            """

            msg.attach(MIMEText(body, 'plain', 'utf-8'))

            with smtplib.SMTP(self.smtp_server, self.smtp_port) as server:
                server.starttls()
                server.login(self.username, self.password)
                server.send_message(msg)

            self.logger.info(f"邮件告警已发送: {alert.id}")

        except Exception as e:
            self.logger.error(f"发送邮件告警失败: {e}")

class WebhookAlertHandler:
    """Webhook告警处理器"""

    def __init__(self, webhook_url: str, timeout: float = 10.0):
        self.webhook_url = webhook_url
        self.timeout = timeout
        self.logger = logging.getLogger(__name__)

    def __call__(self, alert: Alert):
        """发送Webhook告警"""
        try:
            payload = {
                'alert': alert.to_dict(),
                'timestamp': alert.timestamp,
                'level': alert.level,
                'title': alert.title,
                'message': alert.message
            }

            response = requests.post(
                self.webhook_url,
                json=payload,
                timeout=self.timeout,
                headers={'Content-Type': 'application/json'}
            )

            response.raise_for_status()
            self.logger.info(f"Webhook告警已发送: {alert.id}")

        except Exception as e:
            self.logger.error(f"发送Webhook告警失败: {e}")

class MonitoringSystem:
    """监控系统"""

    def __init__(self):
        self.metrics_collector = MetricsCollector()
        self.alert_manager = AlertManager()
        self.running = False
        self.monitor_tasks = []
        self.logger = logging.getLogger(__name__)

    def add_alert_rule(self, rule: AlertRule):
        """添加告警规则"""
        self.alert_manager.add_rule(rule)

    def add_alert_handler(self, handler: Callable[[Alert], None]):
        """添加告警处理器"""
        self.alert_manager.add_handler(handler)

    def record_metric(self, name: str, value: float, tags: Dict[str, str] = None):
        """记录指标"""
        metric_point = MetricPoint(
            name=name,
            value=value,
            timestamp=time.time(),
            tags=tags or {}
        )

        self.metrics_collector.record_metric(name, value, tags)
        self.alert_manager.process_metric(metric_point)

    async def start_system_monitoring(self, interval: float = 30.0):
        """启动系统监控"""
        import psutil

        while self.running:
            try:
                # CPU使用率
                cpu_percent = psutil.cpu_percent(interval=1)
                self.record_metric("system.cpu.percent", cpu_percent)

                # 内存使用率
                memory = psutil.virtual_memory()
                self.record_metric("system.memory.percent", memory.percent)
                self.record_metric("system.memory.used_mb", memory.used / (1024 * 1024))

                # 磁盘使用率
                disk = psutil.disk_usage('/')
                disk_percent = (disk.used / disk.total) * 100
                self.record_metric("system.disk.percent", disk_percent)

                # 网络IO
                net_io = psutil.net_io_counters()
                self.record_metric("system.network.bytes_sent", net_io.bytes_sent)
                self.record_metric("system.network.bytes_recv", net_io.bytes_recv)

                await asyncio.sleep(interval)

            except Exception as e:
                self.logger.error(f"系统监控错误: {e}")
                await asyncio.sleep(interval)

    async def start_crawler_monitoring(self, crawler_stats: Dict[str, Any], interval: float = 60.0):
        """启动爬虫监控"""
        while self.running:
            try:
                # 请求统计
                self.record_metric("crawler.requests_total", crawler_stats.get('requests_total', 0))
                self.record_metric("crawler.requests_success", crawler_stats.get('requests_success', 0))
                self.record_metric("crawler.requests_failed", crawler_stats.get('requests_failed', 0))

                # 响应时间
                self.record_metric("crawler.response_time_avg", crawler_stats.get('response_time_avg', 0))

                # 队列大小
                self.record_metric("crawler.queue_size", crawler_stats.get('queue_size', 0))

                # 错误率
                total_requests = crawler_stats.get('requests_total', 0)
                failed_requests = crawler_stats.get('requests_failed', 0)
                error_rate = (failed_requests / total_requests) if total_requests > 0 else 0
                self.record_metric("crawler.error_rate", error_rate)

                await asyncio.sleep(interval)

            except Exception as e:
                self.logger.error(f"爬虫监控错误: {e}")
                await asyncio.sleep(interval)

    async def start(self):
        """启动监控系统"""
        if self.running:
            return

        self.running = True

        # 启动系统监控
        system_task = asyncio.create_task(self.start_system_monitoring())
        self.monitor_tasks.append(system_task)

        self.logger.info("监控系统已启动")

    async def stop(self):
        """停止监控系统"""
        self.running = False

        # 取消所有监控任务
        for task in self.monitor_tasks:
            task.cancel()
            try:
                await task
            except asyncio.CancelledError:
                pass

        self.monitor_tasks.clear()
        self.logger.info("监控系统已停止")

    def get_dashboard_data(self) -> Dict[str, Any]:
        """获取仪表板数据"""
        current_time = time.time()

        # 系统指标
        system_metrics = {}
        for metric_name in ['system.cpu.percent', 'system.memory.percent', 'system.disk.percent']:
            latest = self.metrics_collector.get_latest_metric(metric_name)
            if latest:
                system_metrics[metric_name] = latest.value

        # 爬虫指标
        crawler_metrics = {}
        for metric_name in ['crawler.requests_total', 'crawler.error_rate', 'crawler.queue_size']:
            latest = self.metrics_collector.get_latest_metric(metric_name)
            if latest:
                crawler_metrics[metric_name] = latest.value

        # 活跃告警
        active_alerts = self.alert_manager.get_active_alerts()

        return {
            'timestamp': current_time,
            'system_metrics': system_metrics,
            'crawler_metrics': crawler_metrics,
            'active_alerts': [alert.to_dict() for alert in active_alerts],
            'alert_count': len(active_alerts)
        }

# 使用示例
async def main():
    # 配置日志
    logging.basicConfig(level=logging.INFO)

    # 创建监控系统
    monitoring = MonitoringSystem()

    # 添加告警规则
    monitoring.add_alert_rule(AlertRule(
        name="高CPU使用率",
        metric_name="system.cpu.percent",
        condition=">",
        threshold=80.0,
        duration=60.0,
        level="warning",
        message_template="CPU使用率过高: {value:.1f}% (阈值: {threshold}%)"
    ))

    monitoring.add_alert_rule(AlertRule(
        name="高内存使用率",
        metric_name="system.memory.percent",
        condition=">",
        threshold=85.0,
        duration=120.0,
        level="error",
        message_template="内存使用率过高: {value:.1f}% (阈值: {threshold}%)"
    ))

    monitoring.add_alert_rule(AlertRule(
        name="高错误率",
        metric_name="crawler.error_rate",
        condition=">",
        threshold=0.1,
        duration=300.0,
        level="critical",
        message_template="爬虫错误率过高: {value:.2%} (阈值: {threshold:.2%})"
    ))

    # 添加告警处理器
    webhook_handler = WebhookAlertHandler("https://hooks.slack.com/services/YOUR/WEBHOOK/URL")
    monitoring.add_alert_handler(webhook_handler)

    # 启动监控
    await monitoring.start()

    try:
        # 模拟运行
        for i in range(100):
            # 模拟指标数据
            import random

            monitoring.record_metric("system.cpu.percent", random.uniform(60, 90))
            monitoring.record_metric("system.memory.percent", random.uniform(70, 95))
            monitoring.record_metric("crawler.error_rate", random.uniform(0.05, 0.15))

            # 获取仪表板数据
            if i % 10 == 0:
                dashboard = monitoring.get_dashboard_data()
                print(f"仪表板数据: {json.dumps(dashboard, indent=2, ensure_ascii=False)}")

            await asyncio.sleep(1)

    finally:
        await monitoring.stop()

if __name__ == "__main__":
    asyncio.run(main())

3.2 可视化监控面板

# monitoring_dashboard.py
from flask import Flask, render_template, jsonify, request
import json
import time
from datetime import datetime, timedelta
from typing importDictAnyList
import threading

classMonitoringDashboard:
"""监控仪表板"""

def__init__(self, monitoring_system, host: str = "0.0.0.0", port: int = 8080):
self.monitoring_system = monitoring_system
self.host = host
self.port = port
self.app = Flask(__name__)
self.setup_routes()

defsetup_routes(self):
"""设置路由"""

        @self.app.route('/')
defdashboard():
"""主仪表板页面"""
return render_template('dashboard.html')

        @self.app.route('/api/metrics')
defget_metrics():
"""获取指标数据API"""
return jsonify(self.monitoring_system.get_dashboard_data())

        @self.app.route('/api/metrics/<metric_name>')
defget_metric_history(metric_name):
"""获取指标历史数据API"""
            duration = request.args.get('duration'3600type=int)
            history = self.monitoring_system.metrics_collector.get_metric_history(metric_name, duration)

return jsonify({
'metric_name': metric_name,
'duration': duration,
'data': [point.to_dict() for point in history]
            })

        @self.app.route('/api/alerts')
defget_alerts():
"""获取告警数据API"""
            active_alerts = self.monitoring_system.alert_manager.get_active_alerts()
return jsonify([alert.to_dict() for alert in active_alerts])

        @self.app.route('/api/alerts/<alert_id>/resolve', methods=['POST'])
defresolve_alert(alert_id):
"""解决告警API"""
self.monitoring_system.alert_manager.resolve_alert(alert_id)
return jsonify({'status''success''message'f'告警 {alert_id} 已解决'})

        @self.app.route('/api/stats')
defget_stats():
"""获取统计信息API"""
            stats = {}

# 系统指标统计
for metric_name in ['system.cpu.percent''system.memory.percent''system.disk.percent']:
                metric_stats = self.monitoring_system.metrics_collector.get_metric_stats(metric_name)
if metric_stats:
                    stats[metric_name] = metric_stats

# 爬虫指标统计
for metric_name in ['crawler.requests_total''crawler.error_rate''crawler.response_time_avg']:
                metric_stats = self.monitoring_system.metrics_collector.get_metric_stats(metric_name)
if metric_stats:
                    stats[metric_name] = metric_stats

return jsonify(stats)

defrun(self, debug: bool = False):
"""运行仪表板"""
self.app.run(host=self.host, port=self.port, debug=debug, threaded=True)

defrun_in_thread(self):
"""在线程中运行仪表板"""
        thread = threading.Thread(target=self.run, kwargs={'debug'False})
        thread.daemon = True
        thread.start()
return thread

# HTML模板
DASHBOARD_TEMPLATE = '''
<!DOCTYPE html>
<html lang="zh-CN">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>爬虫监控仪表板</title>
    <script src="https://cdn.jsdelivr.net/npm/chart.js"></script>
    <style>
        body {
            font-family: Arial, sans-serif;
            margin: 0;
            padding: 20px;
            background-color: 
#f5f5f5;
        }
        .container {
            max-width: 1200px;
            margin: 0 auto;
        }
        .header {
            background: white;
            padding: 20px;
            border-radius: 8px;
            margin-bottom: 20px;
            box-shadow: 0 2px 4px rgba(0,0,0,0.1);
        }
        .metrics-grid {
            display: grid;
            grid-template-columns: repeat(auto-fit, minmax(300px, 1fr));
            gap: 20px;
            margin-bottom: 20px;
        }
        .metric-card {
            background: white;
            padding: 20px;
            border-radius: 8px;
            box-shadow: 0 2px 4px rgba(0,0,0,0.1);
        }
        .metric-value {
            font-size: 2em;
            font-weight: bold;
            color: #333;
        }
        .metric-label {
            color: #666;
            margin-bottom: 10px;
        }
        .chart-container {
            background: white;
            padding: 20px;
            border-radius: 8px;
            box-shadow: 0 2px 4px rgba(0,0,0,0.1);
            margin-bottom: 20px;
        }
        .alerts-container {
            background: white;
            padding: 20px;
            border-radius: 8px;
            box-shadow: 0 2px 4px rgba(0,0,0,0.1);
        }
        .alert {
            padding: 10px;
            margin: 10px 0;
            border-radius: 4px;
            border-left: 4px solid;
        }
        .alert.warning {
            background-color: #fff3cd;
            border-color: #ffc107;
        }
        .alert.error {
            background-color: #f8d7da;
            border-color: #dc3545;
        }
        .alert.critical {
            background-color: #f5c6cb;
            border-color: #721c24;
        }
        .status-indicator {
            display: inline-block;
            width: 12px;
            height: 12px;
            border-radius: 50%;
            margin-right: 8px;
        }
        .status-good { background-color: #28a745; }
        .status-warning { background-color: #ffc107; }
        .status-error { background-color: #dc3545; }
    </style>
</head>
<body>
    <div class="container">
        <div class="header">
            <h1>爬虫监控仪表板</h1>
            <p>实时监控系统性能和爬虫状态</p>
            <div id="last-update">最后更新: --</div>
        </div>

        <div class="metrics-grid">
            <div class="metric-card">
                <div class="metric-label">
                    <span class="status-indicator" id="cpu-status"></span>
                    CPU使用率
                </div>
                <div class="metric-value" id="cpu-value">--%</div>
            </div>

            <div class="metric-card">
                <div class="metric-label">
                    <span class="status-indicator" id="memory-status"></span>
                    内存使用率
                </div>
                <div class="metric-value" id="memory-value">--%</div>
            </div>

            <div class="metric-card">
                <div class="metric-label">
                    <span class="status-indicator" id="error-status"></span>
                    错误率
                </div>
                <div class="metric-value" id="error-value">--%</div>
            </div>

            <div class="metric-card">
                <div class="metric-label">
                    <span class="status-indicator" id="queue-status"></span>
                    队列大小
                </div>
                <div class="metric-value" id="queue-value">--</div>
            </div>
        </div>

        <div class="chart-container">
            <h3>系统性能趋势</h3>
            <canvas id="performance-chart" width="400" height="200"></canvas>
        </div>

        <div class="alerts-container">
            <h3>活跃告警</h3>
            <div id="alerts-list">
                <p>暂无告警</p>
            </div>
        </div>
    </div>

    <script>
        // 图表配置
        const ctx = document.getElementById('performance-chart').getContext('2d');
        const chart = new Chart(ctx, {
            type: 'line',
            data: {
                labels: [],
                datasets: [
                    {
                        label: 'CPU使用率 (%)',
                        data: [],
                        borderColor: 'rgb(255, 99, 132)',
                        backgroundColor: 'rgba(255, 99, 132, 0.2)',
                        tension: 0.1
                    },
                    {
                        label: '内存使用率 (%)',
                        data: [],
                        borderColor: 'rgb(54, 162, 235)',
                        backgroundColor: 'rgba(54, 162, 235, 0.2)',
                        tension: 0.1
                    }
                ]
            },
            options: {
                responsive: true,
                scales: {
                    y: {
                        beginAtZero: true,
                        max: 100
                    }
                }
            }
        });

        // 更新指标显示
        function updateMetrics(data) {
            const systemMetrics = data.system_metrics || {};
            const crawlerMetrics = data.crawler_metrics || {};

            // CPU
            const cpuValue = systemMetrics['system.cpu.percent'] || 0;
            document.getElementById('cpu-value').textContent = cpuValue.toFixed(1) + '%';
            document.getElementById('cpu-status').className = 
                'status-indicator ' + (cpuValue > 80 ? 'status-error' : cpuValue > 60 ? 'status-warning' : 'status-good');

            // 内存
            const memoryValue = systemMetrics['system.memory.percent'] || 0;
            document.getElementById('memory-value').textContent = memoryValue.toFixed(1) + '%';
            document.getElementById('memory-status').className = 
                'status-indicator ' + (memoryValue > 85 ? 'status-error' : memoryValue > 70 ? 'status-warning' : 'status-good');

            // 错误率
            const errorValue = (crawlerMetrics['crawler.error_rate'] || 0) * 100;
            document.getElementById('error-value').textContent = errorValue.toFixed(2) + '%';
            document.getElementById('error-status').className = 
                'status-indicator ' + (errorValue > 10 ? 'status-error' : errorValue > 5 ? 'status-warning' : 'status-good');

            // 队列大小
            const queueValue = crawlerMetrics['crawler.queue_size'] || 0;
            document.getElementById('queue-value').textContent = queueValue.toString();
            document.getElementById('queue-status').className = 
                'status-indicator ' + (queueValue > 1000 ? 'status-error' : queueValue > 500 ? 'status-warning' : 'status-good');

            // 更新时间
            document.getElementById('last-update').textContent = 
                '最后更新: ' + new Date().toLocaleString();
        }

        // 更新图表
        function updateChart(cpuHistory, memoryHistory) {
            const labels = cpuHistory.map(point => 
                new Date(point.timestamp * 1000).toLocaleTimeString()
            ).slice(-20);

            chart.data.labels = labels;
            chart.data.datasets[0].data = cpuHistory.map(point => point.value).slice(-20);
            chart.data.datasets[1].data = memoryHistory.map(point => point.value).slice(-20);
            chart.update();
        }

        // 更新告警
        function updateAlerts(alerts) {
            const alertsList = document.getElementById('alerts-list');

            if (alerts.length === 0) {
                alertsList.innerHTML = '<p>暂无告警</p>';
                return;
            }

            alertsList.innerHTML = alerts.map(alert => `
                <div class="alert ${alert.level}">
                    <strong>${alert.title}</strong><br>
                    ${alert.message}<br>
                    <small>时间: ${new Date(alert.timestamp * 1000).toLocaleString()}</small>
                </div>
            `).join('');
        }

        // 获取数据
        async function fetchData() {
            try {
                // 获取当前指标
                const metricsResponse = await fetch('/api/metrics');
                const metricsData = await metricsResponse.json();
                updateMetrics(metricsData);

                // 获取历史数据
                const cpuResponse = await fetch('/api/metrics/system.cpu.percent?duration=1200');
                const cpuData = await cpuResponse.json();

                const memoryResponse = await fetch('/api/metrics/system.memory.percent?duration=1200');
                const memoryData = await memoryResponse.json();

                updateChart(cpuData.data, memoryData.data);

                // 获取告警
                const alertsResponse = await fetch('/api/alerts');
                const alertsData = await alertsResponse.json();
                updateAlerts(alertsData);

            } catch (error) {
                console.error('获取数据失败:', error);
            }
        }

        // 定期更新
        fetchData();
        setInterval(fetchData, 5000);
    </script>
</body>
</html>
'''

# 创建模板文件
import os
defcreate_dashboard_template():
"""创建仪表板模板文件"""
    templates_dir = 'templates'
ifnot os.path.exists(templates_dir):
        os.makedirs(templates_dir)

withopen(os.path.join(templates_dir, 'dashboard.html'), 'w', encoding='utf-8'as f:
        f.write(DASHBOARD_TEMPLATE)

if __name__ == "__main__":
# 创建模板文件
    create_dashboard_template()

# 这里需要传入实际的监控系统实例
# dashboard = MonitoringDashboard(monitoring_system)
# dashboard.run(debug=True)
print("仪表板模板已创建,请配合监控系统使用")

4. 高可用架构设计

4.1 负载均衡与故障转移

# high_availability.py
import asyncio
import aiohttp
import time
import random
import logging
from typing importListDictAnyOptionalCallable
from dataclasses import dataclass, field
from enum import Enum
import json
import hashlib

classNodeStatus(Enum):
"""节点状态"""
    HEALTHY = "healthy"
    UNHEALTHY = "unhealthy"
    MAINTENANCE = "maintenance"

@dataclass
classCrawlerNode:
"""爬虫节点"""
idstr
    host: str
    port: int
    weight: int = 1
    status: NodeStatus = NodeStatus.HEALTHY
    last_check: float = field(default_factory=time.time)
    response_time: float = 0.0
    error_count: int = 0
    success_count: int = 0
    current_load: int = 0
    max_load: int = 100

    @property
defurl(self) -> str:
returnf"http://{self.host}:{self.port}"

    @property
defhealth_score(self) -> float:
"""计算健康分数"""
ifself.status != NodeStatus.HEALTHY:
return0.0

        total_requests = self.success_count + self.error_count
if total_requests == 0:
return1.0

        success_rate = self.success_count / total_requests
        load_factor = 1.0 - (self.current_load / self.max_load)
        response_factor = max(0.11.0 - (self.response_time / 5.0))  # 5秒为基准

return success_rate * load_factor * response_factor

defto_dict(self) -> Dict[strAny]:
return {
'id'self.id,
'host'self.host,
'port'self.port,
'weight'self.weight,
'status'self.status.value,
'response_time'self.response_time,
'error_count'self.error_count,
'success_count'self.success_count,
'current_load'self.current_load,
'max_load'self.max_load,
'health_score'self.health_score
        }

classLoadBalancer:
"""负载均衡器"""

def__init__(self, health_check_interval: float = 30.0):
self.nodes: List[CrawlerNode] = []
self.health_check_interval = health_check_interval
self.running = False
self.logger = logging.getLogger(__name__)
self._health_check_task = None

defadd_node(self, node: CrawlerNode):
"""添加节点"""
self.nodes.append(node)
self.logger.info(f"添加节点: {node.id} ({node.url})")

defremove_node(self, node_id: str):
"""移除节点"""
self.nodes = [node for node inself.nodes if node.id != node_id]
self.logger.info(f"移除节点: {node_id}")

defget_healthy_nodes(self) -> List[CrawlerNode]:
"""获取健康节点"""
return [node for node inself.nodes if node.status == NodeStatus.HEALTHY]

defselect_node_round_robin(self) -> Optional[CrawlerNode]:
"""轮询选择节点"""
        healthy_nodes = self.get_healthy_nodes()
ifnot healthy_nodes:
returnNone

# 简单轮询实现
ifnothasattr(self'_round_robin_index'):
self._round_robin_index = 0

        node = healthy_nodes[self._round_robin_index % len(healthy_nodes)]
self._round_robin_index += 1
return node

defselect_node_weighted(self) -> Optional[CrawlerNode]:
"""加权选择节点"""
        healthy_nodes = self.get_healthy_nodes()
ifnot healthy_nodes:
returnNone

# 计算总权重
        total_weight = sum(node.weight for node in healthy_nodes)
if total_weight == 0:
return random.choice(healthy_nodes)

# 加权随机选择
        rand_weight = random.uniform(0, total_weight)
        current_weight = 0

for node in healthy_nodes:
            current_weight += node.weight
if current_weight >= rand_weight:
return node

return healthy_nodes[-1]

defselect_node_least_connections(self) -> Optional[CrawlerNode]:
"""最少连接选择节点"""
        healthy_nodes = self.get_healthy_nodes()
ifnot healthy_nodes:
returnNone

returnmin(healthy_nodes, key=lambda node: node.current_load)

defselect_node_health_based(self) -> Optional[CrawlerNode]:
"""基于健康分数选择节点"""
        healthy_nodes = self.get_healthy_nodes()
ifnot healthy_nodes:
returnNone

# 按健康分数排序,选择最佳节点
        sorted_nodes = sorted(healthy_nodes, key=lambda node: node.health_score, reverse=True)

# 在前几个最佳节点中随机选择
        top_nodes = sorted_nodes[:min(3len(sorted_nodes))]
return random.choice(top_nodes)

asyncdefhealth_check(self, node: CrawlerNode) -> bool:
"""健康检查"""
try:
            start_time = time.time()

asyncwith aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=5)) as session:
asyncwith session.get(f"{node.url}/health"as response:
                    response_time = time.time() - start_time
                    node.response_time = response_time
                    node.last_check = time.time()

if response.status == 200:
                        data = await response.json()
                        node.current_load = data.get('current_load'0)
                        node.status = NodeStatus.HEALTHY
                        node.success_count += 1
returnTrue
else:
                        node.error_count += 1
                        node.status = NodeStatus.UNHEALTHY
returnFalse

except Exception as e:
self.logger.warning(f"节点 {node.id} 健康检查失败: {e}")
            node.error_count += 1
            node.status = NodeStatus.UNHEALTHY
            node.last_check = time.time()
returnFalse

asyncdefstart_health_checks(self):
"""启动健康检查"""
self.running = True

whileself.running:
try:
# 并发检查所有节点
                tasks = [self.health_check(node) for node inself.nodes]
if tasks:
await asyncio.gather(*tasks, return_exceptions=True)

await asyncio.sleep(self.health_check_interval)

except Exception as e:
self.logger.error(f"健康检查错误: {e}")
await asyncio.sleep(self.health_check_interval)

asyncdefstart(self):
"""启动负载均衡器"""
ifself.running:
return

self._health_check_task = asyncio.create_task(self.start_health_checks())
self.logger.info("负载均衡器已启动")

asyncdefstop(self):
"""停止负载均衡器"""
self.running = False

ifself._health_check_task:
self._health_check_task.cancel()
try:
awaitself._health_check_task
except asyncio.CancelledError:
pass

self.logger.info("负载均衡器已停止")

defget_stats(self) -> Dict[strAny]:
"""获取统计信息"""
        healthy_count = len(self.get_healthy_nodes())
        total_count = len(self.nodes)

return {
'total_nodes': total_count,
'healthy_nodes': healthy_count,
'unhealthy_nodes': total_count - healthy_count,
'nodes': [node.to_dict() for node inself.nodes]
        }

classFailoverManager:
"""故障转移管理器"""

def__init__(self, load_balancer: LoadBalancer, max_retries: int = 3):
self.load_balancer = load_balancer
self.max_retries = max_retries
self.logger = logging.getLogger(__name__)

asyncdefexecute_with_failover(self, 
                                   task_func: Callable,
                                   *args, 
                                   **kwargs
) -> Any:
"""带故障转移的任务执行"""
        last_exception = None

for attempt inrange(self.max_retries):
# 选择节点
            node = self.load_balancer.select_node_health_based()
ifnot node:
raise Exception("没有可用的健康节点")

try:
# 增加节点负载
                node.current_load += 1

# 执行任务
                result = await task_func(node, *args, **kwargs)

# 任务成功
                node.success_count += 1
return result

except Exception as e:
                last_exception = e
                node.error_count += 1

self.logger.warning(
f"节点 {node.id} 执行失败 (尝试 {attempt + 1}/{self.max_retries}): {e}"
                )

# 如果错误率过高,标记节点为不健康
                total_requests = node.success_count + node.error_count
if total_requests > 10:
                    error_rate = node.error_count / total_requests
if error_rate > 0.5:  # 错误率超过50%
                        node.status = NodeStatus.UNHEALTHY
self.logger.warning(f"节点 {node.id} 错误率过高,标记为不健康")

finally:
# 减少节点负载
                node.current_load = max(0, node.current_load - 1)

# 所有重试都失败
raise Exception(f"所有节点都失败了,最后错误: {last_exception}")

classDistributedCrawlerManager:
"""分布式爬虫管理器"""

def__init__(self):
self.load_balancer = LoadBalancer()
self.failover_manager = FailoverManager(self.load_balancer)
self.task_queue = asyncio.Queue()
self.results = []
self.running = False
self.worker_tasks = []
self.logger = logging.getLogger(__name__)

defadd_crawler_node(self, node_id: str, host: str, port: int, weight: int = 1):
"""添加爬虫节点"""
        node = CrawlerNode(
id=node_id,
            host=host,
            port=port,
            weight=weight
        )
self.load_balancer.add_node(node)

asyncdefsubmit_task(self, url: str, task_data: Dict[strAny] = None):
"""提交爬取任务"""
        task = {
'id': hashlib.md5(f"{url}{time.time()}".encode()).hexdigest(),
'url': url,
'data': task_data or {},
'timestamp': time.time()
        }
awaitself.task_queue.put(task)

asyncdefexecute_crawl_task(self, node: CrawlerNode, task: Dict[strAny]) -> Dict[strAny]:
"""在指定节点执行爬取任务"""
asyncwith aiohttp.ClientSession() as session:
asyncwith session.post(
f"{node.url}/crawl",
                json=task,
                timeout=aiohttp.ClientTimeout(total=30)
            ) as response:
if response.status == 200:
                    result = await response.json()
return {
'task_id': task['id'],
'node_id': node.id,
'status''success',
'result': result,
'timestamp': time.time()
                    }
else:
raise Exception(f"HTTP {response.status}{await response.text()}")

asyncdefworker(self):
"""工作协程"""
whileself.running:
try:
# 获取任务
                task = await asyncio.wait_for(self.task_queue.get(), timeout=1.0)

# 执行任务
                result = awaitself.failover_manager.execute_with_failover(
self.execute_crawl_task, task
                )

# 保存结果
self.results.append(result)
self.logger.info(f"任务 {task['id']} 完成,节点: {result['node_id']}")

except asyncio.TimeoutError:
continue
except Exception as e:
self.logger.error(f"工作协程错误: {e}")

asyncdefstart(self, num_workers: int = 5):
"""启动分布式爬虫管理器"""
ifself.running:
return

self.running = True

# 启动负载均衡器
awaitself.load_balancer.start()

# 启动工作协程
for i inrange(num_workers):
            task = asyncio.create_task(self.worker())
self.worker_tasks.append(task)

self.logger.info(f"分布式爬虫管理器已启动,工作协程数: {num_workers}")

asyncdefstop(self):
"""停止分布式爬虫管理器"""
self.running = False

# 停止负载均衡器
awaitself.load_balancer.stop()

# 停止工作协程
for task inself.worker_tasks:
            task.cancel()

ifself.worker_tasks:
await asyncio.gather(*self.worker_tasks, return_exceptions=True)

self.worker_tasks.clear()
self.logger.info("分布式爬虫管理器已停止")

defget_status(self) -> Dict[strAny]:
"""获取状态信息"""
return {
'running'self.running,
'queue_size'self.task_queue.qsize(),
'completed_tasks'len(self.results),
'load_balancer'self.load_balancer.get_stats()
        }

# 使用示例
asyncdefmain():
# 配置日志
    logging.basicConfig(level=logging.INFO)

# 创建分布式爬虫管理器
    manager = DistributedCrawlerManager()

# 添加爬虫节点
    manager.add_crawler_node("node1""localhost"8001, weight=2)
    manager.add_crawler_node("node2""localhost"8002, weight=1)
    manager.add_crawler_node("node3""localhost"8003, weight=1)

# 启动管理器
await manager.start(num_workers=3)

try:
# 提交任务
        urls = [
"https://example.com/page1",
"https://example.com/page2",
"https://example.com/page3",
"https://example.com/page4",
"https://example.com/page5"
        ]

for url in urls:
await manager.submit_task(url, {'timeout'10})

# 等待任务完成
while manager.task_queue.qsize() > 0orlen(manager.results) < len(urls):
            status = manager.get_status()
print(f"状态: {json.dumps(status, indent=2, ensure_ascii=False)}")
await asyncio.sleep(2)

print(f"所有任务完成,结果数量: {len(manager.results)}")

finally:
await manager.stop()

if __name__ == "__main__":
    asyncio.run(main())

4.2 数据一致性保证

# data_consistency.py
import asyncio
import aioredis
import json
import time
import hashlib
from typing importDictAnyListOptionalSet
from dataclasses import dataclass, field
from enum import Enum
import logging

classTransactionStatus(Enum):
"""事务状态"""
    PENDING = "pending"
    COMMITTED = "committed"
    ABORTED = "aborted"

@dataclass
classTransaction:
"""分布式事务"""
idstr
    operations: List[Dict[strAny]] = field(default_factory=list)
    status: TransactionStatus = TransactionStatus.PENDING
    timestamp: float = field(default_factory=time.time)
    participants: Set[str] = field(default_factory=set)

defto_dict(self) -> Dict[strAny]:
return {
'id'self.id,
'operations'self.operations,
'status'self.status.value,
'timestamp'self.timestamp,
'participants'list(self.participants)
        }

classDistributedLock:
"""分布式锁"""

def__init__(self, redis_client, key: str, timeout: float = 30.0):
self.redis = redis_client
self.key = f"lock:{key}"
self.timeout = timeout
self.identifier = hashlib.md5(f"{time.time()}{id(self)}".encode()).hexdigest()
self.logger = logging.getLogger(__name__)

asyncdefacquire(self, blocking: bool = True, timeout: Optional[float] = None) -> bool:
"""获取锁"""
        end_time = time.time() + (timeout orself.timeout)

whileTrue:
# 尝试获取锁
            result = awaitself.redis.set(
self.key, 
self.identifier, 
                ex=int(self.timeout),
                nx=True
            )

if result:
self.logger.debug(f"获取锁成功: {self.key}")
returnTrue

ifnot blocking or time.time() > end_time:
returnFalse

await asyncio.sleep(0.1)

asyncdefrelease(self) -> bool:
"""释放锁"""
# Lua脚本确保原子性
        lua_script = """
        if redis.call("GET", KEYS[1]) == ARGV[1] then
            return redis.call("DEL", KEYS[1])
        else
            return 0
        end
        """


        result = awaitself.redis.eval(lua_script, 1self.key, self.identifier)
if result:
self.logger.debug(f"释放锁成功: {self.key}")
returnTrue
else:
self.logger.warning(f"释放锁失败: {self.key}")
returnFalse

asyncdef__aenter__(self):
"""异步上下文管理器入口"""
ifnotawaitself.acquire():
raise Exception(f"无法获取锁: {self.key}")
returnself

asyncdef__aexit__(self, exc_type, exc_val, exc_tb):
"""异步上下文管理器出口"""
awaitself.release()

classConsistencyManager:
"""数据一致性管理器"""

def__init__(self, redis_url: str = "redis://localhost:6379"):
self.redis_url = redis_url
self.redis = None
self.transactions: Dict[str, Transaction] = {}
self.logger = logging.getLogger(__name__)

asyncdefconnect(self):
"""连接Redis"""
self.redis = await aioredis.from_url(self.redis_url)
self.logger.info("连接到Redis成功")

asyncdefdisconnect(self):
"""断开Redis连接"""
ifself.redis:
awaitself.redis.close()
self.logger.info("断开Redis连接")

defcreate_lock(self, key: str, timeout: float = 30.0) -> DistributedLock:
"""创建分布式锁"""
return DistributedLock(self.redis, key, timeout)

asyncdefbegin_transaction(self, transaction_id: str) -> Transaction:
"""开始事务"""
        transaction = Transaction(id=transaction_id)
self.transactions[transaction_id] = transaction

# 保存到Redis
awaitself.redis.hset(
"transactions",
            transaction_id,
            json.dumps(transaction.to_dict())
        )

self.logger.info(f"开始事务: {transaction_id}")
return transaction

asyncdefadd_operation(self, 
                           transaction_id: str
                           operation_type: str,
                           data: Dict[strAny],
                           participant: str
):
"""添加事务操作"""
if transaction_id notinself.transactions:
raise Exception(f"事务不存在: {transaction_id}")

        transaction = self.transactions[transaction_id]
if transaction.status != TransactionStatus.PENDING:
raise Exception(f"事务状态不正确: {transaction.status}")

        operation = {
'type': operation_type,
'data': data,
'participant': participant,
'timestamp': time.time()
        }

        transaction.operations.append(operation)
        transaction.participants.add(participant)

# 更新Redis
awaitself.redis.hset(
"transactions",
            transaction_id,
            json.dumps(transaction.to_dict())
        )

self.logger.debug(f"添加操作到事务 {transaction_id}{operation_type}")

asyncdefprepare_transaction(self, transaction_id: str) -> bool:
"""准备事务(两阶段提交的第一阶段)"""
if transaction_id notinself.transactions:
returnFalse

        transaction = self.transactions[transaction_id]

# 向所有参与者发送准备请求
        prepare_results = []
for participant in transaction.participants:
try:
# 这里应该调用参与者的prepare接口
# 简化实现,假设都成功
                result = awaitself._send_prepare_request(participant, transaction)
                prepare_results.append(result)
except Exception as e:
self.logger.error(f"参与者 {participant} 准备失败: {e}")
                prepare_results.append(False)

# 所有参与者都准备成功才能提交
        can_commit = all(prepare_results)

if can_commit:
self.logger.info(f"事务 {transaction_id} 准备成功")
else:
self.logger.warning(f"事务 {transaction_id} 准备失败")

return can_commit

asyncdefcommit_transaction(self, transaction_id: str) -> bool:
"""提交事务"""
if transaction_id notinself.transactions:
returnFalse

        transaction = self.transactions[transaction_id]

try:
# 向所有参与者发送提交请求
for participant in transaction.participants:
awaitself._send_commit_request(participant, transaction)

# 更新事务状态
            transaction.status = TransactionStatus.COMMITTED
awaitself.redis.hset(
"transactions",
                transaction_id,
                json.dumps(transaction.to_dict())
            )

self.logger.info(f"事务 {transaction_id} 提交成功")
returnTrue

except Exception as e:
self.logger.error(f"事务 {transaction_id} 提交失败: {e}")
awaitself.abort_transaction(transaction_id)
returnFalse

asyncdefabort_transaction(self, transaction_id: str) -> bool:
"""中止事务"""
if transaction_id notinself.transactions:
returnFalse

        transaction = self.transactions[transaction_id]

try:
# 向所有参与者发送中止请求
for participant in transaction.participants:
awaitself._send_abort_request(participant, transaction)

# 更新事务状态
            transaction.status = TransactionStatus.ABORTED
awaitself.redis.hset(
"transactions",
                transaction_id,
                json.dumps(transaction.to_dict())
            )

self.logger.info(f"事务 {transaction_id} 已中止")
returnTrue

except Exception as e:
self.logger.error(f"事务 {transaction_id} 中止失败: {e}")
returnFalse

asyncdef_send_prepare_request(self, participant: str, transaction: Transaction) -> bool:
"""发送准备请求(模拟实现)"""
# 这里应该实际调用参与者的API
await asyncio.sleep(0.1)  # 模拟网络延迟
returnTrue

asyncdef_send_commit_request(self, participant: str, transaction: Transaction):
"""发送提交请求(模拟实现)"""
# 这里应该实际调用参与者的API
await asyncio.sleep(0.1)  # 模拟网络延迟

asyncdef_send_abort_request(self, participant: str, transaction: Transaction):
"""发送中止请求(模拟实现)"""
# 这里应该实际调用参与者的API
await asyncio.sleep(0.1)  # 模拟网络延迟

asyncdefexecute_with_consistency(self, 
                                     operations: List[Dict[strAny]],
                                     transaction_id: Optional[str] = None
) -> bool:
"""执行一致性操作"""
ifnot transaction_id:
            transaction_id = hashlib.md5(f"{time.time()}{len(operations)}".encode()).hexdigest()

# 开始事务
        transaction = awaitself.begin_transaction(transaction_id)

try:
# 添加所有操作
for operation in operations:
awaitself.add_operation(
                    transaction_id,
                    operation['type'],
                    operation['data'],
                    operation['participant']
                )

# 两阶段提交
ifawaitself.prepare_transaction(transaction_id):
returnawaitself.commit_transaction(transaction_id)
else:
awaitself.abort_transaction(transaction_id)
returnFalse

except Exception as e:
self.logger.error(f"执行一致性操作失败: {e}")
awaitself.abort_transaction(transaction_id)
returnFalse

asyncdefensure_data_consistency(self, 
                                    key: str
                                    update_func: callable,
                                    *args, **kwargs
) -> Any:
"""确保数据一致性的更新操作"""
asyncwithself.create_lock(key):
# 在锁保护下执行更新操作
returnawait update_func(*args, **kwargs)

# 使用示例
asyncdefmain():
# 配置日志
    logging.basicConfig(level=logging.INFO)

# 创建一致性管理器
    consistency_manager = ConsistencyManager()
await consistency_manager.connect()

try:
# 测试分布式锁
asyncwith consistency_manager.create_lock("test_resource"):
print("获取锁成功,执行关键操作...")
await asyncio.sleep(1)
print("关键操作完成")

# 测试分布式事务
        operations = [
            {
'type''insert',
'data': {'url''https://example.com/1''status''pending'},
'participant''node1'
            },
            {
'type''update',
'data': {'url''https://example.com/1''status''processing'},
'participant''node2'
            }
        ]

        success = await consistency_manager.execute_with_consistency(operations)
print(f"事务执行结果: {success}")

finally:
await consistency_manager.disconnect()

if __name__ == "__main__":
    asyncio.run(main())

最新文章

随机文章

基本 文件 流程 错误 SQL 调试
  1. 请求信息 : 2026-03-27 14:47:31 HTTP/2.0 GET : https://f.mffb.com.cn/a/478277.html
  2. 运行时间 : 0.116386s [ 吞吐率:8.59req/s ] 内存消耗:5,345.22kb 文件加载:140
  3. 缓存信息 : 0 reads,0 writes
  4. 会话信息 : SESSION_ID=3571386bf0d0faf9b73ec242d5ccd7c2
  1. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/public/index.php ( 0.79 KB )
  2. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/autoload.php ( 0.17 KB )
  3. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/composer/autoload_real.php ( 2.49 KB )
  4. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/composer/platform_check.php ( 0.90 KB )
  5. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/composer/ClassLoader.php ( 14.03 KB )
  6. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/composer/autoload_static.php ( 4.90 KB )
  7. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-helper/src/helper.php ( 8.34 KB )
  8. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-validate/src/helper.php ( 2.19 KB )
  9. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/helper.php ( 1.47 KB )
  10. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/stubs/load_stubs.php ( 0.16 KB )
  11. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/Exception.php ( 1.69 KB )
  12. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-container/src/Facade.php ( 2.71 KB )
  13. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/symfony/deprecation-contracts/function.php ( 0.99 KB )
  14. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/symfony/polyfill-mbstring/bootstrap.php ( 8.26 KB )
  15. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/symfony/polyfill-mbstring/bootstrap80.php ( 9.78 KB )
  16. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/symfony/var-dumper/Resources/functions/dump.php ( 1.49 KB )
  17. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-dumper/src/helper.php ( 0.18 KB )
  18. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/symfony/var-dumper/VarDumper.php ( 4.30 KB )
  19. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/App.php ( 15.30 KB )
  20. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-container/src/Container.php ( 15.76 KB )
  21. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/psr/container/src/ContainerInterface.php ( 1.02 KB )
  22. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/app/provider.php ( 0.19 KB )
  23. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/Http.php ( 6.04 KB )
  24. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-helper/src/helper/Str.php ( 7.29 KB )
  25. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/Env.php ( 4.68 KB )
  26. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/app/common.php ( 0.03 KB )
  27. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/helper.php ( 18.78 KB )
  28. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/Config.php ( 5.54 KB )
  29. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/config/app.php ( 0.95 KB )
  30. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/config/cache.php ( 0.78 KB )
  31. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/config/console.php ( 0.23 KB )
  32. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/config/cookie.php ( 0.56 KB )
  33. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/config/database.php ( 2.48 KB )
  34. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/facade/Env.php ( 1.67 KB )
  35. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/config/filesystem.php ( 0.61 KB )
  36. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/config/lang.php ( 0.91 KB )
  37. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/config/log.php ( 1.35 KB )
  38. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/config/middleware.php ( 0.19 KB )
  39. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/config/route.php ( 1.89 KB )
  40. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/config/session.php ( 0.57 KB )
  41. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/config/trace.php ( 0.34 KB )
  42. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/config/view.php ( 0.82 KB )
  43. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/app/event.php ( 0.25 KB )
  44. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/Event.php ( 7.67 KB )
  45. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/app/service.php ( 0.13 KB )
  46. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/app/AppService.php ( 0.26 KB )
  47. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/Service.php ( 1.64 KB )
  48. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/Lang.php ( 7.35 KB )
  49. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/lang/zh-cn.php ( 13.70 KB )
  50. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/initializer/Error.php ( 3.31 KB )
  51. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/initializer/RegisterService.php ( 1.33 KB )
  52. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/services.php ( 0.14 KB )
  53. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/service/PaginatorService.php ( 1.52 KB )
  54. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/service/ValidateService.php ( 0.99 KB )
  55. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/service/ModelService.php ( 2.04 KB )
  56. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-trace/src/Service.php ( 0.77 KB )
  57. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/Middleware.php ( 6.72 KB )
  58. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/initializer/BootService.php ( 0.77 KB )
  59. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/Paginator.php ( 11.86 KB )
  60. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-validate/src/Validate.php ( 63.20 KB )
  61. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/Model.php ( 23.55 KB )
  62. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/model/concern/Attribute.php ( 21.05 KB )
  63. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/model/concern/AutoWriteData.php ( 4.21 KB )
  64. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/model/concern/Conversion.php ( 6.44 KB )
  65. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/model/concern/DbConnect.php ( 5.16 KB )
  66. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/model/concern/ModelEvent.php ( 2.33 KB )
  67. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/model/concern/RelationShip.php ( 28.29 KB )
  68. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-helper/src/contract/Arrayable.php ( 0.09 KB )
  69. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-helper/src/contract/Jsonable.php ( 0.13 KB )
  70. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/model/contract/Modelable.php ( 0.09 KB )
  71. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/Db.php ( 2.88 KB )
  72. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/DbManager.php ( 8.52 KB )
  73. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/Log.php ( 6.28 KB )
  74. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/Manager.php ( 3.92 KB )
  75. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/psr/log/src/LoggerTrait.php ( 2.69 KB )
  76. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/psr/log/src/LoggerInterface.php ( 2.71 KB )
  77. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/Cache.php ( 4.92 KB )
  78. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/psr/simple-cache/src/CacheInterface.php ( 4.71 KB )
  79. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-helper/src/helper/Arr.php ( 16.63 KB )
  80. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/cache/driver/File.php ( 7.84 KB )
  81. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/cache/Driver.php ( 9.03 KB )
  82. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/contract/CacheHandlerInterface.php ( 1.99 KB )
  83. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/app/Request.php ( 0.09 KB )
  84. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/Request.php ( 55.78 KB )
  85. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/app/middleware.php ( 0.25 KB )
  86. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/Pipeline.php ( 2.61 KB )
  87. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-trace/src/TraceDebug.php ( 3.40 KB )
  88. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/middleware/SessionInit.php ( 1.94 KB )
  89. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/Session.php ( 1.80 KB )
  90. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/session/driver/File.php ( 6.27 KB )
  91. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/contract/SessionHandlerInterface.php ( 0.87 KB )
  92. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/session/Store.php ( 7.12 KB )
  93. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/Route.php ( 23.73 KB )
  94. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/route/RuleName.php ( 5.75 KB )
  95. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/route/Domain.php ( 2.53 KB )
  96. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/route/RuleGroup.php ( 22.43 KB )
  97. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/route/Rule.php ( 26.95 KB )
  98. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/route/RuleItem.php ( 9.78 KB )
  99. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/route/app.php ( 1.72 KB )
  100. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/facade/Route.php ( 4.70 KB )
  101. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/route/dispatch/Controller.php ( 4.74 KB )
  102. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/route/Dispatch.php ( 10.44 KB )
  103. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/app/controller/Index.php ( 4.81 KB )
  104. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/app/BaseController.php ( 2.05 KB )
  105. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/facade/Db.php ( 0.93 KB )
  106. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/db/connector/Mysql.php ( 5.44 KB )
  107. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/db/PDOConnection.php ( 52.47 KB )
  108. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/db/Connection.php ( 8.39 KB )
  109. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/db/ConnectionInterface.php ( 4.57 KB )
  110. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/db/builder/Mysql.php ( 16.58 KB )
  111. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/db/Builder.php ( 24.06 KB )
  112. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/db/BaseBuilder.php ( 27.50 KB )
  113. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/db/Query.php ( 15.71 KB )
  114. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/db/BaseQuery.php ( 45.13 KB )
  115. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/db/concern/TimeFieldQuery.php ( 7.43 KB )
  116. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/db/concern/AggregateQuery.php ( 3.26 KB )
  117. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/db/concern/ModelRelationQuery.php ( 20.07 KB )
  118. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/db/concern/ParamsBind.php ( 3.66 KB )
  119. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/db/concern/ResultOperation.php ( 7.01 KB )
  120. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/db/concern/WhereQuery.php ( 19.37 KB )
  121. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/db/concern/JoinAndViewQuery.php ( 7.11 KB )
  122. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/db/concern/TableFieldInfo.php ( 2.63 KB )
  123. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/db/concern/Transaction.php ( 2.77 KB )
  124. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/log/driver/File.php ( 5.96 KB )
  125. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/contract/LogHandlerInterface.php ( 0.86 KB )
  126. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/log/Channel.php ( 3.89 KB )
  127. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/event/LogRecord.php ( 1.02 KB )
  128. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-helper/src/Collection.php ( 16.47 KB )
  129. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/facade/View.php ( 1.70 KB )
  130. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/View.php ( 4.39 KB )
  131. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/Response.php ( 8.81 KB )
  132. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/response/View.php ( 3.29 KB )
  133. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/Cookie.php ( 6.06 KB )
  134. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-view/src/Think.php ( 8.38 KB )
  135. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/contract/TemplateHandlerInterface.php ( 1.60 KB )
  136. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-template/src/Template.php ( 46.61 KB )
  137. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-template/src/template/driver/File.php ( 2.41 KB )
  138. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-template/src/template/contract/DriverInterface.php ( 0.86 KB )
  139. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/runtime/temp/067d451b9a0c665040f3f1bdd3293d68.php ( 11.98 KB )
  140. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-trace/src/Html.php ( 4.42 KB )
  1. CONNECT:[ UseTime:0.000528s ] mysql:host=127.0.0.1;port=3306;dbname=f_mffb;charset=utf8mb4
  2. SHOW FULL COLUMNS FROM `fenlei` [ RunTime:0.000926s ]
  3. SELECT * FROM `fenlei` WHERE `fid` = 0 [ RunTime:0.000341s ]
  4. SELECT * FROM `fenlei` WHERE `fid` = 63 [ RunTime:0.000264s ]
  5. SHOW FULL COLUMNS FROM `set` [ RunTime:0.000498s ]
  6. SELECT * FROM `set` [ RunTime:0.002521s ]
  7. SHOW FULL COLUMNS FROM `article` [ RunTime:0.000703s ]
  8. SELECT * FROM `article` WHERE `id` = 478277 LIMIT 1 [ RunTime:0.005697s ]
  9. UPDATE `article` SET `lasttime` = 1774594051 WHERE `id` = 478277 [ RunTime:0.004745s ]
  10. SELECT * FROM `fenlei` WHERE `id` = 66 LIMIT 1 [ RunTime:0.000659s ]
  11. SELECT * FROM `article` WHERE `id` < 478277 ORDER BY `id` DESC LIMIT 1 [ RunTime:0.001069s ]
  12. SELECT * FROM `article` WHERE `id` > 478277 ORDER BY `id` ASC LIMIT 1 [ RunTime:0.000973s ]
  13. SELECT * FROM `article` WHERE `id` < 478277 ORDER BY `id` DESC LIMIT 10 [ RunTime:0.005863s ]
  14. SELECT * FROM `article` WHERE `id` < 478277 ORDER BY `id` DESC LIMIT 10,10 [ RunTime:0.006181s ]
  15. SELECT * FROM `article` WHERE `id` < 478277 ORDER BY `id` DESC LIMIT 20,10 [ RunTime:0.007567s ]
0.120405s