# 核心功能1: 基于Spark的设备故障概率离线预测模型def predict_device_failures(): from pyspark.sql import SparkSession from pyspark.ml.feature import VectorAssembler from pyspark.ml.regression import LinearRegression from pyspark.sql.functions import col, when, datediff, current_date, lit spark = SparkSession.builder.appName("DeviceFailurePrediction").getOrCreate() # 模拟从MySQL加载历史维修记录和设备信息 data = [ (1, "电梯A", 365, 30, 5, 1), (2, "路灯B", 180, 10, 2, 0), (3, "电梯A", 400, 5, 6, 1), (4, "门禁C", 90, 60, 1, 0), (5, "路灯B", 200, 15, 3, 1), (6, "电梯A", 380, 20, 7, 1) ] columns = ["device_id", "device_type", "device_age_days", "days_since_last_repair", "repair_count", "failed"] df = spark.createDataFrame(data, columns) # 特征工程:创建特征向量 df = df.withColumn("risk_factor", when(col("device_type") == "电梯A", 3).when(col("device_type") == "路灯B", 1).otherwise(2)) assembler = VectorAssembler(inputCols=["device_age_days", "days_since_last_repair", "repair_count", "risk_factor"], outputCol="features") train_data = assembler.transform(df) # 训练一个简单的线性回归模型来预测故障概率 lr = LinearRegression(featuresCol="features", labelCol="failed", maxIter=10, regParam=0.3, elasticNetParam=0.8) lr_model = lr.fit(train_data) # 对所有设备进行预测 predictions = lr_model.transform(train_data) # 提取预测结果并模拟写回MySQL的prediction_result表 predicted_results = predictions.select("device_id", "prediction").withColumnRenamed("prediction", "failure_probability") print("--- Spark预测作业完成,结果已更新至数据库 ---") predicted_results.show() spark.stop()# 核心功能2: 住户提交报修请求的后端处理逻辑def submit_repair_request_view(request): # 假设使用了Django框架,这是一个视图函数的核心逻辑 user_id = request.POST.get('user_id') device_id = request.POST.get('device_id') description = request.POST.get('description') # 基本数据校验 if not all([user_id, device_id, description]): return {'status': 'error', 'message': '信息不完整,请检查后重试'} # 创建报修单 try: # new_repair = Repair.objects.create(user_id=user_id, device_id=device_id, description=description, status='待处理') # new_repair.save() print(f"用户 {user_id} 成功提交了关于设备 {device_id} 的报修单。") # 业务逻辑:检查该设备是否在高风险预测列表中 # high_risk_device = PredictionResult.objects.filter(device_id=device_id, failure_probability__gt=0.75).first() # if high_risk_device: # print(f"警告:设备 {device_id} 的预测故障概率高达 {high_risk_device.failure_probability},已自动提升工单优先级!") # # new_repair.priority = '高' # # new_repair.save() # 业务逻辑:自动通知可用的维修人员 # available_staff = MaintenanceStaff.objects.filter(is_available=True).first() # if available_staff: # print(f"已自动向维修人员 {available_staff.name} 发送新工单通知。") # # send_notification(available_staff, f"有新的高优先级工单:设备 {device_id} 需要维修") return {'status': 'success', 'message': '报修提交成功,我们将尽快处理'} except Exception as e: print(f"提交报修时发生错误: {e}") return {'status': 'error', 'message': '服务器内部错误,请稍后再试'}# 核心功能3: 管理员生成预防性维护计划def generate_proactive_maintenance_plan(): # 假设这是一个定时任务或管理员触发的任务 print("--- 开始生成预防性维护计划 ---") # 从数据库中获取由Spark预测出的高风险设备列表 # high_risk_devices = PredictionResult.objects.filter(failure_probability__gte=0.7).order_by('-failure_probability') # 模拟查询结果 high_risk_devices = [ {'device_id': 1, 'device_name': '电梯A', 'device_location': '1号楼', 'failure_probability': 0.92}, {'device_id': 5, 'device_name': '路灯B', 'device_location': '中心花园', 'failure_probability': 0.81}, {'device_id': 3, 'device_name': '电梯A', 'device_location': '2号楼', 'failure_probability': 0.78} ] if not high_risk_devices: print("当前没有需要特别关注的预防性维护项目。") return {'status': 'info', 'plan': []} maintenance_plan = [] for device in high_risk_devices: # 业务逻辑:根据设备类型和位置,推荐维护措施 action = "建议立即进行全面检查和关键部件更换" if device['device_name'] == '路灯B': action = "建议检查线路并更换老化灯泡" # 业务逻辑:检查所需备件库存 # required_parts = get_required_parts(device['device_id']) # stock_status = "库存充足" # for part in required_parts: # if Inventory.objects.filter(part_id=part['id'], quantity__lt=part['needed_quantity']).exists(): # stock_status = "库存不足,请及时采购" # break stock_status = "库存充足" # 模拟 plan_item = { 'device_info': f"{device['device_name']} ({device['device_location']})", 'risk': f"预测故障概率: {device['failure_probability']:.2%}", 'recommended_action': action, 'stock_status': stock_status, 'suggested_date': "下周内完成" } maintenance_plan.append(plan_item) print("--- 预防性维护计划生成完毕 ---") return {'status': 'success', 'plan': maintenance_plan}