#!/usr/bin/env python3 """ 将clothing数据的多个字段拆解后写入ES索引的tags和keyword.zh字段 字段说明: - category_path: 按 > 分隔 - tags: 按 , 分隔 - target_audience: 按 , 分隔 - usage_scene: 按 , 分隔 - season: 按 , 分隔 - key_attributes: 按 , 分隔 - material: 按 , 分隔 - features: 按 , 分隔 ES配置: - 索引: search_products_tenant_170 - 用户: essa - 密码: 4hOaLaf41y2VuI8y - 地址: http://localhost:9200 """ import csv import json import sys from elasticsearch import Elasticsearch from collections import defaultdict # ES配置 ES_CONFIG = { 'host': 'localhost', 'port': 9200, 'user': 'essa', 'password': '4hOaLaf41y2VuI8y', 'index': 'search_products_tenant_170' } def get_es_client(): """创建ES客户端""" return Elasticsearch( [f"http://{ES_CONFIG['host']}:{ES_CONFIG['port']}"], basic_auth=(ES_CONFIG['user'], ES_CONFIG['password']), verify_certs=False, ssl_show_warn=False, request_timeout=30, max_retries=3, retry_on_timeout=True ) def split_field(value, delimiter): """拆分字段并去重""" if not value or not value.strip(): return [] parts = [part.strip() for part in value.split(delimiter) if part.strip()] # 过滤掉无效值 invalid_values = ['-', '—', 'N/A', 'NA', 'null', 'None', '无'] parts = [part for part in parts if part not in invalid_values and len(part) > 0] # 去重并保持顺序 seen = set() result = [] for part in parts: if part not in seen: seen.add(part) result.append(part) return result def read_clothing_data(file_path): """读取clothing数据""" products = [] with open(file_path, 'r', encoding='utf-8') as f: # 读取第一行来检查是否有表头 first_line = f.readline().strip() f.seek(0) # 重置文件指针 # 检查第一行是否是数据行(以数字ID开头) if first_line.split(',')[0].isdigit(): # 没有表头,手动构建字段名 fieldnames = ['id', 'title', 'title_cn', 'category_path', 'tags', 'target_audience', 'usage_scene', 'season', 'key_attributes', 'material', 'features', 'selling_points'] reader = csv.DictReader(f, fieldnames=fieldnames) else: # 有表头,使用正常读取 reader = csv.DictReader(f) for row in reader: # 跳过空行 if not row or not row.get('id'): continue products.append(row) return products def extract_all_tags(product): """从产品中提取所有标签""" all_tags = [] # 1. Category Path (按 > 分隔) category_path = product.get('category_path', '') if category_path: category_tags = split_field(category_path, '>') all_tags.extend(category_tags) # 2. Tags (按 , 分隔) tags = product.get('tags', '') if tags: tag_items = split_field(tags, ',') all_tags.extend(tag_items) # 3. Target Audience (按 , 分隔) target_audience = product.get('target_audience', '') if target_audience: audience_items = split_field(target_audience, ',') all_tags.extend(audience_items) # 4. Usage Scene (按 , 分隔) usage_scene = product.get('usage_scene', '') if usage_scene: scene_items = split_field(usage_scene, ',') all_tags.extend(scene_items) # 5. Season (按 , 分隔) season = product.get('season', '') if season: season_items = split_field(season, ',') all_tags.extend(season_items) # 6. Key Attributes (按 , 分隔) key_attributes = product.get('key_attributes', '') if key_attributes: attribute_items = split_field(key_attributes, ',') all_tags.extend(attribute_items) # 7. Material (按 , 分隔) material = product.get('material', '') if material: material_items = split_field(material, ',') all_tags.extend(material_items) # 8. Features (按 , 分隔) features = product.get('features', '') if features: feature_items = split_field(features, ',') all_tags.extend(feature_items) # 去重 seen = set() unique_tags = [] for tag in all_tags: if tag not in seen: seen.add(tag) unique_tags.append(tag) return unique_tags def build_spu_id_mapping(file_path='output_logs/products_analyzed.csv'): """从完整的products_analyzed.csv构建spu_id映射 Returns: dict: {product_id: spu_id} """ mapping = {} try: with open(file_path, 'r', encoding='utf-8') as f: reader = csv.DictReader(f) for row in reader: product_id = row.get('id', '').strip() if product_id: try: # 直接使用product_id作为spu_id mapping[product_id] = int(product_id) except ValueError: continue print(f"✅ 从 {file_path} 构建了 {len(mapping)} 个ID映射") return mapping except FileNotFoundError: print(f"⚠️ 警告: 找不到文件 {file_path},将直接使用clothing文件的ID") return {} def update_document_by_spu_id(es, spu_id, tags): """通过spu_id更新文档""" # 首先查询文档ID query = { "size": 1, "_source": False, "query": { "bool": { "filter": [ {"term": {"spu_id": spu_id}}, {"term": {"tenant_id": "170"}} ] } } } try: response = es.search(index=ES_CONFIG['index'], body=query) if response['hits']['total']['value'] > 0: doc_id = response['hits']['hits'][0]['_id'] # 更新文档 update_doc = { "doc": { "tags": tags, "keyword.zh": tags } } es.update(index=ES_CONFIG['index'], id=doc_id, body=update_doc) return True, doc_id else: return False, None except Exception as e: print(f"❌ 更新失败 spu_id={spu_id}: {e}") return False, None def update_elasticsearch(products, spu_id_mapping=None): """批量更新ES""" es = get_es_client() print(f"📤 准备更新 {len(products)} 条文档到ES索引: {ES_CONFIG['index']}") success_count = 0 failed_count = 0 not_found_count = 0 # 统计信息 stats = { 'total': len(products), 'success': 0, 'failed': 0, 'not_found': 0 } for i, product in enumerate(products): product_id = product.get('id', '').strip() if not product_id: continue # 提取所有标签 all_tags = extract_all_tags(product) # 确定spu_id if spu_id_mapping and product_id in spu_id_mapping: spu_id = spu_id_mapping[product_id] else: try: spu_id = int(product_id) except ValueError: print(f"⚠️ 跳过无效的product_id: {product_id}") continue # 更新文档 success, doc_id = update_document_by_spu_id(es, spu_id, all_tags) if success: stats['success'] += 1 if (stats['success'] % 100 == 0) or (stats['success'] == 1): print(f" 进度: {stats['success']}/{stats['total']} (spu_id={spu_id})") else: stats['not_found'] += 1 if stats['not_found'] <= 5: # 只打印前5个 print(f" ⚠️ 未找到文档: spu_id={spu_id}") print(f"\n✅ 更新完成统计:") print(f" 总数: {stats['total']}") print(f" 成功: {stats['success']}") print(f" 未找到: {stats['not_found']}") print(f" 失败: {stats['failed']}") return stats def verify_updates(sample_size=10): """验证更新结果""" es = get_es_client() print(f"\n🔍 验证更新结果 (随机抽查 {sample_size} 条)...") print("-" * 80) # 随机查询几条数据验证 query = { "size": sample_size, "_source": ["spu_id", "tags", "keyword.zh"], "query": { "bool": { "must": [ {"exists": {"field": "tags"}}, {"term": {"tenant_id": "170"}} ] } } } try: response = es.search(index=ES_CONFIG['index'], body=query) if response['hits']['total']['value'] == 0: print("⚠️ 没有找到任何包含tags的文档") return for hit in response['hits']['hits']: source = hit['_source'] print(f"\nSPU_ID: {source.get('spu_id', 'N/A')}") print(f"Tags数量: {len(source.get('tags', []))}") print(f"Keyword.zh数量: {len(source.get('keyword.zh', []))}") if source.get('tags'): print(f"Tags示例: {source['tags'][:5]}...") print("\n✅ 验证完成!") except Exception as e: print(f"❌ 验证失败: {e}") def test_connection(): """测试ES连接""" es = get_es_client() print("🔗 测试ES连接...") try: # 测试连接 info = es.info() print(f"✅ ES连接成功!") print(f" 版本: {info['version']['number']}") # 测试索引是否存在 if es.indices.exists(index=ES_CONFIG['index']): print(f"✅ 索引 {ES_CONFIG['index']} 存在") # 获取文档数量 count = es.count(index=ES_CONFIG['index'])['count'] print(f" 文档总数: {count:,}") else: print(f"❌ 索引 {ES_CONFIG['index']} 不存在") return False return True except Exception as e: print(f"❌ ES连接失败: {e}") return False def main(): input_file = 'output_logs/products_analyzed.csv.clothing' print("=" * 80) print("📦 将Clothing数据字段写入ES索引") print("=" * 80) print() # 0. 测试ES连接 if not test_connection(): print("\n❌ 无法连接到ES,退出") return # 1. 读取clothing数据 print(f"\n📂 读取Clothing数据: {input_file}") products = read_clothing_data(input_file) print(f"✅ 读取了 {len(products)} 条产品数据") if not products: print("❌ 没有读取到产品数据,退出") return # 2. 构建spu_id映射 print("\n🔗 构建SPU_ID映射...") spu_id_mapping = build_spu_id_mapping() # 显示示例 print(f"\n📋 数据示例 (第1条):") if products: example_product = products[0] example_tags = extract_all_tags(example_product) print(f" Product ID: {example_product.get('id')}") print(f" Category Path: {example_product.get('category_path', 'N/A')}") print(f" Tags: {example_product.get('tags', 'N/A')}") print(f" 提取的标签数量: {len(example_tags)}") print(f" 标签示例: {example_tags[:10]}") # 3. 确认 print(f"\n⚠️ 准备更新 {len(products)} 条文档到ES索引") confirm = input("是否继续? (yes/no): ").strip().lower() if confirm not in ['yes', 'y']: print("❌ 取消更新") return # 4. 执行更新 print("\n🚀 开始更新ES...") stats = update_elasticsearch(products, spu_id_mapping) # 5. 验证 if stats['success'] > 0: verify = input("\n是否验证更新结果? (yes/no): ").strip().lower() if verify in ['yes', 'y']: verify_updates() print("\n" + "=" * 80) print("✅ 处理完成!") print("=" * 80) if __name__ == '__main__': try: main() except KeyboardInterrupt: print("\n\n❌ 用户中断") sys.exit(1) except Exception as e: print(f"\n❌ 错误: {e}") import traceback traceback.print_exc() sys.exit(1)