数据分析师的时间,不应该花在手动复制数据到Excel上。SQL 负责取数、Python 负责分析和可视化,两者联动之后,一套留存分析从取数到出图只需要几十秒,下次只需要改一个日期参数。
这篇从连接数据库开始,完整走一遍:Cohort留存分析、统计异常检测、渠道质量散点图。全部用真实数据生成的图表,可以直接拿去放报告里。
一、连接数据库
游戏公司用得最多的两种数据源:MySQL(线上库或数仓)和阿里云 ODPS(MaxCompute,大数据平台)。
MySQL / 通用数仓
from sqlalchemy import create_engineimport pandas as pdengine = create_engine( 'mysql+pymysql://user:password@host:3306/game_db' '?charset=utf8mb4')def query(sql: str) -> pd.DataFrame: with engine.connect() as conn: return pd.read_sql(sql, conn)
ODPS(MaxCompute)— 阿里云大数据平台
大型游戏公司的日志数据通常存在 ODPS,数据量在百亿行以上,不能直接连 MySQL。
from pyodps import ODPSimport pandas as pd# 初始化连接odps = ODPS( access_id = 'your_access_id', secret_access_key = 'your_access_key', project = 'your_project', endpoint = 'http://service.cn-hangzhou.maxcompute.aliyun.com/api')def odps_query(sql: str, hints: dict = None) -> pd.DataFrame: """ 执行 ODPS SQL 并返回 DataFrame hints 可以传入资源配置,比如设置内存和 CPU """ default_hints = { 'odps.sql.mapper.split.size': '256', # 每个 Mapper 处理 256MB 'odps.instance.priority': '0', # 最高优先级 } if hints: default_hints.update(hints) with odps.execute_sql(sql, hints=default_hints) as inst: inst.wait_for_success() return inst.open_reader(tunnel=True).to_pandas()# 示例:从 ODPS 取留存数据sql = """SELECT reg_date, COUNT(DISTINCT user_id) AS new_users, COUNT(DISTINCT CASE WHEN datediff(login_date, reg_date, 'dd') = 1 THEN login_uid END) * 100.0 / COUNT(DISTINCT user_id) AS d1_rateFROM dwd_user_register rLEFT JOIN dwd_user_login l ON r.user_id = l.login_uidWHERE r.reg_date BETWEEN '2024-01-01' AND '2024-01-31'GROUP BY reg_dateORDER BY reg_date"""df = odps_query(sql)
ODPS 和 MySQL 的主要区别:
日期差函数用 datediff(date1, date2, 'dd') 而不是 DATEDIFF()
不支持 DATE_ADD(),用 DATEADD(date, n, 'dd')
分区表要带分区条件 WHERE ds = '20240101',否则全表扫描会超时
二、Cohort 留存分析
在做 Cohort 分析之前,先看整体留存率趋势——用多日留存折线图配合新用户量柱状图,能快速判断留存波动是否和用户量变化有关。上方是 D1/D3/D7/D14/D30 五条留存率折线,下方是每日新用户量柱状图。两个图联动看:1月9日新用户量暴增但 D1 留存同步下跌(红色标注),说明那天涌入了大量低质用户,是买量质量问题而不是产品问题。
Cohort 分析是留存分析里最有价值的方式——按注册周分组,追踪每批用户的长期留存走势,能看出游戏是否在持续优化。
def get_cohort_retention(start_date: str, end_date: str, day_list: list = None) -> pd.DataFrame: """ 按注册周取 Cohort 留存率 day_list: 要观测的留存节点,默认 [1,3,7,14,21,30,45,60] """ if day_list is None: day_list = [1, 3, 7, 14, 21, 30, 45, 60] case_clauses = '\n ,'.join([ f"""ROUND(COUNT(DISTINCT CASE WHEN DATEDIFF(l.login_date, r.reg_date) = {d} THEN l.user_id END) * 100.0 / COUNT(DISTINCT r.user_id), 2) AS d{d}""" for d in day_list ]) sql = f""" WITH cohort_base AS ( SELECT -- 按注册周分组 DATE_FORMAT(reg_date, '%Y-%m-%d') AS cohort_week, user_id, reg_date FROM user_register WHERE reg_date BETWEEN '{start_date}' AND '{end_date}' ) SELECT r.cohort_week, COUNT(DISTINCT r.user_id) AS new_users, {case_clauses} FROM cohort_base r LEFT JOIN user_login l ON r.user_id = l.user_id AND l.login_date BETWEEN r.reg_date AND DATE_ADD(r.reg_date, INTERVAL {max(day_list)} DAY) GROUP BY r.cohort_week ORDER BY r.cohort_week """ df = query(sql) df['cohort_week'] = pd.to_datetime(df['cohort_week']) return df
可视化:Cohort 热力图
import matplotlib.pyplot as pltimport seaborn as snsfrom matplotlib.colors import LinearSegmentedColormapdef plot_cohort_heatmap(df: pd.DataFrame, save_path: str = None): day_cols = [c for c in df.columns if c.startswith('d') and c[1:].isdigit()] heat_data = df.set_index('cohort_week')[day_cols] heat_data.index = heat_data.index.strftime('%Y-%m-%d') cmap = LinearSegmentedColormap.from_list( 'ret', ['#0D1B3E','#1565C0','#2979FF','#00ACC1','#43A047'], N=256 ) fig, ax = plt.subplots(figsize=(14, 7)) fig.patch.set_facecolor('#0C0E18') ax.set_facecolor('#0C0E18') sns.heatmap( heat_data, annot=True, fmt='.1f', cmap=cmap, linewidths=0.4, linecolor='#1A1C2E', ax=ax, cbar_kws={'label': '留存率 (%)','shrink': 0.8}, annot_kws={'size': 9, 'color': 'white'}, vmin=0, vmax=70 ) ax.set_title('Cohort 留存热力图 · 各周新用户 D1~D60 留存率', color='white', fontsize=13, pad=12, loc='left') ax.tick_params(colors='#AAAACC', labelsize=9) ax.set_xlabel('留存节点', color='#8888A0') ax.set_ylabel('注册周', color='#8888A0') plt.tight_layout() if save_path: plt.savefig(save_path, dpi=150, bbox_inches='tight', facecolor='#0C0E18') plt.show()
生成的图表效果:
颜色越深蓝(→绿)说明留存越高。从热力图可以直接看出:
三、统计异常检测:用 σ 自动找问题日期
靠肉眼盯折线图找异常太累,用统计方法自动识别更可靠。
方法:均值 ± N 个标准差
正常情况下,99.7% 的数据落在均值 ± 3σ 以内;落在 2σ 以外的就值得关注。
import numpy as npdef detect_anomaly(df: pd.DataFrame, col: str = 'd1', window: int = 30, sigma: float = 2.0) -> pd.DataFrame: """ 滚动窗口异常检测:用最近 window 天的数据动态计算均值和标准差 比用全局均值更敏感,能捕捉到趋势中的局部异常 """ df = df.copy().sort_values('reg_date') df['rolling_mean'] = df[col].rolling(window, min_periods=7).mean() df['rolling_std'] = df[col].rolling(window, min_periods=7).std() df['z_score'] = (df[col] - df['rolling_mean']) / df['rolling_std'] df['is_anomaly'] = df['z_score'].abs() > sigma df['anomaly_dir'] = np.where(df['z_score'] < -sigma, 'low', np.where(df['z_score'] > sigma, 'high', 'normal')) return dfdef plot_anomaly(df: pd.DataFrame, col: str = 'd1', save_path: str = None): import matplotlib.ticker as mtick fig, ax = plt.subplots(figsize=(14, 5)) fig.patch.set_facecolor('#0C0E18') ax.set_facecolor('#0C0E18') mean_v = df[col].mean() std_v = df[col].std() lower2 = mean_v - 2 * std_v ax.fill_between(df['reg_date'], lower2, mean_v + 2*std_v, alpha=0.12, color='#2979FF', label='±2σ 正常区间') ax.axhline(mean_v, color='#2979FF', lw=1.2, ls='--', alpha=0.7, label=f'均值 {mean_v:.1f}%') ax.axhline(lower2, color='#FF5252', lw=0.8, ls=':', alpha=0.6) ax.plot(df['reg_date'], df[col], color='white', lw=1.8, zorder=3) # 标注异常点 anomalies = df[df[col] < lower2] ax.scatter(anomalies['reg_date'], anomalies[col], color='#FF5252', s=80, zorder=5, label='异常点(< -2σ)') for _, row in anomalies.iterrows(): ax.annotate( f" {row[col]:.1f}%\n ({(row[col]-mean_v)/std_v:.1f}σ)", xy=(row['reg_date'], row[col]), xytext=(10, -22), textcoords='offset points', color='#FF5252', fontsize=9, arrowprops=dict(arrowstyle='->', color='#FF5252', lw=1) ) ax.yaxis.set_major_formatter(mtick.PercentFormatter()) ax.set_title(f'{col.upper()} 留存率异常检测 · 均值 ± 2σ 方法', color='white', fontsize=13, pad=10, loc='left') ax.tick_params(colors='#8888A0') ax.spines[:].set_color('#252535') ax.grid(axis='y', color='#252535', lw=0.8) ax.legend(facecolor='#14161E', edgecolor='#252535', labelcolor='white', fontsize=9) plt.setp(ax.xaxis.get_majorticklabels(), rotation=30, ha='right', color='#8888A0') plt.tight_layout() if save_path: plt.savefig(save_path, dpi=150, bbox_inches='tight', facecolor='#0C0E18') plt.show()
生成的图表效果:
两个红点自动被标出:-2.9σ 和 -2.4σ,说明这两天的留存率显著低于历史正常水平,值得立即排查。
为什么用滚动窗口而不是全局均值?
全局均值在游戏早期数据少、波动大的时候会失真。滚动窗口(比如过去30天)能动态跟踪趋势,游戏留存在持续优化时,阈值也会随着均值一起上移,不会因为历史均值低而漏掉新的异常。
四、渠道质量散点图
渠道的好坏不能只看一个指标,要同时看 D1 留存(短期质量)、D7 留存(深度质量)和用户量(规模),三个维度一张图说清楚。
def get_channel_stats(date: str, min_users: int = 100) -> pd.DataFrame: sql = f""" SELECT r.channel, COUNT(DISTINCT r.user_id) AS new_users, ROUND(COUNT(DISTINCT CASE WHEN DATEDIFF(l.login_date, r.reg_date) = 1 THEN l.user_id END) * 100.0 / COUNT(DISTINCT r.user_id), 2) AS d1, ROUND(COUNT(DISTINCT CASE WHEN DATEDIFF(l.login_date, r.reg_date) = 7 THEN l.user_id END) * 100.0 / COUNT(DISTINCT r.user_id), 2) AS d7 FROM user_register r LEFT JOIN user_login l ON r.user_id = l.user_id AND l.login_date BETWEEN r.reg_date AND DATE_ADD(r.reg_date, INTERVAL 7 DAY) WHERE r.reg_date = '{date}' GROUP BY r.channel HAVING COUNT(DISTINCT r.user_id) >= {min_users} """ return query(sql)def plot_channel_bubble(df: pd.DataFrame, save_path: str = None): """ 气泡图:X=D1留存,Y=D7留存,气泡大小=用户量,颜色=D7/D1比值 D7/D1比值反映留存黏性:相同D1下,比值越高说明用户越愿意持续回来 """ fig, ax = plt.subplots(figsize=(10, 7)) fig.patch.set_facecolor('#0C0E18') ax.set_facecolor('#0C0E18') df['retention_ratio'] = df['d7'] / df['d1'] sc = ax.scatter( df['d1'], df['d7'], s = df['new_users'] / 8, c = df['retention_ratio'], cmap = 'RdYlGn', vmin = 0.35, vmax = 0.55, alpha= 0.75, edgecolors='white', linewidth=0.4 ) # 标注高质量或大量级渠道 threshold = df['new_users'].quantile(0.7) for _, row in df.iterrows(): if row['new_users'] > threshold or row['retention_ratio'] > 0.50: ax.annotate( row['channel'], (row['d1'], row['d7']), fontsize=8, color='white', alpha=0.85, xytext=(5, 5), textcoords='offset points' ) import matplotlib.ticker as mtick ax.xaxis.set_major_formatter(mtick.PercentFormatter()) ax.yaxis.set_major_formatter(mtick.PercentFormatter()) ax.set_xlabel('D1 留存率', color='#8888A0', fontsize=11) ax.set_ylabel('D7 留存率', color='#8888A0', fontsize=11) ax.set_title('渠道质量分布图 · 气泡大小 = 用户量 · 颜色 = D7/D1比值', color='white', fontsize=12, pad=10, loc='left') ax.tick_params(colors='#8888A0') ax.spines[:].set_color('#252535') ax.grid(color='#252535', lw=0.8, alpha=0.5) cb = plt.colorbar(sc, ax=ax, shrink=0.8) cb.ax.tick_params(colors='#8888A0') cb.set_label('D7/D1 留存比值(越高越好)', color='#8888A0', fontsize=9) plt.tight_layout() if save_path: plt.savefig(save_path, dpi=150, bbox_inches='tight', facecolor='#0C0E18') plt.show()
生成的图表效果:
右上角(D1高+D7高)+ 绿色(D7/D1比值高)= 理想渠道,应该加投。左下角(D1低+D7低)+ 红色 = 低质量渠道,考虑降预算或暂停。气泡大但偏红 = 量大但质量差,这类渠道是重点优化对象——要么优化素材,要么换竞价策略。
五、封装成每日自动化脚本
import loggingfrom datetime import datetime, timedeltalogging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s %(message)s')def daily_retention_report(days_back: int = 30): """ 每日自动化留存报告 days_back: 回看天数 """ today = datetime.today() start_date = (today - timedelta(days=days_back)).strftime('%Y-%m-%d') end_date = (today - timedelta(days=2)).strftime('%Y-%m-%d') # D1 需要 T+2 才能取到,所以 end_date 取昨天的前一天 spot_date = (today - timedelta(days=1)).strftime('%Y-%m-%d') logging.info(f"开始分析:{start_date} ~ {end_date}") # 1. 留存趋势 + 异常检测 logging.info("取留存趋势数据...") df_ret = query(f""" SELECT reg_date, COUNT(DISTINCT r.user_id) AS new_users, ROUND(COUNT(DISTINCT CASE WHEN DATEDIFF(l.login_date, r.reg_date) = 1 THEN l.user_id END) * 100.0 / COUNT(DISTINCT r.user_id), 2) AS d1, ROUND(COUNT(DISTINCT CASE WHEN DATEDIFF(l.login_date, r.reg_date) = 7 THEN l.user_id END) * 100.0 / COUNT(DISTINCT r.user_id), 2) AS d7 FROM user_register r LEFT JOIN user_login l ON r.user_id = l.user_id AND l.login_date BETWEEN r.reg_date AND DATE_ADD(r.reg_date, INTERVAL 7 DAY) WHERE r.reg_date BETWEEN '{start_date}' AND '{end_date}' GROUP BY reg_date ORDER BY reg_date """) df_ret['reg_date'] = pd.to_datetime(df_ret['reg_date']) # 2. 异常检测 df_ret = detect_anomaly(df_ret, col='d1', window=14, sigma=2.0) anomalies = df_ret[df_ret['is_anomaly'] & (df_ret['anomaly_dir']=='low')] if len(anomalies) > 0: logging.warning( f"发现 {len(anomalies)} 个 D1 留存异常低的日期:\n" + anomalies[['reg_date','d1','z_score']].to_string(index=False) ) else: logging.info("D1 留存无明显异常") # 3. 出图 plot_anomaly(df_ret, col='d1', save_path=f'retention_anomaly_{today:%Y%m%d}.png') # 4. Cohort 热力图 logging.info("取 Cohort 数据...") df_cohort = get_cohort_retention(start_date, end_date, day_list=[1,3,7,14,30]) plot_cohort_heatmap(df_cohort, save_path=f'cohort_heatmap_{today:%Y%m%d}.png') # 5. 渠道质量图 logging.info(f"取渠道数据({spot_date})...") df_ch = get_channel_stats(spot_date, min_users=200) plot_channel_bubble(df_ch, save_path=f'channel_bubble_{today:%Y%m%d}.png') logging.info("报告生成完毕,图表已保存。") return df_ret, df_cohort, df_ch# 每天早上跑一次df_ret, df_cohort, df_ch = daily_retention_report(days_back=30)
六、SQL 和 Python 各自做什么
| 步骤 | 工具 | 做什么 |
|---|
| 取数过滤 | SQL | 数据库侧聚合,减少传输量,不要把原始日志全拉到Python |
| 统计计算 | pandas + numpy | 均值、标准差、滚动窗口、异常判断 |
| 可视化 | matplotlib + seaborn | 趋势图、热力图、气泡散点图 |
| 封装复用 | Python 函数 | 参数化日期,每次只改参数,不重写代码 |
| ODPS 取数 | pyodps | 阿里云 MaxCompute 的专属连接方式 |
最重要的原则:复杂的数据过滤和聚合在 SQL 里做完,不要把几百万行原始数据拉到 Python 里再 filter。Python 拿到的应该是已经聚合好的、几百行的分析结果,而不是原始日志。