"""
Suggestion index builder (Phase 2).

Capabilities:
- Full rebuild to versioned index
- Atomic alias publish
- Incremental update from query logs with watermark
"""

import json
import logging
import math
import re
import unicodedata
from dataclasses import dataclass, field
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, Iterator, List, Optional, Tuple

from sqlalchemy import text

from config.loader import get_app_config
from config.tenant_config_loader import get_tenant_config_loader
from query.query_parser import detect_text_language_for_suggestions
from suggestion.mapping import build_suggestion_mapping
from utils.es_client import ESClient

logger = logging.getLogger(__name__)


def _index_prefix() -> str:
    """Return the configured index namespace prefix ("" when unset)."""
    return get_app_config().runtime.index_namespace or ""


def get_suggestion_alias_name(tenant_id: str) -> str:
    """Read alias for suggestion index (single source of truth)."""
    return f"{_index_prefix()}search_suggestions_tenant_{tenant_id}_current"


def get_suggestion_versioned_index_name(tenant_id: str, build_at: Optional[datetime] = None) -> str:
    """Versioned suggestion index name.

    The suffix is a UTC timestamp with microseconds (``%Y%m%d%H%M%S%f``), so
    names of one tenant sort chronologically as plain strings.
    """
    ts = (build_at or datetime.now(timezone.utc)).strftime("%Y%m%d%H%M%S%f")
    return f"{_index_prefix()}search_suggestions_tenant_{tenant_id}_v{ts}"


def get_suggestion_versioned_index_pattern(tenant_id: str) -> str:
    """Wildcard pattern matching every versioned suggestion index of a tenant."""
    return f"{_index_prefix()}search_suggestions_tenant_{tenant_id}_v*"


def get_suggestion_meta_index_name() -> str:
    """Name of the shared index holding per-tenant build metadata."""
    return f"{_index_prefix()}search_suggestions_meta"


@dataclass
class SuggestionCandidate:
    """A suggestion term aggregated during a full rebuild.

    One candidate exists per ``(lang, text_norm)`` key. Product-derived sources
    record the distinct SPU ids that produced the term; query-log hits are counted.
    """

    text: str
    text_norm: str
    lang: str
    sources: set = field(default_factory=set)
    title_spu_ids: set = field(default_factory=set)
    qanchor_spu_ids: set = field(default_factory=set)
    tag_spu_ids: set = field(default_factory=set)
    query_count_7d: int = 0
    query_count_30d: int = 0
    lang_confidence: float = 1.0
    lang_source: str = "default"
    lang_conflict: bool = False

    def add_product(self, source: str, spu_id: str) -> None:
        """Record that product ``spu_id`` produced this term via ``source``.

        ``"title"``, ``"qanchor"`` and ``"tag"`` also record the SPU id in the
        matching set; any other source value is only added to ``sources``.
        """
        self.sources.add(source)
        if source == "title":
            self.title_spu_ids.add(spu_id)
        elif source == "qanchor":
            self.qanchor_spu_ids.add(spu_id)
        elif source == "tag":
            self.tag_spu_ids.add(spu_id)

    def add_query_log(self, is_7d: bool) -> None:
        """Count one query-log hit; every hit counts toward ``query_count_30d``."""
        self.sources.add("query_log")
        self.query_count_30d += 1
        if is_7d:
            self.query_count_7d += 1


@dataclass
class QueryDelta:
    """Query-log counts for one ``(lang, text_norm)`` key within an incremental window."""

    tenant_id: str
    lang: str
    text: str
    text_norm: str
    delta_7d: int = 0
    delta_30d: int = 0
    lang_confidence: float = 1.0
    lang_source: str = "default"
    lang_conflict: bool = False


class SuggestionIndexBuilder:
    """Build and update suggestion index."""

    def __init__(self, es_client: ESClient, db_engine: Any):
        self.es_client = es_client
        self.db_engine = db_engine

    def _format_allocation_failure(self, index_name: str) -> str:
        """Build a diagnostic message explaining why ``index_name`` is not ready.

        Combines cluster health for the index with the allocation-explain output,
        including every node decider that voted ``NO``.
        """
        health = self.es_client.wait_for_index_ready(index_name=index_name, timeout="5s")
        explain = self.es_client.get_allocation_explain(index_name=index_name)
        parts = [
            f"Suggestion index '{index_name}' was created but is not allocatable/readable yet",
            f"health_status={health.get('status')}",
            f"timed_out={health.get('timed_out')}",
        ]
        if health.get("error"):
            parts.append(f"health_error={health['error']}")
        if explain:
            unassigned = explain.get("unassigned_info") or {}
            if unassigned.get("reason"):
                parts.append(f"unassigned_reason={unassigned['reason']}")
            if unassigned.get("last_allocation_status"):
                parts.append(f"last_allocation_status={unassigned['last_allocation_status']}")
            for node in explain.get("node_allocation_decisions") or []:
                node_name = node.get("node_name") or node.get("node_id") or "unknown-node"
                for decider in node.get("deciders") or []:
                    if decider.get("decision") == "NO":
                        parts.append(
                            f"{node_name}:{decider.get('decider')}={decider.get('explanation')}"
                        )
        return "; ".join(parts)

    def _create_fresh_versioned_index(
        self,
        tenant_id: str,
        mapping: Dict[str, Any],
        max_attempts: int = 5,
    ) -> str:
        """Create a brand-new versioned index for ``tenant_id`` and return its name.

        A new timestamped name is generated on every attempt. Name collisions,
        whether detected before or during creation, trigger a retry.

        Raises:
            RuntimeError: creation failed for a non-collision reason, or no unique
                name could be allocated within ``max_attempts``.
        """
        for attempt in range(1, max_attempts + 1):
            index_name = get_suggestion_versioned_index_name(tenant_id)
            if self.es_client.index_exists(index_name):
                logger.warning(
                    "Suggestion index name collision before create for tenant=%s index=%s attempt=%s/%s",
                    tenant_id,
                    index_name,
                    attempt,
                    max_attempts,
                )
                continue
            if self.es_client.create_index(index_name, mapping):
                return index_name
            # Creation failed: if the index now exists, someone else won the race for this name.
            if self.es_client.index_exists(index_name):
                logger.warning(
                    "Suggestion index name collision during create for tenant=%s index=%s attempt=%s/%s",
                    tenant_id,
                    index_name,
                    attempt,
                    max_attempts,
                )
                continue
            raise RuntimeError(f"Failed to create suggestion index: {index_name}")
        raise RuntimeError(
            f"Failed to allocate a unique suggestion index name for tenant={tenant_id} after {max_attempts} attempts"
        )

    def _ensure_new_index_ready(self, index_name: str) -> None:
        """Wait briefly for ``index_name`` to become ready; raise with diagnostics otherwise."""
        health = self.es_client.wait_for_index_ready(index_name=index_name, timeout="5s")
        if health.get("ok"):
            return
        raise RuntimeError(self._format_allocation_failure(index_name))

    @staticmethod
    def _to_utc(dt: Any) -> Optional[datetime]:
        """Return ``dt`` as an aware UTC datetime.

        Naive datetimes are assumed to already be UTC. Returns None for
        non-datetime input.
        """
        if dt is None:
            return None
        if isinstance(dt, datetime):
            if dt.tzinfo is None:
                return dt.replace(tzinfo=timezone.utc)
            return dt.astimezone(timezone.utc)
        return None

    @staticmethod
    def _normalize_text(value: str) -> str:
        """Canonical dedup key: NFKC-normalize, strip, lowercase, collapse whitespace runs."""
        text_value = unicodedata.normalize("NFKC", (value or "")).strip().lower()
        text_value = re.sub(r"\s+", " ", text_value)
        return text_value

    @staticmethod
    def _prepare_title_for_suggest(title: str, max_len: int = 120) -> str:
        """
        Keep title-derived suggestions concise:
        - keep raw title when short enough
        - for long titles, keep the leading phrase before common separators
          (ASCII and fullwidth punctuation, brackets)
        - fallback to hard truncate
        """
        raw = str(title or "").strip()
        if not raw:
            return ""
        if len(raw) <= max_len:
            return raw
        head = re.split(r"[,,;;|/\\((\[【]", raw, maxsplit=1)[0].strip()
        if 1 < len(head) <= max_len:
            return head
        truncated = raw[:max_len].rstrip(" ,,;;|/\\-—–()()[]【】")
        return truncated or raw[:max_len]

    @staticmethod
    def _split_qanchors(value: Any) -> List[str]:
        """Split a qanchor field value into individual anchors.

        Uses the same rules as product tags: lists are taken element-wise; strings
        are split on ASCII/fullwidth commas, semicolons, ``、``, ``|``, ``/``,
        newlines and tabs.
        """
        return SuggestionIndexBuilder._iter_product_tags(value)

    @staticmethod
    def _iter_product_tags(raw: Any) -> List[str]:
        """Split a multi-valued field into trimmed, non-empty values.

        - None → ``[]``
        - lists → their stripped non-empty elements
        - other values → ``str()``, then split on common separators

        If splitting yields nothing, the whole stripped string is returned as
        the single value.
        """
        if raw is None:
            return []
        if isinstance(raw, list):
            return [str(x).strip() for x in raw if str(x).strip()]
        s = str(raw).strip()
        if not s:
            return []
        parts = re.split(r"[,、,;;|/\n\t]+", s)
        out = [p.strip() for p in parts if p and p.strip()]
        return out if out else [s]

    def _iter_multilang_product_tags(
        self,
        raw: Any,
        index_languages: List[str],
        primary_language: str,
    ) -> List[Tuple[str, str]]:
        """Return ``(lang, tag)`` pairs from an ``enriched_tags`` value.

        A dict is read per index language (keys are language codes). Any other
        shape is split into tags whose language is detected per tag.
        """
        pairs: List[Tuple[str, str]] = []
        if isinstance(raw, dict):
            for lang in index_languages:
                for tag in self._iter_product_tags(raw.get(lang)):
                    pairs.append((lang, tag))
            return pairs
        for tag in self._iter_product_tags(raw):
            tag_lang, _, _ = detect_text_language_for_suggestions(
                tag,
                index_languages=index_languages,
                primary_language=primary_language,
            )
            pairs.append((tag_lang, tag))
        return pairs

    @staticmethod
    def _looks_noise(text_value: str) -> bool:
        """True for empty, over-long (>120 chars) or punctuation/underscore-only text."""
        if not text_value:
            return True
        if len(text_value) > 120:
            return True
        if re.fullmatch(r"[\W_]+", text_value):
            return True
        return False

    @staticmethod
    def _normalize_lang(lang: Optional[str]) -> Optional[str]:
        """Lowercase a language tag and reduce it to its base code.

        ``zh_tw`` and ``pt_br`` keep their region. ``-`` is treated like ``_``.
        """
        if not lang:
            return None
        token = str(lang).strip().lower().replace("-", "_")
        if not token:
            return None
        if token in {"zh_tw", "pt_br"}:
            return token
        return token.split("_")[0]

    @staticmethod
    def _parse_request_params_language(raw: Any) -> Optional[str]:
        """Extract ``language`` from request params given as a dict or JSON text.

        Best effort: unparsable input yields None.
        """
        if raw is None:
            return None
        if isinstance(raw, dict):
            return raw.get("language")
        text_raw = str(raw).strip()
        if not text_raw:
            return None
        try:
            obj = json.loads(text_raw)
            if isinstance(obj, dict):
                return obj.get("language")
        except Exception:
            # Log rows may contain arbitrary text; a bad payload must not abort the build.
            return None
        return None

    def _resolve_query_language(
        self,
        query: str,
        log_language: Optional[str],
        request_params: Any,
        index_languages: List[str],
        primary_language: str,
    ) -> Tuple[str, float, str, bool]:
        """Resolve lang with priority: log field > request_params > script/model.

        Returns:
            ``(lang, confidence, source, conflict)``.
            - Explicit languages (log field, request params) get confidence 1.0.
            - Detected languages use the detector's confidence.
            - The fallback to the primary language gets 0.3.
            - ``conflict`` is True when the log field and request params disagree.
        """
        langs_set = set(index_languages or [])
        primary = self._normalize_lang(primary_language) or "en"
        if primary not in langs_set and langs_set:
            primary = index_languages[0]
        log_lang = self._normalize_lang(log_language)
        req_lang = self._normalize_lang(self._parse_request_params_language(request_params))
        conflict = bool(log_lang and req_lang and log_lang != req_lang)
        if log_lang and (not langs_set or log_lang in langs_set):
            return log_lang, 1.0, "log_field", conflict
        if req_lang and (not langs_set or req_lang in langs_set):
            return req_lang, 1.0, "request_params", conflict
        det_lang, conf, det_source = detect_text_language_for_suggestions(
            query,
            index_languages=index_languages,
            primary_language=primary,
        )
        if det_lang and (not langs_set or det_lang in langs_set):
            return det_lang, conf, det_source, conflict
        return primary, 0.3, "default", conflict

    @staticmethod
    def _compute_rank_score(
        query_count_30d: int,
        query_count_7d: int,
        qanchor_doc_count: int,
        title_doc_count: int,
        tag_doc_count: int = 0,
    ) -> float:
        """Weighted sum of ``log1p`` of each signal (negative counts clamp to 0).

        Keep the weights in sync with the painless script built by
        ``_build_incremental_update_script``.
        """
        return (
            1.8 * math.log1p(max(query_count_30d, 0))
            + 1.2 * math.log1p(max(query_count_7d, 0))
            + 1.0 * math.log1p(max(qanchor_doc_count, 0))
            + 0.85 * math.log1p(max(tag_doc_count, 0))
            + 0.6 * math.log1p(max(title_doc_count, 0))
        )

    @classmethod
    def _compute_rank_score_from_candidate(cls, c: SuggestionCandidate) -> float:
        """Rank score for a full-build candidate (doc counts = distinct SPU ids)."""
        return cls._compute_rank_score(
            query_count_30d=c.query_count_30d,
            query_count_7d=c.query_count_7d,
            qanchor_doc_count=len(c.qanchor_spu_ids),
            title_doc_count=len(c.title_spu_ids),
            tag_doc_count=len(c.tag_spu_ids),
        )

    def _iter_products(self, tenant_id: str, batch_size: int = 500) -> Iterator[Dict[str, Any]]:
        """Stream product docs from tenant index using search_after.

        Sorted by ``spu_id`` then ``id.keyword``. Only the fields needed for
        suggestions are fetched.
        """
        from indexer.mapping_generator import get_tenant_index_name

        index_name = get_tenant_index_name(tenant_id)
        search_after: Optional[List[Any]] = None
        logger.debug("Suggestion build reading product index=%s tenant=%s", index_name, tenant_id)
        total_processed = 0
        while True:
            body: Dict[str, Any] = {
                "size": batch_size,
                "_source": ["id", "spu_id", "title", "qanchors", "enriched_tags"],
                "sort": [
                    {"spu_id": {"order": "asc", "missing": "_last"}},
                    {"id.keyword": {"order": "asc", "missing": "_last"}},
                ],
                "query": {"match_all": {}},
            }
            if search_after is not None:
                body["search_after"] = search_after
            resp = self.es_client.client.search(index=index_name, body=body)
            hits = resp.get("hits", {}).get("hits", []) or []
            if not hits:
                break
            for hit in hits:
                total_processed += 1
                yield hit
            search_after = hits[-1].get("sort")
            # A short page means the result set is exhausted.
            if len(hits) < batch_size:
                break
        logger.debug("Suggestion build processed products=%s tenant=%s", total_processed, tenant_id)

    def _iter_query_log_rows(
        self,
        tenant_id: str,
        since: datetime,
        until: datetime,
        fetch_size: int = 2000,
    ) -> Iterator[Any]:
        """Stream search logs from MySQL with bounded time range ``[since, until)``.

        Rows are fetched in chunks of ``fetch_size`` over a streaming
        (server-side) cursor, ordered by ``create_time``.

        NOTE(review): ``since``/``until`` are timezone-aware UTC datetimes, and
        ``_to_utc`` treats naive ``create_time`` values as UTC. Confirm the
        column is stored in UTC, otherwise the window is shifted.
        """
        query_sql = text(
            """
            SELECT query, language, request_params, create_time
            FROM shoplazza_search_log
            WHERE tenant_id = :tenant_id
              AND deleted = 0
              AND query IS NOT NULL
              AND query <> ''
              AND create_time >= :since_time
              AND create_time < :until_time
            ORDER BY create_time ASC
            """
        )
        with self.db_engine.connect().execution_options(stream_results=True) as conn:
            result = conn.execute(
                query_sql,
                {
                    "tenant_id": int(tenant_id),
                    "since_time": since,
                    "until_time": until,
                },
            )
            while True:
                rows = result.fetchmany(fetch_size)
                if not rows:
                    break
                for row in rows:
                    yield row

    def _ensure_meta_index(self) -> str:
        """Create the shared meta index if missing and return its name.

        Raises:
            RuntimeError: the index could not be created and does not exist.
        """
        meta_index = get_suggestion_meta_index_name()
        if self.es_client.index_exists(meta_index):
            return meta_index
        body = {
            "settings": {
                "number_of_shards": 1,
                "number_of_replicas": 0,
                "refresh_interval": "1s",
            },
            "mappings": {
                "properties": {
                    "tenant_id": {"type": "keyword"},
                    "active_alias": {"type": "keyword"},
                    "active_index": {"type": "keyword"},
                    "last_full_build_at": {"type": "date"},
                    "last_incremental_build_at": {"type": "date"},
                    "last_incremental_watermark": {"type": "date"},
                    "updated_at": {"type": "date"},
                }
            },
        }
        if not self.es_client.create_index(meta_index, body):
            # A concurrent builder may have created it between the existence check and
            # our create call; only fail if it still does not exist.
            if not self.es_client.index_exists(meta_index):
                raise RuntimeError(f"Failed to create suggestion meta index: {meta_index}")
        return meta_index

    def _get_meta(self, tenant_id: str) -> Dict[str, Any]:
        """Return the tenant's meta document, or ``{}`` if it cannot be read."""
        meta_index = self._ensure_meta_index()
        try:
            resp = self.es_client.client.get(index=meta_index, id=str(tenant_id))
            return resp.get("_source", {}) or {}
        except Exception:
            # A missing doc is the normal first-run case. Other errors also fall back
            # to {}, so log them for diagnosis.
            logger.debug("Suggestion meta read failed for tenant=%s", tenant_id, exc_info=True)
            return {}

    def _upsert_meta(self, tenant_id: str, patch: Dict[str, Any]) -> None:
        """Merge ``patch`` into the tenant's meta doc and write it with ``refresh=wait_for``."""
        meta_index = self._ensure_meta_index()
        current = self._get_meta(tenant_id)
        now_iso = datetime.now(timezone.utc).isoformat()
        merged = {
            "tenant_id": str(tenant_id),
            **current,
            **patch,
            "updated_at": now_iso,
        }
        self.es_client.client.index(index=meta_index, id=str(tenant_id), document=merged, refresh="wait_for")

    def _cleanup_old_versions(self, tenant_id: str, keep_versions: int, protected_indices: Optional[List[str]] = None) -> List[str]:
        """Delete all but the newest ``keep_versions`` (min 1) versioned indices.

        Never deletes anything in ``protected_indices``. Returns the names that
        were successfully deleted.
        """
        if keep_versions < 1:
            keep_versions = 1
        protected = set(protected_indices or [])
        pattern = get_suggestion_versioned_index_pattern(tenant_id)
        all_indices = self.es_client.list_indices(pattern)
        if len(all_indices) <= keep_versions:
            return []
        # Names are timestamp-ordered by suffix; keep newest N.
        kept = set(sorted(all_indices)[-keep_versions:])
        dropped: List[str] = []
        for idx in sorted(all_indices):
            if idx in kept or idx in protected:
                continue
            if self.es_client.delete_index(idx):
                dropped.append(idx)
        return dropped

    def _publish_alias(self, tenant_id: str, index_name: str, keep_versions: int = 2) -> Dict[str, Any]:
        """Atomically point the tenant's read alias at ``index_name``.

        After the swap, prune old versions and record the active index in meta.

        Raises:
            RuntimeError: the alias update was rejected. Errors from the later
                cleanup/meta steps propagate after the swap has already happened.
        """
        alias_name = get_suggestion_alias_name(tenant_id)
        current_indices = self.es_client.get_alias_indices(alias_name)
        actions: List[Dict[str, Any]] = []
        for idx in current_indices:
            actions.append({"remove": {"index": idx, "alias": alias_name}})
        actions.append({"add": {"index": index_name, "alias": alias_name}})
        if not self.es_client.update_aliases(actions):
            raise RuntimeError(f"Failed to publish alias {alias_name} -> {index_name}")
        dropped = self._cleanup_old_versions(
            tenant_id=tenant_id,
            keep_versions=keep_versions,
            protected_indices=[index_name],
        )
        self._upsert_meta(
            tenant_id,
            {
                "active_alias": alias_name,
                "active_index": index_name,
            },
        )
        return {
            "alias": alias_name,
            "previous_indices": current_indices,
            "current_index": index_name,
            "dropped_old_indices": dropped,
        }

    def _alias_targets_index(self, tenant_id: str, index_name: str) -> bool:
        """Return True if the tenant's read alias currently includes ``index_name``.

        Lookup errors are treated as True, so rollback code never deletes an
        index that might already be live.
        """
        try:
            current = self.es_client.get_alias_indices(get_suggestion_alias_name(tenant_id)) or []
        except Exception:
            logger.warning(
                "Could not read suggestion alias for tenant=%s; keeping index=%s",
                tenant_id,
                index_name,
                exc_info=True,
            )
            return True
        return index_name in current

    def _resolve_incremental_target_index(self, tenant_id: str) -> Optional[str]:
        """Resolve active suggestion index for incremental updates (alias only)."""
        alias_name = get_suggestion_alias_name(tenant_id)
        aliased = self.es_client.get_alias_indices(alias_name)
        if aliased:
            # alias should map to one index in this design; pick the newest name if not
            return sorted(aliased)[-1]
        return None

    def _build_full_candidates(
        self,
        tenant_id: str,
        index_languages: List[str],
        primary_language: str,
        days: int,
        batch_size: int,
        min_query_len: int,
    ) -> Dict[Tuple[str, str], SuggestionCandidate]:
        """Collect candidates from product fields and query logs, keyed by ``(lang, text_norm)``.

        Step 1 scans every product: per-language titles and qanchors, plus
        enriched tags. Step 2 scans query logs from the last ``days`` days.

        NOTE(review): despite its name, ``query_count_30d`` counts every log row
        in the ``days`` window (365 by default).
        """
        key_to_candidate: Dict[Tuple[str, str], SuggestionCandidate] = {}

        # Step 1: product title/qanchors/tags
        for hit in self._iter_products(tenant_id, batch_size=batch_size):
            src = hit.get("_source", {}) or {}
            product_id = str(src.get("spu_id") or src.get("id") or hit.get("_id") or "")
            if not product_id:
                continue
            title_obj = src.get("title") or {}
            qanchor_obj = src.get("qanchors") or {}

            for lang in index_languages:
                title = ""
                if isinstance(title_obj, dict):
                    title = self._prepare_title_for_suggest(title_obj.get(lang) or "")
                if title:
                    text_norm = self._normalize_text(title)
                    if not self._looks_noise(text_norm):
                        key = (lang, text_norm)
                        c = key_to_candidate.get(key)
                        if c is None:
                            c = SuggestionCandidate(text=title, text_norm=text_norm, lang=lang)
                            key_to_candidate[key] = c
                        c.add_product("title", spu_id=product_id)

                q_raw = None
                if isinstance(qanchor_obj, dict):
                    q_raw = qanchor_obj.get(lang)
                for q_text in self._split_qanchors(q_raw):
                    text_norm = self._normalize_text(q_text)
                    if self._looks_noise(text_norm):
                        continue
                    key = (lang, text_norm)
                    c = key_to_candidate.get(key)
                    if c is None:
                        c = SuggestionCandidate(text=q_text, text_norm=text_norm, lang=lang)
                        key_to_candidate[key] = c
                    c.add_product("qanchor", spu_id=product_id)

            for tag_lang, tag in self._iter_multilang_product_tags(
                src.get("enriched_tags"),
                index_languages=index_languages,
                primary_language=primary_language,
            ):
                text_norm = self._normalize_text(tag)
                if self._looks_noise(text_norm):
                    continue
                key = (tag_lang, text_norm)
                c = key_to_candidate.get(key)
                if c is None:
                    c = SuggestionCandidate(text=tag, text_norm=text_norm, lang=tag_lang)
                    key_to_candidate[key] = c
                c.add_product("tag", spu_id=product_id)

        # Step 2: query logs
        now = datetime.now(timezone.utc)
        since = now - timedelta(days=days)
        since_7d = now - timedelta(days=7)
        for row in self._iter_query_log_rows(tenant_id=tenant_id, since=since, until=now):
            q = str(row.query or "").strip()
            if len(q) < min_query_len:
                continue
            lang, conf, source, conflict = self._resolve_query_language(
                query=q,
                log_language=getattr(row, "language", None),
                request_params=getattr(row, "request_params", None),
                index_languages=index_languages,
                primary_language=primary_language,
            )
            text_norm = self._normalize_text(q)
            if self._looks_noise(text_norm):
                continue
            key = (lang, text_norm)
            c = key_to_candidate.get(key)
            if c is None:
                # Query-only term: start from the resolved confidence. The dataclass
                # default of 1.0 would otherwise hide low-confidence detections.
                c = SuggestionCandidate(
                    text=q,
                    text_norm=text_norm,
                    lang=lang,
                    lang_confidence=conf,
                    lang_source=source,
                )
                key_to_candidate[key] = c
            elif conf > c.lang_confidence:
                # Keep the most confident observation, as the incremental path does.
                c.lang_confidence = conf
                c.lang_source = source
            elif c.lang_source == "default":
                c.lang_source = source
            c.lang_conflict = c.lang_conflict or conflict
            created_at = self._to_utc(getattr(row, "create_time", None))
            is_7d = bool(created_at and created_at >= since_7d)
            c.add_query_log(is_7d=is_7d)

        return key_to_candidate

    def _candidate_to_doc(self, tenant_id: str, c: SuggestionCandidate, now_iso: str) -> Dict[str, Any]:
        """Serialize a candidate into a suggestion document.

        The ``_id`` is ``"{tenant}|{lang}|{text_norm}"``. The completion weight is
        ``int(max(rank_score, 1.0) * 100)``.
        """
        rank_score = self._compute_rank_score_from_candidate(c)
        completion_obj = {c.lang: {"input": [c.text], "weight": int(max(rank_score, 1.0) * 100)}}
        sat_obj = {c.lang: c.text}
        return {
            "_id": f"{tenant_id}|{c.lang}|{c.text_norm}",
            "tenant_id": str(tenant_id),
            "lang": c.lang,
            "text": c.text,
            "text_norm": c.text_norm,
            "sources": sorted(c.sources),
            "title_doc_count": len(c.title_spu_ids),
            "qanchor_doc_count": len(c.qanchor_spu_ids),
            "tag_doc_count": len(c.tag_spu_ids),
            "query_count_7d": c.query_count_7d,
            "query_count_30d": c.query_count_30d,
            "rank_score": float(rank_score),
            "lang_confidence": float(c.lang_confidence),
            "lang_source": c.lang_source,
            "lang_conflict": bool(c.lang_conflict),
            "status": 1,
            "updated_at": now_iso,
            "completion": completion_obj,
            "sat": sat_obj,
        }

    def rebuild_tenant_index(
        self,
        tenant_id: str,
        days: int = 365,
        batch_size: int = 500,
        min_query_len: int = 1,
        publish_alias: bool = True,
        keep_versions: int = 2,
    ) -> Dict[str, Any]:
        """
        Full rebuild.

        Phase2 default behavior:
        - write to versioned index
        - atomically publish alias

        The incremental watermark is only advanced when the alias is published,
        because incremental updates always target the aliased (active) index.
        On failure the new index is deleted, unless the alias already points
        at it.
        """
        tenant_loader = get_tenant_config_loader()
        tenant_cfg = tenant_loader.get_tenant_config(tenant_id)
        index_languages: List[str] = tenant_cfg.get("index_languages") or ["en", "zh"]
        primary_language: str = tenant_cfg.get("primary_language") or "en"

        alias_publish: Optional[Dict[str, Any]] = None
        index_name: Optional[str] = None
        try:
            mapping = build_suggestion_mapping(index_languages=index_languages)
            index_name = self._create_fresh_versioned_index(
                tenant_id=tenant_id,
                mapping=mapping,
            )
            self._ensure_new_index_ready(index_name)

            key_to_candidate = self._build_full_candidates(
                tenant_id=tenant_id,
                index_languages=index_languages,
                primary_language=primary_language,
                days=days,
                batch_size=batch_size,
                min_query_len=min_query_len,
            )

            now_iso = datetime.now(timezone.utc).isoformat()
            docs = [self._candidate_to_doc(tenant_id, c, now_iso) for c in key_to_candidate.values()]

            if docs:
                bulk_result = self.es_client.bulk_index(index_name=index_name, docs=docs)
                self.es_client.refresh(index_name)
            else:
                bulk_result = {"success": 0, "failed": 0, "errors": []}

            if publish_alias:
                alias_publish = self._publish_alias(
                    tenant_id=tenant_id,
                    index_name=index_name,
                    keep_versions=keep_versions,
                )

            now_utc = datetime.now(timezone.utc).isoformat()
            meta_patch: Dict[str, Any] = {
                "last_full_build_at": now_utc,
            }
            if publish_alias:
                # Only a published build resets the watermark of the active index.
                # Otherwise the still-active old index would silently skip logs.
                meta_patch["last_incremental_watermark"] = now_utc
                meta_patch["active_index"] = index_name
                meta_patch["active_alias"] = get_suggestion_alias_name(tenant_id)
            self._upsert_meta(tenant_id, meta_patch)

            return {
                "mode": "full",
                "tenant_id": str(tenant_id),
                "index_name": index_name,
                "alias_published": bool(alias_publish),
                "alias_publish": alias_publish,
                "total_candidates": len(key_to_candidate),
                "indexed_docs": len(docs),
                "bulk_result": bulk_result,
            }
        except Exception:
            # _publish_alias swaps the alias before its cleanup/meta steps, so a failure
            # there leaves alias_publish None while the alias is already live on index_name.
            if index_name and not alias_publish and not self._alias_targets_index(tenant_id, index_name):
                self.es_client.delete_index(index_name)
            raise

    def _build_incremental_deltas(
        self,
        tenant_id: str,
        index_languages: List[str],
        primary_language: str,
        since: datetime,
        until: datetime,
        min_query_len: int,
    ) -> Dict[Tuple[str, str], QueryDelta]:
        """Aggregate query-log rows in ``[since, until)`` into per-term deltas.

        Every row adds to ``delta_30d``. Rows created within 7 days before
        ``until`` also add to ``delta_7d``.
        """
        window_end = self._to_utc(until) or datetime.now(timezone.utc)
        since_7d = window_end - timedelta(days=7)
        deltas: Dict[Tuple[str, str], QueryDelta] = {}

        for row in self._iter_query_log_rows(tenant_id=tenant_id, since=since, until=until):
            q = str(row.query or "").strip()
            if len(q) < min_query_len:
                continue
            lang, conf, source, conflict = self._resolve_query_language(
                query=q,
                log_language=getattr(row, "language", None),
                request_params=getattr(row, "request_params", None),
                index_languages=index_languages,
                primary_language=primary_language,
            )
            text_norm = self._normalize_text(q)
            if self._looks_noise(text_norm):
                continue
            key = (lang, text_norm)
            item = deltas.get(key)
            if item is None:
                item = QueryDelta(
                    tenant_id=str(tenant_id),
                    lang=lang,
                    text=q,
                    text_norm=text_norm,
                    lang_confidence=conf,
                    lang_source=source,
                    lang_conflict=conflict,
                )
                deltas[key] = item

            created_at = self._to_utc(getattr(row, "create_time", None))
            item.delta_30d += 1
            if created_at and created_at >= since_7d:
                item.delta_7d += 1

            if conf > item.lang_confidence:
                item.lang_confidence = conf
                item.lang_source = source
            item.lang_conflict = item.lang_conflict or conflict

        return deltas

    def _delta_to_upsert_doc(self, delta: QueryDelta, now_iso: str) -> Dict[str, Any]:
        """Full document used when an incremental delta creates a new term (query-log only)."""
        rank_score = self._compute_rank_score(
            query_count_30d=delta.delta_30d,
            query_count_7d=delta.delta_7d,
            qanchor_doc_count=0,
            title_doc_count=0,
            tag_doc_count=0,
        )
        return {
            "tenant_id": delta.tenant_id,
            "lang": delta.lang,
            "text": delta.text,
            "text_norm": delta.text_norm,
            "sources": ["query_log"],
            "title_doc_count": 0,
            "qanchor_doc_count": 0,
            "tag_doc_count": 0,
            "query_count_7d": delta.delta_7d,
            "query_count_30d": delta.delta_30d,
            "rank_score": float(rank_score),
            "lang_confidence": float(delta.lang_confidence),
            "lang_source": delta.lang_source,
            "lang_conflict": bool(delta.lang_conflict),
            "status": 1,
            "updated_at": now_iso,
            "completion": {
                delta.lang: {
                    "input": [delta.text],
                    "weight": int(max(rank_score, 1.0) * 100),
                }
            },
            "sat": {delta.lang: delta.text},
        }

    @staticmethod
    def _build_incremental_update_script() -> str:
        """Painless script for the scripted upsert of one incremental delta.

        For a new document (``ctx.op == 'create'`` or an empty source) it installs
        ``params.upsert`` as-is. For an existing document it:
        - adds the deltas to the query counters;
        - merges ``query_log`` into ``sources``;
        - keeps the most confident language source;
        - recomputes ``rank_score`` with the same weights as ``_compute_rank_score``;
        - refreshes ``completion`` and ``sat`` for the language.
        """
        return """
if (ctx.op == 'create' || ctx._source == null || ctx._source.isEmpty()) {
  ctx._source = params.upsert;
  return;
}
if (ctx._source.query_count_30d == null) { ctx._source.query_count_30d = 0; }
if (ctx._source.query_count_7d == null) { ctx._source.query_count_7d = 0; }
if (ctx._source.qanchor_doc_count == null) { ctx._source.qanchor_doc_count = 0; }
if (ctx._source.title_doc_count == null) { ctx._source.title_doc_count = 0; }
if (ctx._source.tag_doc_count == null) { ctx._source.tag_doc_count = 0; }
ctx._source.query_count_30d += params.delta_30d;
ctx._source.query_count_7d += params.delta_7d;
if (ctx._source.sources == null) { ctx._source.sources = new ArrayList(); }
if (!ctx._source.sources.contains('query_log')) { ctx._source.sources.add('query_log'); }
if (ctx._source.lang_conflict == null) { ctx._source.lang_conflict = false; }
ctx._source.lang_conflict = ctx._source.lang_conflict || params.lang_conflict;
if (ctx._source.lang_confidence == null || params.lang_confidence > ctx._source.lang_confidence) {
  ctx._source.lang_confidence = params.lang_confidence;
  ctx._source.lang_source = params.lang_source;
}
int q30 = ctx._source.query_count_30d;
int q7 = ctx._source.query_count_7d;
int qa = ctx._source.qanchor_doc_count;
int td = ctx._source.title_doc_count;
int tg = ctx._source.tag_doc_count;
double score = 1.8 * Math.log(1 + q30) + 1.2 * Math.log(1 + q7) + 1.0 * Math.log(1 + qa) + 0.85 * Math.log(1 + tg) + 0.6 * Math.log(1 + td);
ctx._source.rank_score = score;
ctx._source.status = 1;
ctx._source.updated_at = params.now_iso;
ctx._source.text = params.text;
ctx._source.lang = params.lang;
ctx._source.text_norm = params.text_norm;
if (ctx._source.completion == null) { ctx._source.completion = new HashMap(); }
Map c = new HashMap();
c.put('input', params.completion_input);
c.put('weight', params.completion_weight);
ctx._source.completion.put(params.lang, c);
if (ctx._source.sat == null) { ctx._source.sat = new HashMap(); }
ctx._source.sat.put(params.lang, params.text);
"""

    def _build_incremental_actions(self, target_index: str, deltas: Dict[Tuple[str, str], QueryDelta]) -> List[Dict[str, Any]]:
        """Build one scripted-upsert bulk action per delta, targeting ``target_index``."""
        now_iso = datetime.now(timezone.utc).isoformat()
        script_source = self._build_incremental_update_script()
        actions: List[Dict[str, Any]] = []
        for delta in deltas.values():
            upsert_doc = self._delta_to_upsert_doc(delta=delta, now_iso=now_iso)
            upsert_rank = float(upsert_doc.get("rank_score") or 0.0)
            action = {
                "_op_type": "update",
                "_index": target_index,
                "_id": f"{delta.tenant_id}|{delta.lang}|{delta.text_norm}",
                "scripted_upsert": True,
                "script": {
                    "lang": "painless",
                    "source": script_source,
                    "params": {
                        "delta_30d": int(delta.delta_30d),
                        "delta_7d": int(delta.delta_7d),
                        "lang_confidence": float(delta.lang_confidence),
                        "lang_source": delta.lang_source,
                        "lang_conflict": bool(delta.lang_conflict),
                        "now_iso": now_iso,
                        "lang": delta.lang,
                        "text": delta.text,
                        "text_norm": delta.text_norm,
                        "completion_input": [delta.text],
                        "completion_weight": int(max(upsert_rank, 1.0) * 100),
                        "upsert": upsert_doc,
                    },
                },
                # Intentionally empty. With scripted_upsert, ES runs the script with
                # ctx._source set to this object; the script's create branch installs
                # params.upsert. A full document here made the script add the deltas
                # on top of counts that already contained them (double counting).
                "upsert": {},
            }
            actions.append(action)
        return actions

    def incremental_update_tenant_index(
        self,
        tenant_id: str,
        min_query_len: int = 1,
        fallback_days: int = 7,
        overlap_minutes: int = 30,
        bootstrap_if_missing: bool = True,
        bootstrap_days: int = 30,
        batch_size: int = 500,
    ) -> Dict[str, Any]:
        """Apply query-log deltas since the stored watermark to the active index.

        If no index is aliased, a full rebuild over ``bootstrap_days`` runs
        instead when ``bootstrap_if_missing`` is set; otherwise RuntimeError is
        raised.

        The window starts at the watermark minus ``overlap_minutes``. It is never
        earlier than ``fallback_days`` ago.

        NOTE(review): counters are additive, so rows in the overlap window (already
        applied by the previous run) are counted again. ``query_count_7d`` is also
        only ever incremented here, never decayed.
        """
        tenant_loader = get_tenant_config_loader()
        tenant_cfg = tenant_loader.get_tenant_config(tenant_id)
        index_languages: List[str] = tenant_cfg.get("index_languages") or ["en", "zh"]
        primary_language: str = tenant_cfg.get("primary_language") or "en"

        target_index = self._resolve_incremental_target_index(tenant_id)
        if not target_index:
            if not bootstrap_if_missing:
                raise RuntimeError(
                    f"No active suggestion index for tenant={tenant_id}. "
                    "Run full rebuild first or enable bootstrap_if_missing."
                )
            full_result = self.rebuild_tenant_index(
                tenant_id=tenant_id,
                days=bootstrap_days,
                batch_size=batch_size,
                min_query_len=min_query_len,
                publish_alias=True,
            )
            return {
                "mode": "incremental",
                "tenant_id": str(tenant_id),
                "bootstrapped": True,
                "bootstrap_result": full_result,
            }

        meta = self._get_meta(tenant_id)
        watermark_raw = meta.get("last_incremental_watermark") or meta.get("last_full_build_at")
        now = datetime.now(timezone.utc)
        default_since = now - timedelta(days=fallback_days)
        since = None
        if isinstance(watermark_raw, str) and watermark_raw.strip():
            try:
                since = self._to_utc(datetime.fromisoformat(watermark_raw.replace("Z", "+00:00")))
            except Exception:
                since = None
        if since is None:
            since = default_since
        since = since - timedelta(minutes=max(overlap_minutes, 0))
        if since < default_since:
            since = default_since

        deltas = self._build_incremental_deltas(
            tenant_id=tenant_id,
            index_languages=index_languages,
            primary_language=primary_language,
            since=since,
            until=now,
            min_query_len=min_query_len,
        )

        actions = self._build_incremental_actions(target_index=target_index, deltas=deltas)
        bulk_result = self.es_client.bulk_actions(actions)
        self.es_client.refresh(target_index)

        now_iso = now.isoformat()
        self._upsert_meta(
            tenant_id,
            {
                "last_incremental_build_at": now_iso,
                "last_incremental_watermark": now_iso,
                "active_index": target_index,
                "active_alias": get_suggestion_alias_name(tenant_id),
            },
        )

        return {
            "mode": "incremental",
            "tenant_id": str(tenant_id),
            "target_index": target_index,
            "query_window": {
                "since": since.isoformat(),
                "until": now_iso,
                "overlap_minutes": int(overlap_minutes),
            },
            "updated_terms": len(deltas),
            "bulk_result": bulk_result,
        }