From 984f14f924c96c9c4d2ac4fc0a7310af8e6a97ab Mon Sep 17 00:00:00 2001 From: tangwang Date: Fri, 10 Apr 2026 21:47:37 +0800 Subject: [PATCH] product_enrich模块迁出 --- config/config.yaml | 4 ++-- docs/工作总结-微服务性能优化与架构.md | 2 +- docs/搜索API对接指南-00-总览与快速开始.md | 5 ++--- docs/搜索API对接指南-05-索引接口(Indexer).md | 11 ++++++----- suggestion/builder.py | 6 +++++- suggestion/builder.py.bak | 1014 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 6 files changed, 1030 insertions(+), 12 deletions(-) create mode 100644 suggestion/builder.py.bak diff --git a/config/config.yaml b/config/config.yaml index 620c3dc..2ef3790 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -74,10 +74,10 @@ search_evaluation: search_base_url: '' web_host: 0.0.0.0 web_port: 6010 - judge_model: qwen3.5-plus + judge_model: qwen3.6-plus judge_enable_thinking: false judge_dashscope_batch: false - intent_model: qwen3-max + intent_model: qwen3.6-plus intent_enable_thinking: true judge_batch_completion_window: 24h judge_batch_poll_interval_sec: 10.0 diff --git 
a/docs/工作总结-微服务性能优化与架构.md b/docs/工作总结-微服务性能优化与架构.md index 6a2b5df..115e6ee 100644 --- a/docs/工作总结-微服务性能优化与架构.md +++ b/docs/工作总结-微服务性能优化与架构.md @@ -98,7 +98,7 @@ instruction: "Given a shopping query, rank product titles by relevance" **能力**:支持根据商品标题批量生成 **qanchors**(锚文本)、**enriched_attributes**、**tags**,供索引与 suggest 使用。 **具体内容**: -- **接口**:`POST /indexer/enrich-content`(Indexer 服务端口 **6004**)。请求体为 `items` 数组,每项含 `spu_id`、`title`(必填)及可选多语言标题等;单次请求最多 **50 条**,建议批量调用。响应 `results` 与 `items` 一一对应,每项含 `spu_id`、`qanchors`(按语言键,如 `qanchors.zh`、`qanchors.en`,逗号分隔短语)、`enriched_attributes`、`tags`。 +- **接口**:`POST /indexer/enrich-content`(FacetAwareMatching 服务端口 **6001**)。请求体为 `items` 数组,每项含 `spu_id`、`title`(必填)及可选多语言标题等;单次请求最多 **50 条**,建议批量调用。响应 `results` 与 `items` 一一对应,每项含 `spu_id`、`qanchors`(按语言键,如 `qanchors.zh`、`qanchors.en`,逗号分隔短语)、`enriched_attributes`、`tags`。 - **索引侧**:微服务组合方式下,调用方先拿不含 qanchors/tags 的 doc,再调用本接口补齐后写入 ES 的 `qanchors.{lang}` 等字段;索引 transformer(`indexer/document_transformer.py`、`indexer/product_enrich.py`)内也可在构建 doc 时调用内容理解逻辑,写入 `qanchors.{lang}`。 - **Suggest 侧**:`suggestion/builder.py` 从 ES 商品索引读取 `_source: ["id", "spu_id", "title", "qanchors"]`,对 `qanchors.{lang}` 用 `_split_qanchors` 拆成词条,以 `source="qanchor"` 加入候选,排序时 `qanchor` 权重大于纯 title(`add_product("qanchor", ...)`);suggest 配置中 `sources: ["query_log", "qanchor"]` 表示候选来源包含 qanchor。 - **实现与依赖**:内容理解内部使用大模型(需 `DASHSCOPE_API_KEY`),支持多语言与 Redis 缓存(如 `product_anchors`);逻辑与 `indexer/product_enrich` 一致。 diff --git a/docs/搜索API对接指南-00-总览与快速开始.md b/docs/搜索API对接指南-00-总览与快速开始.md index 71948b1..1aecfe3 100644 --- a/docs/搜索API对接指南-00-总览与快速开始.md +++ b/docs/搜索API对接指南-00-总览与快速开始.md @@ -90,7 +90,7 @@ curl -X POST "http://43.166.252.75:6002/search/" \ | 查询文档 | POST | `/indexer/documents` | 查询SPU文档数据(不写入ES) | | 构建ES文档(正式对接) | POST | `/indexer/build-docs` | 基于上游提供的 MySQL 行数据构建 ES doc,不写入 ES,供 Java 等调用后自行写入 | | 构建ES文档(测试用) | POST | `/indexer/build-docs-from-db` | 仅在测试/调试时使用,根据 `tenant_id + spu_ids` 内部查库并构建 ES doc | -| 
内容理解字段生成 | POST | `/indexer/enrich-content` | 根据商品标题批量生成 qanchors、enriched_attributes、tags,供微服务组合方式使用 | +| 内容理解字段生成 | POST | `/indexer/enrich-content` | 根据商品标题批量生成 qanchors、enriched_attributes、tags,供微服务组合方式使用(独立服务端口 6001) | | 索引健康检查 | GET | `/indexer/health` | 检查索引服务状态 | | 健康检查 | GET | `/admin/health` | 服务健康检查 | | 获取配置 | GET | `/admin/config` | 获取租户配置 | @@ -104,7 +104,6 @@ curl -X POST "http://43.166.252.75:6002/search/" \ | 向量服务(图片) | 6008 | `POST /embed/image` | 图片向量化 | | 翻译服务 | 6006 | `POST /translate` | 文本翻译(支持 qwen-mt / llm / deepl / 本地模型) | | 重排服务 | 6007 | `POST /rerank` | 检索结果重排 | -| 内容理解(Indexer 内) | 6004 | `POST /indexer/enrich-content` | 根据商品标题生成 qanchors、tags 等,供 indexer 微服务组合方式使用 | +| 内容理解(独立服务) | 6001 | `POST /indexer/enrich-content` | 根据商品标题生成 qanchors、tags 等,供 indexer 微服务组合方式使用 | --- - diff --git a/docs/搜索API对接指南-05-索引接口(Indexer).md b/docs/搜索API对接指南-05-索引接口(Indexer).md index 398a5c1..4856716 100644 --- a/docs/搜索API对接指南-05-索引接口(Indexer).md +++ b/docs/搜索API对接指南-05-索引接口(Indexer).md @@ -13,7 +13,7 @@ | 查询文档 | POST | `/indexer/documents` | 按 SPU ID 列表查询 ES 文档,不写入 ES | | 构建 ES 文档(正式) | POST | `/indexer/build-docs` | 由上游提供 MySQL 行数据,返回 ES-ready 文档,不写 ES | | 构建 ES 文档(测试) | POST | `/indexer/build-docs-from-db` | 由本服务查库并构建文档,仅测试/调试用 | -| 内容理解字段生成 | POST | `/indexer/enrich-content` | 根据商品标题批量生成 qanchors、enriched_attributes、tags(供微服务组合方式使用) | +| 内容理解字段生成 | POST | `/indexer/enrich-content` | 根据商品标题批量生成 qanchors、enriched_attributes、tags(供微服务组合方式使用;独立服务端口 6001) | | 索引健康检查 | GET | `/indexer/health` | 检查索引服务与数据库连接状态 | #### 5.0 支撑外部 indexer 的三种方式 @@ -23,7 +23,7 @@ | 方式 | 说明 | 适用场景 | |------|------|----------| | **1)doc 填充接口** | 调用 `POST /indexer/build-docs` 或 `POST /indexer/build-docs-from-db`,由本服务基于 MySQL 行数据构建完整 ES 文档(含多语言、向量、规格等),**不写入 ES**,由调用方自行写入。 | 希望一站式拿到 ES-ready doc,由己方控制写 ES 的时机与索引名。 | -| **2)微服务组合** | 单独调用**翻译**、**向量化**、**内容理解字段生成**等接口,由 indexer 程序自己组装 doc 并写入 ES。翻译与向量化为独立微服务(见第 7 节);内容理解为 Indexer 服务内接口 `POST /indexer/enrich-content`。 | 需要灵活编排、或希望将 
LLM/向量等耗时步骤与主链路解耦(如异步补齐 qanchors/tags)。 | +| **2)微服务组合** | 单独调用**翻译**、**向量化**、**内容理解字段生成**等接口,由 indexer 程序自己组装 doc 并写入 ES。翻译与向量化为独立微服务(见第 7 节);内容理解为 FacetAwareMatching 独立服务接口 `POST /indexer/enrich-content`(端口 6001)。 | 需要灵活编排、或希望将 LLM/向量等耗时步骤与主链路解耦(如异步补齐 qanchors/tags)。 | | **3)本服务直接写 ES** | 调用全量索引 `POST /indexer/reindex`、增量索引 `POST /indexer/index`(指定 SPU ID 列表),由本服务从 MySQL 拉数并直接写入 ES。 | 自建运维、联调或不需要由 Java 写 ES 的场景。 | - **方式 1** 与 **方式 2** 下,ES 的写入方均为外部 indexer(或 Java),职责清晰。 @@ -648,7 +648,8 @@ curl -X POST "http://127.0.0.1:6004/indexer/build-docs-from-db" \ ### 5.8 内容理解字段生成接口 - **端点**: `POST /indexer/enrich-content` -- **描述**: 根据商品内容信息批量生成 **qanchors**(锚文本)、**enriched_attributes**(通用语义属性)、**enriched_tags**(细分标签)、**enriched_taxonomy_attributes**(taxonomy 结构化属性),供外部 indexer 在「微服务组合」方式下自行拼装 doc 时使用。请求以 `items[]` 传入商品内容字段(必填/可选见下表)。接口只暴露商品内容输入,语言选择、分析维度与最终字段结构统一由 `indexer.product_enrich` 内部决定;当前返回结果与 `search_products` mapping 保持一致。单次请求在线程池中执行,避免阻塞其他接口。 +- **服务**: FacetAwareMatching 独立服务(默认端口 **6001**;由 `/data/FacetAwareMatching/scripts/service_ctl.sh` 管理) +- **描述**: 根据商品内容信息批量生成 **qanchors**(锚文本)、**enriched_attributes**(通用语义属性)、**enriched_tags**(细分标签)、**enriched_taxonomy_attributes**(taxonomy 结构化属性),供外部 indexer 在「微服务组合」方式下自行拼装 doc 时使用。请求以 `items[]` 传入商品内容字段(必填/可选见下表)。接口只暴露商品内容输入,语言选择、分析维度与最终字段结构统一由 FacetAwareMatching 的 `product_enrich` 内部决定;当前返回结果与 `search_products` mapping 保持一致。单次请求在线程池中执行,避免阻塞其他接口。 当前支持的 `category_taxonomy_profile`: - `apparel` @@ -670,7 +671,7 @@ curl -X POST "http://127.0.0.1:6004/indexer/build-docs-from-db" \ 说明: - 所有 profile 的 `enriched_taxonomy_attributes.value` 都统一返回 `zh` + `en`。 - 外部调用 `/indexer/enrich-content` 时,以请求中的 `category_taxonomy_profile` 为准。 -- 当前 Indexer 内部构建 ES 文档时,taxonomy profile 暂时固定使用 `apparel`;代码里已保留 TODO,后续从数据库读取该租户真实所属行业后再替换。 +- 若 indexer 内部仍接入内容理解能力,taxonomy profile 请在调用侧显式传入(建议仍以租户行业配置为准)。 #### 请求参数 @@ -796,7 +797,7 @@ curl -X POST "http://127.0.0.1:6004/indexer/build-docs-from-db" \ #### 请求示例 ```bash -curl -X POST 
"http://localhost:6004/indexer/enrich-content" \ +curl -X POST "http://localhost:6001/indexer/enrich-content" \ -H "Content-Type: application/json" \ -d '{ "tenant_id": "163", diff --git a/suggestion/builder.py b/suggestion/builder.py index 5ee53df..0323bfe 100644 --- a/suggestion/builder.py +++ b/suggestion/builder.py @@ -366,7 +366,8 @@ class SuggestionIndexBuilder: index_name = get_tenant_index_name(tenant_id) search_after: Optional[List[Any]] = None - + logger.debug("Suggestion build using product index: %s for tenant %s", index_name, tenant_id) + total_processed = 0 while True: body: Dict[str, Any] = { "size": batch_size, @@ -385,10 +386,13 @@ class SuggestionIndexBuilder: if not hits: break for hit in hits: + total_processed += 1 yield hit search_after = hits[-1].get("sort") if len(hits) < batch_size: break + logger.debug("Suggestion build processed total products: %s for tenant %s", total_processed, tenant_id) + def _iter_query_log_rows( self, diff --git a/suggestion/builder.py.bak b/suggestion/builder.py.bak new file mode 100644 index 0000000..5ee53df --- /dev/null +++ b/suggestion/builder.py.bak @@ -0,0 +1,1014 @@ +""" +Suggestion index builder (Phase 2). 
+ +Capabilities: +- Full rebuild to versioned index +- Atomic alias publish +- Incremental update from query logs with watermark +""" + +import json +import logging +import math +import re +import unicodedata +from dataclasses import dataclass, field +from datetime import datetime, timedelta, timezone +from typing import Any, Dict, Iterator, List, Optional, Tuple + +from sqlalchemy import text + +from config.loader import get_app_config +from config.tenant_config_loader import get_tenant_config_loader +from query.query_parser import detect_text_language_for_suggestions +from suggestion.mapping import build_suggestion_mapping +from utils.es_client import ESClient + +logger = logging.getLogger(__name__) + + +def _index_prefix() -> str: + return get_app_config().runtime.index_namespace or "" + + +def get_suggestion_alias_name(tenant_id: str) -> str: + """Read alias for suggestion index (single source of truth).""" + return f"{_index_prefix()}search_suggestions_tenant_{tenant_id}_current" + + +def get_suggestion_versioned_index_name(tenant_id: str, build_at: Optional[datetime] = None) -> str: + """Versioned suggestion index name.""" + ts = (build_at or datetime.now(timezone.utc)).strftime("%Y%m%d%H%M%S%f") + return f"{_index_prefix()}search_suggestions_tenant_{tenant_id}_v{ts}" + + +def get_suggestion_versioned_index_pattern(tenant_id: str) -> str: + return f"{_index_prefix()}search_suggestions_tenant_{tenant_id}_v*" + + +def get_suggestion_meta_index_name() -> str: + return f"{_index_prefix()}search_suggestions_meta" + + +@dataclass +class SuggestionCandidate: + text: str + text_norm: str + lang: str + sources: set = field(default_factory=set) + title_spu_ids: set = field(default_factory=set) + qanchor_spu_ids: set = field(default_factory=set) + tag_spu_ids: set = field(default_factory=set) + query_count_7d: int = 0 + query_count_30d: int = 0 + lang_confidence: float = 1.0 + lang_source: str = "default" + lang_conflict: bool = False + + def add_product(self, source: 
str, spu_id: str) -> None: + self.sources.add(source) + if source == "title": + self.title_spu_ids.add(spu_id) + elif source == "qanchor": + self.qanchor_spu_ids.add(spu_id) + elif source == "tag": + self.tag_spu_ids.add(spu_id) + + def add_query_log(self, is_7d: bool) -> None: + self.sources.add("query_log") + self.query_count_30d += 1 + if is_7d: + self.query_count_7d += 1 + + +@dataclass +class QueryDelta: + tenant_id: str + lang: str + text: str + text_norm: str + delta_7d: int = 0 + delta_30d: int = 0 + lang_confidence: float = 1.0 + lang_source: str = "default" + lang_conflict: bool = False + + +class SuggestionIndexBuilder: + """Build and update suggestion index.""" + + def __init__(self, es_client: ESClient, db_engine: Any): + self.es_client = es_client + self.db_engine = db_engine + + def _format_allocation_failure(self, index_name: str) -> str: + health = self.es_client.wait_for_index_ready(index_name=index_name, timeout="5s") + explain = self.es_client.get_allocation_explain(index_name=index_name) + + parts = [ + f"Suggestion index '{index_name}' was created but is not allocatable/readable yet", + f"health_status={health.get('status')}", + f"timed_out={health.get('timed_out')}", + ] + if health.get("error"): + parts.append(f"health_error={health['error']}") + + if explain: + unassigned = explain.get("unassigned_info") or {} + if unassigned.get("reason"): + parts.append(f"unassigned_reason={unassigned['reason']}") + if unassigned.get("last_allocation_status"): + parts.append(f"last_allocation_status={unassigned['last_allocation_status']}") + + for node in explain.get("node_allocation_decisions") or []: + node_name = node.get("node_name") or node.get("node_id") or "unknown-node" + for decider in node.get("deciders") or []: + if decider.get("decision") == "NO": + parts.append( + f"{node_name}:{decider.get('decider')}={decider.get('explanation')}" + ) + return "; ".join(parts) + + return "; ".join(parts) + + def _create_fresh_versioned_index( + self, + 
tenant_id: str, + mapping: Dict[str, Any], + max_attempts: int = 5, + ) -> str: + for attempt in range(1, max_attempts + 1): + index_name = get_suggestion_versioned_index_name(tenant_id) + if self.es_client.index_exists(index_name): + logger.warning( + "Suggestion index name collision before create for tenant=%s index=%s attempt=%s/%s", + tenant_id, + index_name, + attempt, + max_attempts, + ) + continue + + if self.es_client.create_index(index_name, mapping): + return index_name + + if self.es_client.index_exists(index_name): + logger.warning( + "Suggestion index name collision during create for tenant=%s index=%s attempt=%s/%s", + tenant_id, + index_name, + attempt, + max_attempts, + ) + continue + + raise RuntimeError(f"Failed to create suggestion index: {index_name}") + + raise RuntimeError( + f"Failed to allocate a unique suggestion index name for tenant={tenant_id} after {max_attempts} attempts" + ) + + def _ensure_new_index_ready(self, index_name: str) -> None: + health = self.es_client.wait_for_index_ready(index_name=index_name, timeout="5s") + if health.get("ok"): + return + raise RuntimeError(self._format_allocation_failure(index_name)) + + @staticmethod + def _to_utc(dt: Any) -> Optional[datetime]: + if dt is None: + return None + if isinstance(dt, datetime): + if dt.tzinfo is None: + return dt.replace(tzinfo=timezone.utc) + return dt.astimezone(timezone.utc) + return None + + @staticmethod + def _normalize_text(value: str) -> str: + text_value = unicodedata.normalize("NFKC", (value or "")).strip().lower() + text_value = re.sub(r"\s+", " ", text_value) + return text_value + + @staticmethod + def _prepare_title_for_suggest(title: str, max_len: int = 120) -> str: + """ + Keep title-derived suggestions concise: + - keep raw title when short enough + - for long titles, keep the leading phrase before common separators + - fallback to hard truncate + """ + raw = str(title or "").strip() + if not raw: + return "" + if len(raw) <= max_len: + return raw + + head 
= re.split(r"[,,;;|/\\\\((\\[【]", raw, maxsplit=1)[0].strip() + if 1 < len(head) <= max_len: + return head + + truncated = raw[:max_len].rstrip(" ,,;;|/\\\\-—–()()[]【】") + return truncated or raw[:max_len] + + @staticmethod + def _split_qanchors(value: Any) -> List[str]: + if value is None: + return [] + if isinstance(value, list): + return [str(x).strip() for x in value if str(x).strip()] + raw = str(value).strip() + if not raw: + return [] + parts = re.split(r"[,、,;|/\n\t]+", raw) + out = [p.strip() for p in parts if p and p.strip()] + if not out: + return [raw] + return out + + @staticmethod + def _iter_product_tags(raw: Any) -> List[str]: + if raw is None: + return [] + if isinstance(raw, list): + return [str(x).strip() for x in raw if str(x).strip()] + s = str(raw).strip() + if not s: + return [] + parts = re.split(r"[,、,;|/\n\t]+", s) + out = [p.strip() for p in parts if p and p.strip()] + return out if out else [s] + + def _iter_multilang_product_tags( + self, + raw: Any, + index_languages: List[str], + primary_language: str, + ) -> List[Tuple[str, str]]: + if isinstance(raw, dict): + pairs: List[Tuple[str, str]] = [] + for lang in index_languages: + for tag in self._iter_product_tags(raw.get(lang)): + pairs.append((lang, tag)) + return pairs + + pairs = [] + for tag in self._iter_product_tags(raw): + tag_lang, _, _ = detect_text_language_for_suggestions( + tag, + index_languages=index_languages, + primary_language=primary_language, + ) + pairs.append((tag_lang, tag)) + return pairs + + @staticmethod + def _looks_noise(text_value: str) -> bool: + if not text_value: + return True + if len(text_value) > 120: + return True + if re.fullmatch(r"[\W_]+", text_value): + return True + return False + + @staticmethod + def _normalize_lang(lang: Optional[str]) -> Optional[str]: + if not lang: + return None + token = str(lang).strip().lower().replace("-", "_") + if not token: + return None + if token in {"zh_tw", "pt_br"}: + return token + return token.split("_")[0] + + 
@staticmethod + def _parse_request_params_language(raw: Any) -> Optional[str]: + if raw is None: + return None + if isinstance(raw, dict): + return raw.get("language") + text_raw = str(raw).strip() + if not text_raw: + return None + try: + obj = json.loads(text_raw) + if isinstance(obj, dict): + return obj.get("language") + except Exception: + return None + return None + + def _resolve_query_language( + self, + query: str, + log_language: Optional[str], + request_params: Any, + index_languages: List[str], + primary_language: str, + ) -> Tuple[str, float, str, bool]: + """Resolve lang with priority: log field > request_params > script/model.""" + langs_set = set(index_languages or []) + primary = self._normalize_lang(primary_language) or "en" + if primary not in langs_set and langs_set: + primary = index_languages[0] + + log_lang = self._normalize_lang(log_language) + req_lang = self._normalize_lang(self._parse_request_params_language(request_params)) + conflict = bool(log_lang and req_lang and log_lang != req_lang) + + if log_lang and (not langs_set or log_lang in langs_set): + return log_lang, 1.0, "log_field", conflict + + if req_lang and (not langs_set or req_lang in langs_set): + return req_lang, 1.0, "request_params", conflict + + det_lang, conf, det_source = detect_text_language_for_suggestions( + query, + index_languages=index_languages, + primary_language=primary, + ) + if det_lang and (not langs_set or det_lang in langs_set): + return det_lang, conf, det_source, conflict + + return primary, 0.3, "default", conflict + + @staticmethod + def _compute_rank_score( + query_count_30d: int, + query_count_7d: int, + qanchor_doc_count: int, + title_doc_count: int, + tag_doc_count: int = 0, + ) -> float: + return ( + 1.8 * math.log1p(max(query_count_30d, 0)) + + 1.2 * math.log1p(max(query_count_7d, 0)) + + 1.0 * math.log1p(max(qanchor_doc_count, 0)) + + 0.85 * math.log1p(max(tag_doc_count, 0)) + + 0.6 * math.log1p(max(title_doc_count, 0)) + ) + + @classmethod + def 
_compute_rank_score_from_candidate(cls, c: SuggestionCandidate) -> float: + return cls._compute_rank_score( + query_count_30d=c.query_count_30d, + query_count_7d=c.query_count_7d, + qanchor_doc_count=len(c.qanchor_spu_ids), + title_doc_count=len(c.title_spu_ids), + tag_doc_count=len(c.tag_spu_ids), + ) + + def _iter_products(self, tenant_id: str, batch_size: int = 500) -> Iterator[Dict[str, Any]]: + """Stream product docs from tenant index using search_after.""" + from indexer.mapping_generator import get_tenant_index_name + + index_name = get_tenant_index_name(tenant_id) + search_after: Optional[List[Any]] = None + + while True: + body: Dict[str, Any] = { + "size": batch_size, + "_source": ["id", "spu_id", "title", "qanchors", "enriched_tags"], + "sort": [ + {"spu_id": {"order": "asc", "missing": "_last"}}, + {"id.keyword": {"order": "asc", "missing": "_last"}}, + ], + "query": {"match_all": {}}, + } + if search_after is not None: + body["search_after"] = search_after + + resp = self.es_client.client.search(index=index_name, body=body) + hits = resp.get("hits", {}).get("hits", []) or [] + if not hits: + break + for hit in hits: + yield hit + search_after = hits[-1].get("sort") + if len(hits) < batch_size: + break + + def _iter_query_log_rows( + self, + tenant_id: str, + since: datetime, + until: datetime, + fetch_size: int = 2000, + ) -> Iterator[Any]: + """Stream search logs from MySQL with bounded time range.""" + query_sql = text( + """ + SELECT query, language, request_params, create_time + FROM shoplazza_search_log + WHERE tenant_id = :tenant_id + AND deleted = 0 + AND query IS NOT NULL + AND query <> '' + AND create_time >= :since_time + AND create_time < :until_time + ORDER BY create_time ASC + """ + ) + + with self.db_engine.connect().execution_options(stream_results=True) as conn: + result = conn.execute( + query_sql, + { + "tenant_id": int(tenant_id), + "since_time": since, + "until_time": until, + }, + ) + while True: + rows = 
result.fetchmany(fetch_size) + if not rows: + break + for row in rows: + yield row + + def _ensure_meta_index(self) -> str: + meta_index = get_suggestion_meta_index_name() + if self.es_client.index_exists(meta_index): + return meta_index + body = { + "settings": { + "number_of_shards": 1, + "number_of_replicas": 0, + "refresh_interval": "1s", + }, + "mappings": { + "properties": { + "tenant_id": {"type": "keyword"}, + "active_alias": {"type": "keyword"}, + "active_index": {"type": "keyword"}, + "last_full_build_at": {"type": "date"}, + "last_incremental_build_at": {"type": "date"}, + "last_incremental_watermark": {"type": "date"}, + "updated_at": {"type": "date"}, + } + }, + } + if not self.es_client.create_index(meta_index, body): + raise RuntimeError(f"Failed to create suggestion meta index: {meta_index}") + return meta_index + + def _get_meta(self, tenant_id: str) -> Dict[str, Any]: + meta_index = self._ensure_meta_index() + try: + resp = self.es_client.client.get(index=meta_index, id=str(tenant_id)) + return resp.get("_source", {}) or {} + except Exception: + return {} + + def _upsert_meta(self, tenant_id: str, patch: Dict[str, Any]) -> None: + meta_index = self._ensure_meta_index() + current = self._get_meta(tenant_id) + now_iso = datetime.now(timezone.utc).isoformat() + merged = { + "tenant_id": str(tenant_id), + **current, + **patch, + "updated_at": now_iso, + } + self.es_client.client.index(index=meta_index, id=str(tenant_id), document=merged, refresh="wait_for") + + def _cleanup_old_versions(self, tenant_id: str, keep_versions: int, protected_indices: Optional[List[str]] = None) -> List[str]: + if keep_versions < 1: + keep_versions = 1 + protected = set(protected_indices or []) + pattern = get_suggestion_versioned_index_pattern(tenant_id) + all_indices = self.es_client.list_indices(pattern) + if len(all_indices) <= keep_versions: + return [] + + # Names are timestamp-ordered by suffix; keep newest N. 
+ kept = set(sorted(all_indices)[-keep_versions:]) + dropped: List[str] = [] + for idx in sorted(all_indices): + if idx in kept or idx in protected: + continue + if self.es_client.delete_index(idx): + dropped.append(idx) + return dropped + + def _publish_alias(self, tenant_id: str, index_name: str, keep_versions: int = 2) -> Dict[str, Any]: + alias_name = get_suggestion_alias_name(tenant_id) + current_indices = self.es_client.get_alias_indices(alias_name) + + actions: List[Dict[str, Any]] = [] + for idx in current_indices: + actions.append({"remove": {"index": idx, "alias": alias_name}}) + actions.append({"add": {"index": index_name, "alias": alias_name}}) + + if not self.es_client.update_aliases(actions): + raise RuntimeError(f"Failed to publish alias {alias_name} -> {index_name}") + + dropped = self._cleanup_old_versions( + tenant_id=tenant_id, + keep_versions=keep_versions, + protected_indices=[index_name], + ) + + self._upsert_meta( + tenant_id, + { + "active_alias": alias_name, + "active_index": index_name, + }, + ) + + return { + "alias": alias_name, + "previous_indices": current_indices, + "current_index": index_name, + "dropped_old_indices": dropped, + } + + def _resolve_incremental_target_index(self, tenant_id: str) -> Optional[str]: + """Resolve active suggestion index for incremental updates (alias only).""" + alias_name = get_suggestion_alias_name(tenant_id) + aliased = self.es_client.get_alias_indices(alias_name) + if aliased: + # alias should map to one index in this design + return sorted(aliased)[-1] + return None + + def _build_full_candidates( + self, + tenant_id: str, + index_languages: List[str], + primary_language: str, + days: int, + batch_size: int, + min_query_len: int, + ) -> Dict[Tuple[str, str], SuggestionCandidate]: + key_to_candidate: Dict[Tuple[str, str], SuggestionCandidate] = {} + + # Step 1: product title/qanchors + for hit in self._iter_products(tenant_id, batch_size=batch_size): + src = hit.get("_source", {}) or {} + product_id = 
str(src.get("spu_id") or src.get("id") or hit.get("_id") or "") + if not product_id: + continue + title_obj = src.get("title") or {} + qanchor_obj = src.get("qanchors") or {} + + for lang in index_languages: + title = "" + if isinstance(title_obj, dict): + title = self._prepare_title_for_suggest(title_obj.get(lang) or "") + if title: + text_norm = self._normalize_text(title) + if not self._looks_noise(text_norm): + key = (lang, text_norm) + c = key_to_candidate.get(key) + if c is None: + c = SuggestionCandidate(text=title, text_norm=text_norm, lang=lang) + key_to_candidate[key] = c + c.add_product("title", spu_id=product_id) + + q_raw = None + if isinstance(qanchor_obj, dict): + q_raw = qanchor_obj.get(lang) + for q_text in self._split_qanchors(q_raw): + text_norm = self._normalize_text(q_text) + if self._looks_noise(text_norm): + continue + key = (lang, text_norm) + c = key_to_candidate.get(key) + if c is None: + c = SuggestionCandidate(text=q_text, text_norm=text_norm, lang=lang) + key_to_candidate[key] = c + c.add_product("qanchor", spu_id=product_id) + + for tag_lang, tag in self._iter_multilang_product_tags( + src.get("enriched_tags"), + index_languages=index_languages, + primary_language=primary_language, + ): + text_norm = self._normalize_text(tag) + if self._looks_noise(text_norm): + continue + key = (tag_lang, text_norm) + c = key_to_candidate.get(key) + if c is None: + c = SuggestionCandidate(text=tag, text_norm=text_norm, lang=tag_lang) + key_to_candidate[key] = c + c.add_product("tag", spu_id=product_id) + + # Step 2: query logs + now = datetime.now(timezone.utc) + since = now - timedelta(days=days) + since_7d = now - timedelta(days=7) + + for row in self._iter_query_log_rows(tenant_id=tenant_id, since=since, until=now): + q = str(row.query or "").strip() + if len(q) < min_query_len: + continue + + lang, conf, source, conflict = self._resolve_query_language( + query=q, + log_language=getattr(row, "language", None), + request_params=getattr(row, 
"request_params", None), + index_languages=index_languages, + primary_language=primary_language, + ) + text_norm = self._normalize_text(q) + if self._looks_noise(text_norm): + continue + + key = (lang, text_norm) + c = key_to_candidate.get(key) + if c is None: + c = SuggestionCandidate(text=q, text_norm=text_norm, lang=lang) + key_to_candidate[key] = c + + c.lang_confidence = max(c.lang_confidence, conf) + c.lang_source = source if c.lang_source == "default" else c.lang_source + c.lang_conflict = c.lang_conflict or conflict + + created_at = self._to_utc(getattr(row, "create_time", None)) + is_7d = bool(created_at and created_at >= since_7d) + c.add_query_log(is_7d=is_7d) + + return key_to_candidate + + def _candidate_to_doc(self, tenant_id: str, c: SuggestionCandidate, now_iso: str) -> Dict[str, Any]: + rank_score = self._compute_rank_score_from_candidate(c) + completion_obj = {c.lang: {"input": [c.text], "weight": int(max(rank_score, 1.0) * 100)}} + sat_obj = {c.lang: c.text} + return { + "_id": f"{tenant_id}|{c.lang}|{c.text_norm}", + "tenant_id": str(tenant_id), + "lang": c.lang, + "text": c.text, + "text_norm": c.text_norm, + "sources": sorted(c.sources), + "title_doc_count": len(c.title_spu_ids), + "qanchor_doc_count": len(c.qanchor_spu_ids), + "tag_doc_count": len(c.tag_spu_ids), + "query_count_7d": c.query_count_7d, + "query_count_30d": c.query_count_30d, + "rank_score": float(rank_score), + "lang_confidence": float(c.lang_confidence), + "lang_source": c.lang_source, + "lang_conflict": bool(c.lang_conflict), + "status": 1, + "updated_at": now_iso, + "completion": completion_obj, + "sat": sat_obj, + } + + def rebuild_tenant_index( + self, + tenant_id: str, + days: int = 365, + batch_size: int = 500, + min_query_len: int = 1, + publish_alias: bool = True, + keep_versions: int = 2, + ) -> Dict[str, Any]: + """ + Full rebuild. 
+ + Phase2 default behavior: + - write to versioned index + - atomically publish alias + """ + tenant_loader = get_tenant_config_loader() + tenant_cfg = tenant_loader.get_tenant_config(tenant_id) + index_languages: List[str] = tenant_cfg.get("index_languages") or ["en", "zh"] + primary_language: str = tenant_cfg.get("primary_language") or "en" + + alias_publish: Optional[Dict[str, Any]] = None + index_name: Optional[str] = None + try: + mapping = build_suggestion_mapping(index_languages=index_languages) + index_name = self._create_fresh_versioned_index( + tenant_id=tenant_id, + mapping=mapping, + ) + self._ensure_new_index_ready(index_name) + + key_to_candidate = self._build_full_candidates( + tenant_id=tenant_id, + index_languages=index_languages, + primary_language=primary_language, + days=days, + batch_size=batch_size, + min_query_len=min_query_len, + ) + + now_iso = datetime.now(timezone.utc).isoformat() + docs = [self._candidate_to_doc(tenant_id, c, now_iso) for c in key_to_candidate.values()] + + if docs: + bulk_result = self.es_client.bulk_index(index_name=index_name, docs=docs) + self.es_client.refresh(index_name) + else: + bulk_result = {"success": 0, "failed": 0, "errors": []} + + if publish_alias: + alias_publish = self._publish_alias( + tenant_id=tenant_id, + index_name=index_name, + keep_versions=keep_versions, + ) + + now_utc = datetime.now(timezone.utc).isoformat() + meta_patch: Dict[str, Any] = { + "last_full_build_at": now_utc, + "last_incremental_watermark": now_utc, + } + if publish_alias: + meta_patch["active_index"] = index_name + meta_patch["active_alias"] = get_suggestion_alias_name(tenant_id) + self._upsert_meta(tenant_id, meta_patch) + + return { + "mode": "full", + "tenant_id": str(tenant_id), + "index_name": index_name, + "alias_published": bool(alias_publish), + "alias_publish": alias_publish, + "total_candidates": len(key_to_candidate), + "indexed_docs": len(docs), + "bulk_result": bulk_result, + } + except Exception: + if index_name 
and not alias_publish: + self.es_client.delete_index(index_name) + raise + + def _build_incremental_deltas( + self, + tenant_id: str, + index_languages: List[str], + primary_language: str, + since: datetime, + until: datetime, + min_query_len: int, + ) -> Dict[Tuple[str, str], QueryDelta]: + now = datetime.now(timezone.utc) + since_7d = now - timedelta(days=7) + deltas: Dict[Tuple[str, str], QueryDelta] = {} + + for row in self._iter_query_log_rows(tenant_id=tenant_id, since=since, until=until): + q = str(row.query or "").strip() + if len(q) < min_query_len: + continue + + lang, conf, source, conflict = self._resolve_query_language( + query=q, + log_language=getattr(row, "language", None), + request_params=getattr(row, "request_params", None), + index_languages=index_languages, + primary_language=primary_language, + ) + text_norm = self._normalize_text(q) + if self._looks_noise(text_norm): + continue + + key = (lang, text_norm) + item = deltas.get(key) + if item is None: + item = QueryDelta( + tenant_id=str(tenant_id), + lang=lang, + text=q, + text_norm=text_norm, + lang_confidence=conf, + lang_source=source, + lang_conflict=conflict, + ) + deltas[key] = item + + created_at = self._to_utc(getattr(row, "create_time", None)) + item.delta_30d += 1 + if created_at and created_at >= since_7d: + item.delta_7d += 1 + + if conf > item.lang_confidence: + item.lang_confidence = conf + item.lang_source = source + item.lang_conflict = item.lang_conflict or conflict + + return deltas + + def _delta_to_upsert_doc(self, delta: QueryDelta, now_iso: str) -> Dict[str, Any]: + rank_score = self._compute_rank_score( + query_count_30d=delta.delta_30d, + query_count_7d=delta.delta_7d, + qanchor_doc_count=0, + title_doc_count=0, + tag_doc_count=0, + ) + return { + "tenant_id": delta.tenant_id, + "lang": delta.lang, + "text": delta.text, + "text_norm": delta.text_norm, + "sources": ["query_log"], + "title_doc_count": 0, + "qanchor_doc_count": 0, + "tag_doc_count": 0, + "query_count_7d": 
delta.delta_7d, + "query_count_30d": delta.delta_30d, + "rank_score": float(rank_score), + "lang_confidence": float(delta.lang_confidence), + "lang_source": delta.lang_source, + "lang_conflict": bool(delta.lang_conflict), + "status": 1, + "updated_at": now_iso, + "completion": { + delta.lang: { + "input": [delta.text], + "weight": int(max(rank_score, 1.0) * 100), + } + }, + "sat": {delta.lang: delta.text}, + } + + @staticmethod + def _build_incremental_update_script() -> str: + return """ + if (ctx._source == null || ctx._source.isEmpty()) { + ctx._source = params.upsert; + return; + } + + if (ctx._source.query_count_30d == null) { ctx._source.query_count_30d = 0; } + if (ctx._source.query_count_7d == null) { ctx._source.query_count_7d = 0; } + if (ctx._source.qanchor_doc_count == null) { ctx._source.qanchor_doc_count = 0; } + if (ctx._source.title_doc_count == null) { ctx._source.title_doc_count = 0; } + if (ctx._source.tag_doc_count == null) { ctx._source.tag_doc_count = 0; } + + ctx._source.query_count_30d += params.delta_30d; + ctx._source.query_count_7d += params.delta_7d; + + if (ctx._source.sources == null) { ctx._source.sources = new ArrayList(); } + if (!ctx._source.sources.contains('query_log')) { ctx._source.sources.add('query_log'); } + + if (ctx._source.lang_conflict == null) { ctx._source.lang_conflict = false; } + ctx._source.lang_conflict = ctx._source.lang_conflict || params.lang_conflict; + + if (ctx._source.lang_confidence == null || params.lang_confidence > ctx._source.lang_confidence) { + ctx._source.lang_confidence = params.lang_confidence; + ctx._source.lang_source = params.lang_source; + } + + int q30 = ctx._source.query_count_30d; + int q7 = ctx._source.query_count_7d; + int qa = ctx._source.qanchor_doc_count; + int td = ctx._source.title_doc_count; + int tg = ctx._source.tag_doc_count; + + double score = 1.8 * Math.log(1 + q30) + + 1.2 * Math.log(1 + q7) + + 1.0 * Math.log(1 + qa) + + 0.85 * Math.log(1 + tg) + + 0.6 * Math.log(1 + td); + 
ctx._source.rank_score = score; + ctx._source.status = 1; + ctx._source.updated_at = params.now_iso; + ctx._source.text = params.text; + ctx._source.lang = params.lang; + ctx._source.text_norm = params.text_norm; + + if (ctx._source.completion == null) { ctx._source.completion = new HashMap(); } + Map c = new HashMap(); + c.put('input', params.completion_input); + c.put('weight', params.completion_weight); + ctx._source.completion.put(params.lang, c); + + if (ctx._source.sat == null) { ctx._source.sat = new HashMap(); } + ctx._source.sat.put(params.lang, params.text); + """ + + def _build_incremental_actions(self, target_index: str, deltas: Dict[Tuple[str, str], QueryDelta]) -> List[Dict[str, Any]]: + now_iso = datetime.now(timezone.utc).isoformat() + script_source = self._build_incremental_update_script() + actions: List[Dict[str, Any]] = [] + + for delta in deltas.values(): + upsert_doc = self._delta_to_upsert_doc(delta=delta, now_iso=now_iso) + upsert_rank = float(upsert_doc.get("rank_score") or 0.0) + action = { + "_op_type": "update", + "_index": target_index, + "_id": f"{delta.tenant_id}|{delta.lang}|{delta.text_norm}", + "scripted_upsert": True, + "script": { + "lang": "painless", + "source": script_source, + "params": { + "delta_30d": int(delta.delta_30d), + "delta_7d": int(delta.delta_7d), + "lang_confidence": float(delta.lang_confidence), + "lang_source": delta.lang_source, + "lang_conflict": bool(delta.lang_conflict), + "now_iso": now_iso, + "lang": delta.lang, + "text": delta.text, + "text_norm": delta.text_norm, + "completion_input": [delta.text], + "completion_weight": int(max(upsert_rank, 1.0) * 100), + "upsert": upsert_doc, + }, + }, + "upsert": upsert_doc, + } + actions.append(action) + + return actions + + def incremental_update_tenant_index( + self, + tenant_id: str, + min_query_len: int = 1, + fallback_days: int = 7, + overlap_minutes: int = 30, + bootstrap_if_missing: bool = True, + bootstrap_days: int = 30, + batch_size: int = 500, + ) -> 
Dict[str, Any]: + tenant_loader = get_tenant_config_loader() + tenant_cfg = tenant_loader.get_tenant_config(tenant_id) + index_languages: List[str] = tenant_cfg.get("index_languages") or ["en", "zh"] + primary_language: str = tenant_cfg.get("primary_language") or "en" + + target_index = self._resolve_incremental_target_index(tenant_id) + if not target_index: + if not bootstrap_if_missing: + raise RuntimeError( + f"No active suggestion index for tenant={tenant_id}. " + "Run full rebuild first or enable bootstrap_if_missing." + ) + full_result = self.rebuild_tenant_index( + tenant_id=tenant_id, + days=bootstrap_days, + batch_size=batch_size, + min_query_len=min_query_len, + publish_alias=True + ) + return { + "mode": "incremental", + "tenant_id": str(tenant_id), + "bootstrapped": True, + "bootstrap_result": full_result, + } + + meta = self._get_meta(tenant_id) + watermark_raw = meta.get("last_incremental_watermark") or meta.get("last_full_build_at") + now = datetime.now(timezone.utc) + default_since = now - timedelta(days=fallback_days) + since = None + if isinstance(watermark_raw, str) and watermark_raw.strip(): + try: + since = self._to_utc(datetime.fromisoformat(watermark_raw.replace("Z", "+00:00"))) + except Exception: + since = None + if since is None: + since = default_since + since = since - timedelta(minutes=max(overlap_minutes, 0)) + if since < default_since: + since = default_since + + deltas = self._build_incremental_deltas( + tenant_id=tenant_id, + index_languages=index_languages, + primary_language=primary_language, + since=since, + until=now, + min_query_len=min_query_len, + ) + + actions = self._build_incremental_actions(target_index=target_index, deltas=deltas) + bulk_result = self.es_client.bulk_actions(actions) + self.es_client.refresh(target_index) + + now_iso = now.isoformat() + self._upsert_meta( + tenant_id, + { + "last_incremental_build_at": now_iso, + "last_incremental_watermark": now_iso, + "active_index": target_index, + "active_alias": 
get_suggestion_alias_name(tenant_id), + }, + ) + + return { + "mode": "incremental", + "tenant_id": str(tenant_id), + "target_index": target_index, + "query_window": { + "since": since.isoformat(), + "until": now_iso, + "overlap_minutes": int(overlap_minutes), + }, + "updated_terms": len(deltas), + "bulk_result": bulk_result, + } -- libgit2 0.21.2