医院信息科程序猿:我用 Python 写了个 DRG 导入工具,10 分钟跑完以前半天的活

每个月 DRG 数据分析最头疼的地方,不是算不出来,而是数据散在不同系统里:医保导出的 Excel 一份,HIS 里一份,病案首页一份,费用明细一份,门急诊关联费用又是一份。
以前靠人工查、人工补、人工导,效率低不说,还容易出错。于是信息科程序猿只能自己动手,写了一个小工具:读取 Excel,校验表头,多线程查询 HIS,自动补全字段,保存新 Excel,再批量导入 DRG 分析表。
一、这个工具解决了什么问题?
一句话:把 DRG 分析导入过程中最繁琐、最容易出错的人工补数流程自动化。
✅ 自动读取医保 Excel
✅ 校验列名,防止模板错乱
✅ 根据病案号查询 HIS 住院信息
✅ 自动补全院区、科室、费用分类、医生护士等字段
✅ 多线程并发查询,提升跑数速度
✅ 生成补全后的 Excel
✅ 自动备份原表,再批量入库
✅ 执行后续关联更新 SQL
二、为什么不用纯 SQL 一把梭?
理论上可以,现实中不一定优雅。
因为原始数据来自 Excel,字段顺序固定,部分指标来自医保结算表,部分指标来自 HIS 住院主表,部分指标来自费用明细,还有部分字段来自病案首页和手术记录。直接写一个超级 SQL,不仅维护困难,出错后也不好定位。
所以我采用了一个更适合信息科日常维护的方案:
Excel 负责承载原始导入数据,Python 负责调度与清洗,Oracle 负责提供业务数据,最终统一落到 DRG 分析表。
三、核心流程设计
第 1 步:读取 Excel 并校验表头
先把 Excel 读入 DataFrame,再对列名做严格校验。如果模板被改过,直接停止,避免脏数据入库。
第 2 步:根据病案号查询 HIS
通过病案号关联住院主表、费用表、病案首页、手术记录等数据,补齐院区、出院科室、实际住院天数、费用分类、医生护士等字段。
第 3 步:多线程并发处理
使用 ThreadPoolExecutor 开启多线程,同时配合 Oracle SessionPool,避免每条数据重复创建连接。
第 4 步:保存补全后的 Excel
跑完之后先落一份本地 Excel,方便业务科室复核,也方便出问题时回溯。
第 5 步:备份原表并批量入库
入库前自动创建备份表,再 executemany 分批插入,最后执行后续关联更新。
完整代码
import pandas as pd
import cx_Oracle
import logging
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime
# ==========================================
# 0. 日志配置
# ==========================================
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s [%(levelname)s] %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
logger = logging.getLogger(__name__)
# ==========================================
# 1. 配置信息(脱敏版)
# ==========================================
excel_file = r'C:\path\DRG数据.xlsx'
db_config = {
'user': 'DB_USER',
'password': 'DB_PASSWORD',
'dsn': 'DB_HOST/DB_SERVICE',
'min_conn': 2,
'max_conn': 15,
'increment': 1
}
MAX_WORKERS = 10
# ==========================================
# 2. 表名/对象名(脱敏版)
# ==========================================
TARGET_TABLE = "SCHEMA_NAME.DRG_RESULT_TABLE"
INMAININFO_TABLE = "SCHEMA_NAME.FIN_IPR_INMAININFO"
INMAININFO_EXT_TABLE = "SCHEMA_NAME.FIN_IPR_INMAININFO_EXT"
FEEINFO_TABLE = "SCHEMA_NAME.FIN_IPB_FEEINFO"
ITEMLIST_TABLE = "SCHEMA_NAME.FIN_IPB_ITEMLIST"
SHIFTDATA_TABLE = "SCHEMA_NAME.COM_SHIFTDATA"
PATIENTINFO_TABLE = "SCHEMA_NAME.COM_PATIENTINFO"
DEPARTMENT_TABLE = "SCHEMA_NAME.COM_DEPARTMENT"
FEECODESTAT_TABLE = "SCHEMA_NAME.FIN_COM_FEECODESTAT"
OPERATION_APPLY_TABLE = "SCHEMA_NAME.MET_OPERATION_APPLY"
OPERATION_APPLY_OPS_TABLE = "SCHEMA_NAME.MET_OPERATION_APPLY_OPS"
UNDRUGINFO_TABLE = "SCHEMA_NAME.FIN_COM_UNDRUGINFO"
HIGHVALUE_TABLE = "SCHEMA_NAME.LC_HIGH_VALUE_TABLE"
ICD_OPERATION_TABLE = "SCHEMA_NAME.MET_COM_ICDOPERATION"
MRS_OPS_TABLE = "SCHEMA_NAME.S_MRS_OPS"
CDR_BASE_TABLE = "CDR_SCHEMA.C_MRS_BASE@DBLINK_NAME"
CDR_OPS_TABLE = "CDR_SCHEMA.C_MRS_OPS@DBLINK_NAME"
# ==========================================
# 3. 读取并校验 Excel
# ==========================================
try:
logger.info(f"正在读取 Excel 文件: {excel_file}")
df = pd.read_excel(excel_file, dtype=str)
df.columns = df.columns.str.strip()
expected_columns = "病案号,姓名,入院时间,出院时间,院区,出院科室,结算人群,权重,支付标准,院前三天检查费用V预住院七天,总花费未含院前费用,DRG返款差额,统筹外金额,实际住院天数,西药费金额,成药费金额,草药费金额,治疗费金额,检查费金额,化验费金额,放射费金额,手术费金额,麻醉费金额,卫生材料费金额,手术材料费金额,护理费金额,输血费金额,诊查费金额,床位费金额,其他,结算分类名称,支付方式,DRG分组编码,DRG分组名称,分组日志,主要诊断编码,主要诊断名称,其他诊断编码1,其他诊断名称1,手术及操作编码1,手术及操作名称1,是否转科患者,医疗组长,住院医师,主治医师,责任护士,质控护士,第一术者,手术级别,结算日期,结算单据号,总费用,垫付金额,DRG返款金额,结付率,其他诊断名称2,其他诊断编码2,其他诊断名称3,其他诊断编码3,其他诊断名称4,其他诊断编码4,其他诊断名称5,其他诊断编码5,其他诊断名称6,其他诊断编码6,其他诊断名称7,其他诊断编码7,其他诊断名称8,其他诊断编码8,其他诊断名称9,其他诊断编码9,其他诊断名称10,其他诊断编码10,其他诊断名称11,其他诊断编码11,其他诊断名称12,其他诊断编码12,其他诊断名称13,其他诊断编码13,其他诊断名称14,其他诊断编码14,其他诊断名称15,其他诊断编码15,手术及操作名称2,手术及操作编码2,手术及操作名称3,手术及操作编码3,手术及操作名称4,手术及操作编码4,手术及操作名称5,手术及操作编码5,手术及操作名称6,手术及操作编码6,手术及操作名称7,手术及操作编码7,手术及操作名称8,手术及操作编码8,门急诊费用,院外费用".split(",")
actual_columns = list(df.columns)
if actual_columns[0].startswith("Unnamed") or actual_columns[0] == '':
df.drop(columns=actual_columns[0], inplace=True)
actual_columns = actual_columns[1:]
if expected_columns != actual_columns:
logger.error("列名校验失败!请检查 Excel 格式。")
exit(0)
settle_date = df['结算日期'].iloc[0] if df['结算日期'].nunique() == 1 else None
if not settle_date:
logger.error("【结算日期】列包含多种日期或为空,程序停止。")
exit(0)
logger.info(f"Excel 校验通过,当前结算日期: {settle_date},总行数: {len(df)}")
except Exception as e:
logger.error(f"读取阶段发生错误: {e}")
exit(1)
# ==========================================
# 4. 初始化连接池
# ==========================================
try:
pool = cx_Oracle.SessionPool(
user=db_config['user'],
password=db_config['password'],
dsn=db_config['dsn'],
min=db_config['min_conn'],
max=db_config['max_conn'],
increment=db_config['increment'],
getmode=cx_Oracle.SPOOL_ATTRVAL_WAIT,
threaded=True
)
logger.info(f"数据库连接池初始化成功 (max_conn={db_config['max_conn']})")
except Exception as e:
logger.error(f"连接池创建失败: {e}")
exit(1)
# ==========================================
# 5. 查询逻辑封装
# ==========================================
def process_row(index, row_data):
inPatientNo = row_data['病案号'].split('-', -1)[0]
conn = None
try:
conn = pool.acquire()
cursor = conn.cursor()
sql = f"""
SELECT
fo.inpatient_no,
fo.name,
NULL,
NULL,
(
SELECT DECODE(
t.branch_code,
'99', '总院',
'01', '脑科',
'02', '东昌府'
)
FROM {DEPARTMENT_TABLE} t
WHERE t.dept_code = fo.dept_code
) AS 院区,
fo.dept_name AS 出院科室,
(
SELECT NVL(e.mjzfy, 0)
FROM {INMAININFO_EXT_TABLE} e
WHERE e.inpatient_no = fo.inpatient_no
) AS 院前三天费用,
(
SELECT NVL(x.tot_cost + x.balance_cost, 0)
FROM {INMAININFO_TABLE} x
WHERE x.inpatient_no = fo.inpatient_no
) AS 总花费,
NVL(
(
SELECT DECODE(
TRUNC(x.out_date) - (
SELECT TRUNC(MAX(r.oper_date))
FROM {SHIFTDATA_TABLE} r
WHERE r.clinic_no = x.inpatient_no
AND r.shift_type = 'K'
),
0,
1,
TRUNC(x.out_date) - (
SELECT TRUNC(MAX(r.oper_date))
FROM {SHIFTDATA_TABLE} r
WHERE r.clinic_no = x.inpatient_no
AND r.shift_type = 'K'
)
)
FROM {INMAININFO_TABLE} x
WHERE x.inpatient_no = fo.inpatient_no
),
0
) AS 实际天数,
(
SELECT NVL(SUM(z.tot_cost), 0)
FROM {FEEINFO_TABLE} z
WHERE z.inpatient_no = fo.inpatient_no
AND z.fee_code = '001'
) AS 西药,
(
SELECT NVL(SUM(z.tot_cost), 0)
FROM {FEEINFO_TABLE} z
WHERE z.inpatient_no = fo.inpatient_no
AND z.fee_code = '002'
) AS 成药,
(
SELECT NVL(SUM(z.tot_cost), 0)
FROM {FEEINFO_TABLE} z
WHERE z.inpatient_no = fo.inpatient_no
AND z.fee_code = '003'
) AS 草药,
(
SELECT NVL(SUM(z.tot_cost), 0)
FROM {FEEINFO_TABLE} z
WHERE z.inpatient_no = fo.inpatient_no
AND (
z.fee_code IN (
SELECT f.fee_code
FROM {FEECODESTAT_TABLE} f
WHERE f.fee_stat_cate = '05'
AND f.report_code = 'ZY01'
)
OR z.fee_code IN ('076','062','075')
)
) AS 治疗,
(
SELECT NVL(SUM(z.tot_cost), 0)
FROM {FEEINFO_TABLE} z
WHERE z.inpatient_no = fo.inpatient_no
AND z.fee_code IN (
SELECT f.fee_code
FROM {FEECODESTAT_TABLE} f
WHERE f.fee_stat_cate = '04'
AND f.report_code = 'ZY01'
)
AND z.fee_code NOT IN ('076','062')
) AS 检查,
(
SELECT NVL(SUM(z.tot_cost), 0)
FROM {FEEINFO_TABLE} z
WHERE z.inpatient_no = fo.inpatient_no
AND z.fee_code IN (
SELECT f.fee_code
FROM {FEECODESTAT_TABLE} f
WHERE f.fee_stat_cate = '08'
AND f.report_code = 'ZY01'
)
) AS 化验,
(
SELECT NVL(SUM(z.tot_cost), 0)
FROM {FEEINFO_TABLE} z
WHERE z.inpatient_no = fo.inpatient_no
AND z.fee_code IN (
SELECT f.fee_code
FROM {FEECODESTAT_TABLE} f
WHERE f.fee_stat_cate = '06'
AND f.report_code = 'ZY01'
)
) AS 放射,
(
SELECT NVL(SUM(z.tot_cost), 0)
FROM {FEEINFO_TABLE} z
WHERE z.inpatient_no = fo.inpatient_no
AND z.fee_code IN ('095', '093', '074', '019')
) AS 手术,
(
SELECT NVL(SUM(z.tot_cost), 0)
FROM {FEEINFO_TABLE} z
WHERE z.inpatient_no = fo.inpatient_no
AND z.fee_code = '094'
) AS 麻醉,
(
SELECT NVL(SUM(z.tot_cost), 0)
FROM {FEEINFO_TABLE} z
WHERE z.inpatient_no = fo.inpatient_no
AND z.fee_code IN ('055', '079')
) AS 卫生材料,
(
SELECT NVL(SUM(z.tot_cost), 0)
FROM {FEEINFO_TABLE} z
WHERE z.inpatient_no = fo.inpatient_no
AND z.fee_code = '104'
) AS 手术材料,
(
SELECT NVL(SUM(z.tot_cost), 0)
FROM {FEEINFO_TABLE} z
WHERE z.inpatient_no = fo.inpatient_no
AND z.fee_code = '153'
) AS 护理,
0 AS 输血,
(
SELECT NVL(SUM(z.tot_cost), 0)
FROM {FEEINFO_TABLE} z
WHERE z.inpatient_no = fo.inpatient_no
AND z.fee_code = '156'
) AS 诊查,
(
SELECT NVL(SUM(z.tot_cost), 0)
FROM {FEEINFO_TABLE} z
WHERE z.inpatient_no = fo.inpatient_no
AND z.fee_code IN (
SELECT f.fee_code
FROM {FEECODESTAT_TABLE} f
WHERE f.fee_stat_cate = '12'
AND f.report_code = 'ZY01'
)
) AS 床位,
(
SELECT NVL(SUM(z.tot_cost), 0)
FROM {FEEINFO_TABLE} z
WHERE z.inpatient_no = fo.inpatient_no
AND z.fee_code NOT IN (
SELECT f.fee_code
FROM {FEECODESTAT_TABLE} f
WHERE f.fee_stat_cate IN ('05','04','08','06','12')
AND f.report_code = 'ZY01'
)
AND z.fee_code NOT IN (
'001','002','003','095','093',
'074','019','094','075','079',
'104','153','055','156'
)
) AS 其他,
CASE
WHEN (
SELECT COUNT(1)
FROM {SHIFTDATA_TABLE} rr
WHERE rr.clinic_no = fo.inpatient_no
AND rr.shift_type = 'RI'
AND ROWNUM = 1
) > 0
THEN '转科'
ELSE ''
END AS 转科,
(
SELECT a.ylzz_doct_name
FROM {CDR_BASE_TABLE} a
WHERE a.inp_no = fo.inpatient_no
) AS 医疗组长,
(
SELECT x.house_doc_name
FROM {INMAININFO_TABLE} x
WHERE x.inpatient_no = fo.inpatient_no
) AS 住院医师,
(
SELECT x.charge_doc_name
FROM {INMAININFO_TABLE} x
WHERE x.inpatient_no = fo.inpatient_no
) AS 主治医师,
(
SELECT x.duty_nurse_name
FROM {INMAININFO_TABLE} x
WHERE x.inpatient_no = fo.inpatient_no
) AS 责任护士,
(
SELECT a.qc_nurse_name
FROM {CDR_BASE_TABLE} a
WHERE a.inp_no = fo.inpatient_no
) AS 质控护士,
(
SELECT a.ops_doct_name
FROM {CDR_OPS_TABLE} a
WHERE a.inp_no = fo.inpatient_no
AND a.happen_no = '1'
) AS 第一术者,
(
SELECT DECODE(
b.ops_level,
'1','一级手术',
'2','二级手术',
'3','三级手术',
'4','四级手术'
)
FROM {OPERATION_APPLY_TABLE} a,
{OPERATION_APPLY_OPS_TABLE} b
WHERE a.apply_no = b.apply_no
AND a.patient_id = fo.inpatient_no
AND a.is_valid = '1'
AND ROWNUM = 1
) AS 手术级别,
(
SELECT NVL(SUM(qq.own_cost),0)
FROM fin_opb_feedetail qq,
{PATIENTINFO_TABLE} cp
WHERE (
cp.card_no = (
SELECT x.card_no
FROM {INMAININFO_TABLE} x
WHERE x.inpatient_no = fo.inpatient_no
)
OR cp.idenno = (
SELECT x.idenno
FROM {INMAININFO_TABLE} x
WHERE x.inpatient_no = fo.inpatient_no
)
)
AND qq.card_no = cp.card_no
AND qq.CANCEL_FLAG ='1'
AND qq.pay_flag ='1'
AND qq.UPLOAD_AUDIT LIKE '1%'
AND qq.UPLOAD_CONFIRM='1'
AND qq.upload_auditnote = fo.inpatient_no
) AS 门急诊费用,
NVL(
(
SELECT NVL(MJZFY, 0)
FROM {INMAININFO_EXT_TABLE} aa
WHERE aa.inpatient_no = fo.inpatient_no
),
0
) -
(
SELECT NVL(SUM(qq.own_cost),0)
FROM fin_opb_feedetail qq,
{PATIENTINFO_TABLE} cp
WHERE (
cp.card_no = (
SELECT x.card_no
FROM {INMAININFO_TABLE} x
WHERE x.inpatient_no = fo.inpatient_no
)
OR cp.idenno = (
SELECT x.idenno
FROM {INMAININFO_TABLE} x
WHERE x.inpatient_no = fo.inpatient_no
)
)
AND qq.card_no = cp.card_no
AND qq.CANCEL_FLAG ='1'
AND qq.pay_flag ='1'
AND qq.UPLOAD_AUDIT LIKE '1%'
AND qq.UPLOAD_CONFIRM='1'
AND qq.upload_auditnote = fo.inpatient_no
) AS 院外费用
FROM {INMAININFO_TABLE} fo
WHERE fo.inpatient_no = :inpatient_no
"""
cursor.execute(sql, {"inpatient_no": inPatientNo})
result = cursor.fetchone()
cursor.close()
return index, result, None
except Exception as e:
return index, None, str(e)
finally:
if conn:
pool.release(conn)
# ==========================================
# 6. 多线程执行查询
# ==========================================
start_time = time.time()
logger.info(f"🚀 开始多线程并发查询 (线程数: {MAX_WORKERS})...")
processed_count = 0
total_tasks = len(df)
with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
futures = [
executor.submit(process_row, idx, row)
for idx, row in df.iterrows()
]
for future in as_completed(futures):
idx, res, err = future.result()
processed_count += 1
if err:
logger.error(
f"行 [{idx + 1}] 住院号 {df.at[idx, '病案号']} 查询异常: {err}"
)
continue
if not res:
logger.warning(
f"行 [{idx + 1}] 住院号 {df.at[idx, '病案号']} 在 HIS 中未找到记录"
)
continue
mapping = {
"院区": 4,
"出院科室": 5,
"院前三天检查费用V预住院七天": 6,
"总花费未含院前费用": 7,
"实际住院天数": 8,
"西药费金额": 9,
"成药费金额": 10,
"草药费金额": 11,
"治疗费金额": 12,
"检查费金额": 13,
"化验费金额": 14,
"放射费金额": 15,
"手术费金额": 16,
"麻醉费金额": 17,
"卫生材料费金额": 18,
"手术材料费金额": 19,
"护理费金额": 20,
"输血费金额": 21,
"诊查费金额": 22,
"床位费金额": 23,
"其他": 24,
"是否转科患者": 25,
"医疗组长": 26,
"住院医师": 27,
"主治医师": 28,
"责任护士": 29,
"质控护士": 30,
"第一术者": 31,
"手术级别": 32,
"门急诊费用": 33,
"院外费用": 34
}
for col, res_idx in mapping.items():
df.at[idx, col] = res[res_idx]
try:
f_tot = float(df.at[idx, '总费用']) if df.at[idx, '总费用'] else 0.0
f_tc = float(df.at[idx, '垫付金额']) if df.at[idx, '垫付金额'] else 0.0
f_drg = float(df.at[idx, 'DRG返款金额']) if df.at[idx, 'DRG返款金额'] else 0.0
df.at[idx, "DRG返款差额"] = round(f_drg - f_tc, 2)
df.at[idx, "统筹外金额"] = round(f_tot - f_tc, 2)
except:
pass
if processed_count % max(1, total_tasks // 10) == 0:
logger.info(
f"进度: {processed_count}/{total_tasks} "
f"({processed_count / total_tasks * 100:.1f}%)"
)
query_duration = time.time() - start_time
logger.info(f"✅ 查询阶段结束,耗时: {query_duration:.2f} 秒")
# ==========================================
# 7. 导出 Excel
# ==========================================
output_file = excel_file.replace(
'.xlsx',
f'_补全后_{datetime.now().strftime("%H%M%S")}.xlsx'
)
df.to_excel(output_file, index=False)
logger.info(f"📁 本地 Excel 已保存: {output_file}")
# ==========================================
# 8. 数据库插入
# ==========================================
main_conn = None
try:
main_conn = cx_Oracle.connect(
db_config['user'],
db_config['password'],
db_config['dsn']
)
main_cursor = main_conn.cursor()
# ==========================================
# 8.1 备份
# ==========================================
bak_name = f"DRG_RESULT_BAK_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
main_cursor.execute(
f"CREATE TABLE {bak_name} AS SELECT * FROM {TARGET_TABLE}"
)
logger.info(f"📦 备份表已创建: {bak_name}")
# ==========================================
# 8.2 批量插入
# ==========================================
df = df.fillna('')
target_fields = """
custom,num,extend1,extend2,extend3,extend4,
extend5,extend6,extend7,extend11,extend12,
extend10,extend13,extend14,extend15
""".replace("\n", "")
placeholders = ', '.join(
[f':{i + 1}' for i in range(df.shape[1])]
)
insert_sql = f"""
INSERT INTO {TARGET_TABLE}
({target_fields})
VALUES
({placeholders})
"""
rows = df.values.tolist()
batch_size = 1000
for i in range(0, len(rows), batch_size):
main_cursor.executemany(
insert_sql,
rows[i:i + batch_size]
)
main_conn.commit()
logger.info(f"🚀 数据入库成功,共 {len(rows)} 条记录")
# ==========================================
# 8.3 后续 SQL
# ==========================================
logger.info("🛠️ 正在执行 SQL 后续关联更新语句...")
sql_statements = [
f"""
update {TARGET_TABLE} q
set q.extend56 = q.extend4
where q.extend47 = :settle_date
""",
f"""
UPDATE {TARGET_TABLE} j
SET j.extend51 = '0.0000'
WHERE j.extend51 = '0'
AND j.extend47 = :settle_date
""",
f"""
UPDATE {TARGET_TABLE} j
SET j.extend53 = '0.00%'
WHERE j.extend53 = '0'
and j.extend47 = :settle_date
""",
f"""
UPDATE {TARGET_TABLE} j
SET (
extend107,
extend108,
extend109
) = (
SELECT
NVL(SUM(
CASE
WHEN s.ops_level_code = '4'
THEN 1
ELSE 0
END
), 0),
NVL(SUM(
CASE
WHEN s.ops_level_code = '3'
THEN 1
ELSE 0
END
), 0),
NVL(SUM(
CASE
WHEN s.ops_level_code NOT IN ('3','4')
THEN 1
ELSE 0
END
), 0)
FROM {MRS_OPS_TABLE} s
WHERE s.source_sys = 'EMR'
AND s.inp_no = j.custom
AND s.ops_code IN (
SELECT a.icd_code
FROM {ICD_OPERATION_TABLE} a
WHERE a.stat_code IN ('3','5')
AND a.valid_state = '1'
)
)
WHERE j.extend47 = :settle_date
""",
f"""
UPDATE {TARGET_TABLE} j
SET j.extend110 = NVL(
(
SELECT SUM(t.tot_cost)
FROM {ITEMLIST_TABLE} t
WHERE t.inpatient_no = j.custom
AND t.item_code IN (
SELECT a.item_code
FROM {UNDRUGINFO_TABLE} a
WHERE a.si_gjybhcm IN (
SELECT z.si_gjybhcm
FROM {HIGHVALUE_TABLE} z
WHERE z.ifgz = '高值'
)
)
),
0
)
WHERE j.extend47 = :settle_date
"""
]
for i, sql in enumerate(sql_statements):
main_cursor.execute(
sql,
{"settle_date": settle_date}
)
logger.info(
f" [SQL {i + 1}/{len(sql_statements)}] 执行完毕"
)
main_conn.commit()
logger.info("🎉 所有 SQL 关联更新已成功提交!")
except Exception as e:
logger.error(f"❌ 数据库操作阶段失败: {e}")
if main_conn:
main_conn.rollback()
finally:
if main_conn:
main_conn.close()
pool.close()
logger.info("🏁 程序运行结束,已释放所有资源。")
五、这个工具最重要的不是“能跑”,而是“可控”
医院里的数据工具,最怕三件事:跑错、跑慢、跑完不知道哪里错。
所以这个脚本里专门做了这些控制:
① 表头校验,防止 Excel 模板变化
② 结算日期校验,防止一张表里混多个日期
③ 查询异常单行记录日志,不影响整体跑数
④ 每 10% 输出一次进度
⑤ 入库前自动备份原表
⑥ 出错自动 rollback,避免半截数据污染
六、真实场景里的优化点
这个工具已经能解决大部分导入问题,但如果数据量继续变大,还可以继续优化。
优化方向一:SQL 参数化
当前根据病案号拼 SQL,生产环境建议改成绑定变量,减少 SQL 注入风险,也方便 Oracle 复用执行计划。
优化方向二:减少重复子查询
费用分类可以先按 inpatient_no 聚合,再 join 主表,避免每个字段都重复扫描费用明细。
优化方向三:失败记录单独落库
未查到 HIS 记录、SQL 异常、字段转换失败的数据,可以单独保存,方便补跑。
优化方向四:做成可视化小工具
后续可以封装成一个简单页面或桌面程序,让业务人员自己上传 Excel、查看进度、下载结果。
七、信息科程序猿的感受
很多时候,医院信息科写的并不是“高大上系统”,而是一个又一个解决实际问题的小工具。
它们可能没有漂亮的界面,也没有复杂的架构,但它们能让同事少加班,让数据少出错,让业务流程更顺一点。
这类工具的价值,不在于代码有多炫,而在于它真正跑在医院业务现场,解决了真实问题。
写在最后
DRG 数据分析不是简单导个表,背后牵涉 HIS、医保、病案、费用、科室、人员、手术、耗材等多个系统的数据质量。
对医院信息科来说,真正有用的工具,一定是稳定、可追踪、可回滚、可复核的。
HIS开发 | 医保接口 | Oracle | 踩坑记录 | 效率工具