# task2_process_goods.py
import argparse
import csv
import os

import pandas as pd

from db_service import create_db_connection

def clean_text_field(text):
    if pd.isna(text):
        return ''
    # Strip newlines/carriage returns and escape double quotes that could otherwise break the CSV layout
    return str(text).replace('\r', ' ').replace('\n', ' ').replace('"', '""').strip()
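# For example (illustrative values only):
#   clean_text_field('Red\nMug "large"')  ->  'Red Mug ""large""'
#   clean_text_field(float('nan'))        ->  ''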

# Parse command line arguments
parser = argparse.ArgumentParser(description='Process goods data with configurable year limit')
parser.add_argument('--years', type=int, default=3, help='Number of years to look back (default: 3)')
args = parser.parse_args()
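# Typical invocations (outputs are written under data/):
#   python task2_process_goods.py             # look back 3 years (default)
#   python task2_process_goods.py --years 5   # look back 5 years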

# Database connection info - datacenter
host = 'selectdb-cn-wuf3vsokg05-public.selectdbfe.rds.aliyuncs.com'
port = '9030'
database = 'datacenter'
username = 'readonly'
password = 'essa1234'

# Database connection info - filebank
filebank_host = '120.76.244.158'
filebank_port = '3325'
filebank_database = 'filebank'
filebank_username = 'PRD_M1_190311'
filebank_password = 'WTF)xdbqtW!4gwA7'
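# NOTE: credentials are hardcoded above for this one-off script. A safer
# pattern (a sketch; the environment variable names here are made up) would
# be to read them from the environment instead:
#
#   password = os.environ.get('DATACENTER_DB_PASSWORD', password)
#   filebank_password = os.environ.get('FILEBANK_DB_PASSWORD', filebank_password)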

# Create the database connections (the goods query below runs against the
# bpms database; the image-URL query runs against filebank)
engine_datacenter = create_db_connection(host, port, database, username, password)  # currently unused below
engine_bpms = create_db_connection(filebank_host, filebank_port, 'bpms', filebank_username, filebank_password)
filebank_engine = create_db_connection(filebank_host, filebank_port, filebank_database, filebank_username, filebank_password)
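# db_service.create_db_connection is not shown in this snippet; it is assumed
# to return a SQLAlchemy engine, roughly equivalent to this sketch:
#
#   from sqlalchemy import create_engine
#   def create_db_connection(host, port, database, username, password):
#       return create_engine(
#           f'mysql+pymysql://{username}:{password}@{host}:{port}/{database}')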

# Configuration
YEARS_LIMIT = args.years  # taken from --years, defaults to 3
OUTPUT_PREFIX = f"{YEARS_LIMIT}years"  # prefix for the output file names

# Make sure the output directory exists before anything is written to it
os.makedirs('data', exist_ok=True)


sql_sku_multi_pic = f"""
SELECT 
    pgs.id as skuId,
    REPLACE(REPLACE(pgs.name, '\r', ''), '\n', '') as name,
    REPLACE(REPLACE(pgs.name_pinyin, '\r', ''), '\n', '') as name_pinyin,
    pgs.create_time as create_time,
    REPLACE(REPLACE(ifnull(pgsoel.operate_name,ifnull(pgsl.name,pgsl_en.name)), '\r', ''), '\n', '') AS ruSkuName,
    REPLACE(REPLACE(pgl.name, '\r', ''), '\n', '') as enSpuName,
    REPLACE(REPLACE(pc.`name`, '\r', ''), '\n', '') as categoryName,
    REPLACE(REPLACE(ss.`name`, '\r', ''), '\n', '') as supplierName,
    REPLACE(REPLACE(sbl.`name`, '\r', ''), '\n', '') as brandName,
    IFNULL(pic_sku.file_id,pic_spu.file_id) as file_id,
    CONCAT_WS(',', spu_pic_list.file_id, pic_sku.file_id) as file_ids, -- CONCAT_WS skips NULLs, so a missing SPU picture list no longer nulls out the whole value
    DATEDIFF(CURDATE(), pgs.last_update_time) AS days_since_last_update
FROM prd_goods_sku as pgs
INNER JOIN prd_goods as pg ON pgs.goods_id=pg.id
INNER JOIN sup_supplier as ss ON pg.supplier_id=ss.id
INNER JOIN prd_category as pc ON pg.category_id=pc.id
LEFT JOIN prd_goods_lang pgl ON pgl.goods_id = pg.id AND pgl.lang = 'en'
LEFT JOIN prd_goods_sku_operate_ext_lang pgsoel ON pgsoel.prd_goods_sku_id = pgs.id AND pgsoel.lang = 'ru'
LEFT JOIN prd_goods_sku_lang pgsl ON pgsl.goods_sku_id = pgs.id AND pgsl.is_delete = 0 AND pgsl.lang = 'ru'
LEFT JOIN prd_goods_sku_lang pgsl_en ON pgsl_en.goods_sku_id = pgs.id AND pgsl_en.is_delete = 0 AND pgsl_en.lang = 'en'
LEFT JOIN sup_brand_lang as sbl ON pg.brand_id=sbl.brand_id AND sbl.is_delete=0 AND sbl.lang='ru'
LEFT JOIN prd_pic as pic_sku ON pic_sku.sku_id=pgs.id AND pic_sku.spu_id=pgs.goods_id AND pic_sku.type='m' AND pic_sku.is_delete=0
LEFT JOIN prd_pic as pic_spu ON pic_spu.sku_id IS NULL AND pic_spu.spu_id=pgs.goods_id AND pic_spu.type='m' AND pic_spu.is_delete=0
LEFT JOIN (
        SELECT spu_id, NULL AS sku_id, GROUP_CONCAT(file_id) AS file_id
        FROM prd_pic
        WHERE is_delete = 0 AND sku_id IS NULL
        GROUP BY spu_id
) AS spu_pic_list ON pgs.goods_id = spu_pic_list.spu_id
WHERE pgs.create_time >= CURDATE() - INTERVAL {YEARS_LIMIT * 365} DAY
    AND pgs.status in ('2','4','5')
    AND pgs.buyer_id is null
    AND pgs.is_delete = 0;
"""

sql = sql_sku_multi_pic

# Read the goods data with pandas
print("Fetching goods data...")
goods_output_file = f'data/df_goods_debug.{OUTPUT_PREFIX}_congku.csv'
if os.path.exists(goods_output_file):
    print(f"Found an existing goods data file, reading it directly: {goods_output_file}")
    df_goods = pd.read_csv(goods_output_file)
else:
    print("Running the goods SQL query...")
    df_goods = pd.read_sql(sql, engine_bpms)
    print("df_goods columns:", df_goods.columns.tolist())

    # Clean every text field
    text_columns = ['name', 'name_pinyin', 'ruSkuName', 'enSpuName', 'categoryName', 'supplierName', 'brandName']
    for col in text_columns:
        df_goods[col] = df_goods[col].apply(clean_text_field)

    # Make sure the file_id column is written as a string so integer ids do
    # not come out as floats (e.g. 12345.0) in the CSV
    mask = df_goods['file_id'].notna()
    df_goods.loc[mask, 'file_id'] = df_goods.loc[mask, 'file_id'].astype(int).astype(str)
    # Replace the remaining NA values with empty strings
    df_goods['file_id'] = df_goods['file_id'].fillna('')

    df_goods.to_csv(goods_output_file, index=False)
    print(f"商品数据已保存到 '{goods_output_file}'")

# Attach the image URLs and save the data to a CSV file
output_file_with_pic = f'data/goods_with_pic.{OUTPUT_PREFIX}_congku.sku_multi_pic.csv'
if os.path.exists(output_file_with_pic):
    print(f"发现已存在的带图片商品数据文件,直接读取: {output_file_with_pic}")
    df_goods = pd.read_csv(output_file_with_pic)
else:
    # Fetch all image URLs from the filebank database
    print("Fetching image URL data...")
    images_output_file = f'data/df_images_debug.{OUTPUT_PREFIX}_congku.sku_multi_pic.csv'
    if os.path.exists(images_output_file):
        print(f"发现已存在的图片数据文件,直接读取: {images_output_file}")
        df_images = pd.read_csv(images_output_file)
    else:
        print("执行图片数据SQL查询...")
        filebank_sql = """
        SELECT id, CONCAT('https://oss.essa.cn/', obj_key) as imageUrl
        FROM fb_upload 
        WHERE obj_key IS NOT NULL AND obj_key != '';
        """
        df_images = pd.read_sql(filebank_sql, filebank_engine)
        print("df_images columns:", df_images.columns.tolist())
        # Convert id column to string type
        df_images['id'] = df_images['id'].astype(str)
        df_images.to_csv(images_output_file, index=False)
        print(f"图片数据已保存到 '{images_output_file}'")

    # Create a dictionary for faster lookups
    image_dict = dict(zip(df_images['id'].astype(int), df_images['imageUrl']))
    print(f"图片字典大小: {len(image_dict)}")
    print("图片字典示例:", dict(list(image_dict.items())[:3]))

    # Resolve each row's file_ids to the corresponding image URLs
    print("Resolving file_ids to image URLs...")
    def get_image_urls(file_ids):
        if pd.isna(file_ids) or file_ids == '':
            return ''
        # Split the comma-separated id list and look each id up in image_dict
        ids = str(file_ids).split(',')
        urls = [image_dict.get(int(file_id.strip()), '')
                for file_id in ids if file_id.strip()]
        # Drop ids with no known URL and join the rest with commas
        return ','.join(url for url in urls if url)
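    # For example, with a hypothetical image_dict entry {101: 'https://oss.essa.cn/a.jpg'}:
    #   get_image_urls('101,999')  ->  'https://oss.essa.cn/a.jpg'  (999 unknown, dropped)
    #   get_image_urls('')         ->  ''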

    # Inspect the file_ids column
    print("\nInspecting the file_ids column:")
    print("First rows of file_ids:")
    print(df_goods['file_ids'].head())
    print("\nNon-null file_ids count:", df_goods['file_ids'].notna().sum())
    print("Unique file_ids count:", df_goods['file_ids'].nunique())

    # Add a new column holding all image URLs
    df_goods['image_urls'] = df_goods['file_ids'].apply(get_image_urls)

    # Inspect the result
    print("\nInspecting the image_urls column:")
    print("Non-null image_urls count:", df_goods['image_urls'].notna().sum())
    print("First rows of image_urls:")
    print(df_goods['image_urls'].head())

    df_goods.to_csv(output_file_with_pic, index=False, quoting=csv.QUOTE_ALL)  # quote every field
    print(f"Data exported successfully to '{output_file_with_pic}'")

# Build the inverted index file
print("\nBuilding the inverted index file...")
inverted_index = {}

# Walk over every SKU's image URLs
for _, row in df_goods.iterrows():
    sku_id = row['skuId']
    # Guard against possible NaN values
    image_urls = row['image_urls']
    if pd.isna(image_urls) or image_urls == '':
        urls = []
    else:
        urls = str(image_urls).split(',')
    
    # Map each URL back to the SKUs that use it
    for url in urls:
        if url:  # skip empty URLs
            if url not in inverted_index:
                inverted_index[url] = []
            inverted_index[url].append(str(sku_id))

# Write the inverted index to a file:
#   data/url_to_sku.{OUTPUT_PREFIX}_congku.sku_multi_pic.txt
#   (e.g. data/url_to_sku.5years_congku.sku_multi_pic.txt)
# Each line maps one URL to a comma-separated list of SKU ids, with the key
# and value separated by a tab.

inverted_index_file = f'data/url_to_sku.{OUTPUT_PREFIX}_congku.sku_multi_pic.txt'
with open(inverted_index_file, 'w', encoding='utf-8') as f:
    for url, sku_list in inverted_index.items():
        f.write(f"{url}\t{','.join(sku_list)}\n")

print(f"倒排索引文件已保存到 '{inverted_index_file}'")
print(f"倒排索引包含 {len(inverted_index)} 个唯一的URL")