diff --git a/COMMIT_MESSAGE.md b/COMMIT_MESSAGE.md new file mode 100644 index 0000000..5157e15 --- /dev/null +++ b/COMMIT_MESSAGE.md @@ -0,0 +1,154 @@ +# 本次修改总结 + +## 最终状态 + +### 1. 增量数据获取服务 + +**新增文件**: +- `indexer/incremental_service.py`: 增量索引服务,提供单个SPU数据获取 +- `api/routes/indexer.py`: 增量索引API路由 +- `indexer/test_indexing.py`: 索引功能测试脚本 + +**功能**: +- 提供 `GET /indexer/spu/{spu_id}?tenant_id={tenant_id}` 接口,返回单个SPU的ES文档数据 +- 服务启动时预加载分类映射(全局共享),提高性能 +- 支持按需加载租户配置和搜索配置 + +### 2. 公共文档转换器 + +**新增文件**: +- `indexer/document_transformer.py`: SPU文档转换器,提取全量和增量共用的转换逻辑 + +**功能**: +- 统一了全量索引(SPUTransformer)和增量索引(IncrementalIndexerService)的文档转换逻辑 +- 消除了约300行重复代码 +- 支持根据租户配置进行语言处理和翻译 + +### 3. 租户配置系统 + +**配置位置**: +- 租户配置合并到统一配置文件 `config/config.yaml` 的 `tenant_config` 部分 +- 删除了独立的 `config/tenant_config.json` 文件 + +**配置结构**: +```yaml +tenant_config: + default: + primary_language: "zh" + translate_to_en: true + translate_to_zh: false + tenants: + "162": + primary_language: "zh" + translate_to_en: false # 翻译关闭 + translate_to_zh: false +``` + +**功能**: +- 每个租户可配置主语言和翻译选项 +- 租户162配置为翻译关闭(用于测试) +- 未配置的租户使用默认配置 + +### 4. 翻译功能集成 + +**翻译模块增强**: +- `query/translator.py`: 支持提示词参数,作为DeepL API的`context`参数传递 +- 修复了重复的executor初始化代码 +- 统一使用logger替代print语句 + +**翻译提示词配置**: +- 在 `config/config.yaml` 的 `translation_prompts` 部分配置 +- 支持中英文提示词: + - `product_title_zh/en`: 商品标题翻译提示词 + - `query_zh/en`: 查询翻译提示词 + - `default_zh/en`: 默认翻译用词 + +**翻译模式**: +- **索引场景**:同步翻译,等待结果返回,使用缓存避免重复翻译 +- **查询场景**:异步翻译,立即返回缓存结果,后台翻译缺失项 + +**DeepL Context参数**: +- 提示词作为DeepL API的`context`参数传递(不参与翻译,仅提供上下文) +- Context中的字符不计入DeepL计费 + +### 5. 代码重构 + +**消除冗余**: +- 提取公共转换逻辑到 `SPUDocumentTransformer` +- `SPUTransformer` 和 `IncrementalIndexerService` 都使用公共转换器 +- 移除了重复的 `_transform_spu_to_doc` 和 `_transform_sku_row` 方法 + +**架构优化**: +- 全量和增量索引共用同一转换逻辑 +- 分类映射在服务启动时预加载(全局共享) +- 租户配置按需加载(支持热更新) + +### 6. 
测试 + +**测试文件位置**(遵循模块化原则): +- `indexer/test_indexing.py`: 索引功能测试(全量、增量、租户配置、文档转换器) +- `query/test_translation.py`: 翻译功能测试(同步、异步、缓存、Context参数) + +### 7. 文档更新 + +- `docs/索引数据接口文档.md`: 更新了租户配置说明,从独立JSON文件改为统一配置文件 +- `docs/翻译功能测试说明.md`: 新增翻译功能测试说明文档 + +## 修改的文件 + +### 新增文件 +- `indexer/incremental_service.py` +- `indexer/document_transformer.py` +- `indexer/test_indexing.py` +- `api/routes/indexer.py` +- `query/test_translation.py` +- `config/tenant_config_loader.py` (重构,从JSON改为YAML) +- `docs/翻译功能测试说明.md` + +### 修改文件 +- `config/config.yaml`: 添加租户配置和翻译提示词配置 +- `config/config_loader.py`: 支持租户配置加载 +- `config/tenant_config_loader.py`: 从统一配置文件加载租户配置 +- `indexer/spu_transformer.py`: 使用公共转换器,集成翻译服务 +- `indexer/incremental_service.py`: 使用公共转换器,集成翻译服务 +- `query/translator.py`: 支持提示词作为context参数,修复冗余代码 +- `query/query_parser.py`: 使用翻译提示词 +- `api/app.py`: 注册增量索引路由,初始化增量服务 +- `docs/索引数据接口文档.md`: 更新租户配置说明 + +### 删除文件 +- `config/tenant_config.json`: 合并到统一配置文件 + +## 测试验证 + +### 租户162测试(翻译关闭) +- 全量索引:验证翻译功能关闭,title_en为None +- 增量索引:验证翻译功能关闭,title_en为None +- 文档转换器:验证根据租户配置正确处理翻译 + +### 其他租户测试(翻译开启) +- 验证翻译功能正常工作 +- 验证提示词正确使用 + +## 架构设计 + +### 数据流 +``` +MySQL数据 + ↓ +SPUTransformer / IncrementalIndexerService (数据加载层) + ↓ +SPUDocumentTransformer (公共转换层) + ↓ +ES文档 (输出) +``` + +### 配置层次 +1. **索引配置** (`config/config.yaml`): 搜索行为配置 +2. **租户配置** (`config/config.yaml` 的 `tenant_config` 部分): 数据转换配置 + +### 性能优化 +1. 公共数据预加载:分类映射在服务启动时一次性加载 +2. 配置按需加载:租户配置和搜索配置按需加载,支持热更新 +3. 
翻译缓存:索引时使用缓存避免重复翻译 + diff --git a/api/app.py b/api/app.py index b425887..a3fb577 100644 --- a/api/app.py +++ b/api/app.py @@ -40,17 +40,20 @@ limiter = Limiter(key_func=get_remote_address) # Add parent directory to path sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) -from config.env_config import ES_CONFIG +from config.env_config import ES_CONFIG, DB_CONFIG from config import ConfigLoader from utils import ESClient +from utils.db_connector import create_db_connection from search import Searcher from query import QueryParser +from indexer.incremental_service import IncrementalIndexerService # Global instances _es_client: Optional[ESClient] = None _searcher: Optional[Searcher] = None _query_parser: Optional[QueryParser] = None _config = None +_incremental_service: Optional[IncrementalIndexerService] = None def init_service(es_host: str = "http://localhost:9200"): @@ -60,7 +63,7 @@ def init_service(es_host: str = "http://localhost:9200"): Args: es_host: Elasticsearch host URL """ - global _es_client, _searcher, _query_parser, _config + global _es_client, _searcher, _query_parser, _config, _incremental_service start_time = time.time() logger.info("Initializing search service (multi-tenant)") @@ -93,6 +96,33 @@ def init_service(es_host: str = "http://localhost:9200"): logger.info("Initializing searcher...") _searcher = Searcher(_es_client, _config, _query_parser) + # Initialize incremental indexer service (if DB config is available) + try: + db_host = DB_CONFIG.get('host') + db_port = DB_CONFIG.get('port', 3306) + db_database = DB_CONFIG.get('database') + db_username = DB_CONFIG.get('username') + db_password = DB_CONFIG.get('password') + + if all([db_host, db_database, db_username, db_password]): + logger.info("Initializing incremental indexer service...") + db_engine = create_db_connection( + host=db_host, + port=db_port, + database=db_database, + username=db_username, + password=db_password + ) + _incremental_service = 
IncrementalIndexerService(db_engine) + logger.info("Incremental indexer service initialized") + else: + logger.warning("Database configuration incomplete, incremental indexer service will not be available") + logger.warning("Required: DB_HOST, DB_DATABASE, DB_USERNAME, DB_PASSWORD") + except Exception as e: + logger.warning(f"Failed to initialize incremental indexer service: {e}") + logger.warning("Incremental indexer endpoints will not be available") + _incremental_service = None + elapsed = time.time() - start_time logger.info(f"Search service ready! (took {elapsed:.2f}s) | Index: {_config.es_index_name}") @@ -127,6 +157,11 @@ def get_config(): return _config +def get_incremental_service() -> Optional[IncrementalIndexerService]: + """Get incremental indexer service instance.""" + return _incremental_service + + # Create FastAPI app with enhanced configuration app = FastAPI( title="E-Commerce Search API", @@ -267,10 +302,11 @@ async def health_check(request: Request): # Include routers -from .routes import search, admin +from .routes import search, admin, indexer app.include_router(search.router) app.include_router(admin.router) +app.include_router(indexer.router) # Mount static files and serve frontend frontend_path = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "frontend") diff --git a/api/routes/indexer.py b/api/routes/indexer.py new file mode 100644 index 0000000..3705500 --- /dev/null +++ b/api/routes/indexer.py @@ -0,0 +1,149 @@ +""" +增量索引API路由。 + +提供单个SPU数据获取接口,用于增量更新ES索引。 +""" + +from fastapi import APIRouter, HTTPException, Query, Request +from typing import Optional +import logging + +from ..models import ErrorResponse + +logger = logging.getLogger(__name__) + +router = APIRouter(prefix="/indexer", tags=["indexer"]) + + +@router.get("/spu/{spu_id}") +async def get_spu_document( + spu_id: str, + tenant_id: str = Query(..., description="租户ID"), + request: Request = None +): + """ + 获取单个SPU的ES文档数据(用于增量索引更新)。 + + 功能说明: + - 根据 
tenant_id 和 spu_id 查询MySQL数据库 + - 返回该SPU的完整ES文档数据(JSON格式) + - 外部Java程序可以调用此接口获取数据后推送到ES + + 参数: + - spu_id: SPU ID(路径参数) + - tenant_id: 租户ID(查询参数,必需) + + 返回: + - 成功:返回ES文档JSON对象 + - SPU不存在或已删除:返回404 + - 其他错误:返回500 + + 示例请求: + ``` + GET /indexer/spu/123?tenant_id=1 + ``` + + 示例响应: + ```json + { + "tenant_id": "1", + "spu_id": "123", + "title_zh": "商品标题", + "brief_zh": "商品简介", + "description_zh": "商品描述", + "vendor_zh": "供应商", + "tags": ["标签1", "标签2"], + "category_path_zh": "类目1/类目2/类目3", + "category1_name": "类目1", + "category2_name": "类目2", + "category3_name": "类目3", + "category_id": "100", + "category_level": 3, + "min_price": 99.99, + "max_price": 199.99, + "compare_at_price": 299.99, + "sales": 1000, + "total_inventory": 500, + "skus": [...], + "specifications": [...], + ... + } + ``` + """ + try: + from ..app import get_incremental_service + + # 获取增量服务实例 + service = get_incremental_service() + if service is None: + raise HTTPException( + status_code=503, + detail="Incremental indexer service is not initialized. Please check database connection." 
+ ) + + # 获取SPU文档 + doc = service.get_spu_document(tenant_id=tenant_id, spu_id=spu_id) + + if doc is None: + raise HTTPException( + status_code=404, + detail=f"SPU {spu_id} not found for tenant_id={tenant_id} or has been deleted" + ) + + return doc + + except HTTPException: + raise + except Exception as e: + logger.error(f"Error getting SPU document for tenant_id={tenant_id}, spu_id={spu_id}: {e}", exc_info=True) + raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}") + + +@router.get("/health") +async def indexer_health_check(): + """ + 检查增量索引服务健康状态。 + + 返回: + - 服务是否可用 + - 数据库连接状态 + - 预加载数据状态 + """ + try: + from ..app import get_incremental_service + + service = get_incremental_service() + if service is None: + return { + "status": "unavailable", + "message": "Incremental indexer service is not initialized", + "database": "unknown", + "preloaded_data": { + "category_mappings": 0 + } + } + + # 检查数据库连接 + try: + from sqlalchemy import text + with service.db_engine.connect() as conn: + conn.execute(text("SELECT 1")) + db_status = "connected" + except Exception as e: + db_status = f"disconnected: {str(e)}" + + return { + "status": "available", + "database": db_status, + "preloaded_data": { + "category_mappings": len(service.category_id_to_name) + } + } + + except Exception as e: + logger.error(f"Error checking indexer health: {e}", exc_info=True) + return { + "status": "error", + "message": str(e) + } + diff --git a/config/config.yaml b/config/config.yaml index ffb0cd5..c6c1c36 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -104,6 +104,18 @@ query_config: translation_service: "deepl" translation_api_key: null # 通过环境变量设置 + # 翻译提示词配置(用于提高翻译质量,作为DeepL API的context参数) + translation_prompts: + # 商品标题翻译提示词 + product_title_zh: "请将原文翻译成中文商品SKU名称,要求:确保精确、完整地传达原文信息的基础上,语言简洁清晰、地道、专业。" + product_title_en: "Translate the original text into an English product SKU name. 
Requirements: Ensure accurate and complete transmission of the original information, with concise, clear, authentic, and professional language." + # query翻译提示词 + query_zh: "电商领域" + query_en: "e-commerce domain" + # 默认翻译用词 + default_zh: "电商领域" + default_en: "e-commerce domain" + # 返回字段配置(_source includes) # null表示返回所有字段,[]表示不返回任何字段,列表表示只返回指定字段 source_fields: null @@ -133,3 +145,30 @@ spu_config: # 配置哪些option维度参与检索(进索引、以及在线搜索) # 格式为list,选择option1/option2/option3中的一个或多个 searchable_option_dimensions: ['option1', 'option2', 'option3'] + +# 租户配置(Tenant Configuration) +# 每个租户可以配置主语言和翻译选项 +tenant_config: + # 默认配置(未配置的租户使用此配置) + default: + primary_language: "zh" + translate_to_en: true + translate_to_zh: false + # 租户特定配置 + tenants: + "1": + primary_language: "zh" + translate_to_en: true + translate_to_zh: false + "2": + primary_language: "en" + translate_to_en: false + translate_to_zh: true + "3": + primary_language: "zh" + translate_to_en: true + translate_to_zh: false + "162": + primary_language: "zh" + translate_to_en: false + translate_to_zh: false diff --git a/config/config_loader.py b/config/config_loader.py index 0b7ae58..f24f3e6 100644 --- a/config/config_loader.py +++ b/config/config_loader.py @@ -45,6 +45,7 @@ class QueryConfig: translation_api_key: Optional[str] = None translation_glossary_id: Optional[str] = None translation_context: str = "e-commerce product search" + translation_prompts: Dict[str, str] = field(default_factory=dict) # Translation prompts for different use cases # Embedding field names text_embedding_field: Optional[str] = "title_embedding" @@ -118,6 +119,11 @@ class SearchConfig: # ES index settings es_index_name: str + + # Tenant configuration + tenant_config: Dict[str, Any] = field(default_factory=dict) + + # ES settings es_settings: Dict[str, Any] = field(default_factory=dict) @@ -232,6 +238,7 @@ class ConfigLoader: translation_service=query_config_data.get("translation_service") or "deepl", 
translation_glossary_id=query_config_data.get("translation_glossary_id"), translation_context=query_config_data.get("translation_context") or "e-commerce product search", + translation_prompts=query_config_data.get("translation_prompts", {}), text_embedding_field=query_config_data.get("text_embedding_field"), image_embedding_field=query_config_data.get("image_embedding_field"), embedding_disable_chinese_char_limit=embedding_thresholds.get("chinese_char_limit", 4), @@ -271,6 +278,9 @@ class ConfigLoader: searchable_option_dimensions=spu_data.get("searchable_option_dimensions", ['option1', 'option2', 'option3']) ) + # Parse tenant config + tenant_config_data = config_data.get("tenant_config", {}) + return SearchConfig( field_boosts=field_boosts, indexes=indexes, @@ -279,6 +289,7 @@ class ConfigLoader: function_score=function_score, rerank=rerank, spu_config=spu_config, + tenant_config=tenant_config_data, es_index_name=config_data.get("es_index_name", "search_products"), es_settings=config_data.get("es_settings", {}) ) diff --git a/config/tenant_config_loader.py b/config/tenant_config_loader.py new file mode 100644 index 0000000..e10bf82 --- /dev/null +++ b/config/tenant_config_loader.py @@ -0,0 +1,90 @@ +""" +租户配置加载器。 + +从统一配置文件(config.yaml)加载租户配置,包括主语言和翻译配置。 +""" + +import logging +from typing import Dict, Any, Optional + +logger = logging.getLogger(__name__) + + +class TenantConfigLoader: + """租户配置加载器。""" + + def __init__(self): + """初始化租户配置加载器。""" + self._config: Optional[Dict[str, Any]] = None + + def load_config(self) -> Dict[str, Any]: + """ + 加载租户配置(从统一配置文件)。 + + Returns: + 租户配置字典,格式:{"tenants": {...}, "default": {...}} + """ + if self._config is not None: + return self._config + + try: + from config import ConfigLoader + config_loader = ConfigLoader() + search_config = config_loader.load_config() + self._config = search_config.tenant_config + logger.info("Loaded tenant config from unified config.yaml") + return self._config + except Exception as e: + 
logger.error(f"Failed to load tenant config: {e}", exc_info=True) + # 返回默认配置 + self._config = { + "default": { + "primary_language": "zh", + "translate_to_en": True, + "translate_to_zh": False + }, + "tenants": {} + } + return self._config + + def get_tenant_config(self, tenant_id: str) -> Dict[str, Any]: + """ + 获取指定租户的配置。 + + Args: + tenant_id: 租户ID + + Returns: + 租户配置字典,如果租户不存在则返回默认配置 + """ + config = self.load_config() + tenant_id_str = str(tenant_id) + + tenants = config.get("tenants", {}) + if tenant_id_str in tenants: + return tenants[tenant_id_str] + else: + logger.debug(f"Tenant {tenant_id} not found in config, using default") + return config.get("default", { + "primary_language": "zh", + "translate_to_en": True, + "translate_to_zh": False + }) + + def reload(self): + """重新加载配置(用于配置更新)。""" + self._config = None + return self.load_config() + + +# 全局实例 +_tenant_config_loader: Optional[TenantConfigLoader] = None + + +def get_tenant_config_loader() -> TenantConfigLoader: + """获取全局租户配置加载器实例。""" + global _tenant_config_loader + if _tenant_config_loader is None: + _tenant_config_loader = TenantConfigLoader() + return _tenant_config_loader + diff --git a/docs/INDEX_FIELDS_DOCUMENTATION.md b/docs/INDEX_FIELDS_DOCUMENTATION.md deleted file mode 100644 index 0fdd156..0000000 --- a/docs/INDEX_FIELDS_DOCUMENTATION.md +++ /dev/null @@ -1,223 +0,0 @@ -# 索引字段说明文档 - -本文档详细说明了 Elasticsearch 索引中所有字段的类型、索引方式、数据来源等信息。 - -## 索引基本信息 - -- **索引名称**: `search_products` -- **索引级别**: SPU级别(商品级别) -- **数据结构**: SPU文档包含嵌套的skus(SKU)数组 - -## 字段说明表 - -### 基础字段 - -| 索引字段名 | ES字段类型 | 是否索引 | 索引方式 | 数据来源表 | 表中字段名 | 表中字段类型 | 说明 | -|-----------|-----------|---------|---------|-----------|-----------|-------------|------| -| tenant_id | keyword | 是 | 精确匹配 | SPU表 | tenant_id | BIGINT | 租户ID,用于多租户隔离 | -| spu_id | keyword | 是 | 精确匹配 | SPU表 | id | BIGINT | 商品ID(SPU ID) | -| handle | keyword | 是 | 精确匹配 | SPU表 | handle | VARCHAR(255) | 商品URL handle | - -### 文本搜索字段 - -| 索引字段名 | ES字段类型 | 是否索引 | 索引方式 | 
数据来源表 | 表中字段名 | 表中字段类型 | Boost权重 | 说明 | -|-----------|-----------|---------|---------|-----------|-----------|-------------|-----------|------| -| title | TEXT | 是 | english | SPU表 | title | VARCHAR(512) | 3.0 | 商品标题,权重最高 | -| brief | TEXT | 是 | english | SPU表 | brief | VARCHAR(512) | 1.5 | 商品简介 | -| description | TEXT | 是 | english | SPU表 | description | TEXT | 1.0 | 商品详细描述 | - -### SEO字段 - -| 索引字段名 | ES字段类型 | 是否索引 | 索引方式 | 数据来源表 | 表中字段名 | 表中字段类型 | Boost权重 | 是否返回 | 说明 | -|-----------|-----------|---------|---------|-----------|-----------|-------------|-----------|---------|------| -| seo_title | TEXT | 是 | english | SPU表 | seo_title | VARCHAR(512) | 2.0 | 否 | SEO标题,用于提升相关性 | -| seo_description | TEXT | 是 | english | SPU表 | seo_description | TEXT | 1.5 | 否 | SEO描述 | -| seo_keywords | TEXT | 是 | english | SPU表 | seo_keywords | VARCHAR(1024) | 2.0 | 否 | SEO关键词 | - -### 分类和标签字段 - -| 索引字段名 | ES字段类型 | 是否索引 | 索引方式 | 数据来源表 | 表中字段名 | 表中字段类型 | Boost权重 | 是否返回 | 说明 | -|-----------|-----------|---------|---------|-----------|-----------|-------------|-----------|---------|------| -| vendor | TEXT | 是 | english | SPU表 | vendor | VARCHAR(255) | 1.5 | 是 | 供应商/品牌(文本搜索) | -| vendor.keyword | keyword | 是 | 精确匹配 | SPU表 | vendor | VARCHAR(255) | - | 否 | 供应商/品牌(精确匹配,用于过滤) | -| product_type | TEXT | 是 | english | SPU表 | category | VARCHAR(255) | 1.5 | 是 | 商品类型(文本搜索) | -| product_type_keyword | keyword | 是 | 精确匹配 | SPU表 | category | VARCHAR(255) | - | 否 | 商品类型(精确匹配,用于过滤) | -| tags | TEXT | 是 | english | SPU表 | tags | VARCHAR(1024) | 1.0 | 是 | 标签(文本搜索) | -| tags.keyword | keyword | 是 | 精确匹配 | SPU表 | tags | VARCHAR(1024) | - | 否 | 标签(精确匹配,用于过滤) | -| category | TEXT | 是 | english | SPU表 | category | VARCHAR(255) | 1.5 | 是 | 类目(文本搜索) | -| category.keyword | keyword | 是 | 精确匹配 | SPU表 | category | VARCHAR(255) | - | 否 | 类目(精确匹配,用于过滤) | - -### 价格字段 - -| 索引字段名 | ES字段类型 | 是否索引 | 索引方式 | 数据来源表 | 表中字段名 | 表中字段类型 | 说明 | 
-|-----------|-----------|---------|---------|-----------|-----------|-------------|------| -| min_price | FLOAT | 是 | float | SKU表(聚合计算) | price | DECIMAL(10,2) | 最低价格(从所有SKU中取最小值) | -| max_price | FLOAT | 是 | float | SKU表(聚合计算) | price | DECIMAL(10,2) | 最高价格(从所有SKU中取最大值) | -| compare_at_price | FLOAT | 是 | float | SKU表(聚合计算) | compare_at_price | DECIMAL(10,2) | 原价(从所有SKU中取最大值) | - -**价格计算逻辑**: -- `min_price`: 取该SPU下所有SKU的price字段的最小值 -- `max_price`: 取该SPU下所有SKU的price字段的最大值 -- `compare_at_price`: 取该SPU下所有SKU的compare_at_price字段的最大值(如果存在) - -### 图片字段 - -| 索引字段名 | ES字段类型 | 是否索引 | 索引方式 | 数据来源表 | 表中字段名 | 表中字段类型 | 说明 | -|-----------|-----------|---------|---------|-----------|-----------|-------------|------| -| image_url | keyword | 否 | 不索引 | SPU表 | image_src | VARCHAR(500) | 商品主图URL,仅用于展示 | - -### 文本嵌入字段 - -| 索引字段名 | ES字段类型 | 是否索引 | 索引方式 | 数据来源表 | 表中字段名 | 表中字段类型 | 说明 | -|-----------|-----------|---------|---------|-----------|-----------|-------------|------| -| title_embedding | TEXT_EMBEDDING | 是 | 向量相似度(dot_product) | 计算生成 | title | VARCHAR(512) | 标题的文本向量(1024维),用于语义搜索 | - -**说明**: -- 向量维度:1024 -- 相似度算法:dot_product(点积) -- 数据来源:基于title字段通过BGE-M3模型生成 - -### 时间字段 - -| 索引字段名 | ES字段类型 | 是否索引 | 索引方式 | 数据来源表 | 表中字段名 | 表中字段类型 | 是否返回 | 说明 | -|-----------|-----------|---------|---------|-----------|-----------|-------------|---------|------| -| create_time | DATE | 是 | 日期范围 | SPU表 | create_time | DATETIME | 是 | 创建时间 | -| update_time | DATE | 是 | 日期范围 | SPU表 | update_time | DATETIME | 是 | 更新时间 | -| shoplazza_created_at | DATE | 是 | 日期范围 | SPU表 | shoplazza_created_at | DATETIME | 否 | 店匠系统创建时间 | -| shoplazza_updated_at | DATE | 是 | 日期范围 | SPU表 | shoplazza_updated_at | DATETIME | 否 | 店匠系统更新时间 | - -### 嵌套Skus字段(SKU级别) - -| 索引字段名 | ES字段类型 | 是否索引 | 索引方式 | 数据来源表 | 表中字段名 | 表中字段类型 | 说明 | -|-----------|-----------|---------|---------|-----------|-----------|-------------|------| -| skus | JSON (nested) | 是 | 嵌套对象 | SKU表 | - | - | 商品变体数组(嵌套结构) | - -#### Skus子字段 - -| 索引字段名 | ES字段类型 | 是否索引 
| 索引方式 | 数据来源表 | 表中字段名 | 表中字段类型 | 说明 | -|-----------|-----------|---------|---------|-----------|-----------|-------------|------| -| skus.sku_id | keyword | 是 | 精确匹配 | SKU表 | id | BIGINT | 变体ID(SKU ID) | -| skus.title | text | 是 | english | SKU表 | title | VARCHAR(500) | 变体标题 | -| skus.price | float | 是 | float | SKU表 | price | DECIMAL(10,2) | 变体价格 | -| skus.compare_at_price | float | 是 | float | SKU表 | compare_at_price | DECIMAL(10,2) | 变体原价 | -| skus.sku | keyword | 是 | 精确匹配 | SKU表 | sku | VARCHAR(100) | SKU编码 | -| skus.stock | long | 是 | float | SKU表 | inventory_quantity | INT(11) | 库存数量 | -| skus.options | object | 是 | 对象 | SKU表 | option1/option2/option3 | VARCHAR(255) | 选项(颜色、尺寸等) | - -**Skus结构说明**: -- `skus` 是一个嵌套对象数组,每个元素代表一个SKU -- 使用ES的nested类型,支持对嵌套字段进行独立查询和过滤 -- `options` 对象包含 `option1`、`option2`、`option3` 三个字段,分别对应SKU表中的选项值 - -## 字段类型说明 - -### ES字段类型映射 - -| ES字段类型 | Elasticsearch映射 | 用途 | -|-----------|------------------|------| -| keyword | keyword | 精确匹配、过滤、聚合、排序 | -| TEXT | text | 全文检索(支持分词) | -| FLOAT | float | 浮点数(价格、权重等) | -| LONG | long | 整数(库存、计数等) | -| DATE | date | 日期时间 | -| TEXT_EMBEDDING | dense_vector | 文本向量(1024维) | -| JSON | object/nested | 嵌套对象 | - -### 分析器说明 - -| 分析器名称 | 语言 | 说明 | -|-----------|------|------| -| chinese_ecommerce | 中文 | Ansj中文分词器(电商优化),用于中文文本的分词和搜索 | - -## 索引配置 - -### 索引设置 - -- **分片数**: 1 -- **副本数**: 0 -- **刷新间隔**: 30秒 - -### 查询域(Query Domains) - -系统定义了多个查询域,用于在不同场景下搜索不同的字段组合: - -1. **default(默认索引)**: 搜索所有文本字段 - - 包含字段:title, brief, description, seo_title, seo_description, seo_keywords, vendor, product_type, tags, category - - Boost: 1.0 - -2. **title(标题索引)**: 仅搜索标题相关字段 - - 包含字段:title, seo_title - - Boost: 2.0 - -3. **vendor(品牌索引)**: 仅搜索品牌字段 - - 包含字段:vendor - - Boost: 1.5 - -4. **category(类目索引)**: 仅搜索类目字段 - - 包含字段:category - - Boost: 1.5 - -5. **tags(标签索引)**: 搜索标签和SEO关键词 - - 包含字段:tags, seo_keywords - - Boost: 1.0 - -## 数据转换规则 - -### 数据类型转换 - -1. **BIGINT → keyword**: 数字ID转换为字符串(如 `spu_id`, `sku_id`) -2. 
**DECIMAL → FLOAT**: 价格字段从DECIMAL转换为FLOAT -3. **INT → LONG**: 库存数量从INT转换为LONG -4. **DATETIME → DATE**: 时间字段转换为ISO格式字符串 - -### 特殊处理 - -1. **价格聚合**: 从多个SKU的价格中计算min_price、max_price、compare_at_price -2. **图片URL处理**: 如果image_src不是完整URL,会自动添加协议前缀 -3. **选项合并**: 将SKU表的option1、option2、option3合并为options对象 - -## 注意事项 - -1. **多租户隔离**: 所有查询必须包含 `tenant_id` 过滤条件 -2. **嵌套查询**: 查询skus字段时需要使用nested查询语法 -3. **字段命名**: 用于过滤的字段应使用 `*_keyword` 后缀的字段 -4. **向量搜索**: title_embedding字段用于语义搜索,需要配合文本查询使用 -5. **Boost权重**: 不同字段的boost权重影响搜索结果的相关性排序 - -## 数据来源表结构 - -### SPU表(shoplazza_product_spu) - -主要字段: -- `id`: BIGINT - 主键ID -- `tenant_id`: BIGINT - 租户ID -- `handle`: VARCHAR(255) - URL handle -- `title`: VARCHAR(512) - 商品标题 -- `brief`: VARCHAR(512) - 商品简介 -- `description`: TEXT - 商品描述 -- `vendor`: VARCHAR(255) - 供应商/品牌 -- `category`: VARCHAR(255) - 类目 -- `tags`: VARCHAR(1024) - 标签 -- `seo_title`: VARCHAR(512) - SEO标题 -- `seo_description`: TEXT - SEO描述 -- `seo_keywords`: VARCHAR(1024) - SEO关键词 -- `image_src`: VARCHAR(500) - 图片URL -- `create_time`: DATETIME - 创建时间 -- `update_time`: DATETIME - 更新时间 -- `shoplazza_created_at`: DATETIME - 店匠创建时间 -- `shoplazza_updated_at`: DATETIME - 店匠更新时间 - -### SKU表(shoplazza_product_sku) - -主要字段: -- `id`: BIGINT - 主键ID(对应sku_id) -- `spu_id`: BIGINT - SPU ID(关联字段) -- `title`: VARCHAR(500) - 变体标题 -- `price`: DECIMAL(10,2) - 价格 -- `compare_at_price`: DECIMAL(10,2) - 原价 -- `sku`: VARCHAR(100) - SKU编码 -- `inventory_quantity`: INT(11) - 库存数量 -- `option1`: VARCHAR(255) - 选项1 -- `option2`: VARCHAR(255) - 选项2 -- `option3`: VARCHAR(255) - 选项3 - diff --git a/docs/相关性检索优化说明.md b/docs/相关性检索优化说明.md index af988d2..638029a 100644 --- a/docs/相关性检索优化说明.md +++ b/docs/相关性检索优化说明.md @@ -54,7 +54,7 @@ "fields": ["title_en^3.0", ...], "minimum_should_match": "75%", "operator": "AND", - "query": "water sports (e.g. 
animals playing with water)", + "query": "water sports", "tie_breaker": 0.9 } }, diff --git a/docs/索引数据接口文档.md b/docs/索引数据接口文档.md new file mode 100644 index 0000000..8e53cbe --- /dev/null +++ b/docs/索引数据接口文档.md @@ -0,0 +1,714 @@ +# 索引数据接口文档 + +本文档说明如何获取需要灌入ES索引的数据,包括全量导入脚本和增量数据获取接口。 + +## 目录 + +1. [租户配置说明](#租户配置说明) +2. [全量数据导入脚本](#全量数据导入脚本) +3. [增量数据获取接口](#增量数据获取接口) +4. [数据格式说明](#数据格式说明) +5. [使用示例](#使用示例) + +--- + +## 租户配置说明 + +### 配置文件位置 + +租户配置存储在统一配置文件 `config/config.yaml` 中,与索引配置放在同一文件。 + +### 配置结构 + +在 `config/config.yaml` 中的 `tenant_config` 部分: + +```yaml +tenant_config: + # 默认配置(未配置的租户使用此配置) + default: + primary_language: "zh" + translate_to_en: true + translate_to_zh: false + # 租户特定配置 + tenants: + "1": + primary_language: "zh" + translate_to_en: true + translate_to_zh: false + "162": + primary_language: "zh" + translate_to_en: false + translate_to_zh: false +``` + +### 配置字段说明 + +| 字段 | 类型 | 说明 | 可选值 | +|------|------|------|--------| +| `primary_language` | string | 主语言(SKU表中title等文本字段的语言) | `"zh"`(中文)或 `"en"`(英文) | +| `translate_to_en` | boolean | 是否需要翻译英文 | `true` 或 `false` | +| `translate_to_zh` | boolean | 是否需要翻译中文 | `true` 或 `false` | + +### 配置规则 + +1. **主语言**:指定SKU表中 `title`、`brief`、`description`、`vendor` 等字段的语言。 + - 如果主语言是 `zh`,这些字段的值会填充到 `title_zh`、`brief_zh` 等字段 + - 如果主语言是 `en`,这些字段的值会填充到 `title_en`、`brief_en` 等字段 + +2. **翻译配置**: + - `translate_to_en: true`:如果主语言是中文,则会将中文内容翻译为英文,填充到 `title_en` 等字段 + - `translate_to_zh: true`:如果主语言是英文,则会将英文内容翻译为中文,填充到 `title_zh` 等字段 + - **注意**:如果主语言本身就是目标语言,则不会触发翻译(例如主语言是英文,`translate_to_en: true` 不会触发翻译) + +3. 
**默认配置**:如果租户ID不在 `tenants` 中,则使用 `default` 配置。 + +### 配置示例 + +**示例1:中文主语言,需要翻译英文** +```json +{ + "primary_language": "zh", + "translate_to_en": true, + "translate_to_zh": false +} +``` +- SKU表的 `title` 字段(中文)→ `title_zh` +- 翻译服务将中文翻译为英文 → `title_en` + +**示例2:英文主语言,需要翻译中文** +```json +{ + "primary_language": "en", + "translate_to_en": false, + "translate_to_zh": true +} +``` +- SKU表的 `title` 字段(英文)→ `title_en` +- 翻译服务将英文翻译为中文 → `title_zh` + +**示例3:仅使用主语言,不翻译** +```json +{ + "primary_language": "zh", + "translate_to_en": false, + "translate_to_zh": false +} +``` +- SKU表的 `title` 字段(中文)→ `title_zh` +- `title_en` 保持为 `null` + +### 配置更新 + +修改 `config/config.yaml` 中的 `tenant_config` 部分后,需要重启服务才能生效。增量服务会在每次请求时重新加载租户配置(支持热更新)。 + +--- + +## 全量数据导入脚本 + +### 功能说明 + +`scripts/recreate_and_import.py` 是一个全量数据导入脚本,用于: +- 重建ES索引(删除旧索引,使用新的mapping创建新索引) +- 从MySQL数据库批量读取指定租户的所有SPU数据 +- 将数据转换为ES文档格式 +- 批量导入到Elasticsearch + +### 使用方法 + +#### 基本用法 + +```bash +python scripts/recreate_and_import.py \ + --tenant-id 1 \ + --db-host 120.79.247.228 \ + --db-port 3306 \ + --db-database saas \ + --db-username saas \ + --db-password your_password \ + --es-host http://localhost:9200 \ + --batch-size 500 +``` + +#### 参数说明 + +| 参数 | 说明 | 是否必需 | 默认值 | +|------|------|----------|--------| +| `--tenant-id` | 租户ID | **是** | - | +| `--db-host` | MySQL主机地址 | 否(可用环境变量) | 环境变量 `DB_HOST` | +| `--db-port` | MySQL端口 | 否(可用环境变量) | 环境变量 `DB_PORT` 或 3306 | +| `--db-database` | MySQL数据库名 | 否(可用环境变量) | 环境变量 `DB_DATABASE` | +| `--db-username` | MySQL用户名 | 否(可用环境变量) | 环境变量 `DB_USERNAME` | +| `--db-password` | MySQL密码 | 否(可用环境变量) | 环境变量 `DB_PASSWORD` | +| `--es-host` | Elasticsearch地址 | 否(可用环境变量) | 环境变量 `ES_HOST` 或 `http://localhost:9200` | +| `--batch-size` | 批量导入大小 | 否 | 500 | +| `--skip-delete` | 跳过删除旧索引步骤 | 否 | False | + +#### 环境变量配置 + +可以通过环境变量设置数据库和ES连接信息,避免在命令行中暴露敏感信息: + +```bash +export DB_HOST=120.79.247.228 +export DB_PORT=3306 +export DB_DATABASE=saas +export DB_USERNAME=saas +export 
DB_PASSWORD=your_password +export ES_HOST=http://localhost:9200 + +python scripts/recreate_and_import.py --tenant-id 1 +``` + +#### 执行流程 + +脚本执行分为以下步骤: + +1. **加载mapping配置**:从 `mappings/search_products.json` 加载ES索引mapping +2. **连接Elasticsearch**:验证ES连接可用性 +3. **删除旧索引**(可选):如果索引已存在,删除旧索引(可通过 `--skip-delete` 跳过) +4. **创建新索引**:使用新的mapping创建索引 +5. **连接MySQL**:建立数据库连接 +6. **数据转换和导入**: + - 从MySQL读取SPU、SKU、Option数据 + - 转换为ES文档格式 + - 批量导入到ES + +#### 输出示例 + +``` +============================================================ +重建ES索引并导入数据 +============================================================ + +[1/4] 加载mapping配置... +✓ 成功加载mapping配置 +索引名称: search_products + +[2/4] 连接Elasticsearch... +ES地址: http://localhost:9200 +✓ Elasticsearch连接成功 + +[3/4] 删除旧索引... +发现已存在的索引: search_products +✓ 成功删除索引: search_products + +[4/4] 创建新索引... +创建索引: search_products +✓ 成功创建索引: search_products + +[5/5] 连接MySQL... +MySQL: 120.79.247.228:3306/saas +✓ MySQL连接成功 + +[6/6] 导入数据... +Tenant ID: 1 +批量大小: 500 +正在转换数据... +✓ 转换完成: 1000 个文档 +正在导入数据到ES (批量大小: 500)... +✓ 导入完成 + +============================================================ +导入完成! +============================================================ +成功: 1000 +失败: 0 +耗时: 12.34秒 +``` + +#### 注意事项 + +1. **数据量**:全量导入适合数据量较小或首次导入的场景。对于大数据量,建议使用增量接口。 +2. **索引重建**:默认会删除旧索引,请确保有数据备份。 +3. **性能**:批量大小(`--batch-size`)影响导入性能,建议根据ES集群性能调整(默认500)。 +4. 
**租户隔离**:每次只能导入一个租户的数据,需要为每个租户分别执行。 + +--- + +## 增量数据获取接口 + +### 功能说明 + +增量数据获取接口提供单个SPU的ES文档数据,用于增量更新ES索引。适用于: +- MySQL数据变更后,实时同步到ES +- 外部Java程序监听MySQL变更事件,调用接口获取数据后推送到ES +- 避免全量重建索引,提高更新效率 + +### 接口地址 + +``` +GET /indexer/spu/{spu_id}?tenant_id={tenant_id} +``` + +### 请求参数 + +| 参数 | 位置 | 类型 | 说明 | 是否必需 | +|------|------|------|------|----------| +| `spu_id` | 路径参数 | string | SPU ID | **是** | +| `tenant_id` | 查询参数 | string | 租户ID | **是** | + +### 请求示例 + +```bash +# cURL +curl -X GET "http://localhost:6002/indexer/spu/123?tenant_id=1" + +# Java (OkHttp) +OkHttpClient client = new OkHttpClient(); +Request request = new Request.Builder() + .url("http://localhost:6002/indexer/spu/123?tenant_id=1") + .get() + .build(); +Response response = client.newCall(request).execute(); +String json = response.body().string(); +``` + +### 响应格式 + +#### 成功响应(200 OK) + +返回完整的ES文档JSON对象,包含所有索引字段: + +```json +{ + "tenant_id": "1", + "spu_id": "123", + "title_zh": "商品标题", + "title_en": null, + "brief_zh": "商品简介", + "brief_en": null, + "description_zh": "商品详细描述", + "description_en": null, + "vendor_zh": "供应商名称", + "vendor_en": null, + "tags": ["标签1", "标签2"], + "category_path_zh": "类目1/类目2/类目3", + "category_path_en": null, + "category_name_zh": "类目名称", + "category_name_en": null, + "category_id": "100", + "category_name": "类目名称", + "category_level": 3, + "category1_name": "类目1", + "category2_name": "类目2", + "category3_name": "类目3", + "option1_name": "颜色", + "option2_name": "尺寸", + "option3_name": null, + "option1_values": ["红色", "蓝色", "绿色"], + "option2_values": ["S", "M", "L"], + "option3_values": [], + "min_price": 99.99, + "max_price": 199.99, + "compare_at_price": 299.99, + "sku_prices": [99.99, 149.99, 199.99], + "sku_weights": [100, 150, 200], + "sku_weight_units": ["g"], + "total_inventory": 500, + "sales": 1000, + "image_url": "https://example.com/image.jpg", + "create_time": "2024-01-01T00:00:00", + "update_time": "2024-01-02T00:00:00", + "skus": [ + { + "sku_id": "456", + "price": 
99.99, + "compare_at_price": 149.99, + "sku_code": "SKU001", + "stock": 100, + "weight": 100.0, + "weight_unit": "g", + "option1_value": "红色", + "option2_value": "S", + "option3_value": null, + "image_src": "https://example.com/sku1.jpg" + } + ], + "specifications": [ + { + "sku_id": "456", + "name": "颜色", + "value": "红色" + }, + { + "sku_id": "456", + "name": "尺寸", + "value": "S" + } + ] +} +``` + +#### 错误响应 + +**404 Not Found** - SPU不存在或已删除: +```json +{ + "detail": "SPU 123 not found for tenant_id=1 or has been deleted" +} +``` + +**400 Bad Request** - 缺少必需参数: +```json +{ + "detail": "tenant_id is required" +} +``` + +**500 Internal Server Error** - 服务器内部错误: +```json +{ + "detail": "Internal server error: ..." +} +``` + +**503 Service Unavailable** - 服务未初始化: +```json +{ + "detail": "Incremental indexer service is not initialized. Please check database connection." +} +``` + +### 健康检查接口 + +检查增量索引服务的健康状态: + +``` +GET /indexer/health +``` + +#### 响应示例 + +```json +{ + "status": "available", + "database": "connected", + "preloaded_data": { + "category_mappings": 150, + "searchable_option_dimensions": ["option1", "option2", "option3"] + } +} +``` + +### 性能优化 + +服务在启动时预加载以下公共数据,以提高查询性能: + +1. **分类映射**:所有租户共享的分类ID到名称映射 +2. 
**配置信息**:搜索配置(如 `searchable_option_dimensions`) + +这些数据在服务启动时一次性加载,后续查询无需重复查询数据库,大幅提升响应速度。 + +### 使用场景 + +#### 场景1:MySQL变更监听 + +外部Java程序使用Canal或Debezium监听MySQL binlog,当检测到商品数据变更时: + +```java +// 伪代码示例 +@EventListener +public void onProductChange(ProductChangeEvent event) { + String tenantId = event.getTenantId(); + String spuId = event.getSpuId(); + + // 调用增量接口获取ES文档数据 + String url = String.format("http://localhost:6002/indexer/spu/%s?tenant_id=%s", spuId, tenantId); + Map esDoc = httpClient.get(url); + + // 推送到ES + elasticsearchClient.index("search_products", esDoc); +} +``` + +#### 场景2:定时同步 + +定时任务扫描变更的商品,批量更新: + +```java +// 伪代码示例 +List changedSpuIds = getChangedSpuIds(); +for (String spuId : changedSpuIds) { + String url = String.format("http://localhost:6002/indexer/spu/%s?tenant_id=%s", spuId, tenantId); + Map esDoc = httpClient.get(url); + elasticsearchClient.index("search_products", esDoc); +} +``` + +### 注意事项 + +1. **服务初始化**:确保API服务已启动,且数据库连接配置正确(`DB_HOST`, `DB_DATABASE`, `DB_USERNAME`, `DB_PASSWORD`)。 +2. **数据一致性**:接口返回的是调用时刻的数据快照,如果MySQL数据在调用后立即变更,可能需要重新调用。 +3. **错误处理**:建议实现重试机制,对于404错误(SPU已删除),应调用ES删除接口。 +4. 
**性能**:接口已优化,单次查询通常在100ms以内。如需批量获取,建议并发调用。 + +--- + +## 数据格式说明 + +### ES文档结构 + +返回的ES文档结构完全符合 `mappings/search_products.json` 定义的索引结构。主要字段说明: + +| 字段类别 | 字段名 | 类型 | 说明 | +|---------|--------|------|------| +| 基础标识 | `tenant_id` | keyword | 租户ID | +| 基础标识 | `spu_id` | keyword | SPU ID | +| 文本字段 | `title_zh`, `title_en` | text | 标题(中英文) | +| 文本字段 | `brief_zh`, `brief_en` | text | 简介(中英文) | +| 文本字段 | `description_zh`, `description_en` | text | 描述(中英文) | +| 文本字段 | `vendor_zh`, `vendor_en` | text | 供应商(中英文) | +| 类目字段 | `category_path_zh`, `category_path_en` | text | 类目路径(中英文) | +| 类目字段 | `category1_name`, `category2_name`, `category3_name` | keyword | 分层类目名称 | +| 价格字段 | `min_price`, `max_price` | float | 价格范围 | +| 库存字段 | `total_inventory` | long | 总库存 | +| 销量字段 | `sales` | long | 销量 | +| 嵌套字段 | `skus` | nested | SKU列表 | +| 嵌套字段 | `specifications` | nested | 规格列表 | + +详细字段说明请参考:[索引字段说明v2.md](./索引字段说明v2.md) + +### SKU嵌套结构 + +```json +{ + "skus": [ + { + "sku_id": "456", + "price": 99.99, + "compare_at_price": 149.99, + "sku_code": "SKU001", + "stock": 100, + "weight": 100.0, + "weight_unit": "g", + "option1_value": "红色", + "option2_value": "S", + "option3_value": null, + "image_src": "https://example.com/sku1.jpg" + } + ] +} +``` + +### Specifications嵌套结构 + +```json +{ + "specifications": [ + { + "sku_id": "456", + "name": "颜色", + "value": "红色" + }, + { + "sku_id": "456", + "name": "尺寸", + "value": "S" + } + ] +} +``` + +--- + +## 使用示例 + +### 示例1:全量导入 + +```bash +# 设置环境变量 +export DB_HOST=120.79.247.228 +export DB_PORT=3306 +export DB_DATABASE=saas +export DB_USERNAME=saas +export DB_PASSWORD=your_password +export ES_HOST=http://localhost:9200 + +# 执行全量导入 +python scripts/recreate_and_import.py --tenant-id 1 --batch-size 500 +``` + +### 示例2:增量更新(Java) + +```java +import okhttp3.OkHttpClient; +import okhttp3.Request; +import okhttp3.Response; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.elasticsearch.client.RestHighLevelClient; + +public class 
IncrementalIndexer { + private static final String API_BASE_URL = "http://localhost:6002"; + private static final OkHttpClient httpClient = new OkHttpClient(); + private static final ObjectMapper objectMapper = new ObjectMapper(); + private static final RestHighLevelClient esClient = createESClient(); + + /** + * 获取SPU的ES文档数据并推送到ES + */ + public void indexSpu(String tenantId, String spuId) throws Exception { + // 1. 调用增量接口获取数据 + String url = String.format("%s/indexer/spu/%s?tenant_id=%s", + API_BASE_URL, spuId, tenantId); + + Request request = new Request.Builder() + .url(url) + .get() + .build(); + + try (Response response = httpClient.newCall(request).execute()) { + if (response.code() == 404) { + // SPU已删除,从ES中删除 + deleteFromES(tenantId, spuId); + return; + } + + if (!response.isSuccessful()) { + throw new RuntimeException("Failed to get SPU data: " + response.code()); + } + + // 2. 解析JSON响应 + String json = response.body().string(); + Map esDoc = objectMapper.readValue(json, Map.class); + + // 3. 
推送到ES
+            IndexRequest indexRequest = new IndexRequest("search_products")
+                .id(spuId)
+                .source(esDoc);
+
+            esClient.index(indexRequest, RequestOptions.DEFAULT);
+        }
+    }
+
+    /**
+     * 从ES中删除SPU
+     */
+    public void deleteFromES(String tenantId, String spuId) throws Exception {
+        DeleteRequest deleteRequest = new DeleteRequest("search_products", spuId);
+        esClient.delete(deleteRequest, RequestOptions.DEFAULT);
+    }
+}
+```
+
+### 示例3:批量增量更新
+
+```java
+/**
+ * 批量更新多个SPU
+ */
+public void batchIndexSpus(String tenantId, List<String> spuIds) {
+    ExecutorService executor = Executors.newFixedThreadPool(10);
+    List<Future<?>> futures = new ArrayList<>();
+
+    for (String spuId : spuIds) {
+        Future<?> future = executor.submit(() -> {
+            try {
+                indexSpu(tenantId, spuId);
+            } catch (Exception e) {
+                log.error("Failed to index SPU: " + spuId, e);
+            }
+        });
+        futures.add(future);
+    }
+
+    // 等待所有任务完成
+    for (Future<?> future : futures) {
+        try {
+            future.get();
+        } catch (Exception e) {
+            log.error("Task failed", e);
+        }
+    }
+
+    executor.shutdown();
+}
+```
+
+### 示例4:监听MySQL变更(Canal)
+
+```java
+@CanalEventListener
+public class ProductChangeListener {
+
+    @Autowired
+    private IncrementalIndexer indexer;
+
+    @ListenPoint(
+        destination = "example",
+        schema = "saas",
+        table = {"shoplazza_product_spu", "shoplazza_product_sku"},
+        eventType = {CanalEntry.EventType.INSERT, CanalEntry.EventType.UPDATE, CanalEntry.EventType.DELETE}
+    )
+    public void onEvent(CanalEntry.Entry entry) {
+        String tableName = entry.getHeader().getTableName();
+        String tenantId = extractTenantId(entry);
+        String spuId = extractSpuId(entry, tableName);
+
+        if (tableName.equals("shoplazza_product_spu")) {
+            if (entry.getHeader().getEventType() == CanalEntry.EventType.DELETE) {
+                // SPU删除,从ES删除
+                indexer.deleteFromES(tenantId, spuId);
+            } else {
+                // SPU新增或更新,重新索引
+                indexer.indexSpu(tenantId, spuId);
+            }
+        } else if (tableName.equals("shoplazza_product_sku")) {
+            // SKU变更,需要更新对应的SPU
+            indexer.indexSpu(tenantId, spuId);
+        }
+    }
+}
+```
+
+---
+
+## 
常见问题 + +### Q1: 全量导入和增量接口的区别? + +- **全量导入**:适合首次导入或数据重建,一次性导入所有数据,但耗时较长。 +- **增量接口**:适合实时同步,按需获取单个SPU数据,响应快速。 + +### Q2: 增量接口返回的数据是否包含向量字段? + +不包含。向量字段(`title_embedding`, `image_embedding`)需要单独生成,不在本接口返回范围内。如需向量字段,需要: +1. 调用本接口获取基础数据 +2. 使用文本/图片编码服务生成向量 +3. 将向量字段添加到文档后推送到ES + +### Q3: 如何处理SPU删除? + +当接口返回404时,表示SPU不存在或已删除。此时应从ES中删除对应文档: + +```java +if (response.code() == 404) { + DeleteRequest deleteRequest = new DeleteRequest("search_products", spuId); + esClient.delete(deleteRequest, RequestOptions.DEFAULT); +} +``` + +### Q4: 服务启动失败,提示数据库连接错误? + +检查环境变量或配置文件中的数据库连接信息: +- `DB_HOST` +- `DB_PORT` +- `DB_DATABASE` +- `DB_USERNAME` +- `DB_PASSWORD` + +确保这些变量已正确设置,且数据库可访问。 + +### Q5: 接口响应慢怎么办? + +1. 检查数据库连接池配置 +2. 确认预加载数据是否成功(调用 `/indexer/health` 检查) +3. 检查数据库查询性能(SPU、SKU、Option表是否有索引) +4. 考虑使用连接池和缓存优化 + +--- + +## 相关文档 + +- [索引字段说明v2.md](./索引字段说明v2.md) - ES索引字段详细说明 +- [索引字段说明v2-参考表结构.md](./索引字段说明v2-参考表结构.md) - MySQL表结构参考 +- [mappings/search_products.json](../mappings/search_products.json) - ES索引mapping定义 + diff --git a/docs/翻译功能测试说明.md b/docs/翻译功能测试说明.md new file mode 100644 index 0000000..a524c8c --- /dev/null +++ b/docs/翻译功能测试说明.md @@ -0,0 +1,197 @@ +# 翻译功能测试说明 + +## 功能概述 + +本次更新实现了以下功能: + +1. **翻译提示词配置**:支持中英文提示词,用于提高翻译质量 +2. **DeepL Context参数**:提示词作为DeepL API的`context`参数传递(不参与翻译,仅提供上下文) +3. **同步/异步翻译**: + - 索引场景:同步翻译,等待结果返回 + - 查询场景:异步翻译,立即返回缓存结果 +4. **缓存机制**:翻译结果自动缓存,避免重复翻译 + +## 配置说明 + +### 配置文件位置 + +`config/config.yaml` + +### 翻译提示词配置 + +```yaml +translation_prompts: + # 商品标题翻译提示词 + product_title_zh: "请将原文翻译成中文商品SKU名称,要求:确保精确、完整地传达原文信息的基础上,语言简洁清晰、地道、专业。" + product_title_en: "Translate the original text into an English product SKU name. Requirements: Ensure accurate and complete transmission of the original information, with concise, clear, authentic, and professional language." + # query翻译提示词 + query_zh: "电商领域" + query_en: "e-commerce domain" + # 默认翻译用词 + default_zh: "电商领域" + default_en: "e-commerce domain" +``` + +### 提示词使用规则 + +1. 
**商品标题翻译**: + - 中文→英文:使用 `product_title_en` + - 英文→中文:使用 `product_title_zh` + +2. **其他字段翻译**(brief, description, vendor): + - 根据目标语言选择 `default_zh` 或 `default_en` + +3. **查询翻译**: + - 根据目标语言选择 `query_zh` 或 `query_en` + +## 测试方法 + +### 1. 测试配置加载 + +```python +from config import ConfigLoader + +config_loader = ConfigLoader() +config = config_loader.load_config() + +# 检查翻译提示词配置 +print(config.query_config.translation_prompts) +``` + +### 2. 测试同步翻译(索引场景) + +```python +from query.translator import Translator +from config import ConfigLoader + +config = ConfigLoader().load_config() +translator = Translator( + api_key=config.query_config.translation_api_key, + use_cache=True +) + +# 测试商品标题翻译 +text = "蓝牙耳机" +prompt = config.query_config.translation_prompts.get('product_title_en') +result = translator.translate( + text, + target_lang='en', + source_lang='zh', + prompt=prompt +) +print(f"翻译结果: {result}") +``` + +### 3. 测试异步翻译(查询场景) + +```python +# 异步模式(立即返回,后台翻译) +results = translator.translate_multi( + "手机", + target_langs=['en'], + source_lang='zh', + async_mode=True, + prompt=config.query_config.translation_prompts.get('query_zh') +) +print(f"异步结果: {results}") # 可能包含None(后台翻译中) + +# 同步模式(等待完成) +results_sync = translator.translate_multi( + "手机", + target_langs=['en'], + source_lang='zh', + async_mode=False, + prompt=config.query_config.translation_prompts.get('query_zh') +) +print(f"同步结果: {results_sync}") +``` + +### 4. 测试文档转换器集成 + +```python +from indexer.document_transformer import SPUDocumentTransformer +import pandas as pd + +# 创建模拟数据 +spu_row = pd.Series({ + 'id': 123, + 'tenant_id': '1', + 'title': '蓝牙耳机', + 'brief': '高品质无线蓝牙耳机', + 'description': '这是一款高品质的无线蓝牙耳机。', + 'vendor': '品牌A', + # ... 
其他字段 +}) + +# 初始化转换器(带翻译器) +transformer = SPUDocumentTransformer( + category_id_to_name={}, + searchable_option_dimensions=['option1', 'option2', 'option3'], + tenant_config={'primary_language': 'zh', 'translate_to_en': True}, + translator=translator, + translation_prompts=config.query_config.translation_prompts +) + +# 转换文档 +doc = transformer.transform_spu_to_doc( + tenant_id='1', + spu_row=spu_row, + skus=pd.DataFrame(), + options=pd.DataFrame() +) + +print(f"title_zh: {doc.get('title_zh')}") +print(f"title_en: {doc.get('title_en')}") # 应该包含翻译结果 +``` + +### 5. 测试缓存功能 + +```python +# 第一次翻译(调用API) +result1 = translator.translate("测试文本", "en", "zh", prompt="电商领域") + +# 第二次翻译(使用缓存) +result2 = translator.translate("测试文本", "en", "zh", prompt="电商领域") + +assert result1 == result2 # 应该相同 +``` + +## DeepL API Context参数说明 + +根据 [DeepL API文档](https://developers.deepl.com/api-reference/translate/request-translation): + +- `context` 参数:Additional context that can influence a translation but is not translated itself +- Context中的字符不计入计费 +- Context用于提供翻译上下文,帮助提高翻译质量 + +我们的实现: +- 将提示词作为 `context` 参数传递给DeepL API +- Context不参与翻译,仅提供上下文信息 +- 不同场景使用不同的提示词(商品标题、查询、默认) + +## 运行完整测试 + +```bash +# 激活环境 +source /home/tw/miniconda3/etc/profile.d/conda.sh +conda activate searchengine + +# 运行测试脚本 +python scripts/test_translation.py +``` + +## 验证要点 + +1. **配置加载**:确认所有提示词配置正确加载 +2. **同步翻译**:索引时翻译结果正确填充到文档 +3. **异步翻译**:查询时缓存命中立即返回,未命中后台翻译 +4. **提示词使用**:不同场景使用正确的提示词 +5. **缓存机制**:相同文本和提示词的翻译结果被缓存 + +## 注意事项 + +1. 需要配置 `DEEPL_AUTH_KEY` 环境变量或 `translation_api_key` +2. 如果没有API key,翻译器会返回原文(mock模式) +3. 缓存文件存储在 `.cache/translations.json` +4. 
Context参数中的字符不计入DeepL计费 + diff --git a/indexer/document_transformer.py b/indexer/document_transformer.py new file mode 100644 index 0000000..14b3a75 --- /dev/null +++ b/indexer/document_transformer.py @@ -0,0 +1,545 @@ +""" +SPU文档转换器 - 公共转换逻辑。 + +提取全量和增量索引共用的文档转换逻辑,避免代码冗余。 +""" + +import pandas as pd +import logging +from typing import Dict, Any, Optional, List +from config import ConfigLoader + +logger = logging.getLogger(__name__) + +# Try to import translator (optional dependency) +try: + from query.translator import Translator + TRANSLATOR_AVAILABLE = True +except ImportError: + TRANSLATOR_AVAILABLE = False + Translator = None + + +class SPUDocumentTransformer: + """SPU文档转换器,将SPU、SKU、Option数据转换为ES文档格式。""" + + def __init__( + self, + category_id_to_name: Dict[str, str], + searchable_option_dimensions: List[str], + tenant_config: Optional[Dict[str, Any]] = None, + translator: Optional[Any] = None, + translation_prompts: Optional[Dict[str, str]] = None + ): + """ + 初始化文档转换器。 + + Args: + category_id_to_name: 分类ID到名称的映射 + searchable_option_dimensions: 可搜索的option维度列表 + tenant_config: 租户配置(包含主语言和翻译配置) + translator: 翻译器实例(可选,如果提供则启用翻译功能) + translation_prompts: 翻译提示词配置(可选) + """ + self.category_id_to_name = category_id_to_name + self.searchable_option_dimensions = searchable_option_dimensions + self.tenant_config = tenant_config or {} + self.translator = translator + self.translation_prompts = translation_prompts or {} + + def transform_spu_to_doc( + self, + tenant_id: str, + spu_row: pd.Series, + skus: pd.DataFrame, + options: pd.DataFrame + ) -> Optional[Dict[str, Any]]: + """ + 将单个SPU行和其SKUs转换为ES文档。 + + Args: + tenant_id: 租户ID + spu_row: SPU行数据 + skus: SKU数据DataFrame + options: Option数据DataFrame + + Returns: + ES文档字典 + """ + doc = {} + + # Tenant ID (required) + doc['tenant_id'] = str(tenant_id) + + # SPU ID + spu_id = spu_row['id'] + doc['spu_id'] = str(spu_id) + + # Validate required fields + if pd.isna(spu_row.get('title')) or not str(spu_row['title']).strip(): + 
logger.error(f"SPU {spu_id} has no title, this may cause search issues") + + # 获取租户配置 + primary_lang = self.tenant_config.get('primary_language', 'zh') + translate_to_en = self.tenant_config.get('translate_to_en', True) + translate_to_zh = self.tenant_config.get('translate_to_zh', False) + + # 文本字段处理(根据主语言和翻译配置) + self._fill_text_fields(doc, spu_row, primary_lang, translate_to_en, translate_to_zh) + + # Tags + if pd.notna(spu_row.get('tags')): + tags_str = str(spu_row['tags']) + doc['tags'] = [tag.strip() for tag in tags_str.split(',') if tag.strip()] + + # Category相关字段 + self._fill_category_fields(doc, spu_row) + + # Option名称(从option表获取) + self._fill_option_names(doc, options) + + # Image URL + self._fill_image_url(doc, spu_row) + + # Sales (fake_sales) + if pd.notna(spu_row.get('fake_sales')): + try: + doc['sales'] = int(spu_row['fake_sales']) + except (ValueError, TypeError): + doc['sales'] = 0 + else: + doc['sales'] = 0 + + # Process SKUs and build specifications + skus_list, prices, compare_prices, sku_prices, sku_weights, sku_weight_units, total_inventory, specifications = \ + self._process_skus(skus, options) + + doc['skus'] = skus_list + doc['specifications'] = specifications + + # 提取option值(根据配置的searchable_option_dimensions) + self._fill_option_values(doc, skus) + + # Calculate price ranges + if prices: + doc['min_price'] = float(min(prices)) + doc['max_price'] = float(max(prices)) + else: + doc['min_price'] = 0.0 + doc['max_price'] = 0.0 + + if compare_prices: + doc['compare_at_price'] = float(max(compare_prices)) + else: + doc['compare_at_price'] = None + + # SKU扁平化字段 + doc['sku_prices'] = sku_prices + doc['sku_weights'] = sku_weights + doc['sku_weight_units'] = list(set(sku_weight_units)) # 去重 + doc['total_inventory'] = total_inventory + + # Time fields - convert datetime to ISO format string for ES DATE type + if pd.notna(spu_row.get('create_time')): + create_time = spu_row['create_time'] + if hasattr(create_time, 'isoformat'): + doc['create_time'] = 
create_time.isoformat() + else: + doc['create_time'] = str(create_time) + + if pd.notna(spu_row.get('update_time')): + update_time = spu_row['update_time'] + if hasattr(update_time, 'isoformat'): + doc['update_time'] = update_time.isoformat() + else: + doc['update_time'] = str(update_time) + + return doc + + def _fill_text_fields( + self, + doc: Dict[str, Any], + spu_row: pd.Series, + primary_lang: str, + translate_to_en: bool, + translate_to_zh: bool + ): + """填充文本字段(根据主语言和翻译配置)。""" + # 主语言字段 + primary_suffix = '_zh' if primary_lang == 'zh' else '_en' + secondary_suffix = '_en' if primary_lang == 'zh' else '_zh' + + # Title + if pd.notna(spu_row.get('title')): + title_text = str(spu_row['title']) + doc[f'title{primary_suffix}'] = title_text + # 如果需要翻译,调用翻译服务(同步模式) + if (primary_lang == 'zh' and translate_to_en) or (primary_lang == 'en' and translate_to_zh): + if self.translator: + target_lang = 'en' if primary_lang == 'zh' else 'zh' + # 根据目标语言选择对应的提示词 + if target_lang == 'zh': + prompt = self.translation_prompts.get('product_title_zh') or self.translation_prompts.get('default_zh') + else: + prompt = self.translation_prompts.get('product_title_en') or self.translation_prompts.get('default_en') + translated = self.translator.translate( + title_text, + target_lang=target_lang, + source_lang=primary_lang, + prompt=prompt + ) + doc[f'title{secondary_suffix}'] = translated if translated else None + else: + doc[f'title{secondary_suffix}'] = None # 无翻译器,设为None + else: + doc[f'title{secondary_suffix}'] = None + else: + doc[f'title{primary_suffix}'] = None + doc[f'title{secondary_suffix}'] = None + + # Brief + if pd.notna(spu_row.get('brief')): + brief_text = str(spu_row['brief']) + doc[f'brief{primary_suffix}'] = brief_text + if (primary_lang == 'zh' and translate_to_en) or (primary_lang == 'en' and translate_to_zh): + if self.translator: + target_lang = 'en' if primary_lang == 'zh' else 'zh' + # 根据目标语言选择对应的提示词 + prompt = 
self.translation_prompts.get(f'default_{target_lang}') or self.translation_prompts.get('default_zh') or self.translation_prompts.get('default_en') + translated = self.translator.translate( + brief_text, + target_lang=target_lang, + source_lang=primary_lang, + prompt=prompt + ) + doc[f'brief{secondary_suffix}'] = translated if translated else None + else: + doc[f'brief{secondary_suffix}'] = None + else: + doc[f'brief{secondary_suffix}'] = None + else: + doc[f'brief{primary_suffix}'] = None + doc[f'brief{secondary_suffix}'] = None + + # Description + if pd.notna(spu_row.get('description')): + desc_text = str(spu_row['description']) + doc[f'description{primary_suffix}'] = desc_text + if (primary_lang == 'zh' and translate_to_en) or (primary_lang == 'en' and translate_to_zh): + if self.translator: + target_lang = 'en' if primary_lang == 'zh' else 'zh' + # 根据目标语言选择对应的提示词 + prompt = self.translation_prompts.get(f'default_{target_lang}') or self.translation_prompts.get('default_zh') or self.translation_prompts.get('default_en') + translated = self.translator.translate( + desc_text, + target_lang=target_lang, + source_lang=primary_lang, + prompt=prompt + ) + doc[f'description{secondary_suffix}'] = translated if translated else None + else: + doc[f'description{secondary_suffix}'] = None + else: + doc[f'description{secondary_suffix}'] = None + else: + doc[f'description{primary_suffix}'] = None + doc[f'description{secondary_suffix}'] = None + + # Vendor + if pd.notna(spu_row.get('vendor')): + vendor_text = str(spu_row['vendor']) + doc[f'vendor{primary_suffix}'] = vendor_text + if (primary_lang == 'zh' and translate_to_en) or (primary_lang == 'en' and translate_to_zh): + if self.translator: + target_lang = 'en' if primary_lang == 'zh' else 'zh' + # 根据目标语言选择对应的提示词 + prompt = self.translation_prompts.get(f'default_{target_lang}') or self.translation_prompts.get('default_zh') or self.translation_prompts.get('default_en') + translated = self.translator.translate( + vendor_text, + 
target_lang=target_lang, + source_lang=primary_lang, + prompt=prompt + ) + doc[f'vendor{secondary_suffix}'] = translated if translated else None + else: + doc[f'vendor{secondary_suffix}'] = None + else: + doc[f'vendor{secondary_suffix}'] = None + else: + doc[f'vendor{primary_suffix}'] = None + doc[f'vendor{secondary_suffix}'] = None + + def _fill_category_fields(self, doc: Dict[str, Any], spu_row: pd.Series): + """填充类目相关字段。""" + if pd.notna(spu_row.get('category_path')): + category_path = str(spu_row['category_path']) + + # 解析category_path - 这是逗号分隔的类目ID列表 + category_ids = [cid.strip() for cid in category_path.split(',') if cid.strip()] + + # 将ID映射为名称 + category_names = [] + for cid in category_ids: + if cid in self.category_id_to_name: + category_names.append(self.category_id_to_name[cid]) + else: + logger.error(f"Category ID {cid} not found in mapping for SPU {spu_row['id']} (title: {spu_row.get('title', 'N/A')}), category_path={category_path}") + category_names.append(cid) # 使用ID作为备选 + + # 构建类目路径字符串(用于搜索) + if category_names: + category_path_str = '/'.join(category_names) + doc['category_path_zh'] = category_path_str + doc['category_path_en'] = None # 暂时设为空 + + # 填充分层类目名称 + if len(category_names) > 0: + doc['category1_name'] = category_names[0] + if len(category_names) > 1: + doc['category2_name'] = category_names[1] + if len(category_names) > 2: + doc['category3_name'] = category_names[2] + elif pd.notna(spu_row.get('category')): + # 如果category_path为空,使用category字段作为category1_name的备选 + category = str(spu_row['category']) + doc['category_name_zh'] = category + doc['category_name_en'] = None + doc['category_name'] = category + + # 尝试从category字段解析多级分类 + if '/' in category: + path_parts = category.split('/') + if len(path_parts) > 0: + doc['category1_name'] = path_parts[0].strip() + if len(path_parts) > 1: + doc['category2_name'] = path_parts[1].strip() + if len(path_parts) > 2: + doc['category3_name'] = path_parts[2].strip() + else: + # 
如果category不包含"/",直接作为category1_name + doc['category1_name'] = category.strip() + + if pd.notna(spu_row.get('category')): + # 确保category相关字段都被设置(如果前面没有设置) + category_name = str(spu_row['category']) + if 'category_name_zh' not in doc: + doc['category_name_zh'] = category_name + if 'category_name_en' not in doc: + doc['category_name_en'] = None + if 'category_name' not in doc: + doc['category_name'] = category_name + + if pd.notna(spu_row.get('category_id')): + doc['category_id'] = str(int(spu_row['category_id'])) + + if pd.notna(spu_row.get('category_level')): + doc['category_level'] = int(spu_row['category_level']) + + def _fill_option_names(self, doc: Dict[str, Any], options: pd.DataFrame): + """填充Option名称字段。""" + if not options.empty: + # 按position排序获取option名称 + sorted_options = options.sort_values('position') + if len(sorted_options) > 0 and pd.notna(sorted_options.iloc[0].get('name')): + doc['option1_name'] = str(sorted_options.iloc[0]['name']) + if len(sorted_options) > 1 and pd.notna(sorted_options.iloc[1].get('name')): + doc['option2_name'] = str(sorted_options.iloc[1]['name']) + if len(sorted_options) > 2 and pd.notna(sorted_options.iloc[2].get('name')): + doc['option3_name'] = str(sorted_options.iloc[2]['name']) + + def _fill_image_url(self, doc: Dict[str, Any], spu_row: pd.Series): + """填充图片URL字段。""" + if pd.notna(spu_row.get('image_src')): + image_src = str(spu_row['image_src']) + if not image_src.startswith('http'): + image_src = f"//{image_src}" if image_src.startswith('//') else image_src + doc['image_url'] = image_src + + def _process_skus( + self, + skus: pd.DataFrame, + options: pd.DataFrame + ) -> tuple: + """处理SKU数据,返回处理结果。""" + skus_list = [] + prices = [] + compare_prices = [] + sku_prices = [] + sku_weights = [] + sku_weight_units = [] + total_inventory = 0 + specifications = [] + + # 构建option名称映射(position -> name) + option_name_map = {} + if not options.empty: + for _, opt_row in options.iterrows(): + position = opt_row.get('position') + name 
= opt_row.get('name') + if pd.notna(position) and pd.notna(name): + option_name_map[int(position)] = str(name) + + for _, sku_row in skus.iterrows(): + sku_data = self._transform_sku_row(sku_row, option_name_map) + if sku_data: + skus_list.append(sku_data) + + # 收集价格信息 + if 'price' in sku_data and sku_data['price'] is not None: + try: + price_val = float(sku_data['price']) + prices.append(price_val) + sku_prices.append(price_val) + except (ValueError, TypeError): + pass + + if 'compare_at_price' in sku_data and sku_data['compare_at_price'] is not None: + try: + compare_prices.append(float(sku_data['compare_at_price'])) + except (ValueError, TypeError): + pass + + # 收集重量信息 + if 'weight' in sku_data and sku_data['weight'] is not None: + try: + sku_weights.append(int(float(sku_data['weight']))) + except (ValueError, TypeError): + pass + + if 'weight_unit' in sku_data and sku_data['weight_unit']: + sku_weight_units.append(str(sku_data['weight_unit'])) + + # 收集库存信息 + if 'stock' in sku_data and sku_data['stock'] is not None: + try: + total_inventory += int(sku_data['stock']) + except (ValueError, TypeError): + pass + + # 构建specifications(从SKU的option值和option表的name) + sku_id = str(sku_row['id']) + if pd.notna(sku_row.get('option1')) and 1 in option_name_map: + specifications.append({ + 'sku_id': sku_id, + 'name': option_name_map[1], + 'value': str(sku_row['option1']) + }) + if pd.notna(sku_row.get('option2')) and 2 in option_name_map: + specifications.append({ + 'sku_id': sku_id, + 'name': option_name_map[2], + 'value': str(sku_row['option2']) + }) + if pd.notna(sku_row.get('option3')) and 3 in option_name_map: + specifications.append({ + 'sku_id': sku_id, + 'name': option_name_map[3], + 'value': str(sku_row['option3']) + }) + + return skus_list, prices, compare_prices, sku_prices, sku_weights, sku_weight_units, total_inventory, specifications + + def _fill_option_values(self, doc: Dict[str, Any], skus: pd.DataFrame): + """填充option值字段。""" + option1_values = [] + 
option2_values = [] + option3_values = [] + + for _, sku_row in skus.iterrows(): + if pd.notna(sku_row.get('option1')): + option1_values.append(str(sku_row['option1'])) + if pd.notna(sku_row.get('option2')): + option2_values.append(str(sku_row['option2'])) + if pd.notna(sku_row.get('option3')): + option3_values.append(str(sku_row['option3'])) + + # 去重并根据配置决定是否写入索引 + if 'option1' in self.searchable_option_dimensions: + doc['option1_values'] = list(set(option1_values)) if option1_values else [] + else: + doc['option1_values'] = [] + + if 'option2' in self.searchable_option_dimensions: + doc['option2_values'] = list(set(option2_values)) if option2_values else [] + else: + doc['option2_values'] = [] + + if 'option3' in self.searchable_option_dimensions: + doc['option3_values'] = list(set(option3_values)) if option3_values else [] + else: + doc['option3_values'] = [] + + def _transform_sku_row(self, sku_row: pd.Series, option_name_map: Dict[int, str] = None) -> Optional[Dict[str, Any]]: + """ + 将SKU行转换为SKU对象。 + + Args: + sku_row: SKU行数据 + option_name_map: position到option名称的映射 + + Returns: + SKU字典 + """ + sku_data = {} + + # SKU ID + sku_data['sku_id'] = str(sku_row['id']) + + # Price + if pd.notna(sku_row.get('price')): + try: + sku_data['price'] = float(sku_row['price']) + except (ValueError, TypeError): + sku_data['price'] = None + else: + sku_data['price'] = None + + # Compare at price + if pd.notna(sku_row.get('compare_at_price')): + try: + sku_data['compare_at_price'] = float(sku_row['compare_at_price']) + except (ValueError, TypeError): + sku_data['compare_at_price'] = None + else: + sku_data['compare_at_price'] = None + + # SKU Code + if pd.notna(sku_row.get('sku')): + sku_data['sku_code'] = str(sku_row['sku']) + + # Stock + if pd.notna(sku_row.get('inventory_quantity')): + try: + sku_data['stock'] = int(sku_row['inventory_quantity']) + except (ValueError, TypeError): + sku_data['stock'] = 0 + else: + sku_data['stock'] = 0 + + # Weight + if 
pd.notna(sku_row.get('weight')): + try: + sku_data['weight'] = float(sku_row['weight']) + except (ValueError, TypeError): + sku_data['weight'] = None + else: + sku_data['weight'] = None + + # Weight unit + if pd.notna(sku_row.get('weight_unit')): + sku_data['weight_unit'] = str(sku_row['weight_unit']) + + # Option values + if pd.notna(sku_row.get('option1')): + sku_data['option1_value'] = str(sku_row['option1']) + if pd.notna(sku_row.get('option2')): + sku_data['option2_value'] = str(sku_row['option2']) + if pd.notna(sku_row.get('option3')): + sku_data['option3_value'] = str(sku_row['option3']) + + # Image src + if pd.notna(sku_row.get('image_src')): + sku_data['image_src'] = str(sku_row['image_src']) + + return sku_data + diff --git a/indexer/incremental_service.py b/indexer/incremental_service.py new file mode 100644 index 0000000..34fcd58 --- /dev/null +++ b/indexer/incremental_service.py @@ -0,0 +1,238 @@ +""" +增量数据获取服务。 + +提供单个SPU的数据获取接口,用于增量更新ES索引。 +公共数据(分类映射、配置等)在服务启动时预加载,以提高性能。 +""" + +import pandas as pd +import numpy as np +import logging +from typing import Dict, Any, Optional +from sqlalchemy import text +from config import ConfigLoader +from config.tenant_config_loader import get_tenant_config_loader +from indexer.document_transformer import SPUDocumentTransformer + +# Configure logger +logger = logging.getLogger(__name__) + + +class IncrementalIndexerService: + """增量索引服务,提供单个SPU数据获取功能。""" + + def __init__(self, db_engine: Any): + """ + 初始化增量索引服务。 + + Args: + db_engine: SQLAlchemy database engine + """ + self.db_engine = db_engine + + # 预加载分类映射(全局,所有租户共享) + self.category_id_to_name = self._load_category_mapping() + logger.info(f"Preloaded {len(self.category_id_to_name)} category mappings") + + # 租户配置加载器(延迟加载,按需获取租户配置) + self.tenant_config_loader = get_tenant_config_loader() + + def _load_category_mapping(self) -> Dict[str, str]: + """ + 加载分类ID到名称的映射(全局,所有租户共享)。 + + Returns: + Dictionary mapping category_id to category_name + """ + query = text(""" + 
SELECT DISTINCT + category_id, + category + FROM shoplazza_product_spu + WHERE deleted = 0 AND category_id IS NOT NULL + """) + + mapping = {} + try: + with self.db_engine.connect() as conn: + result = conn.execute(query) + for row in result: + category_id = str(int(row.category_id)) + category_name = row.category + + if not category_name or not category_name.strip(): + logger.warning(f"Category ID {category_id} has empty name, skipping") + continue + + mapping[category_id] = category_name + except Exception as e: + logger.error(f"Failed to load category mapping: {e}", exc_info=True) + + return mapping + + def get_spu_document(self, tenant_id: str, spu_id: str) -> Optional[Dict[str, Any]]: + """ + 获取单个SPU的ES文档数据。 + + Args: + tenant_id: 租户ID + spu_id: SPU ID + + Returns: + ES文档字典,如果SPU不存在或已删除则返回None + """ + try: + # 加载SPU数据 + spu_row = self._load_single_spu(tenant_id, spu_id) + if spu_row is None: + logger.warning(f"SPU {spu_id} not found for tenant_id={tenant_id}") + return None + + # 加载SKU数据 + skus_df = self._load_skus_for_spu(tenant_id, spu_id) + + # 加载Option数据 + options_df = self._load_options_for_spu(tenant_id, spu_id) + + # 获取租户配置 + tenant_config = self.tenant_config_loader.get_tenant_config(tenant_id) + + # 加载搜索配置 + translator = None + translation_prompts = {} + searchable_option_dimensions = ['option1', 'option2', 'option3'] + try: + config_loader = ConfigLoader() + config = config_loader.load_config() + searchable_option_dimensions = config.spu_config.searchable_option_dimensions + + # Initialize translator if translation is enabled + if config.query_config.enable_translation: + from query.translator import Translator + translator = Translator( + api_key=config.query_config.translation_api_key, + use_cache=True, # 索引时使用缓存避免重复翻译 + glossary_id=config.query_config.translation_glossary_id, + translation_context=config.query_config.translation_context + ) + translation_prompts = config.query_config.translation_prompts + except Exception as e: + 
logger.warning(f"Failed to load config, using default: {e}") + + # 创建文档转换器 + transformer = SPUDocumentTransformer( + category_id_to_name=self.category_id_to_name, + searchable_option_dimensions=searchable_option_dimensions, + tenant_config=tenant_config, + translator=translator, + translation_prompts=translation_prompts + ) + + # 转换为ES文档 + doc = transformer.transform_spu_to_doc( + tenant_id=tenant_id, + spu_row=spu_row, + skus=skus_df, + options=options_df + ) + + if doc is None: + logger.warning(f"Failed to transform SPU {spu_id} for tenant_id={tenant_id}") + return None + + return doc + + except Exception as e: + logger.error(f"Error getting SPU document for tenant_id={tenant_id}, spu_id={spu_id}: {e}", exc_info=True) + raise + + def _load_single_spu(self, tenant_id: str, spu_id: str) -> Optional[pd.Series]: + """ + 加载单个SPU数据。 + + Args: + tenant_id: 租户ID + spu_id: SPU ID + + Returns: + SPU行数据,如果不存在则返回None + """ + query = text(""" + SELECT + id, shop_id, shoplazza_id, title, brief, description, + spu, vendor, vendor_url, + image_src, image_width, image_height, image_path, image_alt, + tags, note, category, category_id, category_google_id, + category_level, category_path, + fake_sales, display_fake_sales, + tenant_id, creator, create_time, updater, update_time, deleted + FROM shoplazza_product_spu + WHERE tenant_id = :tenant_id AND id = :spu_id AND deleted = 0 + LIMIT 1 + """) + + with self.db_engine.connect() as conn: + df = pd.read_sql(query, conn, params={"tenant_id": tenant_id, "spu_id": spu_id}) + + if df.empty: + return None + + return df.iloc[0] + + def _load_skus_for_spu(self, tenant_id: str, spu_id: str) -> pd.DataFrame: + """ + 加载指定SPU的所有SKU数据。 + + Args: + tenant_id: 租户ID + spu_id: SPU ID + + Returns: + SKU数据DataFrame + """ + query = text(""" + SELECT + id, spu_id, shop_id, shoplazza_id, shoplazza_product_id, + shoplazza_image_id, title, sku, barcode, position, + price, compare_at_price, cost_price, + option1, option2, option3, + inventory_quantity, 
weight, weight_unit, image_src, + wholesale_price, note, extend, + shoplazza_created_at, shoplazza_updated_at, tenant_id, + creator, create_time, updater, update_time, deleted + FROM shoplazza_product_sku + WHERE tenant_id = :tenant_id AND spu_id = :spu_id AND deleted = 0 + """) + + with self.db_engine.connect() as conn: + df = pd.read_sql(query, conn, params={"tenant_id": tenant_id, "spu_id": spu_id}) + + return df + + def _load_options_for_spu(self, tenant_id: str, spu_id: str) -> pd.DataFrame: + """ + 加载指定SPU的所有Option数据。 + + Args: + tenant_id: 租户ID + spu_id: SPU ID + + Returns: + Option数据DataFrame + """ + query = text(""" + SELECT + id, spu_id, shop_id, shoplazza_id, shoplazza_product_id, + position, name, `values`, tenant_id, + creator, create_time, updater, update_time, deleted + FROM shoplazza_product_option + WHERE tenant_id = :tenant_id AND spu_id = :spu_id AND deleted = 0 + ORDER BY position + """) + + with self.db_engine.connect() as conn: + df = pd.read_sql(query, conn, params={"tenant_id": tenant_id, "spu_id": spu_id}) + + return df + + diff --git a/indexer/spu_transformer.py b/indexer/spu_transformer.py index 6ddd972..eb03ebb 100644 --- a/indexer/spu_transformer.py +++ b/indexer/spu_transformer.py @@ -11,6 +11,8 @@ from typing import Dict, Any, List, Optional from sqlalchemy import create_engine, text from utils.db_connector import create_db_connection from config import ConfigLoader +from config.tenant_config_loader import get_tenant_config_loader +from indexer.document_transformer import SPUDocumentTransformer # Configure logger logger = logging.getLogger(__name__) @@ -35,16 +37,42 @@ class SPUTransformer: self.tenant_id = tenant_id # Load configuration to get searchable_option_dimensions + translator = None + translation_prompts = {} try: config_loader = ConfigLoader() config = config_loader.load_config() self.searchable_option_dimensions = config.spu_config.searchable_option_dimensions + + # Initialize translator if translation is enabled + if 
config.query_config.enable_translation: + from query.translator import Translator + translator = Translator( + api_key=config.query_config.translation_api_key, + use_cache=True, # 索引时使用缓存避免重复翻译 + glossary_id=config.query_config.translation_glossary_id, + translation_context=config.query_config.translation_context + ) + translation_prompts = config.query_config.translation_prompts except Exception as e: - print(f"Warning: Failed to load config, using default searchable_option_dimensions: {e}") + logger.warning(f"Failed to load config, using default: {e}") self.searchable_option_dimensions = ['option1', 'option2', 'option3'] # Load category ID to name mapping self.category_id_to_name = self._load_category_mapping() + + # Load tenant config + tenant_config_loader = get_tenant_config_loader() + tenant_config = tenant_config_loader.get_tenant_config(tenant_id) + + # Initialize document transformer + self.document_transformer = SPUDocumentTransformer( + category_id_to_name=self.category_id_to_name, + searchable_option_dimensions=self.searchable_option_dimensions, + tenant_config=tenant_config, + translator=translator, + translation_prompts=translation_prompts + ) def _load_category_mapping(self) -> Dict[str, str]: """ @@ -291,7 +319,12 @@ class SPUTransformer: logger.warning(f"SPU {spu_id} (title: {spu_row.get('title', 'N/A')}) has no SKUs") # Transform to ES document - doc = self._transform_spu_to_doc(spu_row, skus, options) + doc = self.document_transformer.transform_spu_to_doc( + tenant_id=self.tenant_id, + spu_row=spu_row, + skus=skus, + options=options + ) if doc: documents.append(doc) else: @@ -309,378 +342,4 @@ class SPUTransformer: return documents - def _transform_spu_to_doc( - self, - spu_row: pd.Series, - skus: pd.DataFrame, - options: pd.DataFrame - ) -> Optional[Dict[str, Any]]: - """ - Transform a single SPU row and its SKUs into an ES document. 
- - Args: - spu_row: SPU row from database - skus: DataFrame with SKUs for this SPU - options: DataFrame with options for this SPU - - Returns: - ES document or None if transformation fails - """ - doc = {} - - # Tenant ID (required) - doc['tenant_id'] = str(self.tenant_id) - - # SPU ID - spu_id = spu_row['id'] - doc['spu_id'] = str(spu_id) - - # Validate required fields - if pd.isna(spu_row.get('title')) or not str(spu_row['title']).strip(): - logger.error(f"SPU {spu_id} has no title, this may cause search issues") - - # 文本相关性相关字段(中英文双语,暂时只填充中文) - if pd.notna(spu_row.get('title')): - doc['title_zh'] = str(spu_row['title']) - doc['title_en'] = None # 暂时设为空 - - if pd.notna(spu_row.get('brief')): - doc['brief_zh'] = str(spu_row['brief']) - doc['brief_en'] = None - - if pd.notna(spu_row.get('description')): - doc['description_zh'] = str(spu_row['description']) - doc['description_en'] = None - - if pd.notna(spu_row.get('vendor')): - doc['vendor_zh'] = str(spu_row['vendor']) - doc['vendor_en'] = None - - # Tags - if pd.notna(spu_row.get('tags')): - # Tags是逗号分隔的字符串,需要转换为数组 - tags_str = str(spu_row['tags']) - doc['tags'] = [tag.strip() for tag in tags_str.split(',') if tag.strip()] - - # Category相关字段 - if pd.notna(spu_row.get('category_path')): - category_path = str(spu_row['category_path']) - - # 解析category_path - 这是逗号分隔的类目ID列表 - category_ids = [cid.strip() for cid in category_path.split(',') if cid.strip()] - - # 将ID映射为名称 - category_names = [] - missing_category_ids = [] - for cid in category_ids: - if cid in self.category_id_to_name: - category_names.append(self.category_id_to_name[cid]) - else: - # 如果找不到映射,记录错误并使用ID作为备选 - logger.error(f"Category ID {cid} not found in mapping for SPU {spu_row['id']} (title: {spu_row.get('title', 'N/A')}), category_path={category_path}") - missing_category_ids.append(cid) - category_names.append(cid) # 使用ID作为备选 - - # 构建类目路径字符串(用于搜索) - if category_names: - category_path_str = '/'.join(category_names) - doc['category_path_zh'] = 
category_path_str - doc['category_path_en'] = None # 暂时设为空 - - # 填充分层类目名称 - if len(category_names) > 0: - doc['category1_name'] = category_names[0] - if len(category_names) > 1: - doc['category2_name'] = category_names[1] - if len(category_names) > 2: - doc['category3_name'] = category_names[2] - elif pd.notna(spu_row.get('category')): - # 如果category_path为空,使用category字段作为category1_name的备选 - category = str(spu_row['category']) - doc['category_name_zh'] = category - doc['category_name_en'] = None - doc['category_name'] = category - - # 尝试从category字段解析多级分类 - if '/' in category: - path_parts = category.split('/') - if len(path_parts) > 0: - doc['category1_name'] = path_parts[0].strip() - if len(path_parts) > 1: - doc['category2_name'] = path_parts[1].strip() - if len(path_parts) > 2: - doc['category3_name'] = path_parts[2].strip() - else: - # 如果category不包含"/",直接作为category1_name - doc['category1_name'] = category.strip() - - if pd.notna(spu_row.get('category')): - # 确保category相关字段都被设置(如果前面没有设置) - category_name = str(spu_row['category']) - if 'category_name_zh' not in doc: - doc['category_name_zh'] = category_name - if 'category_name_en' not in doc: - doc['category_name_en'] = None - if 'category_name' not in doc: - doc['category_name'] = category_name - - if pd.notna(spu_row.get('category_id')): - doc['category_id'] = str(int(spu_row['category_id'])) - - if pd.notna(spu_row.get('category_level')): - doc['category_level'] = int(spu_row['category_level']) - - # Option名称(从option表获取) - if not options.empty: - # 按position排序获取option名称 - sorted_options = options.sort_values('position') - if len(sorted_options) > 0 and pd.notna(sorted_options.iloc[0].get('name')): - doc['option1_name'] = str(sorted_options.iloc[0]['name']) - if len(sorted_options) > 1 and pd.notna(sorted_options.iloc[1].get('name')): - doc['option2_name'] = str(sorted_options.iloc[1]['name']) - if len(sorted_options) > 2 and pd.notna(sorted_options.iloc[2].get('name')): - doc['option3_name'] = 
str(sorted_options.iloc[2]['name']) - - # Image URL - if pd.notna(spu_row.get('image_src')): - image_src = str(spu_row['image_src']) - if not image_src.startswith('http'): - image_src = f"//{image_src}" if image_src.startswith('//') else image_src - doc['image_url'] = image_src - - # Sales (fake_sales) - if pd.notna(spu_row.get('fake_sales')): - try: - doc['sales'] = int(spu_row['fake_sales']) - except (ValueError, TypeError): - doc['sales'] = 0 - else: - doc['sales'] = 0 - - # Process SKUs and build specifications - skus_list = [] - prices = [] - compare_prices = [] - sku_prices = [] - sku_weights = [] - sku_weight_units = [] - total_inventory = 0 - specifications = [] - - # 构建option名称映射(position -> name) - option_name_map = {} - if not options.empty: - for _, opt_row in options.iterrows(): - position = opt_row.get('position') - name = opt_row.get('name') - if pd.notna(position) and pd.notna(name): - option_name_map[int(position)] = str(name) - - for _, sku_row in skus.iterrows(): - sku_data = self._transform_sku_row(sku_row, option_name_map) - if sku_data: - skus_list.append(sku_data) - - # 收集价格信息 - if 'price' in sku_data and sku_data['price'] is not None: - try: - price_val = float(sku_data['price']) - prices.append(price_val) - sku_prices.append(price_val) - except (ValueError, TypeError): - pass - - if 'compare_at_price' in sku_data and sku_data['compare_at_price'] is not None: - try: - compare_prices.append(float(sku_data['compare_at_price'])) - except (ValueError, TypeError): - pass - - # 收集重量信息 - if 'weight' in sku_data and sku_data['weight'] is not None: - try: - sku_weights.append(int(float(sku_data['weight']))) - except (ValueError, TypeError): - pass - - if 'weight_unit' in sku_data and sku_data['weight_unit']: - sku_weight_units.append(str(sku_data['weight_unit'])) - - # 收集库存信息 - if 'stock' in sku_data and sku_data['stock'] is not None: - try: - total_inventory += int(sku_data['stock']) - except (ValueError, TypeError): - pass - - # 
构建specifications(从SKU的option值和option表的name) - sku_id = str(sku_row['id']) - if pd.notna(sku_row.get('option1')) and 1 in option_name_map: - specifications.append({ - 'sku_id': sku_id, - 'name': option_name_map[1], - 'value': str(sku_row['option1']) - }) - if pd.notna(sku_row.get('option2')) and 2 in option_name_map: - specifications.append({ - 'sku_id': sku_id, - 'name': option_name_map[2], - 'value': str(sku_row['option2']) - }) - if pd.notna(sku_row.get('option3')) and 3 in option_name_map: - specifications.append({ - 'sku_id': sku_id, - 'name': option_name_map[3], - 'value': str(sku_row['option3']) - }) - - doc['skus'] = skus_list - doc['specifications'] = specifications - - # 提取option值(根据配置的searchable_option_dimensions) - # 从子SKU的option1_value, option2_value, option3_value中提取去重后的值 - option1_values = [] - option2_values = [] - option3_values = [] - - for _, sku_row in skus.iterrows(): - if pd.notna(sku_row.get('option1')): - option1_values.append(str(sku_row['option1'])) - if pd.notna(sku_row.get('option2')): - option2_values.append(str(sku_row['option2'])) - if pd.notna(sku_row.get('option3')): - option3_values.append(str(sku_row['option3'])) - - # 去重并根据配置决定是否写入索引 - if 'option1' in self.searchable_option_dimensions: - doc['option1_values'] = list(set(option1_values)) if option1_values else [] - else: - doc['option1_values'] = [] - - if 'option2' in self.searchable_option_dimensions: - doc['option2_values'] = list(set(option2_values)) if option2_values else [] - else: - doc['option2_values'] = [] - - if 'option3' in self.searchable_option_dimensions: - doc['option3_values'] = list(set(option3_values)) if option3_values else [] - else: - doc['option3_values'] = [] - - # Calculate price ranges - if prices: - doc['min_price'] = float(min(prices)) - doc['max_price'] = float(max(prices)) - else: - doc['min_price'] = 0.0 - doc['max_price'] = 0.0 - - if compare_prices: - doc['compare_at_price'] = float(max(compare_prices)) - else: - doc['compare_at_price'] = None - - # 
SKU扁平化字段 - doc['sku_prices'] = sku_prices - doc['sku_weights'] = sku_weights - doc['sku_weight_units'] = list(set(sku_weight_units)) # 去重 - doc['total_inventory'] = total_inventory - - # Image URL - if pd.notna(spu_row.get('image_src')): - image_src = str(spu_row['image_src']) - if not image_src.startswith('http'): - image_src = f"//{image_src}" if image_src.startswith('//') else image_src - doc['image_url'] = image_src - - # Time fields - convert datetime to ISO format string for ES DATE type - if pd.notna(spu_row.get('create_time')): - create_time = spu_row['create_time'] - if hasattr(create_time, 'isoformat'): - doc['create_time'] = create_time.isoformat() - else: - doc['create_time'] = str(create_time) - - if pd.notna(spu_row.get('update_time')): - update_time = spu_row['update_time'] - if hasattr(update_time, 'isoformat'): - doc['update_time'] = update_time.isoformat() - else: - doc['update_time'] = str(update_time) - - return doc - - def _transform_sku_row(self, sku_row: pd.Series, option_name_map: Dict[int, str] = None) -> Optional[Dict[str, Any]]: - """ - Transform a SKU row into a SKU object. 
- - Args: - sku_row: SKU row from database - option_name_map: Mapping from position to option name - - Returns: - SKU dictionary or None - """ - sku_data = {} - - # SKU ID - sku_data['sku_id'] = str(sku_row['id']) - - # Price - if pd.notna(sku_row.get('price')): - try: - sku_data['price'] = float(sku_row['price']) - except (ValueError, TypeError): - sku_data['price'] = None - else: - sku_data['price'] = None - - # Compare at price - if pd.notna(sku_row.get('compare_at_price')): - try: - sku_data['compare_at_price'] = float(sku_row['compare_at_price']) - except (ValueError, TypeError): - sku_data['compare_at_price'] = None - else: - sku_data['compare_at_price'] = None - - # SKU Code - if pd.notna(sku_row.get('sku')): - sku_data['sku_code'] = str(sku_row['sku']) - - # Stock - if pd.notna(sku_row.get('inventory_quantity')): - try: - sku_data['stock'] = int(sku_row['inventory_quantity']) - except (ValueError, TypeError): - sku_data['stock'] = 0 - else: - sku_data['stock'] = 0 - - # Weight - if pd.notna(sku_row.get('weight')): - try: - sku_data['weight'] = float(sku_row['weight']) - except (ValueError, TypeError): - sku_data['weight'] = None - else: - sku_data['weight'] = None - - # Weight unit - if pd.notna(sku_row.get('weight_unit')): - sku_data['weight_unit'] = str(sku_row['weight_unit']) - - # Option values - if pd.notna(sku_row.get('option1')): - sku_data['option1_value'] = str(sku_row['option1']) - if pd.notna(sku_row.get('option2')): - sku_data['option2_value'] = str(sku_row['option2']) - if pd.notna(sku_row.get('option3')): - sku_data['option3_value'] = str(sku_row['option3']) - - # Image src - if pd.notna(sku_row.get('image_src')): - sku_data['image_src'] = str(sku_row['image_src']) - - return sku_data diff --git a/indexer/test_indexing.py b/indexer/test_indexing.py new file mode 100755 index 0000000..13b892c --- /dev/null +++ b/indexer/test_indexing.py @@ -0,0 +1,362 @@ +#!/usr/bin/env python3 +""" +索引功能测试脚本。 + +测试内容: +1. 全量索引(SPUTransformer) +2. 
增量索引(IncrementalIndexerService) +3. 租户配置加载 +4. 翻译功能集成(根据租户配置) +5. 文档转换器功能 +""" + +import sys +import os +from pathlib import Path + +# Add parent directory to path +sys.path.insert(0, str(Path(__file__).parent.parent)) + +from config import ConfigLoader +from config.tenant_config_loader import get_tenant_config_loader +from utils.db_connector import create_db_connection +from indexer.spu_transformer import SPUTransformer +from indexer.incremental_service import IncrementalIndexerService +import logging + +# Configure logging +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' +) +logger = logging.getLogger(__name__) + + +def test_tenant_config(): + """测试租户配置加载""" + print("\n" + "="*60) + print("测试1: 租户配置加载") + print("="*60) + + try: + tenant_config_loader = get_tenant_config_loader() + + # 测试默认配置 + default_config = tenant_config_loader.get_tenant_config("999") + print(f"默认配置: {default_config}") + + # 测试租户162(翻译关闭) + tenant_162_config = tenant_config_loader.get_tenant_config("162") + print(f"租户162配置: {tenant_162_config}") + assert tenant_162_config['translate_to_en'] == False, "租户162翻译应该关闭" + assert tenant_162_config['translate_to_zh'] == False, "租户162翻译应该关闭" + print("✓ 租户162配置正确(翻译关闭)") + + # 测试其他租户 + tenant_1_config = tenant_config_loader.get_tenant_config("1") + print(f"租户1配置: {tenant_1_config}") + assert tenant_1_config['translate_to_en'] == True, "租户1应该启用英文翻译" + print("✓ 租户1配置正确(翻译开启)") + + return True + except Exception as e: + print(f"✗ 租户配置测试失败: {e}") + import traceback + traceback.print_exc() + return False + + +def test_full_indexing(tenant_id: str = "162"): + """测试全量索引""" + print("\n" + "="*60) + print(f"测试2: 全量索引(租户{tenant_id})") + print("="*60) + + # 获取数据库配置 + db_host = os.environ.get('DB_HOST') + db_port = int(os.environ.get('DB_PORT', 3306)) + db_database = os.environ.get('DB_DATABASE') + db_username = os.environ.get('DB_USERNAME') + db_password = os.environ.get('DB_PASSWORD') + + if not 
all([db_host, db_database, db_username, db_password]): + print("✗ 跳过:数据库配置不完整") + print(" 需要环境变量: DB_HOST, DB_DATABASE, DB_USERNAME, DB_PASSWORD") + return False + + try: + # 连接数据库 + db_engine = create_db_connection( + host=db_host, + port=db_port, + database=db_database, + username=db_username, + password=db_password + ) + print(f"✓ 数据库连接成功: {db_host}:{db_port}/{db_database}") + + # 创建转换器 + transformer = SPUTransformer(db_engine, tenant_id) + print(f"✓ SPUTransformer初始化成功") + + # 转换数据(只转换前3个SPU用于测试) + print(f"\n开始转换数据(租户{tenant_id})...") + documents = transformer.transform_batch() + + if not documents: + print(f"⚠ 没有数据需要转换") + return True + + print(f"✓ 转换完成: {len(documents)} 个文档") + + # 检查前3个文档 + for i, doc in enumerate(documents[:3]): + print(f"\n文档 {i+1}:") + print(f" SPU ID: {doc.get('spu_id')}") + print(f" Tenant ID: {doc.get('tenant_id')}") + print(f" 标题 (中文): {doc.get('title_zh', 'N/A')}") + print(f" 标题 (英文): {doc.get('title_en', 'N/A')}") + + # 检查租户162的翻译状态 + if tenant_id == "162": + # 租户162翻译应该关闭,title_en应该为None + if doc.get('title_en') is None: + print(f" ✓ 翻译已关闭(title_en为None)") + else: + print(f" ⚠ 警告:翻译应该关闭,但title_en有值: {doc.get('title_en')}") + + return True + + except Exception as e: + print(f"✗ 全量索引测试失败: {e}") + import traceback + traceback.print_exc() + return False + + +def test_incremental_indexing(tenant_id: str = "162"): + """测试增量索引""" + print("\n" + "="*60) + print(f"测试3: 增量索引(租户{tenant_id})") + print("="*60) + + # 获取数据库配置 + db_host = os.environ.get('DB_HOST') + db_port = int(os.environ.get('DB_PORT', 3306)) + db_database = os.environ.get('DB_DATABASE') + db_username = os.environ.get('DB_USERNAME') + db_password = os.environ.get('DB_PASSWORD') + + if not all([db_host, db_database, db_username, db_password]): + print("✗ 跳过:数据库配置不完整") + return False + + try: + # 连接数据库 + db_engine = create_db_connection( + host=db_host, + port=db_port, + database=db_database, + username=db_username, + password=db_password + ) + + # 创建增量服务 + service = 
IncrementalIndexerService(db_engine) + print(f"✓ IncrementalIndexerService初始化成功") + + # 先查询一个SPU ID + from sqlalchemy import text + with db_engine.connect() as conn: + query = text(""" + SELECT id FROM shoplazza_product_spu + WHERE tenant_id = :tenant_id AND deleted = 0 + LIMIT 1 + """) + result = conn.execute(query, {"tenant_id": tenant_id}) + row = result.fetchone() + if not row: + print(f"⚠ 租户{tenant_id}没有数据,跳过增量测试") + return True + spu_id = str(row[0]) + + print(f"\n测试SPU ID: {spu_id}") + + # 获取SPU文档 + doc = service.get_spu_document(tenant_id=tenant_id, spu_id=spu_id) + + if doc is None: + print(f"✗ SPU {spu_id} 文档获取失败") + return False + + print(f"✓ SPU文档获取成功") + print(f" SPU ID: {doc.get('spu_id')}") + print(f" Tenant ID: {doc.get('tenant_id')}") + print(f" 标题 (中文): {doc.get('title_zh', 'N/A')}") + print(f" 标题 (英文): {doc.get('title_en', 'N/A')}") + print(f" SKU数量: {len(doc.get('skus', []))}") + print(f" 规格数量: {len(doc.get('specifications', []))}") + + # 检查租户162的翻译状态 + if tenant_id == "162": + if doc.get('title_en') is None: + print(f" ✓ 翻译已关闭(title_en为None)") + else: + print(f" ⚠ 警告:翻译应该关闭,但title_en有值: {doc.get('title_en')}") + + return True + + except Exception as e: + print(f"✗ 增量索引测试失败: {e}") + import traceback + traceback.print_exc() + return False + + +def test_document_transformer(): + """测试文档转换器""" + print("\n" + "="*60) + print("测试4: 文档转换器") + print("="*60) + + try: + import pandas as pd + from indexer.document_transformer import SPUDocumentTransformer + from config import ConfigLoader + + config = ConfigLoader().load_config() + + # 创建模拟数据 + spu_row = pd.Series({ + 'id': 123, + 'tenant_id': '162', + 'title': '测试商品', + 'brief': '测试简介', + 'description': '测试描述', + 'vendor': '测试品牌', + 'category': '测试类目', + 'category_id': 100, + 'category_level': 1, + 'fake_sales': 1000, + 'image_src': 'https://example.com/image.jpg', + 'tags': '测试,标签', + 'create_time': pd.Timestamp.now(), + 'update_time': pd.Timestamp.now() + }) + + skus_df = pd.DataFrame([{ + 'id': 456, + 
'price': 99.99, + 'compare_at_price': 149.99, + 'sku': 'SKU001', + 'inventory_quantity': 100, + 'option1': '黑色', + 'option2': None, + 'option3': None + }]) + + options_df = pd.DataFrame([{ + 'id': 1, + 'position': 1, + 'name': '颜色' + }]) + + # 获取租户配置 + tenant_config_loader = get_tenant_config_loader() + tenant_config = tenant_config_loader.get_tenant_config('162') + + # 初始化翻译器(如果启用) + translator = None + if config.query_config.enable_translation: + from query.translator import Translator + translator = Translator( + api_key=config.query_config.translation_api_key, + use_cache=True + ) + + # 创建转换器 + transformer = SPUDocumentTransformer( + category_id_to_name={}, + searchable_option_dimensions=['option1', 'option2', 'option3'], + tenant_config=tenant_config, + translator=translator, + translation_prompts=config.query_config.translation_prompts + ) + + # 转换文档 + doc = transformer.transform_spu_to_doc( + tenant_id='162', + spu_row=spu_row, + skus=skus_df, + options=options_df + ) + + if doc: + print(f"✓ 文档转换成功") + print(f" title_zh: {doc.get('title_zh')}") + print(f" title_en: {doc.get('title_en')}") + print(f" SKU数量: {len(doc.get('skus', []))}") + + # 验证租户162翻译关闭 + if doc.get('title_en') is None: + print(f" ✓ 翻译已关闭(符合租户162配置)") + else: + print(f" ⚠ 警告:翻译应该关闭") + + return True + else: + print(f"✗ 文档转换失败") + return False + + except Exception as e: + print(f"✗ 文档转换器测试失败: {e}") + import traceback + traceback.print_exc() + return False + + +def main(): + """主测试函数""" + print("="*60) + print("索引功能完整测试") + print("="*60) + + results = [] + + # 测试1: 租户配置 + results.append(("租户配置加载", test_tenant_config())) + + # 测试2: 全量索引(租户162) + results.append(("全量索引(租户162)", test_full_indexing("162"))) + + # 测试3: 增量索引(租户162) + results.append(("增量索引(租户162)", test_incremental_indexing("162"))) + + # 测试4: 文档转换器 + results.append(("文档转换器", test_document_transformer())) + + # 总结 + print("\n" + "="*60) + print("测试总结") + print("="*60) + + passed = sum(1 for _, result in results if result) + total = 
len(results) + + for name, result in results: + status = "✓ 通过" if result else "✗ 失败" + print(f"{status}: {name}") + + print(f"\n总计: {passed}/{total} 通过") + + if passed == total: + print("✓ 所有测试通过") + return 0 + else: + print("✗ 部分测试失败") + return 1 + + +if __name__ == '__main__': + sys.exit(main()) + diff --git a/query/query_parser.py b/query/query_parser.py index d24e4fa..6e176fc 100644 --- a/query/query_parser.py +++ b/query/query_parser.py @@ -229,14 +229,19 @@ class QueryParser: if target_langs: # Use e-commerce context for better disambiguation - translation_context = 'e-commerce product search' + translation_context = self.config.query_config.translation_context + # For query translation, we use a general prompt (not language-specific) + # Since translate_multi uses same prompt for all languages, we use default + query_prompt = self.config.query_config.translation_prompts.get('query_zh') or \ + self.config.query_config.translation_prompts.get('default_zh') # Use async mode: returns cached translations immediately, missing ones translated in background translations = self.translator.translate_multi( query_text, target_langs, source_lang=detected_lang, context=translation_context, - async_mode=True + async_mode=True, + prompt=query_prompt ) # Filter out None values (missing translations that are being processed async) translations = {k: v for k, v in translations.items() if v is not None} diff --git a/query/test_translation.py b/query/test_translation.py new file mode 100755 index 0000000..8422b27 --- /dev/null +++ b/query/test_translation.py @@ -0,0 +1,294 @@ +#!/usr/bin/env python3 +""" +翻译功能测试脚本。 + +测试内容: +1. 翻译提示词配置加载 +2. 同步翻译(索引场景) +3. 异步翻译(查询场景) +4. 不同提示词的使用 +5. 缓存功能 +6. 
DeepL Context参数使用 +""" + +import sys +import os +from pathlib import Path + +# Add parent directory to path +sys.path.insert(0, str(Path(__file__).parent.parent)) + +from config import ConfigLoader +from query.translator import Translator +import logging + +# Configure logging +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' +) +logger = logging.getLogger(__name__) + + +def test_config_loading(): + """测试配置加载""" + print("\n" + "="*60) + print("测试1: 配置加载") + print("="*60) + + try: + config_loader = ConfigLoader() + config = config_loader.load_config() + + print(f"✓ 配置加载成功") + print(f" 翻译服务: {config.query_config.translation_service}") + print(f" 翻译提示词配置:") + for key, value in config.query_config.translation_prompts.items(): + print(f" {key}: {value[:60]}..." if len(value) > 60 else f" {key}: {value}") + + return config + except Exception as e: + print(f"✗ 配置加载失败: {e}") + import traceback + traceback.print_exc() + return None + + +def test_translator_sync(config): + """测试同步翻译(索引场景)""" + print("\n" + "="*60) + print("测试2: 同步翻译(索引场景)") + print("="*60) + + if not config: + print("✗ 跳过:配置未加载") + return None + + try: + translator = Translator( + api_key=config.query_config.translation_api_key, + use_cache=True, + glossary_id=config.query_config.translation_glossary_id, + translation_context=config.query_config.translation_context + ) + + # 测试商品标题翻译(使用product_title提示词) + test_texts = [ + ("蓝牙耳机", "zh", "en", "product_title"), + ("Wireless Headphones", "en", "zh", "product_title"), + ] + + for text, source_lang, target_lang, prompt_type in test_texts: + if prompt_type == "product_title": + if target_lang == "zh": + prompt = config.query_config.translation_prompts.get('product_title_zh') + else: + prompt = config.query_config.translation_prompts.get('product_title_en') + else: + if target_lang == "zh": + prompt = config.query_config.translation_prompts.get('default_zh') + else: + prompt = 
config.query_config.translation_prompts.get('default_en') + + print(f"\n翻译测试:") + print(f" 原文 ({source_lang}): {text}") + print(f" 目标语言: {target_lang}") + print(f" 提示词: {prompt[:50] if prompt else 'None'}...") + + result = translator.translate( + text, + target_lang=target_lang, + source_lang=source_lang, + prompt=prompt + ) + + if result: + print(f" 结果: {result}") + print(f" ✓ 翻译成功") + else: + print(f" ⚠ 翻译返回None(可能是mock模式或无API key)") + + return translator + + except Exception as e: + print(f"✗ 同步翻译测试失败: {e}") + import traceback + traceback.print_exc() + return None + + +def test_translator_async(config, translator): + """测试异步翻译(查询场景)""" + print("\n" + "="*60) + print("测试3: 异步翻译(查询场景)") + print("="*60) + + if not config or not translator: + print("✗ 跳过:配置或翻译器未初始化") + return + + try: + query_text = "手机" + target_langs = ['en'] + source_lang = 'zh' + + query_prompt = config.query_config.translation_prompts.get('query_zh') + + print(f"查询文本: {query_text}") + print(f"目标语言: {target_langs}") + print(f"提示词: {query_prompt}") + + # 异步模式(立即返回,后台翻译) + results = translator.translate_multi( + query_text, + target_langs, + source_lang=source_lang, + context=config.query_config.translation_context, + async_mode=True, + prompt=query_prompt + ) + + print(f"\n异步翻译结果:") + for lang, translation in results.items(): + if translation: + print(f" {lang}: {translation} (缓存命中)") + else: + print(f" {lang}: None (后台翻译中...)") + + # 同步模式(等待完成) + print(f"\n同步翻译(等待完成):") + results_sync = translator.translate_multi( + query_text, + target_langs, + source_lang=source_lang, + context=config.query_config.translation_context, + async_mode=False, + prompt=query_prompt + ) + + for lang, translation in results_sync.items(): + print(f" {lang}: {translation}") + + except Exception as e: + print(f"✗ 异步翻译测试失败: {e}") + import traceback + traceback.print_exc() + + +def test_cache(): + """测试缓存功能""" + print("\n" + "="*60) + print("测试4: 缓存功能") + print("="*60) + + try: + config_loader = ConfigLoader() + config = 
config_loader.load_config() + + translator = Translator( + api_key=config.query_config.translation_api_key, + use_cache=True + ) + + test_text = "测试文本" + target_lang = "en" + source_lang = "zh" + prompt = config.query_config.translation_prompts.get('default_zh') + + print(f"第一次翻译(应该调用API或返回mock):") + result1 = translator.translate(test_text, target_lang, source_lang, prompt=prompt) + print(f" 结果: {result1}") + + print(f"\n第二次翻译(应该使用缓存):") + result2 = translator.translate(test_text, target_lang, source_lang, prompt=prompt) + print(f" 结果: {result2}") + + if result1 == result2: + print(f" ✓ 缓存功能正常") + else: + print(f" ⚠ 缓存可能有问题") + + except Exception as e: + print(f"✗ 缓存测试失败: {e}") + import traceback + traceback.print_exc() + + +def test_context_parameter(): + """测试DeepL Context参数使用""" + print("\n" + "="*60) + print("测试5: DeepL Context参数") + print("="*60) + + try: + config_loader = ConfigLoader() + config = config_loader.load_config() + + translator = Translator( + api_key=config.query_config.translation_api_key, + use_cache=False # 禁用缓存以便测试 + ) + + # 测试带context和不带context的翻译 + text = "手机" + prompt = config.query_config.translation_prompts.get('query_zh') + + print(f"测试文本: {text}") + print(f"提示词(作为context): {prompt}") + + # 带context的翻译 + result_with_context = translator.translate( + text, + target_lang='en', + source_lang='zh', + prompt=prompt + ) + print(f"\n带context翻译结果: {result_with_context}") + + # 不带context的翻译 + result_without_context = translator.translate( + text, + target_lang='en', + source_lang='zh', + prompt=None + ) + print(f"不带context翻译结果: {result_without_context}") + + print(f"\n✓ Context参数测试完成") + print(f" 注意:根据DeepL API,context参数影响翻译但不参与翻译本身") + + except Exception as e: + print(f"✗ Context参数测试失败: {e}") + import traceback + traceback.print_exc() + + +def main(): + """主测试函数""" + print("="*60) + print("翻译功能测试") + print("="*60) + + # 测试1: 配置加载 + config = test_config_loading() + + # 测试2: 同步翻译 + translator = test_translator_sync(config) + + # 测试3: 异步翻译 + 
test_translator_async(config, translator) + + # 测试4: 缓存功能 + test_cache() + + # 测试5: Context参数 + test_context_parameter() + + print("\n" + "="*60) + print("测试完成") + print("="*60) + + +if __name__ == '__main__': + main() + diff --git a/query/translator.py b/query/translator.py index 2f99310..040f892 100644 --- a/query/translator.py +++ b/query/translator.py @@ -2,10 +2,16 @@ Translation service for multi-language query support. Supports DeepL API for high-quality translations. + + +#### 官方文档: +https://developers.deepl.com/api-reference/translate/request-translation +##### + + """ import requests -import threading from concurrent.futures import ThreadPoolExecutor from typing import Dict, List, Optional from utils.cache import DictCache @@ -74,25 +80,24 @@ class Translator: # Thread pool for async translation self.executor = ThreadPoolExecutor(max_workers=2, thread_name_prefix="translator") - - # Thread pool for async translation - self.executor = ThreadPoolExecutor(max_workers=2, thread_name_prefix="translator") def translate( self, text: str, target_lang: str, source_lang: Optional[str] = None, - context: Optional[str] = None + context: Optional[str] = None, + prompt: Optional[str] = None ) -> Optional[str]: """ - Translate text to target language. + Translate text to target language (synchronous mode). Args: text: Text to translate target_lang: Target language code ('zh', 'en', 'ru', etc.) 
source_lang: Source language code (optional, auto-detect if None) context: Additional context for translation (overrides default context) + prompt: Translation prompt/instruction (optional, for better translation quality) Returns: Translated text or None if translation fails @@ -107,35 +112,40 @@ class Translator: # Use provided context or default context translation_context = context or self.translation_context - - # Check cache (include context in cache key for accuracy) + + # Build cache key (include prompt in cache key if provided) + cache_key_parts = [source_lang or 'auto', target_lang, translation_context] + if prompt: + cache_key_parts.append(prompt) + cache_key_parts.append(text) + cache_key = ':'.join(cache_key_parts) + + # Check cache (include context and prompt in cache key for accuracy) if self.use_cache: - cache_key = f"{source_lang or 'auto'}:{target_lang}:{translation_context}:{text}" cached = self.cache.get(cache_key, category="translations") if cached: return cached # If no API key, return mock translation (for testing) if not self.api_key: - print(f"[Translator] No API key, returning original text (mock mode)") + logger.debug(f"[Translator] No API key, returning original text (mock mode)") return text # Translate using DeepL with fallback - result = self._translate_deepl(text, target_lang, source_lang, translation_context) + result = self._translate_deepl(text, target_lang, source_lang, translation_context, prompt) # If translation failed, try fallback to free API if result is None and "api.deepl.com" in self.DEEPL_API_URL: - print(f"[Translator] Pro API failed, trying free API...") - result = self._translate_deepl_free(text, target_lang, source_lang, translation_context) + logger.debug(f"[Translator] Pro API failed, trying free API...") + result = self._translate_deepl_free(text, target_lang, source_lang, translation_context, prompt) # If still failed, return original text with warning if result is None: - print(f"[Translator] Translation failed, 
returning original text") + logger.warning(f"[Translator] Translation failed for '{text[:50]}...', returning original text") result = text # Cache result if result and self.use_cache: - cache_key = f"{source_lang or 'auto'}:{target_lang}:{translation_context}:{text}" self.cache.set(cache_key, result, category="translations") return result @@ -145,7 +155,8 @@ class Translator: text: str, target_lang: str, source_lang: Optional[str], - context: Optional[str] = None + context: Optional[str] = None, + prompt: Optional[str] = None ) -> Optional[str]: """ Translate using DeepL API with context and glossary support. @@ -164,10 +175,14 @@ class Translator: "Content-Type": "application/json", } - # Build text with context for better disambiguation + # Use prompt as context parameter for DeepL API (not as text prefix) + # According to DeepL API: context is "Additional context that can influence a translation but is not translated itself" + # If prompt is provided, use it as context; otherwise use the default context + api_context = prompt if prompt else context + # For e-commerce, add context words to help DeepL understand the domain # This is especially important for single-word ambiguous terms like "车" (car vs rook) - text_to_translate, needs_extraction = self._add_ecommerce_context(text, source_lang, context) + text_to_translate, needs_extraction = self._add_ecommerce_context(text, source_lang, api_context) payload = { "text": [text_to_translate], @@ -178,15 +193,18 @@ class Translator: source_code = self.LANG_CODE_MAP.get(source_lang, source_lang.upper()) payload["source_lang"] = source_code + # Add context parameter (prompt or default context) + # Context influences translation but is not translated itself + if api_context: + payload["context"] = api_context + # Add glossary if configured if self.glossary_id: payload["glossary_id"] = self.glossary_id - # Note: DeepL API v2 doesn't have a direct "context" parameter, - # but we can improve translation by: - # 1. 
Using glossary for domain-specific terms (best solution) - # 2. Adding context words to the text (for single-word queries) - implemented in _add_ecommerce_context - # 3. Using more specific source language detection + # Note: DeepL API v2 supports "context" parameter for additional context + # that influences translation but is not translated itself. + # We use prompt as context parameter when provided. try: response = requests.post( @@ -207,14 +225,14 @@ class Translator: ) return translated_text else: - print(f"[Translator] DeepL API error: {response.status_code} - {response.text}") + logger.error(f"[Translator] DeepL API error: {response.status_code} - {response.text}") return None except requests.Timeout: - print(f"[Translator] Translation request timed out") + logger.warning(f"[Translator] Translation request timed out") return None except Exception as e: - print(f"[Translator] Translation failed: {e}") + logger.error(f"[Translator] Translation failed: {e}", exc_info=True) return None def _translate_deepl_free( @@ -222,7 +240,8 @@ class Translator: text: str, target_lang: str, source_lang: Optional[str], - context: Optional[str] = None + context: Optional[str] = None, + prompt: Optional[str] = None ) -> Optional[str]: """ Translate using DeepL Free API. 
@@ -237,6 +256,9 @@ class Translator: "Content-Type": "application/json", } + # Use prompt as context parameter for DeepL API + api_context = prompt if prompt else context + payload = { "text": [text], "target_lang": target_code, @@ -246,6 +268,10 @@ class Translator: source_code = self.LANG_CODE_MAP.get(source_lang, source_lang.upper()) payload["source_lang"] = source_code + # Add context parameter + if api_context: + payload["context"] = api_context + # Note: Free API typically doesn't support glossary_id # But we can still use context hints in the text @@ -262,14 +288,14 @@ class Translator: if "translations" in data and len(data["translations"]) > 0: return data["translations"][0]["text"] else: - print(f"[Translator] DeepL Free API error: {response.status_code} - {response.text}") + logger.error(f"[Translator] DeepL Free API error: {response.status_code} - {response.text}") return None except requests.Timeout: - print(f"[Translator] Free API request timed out") + logger.warning(f"[Translator] Free API request timed out") return None except Exception as e: - print(f"[Translator] Free API translation failed: {e}") + logger.error(f"[Translator] Free API translation failed: {e}", exc_info=True) return None def translate_multi( @@ -278,7 +304,8 @@ class Translator: target_langs: List[str], source_lang: Optional[str] = None, context: Optional[str] = None, - async_mode: bool = True + async_mode: bool = True, + prompt: Optional[str] = None ) -> Dict[str, Optional[str]]: """ Translate text to multiple target languages. 
@@ -297,6 +324,7 @@ class Translator: source_lang: Source language code (optional) context: Context hint for translation (optional) async_mode: If True, return cached results immediately and translate missing ones async + prompt: Translation prompt/instruction (optional) Returns: Dictionary mapping language code to translated text (only cached results in async mode) @@ -306,7 +334,7 @@ class Translator: # First, get cached translations for lang in target_langs: - cached = self._get_cached_translation(text, lang, source_lang, context) + cached = self._get_cached_translation(text, lang, source_lang, context, prompt) if cached is not None: results[lang] = cached else: @@ -315,14 +343,14 @@ class Translator: # If async mode and there are missing translations, launch async tasks if async_mode and missing_langs: for lang in missing_langs: - self._translate_async(text, lang, source_lang, context) + self._translate_async(text, lang, source_lang, context, prompt) # Return None for missing translations for lang in missing_langs: results[lang] = None else: # Synchronous mode: wait for all translations for lang in missing_langs: - results[lang] = self.translate(text, lang, source_lang, context) + results[lang] = self.translate(text, lang, source_lang, context, prompt) return results @@ -331,14 +359,19 @@ class Translator: text: str, target_lang: str, source_lang: Optional[str] = None, - context: Optional[str] = None + context: Optional[str] = None, + prompt: Optional[str] = None ) -> Optional[str]: """Get translation from cache if available.""" if not self.cache: return None translation_context = context or self.translation_context - cache_key = f"{source_lang or 'auto'}:{target_lang}:{translation_context}:{text}" + cache_key_parts = [source_lang or 'auto', target_lang, translation_context] + if prompt: + cache_key_parts.append(prompt) + cache_key_parts.append(text) + cache_key = ':'.join(cache_key_parts) return self.cache.get(cache_key, category="translations") def 
_translate_async( @@ -346,12 +379,13 @@ class Translator: text: str, target_lang: str, source_lang: Optional[str] = None, - context: Optional[str] = None + context: Optional[str] = None, + prompt: Optional[str] = None ): """Launch async translation task.""" def _do_translate(): try: - result = self.translate(text, target_lang, source_lang, context) + result = self.translate(text, target_lang, source_lang, context, prompt) if result: logger.debug(f"Async translation completed: {text} -> {target_lang}: {result}") except Exception as e: -- libgit2 0.21.2