# SparkSession is imported as a hook for possible future big-data analysis
# features; for now it mainly demonstrates the integration approach.
import math

from pyspark.sql import SparkSession

# NOTE(review): Land, Inventory, InventoryLog, CropRecord, HarvestRecord,
# SystemNotification, transaction, Sum, F and Count are used below but are not
# imported in the visible part of this file -- presumably they are brought
# into scope elsewhere; confirm.

spark = SparkSession.builder.appName("FarmDataAnalysis").getOrCreate()


def record_planting(request):
    """Record a planting on a plot and deduct the seeds from inventory.

    Reads ``land_id``, ``crop_type``, ``plant_date`` and ``seed_amount`` from
    ``request.POST``. On success a ``CropRecord`` is created, the land status
    is set to ``'planted'``, the matching seed inventory (item name
    ``crop_type + '种子'``) is decreased and an ``InventoryLog`` entry is
    written, all inside one database transaction.

    Returns:
        dict: ``{'status': 'success', 'message': ..., 'record_id': ...}`` on
        success, otherwise ``{'status': 'error', 'message': ...}``. This
        function does not raise; every exception is converted into an error
        dict.
    """
    try:
        land_id = request.POST.get('land_id')
        crop_type = request.POST.get('crop_type')
        plant_date = request.POST.get('plant_date')
        raw_seed_amount = request.POST.get('seed_amount')

        # Check presence BEFORE converting: float(None) would otherwise raise
        # and be reported as a system error instead of a missing parameter.
        if not all([land_id, crop_type, plant_date, raw_seed_amount]):
            return {'status': 'error', 'message': '参数不完整'}

        try:
            seed_amount = float(raw_seed_amount)
        except (TypeError, ValueError):
            return {'status': 'error', 'message': '种子用量必须为正数'}
        # Reject NaN/inf and non-positive amounts; a negative amount would
        # silently *increase* the stock below.
        if not math.isfinite(seed_amount) or seed_amount <= 0:
            return {'status': 'error', 'message': '种子用量必须为正数'}

        with transaction.atomic():
            # Lock the rows while checking and updating so two concurrent
            # requests cannot both see an idle plot / sufficient stock.
            land = Land.objects.select_for_update().get(id=land_id)
            if land.status != 'idle':
                return {'status': 'error', 'message': '该地块非空闲状态'}

            seed_inventory = Inventory.objects.select_for_update().get(
                item_name=crop_type + '种子'
            )
            if seed_inventory.quantity < seed_amount:
                return {'status': 'error', 'message': '种子库存不足'}

            new_crop_record = CropRecord.objects.create(
                land_id=land_id,
                crop_type=crop_type,
                plant_date=plant_date,
                status='growing',
            )
            land.status = 'planted'
            land.save()
            seed_inventory.quantity -= seed_amount
            seed_inventory.save()
            InventoryLog.objects.create(
                item_name=seed_inventory.item_name,
                change_type='out',
                quantity=seed_amount,
                related_record_id=new_crop_record.id,
            )

        return {
            'status': 'success',
            'message': '种植记录成功',
            'record_id': new_crop_record.id,
        }
    except Land.DoesNotExist:
        return {'status': 'error', 'message': '地块不存在'}
    except Inventory.DoesNotExist:
        return {'status': 'error', 'message': '种子库存记录不存在'}
    except Exception as e:
        # Top-level boundary: report anything unexpected as an error dict.
        return {'status': 'error', 'message': f'系统错误: {str(e)}'}


def check_inventory_alerts(low_stock_threshold=10.0):
    """Collect low-stock inventory items and store one combined notification.

    Args:
        low_stock_threshold: Items whose ``quantity`` is less than or equal
            to this value are reported. Defaults to ``10.0``.

    Returns:
        list[dict]: One entry per low-stock item with the keys ``item_name``,
        ``current_quantity``, ``unit``, ``threshold`` and ``alert_message``.
        The list is empty when nothing is low on stock.
    """
    alert_items = []
    # Let the database do the filtering instead of loading every row.
    low_stock_items = Inventory.objects.filter(quantity__lte=low_stock_threshold)
    for item in low_stock_items:
        alert_message = f"库存预警:物品 '{item.item_name}' 当前库存为 {item.quantity}{item.unit},已低于最低阈值 {low_stock_threshold}{item.unit},请及时采购!"
        alert_items.append({
            'item_name': item.item_name,
            'current_quantity': item.quantity,
            'unit': item.unit,
            'threshold': low_stock_threshold,
            'alert_message': alert_message,
        })

    if alert_items:
        try:
            alert_content = "\n".join(entry['alert_message'] for entry in alert_items)
            SystemNotification.objects.create(type='inventory_alert', content=alert_content)
        except Exception as e:
            # Best effort: failing to store the notification must not prevent
            # the caller from receiving the alert list.
            print(f"创建库存预警通知失败: {str(e)}")

    return alert_items


def generate_yield_report(start_date, end_date):
    """Build a per-crop yield and income report for completed harvests.

    Args:
        start_date: Lower bound passed to the ``harvest_date__range`` lookup.
        end_date: Upper bound passed to the ``harvest_date__range`` lookup.

    Returns:
        dict: ``{'status': 'success', 'data': report}`` where ``report`` maps
        each crop type to ``total_yield_kg``, ``total_income_yuan``,
        ``harvest_times`` and ``average_yield_kg``, plus a ``'summary'`` entry
        holding ``total_income`` and ``period``. On any exception
        ``{'status': 'error', 'message': ...}`` is returned instead.
    """
    report_data = {}
    try:
        harvest_records = HarvestRecord.objects.filter(
            harvest_date__range=[start_date, end_date],
            status='completed',
        )
        crop_yield_summary = harvest_records.values('crop_type').annotate(
            total_yield=Sum('actual_yield'),
            # Sum the per-row income. Multiplying the aggregate by a plain
            # F() column adds that column to GROUP BY, which can yield several
            # rows per crop type that then overwrite each other below.
            total_income=Sum(F('actual_yield') * F('crop__selling_price_per_kg')),
            harvest_count=Count('id'),
        ).order_by('-total_income')

        for summary in crop_yield_summary:
            crop_type = summary['crop_type']
            # Aggregates are NULL/None when there is nothing to sum.
            total_yield = summary['total_yield'] or 0
            total_income = summary['total_income'] or 0
            harvest_count = summary['harvest_count']
            avg_yield_per_harvest = total_yield / harvest_count if harvest_count > 0 else 0
            report_data[crop_type] = {
                'total_yield_kg': round(total_yield, 2),
                'total_income_yuan': round(total_income, 2),
                'harvest_times': harvest_count,
                'average_yield_kg': round(avg_yield_per_harvest, 2),
            }

        # Computed before 'summary' is inserted so every value is a crop entry.
        total_farm_income = sum(data['total_income_yuan'] for data in report_data.values())
        report_data['summary'] = {
            'total_income': round(total_farm_income, 2),
            'period': f"{start_date} 至 {end_date}",
        }
        return {'status': 'success', 'data': report_data}
    except Exception as e:
        return {'status': 'error', 'message': f'生成报告失败: {str(e)}'}