""" SPU文档转换器 - 公共转换逻辑。 提取全量和增量索引共用的文档转换逻辑,避免代码冗余。 输出文档结构与 mappings/search_products.json 及 索引字段说明v2 一致, 供 search/searcher 与 search/es_query_builder 使用。 - 多语言字段:title, brief, description, vendor, category_path, category_name_text - 嵌套:specifications, skus;向量:title_embedding、image_embedding(可选,需提供 image_encoder) """ import pandas as pd import numpy as np import logging import re from typing import Dict, Any, Optional, List from config import ConfigLoader from indexer.product_annotator import analyze_products, SUPPORTED_LANGS logger = logging.getLogger(__name__) # Try to import translator (optional dependency) try: from query.translator import Translator TRANSLATOR_AVAILABLE = True except ImportError: TRANSLATOR_AVAILABLE = False Translator = None class SPUDocumentTransformer: """SPU文档转换器,将SPU、SKU、Option数据转换为ES文档格式。""" def __init__( self, category_id_to_name: Dict[str, str], searchable_option_dimensions: List[str], tenant_config: Optional[Dict[str, Any]] = None, translator: Optional[Any] = None, translation_prompts: Optional[Dict[str, str]] = None, encoder: Optional[Any] = None, enable_title_embedding: bool = True, image_encoder: Optional[Any] = None, enable_image_embedding: bool = False, ): """ 初始化文档转换器。 Args: category_id_to_name: 分类ID到名称的映射 searchable_option_dimensions: 可搜索的option维度列表 tenant_config: 租户配置(包含主语言和翻译配置) translator: 翻译器实例(可选,如果提供则启用翻译功能) translation_prompts: 翻译提示词配置(可选) encoder: 文本编码器实例(可选,用于生成title_embedding) enable_title_embedding: 是否启用标题向量化(默认True) image_encoder: 图片编码器实例(可选,需实现 encode_image_urls(urls) -> List[Optional[np.ndarray]]) enable_image_embedding: 是否启用图片向量化(默认False) """ self.category_id_to_name = category_id_to_name self.searchable_option_dimensions = searchable_option_dimensions self.tenant_config = tenant_config or {} self.translator = translator self.translation_prompts = translation_prompts or {} self.encoder = encoder self.enable_title_embedding = enable_title_embedding self.image_encoder = image_encoder self.enable_image_embedding = bool(enable_image_embedding and image_encoder is not None) def transform_spu_to_doc( self, tenant_id: str, spu_row: pd.Series, skus: pd.DataFrame, options: pd.DataFrame, fill_llm_attributes: bool = True, ) -> Optional[Dict[str, Any]]: """ 将单个SPU行和其SKUs转换为ES文档。 Args: tenant_id: 租户ID spu_row: SPU行数据 skus: SKU数据DataFrame options: Option数据DataFrame Returns: ES文档字典 """ doc = {} # Tenant ID (required) doc['tenant_id'] = str(tenant_id) # SPU ID spu_id = spu_row['id'] doc['spu_id'] = str(spu_id) # Validate required fields if pd.isna(spu_row.get('title')) or not str(spu_row['title']).strip(): logger.error(f"SPU {spu_id} has no title, this may cause search issues") # 获取租户配置 primary_lang = self.tenant_config.get('primary_language', 'en') # 文本字段处理(使用translator的内部逻辑自动处理多语言翻译) self._fill_text_fields(doc, spu_row, primary_lang) # 标题向量化处理(如果启用) if self.enable_title_embedding and self.encoder: self._fill_title_embedding(doc) # Tags if pd.notna(spu_row.get('tags')): tags_str = str(spu_row['tags']) doc['tags'] = [tag.strip() for tag in tags_str.split(',') if tag.strip()] # Category相关字段 self._fill_category_fields(doc, spu_row) # Option名称(从option表获取) self._fill_option_names(doc, options) # Image URL self._fill_image_url(doc, spu_row) # Image embedding(与 mappings/search_products.json 中 image_embedding 嵌套结构一致) if self.enable_image_embedding: self._fill_image_embedding(doc, spu_row, skus) # Sales (fake_sales) if pd.notna(spu_row.get('fake_sales')): try: doc['sales'] = int(spu_row['fake_sales']) except (ValueError, TypeError): doc['sales'] = 0 else: doc['sales'] = 0 # Process SKUs and build specifications skus_list, prices, compare_prices, sku_prices, sku_weights, sku_weight_units, total_inventory, specifications = \ self._process_skus(skus, options) doc['skus'] = skus_list doc['specifications'] = specifications # 提取option值(根据配置的searchable_option_dimensions) self._fill_option_values(doc, skus) # Calculate price ranges if prices: doc['min_price'] = float(min(prices)) doc['max_price'] = float(max(prices)) else: doc['min_price'] = 0.0 doc['max_price'] = 0.0 # SPU 不再读取 compare_at_price 字段;ES 的 compare_at_price 使用所有 SKU 中的最大对比价 if compare_prices: doc['compare_at_price'] = float(max(compare_prices)) else: doc['compare_at_price'] = None # SKU扁平化字段 doc['sku_prices'] = sku_prices doc['sku_weights'] = sku_weights doc['sku_weight_units'] = list(set(sku_weight_units)) # 去重 doc['total_inventory'] = total_inventory # Time fields - convert datetime to ISO format string for ES DATE type if pd.notna(spu_row.get('create_time')): create_time = spu_row['create_time'] if hasattr(create_time, 'isoformat'): doc['create_time'] = create_time.isoformat() else: doc['create_time'] = str(create_time) if pd.notna(spu_row.get('update_time')): update_time = spu_row['update_time'] if hasattr(update_time, 'isoformat'): doc['update_time'] = update_time.isoformat() else: doc['update_time'] = str(update_time) # 基于 LLM 的锚文本与语义属性(默认开启,失败时仅记录日志) # 注意:批处理场景(build-docs / bulk / incremental)应优先在外层攒批, # 再调用 fill_llm_attributes_batch(),避免逐条调用 LLM。 if fill_llm_attributes: self._fill_llm_attributes(doc, spu_row) return doc def fill_llm_attributes_batch(self, docs: List[Dict[str, Any]], spu_rows: List[pd.Series]) -> None: """ 批量调用 LLM,为一批 doc 填充: - qanchors.{lang} - semantic_attributes (lang/name/value) 设计目标: - 尽可能攒批调用 LLM; - 单次 LLM 调用最多 20 条(由 analyze_products 内部强制 cap 并自动拆批)。 """ if not docs or not spu_rows or len(docs) != len(spu_rows): return try: index_langs = self.tenant_config.get("index_languages") or ["en", "zh"] except Exception: index_langs = ["en", "zh"] llm_langs = [lang for lang in index_langs if lang in SUPPORTED_LANGS] if not llm_langs: return # 只对有 title 的 SPU 参与 LLM;其余跳过 id_to_idx: Dict[str, int] = {} products: List[Dict[str, str]] = [] for i, row in enumerate(spu_rows): raw_id = row.get("id") spu_id = "" if raw_id is None else str(raw_id).strip() title = str(row.get("title") or "").strip() if not spu_id or not title: continue id_to_idx[spu_id] = i products.append({"id": spu_id, "title": title}) if not products: return tenant_id = str(docs[0].get("tenant_id") or "").strip() or None dim_keys = [ "tags", "target_audience", "usage_scene", "season", "key_attributes", "material", "features", ] for lang in llm_langs: try: rows = analyze_products( products=products, target_lang=lang, batch_size=20, tenant_id=tenant_id, ) except Exception as e: logger.warning("LLM batch attribute fill failed (lang=%s): %s", lang, e) continue for row in rows or []: spu_id = str(row.get("id") or "").strip() if not spu_id: continue idx = id_to_idx.get(spu_id) if idx is None: continue self._apply_llm_row(docs[idx], row=row, lang=lang, dim_keys=dim_keys) def _apply_llm_row(self, doc: Dict[str, Any], row: Dict[str, Any], lang: str, dim_keys: List[str]) -> None: """将单条 LLM 输出 row 按既定结构写入 doc(不抛异常)。""" try: if row.get("error"): return semantic_list = doc.get("semantic_attributes") or [] qanchors_obj = doc.get("qanchors") or {} anchor_text = str(row.get("anchor_text") or "").strip() if anchor_text: qanchors_obj[lang] = anchor_text for name in dim_keys: raw = row.get(name) if not raw: continue parts = re.split(r"[,;|/\n\t]+", str(raw)) for part in parts: value = part.strip() if not value: continue semantic_list.append({"lang": lang, "name": name, "value": value}) if qanchors_obj: doc["qanchors"] = qanchors_obj if semantic_list: doc["semantic_attributes"] = semantic_list except Exception as e: logger.warning("Failed to apply LLM row to doc (spu_id=%s, lang=%s): %s", doc.get("spu_id"), lang, e) def _fill_text_fields( self, doc: Dict[str, Any], spu_row: pd.Series, primary_lang: str ): """ 填充文本字段(根据租户 index_languages 处理多语言翻译)。 仅写入 primary_language 及 index_languages 中配置的语言。 """ index_langs = self.tenant_config.get("index_languages") or ["en", "zh"] def _set_lang_obj(field_name: str, source_text: Optional[str], translations: Optional[Dict[str, Optional[str]]] = None): """写入多语言对象 doc[field_name] = {"zh": "...", "en": "...", ...},仅包含 index_languages。""" if not source_text or not str(source_text).strip(): return obj: Dict[str, str] = {} src = str(source_text) obj[primary_lang] = src tr = translations or {} for lang in index_langs: if lang == primary_lang: continue val = tr.get(lang) if val and str(val).strip(): obj[lang] = str(val) if obj: doc[field_name] = obj # Title if pd.notna(spu_row.get('title')): title_text = str(spu_row['title']) translations: Dict[str, Optional[str]] = {} if self.translator: prompt_zh = self.translation_prompts.get('product_title_zh') or self.translation_prompts.get('default_zh') prompt_en = self.translation_prompts.get('product_title_en') or self.translation_prompts.get('default_en') translations = self.translator.translate_for_indexing( title_text, shop_language=primary_lang, source_lang=primary_lang, prompt=prompt_zh if primary_lang == 'zh' else prompt_en, index_languages=index_langs, ) or {} _set_lang_obj("title", title_text, translations) # Brief if pd.notna(spu_row.get('brief')): brief_text = str(spu_row['brief']) translations = {} if self.translator: prompt = self.translation_prompts.get('default_zh') or self.translation_prompts.get('default_en') translations = self.translator.translate_for_indexing( brief_text, shop_language=primary_lang, source_lang=primary_lang, prompt=prompt, index_languages=index_langs, ) or {} _set_lang_obj("brief", brief_text, translations) # Description if pd.notna(spu_row.get('description')): desc_text = str(spu_row['description']) translations = {} if self.translator: prompt = self.translation_prompts.get('default_zh') or self.translation_prompts.get('default_en') translations = self.translator.translate_for_indexing( desc_text, shop_language=primary_lang, source_lang=primary_lang, prompt=prompt, index_languages=index_langs, ) or {} _set_lang_obj("description", desc_text, translations) # Vendor if pd.notna(spu_row.get('vendor')): vendor_text = str(spu_row['vendor']) translations = {} if self.translator: prompt = self.translation_prompts.get('default_zh') or self.translation_prompts.get('default_en') translations = self.translator.translate_for_indexing( vendor_text, shop_language=primary_lang, source_lang=primary_lang, prompt=prompt, index_languages=index_langs, ) or {} _set_lang_obj("vendor", vendor_text, translations) def _fill_category_fields(self, doc: Dict[str, Any], spu_row: pd.Series): """填充类目相关字段。""" # 数据质量兜底: # - 当商品的类目ID在映射中不存在时,视为“不合法类目”,整条类目相关字段都不写入(当成没有类目) # - 仅记录错误日志,不阻塞索引流程 primary_lang = self.tenant_config.get('primary_language', 'en') if pd.notna(spu_row.get('category_path')): category_path = str(spu_row['category_path']) # 解析category_path - 这是逗号分隔的类目ID列表 category_ids = [cid.strip() for cid in category_path.split(',') if cid.strip()] # 将ID映射为名称,如果找不到映射则记录错误并跳过 category_names = [] missing_ids = [] for cid in category_ids: if cid in self.category_id_to_name: category_names.append(self.category_id_to_name[cid]) else: missing_ids.append(cid) # 如果有缺失的类目ID,记录错误日志,不写入类目字段(当成没有类目) if missing_ids: logger.error( f"Category ID(s) not found in mapping for SPU {spu_row.get('id')} " f"(title: {spu_row.get('title', 'N/A')}), missing_ids={missing_ids}, " f"category_path={category_path}. Treating as no-category." ) return # 构建类目路径字符串(用于搜索) if category_names: category_path_str = '/'.join(category_names) doc['category_path'] = {primary_lang: category_path_str} # 与查询使用的 category_name_text.zh/en 对齐,便于类目搜索 doc['category_name_text'] = {primary_lang: category_path_str} # 填充分层类目名称 if len(category_names) > 0: doc['category1_name'] = category_names[0] if len(category_names) > 1: doc['category2_name'] = category_names[1] if len(category_names) > 2: doc['category3_name'] = category_names[2] elif pd.notna(spu_row.get('category')): # 如果category_path为空,使用category字段作为category1_name的备选 category = str(spu_row['category']) doc['category_name_text'] = {primary_lang: category} doc['category_name'] = category # 尝试从category字段解析多级分类 if '/' in category: path_parts = category.split('/') if len(path_parts) > 0: doc['category1_name'] = path_parts[0].strip() if len(path_parts) > 1: doc['category2_name'] = path_parts[1].strip() if len(path_parts) > 2: doc['category3_name'] = path_parts[2].strip() else: # 如果category不包含"/",直接作为category1_name doc['category1_name'] = category.strip() if pd.notna(spu_row.get('category')): # 确保category相关字段都被设置(如果前面没有设置) category_name = str(spu_row['category']) if 'category_name_text' not in doc: doc['category_name_text'] = {primary_lang: category_name} if 'category_name' not in doc: doc['category_name'] = category_name if pd.notna(spu_row.get('category_id')): doc['category_id'] = str(int(spu_row['category_id'])) if pd.notna(spu_row.get('category_level')): doc['category_level'] = int(spu_row['category_level']) def _fill_option_names(self, doc: Dict[str, Any], options: pd.DataFrame): """填充Option名称字段。""" if not options.empty: # 按position排序获取option名称 sorted_options = options.sort_values('position') if len(sorted_options) > 0 and pd.notna(sorted_options.iloc[0].get('name')): doc['option1_name'] = str(sorted_options.iloc[0]['name']) if len(sorted_options) > 1 and pd.notna(sorted_options.iloc[1].get('name')): doc['option2_name'] = str(sorted_options.iloc[1]['name']) if len(sorted_options) > 2 and pd.notna(sorted_options.iloc[2].get('name')): doc['option3_name'] = str(sorted_options.iloc[2]['name']) def _fill_image_url(self, doc: Dict[str, Any], spu_row: pd.Series): """填充图片URL字段。""" if pd.notna(spu_row.get('image_src')): image_src = str(spu_row['image_src']) if not image_src.startswith('http'): # 仅当尚未是协议相对 URL 时才补 "//",避免 "//host" 变成 "////host" image_src = f"//{image_src}" if not image_src.startswith('//') else image_src doc['image_url'] = image_src def _fill_image_embedding( self, doc: Dict[str, Any], spu_row: pd.Series, skus: pd.DataFrame ) -> None: """ 填充 image_embedding 嵌套字段,与 mappings/search_products.json 一致: [{ "vector": [float, ...], "url": "..." }, ...] 收集 SPU 主图 + SKU 图片 URL,去重后调用 image_encoder 生成向量。 """ urls: List[str] = [] seen: set = set() def _add(url: str) -> None: if not url or not str(url).strip(): return u = str(url).strip() if u.startswith("//"): u = "https:" + u if u not in seen: seen.add(u) urls.append(u) if doc.get("image_url"): _add(doc["image_url"]) if pd.notna(spu_row.get("image_src")): _add(str(spu_row["image_src"])) if not skus.empty and "image_src" in skus.columns: for _, row in skus.iterrows(): if pd.notna(row.get("image_src")): _add(str(row["image_src"])) if not urls: return vectors = self.image_encoder.encode_image_urls(urls, batch_size=8) if not vectors or len(vectors) != len(urls): raise RuntimeError( f"image_embedding response length mismatch for SPU {doc.get('spu_id')}: " f"expected {len(urls)}, got {0 if vectors is None else len(vectors)}" ) out = [] for url, vec in zip(urls, vectors): arr = np.asarray(vec, dtype=np.float32) if arr.ndim != 1 or arr.size == 0 or not np.isfinite(arr).all(): raise RuntimeError( f"Invalid image embedding for SPU {doc.get('spu_id')} and URL {url}" ) out.append({"vector": arr.tolist(), "url": url}) doc["image_embedding"] = out def _process_skus( self, skus: pd.DataFrame, options: pd.DataFrame ) -> tuple: """处理SKU数据,返回处理结果。""" skus_list = [] prices = [] compare_prices = [] sku_prices = [] sku_weights = [] sku_weight_units = [] total_inventory = 0 specifications = [] # 构建option名称映射(position -> name) option_name_map = {} if not options.empty: for _, opt_row in options.iterrows(): position = opt_row.get('position') name = opt_row.get('name') if pd.notna(position) and pd.notna(name): option_name_map[int(position)] = str(name) for _, sku_row in skus.iterrows(): sku_data = self._transform_sku_row(sku_row, option_name_map) if sku_data: skus_list.append(sku_data) # 收集价格信息 if 'price' in sku_data and sku_data['price'] is not None: try: price_val = float(sku_data['price']) prices.append(price_val) sku_prices.append(price_val) except (ValueError, TypeError): pass if 'compare_at_price' in sku_data and sku_data['compare_at_price'] is not None: try: compare_prices.append(float(sku_data['compare_at_price'])) except (ValueError, TypeError): pass # 收集重量信息 if 'weight' in sku_data and sku_data['weight'] is not None: try: sku_weights.append(int(float(sku_data['weight']))) except (ValueError, TypeError): pass if 'weight_unit' in sku_data and sku_data['weight_unit']: sku_weight_units.append(str(sku_data['weight_unit'])) # 收集库存信息 if 'stock' in sku_data and sku_data['stock'] is not None: try: total_inventory += int(sku_data['stock']) except (ValueError, TypeError): pass # 构建specifications(从SKU的option值和option表的name) sku_id = str(sku_row['id']) if pd.notna(sku_row.get('option1')) and 1 in option_name_map: specifications.append({ 'sku_id': sku_id, 'name': option_name_map[1], 'value': str(sku_row['option1']) }) if pd.notna(sku_row.get('option2')) and 2 in option_name_map: specifications.append({ 'sku_id': sku_id, 'name': option_name_map[2], 'value': str(sku_row['option2']) }) if pd.notna(sku_row.get('option3')) and 3 in option_name_map: specifications.append({ 'sku_id': sku_id, 'name': option_name_map[3], 'value': str(sku_row['option3']) }) return skus_list, prices, compare_prices, sku_prices, sku_weights, sku_weight_units, total_inventory, specifications def _fill_option_values(self, doc: Dict[str, Any], skus: pd.DataFrame): """填充option值字段。""" option1_values = [] option2_values = [] option3_values = [] for _, sku_row in skus.iterrows(): if pd.notna(sku_row.get('option1')): option1_values.append(str(sku_row['option1'])) if pd.notna(sku_row.get('option2')): option2_values.append(str(sku_row['option2'])) if pd.notna(sku_row.get('option3')): option3_values.append(str(sku_row['option3'])) # 去重并根据配置决定是否写入索引 if 'option1' in self.searchable_option_dimensions: doc['option1_values'] = list(set(option1_values)) if option1_values else [] else: doc['option1_values'] = [] if 'option2' in self.searchable_option_dimensions: doc['option2_values'] = list(set(option2_values)) if option2_values else [] else: doc['option2_values'] = [] if 'option3' in self.searchable_option_dimensions: doc['option3_values'] = list(set(option3_values)) if option3_values else [] else: doc['option3_values'] = [] def _fill_llm_attributes(self, doc: Dict[str, Any], spu_row: pd.Series) -> None: """ 调用 indexer.product_annotator.analyze_products,为当前 SPU 填充: - qanchors.{lang} - semantic_attributes (lang/name/value) """ try: index_langs = self.tenant_config.get("index_languages") or ["en", "zh"] except Exception: index_langs = ["en", "zh"] # 只在支持的语言集合内调用 llm_langs = [lang for lang in index_langs if lang in SUPPORTED_LANGS] if not llm_langs: return spu_id = str(spu_row.get("id") or "").strip() title = str(spu_row.get("title") or "").strip() if not spu_id or not title: return semantic_list = doc.get("semantic_attributes") or [] qanchors_obj = doc.get("qanchors") or {} dim_keys = [ "tags", "target_audience", "usage_scene", "season", "key_attributes", "material", "features", ] tenant_id = doc.get("tenant_id") for lang in llm_langs: try: rows = analyze_products( products=[{"id": spu_id, "title": title}], target_lang=lang, batch_size=1, tenant_id=str(tenant_id), ) except Exception as e: logger.warning( "LLM attribute fill failed for SPU %s, lang=%s: %s", spu_id, lang, e, ) continue if not rows: continue row = rows[0] or {} # qanchors.{lang} anchor_text = str(row.get("anchor_text") or "").strip() if anchor_text: qanchors_obj[lang] = anchor_text # 语义属性:按各维度拆分为短语 for name in dim_keys: raw = row.get(name) if not raw: continue parts = re.split(r"[,;|/\n\t]+", str(raw)) for part in parts: value = part.strip() if not value: continue semantic_list.append( { "lang": lang, "name": name, "value": value, } ) if qanchors_obj: doc["qanchors"] = qanchors_obj if semantic_list: doc["semantic_attributes"] = semantic_list def _transform_sku_row(self, sku_row: pd.Series, option_name_map: Dict[int, str] = None) -> Optional[Dict[str, Any]]: """ 将SKU行转换为SKU对象。 Args: sku_row: SKU行数据 option_name_map: position到option名称的映射 Returns: SKU字典 """ sku_data = {} # SKU ID sku_data['sku_id'] = str(sku_row['id']) # Price if pd.notna(sku_row.get('price')): try: sku_data['price'] = float(sku_row['price']) except (ValueError, TypeError): sku_data['price'] = None else: sku_data['price'] = None # Compare at price if pd.notna(sku_row.get('compare_at_price')): try: sku_data['compare_at_price'] = float(sku_row['compare_at_price']) except (ValueError, TypeError): sku_data['compare_at_price'] = None else: sku_data['compare_at_price'] = None # SKU Code if pd.notna(sku_row.get('sku')): sku_data['sku_code'] = str(sku_row['sku']) # Stock if pd.notna(sku_row.get('inventory_quantity')): try: sku_data['stock'] = int(sku_row['inventory_quantity']) except (ValueError, TypeError): sku_data['stock'] = 0 else: sku_data['stock'] = 0 # Weight if pd.notna(sku_row.get('weight')): try: sku_data['weight'] = float(sku_row['weight']) except (ValueError, TypeError): sku_data['weight'] = None else: sku_data['weight'] = None # Weight unit if pd.notna(sku_row.get('weight_unit')): sku_data['weight_unit'] = str(sku_row['weight_unit']) # Option values if pd.notna(sku_row.get('option1')): sku_data['option1_value'] = str(sku_row['option1']) if pd.notna(sku_row.get('option2')): sku_data['option2_value'] = str(sku_row['option2']) if pd.notna(sku_row.get('option3')): sku_data['option3_value'] = str(sku_row['option3']) # Image src if pd.notna(sku_row.get('image_src')): sku_data['image_src'] = str(sku_row['image_src']) return sku_data def _fill_title_embedding(self, doc: Dict[str, Any]) -> None: """ 填充标题向量化字段。 使用英文标题(title.en)生成embedding。如果title.en不存在,则使用title.zh。 Args: doc: ES文档字典 """ # 优先使用英文标题,如果没有则使用中文标题;再没有则取任意可用语言 title_obj = doc.get("title") or {} if isinstance(title_obj, dict): title_text = title_obj.get("en") or title_obj.get("zh") if not title_text: for v in title_obj.values(): if v and str(v).strip(): title_text = str(v) break else: title_text = None if not title_text or not title_text.strip(): logger.debug(f"No title text available for embedding, SPU: {doc.get('spu_id')}") return # 使用文本向量编码器生成 embedding # encode方法返回numpy数组,形状为(n, d) embeddings = self.encoder.encode(title_text) if embeddings is None or len(embeddings) == 0: raise RuntimeError(f"Failed to generate title embedding for SPU {doc.get('spu_id')}") embedding = np.asarray(embeddings[0], dtype=np.float32) if embedding.ndim != 1 or embedding.size == 0 or not np.isfinite(embedding).all(): raise RuntimeError(f"Invalid title embedding for SPU {doc.get('spu_id')}") doc['title_embedding'] = embedding.tolist() logger.debug(f"Generated title_embedding for SPU: {doc.get('spu_id')}, title: {title_text[:50]}...")