Commit 5c2b70a28c05b31a545864a9ab7806d362fdf8a3
1 parent f62a541c
search_products.json
Showing 3 changed files with 558 additions and 50 deletions
indexer/incremental_service.py
@@ -3,14 +3,16 @@
 import pandas as pd
 import logging
 import time
-from typing import Dict, Any, Optional, List
-from sqlalchemy import text
+import threading
+from typing import Dict, Any, Optional, List, Tuple
+from sqlalchemy import text, bindparam
 from indexer.indexing_utils import load_category_mapping, create_document_transformer
 from indexer.bulk_indexer import BulkIndexer
 from indexer.mapping_generator import DEFAULT_INDEX_NAME
 from indexer.indexer_logger import (
     get_indexer_logger, log_index_request, log_index_result, log_spu_processing
 )
+from config import ConfigLoader
 
 # Configure logger
 logger = logging.getLogger(__name__)
@@ -29,6 +31,73 @@ class IncrementalIndexerService:
         self.category_id_to_name = load_category_mapping(db_engine)
         logger.info(f"Preloaded {len(self.category_id_to_name)} category mappings")
 
+        # Cache: avoid reloading config / rebuilding the transformer on frequent incremental requests
+        self._config: Optional[Any] = None
+        self._config_lock = threading.Lock()
+        # tenant_id -> (transformer, encoder, enable_embedding)
+        self._transformer_cache: Dict[str, Tuple[Any, Optional[Any], bool]] = {}
+        self._transformer_cache_lock = threading.Lock()
+
+    def _get_config(self) -> Any:
+        """Load config once per process (thread-safe)."""
+        if self._config is not None:
+            return self._config
+        with self._config_lock:
+            if self._config is None:
+                self._config = ConfigLoader().load_config()
+        return self._config
+
+    def _get_transformer_bundle(self, tenant_id: str) -> Tuple[Any, Optional[Any], bool]:
+        """
+        Get a cached document transformer for tenant_id.
+
+        - The transformer is built once per tenant (in-process cache).
+        - Per-document embedding generation is disabled inside the transformer;
+          embeddings are batch-generated in index_spus_to_es for performance.
+        """
+        with self._transformer_cache_lock:
+            cached = self._transformer_cache.get(str(tenant_id))
+            if cached is not None:
+                return cached
+
+        config = self._get_config()
+        enable_embedding = bool(getattr(config.query_config, "enable_text_embedding", False))
+
+        encoder: Optional[Any] = None
+        if enable_embedding:
+            try:
+                from embeddings.text_encoder import BgeEncoder
+                encoder = BgeEncoder()
+            except Exception as e:
+                logger.warning(f"Failed to initialize BgeEncoder for tenant_id={tenant_id}: {e}")
+                encoder = None
+                enable_embedding = False
+
+        transformer = create_document_transformer(
+            category_id_to_name=self.category_id_to_name,
+            tenant_id=tenant_id,
+            encoder=encoder,
+            enable_title_embedding=False,  # batch-filled later
+            config=config,
+        )
+
+        bundle = (transformer, encoder, enable_embedding)
+        with self._transformer_cache_lock:
+            # simple unbounded cache; the tenant count on one node is typically small
+            self._transformer_cache[str(tenant_id)] = bundle
+        return bundle
+
+    @staticmethod
+    def _normalize_spu_ids(spu_ids: List[str]) -> List[int]:
+        """Normalize SPU IDs to ints for DB queries; skip non-int IDs."""
+        out: List[int] = []
+        for x in spu_ids:
+            try:
+                out.append(int(x))
+            except Exception:
+                continue
+        return out
+
     def get_spu_document(self, tenant_id: str, spu_id: str) -> Optional[Dict[str, Any]]:
         """Fetch the ES document data for an SPU"""
         try:
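Note: `_get_config` is the double-checked locking idiom, so warm calls never touch the lock; only a cold call acquires it and re-checks before loading. A minimal, self-contained sketch of the same pattern (the dict literal is a stand-in for `ConfigLoader().load_config()`):

    import threading
    from typing import Any, Optional

    class LazyConfig:
        """Double-checked locking: warm calls take the lock-free fast path."""

        def __init__(self) -> None:
            self._config: Optional[Any] = None
            self._lock = threading.Lock()

        def get(self) -> Any:
            if self._config is not None:   # fast path, no lock
                return self._config
            with self._lock:
                if self._config is None:   # re-check: another thread may have won
                    self._config = {"loaded": True}  # stand-in for ConfigLoader().load_config()
            return self._config

    cfg = LazyConfig()
    assert cfg.get() is cfg.get()  # same object on every call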
@@ -44,11 +113,7 @@
             # Load option data
             options_df = self._load_options_for_spu(tenant_id, spu_id)
 
-            # Create a document transformer
-            transformer = create_document_transformer(
-                category_id_to_name=self.category_id_to_name,
-                tenant_id=tenant_id
-            )
+            transformer, encoder, enable_embedding = self._get_transformer_bundle(tenant_id)
 
             # Transform into an ES document
             doc = transformer.transform_spu_to_doc(
@@ -62,6 +127,17 @@
                 logger.warning(f"Failed to transform SPU {spu_id} for tenant_id={tenant_id}")
                 return None
 
+            # Fill in the embedding in the single-document path too (still uses the cached encoder)
+            if enable_embedding and encoder:
+                title_text = doc.get("title_en") or doc.get("title_zh")
+                if title_text and str(title_text).strip():
+                    try:
+                        embeddings = encoder.encode(title_text)
+                        if embeddings is not None and len(embeddings) > 0:
+                            doc["title_embedding"] = embeddings[0].tolist()
+                    except Exception as e:
+                        logger.warning(f"Failed to generate embedding for spu_id={spu_id}: {e}")
+
             return doc
 
         except Exception as e:
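Note: taking `embeddings[0].tolist()` implies the encoder returns a 2-D (batch, dims) array even for a single text. A stub of that assumed interface, including the `encode_batch` used later in the batch path; the shapes, random vectors, and normalization are illustrative assumptions, not the real BgeEncoder:

    import numpy as np
    from typing import List, Union

    class StubEncoder:
        """Stand-in for the interface the indexer appears to assume:
        encode() returns (batch, dims), so callers take embeddings[0]."""

        dims = 1024  # matches the dense_vector mapping added in this commit

        def encode(self, texts: Union[str, List[str]]) -> np.ndarray:
            if isinstance(texts, str):
                texts = [texts]
            vecs = np.random.rand(len(texts), self.dims).astype(np.float32)
            # dot_product similarity in ES expects unit-normalized vectors
            return vecs / np.linalg.norm(vecs, axis=1, keepdims=True)

        def encode_batch(self, texts: List[str], batch_size: int = 32) -> np.ndarray:
            # Real encoders chunk by batch_size; a stub can encode in one shot
            return self.encode(texts)

    emb = StubEncoder().encode("red cotton t-shirt")
    print(emb.shape)  # (1, 1024)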
@@ -140,6 +216,95 @@
         if isinstance(deleted, bytes):
             return deleted == b'\x01' or deleted == 1
         return bool(deleted)
+
+    def _load_spus_for_spu_ids(self, tenant_id: str, spu_ids: List[str], include_deleted: bool = True) -> pd.DataFrame:
+        """Batch-load SPU rows for a list of spu_ids using IN (...)."""
+        spu_ids_int = self._normalize_spu_ids(spu_ids)
+        if not spu_ids_int:
+            return pd.DataFrame()
+
+        if include_deleted:
+            query = text(
+                """
+                SELECT
+                    id, shop_id, shoplazza_id, title, brief, description,
+                    spu, vendor, vendor_url,
+                    image_src, image_width, image_height, image_path, image_alt,
+                    tags, note, category, category_id, category_google_id,
+                    category_level, category_path,
+                    fake_sales, display_fake_sales,
+                    tenant_id, creator, create_time, updater, update_time, deleted
+                FROM shoplazza_product_spu
+                WHERE tenant_id = :tenant_id AND id IN :spu_ids
+                """
+            ).bindparams(bindparam("spu_ids", expanding=True))
+        else:
+            query = text(
+                """
+                SELECT
+                    id, shop_id, shoplazza_id, title, brief, description,
+                    spu, vendor, vendor_url,
+                    image_src, image_width, image_height, image_path, image_alt,
+                    tags, note, category, category_id, category_google_id,
+                    category_level, category_path,
+                    fake_sales, display_fake_sales,
+                    tenant_id, creator, create_time, updater, update_time, deleted
+                FROM shoplazza_product_spu
+                WHERE tenant_id = :tenant_id AND deleted = 0 AND id IN :spu_ids
+                """
+            ).bindparams(bindparam("spu_ids", expanding=True))
+
+        with self.db_engine.connect() as conn:
+            df = pd.read_sql(query, conn, params={"tenant_id": tenant_id, "spu_ids": spu_ids_int})
+        return df
+
+    def _load_skus_for_spu_ids(self, tenant_id: str, spu_ids: List[str]) -> pd.DataFrame:
+        """Batch-load all SKUs for a list of spu_ids."""
+        spu_ids_int = self._normalize_spu_ids(spu_ids)
+        if not spu_ids_int:
+            return pd.DataFrame()
+
+        query = text(
+            """
+            SELECT
+                id, spu_id, shop_id, shoplazza_id, shoplazza_product_id,
+                shoplazza_image_id, title, sku, barcode, position,
+                price, compare_at_price, cost_price,
+                option1, option2, option3,
+                inventory_quantity, weight, weight_unit, image_src,
+                wholesale_price, note, extend,
+                shoplazza_created_at, shoplazza_updated_at, tenant_id,
+                creator, create_time, updater, update_time, deleted
+            FROM shoplazza_product_sku
+            WHERE tenant_id = :tenant_id AND deleted = 0 AND spu_id IN :spu_ids
+            """
+        ).bindparams(bindparam("spu_ids", expanding=True))
+
+        with self.db_engine.connect() as conn:
+            df = pd.read_sql(query, conn, params={"tenant_id": tenant_id, "spu_ids": spu_ids_int})
+        return df
+
+    def _load_options_for_spu_ids(self, tenant_id: str, spu_ids: List[str]) -> pd.DataFrame:
+        """Batch-load all options for a list of spu_ids."""
+        spu_ids_int = self._normalize_spu_ids(spu_ids)
+        if not spu_ids_int:
+            return pd.DataFrame()
+
+        query = text(
+            """
+            SELECT
+                id, spu_id, shop_id, shoplazza_id, shoplazza_product_id,
+                position, name, `values`, tenant_id,
+                creator, create_time, updater, update_time, deleted
+            FROM shoplazza_product_option
+            WHERE tenant_id = :tenant_id AND deleted = 0 AND spu_id IN :spu_ids
+            ORDER BY spu_id, position
+            """
+        ).bindparams(bindparam("spu_ids", expanding=True))
+
+        with self.db_engine.connect() as conn:
+            df = pd.read_sql(query, conn, params={"tenant_id": tenant_id, "spu_ids": spu_ids_int})
+        return df
 
     def _delete_spu_from_es(
         self,
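Note: all three batch loaders hinge on SQLAlchemy's expanding bind parameter, which rewrites `IN :spu_ids` with the right number of placeholders at execute time, avoiding both string-built SQL and one query per ID. A runnable sketch against an in-memory SQLite table (the toy schema is illustrative; the real queries hit shoplazza_product_spu and friends):

    import pandas as pd
    from sqlalchemy import create_engine, text, bindparam

    engine = create_engine("sqlite://")
    with engine.begin() as conn:
        conn.exec_driver_sql("CREATE TABLE spu (id INTEGER, tenant_id TEXT)")
        conn.exec_driver_sql("INSERT INTO spu VALUES (1, 't1'), (2, 't1'), (3, 't2')")

    query = text(
        "SELECT id FROM spu WHERE tenant_id = :tenant_id AND id IN :spu_ids"
    ).bindparams(bindparam("spu_ids", expanding=True))

    with engine.connect() as conn:
        # SQLAlchemy expands IN :spu_ids into IN (?, ?, ?) at execute time
        df = pd.read_sql(query, conn, params={"tenant_id": "t1", "spu_ids": [1, 2, 99]})

    print(df["id"].tolist())  # [1, 2]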
@@ -247,6 +412,10 @@
         Returns:
             A dict with success/failure lists, plus deletion results
         """
+        # De-duplicate while preserving order (avoids duplicate DB / translation / embedding / ES work)
+        if spu_ids:
+            spu_ids = list(dict.fromkeys(spu_ids))
+
         start_time = time.time()
         total_count = len(spu_ids)
         delete_count = len(delete_spu_ids) if delete_spu_ids else 0
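Note: `dict.fromkeys` deduplicates while keeping first-seen order, because dicts preserve insertion order (guaranteed since Python 3.7); a plain `set` would lose it:

    spu_ids = ["101", "102", "101", "103", "102"]
    print(list(dict.fromkeys(spu_ids)))  # ['101', '102', '103'] (first-seen order kept)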
@@ -283,53 +452,121 @@
                 result = self._delete_spu_from_es(es_client, tenant_id, spu_id, index_name, "explicit")
                 delete_results.append({"spu_id": spu_id, **result})
 
-        # Step 1: process index requests (spu_ids) and auto-detect deletions
-        for spu_id in spu_ids:
-            try:
-                log_spu_processing(indexer_logger, tenant_id, spu_id, 'fetching')
-
-                # First check whether the SPU is marked deleted in the database
-                if self.check_spu_deleted(tenant_id, spu_id):
-                    # SPU is deleted: remove its document from ES
-                    logger.info(f"[IncrementalIndexing] SPU {spu_id} is deleted in DB, removing from ES")
-                    result = self._delete_spu_from_es(es_client, tenant_id, spu_id, index_name, "auto")
-                    # Unified status: deleted and not_found both count as deleted; failed stays failed
+        # Step 1: batch-fetch SPU/SKU/Option data and auto-detect deletions
+        if spu_ids:
+            log_spu_processing(indexer_logger, tenant_id, ",".join(spu_ids[:10]), 'fetching')
+
+            # Batch-load SPUs (including the deleted column, used to detect deletions)
+            spu_df = self._load_spus_for_spu_ids(tenant_id, spu_ids, include_deleted=True)
+            if spu_df.empty:
+                # None of the SPUs exist; treat them all as needing deletion
+                for spu_id in spu_ids:
+                    logger.info(f"[IncrementalIndexing] SPU {spu_id} not found in DB, removing from ES")
+                    result = self._delete_spu_from_es(es_client, tenant_id, spu_id, index_name, "auto_missing")
                     status = "deleted" if result["status"] != "failed" else "failed"
                     spu_results.append({
                         "spu_id": spu_id,
                         "status": status,
                         **({"msg": result["msg"]} if status == "failed" else {})
                     })
-                    continue
-
-                # SPU not deleted; fetch the document normally
-                doc = self.get_spu_document(tenant_id=tenant_id, spu_id=spu_id)
-
-                if doc is None:
-                    # This should not happen since the deleted flag was already checked,
-                    # but handle it anyway for robustness
-                    error_msg = "SPU not found (unexpected)"
-                    logger.warning(f"[IncrementalIndexing] SPU {spu_id} not found after deleted check")
-                    log_spu_processing(indexer_logger, tenant_id, spu_id, 'failed', error_msg)
+            else:
+                # Build an index: id -> row
+                spu_df = spu_df.copy()
+                # Normalize the deleted column to bool
+                def _is_deleted_value(v: Any) -> bool:
+                    if isinstance(v, bytes):
+                        return v == b"\x01" or v == 1
+                    return bool(v)
+
+                spu_df["_is_deleted"] = spu_df["deleted"].apply(_is_deleted_value)
+                spu_df.set_index("id", inplace=True, drop=False)
+
+                found_ids = set(int(x) for x in spu_df.index.tolist())
+                requested_ids_int = set(self._normalize_spu_ids(spu_ids))
+                missing_ids_int = requested_ids_int - found_ids
+
+                # missing -> delete from ES
+                for missing_id in sorted(missing_ids_int):
+                    spu_id_str = str(missing_id)
+                    logger.info(f"[IncrementalIndexing] SPU {spu_id_str} not found in DB, removing from ES")
+                    result = self._delete_spu_from_es(es_client, tenant_id, spu_id_str, index_name, "auto_missing")
+                    status = "deleted" if result["status"] != "failed" else "failed"
                     spu_results.append({
-                        "spu_id": spu_id,
-                        "status": "failed",
-                        "msg": error_msg
+                        "spu_id": spu_id_str,
+                        "status": status,
+                        **({"msg": result["msg"]} if status == "failed" else {})
                     })
-                    continue
-
-                log_spu_processing(indexer_logger, tenant_id, spu_id, 'transforming')
-                documents.append((spu_id, doc))  # keep the spu_id -> doc mapping
-
-            except Exception as e:
-                error_msg = str(e)
-                logger.error(f"[IncrementalIndexing] Error processing SPU {spu_id}: {e}", exc_info=True)
-                log_spu_processing(indexer_logger, tenant_id, spu_id, 'failed', error_msg)
-                spu_results.append({
-                    "spu_id": spu_id,
-                    "status": "failed",
-                    "msg": error_msg
-                })
+
+                # deleted -> delete from ES
+                deleted_rows = spu_df[spu_df["_is_deleted"]]
+                for _, row in deleted_rows.iterrows():
+                    spu_id_str = str(int(row["id"]))
+                    logger.info(f"[IncrementalIndexing] SPU {spu_id_str} is deleted in DB, removing from ES")
+                    result = self._delete_spu_from_es(es_client, tenant_id, spu_id_str, index_name, "auto")
+                    status = "deleted" if result["status"] != "failed" else "failed"
+                    spu_results.append({
+                        "spu_id": spu_id_str,
+                        "status": status,
+                        **({"msg": result["msg"]} if status == "failed" else {})
+                    })
+
+                # active -> batch-load SKUs/options, then transform
+                active_spu_df = spu_df[~spu_df["_is_deleted"]]
+                active_ids_str = [str(int(x)) for x in active_spu_df["id"].tolist()]
+
+                skus_df = self._load_skus_for_spu_ids(tenant_id, active_ids_str)
+                options_df = self._load_options_for_spu_ids(tenant_id, active_ids_str)
+                sku_groups = skus_df.groupby("spu_id") if not skus_df.empty else None
+                option_groups = options_df.groupby("spu_id") if not options_df.empty else None
+
+                transformer, encoder, enable_embedding = self._get_transformer_bundle(tenant_id)
+
+                # Process active SPUs in input order
+                for spu_id in spu_ids:
+                    try:
+                        spu_id_int = int(spu_id)
+                    except Exception:
+                        continue
+                    if spu_id_int not in active_spu_df.index:
+                        continue
+
+                    log_spu_processing(indexer_logger, tenant_id, spu_id, 'transforming')
+                    spu_row = active_spu_df.loc[spu_id_int]
+                    skus_for_spu = sku_groups.get_group(spu_id_int) if sku_groups is not None and spu_id_int in sku_groups.groups else pd.DataFrame()
+                    opts_for_spu = option_groups.get_group(spu_id_int) if option_groups is not None and spu_id_int in option_groups.groups else pd.DataFrame()
+
+                    doc = transformer.transform_spu_to_doc(
+                        tenant_id=tenant_id,
+                        spu_row=spu_row,
+                        skus=skus_for_spu,
+                        options=opts_for_spu,
+                    )
+                    if doc is None:
+                        error_msg = "SPU transform returned None"
+                        log_spu_processing(indexer_logger, tenant_id, spu_id, 'failed', error_msg)
+                        spu_results.append({"spu_id": spu_id, "status": "failed", "msg": error_msg})
+                        continue
+
+                    documents.append((spu_id, doc))
+
+                # Batch-generate embeddings (translation logic unchanged; embeddings use the cached encoder)
+                if enable_embedding and encoder and documents:
+                    title_texts: List[str] = []
+                    title_doc_indices: List[int] = []
+                    for i, (_, doc) in enumerate(documents):
+                        title_text = doc.get("title_en") or doc.get("title_zh")
+                        if title_text and str(title_text).strip():
+                            title_texts.append(str(title_text))
+                            title_doc_indices.append(i)
+
+                    if title_texts:
+                        try:
+                            embeddings = encoder.encode_batch(title_texts, batch_size=32)
+                            for j, emb in enumerate(embeddings):
+                                doc_idx = title_doc_indices[j]
+                                documents[doc_idx][1]["title_embedding"] = emb.tolist()
+                        except Exception as e:
+                            logger.warning(f"[IncrementalIndexing] Batch embedding failed for tenant_id={tenant_id}: {e}", exc_info=True)
 
         logger.info(f"[IncrementalIndexing] Transformed {len(documents)}/{total_count} documents")
 
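Note: grouping the batch-loaded SKU and option frames once and calling `get_group` per SPU replaces a per-SPU boolean filter; after the single `groupby`, each lookup is a dict access on `.groups`. A small sketch of the pattern with a toy frame:

    import pandas as pd

    skus_df = pd.DataFrame({
        "spu_id": [1, 1, 2],
        "sku": ["A-red", "A-blue", "B-std"],
    })
    sku_groups = skus_df.groupby("spu_id")

    for spu_id in (1, 2, 3):
        # Dict lookup per SPU after the one-time groupby,
        # instead of an O(n) filter like skus_df[skus_df.spu_id == spu_id]
        sub = sku_groups.get_group(spu_id) if spu_id in sku_groups.groups else pd.DataFrame()
        print(spu_id, len(sub))  # 1 -> 2 rows, 2 -> 1 row, 3 -> 0 rows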
indexer/indexing_utils.py
@@ -58,7 +58,8 @@
     translator: Optional[Any] = None,
     translation_prompts: Optional[Dict[str, str]] = None,
     encoder: Optional[Any] = None,
-    enable_title_embedding: bool = True
+    enable_title_embedding: bool = True,
+    config: Optional[Any] = None,
 ) -> SPUDocumentTransformer:
     """
     Create a document transformer (unified initialization logic).
@@ -80,10 +81,17 @@
     tenant_config = tenant_config_loader.get_tenant_config(tenant_id)
 
     # Load search config (if needed)
-    if searchable_option_dimensions is None or translator is None or translation_prompts is None or (encoder is None and enable_title_embedding):
+    if (
+        searchable_option_dimensions is None
+        or translator is None
+        or translation_prompts is None
+        or (encoder is None and enable_title_embedding)
+        or config is None
+    ):
         try:
-            config_loader = ConfigLoader()
-            config = config_loader.load_config()
+            if config is None:
+                config_loader = ConfigLoader()
+                config = config_loader.load_config()
 
             if searchable_option_dimensions is None:
                 searchable_option_dimensions = config.spu_config.searchable_option_dimensions
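Note: with the new `config` parameter, a caller that already holds a loaded config passes it through and the `ConfigLoader().load_config()` fallback inside `create_document_transformer` is skipped. Hypothetical usage mirroring `_get_transformer_bundle` (the tenant ID and preloaded mapping are placeholders):

    from config import ConfigLoader
    from indexer.indexing_utils import create_document_transformer

    # Load once, then build many transformers against the same config
    config = ConfigLoader().load_config()

    transformer = create_document_transformer(
        category_id_to_name=category_id_to_name,  # mapping preloaded by the caller
        tenant_id="tenant-42",                    # hypothetical tenant
        encoder=None,
        enable_title_embedding=False,  # embeddings are batch-filled by the caller
        config=config,                 # suppresses the load_config() fallback
    )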
search_products.json
@@ -0,0 +1,263 @@
+{
+  "settings": {
+    "number_of_shards": 1,
+    "number_of_replicas": 0,
+    "refresh_interval": "30s",
+    "analysis": {
+      "analyzer": {
+        "hanlp_index": {
+          "type": "custom",
+          "tokenizer": "standard",
+          "filter": ["lowercase", "asciifolding"]
+        },
+        "hanlp_standard": {
+          "type": "custom",
+          "tokenizer": "standard",
+          "filter": ["lowercase", "asciifolding"]
+        }
+      },
+      "normalizer": {
+        "lowercase": {
+          "type": "custom",
+          "filter": ["lowercase"]
+        }
+      }
+    },
+    "similarity": {
+      "default": {
+        "type": "BM25",
+        "b": 0.0,
+        "k1": 0.0
+      }
+    }
+  },
+  "mappings": {
+    "properties": {
+      "tenant_id": {
+        "type": "keyword"
+      },
+      "spu_id": {
+        "type": "keyword"
+      },
+      "create_time": {
+        "type": "date"
+      },
+      "update_time": {
+        "type": "date"
+      },
+      "title_zh": {
+        "type": "text",
+        "analyzer": "hanlp_index",
+        "search_analyzer": "hanlp_standard"
+      },
+      "brief_zh": {
+        "type": "text",
+        "analyzer": "hanlp_index",
+        "search_analyzer": "hanlp_standard"
+      },
+      "description_zh": {
+        "type": "text",
+        "analyzer": "hanlp_index",
+        "search_analyzer": "hanlp_standard"
+      },
+      "vendor_zh": {
+        "type": "text",
+        "analyzer": "hanlp_index",
+        "search_analyzer": "hanlp_standard",
+        "fields": {
+          "keyword": {
+            "type": "keyword",
+            "normalizer": "lowercase"
+          }
+        }
+      },
+      "title_en": {
+        "type": "text",
+        "analyzer": "english",
+        "search_analyzer": "english"
+      },
+      "brief_en": {
+        "type": "text",
+        "analyzer": "english",
+        "search_analyzer": "english"
+      },
+      "description_en": {
+        "type": "text",
+        "analyzer": "english",
+        "search_analyzer": "english"
+      },
+      "vendor_en": {
+        "type": "text",
+        "analyzer": "english",
+        "search_analyzer": "english",
+        "fields": {
+          "keyword": {
+            "type": "keyword",
+            "normalizer": "lowercase"
+          }
+        }
+      },
+      "tags": {
+        "type": "keyword"
+      },
+      "image_url": {
+        "type": "keyword",
+        "index": false
+      },
+      "title_embedding": {
+        "type": "dense_vector",
+        "dims": 1024,
+        "index": true,
+        "similarity": "dot_product"
+      },
+      "image_embedding": {
+        "type": "nested",
+        "properties": {
+          "vector": {
+            "type": "dense_vector",
+            "dims": 1024,
+            "index": true,
+            "similarity": "dot_product"
+          },
+          "url": {
+            "type": "text"
+          }
+        }
+      },
+      "category_path_zh": {
+        "type": "text",
+        "analyzer": "hanlp_index",
+        "search_analyzer": "hanlp_standard"
+      },
+      "category_path_en": {
+        "type": "text",
+        "analyzer": "english",
+        "search_analyzer": "english"
+      },
+      "category_name_zh": {
+        "type": "text",
+        "analyzer": "hanlp_index",
+        "search_analyzer": "hanlp_standard"
+      },
+      "category_name_en": {
+        "type": "text",
+        "analyzer": "english",
+        "search_analyzer": "english"
+      },
+      "category_id": {
+        "type": "keyword"
+      },
+      "category_name": {
+        "type": "keyword"
+      },
+      "category_level": {
+        "type": "integer"
+      },
+      "category1_name": {
+        "type": "keyword"
+      },
+      "category2_name": {
+        "type": "keyword"
+      },
+      "category3_name": {
+        "type": "keyword"
+      },
+      "specifications": {
+        "type": "nested",
+        "properties": {
+          "sku_id": {
+            "type": "keyword"
+          },
+          "name": {
+            "type": "keyword"
+          },
+          "value": {
+            "type": "keyword"
+          }
+        }
+      },
+      "option1_name": {
+        "type": "keyword"
+      },
+      "option2_name": {
+        "type": "keyword"
+      },
+      "option3_name": {
+        "type": "keyword"
+      },
+      "option1_values": {
+        "type": "keyword"
+      },
+      "option2_values": {
+        "type": "keyword"
+      },
+      "option3_values": {
+        "type": "keyword"
+      },
+      "min_price": {
+        "type": "float"
+      },
+      "max_price": {
+        "type": "float"
+      },
+      "compare_at_price": {
+        "type": "float"
+      },
+      "sku_prices": {
+        "type": "float"
+      },
+      "sku_weights": {
+        "type": "long"
+      },
+      "sku_weight_units": {
+        "type": "keyword"
+      },
+      "total_inventory": {
+        "type": "long"
+      },
+      "sales": {
+        "type": "long"
+      },
+      "skus": {
+        "type": "nested",
+        "properties": {
+          "sku_id": {
+            "type": "keyword"
+          },
+          "price": {
+            "type": "float"
+          },
+          "compare_at_price": {
+            "type": "float"
+          },
+          "sku_code": {
+            "type": "keyword"
+          },
+          "stock": {
+            "type": "long"
+          },
+          "weight": {
+            "type": "float"
+          },
+          "weight_unit": {
+            "type": "keyword"
+          },
+          "option1_value": {
+            "type": "keyword"
+          },
+          "option2_value": {
+            "type": "keyword"
+          },
+          "option3_value": {
+            "type": "keyword"
+          },
+          "image_src": {
+            "type": "keyword",
+            "index": false
+          }
+        }
+      }
+    }
+  }
+}
+
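Note: one way to apply this mapping when (re)creating the index, sketched with the official elasticsearch-py client; the node URL, index name, and file path are assumptions (the service itself resolves the name via DEFAULT_INDEX_NAME). The file's top-level keys map directly onto the create call, and `dot_product` similarity expects unit-normalized vectors at both index and query time.

    import json
    from elasticsearch import Elasticsearch

    # Assumptions: a local node, this file on disk, an illustrative index name
    es = Elasticsearch("http://localhost:9200")

    with open("search_products.json", "r", encoding="utf-8") as f:
        body = json.load(f)

    if not es.indices.exists(index="search_products"):
        es.indices.create(
            index="search_products",
            settings=body["settings"],
            mappings=body["mappings"],
        )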