"""Telecom billing utilities: monthly bills, tariff-plan updates, usage reports."""

from datetime import datetime

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# Shared Spark session, intended for large-scale processing such as batch
# computation of every user's monthly bill.
spark = SparkSession.builder.appName("TelecomBillingSystem").getOrCreate()


def _overage_cost(used, allowance, unit_fee):
    """Return the charge for usage beyond ``allowance`` (zero when within it)."""
    return max(0, used - allowance) * unit_fee


def calculate_monthly_bill(user_id: str, billing_month: str) -> dict:
    """Compute the bill of one user for one month.

    The user record, tariff plan and monthly usage are currently hard-coded
    mock data standing in for database lookups, so ``user_id`` and
    ``billing_month`` are only echoed back in the result.

    Args:
        user_id: Identifier of the user being billed.
        billing_month: Month the bill is issued for.

    Returns:
        A dict with keys ``user_id``, ``billing_month``, ``base_fee``,
        ``extra_data_cost``, ``extra_call_cost`` and ``total_bill``; the three
        computed amounts are rounded to two decimals.
    """
    # 1. User info and subscribed tariff plan (mock data).
    user_info = {"user_id": user_id, "plan_id": "plan_001", "base_fee": 39.0}
    tariff_plan = {
        "plan_id": "plan_001",
        "data_allowance": 10 * 1024,
        "call_allowance": 500,
        "extra_data_fee": 0.03,
        "extra_call_fee": 0.1,
    }

    # 2. Actual usage of the user in the billing month (mock data).
    monthly_usage = {
        "user_id": user_id,
        "month": billing_month,
        "data_usage_mb": 12500,
        "call_minutes": 650,
    }

    # 3. Charges for data and call usage beyond the plan allowances.
    extra_data_cost = _overage_cost(
        monthly_usage["data_usage_mb"],
        tariff_plan["data_allowance"],
        tariff_plan["extra_data_fee"],
    )
    extra_call_cost = _overage_cost(
        monthly_usage["call_minutes"],
        tariff_plan["call_allowance"],
        tariff_plan["extra_call_fee"],
    )
    total_bill = user_info["base_fee"] + extra_data_cost + extra_call_cost

    # 4. Detailed bill breakdown.
    return {
        "user_id": user_id,
        "billing_month": billing_month,
        "base_fee": user_info["base_fee"],
        "extra_data_cost": round(extra_data_cost, 2),
        "extra_call_cost": round(extra_call_cost, 2),
        "total_bill": round(total_bill, 2),
    }


def _non_negative_error(new_details: dict, field: str, label: str):
    """Return an error message if ``new_details[field]`` is invalid, else None.

    A missing field is valid. A value that cannot be compared with zero
    (e.g. ``None`` or a string) is reported instead of raising ``TypeError``.
    """
    if field not in new_details:
        return None
    try:
        is_negative = new_details[field] < 0
    except TypeError:
        return f"{label}必须为数字"
    return f"{label}不能为负数" if is_negative else None


def update_tariff_plan(plan_id: str, new_details: dict) -> dict:
    """Update the name, price and/or data volume of a tariff plan.

    The plan store is currently an in-function mock, so the update is not
    persisted anywhere. Keys in ``new_details`` other than ``name``,
    ``price`` and ``data_gb`` are ignored.

    Args:
        plan_id: Identifier of the plan to update.
        new_details: Fields to overwrite on the plan.

    Returns:
        ``{"status": "error", "message": ...}`` when the plan does not exist
        or a numeric field is negative / not a number; otherwise
        ``{"status": "success", "message": ..., "updated_plan": ...}``.
    """
    # 1. Look up the existing plan (mock data).
    existing_plans = {"plan_001": {"name": "畅聊套餐", "price": 39.0, "data_gb": 10}}
    if plan_id not in existing_plans:
        return {"status": "error", "message": "套餐不存在"}

    # 2. Validate the new values; price is checked before data volume.
    for field, label in (("price", "套餐价格"), ("data_gb", "套餐流量")):
        message = _non_negative_error(new_details, field, label)
        if message is not None:
            return {"status": "error", "message": message}

    # 3. Apply only the supported fields that were supplied.
    plan_to_update = existing_plans[plan_id]
    for field in ("name", "price", "data_gb"):
        if field in new_details:
            plan_to_update[field] = new_details[field]

    # 4. Persisting the change to the database would happen here.

    # 5. Report success together with the updated plan.
    return {"status": "success", "message": "套餐更新成功", "updated_plan": plan_to_update}


def generate_user_usage_report(user_id: str, report_month: str) -> dict:
    """Build the monthly usage report of one user with Spark.

    The usage log is currently a small in-memory mock DataFrame. It is also
    registered as the temp view ``usage_logs``.

    The user and month filters are applied as DataFrame column expressions
    rather than being interpolated into SQL text, so caller-supplied values
    cannot change the structure of the query.

    Args:
        user_id: User whose usage is aggregated.
        report_month: Month to aggregate, in the same format as the ``month``
            column of the log (``"2023-11"`` in the mock data).

    Returns:
        ``{"error": ...}`` when no matching rows exist; otherwise a dict with
        ``user_id``, ``report_month``, ``total_data_consumed_mb``,
        ``total_call_duration_minutes`` and ``generated_at`` (local time,
        ``%Y-%m-%d %H:%M:%S``).
    """
    # 1. Mock usage-log DataFrame (in production this might be loaded from
    #    HDFS or a database table).
    data = [
        ("user_001", "2023-11", "data", 120),
        ("user_001", "2023-11", "call", 15),
        ("user_002", "2023-11", "data", 5000),
        ("user_001", "2023-11", "data", 200),
    ]
    columns = ["user_id", "month", "usage_type", "amount"]
    usage_logs_df = spark.createDataFrame(data, columns)
    usage_logs_df.createOrReplaceTempView("usage_logs")

    # 2. Aggregate total data volume and total call minutes for the requested
    #    user and month.
    def total_of(usage_type):
        matches = F.col("usage_type") == usage_type
        return F.sum(F.when(matches, F.col("amount")).otherwise(0))

    report_df = (
        usage_logs_df
        .filter((F.col("user_id") == user_id) & (F.col("month") == report_month))
        .groupBy("user_id", "month")
        .agg(
            total_of("data").alias("total_data_mb"),
            total_of("call").alias("total_call_minutes"),
        )
    )

    # 3. Collect the (at most one) aggregated row to the driver.
    report_data = report_df.collect()
    if not report_data:
        return {"error": "未找到指定用户和月份的使用记录"}

    # 4. Format the report.
    row = report_data[0]
    return {
        "user_id": row["user_id"],
        "report_month": row["month"],
        "total_data_consumed_mb": row["total_data_mb"],
        "total_call_duration_minutes": row["total_call_minutes"],
        "generated_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
    }