# service.py 6.52 KB
"""
Online suggestion query service.
"""

import logging
import time
from typing import Any, Dict, List, Optional

from config.tenant_config_loader import get_tenant_config_loader
from indexer.mapping_generator import get_tenant_index_name
from suggestion.builder import get_suggestion_index_name
from utils.es_client import ESClient

logger = logging.getLogger(__name__)


class SuggestionService:
    """Answer online suggestion queries for a tenant.

    Suggestions are read from the tenant's suggestion index through the
    injected ES client. Each suggestion can optionally be enriched with a few
    matching products looked up in the tenant's product index.
    """

    # Languages assumed to be indexed when the tenant config lists none.
    _DEFAULT_INDEX_LANGUAGES = ("en", "zh")
    # Primary language assumed when the tenant config names none.
    _DEFAULT_PRIMARY_LANGUAGE = "en"
    # Regional codes that are kept whole instead of being cut to their base.
    _REGIONAL_LANGUAGES = frozenset({"zh_tw", "pt_br"})

    def __init__(self, es_client: "ESClient"):
        """Store the ES client used for every query issued by this service."""
        self.es_client = es_client

    @classmethod
    def _normalize_language(cls, language: Optional[str]) -> str:
        """Return the canonical lowercase code for a requested language.

        Surrounding whitespace is stripped and ``-`` becomes ``_``. Codes in
        ``_REGIONAL_LANGUAGES`` are kept whole; any other code is reduced to
        the part before the first ``_`` (``"en-US"`` -> ``"en"``). ``None``
        or a blank value yields ``""``.
        """
        lang = (language or "").strip().lower().replace("-", "_")
        if lang in cls._REGIONAL_LANGUAGES:
            return lang
        return lang.split("_")[0]

    @classmethod
    def _choose_language(cls, cfg: Optional[Dict[str, Any]], language: Optional[str]) -> str:
        """Pick the indexed language that best serves the requested one.

        Candidates are tried in order: the normalized request, its base
        language (so ``pt_br`` can be served by ``pt`` when only the base is
        indexed), then the tenant's primary language. If none of them is
        indexed, the first indexed language is returned.

        Args:
            cfg: Tenant config mapping; ``None`` is treated as empty.
            language: Requested language code, possibly blank.
        """
        cfg = cfg or {}
        index_languages = list(cfg.get("index_languages") or cls._DEFAULT_INDEX_LANGUAGES)
        primary = cfg.get("primary_language") or cls._DEFAULT_PRIMARY_LANGUAGE
        normalized = cls._normalize_language(language)
        base = normalized.split("_")[0]
        for candidate in (normalized, base, primary):
            if candidate in index_languages:
                return candidate
        return index_languages[0]

    def _resolve_language(self, tenant_id: str, language: str) -> str:
        """Resolve ``language`` to one of the languages indexed for the tenant."""
        cfg = get_tenant_config_loader().get_tenant_config(tenant_id)
        return self._choose_language(cfg, language)

    @staticmethod
    def _pick_title(title_obj: Any, lang: str) -> Any:
        """Choose a display title from a product's ``title`` source value.

        For a mapping, prefers ``lang``, then ``"en"``, then ``"zh"``, then the
        first non-empty value. A plain non-empty string is returned as-is.
        Anything else (missing, empty, unexpected type) yields ``None``.
        """
        if isinstance(title_obj, str):
            return title_obj or None
        if not isinstance(title_obj, dict):
            return None
        preferred = title_obj.get(lang) or title_obj.get("en") or title_obj.get("zh")
        if preferred:
            return preferred
        return next((value for value in title_obj.values() if value), None)

    def _search_products_for_suggestion(
        self,
        tenant_id: str,
        text_value: str,
        lang: str,
        result_size: int,
    ) -> List[Dict[str, Any]]:
        """Return up to ``result_size`` products matching a suggestion text.

        Args:
            tenant_id: Tenant whose product index is searched.
            text_value: Suggestion text to match against products.
            lang: Language suffix of the ``title``/``qanchors`` fields to query.
            result_size: Maximum number of products to return.

        Returns:
            A list of dicts with ``spu_id``, ``title``, ``price``,
            ``image_url`` and ``score``. Empty when ``text_value`` is blank or
            ``result_size`` is not positive.
        """
        # Nothing can match in these cases, so skip the ES round trip.
        if result_size <= 0 or not (text_value or "").strip():
            return []

        index_name = get_tenant_index_name(tenant_id)
        title_field = f"title.{lang}"
        qanchor_field = f"qanchors.{lang}"

        body = {
            "size": result_size,
            "_source": ["spu_id", "title", "min_price", "image_url", "sales", "total_inventory"],
            "query": {
                "bool": {
                    "should": [
                        {"match_phrase": {qanchor_field: {"query": text_value, "boost": 3.0}}},
                        {"match_phrase_prefix": {title_field: {"query": text_value, "boost": 2.0}}},
                        {"match": {title_field: {"query": text_value, "boost": 1.0}}},
                    ],
                    "minimum_should_match": 1,
                }
            },
            # Relevance first, sales as the tie-breaker.
            "sort": [{"_score": "desc"}, {"sales": "desc"}],
        }
        resp = self.es_client.search(index_name=index_name, body=body, size=result_size, from_=0)
        hits = resp.get("hits", {}).get("hits", []) or []

        products: List[Dict[str, Any]] = []
        for hit in hits:
            src = hit.get("_source", {}) or {}
            products.append(
                {
                    "spu_id": src.get("spu_id"),
                    "title": self._pick_title(src.get("title"), lang),
                    "price": src.get("min_price"),
                    "image_url": src.get("image_url"),
                    # `or` also covers a `_score` key that is present but null.
                    "score": hit.get("_score") or 0.0,
                }
            )
        return products

    @staticmethod
    def _build_suggestion_query(query: str, resolved_lang: str, size: int) -> Dict[str, Any]:
        """Build the ES request body for the suggestion lookup.

        Matches ``query`` as a ``bool_prefix`` multi-match on ``sat.<lang>``
        and its ``_2gram``/``_3gram`` subfields, restricted to documents with
        the resolved ``lang`` and ``status`` 1, and adds ``log1p(rank_score)``
        to the text score.
        """
        # NOTE(review): the `_2gram`/`_3gram` subfields suggest `sat.<lang>` is
        # a search_as_you_type field — confirm against the index mapping.
        sat_field = f"sat.{resolved_lang}"
        return {
            "size": size,
            "query": {
                "function_score": {
                    "query": {
                        "bool": {
                            "filter": [
                                {"term": {"lang": resolved_lang}},
                                {"term": {"status": 1}},
                            ],
                            "should": [
                                {
                                    "multi_match": {
                                        "query": query,
                                        "type": "bool_prefix",
                                        "fields": [sat_field, f"{sat_field}._2gram", f"{sat_field}._3gram"],
                                    }
                                }
                            ],
                            "minimum_should_match": 1,
                        }
                    },
                    "field_value_factor": {
                        "field": "rank_score",
                        "factor": 1.0,
                        "modifier": "log1p",
                        "missing": 0.0,
                    },
                    "boost_mode": "sum",
                    "score_mode": "sum",
                }
            },
            "_source": [
                "text",
                "lang",
                "rank_score",
                "sources",
                "top_spu_ids",
                "lang_source",
                "lang_confidence",
                "lang_conflict",
            ],
        }

    def search(
        self,
        tenant_id: str,
        query: str,
        language: str,
        size: int = 10,
        with_results: bool = True,
        result_size: int = 3,
    ) -> Dict[str, Any]:
        """Return suggestions for ``query`` in the tenant's suggestion index.

        Args:
            tenant_id: Tenant whose indices are queried.
            query: Text typed so far; a blank query yields no suggestions and
                does not hit ES.
            language: Requested language; resolved to an indexed language.
            size: Maximum number of suggestions to return.
            with_results: When true, attach a ``products`` list to each
                suggestion. Enrichment is best-effort: a failure is logged and
                the suggestion gets an empty list.
            result_size: Maximum number of products per suggestion.

        Returns:
            A dict with the echoed ``query`` and ``language``, the
            ``resolved_language``, the ``suggestions`` list and ``took_ms``.
        """
        # perf_counter is monotonic, so the elapsed time can never go negative
        # when the wall clock is adjusted mid-request.
        started = time.perf_counter()
        resolved_lang = self._resolve_language(tenant_id, language)

        hits: List[Dict[str, Any]] = []
        if (query or "").strip():
            index_name = get_suggestion_index_name(tenant_id)
            dsl = self._build_suggestion_query(query, resolved_lang, size)
            es_resp = self.es_client.search(index_name=index_name, body=dsl, size=size, from_=0)
            hits = es_resp.get("hits", {}).get("hits", []) or []

        suggestions: List[Dict[str, Any]] = []
        for hit in hits:
            src = hit.get("_source", {}) or {}
            item = {
                "text": src.get("text"),
                "lang": src.get("lang"),
                "score": hit.get("_score") or 0.0,
                "rank_score": src.get("rank_score"),
                "sources": src.get("sources", []),
                "lang_source": src.get("lang_source"),
                "lang_confidence": src.get("lang_confidence"),
                "lang_conflict": src.get("lang_conflict", False),
            }
            if with_results:
                try:
                    item["products"] = self._search_products_for_suggestion(
                        tenant_id=tenant_id,
                        text_value=str(src.get("text") or ""),
                        lang=resolved_lang,
                        result_size=result_size,
                    )
                except Exception as e:
                    # Deliberately best-effort: a failed product lookup must
                    # not fail the suggestion response. Keep the traceback so
                    # the failure is still diagnosable.
                    logger.warning("Failed to enrich suggestion products: %s", e, exc_info=True)
                    item["products"] = []
            suggestions.append(item)

        took_ms = int((time.perf_counter() - started) * 1000)
        return {
            "query": query,
            "language": language,
            "resolved_language": resolved_lang,
            "suggestions": suggestions,
            "took_ms": took_ms,
        }