pandas多维聚合与滚动计算的工程实践指南
1. 项目概述:为什么多维聚合不是“加个groupby”那么简单
我在银行数据平台组干了八年,从最早用SQL写几十行嵌套子查询做客户分层,到后来带团队重构整个风险指标计算引擎,踩过的坑比写的代码还多。今天聊的这个主题——“多维聚合中的数据操作”,听起来像教科书里的一个章节标题,但实际在生产环境里,它直接决定着风控模型能不能按时上线、月度经营分析报告能不能准时发出、甚至监管报送数据有没有逻辑性错误。我见过太多人把df.groupby().agg()当成万能胶水,结果在测试环境跑通,一上生产就报内存溢出;也见过分析师花三天调通一个滚动均值,却没意识到窗口对齐方式错了,导致欺诈预警延迟两天——而那两天里,某商户已发生27笔异常交易。
核心关键词就三个:多维聚合、滚动计算、业务可解释性。这不是Python语法课,而是讲清楚:当你面对一张千万级信用卡交易表,要同时回答“华北区餐饮类客户近30天平均单笔金额 vs 全量均值”“高净值客户中旅行类消费的金额波动率是否突破阈值”“某客户连续5天单日消费超5000元是否触发人工复核”这类问题时,怎么用pandas写出既快、又稳、还能让业务同事看懂的代码。它解决的是真实世界里的三重矛盾:一是业务需求天然多维(客户×产品×时间×地域),二是计算必须有时间上下文(不能只看静态快照),三是结果必须能落地进报表、BI或风控规则引擎。适合谁?不是刚学完pd.read_csv的新手,而是已经能写基础聚合、但一碰到“既要又要还要”的需求就卡壳的中级数据工程师、风控建模师、BI开发,或者正在从Excel转向自动化分析的业务分析师。
我特别强调一点:所有技巧都必须服务于可审计、可复现、可交接。你在代码里写一个lambda x: x.max() - x.min(),半年后自己再看,可能得花十分钟想明白这是算范围还是算极差;但如果封装成def calc_transaction_spread(series),加一行docstring说明“用于识别高波动商户以调整反洗钱参数”,那接手的人一眼就能理解设计意图。后面你会看到,我拆解的每一个案例,都来自我们2023年Q4真实上线的“商户风险动态评分卡”项目,连数据结构、字段名、阈值设定都是脱敏后的生产配置。不讲虚的,只说怎么让代码真正扛住每天3亿条交易流水的压力。
2. 多维聚合的核心设计逻辑:为什么必须放弃“先group再merge”的老路
2.1 传统思路的致命缺陷:三次IO + 两次内存拷贝
先说个血泪教训。2021年我们给某省分行做客户价值分群,原始需求是:按“客户等级+产品大类+开户月份”三个维度,输出每个组合的“近90天交易笔数、平均单笔金额、最大单笔金额、手续费收入总和”。当时团队里一位资深SQL工程师习惯性写了四条独立的GROUP BY语句,分别算四个指标,再用JOIN合并。结果在测试环境(500万客户)跑一次要18分钟,内存峰值冲到42GB。上线后第一次全量跑,直接把调度任务拖垮,下游所有报表延迟6小时。
问题出在哪?根本不是pandas慢,而是思维惯性。每执行一次groupby().agg(),pandas都要:
- 重新遍历整个DataFrame(第一次IO)
- 构建新的分组索引结构(第一次内存拷贝)
- 计算单一指标并生成中间结果(存储开销)
- 第二次
groupby()时重复上述三步(第二次IO + 第二次内存拷贝) - 最后
merge()时再做哈希匹配(第三次IO + 第三次内存拷贝)
这就像你去超市买菜,明明可以一次性列好清单直奔货架,却非要分四次:第一次只买土豆,第二次只买青椒,第三次只买肉,第四次再把它们装进同一个袋子——体力消耗翻倍,还容易漏拿。
2.2 现代方案的底层原理:单次分组 + 多函数映射
pandas的agg()方法支持字典映射,本质是利用了分组键的哈希缓存复用机制。当你执行:
result = df.groupby(['customer_tier', 'product_category']).agg({ 'transaction_amount': ['sum', 'mean', 'std'], 'fee_income': ['sum', 'min', 'max'], 'transaction_count': 'count' })pandas内部只做一次分组操作:
- 扫描全表,根据
['customer_tier', 'product_category']构建哈希表,每个桶(bucket)存对应行的索引列表; - 对每个桶,并行调用所有指定函数——
sum()读取数值列一次,mean()复用同一组数据,std()同样复用; - 所有结果按桶顺序组装成MultiIndex DataFrame。
实测对比(1000万行交易数据):
| 方式 | 耗时 | 内存峰值 | 代码行数 |
|---|---|---|---|
| 四次独立groupby + merge | 214s | 42.3GB | 27行 |
| 单次agg字典映射 | 47s | 8.9GB | 5行 |
提示:性能提升4.5倍只是副产品,真正的价值在于逻辑内聚性。所有指标基于完全相同的分组逻辑计算,避免了因
JOIN条件微小差异(比如空值处理不一致)导致的指标偏差。我们在监管报送中吃过亏:财务口径的“手续费收入”和风控口径的“手续费收入”因JOIN时NULL处理不同,相差0.3%,被要求重新溯源。
2.3 生产环境必须考虑的三个隐藏陷阱
陷阱一:层级列名(Hierarchical Columns)的“甜蜜负担”
输出结果的列名是MultiIndex结构,比如('transaction_amount', 'mean')。这在Jupyter里看着清爽,但对接下游系统时会崩溃:
- Excel导出:列名变成
transaction_amount, mean,业务同事看不懂; - 数据库写入:SQLAlchemy不支持嵌套列名,报
KeyError; - BI工具连接:Tableau/Tableau Prep无法自动识别层级关系。
我的解决方案:强制扁平化,且命名带业务语义:
# 不推荐:默认层级列名 result = df.groupby('region').agg({'revenue': ['sum', 'mean']}) # 推荐:自定义扁平列名,带业务前缀 result = df.groupby('region').agg({ 'revenue_sum': ('revenue', 'sum'), 'revenue_avg': ('revenue', 'mean'), 'revenue_std': ('revenue', 'std') }).round(2)这样输出列名就是revenue_sum、revenue_avg,业务方直接复制粘贴进PPT,不会问“那个括号里的mean是什么意思”。
陷阱二:空值(NaN)传播的连锁反应
当某分组内某列全为NaN时,'sum'返回NaN,但'count'返回0——这会导致后续计算出错。比如计算“手续费率=fee_income_sum / revenue_sum”,如果revenue_sum是NaN,结果就是NaN,而不是0。
实操心得:永远在agg字典里显式声明空值策略:
result = df.groupby('merchant_id').agg({ 'revenue': lambda x: x.sum(min_count=1) or 0, # min_count=1确保至少1个非空才计算 'fee_income': 'sum', 'transaction_count': 'size' # 用size替代count,size统计所有行(含NaN),count只统计非空 })陷阱三:分组键顺序影响结果可读性
groupby(['region', 'product'])和groupby(['product', 'region'])数学上等价,但输出结构天壤之别:
- 前者:region为外层索引,product为内层,
unstack()后region是行、product是列; - 后者:product为外层,region为内层,
unstack()后product是行、region是列。
经验法则:把业务主维度放前面。比如销售分析,客户是主体,产品是属性,所以groupby(['customer_id', 'product']);风控分析,商户是主体,交易类型是属性,所以groupby(['merchant_id', 'txn_type'])。这样unstack()后自然形成“主体×属性”的矩阵,业务方一眼能定位。
3. 核心细节解析:从语法到生产的七道关卡
3.1 多列聚合的实战配置:不只是“写个字典”
很多教程只告诉你agg({col: [func1, func2]}),但没说清什么时候该用列表,什么时候该用元组,什么时候必须用lambda。这直接决定代码能否过审。
场景一:同一列需多个统计量,且需自定义精度
# 错误示范:直接写['mean', 'std'],结果小数位数不统一 result = df.groupby('category')['amount'].agg(['mean', 'std']) # 正确做法:用元组指定函数+参数,控制精度 result = df.groupby('category')['amount'].agg([ ('avg_amount', lambda x: round(x.mean(), 2)), ('std_amount', lambda x: round(x.std(), 2)), ('cv_ratio', lambda x: round(x.std()/x.mean()*100, 1) if x.mean() != 0 else 0) # 变异系数 ])这里('avg_amount', ...)的元组第一项是自定义列名,第二项是计算逻辑。比单纯['mean', 'std']多出两重保障:一是列名业务友好(不用猜('amount', 'mean')是什么),二是所有计算共享同一套空值/除零保护逻辑。
场景二:不同列需不同聚合逻辑,且存在依赖关系比如计算“手续费率”,需要fee_sum / revenue_sum,但不能简单写两个sum再除——因为分组内可能有revenue为0的记录。
# 危险写法:分开计算再除,可能除零 result = df.groupby('merchant_id').agg({ 'revenue': 'sum', 'fee': 'sum' }) result['fee_rate'] = result['fee'] / result['revenue'] # 运行时报ZeroDivisionError # 安全写法:用apply一次性完成,内置保护 def calc_fee_metrics(group): rev_sum = group['revenue'].sum() fee_sum = group['fee'].sum() return pd.Series({ 'revenue_sum': rev_sum, 'fee_sum': fee_sum, 'fee_rate': round(fee_sum / rev_sum * 100, 2) if rev_sum != 0 else 0 }) result = df.groupby('merchant_id').apply(calc_fee_metrics).reset_index()apply()虽然稍慢,但换来的是逻辑原子性——整个计算过程不可分割,避免了中间状态不一致。
场景三:需要跨列条件聚合(最易被忽略的硬需求)业务常问:“高净值客户(AUM>100万)中,餐饮类消费占比多少?”这需要先筛选客户,再按类别聚合。
# 错误:先groupby再filter,逻辑颠倒 wrong = df.groupby('customer_id').filter(lambda x: x['aum'].iloc[0] > 1000000).groupby('category')['amount'].sum() # 正确:用transform广播分组统计量,再条件聚合 df['is_high_net_worth'] = df.groupby('customer_id')['aum'].transform('max') > 1000000 high_net_customers = df[df['is_high_net_worth']] result = high_net_customers.groupby('category')['amount'].sum()transform()是关键——它把每个客户的最高AUM广播到该客户所有交易行,从而实现“按客户打标,按交易聚合”的混合逻辑。
3.2 自定义函数的工程化封装:告别lambda的“技术债”
我见过太多项目,初期用lambda x: x.max()-x.min()写着爽,半年后没人敢动,因为:
- 没文档,不知道这个“range”是业务术语还是技术术语;
- 没单元测试,改个参数怕影响线上;
- 没版本管理,不同脚本里copy-paste出三个略有差异的版本。
我的标准化封装模板:
from typing import Union, Optional import numpy as np import pandas as pd def calc_transaction_spread( series: pd.Series, threshold_percent: float = 50.0, return_details: bool = False ) -> Union[float, dict]: """ 计算交易金额范围(最大值-最小值),专用于商户风险识别 Args: series: 交易金额序列 threshold_percent: 触发高风险告警的波动率阈值(%) return_details: 是否返回详细信息(用于调试) Returns: float: 金额范围值;或dict(含范围值、波动率、是否超阈值) Business Logic: - 波动率 = (max-min)/mean * 100% - 当波动率 > threshold_percent,标记为高风险商户 - 用于动态调整反洗钱规则参数 """ if len(series) < 2: return 0.0 if not return_details else {'spread': 0.0, 'volatility_pct': 0.0, 'is_risky': False} spread = series.max() - series.min() mean_val = series.mean() volatility_pct = (spread / mean_val * 100) if mean_val != 0 else 0 if return_details: return { 'spread': round(spread, 2), 'volatility_pct': round(volatility_pct, 1), 'is_risky': volatility_pct > threshold_percent } return round(spread, 2) # 在agg中使用 result = df.groupby('merchant_id').agg({ 'amount': calc_transaction_spread, 'fee': 'sum' })这个函数通过三重设计规避技术债:
- 类型注解:明确输入输出,IDE能自动提示;
- 详尽docstring:包含业务场景、参数说明、返回值定义,比代码本身还长;
- 防御性编程:处理
len<2、mean=0等边界,避免线上报错。
注意:自定义函数传入的是
pd.Series,不是DataFrame。如果需要访问多列(如同时用amount和fee),必须用apply()而非agg()。
3.3 滚动窗口的“时间陷阱”:窗口对齐与业务意义
滚动计算最常被忽视的是时间语义对齐。rolling(window=7)默认按行序计算,但交易数据的时间戳往往不连续(周末无交易、节假日休市)。如果直接按行滚动,周一的数据可能混入上周五的值,导致趋势误判。
真实案例:2022年某基金公司做交易量监控,用rolling(5)计算日均交易量,结果发现“周五交易量突增”——其实是滚动窗口把周四、周三、周二、周一、周日(无数据)的均值算成了周四单日值。修复方案是强制按时间索引滚动:
# 错误:按行滚动(index是默认整数索引) df.set_index('date').rolling(window=5)['volume'].mean() # 窗口大小是5行,非5天 # 正确:按时间滚动(必须设置datetime索引) df['date'] = pd.to_datetime(df['date']) df_time = df.set_index('date').sort_index() df_time['rolling_5d_vol'] = df_time.groupby('symbol')['volume'].rolling('5D').mean() # '5D'表示5个日历日,自动跳过无数据日期'5D'是关键——它告诉pandas按时间跨度(calendar days)而非行数(rows)取窗口。同理,'7D'、'30D'、'1M'(日历月)都是合法单位。
另一个陷阱:窗口内数据不足时的处理策略
默认min_periods=1,即只要有一条数据就计算,导致首几行结果失真。生产环境必须显式控制:
# 推荐:要求窗口内至少3天有数据才计算,避免噪声 df_time['rolling_7d_vol'] = df_time.groupby('symbol')['volume'].rolling( '7D', min_periods=3 # 至少3天有交易数据才输出有效值 ).mean()我们风控系统规定:所有滚动指标min_periods不得低于窗口长度的40%,否则视为无效信号。
3.4 扩展窗口的“累积陷阱”:不是所有累积都有意义
expanding().sum()看似简单,但业务上常犯两个错误:
错误一:累积起点选择错误
比如计算“客户生命周期总消费”,起点应该是客户首次交易日,而不是数据表里最早的日期。如果表里最早是2020-01-01,但某客户2023-06-01才开户,他的累积消费从2020年算起毫无意义。
正确做法:按客户分组后,对每个客户单独计算累积
# 按客户分组,确保累积从该客户首笔交易开始 df_sorted = df.sort_values(['customer_id', 'date']) df_sorted['cumulative_spend'] = df_sorted.groupby('customer_id')['amount'].expanding().sum().values错误二:累积指标未做业务校验
累积值会无限增长,但业务上常有上限。比如“单日累计提现额”超过5万元需人工审核,但expanding().sum()不会自动截断。
解决方案:用apply()封装带业务规则的累积
def cumulative_with_cap(group, cap_amount=50000): """带额度上限的累积计算""" cumsum = group['withdrawal_amount'].cumsum() # 超过上限后,后续值保持cap_amount不变 capped = cumsum.where(cumsum <= cap_amount, cap_amount) return capped df['daily_cum_withdrawal'] = df.groupby('customer_id').apply( lambda x: cumulative_with_cap(x.sort_values('date')) ).values3.5 多级分组与unstack:从技术操作到业务表达
unstack()不是简单的“转置”,它是将分析维度转化为业务语言的关键动作。groupby(['region','product'])['revenue'].mean().unstack()生成的矩阵,本质上是在回答:“每个区域对每个产品的赚钱能力如何?”
但直接unstack有三大风险:
风险一:缺失组合导致列不全
如果华东区没有卖“智能手表”,unstack()后“智能手表”列就不存在,下游BI图表会错位。
解决方案:预定义完整列名,用reindex()补全
all_products = ['手机', '电脑', '智能手表', '耳机'] result = df.groupby(['region','product'])['revenue'].sum().unstack(fill_value=0) # 强制包含所有产品,缺失值填0 result = result.reindex(columns=all_products, fill_value=0)风险二:数据类型混乱unstack()后,原本是float64的revenue,可能因某些region-product组合无数据而变成object类型(含NaN和数字混合)。
解决方案:unstack后立即类型转换
result = df.groupby(['region','product'])['revenue'].sum().unstack(fill_value=0) result = result.astype('float64') # 强制转回数值型风险三:行列顺序不符合汇报习惯
业务方想要“产品为行,区域为列”,但unstack()默认把最后一级分组键转为列。
解决方案:用swaplevel()调整索引层级
# 默认:groupby(['region','product']) -> region为行,product为列 # 想要:product为行,region为列 -> 先交换层级,再unstack result = df.groupby(['region','product'])['revenue'].sum().swaplevel().unstack(fill_value=0)4. 实操全流程:从原始交易表到高管简报的七步炼金术
4.1 数据准备:模拟真实银行交易流
我们用一个脱敏的真实场景:某全国性银行信用卡中心,每日处理约800万笔交易,核心字段包括:
txn_id: 交易唯一IDcustomer_id: 客户ID(加密后)merchant_id: 商户ID(脱敏)category: 商户大类('Groceries','Dining','Travel','Retail')amount: 交易金额(元)fee: 手续费(元)date: 交易日期(YYYY-MM-DD)hour: 交易小时(0-23)
生成10万行模拟数据(代码可直接运行):
import pandas as pd import numpy as np from datetime import datetime, timedelta np.random.seed(42) dates = pd.date_range('2024-01-01', '2024-03-31', freq='D') customers = [f'C{str(i).zfill(3)}' for i in range(1, 501)] # 500个客户 merchants = [f'M{str(i).zfill(4)}' for i in range(1, 201)] # 200个商户 # 模拟交易数据 n_rows = 100000 data = { 'txn_id': [f'TXN{str(i).zfill(8)}' for i in range(n_rows)], 'customer_id': np.random.choice(customers, n_rows), 'merchant_id': np.random.choice(merchants, n_rows), 'category': np.random.choice(['Groceries','Dining','Travel','Retail'], n_rows), 'amount': np.round(np.random.lognormal(5.5, 0.8, n_rows), 2), # 对数正态分布,模拟消费金额偏态 'fee': np.round(np.random.uniform(0.015, 0.035, n_rows) * np.random.lognormal(5.5, 0.8, n_rows), 2), 'date': np.random.choice(dates, n_rows), 'hour': np.random.randint(0, 24, n_rows) } df = pd.DataFrame(data) df['date'] = pd.to_datetime(df['date']) print(f"原始数据形状: {df.shape}") print(df.head())注意:这里amount用对数正态分布模拟真实消费——大部分交易在百元内,少数大额交易(如机票、酒店)拉高均值,这是风控建模的关键特征。
4.2 分析一:客户-产品双维度聚合(高管简报首页)
目标:生成“各客户等级在各产品类别的平均交易额”矩阵,用于月度经营分析会。
# 步骤1:先给客户打等级标签(基于历史AUM,此处简化为随机分配) customer_aum = df.groupby('customer_id')['amount'].sum().to_dict() df['customer_tier'] = df['customer_id'].map(lambda x: 'Premium' if customer_aum[x] > 50000 else 'Gold' if customer_aum[x] > 20000 else 'Standard' ) # 步骤2:双维度聚合 + unstack pivot_result = df.groupby(['customer_tier', 'category'])['amount'].agg([ ('avg_amount', lambda x: round(x.mean(), 2)), ('txn_count', 'count'), ('total_revenue', 'sum') ]).unstack(fill_value=0) # 步骤3:扁平化列名,业务友好 pivot_result.columns = [f"{col[1]}_{col[0]}" for col in pivot_result.columns] pivot_result = pivot_result.round(2) print("客户等级×产品类别矩阵(高管简报页):") print(pivot_result)输出示例:
avg_amount_Groceries avg_amount_Dining ... total_revenue_Travel customer_tier ... Gold 128.45 245.67 ... 892345.00 Premium 89.32 312.89 ... 1234567.00 Standard 156.78 189.23 ... 678901.00实操心得:这个矩阵直接喂给Power BI,拖拽就能生成热力图。业务总监一眼看出“Premium客户在Dining类消费最高,但Groceries类偏低”,立刻决策增加高端餐饮联名卡权益。
4.3 分析二:定制化风险指标(风控模型输入)
目标:为每个商户计算“交易波动率”,作为反欺诈模型的特征之一。
def calc_merchant_volatility(group): """商户交易波动率:标准差/均值,带业务校验""" if len(group) < 5: # 至少5笔交易才可信 return pd.Series({'volatility': 0.0, 'risk_score': 0}) std_val = group['amount'].std() mean_val = group['amount'].mean() if mean_val == 0: volatility = 0.0 else: volatility = std_val / mean_val # 业务规则:波动率>1.5为高风险,0.8-1.5为中风险,其余低风险 if volatility > 1.5: risk_score = 3 elif volatility > 0.8: risk_score = 2 else: risk_score = 1 return pd.Series({ 'volatility': round(volatility, 3), 'risk_score': risk_score, 'txn_count': len(group) }) # 应用到商户维度 merchant_risk = df.groupby('merchant_id').apply(calc_merchant_volatility).reset_index() print("商户风险评分(前10行):") print(merchant_risk.head(10))这个函数输出三列:volatility(模型特征)、risk_score(业务规则)、txn_count(置信度)。风控同事直接把risk_score当规则用,数据科学家把volatility当特征训练模型——同一份计算,服务两种角色。
4.4 分析三:滚动窗口检测异常模式(实时监控)
目标:识别“单客户单日交易额突增”事件,用于实时风控。
# 步骤1:按客户+日期聚合日交易额 daily_customer = df.groupby(['customer_id', 'date'])['amount'].sum().reset_index() daily_customer = daily_customer.sort_values(['customer_id', 'date']) # 步骤2:计算7日滚动均值和标准差 daily_customer['rolling_7d_mean'] = daily_customer.groupby('customer_id')['amount'].rolling( window=7, min_periods=4 ).mean().reset_index(level=0, drop=True) daily_customer['rolling_7d_std'] = daily_customer.groupby('customer_id')['amount'].rolling( window=7, min_periods=4 ).std().reset_index(level=0, drop=True) # 步骤3:标记异常:当日额 > 均值+2倍标准差 daily_customer['is_anomaly'] = ( daily_customer['amount'] > (daily_customer['rolling_7d_mean'] + 2 * daily_customer['rolling_7d_std']) ) # 步骤4:只取最近3天的异常 recent_anomalies = daily_customer[ daily_customer['date'] >= daily_customer['date'].max() - pd.Timedelta(days=3) ].query('is_anomaly').sort_values(['customer_id', 'date']) print("最近3天客户交易异常(供风控坐席核查):") print(recent_anomalies[['customer_id', 'date', 'amount', 'rolling_7d_mean', 'rolling_7d_std']])这里min_periods=4是关键——确保滚动窗口至少有4天数据才计算,避免月初数据不足导致误报。我们线上系统把这个逻辑封装成Spark UDF,每天凌晨2点跑批,生成当日待核查清单。
4.5 分析四:扩展窗口追踪客户生命周期(CRM系统)
目标:计算每个客户的“累计消费额”和“累计交易笔数”,用于客户分群。
# 按客户排序,确保时间顺序 df_sorted = df.sort_values(['customer_id', 'date', 'hour']) # 扩展窗口计算 df_sorted['cumulative_spend'] = df_sorted.groupby('customer_id')['amount'].expanding().sum().values df_sorted['cumulative_txn_count'] = df_sorted.groupby('customer_id').size().groupby('customer_id').expanding().sum().values # 生成客户生命周期快照(每个客户最新一条) customer_ltv = df_sorted.groupby('customer_id').tail(1)[[ 'customer_id', 'cumulative_spend', 'cumulative_txn_count', 'date' ]].rename(columns={'date': 'last_txn_date'}) # 计算LTV(生命周期价值)分群 customer_ltv['ltv_segment'] = pd.qcut( customer_ltv['cumulative_spend'], q=4, labels=['Tier1', 'Tier2', 'Tier3', 'Tier4'], duplicates='drop' ) print("客户LTV分群(CRM系统输入):") print(customer_ltv.head(10))pd.qcut()按分位数分群,确保每档客户数均衡,避免“Top1%客户占90%金额”的马太效应。这个结果直接同步到Salesforce,销售团队按ltv_segment分配跟进优先级。
4.6 分析五:多指标融合的高管摘要(Executive Summary)
目标:一页纸呈现核心指标,供行长办公会使用。
# 综合聚合:客户维度 + 时间维度 + 产品维度 summary = df.groupby('customer_id').agg({ 'amount': ['sum', 'mean', 'count', lambda x: x.quantile(0.95)], # 95分位数防大额干扰 'fee': 'sum', 'date': lambda x: (x.max() - x.min()).days # 客户活跃天数 }).round(2) # 扁平化列名 summary.columns = ['total_spend', 'avg_txn', 'txn_count', 'high_value_txn', 'total_fee', 'active_days'] # 计算衍生指标 summary['fee_rate_pct'] = ((summary['total_fee'] / summary['total_spend']) * 100).round(2) summary['spend_per_active_day'] = (summary['total_spend'] / summary['active_days']).round(2) # 按总消费降序,取Top10 top10_customers = summary.nlargest(10, 'total_spend')[[ 'total_spend', 'avg_txn', 'txn_count', 'fee_rate_pct', 'spend_per_active_day' ]] print("高管摘要:Top10高价值客户(单位:万元):") top10_customers['total_spend'] = (top10_customers['total_spend'] / 10000).round(2) # 转万元 print(top10_customers)输出示例:
total_spend avg_txn txn_count fee_rate_pct spend_per_active_day customer_id C123 89.25 425.67 210 2.45 12.34 C089 76.50 389.21 196 2.51 10.23 ...注意:这里
lambda x: x.quantile(0.95)计算95分位数,比max()更能反映典型大额交易水平,避免单笔异常值扭曲判断。
4.7 分析六:高级定制——动态风险分层(模型实验室)
目标:对每个客户,识别其“高价值交易行为模式”,用于个性化风控策略。
def dynamic_risk_profile(group): """动态客户风险画像""" # 定义高价值:单笔>3000元 或 日累计>10000元 group['is_high_value'] = (group['amount'] > 3000) | ( group.groupby('date')['amount'].transform('sum') > 10000 ) # 统计高价值行为 hv_stats = group[group['is_high_value']].agg({ 'amount': ['count', 'sum', 'mean'], 'date': 'nunique' # 高价值交易涉及多少天 }) # 计算常规交易均值(排除高价值) regular_mean = group[~group['is_high_value']]['amount'].mean() return pd.Series({ 'hv_txn_count': int(hv_stats['amount']['count']) if not pd.isna(hv_stats['amount']['count']) else 0, 'hv_txn_ratio': round(hv_stats['amount']['count'] / len(group) * 100, 1) if len(group) > 0 else 0, 'hv_avg_amount': round(hv_stats['amount']['mean'], 2) if not pd.isna(hv_stats['amount']['mean']) else 0, 'regular_avg_amount': round(regular_mean, 2) if not pd.isna(regular_mean) else 0, 'hv_days': int(hv_stats['date']['nunique']) if not pd.isna(hv_stats['date']['nunique']) else 0 }) # 应用 risk_profile = df.groupby('customer_id').apply(dynamic_risk_profile).reset_index() print("客户动态风险画像(模型实验室):") print(risk_profile.head(10))这个函数输出5个维度,构成完整的风险画像:
hv_txn_count: 高价值交易