diff --git a/indexer/incremental_service.py b/indexer/incremental_service.py
index 4af4847..e2a0989 100644
--- a/indexer/incremental_service.py
+++ b/indexer/incremental_service.py
@@ -3,14 +3,16 @@
 import pandas as pd
 import logging
 import time
-from typing import Dict, Any, Optional, List
-from sqlalchemy import text
+import threading
+from typing import Dict, Any, Optional, List, Tuple
+from sqlalchemy import text, bindparam
 from indexer.indexing_utils import load_category_mapping, create_document_transformer
 from indexer.bulk_indexer import BulkIndexer
 from indexer.mapping_generator import DEFAULT_INDEX_NAME
 from indexer.indexer_logger import (
     get_indexer_logger, log_index_request, log_index_result, log_spu_processing
 )
+from config import ConfigLoader

 # Configure logger
 logger = logging.getLogger(__name__)
@@ -29,6 +31,73 @@ class IncrementalIndexerService:
         self.category_id_to_name = load_category_mapping(db_engine)
         logger.info(f"Preloaded {len(self.category_id_to_name)} category mappings")

+        # Cache: avoid reloading config / rebuilding the transformer on every incremental request
+        self._config: Optional[Any] = None
+        self._config_lock = threading.Lock()
+        # tenant_id -> (transformer, encoder, enable_embedding)
+        self._transformer_cache: Dict[str, Tuple[Any, Optional[Any], bool]] = {}
+        self._transformer_cache_lock = threading.Lock()
+
+    def _get_config(self) -> Any:
+        """Load config once per process (thread-safe)."""
+        if self._config is not None:
+            return self._config
+        with self._config_lock:
+            if self._config is None:
+                self._config = ConfigLoader().load_config()
+            return self._config
+
+    def _get_transformer_bundle(self, tenant_id: str) -> Tuple[Any, Optional[Any], bool]:
+        """
+        Get a cached document transformer for tenant_id.
+
+        - Transformer is built once per tenant (in-process cache).
+        - We disable per-document embedding generation inside transformer, and instead
+          batch-generate embeddings in index_spus_to_es for performance.
+        """
+        with self._transformer_cache_lock:
+            cached = self._transformer_cache.get(str(tenant_id))
+            if cached is not None:
+                return cached
+
+        config = self._get_config()
+        enable_embedding = bool(getattr(config.query_config, "enable_text_embedding", False))
+
+        encoder: Optional[Any] = None
+        if enable_embedding:
+            try:
+                from embeddings.text_encoder import BgeEncoder
+                encoder = BgeEncoder()
+            except Exception as e:
+                logger.warning(f"Failed to initialize BgeEncoder for tenant_id={tenant_id}: {e}")
+                encoder = None
+                enable_embedding = False
+
+        transformer = create_document_transformer(
+            category_id_to_name=self.category_id_to_name,
+            tenant_id=tenant_id,
+            encoder=encoder,
+            enable_title_embedding=False,  # batch fill later
+            config=config,
+        )
+
+        bundle = (transformer, encoder, enable_embedding)
+        with self._transformer_cache_lock:
+            # simple unbounded cache; tenant count is typically small in one node
+            self._transformer_cache[str(tenant_id)] = bundle
+        return bundle
+
+    @staticmethod
+    def _normalize_spu_ids(spu_ids: List[str]) -> List[int]:
+        """Normalize SPU IDs to ints for DB queries; skip non-int IDs."""
+        out: List[int] = []
+        for x in spu_ids:
+            try:
+                out.append(int(x))
+            except Exception:
+                continue
+        return out
+
     def get_spu_document(self, tenant_id: str, spu_id: str) -> Optional[Dict[str, Any]]:
         """Fetch the ES document data for a single SPU."""
         try:
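Note on the caching added above: _get_config uses double-checked locking (read without the lock, re-check under it), and _get_transformer_bundle deliberately builds the bundle outside the lock, so two threads may race to build the same transformer and the last write wins, which is harmless because the bundles are equivalent. A minimal, self-contained sketch of the same idiom (the class and loader below are hypothetical illustrations, not part of the patch):

    import threading

    class LazyValue:
        """Double-checked locking: the lock is only taken on the cold path."""

        def __init__(self, loader):
            self._loader = loader        # callable that produces the value once
            self._value = None
            self._lock = threading.Lock()

        def get(self):
            if self._value is not None:  # fast path: already loaded, no lock
                return self._value
            with self._lock:             # slow path: first caller loads it
                if self._value is None:
                    self._value = self._loader()
                return self._value

    config = LazyValue(lambda: {"enable_text_embedding": True})
    print(config.get())                  # loads once; later calls reuse the cached value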
@@ -44,11 +113,7 @@ class IncrementalIndexerService:
             # Load option data
             options_df = self._load_options_for_spu(tenant_id, spu_id)

-            # Create the document transformer
-            transformer = create_document_transformer(
-                category_id_to_name=self.category_id_to_name,
-                tenant_id=tenant_id
-            )
+            transformer, encoder, enable_embedding = self._get_transformer_bundle(tenant_id)

             # Transform into an ES document
             doc = transformer.transform_spu_to_doc(
@@ -62,6 +127,17 @@ class IncrementalIndexerService:
                 logger.warning(f"Failed to transform SPU {spu_id} for tenant_id={tenant_id}")
                 return None

+            # The single-document path also fills in the embedding (still using the cached encoder)
+            if enable_embedding and encoder:
+                title_text = doc.get("title_en") or doc.get("title_zh")
+                if title_text and str(title_text).strip():
+                    try:
+                        embeddings = encoder.encode(title_text)
+                        if embeddings is not None and len(embeddings) > 0:
+                            doc["title_embedding"] = embeddings[0].tolist()
+                    except Exception as e:
+                        logger.warning(f"Failed to generate embedding for spu_id={spu_id}: {e}")
+
             return doc

         except Exception as e:
@@ -140,6 +216,95 @@ class IncrementalIndexerService:
         if isinstance(deleted, bytes):
             return deleted == b'\x01' or deleted == 1
         return bool(deleted)
+
+    def _load_spus_for_spu_ids(self, tenant_id: str, spu_ids: List[str], include_deleted: bool = True) -> pd.DataFrame:
+        """Batch load SPU rows for a list of spu_ids using IN (...)"""
+        spu_ids_int = self._normalize_spu_ids(spu_ids)
+        if not spu_ids_int:
+            return pd.DataFrame()
+
+        if include_deleted:
+            query = text(
+                """
+                SELECT
+                    id, shop_id, shoplazza_id, title, brief, description,
+                    spu, vendor, vendor_url,
+                    image_src, image_width, image_height, image_path, image_alt,
+                    tags, note, category, category_id, category_google_id,
+                    category_level, category_path,
+                    fake_sales, display_fake_sales,
+                    tenant_id, creator, create_time, updater, update_time, deleted
+                FROM shoplazza_product_spu
+                WHERE tenant_id = :tenant_id AND id IN :spu_ids
+                """
+            ).bindparams(bindparam("spu_ids", expanding=True))
+        else:
+            query = text(
+                """
+                SELECT
+                    id, shop_id, shoplazza_id, title, brief, description,
+                    spu, vendor, vendor_url,
+                    image_src, image_width, image_height, image_path, image_alt,
+                    tags, note, category, category_id, category_google_id,
+                    category_level, category_path,
+                    fake_sales, display_fake_sales,
+                    tenant_id, creator, create_time, updater, update_time, deleted
+                FROM shoplazza_product_spu
+                WHERE tenant_id = :tenant_id AND deleted = 0 AND id IN :spu_ids
+                """
+            ).bindparams(bindparam("spu_ids", expanding=True))
+
+        with self.db_engine.connect() as conn:
+            df = pd.read_sql(query, conn, params={"tenant_id": tenant_id, "spu_ids": spu_ids_int})
+        return df
+
+    def _load_skus_for_spu_ids(self, tenant_id: str, spu_ids: List[str]) -> pd.DataFrame:
+        """Batch load all SKUs for a list of spu_ids"""
+        spu_ids_int = self._normalize_spu_ids(spu_ids)
+        if not spu_ids_int:
+            return pd.DataFrame()
+
+        query = text(
+            """
+            SELECT
+                id, spu_id, shop_id, shoplazza_id, shoplazza_product_id,
+                shoplazza_image_id, title, sku, barcode, position,
+                price, compare_at_price, cost_price,
+                option1, option2, option3,
+                inventory_quantity, weight, weight_unit, image_src,
+                wholesale_price, note, extend,
+                shoplazza_created_at, shoplazza_updated_at, tenant_id,
+                creator, create_time, updater, update_time, deleted
+            FROM shoplazza_product_sku
+            WHERE tenant_id = :tenant_id AND deleted = 0 AND spu_id IN :spu_ids
+            """
+        ).bindparams(bindparam("spu_ids", expanding=True))
+
+        with self.db_engine.connect() as conn:
+            df = pd.read_sql(query, conn, params={"tenant_id": tenant_id, "spu_ids": spu_ids_int})
+        return df
+
+    def _load_options_for_spu_ids(self, tenant_id: str, spu_ids: List[str]) -> pd.DataFrame:
+        """Batch load all options for a list of spu_ids"""
+        spu_ids_int = self._normalize_spu_ids(spu_ids)
+        if not spu_ids_int:
+            return pd.DataFrame()
+
+        query = text(
+            """
+            SELECT
+                id, spu_id, shop_id, shoplazza_id, shoplazza_product_id,
+                position, name, `values`, tenant_id,
+                creator, create_time, updater, update_time, deleted
+            FROM shoplazza_product_option
+            WHERE tenant_id = :tenant_id AND deleted = 0 AND spu_id IN :spu_ids
+            ORDER BY spu_id, position
+            """
+        ).bindparams(bindparam("spu_ids", expanding=True))
+
+        with self.db_engine.connect() as conn:
+            df = pd.read_sql(query, conn, params={"tenant_id": tenant_id, "spu_ids": spu_ids_int})
+        return df

     def _delete_spu_from_es(
         self,
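For readers unfamiliar with bindparam(..., expanding=True) used in the loaders above: SQLAlchemy expands the single :spu_ids placeholder into one bound parameter per list element at execution time, so the IN (...) clause needs no string formatting. A small sketch against an in-memory SQLite table (table name and data are invented for illustration):

    from sqlalchemy import create_engine, text, bindparam

    engine = create_engine("sqlite://")
    with engine.begin() as conn:
        conn.execute(text("CREATE TABLE spu (id INTEGER, title TEXT)"))
        conn.execute(text("INSERT INTO spu VALUES (1, 'a'), (2, 'b'), (3, 'c')"))

        stmt = text("SELECT id, title FROM spu WHERE id IN :ids").bindparams(
            bindparam("ids", expanding=True)  # ':ids' becomes (?, ?, ...) per list element
        )
        rows = conn.execute(stmt, {"ids": [1, 3]}).fetchall()
        print(rows)  # [(1, 'a'), (3, 'c')]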
@@ -247,6 +412,10 @@ class IncrementalIndexerService:
         Returns:
             A dict containing success/failure lists, plus the deletion results
         """
+        # De-duplicate while preserving order (avoids repeated DB reads / translation / embedding / ES writes)
+        if spu_ids:
+            spu_ids = list(dict.fromkeys(spu_ids))
+
         start_time = time.time()
         total_count = len(spu_ids)
         delete_count = len(delete_spu_ids) if delete_spu_ids else 0
@@ -283,53 +452,121 @@ class IncrementalIndexerService:
             result = self._delete_spu_from_es(es_client, tenant_id, spu_id, index_name, "explicit")
             delete_results.append({"spu_id": spu_id, **result})

-        # Step 1: process the index requests (spu_ids) and auto-detect deletions
-        for spu_id in spu_ids:
-            try:
-                log_spu_processing(indexer_logger, tenant_id, spu_id, 'fetching')
-
-                # First check whether the SPU is marked as deleted in the database
-                if self.check_spu_deleted(tenant_id, spu_id):
-                    # SPU is deleted; remove the corresponding document from ES
-                    logger.info(f"[IncrementalIndexing] SPU {spu_id} is deleted in DB, removing from ES")
-                    result = self._delete_spu_from_es(es_client, tenant_id, spu_id, index_name, "auto")
-                    # Unified status: deleted and not_found both count as deleted; failed stays failed
+        # Step 1: batch-load SPU/SKU/Option data and auto-detect deletions
+        if spu_ids:
+            log_spu_processing(indexer_logger, tenant_id, ",".join(spu_ids[:10]), 'fetching')
+
+            # Batch-load SPUs (including the deleted column, used to detect deletions)
+            spu_df = self._load_spus_for_spu_ids(tenant_id, spu_ids, include_deleted=True)
+            if spu_df.empty:
+                # None of the SPUs exist; treat them all as "to be deleted"
+                for spu_id in spu_ids:
+                    logger.info(f"[IncrementalIndexing] SPU {spu_id} not found in DB, removing from ES")
+                    result = self._delete_spu_from_es(es_client, tenant_id, spu_id, index_name, "auto_missing")
                     status = "deleted" if result["status"] != "failed" else "failed"
                     spu_results.append({
                         "spu_id": spu_id,
                         "status": status,
                         **({"msg": result["msg"]} if status == "failed" else {})
                     })
-                    continue
-
-                # SPU not deleted; fetch the document normally
-                doc = self.get_spu_document(tenant_id=tenant_id, spu_id=spu_id)
-
-                if doc is None:
-                    # This should not happen, since the deleted flag was already checked,
-                    # but handle it anyway for robustness
-                    error_msg = "SPU not found (unexpected)"
-                    logger.warning(f"[IncrementalIndexing] SPU {spu_id} not found after deleted check")
-                    log_spu_processing(indexer_logger, tenant_id, spu_id, 'failed', error_msg)
+            else:
+                # Build an index: id -> row
+                spu_df = spu_df.copy()
+                # Normalize deleted column to bool
+                def _is_deleted_value(v: Any) -> bool:
+                    if isinstance(v, bytes):
+                        return v == b"\x01" or v == 1
+                    return bool(v)
+
+                spu_df["_is_deleted"] = spu_df["deleted"].apply(_is_deleted_value)
+                spu_df.set_index("id", inplace=True, drop=False)
+
+                found_ids = set(int(x) for x in spu_df.index.tolist())
+                requested_ids_int = set(self._normalize_spu_ids(spu_ids))
+                missing_ids_int = requested_ids_int - found_ids
+
+                # missing -> delete from ES
+                for missing_id in sorted(missing_ids_int):
+                    spu_id_str = str(missing_id)
+                    logger.info(f"[IncrementalIndexing] SPU {spu_id_str} not found in DB, removing from ES")
+                    result = self._delete_spu_from_es(es_client, tenant_id, spu_id_str, index_name, "auto_missing")
+                    status = "deleted" if result["status"] != "failed" else "failed"
                     spu_results.append({
-                        "spu_id": spu_id,
-                        "status": "failed",
-                        "msg": error_msg
+                        "spu_id": spu_id_str,
+                        "status": status,
+                        **({"msg": result["msg"]} if status == "failed" else {})
                     })
-                    continue
-
-                log_spu_processing(indexer_logger, tenant_id, spu_id, 'transforming')
-                documents.append((spu_id, doc))  # keep the spu_id <-> doc mapping
-
-            except Exception as e:
-                error_msg = str(e)
-                logger.error(f"[IncrementalIndexing] Error processing SPU {spu_id}: {e}", exc_info=True)
-                log_spu_processing(indexer_logger, tenant_id, spu_id, 'failed', error_msg)
-                spu_results.append({
-                    "spu_id": spu_id,
-                    "status": "failed",
-                    "msg": error_msg
-                })
+
+                # deleted -> delete from ES
+                deleted_rows = spu_df[spu_df["_is_deleted"]]
+                for _, row in deleted_rows.iterrows():
+                    spu_id_str = str(int(row["id"]))
+                    logger.info(f"[IncrementalIndexing] SPU {spu_id_str} is deleted in DB, removing from ES")
+                    result = self._delete_spu_from_es(es_client, tenant_id, spu_id_str, index_name, "auto")
+                    status = "deleted" if result["status"] != "failed" else "failed"
+                    spu_results.append({
+                        "spu_id": spu_id_str,
+                        "status": status,
+                        **({"msg": result["msg"]} if status == "failed" else {})
+                    })
+
+                # active -> batch load sku/option then transform
+                active_spu_df = spu_df[~spu_df["_is_deleted"]]
+                active_ids_str = [str(int(x)) for x in active_spu_df["id"].tolist()]
+
+                skus_df = self._load_skus_for_spu_ids(tenant_id, active_ids_str)
+                options_df = self._load_options_for_spu_ids(tenant_id, active_ids_str)
+                sku_groups = skus_df.groupby("spu_id") if not skus_df.empty else None
+                option_groups = options_df.groupby("spu_id") if not options_df.empty else None
+
+                transformer, encoder, enable_embedding = self._get_transformer_bundle(tenant_id)
+
+                # Process the active SPUs in input order
+                for spu_id in spu_ids:
+                    try:
+                        spu_id_int = int(spu_id)
+                    except Exception:
+                        continue
+                    if spu_id_int not in active_spu_df.index:
+                        continue
+
+                    log_spu_processing(indexer_logger, tenant_id, spu_id, 'transforming')
+                    spu_row = active_spu_df.loc[spu_id_int]
+                    skus_for_spu = sku_groups.get_group(spu_id_int) if sku_groups is not None and spu_id_int in sku_groups.groups else pd.DataFrame()
+                    opts_for_spu = option_groups.get_group(spu_id_int) if option_groups is not None and spu_id_int in option_groups.groups else pd.DataFrame()
+
+                    doc = transformer.transform_spu_to_doc(
+                        tenant_id=tenant_id,
+                        spu_row=spu_row,
+                        skus=skus_for_spu,
+                        options=opts_for_spu,
+                    )
+                    if doc is None:
+                        error_msg = "SPU transform returned None"
+                        log_spu_processing(indexer_logger, tenant_id, spu_id, 'failed', error_msg)
+                        spu_results.append({"spu_id": spu_id, "status": "failed", "msg": error_msg})
+                        continue
+
+                    documents.append((spu_id, doc))
+
+                # Batch-generate embeddings (translation logic unchanged; the encoder comes from the cache)
+                if enable_embedding and encoder and documents:
+                    title_texts: List[str] = []
+                    title_doc_indices: List[int] = []
+                    for i, (_, doc) in enumerate(documents):
+                        title_text = doc.get("title_en") or doc.get("title_zh")
+                        if title_text and str(title_text).strip():
+                            title_texts.append(str(title_text))
+                            title_doc_indices.append(i)
+
+                    if title_texts:
+                        try:
+                            embeddings = encoder.encode_batch(title_texts, batch_size=32)
+                            for j, emb in enumerate(embeddings):
+                                doc_idx = title_doc_indices[j]
+                                documents[doc_idx][1]["title_embedding"] = emb.tolist()
+                        except Exception as e:
+                            logger.warning(f"[IncrementalIndexing] Batch embedding failed for tenant_id={tenant_id}: {e}", exc_info=True)

         logger.info(f"[IncrementalIndexing] Transformed {len(documents)}/{total_count} documents")
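The per-SPU SKU/option lookup above guards get_group with a membership test on .groups, because DataFrameGroupBy.get_group raises KeyError for a key that has no rows. A self-contained illustration (column names mirror the patch, the data is invented):

    import pandas as pd

    skus_df = pd.DataFrame({"spu_id": [101, 101, 102], "sku": ["A1", "A2", "B1"]})
    sku_groups = skus_df.groupby("spu_id") if not skus_df.empty else None

    for spu_id in (101, 103):  # 103 has no SKUs
        skus_for_spu = (
            sku_groups.get_group(spu_id)
            if sku_groups is not None and spu_id in sku_groups.groups
            else pd.DataFrame()
        )
        print(spu_id, len(skus_for_spu))  # -> 101 2, then 103 0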
diff --git a/indexer/indexing_utils.py b/indexer/indexing_utils.py
index d82634f..6aff22a 100644
--- a/indexer/indexing_utils.py
+++ b/indexer/indexing_utils.py
@@ -58,7 +58,8 @@ def create_document_transformer(
     translator: Optional[Any] = None,
     translation_prompts: Optional[Dict[str, str]] = None,
     encoder: Optional[Any] = None,
-    enable_title_embedding: bool = True
+    enable_title_embedding: bool = True,
+    config: Optional[Any] = None,
 ) -> SPUDocumentTransformer:
     """
     Create a document transformer (unified initialization logic).
@@ -80,10 +81,17 @@ def create_document_transformer(
     tenant_config = tenant_config_loader.get_tenant_config(tenant_id)

     # Load the search config (if needed)
-    if searchable_option_dimensions is None or translator is None or translation_prompts is None or (encoder is None and enable_title_embedding):
+    if (
+        searchable_option_dimensions is None
+        or translator is None
+        or translation_prompts is None
+        or (encoder is None and enable_title_embedding)
+        or config is None
+    ):
         try:
-            config_loader = ConfigLoader()
-            config = config_loader.load_config()
+            if config is None:
+                config_loader = ConfigLoader()
+                config = config_loader.load_config()

             if searchable_option_dimensions is None:
                 searchable_option_dimensions = config.spu_config.searchable_option_dimensions
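With the new config parameter, a caller that already holds a loaded config can pass it through, and create_document_transformer only falls back to its own ConfigLoader round-trip when the config is still missing. A hedged usage sketch (the tenant id and empty category map are placeholders; the import paths follow this patch):

    from config import ConfigLoader
    from indexer.indexing_utils import create_document_transformer

    config = ConfigLoader().load_config()       # load once, up front

    transformer = create_document_transformer(
        category_id_to_name={},                 # normally preloaded from the DB
        tenant_id="tenant-123",
        encoder=None,
        enable_title_embedding=False,           # embeddings are batch-filled later
        config=config,                          # reused; no second ConfigLoader call
    )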
diff --git a/mappings/search_products.json b/mappings/search_products.json
new file mode 100644
index 0000000..fc26061
--- /dev/null
+++ b/mappings/search_products.json
@@ -0,0 +1,263 @@
+{
+  "settings": {
+    "number_of_shards": 1,
+    "number_of_replicas": 0,
+    "refresh_interval": "30s",
+    "analysis": {
+      "analyzer": {
+        "hanlp_index": { "type": "custom", "tokenizer": "standard", "filter": ["lowercase", "asciifolding"] },
+        "hanlp_standard": { "type": "custom", "tokenizer": "standard", "filter": ["lowercase", "asciifolding"] }
+      },
+      "normalizer": {
+        "lowercase": { "type": "custom", "filter": ["lowercase"] }
+      }
+    },
+    "similarity": {
+      "default": { "type": "BM25", "b": 0.0, "k1": 0.0 }
+    }
+  },
+  "mappings": {
+    "properties": {
+      "tenant_id": { "type": "keyword" },
+      "spu_id": { "type": "keyword" },
+      "create_time": { "type": "date" },
+      "update_time": { "type": "date" },
+      "title_zh": { "type": "text", "analyzer": "hanlp_index", "search_analyzer": "hanlp_standard" },
+      "brief_zh": { "type": "text", "analyzer": "hanlp_index", "search_analyzer": "hanlp_standard" },
+      "description_zh": { "type": "text", "analyzer": "hanlp_index", "search_analyzer": "hanlp_standard" },
+      "vendor_zh": {
+        "type": "text", "analyzer": "hanlp_index", "search_analyzer": "hanlp_standard",
+        "fields": { "keyword": { "type": "keyword", "normalizer": "lowercase" } }
+      },
+      "title_en": { "type": "text", "analyzer": "english", "search_analyzer": "english" },
+      "brief_en": { "type": "text", "analyzer": "english", "search_analyzer": "english" },
+      "description_en": { "type": "text", "analyzer": "english", "search_analyzer": "english" },
+      "vendor_en": {
+        "type": "text", "analyzer": "english", "search_analyzer": "english",
+        "fields": { "keyword": { "type": "keyword", "normalizer": "lowercase" } }
+      },
+      "tags": { "type": "keyword" },
+      "image_url": { "type": "keyword", "index": false },
+      "title_embedding": { "type": "dense_vector", "dims": 1024, "index": true, "similarity": "dot_product" },
+      "image_embedding": {
+        "type": "nested",
+        "properties": {
+          "vector": { "type": "dense_vector", "dims": 1024, "index": true, "similarity": "dot_product" },
+          "url": { "type": "text" }
+        }
+      },
+      "category_path_zh": { "type": "text", "analyzer": "hanlp_index", "search_analyzer": "hanlp_standard" },
+      "category_path_en": { "type": "text", "analyzer": "english", "search_analyzer": "english" },
+      "category_name_zh": { "type": "text", "analyzer": "hanlp_index", "search_analyzer": "hanlp_standard" },
+      "category_name_en": { "type": "text", "analyzer": "english", "search_analyzer": "english" },
+      "category_id": { "type": "keyword" },
+      "category_name": { "type": "keyword" },
+      "category_level": { "type": "integer" },
+      "category1_name": { "type": "keyword" },
+      "category2_name": { "type": "keyword" },
+      "category3_name": { "type": "keyword" },
+      "specifications": {
+        "type": "nested",
+        "properties": {
+          "sku_id": { "type": "keyword" },
+          "name": { "type": "keyword" },
+          "value": { "type": "keyword" }
+        }
+      },
+      "option1_name": { "type": "keyword" },
+      "option2_name": { "type": "keyword" },
+      "option3_name": { "type": "keyword" },
+      "option1_values": { "type": "keyword" },
+      "option2_values": { "type": "keyword" },
+      "option3_values": { "type": "keyword" },
+      "min_price": { "type": "float" },
+      "max_price": { "type": "float" },
+      "compare_at_price": { "type": "float" },
+      "sku_prices": { "type": "float" },
+      "sku_weights": { "type": "long" },
+      "sku_weight_units": { "type": "keyword" },
+      "total_inventory": { "type": "long" },
+      "sales": { "type": "long" },
+      "skus": {
+        "type": "nested",
+        "properties": {
+          "sku_id": { "type": "keyword" },
+          "price": { "type": "float" },
+          "compare_at_price": { "type": "float" },
+          "sku_code": { "type": "keyword" },
+          "stock": { "type": "long" },
+          "weight": { "type": "float" },
+          "weight_unit": { "type": "keyword" },
+          "option1_value": { "type": "keyword" },
+          "option2_value": { "type": "keyword" },
+          "option3_value": { "type": "keyword" },
+          "image_src": { "type": "keyword", "index": false }
+        }
+      }
+    }
+  }
+}
--
libgit2 0.21.2