From 1f6d15faa32291662f64d4e0e3f50269b8e104e5 Mon Sep 17 00:00:00 2001 From: tangwang Date: Thu, 13 Nov 2025 11:41:31 +0800 Subject: [PATCH] 重构:SPU级别索引、统一索引架构和API响应格式优化 --- api/models.py | 38 ++++++++++++++++++++++++++++++++++---- api/result_formatter.py | 177 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ api/routes/search.py | 63 ++++++++++++++++++++++++++++++++++++++++++++++++++++++--------- config/config_loader.py | 47 +++++++++++++++++++++-------------------------- config/field_types.py | 34 ++++++++++++++++++++++++++++------ config/schema/base/config.yaml | 271 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ docs/BASE_CONFIG_GUIDE.md | 257 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ indexer/spu_transformer.py | 288 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ scripts/generate_test_data.py | 325 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ scripts/import_test_data.py | 132 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ scripts/ingest_shoplazza.py | 148 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ scripts/test_base.py | 242 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ search/searcher.py | 145 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++------------------------------------------------------------------------------ 设计文档.md | 282 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++-------------------------------------------------------------------------------------------------------------- 14 files changed, 2216 insertions(+), 233 deletions(-) create mode 100644 api/result_formatter.py create mode 100644 docs/BASE_CONFIG_GUIDE.md create mode 100644 indexer/spu_transformer.py create mode 100644 scripts/generate_test_data.py create mode 100644 scripts/import_test_data.py create mode 100644 scripts/ingest_shoplazza.py create mode 100644 scripts/test_base.py diff --git a/api/models.py b/api/models.py index fb8c3ba..2e6a3ca 100644 --- a/api/models.py +++ b/api/models.py @@ -172,11 +172,40 @@ class 
FacetResult(BaseModel): total_count: Optional[int] = Field(None, description="该字段的总文档数") +class VariantResult(BaseModel): + """商品变体结果""" + variant_id: str = Field(..., description="变体ID") + title: Optional[str] = Field(None, description="变体标题") + price: Optional[float] = Field(None, description="价格") + compare_at_price: Optional[float] = Field(None, description="原价") + sku: Optional[str] = Field(None, description="SKU编码") + stock: int = Field(0, description="库存数量") + options: Optional[Dict[str, Any]] = Field(None, description="选项(颜色、尺寸等)") + + +class ProductResult(BaseModel): + """商品搜索结果""" + product_id: str = Field(..., description="商品ID") + title: Optional[str] = Field(None, description="商品标题") + handle: Optional[str] = Field(None, description="商品handle") + description: Optional[str] = Field(None, description="商品描述") + vendor: Optional[str] = Field(None, description="供应商/品牌") + product_type: Optional[str] = Field(None, description="商品类型") + tags: Optional[str] = Field(None, description="标签") + price: Optional[float] = Field(None, description="价格(min_price)") + compare_at_price: Optional[float] = Field(None, description="原价") + currency: str = Field("USD", description="货币单位") + image_url: Optional[str] = Field(None, description="主图URL") + in_stock: bool = Field(True, description="是否有库存") + variants: List[VariantResult] = Field(default_factory=list, description="变体列表") + relevance_score: float = Field(..., ge=0.0, le=1.0, description="相关性分数(0-1)") + + class SearchResponse(BaseModel): - """搜索响应模型(重构版)""" + """搜索响应模型(外部友好格式)""" # 核心结果 - hits: List[Dict[str, Any]] = Field(..., description="搜索结果列表") + results: List[ProductResult] = Field(..., description="搜索结果列表") total: int = Field(..., description="匹配的总文档数") max_score: float = Field(..., description="最高相关性分数") @@ -192,8 +221,9 @@ class SearchResponse(BaseModel): description="查询处理信息(原始查询、改写、语言检测、翻译等)" ) - # 推荐与建议(预留) - related_queries: Optional[List[str]] = Field(None, description="相关搜索查询") + # 推荐与建议 + suggestions: List[str] = Field(default_factory=list, description="搜索建议") + related_searches: List[str] = Field(default_factory=list, description="相关搜索") # 性能指标 took_ms: int = Field(..., description="搜索总耗时(毫秒)") diff --git a/api/result_formatter.py b/api/result_formatter.py new file mode 100644 index 0000000..b17a6ab --- /dev/null +++ b/api/result_formatter.py @@ -0,0 +1,177 @@ +""" +Result formatter for converting ES internal format to external-friendly format. +""" + +from typing import List, Dict, Any, Optional +from .models import ProductResult, VariantResult, FacetResult, FacetValue + + +class ResultFormatter: + """Formats ES search results to external-friendly format.""" + + @staticmethod + def format_search_results( + es_hits: List[Dict[str, Any]], + max_score: float = 1.0 + ) -> List[ProductResult]: + """ + Convert ES hits to ProductResult list. 
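+
+        Example (illustrative; the hit values are assumed, not from real data):
+
+            hits = [{"_score": 2.0, "_source": {"product_id": "1", "variants": []}}]
+            products = ResultFormatter.format_search_results(hits, max_score=2.0)
+            # products[0].relevance_score == 1.0  (raw _score normalized by max_score)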
+ + Args: + es_hits: List of ES hit dictionaries (with _id, _score, _source) + max_score: Maximum score for normalization + + Returns: + List of ProductResult objects + """ + results = [] + + for hit in es_hits: + source = hit.get('_source', {}) + score = hit.get('_score', 0.0) + + # Normalize relevance score to 0-1 + if max_score > 0: + relevance_score = min(score / max_score, 1.0) + else: + relevance_score = 0.0 + + # Extract variants + variants = [] + variants_data = source.get('variants', []) + if isinstance(variants_data, list): + for variant_data in variants_data: + variant = VariantResult( + variant_id=str(variant_data.get('variant_id', '')), + title=variant_data.get('title'), + price=variant_data.get('price'), + compare_at_price=variant_data.get('compare_at_price'), + sku=variant_data.get('sku'), + stock=variant_data.get('stock', 0), + options=variant_data.get('options') + ) + variants.append(variant) + + # Determine in_stock (any variant has stock > 0) + in_stock = any(v.stock > 0 for v in variants) if variants else True + + # Build ProductResult + product = ProductResult( + product_id=str(source.get('product_id', '')), + title=source.get('title'), + handle=source.get('handle'), + description=source.get('description'), + vendor=source.get('vendor'), + product_type=source.get('product_type'), + tags=source.get('tags'), + price=source.get('min_price'), + compare_at_price=source.get('compare_at_price'), + currency="USD", # Default currency + image_url=source.get('image_url'), + in_stock=in_stock, + variants=variants, + relevance_score=relevance_score + ) + + results.append(product) + + return results + + @staticmethod + def format_facets( + es_aggregations: Dict[str, Any], + facet_configs: Optional[List[Any]] = None + ) -> List[FacetResult]: + """ + Format ES aggregations to FacetResult list. + + Args: + es_aggregations: ES aggregations response + facet_configs: Facet configurations (optional) + + Returns: + List of FacetResult objects + """ + facets = [] + + for field_name, agg_data in es_aggregations.items(): + # Handle terms aggregation + if 'buckets' in agg_data: + values = [] + for bucket in agg_data['buckets']: + value = FacetValue( + value=bucket['key'], + label=bucket.get('key_as_string', str(bucket['key'])), + count=bucket['doc_count'], + selected=False + ) + values.append(value) + + facet = FacetResult( + field=field_name, + label=field_name, # Can be enhanced with field labels + type="terms", + values=values, + total_count=agg_data.get('sum_other_doc_count', 0) + len(values) + ) + facets.append(facet) + + # Handle range aggregation + elif 'buckets' in agg_data and any('from' in b or 'to' in b for b in agg_data['buckets']): + values = [] + for bucket in agg_data['buckets']: + range_key = bucket.get('key', '') + value = FacetValue( + value=range_key, + label=range_key, + count=bucket['doc_count'], + selected=False + ) + values.append(value) + + facet = FacetResult( + field=field_name, + label=field_name, + type="range", + values=values + ) + facets.append(facet) + + return facets + + @staticmethod + def generate_suggestions( + query: str, + results: List[ProductResult] + ) -> List[str]: + """ + Generate search suggestions. + + Args: + query: Original search query + results: Search results + + Returns: + List of suggestion strings (currently returns empty list) + """ + # TODO: Implement suggestion generation logic + return [] + + @staticmethod + def generate_related_searches( + query: str, + results: List[ProductResult] + ) -> List[str]: + """ + Generate related searches. 
+ + Args: + query: Original search query + results: Search results + + Returns: + List of related search strings (currently returns empty list) + """ + # TODO: Implement related search generation logic + return [] + diff --git a/api/routes/search.py b/api/routes/search.py index 94f5b50..cc53968 100644 --- a/api/routes/search.py +++ b/api/routes/search.py @@ -33,7 +33,7 @@ def extract_request_info(request: Request) -> tuple[str, str]: @router.post("/", response_model=SearchResponse) async def search(request: SearchRequest, http_request: Request): """ - Execute text search query (重构版). + Execute text search query (外部友好格式). Supports: - Multi-language query processing @@ -42,9 +42,27 @@ async def search(request: SearchRequest, http_request: Request): - Custom ranking functions - Exact match filters and range filters - Faceted search + + Requires tenant_id in header (X-Tenant-ID) or query parameter (tenant_id). """ reqid, uid = extract_request_info(http_request) + # Extract tenant_id (required) + tenant_id = http_request.headers.get('X-Tenant-ID') + if not tenant_id: + # Try to get from query string + from urllib.parse import parse_qs + query_string = http_request.url.query + if query_string: + params = parse_qs(query_string) + tenant_id = params.get('tenant_id', [None])[0] + + if not tenant_id: + raise HTTPException( + status_code=400, + detail="tenant_id is required. Provide it via header 'X-Tenant-ID' or query parameter 'tenant_id'" + ) + # Create request context context = create_request_context(reqid=reqid, uid=uid) @@ -54,7 +72,7 @@ async def search(request: SearchRequest, http_request: Request): try: # Log request start context.logger.info( - f"收到搜索请求 | IP: {http_request.client.host if http_request.client else 'unknown'} | " + f"收到搜索请求 | Tenant: {tenant_id} | IP: {http_request.client.host if http_request.client else 'unknown'} | " f"用户代理: {http_request.headers.get('User-Agent', 'unknown')[:100]}", extra={'reqid': context.reqid, 'uid': context.uid} ) @@ -66,6 +84,7 @@ async def search(request: SearchRequest, http_request: Request): # Execute search with context (using backend defaults from config) result = searcher.search( query=request.query, + tenant_id=tenant_id, size=request.size, from_=request.from_, filters=request.filters, @@ -83,12 +102,14 @@ async def search(request: SearchRequest, http_request: Request): # Convert to response model return SearchResponse( - hits=result.hits, + results=result.results, total=result.total, max_score=result.max_score, took_ms=result.took_ms, facets=result.facets, query_info=result.query_info, + suggestions=result.suggestions, + related_searches=result.related_searches, performance_info=performance_summary, debug_info=result.debug_info ) @@ -110,13 +131,30 @@ async def search(request: SearchRequest, http_request: Request): @router.post("/image", response_model=SearchResponse) async def search_by_image(request: ImageSearchRequest, http_request: Request): """ - Search by image similarity (重构版). + Search by image similarity (外部友好格式). Uses image embeddings to find visually similar products. Supports exact match filters and range filters. + + Requires tenant_id in header (X-Tenant-ID) or query parameter (tenant_id). 
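+
+    Example request (illustrative values):
+
+        POST /search/image
+        X-Tenant-ID: 1
+        {"image_url": "https://cdn.example.com/q.jpg", "size": 10}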
""" reqid, uid = extract_request_info(http_request) + # Extract tenant_id (required) + tenant_id = http_request.headers.get('X-Tenant-ID') + if not tenant_id: + from urllib.parse import parse_qs + query_string = http_request.url.query + if query_string: + params = parse_qs(query_string) + tenant_id = params.get('tenant_id', [None])[0] + + if not tenant_id: + raise HTTPException( + status_code=400, + detail="tenant_id is required. Provide it via header 'X-Tenant-ID' or query parameter 'tenant_id'" + ) + # Create request context context = create_request_context(reqid=reqid, uid=uid) @@ -126,7 +164,7 @@ async def search_by_image(request: ImageSearchRequest, http_request: Request): try: # Log request start context.logger.info( - f"收到图片搜索请求 | 图片URL: {request.image_url} | " + f"收到图片搜索请求 | Tenant: {tenant_id} | 图片URL: {request.image_url} | " f"IP: {http_request.client.host if http_request.client else 'unknown'}", extra={'reqid': context.reqid, 'uid': context.uid} ) @@ -137,6 +175,7 @@ async def search_by_image(request: ImageSearchRequest, http_request: Request): # Execute image search result = searcher.search_by_image( image_url=request.image_url, + tenant_id=tenant_id, size=request.size, filters=request.filters, range_filters=request.range_filters @@ -146,12 +185,14 @@ async def search_by_image(request: ImageSearchRequest, http_request: Request): performance_summary = context.get_summary() if context else None return SearchResponse( - hits=result.hits, + results=result.results, total=result.total, max_score=result.max_score, took_ms=result.took_ms, facets=result.facets, query_info=result.query_info, + suggestions=result.suggestions, + related_searches=result.related_searches, performance_info=performance_summary ) @@ -226,7 +267,8 @@ async def search_suggestions( @router.get("/instant", response_model=SearchResponse) async def instant_search( q: str = Query(..., min_length=2, description="搜索查询"), - size: int = Query(5, ge=1, le=20, description="结果数量") + size: int = Query(5, ge=1, le=20, description="结果数量"), + tenant_id: str = Query(..., description="租户ID") ): """ 即时搜索(Instant Search)。 @@ -246,17 +288,20 @@ async def instant_search( result = searcher.search( query=q, + tenant_id=tenant_id, size=size, from_=0 ) return SearchResponse( - hits=result.hits, + results=result.results, total=result.total, max_score=result.max_score, took_ms=result.took_ms, facets=result.facets, - query_info=result.query_info + query_info=result.query_info, + suggestions=result.suggestions, + related_searches=result.related_searches ) diff --git a/config/config_loader.py b/config/config_loader.py index dd230ef..9b2eb92 100644 --- a/config/config_loader.py +++ b/config/config_loader.py @@ -90,9 +90,6 @@ class CustomerConfig: customer_id: str customer_name: str - # Database settings - mysql_config: Dict[str, Any] - # Field definitions fields: List[FieldConfig] @@ -116,10 +113,6 @@ class CustomerConfig: # ES index settings es_index_name: str - - # Optional fields with defaults - main_table: str = "shoplazza_product_sku" - extension_table: Optional[str] = None es_settings: Dict[str, Any] = field(default_factory=dict) @@ -272,9 +265,6 @@ class ConfigLoader: return CustomerConfig( customer_id=customer_id, customer_name=config_data.get("customer_name", customer_id), - mysql_config=config_data.get("mysql_config", {}), - main_table=config_data.get("main_table", "shoplazza_product_sku"), - extension_table=config_data.get("extension_table"), fields=fields, indexes=indexes, query_config=query_config, @@ -310,8 +300,6 @@ class 
ConfigLoader: return FieldConfig( name=name, field_type=field_type, - source_table=field_data.get("source_table"), - source_column=field_data.get("source_column", name), analyzer=analyzer, search_analyzer=search_analyzer, required=field_data.get("required", False), @@ -426,15 +414,17 @@ class ConfigLoader: if field.embedding_similarity not in ["dot_product", "cosine", "l2_norm"]: errors.append(f"Field '{field.name}': invalid embedding_similarity") - # Validate MySQL config - if "host" not in config.mysql_config: - errors.append("MySQL configuration missing 'host'") - if "username" not in config.mysql_config: - errors.append("MySQL configuration missing 'username'") - if "password" not in config.mysql_config: - errors.append("MySQL configuration missing 'password'") - if "database" not in config.mysql_config: - errors.append("MySQL configuration missing 'database'") + # Validate tenant_id field (required) + tenant_id_field = None + for field in config.fields: + if field.name == "tenant_id": + tenant_id_field = field + break + + if not tenant_id_field: + errors.append("Required field 'tenant_id' not found in fields") + elif not tenant_id_field.required: + errors.append("Field 'tenant_id' must be marked as required") return errors @@ -457,9 +447,6 @@ class ConfigLoader: # Convert config back to dictionary format config_dict = { "customer_name": config.customer_name, - "mysql_config": config.mysql_config, - "main_table": config.main_table, - "extension_table": config.extension_table, "es_index_name": config.es_index_name, "es_settings": config.es_settings, "fields": [self._field_to_dict(field) for field in config.fields], @@ -478,6 +465,16 @@ class ConfigLoader: "expression": config.ranking.expression, "description": config.ranking.description }, + "function_score": { + "score_mode": config.function_score.score_mode, + "boost_mode": config.function_score.boost_mode, + "functions": config.function_score.functions + }, + "rerank": { + "enabled": config.rerank.enabled, + "expression": config.rerank.expression, + "description": config.rerank.description + }, "spu_config": { "enabled": config.spu_config.enabled, "spu_field": config.spu_config.spu_field, @@ -512,8 +509,6 @@ class ConfigLoader: result = { "name": field.name, "type": field.field_type.value, - "source_table": field.source_table, - "source_column": field.source_column, "required": field.required, "boost": field.boost, "store": field.store, diff --git a/config/field_types.py b/config/field_types.py index 9ec46fe..8275f19 100644 --- a/config/field_types.py +++ b/config/field_types.py @@ -54,8 +54,6 @@ class FieldConfig: """Configuration for a single field.""" name: str field_type: FieldType - source_table: Optional[str] = None # 'main' or 'extension' or specific table name - source_column: Optional[str] = None analyzer: Optional[AnalyzerType] = None search_analyzer: Optional[AnalyzerType] = None required: bool = False @@ -172,10 +170,34 @@ def get_es_mapping_for_field(field_config: FieldConfig) -> Dict[str, Any]: } elif field_config.field_type == FieldType.JSON: - mapping = { - "type": "object", - "enabled": True - } + if field_config.nested and field_config.nested_properties: + # Nested type with properties (e.g., variants) + mapping = { + "type": "nested", + "properties": {} + } + # Generate mappings for nested properties + for prop_name, prop_config in field_config.nested_properties.items(): + prop_type = prop_config.get("type", "keyword") + prop_mapping = {"type": prop_type} + + # Add analyzer for text fields + if prop_type == "text" 
and "analyzer" in prop_config: + prop_mapping["analyzer"] = prop_config["analyzer"] + + # Add other properties + if "index" in prop_config: + prop_mapping["index"] = prop_config["index"] + if "store" in prop_config: + prop_mapping["store"] = prop_config["store"] + + mapping["properties"][prop_name] = prop_mapping + else: + # Simple object type + mapping = { + "type": "object", + "enabled": True + } return mapping diff --git a/config/schema/base/config.yaml b/config/schema/base/config.yaml index e69de29..5fc3013 100644 --- a/config/schema/base/config.yaml +++ b/config/schema/base/config.yaml @@ -0,0 +1,271 @@ +# Base Configuration for Shoplazza +# 店匠通用配置文件,所有使用店匠表的客户共用 +# 注意:此配置不包含MySQL相关配置,只包含ES搜索相关配置 + +customer_name: "Shoplazza Base Configuration" + +# Elasticsearch Index +es_index_name: "search_products" + +# ES Index Settings +es_settings: + number_of_shards: 1 + number_of_replicas: 0 + refresh_interval: "30s" + +# Field Definitions (SPU级别,只包含对搜索有帮助的字段) +fields: + # 租户隔离字段(必需) + - name: "tenant_id" + type: "KEYWORD" + required: true + index: true + store: true + + # 商品标识字段 + - name: "product_id" + type: "KEYWORD" + required: true + index: true + store: true + + - name: "handle" + type: "KEYWORD" + index: true + store: true + + # 文本搜索字段 + - name: "title" + type: "TEXT" + analyzer: "chinese_ecommerce" + boost: 3.0 + index: true + store: true + + - name: "brief" + type: "TEXT" + analyzer: "chinese_ecommerce" + boost: 1.5 + index: true + store: true + + - name: "description" + type: "TEXT" + analyzer: "chinese_ecommerce" + boost: 1.0 + index: true + store: true + + # SEO字段(提升相关性) + - name: "seo_title" + type: "TEXT" + analyzer: "chinese_ecommerce" + boost: 2.0 + index: true + store: true + + - name: "seo_description" + type: "TEXT" + analyzer: "chinese_ecommerce" + boost: 1.5 + index: true + store: true + + - name: "seo_keywords" + type: "TEXT" + analyzer: "chinese_ecommerce" + boost: 2.0 + index: true + store: true + + # 分类和标签字段(TEXT + KEYWORD双重索引) + - name: "vendor" + type: "TEXT" + analyzer: "chinese_ecommerce" + boost: 1.5 + index: true + store: true + + - name: "vendor_keyword" + type: "KEYWORD" + index: true + store: false + + - name: "product_type" + type: "TEXT" + analyzer: "chinese_ecommerce" + boost: 1.5 + index: true + store: true + + - name: "product_type_keyword" + type: "KEYWORD" + index: true + store: false + + - name: "tags" + type: "TEXT" + analyzer: "chinese_ecommerce" + boost: 1.0 + index: true + store: true + + - name: "tags_keyword" + type: "KEYWORD" + index: true + store: false + + - name: "category" + type: "TEXT" + analyzer: "chinese_ecommerce" + boost: 1.5 + index: true + store: true + + - name: "category_keyword" + type: "KEYWORD" + index: true + store: false + + # 价格字段(扁平化) + - name: "min_price" + type: "FLOAT" + index: true + store: true + + - name: "max_price" + type: "FLOAT" + index: true + store: true + + - name: "compare_at_price" + type: "FLOAT" + index: true + store: true + + # 图片字段(用于显示,不参与搜索) + - name: "image_url" + type: "KEYWORD" + index: false + store: true + + # 嵌套variants字段 + - name: "variants" + type: "JSON" + nested: true + nested_properties: + variant_id: + type: "keyword" + index: true + store: true + title: + type: "text" + analyzer: "chinese_ecommerce" + index: true + store: true + price: + type: "float" + index: true + store: true + compare_at_price: + type: "float" + index: true + store: true + sku: + type: "keyword" + index: true + store: true + stock: + type: "long" + index: true + store: true + options: + type: "object" + enabled: true + 
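+
+# For reference (a sketch, not generated output): get_es_mapping_for_field in
+# config/field_types.py turns the nested "variants" definition above into an ES
+# mapping roughly like:
+#   "variants": {
+#     "type": "nested",
+#     "properties": {
+#       "variant_id": {"type": "keyword", "index": true, "store": true},
+#       "price": {"type": "float", "index": true, "store": true},
+#       "options": {"type": "object"}
+#     }
+#   }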
+# Index Structure (Query Domains) +indexes: + - name: "default" + label: "默认索引" + fields: + - "title" + - "brief" + - "description" + - "seo_title" + - "seo_description" + - "seo_keywords" + - "vendor" + - "product_type" + - "tags" + - "category" + analyzer: "chinese_ecommerce" + boost: 1.0 + + - name: "title" + label: "标题索引" + fields: + - "title" + - "seo_title" + analyzer: "chinese_ecommerce" + boost: 2.0 + + - name: "vendor" + label: "品牌索引" + fields: + - "vendor" + analyzer: "chinese_ecommerce" + boost: 1.5 + + - name: "category" + label: "类目索引" + fields: + - "category" + analyzer: "chinese_ecommerce" + boost: 1.5 + + - name: "tags" + label: "标签索引" + fields: + - "tags" + - "seo_keywords" + analyzer: "chinese_ecommerce" + boost: 1.0 + +# Query Configuration +query_config: + supported_languages: + - "zh" + - "en" + default_language: "zh" + enable_translation: true + enable_text_embedding: true + enable_query_rewrite: true + + # Translation API (DeepL) + translation_service: "deepl" + translation_api_key: null # Set via environment variable + +# Ranking Configuration +ranking: + expression: "bm25() + 0.2*text_embedding_relevance()" + description: "BM25 text relevance combined with semantic embedding similarity" + +# Function Score配置(ES层打分规则) +function_score: + score_mode: "sum" + boost_mode: "multiply" + + functions: [] + +# Rerank配置(本地重排,当前禁用) +rerank: + enabled: false + expression: "" + description: "Local reranking (disabled, use ES function_score instead)" + +# SPU配置(已启用,使用嵌套variants) +spu_config: + enabled: true + spu_field: "product_id" + inner_hits_size: 10 + diff --git a/docs/BASE_CONFIG_GUIDE.md b/docs/BASE_CONFIG_GUIDE.md new file mode 100644 index 0000000..c30bdf8 --- /dev/null +++ b/docs/BASE_CONFIG_GUIDE.md @@ -0,0 +1,257 @@ +# Base Configuration Guide + +店匠通用配置(Base Configuration)使用指南 + +## 概述 + +Base配置是店匠(Shoplazza)通用配置,适用于所有使用店匠标准表的客户。该配置采用SPU级别的索引结构,所有客户共享同一个Elasticsearch索引(`search_products`),通过`tenant_id`字段实现数据隔离。 + +## 核心特性 + +- **SPU级别索引**:每个ES文档代表一个SPU,包含嵌套的variants数组 +- **统一索引**:所有客户共享`search_products`索引 +- **租户隔离**:通过`tenant_id`字段实现数据隔离 +- **配置简化**:配置只包含ES搜索相关配置,不包含MySQL数据源配置 +- **外部友好格式**:API返回格式不包含ES内部字段(`_id`, `_score`, `_source`) + +## 配置说明 + +### 配置文件位置 + +`config/schema/base/config.yaml` + +### 配置内容 + +Base配置**不包含**以下内容: +- `mysql_config` - MySQL数据库配置 +- `main_table` - 主表配置 +- `extension_table` - 扩展表配置 +- `source_table` / `source_column` - 字段数据源映射 + +Base配置**只包含**: +- ES字段定义(字段类型、分析器、boost等) +- 查询域(indexes)配置 +- 查询处理配置(query_config) +- 排序和打分配置(function_score) +- SPU配置(spu_config) + +### 必需字段 + +- `tenant_id` (KEYWORD, required) - 租户隔离字段 + +### 主要字段 + +- `product_id` - 商品ID +- `title`, `brief`, `description` - 文本搜索字段 +- `seo_title`, `seo_description`, `seo_keywords` - SEO字段 +- `vendor`, `product_type`, `tags`, `category` - 分类和标签字段 +- `min_price`, `max_price`, `compare_at_price` - 价格字段 +- `variants` (nested) - 嵌套变体数组 + +## 数据导入流程 + +### 1. 生成测试数据 + +```bash +python scripts/generate_test_data.py \ + --num-spus 100 \ + --tenant-id "1" \ + --start-spu-id 1 \ + --start-sku-id 1 \ + --output test_data.sql +``` + +### 2. 导入测试数据到MySQL + +```bash +python scripts/import_test_data.py \ + --db-host localhost \ + --db-port 3306 \ + --db-database saas \ + --db-username root \ + --db-password password \ + --sql-file test_data.sql \ + --tenant-id "1" +``` + +### 3. 
导入数据到Elasticsearch + +```bash +python scripts/ingest_shoplazza.py \ + --db-host localhost \ + --db-port 3306 \ + --db-database saas \ + --db-username root \ + --db-password password \ + --tenant-id "1" \ + --config base \ + --es-host http://localhost:9200 \ + --recreate \ + --batch-size 500 +``` + +## API使用 + +### 搜索接口 + +**端点**: `POST /search/` + +**请求头**: +``` +X-Tenant-ID: 1 +Content-Type: application/json +``` + +**请求体**: +```json +{ + "query": "耳机", + "size": 10, + "from": 0, + "filters": { + "category_keyword": "电子产品" + }, + "facets": ["category_keyword", "vendor_keyword"] +} +``` + +**响应格式**: +```json +{ + "results": [ + { + "product_id": "1", + "title": "蓝牙耳机 Sony", + "handle": "product-1", + "description": "高品质无线蓝牙耳机", + "vendor": "Sony", + "product_type": "电子产品", + "price": 199.99, + "compare_at_price": 299.99, + "currency": "USD", + "image_url": "//cdn.example.com/products/1.jpg", + "in_stock": true, + "variants": [ + { + "variant_id": "1", + "title": "黑色", + "price": 199.99, + "compare_at_price": 299.99, + "sku": "SKU-1-1", + "stock": 50, + "options": { + "option1": "黑色" + } + } + ], + "relevance_score": 0.95 + } + ], + "total": 10, + "max_score": 1.0, + "facets": [ + { + "field": "category_keyword", + "label": "category_keyword", + "type": "terms", + "values": [ + { + "value": "电子产品", + "label": "电子产品", + "count": 5, + "selected": false + } + ] + } + ], + "suggestions": [], + "related_searches": [], + "took_ms": 15, + "query_info": {} +} +``` + +### 响应格式说明 + +#### 主要变化 + +1. **`results`替代`hits`**:返回字段从`hits`改为`results` +2. **结构化结果**:每个结果包含`product_id`, `title`, `variants`, `relevance_score`等字段 +3. **无ES内部字段**:不包含`_id`, `_score`, `_source`等ES内部字段 +4. **嵌套variants**:每个商品包含variants数组,每个variant包含完整的变体信息 +5. **相关性分数**:`relevance_score`是0-1之间的归一化分数 + +#### ProductResult字段 + +- `product_id` - 商品ID +- `title` - 商品标题 +- `handle` - 商品handle +- `description` - 商品描述 +- `vendor` - 供应商/品牌 +- `product_type` - 商品类型 +- `tags` - 标签 +- `price` - 最低价格(min_price) +- `compare_at_price` - 原价 +- `currency` - 货币单位(默认USD) +- `image_url` - 主图URL +- `in_stock` - 是否有库存 +- `variants` - 变体列表 +- `relevance_score` - 相关性分数(0-1) + +#### VariantResult字段 + +- `variant_id` - 变体ID +- `title` - 变体标题 +- `price` - 价格 +- `compare_at_price` - 原价 +- `sku` - SKU编码 +- `stock` - 库存数量 +- `options` - 选项(颜色、尺寸等) + +## 测试 + +### 运行测试脚本 + +```bash +python scripts/test_base.py \ + --api-url http://localhost:8000 \ + --tenant-id "1" \ + --test-tenant-2 "2" +``` + +### 测试内容 + +1. **基本搜索**:测试搜索API基本功能 +2. **响应格式验证**:验证返回格式是否符合要求 +3. **Facets聚合**:测试分面搜索功能 +4. **租户隔离**:验证不同租户的数据隔离 + +## 常见问题 + +### Q: 为什么配置中没有MySQL相关配置? + +A: 数据源配置和数据导入流程是写死的脚本,不在搜索配置中。搜索配置只关注ES搜索相关的内容。 + +### Q: 如何为新的租户导入数据? + +A: 使用`ingest_shoplazza.py`脚本,指定不同的`--tenant-id`参数即可。 + +### Q: 如何验证租户隔离是否生效? + +A: 使用`test_base.py`脚本,指定两个不同的`--tenant-id`,检查搜索结果是否隔离。 + +### Q: API返回格式中为什么没有`_id`和`_score`? + +A: 为了提供外部友好的API格式,我们移除了ES内部字段,使用`product_id`和`relevance_score`替代。 + +### Q: 如何添加新的搜索字段? + +A: 在`config/schema/base/config.yaml`中添加字段定义,然后重新生成索引映射并重新导入数据。 + +## 注意事项 + +1. **tenant_id必需**:所有API请求必须提供`tenant_id`(通过请求头`X-Tenant-ID`或查询参数`tenant_id`) +2. **索引共享**:所有客户共享`search_products`索引,确保`tenant_id`字段正确设置 +3. **数据导入**:数据导入脚本是写死的,不依赖配置中的MySQL设置 +4. **配置分离**:搜索配置和数据源配置完全分离,提高可维护性 + diff --git a/indexer/spu_transformer.py b/indexer/spu_transformer.py new file mode 100644 index 0000000..c141daf --- /dev/null +++ b/indexer/spu_transformer.py @@ -0,0 +1,288 @@ +""" +SPU data transformer for Shoplazza products. 
+ +Transforms SPU and SKU data from MySQL into SPU-level ES documents with nested variants. +""" + +import pandas as pd +import numpy as np +from typing import Dict, Any, List, Optional +from sqlalchemy import create_engine, text +from utils.db_connector import create_db_connection + + +class SPUTransformer: + """Transform SPU and SKU data into SPU-level ES documents.""" + + def __init__( + self, + db_engine: Any, + tenant_id: str + ): + """ + Initialize SPU transformer. + + Args: + db_engine: SQLAlchemy database engine + tenant_id: Tenant ID for filtering data + """ + self.db_engine = db_engine + self.tenant_id = tenant_id + + def load_spu_data(self) -> pd.DataFrame: + """ + Load SPU data from MySQL. + + Returns: + DataFrame with SPU data + """ + query = text(""" + SELECT + id, shop_id, shoplazza_id, handle, title, brief, description, + spu, vendor, vendor_url, seo_title, seo_description, seo_keywords, + image_src, image_width, image_height, image_path, image_alt, + tags, note, category, + shoplazza_created_at, shoplazza_updated_at, tenant_id, + creator, create_time, updater, update_time, deleted + FROM shoplazza_product_spu + WHERE tenant_id = :tenant_id AND deleted = 0 + """) + + with self.db_engine.connect() as conn: + df = pd.read_sql(query, conn, params={"tenant_id": self.tenant_id}) + + return df + + def load_sku_data(self) -> pd.DataFrame: + """ + Load SKU data from MySQL. + + Returns: + DataFrame with SKU data + """ + query = text(""" + SELECT + id, spu_id, shop_id, shoplazza_id, shoplazza_product_id, + shoplazza_image_id, title, sku, barcode, position, + price, compare_at_price, cost_price, + option1, option2, option3, + inventory_quantity, weight, weight_unit, image_src, + wholesale_price, note, extend, + shoplazza_created_at, shoplazza_updated_at, tenant_id, + creator, create_time, updater, update_time, deleted + FROM shoplazza_product_sku + WHERE tenant_id = :tenant_id AND deleted = 0 + """) + + with self.db_engine.connect() as conn: + df = pd.read_sql(query, conn, params={"tenant_id": self.tenant_id}) + + return df + + def transform_batch(self) -> List[Dict[str, Any]]: + """ + Transform SPU and SKU data into ES documents. + + Returns: + List of SPU-level ES documents + """ + # Load data + spu_df = self.load_spu_data() + sku_df = self.load_sku_data() + + if spu_df.empty: + return [] + + # Group SKUs by SPU + sku_groups = sku_df.groupby('spu_id') + + documents = [] + for _, spu_row in spu_df.iterrows(): + spu_id = spu_row['id'] + + # Get SKUs for this SPU + skus = sku_groups.get_group(spu_id) if spu_id in sku_groups.groups else pd.DataFrame() + + # Transform to ES document + doc = self._transform_spu_to_doc(spu_row, skus) + if doc: + documents.append(doc) + + return documents + + def _transform_spu_to_doc( + self, + spu_row: pd.Series, + skus: pd.DataFrame + ) -> Optional[Dict[str, Any]]: + """ + Transform a single SPU row and its SKUs into an ES document. 
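+
+        Example output shape (illustrative values):
+
+            {"tenant_id": "1", "product_id": "42", "title": "蓝牙耳机 Sony",
+             "vendor": "Sony", "vendor_keyword": "Sony",
+             "variants": [{"variant_id": "7", "price": 199.99, "stock": 50}],
+             "min_price": 199.99, "max_price": 199.99, "compare_at_price": 299.99}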
+ + Args: + spu_row: SPU row from database + skus: DataFrame with SKUs for this SPU + + Returns: + ES document or None if transformation fails + """ + doc = {} + + # Tenant ID (required) + doc['tenant_id'] = str(self.tenant_id) + + # Product ID + doc['product_id'] = str(spu_row['id']) + + # Handle + if pd.notna(spu_row.get('handle')): + doc['handle'] = str(spu_row['handle']) + + # Title + if pd.notna(spu_row.get('title')): + doc['title'] = str(spu_row['title']) + + # Brief + if pd.notna(spu_row.get('brief')): + doc['brief'] = str(spu_row['brief']) + + # Description + if pd.notna(spu_row.get('description')): + doc['description'] = str(spu_row['description']) + + # SEO fields + if pd.notna(spu_row.get('seo_title')): + doc['seo_title'] = str(spu_row['seo_title']) + if pd.notna(spu_row.get('seo_description')): + doc['seo_description'] = str(spu_row['seo_description']) + if pd.notna(spu_row.get('seo_keywords')): + doc['seo_keywords'] = str(spu_row['seo_keywords']) + + # Vendor + if pd.notna(spu_row.get('vendor')): + doc['vendor'] = str(spu_row['vendor']) + doc['vendor_keyword'] = str(spu_row['vendor']) + + # Product type (from category or tags) + if pd.notna(spu_row.get('category')): + doc['product_type'] = str(spu_row['category']) + doc['product_type_keyword'] = str(spu_row['category']) + + # Tags + if pd.notna(spu_row.get('tags')): + tags_str = str(spu_row['tags']) + doc['tags'] = tags_str + doc['tags_keyword'] = tags_str + + # Category + if pd.notna(spu_row.get('category')): + doc['category'] = str(spu_row['category']) + doc['category_keyword'] = str(spu_row['category']) + + # Image URL + if pd.notna(spu_row.get('image_src')): + image_src = str(spu_row['image_src']) + if not image_src.startswith('http'): + image_src = f"//{image_src}" if image_src.startswith('//') else image_src + doc['image_url'] = image_src + + # Process variants + variants = [] + prices = [] + compare_prices = [] + + for _, sku_row in skus.iterrows(): + variant = self._transform_sku_to_variant(sku_row) + if variant: + variants.append(variant) + if 'price' in variant and variant['price'] is not None: + try: + prices.append(float(variant['price'])) + except (ValueError, TypeError): + pass + if 'compare_at_price' in variant and variant['compare_at_price'] is not None: + try: + compare_prices.append(float(variant['compare_at_price'])) + except (ValueError, TypeError): + pass + + doc['variants'] = variants + + # Calculate price ranges + if prices: + doc['min_price'] = float(min(prices)) + doc['max_price'] = float(max(prices)) + else: + doc['min_price'] = 0.0 + doc['max_price'] = 0.0 + + if compare_prices: + doc['compare_at_price'] = float(max(compare_prices)) + else: + doc['compare_at_price'] = None + + return doc + + def _transform_sku_to_variant(self, sku_row: pd.Series) -> Optional[Dict[str, Any]]: + """ + Transform a SKU row into a variant object. 
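+
+        Example output (illustrative values):
+
+            {"variant_id": "7", "title": "黑色 / M", "price": 199.99,
+             "compare_at_price": 299.99, "sku": "SKU-42-1", "stock": 50,
+             "options": {"option1": "黑色", "option2": "M"}}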
+ + Args: + sku_row: SKU row from database + + Returns: + Variant dictionary or None + """ + variant = {} + + # Variant ID + variant['variant_id'] = str(sku_row['id']) + + # Title + if pd.notna(sku_row.get('title')): + variant['title'] = str(sku_row['title']) + + # Price + if pd.notna(sku_row.get('price')): + try: + variant['price'] = float(sku_row['price']) + except (ValueError, TypeError): + variant['price'] = None + else: + variant['price'] = None + + # Compare at price + if pd.notna(sku_row.get('compare_at_price')): + try: + variant['compare_at_price'] = float(sku_row['compare_at_price']) + except (ValueError, TypeError): + variant['compare_at_price'] = None + else: + variant['compare_at_price'] = None + + # SKU + if pd.notna(sku_row.get('sku')): + variant['sku'] = str(sku_row['sku']) + + # Stock + if pd.notna(sku_row.get('inventory_quantity')): + try: + variant['stock'] = int(sku_row['inventory_quantity']) + except (ValueError, TypeError): + variant['stock'] = 0 + else: + variant['stock'] = 0 + + # Options (from option1, option2, option3) + options = {} + if pd.notna(sku_row.get('option1')): + options['option1'] = str(sku_row['option1']) + if pd.notna(sku_row.get('option2')): + options['option2'] = str(sku_row['option2']) + if pd.notna(sku_row.get('option3')): + options['option3'] = str(sku_row['option3']) + + if options: + variant['options'] = options + + return variant + diff --git a/scripts/generate_test_data.py b/scripts/generate_test_data.py new file mode 100644 index 0000000..5eba021 --- /dev/null +++ b/scripts/generate_test_data.py @@ -0,0 +1,325 @@ +#!/usr/bin/env python3 +""" +Generate test data for Shoplazza SPU and SKU tables. + +Generates 100 SPU records with 1-5 SKU variants each. +""" + +import sys +import os +import random +import argparse +from pathlib import Path +from datetime import datetime, timedelta + +# Add parent directory to path +sys.path.insert(0, str(Path(__file__).parent.parent)) + + +def generate_spu_data(num_spus: int = 100, tenant_id: str = "1", start_id: int = 1): + """ + Generate SPU test data. + + Args: + num_spus: Number of SPUs to generate + tenant_id: Tenant ID + start_id: Starting ID for SPUs + + Returns: + List of SPU data dictionaries + """ + categories = ["电子产品", "服装", "家居用品", "美妆", "食品", "运动用品", "图书", "玩具"] + vendors = ["Sony", "Nike", "Apple", "Samsung", "华为", "小米", "美的", "海尔"] + + products = [ + ("蓝牙耳机", "Bluetooth Headphone", "高品质无线蓝牙耳机", "High-quality wireless Bluetooth headphone"), + ("运动鞋", "Running Shoes", "舒适透气的运动鞋", "Comfortable and breathable running shoes"), + ("智能手机", "Smartphone", "高性能智能手机", "High-performance smartphone"), + ("笔记本电脑", "Laptop", "轻薄便携笔记本电脑", "Lightweight and portable laptop"), + ("智能手表", "Smart Watch", "多功能智能手表", "Multi-function smart watch"), + ("平板电脑", "Tablet", "高清平板电脑", "High-definition tablet"), + ("无线鼠标", "Wireless Mouse", "人体工学无线鼠标", "Ergonomic wireless mouse"), + ("机械键盘", "Mechanical Keyboard", "RGB背光机械键盘", "RGB backlit mechanical keyboard"), + ("显示器", "Monitor", "4K高清显示器", "4K high-definition monitor"), + ("音响", "Speaker", "蓝牙无线音响", "Bluetooth wireless speaker"), + ] + + spus = [] + for i in range(num_spus): + spu_id = start_id + i + product = random.choice(products) + category = random.choice(categories) + vendor = random.choice(vendors) + + # Generate handle + handle = f"product-{spu_id}" + + # Generate title (Chinese) + title_zh = f"{product[0]} {vendor}" + + # Generate brief + brief_zh = product[2] + + # Generate description + description_zh = f"
{product[2]},来自{vendor}品牌。{product[3]}
" + + # Generate SEO fields + seo_title = f"{title_zh} - {category}" + seo_description = f"购买{vendor}{product[0]},{product[2]}" + seo_keywords = f"{product[0]},{vendor},{category}" + + # Generate tags + tags = f"{category},{vendor},{product[0]}" + + # Generate image + image_src = f"//cdn.example.com/products/{spu_id}.jpg" + + # Generate dates + created_at = datetime.now() - timedelta(days=random.randint(1, 365)) + updated_at = created_at + timedelta(days=random.randint(0, 30)) + + spu = { + 'id': spu_id, + 'shop_id': 1, + 'shoplazza_id': f"spu-{spu_id}", + 'handle': handle, + 'title': title_zh, + 'brief': brief_zh, + 'description': description_zh, + 'spu': '', + 'vendor': vendor, + 'vendor_url': f"https://{vendor.lower()}.com", + 'seo_title': seo_title, + 'seo_description': seo_description, + 'seo_keywords': seo_keywords, + 'image_src': image_src, + 'image_width': 800, + 'image_height': 600, + 'image_path': f"products/{spu_id}.jpg", + 'image_alt': title_zh, + 'inventory_policy': '', + 'inventory_quantity': 0, + 'inventory_tracking': '0', + 'published': 1, + 'published_at': created_at.strftime('%Y-%m-%d %H:%M:%S'), + 'requires_shipping': 1, + 'taxable': 0, + 'fake_sales': 0, + 'display_fake_sales': 0, + 'mixed_wholesale': 0, + 'need_variant_image': 0, + 'has_only_default_variant': 0, + 'tags': tags, + 'note': '', + 'category': category, + 'shoplazza_created_at': created_at.strftime('%Y-%m-%d %H:%M:%S'), + 'shoplazza_updated_at': updated_at.strftime('%Y-%m-%d %H:%M:%S'), + 'tenant_id': tenant_id, + 'creator': '1', + 'create_time': created_at.strftime('%Y-%m-%d %H:%M:%S'), + 'updater': '1', + 'update_time': updated_at.strftime('%Y-%m-%d %H:%M:%S'), + 'deleted': 0 + } + spus.append(spu) + + return spus + + +def generate_sku_data(spus: list, start_sku_id: int = 1): + """ + Generate SKU test data for SPUs. 
+ + Args: + spus: List of SPU data + start_sku_id: Starting ID for SKUs + + Returns: + List of SKU data dictionaries + """ + colors = ["黑色", "白色", "红色", "蓝色", "绿色", "灰色"] + sizes = ["S", "M", "L", "XL", "XXL"] + + skus = [] + sku_id = start_sku_id + + for spu in spus: + spu_id = spu['id'] + num_variants = random.randint(1, 5) + + # Base price + base_price = random.uniform(50, 500) + + for i in range(num_variants): + # Generate variant options + color = random.choice(colors) if num_variants > 1 else None + size = random.choice(sizes) if num_variants > 2 else None + + # Generate title + title_parts = [] + if color: + title_parts.append(color) + if size: + title_parts.append(size) + title = " / ".join(title_parts) if title_parts else "" + + # Generate SKU + sku_code = f"SKU-{spu_id}-{i+1}" + + # Generate price (variation from base) + price = base_price + random.uniform(-20, 50) + compare_at_price = price * random.uniform(1.2, 1.5) + + # Generate stock + stock = random.randint(0, 100) + + # Generate dates + created_at = datetime.now() - timedelta(days=random.randint(1, 365)) + updated_at = created_at + timedelta(days=random.randint(0, 30)) + + sku = { + 'id': sku_id, + 'spu_id': spu_id, + 'shop_id': 1, + 'shoplazza_id': f"sku-{sku_id}", + 'shoplazza_product_id': spu['shoplazza_id'], + 'shoplazza_image_id': '', + 'title': title, + 'sku': sku_code, + 'barcode': f"BAR{sku_id:08d}", + 'position': i + 1, + 'price': round(price, 2), + 'compare_at_price': round(compare_at_price, 2), + 'cost_price': round(price * 0.6, 2), + 'option1': color if color else '', + 'option2': size if size else '', + 'option3': '', + 'inventory_quantity': stock, + 'weight': round(random.uniform(0.1, 5.0), 2), + 'weight_unit': 'kg', + 'image_src': '', + 'wholesale_price': '[{"price": ' + str(round(price * 0.8, 2)) + ', "minQuantity": 10}]', + 'note': '', + 'extend': '', + 'shoplazza_created_at': created_at.strftime('%Y-%m-%d %H:%M:%S'), + 'shoplazza_updated_at': updated_at.strftime('%Y-%m-%d %H:%M:%S'), + 'tenant_id': spu['tenant_id'], + 'creator': '1', + 'create_time': created_at.strftime('%Y-%m-%d %H:%M:%S'), + 'updater': '1', + 'update_time': updated_at.strftime('%Y-%m-%d %H:%M:%S'), + 'deleted': 0 + } + skus.append(sku) + sku_id += 1 + + return skus + + +def generate_sql_inserts(spus: list, skus: list, output_file: str): + """ + Generate SQL INSERT statements. 
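+
+    Example (illustrative; the output file name is arbitrary):
+
+        spus = generate_spu_data(num_spus=2, tenant_id="1", start_id=1)
+        skus = generate_sku_data(spus, start_sku_id=1)  # each SPU gets 1-5 variants
+        generate_sql_inserts(spus, skus, "test_data.sql")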
+ + Args: + spus: List of SPU data + skus: List of SKU data + output_file: Output file path + """ + with open(output_file, 'w', encoding='utf-8') as f: + f.write("-- SPU Test Data\n") + f.write("INSERT INTO shoplazza_product_spu (\n") + f.write(" id, shop_id, shoplazza_id, handle, title, brief, description, spu,\n") + f.write(" vendor, vendor_url, seo_title, seo_description, seo_keywords,\n") + f.write(" image_src, image_width, image_height, image_path, image_alt,\n") + f.write(" inventory_policy, inventory_quantity, inventory_tracking,\n") + f.write(" published, published_at, requires_shipping, taxable,\n") + f.write(" fake_sales, display_fake_sales, mixed_wholesale, need_variant_image,\n") + f.write(" has_only_default_variant, tags, note, category,\n") + f.write(" shoplazza_created_at, shoplazza_updated_at, tenant_id,\n") + f.write(" creator, create_time, updater, update_time, deleted\n") + f.write(") VALUES\n") + + for i, spu in enumerate(spus): + values = ( + f"({spu['id']}, {spu['shop_id']}, '{spu['shoplazza_id']}', " + f"'{spu['handle']}', '{spu['title']}', '{spu['brief']}', " + f"'{spu['description']}', '{spu['spu']}', '{spu['vendor']}', " + f"'{spu['vendor_url']}', '{spu['seo_title']}', '{spu['seo_description']}', " + f"'{spu['seo_keywords']}', '{spu['image_src']}', {spu['image_width']}, " + f"{spu['image_height']}, '{spu['image_path']}', '{spu['image_alt']}', " + f"'{spu['inventory_policy']}', {spu['inventory_quantity']}, " + f"'{spu['inventory_tracking']}', {spu['published']}, " + f"'{spu['published_at']}', {spu['requires_shipping']}, {spu['taxable']}, " + f"{spu['fake_sales']}, {spu['display_fake_sales']}, {spu['mixed_wholesale']}, " + f"{spu['need_variant_image']}, {spu['has_only_default_variant']}, " + f"'{spu['tags']}', '{spu['note']}', '{spu['category']}', " + f"'{spu['shoplazza_created_at']}', '{spu['shoplazza_updated_at']}', " + f"'{spu['tenant_id']}', '{spu['creator']}', '{spu['create_time']}', " + f"'{spu['updater']}', '{spu['update_time']}', {spu['deleted']})" + ) + f.write(values) + if i < len(spus) - 1: + f.write(",\n") + else: + f.write(";\n\n") + + f.write("-- SKU Test Data\n") + f.write("INSERT INTO shoplazza_product_sku (\n") + f.write(" id, spu_id, shop_id, shoplazza_id, shoplazza_product_id, shoplazza_image_id,\n") + f.write(" title, sku, barcode, position, price, compare_at_price, cost_price,\n") + f.write(" option1, option2, option3, inventory_quantity, weight, weight_unit,\n") + f.write(" image_src, wholesale_price, note, extend,\n") + f.write(" shoplazza_created_at, shoplazza_updated_at, tenant_id,\n") + f.write(" creator, create_time, updater, update_time, deleted\n") + f.write(") VALUES\n") + + for i, sku in enumerate(skus): + values = ( + f"({sku['id']}, {sku['spu_id']}, {sku['shop_id']}, '{sku['shoplazza_id']}', " + f"'{sku['shoplazza_product_id']}', '{sku['shoplazza_image_id']}', " + f"'{sku['title']}', '{sku['sku']}', '{sku['barcode']}', {sku['position']}, " + f"{sku['price']}, {sku['compare_at_price']}, {sku['cost_price']}, " + f"'{sku['option1']}', '{sku['option2']}', '{sku['option3']}', " + f"{sku['inventory_quantity']}, {sku['weight']}, '{sku['weight_unit']}', " + f"'{sku['image_src']}', '{sku['wholesale_price']}', '{sku['note']}', " + f"'{sku['extend']}', '{sku['shoplazza_created_at']}', " + f"'{sku['shoplazza_updated_at']}', '{sku['tenant_id']}', " + f"'{sku['creator']}', '{sku['create_time']}', '{sku['updater']}', " + f"'{sku['update_time']}', {sku['deleted']})" + ) + f.write(values) + if i < len(skus) - 1: + f.write(",\n") + else: + 
f.write(";\n") + + +def main(): + parser = argparse.ArgumentParser(description='Generate test data for Shoplazza tables') + parser.add_argument('--num-spus', type=int, default=100, help='Number of SPUs to generate') + parser.add_argument('--tenant-id', default='1', help='Tenant ID') + parser.add_argument('--start-spu-id', type=int, default=1, help='Starting SPU ID') + parser.add_argument('--start-sku-id', type=int, default=1, help='Starting SKU ID') + parser.add_argument('--output', default='test_data.sql', help='Output SQL file') + + args = parser.parse_args() + + print(f"Generating {args.num_spus} SPUs with variants...") + + # Generate SPU data + spus = generate_spu_data(args.num_spus, args.tenant_id, args.start_spu_id) + print(f"Generated {len(spus)} SPUs") + + # Generate SKU data + skus = generate_sku_data(spus, args.start_sku_id) + print(f"Generated {len(skus)} SKUs") + + # Generate SQL file + generate_sql_inserts(spus, skus, args.output) + print(f"SQL file generated: {args.output}") + + +if __name__ == '__main__': + import json + main() + diff --git a/scripts/import_test_data.py b/scripts/import_test_data.py new file mode 100644 index 0000000..acf2cef --- /dev/null +++ b/scripts/import_test_data.py @@ -0,0 +1,132 @@ +#!/usr/bin/env python3 +""" +Import test data into MySQL Shoplazza tables. + +Reads SQL file generated by generate_test_data.py and imports into MySQL. +""" + +import sys +import os +import argparse +from pathlib import Path + +# Add parent directory to path +sys.path.insert(0, str(Path(__file__).parent.parent)) + +from utils.db_connector import create_db_connection, test_connection + + +def import_sql_file(db_engine, sql_file: str): + """ + Import SQL file into database. + + Args: + db_engine: SQLAlchemy database engine + sql_file: Path to SQL file + """ + with open(sql_file, 'r', encoding='utf-8') as f: + sql_content = f.read() + + # Split by semicolons to get individual statements + statements = [s.strip() for s in sql_content.split(';') if s.strip() and not s.strip().startswith('--')] + + print(f"Executing {len(statements)} SQL statements...") + + with db_engine.connect() as conn: + for i, statement in enumerate(statements, 1): + if statement: + try: + conn.execute(statement) + conn.commit() + print(f" [{i}/{len(statements)}] Executed successfully") + except Exception as e: + print(f" [{i}/{len(statements)}] ERROR: {e}") + print(f" Statement: {statement[:100]}...") + raise + + +def verify_import(db_engine, tenant_id: str): + """ + Verify imported data. 
+ + Args: + db_engine: SQLAlchemy database engine + tenant_id: Tenant ID to verify + """ + from sqlalchemy import text + + with db_engine.connect() as conn: + # Count SPUs + result = conn.execute(text("SELECT COUNT(*) FROM shoplazza_product_spu WHERE tenant_id = :tenant_id"), {"tenant_id": tenant_id}) + spu_count = result.scalar() + + # Count SKUs + result = conn.execute(text("SELECT COUNT(*) FROM shoplazza_product_sku WHERE tenant_id = :tenant_id"), {"tenant_id": tenant_id}) + sku_count = result.scalar() + + print(f"\nVerification:") + print(f" SPUs: {spu_count}") + print(f" SKUs: {sku_count}") + + return spu_count, sku_count + + +def main(): + parser = argparse.ArgumentParser(description='Import test data into MySQL') + + # Database connection + parser.add_argument('--db-host', required=True, help='MySQL host') + parser.add_argument('--db-port', type=int, default=3306, help='MySQL port (default: 3306)') + parser.add_argument('--db-database', required=True, help='MySQL database name') + parser.add_argument('--db-username', required=True, help='MySQL username') + parser.add_argument('--db-password', required=True, help='MySQL password') + + # Import options + parser.add_argument('--sql-file', required=True, help='SQL file to import') + parser.add_argument('--tenant-id', help='Tenant ID to verify (optional)') + + args = parser.parse_args() + + print(f"Connecting to MySQL: {args.db_host}:{args.db_port}/{args.db_database}") + + # Connect to database + try: + db_engine = create_db_connection( + host=args.db_host, + port=args.db_port, + database=args.db_database, + username=args.db_username, + password=args.db_password + ) + except Exception as e: + print(f"ERROR: Failed to connect to MySQL: {e}") + return 1 + + # Test connection + if not test_connection(db_engine): + print("ERROR: Database connection test failed") + return 1 + + print("Database connection successful") + + # Import SQL file + print(f"\nImporting SQL file: {args.sql_file}") + try: + import_sql_file(db_engine, args.sql_file) + print("Import completed successfully") + except Exception as e: + print(f"ERROR: Failed to import SQL file: {e}") + import traceback + traceback.print_exc() + return 1 + + # Verify import if tenant_id provided + if args.tenant_id: + verify_import(db_engine, args.tenant_id) + + return 0 + + +if __name__ == '__main__': + sys.exit(main()) + diff --git a/scripts/ingest_shoplazza.py b/scripts/ingest_shoplazza.py new file mode 100644 index 0000000..a48ab6c --- /dev/null +++ b/scripts/ingest_shoplazza.py @@ -0,0 +1,148 @@ +#!/usr/bin/env python3 +""" +Shoplazza data ingestion script. + +Loads SPU and SKU data from MySQL and indexes into Elasticsearch using SPU transformer. 
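+
+Usage (the same invocation documented in docs/BASE_CONFIG_GUIDE.md):
+
+    python scripts/ingest_shoplazza.py \
+        --db-host localhost --db-port 3306 --db-database saas \
+        --db-username root --db-password password \
+        --tenant-id "1" --config base \
+        --es-host http://localhost:9200 --recreate --batch-size 500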
+""" + +import sys +import os +import argparse +from pathlib import Path + +# Add parent directory to path +sys.path.insert(0, str(Path(__file__).parent.parent)) + +from utils.db_connector import create_db_connection +from utils.es_client import ESClient +from indexer.spu_transformer import SPUTransformer +from indexer.mapping_generator import MappingGenerator +from indexer.bulk_indexer import BulkIndexer +from config import ConfigLoader + + +def main(): + parser = argparse.ArgumentParser(description='Ingest Shoplazza SPU/SKU data into Elasticsearch') + + # Database connection + parser.add_argument('--db-host', required=True, help='MySQL host') + parser.add_argument('--db-port', type=int, default=3306, help='MySQL port (default: 3306)') + parser.add_argument('--db-database', required=True, help='MySQL database name') + parser.add_argument('--db-username', required=True, help='MySQL username') + parser.add_argument('--db-password', required=True, help='MySQL password') + + # Tenant and index + parser.add_argument('--tenant-id', required=True, help='Tenant ID (required)') + parser.add_argument('--config', default='base', help='Configuration ID (default: base)') + parser.add_argument('--es-host', default='http://localhost:9200', help='Elasticsearch host') + + # Options + parser.add_argument('--recreate', action='store_true', help='Recreate index if exists') + parser.add_argument('--batch-size', type=int, default=500, help='Batch size for indexing (default: 500)') + + args = parser.parse_args() + + print(f"Starting Shoplazza data ingestion for tenant: {args.tenant_id}") + + # Load configuration + config_loader = ConfigLoader("config/schema") + try: + config = config_loader.load_customer_config(args.config) + print(f"Loaded configuration: {config.customer_name}") + except Exception as e: + print(f"ERROR: Failed to load configuration: {e}") + return 1 + + # Validate tenant_id field exists + tenant_id_field = None + for field in config.fields: + if field.name == "tenant_id": + tenant_id_field = field + break + + if not tenant_id_field: + print("ERROR: Configuration must include 'tenant_id' field") + return 1 + + # Connect to MySQL + print(f"Connecting to MySQL: {args.db_host}:{args.db_port}/{args.db_database}") + try: + db_engine = create_db_connection( + host=args.db_host, + port=args.db_port, + database=args.db_database, + username=args.db_username, + password=args.db_password + ) + except Exception as e: + print(f"ERROR: Failed to connect to MySQL: {e}") + return 1 + + # Connect to Elasticsearch + print(f"Connecting to Elasticsearch: {args.es_host}") + es_client = ESClient(hosts=[args.es_host]) + if not es_client.ping(): + print(f"ERROR: Cannot connect to Elasticsearch at {args.es_host}") + return 1 + + # Generate and create index + mapping_gen = MappingGenerator(config) + mapping = mapping_gen.generate_mapping() + index_name = config.es_index_name + + if args.recreate: + if es_client.index_exists(index_name): + print(f"Deleting existing index: {index_name}") + es_client.delete_index(index_name) + + if not es_client.index_exists(index_name): + print(f"Creating index: {index_name}") + es_client.create_index(index_name, mapping) + else: + print(f"Using existing index: {index_name}") + + # Initialize SPU transformer + print(f"Initializing SPU transformer for tenant: {args.tenant_id}") + transformer = SPUTransformer(db_engine, args.tenant_id) + + # Transform data + print("Transforming SPU and SKU data...") + try: + documents = transformer.transform_batch() + print(f"Transformed {len(documents)} 
SPU documents") + except Exception as e: + print(f"ERROR: Failed to transform data: {e}") + import traceback + traceback.print_exc() + return 1 + + if not documents: + print("WARNING: No documents to index") + return 0 + + # Bulk index + print(f"Indexing {len(documents)} documents (batch size: {args.batch_size})...") + indexer = BulkIndexer(es_client, index_name, batch_size=args.batch_size) + + try: + results = indexer.index_documents(documents, id_field="product_id", show_progress=True) + print(f"\nIngestion complete:") + print(f" Success: {results['success']}") + print(f" Failed: {results['failed']}") + print(f" Time: {results.get('elapsed_time', 0):.2f}s") + + if results['failed'] > 0: + print(f"\nWARNING: {results['failed']} documents failed to index") + return 1 + + return 0 + except Exception as e: + print(f"ERROR: Failed to index documents: {e}") + import traceback + traceback.print_exc() + return 1 + + +if __name__ == '__main__': + sys.exit(main()) + diff --git a/scripts/test_base.py b/scripts/test_base.py new file mode 100644 index 0000000..5e42b28 --- /dev/null +++ b/scripts/test_base.py @@ -0,0 +1,242 @@ +#!/usr/bin/env python3 +""" +Test script for base configuration. + +Tests data ingestion, search API, response format, and tenant isolation. +""" + +import sys +import os +import argparse +import requests +import json +from pathlib import Path + +# Add parent directory to path +sys.path.insert(0, str(Path(__file__).parent.parent)) + + +def test_search_api(base_url: str, tenant_id: str, query: str = "耳机"): + """ + Test search API. + + Args: + base_url: API base URL + tenant_id: Tenant ID + query: Search query + + Returns: + Response JSON or None if failed + """ + url = f"{base_url}/search/" + headers = { + "X-Tenant-ID": tenant_id, + "Content-Type": "application/json" + } + payload = { + "query": query, + "size": 10, + "from": 0 + } + + print(f"\nTesting search API:") + print(f" URL: {url}") + print(f" Query: {query}") + print(f" Tenant ID: {tenant_id}") + + try: + response = requests.post(url, json=payload, headers=headers, timeout=30) + response.raise_for_status() + data = response.json() + + print(f" Status: {response.status_code}") + print(f" Total: {data.get('total', 0)}") + print(f" Results: {len(data.get('results', []))}") + + return data + except Exception as e: + print(f" ERROR: {e}") + return None + + +def validate_response_format(data: dict): + """ + Validate response format. 
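+
+    Example (illustrative):
+
+        errors = validate_response_format({"hits": [], "results": []})
+        # -> ["Response contains 'hits' field (should be 'results')",
+        #     "Response missing 'suggestions' field",
+        #     "Response missing 'related_searches' field"]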
+ + Args: + data: Response data + + Returns: + List of validation errors (empty if valid) + """ + errors = [] + + # Check for results field (not hits) + if 'hits' in data: + errors.append("Response contains 'hits' field (should be 'results')") + + if 'results' not in data: + errors.append("Response missing 'results' field") + else: + results = data['results'] + if not isinstance(results, list): + errors.append("'results' should be a list") + else: + # Validate first result structure + if results: + result = results[0] + required_fields = ['product_id', 'title', 'variants', 'relevance_score'] + for field in required_fields: + if field not in result: + errors.append(f"Result missing required field: {field}") + + # Check for ES internal fields + es_internal_fields = ['_id', '_score', '_source'] + for field in es_internal_fields: + if field in result: + errors.append(f"Result contains ES internal field: {field}") + + # Validate variants + if 'variants' in result: + variants = result['variants'] + if not isinstance(variants, list): + errors.append("'variants' should be a list") + elif variants: + variant = variants[0] + variant_required = ['variant_id', 'price', 'sku', 'stock'] + for field in variant_required: + if field not in variant: + errors.append(f"Variant missing required field: {field}") + + # Check for suggestions and related_searches + if 'suggestions' not in data: + errors.append("Response missing 'suggestions' field") + if 'related_searches' not in data: + errors.append("Response missing 'related_searches' field") + + return errors + + +def test_facets(base_url: str, tenant_id: str): + """ + Test facets aggregation. + + Args: + base_url: API base URL + tenant_id: Tenant ID + + Returns: + Response JSON or None if failed + """ + url = f"{base_url}/search/" + headers = { + "X-Tenant-ID": tenant_id, + "Content-Type": "application/json" + } + payload = { + "query": "商品", + "size": 10, + "facets": ["category_keyword", "vendor_keyword"] + } + + print(f"\nTesting facets:") + print(f" Facets: {payload['facets']}") + + try: + response = requests.post(url, json=payload, headers=headers, timeout=30) + response.raise_for_status() + data = response.json() + + if 'facets' in data and data['facets']: + print(f" Facets returned: {len(data['facets'])}") + for facet in data['facets']: + print(f" - {facet.get('field')}: {len(facet.get('values', []))} values") + else: + print(" WARNING: No facets returned") + + return data + except Exception as e: + print(f" ERROR: {e}") + return None + + +def test_tenant_isolation(base_url: str, tenant_id_1: str, tenant_id_2: str): + """ + Test tenant isolation. 
+ + Args: + base_url: API base URL + tenant_id_1: First tenant ID + tenant_id_2: Second tenant ID + """ + print(f"\nTesting tenant isolation:") + print(f" Tenant 1: {tenant_id_1}") + print(f" Tenant 2: {tenant_id_2}") + + # Search for tenant 1 + data1 = test_search_api(base_url, tenant_id_1, "商品") + # Search for tenant 2 + data2 = test_search_api(base_url, tenant_id_2, "商品") + + if data1 and data2: + results1 = set(r.get('product_id') for r in data1.get('results', [])) + results2 = set(r.get('product_id') for r in data2.get('results', [])) + + overlap = results1 & results2 + if overlap: + print(f" WARNING: Found {len(overlap)} overlapping results between tenants") + else: + print(f" OK: No overlapping results (tenant isolation working)") + + +def main(): + parser = argparse.ArgumentParser(description='Test base configuration') + parser.add_argument('--api-url', default='http://localhost:8000', help='API base URL') + parser.add_argument('--tenant-id', default='1', help='Tenant ID for testing') + parser.add_argument('--test-tenant-2', help='Second tenant ID for isolation test') + + args = parser.parse_args() + + print("=" * 60) + print("Base Configuration Test Suite") + print("=" * 60) + + # Test 1: Basic search + print("\n[Test 1] Basic Search") + data = test_search_api(args.api_url, args.tenant_id) + if not data: + print("FAILED: Basic search test") + return 1 + + # Test 2: Response format validation + print("\n[Test 2] Response Format Validation") + errors = validate_response_format(data) + if errors: + print("FAILED: Response format validation") + for error in errors: + print(f" - {error}") + return 1 + else: + print("PASSED: Response format is correct") + + # Test 3: Facets + print("\n[Test 3] Facets Aggregation") + facet_data = test_facets(args.api_url, args.tenant_id) + if not facet_data: + print("WARNING: Facets test failed (may be expected if no data)") + + # Test 4: Tenant isolation (if second tenant provided) + if args.test_tenant_2: + print("\n[Test 4] Tenant Isolation") + test_tenant_isolation(args.api_url, args.tenant_id, args.test_tenant_2) + + print("\n" + "=" * 60) + print("All tests completed") + print("=" * 60) + + return 0 + + +if __name__ == '__main__': + sys.exit(main()) + diff --git a/search/searcher.py b/search/searcher.py index 8534d14..969f61f 100644 --- a/search/searcher.py +++ b/search/searcher.py @@ -17,38 +17,45 @@ from .multilang_query_builder import MultiLanguageQueryBuilder from .rerank_engine import RerankEngine from context.request_context import RequestContext, RequestContextStage, create_request_context from api.models import FacetResult, FacetValue +from api.result_formatter import ResultFormatter class SearchResult: - """Container for search results (重构版).""" + """Container for search results (外部友好格式).""" def __init__( self, - hits: List[Dict[str, Any]], + results: List[Any], # List[ProductResult] total: int, max_score: float, took_ms: int, facets: Optional[List[FacetResult]] = None, query_info: Optional[Dict[str, Any]] = None, + suggestions: Optional[List[str]] = None, + related_searches: Optional[List[str]] = None, debug_info: Optional[Dict[str, Any]] = None ): - self.hits = hits + self.results = results self.total = total self.max_score = max_score self.took_ms = took_ms self.facets = facets self.query_info = query_info or {} + self.suggestions = suggestions or [] + self.related_searches = related_searches or [] self.debug_info = debug_info def to_dict(self) -> Dict[str, Any]: """Convert to dictionary representation.""" result = { - "hits": 
self.hits, + "results": [r.model_dump() if hasattr(r, 'model_dump') else r for r in self.results], "total": self.total, "max_score": self.max_score, "took_ms": self.took_ms, "facets": [f.model_dump() for f in self.facets] if self.facets else None, - "query_info": self.query_info + "query_info": self.query_info, + "suggestions": self.suggestions, + "related_searches": self.related_searches } if self.debug_info is not None: result["debug_info"] = self.debug_info @@ -106,6 +113,7 @@ class Searcher: def search( self, query: str, + tenant_id: str, size: int = 10, from_: int = 0, filters: Optional[Dict[str, Any]] = None, @@ -118,10 +126,11 @@ class Searcher: debug: bool = False ) -> SearchResult: """ - Execute search query (重构版). + Execute search query (外部友好格式). Args: query: Search query string + tenant_id: Tenant ID (required for filtering) size: Number of results to return from_: Offset for pagination filters: Exact match filters @@ -134,7 +143,7 @@ class Searcher: debug: Enable debug information output Returns: - SearchResult object + SearchResult object with formatted results """ # Create context if not provided (backward compatibility) if context is None: @@ -248,6 +257,11 @@ class Searcher: # Step 3: Query building context.start_stage(RequestContextStage.QUERY_BUILDING) try: + # Add tenant_id to filters (required) + if filters is None: + filters = {} + filters['tenant_id'] = tenant_id + es_query = self.query_builder.build_multilang_query( parsed_query=parsed_query, query_vector=parsed_query.query_vector if enable_embedding else None, @@ -341,56 +355,10 @@ class Searcher: # Step 5: Result processing context.start_stage(RequestContextStage.RESULT_PROCESSING) try: - hits = [] - raw_hits = [] - + # Extract ES hits + es_hits = [] if 'hits' in es_response and 'hits' in es_response['hits']: - for hit in es_response['hits']['hits']: - raw_hits.append(hit) - - result_doc = { - '_id': hit['_id'], - '_score': hit.get('_score') or 0.0, - '_source': hit['_source'] - } - - # 应用本地重排(仅当启用时) - if enable_rerank and self.rerank_engine.enabled: - base_score = hit.get('_score') or 0.0 - knn_score = None - - # 检查是否使用了KNN(新结构:在function_score内部) - query_section = es_query.get('query', {}) - if 'function_score' in query_section: - fs_query = query_section['function_score'].get('query', {}) - outer_bool = fs_query.get('bool', {}) - inner_bool_list = outer_bool.get('must', []) - if inner_bool_list and 'bool' in inner_bool_list[0]: - inner_should = inner_bool_list[0]['bool'].get('should', []) - if any('knn' in clause for clause in inner_should): - knn_score = base_score * 0.2 - - custom_score = self.rerank_engine.calculate_score( - hit, - base_score, - knn_score - ) - result_doc['_custom_score'] = custom_score - result_doc['_original_score'] = base_score - - hits.append(result_doc) - - # 重排序(仅当启用时) - if enable_rerank and self.rerank_engine.enabled: - hits.sort(key=lambda x: x.get('_custom_score', x['_score']), reverse=True) - context.logger.info( - f"本地重排完成 | 使用RerankEngine", - extra={'reqid': context.reqid, 'uid': context.uid} - ) - - # Store intermediate results in context - context.store_intermediate_result('raw_hits', raw_hits) - context.store_intermediate_result('processed_hits', hits) + es_hits = es_response['hits']['hits'] # Extract total and max_score total = es_response.get('hits', {}).get('total', {}) @@ -401,16 +369,24 @@ class Searcher: max_score = es_response.get('hits', {}).get('max_score') or 0.0 - # Standardize facets - standardized_facets = self._standardize_facets( - es_response.get('aggregations', 
{}), - facets, - filters - ) + # Format results using ResultFormatter + formatted_results = ResultFormatter.format_search_results(es_hits, max_score) + + # Format facets + standardized_facets = None + if facets: + standardized_facets = ResultFormatter.format_facets( + es_response.get('aggregations', {}), + facets + ) + + # Generate suggestions and related searches + query_text = parsed_query.original_query if parsed_query else query + suggestions = ResultFormatter.generate_suggestions(query_text, formatted_results) + related_searches = ResultFormatter.generate_related_searches(query_text, formatted_results) context.logger.info( - f"结果处理完成 | 返回: {len(hits)}条 | 总计: {total_value}条 | " - f"重排序: {'是' if enable_rerank else '否'}", + f"结果处理完成 | 返回: {len(formatted_results)}条 | 总计: {total_value}条", extra={'reqid': context.reqid, 'uid': context.uid} ) @@ -459,12 +435,14 @@ class Searcher: # Build result result = SearchResult( - hits=hits, + results=formatted_results, total=total_value, max_score=max_score, took_ms=int(total_duration), facets=standardized_facets, query_info=parsed_query.to_dict(), + suggestions=suggestions, + related_searches=related_searches, debug_info=debug_info ) @@ -476,21 +454,23 @@ class Searcher: def search_by_image( self, image_url: str, + tenant_id: str, size: int = 10, filters: Optional[Dict[str, Any]] = None, range_filters: Optional[Dict[str, Any]] = None ) -> SearchResult: """ - Search by image similarity (重构版). + Search by image similarity (外部友好格式). Args: image_url: URL of query image + tenant_id: Tenant ID (required for filtering) size: Number of results filters: Exact match filters range_filters: Range filters for numeric fields Returns: - SearchResult object + SearchResult object with formatted results """ if not self.image_embedding_field: raise ValueError("Image embedding field not configured") @@ -503,6 +483,11 @@ class Searcher: if image_vector is None: raise ValueError(f"Failed to encode image: {image_url}") + # Add tenant_id to filters (required) + if filters is None: + filters = {} + filters['tenant_id'] = tenant_id + # Build KNN query es_query = { "size": size, @@ -536,28 +521,32 @@ class Searcher: size=size ) - # Process results (similar to text search) - hits = [] + # Extract ES hits + es_hits = [] if 'hits' in es_response and 'hits' in es_response['hits']: - for hit in es_response['hits']['hits']: - hits.append({ - '_id': hit['_id'], - '_score': hit['_score'], - '_source': hit['_source'] - }) + es_hits = es_response['hits']['hits'] + # Extract total and max_score total = es_response.get('hits', {}).get('total', {}) if isinstance(total, dict): total_value = total.get('value', 0) else: total_value = total + max_score = es_response.get('hits', {}).get('max_score') or 0.0 + + # Format results using ResultFormatter + formatted_results = ResultFormatter.format_search_results(es_hits, max_score) + return SearchResult( - hits=hits, + results=formatted_results, total=total_value, - max_score=es_response.get('hits', {}).get('max_score') or 0.0, + max_score=max_score, took_ms=es_response.get('took', 0), - query_info={'image_url': image_url, 'search_type': 'image_similarity'} + facets=None, + query_info={'image_url': image_url, 'search_type': 'image_similarity'}, + suggestions=[], + related_searches=[] ) def get_domain_summary(self) -> Dict[str, Any]: diff --git a/设计文档.md b/设计文档.md index 7fb71f4..8e5e832 100644 --- a/设计文档.md +++ b/设计文档.md @@ -12,33 +12,57 @@ ## 1. 
Conventions for the Raw Data Layer
 
-所有租户共用主表、独立配置和扩展表,有自己独立的ES索引。
-
### 1.1 Shoplazza Main Tables

All tenants share the following main tables:
- `shoplazza_product_sku` - SKU-level product data
- `shoplazza_product_spu` - SPU-level product data

-### 1.2 Per-Tenant Extension Tables
-
-Each tenant has its own extension table, tailored to its business needs and data sources:
-- Custom attribute systems
-- Multilingual product titles (Chinese, English, Russian, etc.)
-- Brand names plus tenant-specific category and tag systems
-- Business filtering and aggregation fields
-- Weighting (boosting) fields
-
-**Data association**:
-- At indexing time, the main table `shoplazza_product_sku` is joined to the tenant extension table on `id` + `shopid`
-- Example: the `customer1_extension` table stores customer1's custom fields
+### 1.2 Index Structure (SPU Granularity)
+
+**Unified index architecture**:
+- All customers share a single Elasticsearch index: `search_products`
+- Index granularity: SPU level (each document represents one SPU)
+- Data isolation: tenants are separated by the `tenant_id` field
+- Nested structure: each SPU document carries a nested `variants` array (its SKU variants)
+
+**Index document structure**:
+```json
+{
+  "tenant_id": "1",
+  "product_id": "123",
+  "title": "蓝牙耳机",
+  "variants": [
+    {
+      "variant_id": "456",
+      "title": "黑色",
+      "price": 199.99,
+      "sku": "SKU-123-1",
+      "stock": 50
+    }
+  ],
+  "min_price": 199.99,
+  "max_price": 299.99
+}
+```

### 1.3 Configuration Scheme

+**Separation of concerns**:
+- **Search configuration**: only search-related settings such as ES field definitions, query domains, and ranking rules
+- **Data-source configuration**: not part of the search configuration; decided by the pipeline layer (scripts)
+- **Data import flow**: hard-coded scripts that do not depend on the configuration
+
Defined uniformly through configuration files:
-1. ES field definitions (field type, analyzer, source table/column)
+1. ES field definitions (field type, analyzer, boost, etc.)
2. ES mapping generation
-3. Data ingestion mapping relationships
+3. Query domain configuration (indexes)
+4. Ranking and scoring configuration (function_score)
+
+**Note**: the configuration does **not** include any of the following:
+- `mysql_config` - MySQL database settings
+- `main_table` / `extension_table` - source table settings
+- `source_table` / `source_column` - field-to-source-column mappings

---

@@ -72,62 +96,54 @@

- **standard**: standard analyzer
- **keyword**: keyword analyzer

-#### Field Configuration Example
+#### Field Configuration Example (Base Config)

```yaml
fields:
-  # Primary key field
-  - name: "skuId"
-    type: "LONG"
-    source_table: "main"  # main table
-    source_column: "id"
+  # Tenant isolation field (required)
+  - name: "tenant_id"
+    type: "KEYWORD"
    required: true
    index: true
    store: true

-  # Multilingual text fields
-  - name: "name"
-    type: "TEXT"
-    source_table: "extension"  # extension table
-    source_column: "name"
-    analyzer: "chinese_ecommerce"
-    boost: 2.0
+  # Product identifier field
+  - name: "product_id"
+    type: "KEYWORD"
+    required: true
    index: true
    store: true

-  - name: "enSpuName"
+  # Text search fields
+  - name: "title"
    type: "TEXT"
-    source_table: "extension"
-    source_column: "enSpuName"
-    analyzer: "english"
-    boost: 2.0
+    analyzer: "chinese_ecommerce"
+    boost: 3.0
+    index: true
+    store: true

-  - name: "ruSkuName"
+  - name: "seo_keywords"
    type: "TEXT"
-    source_table: "extension"
-    source_column: "ruSkuName"
-    analyzer: "russian"
+    analyzer: "chinese_ecommerce"
    boost: 2.0
-
-  # Text embedding field
-  - name: "name_embedding"
-    type: "TEXT_EMBEDDING"
-    source_table: "extension"
-    source_column: "name"
-    embedding_dims: 1024
-    embedding_similarity: "dot_product"
    index: true
+    store: true

-  # Image embedding field
-  - name: "image_embedding"
-    type: "IMAGE_EMBEDDING"
-    source_table: "extension"
-    source_column: "imageUrl"
-    embedding_dims: 1024
-    embedding_similarity: "dot_product"
-    nested: false
+  # Nested variants field
+  - name: "variants"
+    type: "JSON"
+    nested: true
+    nested_properties:
+      variant_id:
+        type: "keyword"
+      price:
+        type: "float"
+      sku:
+        type: "keyword"
```

+**Note**: the configuration contains no `source_table` or `source_column`; source-to-field mapping is decided by the pipeline layer (a mapping sketch follows below).
+
**Implementation modules**:
- `config/config_loader.py` - configuration loader
- `config/field_types.py` - field type definitions
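+
+For illustration, a minimal sketch of what creating the index by hand could look like for a configuration like the one above. This is hypothetical: the actual mapping is produced by `MappingGenerator` and applied via `ESClient.create_index`, its exact output may differ, and how the generator treats `boost` is not shown here (in recent Elasticsearch versions boost is normally applied at query time, e.g. `title^3.0` in a query domain, rather than stored in the mapping):
+
+```bash
+# Hypothetical hand-rolled equivalent of MappingGenerator + ESClient.create_index
+curl -X PUT "http://localhost:9200/search_products" \
+  -H "Content-Type: application/json" \
+  -d '{
+    "mappings": {
+      "properties": {
+        "tenant_id":  { "type": "keyword" },
+        "product_id": { "type": "keyword" },
+        "title":      { "type": "text", "analyzer": "chinese_ecommerce" },
+        "variants": {
+          "type": "nested",
+          "properties": {
+            "variant_id": { "type": "keyword" },
+            "price":      { "type": "float" },
+            "sku":        { "type": "keyword" }
+          }
+        }
+      }
+    }
+  }'
+```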
@@ -204,77 +220,69 @@ indexes:

---

-## 3. Test Data Ingestion
+## 3. Data Import Flow

### 3.1 Data Sources

-**Main table**: `shoplazza_product_sku`
-- Shared by all tenants
-- Contains basic product information (id, shopid, etc.)
+**Shoplazza standard tables** (used by the Base config):
+- `shoplazza_product_spu` - SPU-level product data
+- `shoplazza_product_sku` - SKU-level product data

-**Extension table**: `customer1_extension`
-- One per tenant
-- Contains custom fields and multilingual fields
+**Other customer tables** (customer1, etc.):
+- Each customer uses its own source tables and extension tables

-### 3.2 Data Ingestion Method
+### 3.2 Data Import Method

-**Implementation status**:
+**The pipeline layer decides the data source**:
+- The data import flow is a hard-coded script that does not depend on the configuration
+- The configuration covers only ES search-related concerns
+- Source-mapping logic is hard-coded in the transformer code

-#### Command-Line Tool
-```bash
-python main.py ingest \
-  --customer customer1 \
-  --csv-file data/customer1_data.csv \
-  --es-host http://localhost:9200 \
-  --recreate \
-  --batch-size 100
-```
+#### Base Config Data Import (Generic Shoplazza)
+
+**Script**: `scripts/ingest_shoplazza.py`

-#### Data Flow
-1. **Data loading**: load data from CSV files or a MySQL database
-2. **Data transformation**:
-   - Field mapping (map source fields to ES fields per the configuration)
-   - Type conversion (strings, numbers, dates, etc.)
-   - Vector generation (text vectors, image vectors)
-   - Vector caching (avoids recomputation)
+**Data flow**:
+1. **Data loading**: read the `shoplazza_product_spu` and `shoplazza_product_sku` tables from MySQL
+2. **Data transformation** (`indexer/spu_transformer.py`):
+   - Join SPU and SKU data on `spu_id` and `tenant_id`
+   - Aggregate SKU rows into the nested `variants` array
+   - Compute flattened price fields (`min_price`, `max_price`, `compare_at_price`)
+   - Field mapping (hard-coded, independent of the configuration)
+   - Inject the `tenant_id` field
3. **Index creation**:
-   - Generate the ES mapping from the configuration
-   - Create or update the index
+   - Generate the ES mapping from the configuration
+   - Create or update the `search_products` index
4. **Bulk indexing**:
   - Bulk-write to ES (default: 500 documents per batch)
   - Error handling and retry mechanism

-#### Configuration Mapping Example
-
-**customer1_config.yaml** configuration:
-```yaml
-main_table: "shoplazza_product_sku"
-extension_table: "customer1_extension"
-es_index_name: "search_customer1"
-
-fields:
-  - name: "skuId"
-    source_table: "main"
-    source_column: "id"
-  - name: "name"
-    source_table: "extension"
-    source_column: "name"
-  - name: "enSpuName"
-    source_table: "extension"
-    source_column: "enSpuName"
+**Command-line tool**:
+```bash
+python scripts/ingest_shoplazza.py \
+  --db-host localhost \
+  --db-port 3306 \
+  --db-database saas \
+  --db-username root \
+  --db-password password \
+  --tenant-id "1" \
+  --config base \
+  --es-host http://localhost:9200 \
+  --recreate \
+  --batch-size 500
 ```

-**Data transformation**:
-- Main-table fields: read directly from the `id` column of the `shoplazza_product_sku` table
-- Extension-table fields: read from the corresponding columns of the `customer1_extension` table
-- Vector fields: generate vectors from the source text/images and cache them
+#### Other Customer Data Imports
+
+- Each customer uses its own data transformer (e.g. `indexer/data_transformer.py`)
+- Source-mapping logic is hard-coded in each transformer
+- All share the `search_products` index, isolated by `tenant_id`

**Implementation modules**:
-- `indexer/data_transformer.py` - data transformer
+- `indexer/spu_transformer.py` - SPU data transformer (Base config)
+- `indexer/data_transformer.py` - generic data transformer (other customers)
- `indexer/bulk_indexer.py` - bulk indexer
-- `indexer/indexing_pipeline.py` - indexing pipeline
-- `embeddings/bge_encoder.py` - text vector encoder
-- `embeddings/clip_image_encoder.py` - image vector encoder
+- `scripts/ingest_shoplazza.py` - Shoplazza data import script

---

@@ -506,6 +514,14 @@
 - ✅ Search endpoints (text search, image search)
 - ✅ Document lookup endpoint
 - ✅ Front-end UI (HTML + JavaScript)
+- ✅ Tenant isolation (tenant_id filtering)
+
+### 6.6 Base Config (Generic Shoplazza)
+- ✅ SPU-level index structure
+- ✅ Nested variants field
+- ✅ Unified index (search_products)
+- ✅ Tenant isolation (tenant_id)
+- ✅ Simplified configuration (MySQL-related settings removed)

---

@@ -521,9 +537,55 @@

---

-## 8. Configuration File Examples
+## 8. API Response Format
+
+### 8.1 External-Friendly Format
+
+API responses contain no ES-internal fields (`_id`, `_score`, `_source`); they use an external-friendly format instead:
+
+**Response structure**:
+```json
+{
+  "results": [
+    {
+      "product_id": "123",
+      "title": "蓝牙耳机",
+      "variants": [
+        {
+          "variant_id": "456",
+          "price": 199.99,
+          "sku": "SKU-123-1",
+          "stock": 50
+        }
+      ],
+      "relevance_score": 0.95
+    }
+  ],
+  "total": 10,
+  "facets": [...],
+  "suggestions": [],
+  "related_searches": []
+}
+```
+
+**Key changes**:
+- Structured results (`ProductResult` and `VariantResult`)
+- Nested variants array
+- No ES-internal fields
+
+### 8.2 Tenant Isolation
+
+Every API request must supply a `tenant_id`:
+- Request header: `X-Tenant-ID: 1`
+- Or query parameter: `?tenant_id=1`
+
+A `tenant_id` filter is added automatically at search time, guaranteeing data isolation.
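+
+A minimal request sketch (assuming the API listens on localhost:8000, the default in `scripts/test_base.py`); the server injects the `tenant_id` filter from the `X-Tenant-ID` header before the query reaches ES:
+
+```bash
+curl -X POST "http://localhost:8000/search/" \
+  -H "X-Tenant-ID: 1" \
+  -H "Content-Type: application/json" \
+  -d '{"query": "蓝牙耳机", "size": 10, "from": 0}'
+```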
+
+## 9. Configuration File Examples
+
+**Base config** (generic Shoplazza): `config/schema/base/config.yaml`
+
+**Other customer configs**: `config/schema/customer1/config.yaml`

---

-For a complete configuration example, see `config/schema/customer1_config.yaml`

---
-- 
libgit2 0.21.2