# task2_process_goods.py
import pandas as pd
from db_service import create_db_connection
import argparse
import os


def clean_text_field(text):
    if pd.isna(text):
        return ''
    # Strip CR/LF, double up embedded quotes (CSV-style escaping) and trim whitespace
    return str(text).replace('\r', ' ').replace('\n', ' ').replace('"', '""').strip()


# Parse command line arguments
parser = argparse.ArgumentParser(description='Process goods data with configurable year limit')
parser.add_argument('--years', type=int, default=3, help='Number of years to look back (default: 3)')
args = parser.parse_args()

# Database connection info - datacenter
host = 'selectdb-cn-wuf3vsokg05-public.selectdbfe.rds.aliyuncs.com'
port = '9030'
database = 'datacenter'
username = 'readonly'
password = 'essa1234'

# Database connection info - filebank
filebank_host = '120.76.244.158'
filebank_port = '3325'
filebank_database = 'filebank'
filebank_username = 'PRD_M1_190311'
filebank_password = 'WTF)xdbqtW!4gwA7'

# Create database connections
engine_datacenter = create_db_connection(host, port, database, username, password)  # note: not used below
engine = create_db_connection(filebank_host, filebank_port, 'bpms', filebank_username, filebank_password)
filebank_engine = create_db_connection(filebank_host, filebank_port, filebank_database, filebank_username, filebank_password)

# Configuration
YEARS_LIMIT = args.years  # taken from the command line, defaults to 3
OUTPUT_PREFIX = f"{YEARS_LIMIT}years"  # output filename prefix

sql_sku_multi_pic = f"""
SELECT
    pgs.id AS skuId,
    REPLACE(REPLACE(pgs.name, '\r', ''), '\n', '') AS name,
    REPLACE(REPLACE(pgs.name_pinyin, '\r', ''), '\n', '') AS name_pinyin,
    pgs.create_time AS create_time,
    REPLACE(REPLACE(IFNULL(pgsoel.operate_name, IFNULL(pgsl.name, pgsl_en.name)), '\r', ''), '\n', '') AS ruSkuName,
    REPLACE(REPLACE(pgl.name, '\r', ''), '\n', '') AS enSpuName,
    REPLACE(REPLACE(pc.`name`, '\r', ''), '\n', '') AS categoryName,
    REPLACE(REPLACE(ss.`name`, '\r', ''), '\n', '') AS supplierName,
    REPLACE(REPLACE(sbl.`name`, '\r', ''), '\n', '') AS brandName,
    IFNULL(pic_sku.file_id, pic_spu.file_id) AS file_id,
    CASE
        WHEN pic_sku.file_id IS NOT NULL THEN CONCAT(spu_pic_list.file_id, ',', pic_sku.file_id)
        ELSE spu_pic_list.file_id
    END AS file_ids,
    DATEDIFF(CURDATE(), pgs.last_update_time) AS days_since_last_update
FROM prd_goods_sku AS pgs
INNER JOIN prd_goods AS pg ON pgs.goods_id = pg.id
INNER JOIN sup_supplier AS ss ON pg.supplier_id = ss.id
INNER JOIN prd_category AS pc ON pg.category_id = pc.id
LEFT JOIN prd_goods_lang pgl ON pgl.goods_id = pg.id AND pgl.lang = 'en'
LEFT JOIN prd_goods_sku_operate_ext_lang pgsoel ON pgsoel.prd_goods_sku_id = pgs.id AND pgsoel.lang = 'ru'
LEFT JOIN prd_goods_sku_lang pgsl ON pgsl.goods_sku_id = pgs.id AND pgsl.is_delete = 0 AND pgsl.lang = 'ru'
LEFT JOIN prd_goods_sku_lang pgsl_en ON pgsl_en.goods_sku_id = pgs.id AND pgsl_en.is_delete = 0 AND pgsl_en.lang = 'en'
LEFT JOIN sup_brand_lang AS sbl ON pg.brand_id = sbl.brand_id AND sbl.is_delete = 0 AND sbl.lang = 'ru'
LEFT JOIN prd_pic AS pic_sku ON pic_sku.sku_id = pgs.id AND pic_sku.spu_id = pgs.goods_id AND pic_sku.type = 'm' AND pic_sku.is_delete = 0
LEFT JOIN prd_pic AS pic_spu ON pic_spu.sku_id IS NULL AND pic_spu.spu_id = pgs.goods_id AND pic_spu.type = 'm' AND pic_spu.is_delete = 0
LEFT JOIN (
    SELECT spu_id, NULL AS sku_id, GROUP_CONCAT(file_id) AS file_id
    FROM prd_pic
    WHERE is_delete = 0 AND sku_id IS NULL
    GROUP BY spu_id
) AS spu_pic_list ON pgs.goods_id = spu_pic_list.spu_id
WHERE pgs.create_time >= CURDATE() - INTERVAL {YEARS_LIMIT * 365} DAY
  AND pgs.status IN ('2', '4', '5')
  AND pgs.buyer_id IS NULL
  AND pgs.is_delete = 0;
"""

sql = sql_sku_multi_pic
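
# All outputs below are written under the relative data/ directory; ensure it
# exists so the CSV/text writes don't fail on a fresh checkout (defensive
# addition; a no-op when the directory is already there).
os.makedirs('data', exist_ok=True)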

# Load goods data with pandas, caching the result to CSV
print("Fetching goods data...")
goods_output_file = f'data/df_goods_debug.{OUTPUT_PREFIX}_congku.csv'
if os.path.exists(goods_output_file):
    print(f"Found existing goods data file, reading it directly: {goods_output_file}")
    df_goods = pd.read_csv(goods_output_file)
else:
    print("Running goods data SQL query...")
    df_goods = pd.read_sql(sql, engine)
    print("df_goods columns:", df_goods.columns.tolist())

    # Clean all text fields
    text_columns = ['name', 'name_pinyin', 'ruSkuName', 'enSpuName', 'categoryName', 'supplierName', 'brandName']
    for col in text_columns:
        df_goods[col] = df_goods[col].apply(clean_text_field)

    # Make sure file_id is written as a string so integer ids don't turn into
    # decimals (e.g. 12345.0) in the CSV; missing ids become empty strings.
    mask = df_goods['file_id'].notna()
    df_goods.loc[mask, 'file_id'] = df_goods.loc[mask, 'file_id'].astype(int).astype(str)
    df_goods['file_id'] = df_goods['file_id'].fillna('')

    df_goods.to_csv(goods_output_file, index=False)
    print(f"Goods data saved to '{goods_output_file}'")

# Enrich the goods data with image URLs and save it as a CSV file
output_file_with_pic = f'data/goods_with_pic.{OUTPUT_PREFIX}_congku.sku_multi_pic.csv'
if os.path.exists(output_file_with_pic):
    print(f"Found existing goods-with-pictures file, reading it directly: {output_file_with_pic}")
    df_goods = pd.read_csv(output_file_with_pic)
else:
    # Fetch all image URLs from the filebank database
    print("Fetching image URL data...")
    images_output_file = f'data/df_images_debug.{OUTPUT_PREFIX}_congku.sku_multi_pic.csv'
    if os.path.exists(images_output_file):
        print(f"Found existing image data file, reading it directly: {images_output_file}")
        df_images = pd.read_csv(images_output_file)
    else:
        print("Running image data SQL query...")
        filebank_sql = """
        SELECT id, CONCAT('https://oss.essa.cn/', obj_key) AS imageUrl
        FROM fb_upload
        WHERE obj_key IS NOT NULL AND obj_key != '';
        """
        df_images = pd.read_sql(filebank_sql, filebank_engine)
        print("df_images columns:", df_images.columns.tolist())
        # Convert the id column to string type
        df_images['id'] = df_images['id'].astype(str)
        df_images.to_csv(images_output_file, index=False)
        print(f"Image data saved to '{images_output_file}'")

    # Build a dictionary keyed by integer file id for fast lookups
    image_dict = dict(zip(df_images['id'].astype(int), df_images['imageUrl']))
    print(f"Image dictionary size: {len(image_dict)}")
    print("Image dictionary sample:", dict(list(image_dict.items())[:3]))

    # Resolve each SKU's comma-separated file_ids to image URLs
    print("Resolving file_ids to image URLs...")

    def get_image_urls(file_ids):
        if pd.isna(file_ids) or file_ids == '':
            return ''
        # Split the file_ids string and look each id up in the image dictionary;
        # int(float(...)) also tolerates ids that pandas parsed as floats
        ids = str(file_ids).split(',')
        urls = [image_dict.get(int(float(fid.strip())), '') for fid in ids if fid.strip()]
        # Drop ids without a known URL and join the rest with commas
        return ','.join(url for url in urls if url)

    # Inspect the file_ids column
    print("\nInspecting the file_ids column:")
    print("First few rows of file_ids:")
    print(df_goods['file_ids'].head())
    print("\nNon-null values in file_ids:", df_goods['file_ids'].notna().sum())
    print("Unique values in file_ids:", df_goods['file_ids'].nunique())

    # Add a new column holding all image URLs
    df_goods['image_urls'] = df_goods['file_ids'].apply(get_image_urls)

    # Check the result
    print("\nChecking the image_urls column:")
    print("Non-null values in image_urls:", df_goods['image_urls'].notna().sum())
    print("First few rows of image_urls:")
    print(df_goods['image_urls'].head())

    df_goods.to_csv(output_file_with_pic, index=False, quoting=1)  # quoting=1 (csv.QUOTE_ALL) wraps every field in quotes
    print(f"Data exported to '{output_file_with_pic}'")
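
# Optional sanity check, in the spirit of the debug prints above: count SKUs
# whose file_ids resolved to no image URL at all (fillna covers the case where
# df_goods was just re-read from CSV and empty strings came back as NaN).
unresolved = (df_goods['image_urls'].fillna('') == '').sum()
print(f"SKUs without any resolved image URL: {unresolved}")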

# Build the inverted index file (image URL -> SKU ids)
print("\nBuilding inverted index file...")
inverted_index = {}

# Walk each SKU's image URLs
for _, row in df_goods.iterrows():
    sku_id = row['skuId']
    # Guard against possible NaN values
    image_urls = row['image_urls']
    if pd.isna(image_urls) or image_urls == '':
        urls = []
    else:
        urls = str(image_urls).split(',')
    # Map each URL back to its SKU
    for url in urls:
        if url:  # skip empty URLs
            if url not in inverted_index:
                inverted_index[url] = []
            inverted_index[url].append(str(sku_id))

# Write the inverted index to a file:
#   data/url_to_sku.{OUTPUT_PREFIX}_congku.sku_multi_pic.txt
#   e.g. data/url_to_sku.5years_congku.sku_multi_pic.txt
# One line per URL: the key is a single URL, the value is a comma-separated
# list of SKU ids; key and value are separated by a tab.
inverted_index_file = f'data/url_to_sku.{OUTPUT_PREFIX}_congku.sku_multi_pic.txt'
with open(inverted_index_file, 'w', encoding='utf-8') as f:
    for url, sku_list in inverted_index.items():
        f.write(f"{url}\t{','.join(sku_list)}\n")

print(f"Inverted index file saved to '{inverted_index_file}'")
print(f"Inverted index contains {len(inverted_index)} unique URLs")
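
# A minimal consumer-side sketch (not part of the pipeline itself) showing how
# a downstream script could read the inverted index back into a dict.
# `load_url_to_sku` is a hypothetical helper name; the format matches the
# writer above: one "url<TAB>sku1,sku2" line per URL.
def load_url_to_sku(path):
    mapping = {}
    with open(path, encoding='utf-8') as fh:
        for line in fh:
            url, _, skus = line.rstrip('\n').partition('\t')
            mapping[url] = skus.split(',') if skus else []
    return mapping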