service.py 8.29 KB
"""
Online suggestion query service.
"""

import logging
import time
from typing import Any, Dict, List, Optional

from config.tenant_config_loader import get_tenant_config_loader
from suggestion.builder import (
    get_suggestion_alias_name,
    get_suggestion_index_name,
    get_suggestion_legacy_index_name,
)
from utils.es_client import ESClient

logger = logging.getLogger(__name__)


class SuggestionService:
    def __init__(self, es_client: ESClient):
        self.es_client = es_client

    def _resolve_language(self, tenant_id: str, language: str) -> str:
        """
        Map a requested language code onto one of the tenant's index languages.

        The request is trimmed, lower-cased and has "-" replaced by "_".
        "zh_tw" and "pt_br" are kept as-is; any other code is reduced to the
        part before the first "_". Resolution order:

        1. the normalized request, if it is among the tenant's
           ``index_languages`` (default ``["en", "zh"]``);
        2. the tenant's ``primary_language`` (default ``"en"``), if it is
           among them;
        3. the first entry of ``index_languages``.
        """
        tenant_cfg = get_tenant_config_loader().get_tenant_config(tenant_id)
        supported = tenant_cfg.get("index_languages") or ["en", "zh"]
        fallback = tenant_cfg.get("primary_language") or "en"

        requested = (language or "").strip().lower().replace("-", "_")
        if requested not in {"zh_tw", "pt_br"}:
            # Drop any region suffix: "en_us" -> "en"; "" stays "".
            requested = requested.partition("_")[0]

        for candidate in (requested, fallback):
            if candidate in supported:
                return candidate
        return supported[0]

    def _resolve_search_target(self, tenant_id: str) -> Optional[str]:
        """
        Pick the alias or index name to query for this tenant.

        Tried in order, returning the first one that exists:

        1. the suggestion alias;
        2. the legacy index name (fallback for pre-Phase2 deployments);
        3. the name produced by the current index-naming helper.

        Returns None when none of them exists.
        """
        es = self.es_client

        alias = get_suggestion_alias_name(tenant_id)
        if es.alias_exists(alias):
            return alias

        # Each fallback name is computed only if the previous lookup missed.
        for name_for in (get_suggestion_legacy_index_name, get_suggestion_index_name):
            index = name_for(tenant_id)
            if es.index_exists(index):
                return index
        return None

    def _completion_suggest(
        self,
        index_name: str,
        query: str,
        lang: str,
        size: int,
        tenant_id: str,
    ) -> List[Dict[str, Any]]:
        """
        Query ES completion suggester from `completion.<lang>`.

        Returns items in the same shape as search hits -> dicts with "text"/"lang"/"score"/"rank_score"/"sources".
        """
        field_name = f"completion.{lang}"
        body = {
            "suggest": {
                "s": {
                    "prefix": query,
                    "completion": {
                        "field": field_name,
                        "size": size,
                        "skip_duplicates": True,
                    },
                }
            },
            "_source": [
                "text",
                "lang",
                "rank_score",
                "sources",
                "lang_source",
                "lang_confidence",
                "lang_conflict",
            ],
        }
        try:
            resp = self.es_client.client.search(index=index_name, body=body, routing=str(tenant_id))
        except Exception as e:
            # completion is an optimization path; never hard-fail the whole endpoint
            logger.warning("Completion suggest failed for index=%s field=%s: %s", index_name, field_name, e)
            return []

        entries = (resp.get("suggest", {}) or {}).get("s", []) or []
        if not entries:
            return []
        options = entries[0].get("options", []) or []
        out: List[Dict[str, Any]] = []
        for opt in options:
            src = opt.get("_source", {}) or {}
            out.append(
                {
                    "text": src.get("text") or opt.get("text"),
                    "lang": src.get("lang") or lang,
                    "score": opt.get("_score", 0.0),
                    "rank_score": src.get("rank_score"),
                    "sources": src.get("sources", []),
                    "lang_source": src.get("lang_source"),
                    "lang_confidence": src.get("lang_confidence"),
                    "lang_conflict": src.get("lang_conflict", False),
                }
            )
        return out

    def search(
        self,
        tenant_id: str,
        query: str,
        language: str,
        size: int = 10,
    ) -> Dict[str, Any]:
        start = time.time()
        resolved_lang = self._resolve_language(tenant_id, language)
        index_name = self._resolve_search_target(tenant_id)
        if not index_name:
            # On a fresh ES cluster the suggestion index might not be built yet.
            # Keep endpoint stable for frontend autocomplete: return empty list instead of 500.
            took_ms = int((time.time() - start) * 1000)
            return {
                "query": query,
                "language": language,
                "resolved_language": resolved_lang,
                "suggestions": [],
                "took_ms": took_ms,
            }

        sat_field = f"sat.{resolved_lang}"
        dsl = {
            "track_total_hits": False,
            "query": {
                "function_score": {
                    "query": {
                        "bool": {
                            "filter": [
                                {"term": {"lang": resolved_lang}},
                                {"term": {"status": 1}},
                            ],
                            "should": [
                                {
                                    "multi_match": {
                                        "query": query,
                                        "type": "bool_prefix",
                                        "fields": [sat_field, f"{sat_field}._2gram", f"{sat_field}._3gram"],
                                    }
                                }
                            ],
                            "minimum_should_match": 1,
                        }
                    },
                    "field_value_factor": {
                        "field": "rank_score",
                        "factor": 1.0,
                        "modifier": "log1p",
                        "missing": 0.0,
                    },
                    "boost_mode": "sum",
                    "score_mode": "sum",
                }
            },
            "_source": [
                "text",
                "lang",
                "rank_score",
                "sources",
                "lang_source",
                "lang_confidence",
                "lang_conflict",
            ],
        }
        # Recall path A: bool_prefix on search_as_you_type
        es_resp = self.es_client.search(
            index_name=index_name,
            body=dsl,
            size=size,
            from_=0,
            routing=str(tenant_id),
        )
        hits = es_resp.get("hits", {}).get("hits", []) or []

        # Recall path B: completion suggester (optional optimization)
        completion_items = self._completion_suggest(
            index_name=index_name,
            query=query,
            lang=resolved_lang,
            size=size,
            tenant_id=tenant_id,
        )

        suggestions: List[Dict[str, Any]] = []
        seen_text_norm: set = set()

        def _norm_text(v: Any) -> str:
            return str(v or "").strip().lower()

        # Put completion results first (usually better prefix UX), then fill with sat results.
        for item in completion_items:
            text_val = item.get("text")
            norm = _norm_text(text_val)
            if not norm or norm in seen_text_norm:
                continue
            seen_text_norm.add(norm)
            suggestions.append(dict(item))

        for hit in hits:
            src = hit.get("_source", {}) or {}
            text_val = src.get("text")
            norm = _norm_text(text_val)
            if not norm or norm in seen_text_norm:
                continue
            seen_text_norm.add(norm)
            item = {
                "text": text_val,
                "lang": src.get("lang"),
                "score": hit.get("_score", 0.0),
                "rank_score": src.get("rank_score"),
                "sources": src.get("sources", []),
                "lang_source": src.get("lang_source"),
                "lang_confidence": src.get("lang_confidence"),
                "lang_conflict": src.get("lang_conflict", False),
            }
            suggestions.append(item)

        took_ms = int((time.time() - start) * 1000)
        return {
            "query": query,
            "language": language,
            "resolved_language": resolved_lang,
            "suggestions": suggestions[:size],
            "took_ms": took_ms,
        }