import pymysqlimport pandas as pdimport warningsfrom contextlib import closingwarnings.filterwarnings('ignore')# 行业映射字典industry = { 'A': '农、林、牧、渔业', 'B': '采矿业', 'C': '制造业', 'D': '电力、热力、燃气及水生产和供应业', 'E': '建筑业', 'F': '批发和零售业', 'G': '交通运输、仓储和邮政业', 'H': '住宿和餐饮业', 'I': '信息传输、软件和信息技术服务业', 'J': '金融业', 'K': '房地产业', 'L': '租赁和商务服务业', 'M': '科学研究和技术服务业', 'N': '水利、环境和公共设施管理业', 'O': '居民服务、修理和其他服务业', 'P': '教育', 'Q': '卫生和社会工作', 'R': '文化、体育和娱乐业', 'S': '公共管理、社会保障和社会组织', 'T': '国际组织'}# 机构映射字典branch = { '营业部': '321023***', '新城分理处': '321023***', '红旗分理处': '321023***', '王桥分理处': '321023***', '白田路支行': '321023***', '清算中心': '321023***', '管理机构': '321023***'}def fetch_table(cursor, table_name, data_date=None, columns=None): """ 通用查询函数:执行SQL并返回DataFrame :param cursor: 数据库游标 :param table_name: 表名 :param data_date: 数据日期(若需要过滤) :param columns: 预定义的列名列表(若不提供则从cursor.description获取) :return: DataFrame """ if data_date is not None: sql = f"SELECT * FROM {table_name} WHERE `数据日期` = %s" cursor.execute(sql, (data_date,)) else: sql = f"SELECT * FROM {table_name}" cursor.execute(sql) data = cursor.fetchall() if columns is None: # 从cursor.description获取列名 columns = [desc[0] for desc in cursor.description] return pd.DataFrame(data, columns=columns)def process_pious(cur, data_date): """处理个人贷款数据""" # 获取pious表 pious_columns = ['***', '***', '***', ······] pious = fetch_table(cur, 'pious', data_date, pious_columns) # 获取pbasic表 pbasic_columns = ['***', '***', '***', ······] pbasic = fetch_table(cur, 'pbasic', data_date, pbasic_columns) pious_res = (pious.query("贷款余额等值人民币 > 0") [["数据日期", "借据机构号", "借据号", "客户号", "核心客户号", "客户名称", "放款日期", "到期日期", "贷款余额等值人民币", "科目号", "借据管户人名称", "借新还旧", '展期标志', "担保方式", "贷款投向5", "贷款类型", "五级分类-个人"]] .merge(pbasic[["客户号", "证件号码"]], on="客户号") .rename(columns={"贷款类型": "贷款类别"})) pious_label = (pious_res.pivot_table(index=["客户号"], values=["贷款余额等值人民币"], aggfunc="sum") .reset_index() .rename(columns={"贷款余额等值人民币": "单户贷款余额合计"})) pious_label["贷款类型"] = pious_label["单户贷款余额合计"].apply(lambda x: "小额自然人" if x <= 300000 else "大额自然人") pious_fin = (pious_res.merge(pious_label[["客户号", "贷款类型"]], on="客户号") [["数据日期", "借据机构号", "借据号", "证件号码", "核心客户号", "客户名称", "放款日期", "到期日期", "贷款余额等值人民币", "科目号", "贷款类型", "借据管户人名称", "借新还旧", '展期标志', "担保方式", "贷款投向5", "贷款类别", "五级分类-个人"]] .rename(columns={"借据机构号": "机构号", "证件号码": "证件号", "客户名称": "借款人", "放款日期": "借款日期", "贷款余额等值人民币": "余额", "借据管户人名称": "管户员", "借新还旧": "是否借新还旧", "五级分类-个人": "五级分类", "贷款投向5": "主贷款投向"})) for col in ["是否借新还旧", '展期标志', "担保方式", "主贷款投向", "贷款类别", "五级分类"]: pious_fin[col] = pious_fin[col].apply(lambda x: x.split(".")[0] if len(x) > 0 else "") pious_fin["主贷款投向"] = pious_fin["主贷款投向"].apply(lambda x: industry[x[0]] if len(x) > 0 else "") pious_fin['借款日期'] = pd.to_datetime(pious_fin['借款日期'], format='%Y%m%d').dt.strftime('%Y-%m-%d') pious_fin['到期日期'] = pd.to_datetime(pious_fin['到期日期'], format='%Y%m%d').dt.strftime('%Y-%m-%d') return pious_findef process_cious(cur, data_date): """处理企业贷款数据""" cious_columns = ['***', '***', '***', ······] cious = fetch_table(cur, 'cious', data_date, cious_columns) cbasic_columns = ['***', '***', '***', ······] cbasic = fetch_table(cur, 'cbasic', data_date, cbasic_columns) cious_res = (cious.query("贷款余额等值人民币 > 0") [["数据日期", "借据机构号", "借据号", "客户号", "核心客户号", "客户名称", "放款日期", "到期日期", "贷款余额等值人民币", "科目号", "借据管户人名称", "借新还旧", '展期标志', "担保方式", "贷款投向5", "贷款类型", "五级分类"]] .merge(cbasic[["客户号", "证件号码"]], on="客户号") .rename(columns={"贷款类型": "贷款类别"})) cious_label = (cious_res.pivot_table(index=["客户号"], values=["贷款余额等值人民币"], aggfunc="sum") .reset_index() .rename(columns={"贷款余额等值人民币": "单户贷款余额合计"})) cious_label["贷款类型"] = cious_label["单户贷款余额合计"].apply(lambda x: "小额企事业" if x <= 5000000 else "大额企事业") cious_fin = (cious_res.merge(cious_label[["客户号", "贷款类型"]], on="客户号") [["数据日期", "借据机构号", "借据号", "证件号码", "核心客户号", "客户名称", "放款日期", "到期日期", "贷款余额等值人民币", "科目号", "贷款类型", "借据管户人名称", "借新还旧", '展期标志', "担保方式", "贷款投向5", "贷款类别", "五级分类"]] .rename(columns={"借据机构号": "机构号", "证件号码": "证件号", "客户名称": "借款人", "放款日期": "借款日期", "贷款余额等值人民币": "余额", "借据管户人名称": "管户员", "借新还旧": "是否借新还旧", "贷款投向5": "主贷款投向"})) for col in ["是否借新还旧", '展期标志', "担保方式", "主贷款投向", "贷款类别", "五级分类"]: cious_fin[col] = cious_fin[col].apply(lambda x: x.split(".")[0] if len(x) > 0 else "") cious_fin["主贷款投向"] = cious_fin["主贷款投向"].apply(lambda x: industry[x[0]] if len(x) > 0 else "") cious_fin['借款日期'] = pd.to_datetime(cious_fin['借款日期'], format='%Y%m%d').dt.strftime('%Y-%m-%d') cious_fin['到期日期'] = pd.to_datetime(cious_fin['到期日期'], format='%Y%m%d').dt.strftime('%Y-%m-%d') return cious_findef process_ztx(cur, data_date): """处理转贴现数据""" ztx_columns = ['***', '***', '***', ······] ztx = fetch_table(cur, 'tmp_ztx', columns=ztx_columns) # 该表没有数据日期过滤 mapping = { "科目号": "13067520", "贷款类型": "大额企事业", "管户员": "***", "是否借新还旧": "否", "担保方式": "其他-买断式转贴现", "主贷款投向": "", "贷款类别": "买断式转贴现", "五级分类": "正常" } for attr, val in mapping.items(): ztx[attr] = val ztx['借据号'] = ztx[['票号', '子票区间']].apply(lambda x: f"{x[0]},{x[1]}", axis=1) ztx['数据日期'] = data_date ztx["展期标志"] = "否" ztx_fin = ztx[["数据日期", "核心机构号", "借据号", "交易对手组织机构代码", "交易对手行号", "交易对手", "出票日", "到期日", "票面金额", "科目号", "贷款类型", "管户员", "是否借新还旧", "展期标志", "担保方式", "主贷款投向", "贷款类别", "五级分类"]].rename(columns={ "核心机构号": "机构号", "交易对手组织机构代码": "证件号", "交易对手行号": "*****", "交易对手": "借款人", "出票日": "借款日期", "到期日": "到期日期", "票面金额": "余额" }) return ztx_findef process_card(cur, data_date): """处理信用卡数据""" # 查询card表 card_columns = ['***', '***', '***', ······] card = fetch_table(cur, 'card', data_date, card_columns) # 查询market表 market_columns = ['***', '***', '***', ······] market = fetch_table(cur, 'tmp_market', columns=market_columns) # 无数据日期 # 查询khtybh表 khtybh_columns = ['***', '***', '***', ······] khtybh = fetch_table(cur, 'tmp_khtybh', columns=khtybh_columns) # 查询pbasic表(用于补充核心客户号) pbasic_columns = ['***', '***', '***', ······] pbasic = fetch_table(cur, 'pbasic', data_date, pbasic_columns) # 处理market空值 index_null = market.loc[market["营销人工号"].isnull() | market["营销人姓名"].isnull() | market["营销机构"].isnull(), :].index mapping_null = {"营销人工号": "管护人工号", "营销人姓名": "管护人姓名", "营销机构": "管护机构"} for key, val in mapping_null.items(): market.loc[index_null, key] = market.loc[index_null, val] market = market.loc[~(market["营销人工号"].isnull() | market["营销人姓名"].isnull() | market["营销机构"].isnull()), :].reset_index(drop=True) market["营销机构"] = market["营销机构"].apply(lambda x: x.replace("***", "").replace("***", "营业部")) market["机构号"] = market["营销机构"].apply(lambda x: branch[x] if x in branch else '321023***') market_res = market[["客户姓名", "账号", "证件号码", "营销人工号", "营销人姓名", "机构号"]].rename(columns={ "客户姓名": "姓名", "账号": "卡号", "证件号码": "客户统一编号", "营销人工号": "管户员工号", "营销人姓名": "管户员" }) # 合并数据 card_tmp = (card.query("本币透支金额 > 0")[["数据日期", "卡号", "账号", "客户统一编号", "姓名", "本币透支金额", "贷记卡五级分类"]] .merge(market_res, on=["卡号", "姓名", "客户统一编号"], how="left") .merge(khtybh, on=["卡号", "姓名", "客户统一编号"], how="left")) card_res = (card_tmp[["数据日期", "机构号", "账号", "客户统一编号", "核心客户号", "姓名", "本币透支金额", "贷记卡五级分类", "管户员"]] .rename(columns={"账号": "借据号", "客户统一编号": "证件号码", "姓名": "借款人", "本币透支金额": "余额", "贷记卡五级分类": "五级分类"}) .merge(pbasic[["核心客户号", "证件号码"]].rename(columns={"核心客户号": "核心客户号2"}), on="证件号码", how="left") .rename(columns={"证件号码": "证件号"})) # 填充缺失的核心客户号 card_res.loc[card_res["核心客户号"].isnull(), "核心客户号"] = card_res.loc[card_res["核心客户号"].isnull(), "核心客户号2"] # 添加固定字段 mapping_c = { "借款日期": data_date, "到期日期": data_date, "科目号": "1305", "贷款类型": "信用卡", "展期标志": "否", "是否借新还旧": "信用卡", "担保方式": "信用/免担保透支", "主贷款投向": "信用卡", "贷款类别": "信用卡" } for attr, val in mapping_c.items(): card_res[attr] = val card_res['借款日期'] = pd.to_datetime(card_res['借款日期'], format='%Y%m%d').dt.strftime('%Y-%m-%d') card_res['到期日期'] = pd.to_datetime(card_res['到期日期'], format='%Y%m%d').dt.strftime('%Y-%m-%d') card_fin = card_res[["数据日期", "机构号", "借据号", "证件号", "核心客户号", "借款人", "借款日期", "到期日期", "余额", "科目号", "贷款类别", "管户员", "是否借新还旧", "展期标志", "担保方式", "主贷款投向", "贷款类型", "五级分类"]] for col in ["五级分类"]: card_fin[col] = card_fin[col].apply(lambda x: x.split(".")[0] if len(x) > 0 else "") return card_findef main(data_date, host, port, user, password, database, charset='utf8'): """ 主函数:从数据库读取数据,生成贷款分类清单Excel文件 :param data_date: 数据日期,格式为YYYYMMDD(如20260228) :param host: 数据库主机 :param port: 数据库端口 :param user: 数据库用户名 :param password: 数据库密码 :param database: 数据库名称 :param charset: 字符集,默认为utf8 """ # 建立数据库连接 try: conn = pymysql.connect( host=host, port=port, user=user, password=password, database=database, charset=charset ) except pymysql.Error as e: print(f"数据库连接失败: {e}") return try: with closing(conn.cursor()) as cur: # 处理各类贷款数据 pious_fin = process_pious(cur, data_date) cious_fin = process_cious(cur, data_date) ztx_fin = process_ztx(cur, data_date) card_fin = process_card(cur, data_date) # 合并所有数据 result = pd.concat([pious_fin, cious_fin, ztx_fin, card_fin], ignore_index=True) result[["数据日期", "机构号", "科目号"]] = result[["数据日期", "机构号", "科目号"]].astype("str") # 统一担保方式 mapping_r = ["信用", "抵押", "保证", "贴现", "质押"] for ensure in mapping_r: result.loc[result["担保方式"].str.contains(ensure), "担保方式"] = ensure # 输出文件 output_filename = f"D:/MyDocx/贷款十级分类清单/{data_date[:4]}年{data_date[4:6]}月份贷款十级分类清单(SASP版v2.0).xlsx" result.to_excel(output_filename, index=False) print(f"文件已生成: {output_filename}") except Exception as e: print(f"处理过程中发生错误: {e}") finally: conn.close()if __name__ == '__main__': # 示例调用(实际使用时请替换为真实参数) # 注意:这里演示了如何手动传入 config.ini 中的参数 main( data_date="***", host="***", port=3306, user="root", password="***", database="***", charset="utf8" )