""" Online suggestion query service. """ import logging import time from typing import Any, Dict, List, Optional from config.tenant_config_loader import get_tenant_config_loader from indexer.mapping_generator import get_tenant_index_name from suggestion.builder import get_suggestion_index_name from utils.es_client import ESClient logger = logging.getLogger(__name__) class SuggestionService: def __init__(self, es_client: ESClient): self.es_client = es_client def _resolve_language(self, tenant_id: str, language: str) -> str: cfg = get_tenant_config_loader().get_tenant_config(tenant_id) index_languages = cfg.get("index_languages") or ["en", "zh"] primary = cfg.get("primary_language") or "en" lang = (language or "").strip().lower().replace("-", "_") if lang in {"zh_tw", "pt_br"}: normalized = lang else: normalized = lang.split("_")[0] if lang else "" if normalized in index_languages: return normalized if primary in index_languages: return primary return index_languages[0] def _search_products_for_suggestion( self, tenant_id: str, text_value: str, lang: str, result_size: int, ) -> List[Dict[str, Any]]: index_name = get_tenant_index_name(tenant_id) title_field = f"title.{lang}" qanchor_field = f"qanchors.{lang}" body = { "size": result_size, "_source": ["spu_id", "title", "min_price", "image_url", "sales", "total_inventory"], "query": { "bool": { "should": [ {"match_phrase": {qanchor_field: {"query": text_value, "boost": 3.0}}}, {"match_phrase_prefix": {title_field: {"query": text_value, "boost": 2.0}}}, {"match": {title_field: {"query": text_value, "boost": 1.0}}}, ], "minimum_should_match": 1, } }, "sort": [{"_score": "desc"}, {"sales": "desc"}], } resp = self.es_client.search(index_name=index_name, body=body, size=result_size, from_=0) hits = resp.get("hits", {}).get("hits", []) or [] out: List[Dict[str, Any]] = [] for hit in hits: src = hit.get("_source", {}) or {} title_obj = src.get("title") or {} resolved_title = None if isinstance(title_obj, dict): resolved_title = title_obj.get(lang) or title_obj.get("en") or title_obj.get("zh") if not resolved_title: for v in title_obj.values(): if v: resolved_title = v break out.append( { "spu_id": src.get("spu_id"), "title": resolved_title, "price": src.get("min_price"), "image_url": src.get("image_url"), "score": hit.get("_score", 0.0), } ) return out def search( self, tenant_id: str, query: str, language: str, size: int = 10, with_results: bool = True, result_size: int = 3, ) -> Dict[str, Any]: start = time.time() resolved_lang = self._resolve_language(tenant_id, language) index_name = get_suggestion_index_name(tenant_id) sat_field = f"sat.{resolved_lang}" dsl = { "size": size, "query": { "function_score": { "query": { "bool": { "filter": [ {"term": {"lang": resolved_lang}}, {"term": {"status": 1}}, ], "should": [ { "multi_match": { "query": query, "type": "bool_prefix", "fields": [sat_field, f"{sat_field}._2gram", f"{sat_field}._3gram"], } } ], "minimum_should_match": 1, } }, "field_value_factor": { "field": "rank_score", "factor": 1.0, "modifier": "log1p", "missing": 0.0, }, "boost_mode": "sum", "score_mode": "sum", } }, "_source": [ "text", "lang", "rank_score", "sources", "top_spu_ids", "lang_source", "lang_confidence", "lang_conflict", ], } es_resp = self.es_client.search(index_name=index_name, body=dsl, size=size, from_=0) hits = es_resp.get("hits", {}).get("hits", []) or [] suggestions: List[Dict[str, Any]] = [] for hit in hits: src = hit.get("_source", {}) or {} item = { "text": src.get("text"), "lang": src.get("lang"), "score": hit.get("_score", 0.0), "rank_score": src.get("rank_score"), "sources": src.get("sources", []), "lang_source": src.get("lang_source"), "lang_confidence": src.get("lang_confidence"), "lang_conflict": src.get("lang_conflict", False), } if with_results: try: item["products"] = self._search_products_for_suggestion( tenant_id=tenant_id, text_value=str(src.get("text") or ""), lang=resolved_lang, result_size=result_size, ) except Exception as e: logger.warning("Failed to enrich suggestion products: %s", e) item["products"] = [] suggestions.append(item) took_ms = int((time.time() - start) * 1000) return { "query": query, "language": language, "resolved_language": resolved_lang, "suggestions": suggestions, "took_ms": took_ms, }