# builder.py
"""
Full suggestion index builder.

Suggestion candidates are built from:
- ES product index fields: title.{lang}, qanchors.{lang}
- MySQL search logs: shoplazza_search_log.query (+ language metadata)
"""

import json
import logging
import math
import re
from dataclasses import dataclass, field
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List, Optional, Tuple

from sqlalchemy import text

from config.tenant_config_loader import get_tenant_config_loader
from utils.es_client import ESClient
from suggestion.mapping import build_suggestion_mapping
from config.env_config import ES_INDEX_NAMESPACE

logger = logging.getLogger(__name__)


def get_suggestion_index_name(tenant_id: str) -> str:
    """
    Return the name of the suggestion index for ``tenant_id``.

    Naming pattern:
      {ES_INDEX_NAMESPACE}search_suggestions_tenant_{tenant_id}

    ES_INDEX_NAMESPACE is the prefix that separates environments such as
    prod/uat/test; a falsy namespace means no prefix at all.
    """
    namespace = ES_INDEX_NAMESPACE if ES_INDEX_NAMESPACE else ""
    return "{}search_suggestions_tenant_{}".format(namespace, tenant_id)


@dataclass
class SuggestionCandidate:
    """Accumulated evidence for one suggestion text in one language.

    The builder keys candidates by ``(lang, text_norm)`` and feeds them
    product evidence via ``add_product`` and search-log hits via
    ``add_query_log``.
    """

    text: str  # raw display text of the first occurrence that created the candidate
    text_norm: str  # normalized text used as part of the de-duplication key
    lang: str
    sources: set = field(default_factory=set)  # evidence kinds seen, e.g. "title", "qanchor", "query_log"
    title_spu_ids: set = field(default_factory=set)  # spu_ids whose title produced this text
    qanchor_spu_ids: set = field(default_factory=set)  # spu_ids whose qanchors produced this text
    query_count_7d: int = 0
    # Incremented once per add_query_log call; the builder's log window is
    # configurable, so this is not necessarily a strict 30-day count.
    query_count_30d: int = 0
    lang_confidence: float = 1.0
    lang_source: str = "default"
    lang_conflict: bool = False
    top_spu_scores: Dict[str, float] = field(default_factory=dict)  # best score seen per spu_id

    def add_product(self, source: str, spu_id: str, score: float) -> None:
        """Record that product ``spu_id`` produced this text via ``source``.

        ``"title"`` and ``"qanchor"`` sources also register the id in the
        matching id set. The score kept for an id only ever increases.
        """
        self.sources.add(source)
        id_bucket = {"title": self.title_spu_ids, "qanchor": self.qanchor_spu_ids}.get(source)
        if id_bucket is not None:
            id_bucket.add(spu_id)
        current_best = self.top_spu_scores.get(spu_id)
        should_replace = current_best is None or score > current_best
        if should_replace:
            self.top_spu_scores[spu_id] = score

    def add_query_log(self, is_7d: bool) -> None:
        """Count one search-log hit; a truthy ``is_7d`` also bumps the 7-day count."""
        self.sources.add("query_log")
        self.query_count_30d = self.query_count_30d + 1
        self.query_count_7d += 1 if is_7d else 0


class SuggestionIndexBuilder:
    """Build and rebuild a tenant's suggestion index.

    Candidates are gathered from two places and merged by
    ``(lang, normalized text)``:

    1. the tenant's product index (``title.{lang}`` and ``qanchors.{lang}``);
    2. the ``shoplazza_search_log`` table (query text plus language metadata).

    Each merged candidate becomes one document in the index named by
    ``get_suggestion_index_name(tenant_id)``.
    """

    def __init__(self, es_client: ESClient, db_engine: Any):
        # ``es_client`` is used for its raw ``client.search`` plus the
        # index_exists / delete_index / create_index / bulk_index / refresh
        # helpers. ``db_engine.connect()`` must return a context-managed
        # connection whose ``execute`` accepts a sqlalchemy ``text`` clause.
        self.es_client = es_client
        self.db_engine = db_engine

    # ------------------------------------------------------------------
    # Text / language helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _normalize_text(value: str) -> str:
        """Trim, lower-case, and collapse whitespace runs to a single space.

        ``None`` or empty input yields ``""``.
        """
        text_value = (value or "").strip().lower()
        text_value = re.sub(r"\s+", " ", text_value)
        return text_value

    @staticmethod
    def _split_qanchors(value: Any) -> List[str]:
        """Turn a qanchors field value into a list of non-empty stripped strings.

        Lists are stripped element-wise, and empty elements are dropped. Any
        other value is stringified and split on ``, ; | /`` and newline/tab.
        If splitting leaves nothing (e.g. the value is only separators), the
        stripped raw string is returned as a single element.
        """
        if value is None:
            return []
        if isinstance(value, list):
            return [str(x).strip() for x in value if str(x).strip()]
        raw = str(value).strip()
        if not raw:
            return []
        parts = re.split(r"[,;|/\n\t]+", raw)
        out = [p.strip() for p in parts if p and p.strip()]
        if not out:
            return [raw]
        return out

    @staticmethod
    def _looks_noise(text_value: str) -> bool:
        """Return True for text that should not become a suggestion.

        Rejects empty text, text longer than 120 characters, and text made up
        only of non-word characters and/or underscores.
        """
        if not text_value:
            return True
        if len(text_value) > 120:
            return True
        if re.fullmatch(r"[\W_]+", text_value):
            return True
        return False

    @staticmethod
    def _normalize_lang(lang: Optional[str]) -> Optional[str]:
        """Reduce a locale tag to the language code used for indexing.

        ``en-US`` / ``en_us`` -> ``en`` and ``zh_CN`` -> ``zh``; ``zh_tw`` and
        ``pt_br`` are kept as-is. Empty input yields None.
        """
        if not lang:
            return None
        token = str(lang).strip().lower().replace("-", "_")
        if not token:
            return None
        # en_us -> en, zh_cn -> zh, keep explicit zh_tw / pt_br
        if token in {"zh_tw", "pt_br"}:
            return token
        return token.split("_")[0]

    @staticmethod
    def _parse_request_params_language(raw: Any) -> Optional[str]:
        """Extract ``language`` from a search-log row's ``request_params``.

        Accepts a dict or a JSON-object string. None, blank strings, invalid
        JSON, and JSON that is not an object all yield None. The value is
        returned unnormalized; callers run it through ``_normalize_lang``.
        """
        if raw is None:
            return None
        if isinstance(raw, dict):
            return raw.get("language")
        text_raw = str(raw).strip()
        if not text_raw:
            return None
        try:
            obj = json.loads(text_raw)
        except (ValueError, RecursionError):
            # Malformed log payloads are skipped rather than aborting the
            # rebuild. json.JSONDecodeError subclasses ValueError, and
            # pathologically nested input can hit the recursion limit.
            return None
        if isinstance(obj, dict):
            return obj.get("language")
        return None

    @staticmethod
    def _detect_script_language(query: str) -> Tuple[Optional[str], float, str]:
        """Guess a language from the characters in ``query``.

        Returns ``(lang, confidence, source)``. Checks run in order:
        CJK unified ideographs -> ``zh``, Arabic -> ``ar``, Cyrillic -> ``ru``,
        Greek -> ``el`` (all with source ``"script"``). Any ASCII letter then
        gives ``en`` at 0.55 with source ``"model"``. Otherwise the result is
        ``(None, 0.0, "default")``. Cyrillic-as-Russian and Latin-as-English
        are heuristics.
        """
        # CJK unified
        if re.search(r"[\u4e00-\u9fff]", query):
            return "zh", 0.98, "script"
        # Arabic
        if re.search(r"[\u0600-\u06FF]", query):
            return "ar", 0.98, "script"
        # Cyrillic
        if re.search(r"[\u0400-\u04FF]", query):
            return "ru", 0.95, "script"
        # Greek
        if re.search(r"[\u0370-\u03FF]", query):
            return "el", 0.95, "script"
        # Latin fallback
        if re.search(r"[a-zA-Z]", query):
            return "en", 0.55, "model"
        return None, 0.0, "default"

    def _resolve_query_language(
        self,
        query: str,
        log_language: Optional[str],
        request_params: Any,
        index_languages: List[str],
        primary_language: str,
    ) -> Tuple[str, float, str, bool]:
        """Resolve lang with priority: log field > request_params > script/model.

        A candidate language is accepted only if it is in ``index_languages``
        (or that list is empty). If nothing qualifies, the tenant's primary
        language is returned with confidence 0.3. When the primary language is
        not indexed, the first entry of ``index_languages`` is used instead.

        Returns:
            ``(lang, confidence, source, conflict)``, where ``conflict`` is True
            when the log field and request_params both carry a language and
            they disagree.
        """
        langs_set = set(index_languages or [])
        primary = self._normalize_lang(primary_language) or "en"
        if primary not in langs_set and langs_set:
            primary = index_languages[0]

        log_lang = self._normalize_lang(log_language)
        req_lang = self._normalize_lang(self._parse_request_params_language(request_params))
        conflict = bool(log_lang and req_lang and log_lang != req_lang)

        if log_lang and (not langs_set or log_lang in langs_set):
            return log_lang, 1.0, "log_field", conflict

        if req_lang and (not langs_set or req_lang in langs_set):
            return req_lang, 1.0, "request_params", conflict

        detected_lang, conf, source = self._detect_script_language(query)
        if detected_lang and (not langs_set or detected_lang in langs_set):
            return detected_lang, conf, source, conflict

        return primary, 0.3, "default", conflict

    # ------------------------------------------------------------------
    # Scoring
    # ------------------------------------------------------------------

    @staticmethod
    def _score_product_hit(source: Dict[str, Any]) -> float:
        """Score a product as 1.2*log1p(sales) + 0.4*log1p(total_inventory).

        Missing or negative values are treated as 0.
        """
        sales = float(source.get("sales") or 0.0)
        inventory = float(source.get("total_inventory") or 0.0)
        return math.log1p(max(sales, 0.0)) * 1.2 + math.log1p(max(inventory, 0.0)) * 0.4

    @staticmethod
    def _compute_rank_score(c: SuggestionCandidate) -> float:
        """Blend query-log counts and product-coverage counts into one rank score."""
        return (
            1.8 * math.log1p(c.query_count_30d)
            + 1.2 * math.log1p(c.query_count_7d)
            + 1.0 * math.log1p(len(c.qanchor_spu_ids))
            + 0.6 * math.log1p(len(c.title_spu_ids))
        )

    # ------------------------------------------------------------------
    # Elasticsearch access
    # ------------------------------------------------------------------

    def _scan_products(self, tenant_id: str, batch_size: int = 500) -> List[Dict[str, Any]]:
        """Return every hit in the tenant's product index, paging with ``search_after``.

        Only ``spu_id``, ``title``, ``qanchors``, ``sales`` and
        ``total_inventory`` are fetched. Pages are sorted by ``spu_id``
        ascending, and all hits are accumulated in memory.
        """
        from indexer.mapping_generator import get_tenant_index_name

        index_name = get_tenant_index_name(tenant_id)
        all_hits: List[Dict[str, Any]] = []
        search_after: Optional[List[Any]] = None

        while True:
            body: Dict[str, Any] = {
                "size": batch_size,
                "_source": ["spu_id", "title", "qanchors", "sales", "total_inventory"],
                "sort": [{"spu_id": "asc"}],
                "query": {"match_all": {}},
            }
            if search_after is not None:
                body["search_after"] = search_after

            resp = self.es_client.client.search(index=index_name, body=body)
            hits = resp.get("hits", {}).get("hits", []) or []
            if not hits:
                break
            all_hits.extend(hits)
            if len(hits) < batch_size:
                break
            search_after = hits[-1].get("sort")
            if not search_after:
                # Without sort values, the next request would omit
                # search_after, restart from the first page, and loop
                # forever, so stop paging here.
                logger.warning(
                    "Product scan stopped early for %s: last hit has no sort values",
                    index_name,
                )
                break
        return all_hits

    def _create_or_reset_index(self, tenant_id: str, index_languages: List[str], recreate: bool) -> str:
        """Ensure the tenant's suggestion index exists and return its name.

        With ``recreate`` set, an existing index is deleted first. A missing
        index is created from ``build_suggestion_mapping``.

        Raises:
            RuntimeError: if ``create_index`` reports failure.
        """
        index_name = get_suggestion_index_name(tenant_id)
        if recreate and self.es_client.index_exists(index_name):
            logger.info("Deleting existing suggestion index: %s", index_name)
            self.es_client.delete_index(index_name)
        if not self.es_client.index_exists(index_name):
            mapping = build_suggestion_mapping(index_languages=index_languages)
            ok = self.es_client.create_index(index_name, mapping)
            if not ok:
                raise RuntimeError(f"Failed to create suggestion index: {index_name}")
        return index_name

    # ------------------------------------------------------------------
    # Rebuild pipeline
    # ------------------------------------------------------------------

    def rebuild_tenant_index(
        self,
        tenant_id: str,
        days: int = 365,
        recreate: bool = True,
        batch_size: int = 500,
        min_query_len: int = 1,
    ) -> Dict[str, Any]:
        """Rebuild the suggestion index for ``tenant_id`` from products and search logs.

        Args:
            tenant_id: Tenant identifier. It must be convertible with ``int()``
                for the search-log query.
            days: Look-back window for search-log rows. NOTE(review): every
                row in this window increments ``query_count_30d``, so with the
                default of 365 that field is effectively a 365-day count.
                Confirm whether the field name or the default is intended.
            recreate: Delete an existing suggestion index before rebuilding.
                When False, an existing index is reused.
            batch_size: Page size used to scan the product index.
            min_query_len: Logged queries shorter than this (after stripping)
                are ignored.

        Returns:
            Summary dict with ``tenant_id``, ``index_name``,
            ``total_candidates``, ``indexed_docs`` and ``bulk_result``.
        """
        tenant_loader = get_tenant_config_loader()
        tenant_cfg = tenant_loader.get_tenant_config(tenant_id)
        index_languages: List[str] = tenant_cfg.get("index_languages") or ["en", "zh"]
        primary_language: str = tenant_cfg.get("primary_language") or "en"

        index_name = self._create_or_reset_index(tenant_id, index_languages, recreate)
        key_to_candidate: Dict[Tuple[str, str], SuggestionCandidate] = {}

        # Step 1: product title/qanchors
        hits = self._scan_products(tenant_id, batch_size=batch_size)
        self._collect_product_candidates(hits, index_languages, key_to_candidate)

        # Step 2: query logs. A single "now" gives both windows the same reference point.
        now = datetime.now(timezone.utc)
        rows = self._fetch_query_log_rows(tenant_id, since=now - timedelta(days=days))
        self._collect_query_log_candidates(
            rows,
            index_languages=index_languages,
            primary_language=primary_language,
            min_query_len=min_query_len,
            since_7d=now - timedelta(days=7),
            key_to_candidate=key_to_candidate,
        )

        # Step 3: build docs and bulk-index them
        docs = self._build_docs(tenant_id, key_to_candidate)
        if docs:
            result = self.es_client.bulk_index(index_name=index_name, docs=docs)
            self.es_client.refresh(index_name)
        else:
            result = {"success": 0, "failed": 0, "errors": []}

        return {
            "tenant_id": str(tenant_id),
            "index_name": index_name,
            "total_candidates": len(key_to_candidate),
            "indexed_docs": len(docs),
            "bulk_result": result,
        }

    @staticmethod
    def _get_or_create_candidate(
        key_to_candidate: Dict[Tuple[str, str], SuggestionCandidate],
        lang: str,
        text_norm: str,
        raw_text: str,
    ) -> SuggestionCandidate:
        """Return the candidate for ``(lang, text_norm)``, creating it from ``raw_text`` if absent."""
        key = (lang, text_norm)
        candidate = key_to_candidate.get(key)
        if candidate is None:
            candidate = SuggestionCandidate(text=raw_text, text_norm=text_norm, lang=lang)
            key_to_candidate[key] = candidate
        return candidate

    def _collect_product_candidates(
        self,
        hits: List[Dict[str, Any]],
        index_languages: List[str],
        key_to_candidate: Dict[Tuple[str, str], SuggestionCandidate],
    ) -> None:
        """Add title and qanchor candidates from product hits into ``key_to_candidate``.

        Hits without a truthy ``spu_id`` are skipped. ``title`` and
        ``qanchors`` are read per language only when they are dicts. Qanchor
        evidence gets a +0.6 bonus on top of the product score.
        """
        for hit in hits:
            src = hit.get("_source", {}) or {}
            spu_id = str(src.get("spu_id") or "")
            if not spu_id:
                continue
            title_obj = src.get("title") or {}
            qanchor_obj = src.get("qanchors") or {}
            product_score = self._score_product_hit(src)

            for lang in index_languages:
                title = ""
                if isinstance(title_obj, dict):
                    title = str(title_obj.get(lang) or "").strip()
                if title:
                    text_norm = self._normalize_text(title)
                    if not self._looks_noise(text_norm):
                        c = self._get_or_create_candidate(key_to_candidate, lang, text_norm, title)
                        c.add_product("title", spu_id=spu_id, score=product_score)

                q_raw = qanchor_obj.get(lang) if isinstance(qanchor_obj, dict) else None
                for q_text in self._split_qanchors(q_raw):
                    text_norm = self._normalize_text(q_text)
                    if self._looks_noise(text_norm):
                        continue
                    c = self._get_or_create_candidate(key_to_candidate, lang, text_norm, q_text)
                    c.add_product("qanchor", spu_id=spu_id, score=product_score + 0.6)

    def _fetch_query_log_rows(self, tenant_id: str, since: datetime) -> List[Any]:
        """Load the tenant's non-deleted, non-empty search-log rows created at or after ``since``."""
        query_sql = text(
            """
            SELECT query, language, request_params, create_time
            FROM shoplazza_search_log
            WHERE tenant_id = :tenant_id
              AND deleted = 0
              AND query IS NOT NULL
              AND query <> ''
              AND create_time >= :since_30d
            """
        )
        with self.db_engine.connect() as conn:
            return conn.execute(query_sql, {"tenant_id": int(tenant_id), "since_30d": since}).fetchall()

    @staticmethod
    def _is_on_or_after(created_at: Any, since: datetime) -> bool:
        """Return True when ``created_at`` is a datetime at or after ``since``.

        Naive datetimes are treated as UTC. NOTE(review): the DB may store
        naive *local* time, in which case the boundary shifts by the server's
        UTC offset. Confirm. Missing or non-datetime values count as False
        instead of raising mid-rebuild.
        """
        if not isinstance(created_at, datetime):
            return False
        if created_at.tzinfo is None:
            created_at = created_at.replace(tzinfo=timezone.utc)
        return created_at >= since

    def _collect_query_log_candidates(
        self,
        rows: List[Any],
        index_languages: List[str],
        primary_language: str,
        min_query_len: int,
        since_7d: datetime,
        key_to_candidate: Dict[Tuple[str, str], SuggestionCandidate],
    ) -> None:
        """Add one query-log hit per row into ``key_to_candidate``.

        Each row's language is resolved via ``_resolve_query_language``. A
        candidate's ``lang_confidence`` is the highest confidence observed for
        it. Product-derived candidates therefore keep 1.0, while query-only
        candidates reflect how their language was determined.
        """
        for row in rows:
            q = str(row.query or "").strip()
            if len(q) < min_query_len:
                continue
            lang, conf, source, conflict = self._resolve_query_language(
                query=q,
                log_language=getattr(row, "language", None),
                request_params=getattr(row, "request_params", None),
                index_languages=index_languages,
                primary_language=primary_language,
            )
            text_norm = self._normalize_text(q)
            if self._looks_noise(text_norm):
                continue
            key = (lang, text_norm)
            c = key_to_candidate.get(key)
            if c is None:
                # Seed confidence/source from this observation. Relying on the
                # dataclass default of 1.0 would make the max() below a no-op,
                # so every query-only candidate would report 1.0.
                c = SuggestionCandidate(
                    text=q,
                    text_norm=text_norm,
                    lang=lang,
                    lang_confidence=conf,
                    lang_source=source,
                )
                key_to_candidate[key] = c
            else:
                c.lang_confidence = max(c.lang_confidence, conf)
                if c.lang_source == "default":
                    c.lang_source = source
            c.lang_conflict = c.lang_conflict or conflict

            is_7d = self._is_on_or_after(getattr(row, "create_time", None), since_7d)
            c.add_query_log(is_7d=is_7d)

    def _build_docs(
        self,
        tenant_id: str,
        key_to_candidate: Dict[Tuple[str, str], SuggestionCandidate],
    ) -> List[Dict[str, Any]]:
        """Turn merged candidates into bulk-indexable suggestion documents.

        The doc ``_id`` is ``"{tenant_id}|{lang}|{text_norm}"``. The completion
        weight is ``int(max(rank_score, 1.0) * 100)``. At most 20 spu_ids are
        kept, ordered by their best score.
        """
        now_iso = datetime.now(timezone.utc).isoformat()
        docs: List[Dict[str, Any]] = []
        for c in key_to_candidate.values():
            rank_score = self._compute_rank_score(c)
            # keep top 20 product ids by score
            top_spu_ids = [
                spu_id
                for spu_id, _ in sorted(c.top_spu_scores.items(), key=lambda kv: kv[1], reverse=True)[:20]
            ]

            completion_obj = {c.lang: {"input": [c.text], "weight": int(max(rank_score, 1.0) * 100)}}
            sat_obj = {c.lang: c.text}
            doc_id = f"{tenant_id}|{c.lang}|{c.text_norm}"
            docs.append(
                {
                    "_id": doc_id,
                    "tenant_id": str(tenant_id),
                    "lang": c.lang,
                    "text": c.text,
                    "text_norm": c.text_norm,
                    "sources": sorted(c.sources),
                    "title_doc_count": len(c.title_spu_ids),
                    "qanchor_doc_count": len(c.qanchor_spu_ids),
                    "query_count_7d": c.query_count_7d,
                    "query_count_30d": c.query_count_30d,
                    "rank_score": float(rank_score),
                    "lang_confidence": float(c.lang_confidence),
                    "lang_source": c.lang_source,
                    "lang_conflict": bool(c.lang_conflict),
                    "top_spu_ids": top_spu_ids,
                    "status": 1,
                    "updated_at": now_iso,
                    "completion": completion_obj,
                    "sat": sat_obj,
                }
            )
        return docs