Blame view

offline/product_understanding/feedback_es.py 12.4 KB
46f8dd12   tangwang   1. add prod under...
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
  #!/usr/bin/env python3

  """

  clothing数据的多个字段拆解后写入ES索引的 tags 和 keyword.zh 字段

  

  字段说明:

  - category_path:  > 分隔

  - tags:  , 分隔

  - target_audience:  , 分隔

  - usage_scene:  , 分隔

  - season:  , 分隔

  - key_attributes:  , 分隔

  - material:  , 分隔

  - features:  , 分隔

  

  ES配置 (安全提示: 以下凭据为明文硬编码,应改用环境变量或密钥管理):

  - 索引: search_products_tenant_170

  - 用户: essa

  - 密码: 4hOaLaf41y2VuI8y

  - 地址: http://localhost:9200

  """

  

import csv
import json
import os
import sys
from collections import defaultdict

from elasticsearch import Elasticsearch

  

  # ES配置

  ES_CONFIG = {

      'host': 'localhost',

      'port': 9200,

      'user': 'essa',

      'password': '4hOaLaf41y2VuI8y',

      'index': 'search_products_tenant_170'

  }

  

  def get_es_client():

      """创建ES客户端"""

      return Elasticsearch(

          [f"http://{ES_CONFIG['host']}:{ES_CONFIG['port']}"],

          basic_auth=(ES_CONFIG['user'], ES_CONFIG['password']),

          verify_certs=False,

          ssl_show_warn=False,

          request_timeout=30,

          max_retries=3,

          retry_on_timeout=True

      )

  

  def split_field(value, delimiter):

      """拆分字段并去重"""

      if not value or not value.strip():

          return []

      parts = [part.strip() for part in value.split(delimiter) if part.strip()]

      # 过滤掉无效值

      invalid_values = ['-', '—', 'N/A', 'NA', 'null', 'None', '无']

      parts = [part for part in parts if part not in invalid_values and len(part) > 0]

  

      # 去重并保持顺序

      seen = set()

      result = []

      for part in parts:

          if part not in seen:

              seen.add(part)

              result.append(part)

      return result

  

  def read_clothing_data(file_path):

      """读取clothing数据"""

      products = []

  

      with open(file_path, 'r', encoding='utf-8') as f:

          # 读取第一行来检查是否有表头

          first_line = f.readline().strip()

          f.seek(0)  # 重置文件指针

  

          # 检查第一行是否是数据行(以数字ID开头)

          if first_line.split(',')[0].isdigit():

              # 没有表头,手动构建字段名

              fieldnames = ['id', 'title', 'title_cn', 'category_path', 'tags',

                           'target_audience', 'usage_scene', 'season',

                           'key_attributes', 'material', 'features', 'selling_points']

              reader = csv.DictReader(f, fieldnames=fieldnames)

          else:

              # 有表头,使用正常读取

              reader = csv.DictReader(f)

  

          for row in reader:

              # 跳过空行

              if not row or not row.get('id'):

                  continue

  

              products.append(row)

  

      return products

  

  def extract_all_tags(product):

      """从产品中提取所有标签"""

      all_tags = []

  

      # 1. Category Path (按 > 分隔)

      category_path = product.get('category_path', '')

      if category_path:

          category_tags = split_field(category_path, '>')

          all_tags.extend(category_tags)

  

      # 2. Tags (按 , 分隔)

      tags = product.get('tags', '')

      if tags:

          tag_items = split_field(tags, ',')

          all_tags.extend(tag_items)

  

      # 3. Target Audience (按 , 分隔)

      target_audience = product.get('target_audience', '')

      if target_audience:

          audience_items = split_field(target_audience, ',')

          all_tags.extend(audience_items)

  

      # 4. Usage Scene (按 , 分隔)

      usage_scene = product.get('usage_scene', '')

      if usage_scene:

          scene_items = split_field(usage_scene, ',')

          all_tags.extend(scene_items)

  

      # 5. Season (按 , 分隔)

      season = product.get('season', '')

      if season:

          season_items = split_field(season, ',')

          all_tags.extend(season_items)

  

      # 6. Key Attributes (按 , 分隔)

      key_attributes = product.get('key_attributes', '')

      if key_attributes:

          attribute_items = split_field(key_attributes, ',')

          all_tags.extend(attribute_items)

  

      # 7. Material (按 , 分隔)

      material = product.get('material', '')

      if material:

          material_items = split_field(material, ',')

          all_tags.extend(material_items)

  

      # 8. Features (按 , 分隔)

      features = product.get('features', '')

      if features:

          feature_items = split_field(features, ',')

          all_tags.extend(feature_items)

  

      # 去重

      seen = set()

      unique_tags = []

      for tag in all_tags:

          if tag not in seen:

              seen.add(tag)

              unique_tags.append(tag)

  

      return unique_tags

  

  def build_spu_id_mapping(file_path='output_logs/products_analyzed.csv'):

      """从完整的products_analyzed.csv构建spu_id映射

  

      Returns:

          dict: {product_id: spu_id}

      """

      mapping = {}

  

      try:

          with open(file_path, 'r', encoding='utf-8') as f:

              reader = csv.DictReader(f)

  

              for row in reader:

                  product_id = row.get('id', '').strip()

  

                  if product_id:

                      try:

                          # 直接使用product_id作为spu_id

                          mapping[product_id] = int(product_id)

                      except ValueError:

                          continue

  

          print(f"✅ 从 {file_path} 构建了 {len(mapping)} 个ID映射")

          return mapping

  

      except FileNotFoundError:

          print(f"⚠️  警告: 找不到文件 {file_path},将直接使用clothing文件的ID")

          return {}

  

  def update_document_by_spu_id(es, spu_id, tags):

      """通过spu_id更新文档"""

      # 首先查询文档ID

      query = {

          "size": 1,

          "_source": False,

          "query": {

              "bool": {

                  "filter": [

                      {"term": {"spu_id": spu_id}},

                      {"term": {"tenant_id": "170"}}

                  ]

              }

          }

      }

  

      try:

          response = es.search(index=ES_CONFIG['index'], body=query)

  

          if response['hits']['total']['value'] > 0:

              doc_id = response['hits']['hits'][0]['_id']

  

              # 更新文档

              update_doc = {

                  "doc": {

                      "tags": tags,

                      "keyword.zh": tags

                  }

              }

  

              es.update(index=ES_CONFIG['index'], id=doc_id, body=update_doc)

              return True, doc_id

          else:

              return False, None

  

      except Exception as e:

          print(f"❌ 更新失败 spu_id={spu_id}: {e}")

          return False, None

  

  def update_elasticsearch(products, spu_id_mapping=None):

      """批量更新ES"""

      es = get_es_client()

  

      print(f"📤 准备更新 {len(products)} 条文档到ES索引: {ES_CONFIG['index']}")

  

      success_count = 0

      failed_count = 0

      not_found_count = 0

  

      # 统计信息

      stats = {

          'total': len(products),

          'success': 0,

          'failed': 0,

          'not_found': 0

      }

  

      for i, product in enumerate(products):

          product_id = product.get('id', '').strip()

  

          if not product_id:

              continue

  

          # 提取所有标签

          all_tags = extract_all_tags(product)

  

          # 确定spu_id

          if spu_id_mapping and product_id in spu_id_mapping:

              spu_id = spu_id_mapping[product_id]

          else:

              try:

                  spu_id = int(product_id)

              except ValueError:

                  print(f"⚠️  跳过无效的product_id: {product_id}")

                  continue

  

          # 更新文档

          success, doc_id = update_document_by_spu_id(es, spu_id, all_tags)

  

          if success:

              stats['success'] += 1

              if (stats['success'] % 100 == 0) or (stats['success'] == 1):

                  print(f"   进度: {stats['success']}/{stats['total']} (spu_id={spu_id})")

          else:

              stats['not_found'] += 1

              if stats['not_found'] <= 5:  # 只打印前5个

                  print(f"   ⚠️  未找到文档: spu_id={spu_id}")

  

      print(f"\n✅ 更新完成统计:")

      print(f"   总数: {stats['total']}")

      print(f"   成功: {stats['success']}")

      print(f"   未找到: {stats['not_found']}")

      print(f"   失败: {stats['failed']}")

  

      return stats

  

  def verify_updates(sample_size=10):

      """验证更新结果"""

      es = get_es_client()

  

      print(f"\n🔍 验证更新结果 (随机抽查 {sample_size} 条)...")

      print("-" * 80)

  

      # 随机查询几条数据验证

      query = {

          "size": sample_size,

          "_source": ["spu_id", "tags", "keyword.zh"],

          "query": {

              "bool": {

                  "must": [

                      {"exists": {"field": "tags"}},

                      {"term": {"tenant_id": "170"}}

                  ]

              }

          }

      }

  

      try:

          response = es.search(index=ES_CONFIG['index'], body=query)

  

          if response['hits']['total']['value'] == 0:

              print("⚠️  没有找到任何包含tags的文档")

              return

  

          for hit in response['hits']['hits']:

              source = hit['_source']

              print(f"\nSPU_ID: {source.get('spu_id', 'N/A')}")

              print(f"Tags数量: {len(source.get('tags', []))}")

              print(f"Keyword.zh数量: {len(source.get('keyword.zh', []))}")

              if source.get('tags'):

                  print(f"Tags示例: {source['tags'][:5]}...")

  

          print("\n✅ 验证完成!")

  

      except Exception as e:

          print(f"❌ 验证失败: {e}")

  

  def test_connection():

      """测试ES连接"""

      es = get_es_client()

  

      print("🔗 测试ES连接...")

      try:

          # 测试连接

          info = es.info()

          print(f"✅ ES连接成功!")

          print(f"   版本: {info['version']['number']}")

  

          # 测试索引是否存在

          if es.indices.exists(index=ES_CONFIG['index']):

              print(f"✅ 索引 {ES_CONFIG['index']} 存在")

  

              # 获取文档数量

              count = es.count(index=ES_CONFIG['index'])['count']

              print(f"   文档总数: {count:,}")

          else:

              print(f"❌ 索引 {ES_CONFIG['index']} 不存在")

              return False

  

          return True

  

      except Exception as e:

          print(f"❌ ES连接失败: {e}")

          return False

  

  def main():

      input_file = 'output_logs/products_analyzed.csv.clothing'

  

      print("=" * 80)

      print("📦 将Clothing数据字段写入ES索引")

      print("=" * 80)

      print()

  

      # 0. 测试ES连接

      if not test_connection():

          print("\n❌ 无法连接到ES,退出")

          return

  

      # 1. 读取clothing数据

      print(f"\n📂 读取Clothing数据: {input_file}")

      products = read_clothing_data(input_file)

      print(f"✅ 读取了 {len(products)} 条产品数据")

  

      if not products:

          print("❌ 没有读取到产品数据,退出")

          return

  

      # 2. 构建spu_id映射

      print("\n🔗 构建SPU_ID映射...")

      spu_id_mapping = build_spu_id_mapping()

  

      # 显示示例

      print(f"\n📋 数据示例 (第1条):")

      if products:

          example_product = products[0]

          example_tags = extract_all_tags(example_product)

          print(f"   Product ID: {example_product.get('id')}")

          print(f"   Category Path: {example_product.get('category_path', 'N/A')}")

          print(f"   Tags: {example_product.get('tags', 'N/A')}")

          print(f"   提取的标签数量: {len(example_tags)}")

          print(f"   标签示例: {example_tags[:10]}")

  

      # 3. 确认

      print(f"\n⚠️  准备更新 {len(products)} 条文档到ES索引")

      confirm = input("是否继续? (yes/no): ").strip().lower()

  

      if confirm not in ['yes', 'y']:

          print("❌ 取消更新")

          return

  

      # 4. 执行更新

      print("\n🚀 开始更新ES...")

      stats = update_elasticsearch(products, spu_id_mapping)

  

      # 5. 验证

      if stats['success'] > 0:

          verify = input("\n是否验证更新结果? (yes/no): ").strip().lower()

          if verify in ['yes', 'y']:

              verify_updates()

  

      print("\n" + "=" * 80)

      print("✅ 处理完成!")

      print("=" * 80)

  

  if __name__ == '__main__':

      try:

          main()

      except KeyboardInterrupt:

          print("\n\n❌ 用户中断")

          sys.exit(1)

      except Exception as e:

          print(f"\n❌ 错误: {e}")

          import traceback

          traceback.print_exc()

          sys.exit(1)