"""tools.py - 办公自动化工具集适用于日常办公场景的实用工具集合,包括文件处理、数据转换、时间管理、邮件提醒等功能"""import datetimeimport jsonimport csvimport osimport reimport randomimport shutilfrom pathlib import Pathfrom typing import List, Dict, Any, Optional, Unionfrom collections import Counterimport hashlibclass OfficeTools: """面向办公室场景的Agent工具集合""" # ==================== 文件处理工具 ==================== @staticmethod def read_file(file_path: str, encoding: str = 'utf-8') -> str: """ 读取文件内容(支持txt、json、csv等格式) 参数: file_path: 文件路径 encoding: 文件编码,默认utf-8 返回: 文件内容字符串 """ try: path = Path(file_path) if not path.exists(): return f"[错误] 文件不存在: {file_path}" # 根据扩展名选择读取方式 suffix = path.suffix.lower() if suffix == '.json': with open(path, 'r', encoding=encoding) as f: data = json.load(f) return f"[JSON文件] 内容:\n{json.dumps(data, ensure_ascii=False, indent=2)}" elif suffix == '.csv': with open(path, 'r', encoding=encoding) as f: reader = csv.reader(f) rows = list(reader) if not rows: return "[CSV文件] 文件为空" # 格式化输出 result = f"[CSV文件] 共{len(rows)}行,{len(rows[0])}列\n" for i, row in enumerate(rows[:20]): # 最多显示20行 result += f"第{i+1}行: {', '.join(row)}\n" if len(rows) > 20: result += f"... 还有{len(rows)-20}行未显示" return result else: # txt或其他文本文件 content = path.read_text(encoding=encoding) preview = content[:1000] if len(content) > 1000 else content return f"[文本文件] 文件大小: {len(content)}字符\n预览:\n{preview}" except Exception as e: return f"[错误] 读取文件失败: {str(e)}" @staticmethod def write_file(file_path: str, content: str, mode: str = 'w') -> str: """ 写入文件内容(办公常用:写报告、保存数据等) 参数: file_path: 文件路径 content: 要写入的内容 mode: 'w'覆盖写入,'a'追加写入 返回: 操作结果 """ try: path = Path(file_path) # 确保目录存在 path.parent.mkdir(parents=True, exist_ok=True) with open(path, 'w' if mode == 'w' else 'a', encoding='utf-8') as f: f.write(content) return f"[成功] 内容已保存到: {file_path} (模式: {'覆盖'if mode == 'w'else'追加'})" except Exception as e: return f"[错误] 写入文件失败: {str(e)}" @staticmethod def copy_file(source: str, destination: str) -> str: """ 复制文件或文件夹 参数: source: 源路径 destination: 目标路径 返回: 操作结果 """ try: src = Path(source) dst = Path(destination) if src.is_file(): shutil.copy2(src, dst) return f"[成功] 文件已复制: {source} -> {destination}" elif src.is_dir(): shutil.copytree(src, dst) return f"[成功] 文件夹已复制: {source} -> {destination}" else: return f"[错误] 源路径不存在: {source}" except Exception as e: return f"[错误] 复制失败: {str(e)}" @staticmethod def list_directory(path: str = '.', pattern: str = '*') -> str: """ 列出目录中的文件(办公常用:查看工作文件夹) 参数: path: 目录路径,默认为当前目录 pattern: 文件名匹配模式,如 '*.xlsx' 返回: 文件列表 """ try: dir_path = Path(path) if not dir_path.exists(): return f"[错误] 目录不存在: {path}" files = list(dir_path.glob(pattern)) # 统计文件类型 file_types = Counter(f.suffix for f in files if f.is_file()) result = f"[目录列表] {dir_path.absolute()}\n" result += f"总计: {len(files)}个项目\n" result += f"文件类型分布: {dict(file_types)}\n\n" # 按修改时间排序 files.sort(key=lambda x: x.stat().st_mtime, reverse=True) for item in files[:30]: # 最多显示30个 if item.is_file(): size = item.stat().st_size size_str = f"{size}B" if size < 1024 else f"{size/1024:.1f}KB" mtime = datetime.datetime.fromtimestamp(item.stat().st_mtime).strftime('%Y-%m-%d %H:%M') result += f"📄 {item.name} ({size_str}) - {mtime}\n" else: result += f"📁 {item.name}/\n" if len(files) > 30: result += f"... 还有{len(files)-30}个项目未显示" return result except Exception as e: return f"[错误] 列出目录失败: {str(e)}" # ==================== 数据处理工具 ==================== @staticmethod def parse_table_data(data: str, format_type: str = 'auto') -> str: """ 解析表格数据(CSV、Markdown表格、Excel复制文本等) 参数: data: 原始数据字符串 format_type: 格式类型(auto/csv/md/tsv) 返回: 格式化的表格数据 """ try: lines = data.strip().split('\n') # 自动检测格式 if format_type == 'auto': if '|' in lines[0] and '---' in lines[1]: format_type = 'md' # Markdown表格 elif '\t' in lines[0]: format_type = 'tsv' elif ',' in lines[0]: format_type = 'csv' else: return "[错误] 无法自动识别数据格式" # 解析数据 rows = [] if format_type == 'csv': for line in lines: rows.append([cell.strip() for cell in line.split(',')]) elif format_type == 'tsv': for line in lines: rows.append([cell.strip() for cell in line.split('\t')]) elif format_type == 'md': # 跳过Markdown表头分隔线 for i, line in enumerate(lines): if '---' in line: continue if '|' in line: cells = [c.strip() for c in line.split('|')[1:-1]] rows.append(cells) if not rows: return "[错误] 未找到有效数据" # 对齐输出 col_widths = [max(len(str(row[i])) for row in rows) for i in range(len(rows[0]))] result = f"[表格数据] {len(rows)}行 × {len(rows[0])}列\n" # 添加表头(第一行加粗标识) for i, row in enumerate(rows): if i == 0: result += "📊 " else: result += " " formatted = [f"{cell:<{col_widths[j]}}" for j, cell in enumerate(row)] result += " | ".join(formatted) if i == 0: result += "\n" + " " + "-" * (sum(col_widths) + len(col_widths) * 3) result += "\n" return result except Exception as e: return f"[错误] 解析表格数据失败: {str(e)}" @staticmethod def convert_units(value: float, from_unit: str, to_unit: str) -> str: """ 单位转换(长度、重量、温度等) 参数: value: 数值 from_unit: 原单位(如 'km', 'kg', 'c') to_unit: 目标单位 返回: 转换结果 """ # 定义转换率(基准单位都用国际标准) conversions = { # 长度 (基准: 米) 'length': {'m': 1, 'km': 1000, 'cm': 0.01, 'mm': 0.001, 'in': 0.0254, 'ft': 0.3048, 'yd': 0.9144, 'mile': 1609.34}, # 重量 (基准: 千克) 'weight': {'kg': 1, 'g': 0.001, 'mg': 0.000001, 'lb': 0.453592, 'oz': 0.0283495, 't': 1000}, # 温度 (特例) 'temperature': ['c', 'f', 'k'] } try: from_unit = from_unit.lower() to_unit = to_unit.lower() # 温度转换(特例) if from_unit in conversions['temperature'] and to_unit in conversions['temperature']: if from_unit == 'c' and to_unit == 'f': result = value * 9/5 + 32 elif from_unit == 'f' and to_unit == 'c': result = (value - 32) * 5/9 elif from_unit == 'c' and to_unit == 'k': result = value + 273.15 elif from_unit == 'k' and to_unit == 'c': result = value - 273.15 else: result = value return f"[单位转换] {value}{from_unit.upper()} = {result:.2f}{to_unit.upper()}" # 查找单位类型 unit_type = None for utype, units in conversions.items(): if utype != 'temperature' and from_unit in units and to_unit in units: unit_type = utype break if not unit_type: return f"[错误] 不支持的转换: {from_unit} -> {to_unit}" units = conversions[unit_type] # 转换为基准单位,再转换为目标单位 base_value = value * units[from_unit] result = base_value / units[to_unit] return f"[单位转换] {value}{from_unit} = {result:.4f}{to_unit}" except Exception as e: return f"[错误] 转换失败: {str(e)}" # ==================== 时间管理工具 ==================== @staticmethod def get_current_time(timezone: str = 'local') -> str: """ 获取当前时间(支持时区) 参数: timezone: 时区(local/UTC/asia/shanghai/us/ny等) 返回: 格式化的时间信息 """ now = datetime.datetime.now() time_info = f"[当前时间] {now.strftime('%Y年%m月%d日 %H:%M:%S')}\n" time_info += f"📅 星期: {['一','二','三','四','五','六','日'][now.weekday()]}\n" time_info += f"📆 第{now.strftime('%W')}周 | 本年第{now.timetuple().tm_yday}天\n" # 添加时间戳 timestamp = int(now.timestamp()) time_info += f"⏰ 时间戳: {timestamp}\n" # 常见时区参考 time_info += "🌍 国际时间参考:\n" utc_now = datetime.datetime.utcnow() time_info += f" UTC时间: {utc_now.strftime('%H:%M')}\n" time_info += f" 东京时间: {(now + datetime.timedelta(hours=1)).strftime('%H:%M')}\n" time_info += f" 纽约时间: {(now - datetime.timedelta(hours=13)).strftime('%H:%M')}\n" return time_info @staticmethod def calculate_workdays(start_date: str, end_date: str, include_weekend: bool = False) -> str: """ 计算工作日天数(排除周末和节假日) 参数: start_date: 开始日期 'YYYY-MM-DD' end_date: 结束日期 'YYYY-MM-DD' include_weekend: 是否包含周末 返回: 工作日统计信息 """ try: start = datetime.datetime.strptime(start_date, '%Y-%m-%d') end = datetime.datetime.strptime(end_date, '%Y-%m-%d') total_days = (end - start).days + 1 workdays = 0 holidays = 0 for i in range(total_days): current = start + datetime.timedelta(days=i) is_weekend = current.weekday() >= 5 if include_weekend: workdays += 1 else: if not is_weekend: workdays += 1 else: holidays += 1 result = f"[工作日统计] {start_date} 至 {end_date}\n" result += f"📊 总天数: {total_days}天\n" result += f"💼 工作日: {workdays}天\n" result += f"🎉 周末: {holidays}天\n" # 简单的工作日提醒 if total_days > 30: result += f"📌 预计需要{workdays//22}个月完成项目\n" return result except Exception as e: return f"[错误] 日期格式错误,请使用 YYYY-MM-DD 格式: {str(e)}" @staticmethod def create_schedule(tasks: List[Dict[str, Any]]) -> str: """ 创建智能日程安排 参数: tasks: 任务列表,每个任务包含名称、预计时长(小时)、截止日期等信息 示例: [{'name': '写报告', 'duration': 2, 'deadline': '2024-01-15', 'priority': 'high'}] 返回: 优化后的日程安排 """ if not tasks: return "[错误] 任务列表为空" # 优先级映射 priority_order = {'high': 3, 'medium': 2, 'low': 1} # 按优先级和截止日期排序 sorted_tasks = sorted(tasks, key=lambda x: (-priority_order.get(x.get('priority', 'medium'), 2), x.get('deadline', '9999-12-31'))) schedule = "[智能日程安排]\n" schedule += "=" * 50 + "\n" total_hours = 0 current_hour = 9 # 从上午9点开始 for task in sorted_tasks: name = task.get('name', '未命名任务') duration = task.get('duration', 1) deadline = task.get('deadline', '无截止日期') priority = task.get('priority', 'medium') # 优先级标志 priority_flag = {'high': '🔴', 'medium': '🟡', 'low': '🟢'}.get(priority, '⚪') # 计算时间段 end_hour = current_hour + duration time_slot = f"{current_hour:02d}:00 - {end_hour:02d}:00" schedule += f"{priority_flag}{time_slot} | {name}\n" schedule += f" ⏱️ 预计{duration}小时 | 📅 截止: {deadline}\n\n" current_hour = end_hour total_hours += duration # 中午休息 if current_hour >= 12 and current_hour < 13: schedule += "🍽️ 12:00 - 13:00 午休时间\n\n" current_hour = 13 schedule += "=" * 50 + "\n" schedule += f"📈 统计: 共{len(tasks)}个任务,总耗时{total_hours}小时\n" # 工作量评估 if total_hours > 8: schedule += "⚠️ 警告: 工作量超过8小时,建议调整优先级或分配给其他人\n" elif total_hours < 4: schedule += "✅ 工作量适中,可以安排一些临时任务\n" return schedule # ==================== 沟通协作工具 ==================== @staticmethod def format_email(subject: str, recipient: str, body: str, sender: str = None) -> str: """ 格式化邮件内容(符合办公规范) 参数: subject: 邮件主题 recipient: 收件人 body: 正文内容 sender: 发件人(可选) 返回: 格式化的邮件文本 """ date_str = datetime.datetime.now().strftime('%Y年%m月%d日') email = f"""{'='*60}邮件主题: {subject}收件人: {recipient}{'发件人: ' + sender if sender else''}日期: {date_str}{'='*60}{body}{'='*60}发件时间: {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}请及时查收并回复确认。""" return email.strip() @staticmethod def generate_meeting_agenda(topic: str, duration: int, participants: List[str], items: List[str]) -> str: """ 生成会议议程 参数: topic: 会议主题 duration: 会议时长(分钟) participants: 参会人员列表 items: 议程事项列表 返回: 标准格式的会议议程 """ agenda = f"""━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 会议议程━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━会议主题: {topic}会议时长: {duration}分钟参会人员: {', '.join(participants)}日期: {datetime.datetime.now().strftime('%Y-%m-%d')}议程安排:""" # 计算每个议程的时间分配 time_per_item = duration // len(items) if items else duration for i, item in enumerate(items, 1): agenda += f"\n{i}. {item} (约{time_per_item}分钟)" agenda += f"""注意事项:• 请准时参会• 提前准备相关资料• 重要事项请提前10分钟测试设备预计结束时间: {(datetime.datetime.now() + datetime.timedelta(minutes=duration)).strftime('%H:%M')}━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━""" return agenda # ==================== 搜索和计算工具 ==================== @staticmethod def smart_search(keyword: str, search_type: str = 'all') -> str: """ 智能搜索(集成多种办公搜索需求) 参数: keyword: 搜索关键词 search_type: 搜索类型(all/天气/新闻/代码/翻译) 返回: 搜索结果 """ # 模拟数据(实际可以接入API) mock_data = { '天气': { '北京': '🌤️ 晴, 15-25°C, 空气质量良, 紫外线中等', '上海': '🌧️ 小雨, 18-22°C, 湿度85%, 带伞出行', '深圳': '☀️ 晴朗, 22-28°C, 适合户外活动' }, '新闻': { '科技': 'AI技术日新月异,办公自动化成为趋势...', '经济': '数字化转型加速,远程办公常态化...' }, '文档': { '合同模板': '【合同模板】劳动合同_标准版_v2.3.docx', '周报模板': '【周报模板】工作周报_简洁版.xlsx', 'PPT模板': '【PPT模板】年终总结_商务风.pptx' } } result = f"[智能搜索] 关键词: '{keyword}'\n\n" # 搜索类型处理 if search_type == '天气' or keyword in ['天气', '气温']: result += "🌤️ 天气信息:\n" for city, weather in mock_data['天气'].items(): result += f" {city}: {weather}\n" elif search_type == '文档' or keyword in ['模板', '文档']: result += "📄 办公文档模板:\n" for name, desc in mock_data['文档'].items(): result += f" • {name}: {desc}\n" elif search_type == '代码': result += "💻 常用代码片段:\n" result += " • Python读取Excel: pd.read_excel('file.xlsx')\n" result += " • 发送邮件: smtplib.SMTP()\n" result += " • 文件处理: pathlib.Path()\n" else: result += "📋 推荐相关资源:\n" for category in ['文档', '代码', '常用链接']: result += f" • {category}: 可进一步细化搜索\n" result += "\n💡 提示: 可以使用更精确的关键词获得更好结果" return result @staticmethod def calculate_safe(expression: str) -> str: """ 安全计算器(支持办公常用计算) 参数: expression: 数学表达式,如 '3*5+2', '总价/数量', '10% of 200' 返回: 计算结果 """ try: # 处理百分比的特殊情况 if '%' in expression and 'of' in expression: # 解析 'X% of Y' import re match = re.search(r'(\d+(?:\.\d+)?)%\s+of\s+(\d+(?:\.\d+)?)', expression) if match: percent = float(match.group(1)) total = float(match.group(2)) result = (percent / 100) * total return f"[计算器] {expression} = {result:.2f}" # 允许的字符:数字、运算符、括号、小数点、空格 allowed_chars = set("0123456789+-*/().% ") if not all(c in allowed_chars for c in expression): return "[计算器] 表达式包含非法字符,拒绝计算" # 替换%为/100 clean_expr = expression.replace('%', '/100') clean_expr = clean_expr.replace(' ', '') # 安全执行计算 safe_globals = {'__builtins__': {}, 'pow': pow, 'abs': abs} result = eval(clean_expr, safe_globals) # 格式化结果(处理浮点数) if isinstance(result, float): if result.is_integer(): result = int(result) else: result = round(result, 4) return f"[计算器] {expression} = {result}" except ZeroDivisionError: return "[计算器] 错误:除数不能为零" except SyntaxError: return "[计算器] 错误:表达式语法有误(括号不匹配或运算符错误)" except Exception as e: return f"[计算器] 计算错误: {str(e)}" # ==================== 快捷工具 ==================== @staticmethod def text_to_markdown(text: str, style: str = 'basic') -> str: """ 文本转Markdown格式(办公写作) 参数: text: 原始文本 style: 样式(basic/文章/列表) 返回: Markdown格式文本 """ lines = text.strip().split('\n') if style == 'basic': # 自动识别标题(行首有数字+点的) result = [] for line in lines: if re.match(r'^\d+\.', line): result.append(f"### {line}") elif line.strip() and not line.startswith('#'): result.append(line) else: result.append(line) return '\n\n'.join(result) elif style == '文章': # 文章格式:标题、段落、强调 result = f"# 办公文档\n\n" result += f"> 生成时间: {datetime.datetime.now().strftime('%Y-%m-%d')}\n\n" result += text return result else: # 列表格式 items = [f"- {item}" for item in lines if item.strip()] return "\n".join(items) @staticmethod def quick_reminder(message: str, minutes: int = 0) -> str: """ 快速提醒(模拟) 参数: message: 提醒内容 minutes: 多少分钟后提醒(0表示立即提醒) 返回: 提醒信息 """ if minutes > 0: reminder_time = datetime.datetime.now() + datetime.timedelta(minutes=minutes) return f"⏰ 已设置提醒: '{message}'\n📅 将在 {reminder_time.strftime('%H:%M:%S')} 提醒你" else: return f"🔔 立即提醒: {message}\n📍 请及时处理"# 工具路由器(自动选择最合适的工具)class ToolRouter: """智能工具路由器 - 根据用户输入自动选择合适的工具""" @staticmethod def route(query: str, **kwargs) -> str: """ 路由到合适的工具 参数: query: 用户查询或命令 **kwargs: 额外参数 返回: 工具执行结果 """ tools = OfficeTools() # 关键词映射 patterns = { '时间|日期|现在几点了': lambda: tools.get_current_time(), '计算|多少钱|合计': lambda: tools.calculate_safe(kwargs.get('expression', query)), '创建日程|安排|计划': lambda: tools.create_schedule(kwargs.get('tasks', [])), '复制文件|备份': lambda: tools.copy_file(kwargs.get('source', ''), kwargs.get('dest', '')), '读取文件|打开': lambda: tools.read_file(kwargs.get('file_path', '')), '天气|气温': lambda: tools.smart_search('天气', '天气'), '提醒|闹钟': lambda: tools.quick_reminder(kwargs.get('message', ''), kwargs.get('minutes', 0)), } # 匹配关键词 for pattern, action in patterns.items(): if re.search(pattern, query): return action() # 默认返回帮助信息 return f"[工具路由器] 未匹配到合适工具\n您的请求: {query}\n支持的功能: {', '.join([k for k in patterns.keys()])}"# ==================== 使用示例 ====================if __name__ == "__main__": tools = OfficeTools() print("=" * 80) print("办公工具集测试") print("=" * 80) # 测试时间管理 print("\n1. 时间管理工具:") print(tools.get_current_time()) print(tools.calculate_workdays('2024-01-01', '2024-01-31')) # 测试文件处理 print("\n2. 文件处理工具:") print(tools.list_directory('.', '*.py')) # 测试日程安排 print("\n3. 智能日程:") tasks = [ {'name': '完成项目报告', 'duration': 3, 'deadline': '2024-01-20', 'priority': 'high'}, {'name': '团队会议', 'duration': 1, 'deadline': '2024-01-15', 'priority': 'high'}, {'name': '邮件回复', 'duration': 2, 'deadline': '2024-01-14', 'priority': 'medium'} ] print(tools.create_schedule(tasks)) # 测试计算功能 print("\n4. 计算功能:") print(tools.calculate_safe("15% of 200")) print(tools.calculate_safe("(3+5)*2")) # 测试会议议程 print("\n5. 会议议程:") print(tools.generate_meeting_agenda("年终总结会议", 60, ["张三", "李四", "王五"], ["汇报年度工作", "讨论明年计划", "表彰优秀员工", "总结发言"])) # 测试单位转换 print("\n6. 单位转换:") print(tools.convert_units(100, 'km', 'mile')) print(tools.convert_units(25, 'c', 'f')) # 测试路由器 print("\n7. 智能路由测试:") print(ToolRouter.route("现在几点了"))