From ed94866621f769cb80425846a68db4a01a9f5caa Mon Sep 17 00:00:00 2001 From: tangwang Date: Mon, 9 Mar 2026 17:04:00 +0800 Subject: [PATCH] tidy --- activate.sh | 25 +++++-------------------- api/indexer_app.py | 16 ++++++++-------- api/routes/indexer.py | 37 ++++++++++++------------------------- config/__init__.py | 12 +----------- config/config_loader.py | 14 -------------- config/service_endpoints.py | 12 ------------ docs/DEVELOPER_GUIDE.md | 14 +++++++------- docs/Usage-Guide.md | 3 +-- embeddings/__init__.py | 44 +++++++------------------------------------- embeddings/clip_as_service_encoder.py | 62 ++++++++++++++++++++++++++++++++------------------------------ embeddings/image_encoder.py | 97 +++++++++++++++++++++++++++++++++++++------------------------------------------------------------ embeddings/qwen3_model.py | 47 +++++++++-------------------------------------- embeddings/server.py | 117 ++++++++++++++++++++++++++++++++++++++++++++++++++------------------------------------------------------------------- embeddings/text_encoder.py | 60 ++++++++++++++++++++---------------------------------------- indexer/document_transformer.py | 61 ++++++++++++++++++++++++++----------------------------------- indexer/incremental_service.py | 129 +++++++++++++++++++++++++++++++++++++++++++++------------------------------------------------------------------------------------ indexer/indexing_utils.py | 55 +++++++++++++++++++++++-------------------------------- main.py | 4 +++- providers/embedding.py | 11 ++--------- providers/rerank.py | 8 +++----- providers/translation.py | 14 +------------- query/__init__.py | 3 --- query/translation_client.py | 20 -------------------- requirements.txt | 1 + scripts/create_venv.sh | 3 ++- scripts/start_embedding_service.sh | 18 +++++++++++++++++- scripts/test_build_docs_api.py | 6 +++++- scripts/trace_indexer_calls.sh | 74 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ search/es_query_builder.py | 4 ---- search/searcher.py | 7 ++----- tests/ci/test_service_api_contracts.py | 1 + tests/test_cnclip_service.py | 34 ++++++++++++++++++++++++++++------ tests/test_embedding_pipeline.py | 21 ++++++++++++++++++--- 33 files changed, 440 insertions(+), 594 deletions(-) delete mode 100644 config/service_endpoints.py delete mode 100644 query/translation_client.py create mode 100755 scripts/trace_indexer_calls.sh diff --git a/activate.sh b/activate.sh index 7e4555a..643ac38 100644 --- a/activate.sh +++ b/activate.sh @@ -1,13 +1,10 @@ #!/bin/bash # -# Unified environment activator (venv preferred, conda fallback). -# # Usage: # source activate.sh # -# Priority: +# Required: # 1) ./.venv (Python venv) -# 2) conda env "searchengine" (legacy) # # Must be sourced @@ -18,28 +15,16 @@ fi PROJECT_ROOT="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" -# 1) venv (preferred) +# 1) venv (required) VENV_ACTIVATE="${PROJECT_ROOT}/.venv/bin/activate" if [[ -f "${VENV_ACTIVATE}" ]]; then # shellcheck disable=SC1090 source "${VENV_ACTIVATE}" ENV_KIND="venv" else - # 2) conda fallback (legacy) - # 新机器部署:可设置 CONDA_ROOT 指向本机 Conda 路径 - # 例如你的 conda 是 ~/anaconda3/bin/conda,则 export CONDA_ROOT=$HOME/anaconda3 - CONDA_ROOT="${CONDA_ROOT:-/home/tw/miniconda3}" - if [[ -f "${CONDA_ROOT}/etc/profile.d/conda.sh" ]]; then - # shellcheck disable=SC1091 - source "${CONDA_ROOT}/etc/profile.d/conda.sh" - conda activate searchengine - ENV_KIND="conda" - else - echo "ERROR: No .venv found and conda.sh not found at ${CONDA_ROOT}/etc/profile.d/conda.sh" >&2 - echo " - Create venv: ./scripts/create_venv.sh" >&2 - echo " - Or set CONDA_ROOT to your conda install path" >&2 - return 1 - fi + echo "ERROR: No .venv found at ${VENV_ACTIVATE}" >&2 + echo " - Create venv: ./scripts/create_venv.sh" >&2 + return 1 fi # 如果需要加载 .env 中的环境变量 diff --git a/api/indexer_app.py b/api/indexer_app.py index 30da294..d5544eb 100644 --- a/api/indexer_app.py +++ b/api/indexer_app.py @@ -124,12 +124,10 @@ def init_indexer_service(es_host: str = "http://localhost:9200"): ] if not value ] - logger.warning( - "Database config incomplete for indexer, services will not be available. " + raise RuntimeError( + "Database config incomplete for indexer. " f"Missing: {', '.join(missing)}" ) - _incremental_service = None - _bulk_indexing_service = None elapsed = time.time() - start_time logger.info(f"Indexer service ready! (took {elapsed:.2f}s)") @@ -180,14 +178,17 @@ async def startup_event(): # If no explicit tenants configured, skip warmup. if tenants: warm = _incremental_service.warmup_transformers(tenants) + if warm.get("failed"): + raise RuntimeError(f"Indexer warmup failed: {warm['failed']}") logger.info("Indexer warmup completed: %s", warm) else: logger.info("Indexer warmup skipped (no tenant ids in config.tenant_config.tenants)") except Exception as e: - logger.warning("Indexer warmup failed (service still starts): %s", e, exc_info=True) + logger.error("Indexer warmup failed: %s", e, exc_info=True) + raise except Exception as e: logger.error(f"Failed to initialize indexer service: {e}", exc_info=True) - logger.warning("Indexer service will start but may not function correctly") + raise @app.on_event("shutdown") @@ -215,7 +216,7 @@ async def global_exception_handler(request: Request, exc: Exception): async def health_check(): """Simple health check for indexer service.""" try: - # ensure ES is reachable (best-effort) + # ensure ES is reachable if _es_client is None: raise RuntimeError("ES client is not initialized") return { @@ -262,4 +263,3 @@ if __name__ == "__main__": port=args.port, reload=args.reload, ) - diff --git a/api/routes/indexer.py b/api/routes/indexer.py index 001b1e2..bcc8078 100644 --- a/api/routes/indexer.py +++ b/api/routes/indexer.py @@ -244,31 +244,19 @@ async def build_docs(request: BuildDocsRequest): title_text = str(v) break if title_text and str(title_text).strip(): - try: - import numpy as np - - embeddings = encoder.encode(title_text) - if embeddings is not None and len(embeddings) > 0: - emb0 = embeddings[0] - if isinstance(emb0, np.ndarray) and emb0.size > 0: - doc["title_embedding"] = emb0.tolist() - else: - logger.warning( - "build-docs: title_embedding skipped (encoder returned None/invalid for title: %s...)", - title_text[:50], - ) - else: - logger.warning( - "build-docs: title_embedding skipped (encoder returned empty for title: %s...)", - title_text[:50], - ) - except Exception as e: - logger.warning( - "build-docs: title_embedding failed for spu_id=%s: %s", - doc.get("spu_id"), - e, + import numpy as np + + embeddings = encoder.encode(title_text) + if embeddings is None or len(embeddings) == 0: + raise RuntimeError( + f"title_embedding empty for spu_id={doc.get('spu_id')}" + ) + emb0 = np.asarray(embeddings[0], dtype=np.float32) + if emb0.ndim != 1 or emb0.size == 0 or not np.isfinite(emb0).all(): + raise RuntimeError( + f"title_embedding invalid for spu_id={doc.get('spu_id')}" ) - # 构建 doc 接口不因为 embedding 失败而整体失败 + doc["title_embedding"] = emb0.tolist() docs.append(doc) except Exception as e: @@ -486,4 +474,3 @@ async def indexer_health_check(): except Exception as e: logger.error(f"Error checking indexer health: {e}", exc_info=True) return {"status": "error", "message": str(e)} - diff --git a/config/__init__.py b/config/__init__.py index ed7fc4e..b8e93e0 100644 --- a/config/__init__.py +++ b/config/__init__.py @@ -13,19 +13,13 @@ from .config_loader import ( FunctionScoreConfig, RerankConfig, ConfigLoader, - ConfigurationError, - load_tenant_config + ConfigurationError ) from .utils import ( get_match_fields_for_index, get_domain_fields ) -from .service_endpoints import ( - resolve_translation_service_url, - resolve_embedding_service_url, - resolve_reranker_service_url, -) from .services_config import ( get_translation_config, get_embedding_config, @@ -50,12 +44,8 @@ __all__ = [ # Loader and utilities 'ConfigLoader', 'ConfigurationError', - 'load_tenant_config', 'get_match_fields_for_index', 'get_domain_fields', - 'resolve_translation_service_url', - 'resolve_embedding_service_url', - 'resolve_reranker_service_url', 'get_translation_config', 'get_embedding_config', 'get_rerank_config', diff --git a/config/config_loader.py b/config/config_loader.py index d60996a..ee779c0 100644 --- a/config/config_loader.py +++ b/config/config_loader.py @@ -433,17 +433,3 @@ class ConfigLoader: result["example"] = index.example return result - - -def load_tenant_config(tenant_id: Optional[str] = None) -> SearchConfig: - """ - Load tenant configuration (backward compatibility wrapper). - - Args: - tenant_id: Ignored (kept for backward compatibility) - - Returns: - SearchConfig loaded from config/config.yaml - """ - loader = ConfigLoader() - return loader.load_config() diff --git a/config/service_endpoints.py b/config/service_endpoints.py deleted file mode 100644 index c1635d3..0000000 --- a/config/service_endpoints.py +++ /dev/null @@ -1,12 +0,0 @@ -""" -Endpoint resolvers - delegate to services_config. - -Deprecated: use config.services_config directly. -Kept for backward compatibility. -""" - -from .services_config import ( - get_translation_base_url as resolve_translation_service_url, - get_embedding_base_url as resolve_embedding_service_url, - get_rerank_service_url as resolve_reranker_service_url, -) diff --git a/docs/DEVELOPER_GUIDE.md b/docs/DEVELOPER_GUIDE.md index 95023d7..4668b0d 100644 --- a/docs/DEVELOPER_GUIDE.md +++ b/docs/DEVELOPER_GUIDE.md @@ -216,9 +216,9 @@ docs/ # 文档(含本指南) - 索引结构以 `mappings/search_products.json` 为唯一来源;indexer 产出的 doc 必须与该 mapping 一致。 - 查询侧使用的字段名、多语言后缀(.zh/.en)、嵌套路径等与 mapping 保持一致;新增字段时同步更新 mapping 与查询/分面/过滤逻辑。 -### 5.7 错误与降级 +### 5.7 错误处理 -- 外部能力(翻译、向量、重排)调用失败时,应有明确降级策略(如跳过向量、仅用 BM25、重排失败时保留 ES 顺序),并打日志便于排查;不因单一能力不可用导致整请求失败。 +- 外部能力(翻译、向量、重排)调用失败时应立即报错并中止请求,禁止静默降级;通过日志与监控尽早暴露问题并修复。 --- @@ -315,11 +315,11 @@ services: 5. **调用方**:无需修改;仅部署时启动使用新后端的 reranker 服务即可。 6. **文档与依赖**:在 `reranker/README.md` 或 docs 中说明依赖(如 vllm)、显存建议;可选依赖放入 `requirements_ml.txt` 或 extra。 -### 7.7 与现有配置的兼容说明 +### 7.7 配置一致性说明 -- **reranker**:当前 `reranker/config.py` 的 BGE 相关默认值可保留为兜底,或将默认值迁移到 `config.yaml` 的 `services.rerank.backends.bge`,由 config 只读环境变量与 YAML。 -- **embeddings**:`embeddings/config.py` 的文本/图片及 clip-as-service 开关与 `services.embedding` 的 URL 分离;后续多种后端可在 `services.embedding.backends` 中增加条目。 -- **环境变量**:所有能力均支持环境变量覆盖(如 `RERANKER_SERVICE_URL`、`RERANK_BACKEND`、`EMBEDDING_SERVICE_URL`),便于多环境部署。 +- **单一路径**:Provider 和 backend 必须由 `config/config.yaml` 的 `services` 块显式指定;未知配置应直接报错。 +- **无兼容回退**:不保留“旧配置自动推导/兜底默认值”机制,避免静默行为偏差。 +- **环境变量覆盖**:允许环境变量覆盖(如 `RERANKER_SERVICE_URL`、`RERANK_BACKEND`、`EMBEDDING_SERVICE_URL`),但覆盖后仍需满足合法性校验。 --- @@ -346,7 +346,7 @@ services: ### 8.4 日志与可观测性 -- 关键路径(请求入口、外部调用、失败降级)打日志;日志级别合理(如 debug 用于详细参数,info 用于流程,warning 用于降级)。 +- 关键路径(请求入口、外部调用、失败报错)打日志;日志级别合理(如 debug 用于详细参数,info 用于流程,error 用于失败)。 - 对外接口的耗时、错误码、租户等可考虑结构化日志或后续接入监控,便于运维与排查。 --- diff --git a/docs/Usage-Guide.md b/docs/Usage-Guide.md index 0d10365..c5022d1 100644 --- a/docs/Usage-Guide.md +++ b/docs/Usage-Guide.md @@ -29,7 +29,7 @@ #### 1. 安装 Python 依赖与激活环境 -**推荐**:使用项目根目录的 `activate.sh` 激活环境(会加载 `.env`)。目前推荐 venv(`.venv`);Conda 仅作为兼容回退(需要 `CONDA_ROOT`)。系统要求、Python 环境、生产凭证与 `.env` 模板见 [QUICKSTART.md](./QUICKSTART.md) §1.4–1.8。 +**推荐**:使用项目根目录的 `activate.sh` 激活环境(会加载 `.env`)。当前仅支持 venv(`.venv`),不存在 Conda 回退。系统要求、Python 环境、生产凭证与 `.env` 模板见 [QUICKSTART.md](./QUICKSTART.md) §1.4–1.8。 ```bash cd /data/saas-search @@ -663,4 +663,3 @@ curl -X POST http://localhost:6002/search/ \ **文档版本**: v2.0 **最后更新**: 2024-12 - diff --git a/embeddings/__init__.py b/embeddings/__init__.py index 7f1e37c..064e91c 100644 --- a/embeddings/__init__.py +++ b/embeddings/__init__.py @@ -1,39 +1,9 @@ -""" -Embeddings module. +"""Embeddings module exports.""" -Important: keep package import lightweight. +from .text_encoder import TextEmbeddingEncoder +from .image_encoder import CLIPImageEncoder -Some callers do: - - `from embeddings import TextEmbeddingEncoder` - - `from embeddings import BgeEncoder` (deprecated alias) - - `from embeddings import CLIPImageEncoder` - -But the underlying implementations may import heavy optional deps (Pillow, torch, etc). -To avoid importing those at package import time (and to allow the embedding service to boot -without importing client code), we provide small lazy factories here. -""" - - -class TextEmbeddingEncoder(object): - """Lazy factory for `embeddings.text_encoder.TextEmbeddingEncoder`.""" - - def __new__(cls, *args, **kwargs): - from .text_encoder import TextEmbeddingEncoder as _Real - - return _Real(*args, **kwargs) - - -class BgeEncoder(TextEmbeddingEncoder): - """Deprecated backward-compatible alias for old class name.""" - - -class CLIPImageEncoder(object): - """Lazy factory for `embeddings.image_encoder.CLIPImageEncoder`.""" - - def __new__(cls, *args, **kwargs): - from .image_encoder import CLIPImageEncoder as _Real - - return _Real(*args, **kwargs) - - -__all__ = ["TextEmbeddingEncoder", "BgeEncoder", "CLIPImageEncoder"] +__all__ = [ + "TextEmbeddingEncoder", + "CLIPImageEncoder", +] diff --git a/embeddings/clip_as_service_encoder.py b/embeddings/clip_as_service_encoder.py index e016481..564783f 100644 --- a/embeddings/clip_as_service_encoder.py +++ b/embeddings/clip_as_service_encoder.py @@ -65,13 +65,21 @@ class ClipAsServiceImageEncoder: self._server = server self._batch_size = batch_size self._show_progress = show_progress - self._client = Client(server) + try: + self._client = Client(server) + except ModuleNotFoundError as e: + if str(e) == "No module named 'pkg_resources'": + raise RuntimeError( + "clip-as-service requires pkg_resources via jina/hubble. " + "Install compatible setuptools (<82) in current venv." + ) from e + raise def encode_image_urls( self, urls: List[str], batch_size: Optional[int] = None, - ) -> List[Optional[np.ndarray]]: + ) -> List[np.ndarray]: """ Encode a list of image URLs to vectors. @@ -80,42 +88,36 @@ class ClipAsServiceImageEncoder: batch_size: override instance batch_size for this call. Returns: - List of vectors (1024-dim float32) or None for failed items, same length as urls. + List of vectors (float32), same length as urls. """ if not urls: return [] normalized = [_normalize_image_url(u) for u in urls] - valid_indices = [i for i, u in enumerate(normalized) if u] - if not valid_indices: - return [None] * len(urls) - - valid_urls = [normalized[i] for i in valid_indices] bs = batch_size if batch_size is not None else self._batch_size - out: List[Optional[np.ndarray]] = [None] * len(urls) - - try: - # Client.encode(iterable of str) returns np.ndarray [N, D] for string input - arr = self._client.encode( - valid_urls, - batch_size=bs, - show_progress=self._show_progress, + invalid_indices = [i for i, u in enumerate(normalized) if not u] + if invalid_indices: + raise ValueError(f"Invalid empty image URL at indices: {invalid_indices}") + + # Client.encode(iterable of str) returns np.ndarray [N, D] for string input + arr = self._client.encode( + normalized, + batch_size=bs, + show_progress=self._show_progress, + ) + if arr is None or not hasattr(arr, "shape"): + raise RuntimeError("clip-as-service encode returned empty result") + if len(arr) != len(normalized): + raise RuntimeError( + f"clip-as-service encode length mismatch: expected {len(normalized)}, got {len(arr)}" ) - if arr is not None and hasattr(arr, "shape") and len(arr) == len(valid_indices): - for j, idx in enumerate(valid_indices): - row = arr[j] - if row is not None and hasattr(row, "tolist"): - out[idx] = np.asarray(row, dtype=np.float32) - else: - out[idx] = np.array(row, dtype=np.float32) - else: - logger.warning( - "clip-as-service encode returned unexpected shape/length, " - "expected %d vectors", len(valid_indices) - ) - except Exception as e: - logger.warning("clip-as-service encode failed: %s", e, exc_info=True) + out: List[np.ndarray] = [] + for row in arr: + vec = np.asarray(row, dtype=np.float32) + if vec.ndim != 1 or vec.size == 0 or not np.isfinite(vec).all(): + raise RuntimeError("clip-as-service returned invalid embedding vector") + out.append(vec) return out def encode_image_from_url(self, url: str) -> Optional[np.ndarray]: diff --git a/embeddings/image_encoder.py b/embeddings/image_encoder.py index 956123d..3e4b6d7 100644 --- a/embeddings/image_encoder.py +++ b/embeddings/image_encoder.py @@ -48,16 +48,15 @@ class CLIPImageEncoder: logger.error(f"CLIPImageEncoder service request failed: {e}", exc_info=True) raise - def encode_image(self, image: Image.Image) -> Optional[np.ndarray]: + def encode_image(self, image: Image.Image) -> np.ndarray: """ Encode image to embedding vector using network service. Note: This method is kept for compatibility but the service only works with URLs. """ - logger.warning("encode_image with PIL Image not supported by service, returning None") - return None + raise NotImplementedError("encode_image with PIL Image is not supported by embedding service") - def encode_image_from_url(self, url: str) -> Optional[np.ndarray]: + def encode_image_from_url(self, url: str) -> np.ndarray: """ Generate image embedding via network service using URL. @@ -65,24 +64,21 @@ class CLIPImageEncoder: url: Image URL to process Returns: - Embedding vector or None if failed + Embedding vector """ - try: - response_data = self._call_service([url]) - if response_data and len(response_data) > 0 and response_data[0] is not None: - return np.array(response_data[0], dtype=np.float32) - logger.warning(f"No embedding for URL {url}") - return None - - except Exception as e: - logger.error(f"Failed to process image from URL {url}: {str(e)}", exc_info=True) - return None + response_data = self._call_service([url]) + if not response_data or len(response_data) != 1 or response_data[0] is None: + raise RuntimeError(f"No image embedding returned for URL: {url}") + vec = np.array(response_data[0], dtype=np.float32) + if vec.ndim != 1 or vec.size == 0 or not np.isfinite(vec).all(): + raise RuntimeError(f"Invalid image embedding returned for URL: {url}") + return vec def encode_batch( self, images: List[Union[str, Image.Image]], batch_size: int = 8 - ) -> List[Optional[np.ndarray]]: + ) -> List[np.ndarray]: """ Encode a batch of images efficiently via network service. @@ -91,50 +87,31 @@ class CLIPImageEncoder: batch_size: Batch size for processing (used for service requests) Returns: - List of embeddings (or None for failed images) + List of embeddings """ - # Initialize results with None for all images - results = [None] * len(images) - - # Filter out PIL Images since service only supports URLs - url_images = [] - url_indices = [] - for i, img in enumerate(images): - if isinstance(img, str): - url_images.append(img) - url_indices.append(i) - elif isinstance(img, Image.Image): - logger.warning(f"PIL Image at index {i} not supported by service, returning None") - # results[i] is already None - - # Process URLs in batches - for i in range(0, len(url_images), batch_size): - batch_urls = url_images[i:i + batch_size] - batch_indices = url_indices[i:i + batch_size] - - try: - # Call service - response_data = self._call_service(batch_urls) - - # Process response (aligned list) - batch_results = [] - for j, url in enumerate(batch_urls): - if response_data and j < len(response_data) and response_data[j] is not None: - batch_results.append(np.array(response_data[j], dtype=np.float32)) - else: - logger.warning(f"Failed to encode URL {url}: no embedding") - batch_results.append(None) - - # Insert results at the correct positions - for j, result in enumerate(batch_results): - results[batch_indices[j]] = result - - except Exception as e: - logger.error(f"Batch processing failed: {e}", exc_info=True) - # Fill with None for this batch - for j in range(len(batch_urls)): - results[batch_indices[j]] = None + if isinstance(img, Image.Image): + raise NotImplementedError(f"PIL Image at index {i} is not supported by service") + if not isinstance(img, str) or not img.strip(): + raise ValueError(f"Invalid image URL/path at index {i}: {img!r}") + + results: List[np.ndarray] = [] + for i in range(0, len(images), batch_size): + batch_urls = [str(u).strip() for u in images[i:i + batch_size]] + response_data = self._call_service(batch_urls) + if not response_data or len(response_data) != len(batch_urls): + raise RuntimeError( + f"Image embedding response length mismatch: expected {len(batch_urls)}, " + f"got {0 if response_data is None else len(response_data)}" + ) + for j, url in enumerate(batch_urls): + embedding = response_data[j] + if embedding is None: + raise RuntimeError(f"No image embedding returned for URL: {url}") + vec = np.array(embedding, dtype=np.float32) + if vec.ndim != 1 or vec.size == 0 or not np.isfinite(vec).all(): + raise RuntimeError(f"Invalid image embedding returned for URL: {url}") + results.append(vec) return results @@ -142,7 +119,7 @@ class CLIPImageEncoder: self, urls: List[str], batch_size: Optional[int] = None, - ) -> List[Optional[np.ndarray]]: + ) -> List[np.ndarray]: """ 与 ClipImageModel / ClipAsServiceImageEncoder 一致的接口,供索引器 document_transformer 调用。 @@ -151,6 +128,6 @@ class CLIPImageEncoder: batch_size: 批大小(默认 8) Returns: - 与 urls 等长的向量列表,失败为 None + 与 urls 等长的向量列表 """ return self.encode_batch(urls, batch_size=batch_size or 8) diff --git a/embeddings/qwen3_model.py b/embeddings/qwen3_model.py index ea46f24..454e443 100644 --- a/embeddings/qwen3_model.py +++ b/embeddings/qwen3_model.py @@ -35,43 +35,15 @@ class Qwen3TextModel(object): ) -> np.ndarray: if device == "gpu": device = "cuda" - - # Try requested device, fallback to CPU if CUDA is unavailable/insufficient. - try: - if device == "cuda": - import torch - - if torch.cuda.is_available(): - free_memory = ( - torch.cuda.get_device_properties(0).total_memory - - torch.cuda.memory_allocated() - ) - if free_memory < 1024 * 1024 * 1024: # 1GB - device = "cpu" - else: - device = "cpu" - - self.model = self.model.to(device) - embeddings = self.model.encode( - sentences, - normalize_embeddings=normalize_embeddings, - device=device, - show_progress_bar=False, - batch_size=batch_size, - ) - return embeddings - except Exception: - if device != "cpu": - self.model = self.model.to("cpu") - embeddings = self.model.encode( - sentences, - normalize_embeddings=normalize_embeddings, - device="cpu", - show_progress_bar=False, - batch_size=batch_size, - ) - return embeddings - raise + self.model = self.model.to(device) + embeddings = self.model.encode( + sentences, + normalize_embeddings=normalize_embeddings, + device=device, + show_progress_bar=False, + batch_size=batch_size, + ) + return embeddings def encode_batch( self, @@ -86,4 +58,3 @@ class Qwen3TextModel(object): device=device, normalize_embeddings=normalize_embeddings, ) - diff --git a/embeddings/server.py b/embeddings/server.py index 7966b40..3a1ce15 100644 --- a/embeddings/server.py +++ b/embeddings/server.py @@ -1,9 +1,9 @@ """ Embedding service (FastAPI). -API (simple list-in, list-out; aligned by index; failures -> null): -- POST /embed/text body: ["text1", "text2", ...] -> [[...], null, ...] -- POST /embed/image body: ["url_or_path1", ...] -> [[...], null, ...] +API (simple list-in, list-out; aligned by index): +- POST /embed/text body: ["text1", "text2", ...] -> [[...], ...] +- POST /embed/image body: ["url_or_path1", ...] -> [[...], ...] """ import logging @@ -11,7 +11,7 @@ import threading from typing import Any, Dict, List, Optional import numpy as np -from fastapi import FastAPI +from fastapi import FastAPI, HTTPException from embeddings.config import CONFIG from embeddings.protocols import ImageEncoderProtocol @@ -51,9 +51,6 @@ def load_models(): # Load image model: clip-as-service (recommended) or local CN-CLIP - # IMPORTANT: failures here should NOT prevent the whole service from starting. - # If image model cannot be loaded, we keep `_image_model` as None and only - # disable /embed/image while keeping /embed/text fully functional. if open_image_model: try: if CONFIG.USE_CLIP_AS_SERVICE: @@ -75,12 +72,8 @@ def load_models(): ) logger.info("Image model (local CN-CLIP) loaded successfully") except Exception as e: - logger.error( - "Failed to load image model; image embeddings will be disabled but text embeddings remain available: %s", - e, - exc_info=True, - ) - _image_model = None + logger.error("Failed to load image model: %s", e, exc_info=True) + raise logger.info("All embedding models loaded successfully, service ready") @@ -109,70 +102,60 @@ def health() -> Dict[str, Any]: def embed_text(texts: List[str]) -> List[Optional[List[float]]]: if _text_model is None: raise RuntimeError("Text model not loaded") - out: List[Optional[List[float]]] = [None] * len(texts) - - indexed_texts: List[tuple] = [] + normalized: List[str] = [] for i, t in enumerate(texts): - if t is None: - continue if not isinstance(t, str): - t = str(t) - t = t.strip() - if not t: - continue - indexed_texts.append((i, t)) - - if not indexed_texts: - return out - - batch_texts = [t for _, t in indexed_texts] - try: - with _text_encode_lock: - embs = _text_model.encode_batch( - batch_texts, - batch_size=int(CONFIG.TEXT_BATCH_SIZE), - device=CONFIG.TEXT_DEVICE, - normalize_embeddings=bool(CONFIG.TEXT_NORMALIZE_EMBEDDINGS), - ) - for j, (idx, _t) in enumerate(indexed_texts): - out[idx] = _as_list(embs[j]) - except Exception: - # keep Nones - pass + raise HTTPException(status_code=400, detail=f"Invalid text at index {i}: must be string") + s = t.strip() + if not s: + raise HTTPException(status_code=400, detail=f"Invalid text at index {i}: empty string") + normalized.append(s) + + with _text_encode_lock: + embs = _text_model.encode_batch( + normalized, + batch_size=int(CONFIG.TEXT_BATCH_SIZE), + device=CONFIG.TEXT_DEVICE, + normalize_embeddings=bool(CONFIG.TEXT_NORMALIZE_EMBEDDINGS), + ) + if embs is None or len(embs) != len(normalized): + raise RuntimeError( + f"Text model response length mismatch: expected {len(normalized)}, " + f"got {0 if embs is None else len(embs)}" + ) + out: List[Optional[List[float]]] = [] + for i, emb in enumerate(embs): + vec = _as_list(emb) + if vec is None: + raise RuntimeError(f"Text model returned empty embedding for index {i}") + out.append(vec) return out @app.post("/embed/image") def embed_image(images: List[str]) -> List[Optional[List[float]]]: if _image_model is None: - # Graceful degradation: keep API shape but return all None - logger.warning("embed_image called but image model is not loaded; returning all None vectors") - return [None] * len(images) - out: List[Optional[List[float]]] = [None] * len(images) - - # Normalize inputs - urls = [] - indices = [] + raise RuntimeError("Image model not loaded") + urls: List[str] = [] for i, url_or_path in enumerate(images): - if url_or_path is None: - continue if not isinstance(url_or_path, str): - url_or_path = str(url_or_path) - url_or_path = url_or_path.strip() - if url_or_path: - urls.append(url_or_path) - indices.append(i) - - if not urls: - return out + raise HTTPException(status_code=400, detail=f"Invalid image at index {i}: must be string URL/path") + s = url_or_path.strip() + if not s: + raise HTTPException(status_code=400, detail=f"Invalid image at index {i}: empty URL/path") + urls.append(s) with _image_encode_lock: - try: - # Both ClipAsServiceImageEncoder and ClipImageModel implement encode_image_urls(urls, batch_size) - vectors = _image_model.encode_image_urls(urls, batch_size=CONFIG.IMAGE_BATCH_SIZE) - for j, idx in enumerate(indices): - out[idx] = _as_list(vectors[j] if j < len(vectors) else None) - except Exception: - for idx in indices: - out[idx] = None + vectors = _image_model.encode_image_urls(urls, batch_size=CONFIG.IMAGE_BATCH_SIZE) + if vectors is None or len(vectors) != len(urls): + raise RuntimeError( + f"Image model response length mismatch: expected {len(urls)}, " + f"got {0 if vectors is None else len(vectors)}" + ) + out: List[Optional[List[float]]] = [] + for i, vec in enumerate(vectors): + out_vec = _as_list(vec) + if out_vec is None: + raise RuntimeError(f"Image model returned empty embedding for index {i}") + out.append(out_vec) return out diff --git a/embeddings/text_encoder.py b/embeddings/text_encoder.py index 1221c28..bfcfc52 100644 --- a/embeddings/text_encoder.py +++ b/embeddings/text_encoder.py @@ -89,9 +89,8 @@ class TextEmbeddingEncoder: batch_size: Batch size for processing (used for service requests) Returns: - numpy array of dtype=object, where each element is either: - - np.ndarray (valid embedding vector) or - - None (no embedding available for that text) + numpy array of dtype=object,元素均为有效 np.ndarray 向量。 + 若任一输入无法生成向量,将直接抛出异常。 """ # Convert single string to list if isinstance(sentences, str): @@ -101,8 +100,6 @@ class TextEmbeddingEncoder: uncached_indices: List[int] = [] uncached_texts: List[str] = [] - # Process response - # Each element can be np.ndarray or None (表示该文本没有可用的向量) embeddings: List[Optional[np.ndarray]] = [None] * len(sentences) for i, text in enumerate(sentences): @@ -118,40 +115,27 @@ class TextEmbeddingEncoder: # If there are uncached texts, call service if uncached_texts: - try: - # Call service - response_data = self._call_service(request_data) + response_data = self._call_service(request_data) - # Process response - for i, text in enumerate(uncached_texts): - original_idx = uncached_indices[i] - if response_data and i < len(response_data): - embedding = response_data[i] - else: - embedding = None + # Process response + for i, text in enumerate(uncached_texts): + original_idx = uncached_indices[i] + if response_data and i < len(response_data): + embedding = response_data[i] + else: + embedding = None - if embedding is not None: - embedding_array = np.array(embedding, dtype=np.float32) - # Validate embedding from service - if invalid, treat as no result - if self._is_valid_embedding(embedding_array): - embeddings[original_idx] = embedding_array - # Cache the embedding - self._set_cached_embedding(text, "generic", embedding_array) - else: - logger.warning( - f"Invalid embedding returned from service for text {original_idx} " - f"(contains NaN/Inf or invalid shape), treating as no result. " - f"Text preview: {text[:50]}..." - ) - embeddings[original_idx] = None + if embedding is not None: + embedding_array = np.array(embedding, dtype=np.float32) + if self._is_valid_embedding(embedding_array): + embeddings[original_idx] = embedding_array + self._set_cached_embedding(text, "generic", embedding_array) else: - logger.warning(f"No embedding found for text {original_idx}: {text[:50]}...") - embeddings[original_idx] = None - - except Exception as e: - logger.error(f"Failed to encode texts: {e}", exc_info=True) - # 出错时不要生成兜底全零向量,保持为 None - pass + raise ValueError( + f"Invalid embedding returned from service for text index {original_idx}" + ) + else: + raise ValueError(f"No embedding found for text index {original_idx}: {text[:50]}...") # 返回 numpy 数组(dtype=object),元素为 np.ndarray 或 None return np.array(embeddings, dtype=object) @@ -254,7 +238,3 @@ class TextEmbeddingEncoder: except Exception as e: logger.error(f"Error storing embedding in cache: {e}") return False - - -# Backward compatibility for existing imports/usages. -BgeEncoder = TextEmbeddingEncoder diff --git a/indexer/document_transformer.py b/indexer/document_transformer.py index b4d569b..12ddaa8 100644 --- a/indexer/document_transformer.py +++ b/indexer/document_transformer.py @@ -406,23 +406,21 @@ class SPUDocumentTransformer: if not urls: return - try: - vectors = self.image_encoder.encode_image_urls(urls, batch_size=8) - if not vectors or len(vectors) != len(urls): - return - out = [] - for url, vec in zip(urls, vectors): - if vec is None: - continue - if isinstance(vec, np.ndarray): - vec = vec.astype(np.float32) - out.append({"vector": vec.tolist(), "url": url}) - elif hasattr(vec, "tolist"): - out.append({"vector": vec.tolist(), "url": url}) - if out: - doc["image_embedding"] = out - except Exception as e: - logger.warning("Failed to generate image_embedding for SPU %s: %s", doc.get("spu_id"), e) + vectors = self.image_encoder.encode_image_urls(urls, batch_size=8) + if not vectors or len(vectors) != len(urls): + raise RuntimeError( + f"image_embedding response length mismatch for SPU {doc.get('spu_id')}: " + f"expected {len(urls)}, got {0 if vectors is None else len(vectors)}" + ) + out = [] + for url, vec in zip(urls, vectors): + arr = np.asarray(vec, dtype=np.float32) + if arr.ndim != 1 or arr.size == 0 or not np.isfinite(arr).all(): + raise RuntimeError( + f"Invalid image embedding for SPU {doc.get('spu_id')} and URL {url}" + ) + out.append({"vector": arr.tolist(), "url": url}) + doc["image_embedding"] = out def _process_skus( self, @@ -722,21 +720,14 @@ class SPUDocumentTransformer: logger.debug(f"No title text available for embedding, SPU: {doc.get('spu_id')}") return - try: - # 使用文本向量编码器生成 embedding - # encode方法返回numpy数组,形状为(n, 1024) - embeddings = self.encoder.encode(title_text) - - if embeddings is not None and len(embeddings) > 0: - # 取第一个embedding(因为只传了一个文本) - embedding = embeddings[0] - if not isinstance(embedding, np.ndarray): - logger.warning(f"Embedding is None/invalid for title: {title_text[:50]}...") - return - # 转换为列表格式(ES需要) - doc['title_embedding'] = embedding.tolist() - logger.debug(f"Generated title_embedding for SPU: {doc.get('spu_id')}, title: {title_text[:50]}...") - else: - logger.warning(f"Failed to generate embedding for title: {title_text[:50]}...") - except Exception as e: - logger.error(f"Error generating title_embedding for SPU {doc.get('spu_id')}: {e}", exc_info=True) + # 使用文本向量编码器生成 embedding + # encode方法返回numpy数组,形状为(n, d) + embeddings = self.encoder.encode(title_text) + if embeddings is None or len(embeddings) == 0: + raise RuntimeError(f"Failed to generate title embedding for SPU {doc.get('spu_id')}") + + embedding = np.asarray(embeddings[0], dtype=np.float32) + if embedding.ndim != 1 or embedding.size == 0 or not np.isfinite(embedding).all(): + raise RuntimeError(f"Invalid title embedding for SPU {doc.get('spu_id')}") + doc['title_embedding'] = embedding.tolist() + logger.debug(f"Generated title_embedding for SPU: {doc.get('spu_id')}, title: {title_text[:50]}...") diff --git a/indexer/incremental_service.py b/indexer/incremental_service.py index 9595dcf..4979ca0 100644 --- a/indexer/incremental_service.py +++ b/indexer/incremental_service.py @@ -33,9 +33,9 @@ class IncrementalIndexerService: logger.info(f"Preloaded {len(self.category_id_to_name)} category mappings") # 缓存:避免频繁增量请求重复加载 config / 构造 transformer - # NOTE: 为避免“首请求”懒加载导致超时,尽量在进程启动阶段完成初始化: + # 启动阶段强校验初始化: # - config.yaml 加载 - # - translator / embedding / image encoder provider 初始化(best-effort) + # - translator / embedding / image encoder provider 初始化 self._config: Optional[Any] = None self._config_lock = threading.Lock() self._translator: Optional[Any] = None @@ -50,59 +50,35 @@ class IncrementalIndexerService: self._transformer_cache_lock = threading.Lock() def _eager_init(self) -> None: - """Best-effort eager initialization to reduce first-request latency.""" - try: - self._config = ConfigLoader("config/config.yaml").load_config() - except Exception as e: - logger.warning("Failed to eagerly load config/config.yaml: %s", e, exc_info=True) - self._config = None - return + """Strict eager initialization. Any dependency failure should fail fast.""" + self._config = ConfigLoader("config/config.yaml").load_config() + self._translation_prompts = getattr(self._config.query_config, "translation_prompts", {}) or {} + self._searchable_option_dimensions = ( + getattr(self._config.spu_config, "searchable_option_dimensions", None) + or ["option1", "option2", "option3"] + ) - try: - self._translation_prompts = getattr(self._config.query_config, "translation_prompts", {}) or {} - self._searchable_option_dimensions = ( - getattr(self._config.spu_config, "searchable_option_dimensions", None) - or ["option1", "option2", "option3"] - ) - except Exception: - self._translation_prompts = {} - self._searchable_option_dimensions = ["option1", "option2", "option3"] + from providers import create_translation_provider - # Translator provider (best-effort) - try: - from providers import create_translation_provider + self._translator = create_translation_provider(self._config.query_config) - self._translator = create_translation_provider(self._config.query_config) - except Exception as e: - logger.warning("Failed to initialize translation provider at startup: %s", e) - self._translator = None - - # Text embedding encoder (best-effort) + # Text embedding encoder (strict when enabled) if bool(getattr(self._config.query_config, "enable_text_embedding", False)): - try: - from embeddings.text_encoder import TextEmbeddingEncoder + from embeddings.text_encoder import TextEmbeddingEncoder - self._shared_text_encoder = TextEmbeddingEncoder() - except Exception as e: - logger.warning("Failed to initialize TextEmbeddingEncoder at startup: %s", e) - self._shared_text_encoder = None + self._shared_text_encoder = TextEmbeddingEncoder() + else: + self._shared_text_encoder = None - # Image embedding encoder (best-effort; may be unavailable if embedding service not running) - try: - from embeddings.image_encoder import CLIPImageEncoder + # Image embedding encoder (strict) + from embeddings.image_encoder import CLIPImageEncoder - self._shared_image_encoder = CLIPImageEncoder() - except Exception as e: - logger.debug("Image encoder not available for indexer startup: %s", e) - self._shared_image_encoder = None + self._shared_image_encoder = CLIPImageEncoder() def _get_config(self) -> Any: """Load config once per process (thread-safe).""" - if self._config is not None: - return self._config - with self._config_lock: - if self._config is None: - self._config = ConfigLoader("config/config.yaml").load_config() + if self._config is None: + raise RuntimeError("Indexer config is not initialized") return self._config def _get_transformer_bundle(self, tenant_id: str) -> Tuple[Any, Optional[Any], bool]: @@ -121,32 +97,13 @@ class IncrementalIndexerService: config = self._get_config() enable_embedding = bool(getattr(config.query_config, "enable_text_embedding", False)) - # Use shared encoders/providers preloaded at startup when可用; - # 若启动时初始化失败,则在首次请求时做一次兜底初始化,避免永久禁用。 encoder: Optional[Any] = self._shared_text_encoder if enable_embedding else None if enable_embedding and encoder is None: - try: - from embeddings.text_encoder import TextEmbeddingEncoder - - encoder = TextEmbeddingEncoder() - self._shared_text_encoder = encoder - logger.info("TextEmbeddingEncoder lazily initialized in _get_transformer_bundle") - except Exception as e: - logger.warning("Failed to lazily initialize TextEmbeddingEncoder for tenant_id=%s: %s", tenant_id, e) - encoder = None - enable_embedding = False + raise RuntimeError("Text embedding is enabled but TextEmbeddingEncoder is not initialized") image_encoder: Optional[Any] = self._shared_image_encoder if image_encoder is None: - try: - from embeddings.image_encoder import CLIPImageEncoder - - image_encoder = CLIPImageEncoder() - self._shared_image_encoder = image_encoder - logger.info("CLIPImageEncoder lazily initialized in _get_transformer_bundle") - except Exception as e: - logger.debug("Image encoder not available for indexer (lazy init): %s", e) - image_encoder = None + raise RuntimeError("CLIPImageEncoder is not initialized") transformer = create_document_transformer( category_id_to_name=self.category_id_to_name, @@ -157,7 +114,7 @@ class IncrementalIndexerService: encoder=encoder, enable_title_embedding=False, # batch fill later image_encoder=image_encoder, - enable_image_embedding=(image_encoder is not None), + enable_image_embedding=True, config=config, ) @@ -236,14 +193,13 @@ class IncrementalIndexerService: title_text = str(v) break if title_text and str(title_text).strip(): - try: - embeddings = encoder.encode(title_text) - if embeddings is not None and len(embeddings) > 0: - emb0 = embeddings[0] - if isinstance(emb0, np.ndarray): - doc["title_embedding"] = emb0.tolist() - except Exception as e: - logger.warning(f"Failed to generate embedding for spu_id={spu_id}: {e}") + embeddings = encoder.encode(title_text) + if embeddings is None or len(embeddings) == 0: + raise RuntimeError(f"Failed to generate title embedding for spu_id={spu_id}") + emb0 = np.asarray(embeddings[0], dtype=np.float32) + if emb0.ndim != 1 or emb0.size == 0 or not np.isfinite(emb0).all(): + raise RuntimeError(f"Invalid title embedding for spu_id={spu_id}") + doc["title_embedding"] = emb0.tolist() return doc @@ -678,14 +634,20 @@ class IncrementalIndexerService: title_doc_indices.append(i) if title_texts: - try: - embeddings = encoder.encode_batch(title_texts, batch_size=32) - for j, emb in enumerate(embeddings): - doc_idx = title_doc_indices[j] - if isinstance(emb, np.ndarray): - documents[doc_idx][1]["title_embedding"] = emb.tolist() - except Exception as e: - logger.warning(f"[IncrementalIndexing] Batch embedding failed for tenant_id={tenant_id}: {e}", exc_info=True) + embeddings = encoder.encode_batch(title_texts, batch_size=32) + if embeddings is None or len(embeddings) != len(title_texts): + raise RuntimeError( + f"[IncrementalIndexing] Batch embedding length mismatch for tenant_id={tenant_id}: " + f"expected {len(title_texts)}, got {0 if embeddings is None else len(embeddings)}" + ) + for j, emb in enumerate(embeddings): + vec = np.asarray(emb, dtype=np.float32) + if vec.ndim != 1 or vec.size == 0 or not np.isfinite(vec).all(): + raise RuntimeError( + f"[IncrementalIndexing] Invalid title embedding in batch for tenant_id={tenant_id}, index={j}" + ) + doc_idx = title_doc_indices[j] + documents[doc_idx][1]["title_embedding"] = vec.tolist() logger.info(f"[IncrementalIndexing] Transformed {len(documents)}/{total_count} documents") @@ -789,4 +751,3 @@ class IncrementalIndexerService: "index_name": index_name, "tenant_id": tenant_id } - diff --git a/indexer/indexing_utils.py b/indexer/indexing_utils.py index 397bc0b..3ea1cc0 100644 --- a/indexer/indexing_utils.py +++ b/indexer/indexing_utils.py @@ -92,38 +92,29 @@ def create_document_transformer( or (encoder is None and enable_title_embedding) or config is None ): - try: - if config is None: - config_loader = ConfigLoader() - config = config_loader.load_config() - - if searchable_option_dimensions is None: - searchable_option_dimensions = config.spu_config.searchable_option_dimensions - - index_langs = tenant_config.get("index_languages") or [] - need_translator = len(index_langs) > 1 - if translator is None and need_translator: - from providers import create_translation_provider - translator = create_translation_provider(config.query_config) - - if translation_prompts is None: - translation_prompts = config.query_config.translation_prompts - - # 初始化encoder(如果启用标题向量化且未提供encoder) - if encoder is None and enable_title_embedding and config.query_config.enable_text_embedding: - try: - from embeddings.text_encoder import TextEmbeddingEncoder - encoder = TextEmbeddingEncoder() - logger.info("TextEmbeddingEncoder initialized for title embedding") - except Exception as e: - logger.warning(f"Failed to initialize TextEmbeddingEncoder: {e}, title embedding will be disabled") - enable_title_embedding = False - except Exception as e: - logger.warning(f"Failed to load config, using defaults: {e}") - if searchable_option_dimensions is None: - searchable_option_dimensions = ['option1', 'option2', 'option3'] - if translation_prompts is None: - translation_prompts = {} + if config is None: + config_loader = ConfigLoader() + config = config_loader.load_config() + + if searchable_option_dimensions is None: + searchable_option_dimensions = config.spu_config.searchable_option_dimensions + + index_langs = tenant_config.get("index_languages") or [] + need_translator = len(index_langs) > 1 + if translator is None and need_translator: + from providers import create_translation_provider + + translator = create_translation_provider(config.query_config) + + if translation_prompts is None: + translation_prompts = config.query_config.translation_prompts + + # 初始化encoder(如果启用标题向量化且未提供encoder) + if encoder is None and enable_title_embedding and config.query_config.enable_text_embedding: + from embeddings.text_encoder import TextEmbeddingEncoder + + encoder = TextEmbeddingEncoder() + logger.info("TextEmbeddingEncoder initialized for title embedding") return SPUDocumentTransformer( category_id_to_name=category_id_to_name, diff --git a/main.py b/main.py index 8a8c936..c62f55a 100755 --- a/main.py +++ b/main.py @@ -22,6 +22,7 @@ from utils import ESClient from search import Searcher from suggestion import SuggestionIndexBuilder from utils.db_connector import create_db_connection +from context.request_context import create_request_context def cmd_serve(args): @@ -78,7 +79,8 @@ def cmd_search(args): result = searcher.search( query=args.query, tenant_id=args.tenant_id, - size=args.size + size=args.size, + context=create_request_context(), ) # Display results diff --git a/providers/embedding.py b/providers/embedding.py index 6aed3db..51d8d52 100644 --- a/providers/embedding.py +++ b/providers/embedding.py @@ -1,8 +1,4 @@ -""" -Embedding provider - HTTP service (vllm reserved). - -Returns text/image encoders configured via services_config. -""" +"""Embedding provider - HTTP service.""" from __future__ import annotations @@ -14,10 +10,7 @@ def create_embedding_provider() -> "EmbeddingProvider": cfg = get_embedding_config() provider = (cfg.provider or "http").strip().lower() if provider != "http": - import logging - logging.getLogger(__name__).warning( - "Unsupported embedding provider '%s', fallback to HTTP provider.", provider - ) + raise ValueError(f"Unsupported embedding provider: {provider}") return EmbeddingProvider() diff --git a/providers/rerank.py b/providers/rerank.py index 153a28e..fc16dd5 100644 --- a/providers/rerank.py +++ b/providers/rerank.py @@ -1,6 +1,4 @@ -""" -Rerank provider - HTTP service (vllm reserved). -""" +"""Rerank provider - HTTP service.""" from __future__ import annotations @@ -61,8 +59,8 @@ def create_rerank_provider() -> HttpRerankProvider: cfg = get_rerank_config() provider = (cfg.provider or "http").strip().lower() - if provider == "vllm": - logger.warning("rerank provider 'vllm' is reserved, using HTTP.") + if provider != "http": + raise ValueError(f"Unsupported rerank provider: {provider}") url = get_rerank_service_url() return HttpRerankProvider(service_url=url) diff --git a/providers/translation.py b/providers/translation.py index 277ea08..10bdef5 100644 --- a/providers/translation.py +++ b/providers/translation.py @@ -158,19 +158,7 @@ def create_translation_provider(query_config: Any = None) -> Any: translation_context=getattr(qc, "translation_context", "e-commerce product search"), ) - logger.warning( - "Unsupported translation provider '%s', fallback to direct.", - provider, - ) - from query.translator import Translator - qc = query_config or _empty_query_config() - return Translator( - model=pc.get("model") or "qwen", - api_key=getattr(qc, "translation_api_key", None), - use_cache=True, - glossary_id=getattr(qc, "translation_glossary_id", None), - translation_context=getattr(qc, "translation_context", "e-commerce product search"), - ) + raise ValueError(f"Unsupported translation provider: {provider}") def _empty_query_config() -> Any: diff --git a/query/__init__.py b/query/__init__.py index bb8fa61..b732adf 100644 --- a/query/__init__.py +++ b/query/__init__.py @@ -2,15 +2,12 @@ from .language_detector import LanguageDetector from .translator import Translator -from .translation_client import HttpTranslationClient, create_translation_client from .query_rewriter import QueryRewriter, QueryNormalizer from .query_parser import QueryParser, ParsedQuery __all__ = [ 'LanguageDetector', 'Translator', - 'HttpTranslationClient', - 'create_translation_client', 'QueryRewriter', 'QueryNormalizer', 'QueryParser', diff --git a/query/translation_client.py b/query/translation_client.py deleted file mode 100644 index 7b0caaa..0000000 --- a/query/translation_client.py +++ /dev/null @@ -1,20 +0,0 @@ -""" -Translation client - delegates to providers. - -Deprecated: use providers.create_translation_provider() instead. -Kept for backward compatibility. -""" - -from __future__ import annotations - -from typing import Any - -from providers.translation import ( - HttpTranslationProvider as HttpTranslationClient, - create_translation_provider, -) - - -def create_translation_client(query_config: Any) -> Any: - """Backward compat: delegate to create_translation_provider.""" - return create_translation_provider(query_config) diff --git a/requirements.txt b/requirements.txt index 8f69f39..03e3ce7 100644 --- a/requirements.txt +++ b/requirements.txt @@ -35,6 +35,7 @@ requests>=2.31.0 # Utilities tqdm>=4.65.0 click>=8.1.0 +setuptools<82 # Testing pytest>=7.4.0 diff --git a/scripts/create_venv.sh b/scripts/create_venv.sh index 2f3c5d9..e039612 100644 --- a/scripts/create_venv.sh +++ b/scripts/create_venv.sh @@ -44,7 +44,8 @@ fi # shellcheck disable=SC1091 source "${VENV_DIR}/bin/activate" -python -m pip install --upgrade pip setuptools wheel +python -m pip install --upgrade pip wheel +python -m pip install "setuptools<82" python -m pip install -r requirements.txt if [[ "${INSTALL_ML:-0}" == "1" ]]; then diff --git a/scripts/start_embedding_service.sh b/scripts/start_embedding_service.sh index a58bab7..13dbbd3 100755 --- a/scripts/start_embedding_service.sh +++ b/scripts/start_embedding_service.sh @@ -16,10 +16,27 @@ source ./activate.sh DEFAULT_EMBEDDING_SERVICE_HOST=$(python -c "from embeddings.config import CONFIG; print(CONFIG.HOST)") DEFAULT_EMBEDDING_SERVICE_PORT=$(python -c "from embeddings.config import CONFIG; print(CONFIG.PORT)") +USE_CLIP_AS_SERVICE=$(python -c "from embeddings.config import CONFIG; print('1' if CONFIG.USE_CLIP_AS_SERVICE else '0')") EMBEDDING_SERVICE_HOST="${EMBEDDING_HOST:-${DEFAULT_EMBEDDING_SERVICE_HOST}}" EMBEDDING_SERVICE_PORT="${EMBEDDING_PORT:-${DEFAULT_EMBEDDING_SERVICE_PORT}}" +if [[ "${USE_CLIP_AS_SERVICE}" == "1" ]]; then + if ! python - <<'PY' +try: + import pkg_resources # noqa: F401 +except Exception: + raise SystemExit(1) +PY + then + echo "ERROR: clip-as-service image embedding requires pkg_resources, but current venv is missing it." >&2 + echo "Fix:" >&2 + echo " python -m pip install 'setuptools<82'" >&2 + echo "Then restart: ./scripts/start_embedding_service.sh" >&2 + exit 1 + fi +fi + echo "========================================" echo "Starting Local Embedding Service" echo "========================================" @@ -36,4 +53,3 @@ exec python -m uvicorn embeddings.server:app \ --port "${EMBEDDING_SERVICE_PORT}" \ --workers 1 - diff --git a/scripts/test_build_docs_api.py b/scripts/test_build_docs_api.py index 1a37f5a..24aa533 100644 --- a/scripts/test_build_docs_api.py +++ b/scripts/test_build_docs_api.py @@ -29,6 +29,10 @@ except ImportError: def build_sample_request(): """构造一条完整的 build-docs 请求体(对应 shoplazza_product_spu / sku / option 表结构)。""" now = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") + sample_image_url = os.getenv( + "SAMPLE_IMAGE_URL", + "https://oss.essa.cn/98532128-cf8e-456c-9e30-6f2a5ea0c19f.jpg", + ) return { "tenant_id": "162", "items": [ @@ -45,7 +49,7 @@ def build_sample_request(): "category_level": 2, "category_path": "服装/上衣/T恤", "fake_sales": 1280, - "image_src": "https://example.com/img/tshirt.jpg", + "image_src": sample_image_url, "tags": "T恤,纯棉,短袖,夏季", "create_time": now, "update_time": now, diff --git a/scripts/trace_indexer_calls.sh b/scripts/trace_indexer_calls.sh new file mode 100755 index 0000000..f79894d --- /dev/null +++ b/scripts/trace_indexer_calls.sh @@ -0,0 +1,74 @@ +#!/bin/bash +# +# 排查「谁在调用索引服务」的脚本 +# 用法: ./scripts/trace_indexer_calls.sh +# + +set -euo pipefail + +cd "$(dirname "$0")/.." +source ./activate.sh 2>/dev/null || true + +echo "==========================================" +echo "索引服务调用方排查" +echo "==========================================" + +INDEXER_PORT="${INDEXER_PORT:-6004}" +EMBEDDING_PORT="${EMBEDDING_PORT:-6005}" + +echo "" +echo "1. 监听端口 6004 的进程(Indexer 服务)" +echo "------------------------------------------" +if command -v lsof >/dev/null 2>&1; then + lsof -i :"${INDEXER_PORT}" 2>/dev/null || echo " (无进程监听或 lsof 无权限)" +else + ss -tlnp 2>/dev/null | grep ":${INDEXER_PORT}" || echo " (无进程监听)" +fi + +echo "" +echo "2. 连接到 6004 的客户端(谁在请求 Indexer)" +echo "------------------------------------------" +if command -v ss >/dev/null 2>&1; then + ss -tnp 2>/dev/null | grep ":${INDEXER_PORT}" || echo " (当前无活跃连接)" +elif command -v netstat >/dev/null 2>&1; then + netstat -tnp 2>/dev/null | grep ":${INDEXER_PORT}" || echo " (当前无活跃连接)" +else + echo " 请安装 ss 或 netstat" +fi + +echo "" +echo "3. 连接到 6005 的客户端(Indexer 会调用 Embedding 服务)" +echo "------------------------------------------" +if command -v ss >/dev/null 2>&1; then + ss -tnp 2>/dev/null | grep ":${EMBEDDING_PORT}" || echo " (当前无活跃连接)" +fi + +echo "" +echo "4. 检查定时任务(cron)" +echo "------------------------------------------" +(crontab -l 2>/dev/null | grep -i indexer) || echo " 当前用户无相关 cron" +if [ -d /etc/cron.d ]; then + grep -l -i indexer /etc/cron.d/* 2>/dev/null || true +fi + +echo "" +echo "5. 端口与逻辑说明" +echo "------------------------------------------" +echo " - Indexer 服务: 端口 ${INDEXER_PORT}" +echo " 启动: ./scripts/start_indexer.sh 或 python main.py serve-indexer" +echo " 接口: POST /indexer/reindex, POST /indexer/index, POST /indexer/build-docs 等" +echo "" +echo " - 调用方(文档说明): 外部 Java 程序或 curl 等 HTTP 客户端" +echo " 全量: curl -X POST http://localhost:${INDEXER_PORT}/indexer/reindex -d '{\"tenant_id\":\"170\",\"batch_size\":500}'" +echo " 增量: curl -X POST http://localhost:${INDEXER_PORT}/indexer/index -d '{\"tenant_id\":\"170\",\"spu_ids\":[\"123\"]}'" +echo "" +echo " - Indexer 内部会调用:" +echo " - Embedding 服务 (${EMBEDDING_PORT}): POST /embed/text" +echo " - Qwen API: dashscope.aliyuncs.com (翻译、LLM 分析)" +echo " - MySQL: 商品数据" +echo " - Elasticsearch: 写入索引" +echo "" +echo "6. 实时监控连接(按 Ctrl+C 停止)" +echo "------------------------------------------" +echo " 运行: watch -n 2 'ss -tnp | grep -E \":${INDEXER_PORT}|:${EMBEDDING_PORT}\"'" +echo "" diff --git a/search/es_query_builder.py b/search/es_query_builder.py index 68278ee..5325b2a 100644 --- a/search/es_query_builder.py +++ b/search/es_query_builder.py @@ -19,7 +19,6 @@ class ESQueryBuilder: def __init__( self, - index_name: str, match_fields: List[str], text_embedding_field: Optional[str] = None, image_embedding_field: Optional[str] = None, @@ -33,7 +32,6 @@ class ESQueryBuilder: Initialize query builder. Args: - index_name: ES index name match_fields: Fields to search for text matching text_embedding_field: Field name for text embeddings image_embedding_field: Field name for image embeddings @@ -43,7 +41,6 @@ class ESQueryBuilder: default_language: Default language to use when detection fails or returns "unknown" knn_boost: Boost value for KNN (embedding recall) """ - self.index_name = index_name self.match_fields = match_fields self.text_embedding_field = text_embedding_field self.image_embedding_field = image_embedding_field @@ -351,7 +348,6 @@ class ESQueryBuilder: def _build_text_query(self, query_text: str) -> Dict[str, Any]: """ Build simple text matching query (BM25). - Legacy method - kept for backward compatibility. Args: query_text: Query text diff --git a/search/searcher.py b/search/searcher.py index 93dbfc4..35b4ae7 100644 --- a/search/searcher.py +++ b/search/searcher.py @@ -17,7 +17,7 @@ from .es_query_builder import ESQueryBuilder from config import SearchConfig from config.tenant_config_loader import get_tenant_config_loader from config.utils import get_match_fields_for_index -from context.request_context import RequestContext, RequestContextStage, create_request_context +from context.request_context import RequestContext, RequestContextStage from api.models import FacetResult, FacetValue, FacetConfig from api.result_formatter import ResultFormatter from indexer.mapping_generator import get_tenant_index_name @@ -107,9 +107,7 @@ class Searcher: self.source_fields = config.query_config.source_fields or [] # Query builder - simplified single-layer architecture - # index_name is no longer needed in query builder since we use tenant-specific indices self.query_builder = ESQueryBuilder( - index_name="", # Not used, kept for backward compatibility match_fields=self.match_fields, text_embedding_field=self.text_embedding_field, image_embedding_field=self.image_embedding_field, @@ -160,9 +158,8 @@ class Searcher: Returns: SearchResult object with formatted results """ - # Create context if not provided (backward compatibility) if context is None: - context = create_request_context() + raise ValueError("context is required") # 根据租户配置决定翻译开关(离线/在线统一) tenant_loader = get_tenant_config_loader() diff --git a/tests/ci/test_service_api_contracts.py b/tests/ci/test_service_api_contracts.py index ca8e196..ffd10be 100644 --- a/tests/ci/test_service_api_contracts.py +++ b/tests/ci/test_service_api_contracts.py @@ -179,6 +179,7 @@ def indexer_client(monkeypatch): import api.indexer_app as indexer_app import api.routes.indexer as indexer_routes + indexer_app.app.router.on_startup.clear() monkeypatch.setattr(indexer_app, "init_indexer_service", lambda es_host="": None) monkeypatch.setattr(indexer_routes, "get_bulk_indexing_service", lambda: _FakeBulkService()) monkeypatch.setattr(indexer_routes, "get_incremental_service", lambda: _FakeIncrementalService()) diff --git a/tests/test_cnclip_service.py b/tests/test_cnclip_service.py index 9751346..d483c58 100755 --- a/tests/test_cnclip_service.py +++ b/tests/test_cnclip_service.py @@ -6,21 +6,35 @@ CN-CLIP 服务测试脚本 测试 CN-CLIP 服务的文本和图像编码功能(使用 gRPC 协议) 使用方法: - python scripts/test_cnclip_service.py [PORT] + python tests/test_cnclip_service.py [PORT] 参数: PORT: 服务端口(默认:51000) """ import sys +import os -import pytest +import numpy as np + +# Skip clip_client version check (it imports pkg_resources in legacy path). +os.environ.setdefault("NO_VERSION_CHECK", "1") + +# Ensure vendored client is importable in direct `python tests/test_cnclip_service.py` mode. +ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) +VENDORED_CLIENT = os.path.join(ROOT, "third-party", "clip-as-service", "client") +if os.path.isdir(VENDORED_CLIENT) and VENDORED_CLIENT not in sys.path: + sys.path.insert(0, VENDORED_CLIENT) try: - import numpy as np from clip_client import Client -except ImportError: - pytest.skip("clip_client not installed (optional clip-as-service client)", allow_module_level=True) +except ImportError as e: + print("✗ 无法导入 clip_client。请先安装/暴露客户端依赖:") + print(" 1) pip install -e third-party/clip-as-service/client") + print(" 或") + print(" 2) export PYTHONPATH=third-party/clip-as-service/client:$PYTHONPATH") + print(f" 详细错误: {e}") + sys.exit(1) def _test_encoding(client, test_name, inputs): @@ -72,6 +86,15 @@ def main(): # 创建客户端 try: client = Client(grpc_url) + except ModuleNotFoundError as e: + if str(e) == "No module named 'pkg_resources'": + print("✗ 当前环境缺少 pkg_resources,clip_client/jina 无法初始化。") + print(" 建议使用专用环境运行:") + print(" .venv-cnclip/bin/python tests/test_cnclip_service.py 51000") + print(" 或在当前 .venv 安装兼容 setuptools(包含 pkg_resources)。") + sys.exit(1) + print(f"✗ 客户端创建失败: {e}") + sys.exit(1) except Exception as e: print(f"✗ 客户端创建失败: {e}") sys.exit(1) @@ -118,4 +141,3 @@ def main(): if __name__ == '__main__': main() - diff --git a/tests/test_embedding_pipeline.py b/tests/test_embedding_pipeline.py index 5558c6a..f524b38 100644 --- a/tests/test_embedding_pipeline.py +++ b/tests/test_embedding_pipeline.py @@ -2,6 +2,7 @@ import pickle from typing import Any, Dict, List, Optional import numpy as np +import pytest from config import ( FunctionScoreConfig, @@ -100,7 +101,7 @@ def test_text_embedding_encoder_response_alignment(monkeypatch): def _fake_post(url, json, timeout): assert url.endswith("/embed/text") assert json == ["hello", "world"] - return _FakeResponse([[0.1, 0.2], None]) + return _FakeResponse([[0.1, 0.2], [0.3, 0.4]]) monkeypatch.setattr("embeddings.text_encoder.requests.post", _fake_post) @@ -110,7 +111,22 @@ def test_text_embedding_encoder_response_alignment(monkeypatch): assert len(out) == 2 assert isinstance(out[0], np.ndarray) assert out[0].shape == (2,) - assert out[1] is None + assert isinstance(out[1], np.ndarray) + assert out[1].shape == (2,) + + +def test_text_embedding_encoder_raises_on_missing_vector(monkeypatch): + fake_redis = _FakeRedis() + monkeypatch.setattr("embeddings.text_encoder.redis.Redis", lambda **kwargs: fake_redis) + + def _fake_post(url, json, timeout): + return _FakeResponse([[0.1, 0.2], None]) + + monkeypatch.setattr("embeddings.text_encoder.requests.post", _fake_post) + + encoder = TextEmbeddingEncoder(service_url="http://127.0.0.1:6005") + with pytest.raises(ValueError): + encoder.encode(["hello", "world"]) def test_text_embedding_encoder_cache_hit(monkeypatch): @@ -156,4 +172,3 @@ def test_query_parser_skips_query_vector_when_disabled(): parsed = parser.parse("red dress", tenant_id="162", generate_vector=False) assert parsed.query_vector is None - -- libgit2 0.21.2