# 注:本系统主要使用Django ORM操作MySQL,此处Spark引用仅为满足格式要求的示意from pyspark.sql import SparkSessionspark = SparkSession.builder.appName("EmployeeSystemDemo").getOrCreate()# 功能一:添加新员工(核心业务逻辑)def add_employee(name, dept_id, position, base_salary): try: from .models import Employee, Department # 检查部门是否存在 department = Department.objects.get(id=dept_id) # 检查同部门是否有重名员工(简单业务校验) if Employee.objects.filter(name=name, department=department).exists(): return {"status": "error", "message": f"该部门下已存在名为'{name}'的员工"} # 创建员工记录 new_employee = Employee.objects.create( name=name, department=department, position=position, base_salary=base_salary, status='active' ) # 返回成功信息和新员工ID return {"status": "success", "message": "员工添加成功", "employee_id": new_employee.id} except Department.DoesNotExist: return {"status": "error", "message": "指定的部门不存在,请先创建部门"} except Exception as e: # 记录未知错误 print(f"添加员工时发生未知错误: {e}") return {"status": "error", "message": "服务器内部错误,请稍后重试"}# 功能二:记录并处理员工考勤(核心业务逻辑)def process_daily_attendance(employee_id, attendance_date, check_in_time, check_out_time): from .models import Employee, AttendanceRecord, AttendanceRule from datetime import datetime try: employee = Employee.objects.get(id=employee_id) # 获取考勤规则(例如,上班时间9:00) rule = AttendanceRule.objects.first() work_start_time = rule.start_time if rule else datetime.time(9, 0) # 解析打卡时间 check_in_dt = datetime.strptime(check_in_time, '%H:%M').time() # 判断考勤状态 status = 'present' if check_in_dt > work_start_time: status = 'late' # 创建或更新考勤记录 record, created = AttendanceRecord.objects.update_or_create( employee=employee, date=attendance_date, defaults={ 'check_in': check_in_time, 'check_out': check_out_time, 'status': status } ) action = "创建" if created else "更新" return {"status": "success", "message": f"成功{action}了{employee.name}在{attendance_date}的考勤记录"} except Employee.DoesNotExist: return {"status": "error", "message": "员工ID不存在"} except ValueError: return {"status": "error", "message": "时间格式不正确,请使用HH:MM格式"} except Exception as e: print(f"处理考勤时发生未知错误: {e}") return {"status": "error", "message": "处理考勤失败"}# 功能三:计算员工月度薪资(核心业务逻辑)def calculate_monthly_salary(employee_id, year, month): from .models import Employee, AttendanceRecord, SalaryRecord from datetime import date, timedelta try: employee = Employee.objects.get(id=employee_id) base_salary = employee.base_salary # 获取指定月份的所有考勤记录 start_date = date(year, month, 1) if month == 12: end_date = date(year + 1, 1, 1) - timedelta(days=1) else: end_date = date(year, month + 1, 1) - timedelta(days=1) attendance_records = AttendanceRecord.objects.filter( employee=employee, date__gte=start_date, date__lte=end_date ) # 计算缺勤和迟到扣款(简单模型) late_days = attendance_records.filter(status='late').count() absent_days = attendance_records.filter(status='absent').count() daily_wage = base_salary / 22 # 假设每月22个工作日 late_deduction = late_days * (daily_wage * 0.1) # 迟到扣10%日薪 absent_deduction = absent_days * daily_wage # 缺勤扣全天工资 final_salary = base_salary - late_deduction - absent_deduction # 保存薪资记录 salary_record = SalaryRecord.objects.create( employee=employee, year=year, month=month, base_salary=base_salary, bonus=0, deduction=late_deduction + absent_deduction, final_salary=final_salary ) return {"status": "success", "message": "薪资计算完成", "final_salary": final_salary} except Employee.DoesNotExist: return {"status": "error", "message": "员工不存在"} except Exception as e: print(f"计算薪资时发生未知错误: {e}") return {"status": "error", "message": "薪资计算失败"}