Commit 59b0a3420f8a8faf4f1628502140d66c28134a68

Authored by tangwang
1 parent 5dcddc06

创建手写 mapping JSON

mappings/search_products.json - 完整的ES索引配置(settings + mappings)
基于 docs/索引字段说明v2-mapping结构.md
简化 mapping_generator.py
移除所有config依赖
直接使用 load_mapping() 从JSON文件加载
保留工具函数:create_index_if_not_exists, delete_index_if_exists, update_mapping
更新数据导入脚本
scripts/ingest_shoplazza.py - 移除ConfigLoader依赖
直接使用 load_mapping() 和 DEFAULT_INDEX_NAME
更新indexer模块
indexer/__init__.py - 更新导出
indexer/bulk_indexer.py - 简化IndexingPipeline,移除config依赖
创建查询配置常量
search/query_config.py - 硬编码字段列表和配置项

使用方式
创建索引:
from indexer.mapping_generator import load_mapping, create_index_if_not_exists
from utils.es_client import ESClient

es_client = ESClient(hosts=["http://localhost:9200"])
mapping = load_mapping()
create_index_if_not_exists(es_client, "search_products", mapping)
数据导入:
python scripts/ingest_shoplazza.py \
    --db-host localhost \
    --db-database saas \
    --db-username root \
    --db-password password \
    --tenant-id "1" \
    --es-host http://localhost:9200 \
    --recreate

注意事项
修改mapping:直接编辑 mappings/search_products.json
字段映射:spu_transformer.py 中硬编码,与mapping保持一致
config目录:保留但不再使用,可后续清理
search模块:仍依赖config
indexer/__init__.py
1 1 """Indexer package initialization."""
2 2  
3   -from .mapping_generator import MappingGenerator, create_index_if_not_exists, delete_index_if_exists
4   -from .data_transformer import DataTransformer
5   -from .bulk_indexer import BulkIndexer, IndexingPipeline
  3 +from .mapping_generator import load_mapping, create_index_if_not_exists, delete_index_if_exists, DEFAULT_INDEX_NAME
  4 +from .spu_transformer import SPUTransformer
  5 +from .bulk_indexer import BulkIndexer
6 6  
7 7 __all__ = [
8   - 'MappingGenerator',
  8 + 'load_mapping',
9 9 'create_index_if_not_exists',
10 10 'delete_index_if_exists',
11   - 'DataTransformer',
  11 + 'DEFAULT_INDEX_NAME',
  12 + 'SPUTransformer',
12 13 'BulkIndexer',
13   - 'IndexingPipeline',
14 14 ]
... ...
indexer/bulk_indexer.py
... ... @@ -7,7 +7,7 @@ Handles batch indexing of documents with progress tracking and error handling.
7 7 from typing import List, Dict, Any, Optional
8 8 from elasticsearch.helpers import bulk, BulkIndexError
9 9 from utils.es_client import ESClient
10   -from indexer import MappingGenerator
  10 +from indexer.mapping_generator import load_mapping, DEFAULT_INDEX_NAME
11 11 import time
12 12  
13 13  
... ... @@ -203,23 +203,23 @@ class IndexingPipeline:
203 203  
204 204 def __init__(
205 205 self,
206   - config,
207 206 es_client: ESClient,
208 207 data_transformer,
  208 + index_name: str = None,
209 209 recreate_index: bool = False
210 210 ):
211 211 """
212 212 Initialize indexing pipeline.
213 213  
214 214 Args:
215   - config: Search configuration
216 215 es_client: Elasticsearch client
217 216 data_transformer: Data transformer instance
  217 + index_name: Index name (defaults to DEFAULT_INDEX_NAME)
218 218 recreate_index: Whether to recreate index if exists
219 219 """
220   - self.config = config
221 220 self.es_client = es_client
222 221 self.transformer = data_transformer
  222 + self.index_name = index_name or DEFAULT_INDEX_NAME
223 223 self.recreate_index = recreate_index
224 224  
225 225 def run(self, df, batch_size: int = 100) -> Dict[str, Any]:
... ... @@ -233,22 +233,19 @@ class IndexingPipeline:
233 233 Returns:
234 234 Indexing statistics
235 235 """
236   - # Generate and create index
237   - mapping_gen = MappingGenerator(self.config)
238   - mapping = mapping_gen.generate_mapping()
239   -
240   - index_name = self.config.es_index_name
  236 + # Load and create index
  237 + mapping = load_mapping()
241 238  
242 239 if self.recreate_index:
243   - if self.es_client.index_exists(index_name):
244   - print(f"[IndexingPipeline] Deleting existing index: {index_name}")
245   - self.es_client.delete_index(index_name)
  240 + if self.es_client.index_exists(self.index_name):
  241 + print(f"[IndexingPipeline] Deleting existing index: {self.index_name}")
  242 + self.es_client.delete_index(self.index_name)
246 243  
247   - if not self.es_client.index_exists(index_name):
248   - print(f"[IndexingPipeline] Creating index: {index_name}")
249   - self.es_client.create_index(index_name, mapping)
  244 + if not self.es_client.index_exists(self.index_name):
  245 + print(f"[IndexingPipeline] Creating index: {self.index_name}")
  246 + self.es_client.create_index(self.index_name, mapping)
250 247 else:
251   - print(f"[IndexingPipeline] Using existing index: {index_name}")
  248 + print(f"[IndexingPipeline] Using existing index: {self.index_name}")
252 249  
253 250 # Transform data
254 251 print(f"[IndexingPipeline] Transforming {len(df)} documents...")
... ... @@ -256,7 +253,7 @@ class IndexingPipeline:
256 253 print(f"[IndexingPipeline] Transformed {len(documents)} documents")
257 254  
258 255 # Bulk index
259   - indexer = BulkIndexer(self.es_client, index_name, batch_size=500)
  256 + indexer = BulkIndexer(self.es_client, self.index_name, batch_size=500)
260 257 results = indexer.index_documents(documents, id_field="skuId")
261 258  
262 259 return results
... ...
indexer/mapping_generator.py
1 1 """
2   -Elasticsearch mapping generator.
  2 +Elasticsearch mapping loader.
3 3  
4   -Generates Elasticsearch index mappings from search configuration.
  4 +Loads Elasticsearch index mapping from JSON file.
5 5 """
6 6  
7 7 from typing import Dict, Any
  8 +import json
8 9 import logging
9   -
10   -from config import (
11   - SearchConfig,
12   - FieldConfig,
13   - get_es_mapping_for_field,
14   - get_default_analyzers,
15   - get_default_similarity
16   -)
  10 +from pathlib import Path
17 11  
18 12 logger = logging.getLogger(__name__)
19 13  
  14 +# Default index name
  15 +DEFAULT_INDEX_NAME = "search_products"
  16 +
  17 +# Default mapping file path
  18 +DEFAULT_MAPPING_FILE = Path(__file__).parent.parent / "mappings" / "search_products.json"
  19 +
  20 +
  21 +def load_mapping(mapping_file: str = None) -> Dict[str, Any]:
  22 + """
  23 + Load Elasticsearch mapping from JSON file.
  24 +
  25 + Args:
  26 + mapping_file: Path to mapping JSON file. If None, uses default.
  27 +
  28 + Returns:
  29 + Dictionary containing index configuration (settings + mappings)
  30 +
  31 + Raises:
  32 + FileNotFoundError: If mapping file doesn't exist
  33 + json.JSONDecodeError: If mapping file is invalid JSON
  34 + """
  35 + if mapping_file is None:
  36 + mapping_file = str(DEFAULT_MAPPING_FILE)
  37 +
  38 + mapping_path = Path(mapping_file)
  39 + if not mapping_path.exists():
  40 + raise FileNotFoundError(f"Mapping file not found: {mapping_path}")
20 41  
21   -class MappingGenerator:
22   - """Generates Elasticsearch mapping from search configuration."""
23   -
24   - def __init__(self, config: SearchConfig):
25   - self.config = config
26   -
27   - def generate_mapping(self) -> Dict[str, Any]:
28   - """
29   - Generate complete Elasticsearch index configuration including
30   - settings and mappings.
31   -
32   - Returns:
33   - Dictionary containing index configuration
34   - """
35   - return {
36   - "settings": self._generate_settings(),
37   - "mappings": self._generate_mappings()
38   - }
39   -
40   - def _generate_settings(self) -> Dict[str, Any]:
41   - """Generate index settings."""
42   - settings = {
43   - "number_of_shards": self.config.es_settings.get("number_of_shards", 1),
44   - "number_of_replicas": self.config.es_settings.get("number_of_replicas", 0),
45   - "refresh_interval": self.config.es_settings.get("refresh_interval", "30s"),
46   - }
47   -
48   - # Add similarity configuration (modified BM25)
49   - similarity_config = get_default_similarity()
50   - settings.update(similarity_config)
51   -
52   - # Add analyzer configuration
53   - analyzer_config = get_default_analyzers()
54   - settings.update(analyzer_config)
55   -
56   - # Merge any custom settings from config
57   - for key, value in self.config.es_settings.items():
58   - if key not in ["number_of_shards", "number_of_replicas", "refresh_interval"]:
59   - settings[key] = value
60   -
61   - return settings
62   -
63   - def _generate_mappings(self) -> Dict[str, Any]:
64   - """Generate field mappings."""
65   - properties = {}
66   -
67   - for field in self.config.fields:
68   - field_mapping = get_es_mapping_for_field(field)
69   - properties[field.name] = field_mapping
70   -
71   - return {
72   - "properties": properties
73   - }
74   -
75   - def get_default_domain_fields(self) -> list:
76   - """
77   - Get list of fields in the 'default' domain.
78   -
79   - Returns:
80   - List of field names
81   - """
82   - for index in self.config.indexes:
83   - if index.name == "default":
84   - return index.fields
85   - return []
86   -
87   - def get_text_embedding_field(self) -> str:
88   - """
89   - Get the primary text embedding field name.
90   -
91   - Returns:
92   - Field name or empty string if not configured
93   - """
94   - return self.config.query_config.text_embedding_field or ""
95   -
96   - def get_image_embedding_field(self) -> str:
97   - """
98   - Get the primary image embedding field name.
99   -
100   - Returns:
101   - Field name or empty string if not configured
102   - """
103   - return self.config.query_config.image_embedding_field or ""
104   -
105   - def get_field_by_name(self, field_name: str) -> FieldConfig:
106   - """
107   - Get field configuration by name.
108   -
109   - Args:
110   - field_name: Field name
111   -
112   - Returns:
113   - FieldConfig object or None if not found
114   - """
115   - for field in self.config.fields:
116   - if field.name == field_name:
117   - return field
118   - return None
119   -
120   - def get_match_fields_for_domain(self, domain_name: str = "default") -> list:
121   - """
122   - Get list of text fields for matching in a domain.
123   -
124   - Args:
125   - domain_name: Name of the query domain
126   -
127   - Returns:
128   - List of field names with optional boost (e.g., ["name^2.0", "category^1.5"])
129   - """
130   - for index in self.config.indexes:
131   - if index.name == domain_name:
132   - result = []
133   - for field_name in index.fields:
134   - field = self.get_field_by_name(field_name)
135   - if field and field.boost != 1.0:
136   - result.append(f"{field_name}^{field.boost}")
137   - else:
138   - result.append(field_name)
139   - return result
140   - return []
141   -
142   -
143   -def create_index_if_not_exists(es_client, index_name: str, mapping: Dict[str, Any]) -> bool:
  42 + with open(mapping_path, 'r', encoding='utf-8') as f:
  43 + mapping = json.load(f)
  44 +
  45 + logger.info(f"Loaded mapping from {mapping_path}")
  46 + return mapping
  47 +
  48 +
  49 +def create_index_if_not_exists(es_client, index_name: str, mapping: Dict[str, Any] = None) -> bool:
144 50 """
145 51 Create Elasticsearch index if it doesn't exist.
146 52  
147 53 Args:
148 54 es_client: Elasticsearch client instance
149 55 index_name: Name of the index to create
150   - mapping: Index mapping configuration
  56 + mapping: Index mapping configuration. If None, loads from default file.
151 57  
152 58 Returns:
153 59 True if index was created, False if it already exists
... ... @@ -156,6 +62,9 @@ def create_index_if_not_exists(es_client, index_name: str, mapping: Dict[str, An
156 62 logger.info(f"Index '{index_name}' already exists")
157 63 return False
158 64  
  65 + if mapping is None:
  66 + mapping = load_mapping()
  67 +
159 68 es_client.indices.create(index=index_name, body=mapping)
160 69 logger.info(f"Index '{index_name}' created successfully")
161 70 return True
... ...
mappings/README.md 0 → 100644
... ... @@ -0,0 +1,36 @@
  1 +# ES Mapping Configuration
  2 +
  3 +## 概述
  4 +
  5 +所有租户共享同一个ES mapping结构,直接使用手写的JSON文件,无需通过config.yaml生成。
  6 +
  7 +## Mapping文件
  8 +
  9 +- `search_products.json`: 完整的ES索引配置,包括settings和mappings
  10 +
  11 +## 使用方式
  12 +
  13 +### 创建索引
  14 +
  15 +```python
  16 +from indexer.mapping_generator import load_mapping, create_index_if_not_exists
  17 +from utils.es_client import ESClient
  18 +
  19 +es_client = ESClient(hosts=["http://localhost:9200"])
  20 +mapping = load_mapping() # 从mappings/search_products.json加载
  21 +create_index_if_not_exists(es_client, "search_products", mapping)
  22 +```
  23 +
  24 +### 修改Mapping
  25 +
  26 +直接编辑 `mappings/search_products.json` 文件,然后重新创建索引。
  27 +
  28 +注意:ES不支持修改已有字段的mapping类型,只能添加新字段。如需修改字段类型,需要:
  29 +1. 删除旧索引
  30 +2. 使用新mapping创建索引
  31 +3. 重新导入数据
  32 +
  33 +## 字段说明
  34 +
  35 +参考 `docs/索引字段说明v2-mapping结构.md`
  36 +
... ...
scripts/ingest_shoplazza.py
... ... @@ -16,9 +16,8 @@ sys.path.insert(0, str(Path(__file__).parent.parent))
16 16 from utils.db_connector import create_db_connection
17 17 from utils.es_client import ESClient
18 18 from indexer.spu_transformer import SPUTransformer
19   -from indexer.mapping_generator import MappingGenerator
  19 +from indexer.mapping_generator import load_mapping, DEFAULT_INDEX_NAME
20 20 from indexer.bulk_indexer import BulkIndexer
21   -from config import ConfigLoader
22 21  
23 22  
24 23 def main():
... ... @@ -43,25 +42,15 @@ def main():
43 42  
44 43 print(f"Starting Shoplazza data ingestion for tenant: {args.tenant_id}")
45 44  
46   - # Load unified configuration
47   - config_loader = ConfigLoader("config/config.yaml")
  45 + # Load mapping from JSON file
48 46 try:
49   - config = config_loader.load_config()
50   - print(f"Loaded configuration: {config.es_index_name}")
  47 + mapping = load_mapping()
  48 + print(f"Loaded mapping configuration")
51 49 except Exception as e:
52   - print(f"ERROR: Failed to load configuration: {e}")
  50 + print(f"ERROR: Failed to load mapping: {e}")
53 51 return 1
54 52  
55   - # Validate tenant_id field exists
56   - tenant_id_field = None
57   - for field in config.fields:
58   - if field.name == "tenant_id":
59   - tenant_id_field = field
60   - break
61   -
62   - if not tenant_id_field:
63   - print("ERROR: Configuration must include 'tenant_id' field")
64   - return 1
  53 + index_name = DEFAULT_INDEX_NAME
65 54  
66 55 # Connect to MySQL
67 56 print(f"Connecting to MySQL: {args.db_host}:{args.db_port}/{args.db_database}")
... ... @@ -77,13 +66,10 @@ def main():
77 66 print(f"ERROR: Failed to connect to MySQL: {e}")
78 67 return 1
79 68  
80   - # Connect to Elasticsearch (use unified config loading)
81   - from config.env_config import ES_CONFIG
82   -
83   - # Use provided es_host or fallback to config
84   - es_host = args.es_host or ES_CONFIG.get('host', 'http://localhost:9200')
85   - es_username = ES_CONFIG.get('username')
86   - es_password = ES_CONFIG.get('password')
  69 + # Connect to Elasticsearch
  70 + es_host = args.es_host
  71 + es_username = os.environ.get('ES_USERNAME')
  72 + es_password = os.environ.get('ES_PASSWORD')
87 73  
88 74 print(f"Connecting to Elasticsearch: {es_host}")
89 75 if es_username and es_password:
... ... @@ -96,11 +82,7 @@ def main():
96 82 print(f"ERROR: Cannot connect to Elasticsearch at {es_host}")
97 83 return 1
98 84  
99   - # Generate and create index
100   - mapping_gen = MappingGenerator(config)
101   - mapping = mapping_gen.generate_mapping()
102   - index_name = config.es_index_name
103   -
  85 + # Create index if needed
104 86 if args.recreate:
105 87 if es_client.index_exists(index_name):
106 88 print(f"Deleting existing index: {index_name}")
... ...
search/query_config.py 0 → 100644
... ... @@ -0,0 +1,114 @@
  1 +"""
  2 +Query configuration constants.
  3 +
  4 +Since all tenants share the same ES mapping, we can hardcode field lists here.
  5 +"""
  6 +
  7 +import os
  8 +from typing import Dict, List
  9 +
  10 +# Default index name
  11 +DEFAULT_INDEX_NAME = "search_products"
  12 +
  13 +# Text embedding field
  14 +TEXT_EMBEDDING_FIELD = "title_embedding"
  15 +
  16 +# Image embedding field
  17 +IMAGE_EMBEDDING_FIELD = "image_embedding"
  18 +
  19 +# Default match fields for text search (with boost)
  20 +DEFAULT_MATCH_FIELDS = [
  21 + "title_zh^3.0",
  22 + "brief_zh^1.5",
  23 + "description_zh^1.0",
  24 + "vendor_zh^1.5",
  25 + "tags^1.0",
  26 + "category_path_zh^1.5",
  27 + "category_name_zh^1.5"
  28 +]
  29 +
  30 +# Domain-specific match fields
  31 +DOMAIN_FIELDS: Dict[str, List[str]] = {
  32 + "default": DEFAULT_MATCH_FIELDS,
  33 + "title": ["title_zh^2.0"],
  34 + "vendor": ["vendor_zh^1.5"],
  35 + "category": ["category_path_zh^1.5", "category_name_zh^1.5"],
  36 + "tags": ["tags^1.0"]
  37 +}
  38 +
  39 +# Source fields to return in search results
  40 +SOURCE_FIELDS = [
  41 + "tenant_id",
  42 + "spu_id",
  43 + "title_zh",
  44 + "brief_zh",
  45 + "description_zh",
  46 + "vendor_zh",
  47 + "tags",
  48 + "image_url",
  49 + "category_path_zh",
  50 + "category_name_zh",
  51 + "category_id",
  52 + "category_name",
  53 + "category_level",
  54 + "category1_name",
  55 + "category2_name",
  56 + "category3_name",
  57 + "option1_name",
  58 + "option2_name",
  59 + "option3_name",
  60 + "min_price",
  61 + "max_price",
  62 + "compare_at_price",
  63 + "total_inventory",
  64 + "create_time",
  65 + "update_time",
  66 + "skus",
  67 + "specifications"
  68 +]
  69 +
  70 +# Query processing settings
  71 +ENABLE_TRANSLATION = os.environ.get("ENABLE_TRANSLATION", "true").lower() == "true"
  72 +ENABLE_TEXT_EMBEDDING = os.environ.get("ENABLE_TEXT_EMBEDDING", "true").lower() == "true"
  73 +TRANSLATION_API_KEY = os.environ.get("DEEPL_API_KEY")
  74 +TRANSLATION_SERVICE = "deepl"
  75 +
  76 +# Ranking expression (currently disabled)
  77 +RANKING_EXPRESSION = "bm25() + 0.2*text_embedding_relevance()"
  78 +
  79 +# Function score config
  80 +FUNCTION_SCORE_CONFIG = {
  81 + "score_mode": "sum",
  82 + "boost_mode": "multiply",
  83 + "functions": []
  84 +}
  85 +
  86 +# Load rewrite dictionary from file if exists
  87 +def load_rewrite_dictionary() -> Dict[str, str]:
  88 + """Load query rewrite dictionary from file."""
  89 + rewrite_file = os.path.join(
  90 + os.path.dirname(os.path.dirname(__file__)),
  91 + "config",
  92 + "query_rewrite.dict"
  93 + )
  94 +
  95 + if not os.path.exists(rewrite_file):
  96 + return {}
  97 +
  98 + rewrite_dict = {}
  99 + try:
  100 + with open(rewrite_file, 'r', encoding='utf-8') as f:
  101 + for line in f:
  102 + line = line.strip()
  103 + if not line or line.startswith('#'):
  104 + continue
  105 + parts = line.split('\t')
  106 + if len(parts) == 2:
  107 + rewrite_dict[parts[0].strip()] = parts[1].strip()
  108 + except Exception as e:
  109 + print(f"Warning: Failed to load rewrite dictionary: {e}")
  110 +
  111 + return rewrite_dict
  112 +
  113 +REWRITE_DICTIONARY = load_rewrite_dictionary()
  114 +
... ...