data/customer1/task2_process_goods.py
# task2_process_goods.py

import pandas as pd
from db_service import create_db_connection
import argparse
import os


def clean_text_field(text):
    if pd.isna(text):
        return ''
    # Remove newline and carriage-return characters, and escape other characters that could break the CSV format
    return str(text).replace('\r', ' ').replace('\n', ' ').replace('"', '""').strip()
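# For reference, a hypothetical input/output pair for clean_text_field (the quoted string
# below is an invented example, not data from the database):
#   clean_text_field('Red "A4" paper\r\n')  ->  'Red ""A4"" paper'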

  

# Parse command line arguments
parser = argparse.ArgumentParser(description='Process goods data with configurable year limit')
parser.add_argument('--years', type=int, default=3, help='Number of years to look back (default: 3)')
args = parser.parse_args()
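# Example invocation (hypothetical): python task2_process_goods.py --years 5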

  

# Database connection info - datacenter
host = 'selectdb-cn-wuf3vsokg05-public.selectdbfe.rds.aliyuncs.com'
port = '9030'
database = 'datacenter'
username = 'readonly'
password = 'essa1234'

# Database connection info - filebank
filebank_host = '120.76.244.158'
filebank_port = '3325'
filebank_database = 'filebank'
filebank_username = 'PRD_M1_190311'
filebank_password = 'WTF)xdbqtW!4gwA7'

# Create database connections
engine_datacenter = create_db_connection(host, port, database, username, password)
engine = create_db_connection(filebank_host, filebank_port, 'bpms', filebank_username, filebank_password)
filebank_engine = create_db_connection(filebank_host, filebank_port, filebank_database, filebank_username, filebank_password)
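# Note: engine_datacenter is created but not referenced below; the goods query runs against
# the 'bpms' database via `engine`, and image URLs are read via `filebank_engine`.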

  

# Configuration parameters
YEARS_LIMIT = args.years  # Taken from the command-line argument, defaults to 3
OUTPUT_PREFIX = f"{YEARS_LIMIT}years"  # Output filename prefix
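# With --years 5, for example, YEARS_LIMIT is 5, OUTPUT_PREFIX is "5years", and the query
# below keeps SKUs created within the last 5 * 365 = 1825 days.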

  

  

sql_sku_multi_pic = f"""
SELECT
    pgs.id as skuId,
    REPLACE(REPLACE(pgs.name, '\r', ''), '\n', '') as name,
    REPLACE(REPLACE(pgs.name_pinyin, '\r', ''), '\n', '') as name_pinyin,
    pgs.create_time as create_time,
    REPLACE(REPLACE(ifnull(pgsoel.operate_name,ifnull(pgsl.name,pgsl_en.name)), '\r', ''), '\n', '') AS ruSkuName,
    REPLACE(REPLACE(pgl.name, '\r', ''), '\n', '') as enSpuName,
    REPLACE(REPLACE(pc.`name`, '\r', ''), '\n', '') as categoryName,
    REPLACE(REPLACE(ss.`name`, '\r', ''), '\n', '') as supplierName,
    REPLACE(REPLACE(sbl.`name`, '\r', ''), '\n', '') as brandName,
    IFNULL(pic_sku.file_id,pic_spu.file_id) as file_id,
    case when pic_sku.file_id is not null then concat(spu_pic_list.file_id,',',pic_sku.file_id) else spu_pic_list.file_id end as file_ids,
    DATEDIFF(CURDATE(), pgs.last_update_time) AS days_since_last_update
FROM prd_goods_sku as pgs
INNER JOIN prd_goods as pg ON pgs.goods_id=pg.id
INNER JOIN sup_supplier as ss ON pg.supplier_id=ss.id
INNER JOIN prd_category as pc ON pg.category_id=pc.id
LEFT JOIN prd_goods_lang pgl ON pgl.goods_id = pg.id AND pgl.lang = 'en'
LEFT JOIN prd_goods_sku_operate_ext_lang pgsoel ON pgsoel.prd_goods_sku_id = pgs.id AND pgsoel.lang = 'ru'
LEFT JOIN prd_goods_sku_lang pgsl ON pgsl.goods_sku_id = pgs.id AND pgsl.is_delete = 0 AND pgsl.lang = 'ru'
LEFT JOIN prd_goods_sku_lang pgsl_en ON pgsl_en.goods_sku_id = pgs.id AND pgsl_en.is_delete = 0 AND pgsl_en.lang = 'en'
LEFT JOIN sup_brand_lang as sbl ON pg.brand_id=sbl.brand_id AND sbl.is_delete=0 AND sbl.lang='ru'
LEFT JOIN prd_pic as pic_sku ON pic_sku.sku_id=pgs.id AND pic_sku.spu_id=pgs.goods_id AND pic_sku.type='m' AND pic_sku.is_delete=0
LEFT JOIN prd_pic as pic_spu ON pic_spu.sku_id IS NULL AND pic_spu.spu_id=pgs.goods_id AND pic_spu.type='m' AND pic_spu.is_delete=0
left join (
        select spu_id,null as sku_id,GROUP_CONCAT(file_id) as file_id
        from prd_pic
        where is_delete=0 and sku_id is null
        group by spu_id
) as spu_pic_list on pgs.goods_id=spu_pic_list.spu_id
WHERE pgs.create_time >= CURDATE() - INTERVAL {YEARS_LIMIT * 365} DAY
    AND pgs.status in ('2','4','5')
    AND pgs.buyer_id is null
    AND pgs.is_delete = 0 ;
"""

  

sql = sql_sku_multi_pic

# Read the goods data with pandas
print("Fetching goods data...")
goods_output_file = f'data/df_goods_debug.{OUTPUT_PREFIX}_congku.csv'
if os.path.exists(goods_output_file):
    print(f"Found an existing goods data file, reading it directly: {goods_output_file}")
    df_goods = pd.read_csv(goods_output_file)
else:
    print("Running the goods data SQL query...")
    df_goods = pd.read_sql(sql, engine)
    print("df_goods columns:", df_goods.columns.tolist())

    # Clean all text fields
    text_columns = ['name', 'name_pinyin', 'ruSkuName', 'enSpuName', 'categoryName', 'supplierName', 'brandName']
    for col in text_columns:
        df_goods[col] = df_goods[col].apply(clean_text_field)

    # Make sure the file_id column is handled as a string so it does not turn into a decimal in the CSV.
    # Process the file_id column: mark missing values first, then convert the remaining values safely.
    df_goods['file_id'] = df_goods['file_id'].fillna(pd.NA)
    # Convert the non-NA values
    mask = df_goods['file_id'].notna()
    df_goods.loc[mask, 'file_id'] = df_goods.loc[mask, 'file_id'].astype(int).astype(str)
    # Replace NA values with empty strings
    df_goods['file_id'] = df_goods['file_id'].fillna('')

    df_goods.to_csv(goods_output_file, index=False)
    print(f"Goods data saved to '{goods_output_file}'")

  

# Save the combined data to a CSV file
output_file_with_pic = f'data/goods_with_pic.{OUTPUT_PREFIX}_congku.sku_multi_pic.csv'
if os.path.exists(output_file_with_pic):
    print(f"Found an existing goods-with-images data file, reading it directly: {output_file_with_pic}")
    df_goods = pd.read_csv(output_file_with_pic)
else:
    # Fetch all image URLs from the filebank database
    print("Fetching image URL data...")
    images_output_file = f'data/df_images_debug.{OUTPUT_PREFIX}_congku.sku_multi_pic.csv'
    if os.path.exists(images_output_file):
        print(f"Found an existing image data file, reading it directly: {images_output_file}")
        df_images = pd.read_csv(images_output_file)
    else:
        print("Running the image data SQL query...")
        filebank_sql = """
        SELECT id, CONCAT('https://oss.essa.cn/', obj_key) as imageUrl
        FROM fb_upload
        WHERE obj_key IS NOT NULL AND obj_key != '';
        """
        df_images = pd.read_sql(filebank_sql, filebank_engine)
        print("df_images columns:", df_images.columns.tolist())
        # Convert id column to string type
        df_images['id'] = df_images['id'].astype(str)
        df_images.to_csv(images_output_file, index=False)
        print(f"Image data saved to '{images_output_file}'")

  

    # Create a dictionary for faster lookups: fb_upload.id (as int) -> full image URL
    image_dict = dict(zip(df_images['id'].astype(int), df_images['imageUrl']))
    print(f"Image dictionary size: {len(image_dict)}")
    print("Image dictionary sample:", dict(list(image_dict.items())[:3]))

    # Resolve each row's file_ids into the corresponding image URLs
    print("Resolving file_ids to image URLs...")
    def get_image_urls(file_ids):
        if pd.isna(file_ids) or file_ids == '':
            return ''
        # Split the file_ids string and get corresponding URLs
        ids = str(file_ids).split(',')
        # Look up each id in the image dictionary; ids that are not found map to ''
        if len(ids) > 0:
            urls = [image_dict.get(int(id.strip()), '') for id in ids]
        else:
            urls = []
        # Filter out empty strings and join with comma
        result = ','.join(url for url in urls if url)
        return result
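    # Hypothetical example: with image_dict = {101: 'https://oss.essa.cn/a.jpg', 102: 'https://oss.essa.cn/b.jpg'},
    # get_image_urls("101,102,103") returns 'https://oss.essa.cn/a.jpg,https://oss.essa.cn/b.jpg'
    # (id 103 is not in the dictionary, so it is dropped).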

  

    # Inspect the file_ids column
    print("\nInspecting the file_ids column:")
    print("First rows of file_ids:")
    print(df_goods['file_ids'].head())
    print("\nNumber of non-null file_ids values:", df_goods['file_ids'].notna().sum())
    print("Number of unique file_ids values:", df_goods['file_ids'].nunique())

    # Add a new column that stores all image URLs
    df_goods['image_urls'] = df_goods['file_ids'].apply(get_image_urls)

    # Inspect the result
    print("\nInspecting the image_urls column:")
    print("Number of non-null image_urls values:", df_goods['image_urls'].notna().sum())
    print("First rows of image_urls:")
    print(df_goods['image_urls'].head())

    df_goods.to_csv(output_file_with_pic, index=False, quoting=1)  # quoting=1 (csv.QUOTE_ALL): every field is quoted
    print(f"Data exported successfully to '{output_file_with_pic}'")

  

# Create the inverted index file
print("\nCreating the inverted index file...")
inverted_index = {}

# Walk through each SKU's image URLs
for _, row in df_goods.iterrows():
    sku_id = row['skuId']
    # Handle possible NaN values
    image_urls = row['image_urls']
    if pd.isna(image_urls) or image_urls == '':
        urls = []
    else:
        urls = str(image_urls).split(',')

    # Map each URL back to its SKU
    for url in urls:
        if url:  # Skip empty URLs
            if url not in inverted_index:
                inverted_index[url] = []
            inverted_index[url].append(str(sku_id))
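# At this point inverted_index maps each image URL to the list of SKU ids (as strings)
# whose pictures include that URL.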

  

# Write the inverted index to a file
"""
data/url_to_sku.{OUTPUT_PREFIX}_congku.sku_multi_pic.txt
data/url_to_sku.5years_congku.sku_multi_pic.txt
Inverted index file: the key is a single URL, the value is a comma-separated list of SKU ids,
and key and value are separated by a tab (\t).
"""

  

inverted_index_file = f'data/url_to_sku.{OUTPUT_PREFIX}_congku.sku_multi_pic.txt'
with open(inverted_index_file, 'w', encoding='utf-8') as f:
    for url, sku_list in inverted_index.items():
        f.write(f"{url}\t{','.join(sku_list)}\n")

print(f"Inverted index file saved to '{inverted_index_file}'")
print(f"The inverted index contains {len(inverted_index)} unique URLs")