原理和部分资料见上篇亚马逊VC(Vendor Central)模式之 EDI 820
代码等回复本期关键字
import polars as plimport duckdbpath_源数据 = r"D:\Retail_X12_820.txt"path_对照表 = r"D:\亚马逊VC的EDI各X12辅助表.xlsx"df_对照表 = (pl.read_excel(path_对照表,sheet_name='X12_820',schema_overrides={'Ref':pl.String}).with_columns(pl.col('标签').str.strip_chars(),pl.col('Ref').str.strip_chars(),pl.col('Code').str.strip_chars(),)# 这里是因为特殊情况很少,多的话也要做一个辅助表.with_columns(pl.when(pl.col('标签').is_in(['N1','DTM'])).then(pl.concat_str('Ref','Code',ignore_nulls=True)).otherwise(pl.col('Ref')).alias('匹配')).select(['匹配','Element Name','分组','排序']).unique(subset='匹配'))提取 = []with open(path_源数据,mode='r',encoding='utf-8') as f:lines = f.readlines()for line in lines:data = line.strip().replace('~','').split(r'*',1)标签 = data[0]内容 = data[1]提取.append({'标签':标签,'内容':内容})df_提取 = (pl.DataFrame(提取).with_columns(pl.col('内容').str.split(r'*',literal=True).alias('split')))upper_bound = df_提取.select(pl.col('split').list.len().max()).item()df_提取后匹配 = (df_提取.with_columns(pl.col('split').list.to_struct(upper_bound=upper_bound)).unnest('split').unpivot(index = ['标签','内容'],variable_name='Ref',value_name='值').filter(pl.col('值').is_not_null()).with_columns(pl.col('Ref').str.split('field_').list.last().cast(pl.Int32)).with_columns(pl.col('Ref') + 1).with_columns(pl.col('Ref').cast(pl.String).str.pad_start(2,'0')).with_row_index('index').with_columns(pl.when(pl.col('标签').is_in(['N1']),pl.col('内容').str.contains_any(['PR'])).then(pl.lit('PR')).when(pl.col('标签').is_in(['N1']),pl.col('内容').str.contains_any(['PE'])).then(pl.lit('PE')).when(pl.col('标签').is_in(['DTM']),pl.col('内容').str.contains_any(['097'])).then(pl.lit('097')).when(pl.col('标签').is_in(['DTM']),pl.col('内容').str.contains_any(['003'])).then(pl.lit('003')).otherwise(None).alias('Code')).with_columns(pl.concat_str('标签','Ref','Code',ignore_nulls=True).alias('匹配')).join(other=df_对照表,left_on=['匹配'],right_on=['匹配'],how='left').sort('排序',descending=False))df_提取后匹配.write_excel('./df_提取后匹配.xlsx',hide_gridlines=True)df_提取后匹配.write_parquet('./df_提取后匹配.parquet')# 任何join的操作都要检查是否重复匹配,即使前面写了.unique(subset='匹配')# 一般来说也需要检查是否有未匹配的部分是否重复匹配 = (df_提取后匹配.group_by('index').agg(pl.col('标签').len().alias('cnt')).filter(pl.col('cnt').gt(1)).select('index').unique())if 是否重复匹配.shape[0] > 0:print('重复匹配,详情见表')(df_提取后匹配.filter(pl.col('index').is_in(是否重复匹配.to_series().to_list())).write_excel('./重复匹配.xlsx'))exit()是否存在未匹配 = (df_提取后匹配.filter(pl.col('Element Name').is_null()))if 是否存在未匹配.shape[0] > 0:print('存在未匹配,详情见表')是否存在未匹配.write_excel('./存在未匹配.xlsx')# 可选 ,入到数据库# 先在 Dbeaver 里面创建一个schema ,比如叫 EDI820 ,免得sql里面写的多path_duckdb = r"D:\VC_AMAZON.duckdb"con = duckdb.connect(path_duckdb)con.register('df_提取后匹配',df_提取后匹配)sql = r' drop table if exists EDI820.清洗后数据 ; create table EDI820.清洗后数据 as ( from df_提取后匹配 ) 'con.execute(sql)
其他同理
注:上面代码没有使用 explode() 把列表炸开变成多个行,是因为同一个标签会出现多次 ,
比如DTM反复出现,利用行数确定排序,就会出错 , 本来 DTM 只有 DTM01 和 DTM02 , 当它出现多次之后 , 就会产生 DTM03 ,DTM04等.
list.to_struct()这个函数的参数 n_field_strategy : {'first_non_null', 'max_width'}已经标记为 Deprecated and ignored 了,所以需要用其他参数
upper_bound = df_提取.select(pl.col('split').list.len().max()).item()也方便后续其他如 810 等 , 不用改这部分代码了