From 26b910bd6bb837fc328a2c4f0f45305a0bf31b79 Mon Sep 17 00:00:00 2001 From: tangwang Date: Tue, 10 Mar 2026 13:09:24 +0800 Subject: [PATCH] refactor service init and tighten multi-tenant search contracts --- api/app.py | 2 +- api/routes/admin.py | 42 +++++++++++++++++++++++++++++++++--------- api/routes/search.py | 34 ++++++++-------------------------- api/translator_app.py | 2 +- config/config_loader.py | 25 +++++++++++++++++++++++++ config/services_config.py | 82 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++-------------------------- config/tenant_config_loader.py | 75 ++++++++++++++++++++++++++++++++++++++++++++++++++++++--------------------- docs/DEVELOPER_GUIDE.md | 13 +++++++++++-- docs/QUICKSTART.md | 2 +- docs/Usage-Guide.md | 2 +- docs/搜索API对接指南.md | 23 +++++++++++++++++------ docs/搜索API速查表.md | 4 ++-- providers/embedding.py | 16 ++++++++++------ query/query_parser.py | 28 ++++++++++++++++------------ search/es_query_builder.py | 23 ++++++++++++++++++----- search/searcher.py | 40 ++++++++++++++++++++++++++++++---------- tests/ci/test_service_api_contracts.py | 46 ++++++++++++++++++++++++++++++++++++++++++++++ utils/es_client.py | 10 ++-------- 18 files changed, 332 insertions(+), 137 deletions(-) diff --git a/api/app.py b/api/app.py index 7c15979..c11fc30 100644 --- a/api/app.py +++ b/api/app.py @@ -206,7 +206,7 @@ async def startup_event(): logger.info("Service initialized successfully") except Exception as e: logger.error(f"Failed to initialize service: {e}", exc_info=True) - logger.warning("Service will start but may not function correctly") + raise @app.on_event("shutdown") diff --git a/api/routes/admin.py b/api/routes/admin.py index 66e0eb3..e4a015e 100644 --- a/api/routes/admin.py +++ b/api/routes/admin.py @@ -2,10 +2,11 @@ Admin API routes for configuration and management. """ -from fastapi import APIRouter, HTTPException +from fastapi import APIRouter, HTTPException, Request from typing import Dict from ..models import HealthResponse, ErrorResponse +from indexer.mapping_generator import get_tenant_index_name router = APIRouter(prefix="/admin", tags=["admin"]) @@ -57,6 +58,8 @@ async def get_configuration(): "spu_enabled": config.spu_config.enabled } + except HTTPException: + raise except Exception as e: raise HTTPException(status_code=500, detail=str(e)) @@ -105,29 +108,50 @@ async def get_rewrite_rules(): @router.get("/stats") -async def get_index_stats(): +async def get_index_stats(http_request: Request): """ Get index statistics. """ try: - from ..app import get_es_client, get_config + from urllib.parse import parse_qs + from ..app import get_es_client es_client = get_es_client() - config = get_config() + + tenant_id = http_request.headers.get("X-Tenant-ID") + if not tenant_id: + query_string = http_request.url.query + if query_string: + params = parse_qs(query_string) + tenant_id = params.get("tenant_id", [None])[0] + + if not tenant_id: + raise HTTPException( + status_code=400, + detail="tenant_id is required. Provide it via header 'X-Tenant-ID' or query parameter 'tenant_id'", + ) + + index_name = get_tenant_index_name(tenant_id) + if not es_client.client.indices.exists(index=index_name): + raise HTTPException( + status_code=404, + detail=f"Tenant index not found: {index_name}", + ) # Get document count - doc_count = es_client.count(config.es_index_name) + doc_count = es_client.client.count(index=index_name).get("count", 0) # Get index size (if available) try: - stats = es_client.client.indices.stats(index=config.es_index_name) - size_in_bytes = stats['indices'][config.es_index_name]['total']['store']['size_in_bytes'] + stats = es_client.client.indices.stats(index=index_name) + size_in_bytes = stats["indices"][index_name]["total"]["store"]["size_in_bytes"] size_mb = size_in_bytes / (1024 * 1024) - except: + except Exception: size_mb = None return { - "index_name": config.es_index_name, + "tenant_id": str(tenant_id), + "index_name": index_name, "document_count": doc_count, "size_mb": round(size_mb, 2) if size_mb else None } diff --git a/api/routes/search.py b/api/routes/search.py index 770dfcf..eb7de96 100644 --- a/api/routes/search.py +++ b/api/routes/search.py @@ -327,7 +327,6 @@ async def search_suggestions( async def instant_search( q: str = Query(..., min_length=2, description="搜索查询"), size: int = Query(5, ge=1, le=20, description="结果数量"), - tenant_id: str = Query(..., description="租户ID") ): """ 即时搜索(Instant Search)。 @@ -336,32 +335,15 @@ async def instant_search( - 边输入边搜索,无需点击搜索按钮 - 返回简化的搜索结果 - 注意:此功能暂未实现,调用标准搜索接口。 - TODO: 优化即时搜索性能 - - 添加防抖/节流 - - 实现结果缓存 - - 简化返回字段 + 注意:此功能暂未开放,当前明确返回 501。 """ - from api.app import get_searcher - searcher = get_searcher() - - result = searcher.search( - query=q, - tenant_id=tenant_id, - size=size, - from_=0, - sku_filter_dimension=None # Instant search doesn't support SKU filtering - ) - - return SearchResponse( - results=result.results, - total=result.total, - max_score=result.max_score, - took_ms=result.took_ms, - facets=result.facets, - query_info=result.query_info, - suggestions=result.suggestions, - related_searches=result.related_searches + # 明确暴露当前接口尚未完成实现,避免调用不完整逻辑导致隐式运行时错误。 + raise HTTPException( + status_code=501, + detail=( + "/search/instant is not implemented yet. " + "Use POST /search/ for production traffic." + ), ) diff --git a/api/translator_app.py b/api/translator_app.py index 73858cb..cbc1d36 100644 --- a/api/translator_app.py +++ b/api/translator_app.py @@ -187,6 +187,7 @@ async def startup_event(): logger.info(f"Translation service ready with default model: {default_model}") except Exception as e: logger.error(f"Failed to initialize translator: {e}", exc_info=True) + raise @app.get("/health") @@ -310,4 +311,3 @@ if __name__ == "__main__": port=args.port, reload=args.reload ) - diff --git a/config/config_loader.py b/config/config_loader.py index ee779c0..6bd06af 100644 --- a/config/config_loader.py +++ b/config/config_loader.py @@ -367,6 +367,31 @@ class ConfigLoader: f"not in supported languages: {config.query_config.supported_languages}" ) + # Validate source_fields tri-state semantics + source_fields = config.query_config.source_fields + if source_fields is not None: + if not isinstance(source_fields, list): + errors.append("query_config.source_fields must be null or list[str]") + else: + for idx, field_name in enumerate(source_fields): + if not isinstance(field_name, str) or not field_name.strip(): + errors.append( + f"query_config.source_fields[{idx}] must be a non-empty string" + ) + + # Validate tenant config shape (default must exist in config) + tenant_cfg = config.tenant_config + if not isinstance(tenant_cfg, dict): + errors.append("tenant_config must be an object") + else: + default_cfg = tenant_cfg.get("default") + if not isinstance(default_cfg, dict): + errors.append("tenant_config.default must be configured") + else: + index_languages = default_cfg.get("index_languages") + if not isinstance(index_languages, list) or len(index_languages) == 0: + errors.append("tenant_config.default.index_languages must be a non-empty list") + return errors def to_dict(self, config: SearchConfig) -> Dict[str, Any]: diff --git a/config/services_config.py b/config/services_config.py index de178fe..0f4ae3a 100644 --- a/config/services_config.py +++ b/config/services_config.py @@ -2,7 +2,7 @@ Services configuration - single source for translation, embedding, rerank providers. All provider selection and endpoint resolution is centralized here. -Priority: env vars > config.yaml > defaults. +Priority: env vars > config.yaml. """ from __future__ import annotations @@ -34,14 +34,32 @@ def _load_services_raw(config_path: Optional[Path] = None) -> Dict[str, Any]: config_path = Path(__file__).parent / "config.yaml" path = Path(config_path) if not path.exists(): - return {} + raise FileNotFoundError(f"services config file not found: {path}") try: with open(path, "r", encoding="utf-8") as f: data = yaml.safe_load(f) - except Exception: - return {} - services = data.get("services") if isinstance(data, dict) else {} - return services if isinstance(services, dict) else {} + except Exception as exc: + raise RuntimeError(f"failed to parse services config from {path}: {exc}") from exc + if not isinstance(data, dict): + raise RuntimeError(f"invalid config format in {path}: expected mapping root") + services = data.get("services") + if not isinstance(services, dict): + raise RuntimeError("config.yaml must contain a valid 'services' mapping") + return services + + +def _resolve_provider_name( + env_name: str, + config_provider: Any, + capability: str, +) -> str: + provider = os.getenv(env_name) or config_provider + if not provider: + raise ValueError( + f"services.{capability}.provider is required " + f"(or set env override {env_name})" + ) + return str(provider).strip().lower() def _resolve_translation() -> ServiceConfig: @@ -49,12 +67,13 @@ def _resolve_translation() -> ServiceConfig: cfg = raw.get("translation", {}) if isinstance(raw.get("translation"), dict) else {} providers = cfg.get("providers", {}) if isinstance(cfg.get("providers"), dict) else {} - provider = ( - os.getenv("TRANSLATION_PROVIDER") - or cfg.get("provider") - or "direct" + provider = _resolve_provider_name( + env_name="TRANSLATION_PROVIDER", + config_provider=cfg.get("provider"), + capability="translation", ) - provider = str(provider).strip().lower() + if provider not in ("direct", "local", "inprocess", "http", "service"): + raise ValueError(f"Unsupported translation provider: {provider}") # Env override for http base_url env_url = os.getenv("TRANSLATION_SERVICE_URL") @@ -71,12 +90,13 @@ def _resolve_embedding() -> ServiceConfig: cfg = raw.get("embedding", {}) if isinstance(raw.get("embedding"), dict) else {} providers = cfg.get("providers", {}) if isinstance(cfg.get("providers"), dict) else {} - provider = ( - os.getenv("EMBEDDING_PROVIDER") - or cfg.get("provider") - or "http" + provider = _resolve_provider_name( + env_name="EMBEDDING_PROVIDER", + config_provider=cfg.get("provider"), + capability="embedding", ) - provider = str(provider).strip().lower() + if provider != "http": + raise ValueError(f"Unsupported embedding provider: {provider}") env_url = os.getenv("EMBEDDING_SERVICE_URL") if env_url and provider == "http": @@ -92,12 +112,13 @@ def _resolve_rerank() -> ServiceConfig: cfg = raw.get("rerank", {}) if isinstance(raw.get("rerank"), dict) else {} providers = cfg.get("providers", {}) if isinstance(cfg.get("providers"), dict) else {} - provider = ( - os.getenv("RERANK_PROVIDER") - or cfg.get("provider") - or "http" + provider = _resolve_provider_name( + env_name="RERANK_PROVIDER", + config_provider=cfg.get("provider"), + capability="rerank", ) - provider = str(provider).strip().lower() + if provider != "http": + raise ValueError(f"Unsupported rerank provider: {provider}") env_url = os.getenv("RERANKER_SERVICE_URL") if env_url: @@ -124,10 +145,13 @@ def get_rerank_backend_config() -> tuple[str, dict]: name = ( os.getenv("RERANK_BACKEND") or cfg.get("backend") - or "qwen3_vllm" ) + if not name: + raise ValueError("services.rerank.backend is required (or env RERANK_BACKEND)") name = str(name).strip().lower() backend_cfg = backends.get(name, {}) if isinstance(backends.get(name), dict) else {} + if not backend_cfg: + raise ValueError(f"services.rerank.backends.{name} is required") return name, backend_cfg @@ -143,10 +167,13 @@ def get_embedding_backend_config() -> tuple[str, dict]: name = ( os.getenv("EMBEDDING_BACKEND") or cfg.get("backend") - or "tei" ) + if not name: + raise ValueError("services.embedding.backend is required (or env EMBEDDING_BACKEND)") name = str(name).strip().lower() backend_cfg = backends.get(name, {}) if isinstance(backends.get(name), dict) else {} + if not backend_cfg: + raise ValueError(f"services.embedding.backends.{name} is required") return name, backend_cfg @@ -173,8 +200,9 @@ def get_translation_base_url() -> str: base = ( os.getenv("TRANSLATION_SERVICE_URL") or get_translation_config().providers.get("http", {}).get("base_url") - or "http://127.0.0.1:6006" ) + if not base: + raise ValueError("Translation HTTP base_url is not configured") return str(base).rstrip("/") @@ -183,8 +211,9 @@ def get_embedding_base_url() -> str: base = ( os.getenv("EMBEDDING_SERVICE_URL") or get_embedding_config().providers.get("http", {}).get("base_url") - or "http://127.0.0.1:6005" ) + if not base: + raise ValueError("Embedding HTTP base_url is not configured") return str(base).rstrip("/") @@ -194,8 +223,9 @@ def get_rerank_service_url() -> str: os.getenv("RERANKER_SERVICE_URL") or get_rerank_config().providers.get("http", {}).get("service_url") or get_rerank_config().providers.get("http", {}).get("base_url") - or "http://127.0.0.1:6007" ) + if not base: + raise ValueError("Rerank HTTP service_url/base_url is not configured") base = str(base).rstrip("/") return base if base.endswith("/rerank") else f"{base}/rerank" diff --git a/config/tenant_config_loader.py b/config/tenant_config_loader.py index 402f7cc..fd1867d 100644 --- a/config/tenant_config_loader.py +++ b/config/tenant_config_loader.py @@ -51,18 +51,16 @@ SUPPORTED_INDEX_LANGUAGES: Dict[str, str] = { "bg": "Bulgarian", } -DEFAULT_INDEX_LANGUAGES: List[str] = ["en", "zh"] - - def normalize_index_languages(value: Any, primary_language: str = "en") -> List[str]: """ 将 index_languages 配置规范化为合法语言代码列表。 - None 或空时返回 DEFAULT_INDEX_LANGUAGES。 + 仅做规范化,不做默认值兜底。 """ + del primary_language if value is None: - return list(DEFAULT_INDEX_LANGUAGES) + return [] if not isinstance(value, (list, tuple)): - return list(DEFAULT_INDEX_LANGUAGES) + return [] valid: List[str] = [] seen: set = set() for item in value: @@ -72,19 +70,23 @@ def normalize_index_languages(value: Any, primary_language: str = "en") -> List[ if code in SUPPORTED_INDEX_LANGUAGES: valid.append(code) seen.add(code) - return valid if valid else list(DEFAULT_INDEX_LANGUAGES) + return valid -def resolve_index_languages(tenant_config: Dict[str, Any]) -> List[str]: +def resolve_index_languages( + tenant_config: Dict[str, Any], + default_index_languages: List[str], +) -> List[str]: """ 从租户配置解析 index_languages。 若存在 index_languages 则用之;否则按旧配置 translate_to_en / translate_to_zh 推导。 """ if "index_languages" in tenant_config: - return normalize_index_languages( + normalized = normalize_index_languages( tenant_config["index_languages"], tenant_config.get("primary_language") or "en", ) + return normalized if normalized else list(default_index_languages) primary = (tenant_config.get("primary_language") or "en").strip().lower() to_en = bool(tenant_config.get("translate_to_en")) to_zh = bool(tenant_config.get("translate_to_zh")) @@ -95,7 +97,7 @@ def resolve_index_languages(tenant_config: Dict[str, Any]) -> List[str]: if code not in langs and ((code == "en" and to_en) or (code == "zh" and to_zh)): if code in SUPPORTED_INDEX_LANGUAGES: langs.append(code) - return langs if langs else list(DEFAULT_INDEX_LANGUAGES) + return langs if langs else list(default_index_languages) class TenantConfigLoader: @@ -117,18 +119,43 @@ class TenantConfigLoader: try: from config import ConfigLoader + config_loader = ConfigLoader() search_config = config_loader.load_config() - self._config = search_config.tenant_config + tenant_cfg = search_config.tenant_config + if not isinstance(tenant_cfg, dict): + raise RuntimeError("tenant_config must be an object") + + default_cfg = tenant_cfg.get("default") + if not isinstance(default_cfg, dict): + raise RuntimeError("tenant_config.default must be configured in config.yaml") + default_primary = (default_cfg.get("primary_language") or "en").strip().lower() + default_index_languages = normalize_index_languages( + default_cfg.get("index_languages"), + default_primary, + ) + if not default_index_languages: + raise RuntimeError( + "tenant_config.default.index_languages must include at least one supported language" + ) + + tenants_cfg = tenant_cfg.get("tenants", {}) + if not isinstance(tenants_cfg, dict): + raise RuntimeError("tenant_config.tenants must be an object") + + normalized_default = dict(default_cfg) + normalized_default["primary_language"] = default_primary + normalized_default["index_languages"] = default_index_languages + + self._config = { + "default": normalized_default, + "tenants": tenants_cfg, + } logger.info("Loaded tenant config from unified config.yaml") return self._config except Exception as e: logger.error(f"Failed to load tenant config: {e}", exc_info=True) - self._config = { - "default": {"primary_language": "en", "index_languages": ["en", "zh"]}, - "tenants": {}, - } - return self._config + raise def get_tenant_config(self, tenant_id: str) -> Dict[str, Any]: """ @@ -142,13 +169,20 @@ class TenantConfigLoader: """ config = self.load_config() tenant_id_str = str(tenant_id) - default = config.get("default", {"primary_language": "en", "index_languages": ["en", "zh"]}) + default = config["default"] tenants = config.get("tenants", {}) - raw = tenants[tenant_id_str] if tenant_id_str in tenants else default + raw = tenants[tenant_id_str] if tenant_id_str in tenants else {} + if raw and not isinstance(raw, dict): + raise RuntimeError(f"tenant_config.tenants.{tenant_id_str} must be an object") if tenant_id_str not in tenants: logger.debug(f"Tenant {tenant_id} not found in config, using default") - out = dict(raw) - out["index_languages"] = resolve_index_languages(raw) + merged = dict(default) + merged.update(raw) + out = dict(merged) + out["index_languages"] = resolve_index_languages( + merged, + default_index_languages=default["index_languages"], + ) return out def reload(self): @@ -167,4 +201,3 @@ def get_tenant_config_loader() -> TenantConfigLoader: if _tenant_config_loader is None: _tenant_config_loader = TenantConfigLoader() return _tenant_config_loader - diff --git a/docs/DEVELOPER_GUIDE.md b/docs/DEVELOPER_GUIDE.md index b7b9154..a82413c 100644 --- a/docs/DEVELOPER_GUIDE.md +++ b/docs/DEVELOPER_GUIDE.md @@ -218,9 +218,18 @@ docs/ # 文档(含本指南) ### 5.7 错误处理 -- 外部能力(翻译、向量、重排)调用失败时应立即报错并中止请求,禁止静默降级;通过日志与监控尽早暴露问题并修复。 +- **启动期(fail-fast)**:配置加载、依赖初始化、模型/资源加载失败必须直接启动失败,不允许“继续启动但不可用”。 +- **运行期(分层处理)**: + - 核心链路(ES 主检索)失败:请求直接失败并返回错误,禁止伪装成空结果。 + - 增强链路(翻译/向量/重排)失败:记录 warning,不应拖垮主检索流程。 +- 禁止通过“静默回退到另一套实现”掩盖问题;默认行为必须由配置显式给出。 -### 5.8 环境隔离 +### 5.8 启动初始化约束 + +- 重资源与关键依赖(如 translator、text/image encoder)应在服务启动期初始化一次并复用,避免请求期懒加载。 +- 若配置声明启用某能力(例如 GPU 后端),但运行资源不满足,应直接启动失败,不自动降级为其它后端。 + +### 5.9 环境隔离 - 主程序(backend/indexer/frontend)只使用 `.venv`,不引入 `clip-as-service`/`vllm` 的重依赖。 - embedding 服务使用 `.venv-embedding`;reranker 服务使用 `.venv-reranker`;CN-CLIP 使用 `.venv-cnclip`;TEI 使用 Docker 容器。 diff --git a/docs/QUICKSTART.md b/docs/QUICKSTART.md index 0a331f1..4c3f159 100644 --- a/docs/QUICKSTART.md +++ b/docs/QUICKSTART.md @@ -102,7 +102,7 @@ curl "http://localhost:6002/search/suggestions?q=玩&size=5" \ # 健康与统计 curl http://localhost:6002/admin/health -curl http://localhost:6002/admin/stats +curl http://localhost:6002/admin/stats -H "X-Tenant-ID: 162" ``` API 文档:`http://localhost:6002/docs` diff --git a/docs/Usage-Guide.md b/docs/Usage-Guide.md index 50fa367..861c760 100644 --- a/docs/Usage-Guide.md +++ b/docs/Usage-Guide.md @@ -385,7 +385,7 @@ curl http://localhost:6002/admin/health ### 2. 索引统计 ```bash -curl http://localhost:6002/admin/stats +curl http://localhost:6002/admin/stats -H "X-Tenant-ID: 162" ``` ### 3. 简单搜索测试 diff --git a/docs/搜索API对接指南.md b/docs/搜索API对接指南.md index 9f33617..e3b8f65 100644 --- a/docs/搜索API对接指南.md +++ b/docs/搜索API对接指南.md @@ -139,7 +139,7 @@ curl -X POST "http://43.166.252.75:6002/search/" \ |------|------|------|------| | 搜索 | POST | `/search/` | 执行搜索查询 | | 搜索建议 | GET | `/search/suggestions` | 搜索建议(自动补全/热词,多语言 + 结果直达) | -| 即时搜索 | GET | `/search/instant` | 边输入边搜索(框架) ⚠️ TODO | +| 即时搜索 | GET | `/search/instant` | 即时搜索预留接口(当前返回 `501 Not Implemented`) | | 获取文档 | GET | `/search/{doc_id}` | 获取单个文档 | | 全量索引 | POST | `/indexer/reindex` | 全量索引接口(导入数据,不删除索引,仅推荐自测使用) | | 增量索引 | POST | `/indexer/index` | 增量索引接口(指定SPU ID列表进行索引,支持自动检测删除和显式删除,仅推荐自测使用) | @@ -149,7 +149,7 @@ curl -X POST "http://43.166.252.75:6002/search/" \ | 索引健康检查 | GET | `/indexer/health` | 检查索引服务状态 | | 健康检查 | GET | `/admin/health` | 服务健康检查 | | 获取配置 | GET | `/admin/config` | 获取租户配置 | -| 索引统计 | GET | `/admin/stats` | 获取索引统计信息 | +| 索引统计 | GET | `/admin/stats` | 获取租户索引统计信息(需 tenant_id) | **微服务(独立端口,外部可直连)**: @@ -633,10 +633,10 @@ curl "http://localhost:6002/search/suggestions?q=芭&size=5&language=zh&with_res ### 3.8 即时搜索接口 -> ⚠️ **TODO**: 此接口当前为框架实现,功能暂未实现,调用标准搜索接口。后续需要优化即时搜索性能(添加防抖/节流、实现结果缓存、简化返回字段)。 +> ⚠️ 当前版本未开放该能力。接口会明确返回 `501 Not Implemented`,避免误用未完成实现。 - **端点**: `GET /search/instant` -- **描述**: 边输入边搜索,采用轻量参数响应当前输入。底层复用标准搜索能力。 +- **描述**: 即时搜索预留端点,后续会在独立实现完成后开放。 #### 查询参数 @@ -651,6 +651,15 @@ curl "http://localhost:6002/search/suggestions?q=芭&size=5&language=zh&with_res curl "http://localhost:6002/search/instant?q=玩具&size=5" ``` +#### 当前响应 + +```json +{ + "error": "/search/instant is not implemented yet. Use POST /search/ for production traffic.", + "status_code": 501 +} +``` + ### 3.9 获取单个文档 - **端点**: `GET /search/{doc_id}` @@ -1534,11 +1543,13 @@ curl -X POST "http://127.0.0.1:6004/indexer/build-docs-from-db" \ ### 6.3 索引统计 - **端点**: `GET /admin/stats` -- **描述**: 获取索引文档数量与磁盘大小,方便监控。 +- **描述**: 获取指定租户索引文档数量与磁盘大小,方便监控。 +- **租户标识**:通过请求头 `X-Tenant-ID` 或 query 参数 `tenant_id` 传递(必填)。 ```json { - "index_name": "search_tenant1", + "tenant_id": "162", + "index_name": "search_products_tenant_162", "document_count": 10000, "size_mb": 523.45 } diff --git a/docs/搜索API速查表.md b/docs/搜索API速查表.md index fd6f4fe..67f75ec 100644 --- a/docs/搜索API速查表.md +++ b/docs/搜索API速查表.md @@ -296,13 +296,13 @@ POST /search/image GET /search/suggestions?q=芭&size=5&language=zh&with_results=true -GET /search/instant?q=玩具&size=5 +GET /search/instant?q=玩具&size=5 # 当前返回 501 Not Implemented GET /search/{doc_id} GET /admin/health GET /admin/config -GET /admin/stats +GET /admin/stats # 需 X-Tenant-ID 或 tenant_id ``` --- diff --git a/providers/embedding.py b/providers/embedding.py index 51d8d52..fd7a0be 100644 --- a/providers/embedding.py +++ b/providers/embedding.py @@ -22,15 +22,19 @@ class EmbeddingProvider: def __init__(self) -> None: self._base_url = get_embedding_base_url() + from embeddings.text_encoder import TextEmbeddingEncoder + from embeddings.image_encoder import CLIPImageEncoder + + # Initialize once; avoid per-access instantiation. + self._text_encoder = TextEmbeddingEncoder(service_url=self._base_url) + self._image_encoder = CLIPImageEncoder(service_url=self._base_url) @property def text_encoder(self): - """Lazy-created text encoder (TextEmbeddingEncoder).""" - from embeddings.text_encoder import TextEmbeddingEncoder - return TextEmbeddingEncoder(service_url=self._base_url) + """Pre-initialized text encoder (TextEmbeddingEncoder).""" + return self._text_encoder @property def image_encoder(self): - """Lazy-created image encoder (CLIPImageEncoder).""" - from embeddings.image_encoder import CLIPImageEncoder - return CLIPImageEncoder(service_url=self._base_url) + """Pre-initialized image encoder (CLIPImageEncoder).""" + return self._image_encoder diff --git a/query/query_parser.py b/query/query_parser.py index c926850..007361b 100644 --- a/query/query_parser.py +++ b/query/query_parser.py @@ -85,8 +85,8 @@ class QueryParser: Args: config: SearchConfig instance - text_encoder: Text embedding encoder (lazy loaded if not provided) - translator: Translator instance (lazy loaded if not provided) + text_encoder: Text embedding encoder (initialized at startup if not provided) + translator: Translator instance (initialized at startup if not provided) """ self.config = config self._text_encoder = text_encoder @@ -114,22 +114,24 @@ class QueryParser: else: logger.info("HanLP not installed; using simple tokenizer") + # Eager initialization (startup-time failure visibility, no lazy init in request path) + if self.config.query_config.enable_text_embedding and self._text_encoder is None: + logger.info("Initializing text encoder at QueryParser construction...") + self._text_encoder = TextEmbeddingEncoder() + if self._translator is None: + from config.services_config import get_translation_config + cfg = get_translation_config() + logger.info("Initializing translator at QueryParser construction (provider=%s)...", cfg.provider) + self._translator = create_translation_provider(self.config.query_config) + @property def text_encoder(self) -> TextEmbeddingEncoder: - """Lazy load text encoder.""" - if self._text_encoder is None and self.config.query_config.enable_text_embedding: - logger.info("Initializing text encoder (lazy load)...") - self._text_encoder = TextEmbeddingEncoder() + """Return pre-initialized text encoder.""" return self._text_encoder @property def translator(self) -> Any: - """Lazy load translator.""" - if self._translator is None: - from config.services_config import get_translation_config - cfg = get_translation_config() - logger.info("Initializing translator (provider=%s)...", cfg.provider) - self._translator = create_translation_provider(self.config.query_config) + """Return pre-initialized translator.""" return self._translator def _simple_tokenize(self, text: str) -> List[str]: @@ -343,6 +345,8 @@ class QueryParser: encoding_executor = None if should_generate_embedding: try: + if self.text_encoder is None: + raise RuntimeError("Text embedding is enabled but text encoder is not initialized") log_debug("Starting query vector generation (async)") # Submit encoding task to thread pool for async execution encoding_executor = ThreadPoolExecutor(max_workers=1) diff --git a/search/es_query_builder.py b/search/es_query_builder.py index 5325b2a..ae88d21 100644 --- a/search/es_query_builder.py +++ b/search/es_query_builder.py @@ -50,6 +50,22 @@ class ESQueryBuilder: self.default_language = default_language self.knn_boost = knn_boost + def _apply_source_filter(self, es_query: Dict[str, Any]) -> None: + """ + Apply tri-state _source semantics: + - None: do not set _source (return all source fields) + - []: _source=false + - [..]: _source.includes=[..] + """ + if self.source_fields is None: + return + if not isinstance(self.source_fields, list): + raise ValueError("query_config.source_fields must be null or list[str]") + if len(self.source_fields) == 0: + es_query["_source"] = False + return + es_query["_source"] = {"includes": self.source_fields} + def _split_filters_for_faceting( self, filters: Optional[Dict[str, Any]], @@ -146,11 +162,8 @@ class ESQueryBuilder: "from": from_ } - # Add _source filtering if source_fields are configured - if self.source_fields: - es_query["_source"] = { - "includes": self.source_fields - } + # Add _source filtering with explicit tri-state semantics. + self._apply_source_filter(es_query) # 1. Build recall queries (text or embedding) recall_clauses = [] diff --git a/search/searcher.py b/search/searcher.py index 3fd9948..5536e21 100644 --- a/search/searcher.py +++ b/search/searcher.py @@ -82,7 +82,8 @@ class Searcher: self, es_client: ESClient, config: SearchConfig, - query_parser: Optional[QueryParser] = None + query_parser: Optional[QueryParser] = None, + image_encoder: Optional[CLIPImageEncoder] = None, ): """ Initialize searcher. @@ -91,6 +92,7 @@ class Searcher: es_client: Elasticsearch client config: SearchConfig instance query_parser: Query parser (created if not provided) + image_encoder: Optional pre-initialized image encoder """ self.es_client = es_client self.config = config @@ -103,8 +105,12 @@ class Searcher: # Get match fields from config self.match_fields = get_match_fields_for_index(config, "default") self.text_embedding_field = config.query_config.text_embedding_field or "title_embedding" - self.image_embedding_field = config.query_config.image_embedding_field or "image_embedding" - self.source_fields = config.query_config.source_fields or [] + self.image_embedding_field = config.query_config.image_embedding_field + if self.image_embedding_field and image_encoder is None: + self.image_encoder = CLIPImageEncoder() + else: + self.image_encoder = image_encoder + self.source_fields = config.query_config.source_fields # Query builder - simplified single-layer architecture self.query_builder = ESQueryBuilder( @@ -118,6 +124,22 @@ class Searcher: knn_boost=self.config.query_config.knn_boost ) + def _apply_source_filter(self, es_query: Dict[str, Any]) -> None: + """ + Apply tri-state _source semantics: + - None: do not set _source (return full source) + - []: _source=false (return no source fields) + - [..]: _source.includes=[..] + """ + if self.source_fields is None: + return + if not isinstance(self.source_fields, list): + raise ValueError("query_config.source_fields must be null or list[str]") + if len(self.source_fields) == 0: + es_query["_source"] = False + return + es_query["_source"] = {"includes": self.source_fields} + def search( self, query: str, @@ -568,8 +590,9 @@ class Searcher: raise ValueError("Image embedding field not configured") # Generate image embedding - image_encoder = CLIPImageEncoder() - image_vector = image_encoder.encode_image_from_url(image_url) + if self.image_encoder is None: + raise RuntimeError("Image encoder is not initialized at startup") + image_vector = self.image_encoder.encode_image_from_url(image_url) if image_vector is None: raise ValueError(f"Failed to encode image: {image_url}") @@ -590,11 +613,8 @@ class Searcher: } } - # Add _source filtering if source_fields are configured - if self.source_fields: - es_query["_source"] = { - "includes": self.source_fields - } + # Apply source filtering semantics (None / [] / list) + self._apply_source_filter(es_query) if filters or range_filters: filter_clauses = self.query_builder._build_filters(filters, range_filters) diff --git a/tests/ci/test_service_api_contracts.py b/tests/ci/test_service_api_contracts.py index ffd10be..a59f0ee 100644 --- a/tests/ci/test_service_api_contracts.py +++ b/tests/ci/test_service_api_contracts.py @@ -95,6 +95,52 @@ def test_suggestion_api_contract(search_client: TestClient): assert len(data["suggestions"]) == 1 +def test_instant_search_not_implemented(search_client: TestClient): + response = search_client.get("/search/instant?q=iph&size=5") + assert response.status_code == 501 + + +def test_admin_stats_contract(search_client: TestClient, monkeypatch): + import api.app as search_app + + class _FakeIndices: + @staticmethod + def exists(index: str) -> bool: + return index.endswith("search_products_tenant_162") + + @staticmethod + def stats(index: str): + return { + "indices": { + index: { + "total": { + "store": { + "size_in_bytes": 2 * 1024 * 1024, + } + } + } + } + } + + class _FakeClient: + indices = _FakeIndices() + + @staticmethod + def count(index: str): + assert index.endswith("search_products_tenant_162") + return {"count": 123} + + monkeypatch.setattr(search_app, "get_es_client", lambda: SimpleNamespace(client=_FakeClient())) + + response = search_client.get("/admin/stats", headers={"X-Tenant-ID": "162"}) + assert response.status_code == 200 + data = response.json() + assert data["tenant_id"] == "162" + assert data["index_name"].endswith("search_products_tenant_162") + assert data["document_count"] == 123 + assert data["size_mb"] == 2.0 + + class _FakeBulkService: def bulk_index(self, tenant_id: str, recreate_index: bool, batch_size: int): return { diff --git a/utils/es_client.py b/utils/es_client.py index 03b8a77..8799508 100644 --- a/utils/es_client.py +++ b/utils/es_client.py @@ -193,13 +193,7 @@ class ESClient: ) except Exception as e: logger.error(f"Search failed: {e}", exc_info=True) - return { - 'hits': { - 'total': {'value': 0}, - 'hits': [] - }, - 'error': str(e) - } + raise RuntimeError(f"Elasticsearch search failed for index '{index_name}': {e}") from e def get_mapping(self, index_name: str) -> Dict[str, Any]: """Get index mapping.""" @@ -234,7 +228,7 @@ class ESClient: return result['count'] except Exception as e: logger.error(f"Count failed: {e}", exc_info=True) - return 0 + raise RuntimeError(f"Elasticsearch count failed for index '{index_name}': {e}") from e def get_es_client_from_env() -> ESClient: -- libgit2 0.21.2