Commit 984f14f924c96c9c4d2ac4fc0a7310af8e6a97ab

Authored by tangwang
1 parent 32e9b30c

Move the product_enrich module out

config/config.yaml
... ... @@ -74,10 +74,10 @@ search_evaluation:
74 74 search_base_url: ''
75 75 web_host: 0.0.0.0
76 76 web_port: 6010
77   - judge_model: qwen3.5-plus
  77 + judge_model: qwen3.6-plus
78 78 judge_enable_thinking: false
79 79 judge_dashscope_batch: false
80   - intent_model: qwen3-max
  80 + intent_model: qwen3.6-plus
81 81 intent_enable_thinking: true
82 82 judge_batch_completion_window: 24h
83 83 judge_batch_poll_interval_sec: 10.0
... ...
docs/工作总结-微服务性能优化与架构.md
... ... @@ -98,7 +98,7 @@ instruction: "Given a shopping query, rank product titles by relevance"
98 98 **Capability**: batch-generate **qanchors** (anchor texts), **enriched_attributes**, and **tags** from product titles, for use by indexing and suggest.
99 99  
100 100 **Details**:
101   -- **Endpoint**: `POST /indexer/enrich-content` (Indexer service, port **6004**). The request body is an `items` array; each item carries `spu_id`, `title` (required), and optional multilingual titles. A single request accepts at most **50 items**, so batched calls are recommended. The `results` array in the response aligns one-to-one with `items`; each entry contains `spu_id`, `qanchors` (keyed by language, e.g. `qanchors.zh`, `qanchors.en`, comma-separated phrases), `enriched_attributes`, and `tags`.
  101 +- **Endpoint**: `POST /indexer/enrich-content` (FacetAwareMatching service, port **6001**). The request body is an `items` array; each item carries `spu_id`, `title` (required), and optional multilingual titles. A single request accepts at most **50 items**, so batched calls are recommended. The `results` array in the response aligns one-to-one with `items`; each entry contains `spu_id`, `qanchors` (keyed by language, e.g. `qanchors.zh`, `qanchors.en`, comma-separated phrases), `enriched_attributes`, and `tags`.
102 102 - **Indexing side**: in the microservice-composition mode, the caller first fetches a doc without qanchors/tags, then calls this endpoint to fill them in before writing fields such as `qanchors.{lang}` to ES; the index transformers (`indexer/document_transformer.py`, `indexer/product_enrich.py`) can also invoke the content-understanding logic while building the doc and write `qanchors.{lang}` directly.
103 103 - **Suggest side**: `suggestion/builder.py` reads `_source: ["id", "spu_id", "title", "qanchors"]` from the ES product index, splits `qanchors.{lang}` into terms via `_split_qanchors`, and adds them as candidates with `source="qanchor"`; during ranking, `qanchor` outweighs plain titles (`add_product("qanchor", ...)`); `sources: ["query_log", "qanchor"]` in the suggest config means qanchor is one of the candidate sources.
104 104 - **Implementation & dependencies**: content understanding uses an LLM internally (requires `DASHSCOPE_API_KEY`) and supports multiple languages and Redis caching (e.g. `product_anchors`); the logic matches `indexer/product_enrich`.
... ...
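The batching contract described above (at most 50 items per request, with `results` aligned one-to-one with `items`) can be sketched in Python. This is a hedged illustration only: the host is a placeholder, and the stdlib `urllib` stands in for whatever HTTP client the caller actually uses.

```python
"""Sketch: batching calls to POST /indexer/enrich-content (placeholder host)."""
import json
import urllib.request
from typing import Any, Dict, Iterator, List

ENRICH_URL = "http://localhost:6001/indexer/enrich-content"  # placeholder
MAX_BATCH = 50  # documented per-request item limit


def chunk_items(
    items: List[Dict[str, Any]], max_batch: int = MAX_BATCH
) -> Iterator[List[Dict[str, Any]]]:
    """Yield successive batches of at most `max_batch` items."""
    for start in range(0, len(items), max_batch):
        yield items[start:start + max_batch]


def enrich_all(tenant_id: str, items: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Call the endpoint once per batch and concatenate `results`.

    Because `results` aligns one-to-one with `items`, concatenating
    per-batch results preserves the original ordering.
    """
    results: List[Dict[str, Any]] = []
    for batch in chunk_items(items):
        payload = json.dumps({"tenant_id": tenant_id, "items": batch}).encode("utf-8")
        req = urllib.request.Request(
            ENRICH_URL, data=payload, headers={"Content-Type": "application/json"}
        )
        with urllib.request.urlopen(req) as resp:
            results.extend(json.load(resp).get("results", []))
    return results
```

The chunking keeps each request within the documented 50-item limit while letting the caller enrich an arbitrarily large catalog in one pass.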
docs/搜索API对接指南-00-总览与快速开始.md
... ... @@ -90,7 +90,7 @@ curl -X POST "http://43.166.252.75:6002/search/" \
90 90 | Query documents | POST | `/indexer/documents` | Query SPU document data (does not write to ES) |
91 91 | Build ES docs (production) | POST | `/indexer/build-docs` | Build ES docs from upstream-supplied MySQL row data without writing to ES, for callers such as Java to write themselves |
92 92 | Build ES docs (testing) | POST | `/indexer/build-docs-from-db` | Testing/debugging only; queries the database internally by `tenant_id + spu_ids` and builds ES docs |
93   -| Content-understanding field generation | POST | `/indexer/enrich-content` | Batch-generate qanchors, enriched_attributes, and tags from product titles, for the microservice-composition mode |
  93 +| Content-understanding field generation | POST | `/indexer/enrich-content` | Batch-generate qanchors, enriched_attributes, and tags from product titles, for the microservice-composition mode (standalone service, port 6001) |
94 94 | Indexer health check | GET | `/indexer/health` | Check indexer service status |
95 95 | Health check | GET | `/admin/health` | Service health check |
96 96 | Get config | GET | `/admin/config` | Get tenant configuration |
... ... @@ -104,7 +104,6 @@ curl -X POST "http://43.166.252.75:6002/search/" \
104 104 | Embedding service (image) | 6008 | `POST /embed/image` | Image vectorization |
105 105 | Translation service | 6006 | `POST /translate` | Text translation (supports qwen-mt / llm / deepl / local models) |
106 106 | Rerank service | 6007 | `POST /rerank` | Rerank retrieval results |
107   -| Content understanding (inside Indexer) | 6004 | `POST /indexer/enrich-content` | Generate qanchors, tags, etc. from product titles, for the indexer microservice-composition mode |
  107 +| Content understanding (standalone service) | 6001 | `POST /indexer/enrich-content` | Generate qanchors, tags, etc. from product titles, for the indexer microservice-composition mode |
108 108  
109 109 ---
110   -
... ...
docs/搜索API对接指南-05-索引接口(Indexer).md
... ... @@ -13,7 +13,7 @@
13 13 | Query documents | POST | `/indexer/documents` | Query ES documents by SPU ID list, without writing to ES |
14 14 | Build ES docs (production) | POST | `/indexer/build-docs` | Upstream supplies MySQL row data; returns ES-ready documents without writing to ES |
15 15 | Build ES docs (testing) | POST | `/indexer/build-docs-from-db` | This service queries the database and builds the documents; testing/debugging only |
16   -| Content-understanding field generation | POST | `/indexer/enrich-content` | Batch-generate qanchors, enriched_attributes, and tags from product titles (for the microservice-composition mode) |
  16 +| Content-understanding field generation | POST | `/indexer/enrich-content` | Batch-generate qanchors, enriched_attributes, and tags from product titles (for the microservice-composition mode; standalone service, port 6001) |
17 17 | Indexer health check | GET | `/indexer/health` | Check indexer service and database connection status |
18 18  
19 19 #### 5.0 Three ways to support an external indexer
... ... @@ -23,7 +23,7 @@
23 23 | Mode | Description | When to use |
24 24 |------|------|----------|
25 25 | **1) Doc-filling endpoints** | Call `POST /indexer/build-docs` or `POST /indexer/build-docs-from-db`; this service builds complete ES documents (multilingual fields, vectors, specs, etc.) from MySQL row data, **without writing to ES**; the caller writes them itself. | You want a one-stop ES-ready doc and control the timing and index name of ES writes yourself. |
26   -| **2) Microservice composition** | Call the **translation**, **embedding**, and **content-understanding field generation** endpoints separately, and let your own indexer program assemble the doc and write it to ES. Translation and embedding are standalone microservices (see Section 7); content understanding is the Indexer-service endpoint `POST /indexer/enrich-content`. | You need flexible orchestration, or want to decouple slow LLM/vector steps from the main pipeline (e.g. asynchronously backfilling qanchors/tags). |
  26 +| **2) Microservice composition** | Call the **translation**, **embedding**, and **content-understanding field generation** endpoints separately, and let your own indexer program assemble the doc and write it to ES. Translation and embedding are standalone microservices (see Section 7); content understanding is the FacetAwareMatching standalone-service endpoint `POST /indexer/enrich-content` (port 6001). | You need flexible orchestration, or want to decouple slow LLM/vector steps from the main pipeline (e.g. asynchronously backfilling qanchors/tags). |
27 27 | **3) This service writes ES directly** | Call full reindex `POST /indexer/reindex` or incremental indexing `POST /indexer/index` (with a SPU ID list); this service pulls data from MySQL and writes it directly to ES. | Self-hosted operations, joint debugging, or scenarios where Java does not need to write ES. |
28 28 
29 29 - In **modes 1** and **2**, ES is written by the external indexer (or Java), keeping responsibilities clear.
... ... @@ -648,7 +648,8 @@ curl -X POST "http://127.0.0.1:6004/indexer/build-docs-from-db" \
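Under mode 2, the caller-side assembly step amounts to merging the enrich-content response into a base doc before writing it to ES. A minimal sketch, assuming only the field names documented above (`qanchors` keyed by language; response `tags` landing in the doc's `enriched_tags`); the function name is illustrative:

```python
"""Sketch: merge an enrich-content result into an ES-ready doc (mode 2)."""
from typing import Any, Dict


def merge_enrichment(doc: Dict[str, Any], enriched: Dict[str, Any]) -> Dict[str, Any]:
    """Return a new doc with qanchors.{lang} and tags filled in.

    Non-destructive: the input doc is left untouched so the caller can
    retry or diff before writing to ES.
    """
    merged = dict(doc)
    qanchors = enriched.get("qanchors") or {}
    if qanchors:
        # Per-language merge: enriched values win over any existing ones.
        merged["qanchors"] = {**doc.get("qanchors", {}), **qanchors}
    if enriched.get("tags"):
        merged["enriched_tags"] = enriched["tags"]
    return merged
```

The caller would run this per SPU after matching `results` entries back to `items` by position (or by `spu_id`), then bulk-write the merged docs to ES itself.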
648 648 ### 5.8 Content-understanding field generation endpoint
649 649  
650 650 - **Endpoint**: `POST /indexer/enrich-content`
651   -- **Description**: batch-generates **qanchors** (anchor texts), **enriched_attributes** (general semantic attributes), **enriched_tags** (fine-grained tags), and **enriched_taxonomy_attributes** (taxonomy-structured attributes) from product content, for external indexers that assemble docs themselves in the microservice-composition mode. The request passes product content fields via `items[]` (required/optional fields in the table below). The endpoint exposes only product-content inputs; language selection, analysis dimensions, and the final field structure are decided internally by `indexer.product_enrich`; current results stay consistent with the `search_products` mapping. Each request runs in a thread pool to avoid blocking other endpoints.
  651 +- **Service**: FacetAwareMatching standalone service (default port **6001**; managed by `/data/FacetAwareMatching/scripts/service_ctl.sh`)
  652 +- **Description**: batch-generates **qanchors** (anchor texts), **enriched_attributes** (general semantic attributes), **enriched_tags** (fine-grained tags), and **enriched_taxonomy_attributes** (taxonomy-structured attributes) from product content, for external indexers that assemble docs themselves in the microservice-composition mode. The request passes product content fields via `items[]` (required/optional fields in the table below). The endpoint exposes only product-content inputs; language selection, analysis dimensions, and the final field structure are decided internally by FacetAwareMatching's `product_enrich`; current results stay consistent with the `search_products` mapping. Each request runs in a thread pool to avoid blocking other endpoints.
652 653  
653 654 Currently supported `category_taxonomy_profile` values:
654 655 - `apparel`
... ... @@ -670,7 +671,7 @@ curl -X POST "http://127.0.0.1:6004/indexer/build-docs-from-db" \
670 671 Notes:
671 672 - For all profiles, `enriched_taxonomy_attributes.value` always returns both `zh` and `en`.
672 673 - External calls to `/indexer/enrich-content` use the `category_taxonomy_profile` supplied in the request.
673   -- When the Indexer itself builds ES documents, the taxonomy profile is temporarily hard-coded to `apparel`; a TODO remains in the code to switch to the tenant's real industry once it can be read from the database
  674 +- If the indexer still integrates the content-understanding capability internally, pass the taxonomy profile explicitly on the caller side (preferably derived from the tenant's industry configuration)
674 675  
675 676 #### Request parameters
676 677  
... ... @@ -796,7 +797,7 @@ curl -X POST "http://127.0.0.1:6004/indexer/build-docs-from-db" \
796 797 #### Request example
797 798  
798 799 ```bash
799   -curl -X POST "http://localhost:6004/indexer/enrich-content" \
  800 +curl -X POST "http://localhost:6001/indexer/enrich-content" \
800 801 -H "Content-Type: application/json" \
801 802 -d '{
802 803 "tenant_id": "163",
... ...
suggestion/builder.py
... ... @@ -366,7 +366,8 @@ class SuggestionIndexBuilder:
366 366  
367 367 index_name = get_tenant_index_name(tenant_id)
368 368 search_after: Optional[List[Any]] = None
369   -
  369 + logger.debug("Suggestion builder using index %s for tenant %s", index_name, tenant_id)
  370 + total_processed = 0
370 371 while True:
371 372 body: Dict[str, Any] = {
372 373 "size": batch_size,
... ... @@ -385,10 +386,13 @@ class SuggestionIndexBuilder:
385 386 if not hits:
386 387 break
387 388 for hit in hits:
  389 + total_processed += 1
388 390 yield hit
389 391 search_after = hits[-1].get("sort")
390 392 if len(hits) < batch_size:
391 393 break
  394 + logger.debug("Suggestion builder processed %s products for tenant %s", total_processed, tenant_id)
  395 +
392 396  
393 397 def _iter_query_log_rows(
394 398 self,
... ...
suggestion/builder.py.bak 0 → 100644
... ... @@ -0,0 +1,1014 @@
  1 +"""
  2 +Suggestion index builder (Phase 2).
  3 +
  4 +Capabilities:
  5 +- Full rebuild to versioned index
  6 +- Atomic alias publish
  7 +- Incremental update from query logs with watermark
  8 +"""
  9 +
  10 +import json
  11 +import logging
  12 +import math
  13 +import re
  14 +import unicodedata
  15 +from dataclasses import dataclass, field
  16 +from datetime import datetime, timedelta, timezone
  17 +from typing import Any, Dict, Iterator, List, Optional, Tuple
  18 +
  19 +from sqlalchemy import text
  20 +
  21 +from config.loader import get_app_config
  22 +from config.tenant_config_loader import get_tenant_config_loader
  23 +from query.query_parser import detect_text_language_for_suggestions
  24 +from suggestion.mapping import build_suggestion_mapping
  25 +from utils.es_client import ESClient
  26 +
  27 +logger = logging.getLogger(__name__)
  28 +
  29 +
  30 +def _index_prefix() -> str:
  31 + return get_app_config().runtime.index_namespace or ""
  32 +
  33 +
  34 +def get_suggestion_alias_name(tenant_id: str) -> str:
  35 + """Read alias for suggestion index (single source of truth)."""
  36 + return f"{_index_prefix()}search_suggestions_tenant_{tenant_id}_current"
  37 +
  38 +
  39 +def get_suggestion_versioned_index_name(tenant_id: str, build_at: Optional[datetime] = None) -> str:
  40 + """Versioned suggestion index name."""
  41 + ts = (build_at or datetime.now(timezone.utc)).strftime("%Y%m%d%H%M%S%f")
  42 + return f"{_index_prefix()}search_suggestions_tenant_{tenant_id}_v{ts}"
  43 +
  44 +
  45 +def get_suggestion_versioned_index_pattern(tenant_id: str) -> str:
  46 + return f"{_index_prefix()}search_suggestions_tenant_{tenant_id}_v*"
  47 +
  48 +
  49 +def get_suggestion_meta_index_name() -> str:
  50 + return f"{_index_prefix()}search_suggestions_meta"
  51 +
  52 +
  53 +@dataclass
  54 +class SuggestionCandidate:
  55 + text: str
  56 + text_norm: str
  57 + lang: str
  58 + sources: set = field(default_factory=set)
  59 + title_spu_ids: set = field(default_factory=set)
  60 + qanchor_spu_ids: set = field(default_factory=set)
  61 + tag_spu_ids: set = field(default_factory=set)
  62 + query_count_7d: int = 0
  63 + query_count_30d: int = 0
  64 + lang_confidence: float = 1.0
  65 + lang_source: str = "default"
  66 + lang_conflict: bool = False
  67 +
  68 + def add_product(self, source: str, spu_id: str) -> None:
  69 + self.sources.add(source)
  70 + if source == "title":
  71 + self.title_spu_ids.add(spu_id)
  72 + elif source == "qanchor":
  73 + self.qanchor_spu_ids.add(spu_id)
  74 + elif source == "tag":
  75 + self.tag_spu_ids.add(spu_id)
  76 +
  77 + def add_query_log(self, is_7d: bool) -> None:
  78 + self.sources.add("query_log")
  79 + self.query_count_30d += 1
  80 + if is_7d:
  81 + self.query_count_7d += 1
  82 +
  83 +
  84 +@dataclass
  85 +class QueryDelta:
  86 + tenant_id: str
  87 + lang: str
  88 + text: str
  89 + text_norm: str
  90 + delta_7d: int = 0
  91 + delta_30d: int = 0
  92 + lang_confidence: float = 1.0
  93 + lang_source: str = "default"
  94 + lang_conflict: bool = False
  95 +
  96 +
  97 +class SuggestionIndexBuilder:
  98 + """Build and update suggestion index."""
  99 +
  100 + def __init__(self, es_client: ESClient, db_engine: Any):
  101 + self.es_client = es_client
  102 + self.db_engine = db_engine
  103 +
  104 + def _format_allocation_failure(self, index_name: str) -> str:
  105 + health = self.es_client.wait_for_index_ready(index_name=index_name, timeout="5s")
  106 + explain = self.es_client.get_allocation_explain(index_name=index_name)
  107 +
  108 + parts = [
  109 + f"Suggestion index '{index_name}' was created but is not allocatable/readable yet",
  110 + f"health_status={health.get('status')}",
  111 + f"timed_out={health.get('timed_out')}",
  112 + ]
  113 + if health.get("error"):
  114 + parts.append(f"health_error={health['error']}")
  115 +
  116 + if explain:
  117 + unassigned = explain.get("unassigned_info") or {}
  118 + if unassigned.get("reason"):
  119 + parts.append(f"unassigned_reason={unassigned['reason']}")
  120 + if unassigned.get("last_allocation_status"):
  121 + parts.append(f"last_allocation_status={unassigned['last_allocation_status']}")
  122 +
  123 + for node in explain.get("node_allocation_decisions") or []:
  124 + node_name = node.get("node_name") or node.get("node_id") or "unknown-node"
  125 + for decider in node.get("deciders") or []:
  126 + if decider.get("decision") == "NO":
  127 + parts.append(
  128 + f"{node_name}:{decider.get('decider')}={decider.get('explanation')}"
  129 + )
  130 + return "; ".join(parts)
  131 +
  132 + return "; ".join(parts)
  133 +
  134 + def _create_fresh_versioned_index(
  135 + self,
  136 + tenant_id: str,
  137 + mapping: Dict[str, Any],
  138 + max_attempts: int = 5,
  139 + ) -> str:
  140 + for attempt in range(1, max_attempts + 1):
  141 + index_name = get_suggestion_versioned_index_name(tenant_id)
  142 + if self.es_client.index_exists(index_name):
  143 + logger.warning(
  144 + "Suggestion index name collision before create for tenant=%s index=%s attempt=%s/%s",
  145 + tenant_id,
  146 + index_name,
  147 + attempt,
  148 + max_attempts,
  149 + )
  150 + continue
  151 +
  152 + if self.es_client.create_index(index_name, mapping):
  153 + return index_name
  154 +
  155 + if self.es_client.index_exists(index_name):
  156 + logger.warning(
  157 + "Suggestion index name collision during create for tenant=%s index=%s attempt=%s/%s",
  158 + tenant_id,
  159 + index_name,
  160 + attempt,
  161 + max_attempts,
  162 + )
  163 + continue
  164 +
  165 + raise RuntimeError(f"Failed to create suggestion index: {index_name}")
  166 +
  167 + raise RuntimeError(
  168 + f"Failed to allocate a unique suggestion index name for tenant={tenant_id} after {max_attempts} attempts"
  169 + )
  170 +
  171 + def _ensure_new_index_ready(self, index_name: str) -> None:
  172 + health = self.es_client.wait_for_index_ready(index_name=index_name, timeout="5s")
  173 + if health.get("ok"):
  174 + return
  175 + raise RuntimeError(self._format_allocation_failure(index_name))
  176 +
  177 + @staticmethod
  178 + def _to_utc(dt: Any) -> Optional[datetime]:
  179 + if dt is None:
  180 + return None
  181 + if isinstance(dt, datetime):
  182 + if dt.tzinfo is None:
  183 + return dt.replace(tzinfo=timezone.utc)
  184 + return dt.astimezone(timezone.utc)
  185 + return None
  186 +
  187 + @staticmethod
  188 + def _normalize_text(value: str) -> str:
  189 + text_value = unicodedata.normalize("NFKC", (value or "")).strip().lower()
  190 + text_value = re.sub(r"\s+", " ", text_value)
  191 + return text_value
  192 +
  193 + @staticmethod
  194 + def _prepare_title_for_suggest(title: str, max_len: int = 120) -> str:
  195 + """
  196 + Keep title-derived suggestions concise:
  197 + - keep raw title when short enough
  198 + - for long titles, keep the leading phrase before common separators
  199 + - fallback to hard truncate
  200 + """
  201 + raw = str(title or "").strip()
  202 + if not raw:
  203 + return ""
  204 + if len(raw) <= max_len:
  205 + return raw
  206 +
  207 + head = re.split(r"[,,;;|/\\((\[【]", raw, maxsplit=1)[0].strip()
  208 + if 1 < len(head) <= max_len:
  209 + return head
  210 +
  211 + truncated = raw[:max_len].rstrip(" ,,;;|/\\-—–()()[]【】")
  212 + return truncated or raw[:max_len]
  213 +
  214 + @staticmethod
  215 + def _split_qanchors(value: Any) -> List[str]:
  216 + if value is None:
  217 + return []
  218 + if isinstance(value, list):
  219 + return [str(x).strip() for x in value if str(x).strip()]
  220 + raw = str(value).strip()
  221 + if not raw:
  222 + return []
  223 + parts = re.split(r"[,、,;|/\n\t]+", raw)
  224 + out = [p.strip() for p in parts if p and p.strip()]
  225 + if not out:
  226 + return [raw]
  227 + return out
  228 +
  229 + @staticmethod
  230 + def _iter_product_tags(raw: Any) -> List[str]:
  231 + if raw is None:
  232 + return []
  233 + if isinstance(raw, list):
  234 + return [str(x).strip() for x in raw if str(x).strip()]
  235 + s = str(raw).strip()
  236 + if not s:
  237 + return []
  238 + parts = re.split(r"[,、,;|/\n\t]+", s)
  239 + out = [p.strip() for p in parts if p and p.strip()]
  240 + return out if out else [s]
  241 +
  242 + def _iter_multilang_product_tags(
  243 + self,
  244 + raw: Any,
  245 + index_languages: List[str],
  246 + primary_language: str,
  247 + ) -> List[Tuple[str, str]]:
  248 + if isinstance(raw, dict):
  249 + pairs: List[Tuple[str, str]] = []
  250 + for lang in index_languages:
  251 + for tag in self._iter_product_tags(raw.get(lang)):
  252 + pairs.append((lang, tag))
  253 + return pairs
  254 +
  255 + pairs = []
  256 + for tag in self._iter_product_tags(raw):
  257 + tag_lang, _, _ = detect_text_language_for_suggestions(
  258 + tag,
  259 + index_languages=index_languages,
  260 + primary_language=primary_language,
  261 + )
  262 + pairs.append((tag_lang, tag))
  263 + return pairs
  264 +
  265 + @staticmethod
  266 + def _looks_noise(text_value: str) -> bool:
  267 + if not text_value:
  268 + return True
  269 + if len(text_value) > 120:
  270 + return True
  271 + if re.fullmatch(r"[\W_]+", text_value):
  272 + return True
  273 + return False
  274 +
  275 + @staticmethod
  276 + def _normalize_lang(lang: Optional[str]) -> Optional[str]:
  277 + if not lang:
  278 + return None
  279 + token = str(lang).strip().lower().replace("-", "_")
  280 + if not token:
  281 + return None
  282 + if token in {"zh_tw", "pt_br"}:
  283 + return token
  284 + return token.split("_")[0]
  285 +
  286 + @staticmethod
  287 + def _parse_request_params_language(raw: Any) -> Optional[str]:
  288 + if raw is None:
  289 + return None
  290 + if isinstance(raw, dict):
  291 + return raw.get("language")
  292 + text_raw = str(raw).strip()
  293 + if not text_raw:
  294 + return None
  295 + try:
  296 + obj = json.loads(text_raw)
  297 + if isinstance(obj, dict):
  298 + return obj.get("language")
  299 + except Exception:
  300 + return None
  301 + return None
  302 +
  303 + def _resolve_query_language(
  304 + self,
  305 + query: str,
  306 + log_language: Optional[str],
  307 + request_params: Any,
  308 + index_languages: List[str],
  309 + primary_language: str,
  310 + ) -> Tuple[str, float, str, bool]:
  311 + """Resolve lang with priority: log field > request_params > script/model."""
  312 + langs_set = set(index_languages or [])
  313 + primary = self._normalize_lang(primary_language) or "en"
  314 + if primary not in langs_set and langs_set:
  315 + primary = index_languages[0]
  316 +
  317 + log_lang = self._normalize_lang(log_language)
  318 + req_lang = self._normalize_lang(self._parse_request_params_language(request_params))
  319 + conflict = bool(log_lang and req_lang and log_lang != req_lang)
  320 +
  321 + if log_lang and (not langs_set or log_lang in langs_set):
  322 + return log_lang, 1.0, "log_field", conflict
  323 +
  324 + if req_lang and (not langs_set or req_lang in langs_set):
  325 + return req_lang, 1.0, "request_params", conflict
  326 +
  327 + det_lang, conf, det_source = detect_text_language_for_suggestions(
  328 + query,
  329 + index_languages=index_languages,
  330 + primary_language=primary,
  331 + )
  332 + if det_lang and (not langs_set or det_lang in langs_set):
  333 + return det_lang, conf, det_source, conflict
  334 +
  335 + return primary, 0.3, "default", conflict
  336 +
  337 + @staticmethod
  338 + def _compute_rank_score(
  339 + query_count_30d: int,
  340 + query_count_7d: int,
  341 + qanchor_doc_count: int,
  342 + title_doc_count: int,
  343 + tag_doc_count: int = 0,
  344 + ) -> float:
  345 + return (
  346 + 1.8 * math.log1p(max(query_count_30d, 0))
  347 + + 1.2 * math.log1p(max(query_count_7d, 0))
  348 + + 1.0 * math.log1p(max(qanchor_doc_count, 0))
  349 + + 0.85 * math.log1p(max(tag_doc_count, 0))
  350 + + 0.6 * math.log1p(max(title_doc_count, 0))
  351 + )
  352 +
  353 + @classmethod
  354 + def _compute_rank_score_from_candidate(cls, c: SuggestionCandidate) -> float:
  355 + return cls._compute_rank_score(
  356 + query_count_30d=c.query_count_30d,
  357 + query_count_7d=c.query_count_7d,
  358 + qanchor_doc_count=len(c.qanchor_spu_ids),
  359 + title_doc_count=len(c.title_spu_ids),
  360 + tag_doc_count=len(c.tag_spu_ids),
  361 + )
  362 +
  363 + def _iter_products(self, tenant_id: str, batch_size: int = 500) -> Iterator[Dict[str, Any]]:
  364 + """Stream product docs from tenant index using search_after."""
  365 + from indexer.mapping_generator import get_tenant_index_name
  366 +
  367 + index_name = get_tenant_index_name(tenant_id)
  368 + search_after: Optional[List[Any]] = None
  369 +
  370 + while True:
  371 + body: Dict[str, Any] = {
  372 + "size": batch_size,
  373 + "_source": ["id", "spu_id", "title", "qanchors", "enriched_tags"],
  374 + "sort": [
  375 + {"spu_id": {"order": "asc", "missing": "_last"}},
  376 + {"id.keyword": {"order": "asc", "missing": "_last"}},
  377 + ],
  378 + "query": {"match_all": {}},
  379 + }
  380 + if search_after is not None:
  381 + body["search_after"] = search_after
  382 +
  383 + resp = self.es_client.client.search(index=index_name, body=body)
  384 + hits = resp.get("hits", {}).get("hits", []) or []
  385 + if not hits:
  386 + break
  387 + for hit in hits:
  388 + yield hit
  389 + search_after = hits[-1].get("sort")
  390 + if len(hits) < batch_size:
  391 + break
  392 +
  393 + def _iter_query_log_rows(
  394 + self,
  395 + tenant_id: str,
  396 + since: datetime,
  397 + until: datetime,
  398 + fetch_size: int = 2000,
  399 + ) -> Iterator[Any]:
  400 + """Stream search logs from MySQL with bounded time range."""
  401 + query_sql = text(
  402 + """
  403 + SELECT query, language, request_params, create_time
  404 + FROM shoplazza_search_log
  405 + WHERE tenant_id = :tenant_id
  406 + AND deleted = 0
  407 + AND query IS NOT NULL
  408 + AND query <> ''
  409 + AND create_time >= :since_time
  410 + AND create_time < :until_time
  411 + ORDER BY create_time ASC
  412 + """
  413 + )
  414 +
  415 + with self.db_engine.connect().execution_options(stream_results=True) as conn:
  416 + result = conn.execute(
  417 + query_sql,
  418 + {
  419 + "tenant_id": int(tenant_id),
  420 + "since_time": since,
  421 + "until_time": until,
  422 + },
  423 + )
  424 + while True:
  425 + rows = result.fetchmany(fetch_size)
  426 + if not rows:
  427 + break
  428 + for row in rows:
  429 + yield row
  430 +
  431 + def _ensure_meta_index(self) -> str:
  432 + meta_index = get_suggestion_meta_index_name()
  433 + if self.es_client.index_exists(meta_index):
  434 + return meta_index
  435 + body = {
  436 + "settings": {
  437 + "number_of_shards": 1,
  438 + "number_of_replicas": 0,
  439 + "refresh_interval": "1s",
  440 + },
  441 + "mappings": {
  442 + "properties": {
  443 + "tenant_id": {"type": "keyword"},
  444 + "active_alias": {"type": "keyword"},
  445 + "active_index": {"type": "keyword"},
  446 + "last_full_build_at": {"type": "date"},
  447 + "last_incremental_build_at": {"type": "date"},
  448 + "last_incremental_watermark": {"type": "date"},
  449 + "updated_at": {"type": "date"},
  450 + }
  451 + },
  452 + }
  453 + if not self.es_client.create_index(meta_index, body):
  454 + raise RuntimeError(f"Failed to create suggestion meta index: {meta_index}")
  455 + return meta_index
  456 +
  457 + def _get_meta(self, tenant_id: str) -> Dict[str, Any]:
  458 + meta_index = self._ensure_meta_index()
  459 + try:
  460 + resp = self.es_client.client.get(index=meta_index, id=str(tenant_id))
  461 + return resp.get("_source", {}) or {}
  462 + except Exception:
  463 + return {}
  464 +
  465 + def _upsert_meta(self, tenant_id: str, patch: Dict[str, Any]) -> None:
  466 + meta_index = self._ensure_meta_index()
  467 + current = self._get_meta(tenant_id)
  468 + now_iso = datetime.now(timezone.utc).isoformat()
  469 + merged = {
  470 + "tenant_id": str(tenant_id),
  471 + **current,
  472 + **patch,
  473 + "updated_at": now_iso,
  474 + }
  475 + self.es_client.client.index(index=meta_index, id=str(tenant_id), document=merged, refresh="wait_for")
  476 +
  477 + def _cleanup_old_versions(self, tenant_id: str, keep_versions: int, protected_indices: Optional[List[str]] = None) -> List[str]:
  478 + if keep_versions < 1:
  479 + keep_versions = 1
  480 + protected = set(protected_indices or [])
  481 + pattern = get_suggestion_versioned_index_pattern(tenant_id)
  482 + all_indices = self.es_client.list_indices(pattern)
  483 + if len(all_indices) <= keep_versions:
  484 + return []
  485 +
  486 + # Names are timestamp-ordered by suffix; keep newest N.
  487 + kept = set(sorted(all_indices)[-keep_versions:])
  488 + dropped: List[str] = []
  489 + for idx in sorted(all_indices):
  490 + if idx in kept or idx in protected:
  491 + continue
  492 + if self.es_client.delete_index(idx):
  493 + dropped.append(idx)
  494 + return dropped
  495 +
  496 + def _publish_alias(self, tenant_id: str, index_name: str, keep_versions: int = 2) -> Dict[str, Any]:
  497 + alias_name = get_suggestion_alias_name(tenant_id)
  498 + current_indices = self.es_client.get_alias_indices(alias_name)
  499 +
  500 + actions: List[Dict[str, Any]] = []
  501 + for idx in current_indices:
  502 + actions.append({"remove": {"index": idx, "alias": alias_name}})
  503 + actions.append({"add": {"index": index_name, "alias": alias_name}})
  504 +
  505 + if not self.es_client.update_aliases(actions):
  506 + raise RuntimeError(f"Failed to publish alias {alias_name} -> {index_name}")
  507 +
  508 + dropped = self._cleanup_old_versions(
  509 + tenant_id=tenant_id,
  510 + keep_versions=keep_versions,
  511 + protected_indices=[index_name],
  512 + )
  513 +
  514 + self._upsert_meta(
  515 + tenant_id,
  516 + {
  517 + "active_alias": alias_name,
  518 + "active_index": index_name,
  519 + },
  520 + )
  521 +
  522 + return {
  523 + "alias": alias_name,
  524 + "previous_indices": current_indices,
  525 + "current_index": index_name,
  526 + "dropped_old_indices": dropped,
  527 + }
  528 +
  529 + def _resolve_incremental_target_index(self, tenant_id: str) -> Optional[str]:
  530 + """Resolve active suggestion index for incremental updates (alias only)."""
  531 + alias_name = get_suggestion_alias_name(tenant_id)
  532 + aliased = self.es_client.get_alias_indices(alias_name)
  533 + if aliased:
  534 + # alias should map to one index in this design
  535 + return sorted(aliased)[-1]
  536 + return None
  537 +
  538 + def _build_full_candidates(
  539 + self,
  540 + tenant_id: str,
  541 + index_languages: List[str],
  542 + primary_language: str,
  543 + days: int,
  544 + batch_size: int,
  545 + min_query_len: int,
  546 + ) -> Dict[Tuple[str, str], SuggestionCandidate]:
  547 + key_to_candidate: Dict[Tuple[str, str], SuggestionCandidate] = {}
  548 +
  549 + # Step 1: product title/qanchors
  550 + for hit in self._iter_products(tenant_id, batch_size=batch_size):
  551 + src = hit.get("_source", {}) or {}
  552 + product_id = str(src.get("spu_id") or src.get("id") or hit.get("_id") or "")
  553 + if not product_id:
  554 + continue
  555 + title_obj = src.get("title") or {}
  556 + qanchor_obj = src.get("qanchors") or {}
  557 +
  558 + for lang in index_languages:
  559 + title = ""
  560 + if isinstance(title_obj, dict):
  561 + title = self._prepare_title_for_suggest(title_obj.get(lang) or "")
  562 + if title:
  563 + text_norm = self._normalize_text(title)
  564 + if not self._looks_noise(text_norm):
  565 + key = (lang, text_norm)
  566 + c = key_to_candidate.get(key)
  567 + if c is None:
  568 + c = SuggestionCandidate(text=title, text_norm=text_norm, lang=lang)
  569 + key_to_candidate[key] = c
  570 + c.add_product("title", spu_id=product_id)
  571 +
  572 + q_raw = None
  573 + if isinstance(qanchor_obj, dict):
  574 + q_raw = qanchor_obj.get(lang)
  575 + for q_text in self._split_qanchors(q_raw):
  576 + text_norm = self._normalize_text(q_text)
  577 + if self._looks_noise(text_norm):
  578 + continue
  579 + key = (lang, text_norm)
  580 + c = key_to_candidate.get(key)
  581 + if c is None:
  582 + c = SuggestionCandidate(text=q_text, text_norm=text_norm, lang=lang)
  583 + key_to_candidate[key] = c
  584 + c.add_product("qanchor", spu_id=product_id)
  585 +
  586 + for tag_lang, tag in self._iter_multilang_product_tags(
  587 + src.get("enriched_tags"),
  588 + index_languages=index_languages,
  589 + primary_language=primary_language,
  590 + ):
  591 + text_norm = self._normalize_text(tag)
  592 + if self._looks_noise(text_norm):
  593 + continue
  594 + key = (tag_lang, text_norm)
  595 + c = key_to_candidate.get(key)
  596 + if c is None:
  597 + c = SuggestionCandidate(text=tag, text_norm=text_norm, lang=tag_lang)
  598 + key_to_candidate[key] = c
  599 + c.add_product("tag", spu_id=product_id)
  600 +
  601 + # Step 2: query logs
  602 + now = datetime.now(timezone.utc)
  603 + since = now - timedelta(days=days)
  604 + since_7d = now - timedelta(days=7)
  605 +
  606 + for row in self._iter_query_log_rows(tenant_id=tenant_id, since=since, until=now):
  607 + q = str(row.query or "").strip()
  608 + if len(q) < min_query_len:
  609 + continue
  610 +
  611 + lang, conf, source, conflict = self._resolve_query_language(
  612 + query=q,
  613 + log_language=getattr(row, "language", None),
  614 + request_params=getattr(row, "request_params", None),
  615 + index_languages=index_languages,
  616 + primary_language=primary_language,
  617 + )
  618 + text_norm = self._normalize_text(q)
  619 + if self._looks_noise(text_norm):
  620 + continue
  621 +
  622 + key = (lang, text_norm)
  623 + c = key_to_candidate.get(key)
  624 + if c is None:
  625 + c = SuggestionCandidate(text=q, text_norm=text_norm, lang=lang)
  626 + key_to_candidate[key] = c
  627 +
  628 + c.lang_confidence = max(c.lang_confidence, conf)
  629 + c.lang_source = source if c.lang_source == "default" else c.lang_source
  630 + c.lang_conflict = c.lang_conflict or conflict
  631 +
  632 + created_at = self._to_utc(getattr(row, "create_time", None))
  633 + is_7d = bool(created_at and created_at >= since_7d)
  634 + c.add_query_log(is_7d=is_7d)
  635 +
  636 + return key_to_candidate
  637 +
  638 + def _candidate_to_doc(self, tenant_id: str, c: SuggestionCandidate, now_iso: str) -> Dict[str, Any]:
  639 + rank_score = self._compute_rank_score_from_candidate(c)
  640 + completion_obj = {c.lang: {"input": [c.text], "weight": int(max(rank_score, 1.0) * 100)}}
  641 + sat_obj = {c.lang: c.text}
  642 + return {
  643 + "_id": f"{tenant_id}|{c.lang}|{c.text_norm}",
  644 + "tenant_id": str(tenant_id),
  645 + "lang": c.lang,
  646 + "text": c.text,
  647 + "text_norm": c.text_norm,
  648 + "sources": sorted(c.sources),
  649 + "title_doc_count": len(c.title_spu_ids),
  650 + "qanchor_doc_count": len(c.qanchor_spu_ids),
  651 + "tag_doc_count": len(c.tag_spu_ids),
  652 + "query_count_7d": c.query_count_7d,
  653 + "query_count_30d": c.query_count_30d,
  654 + "rank_score": float(rank_score),
  655 + "lang_confidence": float(c.lang_confidence),
  656 + "lang_source": c.lang_source,
  657 + "lang_conflict": bool(c.lang_conflict),
  658 + "status": 1,
  659 + "updated_at": now_iso,
  660 + "completion": completion_obj,
  661 + "sat": sat_obj,
  662 + }
  663 +
  664 + def rebuild_tenant_index(
  665 + self,
  666 + tenant_id: str,
  667 + days: int = 365,
  668 + batch_size: int = 500,
  669 + min_query_len: int = 1,
  670 + publish_alias: bool = True,
  671 + keep_versions: int = 2,
  672 + ) -> Dict[str, Any]:
  673 + """
  674 + Full rebuild.
  675 +
  676 + Phase2 default behavior:
  677 + - write to versioned index
  678 + - atomically publish alias
  679 + """
  680 + tenant_loader = get_tenant_config_loader()
  681 + tenant_cfg = tenant_loader.get_tenant_config(tenant_id)
  682 + index_languages: List[str] = tenant_cfg.get("index_languages") or ["en", "zh"]
  683 + primary_language: str = tenant_cfg.get("primary_language") or "en"
  684 +
  685 + alias_publish: Optional[Dict[str, Any]] = None
  686 + index_name: Optional[str] = None
  687 + try:
  688 + mapping = build_suggestion_mapping(index_languages=index_languages)
  689 + index_name = self._create_fresh_versioned_index(
  690 + tenant_id=tenant_id,
  691 + mapping=mapping,
  692 + )
  693 + self._ensure_new_index_ready(index_name)
  694 +
  695 + key_to_candidate = self._build_full_candidates(
  696 + tenant_id=tenant_id,
  697 + index_languages=index_languages,
  698 + primary_language=primary_language,
  699 + days=days,
  700 + batch_size=batch_size,
  701 + min_query_len=min_query_len,
  702 + )
  703 +
  704 + now_iso = datetime.now(timezone.utc).isoformat()
  705 + docs = [self._candidate_to_doc(tenant_id, c, now_iso) for c in key_to_candidate.values()]
  706 +
  707 + if docs:
  708 + bulk_result = self.es_client.bulk_index(index_name=index_name, docs=docs)
  709 + self.es_client.refresh(index_name)
  710 + else:
  711 + bulk_result = {"success": 0, "failed": 0, "errors": []}
  712 +
  713 + if publish_alias:
  714 + alias_publish = self._publish_alias(
  715 + tenant_id=tenant_id,
  716 + index_name=index_name,
  717 + keep_versions=keep_versions,
  718 + )
  719 +
  720 + now_utc = datetime.now(timezone.utc).isoformat()
  721 + meta_patch: Dict[str, Any] = {
  722 + "last_full_build_at": now_utc,
  723 + "last_incremental_watermark": now_utc,
  724 + }
  725 + if publish_alias:
  726 + meta_patch["active_index"] = index_name
  727 + meta_patch["active_alias"] = get_suggestion_alias_name(tenant_id)
  728 + self._upsert_meta(tenant_id, meta_patch)
  729 +
  730 + return {
  731 + "mode": "full",
  732 + "tenant_id": str(tenant_id),
  733 + "index_name": index_name,
  734 + "alias_published": bool(alias_publish),
  735 + "alias_publish": alias_publish,
  736 + "total_candidates": len(key_to_candidate),
  737 + "indexed_docs": len(docs),
  738 + "bulk_result": bulk_result,
  739 + }
  740 + except Exception:
  741 + if index_name and not alias_publish:
  742 + self.es_client.delete_index(index_name)
  743 + raise
  744 +
  745 + def _build_incremental_deltas(
  746 + self,
  747 + tenant_id: str,
  748 + index_languages: List[str],
  749 + primary_language: str,
  750 + since: datetime,
  751 + until: datetime,
  752 + min_query_len: int,
  753 + ) -> Dict[Tuple[str, str], QueryDelta]:
  754 + now = datetime.now(timezone.utc)
  755 + since_7d = now - timedelta(days=7)
  756 + deltas: Dict[Tuple[str, str], QueryDelta] = {}
  757 +
  758 + for row in self._iter_query_log_rows(tenant_id=tenant_id, since=since, until=until):
  759 + q = str(row.query or "").strip()
  760 + if len(q) < min_query_len:
  761 + continue
  762 +
  763 + lang, conf, source, conflict = self._resolve_query_language(
  764 + query=q,
  765 + log_language=getattr(row, "language", None),
  766 + request_params=getattr(row, "request_params", None),
  767 + index_languages=index_languages,
  768 + primary_language=primary_language,
  769 + )
  770 + text_norm = self._normalize_text(q)
  771 + if self._looks_noise(text_norm):
  772 + continue
  773 +
  774 + key = (lang, text_norm)
  775 + item = deltas.get(key)
  776 + if item is None:
  777 + item = QueryDelta(
  778 + tenant_id=str(tenant_id),
  779 + lang=lang,
  780 + text=q,
  781 + text_norm=text_norm,
  782 + lang_confidence=conf,
  783 + lang_source=source,
  784 + lang_conflict=conflict,
  785 + )
  786 + deltas[key] = item
  787 +
  788 + created_at = self._to_utc(getattr(row, "create_time", None))
  789 + item.delta_30d += 1
  790 + if created_at and created_at >= since_7d:
  791 + item.delta_7d += 1
  792 +
  793 + if conf > item.lang_confidence:
  794 + item.lang_confidence = conf
  795 + item.lang_source = source
  796 + item.lang_conflict = item.lang_conflict or conflict
  797 +
  798 + return deltas
  799 +
  800 + def _delta_to_upsert_doc(self, delta: QueryDelta, now_iso: str) -> Dict[str, Any]:
  801 + rank_score = self._compute_rank_score(
  802 + query_count_30d=delta.delta_30d,
  803 + query_count_7d=delta.delta_7d,
  804 + qanchor_doc_count=0,
  805 + title_doc_count=0,
  806 + tag_doc_count=0,
  807 + )
  808 + return {
  809 + "tenant_id": delta.tenant_id,
  810 + "lang": delta.lang,
  811 + "text": delta.text,
  812 + "text_norm": delta.text_norm,
  813 + "sources": ["query_log"],
  814 + "title_doc_count": 0,
  815 + "qanchor_doc_count": 0,
  816 + "tag_doc_count": 0,
  817 + "query_count_7d": delta.delta_7d,
  818 + "query_count_30d": delta.delta_30d,
  819 + "rank_score": float(rank_score),
  820 + "lang_confidence": float(delta.lang_confidence),
  821 + "lang_source": delta.lang_source,
  822 + "lang_conflict": bool(delta.lang_conflict),
  823 + "status": 1,
  824 + "updated_at": now_iso,
  825 + "completion": {
  826 + delta.lang: {
  827 + "input": [delta.text],
  828 + "weight": int(max(rank_score, 1.0) * 100),
  829 + }
  830 + },
  831 + "sat": {delta.lang: delta.text},
  832 + }
  833 +
  834 + @staticmethod
  835 + def _build_incremental_update_script() -> str:
  836 + return """
  837 + if (ctx._source == null || ctx._source.isEmpty()) {
  838 + ctx._source = params.upsert;
  839 + return;
  840 + }
  841 +
  842 + if (ctx._source.query_count_30d == null) { ctx._source.query_count_30d = 0; }
  843 + if (ctx._source.query_count_7d == null) { ctx._source.query_count_7d = 0; }
  844 + if (ctx._source.qanchor_doc_count == null) { ctx._source.qanchor_doc_count = 0; }
  845 + if (ctx._source.title_doc_count == null) { ctx._source.title_doc_count = 0; }
  846 + if (ctx._source.tag_doc_count == null) { ctx._source.tag_doc_count = 0; }
  847 +
  848 + ctx._source.query_count_30d += params.delta_30d;
  849 + ctx._source.query_count_7d += params.delta_7d;
  850 +
  851 + if (ctx._source.sources == null) { ctx._source.sources = new ArrayList(); }
  852 + if (!ctx._source.sources.contains('query_log')) { ctx._source.sources.add('query_log'); }
  853 +
  854 + if (ctx._source.lang_conflict == null) { ctx._source.lang_conflict = false; }
  855 + ctx._source.lang_conflict = ctx._source.lang_conflict || params.lang_conflict;
  856 +
  857 + if (ctx._source.lang_confidence == null || params.lang_confidence > ctx._source.lang_confidence) {
  858 + ctx._source.lang_confidence = params.lang_confidence;
  859 + ctx._source.lang_source = params.lang_source;
  860 + }
  861 +
  862 + int q30 = ctx._source.query_count_30d;
  863 + int q7 = ctx._source.query_count_7d;
  864 + int qa = ctx._source.qanchor_doc_count;
  865 + int td = ctx._source.title_doc_count;
  866 + int tg = ctx._source.tag_doc_count;
  867 +
  868 + double score = 1.8 * Math.log(1 + q30)
  869 + + 1.2 * Math.log(1 + q7)
  870 + + 1.0 * Math.log(1 + qa)
  871 + + 0.85 * Math.log(1 + tg)
  872 + + 0.6 * Math.log(1 + td);
  873 + ctx._source.rank_score = score;
  874 + ctx._source.status = 1;
  875 + ctx._source.updated_at = params.now_iso;
  876 + ctx._source.text = params.text;
  877 + ctx._source.lang = params.lang;
  878 + ctx._source.text_norm = params.text_norm;
  879 +
  880 + if (ctx._source.completion == null) { ctx._source.completion = new HashMap(); }
  881 + Map c = new HashMap();
  882 + c.put('input', params.completion_input);
  883 + c.put('weight', params.completion_weight);
  884 + ctx._source.completion.put(params.lang, c);
  885 +
  886 + if (ctx._source.sat == null) { ctx._source.sat = new HashMap(); }
  887 + ctx._source.sat.put(params.lang, params.text);
  888 + """
  889 +
  890 + def _build_incremental_actions(self, target_index: str, deltas: Dict[Tuple[str, str], QueryDelta]) -> List[Dict[str, Any]]:
  891 + now_iso = datetime.now(timezone.utc).isoformat()
  892 + script_source = self._build_incremental_update_script()
  893 + actions: List[Dict[str, Any]] = []
  894 +
  895 + for delta in deltas.values():
  896 + upsert_doc = self._delta_to_upsert_doc(delta=delta, now_iso=now_iso)
  897 + upsert_rank = float(upsert_doc.get("rank_score") or 0.0)
  898 + action = {
  899 + "_op_type": "update",
  900 + "_index": target_index,
  901 + "_id": f"{delta.tenant_id}|{delta.lang}|{delta.text_norm}",
  902 + "scripted_upsert": True,
  903 + "script": {
  904 + "lang": "painless",
  905 + "source": script_source,
  906 + "params": {
  907 + "delta_30d": int(delta.delta_30d),
  908 + "delta_7d": int(delta.delta_7d),
  909 + "lang_confidence": float(delta.lang_confidence),
  910 + "lang_source": delta.lang_source,
  911 + "lang_conflict": bool(delta.lang_conflict),
  912 + "now_iso": now_iso,
  913 + "lang": delta.lang,
  914 + "text": delta.text,
  915 + "text_norm": delta.text_norm,
  916 + "completion_input": [delta.text],
  917 + "completion_weight": int(max(upsert_rank, 1.0) * 100),
  918 + "upsert": upsert_doc,
  919 + },
  920 + },
  921 + "upsert": {},  # empty on purpose: with scripted_upsert, the script's isEmpty branch initializes missing docs from params.upsert, avoiding double-counting the deltas
  922 + }
  923 + actions.append(action)
  924 +
  925 + return actions
  926 +
  927 + def incremental_update_tenant_index(
  928 + self,
  929 + tenant_id: str,
  930 + min_query_len: int = 1,
  931 + fallback_days: int = 7,
  932 + overlap_minutes: int = 30,
  933 + bootstrap_if_missing: bool = True,
  934 + bootstrap_days: int = 30,
  935 + batch_size: int = 500,
  936 + ) -> Dict[str, Any]:
  937 + tenant_loader = get_tenant_config_loader()
  938 + tenant_cfg = tenant_loader.get_tenant_config(tenant_id)
  939 + index_languages: List[str] = tenant_cfg.get("index_languages") or ["en", "zh"]
  940 + primary_language: str = tenant_cfg.get("primary_language") or "en"
  941 +
  942 + target_index = self._resolve_incremental_target_index(tenant_id)
  943 + if not target_index:
  944 + if not bootstrap_if_missing:
  945 + raise RuntimeError(
  946 + f"No active suggestion index for tenant={tenant_id}. "
  947 + "Run full rebuild first or enable bootstrap_if_missing."
  948 + )
  949 + full_result = self.rebuild_tenant_index(
  950 + tenant_id=tenant_id,
  951 + days=bootstrap_days,
  952 + batch_size=batch_size,
  953 + min_query_len=min_query_len,
  954 + publish_alias=True,
  955 + )
  956 + return {
  957 + "mode": "incremental",
  958 + "tenant_id": str(tenant_id),
  959 + "bootstrapped": True,
  960 + "bootstrap_result": full_result,
  961 + }
  962 +
  963 + meta = self._get_meta(tenant_id)
  964 + watermark_raw = meta.get("last_incremental_watermark") or meta.get("last_full_build_at")
  965 + now = datetime.now(timezone.utc)
  966 + default_since = now - timedelta(days=fallback_days)
  967 + since = None
  968 + if isinstance(watermark_raw, str) and watermark_raw.strip():
  969 + try:
  970 + since = self._to_utc(datetime.fromisoformat(watermark_raw.replace("Z", "+00:00")))
  971 + except Exception:
  972 + since = None
  973 + if since is None:
  974 + since = default_since
  975 + since = since - timedelta(minutes=max(overlap_minutes, 0))
  976 + if since < default_since:
  977 + since = default_since
  978 +
  979 + deltas = self._build_incremental_deltas(
  980 + tenant_id=tenant_id,
  981 + index_languages=index_languages,
  982 + primary_language=primary_language,
  983 + since=since,
  984 + until=now,
  985 + min_query_len=min_query_len,
  986 + )
  987 +
  988 + actions = self._build_incremental_actions(target_index=target_index, deltas=deltas)
  989 + bulk_result = self.es_client.bulk_actions(actions)
  990 + self.es_client.refresh(target_index)
  991 +
  992 + now_iso = now.isoformat()
  993 + self._upsert_meta(
  994 + tenant_id,
  995 + {
  996 + "last_incremental_build_at": now_iso,
  997 + "last_incremental_watermark": now_iso,
  998 + "active_index": target_index,
  999 + "active_alias": get_suggestion_alias_name(tenant_id),
  1000 + },
  1001 + )
  1002 +
  1003 + return {
  1004 + "mode": "incremental",
  1005 + "tenant_id": str(tenant_id),
  1006 + "target_index": target_index,
  1007 + "query_window": {
  1008 + "since": since.isoformat(),
  1009 + "until": now_iso,
  1010 + "overlap_minutes": int(overlap_minutes),
  1011 + },
  1012 + "updated_terms": len(deltas),
  1013 + "bulk_result": bulk_result,
  1014 + }
... ...
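Not part of the diff: the Painless script above recomputes `rank_score` inline with hard-coded weights, and the Python side computes the same score via `_compute_rank_score`. A standalone mirror of that weighting (a sketch; `compute_rank_score` is a hypothetical helper, with the weights copied from the script in this commit) makes it easy to sanity-check that the two sides stay in sync:

```python
import math

def compute_rank_score(q30: int, q7: int, qanchor: int, tag: int, title: int) -> float:
    """Mirror of the Painless rank-score formula in the incremental update script.

    Weights are copied from the script; if they change there, change them here too.
    """
    return (
        1.8 * math.log(1 + q30)    # 30-day query count dominates
        + 1.2 * math.log(1 + q7)   # 7-day count adds recency
        + 1.0 * math.log(1 + qanchor)
        + 0.85 * math.log(1 + tag)
        + 0.6 * math.log(1 + title)  # plain title matches weigh least
    )

# A term with no signals scores 0.0; the completion weight floor in the
# indexing code, int(max(score, 1.0) * 100), then guarantees a minimum
# suggester weight of 100.
print(compute_rank_score(0, 0, 0, 0, 0))  # 0.0
```

The log-damped sum means a term needs roughly e× more 30-day queries to gain one more unit of score, so high-frequency queries can't drown out qanchor/tag signals entirely.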