# search.py (10.4 KB)
"""
Search API routes.
"""

from fastapi import APIRouter, HTTPException, Query, Request
from typing import Optional
import uuid

from ..models import (
    SearchRequest,
    ImageSearchRequest,
    SearchResponse,
    SearchSuggestResponse,
    DocumentResponse,
    ErrorResponse
)
from context.request_context import create_request_context, set_current_request_context, clear_current_request_context

# Router collecting the search endpoints defined in this module; it is created
# with the "/search" path prefix and the "search" tag.
router = APIRouter(prefix="/search", tags=["search"])


def extract_request_info(request: Request) -> tuple[str, str]:
    """Return the ``(request_id, user_id)`` pair for an incoming HTTP request.

    The request ID is read from the ``X-Request-ID`` header; when that header
    is missing or empty, the first 8 characters of a fresh UUID4 string are
    used instead.

    The user ID is read from ``X-User-ID``, then ``User-ID``; when neither
    header carries a non-empty value, ``'anonymous'`` is returned.
    """
    headers = request.headers

    # Request ID: header value if present, otherwise a short random identifier.
    request_id = headers.get('X-Request-ID')
    if not request_id:
        request_id = str(uuid.uuid4())[:8]

    # User ID: first non-empty candidate header wins, else the anonymous default.
    user_id = 'anonymous'
    for header_name in ('X-User-ID', 'User-ID'):
        candidate = headers.get(header_name)
        if candidate:
            user_id = candidate
            break

    return request_id, user_id


@router.post("/", response_model=SearchResponse)
async def search(request: SearchRequest, http_request: Request):
    """
    Execute text search query (external-friendly format).

    Supports:
    - Multi-language query processing
    - Boolean operators (AND, OR, RANK, ANDNOT)
    - Semantic search with embeddings
    - Custom ranking functions
    - Exact match filters and range filters
    - Faceted search

    Requires tenant_id in header (X-Tenant-ID) or query parameter (tenant_id).

    Args:
        request: Parsed search request body (query, paging, filters, facets,
            sorting, debug flag, language, SKU filter dimension).
        http_request: Raw HTTP request; used for headers, the query string
            and the client address.

    Returns:
        A SearchResponse built from the searcher result, with the request
        context's summary attached as ``performance_info``.

    Raises:
        HTTPException: 400 when no tenant_id is supplied. 500 (with the error
            text as ``detail``) when the search fails with an unexpected
            exception. An HTTPException raised while handling the request is
            propagated unchanged instead of being rewritten to a 500.
    """
    reqid, uid = extract_request_info(http_request)

    # Extract tenant_id (required); the header takes precedence over the query string.
    tenant_id = http_request.headers.get('X-Tenant-ID')
    if not tenant_id:
        # Fall back to the raw query string
        from urllib.parse import parse_qs
        query_string = http_request.url.query
        if query_string:
            params = parse_qs(query_string)
            tenant_id = params.get('tenant_id', [None])[0]

    if not tenant_id:
        raise HTTPException(
            status_code=400,
            detail="tenant_id is required. Provide it via header 'X-Tenant-ID' or query parameter 'tenant_id'"
        )

    # Create the request context and make it current; it is cleared in `finally`.
    context = create_request_context(reqid=reqid, uid=uid)
    set_current_request_context(context)

    try:
        # Log request start
        context.logger.info(
            f"收到搜索请求 | Tenant: {tenant_id} | IP: {http_request.client.host if http_request.client else 'unknown'} | "
            f"用户代理: {http_request.headers.get('User-Agent', 'unknown')[:100]}",
            extra={'reqid': context.reqid, 'uid': context.uid}
        )

        # Get searcher from app state (imported here rather than at module level)
        from api.app import get_searcher
        searcher = get_searcher()

        # Execute search with context (using backend defaults from config)
        result = searcher.search(
            query=request.query,
            tenant_id=tenant_id,
            size=request.size,
            from_=request.from_,
            filters=request.filters,
            range_filters=request.range_filters,
            facets=request.facets,
            min_score=request.min_score,
            context=context,
            sort_by=request.sort_by,
            sort_order=request.sort_order,
            debug=request.debug,
            language=request.language,
            sku_filter_dimension=request.sku_filter_dimension,
        )

        # Include performance summary in response
        performance_summary = context.get_summary() if context else None

        # Convert to response model
        return SearchResponse(
            results=result.results,
            total=result.total,
            max_score=result.max_score,
            took_ms=result.took_ms,
            facets=result.facets,
            query_info=result.query_info,
            suggestions=result.suggestions,
            related_searches=result.related_searches,
            performance_info=performance_summary,
            debug_info=result.debug_info
        )

    except HTTPException:
        # An HTTP error raised on purpose keeps its own status code and detail;
        # without this clause the generic handler below would turn it into a 500.
        raise
    except Exception as e:
        # Record and log the failure on the request context
        if context:
            context.set_error(e)
            context.logger.error(
                f"搜索请求失败 | 错误: {str(e)}",
                extra={'reqid': context.reqid, 'uid': context.uid}
            )
        # Chain the original exception so the cause stays visible in tracebacks
        raise HTTPException(status_code=500, detail=str(e)) from e
    finally:
        # Always clear the current request context, on success and on failure
        clear_current_request_context()


@router.post("/image", response_model=SearchResponse)
async def search_by_image(request: ImageSearchRequest, http_request: Request):
    """
    Search by image similarity (external-friendly format).

    Uses image embeddings to find visually similar products.
    Supports exact match filters and range filters.

    Requires tenant_id in header (X-Tenant-ID) or query parameter (tenant_id).

    Args:
        request: Parsed image search request body (image URL, size, filters,
            range filters).
        http_request: Raw HTTP request; used for headers, the query string
            and the client address.

    Returns:
        A SearchResponse built from the searcher result, with the request
        context's summary attached as ``performance_info``.

    Raises:
        HTTPException: 400 when no tenant_id is supplied or when the search
            raises ValueError. 500 (with the error text as ``detail``) for
            any other unexpected exception. An HTTPException raised while
            handling the request is propagated unchanged instead of being
            rewritten to a 500.
    """
    reqid, uid = extract_request_info(http_request)

    # Extract tenant_id (required); the header takes precedence over the query string.
    tenant_id = http_request.headers.get('X-Tenant-ID')
    if not tenant_id:
        from urllib.parse import parse_qs
        query_string = http_request.url.query
        if query_string:
            params = parse_qs(query_string)
            tenant_id = params.get('tenant_id', [None])[0]

    if not tenant_id:
        raise HTTPException(
            status_code=400,
            detail="tenant_id is required. Provide it via header 'X-Tenant-ID' or query parameter 'tenant_id'"
        )

    # Create the request context and make it current; it is cleared in `finally`.
    context = create_request_context(reqid=reqid, uid=uid)
    set_current_request_context(context)

    try:
        # Log request start
        context.logger.info(
            f"收到图片搜索请求 | Tenant: {tenant_id} | 图片URL: {request.image_url} | "
            f"IP: {http_request.client.host if http_request.client else 'unknown'}",
            extra={'reqid': context.reqid, 'uid': context.uid}
        )

        # Get searcher from app state (imported here rather than at module level)
        from api.app import get_searcher
        searcher = get_searcher()

        # Execute image search
        result = searcher.search_by_image(
            image_url=request.image_url,
            tenant_id=tenant_id,
            size=request.size,
            filters=request.filters,
            range_filters=request.range_filters
        )

        # Include performance summary in response
        performance_summary = context.get_summary() if context else None

        return SearchResponse(
            results=result.results,
            total=result.total,
            max_score=result.max_score,
            took_ms=result.took_ms,
            facets=result.facets,
            query_info=result.query_info,
            suggestions=result.suggestions,
            related_searches=result.related_searches,
            performance_info=performance_summary
        )

    except HTTPException:
        # An HTTP error raised on purpose keeps its own status code and detail;
        # without this clause the generic handler below would turn it into a 500.
        raise
    except ValueError as e:
        # ValueError is reported to the client as a bad request
        if context:
            context.set_error(e)
            context.logger.error(
                f"图片搜索请求参数错误 | 错误: {str(e)}",
                extra={'reqid': context.reqid, 'uid': context.uid}
            )
        raise HTTPException(status_code=400, detail=str(e)) from e
    except Exception as e:
        # Anything else is an internal failure
        if context:
            context.set_error(e)
            context.logger.error(
                f"图片搜索请求失败 | 错误: {str(e)}",
                extra={'reqid': context.reqid, 'uid': context.uid}
            )
        raise HTTPException(status_code=500, detail=str(e)) from e
    finally:
        # Always clear the current request context, on success and on failure
        clear_current_request_context()


@router.get("/suggestions", response_model=SearchSuggestResponse)
async def search_suggestions(
    q: str = Query(..., min_length=1, description="搜索查询"),
    size: int = Query(5, ge=1, le=20, description="建议数量"),
    types: str = Query("query", description="建议类型(逗号分隔): query, product, category, brand")
):
    """
    Get search suggestions (autocomplete).

    Planned suggestion types:
    - query: based on search history and popular searches
    - product: matching products
    - category: matching categories
    - brand: matching brands

    Note: this feature is not implemented yet; only a skeleton response with
    an empty suggestion list is returned. ``size`` and ``types`` are accepted
    but not used by the current body.

    Args:
        q: Search query text (at least 1 character).
        size: Number of suggestions requested (1-20, default 5).
        types: Comma-separated suggestion types (default "query").

    Returns:
        A SearchSuggestResponse echoing ``q``, with an empty ``suggestions``
        list and the elapsed time in milliseconds as ``took_ms``.
    """
    import time
    # perf_counter is monotonic, so the elapsed time cannot come out negative
    # if the system clock is adjusted while the request is being handled.
    start_time = time.perf_counter()

    # TODO: implement the suggestion logic
    # 1. Collect suggestions from search history
    # 2. Prefix-match against product titles
    # 3. Match against categories and brands

    # Temporarily return an empty result
    suggestions = []

    # Example item structure (not implemented yet):
    # suggestions = [
    #     {
    #         "text": "barbie doll",
    #         "type": "query",
    #         "highlight": "<em>bar</em>bie doll",
    #         "popularity": 850
    #     }
    # ]

    took_ms = int((time.perf_counter() - start_time) * 1000)

    return SearchSuggestResponse(
        query=q,
        suggestions=suggestions,
        took_ms=took_ms
    )


@router.get("/instant", response_model=SearchResponse)
async def instant_search(
    q: str = Query(..., min_length=2, description="搜索查询"),
    size: int = Query(5, ge=1, le=20, description="结果数量"),
    tenant_id: str = Query(..., description="租户ID")
):
    """
    Instant search (search-as-you-type).

    Intended behaviour:
    - Search while the user is typing, without pressing a search button
    - Return simplified search results

    Note: there is no dedicated implementation yet; this endpoint calls the
    standard search method of the searcher, always starting at offset 0.

    TODO: optimise instant search performance
    - Add debounce/throttle
    - Cache results
    - Simplify the returned fields

    Args:
        q: Search query text (at least 2 characters).
        size: Number of results to return (1-20, default 5).
        tenant_id: Tenant ID (required query parameter).

    Returns:
        A SearchResponse populated from the searcher result.
    """
    from api.app import get_searcher

    search_kwargs = {
        "query": q,
        "tenant_id": tenant_id,
        "size": size,
        "from_": 0,
        # Instant search doesn't support SKU filtering
        "sku_filter_dimension": None,
    }
    outcome = get_searcher().search(**search_kwargs)

    # Attributes copied one-to-one from the searcher result into the response.
    copied_fields = (
        "results",
        "total",
        "max_score",
        "took_ms",
        "facets",
        "query_info",
        "suggestions",
        "related_searches",
    )
    response_kwargs = {name: getattr(outcome, name) for name in copied_fields}
    return SearchResponse(**response_kwargs)


@router.get("/{doc_id}", response_model=DocumentResponse)
async def get_document(doc_id: str):
    """
    Fetch a single document by its ID.

    Args:
        doc_id: Identifier of the document, taken from the URL path.

    Returns:
        A DocumentResponse carrying ``doc_id`` and the document returned by
        the searcher as ``source``.

    Raises:
        HTTPException: 404 when the searcher returns ``None`` for ``doc_id``;
            500 (with the error text as ``detail``) for any other failure.
    """
    try:
        from api.app import get_searcher

        found = get_searcher().get_document(doc_id)

        if found is not None:
            return DocumentResponse(id=doc_id, source=found)

        raise HTTPException(status_code=404, detail=f"Document {doc_id} not found")

    except HTTPException:
        # Let the 404 above (and any other HTTP error) through untouched.
        raise
    except Exception as err:
        raise HTTPException(status_code=500, detail=str(err))