From 59b0a3420f8a8faf4f1628502140d66c28134a68 Mon Sep 17 00:00:00 2001 From: tangwang Date: Tue, 25 Nov 2025 22:46:51 +0800 Subject: [PATCH] 创建手写 mapping JSON mappings/search_products.json - 完整的ES索引配置(settings + mappings) 基于 docs/索引字段说明v2-mapping结构.md 简化 mapping_generator.py 移除所有config依赖 直接使用 load_mapping() 从JSON文件加载 保留工具函数:create_index_if_not_exists, delete_index_if_exists, update_mapping 更新数据导入脚本 scripts/ingest_shoplazza.py - 移除ConfigLoader依赖 直接使用 load_mapping() 和 DEFAULT_INDEX_NAME 更新indexer模块 indexer/__init__.py - 更新导出 indexer/bulk_indexer.py - 简化IndexingPipeline,移除config依赖 创建查询配置常量 search/query_config.py - 硬编码字段列表和配置项 --- indexer/__init__.py | 12 ++++++------ indexer/bulk_indexer.py | 31 ++++++++++++++----------------- indexer/mapping_generator.py | 177 +++++++++++++++++++++++++++++++++++++++++++-------------------------------------------------------------------------------------------------------------------------------------- mappings/README.md | 36 ++++++++++++++++++++++++++++++++++++ scripts/ingest_shoplazza.py | 40 +++++++++++----------------------------- search/query_config.py | 114 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 6 files changed, 224 insertions(+), 186 deletions(-) create mode 100644 mappings/README.md create mode 100644 search/query_config.py diff --git a/indexer/__init__.py b/indexer/__init__.py index 9f25b75..d472a02 100644 --- a/indexer/__init__.py +++ b/indexer/__init__.py @@ -1,14 +1,14 @@ """Indexer package initialization.""" -from .mapping_generator import MappingGenerator, create_index_if_not_exists, delete_index_if_exists -from .data_transformer import DataTransformer -from .bulk_indexer import BulkIndexer, IndexingPipeline +from .mapping_generator import load_mapping, create_index_if_not_exists, delete_index_if_exists, DEFAULT_INDEX_NAME +from .spu_transformer import SPUTransformer +from .bulk_indexer import BulkIndexer __all__ = [ - 'MappingGenerator', + 'load_mapping', 'create_index_if_not_exists', 'delete_index_if_exists', - 'DataTransformer', + 'DEFAULT_INDEX_NAME', + 'SPUTransformer', 'BulkIndexer', - 'IndexingPipeline', ] diff --git a/indexer/bulk_indexer.py b/indexer/bulk_indexer.py index 9811a6e..61321d6 100644 --- a/indexer/bulk_indexer.py +++ b/indexer/bulk_indexer.py @@ -7,7 +7,7 @@ Handles batch indexing of documents with progress tracking and error handling. from typing import List, Dict, Any, Optional from elasticsearch.helpers import bulk, BulkIndexError from utils.es_client import ESClient -from indexer import MappingGenerator +from indexer.mapping_generator import load_mapping, DEFAULT_INDEX_NAME import time @@ -203,23 +203,23 @@ class IndexingPipeline: def __init__( self, - config, es_client: ESClient, data_transformer, + index_name: str = None, recreate_index: bool = False ): """ Initialize indexing pipeline. Args: - config: Search configuration es_client: Elasticsearch client data_transformer: Data transformer instance + index_name: Index name (defaults to DEFAULT_INDEX_NAME) recreate_index: Whether to recreate index if exists """ - self.config = config self.es_client = es_client self.transformer = data_transformer + self.index_name = index_name or DEFAULT_INDEX_NAME self.recreate_index = recreate_index def run(self, df, batch_size: int = 100) -> Dict[str, Any]: @@ -233,22 +233,19 @@ class IndexingPipeline: Returns: Indexing statistics """ - # Generate and create index - mapping_gen = MappingGenerator(self.config) - mapping = mapping_gen.generate_mapping() - - index_name = self.config.es_index_name + # Load and create index + mapping = load_mapping() if self.recreate_index: - if self.es_client.index_exists(index_name): - print(f"[IndexingPipeline] Deleting existing index: {index_name}") - self.es_client.delete_index(index_name) + if self.es_client.index_exists(self.index_name): + print(f"[IndexingPipeline] Deleting existing index: {self.index_name}") + self.es_client.delete_index(self.index_name) - if not self.es_client.index_exists(index_name): - print(f"[IndexingPipeline] Creating index: {index_name}") - self.es_client.create_index(index_name, mapping) + if not self.es_client.index_exists(self.index_name): + print(f"[IndexingPipeline] Creating index: {self.index_name}") + self.es_client.create_index(self.index_name, mapping) else: - print(f"[IndexingPipeline] Using existing index: {index_name}") + print(f"[IndexingPipeline] Using existing index: {self.index_name}") # Transform data print(f"[IndexingPipeline] Transforming {len(df)} documents...") @@ -256,7 +253,7 @@ class IndexingPipeline: print(f"[IndexingPipeline] Transformed {len(documents)} documents") # Bulk index - indexer = BulkIndexer(self.es_client, index_name, batch_size=500) + indexer = BulkIndexer(self.es_client, self.index_name, batch_size=500) results = indexer.index_documents(documents, id_field="skuId") return results diff --git a/indexer/mapping_generator.py b/indexer/mapping_generator.py index dfb7566..a861f3c 100644 --- a/indexer/mapping_generator.py +++ b/indexer/mapping_generator.py @@ -1,153 +1,59 @@ """ -Elasticsearch mapping generator. +Elasticsearch mapping loader. -Generates Elasticsearch index mappings from search configuration. +Loads Elasticsearch index mapping from JSON file. """ from typing import Dict, Any +import json import logging - -from config import ( - SearchConfig, - FieldConfig, - get_es_mapping_for_field, - get_default_analyzers, - get_default_similarity -) +from pathlib import Path logger = logging.getLogger(__name__) +# Default index name +DEFAULT_INDEX_NAME = "search_products" + +# Default mapping file path +DEFAULT_MAPPING_FILE = Path(__file__).parent.parent / "mappings" / "search_products.json" + + +def load_mapping(mapping_file: str = None) -> Dict[str, Any]: + """ + Load Elasticsearch mapping from JSON file. + + Args: + mapping_file: Path to mapping JSON file. If None, uses default. + + Returns: + Dictionary containing index configuration (settings + mappings) + + Raises: + FileNotFoundError: If mapping file doesn't exist + json.JSONDecodeError: If mapping file is invalid JSON + """ + if mapping_file is None: + mapping_file = str(DEFAULT_MAPPING_FILE) + + mapping_path = Path(mapping_file) + if not mapping_path.exists(): + raise FileNotFoundError(f"Mapping file not found: {mapping_path}") -class MappingGenerator: - """Generates Elasticsearch mapping from search configuration.""" - - def __init__(self, config: SearchConfig): - self.config = config - - def generate_mapping(self) -> Dict[str, Any]: - """ - Generate complete Elasticsearch index configuration including - settings and mappings. - - Returns: - Dictionary containing index configuration - """ - return { - "settings": self._generate_settings(), - "mappings": self._generate_mappings() - } - - def _generate_settings(self) -> Dict[str, Any]: - """Generate index settings.""" - settings = { - "number_of_shards": self.config.es_settings.get("number_of_shards", 1), - "number_of_replicas": self.config.es_settings.get("number_of_replicas", 0), - "refresh_interval": self.config.es_settings.get("refresh_interval", "30s"), - } - - # Add similarity configuration (modified BM25) - similarity_config = get_default_similarity() - settings.update(similarity_config) - - # Add analyzer configuration - analyzer_config = get_default_analyzers() - settings.update(analyzer_config) - - # Merge any custom settings from config - for key, value in self.config.es_settings.items(): - if key not in ["number_of_shards", "number_of_replicas", "refresh_interval"]: - settings[key] = value - - return settings - - def _generate_mappings(self) -> Dict[str, Any]: - """Generate field mappings.""" - properties = {} - - for field in self.config.fields: - field_mapping = get_es_mapping_for_field(field) - properties[field.name] = field_mapping - - return { - "properties": properties - } - - def get_default_domain_fields(self) -> list: - """ - Get list of fields in the 'default' domain. - - Returns: - List of field names - """ - for index in self.config.indexes: - if index.name == "default": - return index.fields - return [] - - def get_text_embedding_field(self) -> str: - """ - Get the primary text embedding field name. - - Returns: - Field name or empty string if not configured - """ - return self.config.query_config.text_embedding_field or "" - - def get_image_embedding_field(self) -> str: - """ - Get the primary image embedding field name. - - Returns: - Field name or empty string if not configured - """ - return self.config.query_config.image_embedding_field or "" - - def get_field_by_name(self, field_name: str) -> FieldConfig: - """ - Get field configuration by name. - - Args: - field_name: Field name - - Returns: - FieldConfig object or None if not found - """ - for field in self.config.fields: - if field.name == field_name: - return field - return None - - def get_match_fields_for_domain(self, domain_name: str = "default") -> list: - """ - Get list of text fields for matching in a domain. - - Args: - domain_name: Name of the query domain - - Returns: - List of field names with optional boost (e.g., ["name^2.0", "category^1.5"]) - """ - for index in self.config.indexes: - if index.name == domain_name: - result = [] - for field_name in index.fields: - field = self.get_field_by_name(field_name) - if field and field.boost != 1.0: - result.append(f"{field_name}^{field.boost}") - else: - result.append(field_name) - return result - return [] - - -def create_index_if_not_exists(es_client, index_name: str, mapping: Dict[str, Any]) -> bool: + with open(mapping_path, 'r', encoding='utf-8') as f: + mapping = json.load(f) + + logger.info(f"Loaded mapping from {mapping_path}") + return mapping + + +def create_index_if_not_exists(es_client, index_name: str, mapping: Dict[str, Any] = None) -> bool: """ Create Elasticsearch index if it doesn't exist. Args: es_client: Elasticsearch client instance index_name: Name of the index to create - mapping: Index mapping configuration + mapping: Index mapping configuration. If None, loads from default file. Returns: True if index was created, False if it already exists @@ -156,6 +62,9 @@ def create_index_if_not_exists(es_client, index_name: str, mapping: Dict[str, An logger.info(f"Index '{index_name}' already exists") return False + if mapping is None: + mapping = load_mapping() + es_client.indices.create(index=index_name, body=mapping) logger.info(f"Index '{index_name}' created successfully") return True diff --git a/mappings/README.md b/mappings/README.md new file mode 100644 index 0000000..32cdfa1 --- /dev/null +++ b/mappings/README.md @@ -0,0 +1,36 @@ +# ES Mapping Configuration + +## 概述 + +所有租户共享同一个ES mapping结构,直接使用手写的JSON文件,无需通过config.yaml生成。 + +## Mapping文件 + +- `search_products.json`: 完整的ES索引配置,包括settings和mappings + +## 使用方式 + +### 创建索引 + +```python +from indexer.mapping_generator import load_mapping, create_index_if_not_exists +from utils.es_client import ESClient + +es_client = ESClient(hosts=["http://localhost:9200"]) +mapping = load_mapping() # 从mappings/search_products.json加载 +create_index_if_not_exists(es_client, "search_products", mapping) +``` + +### 修改Mapping + +直接编辑 `mappings/search_products.json` 文件,然后重新创建索引。 + +注意:ES不支持修改已有字段的mapping类型,只能添加新字段。如需修改字段类型,需要: +1. 删除旧索引 +2. 使用新mapping创建索引 +3. 重新导入数据 + +## 字段说明 + +参考 `docs/索引字段说明v2-mapping结构.md` + diff --git a/scripts/ingest_shoplazza.py b/scripts/ingest_shoplazza.py index c8a8924..60699c0 100644 --- a/scripts/ingest_shoplazza.py +++ b/scripts/ingest_shoplazza.py @@ -16,9 +16,8 @@ sys.path.insert(0, str(Path(__file__).parent.parent)) from utils.db_connector import create_db_connection from utils.es_client import ESClient from indexer.spu_transformer import SPUTransformer -from indexer.mapping_generator import MappingGenerator +from indexer.mapping_generator import load_mapping, DEFAULT_INDEX_NAME from indexer.bulk_indexer import BulkIndexer -from config import ConfigLoader def main(): @@ -43,25 +42,15 @@ def main(): print(f"Starting Shoplazza data ingestion for tenant: {args.tenant_id}") - # Load unified configuration - config_loader = ConfigLoader("config/config.yaml") + # Load mapping from JSON file try: - config = config_loader.load_config() - print(f"Loaded configuration: {config.es_index_name}") + mapping = load_mapping() + print(f"Loaded mapping configuration") except Exception as e: - print(f"ERROR: Failed to load configuration: {e}") + print(f"ERROR: Failed to load mapping: {e}") return 1 - # Validate tenant_id field exists - tenant_id_field = None - for field in config.fields: - if field.name == "tenant_id": - tenant_id_field = field - break - - if not tenant_id_field: - print("ERROR: Configuration must include 'tenant_id' field") - return 1 + index_name = DEFAULT_INDEX_NAME # Connect to MySQL print(f"Connecting to MySQL: {args.db_host}:{args.db_port}/{args.db_database}") @@ -77,13 +66,10 @@ def main(): print(f"ERROR: Failed to connect to MySQL: {e}") return 1 - # Connect to Elasticsearch (use unified config loading) - from config.env_config import ES_CONFIG - - # Use provided es_host or fallback to config - es_host = args.es_host or ES_CONFIG.get('host', 'http://localhost:9200') - es_username = ES_CONFIG.get('username') - es_password = ES_CONFIG.get('password') + # Connect to Elasticsearch + es_host = args.es_host + es_username = os.environ.get('ES_USERNAME') + es_password = os.environ.get('ES_PASSWORD') print(f"Connecting to Elasticsearch: {es_host}") if es_username and es_password: @@ -96,11 +82,7 @@ def main(): print(f"ERROR: Cannot connect to Elasticsearch at {es_host}") return 1 - # Generate and create index - mapping_gen = MappingGenerator(config) - mapping = mapping_gen.generate_mapping() - index_name = config.es_index_name - + # Create index if needed if args.recreate: if es_client.index_exists(index_name): print(f"Deleting existing index: {index_name}") diff --git a/search/query_config.py b/search/query_config.py new file mode 100644 index 0000000..a415957 --- /dev/null +++ b/search/query_config.py @@ -0,0 +1,114 @@ +""" +Query configuration constants. + +Since all tenants share the same ES mapping, we can hardcode field lists here. +""" + +import os +from typing import Dict, List + +# Default index name +DEFAULT_INDEX_NAME = "search_products" + +# Text embedding field +TEXT_EMBEDDING_FIELD = "title_embedding" + +# Image embedding field +IMAGE_EMBEDDING_FIELD = "image_embedding" + +# Default match fields for text search (with boost) +DEFAULT_MATCH_FIELDS = [ + "title_zh^3.0", + "brief_zh^1.5", + "description_zh^1.0", + "vendor_zh^1.5", + "tags^1.0", + "category_path_zh^1.5", + "category_name_zh^1.5" +] + +# Domain-specific match fields +DOMAIN_FIELDS: Dict[str, List[str]] = { + "default": DEFAULT_MATCH_FIELDS, + "title": ["title_zh^2.0"], + "vendor": ["vendor_zh^1.5"], + "category": ["category_path_zh^1.5", "category_name_zh^1.5"], + "tags": ["tags^1.0"] +} + +# Source fields to return in search results +SOURCE_FIELDS = [ + "tenant_id", + "spu_id", + "title_zh", + "brief_zh", + "description_zh", + "vendor_zh", + "tags", + "image_url", + "category_path_zh", + "category_name_zh", + "category_id", + "category_name", + "category_level", + "category1_name", + "category2_name", + "category3_name", + "option1_name", + "option2_name", + "option3_name", + "min_price", + "max_price", + "compare_at_price", + "total_inventory", + "create_time", + "update_time", + "skus", + "specifications" +] + +# Query processing settings +ENABLE_TRANSLATION = os.environ.get("ENABLE_TRANSLATION", "true").lower() == "true" +ENABLE_TEXT_EMBEDDING = os.environ.get("ENABLE_TEXT_EMBEDDING", "true").lower() == "true" +TRANSLATION_API_KEY = os.environ.get("DEEPL_API_KEY") +TRANSLATION_SERVICE = "deepl" + +# Ranking expression (currently disabled) +RANKING_EXPRESSION = "bm25() + 0.2*text_embedding_relevance()" + +# Function score config +FUNCTION_SCORE_CONFIG = { + "score_mode": "sum", + "boost_mode": "multiply", + "functions": [] +} + +# Load rewrite dictionary from file if exists +def load_rewrite_dictionary() -> Dict[str, str]: + """Load query rewrite dictionary from file.""" + rewrite_file = os.path.join( + os.path.dirname(os.path.dirname(__file__)), + "config", + "query_rewrite.dict" + ) + + if not os.path.exists(rewrite_file): + return {} + + rewrite_dict = {} + try: + with open(rewrite_file, 'r', encoding='utf-8') as f: + for line in f: + line = line.strip() + if not line or line.startswith('#'): + continue + parts = line.split('\t') + if len(parts) == 2: + rewrite_dict[parts[0].strip()] = parts[1].strip() + except Exception as e: + print(f"Warning: Failed to load rewrite dictionary: {e}") + + return rewrite_dict + +REWRITE_DICTIONARY = load_rewrite_dictionary() + -- libgit2 0.21.2