' Intelligent quotation recommendation system - VBA implementation
Option Explicit

' Pricing strategies the recommender can select.
Enum PricingStrategy
    psCostPlus          ' cost-plus pricing
    psCompetitive       ' competitive pricing
    psValueBased        ' value-based pricing
    psPenetration       ' penetration pricing
    psSkimming          ' price skimming
End Enum

' Product master record, filled from one row of the history sheet
' (columns 1-10, see LoadProductData).
Type ProductData
    ProductID As String
    ProductName As String
    Category As String
    MaterialCost As Double
    LaborCost As Double
    OverheadCost As Double
    TotalCost As Double
    HistoricalPrice As Double
    HistoricalQuantity As Long
    CustomerSegment As String
    MarketPosition As String
End Type

' Outcome of one pricing recommendation.
Type PricingRecommendation
    ProductID As String
    BaseCost As Double
    MinPrice As Double
    RecommendedPrice As Double
    MaxPrice As Double
    Confidence As Double
    Strategy As PricingStrategy
    Reason As String
    CompetitorPrices As Collection
End Type

' Entry point: reads the quote request from the configuration sheet, builds a
' pricing recommendation and writes it to the result sheet.
'
' Screen updating, calculation and alerts are switched off while the macro runs
' and are restored on every exit path (success, validation failure, error).
' The calculation mode is restored to whatever it was before the macro started.
Sub GeneratePricingRecommendation()
    Dim startTime As Double
    Dim previousCalculation As XlCalculation

    startTime = Timer
    ' Safe default in case reading Application.Calculation itself fails.
    previousCalculation = xlCalculationAutomatic

    On Error GoTo ErrorHandler

    previousCalculation = Application.Calculation
    Application.ScreenUpdating = False
    Application.Calculation = xlCalculationManual
    Application.DisplayAlerts = False

    ' 1. Initialisation
    Dim wsData As Worksheet, wsResult As Worksheet, wsConfig As Worksheet
    Set wsData = ThisWorkbook.Sheets("历史数据")
    Set wsResult = ThisWorkbook.Sheets("报价建议")
    Set wsConfig = ThisWorkbook.Sheets("系统配置")

    ' Clear the result sheet
    ClearResultSheet wsResult

    ' 2. Read the current quote request
    Dim productID As String, quantity As Long, customerType As String
    Dim urgencyLevel As String, paymentTerms As String
    productID = wsConfig.Range("B2").Value
    quantity = wsConfig.Range("B3").Value
    customerType = wsConfig.Range("B4").Value
    urgencyLevel = wsConfig.Range("B5").Value
    paymentTerms = wsConfig.Range("B6").Value

    ' The unit-cost calculation divides by the quantity, so reject
    ' non-positive values up front with a clear message.
    If quantity <= 0 Then
        RestorePricingAppState previousCalculation
        MsgBox "报价数量必须大于0!", vbExclamation
        Exit Sub
    End If

    ' 3. Load product master data
    Dim product As ProductData
    product = LoadProductData(wsData, productID)

    If product.ProductID = "" Then
        ' Restore Excel before leaving; otherwise screen updating, alerts and
        ' automatic calculation would stay disabled.
        RestorePricingAppState previousCalculation
        MsgBox "未找到产品数据!", vbExclamation
        Exit Sub
    End If

    ' 4. Load historical transactions
    Dim historyData As Collection
    Set historyData = GetHistoricalTransactions(wsData, productID, customerType)

    ' 5. Compute the unit cost
    Dim totalCost As Double
    totalCost = CalculateTotalCost(product, quantity, wsConfig)

    ' 6. Build the recommendation
    Dim recommendation As PricingRecommendation
    recommendation = GenerateRecommendation(product, totalCost, quantity, _
                                            customerType, urgencyLevel, _
                                            paymentTerms, historyData, wsConfig)

    ' 7. Write the result
    OutputRecommendation wsResult, recommendation, productID, quantity

    ' 8. Produce the analysis report
    GenerateAnalysisReport wsResult, recommendation, historyData

    RestorePricingAppState previousCalculation

    ' Timer counts seconds since midnight; compensate if the run crossed it.
    Dim elapsedSeconds As Double
    elapsedSeconds = Timer - startTime
    If elapsedSeconds < 0 Then elapsedSeconds = elapsedSeconds + 86400#

    MsgBox "报价建议生成完成!" & vbCrLf & _
           "产品:" & product.ProductName & vbCrLf & _
           "建议报价:" & Format(recommendation.RecommendedPrice, "¥#,##0.00") & vbCrLf & _
           "建议区间:[" & Format(recommendation.MinPrice, "¥#,##0.00") & " - " & _
           Format(recommendation.MaxPrice, "¥#,##0.00") & "]" & vbCrLf & _
           "置信度:" & Format(recommendation.Confidence, "0.0%") & vbCrLf & _
           "耗时:" & Format(elapsedSeconds, "0.0") & "秒", _
           vbInformation, "报价建议"

    Exit Sub

ErrorHandler:
    ' Capture the message first so the restore call cannot disturb it.
    Dim errorText As String
    errorText = Err.Description

    RestorePricingAppState previousCalculation

    MsgBox "报价建议生成过程中发生错误:" & vbCrLf & _
           errorText, vbCritical, "错误"
End Sub

' Re-enables screen updating and alerts and puts the calculation mode back to
' the supplied value.
Private Sub RestorePricingAppState(ByVal calculationMode As XlCalculation)
    Application.ScreenUpdating = True
    Application.Calculation = calculationMode
    Application.DisplayAlerts = True
End Sub

' Looks up productID in column 1 of ws (rows 2..last used row) and returns the
' matching product record. When no row matches, the returned record is left at
' its defaults, i.e. ProductID = "".
Function LoadProductData(ws As Worksheet, productID As String) As ProductData
    Dim data As ProductData
    Dim lastRow As Long, i As Long

    lastRow = ws.Cells(ws.Rows.Count, 1).End(xlUp).Row

    For i = 2 To lastRow
        If ws.Cells(i, 1).Value = productID Then
            data.ProductID = productID
            data.ProductName = ws.Cells(i, 2).Value
            data.Category = ws.Cells(i, 3).Value
            data.MaterialCost = ws.Cells(i, 4).Value
            data.LaborCost = ws.Cells(i, 5).Value
            data.OverheadCost = ws.Cells(i, 6).Value
            data.TotalCost = data.MaterialCost + data.LaborCost + data.OverheadCost
            data.HistoricalPrice = ws.Cells(i, 7).Value
            data.HistoricalQuantity = ws.Cells(i, 8).Value
            data.CustomerSegment = ws.Cells(i, 9).Value
            data.MarketPosition = ws.Cells(i, 10).Value
            Exit For    ' first match wins
        End If
    Next i

    LoadProductData = data
End Function

' Collects the transactions for productID from columns 11-18 of ws.
' When customerType is non-empty only rows whose column 12 equals it are kept.
' Returns a Collection of Scripting.Dictionary objects with the keys
' Date, Customer, Quantity, UnitPrice, TotalAmount, ProfitMargin, PaymentTerms.
Function GetHistoricalTransactions(ws As Worksheet, productID As String, _
                                   customerType As String) As Collection
    Dim transactions As New Collection
    Dim transaction As Object
    Dim lastRow As Long, i As Long

    lastRow = ws.Cells(ws.Rows.Count, 11).End(xlUp).Row

    For i = 2 To lastRow
        ' Product ID must match
        If ws.Cells(i, 11).Value = productID Then
            ' Optional filter on customer type
            If customerType = "" Or ws.Cells(i, 12).Value = customerType Then
                Set transaction = CreateObject("Scripting.Dictionary")

                transaction("Date") = ws.Cells(i, 13).Value
                transaction("Customer") = ws.Cells(i, 12).Value
                transaction("Quantity") = ws.Cells(i, 14).Value
                transaction("UnitPrice") = ws.Cells(i, 15).Value
                transaction("TotalAmount") = ws.Cells(i, 16).Value
                transaction("ProfitMargin") = ws.Cells(i, 17).Value
                transaction("PaymentTerms") = ws.Cells(i, 18).Value

                transactions.Add transaction
            End If
        End If
    Next i

    Set GetHistoricalTransactions = transactions
End Function

' Returns the unit cost for an order of `quantity` pieces: the product's base
' cost reduced by the quantity discount and the learning-curve effect, plus the
' special-requirements surcharge spread over the order.
' Raises error 5 when quantity is not positive (the result is a per-unit value).
Function CalculateTotalCost(productData As ProductData, quantity As Long, _
                            wsConfig As Worksheet) As Double
    If quantity <= 0 Then
        Err.Raise 5, "CalculateTotalCost", "报价数量必须大于0"
    End If

    Dim baseCost As Double
    baseCost = productData.TotalCost

    ' Quantity discount
    Dim quantityDiscount As Double
    quantityDiscount = CalculateQuantityDiscount(quantity, wsConfig)

    ' Learning-curve effect (larger runs lower the unit cost)
    Dim learningCurveEffect As Double
    learningCurveEffect = CalculateLearningCurve(quantity, wsConfig)

    ' Surcharge for special requirements
    Dim specialRequirementsCost As Double
    specialRequirementsCost = CalculateSpecialRequirementsCost(wsConfig)

    ' Order total, then back to a per-unit figure
    Dim totalCost As Double
    totalCost = baseCost * quantity * (1 - quantityDiscount) * _
                (1 - learningCurveEffect) + specialRequirementsCost
    totalCost = totalCost / quantity

    CalculateTotalCost = totalCost
End Function

' Looks up the discount for `quantity` in the named range
' "QuantityDiscountTable" (row 1 is skipped as a header; columns are
' min quantity, max quantity, discount). Returns 0 when no band matches.
Function CalculateQuantityDiscount(quantity As Long, wsConfig As Worksheet) As Double
    Dim discountRanges As Range
    Dim minQty As Long, maxQty As Long, discount As Double
    Dim i As Long

    Set discountRanges = wsConfig.Range("QuantityDiscountTable")

    ' Default: no discount
    CalculateQuantityDiscount = 0

    For i = 2 To discountRanges.Rows.Count
        minQty = discountRanges.Cells(i, 1).Value
        maxQty = discountRanges.Cells(i, 2).Value
        discount = discountRanges.Cells(i, 3).Value

        If quantity >= minQty And quantity <= maxQty Then
            CalculateQuantityDiscount = discount
            Exit Function
        End If
    Next i
End Function

' Returns the fractional unit-cost reduction predicted by the learning curve
'     unit cost = first-unit cost * quantity ^ (Log(learningRate) / Log(2))
' i.e. 1 - quantity ^ b. The first-unit cost cancels out of the ratio, so it is
' not needed here.
' Returns 0 for quantity <= 1 and for learning rates outside (0, 1): a rate of
' 1 or more means "no learning" and must not turn into a cost increase.
Function CalculateLearningCurve(quantity As Long, wsConfig As Worksheet) As Double
    Dim learningRate As Double
    learningRate = wsConfig.Range("LearningRate").Value

    If quantity <= 1 Or learningRate <= 0 Or learningRate >= 1 Then
        CalculateLearningCurve = 0
        Exit Function
    End If

    ' Learning exponent b (negative for rates below 1)
    Dim learningExponent As Double
    learningExponent = Log(learningRate) / Log(2)

    ' Ratio of the learned unit cost to the first-unit cost is quantity ^ b.
    CalculateLearningCurve = 1 - quantity ^ learningExponent
End Function

' Builds the pricing recommendation for one quote request.
'
' totalCost is the unit cost from CalculateTotalCost. The price is derived from
' the strategy chosen by DeterminePricingStrategy, then adjusted for quantity
' and payment terms; the range is +/-10% around it. Confidence depends only on
' how many historical unit prices were available.
' NOTE(review): nothing here floors the price at the unit cost - the
' competitive strategy can recommend below cost; confirm that is intended.
Function GenerateRecommendation(productData As ProductData, totalCost As Double, _
                                quantity As Long, customerType As String, _
                                urgencyLevel As String, paymentTerms As String, _
                                historyData As Collection, wsConfig As Worksheet) As PricingRecommendation
    Dim recommendation As PricingRecommendation
    Dim strategy As PricingStrategy
    Dim historicalPrices As Collection
    Dim transaction As Object
    Dim avgPrice As Double
    Dim basePrice As Double
    Dim targetMargin As Double
    Dim valueMultiplier As Double

    recommendation.ProductID = productData.ProductID
    recommendation.BaseCost = totalCost

    ' 1. Choose the pricing strategy
    strategy = DeterminePricingStrategy(productData, customerType, _
                                        urgencyLevel, historyData, wsConfig)
    recommendation.Strategy = strategy

    ' 2. Gather historical unit prices
    Set historicalPrices = New Collection
    For Each transaction In historyData
        historicalPrices.Add transaction("UnitPrice")
    Next transaction

    ' 3. Reference price: historical average, or cost + 30% without history
    If historicalPrices.Count > 0 Then
        avgPrice = CalculateAverage(historicalPrices)
    Else
        avgPrice = totalCost * 1.3
    End If

    ' 4. Strategy-specific base price
    Select Case strategy
        Case psCostPlus
            ' Cost-plus: cost * (1 + target margin)
            targetMargin = wsConfig.Range("TargetMargin").Value
            basePrice = totalCost * (1 + targetMargin)

        Case psCompetitive
            ' Competitive: position relative to the market average
            Select Case productData.MarketPosition
                Case "MarketLeader"
                    basePrice = avgPrice * 1.1
                Case "MarketFollower"
                    basePrice = avgPrice * 0.95
                Case "NewEntrant"
                    basePrice = avgPrice * 0.85
                Case Else
                    basePrice = avgPrice
            End Select

        Case psValueBased
            ' Value-based: scale cost by the perceived-value multiplier
            valueMultiplier = CalculateValueMultiplier(productData, customerType, wsConfig)
            basePrice = totalCost * valueMultiplier

        Case psPenetration
            ' Penetration: low price to win market share
            basePrice = totalCost * 1.1

        Case psSkimming
            ' Skimming: high price for high margin
            basePrice = totalCost * 1.8
    End Select

    ' 5. Quantity adjustment
    If quantity > 1000 Then
        basePrice = basePrice * 0.95
    ElseIf quantity > 100 Then
        basePrice = basePrice * 0.98
    End If

    ' 6. Payment-terms adjustment ("Net30" and unknown terms leave the price as is)
    Select Case paymentTerms
        Case "CashInAdvance"
            basePrice = basePrice * 0.98
        Case "Net60"
            basePrice = basePrice * 1.02
        Case "Net90"
            basePrice = basePrice * 1.05
    End Select

    ' 7. Price range
    recommendation.MinPrice = basePrice * 0.9
    recommendation.RecommendedPrice = basePrice
    recommendation.MaxPrice = basePrice * 1.1

    ' 8. Confidence from the amount of history
    Select Case historicalPrices.Count
        Case Is >= 10
            recommendation.Confidence = 0.9
        Case Is >= 5
            recommendation.Confidence = 0.7
        Case Is >= 2
            recommendation.Confidence = 0.5
        Case Else
            recommendation.Confidence = 0.3
    End Select

    ' 9. Explanation text
    recommendation.Reason = GenerateRecommendationReason(strategy, basePrice, _
                                                         totalCost, avgPrice)

    GenerateRecommendation = recommendation
End Function

' Picks the pricing strategy from the named range "PricingStrategyRules"
' (row 1 is skipped as a header; column 1 = condition, column 2 = strategy
' name). The first rule whose condition evaluates to True wins; unknown
' strategy names and "no rule matched" both fall back to cost-plus.
Function DeterminePricingStrategy(productData As ProductData, _
                                  customerType As String, urgencyLevel As String, _
                                  historyData As Collection, wsConfig As Worksheet) As PricingStrategy
    Dim strategyRules As Range
    Dim condition As String, strategyName As String
    Dim i As Long

    Set strategyRules = wsConfig.Range("PricingStrategyRules")

    ' Default strategy
    DeterminePricingStrategy = psCostPlus

    For i = 2 To strategyRules.Rows.Count
        condition = strategyRules.Cells(i, 1).Value
        strategyName = strategyRules.Cells(i, 2).Value

        If EvaluateStrategyCondition(condition, productData, customerType, _
                                     urgencyLevel, historyData) Then
            ' Map the configured name onto the enum
            Select Case strategyName
                Case "CostPlus"
                    DeterminePricingStrategy = psCostPlus
                Case "Competitive"
                    DeterminePricingStrategy = psCompetitive
                Case "ValueBased"
                    DeterminePricingStrategy = psValueBased
                Case "Penetration"
                    DeterminePricingStrategy = psPenetration
                Case "Skimming"
                    DeterminePricingStrategy = psSkimming
                Case Else
                    DeterminePricingStrategy = psCostPlus
            End Select
            Exit Function
        End If
    Next i
End Function

' Writes the recommendation report into ws (cells A1:D22): request details,
' price figures, strategy, confidence and the explanation text.
Sub OutputRecommendation(ws As Worksheet, recommendation As PricingRecommendation, _
                         productID As String, quantity As Long)
    Dim profitMargin As Double
    Dim strategyLabel As String

    ' Report title
    With ws.Cells(1, 1)
        .Value = "产品报价建议报告"
        .Font.Size = 16
        .Font.Bold = True
    End With

    ' Request details
    ws.Cells(3, 1).Value = "报价基本信息"
    ws.Cells(3, 1).Font.Bold = True

    ws.Cells(4, 1).Value = "产品ID:"
    ws.Cells(4, 2).Value = productID

    ws.Cells(5, 1).Value = "报价数量:"
    ws.Cells(5, 2).Value = quantity

    ws.Cells(6, 1).Value = "报价时间:"
    ws.Cells(6, 2).Value = Format(Now, "yyyy-mm-dd hh:mm:ss")

    ' Price figures
    ws.Cells(8, 1).Value = "价格建议"
    ws.Cells(8, 1).Font.Bold = True

    ws.Cells(9, 1).Value = "单位成本:"
    ws.Cells(9, 2).Value = recommendation.BaseCost
    ws.Cells(9, 2).NumberFormat = "¥#,##0.00"

    ws.Cells(10, 1).Value = "建议价格:"
    With ws.Cells(10, 2)
        .Value = recommendation.RecommendedPrice
        .NumberFormat = "¥#,##0.00"
        .Font.Color = RGB(0, 100, 0)
        .Font.Bold = True
    End With

    ws.Cells(11, 1).Value = "价格区间:"
    ws.Cells(11, 2).Value = recommendation.MinPrice
    ws.Cells(11, 3).Value = "-"
    ws.Cells(11, 4).Value = recommendation.MaxPrice
    ws.Cells(11, 2).NumberFormat = "¥#,##0.00"
    ws.Cells(11, 4).NumberFormat = "¥#,##0.00"

    ws.Cells(12, 1).Value = "利润率:"
    ' Margin is relative to the selling price; a zero price would divide by
    ' zero, so report 0 in that case.
    If recommendation.RecommendedPrice <> 0 Then
        profitMargin = (recommendation.RecommendedPrice - recommendation.BaseCost) / _
                       recommendation.RecommendedPrice
    Else
        profitMargin = 0
    End If
    ws.Cells(12, 2).Value = profitMargin
    ws.Cells(12, 2).NumberFormat = "0.00%"

    ' Strategy
    ws.Cells(14, 1).Value = "定价策略"
    ws.Cells(14, 1).Font.Bold = True

    ws.Cells(15, 1).Value = "策略类型:"
    Select Case recommendation.Strategy
        Case psCostPlus
            strategyLabel = "成本加成法"
        Case psCompetitive
            strategyLabel = "竞争定价法"
        Case psValueBased
            strategyLabel = "价值定价法"
        Case psPenetration
            strategyLabel = "渗透定价法"
        Case psSkimming
            strategyLabel = "撇脂定价法"
    End Select
    ' As before, an unrecognised strategy leaves the cell untouched.
    If strategyLabel <> "" Then ws.Cells(15, 2).Value = strategyLabel

    ws.Cells(16, 1).Value = "置信度:"
    ws.Cells(16, 2).Value = recommendation.Confidence
    ws.Cells(16, 2).NumberFormat = "0.0%"

    ' Explanation
    ws.Cells(18, 1).Value = "推荐理由"
    ws.Cells(18, 1).Font.Bold = True

    ws.Cells(19, 1).Value = recommendation.Reason
    ws.Cells(19, 1).WrapText = True
    ws.Range("A19:D22").Merge
    ws.Range("A19:D22").Borders.LineStyle = xlContinuous

    ' Fit column widths
    ws.Columns.AutoFit

    ' Light grey grid over the whole report
    With ws.Range("A1:D22")
        .Borders.LineStyle = xlContinuous
        .Borders.Color = RGB(200, 200, 200)
    End With
End Sub
#!/usr/bin/env python3# -*- coding: utf-8 -*-"""智能市场报价推荐系统基于机器学习的动态定价与报价优化"""import osimport sysimport jsonimport loggingimport warningsimport numpy as npimport pandas as pdfrom datetime import datetime, timedeltafrom typing import List, Dict, Any, Optional, Tuple, Unionfrom dataclasses import dataclass, asdict, fieldfrom decimal import Decimalfrom pathlib import Pathimport pickleimport hashlibimport tracebackfrom enum import Enumimport copyimport itertoolsfrom collections import defaultdict, Counterimport reimport inspectimport textwrapfrom functools import lru_cacheimport asyncioimport aiohttpfrom concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutorimport requestsfrom requests.adapters import HTTPAdapterfrom urllib3.util.retry import Retry# 机器学习from sklearn.preprocessing import StandardScaler, MinMaxScaler, RobustScaler, OneHotEncoderfrom sklearn.linear_model import LinearRegression, Ridge, Lasso, ElasticNetfrom sklearn.ensemble import RandomForestRegressor, GradientBoostingRegressor, AdaBoostRegressorfrom sklearn.svm import SVRfrom sklearn.neural_network import MLPRegressorfrom sklearn.model_selection import train_test_split, cross_val_score, GridSearchCV, TimeSeriesSplitfrom sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score, mean_absolute_percentage_errorfrom sklearn.pipeline import Pipelinefrom sklearn.compose import ColumnTransformerfrom sklearn.feature_selection import RFE, SelectFromModel, VarianceThresholdfrom sklearn.impute import SimpleImputerfrom sklearn.base import BaseEstimator, TransformerMixin# 深度学习try: import tensorflow as tf from tensorflow.keras.models import Sequential, Model from tensorflow.keras.layers import Dense, LSTM, GRU, Dropout, Input, Conv1D, MaxPooling1D, Flatten, Embedding, Concatenate from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint, ReduceLROnPlateau, TensorBoard from tensorflow.keras.optimizers import Adam, RMSprop from tensorflow.keras.losses import MeanSquaredError, 
MeanAbsoluteError, Huber TF_AVAILABLE = Trueexcept ImportError: TF_AVAILABLE = False logging.warning("TensorFlow not available, deep learning models disabled")# 强化学习try: import gym from stable_baselines3 import PPO, A2C, DQN from stable_baselines3.common.vec_env import DummyVecEnv RL_AVAILABLE = Trueexcept ImportError: RL_AVAILABLE = False logging.warning("Stable-Baselines3 not available, reinforcement learning disabled")# 优化算法import optunafrom scipy.optimize import minimize, differential_evolution, basinhopping# 统计分析import scipy.stats as statsfrom scipy import signalfrom scipy.special import expit# 时间序列import statsmodels.api as smfrom statsmodels.tsa.seasonal import seasonal_decompose, STLfrom statsmodels.tsa.stattools import adfuller, kpssfrom statsmodels.tsa.holtwinters import ExponentialSmoothingfrom statsmodels.tsa.arima.model import ARIMAfrom statsmodels.tsa.statespace.sarimax import SARIMAXfrom statsmodels.tsa.vector_ar.var_model import VAR# 自然语言处理import nltkfrom nltk.sentiment import SentimentIntensityAnalyzertry: nltk.data.find('vader_lexicon')except LookupError: nltk.download('vader_lexicon', quiet=True)# 可视化import matplotlib.pyplot as pltimport seaborn as snsfrom matplotlib import rcParamsimport plotly.express as pximport plotly.graph_objects as gofrom plotly.subplots import make_subplotsimport altair as alt# 数据库import sqlalchemyfrom sqlalchemy import create_engine, text, MetaData, Table, select, inspect as sqla_inspectfrom sqlalchemy.orm import sessionmaker, Sessionimport duckdbimport redis# 配置管理import yamlfrom pydantic import BaseModel, Field, validatorfrom dotenv import load_dotenvimport joblib# 日志配置logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler('pricing_recommendation.log'), logging.StreamHandler() ])logger = logging.getLogger(__name__)warnings.filterwarnings('ignore')# 设置中文字体plt.rcParams['font.sans-serif'] = ['SimHei', 'Microsoft 
YaHei']plt.rcParams['axes.unicode_minus'] = False# 加载环境变量load_dotenv()# 数据模型class PricingStrategy(Enum): """定价策略枚举""" COST_PLUS = "cost_plus" # 成本加成 COMPETITIVE = "competitive" # 竞争性定价 VALUE_BASED = "value_based" # 价值定价 PENETRATION = "penetration" # 渗透定价 SKIMMING = "skimming" # 撇脂定价 DYNAMIC = "dynamic" # 动态定价 BUNDLE = "bundle" # 捆绑定价 PSYCHOLOGICAL = "psychological" # 心理定价class CustomerSegment(Enum): """客户细分枚举""" ENTERPRISE = "enterprise" # 企业客户 SME = "sme" # 中小企业 INDIVIDUAL = "individual" # 个人客户 GOVERNMENT = "government" # 政府客户 EDUCATION = "education" # 教育客户 NON_PROFIT = "non_profit" # 非盈利组织class ProductLifecycle(Enum): """产品生命周期枚举""" INTRODUCTION = "introduction" # 导入期 GROWTH = "growth" # 成长期 MATURITY = "maturity" # 成熟期 DECLINE = "decline" # 衰退期@dataclassclass PricingContext: """定价上下文""" product_id: str customer_id: Optional[str] = None customer_segment: Optional[CustomerSegment] = None quantity: int = 1 urgency_level: str = "normal" # low, normal, high, urgent payment_terms: str = "net30" # 付款条件 delivery_date: Optional[datetime] = None special_requirements: List[str] = field(default_factory=list) competitive_situation: Optional[Dict[str, Any]] = None market_conditions: Optional[Dict[str, Any]] = None seasonality_factor: float = 1.0 def to_dict(self) -> Dict[str, Any]: """转换为字典""" return { "product_id": self.product_id, "customer_id": self.customer_id, "customer_segment": self.customer_segment.value if self.customer_segment else None, "quantity": self.quantity, "urgency_level": self.urgency_level, "payment_terms": self.payment_terms, "delivery_date": self.delivery_date.isoformat() if self.delivery_date else None, "special_requirements": self.special_requirements, "competitive_situation": self.competitive_situation, "market_conditions": self.market_conditions, "seasonality_factor": self.seasonality_factor }@dataclassclass PricingRecommendation: """报价建议""" recommended_price: float price_range: Tuple[float, float] confidence_score: float expected_profit_margin: float 
win_probability: float pricing_strategy: PricingStrategy rationale: str alternative_prices: List[Dict[str, Any]] = field(default_factory=list) sensitivity_analysis: Optional[Dict[str, Any]] = None negotiation_guidance: Optional[List[str]] = None def to_dict(self) -> Dict[str, Any]: """转换为字典""" return { "recommended_price": self.recommended_price, "price_range": list(self.price_range), "confidence_score": self.confidence_score, "expected_profit_margin": self.expected_profit_margin, "win_probability": self.win_probability, "pricing_strategy": self.pricing_strategy.value, "rationale": self.rationale, "alternative_prices": self.alternative_prices, "sensitivity_analysis": self.sensitivity_analysis, "negotiation_guidance": self.negotiation_guidance }class IntelligentPricingSystemConfig(BaseModel): """智能定价系统配置""" # 数据源配置 data_sources: Dict[str, Dict[str, Any]] = { "historical_transactions": {"type": "database", "connection_string": ""}, "cost_data": {"type": "api", "endpoint": ""}, "competitor_prices": {"type": "web_scraping", "sources": []}, "market_data": {"type": "api", "endpoint": ""} } # 模型配置 model_type: str = "gradient_boosting" # linear, random_forest, gradient_boosting, neural_network use_ensemble: bool = True ensemble_method: str = "stacking" # stacking, voting, blending # 特征工程配置 feature_lags: List[int] = [7, 30, 90, 365] # 滞后特征 rolling_windows: List[int] = [7, 30, 90] # 滚动窗口 use_text_features: bool = True use_time_features: bool = True use_interaction_features: bool = True # 优化目标配置 primary_objective: str = "profit_maximization" # profit_maximization, win_rate_maximization, balanced min_profit_margin: float = 0.1 max_profit_margin: float = 0.5 target_win_rate: float = 0.7 # 实时更新配置 update_frequency: str = "daily" # hourly, daily, weekly retrain_threshold: int = 1000 # 新样本数量达到此值时重新训练 # 系统配置 cache_enabled: bool = True cache_ttl: int = 3600 output_dir: str = "output/pricing_recommendations" model_save_path: str = "models/pricing_model" class Config: 
arbitrary_types_allowed = Trueclass IntelligentPricingSystem: """智能定价系统""" def __init__(self, config_path: str = "config/pricing_config.yaml"): """初始化""" self.config = self._load_config(config_path) self.data_collector = PricingDataCollector(self.config) self.feature_engineer = PricingFeatureEngineer(self.config) self.model_trainer = PricingModelTrainer(self.config) self.price_optimizer = PriceOptimizer(self.config) self.recommendation_generator = RecommendationGenerator(self.config) # 加载或训练模型 self.model = self._load_or_train_model() # 初始化缓存 if self.config.cache_enabled: self.cache = {} logger.info(f"智能定价系统初始化完成,模型类型:{self.config.model_type}") def _load_config(self, config_path: str) -> IntelligentPricingSystemConfig: """加载配置""" with open(config_path, 'r', encoding='utf-8') as f: config_data = yaml.safe_load(f) return IntelligentPricingSystemConfig(**config_data) def _load_or_train_model(self) -> Any: """加载或训练模型""" model_path = Path(self.config.model_save_path) if model_path.exists(): try: model = joblib.load(model_path) logger.info(f"从 {model_path} 加载已训练模型") return model except Exception as e: logger.warning(f"加载模型失败,将重新训练:{e}") # 训练新模型 logger.info("开始训练定价模型...") model = self.model_trainer.train_model() # 保存模型 model_path.parent.mkdir(parents=True, exist_ok=True) joblib.dump(model, model_path) logger.info(f"模型已保存到 {model_path}") return model async def get_price_recommendation( self, context: PricingContext, use_cache: bool = True ) -> PricingRecommendation: """获取价格建议""" start_time = datetime.now() try: # 1. 检查缓存 cache_key = self._generate_cache_key(context) if use_cache and self.config.cache_enabled and cache_key in self.cache: cached_result = self.cache[cache_key] if datetime.now() - cached_result["timestamp"] < timedelta(seconds=self.config.cache_ttl): logger.info(f"价格建议命中缓存:{cache_key}") return cached_result["recommendation"] # 2. 收集数据 logger.info(f"为产品 {context.product_id} 收集定价数据") all_data = await self.data_collector.collect_pricing_data(context) # 3. 
特征工程 features = await self.feature_engineer.create_features(all_data, context) # 4. 价格预测 base_price = self._predict_price(features) # 5. 价格优化 optimized_price = await self.price_optimizer.optimize_price( base_price, features, context, all_data ) # 6. 生成建议 recommendation = await self.recommendation_generator.generate_recommendation( optimized_price, features, context, all_data ) # 7. 更新缓存 if use_cache and self.config.cache_enabled: self.cache[cache_key] = { "timestamp": datetime.now(), "recommendation": recommendation } execution_time = (datetime.now() - start_time).total_seconds() logger.info(f"价格建议生成完成,耗时:{execution_time:.2f}秒,建议价格:{recommendation.recommended_price}") return recommendation except Exception as e: logger.error(f"获取价格建议失败:{str(e)}") logger.error(traceback.format_exc()) raise def _generate_cache_key(self, context: PricingContext) -> str: """生成缓存键""" context_dict = context.to_dict() context_str = json.dumps(context_dict, sort_keys=True, default=str) return hashlib.md5(context_str.encode('utf-8')).hexdigest() def _predict_price(self, features: pd.DataFrame) -> float: """预测价格""" try: # 确保特征顺序与训练时一致 if hasattr(self.model, 'feature_names_in_'): expected_features = self.model.feature_names_in_ missing_features = set(expected_features) - set(features.columns) extra_features = set(features.columns) - set(expected_features) if missing_features: for feature in missing_features: features[feature] = 0 if extra_features: features = features[expected_features] # 预测 prediction = self.model.predict(features)[0] return float(prediction) except Exception as e: logger.error(f"价格预测失败:{str(e)}") # 返回默认值 return features.get("base_cost", 100) * 1.3 # 成本加30% async def batch_recommendations( self, contexts: List[PricingContext], parallel: bool = True ) -> List[PricingRecommendation]: """批量获取价格建议""" if not parallel or len(contexts) <= 1: # 顺序处理 recommendations = [] for context in contexts: recommendation = await self.get_price_recommendation(context) 
recommendations.append(recommendation) return recommendations # 并行处理 async def process_context(context): return await self.get_price_recommendation(context) tasks = [process_context(context) for context in contexts] recommendations = await asyncio.gather(*tasks, return_exceptions=True) # 处理异常 valid_recommendations = [] for i, result in enumerate(recommendations): if isinstance(result, Exception): logger.error(f"处理上下文 {i} 失败:{result}") # 创建默认建议 default_context = contexts[i] default_price = 100 * 1.3 # 默认价格 default_recommendation = PricingRecommendation( recommended_price=default_price, price_range=(default_price * 0.9, default_price * 1.1), confidence_score=0.3, expected_profit_margin=0.3, win_probability=0.5, pricing_strategy=PricingStrategy.COST_PLUS, rationale="系统生成默认建议" ) valid_recommendations.append(default_recommendation) else: valid_recommendations.append(result) return valid_recommendationsclass PricingDataCollector: """定价数据收集器""" def __init__(self, config: IntelligentPricingSystemConfig): self.config = config self.http_session = self._create_http_session() def _create_http_session(self) -> aiohttp.ClientSession: """创建HTTP会话""" timeout = aiohttp.ClientTimeout(total=30) return aiohttp.ClientSession(timeout=timeout) async def collect_pricing_data(self, context: PricingContext) -> Dict[str, Any]: """收集定价数据""" all_data = {} # 并行收集各种数据 tasks = [] # 历史交易数据 tasks.append(self._collect_historical_data(context)) # 成本数据 tasks.append(self._collect_cost_data(context)) # 竞争对手数据 tasks.append(self._collect_competitor_data(context)) # 市场数据 tasks.append(self._collect_market_data(context)) # 客户数据(如果有客户ID) if context.customer_id: tasks.append(self._collect_customer_data(context)) # 等待所有数据收集完成 results = await asyncio.gather(*tasks, return_exceptions=True) # 处理结果 data_sources = [ "historical_data", "cost_data", "competitor_data", "market_data", "customer_data" ] for i, result in enumerate(results): if i < len(data_sources) and not isinstance(result, Exception): 
all_data[data_sources[i]] = result return all_data async def _collect_historical_data(self, context: PricingContext) -> pd.DataFrame: """收集历史交易数据""" try: # 这里应该从数据库查询 # 示例:模拟从数据库获取数据 query = f""" SELECT transaction_id, product_id, customer_id, customer_segment, quantity, unit_price, total_amount, cost, profit_margin, transaction_date, salesperson_id, region, payment_terms, delivery_days, special_requirements, win_loss FROM historical_transactions WHERE product_id = :product_id ORDER BY transaction_date DESC LIMIT 10000 """ # 模拟数据 dates = pd.date_range(end=datetime.now(), periods=365, freq='D') data = { 'transaction_date': dates, 'product_id': [context.product_id] * 365, 'customer_segment': np.random.choice(['enterprise', 'sme', 'individual'], 365), 'quantity': np.random.randint(1, 1000, 365), 'unit_price': np.random.uniform(50, 500, 365), 'cost': np.random.uniform(30, 300, 365), 'profit_margin': np.random.uniform(0.1, 0.5, 365), 'region': np.random.choice(['north', 'south', 'east', 'west'], 365), 'payment_terms': np.random.choice(['net30', 'net60', 'cash'], 365), 'delivery_days': np.random.randint(1, 30, 365), 'win_loss': np.random.choice([0, 1], 365, p=[0.3, 0.7]) } df = pd.DataFrame(data) # 如果有客户细分,过滤相关数据 if context.customer_segment: segment_filter = df['customer_segment'] == context.customer_segment.value df = df[segment_filter] logger.info(f"收集到 {len(df)} 条历史交易数据") return df except Exception as e: logger.warning(f"收集历史数据失败:{e}") return pd.DataFrame() async def _collect_cost_data(self, context: PricingContext) -> Dict[str, Any]: """收集成本数据""" try: # 这里应该从ERP系统或成本数据库获取 # 模拟数据 cost_data = { 'material_cost': 45.0, 'labor_cost': 25.0, 'overhead_cost': 15.0, 'total_cost': 85.0, 'cost_breakdown': { 'raw_materials': 30.0, 'components': 15.0, 'direct_labor': 20.0, 'indirect_labor': 5.0, 'manufacturing_overhead': 10.0, 'shipping': 5.0 }, 'cost_trend': 'stable', # increasing, decreasing, stable 'cost_volatility': 0.05, 'last_updated': datetime.now().isoformat() } # 考虑数量折扣 
if context.quantity > 1000: cost_data['total_cost'] *= 0.9 elif context.quantity > 100: cost_data['total_cost'] *= 0.95 return cost_data except Exception as e: logger.warning(f"收集成本数据失败:{e}") return {} async def _collect_competitor_data(self, context: PricingContext) -> List[Dict[str, Any]]: """收集竞争对手数据""" try: # 这里应该从竞争情报系统或网络爬虫获取 # 模拟数据 competitors = [ { 'competitor_id': 'COMP001', 'competitor_name': '竞争对手A', 'price': 120.0, 'last_updated': (datetime.now() - timedelta(days=1)).isoformat(), 'reliability': 0.9, 'promotions': ['free_shipping', 'volume_discount'], 'rating': 4.5 }, { 'competitor_id': 'COMP002', 'competitor_name': '竞争对手B', 'price': 115.0, 'last_updated': (datetime.now() - timedelta(days=2)).isoformat(), 'reliability': 0.8, 'promotions': ['cash_discount'], 'rating': 4.2 }, { 'competitor_id': 'COMP003', 'competitor_name': '竞争对手C', 'price': 125.0, 'last_updated': datetime.now().isoformat(), 'reliability': 0.95, 'promotions': [], 'rating': 4.7 } ] return competitors except Exception as e: logger.warning(f"收集竞争对手数据失败:{e}") return [] async def _collect_market_data(self, context: PricingContext) -> Dict[str, Any]: """收集市场数据""" try: # 这里应该从市场数据API获取 # 模拟数据 market_data = { 'market_demand': 0.8, # 0-1,市场需求强度 'market_supply': 0.7, # 0-1,市场供给强度 'price_elasticity': -1.5, # 价格弹性 'seasonality_index': 1.2, # 季节性指数 'economic_indicator': 0.6, # 经济景气指数 'industry_growth_rate': 0.08, # 行业增长率 'substitute_availability': 0.3, # 替代品可获得性 'collected_at': datetime.now().isoformat() } return market_data except Exception as e: logger.warning(f"收集市场数据失败:{e}") return {} async def _collect_customer_data(self, context: PricingContext) -> Dict[str, Any]: """收集客户数据""" try: if not context.customer_id: return {} # 这里应该从CRM系统获取 # 模拟数据 customer_data = { 'customer_id': context.customer_id, 'customer_lifetime_value': 50000.0, 'purchase_frequency': 12, # 年采购次数 'average_order_value': 2500.0, 'last_purchase_date': (datetime.now() - timedelta(days=30)).isoformat(), 'total_spent': 30000.0, 
'payment_history': 'good', # good, average, poor 'preferred_payment_terms': 'net30', 'price_sensitivity': 0.6, # 0-1,价格敏感度 'loyalty_score': 0.8, # 忠诚度得分 'negotiation_style': 'collaborative' # collaborative, competitive, accommodating } return customer_data except Exception as e: logger.warning(f"收集客户数据失败:{e}") return {}class PricingFeatureEngineer: """定价特征工程""" def __init__(self, config: IntelligentPricingSystemConfig): self.config = config self.scaler = StandardScaler() self.encoders = {} async def create_features( self, data: Dict[str, Any], context: PricingContext ) -> pd.DataFrame: """创建特征""" features = {} # 1. 基础特征 features.update(self._create_basic_features(context)) # 2. 历史特征 if 'historical_data' in data and not data['historical_data'].empty: features.update(self._create_historical_features(data['historical_data'])) # 3. 成本特征 if 'cost_data' in data and data['cost_data']: features.update(self._create_cost_features(data['cost_data'])) # 4. 竞争特征 if 'competitor_data' in data: features.update(self._create_competitor_features(data['competitor_data'])) # 5. 市场特征 if 'market_data' in data and data['market_data']: features.update(self._create_market_features(data['market_data'])) # 6. 客户特征 if 'customer_data' in data and data['customer_data']: features.update(self._create_customer_features(data['customer_data'])) # 7. 时间特征 features.update(self._create_time_features()) # 8. 
交互特征 features.update(self._create_interaction_features(features)) # 转换为DataFrame df = pd.DataFrame([features]) # 处理缺失值 df = self._handle_missing_values(df) # 特征缩放 if hasattr(self, 'scaler_fitted'): df = self._scale_features(df) logger.info(f"创建了 {len(df.columns)} 个特征") return df def _create_basic_features(self, context: PricingContext) -> Dict[str, Any]: """创建基础特征""" features = { 'product_id_hash': int(hashlib.md5(context.product_id.encode()).hexdigest()[:8], 16), 'quantity': context.quantity, 'quantity_log': np.log1p(context.quantity), 'urgency_level': {'low': 0, 'normal': 1, 'high': 2, 'urgent': 3}.get(context.urgency_level, 1), 'payment_terms': {'cash': 0, 'net15': 1, 'net30': 2, 'net60': 3, 'net90': 4}.get(context.payment_terms, 2), 'has_special_requirements': len(context.special_requirements) > 0, 'special_requirements_count': len(context.special_requirements), 'seasonality_factor': context.seasonality_factor } # 客户细分编码 if context.customer_segment: segment_mapping = { CustomerSegment.ENTERPRISE: 4, CustomerSegment.GOVERNMENT: 3, CustomerSegment.EDUCATION: 2, CustomerSegment.SME: 1, CustomerSegment.INDIVIDUAL: 0 } features['customer_segment'] = segment_mapping.get(context.customer_segment, 0) return features def _create_historical_features(self, historical_data: pd.DataFrame) -> Dict[str, Any]: """创建历史特征""" features = {} if historical_data.empty: return features # 基本统计 features['historical_avg_price'] = historical_data['unit_price'].mean() features['historical_median_price'] = historical_data['unit_price'].median() features['historical_price_std'] = historical_data['unit_price'].std() features['historical_min_price'] = historical_data['unit_price'].min() features['historical_max_price'] = historical_data['unit_price'].max() features['historical_avg_quantity'] = historical_data['quantity'].mean() features['historical_avg_profit_margin'] = historical_data['profit_margin'].mean() # 赢率 if 'win_loss' in historical_data.columns: win_rate = 
historical_data['win_loss'].mean() features['historical_win_rate'] = win_rate # 时间窗口特征 recent_data = historical_data[historical_data['transaction_date'] > (datetime.now() - timedelta(days=30))] if not recent_data.empty: features['recent_avg_price'] = recent_data['unit_price'].mean() features['recent_win_rate'] = recent_data.get('win_loss', pd.Series([1])).mean() # 趋势特征 if len(historical_data) >= 10: # 计算价格趋势 prices = historical_data.sort_values('transaction_date')['unit_price'].values if len(prices) >= 2: # 简单线性趋势 x = np.arange(len(prices)) slope, intercept = np.polyfit(x, prices, 1) features['price_trend_slope'] = slope return features def _create_cost_features(self, cost_data: Dict[str, Any]) -> Dict[str, Any]: """创建成本特征""" features = { 'total_cost': cost_data.get('total_cost', 0), 'material_cost': cost_data.get('material_cost', 0), 'labor_cost': cost_data.get('labor_cost', 0), 'overhead_cost': cost_data.get('overhead_cost', 0), 'cost_volatility': cost_data.get('cost_volatility', 0) } # 成本结构比率 if cost_data.get('total_cost', 0) > 0: features['material_cost_ratio'] = features['material_cost'] / features['total_cost'] features['labor_cost_ratio'] = features['labor_cost'] / features['total_cost'] # 成本趋势 cost_trend = cost_data.get('cost_trend', 'stable') features['cost_trend_increasing'] = 1 if cost_trend == 'increasing' else 0 features['cost_trend_decreasing'] = 1 if cost_trend == 'decreasing' else 0 return features def _create_competitor_features(self, competitor_data: List[Dict[str, Any]]) -> Dict[str, Any]: """创建竞争特征""" features = {} if not competitor_data: return features # 提取价格 prices = [comp.get('price', 0) for comp in competitor_data if comp.get('price', 0) > 0] if prices: features['competitor_avg_price'] = np.mean(prices) features['competitor_min_price'] = np.min(prices) features['competitor_max_price'] = np.max(prices) features['competitor_price_range'] = features['competitor_max_price'] - features['competitor_min_price'] # 价格位置 sorted_prices = 
np.sort(prices) if len(sorted_prices) >= 3: features['competitor_price_25th'] = np.percentile(prices, 25) features['competitor_price_50th'] = np.percentile(prices, 50) features['competitor_price_75th'] = np.percentile(prices, 75) return features def _create_market_features(self, market_data: Dict[str, Any]) -> Dict[str, Any]: """创建市场特征""" features = { 'market_demand': market_data.get('market_demand', 0.5), 'market_supply': market_data.get('market_supply', 0.5), 'price_elasticity': market_data.get('price_elasticity', -1.0), 'seasonality_index': market_data.get('seasonality_index', 1.0), 'economic_indicator': market_data.get('economic_indicator', 0.5), 'industry_growth_rate': market_data.get('industry_growth_rate', 0.0), 'substitute_availability': market_data.get('substitute_availability', 0.5) } # 供需平衡 features['supply_demand_ratio'] = ( features['market_supply'] / features['market_demand'] if features['market_demand'] > 0 else 1.0 ) return features def _create_customer_features(self, customer_data: Dict[str, Any]) -> Dict[str, Any]: """创建客户特征""" features = { 'customer_lifetime_value': customer_data.get('customer_lifetime_value', 0), 'purchase_frequency': customer_data.get('purchase_frequency', 0), 'average_order_value': customer_data.get('average_order_value', 0), 'total_spent': customer_data.get('total_spent', 0), 'price_sensitivity': customer_data.get('price_sensitivity', 0.5), 'loyalty_score': customer_data.get('loyalty_score', 0.5) } # 计算客户价值得分 if features['average_order_value'] > 0 and features['purchase_frequency'] > 0: features['customer_value_score'] = ( np.log1p(features['customer_lifetime_value']) * features['purchase_frequency'] * features['loyalty_score'] ) # 付款历史编码 payment_history = customer_data.get('payment_history', 'average') payment_scores = {'good': 2, 'average': 1, 'poor': 0} features['payment_history_score'] = payment_scores.get(payment_history, 1) return features def _create_time_features(self) -> Dict[str, Any]: """创建时间特征""" now = 
datetime.now() features = { 'day_of_week': now.weekday(), 'day_of_month': now.day, 'week_of_year': now.isocalendar()[1], 'month': now.month, 'quarter': (now.month - 1) // 3 + 1, 'is_weekend': 1 if now.weekday() >= 5 else 0, 'hour_of_day': now.hour } # 季节性特征(正弦/余弦编码) day_of_year = now.timetuple().tm_yday features['day_sin'] = np.sin(2 * np.pi * day_of_year / 365) features['day_cos'] = np.cos(2 * np.pi * day_of_year / 365) return features def _create_interaction_features(self, features: Dict[str, Any]) -> Dict[str, Any]: """创建交互特征""" interaction_features = {} # 成本与竞争价格比率 if 'total_cost' in features and 'competitor_avg_price' in features: if features['total_cost'] > 0: interaction_features['cost_competitor_ratio'] = ( features['total_cost'] / features['competitor_avg_price'] ) # 数量与历史平均数量比率 if 'quantity' in features and 'historical_avg_quantity' in features: if features['historical_avg_quantity'] > 0: interaction_features['quantity_ratio'] = ( features['quantity'] / features['historical_avg_quantity'] ) # 市场条件与价格敏感性交互 if 'market_demand' in features and 'price_sensitivity' in features: interaction_features['demand_sensitivity_interaction'] = ( features['market_demand'] * features['price_sensitivity'] ) return interaction_features def _handle_missing_values(self, df: pd.DataFrame) -> pd.DataFrame: """处理缺失值""" # 数值列用中位数填充 numeric_cols = df.select_dtypes(include=[np.number]).columns for col in numeric_cols: if df[col].isnull().any(): median_val = df[col].median() df[col] = df[col].fillna(median_val) return df def _scale_features(self, df: pd.DataFrame) -> pd.DataFrame: """特征缩放""" numeric_cols = df.select_dtypes(include=[np.number]).columns if not hasattr(self, 'scaler_fitted'): self.scaler.fit(df[numeric_cols]) self.scaler_fitted = True df[numeric_cols] = self.scaler.transform(df[numeric_cols]) return df# 主程序if __name__ == "__main__": # 使用示例 import asyncio async def main(): # 初始化系统 pricing_system = IntelligentPricingSystem("config/pricing_config.yaml") # 创建定价上下文 context = 
PricingContext( product_id="PROD001", customer_id="CUST123", customer_segment=CustomerSegment.ENTERPRISE, quantity=500, urgency_level="high", payment_terms="net30", delivery_date=datetime.now() + timedelta(days=14), special_requirements=["fast_delivery", "custom_packaging"], seasonality_factor=1.2 ) # 获取价格建议 recommendation = await pricing_system.get_price_recommendation(context) print("=" * 60) print("智能报价推荐结果") print("=" * 60) print(f"产品ID: {context.product_id}") print(f"建议价格: ¥{recommendation.recommended_price:,.2f}") print(f"价格区间: [¥{recommendation.price_range[0]:,.2f}, ¥{recommendation.price_range[1]:,.2f}]") print(f"预期利润率: {recommendation.expected_profit_margin:.1%}") print(f"成交概率: {recommendation.win_probability:.1%}") print(f"置信度: {recommendation.confidence_score:.1%}") print(f"定价策略: {recommendation.pricing_strategy.value}") print(f"推荐理由: {recommendation.rationale}") print("=" * 60) if recommendation.negotiation_guidance: print("\n谈判指导:") for guidance in recommendation.negotiation_guidance: print(f" • {guidance}") # 运行 asyncio.run(main())