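This article shows how to use a Python script to load CSV files into a YashanDB database quickly. It addresses two pain points of loading CSV files:
Importing a CSV file with yasboot or yasldr requires listing the table's column names, which is tedious when there are many tables with many columns;
Some large tables need several CSV files imported, and importing them by hand one at a time is inefficient.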
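To be clear up front, this article is based mainly on the Python script import_csv_data_2.py given in the article https://www.yashandb.com/newsinfo/7253738.html?templateId=1718516.
The original script has a few minor problems, such as lacking a table owner parameter and miscounting the CSV files, so I adjusted it slightly to produce import_csv_data_3.py and used that script for the experiment below.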
Prepare a CSV file
[yashan@centos78 backup]$ ll BSD_ADV_DEPT.csv
-rw-r----- 1 yashan yashan 2059107 Feb 25 20:41 BSD_ADV_DEPT.csv
[yashan@centos78 backup]$ wc -l BSD_ADV_DEPT.csv
14849 BSD_ADV_DEPT.csv
The data format is as follows:
[yashan@centos78 backup]$ head -n 2 BSD_ADV_DEPT.csv
"2540750","Y10071014","2529079","1","13244","0","0","1","1","超级管理员","2023-09-22","1","超级管理员","2023-09-22","0",,,
"2540755","Y10071014","2529142","1","13244","0","0","1","1","超级管理员","2023-09-22","1","超级管理员","2023-09-22","0",,,
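Split this CSV file into files of 2000 lines each. Split by line count, not by byte count: a byte-based split can cut a line off at the end of a file, and the import then fails.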
[yashan@centos78 BSD_ADV_DEPT]$ split -l 2000 -d -a 1 --additional-suffix=.csv ../BSD_ADV_DEPT.csv BSD_ADV_DEPT_
[yashan@centos78 BSD_ADV_DEPT]$ ll
total 2032
-rw-rw-r-- 1 yashan yashan 279163 Feb 25 21:55 BSD_ADV_DEPT_0.csv
-rw-rw-r-- 1 yashan yashan 276514 Feb 25 21:55 BSD_ADV_DEPT_1.csv
-rw-rw-r-- 1 yashan yashan 274590 Feb 25 21:55 BSD_ADV_DEPT_2.csv
-rw-rw-r-- 1 yashan yashan 279015 Feb 25 21:55 BSD_ADV_DEPT_3.csv
-rw-rw-r-- 1 yashan yashan 275273 Feb 25 21:55 BSD_ADV_DEPT_4.csv
-rw-rw-r-- 1 yashan yashan 275536 Feb 25 21:55 BSD_ADV_DEPT_5.csv
-rw-rw-r-- 1 yashan yashan 277069 Feb 25 21:55 BSD_ADV_DEPT_6.csv
-rw-rw-r-- 1 yashan yashan 121947 Feb 25 21:55 BSD_ADV_DEPT_7.csv
[yashan@centos78 BSD_ADV_DEPT]$ wc -l *
  2000 BSD_ADV_DEPT_0.csv
  2000 BSD_ADV_DEPT_1.csv
  2000 BSD_ADV_DEPT_2.csv
  2000 BSD_ADV_DEPT_3.csv
  2000 BSD_ADV_DEPT_4.csv
  2000 BSD_ADV_DEPT_5.csv
  2000 BSD_ADV_DEPT_6.csv
   849 BSD_ADV_DEPT_7.csv
 14849 total
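Where GNU split is not available, the same line-based split can be sketched in Python. This is only an illustration, not part of the original workflow: the function name split_csv_by_lines and the output naming (source stem plus _0, _1, ...) are assumptions chosen to mirror the split command above. Like split -l, it assumes no quoted field contains an embedded line break.

```python
from itertools import islice
from pathlib import Path


def split_csv_by_lines(src, out_dir, lines_per_file=2000):
    """Split src into numbered .csv chunks of at most lines_per_file lines each.

    Hypothetical helper: files are read and written in binary mode, so the
    bytes of every line (including its line ending) are copied unchanged.
    """
    src = Path(src)
    out_dir = Path(out_dir)
    out_dir.mkdir(parents=True, exist_ok=True)
    chunks = []
    with src.open("rb") as f:
        index = 0
        while True:
            # Take the next block of whole lines; never cut inside a line.
            lines = list(islice(f, lines_per_file))
            if not lines:
                break
            chunk = out_dir / f"{src.stem}_{index}.csv"
            chunk.write_bytes(b"".join(lines))
            chunks.append(chunk)
            index += 1
    return chunks
```

Calling split_csv_by_lines("../BSD_ADV_DEPT.csv", ".", 2000) would be the rough equivalent of the split command shown above.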
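Notes on using the script import_csv_data_3.py:
The script truncates the table in the database before importing, so be very careful with it in a production environment.
The data must be comma-separated and the file names must end in .csv. Because the script loads every *.csv file in the given directory into one table, it is best to keep each table's CSV files in a dedicated directory.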
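Since the table is truncated before anything is loaded, it can be worth checking the files first. The sketch below is one possible pre-flight check, not something the script itself does: it counts the fields in every row of every *.csv file in a directory and reports the rows that do not match the expected column count. The function name find_bad_rows is hypothetical, and UTF-8 encoding with standard double-quote CSV quoting is assumed.

```python
import csv
from pathlib import Path


def find_bad_rows(data_dir, expected_columns, encoding="utf-8"):
    """Return (file name, row number, field count) for each row with the wrong field count.

    Hypothetical helper; encoding and quoting rules are assumptions.
    """
    problems = []
    for path in sorted(Path(data_dir).glob("*.csv")):
        # newline="" lets the csv module handle line endings itself.
        with path.open(newline="", encoding=encoding) as f:
            for row_no, row in enumerate(csv.reader(f), start=1):
                if len(row) != expected_columns:
                    problems.append((path.name, row_no, len(row)))
    return problems
```

For the sample data in this article each row has 18 fields, so find_bad_rows(directory, 18) returning an empty list would suggest the files are at least structurally consistent.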
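Parameters:
-d: directory containing the CSV files
-c: cluster name (the value passed to yasboot -c; yashandb in this example)
-u: user name
-p: password
-a: database address and port, in the form ip:port
-o: table owner
-t: table name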
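When many tables have to be loaded, the parameters above can be generated per table rather than typed by hand. The following is a minimal sketch under one assumption that the article does not state: each subdirectory of a base directory is named after its table and holds that table's CSV files. The function name build_import_commands is hypothetical, and it only builds the argument lists without running anything.

```python
from pathlib import Path


def build_import_commands(base_dir, cluster, user, password, addr, owner,
                          script="import_csv_data_3.py"):
    """Build one argument list per table directory under base_dir.

    Assumption: the directory name is the table name. Directories without
    any *.csv file are skipped.
    """
    commands = []
    for table_dir in sorted(p for p in Path(base_dir).iterdir() if p.is_dir()):
        if not any(table_dir.glob("*.csv")):
            continue
        commands.append([
            "python3", script,
            "-d", str(table_dir),
            "-c", cluster,
            "-u", user,
            "-p", password,
            "-a", addr,
            "-o", owner,
            "-t", table_dir.name,
        ])
    return commands
```

Each list can then be passed to subprocess.run(cmd, check=True). Keep in mind that every invocation truncates its table first, and that the script waits for interactive input when a step fails.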
Run the Python import script import_csv_data_3.py
[yashan@centos78 ~]$ python3 import_csv_data_3.py -d /u01/yashan/data/db-1-1/backup/BSD_ADV_DEPT -c yashandb -u sys -p Yashandb1234 -a 172.31.255.150:1688 -o TEST1 -t BSD_ADV_DEPT
[2026-02-25 21:57:03,551] import csv data ...
[2026-02-25 21:57:03,661] 提取到TEST1.BSD_ADV_DEPT表的信息
[2026-02-25 21:57:03,723] 正在清空表:yasboot sql -c yashandb -u sys -p Yashandb1234 -n 1-1 --sql "truncate table TEST1.BSD_ADV_DEPT"
[2026-02-25 21:57:03,780] Succeed.
[2026-02-25 21:57:03,782] 正在导入表数据:yasldr sys/Yashandb1234@172.31.255.150:1688 batch_size=4032 senders=9 control_text="'LOAD DATA OPTIONS(DEGREE_OF_PARALLELISM=16,ENABLE_BULK=FALSE) INFILE '/u01/yashan/data/db-1-1/backup/BSD_ADV_DEPT/BSD_ADV_DEPT_0.csv' WITH EMBEDDED FIELDS TERMINATED BY ',' INTO TABLE TEST1.BSD_ADV_DEPT(RULE_ID,HOSPITAL_ID,ADV_ITEM_ID,ADV_DEPT_TYPE,DEPT_ID,IS_FROM_PARENT,USE_RANGE,RULE_TYPE,CREATE_ID,CREATE_NAME,CREATE_TIME,MODIFY_ID,MODIFY_NAME,MODIFY_TIME,DELETE_FLAG,SORT_NO,MIN_AGE,MAX_AGE)'"
1/8
[2026-02-25 21:57:03,889] YashanDB Loader Enterprise Edition Release 23.4.1.109 x86_64 00e2751ad4
2000 rows successfully loaded.
Check /u01/yashan/data/db-1-1/backup/BSD_ADV_DEPT/BSD_ADV_DEPT_0.log for more info.
[YASLDR] execute succeeded
[2026-02-25 21:57:03,890] 正在导入表数据:yasldr sys/Yashandb1234@172.31.255.150:1688 batch_size=4032 senders=9 control_text="'LOAD DATA OPTIONS(DEGREE_OF_PARALLELISM=16,ENABLE_BULK=FALSE) INFILE '/u01/yashan/data/db-1-1/backup/BSD_ADV_DEPT/BSD_ADV_DEPT_1.csv' WITH EMBEDDED FIELDS TERMINATED BY ',' INTO TABLE TEST1.BSD_ADV_DEPT(RULE_ID,HOSPITAL_ID,ADV_ITEM_ID,ADV_DEPT_TYPE,DEPT_ID,IS_FROM_PARENT,USE_RANGE,RULE_TYPE,CREATE_ID,CREATE_NAME,CREATE_TIME,MODIFY_ID,MODIFY_NAME,MODIFY_TIME,DELETE_FLAG,SORT_NO,MIN_AGE,MAX_AGE)'"
2/8
[2026-02-25 21:57:03,985] YashanDB Loader Enterprise Edition Release 23.4.1.109 x86_64 00e2751ad4
2000 rows successfully loaded.
Check /u01/yashan/data/db-1-1/backup/BSD_ADV_DEPT/BSD_ADV_DEPT_1.log for more info.
[YASLDR] execute succeeded
[2026-02-25 21:57:03,986] 正在导入表数据:yasldr sys/Yashandb1234@172.31.255.150:1688 batch_size=4032 senders=9 control_text="'LOAD DATA OPTIONS(DEGREE_OF_PARALLELISM=16,ENABLE_BULK=FALSE) INFILE '/u01/yashan/data/db-1-1/backup/BSD_ADV_DEPT/BSD_ADV_DEPT_2.csv' WITH EMBEDDED FIELDS TERMINATED BY ',' INTO TABLE TEST1.BSD_ADV_DEPT(RULE_ID,HOSPITAL_ID,ADV_ITEM_ID,ADV_DEPT_TYPE,DEPT_ID,IS_FROM_PARENT,USE_RANGE,RULE_TYPE,CREATE_ID,CREATE_NAME,CREATE_TIME,MODIFY_ID,MODIFY_NAME,MODIFY_TIME,DELETE_FLAG,SORT_NO,MIN_AGE,MAX_AGE)'"
3/8
[2026-02-25 21:57:04,083] YashanDB Loader Enterprise Edition Release 23.4.1.109 x86_64 00e2751ad4
2000 rows successfully loaded.
Check /u01/yashan/data/db-1-1/backup/BSD_ADV_DEPT/BSD_ADV_DEPT_2.log for more info.
[YASLDR] execute succeeded
[2026-02-25 21:57:04,084] 正在导入表数据:yasldr sys/Yashandb1234@172.31.255.150:1688 batch_size=4032 senders=9 control_text="'LOAD DATA OPTIONS(DEGREE_OF_PARALLELISM=16,ENABLE_BULK=FALSE) INFILE '/u01/yashan/data/db-1-1/backup/BSD_ADV_DEPT/BSD_ADV_DEPT_3.csv' WITH EMBEDDED FIELDS TERMINATED BY ',' INTO TABLE TEST1.BSD_ADV_DEPT(RULE_ID,HOSPITAL_ID,ADV_ITEM_ID,ADV_DEPT_TYPE,DEPT_ID,IS_FROM_PARENT,USE_RANGE,RULE_TYPE,CREATE_ID,CREATE_NAME,CREATE_TIME,MODIFY_ID,MODIFY_NAME,MODIFY_TIME,DELETE_FLAG,SORT_NO,MIN_AGE,MAX_AGE)'"
4/8
[2026-02-25 21:57:04,176] YashanDB Loader Enterprise Edition Release 23.4.1.109 x86_64 00e2751ad4
2000 rows successfully loaded.
Check /u01/yashan/data/db-1-1/backup/BSD_ADV_DEPT/BSD_ADV_DEPT_3.log for more info.
[YASLDR] execute succeeded
[2026-02-25 21:57:04,176] 正在导入表数据:yasldr sys/Yashandb1234@172.31.255.150:1688 batch_size=4032 senders=9 control_text="'LOAD DATA OPTIONS(DEGREE_OF_PARALLELISM=16,ENABLE_BULK=FALSE) INFILE '/u01/yashan/data/db-1-1/backup/BSD_ADV_DEPT/BSD_ADV_DEPT_4.csv' WITH EMBEDDED FIELDS TERMINATED BY ',' INTO TABLE TEST1.BSD_ADV_DEPT(RULE_ID,HOSPITAL_ID,ADV_ITEM_ID,ADV_DEPT_TYPE,DEPT_ID,IS_FROM_PARENT,USE_RANGE,RULE_TYPE,CREATE_ID,CREATE_NAME,CREATE_TIME,MODIFY_ID,MODIFY_NAME,MODIFY_TIME,DELETE_FLAG,SORT_NO,MIN_AGE,MAX_AGE)'"
5/8
[2026-02-25 21:57:04,271] YashanDB Loader Enterprise Edition Release 23.4.1.109 x86_64 00e2751ad4
2000 rows successfully loaded.
Check /u01/yashan/data/db-1-1/backup/BSD_ADV_DEPT/BSD_ADV_DEPT_4.log for more info.
[YASLDR] execute succeeded
[2026-02-25 21:57:04,271] 正在导入表数据:yasldr sys/Yashandb1234@172.31.255.150:1688 batch_size=4032 senders=9 control_text="'LOAD DATA OPTIONS(DEGREE_OF_PARALLELISM=16,ENABLE_BULK=FALSE) INFILE '/u01/yashan/data/db-1-1/backup/BSD_ADV_DEPT/BSD_ADV_DEPT_5.csv' WITH EMBEDDED FIELDS TERMINATED BY ',' INTO TABLE TEST1.BSD_ADV_DEPT(RULE_ID,HOSPITAL_ID,ADV_ITEM_ID,ADV_DEPT_TYPE,DEPT_ID,IS_FROM_PARENT,USE_RANGE,RULE_TYPE,CREATE_ID,CREATE_NAME,CREATE_TIME,MODIFY_ID,MODIFY_NAME,MODIFY_TIME,DELETE_FLAG,SORT_NO,MIN_AGE,MAX_AGE)'"
6/8
[2026-02-25 21:57:04,361] YashanDB Loader Enterprise Edition Release 23.4.1.109 x86_64 00e2751ad4
2000 rows successfully loaded.
Check /u01/yashan/data/db-1-1/backup/BSD_ADV_DEPT/BSD_ADV_DEPT_5.log for more info.
[YASLDR] execute succeeded
[2026-02-25 21:57:04,362] 正在导入表数据:yasldr sys/Yashandb1234@172.31.255.150:1688 batch_size=4032 senders=9 control_text="'LOAD DATA OPTIONS(DEGREE_OF_PARALLELISM=16,ENABLE_BULK=FALSE) INFILE '/u01/yashan/data/db-1-1/backup/BSD_ADV_DEPT/BSD_ADV_DEPT_6.csv' WITH EMBEDDED FIELDS TERMINATED BY ',' INTO TABLE TEST1.BSD_ADV_DEPT(RULE_ID,HOSPITAL_ID,ADV_ITEM_ID,ADV_DEPT_TYPE,DEPT_ID,IS_FROM_PARENT,USE_RANGE,RULE_TYPE,CREATE_ID,CREATE_NAME,CREATE_TIME,MODIFY_ID,MODIFY_NAME,MODIFY_TIME,DELETE_FLAG,SORT_NO,MIN_AGE,MAX_AGE)'"
7/8
[2026-02-25 21:57:04,462] YashanDB Loader Enterprise Edition Release 23.4.1.109 x86_64 00e2751ad4
2000 rows successfully loaded.
Check /u01/yashan/data/db-1-1/backup/BSD_ADV_DEPT/BSD_ADV_DEPT_6.log for more info.
[YASLDR] execute succeeded
[2026-02-25 21:57:04,462] 正在导入表数据:yasldr sys/Yashandb1234@172.31.255.150:1688 batch_size=4032 senders=9 control_text="'LOAD DATA OPTIONS(DEGREE_OF_PARALLELISM=16,ENABLE_BULK=FALSE) INFILE '/u01/yashan/data/db-1-1/backup/BSD_ADV_DEPT/BSD_ADV_DEPT_7.csv' WITH EMBEDDED FIELDS TERMINATED BY ',' INTO TABLE TEST1.BSD_ADV_DEPT(RULE_ID,HOSPITAL_ID,ADV_ITEM_ID,ADV_DEPT_TYPE,DEPT_ID,IS_FROM_PARENT,USE_RANGE,RULE_TYPE,CREATE_ID,CREATE_NAME,CREATE_TIME,MODIFY_ID,MODIFY_NAME,MODIFY_TIME,DELETE_FLAG,SORT_NO,MIN_AGE,MAX_AGE)'"
8/8
[2026-02-25 21:57:04,536] YashanDB Loader Enterprise Edition Release 23.4.1.109 x86_64 00e2751ad4
849 rows successfully loaded.
Check /u01/yashan/data/db-1-1/backup/BSD_ADV_DEPT/BSD_ADV_DEPT_7.log for more info.
[YASLDR] execute succeeded
All files were imported successfully!
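One way to double-check the result is to add up the row counts reported by yasldr and compare the total with the line count of the source file (14849 here). The sketch below parses the "N rows successfully loaded" messages from captured output. It assumes the count appears at the start of a line, as inferred from the yasldr output in this article, and the function name total_rows_loaded is hypothetical.

```python
import re

# Assumption: yasldr prints the row count at the beginning of its own line.
ROWS_LOADED = re.compile(r"^\s*(\d+) rows successfully loaded", re.MULTILINE)


def total_rows_loaded(log_text):
    """Sum every 'N rows successfully loaded' count found in log_text."""
    return sum(int(count) for count in ROWS_LOADED.findall(log_text))
```

If the total matches wc -l of the original CSV file, every line was accounted for. Running select count(*) on the table is the more direct check when database access is at hand.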
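import_csv_data_3.py does not require the data to be split into multiple files; it can also load a single CSV file quickly. The file was split here only to verify that loading multiple CSV files works.
Finally, because a WeChat official account cannot attach a Python file directly, the source of my modified import_csv_data_3.py is given here, for reference only.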
[yashan@centos78 ~]$ cat import_csv_data_3.py
import subprocess
import argparse
import logging
import glob

logger = logging.getLogger('my_logger')
logger.setLevel(logging.INFO)
ch = logging.StreamHandler()
ch.setLevel(logging.INFO)
formatter = logging.Formatter('[%(asctime)s] %(message)s')
ch.setFormatter(formatter)
logger.addHandler(ch)
logger.info("import csv data ...")


def execute_command(command):
    # Run a shell command and return (stdout, return code).
    # subprocess.run() without check=True does not raise on a non-zero exit
    # status, so the return code is handed back to the caller instead.
    output = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True, universal_newlines=True)
    return output.stdout, output.returncode


def pause():
    # Block until the operator types "go", so a failed step can be inspected first.
    wait = ""
    while True:
        wait = input("输入go继续:")
        if wait == "go":
            break


def load_data(data_dir, cluster_name, user_name, password, cn_addr, table_owner, table_name):
    # Look up the table to get its owner, name and type.
    raw_table_list, _ = execute_command(f"yasboot sql -c {cluster_name} -u {user_name} -p {password} -n 1-1 --sql \"select owner,TABLE_NAME,TABLE_TYPE from dba_tables where DATABASE_MAINTAINED='N' and owner='{table_owner}' and TABLE_NAME='{table_name}'\"")
    raw_table_list = raw_table_list.splitlines()
    table_list = []
    for line in raw_table_list[3:-3]:
        line_split = line.split()
        table_list.append({"owner": line_split[0], "table_name": line_split[1], "table_type": line_split[2]})
    # Stop early instead of failing with an IndexError when the table is missing.
    if not table_list:
        raise SystemExit("table {0}.{1} not found in dba_tables".format(table_owner, table_name))
    table = table_list[0]
    logger.info("提取到{0[owner]}.{0[table_name]}表的信息".format(table))

    # Extract the column names from the output of desc.
    raw_tableinfo, _ = execute_command("yasboot sql -c {0} -u {1} -p {2} -n 1-1 --sql \"desc {3}.{4}\"".format(cluster_name, user_name, password, table["owner"], table["table_name"]))
    raw_tableinfo = raw_tableinfo.splitlines()
    col_list = []
    for line in raw_tableinfo[2:-1]:
        line_split = line.split()
        col_list.append(line_split[0])
    col_str = ",".join(col_list)

    # Truncate the table before loading.
    cmd = "yasboot sql -c {4} -u {2} -p {3} -n 1-1 --sql \"truncate table {0}.{1}\"".format(table["owner"], table["table_name"], user_name, password, cluster_name)
    logger.info("正在清空表:{0}".format(cmd))
    res, _ = execute_command(cmd)
    logger.info(res)
    if "Succeed." not in res:
        pause()

    # glob already returns each path prefixed with data_dir, so the path is used
    # as returned; sorting gives a stable import order.
    file_names = sorted(glob.glob(data_dir + "/*.csv"))
    # Only LSC tables are loaded with ENABLE_BULK=TRUE.
    enable_bulk = "TRUE" if table["table_type"] == "LSC" else "FALSE"
    for i, file_name in enumerate(file_names, start=1):
        cmd = "yasldr {0}/{1}@{2} batch_size=4032 senders=9 control_text=\"'LOAD DATA OPTIONS(DEGREE_OF_PARALLELISM=16,ENABLE_BULK={7}) INFILE '{3}' WITH EMBEDDED FIELDS TERMINATED BY ',' INTO TABLE {5}.{4}({6})'\"".format(user_name, password, cn_addr, file_name, table["table_name"], table["owner"], col_str, enable_bulk)
        logger.info("正在导入表数据:{0}\n{1}/{2}".format(cmd, i, len(file_names)))
        res, _ = execute_command(cmd)
        logger.info(res)
        # Pause on failure, and also when yasldr reports "succeeded with info".
        if "succeeded with info" in res or "succeeded" not in res:
            pause()


# Create the parser
parser = argparse.ArgumentParser()
# Add the options
parser.add_argument("-d", "--data-dir", help="directory holding the table's CSV files", type=str, required=True)
parser.add_argument("-c", "--cluster-name", help="cluster name", type=str, required=True)
parser.add_argument("-u", "--user-name", help="user name", type=str, required=True)
parser.add_argument("-p", "--password", help="user password", type=str, required=True)
parser.add_argument("-a", "--cn-addr", help="address of the CN node to connect to, in the form ip:port", type=str, required=True)
parser.add_argument("-o", "--table-owner", help="schema name", type=str, required=True)
parser.add_argument("-t", "--table-name", help="table name", type=str, required=True)
# Parse the command-line arguments
args = parser.parse_args()
# Run the import
load_data(data_dir=args.data_dir, cluster_name=args.cluster_name, user_name=args.user_name, password=args.password, cn_addr=args.cn_addr, table_owner=args.table_owner.upper(), table_name=args.table_name.upper())
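As the sample run shows, the script logs each yasboot and yasldr command in full, including the password. If the logs are kept or shared, the password could be masked before logging. The helper below is a hypothetical addition, not part of import_csv_data_3.py. It uses plain string replacement, so it would also mask any other part of the command that happens to equal the password.

```python
def mask_password(command, password, mask="******"):
    """Return command with every occurrence of password replaced by mask.

    Hypothetical helper for log output only; the command that is actually
    executed must still contain the real password.
    """
    # Guard against an empty password: str.replace("", mask) would insert
    # the mask between every character.
    if not password:
        return command
    return command.replace(password, mask)
```

It would be applied only at the logging call, for example logger.info(mask_password(cmd, password)), while execute_command(cmd) keeps receiving the unmasked command.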