Commit e4a39cc844139bd7cd7adfa16c74f9cc09d6c672

Authored by tangwang
1 parent 775db2b0

索引隔离。不同的tenant_id用不同的索引

api/routes/search.py
... ... @@ -326,18 +326,36 @@ async def instant_search(
326 326  
327 327  
328 328 @router.get("/{doc_id}", response_model=DocumentResponse)
329   -async def get_document(doc_id: str):
  329 +async def get_document(doc_id: str, http_request: Request):
330 330 """
331 331 Get a single document by ID.
  332 +
  333 + Requires tenant_id in header (X-Tenant-ID) or query parameter (tenant_id).
332 334 """
333 335 try:
  336 + # Extract tenant_id (required)
  337 + tenant_id = http_request.headers.get('X-Tenant-ID')
  338 + if not tenant_id:
  339 + # Try to get from query string
  340 + from urllib.parse import parse_qs
  341 + query_string = http_request.url.query
  342 + if query_string:
  343 + params = parse_qs(query_string)
  344 + tenant_id = params.get('tenant_id', [None])[0]
  345 +
  346 + if not tenant_id:
  347 + raise HTTPException(
  348 + status_code=400,
  349 + detail="tenant_id is required. Provide it via header 'X-Tenant-ID' or query parameter 'tenant_id'"
  350 + )
  351 +
334 352 from api.app import get_searcher
335 353 searcher = get_searcher()
336 354  
337   - doc = searcher.get_document(doc_id)
  355 + doc = searcher.get_document(tenant_id=tenant_id, doc_id=doc_id)
338 356  
339 357 if doc is None:
340   - raise HTTPException(status_code=404, detail=f"Document {doc_id} not found")
  358 + raise HTTPException(status_code=404, detail=f"Document {doc_id} not found for tenant {tenant_id}")
341 359  
342 360 return DocumentResponse(id=doc_id, source=doc)
343 361  
... ...
docs/亚马逊到店匠格式转换分析.md
... ... @@ -368,3 +368,4 @@ python scripts/amazon_xlsx_to_shoplazza_xlsx.py \
368 368  
369 369  
370 370  
  371 +
... ...
indexer/bulk_indexing_service.py
... ... @@ -10,7 +10,7 @@ from sqlalchemy import Engine
10 10 from utils.es_client import ESClient
11 11 from indexer.spu_transformer import SPUTransformer
12 12 from indexer.bulk_indexer import BulkIndexer
13   -from indexer.mapping_generator import load_mapping, delete_index_if_exists, DEFAULT_INDEX_NAME
  13 +from indexer.mapping_generator import load_mapping, delete_index_if_exists, get_tenant_index_name
14 14 from indexer.indexer_logger import (
15 15 get_indexer_logger, log_index_request, log_index_result, log_bulk_index_batch
16 16 )
... ... @@ -33,13 +33,16 @@ class BulkIndexingService:
33 33 """
34 34 self.db_engine = db_engine
35 35 self.es_client = es_client
36   - self.index_name = DEFAULT_INDEX_NAME
  36 + # Index name is now generated dynamically per tenant, no longer stored here
37 37  
38 38 def bulk_index(self, tenant_id: str, recreate_index: bool = False, batch_size: int = 500) -> Dict[str, Any]:
39 39 """执行全量索引"""
40 40 import time
41 41 start_time = time.time()
42 42  
  43 + # Generate tenant-specific index name
  44 + index_name = get_tenant_index_name(tenant_id)
  45 +
43 46 # 记录请求开始
44 47 log_index_request(
45 48 indexer_logger,
... ... @@ -48,7 +51,7 @@ class BulkIndexingService:
48 51 request_params={
49 52 'recreate_index': recreate_index,
50 53 'batch_size': batch_size,
51   - 'index_name': self.index_name
  54 + 'index_name': index_name
52 55 }
53 56 )
54 57  
... ... @@ -61,45 +64,45 @@ class BulkIndexingService:
61 64 'index_type': 'bulk',
62 65 'tenant_id': tenant_id,
63 66 'operation': 'load_mapping',
64   - 'index_name': self.index_name
  67 + 'index_name': index_name
65 68 }
66 69 )
67 70 mapping = load_mapping()
68 71  
69 72 # 2. 处理索引(删除并重建或创建)
70 73 if recreate_index:
71   - logger.info(f"[BulkIndexing] Recreating index: {self.index_name}")
  74 + logger.info(f"[BulkIndexing] Recreating index: {index_name}")
72 75 indexer_logger.info(
73   - f"Recreating index: {self.index_name}",
  76 + f"Recreating index: {index_name}",
74 77 extra={
75 78 'index_type': 'bulk',
76 79 'tenant_id': tenant_id,
77 80 'operation': 'recreate_index',
78   - 'index_name': self.index_name
  81 + 'index_name': index_name
79 82 }
80 83 )
81   - if self.es_client.index_exists(self.index_name):
82   - if delete_index_if_exists(self.es_client, self.index_name):
83   - logger.info(f"[BulkIndexing] Deleted existing index: {self.index_name}")
  84 + if self.es_client.index_exists(index_name):
  85 + if delete_index_if_exists(self.es_client, index_name):
  86 + logger.info(f"[BulkIndexing] Deleted existing index: {index_name}")
84 87 else:
85   - raise Exception(f"Failed to delete index: {self.index_name}")
  88 + raise Exception(f"Failed to delete index: {index_name}")
86 89  
87   - if not self.es_client.index_exists(self.index_name):
88   - logger.info(f"[BulkIndexing] Creating index: {self.index_name}")
  90 + if not self.es_client.index_exists(index_name):
  91 + logger.info(f"[BulkIndexing] Creating index: {index_name}")
89 92 indexer_logger.info(
90   - f"Creating index: {self.index_name}",
  93 + f"Creating index: {index_name}",
91 94 extra={
92 95 'index_type': 'bulk',
93 96 'tenant_id': tenant_id,
94 97 'operation': 'create_index',
95   - 'index_name': self.index_name
  98 + 'index_name': index_name
96 99 }
97 100 )
98   - if not self.es_client.create_index(self.index_name, mapping):
99   - raise Exception(f"Failed to create index: {self.index_name}")
100   - logger.info(f"[BulkIndexing] Created index: {self.index_name}")
  101 + if not self.es_client.create_index(index_name, mapping):
  102 + raise Exception(f"Failed to create index: {index_name}")
  103 + logger.info(f"[BulkIndexing] Created index: {index_name}")
101 104 else:
102   - logger.info(f"[BulkIndexing] Index already exists: {self.index_name}")
  105 + logger.info(f"[BulkIndexing] Index already exists: {index_name}")
103 106  
104 107 # 3. 转换数据
105 108 logger.info(f"[BulkIndexing] Transforming data for tenant_id={tenant_id}")
... ... @@ -109,7 +112,7 @@ class BulkIndexingService:
109 112 'index_type': 'bulk',
110 113 'tenant_id': tenant_id,
111 114 'operation': 'transform_data',
112   - 'index_name': self.index_name
  115 + 'index_name': index_name
113 116 }
114 117 )
115 118 transformer = SPUTransformer(self.db_engine, tenant_id)
... ... @@ -126,7 +129,7 @@ class BulkIndexingService:
126 129 success_count=0,
127 130 failed_count=0,
128 131 elapsed_time=elapsed_time,
129   - index_name=self.index_name
  132 + index_name=index_name
130 133 )
131 134 return {
132 135 "success": True,
... ... @@ -135,7 +138,7 @@ class BulkIndexingService:
135 138 "failed": 0,
136 139 "elapsed_time": elapsed_time,
137 140 "message": "No documents to index",
138   - "index_name": self.index_name,
  141 + "index_name": index_name,
139 142 "tenant_id": tenant_id
140 143 }
141 144  
... ... @@ -147,13 +150,13 @@ class BulkIndexingService:
147 150 'tenant_id': tenant_id,
148 151 'operation': 'transform_complete',
149 152 'total_count': len(documents),
150   - 'index_name': self.index_name
  153 + 'index_name': index_name
151 154 }
152 155 )
153 156  
154 157 # 4. 批量导入
155 158 logger.info(f"[BulkIndexing] Indexing {len(documents)} documents (batch_size={batch_size})")
156   - indexer = BulkIndexer(self.es_client, self.index_name, batch_size=batch_size, max_retries=3)
  159 + indexer = BulkIndexer(self.es_client, index_name, batch_size=batch_size, max_retries=3)
157 160 results = indexer.index_documents(
158 161 documents,
159 162 id_field="spu_id",
... ... @@ -171,7 +174,7 @@ class BulkIndexingService:
171 174 success_count=results['success'],
172 175 failed_count=results['failed'],
173 176 elapsed_time=elapsed_time,
174   - index_name=self.index_name,
  177 + index_name=index_name,
175 178 errors=results.get('errors', [])
176 179 )
177 180  
... ... @@ -187,7 +190,7 @@ class BulkIndexingService:
187 190 "indexed": results['success'],
188 191 "failed": results['failed'],
189 192 "elapsed_time": elapsed_time,
190   - "index_name": self.index_name,
  193 + "index_name": index_name,
191 194 "tenant_id": tenant_id
192 195 }
193 196  
... ... @@ -203,7 +206,7 @@ class BulkIndexingService:
203 206 'operation': 'request_failed',
204 207 'error': error_msg,
205 208 'elapsed_time': elapsed_time,
206   - 'index_name': self.index_name
  209 + 'index_name': index_name
207 210 },
208 211 exc_info=True
209 212 )
... ...
indexer/incremental_service.py
... ... @@ -9,7 +9,7 @@ import numpy as np
9 9 from sqlalchemy import text, bindparam
10 10 from indexer.indexing_utils import load_category_mapping, create_document_transformer
11 11 from indexer.bulk_indexer import BulkIndexer
12   -from indexer.mapping_generator import DEFAULT_INDEX_NAME
  12 +from indexer.mapping_generator import get_tenant_index_name
13 13 from indexer.indexer_logger import (
14 14 get_indexer_logger, log_index_request, log_index_result, log_spu_processing
15 15 )
... ... @@ -393,7 +393,7 @@ class IncrementalIndexerService:
393 393 es_client,
394 394 tenant_id: str,
395 395 spu_ids: List[str],
396   - index_name: str = DEFAULT_INDEX_NAME,
  396 + index_name: str = None,
397 397 batch_size: int = 100,
398 398 delete_spu_ids: List[str] = None
399 399 ) -> Dict[str, Any]:
... ... @@ -408,13 +408,16 @@ class IncrementalIndexerService:
408 408 es_client: Elasticsearch客户端
409 409 tenant_id: 租户ID
410 410 spu_ids: SPU ID列表(要索引的)
411   - index_name: 索引名称
  411 + index_name: 索引名称(可选,如果不提供则根据tenant_id自动生成)
412 412 batch_size: 批量写入ES的批次大小
413 413 delete_spu_ids: 显式指定要删除的SPU ID列表(可选)
414 414  
415 415 Returns:
416 416 包含成功/失败列表的字典,以及删除结果
417 417 """
  418 + # Generate tenant-specific index name if not provided
  419 + if index_name is None:
  420 + index_name = get_tenant_index_name(tenant_id)
418 421 # 去重但保持顺序(避免重复DB/翻译/embedding/写ES)
419 422 if spu_ids:
420 423 spu_ids = list(dict.fromkeys(spu_ids))
... ...
indexer/mapping_generator.py
... ... @@ -11,13 +11,26 @@ from pathlib import Path
11 11  
12 12 logger = logging.getLogger(__name__)
13 13  
14   -# Default index name
  14 +# Default index name (deprecated, use get_tenant_index_name instead)
15 15 DEFAULT_INDEX_NAME = "search_products"
16 16  
17 17 # Default mapping file path
18 18 DEFAULT_MAPPING_FILE = Path(__file__).parent.parent / "mappings" / "search_products.json"
19 19  
20 20  
  21 +def get_tenant_index_name(tenant_id: str) -> str:
  22 + """
  23 + Generate index name for a tenant.
  24 +
  25 + Args:
  26 + tenant_id: Tenant ID
  27 +
  28 + Returns:
  29 + Index name in format: search_products_tenant_{tenant_id}
  30 + """
  31 + return f"search_products_tenant_{tenant_id}"
  32 +
  33 +
21 34 def load_mapping(mapping_file: str = None) -> Dict[str, Any]:
22 35 """
23 36 Load Elasticsearch mapping from JSON file.
... ...
search/searcher.py
... ... @@ -20,6 +20,7 @@ from config.utils import get_match_fields_for_index
20 20 from context.request_context import RequestContext, RequestContextStage, create_request_context
21 21 from api.models import FacetResult, FacetValue, FacetConfig
22 22 from api.result_formatter import ResultFormatter
  23 +from indexer.mapping_generator import get_tenant_index_name
23 24  
24 25 logger = logging.getLogger(__name__)
25 26  
... ... @@ -93,7 +94,7 @@ class Searcher:
93 94 """
94 95 self.es_client = es_client
95 96 self.config = config
96   - self.index_name = config.es_index_name
  97 + # Index name is now generated dynamically per tenant, no longer stored here
97 98 self.query_parser = query_parser or QueryParser(config)
98 99  
99 100 # Initialize components
... ... @@ -107,8 +108,9 @@ class Searcher:
107 108 self.source_fields = config.query_config.source_fields or []
108 109  
109 110 # Query builder - simplified single-layer architecture
  111 + # index_name is no longer needed in query builder since we use tenant-specific indices
110 112 self.query_builder = ESQueryBuilder(
111   - index_name=self.index_name,
  113 + index_name="", # Not used, kept for backward compatibility
112 114 match_fields=self.match_fields,
113 115 text_embedding_field=self.text_embedding_field,
114 116 image_embedding_field=self.image_embedding_field,
... ... @@ -271,10 +273,10 @@ class Searcher:
271 273 # Step 3: Query building
272 274 context.start_stage(RequestContextStage.QUERY_BUILDING)
273 275 try:
274   - # Add tenant_id to filters (required)
275   - if filters is None:
276   - filters = {}
277   - filters['tenant_id'] = tenant_id
  276 + # Generate tenant-specific index name
  277 + index_name = get_tenant_index_name(tenant_id)
  278 +
  279 + # No longer need to add tenant_id to filters since each tenant has its own index
278 280  
279 281 es_query = self.query_builder.build_query(
280 282 query_text=parsed_query.rewritten_query or parsed_query.normalized_query,
... ... @@ -332,8 +334,9 @@ class Searcher:
332 334 # Step 4: Elasticsearch search
333 335 context.start_stage(RequestContextStage.ELASTICSEARCH_SEARCH)
334 336 try:
  337 + # Use tenant-specific index name
335 338 es_response = self.es_client.search(
336   - index_name=self.index_name,
  339 + index_name=index_name,
337 340 body=body_for_es,
338 341 size=size,
339 342 from_=from_
... ... @@ -496,10 +499,10 @@ class Searcher:
496 499 if image_vector is None:
497 500 raise ValueError(f"Failed to encode image: {image_url}")
498 501  
499   - # Add tenant_id to filters (required)
500   - if filters is None:
501   - filters = {}
502   - filters['tenant_id'] = tenant_id
  502 + # Generate tenant-specific index name
  503 + index_name = get_tenant_index_name(tenant_id)
  504 +
  505 + # No longer need to add tenant_id to filters since each tenant has its own index
503 506  
504 507 # Build KNN query
505 508 es_query = {
... ... @@ -529,7 +532,7 @@ class Searcher:
529 532  
530 533 # Execute search
531 534 es_response = self.es_client.search(
532   - index_name=self.index_name,
  535 + index_name=index_name,
533 536 body=es_query,
534 537 size=size
535 538 )
... ... @@ -576,24 +579,26 @@ class Searcher:
576 579 """
577 580 return self.query_builder.get_domain_summary()
578 581  
579   - def get_document(self, doc_id: str) -> Optional[Dict[str, Any]]:
  582 + def get_document(self, tenant_id: str, doc_id: str) -> Optional[Dict[str, Any]]:
580 583 """
581 584 Get single document by ID.
582 585  
583 586 Args:
  587 + tenant_id: Tenant ID (required to determine which index to query)
584 588 doc_id: Document ID
585 589  
586 590 Returns:
587 591 Document or None if not found
588 592 """
589 593 try:
  594 + index_name = get_tenant_index_name(tenant_id)
590 595 response = self.es_client.client.get(
591   - index=self.index_name,
  596 + index=index_name,
592 597 id=doc_id
593 598 )
594 599 return response.get('_source')
595 600 except Exception as e:
596   - logger.error(f"Failed to get document {doc_id}: {e}", exc_info=True)
  601 + logger.error(f"Failed to get document {doc_id} from tenant {tenant_id}: {e}", exc_info=True)
597 602 return None
598 603  
599 604 def _standardize_facets(
... ...