diff --git a/api/routes/search.py b/api/routes/search.py index 50e586f..26b0730 100644 --- a/api/routes/search.py +++ b/api/routes/search.py @@ -326,18 +326,36 @@ async def instant_search( @router.get("/{doc_id}", response_model=DocumentResponse) -async def get_document(doc_id: str): +async def get_document(doc_id: str, http_request: Request): """ Get a single document by ID. + + Requires tenant_id in header (X-Tenant-ID) or query parameter (tenant_id). """ try: + # Extract tenant_id (required) + tenant_id = http_request.headers.get('X-Tenant-ID') + if not tenant_id: + # Try to get from query string + from urllib.parse import parse_qs + query_string = http_request.url.query + if query_string: + params = parse_qs(query_string) + tenant_id = params.get('tenant_id', [None])[0] + + if not tenant_id: + raise HTTPException( + status_code=400, + detail="tenant_id is required. Provide it via header 'X-Tenant-ID' or query parameter 'tenant_id'" + ) + from api.app import get_searcher searcher = get_searcher() - doc = searcher.get_document(doc_id) + doc = searcher.get_document(tenant_id=tenant_id, doc_id=doc_id) if doc is None: - raise HTTPException(status_code=404, detail=f"Document {doc_id} not found") + raise HTTPException(status_code=404, detail=f"Document {doc_id} not found for tenant {tenant_id}") return DocumentResponse(id=doc_id, source=doc) diff --git a/docs/亚马逊到店匠格式转换分析.md b/docs/亚马逊到店匠格式转换分析.md index 06cb4cd..981efc0 100644 --- a/docs/亚马逊到店匠格式转换分析.md +++ b/docs/亚马逊到店匠格式转换分析.md @@ -368,3 +368,4 @@ python scripts/amazon_xlsx_to_shoplazza_xlsx.py \ + diff --git a/indexer/bulk_indexing_service.py b/indexer/bulk_indexing_service.py index 1a60867..579b246 100644 --- a/indexer/bulk_indexing_service.py +++ b/indexer/bulk_indexing_service.py @@ -10,7 +10,7 @@ from sqlalchemy import Engine from utils.es_client import ESClient from indexer.spu_transformer import SPUTransformer from indexer.bulk_indexer import BulkIndexer -from indexer.mapping_generator import load_mapping, 
delete_index_if_exists, DEFAULT_INDEX_NAME +from indexer.mapping_generator import load_mapping, delete_index_if_exists, get_tenant_index_name from indexer.indexer_logger import ( get_indexer_logger, log_index_request, log_index_result, log_bulk_index_batch ) @@ -33,13 +33,16 @@ class BulkIndexingService: """ self.db_engine = db_engine self.es_client = es_client - self.index_name = DEFAULT_INDEX_NAME + # Index name is now generated dynamically per tenant, no longer stored here def bulk_index(self, tenant_id: str, recreate_index: bool = False, batch_size: int = 500) -> Dict[str, Any]: """执行全量索引""" import time start_time = time.time() + # Generate tenant-specific index name + index_name = get_tenant_index_name(tenant_id) + # 记录请求开始 log_index_request( indexer_logger, @@ -48,7 +51,7 @@ class BulkIndexingService: request_params={ 'recreate_index': recreate_index, 'batch_size': batch_size, - 'index_name': self.index_name + 'index_name': index_name } ) @@ -61,45 +64,45 @@ class BulkIndexingService: 'index_type': 'bulk', 'tenant_id': tenant_id, 'operation': 'load_mapping', - 'index_name': self.index_name + 'index_name': index_name } ) mapping = load_mapping() # 2. 
处理索引(删除并重建或创建) if recreate_index: - logger.info(f"[BulkIndexing] Recreating index: {self.index_name}") + logger.info(f"[BulkIndexing] Recreating index: {index_name}") indexer_logger.info( - f"Recreating index: {self.index_name}", + f"Recreating index: {index_name}", extra={ 'index_type': 'bulk', 'tenant_id': tenant_id, 'operation': 'recreate_index', - 'index_name': self.index_name + 'index_name': index_name } ) - if self.es_client.index_exists(self.index_name): - if delete_index_if_exists(self.es_client, self.index_name): - logger.info(f"[BulkIndexing] Deleted existing index: {self.index_name}") + if self.es_client.index_exists(index_name): + if delete_index_if_exists(self.es_client, index_name): + logger.info(f"[BulkIndexing] Deleted existing index: {index_name}") else: - raise Exception(f"Failed to delete index: {self.index_name}") + raise Exception(f"Failed to delete index: {index_name}") - if not self.es_client.index_exists(self.index_name): - logger.info(f"[BulkIndexing] Creating index: {self.index_name}") + if not self.es_client.index_exists(index_name): + logger.info(f"[BulkIndexing] Creating index: {index_name}") indexer_logger.info( - f"Creating index: {self.index_name}", + f"Creating index: {index_name}", extra={ 'index_type': 'bulk', 'tenant_id': tenant_id, 'operation': 'create_index', - 'index_name': self.index_name + 'index_name': index_name } ) - if not self.es_client.create_index(self.index_name, mapping): - raise Exception(f"Failed to create index: {self.index_name}") - logger.info(f"[BulkIndexing] Created index: {self.index_name}") + if not self.es_client.create_index(index_name, mapping): + raise Exception(f"Failed to create index: {index_name}") + logger.info(f"[BulkIndexing] Created index: {index_name}") else: - logger.info(f"[BulkIndexing] Index already exists: {self.index_name}") + logger.info(f"[BulkIndexing] Index already exists: {index_name}") # 3. 
转换数据 logger.info(f"[BulkIndexing] Transforming data for tenant_id={tenant_id}") @@ -109,7 +112,7 @@ class BulkIndexingService: 'index_type': 'bulk', 'tenant_id': tenant_id, 'operation': 'transform_data', - 'index_name': self.index_name + 'index_name': index_name } ) transformer = SPUTransformer(self.db_engine, tenant_id) @@ -126,7 +129,7 @@ class BulkIndexingService: success_count=0, failed_count=0, elapsed_time=elapsed_time, - index_name=self.index_name + index_name=index_name ) return { "success": True, @@ -135,7 +138,7 @@ class BulkIndexingService: "failed": 0, "elapsed_time": elapsed_time, "message": "No documents to index", - "index_name": self.index_name, + "index_name": index_name, "tenant_id": tenant_id } @@ -147,13 +150,13 @@ class BulkIndexingService: 'tenant_id': tenant_id, 'operation': 'transform_complete', 'total_count': len(documents), - 'index_name': self.index_name + 'index_name': index_name } ) # 4. 批量导入 logger.info(f"[BulkIndexing] Indexing {len(documents)} documents (batch_size={batch_size})") - indexer = BulkIndexer(self.es_client, self.index_name, batch_size=batch_size, max_retries=3) + indexer = BulkIndexer(self.es_client, index_name, batch_size=batch_size, max_retries=3) results = indexer.index_documents( documents, id_field="spu_id", @@ -171,7 +174,7 @@ class BulkIndexingService: success_count=results['success'], failed_count=results['failed'], elapsed_time=elapsed_time, - index_name=self.index_name, + index_name=index_name, errors=results.get('errors', []) ) @@ -187,7 +190,7 @@ class BulkIndexingService: "indexed": results['success'], "failed": results['failed'], "elapsed_time": elapsed_time, - "index_name": self.index_name, + "index_name": index_name, "tenant_id": tenant_id } @@ -203,7 +206,7 @@ class BulkIndexingService: 'operation': 'request_failed', 'error': error_msg, 'elapsed_time': elapsed_time, - 'index_name': self.index_name + 'index_name': index_name }, exc_info=True ) diff --git a/indexer/incremental_service.py 
def get_tenant_index_name(tenant_id: str) -> str:
    """
    Generate the Elasticsearch index name for a tenant.

    Tenant isolation relies entirely on this name (one index per tenant),
    so the tenant ID is validated before it is embedded in the name. A value
    such as ``*`` or ``a,search_products_tenant_b`` would otherwise produce a
    wildcard / multi-index expression that addresses other tenants' indices.

    Args:
        tenant_id: Tenant ID. Non-string values (e.g. ints) are converted
            with ``str()`` exactly as the previous f-string formatting did.

    Returns:
        Index name in format: search_products_tenant_{tenant_id}

    Raises:
        ValueError: If tenant_id is None or empty, contains whitespace or a
            character that is not allowed in an index name, contains
            uppercase characters, or makes the name longer than 255 bytes.
    """
    if tenant_id is None:
        raise ValueError("tenant_id is required to build an index name")

    tenant = str(tenant_id)
    if not tenant:
        raise ValueError("tenant_id must not be empty")

    # Characters Elasticsearch rejects in index names; '*' and ',' are the
    # dangerous ones because they are index-expression syntax.
    forbidden = set('\\/*?"<>|,#:')
    bad_chars = sorted({ch for ch in tenant if ch in forbidden or ch.isspace()})
    if bad_chars:
        raise ValueError(
            f"tenant_id {tenant!r} contains characters not allowed in an index name: {bad_chars}"
        )

    # Index names must be lowercase. Reject instead of lowercasing so two
    # tenant IDs that differ only by case can never share one index.
    if tenant != tenant.lower():
        raise ValueError(f"tenant_id {tenant!r} must not contain uppercase characters")

    index_name = f"search_products_tenant_{tenant}"
    if len(index_name.encode("utf-8")) > 255:
        raise ValueError("tenant_id is too long: index name exceeds 255 bytes")

    return index_name
def get_document(self, tenant_id: str, doc_id: str) -> Optional[Dict[str, Any]]:
    """
    Fetch the stored source of one document from a tenant's index.

    Args:
        tenant_id: Tenant ID; selects the tenant-specific index to read from
        doc_id: Document ID to look up

    Returns:
        The value of the '_source' key of the Elasticsearch response, or
        None when the lookup raises for any reason (the failure is logged
        together with its traceback).
    """
    try:
        # Resolving the index name stays inside the try so that a failure
        # there is logged and reported as None, like a failed lookup.
        hit = self.es_client.client.get(
            index=get_tenant_index_name(tenant_id),
            id=doc_id,
        )
        source = hit.get('_source')
    except Exception as e:
        logger.error(f"Failed to get document {doc_id} from tenant {tenant_id}: {e}", exc_info=True)
        return None
    else:
        return source