
上周五下午5点,老板突然要一份跨部门的销售数据交叉分析报告,周一早上就要。要是以前,我肯定要加班到深夜。但这次,我用了ChatGPT,晚上7点就完成了报告,还顺便优化了三个自动化脚本。
这不是什么魔法,而是找到了和AI协作的正确姿势。今天,我就来分享如何让ChatGPT成为你的Python开发最佳搭档,而不是简单的代码生成器。
往期阅读>>>
Python 20 个文本分析的库:效率提升 10 倍的秘密武器
Python 自动化管理Jenkins的15个实用脚本,提升效率
App2Docker:如何无需编写Dockerfile也可以创建容器镜像
Python 自动化识别Nginx配置并导出为excel文件,提升Nginx管理效率
我的需求: 把市场部、销售部、财务部的Excel数据合并,清洗掉重复项和异常值,生成统一格式的分析报告。
传统方式:
查pandas文档(30分钟)
写代码调试(2小时)
处理各种异常情况(1小时)
总耗时:3.5小时+
ChatGPT协作方式:
第一步:明确需求(对话式)
我:我需要合并三个Excel文件,它们结构不同:1. 市场部:文件名 marketing.xlsx,工作表名"市场数据",有"日期"、"渠道"、"花费"列2. 销售部:文件名 sales.xlsx,工作表名"销售记录",有"销售日期"、"产品"、"销售额"列 3. 财务部:文件名 finance.xlsx,工作表"财务明细",有"date"、"item"、"amount"列要求:1. 统一列名为:date, channel/product, amount2. 删除amount为负数或大于100万的异常值3. 按日期排序,保存到 merged_data.xlsx4. 生成简单的统计报告:总金额、日均金额、最大最小值
第二步:ChatGPT生成代码骨架
importpandasaspdimportosfromdatetimeimportdatetimedefmerge_and_clean_excel_files():""" 合并和清洗三个部门的Excel数据 返回: pandas.DataFrame: 清洗后的合并数据 dict: 统计报告 """# 读取三个文件try:marketing_df = pd.read_excel('marketing.xlsx', sheet_name='市场数据')sales_df = pd.read_excel('sales.xlsx', sheet_name='销售记录')finance_df = pd.read_excel('finance.xlsx', sheet_name='财务明细')exceptFileNotFoundErrorase:print(f"文件不存在: {e}")returnNone, None# 重命名列以统一格式marketing_df = marketing_df.rename(columns={'日期': 'date','渠道': 'item','花费': 'amount' })sales_df = sales_df.rename(columns={'销售日期': 'date', '产品': 'item','销售额': 'amount' })finance_df = finance_df.rename(columns={'date': 'date','item': 'item','amount': 'amount' })# 合并数据combined_df = pd.concat([marketing_df, sales_df, finance_df], ignore_index=True)# 清洗数据# 1. 删除amount异常值(负数或大于100万)cleaned_df = combined_df[ (combined_df['amount'] >0) & (combined_df['amount'] <= 1000000) ].copy()# 2. 确保date是日期类型cleaned_df['date'] = pd.to_datetime(cleaned_df['date'], errors='coerce')# 3. 删除日期无效的行cleaned_df = cleaned_df.dropna(subset=['date'])# 4. 按日期排序cleaned_df = cleaned_df.sort_values('date').reset_index(drop=True)# 生成统计报告report = {'total_amount': cleaned_df['amount'].sum(),'average_daily': cleaned_df.groupby('date')['amount'].sum().mean(),'max_amount': cleaned_df['amount'].max(),'min_amount': cleaned_df['amount'].min(),'row_count': len(cleaned_df),'data_range': {'start': cleaned_df['date'].min().strftime('%Y-%m-%d'),'end': cleaned_df['date'].max().strftime('%Y-%m-%d') } }# 保存到Excelcleaned_df.to_excel('merged_data.xlsx', index=False)# 同时保存报告report_df = pd.DataFrame([report])report_df.to_excel('statistics_report.xlsx', index=False)returncleaned_df, reportif__name__ == "__main__":data, report = merge_and_clean_excel_files()ifdataisnotNone:print("数据处理完成!")print(f"处理了 {len(data)} 行数据")print(f"总金额: {report['total_amount']:,.2f}")print(f"数据时间范围: {report['data_range']['start']} 到 {report['data_range']['end']}")
第三步:我优化和增强(关键步骤)
# 我添加的部分:错误处理、日志记录、进度显示importloggingfromtqdmimporttqdmfrompathlibimportPath# 设置日志logging.basicConfig(level=logging.INFO,format='%(asctime)s - %(levelname)s - %(message)s',handlers=[logging.FileHandler('data_merge.log'),logging.StreamHandler() ])defenhanced_merge():"""增强版的数据合并函数"""logger = logging.getLogger(__name__)# 检查文件是否存在required_files = ['marketing.xlsx', 'sales.xlsx', 'finance.xlsx']missing_files = [fforfinrequired_filesifnotPath(f).exists()]ifmissing_files:logger.error(f"缺少文件: {missing_files}")returnNone, Nonelogger.info("开始合并三个部门的Excel数据...")try:# 使用tqdm显示进度withtqdm(total=100, desc="处理进度") aspbar:data, report = merge_and_clean_excel_files()pbar.update(100)ifdataisnotNone:logger.info(f"成功处理 {len(data)} 行数据")logger.info(f"总金额: {report['total_amount']:,.2f}")# 额外分析:按部门统计# 我手动添加的部门识别逻辑data['department'] = data['item'].apply(identify_department)dept_stats = data.groupby('department')['amount'].agg(['sum', 'mean', 'count'])dept_stats.to_excel('department_stats.xlsx')logger.info("部门统计已生成")returndata, reportexceptExceptionase:logger.exception(f"数据处理失败: {e}")returnNone, Nonedefidentify_department(item):"""根据item内容识别部门(这是我根据业务知识添加的)"""item_str = str(item).lower()ifany(channelinitem_strforchannelin ['微信', '微博', '抖音', '广告']):return'市场部'elifany(productinitem_strforproductin ['产品a', '产品b', '产品c']):return'销售部'elifany(financeinitem_strforfinancein ['办公', '差旅', '采购']):return'财务部'else:return'其他'
第四步:测试和微调
我:生成的代码有个问题,finance.xlsx的date列格式不一致,有的"2023-01-01",有的"2023/01/01",怎么处理?ChatGPT:可以这样修改日期转换部分:# 修改日期处理逻辑def parse_date(date_val): try: # 尝试多种格式 for fmt in ['%Y-%m-%d', '%Y/%m/%d', '%Y年%m月%d日', '%d/%m/%Y']: try: return pd.to_datetime(date_val, format=fmt) except: continue return pd.NaT except: return pd.NaTcleaned_df['date'] = cleaned_df['date'].apply(parse_date)
最终效果:
ChatGPT生成时间:2分钟
我优化时间:8分钟
总耗时:10分钟
代码质量:生产可用,有完整错误处理
上周遇到一个诡异的问题:脚本在测试环境正常,生产环境随机失败。
传统调试:
加print语句(1小时)
查日志(30分钟)
复现问题(不确定)
总耗时:无法估计
ChatGPT协作调试:
第一步:提供错误信息和代码片段
我:我的Python脚本在生产环境随机出现这个错误:Traceback (most recent call last): File "data_processor.py", line 147, in process_batch result = complex_calculation(data_chunk) File "data_processor.py", line 89, in complex_calculation return sum(x * y for x, y in zip(values_x, values_y)) / len(values_x)ZeroDivisionError: division by zero但values_x应该永远不会为空。代码片段:def process_batch(batch_data): for chunk in batch_data: if should_process(chunk): # 这里可能过滤掉所有数据 results = complex_calculation(chunk) save_results(results)def complex_calculation(data): values_x = extract_x(data) # 可能返回空列表 values_y = extract_y(data) if not values_x or not values_y: # 我漏掉了这个检查! return 0 # 计算逻辑...
第二步:ChatGPT分析并提供解决方案
ChatGPT:问题可能出现在几个地方:1. extract_x()或extract_y()可能返回空列表2. should_process()可能过滤了所有数据,导致chunk为空3. 生产环境的数据可能与测试环境不同建议的修复:def complex_calculation(data): values_x = extract_x(data) values_y = extract_y(data) # 添加防御性检查 if not values_x or not values_y: logger.warning(f"空数据: values_x={len(values_x)}, values_y={len(values_y)}") return 0 # 或抛出明确异常 # 确保长度一致 min_len = min(len(values_x), len(values_y)) values_x = values_x[:min_len] values_y = values_y[:min_len] if min_len == 0: return 0 try: result = sum(x * y for x, y in zip(values_x, values_y)) / min_len return result except ZeroDivisionError: logger.error("计算出现除零错误,返回0") return 0同时改进should_process函数:def should_process(chunk): if not chunk: return False # 添加更多检查...
第三步:我实施并添加监控
# 我根据ChatGPT建议修改,并添加了详细监控classDataProcessor:def__init__(self):self.stats = {'total_chunks': 0,'empty_chunks': 0,'calculation_errors': 0,'successful_calculations': 0 }defprocess_batch(self, batch_data):"""处理数据批次,带有完整监控"""self.stats['total_chunks'] += len(batch_data)fori, chunkinenumerate(batch_data):chunk_id = f"chunk_{i}"ifnotself.should_process(chunk):self.stats['empty_chunks'] += 1logger.debug(f"跳过 {chunk_id}: 数据为空")continuetry:result = self.complex_calculation(chunk, chunk_id)ifresultisnotNone:self.save_results(result)self.stats['successful_calculations'] += 1exceptExceptionase:self.stats['calculation_errors'] += 1logger.error(f"处理 {chunk_id} 失败: {e}")# 记录失败的数据片段以便分析self.log_failed_chunk(chunk_id, chunk, str(e))self.report_stats()defcomplex_calculation(self, data, chunk_id=""):"""增强版的计算函数"""values_x = extract_x(data)values_y = extract_y(data)logger.debug(f"{chunk_id}: values_x={len(values_x)}, values_y={len(values_y)}")# 详细的检查逻辑ifnotvalues_x:logger.warning(f"{chunk_id}: values_x为空")returnNoneifnotvalues_y:logger.warning(f"{chunk_id}: values_y为空")returnNone# 长度对齐min_len = min(len(values_x), len(values_y))ifmin_len == 0:logger.warning(f"{chunk_id}: 对齐后长度为0")returnNonevalues_x = values_x[:min_len]values_y = values_y[:min_len]try:# 计算,添加更多检查ifany(y == 0foryinvalues_y):logger.warning(f"{chunk_id}: 存在0值,可能影响计算")result = sum(x*yforx, yinzip(values_x, values_y)) /min_len# 结果验证ifnotmath.isfinite(result):logger.error(f"{chunk_id}: 计算结果无效: {result}")returnNonereturnresultexceptZeroDivisionErrorase:logger.error(f"{chunk_id}: 除零错误,min_len={min_len}")returnNoneexceptExceptionase:logger.error(f"{chunk_id}: 计算错误: {e}")returnNone
调试效果:
定位问题时间:从不确定 → 15分钟
修复质量:不仅修复了bug,还预防了类似问题
监控能力:添加了完整的运行监控
最近需要学习FastAPI,传统方式要看文档、看教程、写demo,至少一天。用ChatGPT,2小时就能上手。
学习路径:
第一步:基础知识问答
我:我想用FastAPI创建一个用户管理系统,需要哪些基本组件?ChatGPT:一个基本的FastAPI用户管理系统需要:1. 数据模型(Pydantic)2. 数据库模型(SQLAlchemy或Tortoise-ORM)3. 路由(API端点)4. 认证和授权(JWT或OAuth2)5. 错误处理6. 中间件(可选)我给你一个最小可工作示例:
第二步:生成学习代码
fromfastapiimportFastAPI, HTTPException, DependsfrompydanticimportBaseModelfromtypingimportList, Optionalimportuuidfromdatetimeimportdatetimeapp = FastAPI(title="用户管理系统")# 数据模型classUserBase(BaseModel):username: stremail: strfull_name: Optional[str] = NoneclassUserCreate(UserBase):password: strclassUser(UserBase):id: strcreated_at: datetimeclassConfig:orm_mode = True# 内存存储(实际项目用数据库)users_db = {}# API端点@app.get("/")defread_root():return {"message": "用户管理系统 API"}@app.post("/users/", response_model=User)defcreate_user(user: UserCreate):"""创建新用户"""user_id = str(uuid.uuid4())# 检查用户名是否已存在forexisting_userinusers_db.values():ifexisting_user["username"] == user.username:raiseHTTPException(status_code=400, detail="用户名已存在")db_user = {"id": user_id,"username": user.username,"email": user.email,"full_name": user.full_name,"created_at": datetime.now() }users_db[user_id] = db_userreturndb_user@app.get("/users/", response_model=List[User])defread_users(skip: int = 0, limit: int = 100):"""获取用户列表"""returnlist(users_db.values())[skip:skip+limit]@app.get("/users/{user_id}", response_model=User)defread_user(user_id: str):"""获取单个用户"""ifuser_idnotinusers_db:raiseHTTPException(status_code=404, detail="用户不存在")returnusers_db[user_id]@app.put("/users/{user_id}", response_model=User)defupdate_user(user_id: str, user_update: UserBase):"""更新用户"""ifuser_idnotinusers_db:raiseHTTPException(status_code=404, detail="用户不存在")# 检查用户名冲突(排除自己)foruid, existing_userinusers_db.items():ifuid!= user_idandexisting_user["username"] == user_update.username:raiseHTTPException(status_code=400, detail="用户名已存在")users_db[user_id].update(user_update.dict(exclude_unset=True))returnusers_db[user_id]@app.delete("/users/{user_id}")defdelete_user(user_id: str):"""删除用户"""ifuser_idnotinusers_db:raiseHTTPException(status_code=404, detail="用户不存在")delusers_db[user_id]return {"message": "用户删除成功"}# 运行:uvicorn main:app --reload
第三步:我追问和深化
我:这个例子很好,但实际项目还需要:1. 数据库连接(用SQLAlchemy)2. 密码哈希(不能存明文)3. JWT认证4. 单元测试5. 分页和过滤能帮我逐步实现这些吗?ChatGPT:当然,我们一步步来。首先添加数据库支持...
第四步:实际项目应用基于ChatGPT的指导,我快速搭建了项目骨架:
my_user_system/├── app/│ ├── __init__.py│ ├── main.py # FastAPI应用│ ├── database.py # 数据库连接│ ├── models.py # 数据模型│ ├── schemas.py # Pydantic模型│ ├── crud.py # 数据库操作│ ├── auth.py # 认证逻辑│ └── dependencies.py # 依赖注入├── tests/ # 单元测试├── requirements.txt└── README.md
学习效果:
学习时间:2小时 vs 1天
理解深度:通过问答深入理解原理
实践能力:立即能写实际代码
有一个祖传的500行脚本,结构混乱,难以维护。
ChatGPT协助重构:
第一步:分析现有代码
我:我有一个数据处理脚本,主要问题:1. 一个函数做了太多事情(200行)2. 全局变量多3. 错误处理混乱4. 没有类型提示你能帮我分析如何重构吗?ChatGPT:请提供代码结构,我帮你分析。
第二步:提供重构建议
# ChatGPT的分析和建议:原始问题:1.process_data()函数太长(责任过多)2.使用全局变量config、logger、db_connection3.错误处理混在业务逻辑中4.没有类型提示,难以理解数据结构重构建议:# 1. 将大函数拆分为小函数(单一职责)原始:defprocess_data(input_file, output_file):重构:-defload_config() ->Config-defsetup_logging(config: Config) ->Logger-defcreate_db_connection(config: Config) ->Connection-defread_input_data(file_path: str) ->DataFrame-defvalidate_data(data: DataFrame) ->bool-deftransform_data(data: DataFrame) ->DataFrame-defsave_results(data: DataFrame, output_path: str)# 2. 使用类组织相关功能classDataProcessor:def__init__(self, config_path: str):self.config = self.load_config(config_path)self.logger = self.setup_logging()self.db_conn = self.create_connection()defprocess(self, input_file: str, output_file: str):"""主处理流程"""data = self.read_data(input_file)ifself.validate_data(data):transformed = self.transform_data(data)self.save_data(transformed, output_file)self.logger.info("处理完成")else:self.logger.error("数据验证失败")# 3. 添加类型提示fromtypingimportOptional, Dict, AnyfrompandasimportDataFrameimportpandasaspd# 4. 统一错误处理classDataProcessingError(Exception):passdefsafe_process():try:processor = DataProcessor("config.yaml")processor.process("input.csv", "output.csv")exceptDataProcessingErrorase:logger.error(f"数据处理失败: {e}")# 清理资源exceptExceptionase:logger.exception(f"未预期的错误: {e}")raise
第三步:逐步重构我按照建议,分步骤重构:
# 第一步:定义数据类fromdataclassesimportdataclassfromtypingimportOptional, Dict, Anyfromdatetimeimportdatetime@dataclassclassProcessingConfig:input_encoding: str = "utf-8"output_format: str = "csv"chunk_size: int = 1000log_level: str = "INFO"@dataclassclassProcessingResult:success: boolinput_rows: intoutput_rows: interrors: List[str]start_time: datetimeend_time: datetimeduration_seconds: float# 第二步:创建处理器类classDataProcessor:def__init__(self, config: ProcessingConfig):self.config = configself.logger = self._setup_logger()self._reset_state()def_reset_state(self):"""重置处理状态"""self.current_file = Noneself.processed_rows = 0self.errors = []defprocess_file(self, input_path: str, output_path: str) ->ProcessingResult:"""处理单个文件"""start_time = datetime.now()try:self._reset_state()self.current_file = input_pathself.logger.info(f"开始处理文件: {input_path}")# 读取数据data = self._read_input(input_path)input_rows = len(data)# 验证数据ifnotself._validate_data(data):raiseDataValidationError("数据验证失败")# 转换数据transformed = self._transform_data(data)# 保存结果self._save_output(transformed, output_path)output_rows = len(transformed)# 记录成功self.processed_rows = output_rowsself.logger.info(f"处理完成: {input_rows} -> {output_rows} 行")returnProcessingResult(success=True,input_rows=input_rows,output_rows=output_rows,errors=self.errors,start_time=start_time,end_time=datetime.now(),duration_seconds=(datetime.now() -start_time).total_seconds() )exceptExceptionase:self.logger.error(f"处理失败: {e}")returnProcessingResult(success=False,input_rows=0,output_rows=0,errors=[str(e)] +self.errors,start_time=start_time,end_time=datetime.now(),duration_seconds=(datetime.now() -start_time).total_seconds() )def_read_input(self, file_path: str) ->pd.DataFrame:"""读取输入文件"""# 实现...passdef_validate_data(self, data: pd.DataFrame) ->bool:"""验证数据"""# 实现...passdef_transform_data(self, data: pd.DataFrame) ->pd.DataFrame:"""转换数据"""# 实现...passdef_save_output(self, data: pd.DataFrame, output_path: str):"""保存输出"""# 实现...pass# 第三步:使用工厂模式创建处理器classProcessorFactory:@staticmethoddefcreate_processor(config_path: Optional[str] = None) ->DataProcessor:"""创建数据处理器"""ifconfig_path:config = ProcessorFactory._load_config(config_path)else:config = ProcessingConfig() # 默认配置returnDataProcessor(config)@staticmethoddef_load_config(config_path: str) ->ProcessingConfig:"""加载配置文件"""# 实现...pass
重构效果:
代码行数:500行 → 300行(更清晰)
可维护性:大幅提升
可测试性:每个函数都可单独测试
错误处理:统一且完整
经过实践,我总结了和ChatGPT协作写Python代码的最佳实践:
1. 明确需求,分步骤进行
不好的提问:帮我写个数据处理脚本好的提问:我需要一个Python脚本,实现以下功能:1. 从API获取JSON数据2. 解析并转换为DataFrame3. 过滤掉异常值(数值大于3倍标准差)4. 保存为CSV文件5. 添加错误处理和日志记录
2. 提供上下文和约束
提供:- 输入数据格式示例- 期望的输出格式- 性能要求(如处理100万行数据)- 环境约束(Python版本、可用库)- 已有的代码片段(如果需要集成)
3. 迭代改进,不要一次求全
第一轮:生成核心功能代码第二轮:添加错误处理第三轮:优化性能第四轮:添加文档和测试
4. 理解而非复制
阅读ChatGPT生成的代码,理解其思路
问“为什么这样写”而不仅仅是“怎么写”
修改和优化生成的代码,加入自己的思考
5. 验证和测试
# 对ChatGPT生成的代码,一定要测试deftest_chatgpt_code():# 1. 单元测试assertfunction_works_correctly()# 2. 边界测试test_edge_cases()# 3. 性能测试ifis_too_slow():ask_chatgpt_to_optimize()# 4. 安全测试check_for_security_issues()
6. 保持批判性思维
ChatGPT可能生成过时的代码(如用Python 2语法)
可能推荐不安全的实践
可能不理解你的具体业务逻辑
你才是最终的责任人
虽然ChatGPT很强大,但有些情况不适合:
1. 安全性要求高的代码
加密算法实现
认证授权逻辑
支付处理代码
2. 性能关键代码
高频交易系统
实时数据处理
算法核心优化
3. 复杂的业务逻辑
领域特定的复杂规则
公司内部业务逻辑
需要深度领域知识的代码
4. 学习初期的基础代码
Python基础语法练习
算法和数据结构实现
面向对象设计练习
刚开始用ChatGPT写代码时,我也有顾虑:会不会让我变懒?会不会降低代码质量?会不会失去思考能力?
几个月用下来,我发现正好相反:ChatGPT不是替代我思考,而是放大我的思考能力。
就像有了计算器,我们并没有忘记算术,而是能处理更复杂的数学问题。有了ChatGPT,我并没有停止思考,而是能思考更复杂的编程问题。
