当前位置:首页>python>20个超硬核 Python 脚本!覆盖机器学习、数据处理、后台开发,直接抄作业

20个超硬核 Python 脚本!覆盖机器学习、数据处理、后台开发,直接抄作业

  • 2026-02-07 13:59:29
20个超硬核 Python 脚本!覆盖机器学习、数据处理、后台开发,直接抄作业

小编整理了Python入门知识库,获取方式见文末!

Python 的 “万能” 属性,从来都不是说说而已 —— 但对很多开发者来说,真正的痛点是 “知道 Python 能做很多事,却不知道具体怎么落地”。

今天整理的《20 个超硬核 Python 脚本》,彻底打破 “学了不用、用了不会” 的困境:覆盖机器学习、数据处理、后台开发三大高频场景,每一个都是算法工程师、数据分析师、后端开发者日常会遇到的真实案例,拿来就能改、改完就能用。

📊 第一部分:数据处理与可视化

1. 数据清洗自动化脚本

# Handle missing values, outliers and duplicate rows in one call
import numpy as np
import pandas as pd


def smart_data_cleaner(df, iqr_factor=1.5):
    """Return a cleaned copy of ``df``.

    Steps, in order:

    1. Missing values: numeric columns are filled with the column median,
       all other columns with the most frequent value.
    2. Outliers: numeric values are clipped to
       ``[Q1 - iqr_factor * IQR, Q3 + iqr_factor * IQR]``.
    3. Exact duplicate rows are dropped (the first occurrence is kept and
       the original index labels are preserved).

    Args:
        df: Input DataFrame. It is not modified.
        iqr_factor: Width of the outlier fence in IQR units. Pass ``None``
            to skip outlier clipping.

    Returns:
        A new, cleaned DataFrame.
    """
    cleaned = df.copy()
    numeric_cols = cleaned.select_dtypes(include=[np.number]).columns

    # --- missing values ---------------------------------------------------
    for col in cleaned.columns:
        if col in numeric_cols:
            fill_value = cleaned[col].median()
        else:
            modes = cleaned[col].mode()
            if modes.empty:
                # Column is entirely missing: there is nothing to fill with.
                continue
            fill_value = modes.iloc[0]
        # Assign back instead of inplace=True on a column selection, which
        # is a chained assignment and is not guaranteed to update the frame.
        cleaned[col] = cleaned[col].fillna(fill_value)

    # --- outliers (IQR fence) ---------------------------------------------
    if iqr_factor is not None and len(numeric_cols) > 0:
        q1 = cleaned[numeric_cols].quantile(0.25)
        q3 = cleaned[numeric_cols].quantile(0.75)
        iqr = q3 - q1
        for col in numeric_cols:
            spread = iqr[col]
            # A zero (or NaN) IQR would collapse the whole column onto a
            # single value, so such columns are left untouched.
            if not spread > 0:
                continue
            cleaned[col] = cleaned[col].clip(
                lower=q1[col] - iqr_factor * spread,
                upper=q3[col] + iqr_factor * spread,
            )

    # --- duplicates -------------------------------------------------------
    return cleaned.drop_duplicates()

2. 多数据源合并工具

# Merge CSV, Excel and database sources on a shared key
import pandas as pd


def multi_source_merger(sources, merge_key):
    """Load several data sources and outer-join them on ``merge_key``.

    Args:
        sources: Iterable of dicts. Each dict has a ``'type'`` key and the
            keys that type needs:

            * ``'csv'``      -> ``'path'``
            * ``'excel'``    -> ``'path'``
            * ``'database'`` -> ``'query'`` and ``'conn'``
        merge_key: Column name (or list of names) present in every source.

    Returns:
        The merged DataFrame, or ``None`` when ``sources`` is empty.

    Raises:
        ValueError: If a source has an unsupported ``'type'``. Without this
            check the previous iteration's frame would be merged a second
            time (or a NameError raised on the first source).
    """
    loaders = {
        'csv': lambda src: pd.read_csv(src['path']),
        'excel': lambda src: pd.read_excel(src['path']),
        'database': lambda src: pd.read_sql(src['query'], src['conn']),
    }

    merged_df = None
    for source in sources:
        source_type = source['type']
        if source_type not in loaders:
            raise ValueError(
                f"Unsupported source type {source_type!r}; "
                f"expected one of {sorted(loaders)}"
            )
        df = loaders[source_type](source)

        if merged_df is None:
            merged_df = df
        else:
            merged_df = pd.merge(merged_df, df, on=merge_key, how='outer')
    return merged_df

3. 实时数据监控仪表盘

# Real-time monitoring panel built with Dash
from datetime import datetime, timedelta

import dash
import numpy as np  # needed by update_graph; the original listing never imported it
import plotly.graph_objs as go
from dash import dcc, html

# Page layout: a title, one graph, and a timer that fires every 5 seconds.
app = dash.Dash(__name__)
app.layout = html.Div([
    html.H1('📊 实时数据监控系统'),
    dcc.Graph(id='live-graph'),
    dcc.Interval(
        id='graph-update',
        interval=5000,  # milliseconds
        n_intervals=0,
    ),
])


@app.callback(
    dash.dependencies.Output('live-graph', 'figure'),
    [dash.dependencies.Input('graph-update', 'n_intervals')],
)
def update_graph(n):
    """Rebuild the figure each time the interval timer ticks.

    Args:
        n: Tick counter supplied by ``dcc.Interval`` (unused by the demo data).

    Returns:
        A figure dict with a single scatter trace.
    """
    # Demo data only: replace with the real data source.
    now = datetime.now()  # sample the clock once so the points are evenly spaced
    x_data = [now - timedelta(minutes=i) for i in range(30)]
    y_data = np.random.randn(30).cumsum()

    trace = go.Scatter(
        x=x_data,
        y=y_data,
        mode='lines+markers',
        name='实时数据流',
    )
    return {'data': [trace]}

4. 大数据分块处理工具

"""大数据分块处理:处理GB级数据而不耗尽内存"""import pandas as pdfrom pathlib import Pathimport dask.dataframe as ddimport gcclassBigDataProcessor:"""大数据处理工具"""    @staticmethoddefprocess_large_csv(file_path, chunk_size=100000, process_func=None):"""        分块处理大CSV文件        """print(f"📁 开始处理大文件: {file_path}")        chunk_list = []# 分块读取for i, chunk inenumerate(pd.read_csv(file_path, chunksize=chunk_size)):print(f"处理第 {i+1} 块数据,大小: {len(chunk)} 行")# 应用处理函数(如果提供)if process_func:                chunk = process_func(chunk)            chunk_list.append(chunk)# 每处理5个块清理一次内存if i % 5 == 0:                gc.collect()# 合并所有块        result = pd.concat(chunk_list, ignore_index=True)print(f"✅ 处理完成,总共 {len(result)} 行数据")return result    @staticmethoddefparallel_process_with_dask(file_path, npartitions=4):"""        使用Dask进行并行处理        """# 创建Dask DataFrame        ddf = dd.read_csv(file_path)print(f"原始数据形状: {len(ddf)} 行")# 并行处理示例:计算每列平均值        means = ddf.mean().compute()# 筛选示例:过滤掉某些值        filtered = ddf[ddf['some_column'] > 0].compute()return means, filtered

📊 第二部分:机器学习实战

5. 自动机器学习(AutoML)管道

from sklearn.compose import ColumnTransformer
from sklearn.ensemble import RandomForestClassifier
from sklearn.impute import SimpleImputer
from sklearn.model_selection import cross_val_score
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder, StandardScaler


def build_automl_pipeline(numeric_features=None, categorical_features=None):
    """Build a preprocessing + random-forest classification pipeline.

    Numeric columns are median-imputed and standardised; categorical columns
    are imputed with the constant ``'missing'`` and one-hot encoded
    (categories unseen during fit are ignored at predict time).

    Args:
        numeric_features: Names of numeric columns. Defaults to
            ``['age', 'salary', 'experience']``.
        categorical_features: Names of categorical columns. Defaults to
            ``['gender', 'education', 'city']``.

    Returns:
        An unfitted ``sklearn.pipeline.Pipeline``.
    """
    # The original listing had lost the commas between the names, which
    # silently concatenated them into a single non-existent column name.
    if numeric_features is None:
        numeric_features = ['age', 'salary', 'experience']
    if categorical_features is None:
        categorical_features = ['gender', 'education', 'city']

    numeric_transformer = Pipeline(steps=[
        ('imputer', SimpleImputer(strategy='median')),
        ('scaler', StandardScaler()),
    ])

    categorical_transformer = Pipeline(steps=[
        ('imputer', SimpleImputer(strategy='constant', fill_value='missing')),
        ('onehot', OneHotEncoder(handle_unknown='ignore')),
    ])

    preprocessor = ColumnTransformer(
        transformers=[
            ('num', numeric_transformer, numeric_features),
            ('cat', categorical_transformer, categorical_features),
        ])

    pipeline = Pipeline(steps=[
        ('preprocessor', preprocessor),
        ('classifier', RandomForestClassifier(n_estimators=100)),
    ])
    return pipeline

6. 模型超参数自动调优

import optuna
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import GridSearchCV, cross_val_score


def hyperparameter_tuning(X_train, y_train, n_trials=50):
    """Tune a RandomForestClassifier with Optuna.

    Each trial samples a configuration and scores it by mean 5-fold
    cross-validated accuracy.

    Args:
        X_train: Training features.
        y_train: Training labels.
        n_trials: Number of Optuna trials to run.

    Returns:
        Dict with the best ``n_estimators``, ``max_depth`` and
        ``min_samples_split`` found.
    """

    def objective(trial):
        # Search space (inclusive bounds).
        n_estimators = trial.suggest_int('n_estimators', 50, 300)
        max_depth = trial.suggest_int('max_depth', 3, 15)
        min_samples_split = trial.suggest_int('min_samples_split', 2, 10)

        model = RandomForestClassifier(
            n_estimators=n_estimators,
            max_depth=max_depth,
            min_samples_split=min_samples_split,
            random_state=42,
        )

        score = cross_val_score(model, X_train, y_train, cv=5, scoring='accuracy')
        return score.mean()

    study = optuna.create_study(direction='maximize')
    study.optimize(objective, n_trials=n_trials)

    print(f"🎯 最佳超参数: {study.best_params}")
    print(f"📈 最佳准确率: {study.best_value:.4f}")
    return study.best_params

7. 深度学习模型部署脚本

from typing import List

import torch
import torch.nn as nn
import uvicorn
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel

INPUT_DIM = 10  # number of features the network expects


class SimpleNN(nn.Module):
    """Three-layer fully connected network: 10 -> 64 -> 32 -> 1."""

    def __init__(self):
        super().__init__()
        self.fc1 = nn.Linear(INPUT_DIM, 64)
        self.fc2 = nn.Linear(64, 32)
        self.fc3 = nn.Linear(32, 1)
        self.relu = nn.ReLU()

    def forward(self, x):
        """Return the raw (un-activated) output for input ``x``."""
        x = self.relu(self.fc1(x))
        x = self.relu(self.fc2(x))
        x = self.fc3(x)
        return x


app = FastAPI(title="深度学习模型API")

# Load the weights once at start-up. map_location lets a checkpoint saved on
# a GPU load on a CPU-only host.
# NOTE(review): torch.load unpickles the file; only load checkpoints you trust.
model = SimpleNN()
model.load_state_dict(torch.load('model.pth', map_location='cpu'))
model.eval()


class PredictionRequest(BaseModel):
    """Request body: one sample given as a flat list of floats."""

    features: List[float]


@app.post("/predict")
async def predict(request: PredictionRequest):
    """Run the model on one sample and return its scalar prediction.

    Raises:
        HTTPException: 422 when the number of features is not ``INPUT_DIM``
            (otherwise the matrix multiply fails with an opaque 500).
    """
    if len(request.features) != INPUT_DIM:
        raise HTTPException(
            status_code=422,
            detail=f"features 长度必须为 {INPUT_DIM}",
        )

    features = torch.tensor(request.features, dtype=torch.float32)

    with torch.no_grad():
        prediction = model(features)
    return {"prediction": prediction.item()}


if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)

8. 实时在线学习系统

"""Online learning: a model that keeps training as new labelled data arrives."""
import os
import pickle
from collections import deque

import numpy as np
from sklearn.linear_model import SGDClassifier


class OnlineLearningSystem:
    """Buffer incoming samples and update an SGD classifier incrementally."""

    def __init__(self, model_path='online_model.pkl', buffer_size=1000, classes=None):
        """Load the model at ``model_path`` if it exists, else create a new one.

        Args:
            model_path: Pickle file used to persist the model.
            buffer_size: Maximum number of buffered samples.
            classes: Complete set of labels the model will ever see. When
                omitted, the labels of the first training batch are used, so
                that batch must then contain every class.
        """
        self.model_path = model_path
        self.buffer_size = buffer_size
        self.data_buffer = deque(maxlen=buffer_size)
        self.label_buffer = deque(maxlen=buffer_size)
        self.classes = None if classes is None else np.asarray(classes)
        # Train once this many samples are buffered; capped so that a small
        # buffer_size can still reach the threshold.
        self._train_threshold = min(100, buffer_size)

        if os.path.exists(model_path):
            # NOTE(review): pickle.load executes code from the file; only
            # load model files you created yourself.
            with open(model_path, 'rb') as f:
                self.model = pickle.load(f)
            print(f"✅ 加载已有模型,已训练样本: {self.model.t_}")
        else:
            # learning_rate='adaptive' requires a positive initial rate.
            self.model = SGDClassifier(loss='log_loss', learning_rate='adaptive', eta0=0.01)
            print("🆕 创建新模型")

    def add_training_data(self, X, y):
        """Append samples ``X`` with labels ``y``; train when enough are buffered."""
        self.data_buffer.extend(X)
        self.label_buffer.extend(y)

        if len(self.data_buffer) >= self._train_threshold:
            self.train_on_buffer()

    def train_on_buffer(self):
        """Run one ``partial_fit`` on the buffered samples, then clear and save."""
        if len(self.data_buffer) == 0:
            return

        X_train = np.array(self.data_buffer)
        y_train = np.array(self.label_buffer)

        # partial_fit requires the SAME full class list on every call.
        # Passing np.unique(batch) fails as soon as a later batch contains a
        # different subset of labels, so fix the list once and reuse it.
        if self.classes is None:
            fitted = getattr(self.model, 'classes_', None)
            self.classes = fitted if fitted is not None else np.unique(y_train)

        self.model.partial_fit(X_train, y_train, classes=self.classes)
        print(f"📊 模型已更新,累计样本: {self.model.t_}")

        self.data_buffer.clear()
        self.label_buffer.clear()

        self.save_model()

    def predict(self, X):
        """Return predicted labels for ``X``."""
        return self.model.predict(X)

    def save_model(self):
        """Pickle the current model to ``self.model_path``."""
        with open(self.model_path, 'wb') as f:
            pickle.dump(self.model, f)
        print(f"💾 模型已保存到 {self.model_path}")

📊 第三部分:Web后端开发

9. RESTful API快速构建

from fastapi import FastAPI, HTTPException
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.exc import IntegrityError
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

# Database engine. FastAPI runs plain `def` endpoints in a thread pool, so a
# pooled SQLite connection may be used from a thread other than the one that
# created it; check_same_thread=False permits that.
DATABASE_URL = "sqlite:///./test.db"
engine = create_engine(DATABASE_URL, connect_args={"check_same_thread": False})

Base = declarative_base()


class User(Base):
    """ORM model for the ``users`` table."""

    __tablename__ = "users"

    id = Column(Integer, primary_key=True, index=True)
    username = Column(String, unique=True, index=True)
    email = Column(String, unique=True, index=True)
    full_name = Column(String)


Base.metadata.create_all(bind=engine)

SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

app = FastAPI()


@app.post("/users/")
def create_user(username: str, email: str, full_name: str):
    """Insert a user and return the stored row.

    Raises:
        HTTPException: 409 when the username or e-mail already exists.
    """
    db = SessionLocal()
    try:
        db_user = User(username=username, email=email, full_name=full_name)
        db.add(db_user)
        db.commit()
        db.refresh(db_user)  # load generated columns before the session closes
        return db_user
    except IntegrityError:
        # Unique constraint violated: report it instead of a bare 500.
        db.rollback()
        raise HTTPException(status_code=409, detail="用户名或邮箱已存在")
    finally:
        # Always release the connection, even when the commit fails.
        db.close()


@app.get("/users/{user_id}")
def read_user(user_id: int):
    """Return the user with ``user_id``.

    Raises:
        HTTPException: 404 when no such user exists.
    """
    db = SessionLocal()
    try:
        user = db.query(User).filter(User.id == user_id).first()
    finally:
        db.close()
    if user is None:
        raise HTTPException(status_code=404, detail="用户不存在")
    return user


@app.get("/users/")
def read_users(skip: int = 0, limit: int = 100):
    """Return up to ``limit`` users, skipping the first ``skip``."""
    db = SessionLocal()
    try:
        return db.query(User).offset(skip).limit(limit).all()
    finally:
        db.close()

10. WebSocket实时聊天服务器

from typing import List

from fastapi import FastAPI, WebSocket, WebSocketDisconnect

app = FastAPI()


class ConnectionManager:
    """Track open WebSocket connections and send messages to them."""

    def __init__(self):
        self.active_connections: List[WebSocket] = []

    async def connect(self, websocket: WebSocket):
        """Accept the handshake and register the connection."""
        await websocket.accept()
        self.active_connections.append(websocket)

    def disconnect(self, websocket: WebSocket):
        """Unregister a connection; a no-op if it was already removed."""
        if websocket in self.active_connections:
            self.active_connections.remove(websocket)

    async def send_personal_message(self, message: str, websocket: WebSocket):
        """Send ``message`` to a single client."""
        await websocket.send_text(message)

    async def broadcast(self, message: str):
        """Send ``message`` to every registered client.

        A client whose socket has already gone away is dropped rather than
        aborting delivery to the remaining clients.
        """
        stale = []
        # Iterate over a snapshot: the list may change while we await.
        for connection in list(self.active_connections):
            try:
                await connection.send_text(message)
            except (WebSocketDisconnect, RuntimeError):
                stale.append(connection)
        for connection in stale:
            self.disconnect(connection)


manager = ConnectionManager()


@app.websocket("/ws/{client_id}")
async def websocket_endpoint(websocket: WebSocket, client_id: int):
    """Relay every text frame from this client to all connected clients."""
    await manager.connect(websocket)
    try:
        while True:
            data = await websocket.receive_text()
            await manager.broadcast(f"客户端{client_id}说: {data}")
    except WebSocketDisconnect:
        manager.disconnect(websocket)
        await manager.broadcast(f"客户端{client_id}离开了")

11. 异步任务队列系统

import asyncio
import json
import time

import redis
from celery import Celery
from celery.result import AsyncResult
from fastapi import FastAPI

# The Celery task below is synchronous, so it needs the synchronous Redis
# client. With redis.asyncio, `set()` returns a coroutine that is never
# awaited and nothing is stored.
redis_client = redis.Redis(host='localhost', port=6379, decode_responses=True)

# A result backend is required for AsyncResult.ready()/.result to work.
celery_app = Celery(
    'tasks',
    broker='redis://localhost:6379/0',
    backend='redis://localhost:6379/1',
)

# The original listing used `app` without ever creating it.
app = FastAPI()


@celery_app.task
def process_data_task(data_id):
    """Celery task: simulate slow work and store the result in Redis.

    Returns:
        The result string, which is also written to ``result:<data_id>``.
    """
    time.sleep(5)  # stand-in for the real long-running work

    result = f"处理完成数据ID: {data_id}"

    redis_client.set(f"result:{data_id}", result)
    return result


@app.post("/process/")
async def process_data(data_id: int):
    """Queue a processing task and return its id immediately."""
    task = process_data_task.delay(data_id)

    return {
        "task_id": task.id,
        "status": "任务已提交",
        "data_id": data_id,
    }


@app.get("/result/{task_id}")
async def get_result(task_id: str):
    """Report whether the task has finished and, if so, its outcome."""
    task_result = AsyncResult(task_id, app=celery_app)

    if not task_result.ready():
        return {"status": "处理中"}
    if task_result.failed():
        # .result holds the exception object, which is not JSON-serialisable.
        return {"status": "失败", "error": str(task_result.result)}
    return {"status": "完成", "result": task_result.result}

12. 微服务API网关

"""Microservice API gateway: a single entry point that proxies to backend services."""
import asyncio
import time
from typing import Dict, List

import httpx
from fastapi import FastAPI, HTTPException, Request
from fastapi.responses import JSONResponse, Response


class APIGateway:
    """Route requests to named backend services with a simple circuit breaker."""

    SUPPORTED_METHODS = ('GET', 'POST', 'PUT', 'DELETE')
    FAILURE_THRESHOLD = 5    # consecutive failures before the breaker opens
    RECOVERY_SECONDS = 30    # how long the breaker stays open

    def __init__(self):
        self.services = {
            'user_service': 'http://localhost:8001',
            'product_service': 'http://localhost:8002',
            'order_service': 'http://localhost:8003',
            'payment_service': 'http://localhost:8004',
        }
        self.circuit_breaker = {}
        # Keep references to pending reset tasks so they are not
        # garbage-collected before they run.
        self._reset_tasks = set()

    async def route_request(self, service_name: str, path: str, method: str,
                            data: Dict = None, headers: Dict = None):
        """Forward one request to ``service_name`` and relay the response.

        Args:
            service_name: Key in ``self.services``.
            path: Path on the backend, starting with ``/``.
            method: HTTP method (GET, POST, PUT or DELETE).
            data: JSON body, sent for POST and PUT only.
            headers: Headers to forward.

        Raises:
            HTTPException: 404 for an unknown service, 503 while the breaker
                is open, 405 for an unsupported method, 500 when the backend
                cannot be reached.
        """
        if service_name not in self.services:
            raise HTTPException(status_code=404, detail="服务不存在")

        if self.circuit_breaker.get(service_name, {}).get('open', False):
            raise HTTPException(status_code=503, detail="服务暂时不可用")

        method = method.upper()
        if method not in self.SUPPORTED_METHODS:
            # A client error must not count as a backend failure.
            raise HTTPException(status_code=405, detail=f"不支持的请求方法: {method}")

        url = f"{self.services[service_name]}{path}"
        body = data if method in ('POST', 'PUT') else None

        try:
            async with httpx.AsyncClient(timeout=10.0) as client:
                response = await client.request(method, url, json=body, headers=headers)
        except httpx.HTTPError as e:
            self._update_circuit_breaker(service_name, 'failure')
            raise HTTPException(status_code=500, detail=f"服务调用失败: {str(e)}")

        if response.status_code >= 500:
            self._update_circuit_breaker(service_name, 'failure')
        else:
            self._update_circuit_breaker(service_name, 'success')

        if not response.content:
            return JSONResponse(status_code=response.status_code, content=None)
        try:
            payload = response.json()
        except ValueError:
            # Non-JSON body: pass it through unchanged instead of treating a
            # healthy backend as failed.
            return Response(
                content=response.content,
                status_code=response.status_code,
                media_type=response.headers.get('content-type'),
            )
        return JSONResponse(status_code=response.status_code, content=payload)

    def _update_circuit_breaker(self, service_name: str, status: str):
        """Record a ``'success'`` or ``'failure'`` for ``service_name``."""
        cb = self.circuit_breaker.setdefault(service_name, {
            'failures': 0,
            'open': False,
            'last_failure_time': None,
        })

        if status == 'failure':
            cb['failures'] += 1
            cb['last_failure_time'] = time.time()

            # Open once; the `not cb['open']` guard stops in-flight failures
            # from scheduling additional reset tasks.
            if cb['failures'] >= self.FAILURE_THRESHOLD and not cb['open']:
                cb['open'] = True
                print(f"⚠️ 熔断器打开: {service_name}")
                task = asyncio.create_task(self._reset_circuit_breaker(service_name))
                self._reset_tasks.add(task)
                task.add_done_callback(self._reset_tasks.discard)
        else:
            cb['failures'] = 0
            cb['open'] = False

    async def _reset_circuit_breaker(self, service_name: str):
        """Close the breaker for ``service_name`` after the recovery delay."""
        await asyncio.sleep(self.RECOVERY_SECONDS)
        if service_name in self.circuit_breaker:
            self.circuit_breaker[service_name]['open'] = False
            self.circuit_breaker[service_name]['failures'] = 0
            print(f"✅ 熔断器关闭: {service_name}")


app = FastAPI(title="API网关")
gateway = APIGateway()


@app.api_route("/api/{service_name}/{path:path}", methods=["GET", "POST", "PUT", "DELETE"])
async def gateway_proxy(request: Request, service_name: str, path: str):
    """Proxy ``/api/<service>/<path>`` to the matching backend service."""
    data = None
    if request.method in ["POST", "PUT"]:
        # An empty body is allowed; only parse when there is something to parse.
        if await request.body():
            try:
                data = await request.json()
            except ValueError:
                raise HTTPException(status_code=400, detail="请求体不是合法的JSON")

    headers = dict(request.headers)
    # host must match the backend. content-length is recomputed by httpx
    # because the body is re-serialised and may differ in length.
    headers.pop('host', None)
    headers.pop('content-length', None)

    return await gateway.route_request(
        service_name=service_name,
        path=f"/{path}" if not path.startswith("/") else path,
        method=request.method,
        data=data,
        headers=headers,
    )

📊 第四部分:算法工程师工具箱

13. 特征工程自动化

import warnings

import numpy as np
import pandas as pd
from sklearn.decomposition import PCA
from sklearn.feature_selection import SelectKBest, f_classif

# NOTE(review): this silences every warning process-wide, including
# deprecation notices from pandas and scikit-learn.
warnings.filterwarnings('ignore')


class FeatureEngineering:
    """Static helpers for common feature-engineering steps."""

    @staticmethod
    def create_time_features(df, date_column):
        """Add calendar features derived from ``date_column``.

        Adds ``year``, ``month``, ``day``, ``dayofweek`` (Monday=0),
        ``quarter`` and ``is_weekend`` (1 for Saturday/Sunday). ``df`` is
        modified in place and also returned.

        Args:
            df: DataFrame to extend.
            date_column: Column holding datetimes, or strings parseable by
                ``pd.to_datetime``.
        """
        dates = df[date_column]
        if not pd.api.types.is_datetime64_any_dtype(dates):
            # The .dt accessor only exists on datetime columns.
            dates = pd.to_datetime(dates)

        df['year'] = dates.dt.year
        df['month'] = dates.dt.month
        df['day'] = dates.dt.day
        df['dayofweek'] = dates.dt.dayofweek
        df['quarter'] = dates.dt.quarter
        df['is_weekend'] = (df['dayofweek'] >= 5).astype(int)
        return df

    @staticmethod
    def create_interaction_features(df, feature_pairs):
        """Add product and sum columns for each pair of existing columns.

        Pairs that name a column missing from ``df`` are skipped. ``df`` is
        modified in place and also returned.
        """
        for f1, f2 in feature_pairs:
            if f1 in df.columns and f2 in df.columns:
                df[f'{f1}_times_{f2}'] = df[f1] * df[f2]
                df[f'{f1}_plus_{f2}'] = df[f1] + df[f2]
        return df

    @staticmethod
    def select_features(X, y, k=20):
        """Keep the ``k`` features with the highest ANOVA F-score.

        Returns:
            ``(X_selected, selected_indices)``; ``k`` is capped at the number
            of columns in ``X``.
        """
        selector = SelectKBest(score_func=f_classif, k=min(k, X.shape[1]))
        X_selected = selector.fit_transform(X, y)
        selected_indices = selector.get_support(indices=True)
        return X_selected, selected_indices

    @staticmethod
    def reduce_dimension(X, n_components=10):
        """Project ``X`` onto its leading principal components.

        Returns:
            ``(X_reduced, pca)``, the transformed data and the fitted PCA.
        """
        if isinstance(n_components, int):
            # PCA rejects n_components larger than min(n_samples, n_features).
            n_components = min(n_components, *np.shape(X))

        pca = PCA(n_components=n_components)
        X_reduced = pca.fit_transform(X)

        explained_variance = pca.explained_variance_ratio_
        print(f"📊 累计解释方差: {explained_variance.sum():.2%}")
        return X_reduced, pca

14. 模型评估与对比

import matplotlib.pyplot as plt
import numpy as np  # used by plot_learning_curve; missing from the original listing
import seaborn as sns
from sklearn.metrics import classification_report, confusion_matrix, roc_auc_score
from sklearn.model_selection import learning_curve


class ModelEvaluator:
    """Static helpers for evaluating and visualising classifiers."""

    @staticmethod
    def evaluate_classifier(model, X_test, y_test, model_name=""):
        """Print a classification report, plot the confusion matrix, print AUC.

        Args:
            model: Fitted classifier with ``predict`` and, optionally,
                ``predict_proba``.
            X_test: Test features.
            y_test: True labels.
            model_name: Label used in the printed and plotted titles.
        """
        y_pred = model.predict(X_test)
        y_pred_proba = model.predict_proba(X_test) if hasattr(model, "predict_proba") else None

        print(f"\n{'='*50}")
        print(f"📈 {model_name} 模型评估")
        print('='*50)

        print("\n📋 分类报告:")
        print(classification_report(y_test, y_pred))

        plt.figure(figsize=(8, 6))
        cm = confusion_matrix(y_test, y_pred)
        sns.heatmap(cm, annot=True, fmt='d', cmap='Blues')
        plt.title(f'{model_name} 混淆矩阵')
        plt.ylabel('真实标签')
        plt.xlabel('预测标签')
        plt.show()

        if y_pred_proba is not None:
            if y_pred_proba.shape[1] == 2:
                # Binary case: score with the positive-class probability.
                auc = roc_auc_score(y_test, y_pred_proba[:, 1])
            else:
                # Multiclass needs the full matrix and an averaging scheme.
                auc = roc_auc_score(y_test, y_pred_proba, multi_class='ovr')
            print(f"📊 AUC-ROC分数: {auc:.4f}")

    @staticmethod
    def plot_learning_curve(model, X, y, cv=5):
        """Plot mean train and cross-validation scores against training-set size.

        Ten training sizes from 10% to 100% of the data are evaluated.
        """
        train_sizes, train_scores, test_scores = learning_curve(
            model, X, y, cv=cv, n_jobs=-1,
            train_sizes=np.linspace(0.1, 1.0, 10),
        )

        train_mean = np.mean(train_scores, axis=1)
        test_mean = np.mean(test_scores, axis=1)

        plt.figure(figsize=(10, 6))
        plt.plot(train_sizes, train_mean, 'o-', color='r', label='训练得分')
        plt.plot(train_sizes, test_mean, 'o-', color='g', label='交叉验证得分')
        plt.xlabel('训练样本数')
        plt.ylabel('得分')
        plt.title('学习曲线')
        plt.legend(loc='best')
        plt.grid(True)
        plt.show()

15. 分布式特征计算框架

"""Parallel feature computation across CPU cores."""
import multiprocessing as mp
from functools import partial

import numpy as np
import pandas as pd
from tqdm import tqdm


class DistributedFeatureCalculator:
    """Compute features for a DataFrame with a pool of worker processes.

    The pool is created in ``__init__``; call ``close()`` when done, or use
    the instance as a context manager.
    """

    def __init__(self, n_workers=None):
        """Start the pool.

        Args:
            n_workers: Number of processes. Defaults to one fewer than the
                CPU count, but never less than one.
        """
        # cpu_count() - 1 is 0 on a single-core host, and Pool(0) raises.
        self.n_workers = max(1, n_workers or mp.cpu_count() - 1)
        self.pool = mp.Pool(self.n_workers)
        print(f"🔧 初始化分布式计算,使用 {self.n_workers} 个工作进程")

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        self.close()
        return False

    def calculate_features_parallel(self, df, feature_functions):
        """Split ``df`` into row chunks and compute features per chunk.

        Args:
            df: Input DataFrame.
            feature_functions: Mapping ``feature_name -> callable`` or
                ``feature_name -> built-in name`` (``'rolling_mean_7'`` or
                ``'lag_1'``). Callables must be picklable (module-level
                functions, not lambdas).

        Returns:
            A new DataFrame with the feature columns added, rows in the
            original order and a fresh ``0..n-1`` index.

        Note:
            Window features (rolling, lag) are computed inside each chunk
            independently, so their values restart at every chunk boundary.
        """
        if len(df) == 0:
            return df.copy()

        # Ceiling division: at most n_workers chunks and never a zero step,
        # which range() would reject when len(df) < n_workers.
        chunk_size = max(1, -(-len(df) // self.n_workers))
        chunks = [df.iloc[i:i + chunk_size] for i in range(0, len(df), chunk_size)]

        worker = partial(self._process_chunk, feature_functions=feature_functions)
        results = []
        with tqdm(total=len(chunks), desc="处理数据块") as pbar:
            # imap (ordered) keeps rows in their original order;
            # imap_unordered would shuffle the chunks.
            for chunk_result in self.pool.imap(worker, chunks):
                results.append(chunk_result)
                pbar.update(1)

        return pd.concat(results, ignore_index=True)

    @staticmethod
    def _process_chunk(chunk, feature_functions):
        """Compute every requested feature for a single chunk."""
        result = chunk.copy()

        for feature_name, func in feature_functions.items():
            if callable(func):
                result[feature_name] = func(chunk)
            elif isinstance(func, str):
                # Built-in features operate on the 'value' column.
                if func == 'rolling_mean_7':
                    if len(chunk) >= 7:
                        result['rolling_mean_7'] = chunk['value'].rolling(7).mean()
                elif func == 'lag_1':
                    result['lag_1'] = chunk['value'].shift(1)
        return result

    def calculate_cross_features(self, df, feature_pairs):
        """Add a ``<f1>_<f2>_cross`` product column for each column pair.

        ``df`` is modified in place and also returned.
        """
        # Ship the column data to the workers. The original passed only the
        # column names, so the worker multiplied two strings.
        tasks = [(df[f1], df[f2]) for f1, f2 in feature_pairs]

        cross_features = {}
        for f1, f2, result in self.pool.starmap(self._calculate_cross_pair, tasks):
            cross_features[f"{f1}_{f2}_cross"] = result

        for name, values in cross_features.items():
            df[name] = values
        return df

    @staticmethod
    def _calculate_cross_pair(f1, f2):
        """Return ``(f1.name, f2.name, f1 * f2)`` for two Series."""
        cross_result = f1 * f2  # simple multiplicative cross
        return f1.name, f2.name, cross_result

    def close(self):
        """Shut the pool down and wait for the workers to exit."""
        self.pool.close()
        self.pool.join()

📊 第五部分:实用工具脚本

16. 数据备份自动化

import os
import shutil
import time
import zipfile
from datetime import datetime

try:
    import schedule
except ImportError:  # only schedule_backup needs the 'schedule' package
    schedule = None


class DataBackup:
    """Static helpers for creating zip backups of a directory."""

    @staticmethod
    def backup_folder(source_dir, backup_dir):
        """Zip all of ``source_dir`` into a timestamped archive.

        Args:
            source_dir: Directory to back up.
            backup_dir: Directory that receives the archive (created if
                missing). It should not be inside ``source_dir``.

        Returns:
            Path of the created ``.zip`` file.
        """
        os.makedirs(backup_dir, exist_ok=True)

        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        backup_name = f"backup_{timestamp}"
        backup_path = os.path.join(backup_dir, backup_name)

        shutil.make_archive(backup_path, 'zip', source_dir)
        print(f"✅ 备份完成: {backup_path}.zip")
        return backup_path + ".zip"

    @staticmethod
    def incremental_backup(source_dir, backup_dir, exclude_patterns=None, since=None):
        """Zip the files of ``source_dir``, skipping excluded and unchanged ones.

        Args:
            source_dir: Directory to back up.
            backup_dir: Directory that receives the archive (created if missing).
            exclude_patterns: Substrings. Any file or directory whose name
                contains one is skipped. Defaults to
                ``['.tmp', '.log', '__pycache__']``.
            since: Optional POSIX timestamp. When given, only files modified
                after it are archived, which is what makes the backup
                incremental. ``None`` archives every non-excluded file.

        Returns:
            Path of the created ``.zip`` file.
        """
        if exclude_patterns is None:
            exclude_patterns = ['.tmp', '.log', '__pycache__']

        # The original never created the target directory and failed with
        # FileNotFoundError on first use.
        os.makedirs(backup_dir, exist_ok=True)

        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        backup_file = os.path.join(backup_dir, f"incremental_{timestamp}.zip")
        backup_abs = os.path.abspath(backup_file)

        def excluded(name):
            return any(p in name for p in exclude_patterns)

        with zipfile.ZipFile(backup_file, 'w', zipfile.ZIP_DEFLATED) as zipf:
            for root, dirs, files in os.walk(source_dir):
                # Prune in place so os.walk does not descend into excluded dirs.
                dirs[:] = [d for d in dirs if not excluded(d)]

                for file in files:
                    if excluded(file):
                        continue
                    file_path = os.path.join(root, file)
                    # Never add the archive to itself when backup_dir lives
                    # inside source_dir.
                    if os.path.abspath(file_path) == backup_abs:
                        continue
                    if since is not None and os.path.getmtime(file_path) <= since:
                        continue
                    arcname = os.path.relpath(file_path, source_dir)
                    zipf.write(file_path, arcname)

        print(f"✅ 增量备份完成: {backup_file}")
        return backup_file

    @staticmethod
    def schedule_backup(source_dir, backup_dir, interval_hours=24):
        """Run ``backup_folder`` now and then every ``interval_hours`` hours.

        This call blocks forever; run it in its own process or thread.

        Raises:
            ImportError: If the ``schedule`` package is not installed.
        """
        if schedule is None:
            raise ImportError("schedule_backup requires the 'schedule' package")

        def job():
            print(f"⏰ 开始定时备份: {datetime.now()}")
            DataBackup.backup_folder(source_dir, backup_dir)

        schedule.every(interval_hours).hours.do(job)
        print(f"📅 备份计划已设置: 每{interval_hours}小时备份一次")

        # Run once immediately, then poll the scheduler once a minute.
        job()

        while True:
            schedule.run_pending()
            time.sleep(60)

17. 日志分析工具

import re
from collections import Counter
from datetime import datetime, timedelta

import pandas as pd


class LogAnalyzer:
    """Summarise a text log file: error lines, hourly volume, slow requests."""

    # Compiled once at class creation instead of once per line.
    _ERROR_RE = re.compile(r'ERROR|Exception|Traceback|失败|错误', re.IGNORECASE)
    _TIMESTAMP_RE = re.compile(r'\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}')
    _DURATION_RE = re.compile(r'time=(\d+\.?\d*)ms')

    def __init__(self, log_file):
        """Read ``log_file`` fully into memory.

        Args:
            log_file: Path of a UTF-8 text log.
        """
        self.log_file = log_file
        self.logs = self._read_logs()

    def _read_logs(self):
        """Return the file's lines; undecodable bytes are replaced, not fatal."""
        with open(self.log_file, 'r', encoding='utf-8', errors='replace') as f:
            return f.readlines()

    def analyze_errors(self):
        """Return the stripped lines that contain an error marker.

        Markers (case-insensitive): ``ERROR``, ``Exception``, ``Traceback``,
        ``失败``, ``错误``.
        """
        return [line.strip() for line in self.logs if self._ERROR_RE.search(line)]

    def count_requests_by_hour(self):
        """Count lines per hour.

        Uses the first ``YYYY-MM-DD HH:MM:SS`` timestamp on each line. Lines
        without a timestamp, or with one that is not a real date or time,
        are ignored.

        Returns:
            Dict mapping ``'YYYY-MM-DD HH:00'`` to the number of lines.
        """
        hourly_counts = Counter()

        for line in self.logs:
            found = self._TIMESTAMP_RE.search(line)
            if not found:
                continue
            try:
                dt = datetime.strptime(found.group(), '%Y-%m-%d %H:%M:%S')
            except ValueError:
                # The digits match the pattern but are not a valid date,
                # e.g. month 13. Skip the line rather than abort the scan.
                continue
            hourly_counts[dt.strftime('%Y-%m-%d %H:00')] += 1
        return dict(hourly_counts)

    def find_slow_requests(self, threshold_seconds=5):
        """Return requests whose ``time=<n>ms`` field exceeds the threshold.

        Returns:
            List of ``{'line': str, 'duration_ms': float}`` dicts.
        """
        threshold_ms = threshold_seconds * 1000
        slow_requests = []

        for line in self.logs:
            found = self._DURATION_RE.search(line)
            if found:
                duration_ms = float(found.group(1))
                if duration_ms > threshold_ms:
                    slow_requests.append({
                        'line': line.strip(),
                        'duration_ms': duration_ms,
                    })
        return slow_requests

    def generate_report(self):
        """Build a summary dict plus DataFrames of errors and hourly traffic.

        Returns:
            ``{'summary': dict, 'errors_df': DataFrame, 'traffic_df': DataFrame}``.
        """
        errors = self.analyze_errors()  # scan once and reuse below
        report = {
            'total_lines': len(self.logs),
            'error_count': len(errors),
            'hourly_traffic': self.count_requests_by_hour(),
            'slow_requests': self.find_slow_requests(),
        }

        df_errors = pd.DataFrame({'errors': errors})
        df_traffic = pd.DataFrame(
            list(report['hourly_traffic'].items()),
            columns=['hour', 'requests'],
        )
        return {
            'summary': report,
            'errors_df': df_errors,
            'traffic_df': df_traffic,
        }

18. 文件批量处理工具

import os
import re  # batch_rename uses re.search; the original listing never imported it
import shutil
from datetime import datetime
from pathlib import Path

import pandas as pd


class FileBatchProcessor:
    """Static helpers for bulk file operations."""

    @staticmethod
    def batch_rename(folder_path, pattern, new_name_template):
        """Rename every entry in ``folder_path`` whose name matches ``pattern``.

        Args:
            folder_path: Directory whose entries are examined (not recursive).
            pattern: Regular expression tested with ``re.search``.
            new_name_template: ``str.format`` template. ``{i}`` is the
                entry's position in the alphabetically sorted listing and
                ``{original}`` is its current name.

        Returns:
            List of ``(old_name, new_name)`` tuples for the renames that were
            performed. An entry whose target name already exists is left
            untouched rather than overwriting the other file.
        """
        renamed_files = []

        # Sort so that {i} is reproducible; os.listdir order is arbitrary.
        for i, filename in enumerate(sorted(os.listdir(folder_path))):
            if not re.search(pattern, filename):
                continue

            new_name = new_name_template.format(i=i, original=filename)
            old_path = os.path.join(folder_path, filename)
            new_path = os.path.join(folder_path, new_name)

            if os.path.exists(new_path):
                # os.rename silently replaces an existing file on POSIX.
                print(f"⚠️ skipped {filename}: {new_name} already exists")
                continue

            os.rename(old_path, new_path)
            renamed_files.append((filename, new_name))
        return renamed_files

    @staticmethod
    def batch_convert_images(input_folder, output_folder, target_format='jpg'):
        """Convert the PNG, JPEG, BMP and GIF images in a folder.

        Args:
            input_folder: Directory containing the source images.
            output_folder: Directory for the converted files (created if
                missing).
            target_format: File extension of the output, without the dot.

        Returns:
            List of the output file names.
        """
        from PIL import Image

        os.makedirs(output_folder, exist_ok=True)
        to_jpeg = target_format.lower() in ('jpg', 'jpeg')

        converted = []
        for filename in os.listdir(input_folder):
            if filename.lower().endswith(('.png', '.jpeg', '.bmp', '.gif')):
                new_name = os.path.splitext(filename)[0] + f'.{target_format}'
                # The context manager closes the source file handle.
                with Image.open(os.path.join(input_folder, filename)) as img:
                    # JPEG has no alpha channel or palette; saving an
                    # RGBA or P image as JPEG raises OSError.
                    if to_jpeg and img.mode not in ('RGB', 'L'):
                        img = img.convert('RGB')
                    img.save(os.path.join(output_folder, new_name))
                converted.append(new_name)
        return converted

19. 智能监控与告警系统

"""智能监控与告警系统:实时监控系统状态并发送告警"""import psutilimport timeimport jsonfrom datetime import datetimeimport smtplibfrom email.mime.text import MIMETextfrom email.mime.multipart import MIMEMultipartimport requestsclassSmartMonitoringSystem:"""智能监控系统"""def__init__(self, config_file='monitoring_config.json'):self.config = self._load_config(config_file)self.metrics_history = []self.alert_history = []def_load_config(self, config_file):"""加载配置文件"""        default_config = {"monitoring_interval"60,  # 监控间隔(秒)"alert_thresholds": {"cpu_percent"80,"memory_percent"85,"disk_percent"90,"network_errors"10            },"alert_channels": {"email": {"enabled"True,"smtp_server""smtp.gmail.com","smtp_port"587,"username""your_email@gmail.com","password""your_password","recipients": ["admin@example.com"]                },"webhook": {"enabled"False,"url""https://your-webhook-url.com"                }            }        }try:withopen(config_file, 'r'as f:                user_config = json.load(f)# 合并配置                default_config.update(user_config)except FileNotFoundError:print(f"⚠️ 配置文件 {config_file} 不存在,使用默认配置")return default_configdefcollect_metrics(self):"""收集系统指标"""        metrics = {"timestamp": datetime.now().isoformat(),"cpu_percent": psutil.cpu_percent(interval=1),"cpu_percent_per_core": psutil.cpu_percent(interval=1, percpu=True),"memory_percent": psutil.virtual_memory().percent,"memory_used_gb": psutil.virtual_memory().used / (1024**3),"memory_total_gb": psutil.virtual_memory().total / (1024**3),"disk_percent": psutil

20. 智能监控与告警系统(完整代码)

"""Monitoring and alerting: sample host metrics and notify when thresholds are exceeded."""
import json
import smtplib
import time
from datetime import datetime
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText

import psutil
import requests


class SmartMonitoringSystem:
    """Collect CPU, memory and disk metrics and raise alerts by e-mail or webhook."""

    MAX_HISTORY = 10000  # cap on retained samples so a long run cannot grow without bound

    def __init__(self, config_file='monitoring_config.json'):
        """Load configuration from ``config_file``, or use defaults if it is absent."""
        self.config = self._load_config(config_file)
        self.metrics_history = []
        self.alert_history = []

    @staticmethod
    def _merge_config(base, override):
        """Recursively overlay ``override`` onto ``base`` in place and return it."""
        for key, value in override.items():
            if isinstance(value, dict) and isinstance(base.get(key), dict):
                SmartMonitoringSystem._merge_config(base[key], value)
            else:
                base[key] = value
        return base

    def _load_config(self, config_file):
        """Return the defaults overlaid with the user's JSON config, if present."""
        default_config = {
            "monitoring_interval": 60,  # seconds between samples
            "alert_thresholds": {
                "cpu_percent": 80,
                "memory_percent": 85,
                "disk_percent": 90,
                "network_errors": 10,
            },
            "alert_channels": {
                "email": {
                    "enabled": True,
                    "smtp_server": "smtp.gmail.com",
                    "smtp_port": 587,
                    # Placeholders: supply real credentials through the
                    # config file, never by editing the source.
                    "username": "your_email@gmail.com",
                    "password": "your_password",
                    "recipients": ["admin@example.com"],
                },
                "webhook": {
                    "enabled": False,
                    "url": "https://your-webhook-url.com",
                },
            },
        }

        try:
            with open(config_file, 'r') as f:
                user_config = json.load(f)
            # Deep merge: a plain dict.update() would replace a whole nested
            # section (e.g. every threshold) when the user sets just one key.
            self._merge_config(default_config, user_config)
        except FileNotFoundError:
            print(f"⚠️ 配置文件 {config_file} 不存在,使用默认配置")
        return default_config

    def collect_metrics(self):
        """Sample the host once, append the sample to the history, and return it.

        Blocks for about one second while CPU usage is measured.
        """
        # One blocking per-core sample; the overall figure is the mean of
        # the cores, which avoids a second 1-second blocking call.
        per_core = psutil.cpu_percent(interval=1, percpu=True)
        memory = psutil.virtual_memory()
        disk = psutil.disk_usage('/')
        network = psutil.net_io_counters()

        metrics = {
            "timestamp": datetime.now().isoformat(),
            "cpu_percent": sum(per_core) / len(per_core) if per_core else 0.0,
            "cpu_percent_per_core": per_core,
            "memory_percent": memory.percent,
            "memory_used_gb": memory.used / (1024**3),
            "memory_total_gb": memory.total / (1024**3),
            "disk_percent": disk.percent,
            "disk_used_gb": disk.used / (1024**3),
            "disk_total_gb": disk.total / (1024**3),
            "network_bytes_sent": network.bytes_sent,
            "network_bytes_recv": network.bytes_recv,
            "process_count": len(psutil.pids()),
            "uptime": time.time() - psutil.boot_time(),
        }

        self.metrics_history.append(metrics)
        if len(self.metrics_history) > self.MAX_HISTORY:
            del self.metrics_history[:-self.MAX_HISTORY]
        return metrics

    def check_thresholds(self, metrics):
        """Return an alert dict for each metric above its configured threshold."""
        thresholds = self.config['alert_thresholds']
        # (metric key, alert type, severity, message label)
        checks = [
            ('cpu_percent', 'cpu_usage', 'high', 'CPU使用率过高'),
            ('memory_percent', 'memory_usage', 'high', '内存使用率过高'),
            ('disk_percent', 'disk_usage', 'critical', '磁盘使用率过高'),
        ]

        alerts = []
        for key, alert_type, severity, label in checks:
            if metrics[key] > thresholds[key]:
                alerts.append({
                    'severity': severity,
                    'type': alert_type,
                    'value': metrics[key],
                    'threshold': thresholds[key],
                    'message': f"{label}: {metrics[key]}%",
                })
        return alerts

    def send_alerts(self, alerts):
        """Record ``alerts`` and send them through every enabled channel."""
        if not alerts:
            return

        self.alert_history.extend(alerts)

        if self.config['alert_channels']['email']['enabled']:
            self._send_email_alerts(alerts)

        if self.config['alert_channels']['webhook']['enabled']:
            self._send_webhook_alerts(alerts)

    def _send_email_alerts(self, alerts):
        """Send one HTML e-mail listing ``alerts``; failures are printed, not raised."""
        email_config = self.config['alert_channels']['email']
        now_text = datetime.now().strftime('%Y-%m-%d %H:%M:%S')

        subject = f"🚨 系统告警 - {now_text}"
        body = "<h2>系统监控告警</h2>"
        body += f"<p>时间: {now_text}</p>"
        body += "<h3>告警详情:</h3><ul>"

        severity_colors = {
            'critical': 'red',
            'high': 'orange',
            'medium': 'yellow',
            'low': 'blue',
        }
        for alert in alerts:
            severity_color = severity_colors.get(alert['severity'], 'black')
            body += f"""
            <li style="color: {severity_color};">
                <strong>[{alert['severity'].upper()}] {alert['type']}</strong><br>
                当前值: {alert['value']} | 阈值: {alert['threshold']}<br>
                {alert['message']}
            </li>
            """
        body += "</ul>"

        try:
            msg = MIMEMultipart('alternative')
            msg['From'] = email_config['username']
            msg['To'] = ', '.join(email_config['recipients'])
            msg['Subject'] = subject
            msg.attach(MIMEText(body, 'html'))

            with smtplib.SMTP(email_config['smtp_server'], email_config['smtp_port']) as server:
                server.starttls()
                server.login(email_config['username'], email_config['password'])
                server.send_message(msg)
            print("✅ 邮件告警已发送")
        except Exception as e:
            # Best effort: a mail outage must not stop the monitoring loop.
            print(f"❌ 邮件发送失败: {e}")

    def _send_webhook_alerts(self, alerts):
        """POST ``alerts`` as JSON to the webhook; failures are printed, not raised."""
        webhook_config = self.config['alert_channels']['webhook']

        payload = {
            "timestamp": datetime.now().isoformat(),
            "alerts": alerts,
            "total_alerts": len(alerts),
        }

        try:
            response = requests.post(
                webhook_config['url'],
                json=payload,
                timeout=10,
            )
            # Any 2xx counts as delivered (many webhooks answer 202 or 204).
            if 200 <= response.status_code < 300:
                print("✅ Webhook告警已发送")
            else:
                print(f"❌ Webhook发送失败: {response.status_code}")
        except Exception as e:
            print(f"❌ Webhook请求失败: {e}")

    def generate_report(self, hours=24):
        """Summarise the samples collected in the last ``hours`` hours.

        Returns:
            A dict of averages, maxima and recent alerts, or
            ``{"error": ...}`` when no sample falls inside the window.
        """
        cutoff_time = datetime.now().timestamp() - (hours * 3600)
        recent_metrics = [
            m for m in self.metrics_history
            if datetime.fromisoformat(m['timestamp']).timestamp() > cutoff_time
        ]

        if not recent_metrics:
            return {"error": "没有找到指定时间范围内的数据"}

        keys = ("cpu_percent", "memory_percent", "disk_percent")
        count = len(recent_metrics)
        return {
            "report_time": datetime.now().isoformat(),
            "time_range_hours": hours,
            "total_samples": count,
            "averages": {k: sum(m[k] for m in recent_metrics) / count for k in keys},
            "max_values": {k: max(m[k] for m in recent_metrics) for k in keys},
            "recent_alerts": self.alert_history[-10:],
            "total_alerts": len(self.alert_history),
        }

    def start_monitoring(self):
        """Sample, check and alert in a loop until interrupted with Ctrl+C."""
        print("🚀 开始系统监控...")
        print(f"📊 监控间隔: {self.config['monitoring_interval']}秒")
        print("按 Ctrl+C 停止监控\n")

        try:
            while True:
                metrics = self.collect_metrics()

                print(f"[{datetime.now().strftime('%H:%M:%S')}] "
                      f"CPU: {metrics['cpu_percent']:.1f}% | "
                      f"内存: {metrics['memory_percent']:.1f}% | "
                      f"磁盘: {metrics['disk_percent']:.1f}%")

                alerts = self.check_thresholds(metrics)

                if alerts:
                    print(f"⚠️  检测到 {len(alerts)} 个告警")
                    self.send_alerts(alerts)

                time.sleep(self.config['monitoring_interval'])

        except KeyboardInterrupt:
            print("\n👋 监控已停止")

            report = self.generate_report(1)  # last hour
            print("\n📊 监控报告:")
            print(json.dumps(report, indent=2, ensure_ascii=False))


# Usage example
if __name__ == "__main__":
    import sys
    import threading

    monitor = SmartMonitoringSystem()

    def run_monitoring():
        monitor.start_monitoring()

    # Run the loop in a daemon thread so the demo can exit after 10 seconds.
    monitor_thread = threading.Thread(target=run_monitoring)
    monitor_thread.daemon = True
    monitor_thread.start()

    time.sleep(10)

    print("\n📈 生成监控报告...")
    report = monitor.generate_report()
    print(json.dumps(report, indent=2, ensure_ascii=False))
    sys.exit(0)

对 Python 开发者来说,“硬核” 从来不是写复杂的代码,而是用简洁的脚本解决实际问题。这 20 个脚本,本质是把机器学习、数据处理、后台开发的核心逻辑拆解得足够简单,让你不用从零开始造轮子。

不管你是想提升数据处理效率、入门机器学习建模,还是作为算法工程师需要快速搭建后台来验证想法,这份脚本清单都能帮你少走弯路。收藏起来,遇到对应场景时翻出来看看,比盲目啃教程效率高 10 倍。

文末福利:私信回复「资料」,获取Python知识库!

最新文章

随机文章

基本 文件 流程 错误 SQL 调试
  1. 请求信息 : 2026-02-08 10:18:18 HTTP/2.0 GET : https://f.mffb.com.cn/a/468203.html
  2. 运行时间 : 0.144302s [ 吞吐率:6.93req/s ] 内存消耗:4,768.95kb 文件加载:140
  3. 缓存信息 : 0 reads,0 writes
  4. 会话信息 : SESSION_ID=f79f0f3dfd49f21e6f3105c05734ffe4
  1. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/public/index.php ( 0.79 KB )
  2. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/autoload.php ( 0.17 KB )
  3. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/composer/autoload_real.php ( 2.49 KB )
  4. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/composer/platform_check.php ( 0.90 KB )
  5. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/composer/ClassLoader.php ( 14.03 KB )
  6. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/composer/autoload_static.php ( 4.90 KB )
  7. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-helper/src/helper.php ( 8.34 KB )
  8. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-validate/src/helper.php ( 2.19 KB )
  9. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/helper.php ( 1.47 KB )
  10. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/stubs/load_stubs.php ( 0.16 KB )
  11. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/Exception.php ( 1.69 KB )
  12. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-container/src/Facade.php ( 2.71 KB )
  13. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/symfony/deprecation-contracts/function.php ( 0.99 KB )
  14. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/symfony/polyfill-mbstring/bootstrap.php ( 8.26 KB )
  15. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/symfony/polyfill-mbstring/bootstrap80.php ( 9.78 KB )
  16. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/symfony/var-dumper/Resources/functions/dump.php ( 1.49 KB )
  17. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-dumper/src/helper.php ( 0.18 KB )
  18. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/symfony/var-dumper/VarDumper.php ( 4.30 KB )
  19. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/App.php ( 15.30 KB )
  20. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-container/src/Container.php ( 15.76 KB )
  21. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/psr/container/src/ContainerInterface.php ( 1.02 KB )
  22. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/app/provider.php ( 0.19 KB )
  23. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/Http.php ( 6.04 KB )
  24. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-helper/src/helper/Str.php ( 7.29 KB )
  25. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/Env.php ( 4.68 KB )
  26. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/app/common.php ( 0.03 KB )
  27. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/helper.php ( 18.78 KB )
  28. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/Config.php ( 5.54 KB )
  29. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/config/app.php ( 0.95 KB )
  30. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/config/cache.php ( 0.78 KB )
  31. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/config/console.php ( 0.23 KB )
  32. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/config/cookie.php ( 0.56 KB )
  33. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/config/database.php ( 2.48 KB )
  34. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/facade/Env.php ( 1.67 KB )
  35. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/config/filesystem.php ( 0.61 KB )
  36. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/config/lang.php ( 0.91 KB )
  37. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/config/log.php ( 1.35 KB )
  38. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/config/middleware.php ( 0.19 KB )
  39. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/config/route.php ( 1.89 KB )
  40. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/config/session.php ( 0.57 KB )
  41. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/config/trace.php ( 0.34 KB )
  42. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/config/view.php ( 0.82 KB )
  43. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/app/event.php ( 0.25 KB )
  44. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/Event.php ( 7.67 KB )
  45. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/app/service.php ( 0.13 KB )
  46. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/app/AppService.php ( 0.26 KB )
  47. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/Service.php ( 1.64 KB )
  48. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/Lang.php ( 7.35 KB )
  49. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/lang/zh-cn.php ( 13.70 KB )
  50. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/initializer/Error.php ( 3.31 KB )
  51. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/initializer/RegisterService.php ( 1.33 KB )
  52. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/services.php ( 0.14 KB )
  53. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/service/PaginatorService.php ( 1.52 KB )
  54. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/service/ValidateService.php ( 0.99 KB )
  55. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/service/ModelService.php ( 2.04 KB )
  56. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-trace/src/Service.php ( 0.77 KB )
  57. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/Middleware.php ( 6.72 KB )
  58. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/initializer/BootService.php ( 0.77 KB )
  59. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/Paginator.php ( 11.86 KB )
  60. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-validate/src/Validate.php ( 63.20 KB )
  61. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/Model.php ( 23.55 KB )
  62. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/model/concern/Attribute.php ( 21.05 KB )
  63. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/model/concern/AutoWriteData.php ( 4.21 KB )
  64. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/model/concern/Conversion.php ( 6.44 KB )
  65. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/model/concern/DbConnect.php ( 5.16 KB )
  66. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/model/concern/ModelEvent.php ( 2.33 KB )
  67. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/model/concern/RelationShip.php ( 28.29 KB )
  68. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-helper/src/contract/Arrayable.php ( 0.09 KB )
  69. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-helper/src/contract/Jsonable.php ( 0.13 KB )
  70. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/model/contract/Modelable.php ( 0.09 KB )
  71. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/Db.php ( 2.88 KB )
  72. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/DbManager.php ( 8.52 KB )
  73. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/Log.php ( 6.28 KB )
  74. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/Manager.php ( 3.92 KB )
  75. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/psr/log/src/LoggerTrait.php ( 2.69 KB )
  76. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/psr/log/src/LoggerInterface.php ( 2.71 KB )
  77. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/Cache.php ( 4.92 KB )
  78. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/psr/simple-cache/src/CacheInterface.php ( 4.71 KB )
  79. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-helper/src/helper/Arr.php ( 16.63 KB )
  80. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/cache/driver/File.php ( 7.84 KB )
  81. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/cache/Driver.php ( 9.03 KB )
  82. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/contract/CacheHandlerInterface.php ( 1.99 KB )
  83. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/app/Request.php ( 0.09 KB )
  84. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/Request.php ( 55.78 KB )
  85. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/app/middleware.php ( 0.25 KB )
  86. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/Pipeline.php ( 2.61 KB )
  87. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-trace/src/TraceDebug.php ( 3.40 KB )
  88. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/middleware/SessionInit.php ( 1.94 KB )
  89. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/Session.php ( 1.80 KB )
  90. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/session/driver/File.php ( 6.27 KB )
  91. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/contract/SessionHandlerInterface.php ( 0.87 KB )
  92. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/session/Store.php ( 7.12 KB )
  93. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/Route.php ( 23.73 KB )
  94. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/route/RuleName.php ( 5.75 KB )
  95. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/route/Domain.php ( 2.53 KB )
  96. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/route/RuleGroup.php ( 22.43 KB )
  97. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/route/Rule.php ( 26.95 KB )
  98. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/route/RuleItem.php ( 9.78 KB )
  99. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/route/app.php ( 1.72 KB )
  100. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/facade/Route.php ( 4.70 KB )
  101. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/route/dispatch/Controller.php ( 4.74 KB )
  102. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/route/Dispatch.php ( 10.44 KB )
  103. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/app/controller/Index.php ( 4.81 KB )
  104. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/app/BaseController.php ( 2.05 KB )
  105. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/facade/Db.php ( 0.93 KB )
  106. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/db/connector/Mysql.php ( 5.44 KB )
  107. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/db/PDOConnection.php ( 52.47 KB )
  108. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/db/Connection.php ( 8.39 KB )
  109. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/db/ConnectionInterface.php ( 4.57 KB )
  110. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/db/builder/Mysql.php ( 16.58 KB )
  111. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/db/Builder.php ( 24.06 KB )
  112. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/db/BaseBuilder.php ( 27.50 KB )
  113. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/db/Query.php ( 15.71 KB )
  114. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/db/BaseQuery.php ( 45.13 KB )
  115. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/db/concern/TimeFieldQuery.php ( 7.43 KB )
  116. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/db/concern/AggregateQuery.php ( 3.26 KB )
  117. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/db/concern/ModelRelationQuery.php ( 20.07 KB )
  118. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/db/concern/ParamsBind.php ( 3.66 KB )
  119. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/db/concern/ResultOperation.php ( 7.01 KB )
  120. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/db/concern/WhereQuery.php ( 19.37 KB )
  121. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/db/concern/JoinAndViewQuery.php ( 7.11 KB )
  122. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/db/concern/TableFieldInfo.php ( 2.63 KB )
  123. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-orm/src/db/concern/Transaction.php ( 2.77 KB )
  124. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/log/driver/File.php ( 5.96 KB )
  125. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/contract/LogHandlerInterface.php ( 0.86 KB )
  126. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/log/Channel.php ( 3.89 KB )
  127. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/event/LogRecord.php ( 1.02 KB )
  128. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-helper/src/Collection.php ( 16.47 KB )
  129. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/facade/View.php ( 1.70 KB )
  130. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/View.php ( 4.39 KB )
  131. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/Response.php ( 8.81 KB )
  132. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/response/View.php ( 3.29 KB )
  133. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/Cookie.php ( 6.06 KB )
  134. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-view/src/Think.php ( 8.38 KB )
  135. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/framework/src/think/contract/TemplateHandlerInterface.php ( 1.60 KB )
  136. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-template/src/Template.php ( 46.61 KB )
  137. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-template/src/template/driver/File.php ( 2.41 KB )
  138. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-template/src/template/contract/DriverInterface.php ( 0.86 KB )
  139. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/runtime/temp/067d451b9a0c665040f3f1bdd3293d68.php ( 11.98 KB )
  140. /yingpanguazai/ssd/ssd1/www/f.mffb.com.cn/vendor/topthink/think-trace/src/Html.php ( 4.42 KB )
  1. CONNECT:[ UseTime:0.000569s ] mysql:host=127.0.0.1;port=3306;dbname=f_mffb;charset=utf8mb4
  2. SHOW FULL COLUMNS FROM `fenlei` [ RunTime:0.000673s ]
  3. SELECT * FROM `fenlei` WHERE `fid` = 0 [ RunTime:0.000464s ]
  4. SELECT * FROM `fenlei` WHERE `fid` = 63 [ RunTime:0.000249s ]
  5. SHOW FULL COLUMNS FROM `set` [ RunTime:0.000492s ]
  6. SELECT * FROM `set` [ RunTime:0.000776s ]
  7. SHOW FULL COLUMNS FROM `article` [ RunTime:0.000535s ]
  8. SELECT * FROM `article` WHERE `id` = 468203 LIMIT 1 [ RunTime:0.005108s ]
  9. UPDATE `article` SET `lasttime` = 1770517099 WHERE `id` = 468203 [ RunTime:0.022097s ]
  10. SELECT * FROM `fenlei` WHERE `id` = 66 LIMIT 1 [ RunTime:0.000658s ]
  11. SELECT * FROM `article` WHERE `id` < 468203 ORDER BY `id` DESC LIMIT 1 [ RunTime:0.002958s ]
  12. SELECT * FROM `article` WHERE `id` > 468203 ORDER BY `id` ASC LIMIT 1 [ RunTime:0.001138s ]
  13. SELECT * FROM `article` WHERE `id` < 468203 ORDER BY `id` DESC LIMIT 10 [ RunTime:0.002579s ]
  14. SELECT * FROM `article` WHERE `id` < 468203 ORDER BY `id` DESC LIMIT 10,10 [ RunTime:0.011325s ]
  15. SELECT * FROM `article` WHERE `id` < 468203 ORDER BY `id` DESC LIMIT 20,10 [ RunTime:0.005760s ]
0.148254s