"""
Search API routes.
"""

from fastapi import APIRouter, HTTPException, Query, Request
from typing import Optional
import uuid
import hashlib
import json
import logging
from urllib.parse import parse_qs

from ..models import (
    SearchRequest,
    ImageSearchRequest,
    SearchResponse,
    SearchSuggestResponse,
    DocumentResponse,
    ErrorResponse,
)
from context.request_context import create_request_context, set_current_request_context, clear_current_request_context

router = APIRouter(prefix="/search", tags=["search"])
backend_verbose_logger = logging.getLogger("backend.verbose")

# Single source of truth for the "missing tenant" error so every route
# returns exactly the same message.
_TENANT_ID_REQUIRED_DETAIL = (
    "tenant_id is required. Provide it via header 'X-Tenant-ID' or query parameter 'tenant_id'"
)

# Shared tail of the response log line; each route prepends its own label.
_RESPONSE_LOG_SUFFIX = (
    " | Total results: %s | Max score: %.4f | Time: %sms | payload_size: %s chars | digest: %s"
)


def _log_backend_verbose(payload: dict) -> None:
    """Emit *payload* as compact JSON on the ``backend.verbose`` logger.

    Does nothing (and skips the JSON serialization) when that logger has no
    handlers attached directly to it.
    """
    if not backend_verbose_logger.handlers:
        return
    backend_verbose_logger.info(
        json.dumps(payload, ensure_ascii=False, separators=(",", ":"))
    )


def extract_request_info(request: Request) -> tuple[str, str]:
    """Extract request ID and user ID from HTTP request.

    Returns:
        ``(reqid, uid)``. ``reqid`` comes from the ``X-Request-ID`` header, or
        is the first 8 characters of a fresh UUID4 when the header is absent.
        ``uid`` comes from ``X-User-ID`` or ``User-ID``, falling back to
        ``"-1"`` so log lines can still be correlated.
    """
    reqid = request.headers.get('X-Request-ID') or str(uuid.uuid4())[:8]
    uid = request.headers.get('X-User-ID') or request.headers.get('User-ID') or "-1"
    return reqid, uid


def _require_tenant_id(http_request: Optional[Request]) -> str:
    """Return the tenant ID for *http_request* or raise HTTP 400.

    The ``X-Tenant-ID`` header wins; otherwise the first non-blank
    ``tenant_id`` value in the query string is used.

    Raises:
        HTTPException: status 400 when no tenant ID can be found (including
            when *http_request* is ``None``).
    """
    tenant_id = http_request.headers.get('X-Tenant-ID') if http_request else None
    if not tenant_id and http_request:
        query_string = http_request.url.query
        if query_string:
            tenant_id = parse_qs(query_string).get('tenant_id', [None])[0]
    if not tenant_id:
        raise HTTPException(status_code=400, detail=_TENANT_ID_REQUIRED_DETAIL)
    return tenant_id


def _log_search_response(context, response: SearchResponse, tenant_id: str, label: str, event: str) -> None:
    """Log a summary line and a verbose JSON record for a search response.

    Args:
        context: Request context returned by ``create_request_context``; only
            its ``logger``, ``reqid`` and ``uid`` attributes are used here.
        response: The response model about to be returned to the client.
        tenant_id: Tenant the search ran for.
        label: Human-readable prefix of the summary line
            (e.g. ``"Search response"``).
        event: Value of the ``"event"`` key in the verbose JSON record.
    """
    response_payload = response.model_dump(mode="json")
    response_json = json.dumps(response_payload, ensure_ascii=False, separators=(",", ":"))
    # A short digest lets two log lines be compared without dumping the payload.
    response_digest = hashlib.sha256(response_json.encode("utf-8")).hexdigest()[:16]
    max_score = float(response.max_score or 0.0)

    context.logger.info(
        label + _RESPONSE_LOG_SUFFIX,
        response.total,
        max_score,
        response.took_ms,
        len(response_json),
        response_digest,
        extra={'reqid': context.reqid, 'uid': context.uid}
    )
    _log_backend_verbose({
        "event": event,
        "reqid": context.reqid,
        "uid": context.uid,
        "tenant_id": tenant_id,
        "total_results": response.total,
        "max_score": max_score,
        "took_ms": response.took_ms,
        "payload_size_chars": len(response_json),
        "sha256_16": response_digest,
        "response": response_payload,
    })


@router.post("/", response_model=SearchResponse)
async def search(request: SearchRequest, http_request: Request):
    """
    Execute text search query (external-friendly format).

    Supports:
    - Multi-language query processing
    - Unified text retrieval strategy (no boolean AST parsing)
    - Semantic search with embeddings
    - Custom ranking functions
    - Exact match filters and range filters
    - Faceted search

    Requires tenant_id in header (X-Tenant-ID) or query parameter (tenant_id).
    """
    reqid, uid = extract_request_info(http_request)
    tenant_id = _require_tenant_id(http_request)

    # Create the request context and register it as the current one; it is
    # always cleared again in the ``finally`` clause below.
    context = create_request_context(reqid=reqid, uid=uid)
    set_current_request_context(context)

    try:
        # Log request start (English logs, with key search parameters)
        client_ip = http_request.client.host if http_request.client else "unknown"
        # Cap the user agent so an oversized header cannot bloat the log line.
        user_agent = http_request.headers.get("User-Agent", "unknown")[:200]
        context.logger.info(
            "Received search request | "
            f"Tenant: {tenant_id} | "
            f"Query: {request.query} | "
            f"IP: {client_ip} | "
            f"User agent: {user_agent} | "
            f"size: {request.size} | from: {request.from_} | "
            f"sort_by: {request.sort_by} | sort_order: {request.sort_order} | "
            f"min_score: {request.min_score} | "
            f"language: {request.language} | "
            f"debug: {request.debug} | "
            f"enable_rerank: {request.enable_rerank} | "
            f"rerank_query_template: {request.rerank_query_template} | "
            f"rerank_doc_template: {request.rerank_doc_template} | "
            f"sku_filter_dimension: {request.sku_filter_dimension} | "
            f"filters: {request.filters} | "
            f"range_filters: {request.range_filters} | "
            f"facets: {request.facets}",
            extra={'reqid': context.reqid, 'uid': context.uid}
        )

        # Imported here rather than at module top, as in the original
        # (presumably to avoid a circular import with api.app — confirm).
        from api.app import get_searcher
        searcher = get_searcher()

        # NOTE(review): this call is not awaited inside an ``async def``; if it
        # does blocking I/O it will stall the event loop — confirm.
        result = searcher.search(
            query=request.query,
            tenant_id=tenant_id,
            size=request.size,
            from_=request.from_,
            filters=request.filters,
            range_filters=request.range_filters,
            facets=request.facets,
            min_score=request.min_score,
            context=context,
            sort_by=request.sort_by,
            sort_order=request.sort_order,
            debug=request.debug,
            language=request.language,
            sku_filter_dimension=request.sku_filter_dimension,
            enable_rerank=request.enable_rerank,
            rerank_query_template=request.rerank_query_template,
            rerank_doc_template=request.rerank_doc_template,
        )

        # Include performance summary in response
        performance_summary = context.get_summary() if context else None

        response = SearchResponse(
            results=result.results,
            total=result.total,
            max_score=result.max_score,
            took_ms=result.took_ms,
            facets=result.facets,
            query_info=result.query_info,
            suggestions=result.suggestions,
            related_searches=result.related_searches,
            performance_info=performance_summary,
            debug_info=result.debug_info
        )
        _log_search_response(context, response, tenant_id, "Search response", "search_response")
        return response

    except Exception as e:
        # Record the failure on the context, then surface it as a 500 while
        # keeping the original exception as the cause.
        if context:
            context.set_error(e)
            context.logger.error(
                "Search request failed | error: %s",
                e,
                extra={'reqid': context.reqid, 'uid': context.uid}
            )
        raise HTTPException(status_code=500, detail=str(e)) from e
    finally:
        clear_current_request_context()


@router.post("/image", response_model=SearchResponse)
async def search_by_image(request: ImageSearchRequest, http_request: Request):
    """
    Search by image similarity (external-friendly format).

    Uses image embeddings to find visually similar products.
    Supports exact match filters and range filters.

    Requires tenant_id in header (X-Tenant-ID) or query parameter (tenant_id).
    """
    reqid, uid = extract_request_info(http_request)
    tenant_id = _require_tenant_id(http_request)

    # Create the request context and register it as the current one; it is
    # always cleared again in the ``finally`` clause below.
    context = create_request_context(reqid=reqid, uid=uid)
    set_current_request_context(context)

    try:
        # Log request start for image search (English)
        client_ip = http_request.client.host if http_request.client else "unknown"
        context.logger.info(
            "Received image search request | "
            f"Tenant: {tenant_id} | "
            f"Image URL: {request.image_url} | "
            f"IP: {client_ip}",
            extra={'reqid': context.reqid, 'uid': context.uid}
        )

        from api.app import get_searcher
        searcher = get_searcher()

        result = searcher.search_by_image(
            image_url=request.image_url,
            tenant_id=tenant_id,
            size=request.size,
            filters=request.filters,
            range_filters=request.range_filters
        )

        # Include performance summary in response
        performance_summary = context.get_summary() if context else None

        response = SearchResponse(
            results=result.results,
            total=result.total,
            max_score=result.max_score,
            took_ms=result.took_ms,
            facets=result.facets,
            query_info=result.query_info,
            suggestions=result.suggestions,
            related_searches=result.related_searches,
            performance_info=performance_summary
        )
        _log_search_response(
            context, response, tenant_id, "Image search response", "image_search_response"
        )
        return response

    except ValueError as e:
        # A ValueError is reported to the client as a bad request (400).
        if context:
            context.set_error(e)
            context.logger.error(
                "Image search request parameter error | error: %s",
                e,
                extra={'reqid': context.reqid, 'uid': context.uid}
            )
        raise HTTPException(status_code=400, detail=str(e)) from e
    except Exception as e:
        if context:
            context.set_error(e)
            context.logger.error(
                "Image search request failed | error: %s",
                e,
                extra={'reqid': context.reqid, 'uid': context.uid}
            )
        raise HTTPException(status_code=500, detail=str(e)) from e
    finally:
        clear_current_request_context()


@router.get("/suggestions", response_model=SearchSuggestResponse)
async def search_suggestions(
    q: str = Query(..., min_length=1, description="搜索查询"),
    size: int = Query(10, ge=1, le=50, description="建议数量(1-50)"),
    language: str = Query("en", description="请求语言,如 zh/en/ar/ru"),
    debug: bool = Query(False, description="是否返回调试信息"),
    http_request: Request = None,
):
    """
    Get search suggestions (autocomplete, multi-language).

    Requires tenant_id in header (X-Tenant-ID) or query parameter (tenant_id).
    """
    tenant_id = _require_tenant_id(http_request)

    try:
        from api.app import get_suggestion_service

        service = get_suggestion_service()
        result = service.search(
            tenant_id=tenant_id,
            query=q,
            language=language,
            size=size,
        )
        # ``debug`` is accepted for API compatibility but does not change the
        # response: the response model is kept stable and any debug info stays
        # inside the suggestions payload for now.
        return SearchSuggestResponse(
            query=result["query"],
            language=result.get("language"),
            resolved_language=result.get("resolved_language"),
            suggestions=result["suggestions"],
            took_ms=result["took_ms"],
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e


@router.get("/instant", response_model=SearchResponse)
async def instant_search(
    q: str = Query(..., min_length=2, description="搜索查询"),
    size: int = Query(5, ge=1, le=20, description="结果数量"),
):
    """
    Instant search.

    Intended behavior:
    - Search as the user types, without pressing a search button
    - Return simplified search results

    Note: this feature is not available yet; the endpoint always returns 501.
    """
    # Explicitly signal that this endpoint is not implemented, instead of
    # running incomplete logic that could fail with an obscure runtime error.
    raise HTTPException(
        status_code=501,
        detail=(
            "/search/instant is not implemented yet. "
            "Use POST /search/ for production traffic."
        ),
    )


@router.get("/{doc_id}", response_model=DocumentResponse)
async def get_document(doc_id: str, http_request: Request):
    """
    Get a single document by ID.

    Requires tenant_id in header (X-Tenant-ID) or query parameter (tenant_id).

    Raises HTTP 400 when the tenant ID is missing, 404 when the searcher
    returns no document, and 500 for any other failure.
    """
    try:
        tenant_id = _require_tenant_id(http_request)

        from api.app import get_searcher
        searcher = get_searcher()

        doc = searcher.get_document(tenant_id=tenant_id, doc_id=doc_id)

        if doc is None:
            raise HTTPException(status_code=404, detail=f"Document {doc_id} not found for tenant {tenant_id}")

        return DocumentResponse(id=doc_id, source=doc)

    except HTTPException:
        # Let deliberate 4xx responses through instead of wrapping them in a 500.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e