"""
Full suggestion index builder.

Build data from:
- ES product index fields: title.{lang}, qanchors.{lang}
- MySQL search logs: shoplazza_search_log.query (+ language metadata)
"""

import json
import logging
import math
import re
from dataclasses import dataclass, field
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List, Optional, Tuple

from sqlalchemy import text

from config.tenant_config_loader import get_tenant_config_loader
from utils.es_client import ESClient
from suggestion.mapping import build_suggestion_mapping

logger = logging.getLogger(__name__)

# Maximum number of product ids stored per suggestion doc (highest product score first).
_TOP_SPU_LIMIT = 20

# Patterns used once per product field / log row; compiled once here.
_WHITESPACE_RE = re.compile(r"\s+")
_QANCHOR_SEP_RE = re.compile(r"[,;|/\n\t]+")
_PUNCT_ONLY_RE = re.compile(r"[\W_]+")

# Ordered script heuristics: (pattern, lang, confidence, lang_source). First match wins.
# NOTE(review): kana / Hangul are not covered; Japanese text containing kanji matches
# the CJK rule and is reported as "zh".
_SCRIPT_LANGUAGE_RULES = (
    (re.compile(r"[\u4e00-\u9fff]"), "zh", 0.98, "script"),  # CJK unified ideographs
    (re.compile(r"[\u0600-\u06FF]"), "ar", 0.98, "script"),  # Arabic block
    (re.compile(r"[\u0400-\u04FF]"), "ru", 0.95, "script"),  # Cyrillic block
    (re.compile(r"[\u0370-\u03FF]"), "el", 0.95, "script"),  # Greek block
    (re.compile(r"[a-zA-Z]"), "en", 0.55, "model"),  # Latin fallback (low confidence)
)


def get_suggestion_index_name(tenant_id: str) -> str:
    """Return the name of the per-tenant suggestion index."""
    return f"search_suggestions_tenant_{tenant_id}"


@dataclass
class SuggestionCandidate:
    """Aggregated evidence for one suggestion text in one language.

    Candidates are keyed by ``(lang, text_norm)`` in the builder; ``text`` keeps the
    first surface form that was seen for that key.
    """

    text: str
    text_norm: str
    lang: str
    # Evidence kinds seen for this text; the builder adds "title", "qanchor", "query_log".
    sources: set = field(default_factory=set)
    # Distinct spu_ids whose title / qanchors produced this text.
    title_spu_ids: set = field(default_factory=set)
    qanchor_spu_ids: set = field(default_factory=set)
    query_count_7d: int = 0
    # Incremented for every fetched log row (see NOTE in the query-log collector).
    query_count_30d: int = 0
    lang_confidence: float = 1.0
    lang_source: str = "default"
    # True if any log row had differing language / request_params.language values.
    lang_conflict: bool = False
    # spu_id -> best product score seen for this text; used to pick top_spu_ids.
    top_spu_scores: Dict[str, float] = field(default_factory=dict)

    def add_product(self, source: str, spu_id: str, score: float) -> None:
        """Record that product ``spu_id`` produced this text via ``source``.

        ``source`` "title" / "qanchor" also updates the matching spu_id set. Only the
        highest ``score`` per spu_id is retained.
        """
        self.sources.add(source)
        if source == "title":
            self.title_spu_ids.add(spu_id)
        elif source == "qanchor":
            self.qanchor_spu_ids.add(spu_id)
        prev = self.top_spu_scores.get(spu_id)
        if prev is None or score > prev:
            self.top_spu_scores[spu_id] = score

    def add_query_log(self, is_7d: bool) -> None:
        """Count one search-log occurrence; ``is_7d`` also bumps the 7-day counter."""
        self.sources.add("query_log")
        self.query_count_30d += 1
        if is_7d:
            self.query_count_7d += 1


class SuggestionIndexBuilder:
    """Build and rebuild suggestion index."""

    def __init__(self, es_client: ESClient, db_engine: Any):
        self.es_client = es_client
        # Used via ``db_engine.connect()``; presumably a SQLAlchemy Engine — verify.
        self.db_engine = db_engine

    # ------------------------------------------------------------------ text helpers

    @staticmethod
    def _normalize_text(value: str) -> str:
        """Strip, lowercase and collapse internal whitespace runs to a single space."""
        text_value = (value or "").strip().lower()
        return _WHITESPACE_RE.sub(" ", text_value)

    @staticmethod
    def _split_qanchors(value: Any) -> List[str]:
        """Split a qanchors field value into individual anchor strings.

        Lists/tuples are taken element-wise (``None`` elements skipped, blanks dropped).
        Scalars are stringified and split on ``, ; | / newline tab``; if that yields
        nothing, the whole stripped string is returned as a single item.
        """
        if value is None:
            return []
        if isinstance(value, (list, tuple)):
            # Skip None explicitly: str(None) would otherwise yield a bogus "None" anchor.
            items = (str(x).strip() for x in value if x is not None)
            return [s for s in items if s]
        raw = str(value).strip()
        if not raw:
            return []
        out = [p.strip() for p in _QANCHOR_SEP_RE.split(raw) if p and p.strip()]
        return out or [raw]

    @staticmethod
    def _looks_noise(text_value: str) -> bool:
        """Return True for empty, over-long (>120 chars) or punctuation/underscore-only text."""
        if not text_value or len(text_value) > 120:
            return True
        return _PUNCT_ONLY_RE.fullmatch(text_value) is not None

    # ------------------------------------------------------------------ language helpers

    @staticmethod
    def _normalize_lang(lang: Optional[str]) -> Optional[str]:
        """Normalize a language tag: ``en-US`` -> ``en``; ``zh_tw`` / ``pt_br`` kept as-is."""
        if not lang:
            return None
        token = str(lang).strip().lower().replace("-", "_")
        if not token:
            return None
        # en_us -> en, zh_cn -> zh, keep explicit zh_tw / pt_br
        if token in {"zh_tw", "pt_br"}:
            return token
        return token.split("_")[0]

    @staticmethod
    def _parse_request_params_language(raw: Any) -> Optional[str]:
        """Extract ``language`` from request params given as a dict or a JSON string.

        Returns None for missing/blank input, non-object JSON, or malformed JSON
        (best-effort: log rows with bad request_params must not abort the rebuild).
        """
        if raw is None:
            return None
        if isinstance(raw, dict):
            return raw.get("language")
        text_raw = str(raw).strip()
        if not text_raw:
            return None
        try:
            obj = json.loads(text_raw)
        except (ValueError, RecursionError):
            # JSONDecodeError subclasses ValueError; RecursionError covers absurdly nested input.
            return None
        if isinstance(obj, dict):
            return obj.get("language")
        return None

    @staticmethod
    def _detect_script_language(query: str) -> Tuple[Optional[str], float, str]:
        """Guess a language from the first matching Unicode script rule.

        Returns ``(lang, confidence, lang_source)``, or ``(None, 0.0, "default")`` when no
        rule matches.
        """
        for pattern, lang, confidence, lang_source in _SCRIPT_LANGUAGE_RULES:
            if pattern.search(query):
                return lang, confidence, lang_source
        return None, 0.0, "default"

    def _resolve_query_language(
        self,
        query: str,
        log_language: Optional[str],
        request_params: Any,
        index_languages: List[str],
        primary_language: str,
    ) -> Tuple[str, float, str, bool]:
        """Resolve lang with priority: log field > request_params > script/model.

        Each candidate is accepted only if it is one of ``index_languages`` (or that list
        is empty). Falls back to the primary language (or the first index language when
        the primary is not indexed) with confidence 0.3.

        Returns:
            ``(lang, confidence, lang_source, conflict)`` where ``conflict`` is True when the
            log field and request_params name different languages.
        """
        langs_set = set(index_languages or [])
        primary = self._normalize_lang(primary_language) or "en"
        if primary not in langs_set and langs_set:
            primary = index_languages[0]

        log_lang = self._normalize_lang(log_language)
        req_lang = self._normalize_lang(self._parse_request_params_language(request_params))
        conflict = bool(log_lang and req_lang and log_lang != req_lang)

        if log_lang and (not langs_set or log_lang in langs_set):
            return log_lang, 1.0, "log_field", conflict
        if req_lang and (not langs_set or req_lang in langs_set):
            return req_lang, 1.0, "request_params", conflict

        detected_lang, conf, source = self._detect_script_language(query)
        if detected_lang and (not langs_set or detected_lang in langs_set):
            return detected_lang, conf, source, conflict

        return primary, 0.3, "default", conflict

    # ------------------------------------------------------------------ scoring

    @staticmethod
    def _score_product_hit(source: Dict[str, Any]) -> float:
        """Score a product by log-scaled sales (weight 1.2) and inventory (weight 0.4)."""
        sales = float(source.get("sales") or 0.0)
        inventory = float(source.get("total_inventory") or 0.0)
        return math.log1p(max(sales, 0.0)) * 1.2 + math.log1p(max(inventory, 0.0)) * 0.4

    @staticmethod
    def _compute_rank_score(c: SuggestionCandidate) -> float:
        """Combine log-scaled query counts and product coverage into one rank score."""
        return (
            1.8 * math.log1p(c.query_count_30d)
            + 1.2 * math.log1p(c.query_count_7d)
            + 1.0 * math.log1p(len(c.qanchor_spu_ids))
            + 0.6 * math.log1p(len(c.title_spu_ids))
        )

    # ------------------------------------------------------------------ ES helpers

    def _scan_products(self, tenant_id: str, batch_size: int = 500) -> List[Dict[str, Any]]:
        """Scan all product docs from tenant index using search_after.

        Pages are sorted by ``spu_id`` ascending; all hits are returned in one list.
        NOTE(review): search_after on a single, possibly non-unique sort key can skip docs
        that share an spu_id across a page boundary — confirm spu_id is unique.
        """
        from indexer.mapping_generator import get_tenant_index_name

        index_name = get_tenant_index_name(tenant_id)
        all_hits: List[Dict[str, Any]] = []
        search_after: Optional[List[Any]] = None

        while True:
            body: Dict[str, Any] = {
                "size": batch_size,
                "_source": ["spu_id", "title", "qanchors", "sales", "total_inventory"],
                "sort": [{"spu_id": "asc"}],
                "query": {"match_all": {}},
            }
            if search_after is not None:
                body["search_after"] = search_after

            resp = self.es_client.client.search(index=index_name, body=body)
            hits = resp.get("hits", {}).get("hits", []) or []
            if not hits:
                break
            all_hits.extend(hits)
            search_after = hits[-1].get("sort")
            if len(hits) < batch_size:
                break
        return all_hits

    def _create_or_reset_index(self, tenant_id: str, index_languages: List[str], recreate: bool) -> str:
        """Ensure the tenant's suggestion index exists, deleting it first if ``recreate``.

        Raises:
            RuntimeError: If index creation reports failure.
        """
        index_name = get_suggestion_index_name(tenant_id)
        if recreate and self.es_client.index_exists(index_name):
            logger.info("Deleting existing suggestion index: %s", index_name)
            self.es_client.delete_index(index_name)
        if not self.es_client.index_exists(index_name):
            mapping = build_suggestion_mapping(index_languages=index_languages)
            ok = self.es_client.create_index(index_name, mapping)
            if not ok:
                raise RuntimeError(f"Failed to create suggestion index: {index_name}")
        return index_name

    # ------------------------------------------------------------------ candidate collection

    @staticmethod
    def _get_or_create_candidate(
        key_to_candidate: Dict[Tuple[str, str], SuggestionCandidate],
        text_value: str,
        text_norm: str,
        lang: str,
        **init_fields: Any,
    ) -> SuggestionCandidate:
        """Return the candidate for ``(lang, text_norm)``, creating it on first sight.

        ``init_fields`` are passed to the constructor only when a new candidate is created.
        """
        key = (lang, text_norm)
        candidate = key_to_candidate.get(key)
        if candidate is None:
            candidate = SuggestionCandidate(text=text_value, text_norm=text_norm, lang=lang, **init_fields)
            key_to_candidate[key] = candidate
        return candidate

    def _collect_product_candidates(
        self,
        tenant_id: str,
        index_languages: List[str],
        batch_size: int,
        key_to_candidate: Dict[Tuple[str, str], SuggestionCandidate],
    ) -> None:
        """Add title and qanchor candidates (per index language) for every tenant product."""
        for hit in self._scan_products(tenant_id, batch_size=batch_size):
            src = hit.get("_source", {}) or {}
            spu_id = str(src.get("spu_id") or "")
            if not spu_id:
                continue

            title_obj = src.get("title") or {}
            qanchor_obj = src.get("qanchors") or {}
            product_score = self._score_product_hit(src)

            for lang in index_languages:
                title = ""
                if isinstance(title_obj, dict):
                    title = str(title_obj.get(lang) or "").strip()
                if title:
                    text_norm = self._normalize_text(title)
                    if not self._looks_noise(text_norm):
                        c = self._get_or_create_candidate(key_to_candidate, title, text_norm, lang)
                        c.add_product("title", spu_id=spu_id, score=product_score)

                q_raw = qanchor_obj.get(lang) if isinstance(qanchor_obj, dict) else None
                for q_text in self._split_qanchors(q_raw):
                    text_norm = self._normalize_text(q_text)
                    if self._looks_noise(text_norm):
                        continue
                    c = self._get_or_create_candidate(key_to_candidate, q_text, text_norm, lang)
                    # qanchor matches get a fixed +0.6 over title matches when ranking top_spu_ids.
                    c.add_product("qanchor", spu_id=spu_id, score=product_score + 0.6)

    @staticmethod
    def _is_on_or_after(created_at: Any, since: datetime) -> bool:
        """Return True if ``created_at`` is a datetime at or after the aware ``since``.

        Naive datetimes are treated as UTC. Non-datetime values (None, dates, strings) count
        as False instead of raising TypeError on comparison.
        """
        if not isinstance(created_at, datetime):
            return False
        if created_at.tzinfo is None:
            # NOTE(review): if the DB stores naive *local* time rather than UTC, this shifts
            # the boundary by the server's UTC offset — confirm the column's time zone.
            created_at = created_at.replace(tzinfo=timezone.utc)
        return created_at >= since

    def _collect_query_log_candidates(
        self,
        tenant_id: str,
        days: int,
        min_query_len: int,
        index_languages: List[str],
        primary_language: str,
        key_to_candidate: Dict[Tuple[str, str], SuggestionCandidate],
    ) -> None:
        """Add/merge candidates from the tenant's search logs of the last ``days`` days."""
        now = datetime.now(timezone.utc)
        # NOTE(review): the fetch window is `days` (default 365), yet every fetched row
        # increments query_count_30d. Confirm whether a true 30-day count is intended.
        since_30d = now - timedelta(days=days)
        since_7d = now - timedelta(days=7)

        query_sql = text(
            """
            SELECT query, language, request_params, create_time
            FROM shoplazza_search_log
            WHERE tenant_id = :tenant_id
              AND deleted = 0
              AND query IS NOT NULL
              AND query <> ''
              AND create_time >= :since_30d
            """
        )
        with self.db_engine.connect() as conn:
            rows = conn.execute(query_sql, {"tenant_id": int(tenant_id), "since_30d": since_30d}).fetchall()

        for row in rows:
            q = str(row.query or "").strip()
            if len(q) < min_query_len:
                continue
            # Cheap noise check first: noisy rows are skipped regardless of language.
            text_norm = self._normalize_text(q)
            if self._looks_noise(text_norm):
                continue

            lang, conf, source, conflict = self._resolve_query_language(
                query=q,
                log_language=getattr(row, "language", None),
                request_params=getattr(row, "request_params", None),
                index_languages=index_languages,
                primary_language=primary_language,
            )

            # Seed log-only candidates with the observed confidence/source; with the
            # dataclass default of 1.0 the max() below could never reflect a weak guess.
            c = self._get_or_create_candidate(
                key_to_candidate, q, text_norm, lang, lang_confidence=conf, lang_source=source
            )
            c.lang_confidence = max(c.lang_confidence, conf)
            if c.lang_source == "default":
                c.lang_source = source
            c.lang_conflict = c.lang_conflict or conflict

            is_7d = self._is_on_or_after(getattr(row, "create_time", None), since_7d)
            c.add_query_log(is_7d=is_7d)

    def _build_docs(
        self,
        tenant_id: str,
        key_to_candidate: Dict[Tuple[str, str], SuggestionCandidate],
    ) -> List[Dict[str, Any]]:
        """Turn aggregated candidates into suggestion-index documents (one per candidate)."""
        now_iso = datetime.now(timezone.utc).isoformat()
        docs: List[Dict[str, Any]] = []
        for c in key_to_candidate.values():
            rank_score = self._compute_rank_score(c)
            # keep top N product ids by score (stable sort: ties keep insertion order)
            ranked = sorted(c.top_spu_scores.items(), key=lambda kv: kv[1], reverse=True)
            top_spu_ids = [spu_id for spu_id, _ in ranked[:_TOP_SPU_LIMIT]]

            # Completion weight is an int; scores below 1.0 are floored to weight 100.
            completion_obj = {c.lang: {"input": [c.text], "weight": int(max(rank_score, 1.0) * 100)}}
            sat_obj = {c.lang: c.text}
            doc_id = f"{tenant_id}|{c.lang}|{c.text_norm}"
            docs.append(
                {
                    "_id": doc_id,
                    "tenant_id": str(tenant_id),
                    "lang": c.lang,
                    "text": c.text,
                    "text_norm": c.text_norm,
                    "sources": sorted(c.sources),
                    "title_doc_count": len(c.title_spu_ids),
                    "qanchor_doc_count": len(c.qanchor_spu_ids),
                    "query_count_7d": c.query_count_7d,
                    "query_count_30d": c.query_count_30d,
                    "rank_score": float(rank_score),
                    "lang_confidence": float(c.lang_confidence),
                    "lang_source": c.lang_source,
                    "lang_conflict": bool(c.lang_conflict),
                    "top_spu_ids": top_spu_ids,
                    "status": 1,
                    "updated_at": now_iso,
                    "completion": completion_obj,
                    "sat": sat_obj,
                }
            )
        return docs

    # ------------------------------------------------------------------ entry point

    def rebuild_tenant_index(
        self,
        tenant_id: str,
        days: int = 365,
        recreate: bool = True,
        batch_size: int = 500,
        min_query_len: int = 1,
    ) -> Dict[str, Any]:
        """Rebuild the suggestion index for one tenant from products and search logs.

        Args:
            tenant_id: Tenant identifier; converted with ``int()`` for the search-log query.
            days: Look-back window, in days, for search-log rows.
            recreate: Delete and recreate the suggestion index before indexing.
            batch_size: Page size used when scanning the product index.
            min_query_len: Minimum stripped query length for a log row to be used.

        Returns:
            Summary dict with ``tenant_id``, ``index_name``, ``total_candidates``,
            ``indexed_docs`` and ``bulk_result``.

        Raises:
            RuntimeError: If the suggestion index cannot be created.
            ValueError: If ``tenant_id`` is not convertible with ``int()``.
        """
        tenant_loader = get_tenant_config_loader()
        tenant_cfg = tenant_loader.get_tenant_config(tenant_id)
        index_languages: List[str] = tenant_cfg.get("index_languages") or ["en", "zh"]
        primary_language: str = tenant_cfg.get("primary_language") or "en"

        index_name = self._create_or_reset_index(tenant_id, index_languages, recreate)
        key_to_candidate: Dict[Tuple[str, str], SuggestionCandidate] = {}

        # Step 1: product title/qanchors
        self._collect_product_candidates(tenant_id, index_languages, batch_size, key_to_candidate)

        # Step 2: query logs
        self._collect_query_log_candidates(
            tenant_id,
            days,
            min_query_len,
            index_languages,
            primary_language,
            key_to_candidate,
        )

        # Step 3: build docs
        docs = self._build_docs(tenant_id, key_to_candidate)

        if docs:
            result = self.es_client.bulk_index(index_name=index_name, docs=docs)
            self.es_client.refresh(index_name)
        else:
            result = {"success": 0, "failed": 0, "errors": []}

        return {
            "tenant_id": str(tenant_id),
            "index_name": index_name,
            "total_candidates": len(key_to_candidate),
            "indexed_docs": len(docs),
            "bulk_result": result,
        }