Commit efd435cfd693c23f8ee6eeb65adf4b63f08e54e3

Authored by tangwang
1 parent daf66a51

TEI performance tuning:

./scripts/start_tei_service.sh
START_TEI=0 ./scripts/service_ctl.sh restart embedding

curl -sS -X POST "http://127.0.0.1:6005/embed/text" \
  -H "Content-Type: application/json" \
  -d '["芭比娃娃 儿童玩具", "纯棉T恤 短袖"]'
config/config.yaml
... ... @@ -146,11 +146,11 @@ services:
146 146 http:
147 147 base_url: "http://127.0.0.1:6005"
148 148 # In-service text backend (read by the embedding process at startup)
149   - backend: "local_st" # tei | local_st
  149 + backend: "tei" # tei | local_st
150 150 backends:
151 151 tei:
152 152 base_url: "http://127.0.0.1:8080"
153   - timeout_sec: 60
  153 + timeout_sec: 20
154 154 model_id: "Qwen/Qwen3-Embedding-0.6B"
155 155 local_st:
156 156 model_id: "Qwen/Qwen3-Embedding-0.6B"
... ...
docs/TEI_SERVICE说明文档.md
... ... @@ -68,9 +68,13 @@ TEI_USE_GPU=1 ./scripts/start_tei_service.sh
68 68  
69 69 Expected output includes:
70 70  
71   -- `Image: ghcr.io/huggingface/text-embeddings-inference:cuda-...`
  71 +- `Image: ghcr.io/huggingface/text-embeddings-inference:turing-...` or `cuda-...` (the script picks one automatically based on GPU architecture)
72 72 - `Mode: gpu`
73   -- `TEI is ready: http://127.0.0.1:8080`
  73 +- `TEI is ready and output probe passed: http://127.0.0.1:8080`
  74 +
  75 +Notes:
  76 +- T4 (compute capability 7.5) automatically uses the `turing-*` image.
  77 +- Ampere and newer architectures (compute capability >= 8) automatically use the `cuda-*` image.
74 78  
75 79 ### 5.2 CPU mode startup (explicit)
76 80  
... ... @@ -82,7 +86,7 @@ TEI_USE_GPU=0 ./scripts/start_tei_service.sh
82 86  
83 87 - `Image: ghcr.io/huggingface/text-embeddings-inference:1.9` (not `cuda-`)
84 88 - `Mode: cpu`
85   -- `TEI is ready: http://127.0.0.1:8080`
  89 +- `TEI is ready and output probe passed: http://127.0.0.1:8080`
86 90  
87 91 ### 5.3 Stopping the service
88 92  
... ... @@ -108,6 +112,8 @@ curl -sS http://127.0.0.1:8080/embed \
108 112  
109 113 The response should be a 2D array (one vector per input).
110 114  
  115 +It is recommended to send another request right after, to confirm you are not seeing "the first request is fine, subsequent ones return null/NaN".
  116 +
111 117 ### 6.3 Integration with the embedding service
112 118  
113 119 ```bash
... ... @@ -120,6 +126,11 @@ curl -sS -X POST "http://127.0.0.1:6005/embed/text" \
120 126  
121 127 The response should be an array of 1024-dimensional vectors.
122 128  
  129 +### 6.4 Operating recommendations (one service for both online and indexing)
  130 +
  131 +- Online queries (latency first): clients should use `batch=1~4`
  132 +- Index building (throughput first): clients should use `batch=15~20`
  133 +
123 134 ## 7. Configuration (environment variables)
124 135  
125 136 `scripts/start_tei_service.sh` supports the following variables:
... ... @@ -130,10 +141,11 @@ curl -sS -X POST "http://127.0.0.1:6005/embed/text" \
130 141 - `TEI_MODEL_ID`: default `Qwen/Qwen3-Embedding-0.6B`
131 142 - `TEI_VERSION`: image version, default `1.9`
132 143 - `TEI_DTYPE`: default `float16`
133   -- `TEI_MAX_BATCH_TOKENS`: default `2048`
134   -- `TEI_MAX_CLIENT_BATCH_SIZE`: default `8`
  144 +- `TEI_MAX_BATCH_TOKENS`: default `4096`
  145 +- `TEI_MAX_CLIENT_BATCH_SIZE`: default `24`
135 146 - `HF_CACHE_DIR`: HF cache directory, default `$HOME/.cache/huggingface`
136 147 - `HF_TOKEN`: optional, avoids anonymous rate limiting
  148 +- `TEI_IMAGE`: optional, manually pin the image (usually unnecessary; prefer the script's automatic selection)
137 149  
138 150 ## 8. Using service_ctl
139 151  
... ... @@ -184,10 +196,20 @@ curl -sS http://127.0.0.1:8080/health
184 196 - `TEI_BASE_URL`
185 197 - `services.embedding.backends.tei.base_url`(`config/config.yaml`)
186 198  
  199 +### 9.4 `/embed/text` returns NaN/null starting from the second request
  200 +
  201 +- Common cause: using a `cuda-*` TEI image by mistake on a pre-Ampere GPU such as the T4.
  202 +- Fix:
  203 +
  204 +```bash
  205 +./scripts/start_tei_service.sh
  206 +```
  207 +
  208 +The script selects the image automatically based on GPU architecture and runs two output probes after startup; if it detects `null/NaN/Inf`, it fails fast and removes the broken container.
  209 +
187 210 ## 10. Related documents
188 211  
189 212 - Development overview: `docs/QUICKSTART.md`
190 213 - Architecture standards: `docs/DEVELOPER_GUIDE.md`
191 214 - embedding module: `embeddings/README.md`
192 215 - CN-CLIP specifics: `docs/CNCLIP_SERVICE说明文档.md`
193   -
... ...
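The image-selection rule documented above (`turing-*` for pre-Ampere GPUs, `cuda-*` for compute capability >= 8) is implemented by the shell helper `detect_gpu_tei_image` later in this commit. A minimal Python sketch of the same mapping, assuming the `turing-<ver>` / `cuda-<ver>` tag scheme:

```python
# Sketch of the GPU-architecture -> TEI image mapping described above.
# Mirrors scripts/start_tei_service.sh's detect_gpu_tei_image helper.

def pick_tei_image(compute_cap: str, version: str = "1.9") -> str:
    """Map a compute capability string (e.g. "7.5") to a TEI image tag."""
    base = "ghcr.io/huggingface/text-embeddings-inference"
    major = compute_cap.split(".", 1)[0].strip()
    # Pre-Ampere (major < 8, e.g. Tesla T4 at 7.5) needs the turing image;
    # anything else (including an unparsable capability) falls back to cuda.
    if major.isdigit() and int(major) < 8:
        return f"{base}:turing-{version}"
    return f"{base}:cuda-{version}"
```

The empty/unparsable fallback to `cuda-*` matches the shell script's default branch.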
docs/搜索API对接指南.md
... ... @@ -1586,6 +1586,28 @@ curl -X POST "http://localhost:6005/embed/image" \
1586 1586 curl "http://localhost:6005/health"
1587 1587 ```
1588 1588  
  1589 +#### 7.1.4 Unified TEI tuning recommendations (main service)
  1590 +
  1591 +A single main service can cover both:
  1592 +- Online query vectorization (low latency, typically `batch=1~4`)
  1593 +- Index-building vectorization (high throughput, typically `batch=15~20`)
  1594 +
  1595 +Unified startup (main pipeline):
  1596 +
  1597 +```bash
  1598 +./scripts/start_tei_service.sh
  1599 +START_TEI=0 ./scripts/service_ctl.sh restart embedding
  1600 +```
  1601 +
  1602 +Default ports:
  1603 +- TEI: `http://127.0.0.1:8080`
  1604 +- Embedding service (`/embed/text`): `http://127.0.0.1:6005`
  1605 +
  1606 +Current main TEI startup defaults (tuned for T4 / short-text workloads):
  1607 +- `TEI_MAX_BATCH_TOKENS=4096`
  1608 +- `TEI_MAX_CLIENT_BATCH_SIZE=24`
  1609 +- `TEI_DTYPE=float16`
  1610 +
1589 1611 ### 7.2 Rerank service (Reranker)
1590 1612  
1591 1613 - **Base URL**: `http://localhost:6007` (can be overridden via `RERANKER_SERVICE_URL`)
... ... @@ -2094,6 +2116,8 @@ curl "http://localhost:6006/health"
2094 2116 - Translation service: `POST /translate`
2095 2117 - Rerank service: `POST /rerank`
2096 2118  
  2119 +Note: for the `embed_text` scenario, the script validates the returned vector contents (values must be finite numbers; `null/NaN/Inf` is not allowed) rather than only checking for HTTP 200.
  2120 +
2097 2121 ### 10.1 Quick example
2098 2122  
2099 2123 ```bash
... ...
embeddings/qwen3_model.py
... ... @@ -9,6 +9,8 @@ from typing import List, Union
9 9  
10 10 import numpy as np
11 11 from sentence_transformers import SentenceTransformer
  12 +import threading
  13 +import torch
12 13  
13 14  
14 15 class Qwen3TextModel(object):
... ... @@ -24,8 +25,21 @@ class Qwen3TextModel(object):
24 25 if cls._instance is None:
25 26 cls._instance = super(Qwen3TextModel, cls).__new__(cls)
26 27 cls._instance.model = SentenceTransformer(model_id, trust_remote_code=True)
  28 + cls._instance._current_device = None
  29 + cls._instance._encode_lock = threading.Lock()
27 30 return cls._instance
28 31  
  32 + def _ensure_device(self, device: str) -> str:
  33 + target = (device or "cpu").strip().lower()
  34 + if target == "gpu":
  35 + target = "cuda"
  36 + if target == "cuda" and not torch.cuda.is_available():
  37 + target = "cpu"
  38 + if target != self._current_device:
  39 + self.model = self.model.to(target)
  40 + self._current_device = target
  41 + return target
  42 +
29 43 def encode(
30 44 self,
31 45 sentences: Union[str, List[str]],
... ... @@ -33,17 +47,18 @@ class Qwen3TextModel(object):
33 47 device: str = "cuda",
34 48 batch_size: int = 32,
35 49 ) -> np.ndarray:
36   - if device == "gpu":
37   - device = "cuda"
38   - self.model = self.model.to(device)
39   - embeddings = self.model.encode(
40   - sentences,
41   - normalize_embeddings=normalize_embeddings,
42   - device=device,
43   - show_progress_bar=False,
44   - batch_size=batch_size,
45   - )
46   - return embeddings
  50 + # SentenceTransformer + CUDA inference is not thread-safe in our usage;
  51 + # keep one in-flight encode call while avoiding repeated .to(device) hops.
  52 + with self._encode_lock:
  53 + run_device = self._ensure_device(device)
  54 + embeddings = self.model.encode(
  55 + sentences,
  56 + normalize_embeddings=normalize_embeddings,
  57 + device=run_device,
  58 + show_progress_bar=False,
  59 + batch_size=batch_size,
  60 + )
  61 + return embeddings
47 62  
48 63 def encode_batch(
49 64 self,
... ...
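The change above serializes `encode()` behind a lock and caches the current device so `.to(device)` only runs when the target actually changes. A standalone sketch of that pattern, with a stand-in model object instead of `SentenceTransformer`/torch:

```python
import threading

class _FakeModel:
    """Stand-in for SentenceTransformer: records device moves."""
    def __init__(self) -> None:
        self.device = None
        self.moves = 0

    def to(self, device: str) -> "_FakeModel":
        self.device = device
        self.moves += 1
        return self

class EncoderSingleton:
    _instance = None

    def __new__(cls):
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            cls._instance.model = _FakeModel()
            cls._instance._current_device = None
            cls._instance._encode_lock = threading.Lock()
        return cls._instance

    def _ensure_device(self, device: str) -> str:
        target = (device or "cpu").strip().lower()
        if target == "gpu":  # accept "gpu" as an alias for "cuda"
            target = "cuda"
        if target != self._current_device:
            # Only hop devices when the target changes.
            self.model = self.model.to(target)
            self._current_device = target
        return target

    def encode(self, sentences, device="cpu"):
        # One in-flight encode at a time, matching the commit's rationale.
        with self._encode_lock:
            self._ensure_device(device)
            return [f"vec:{s}" for s in sentences]

enc = EncoderSingleton()
enc.encode(["a"], device="cpu")
enc.encode(["b"], device="cpu")  # no extra .to() call
```

The real class additionally falls back from `cuda` to `cpu` when `torch.cuda.is_available()` is false; that branch is omitted here to keep the sketch torch-free.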
embeddings/server.py
... ... @@ -9,6 +9,9 @@ API (simple list-in, list-out; aligned by index):
9 9 import logging
10 10 import os
11 11 import threading
  12 +import time
  13 +from collections import deque
  14 +from dataclasses import dataclass
12 15 from typing import Any, Dict, List, Optional
13 16  
14 17 import numpy as np
... ... @@ -26,13 +29,139 @@ app = FastAPI(title="saas-search Embedding Service", version="1.0.0")
26 29 _text_model: Optional[Any] = None
27 30 _image_model: Optional[ImageEncoderProtocol] = None
28 31 _text_backend_name: str = ""
29   -open_text_model = True
30   -open_image_model = True # Enable image embedding when using clip-as-service
  32 +open_text_model = os.getenv("EMBEDDING_ENABLE_TEXT_MODEL", "true").lower() in ("1", "true", "yes")
  33 +open_image_model = os.getenv("EMBEDDING_ENABLE_IMAGE_MODEL", "true").lower() in ("1", "true", "yes")
31 34  
32 35 _text_encode_lock = threading.Lock()
33 36 _image_encode_lock = threading.Lock()
34 37  
35 38  
  39 +@dataclass
  40 +class _SingleTextTask:
  41 + text: str
  42 + normalize: bool
  43 + created_at: float
  44 + done: threading.Event
  45 + result: Optional[List[float]] = None
  46 + error: Optional[Exception] = None
  47 +
  48 +
  49 +_text_single_queue: "deque[_SingleTextTask]" = deque()
  50 +_text_single_queue_cv = threading.Condition()
  51 +_text_batch_worker: Optional[threading.Thread] = None
  52 +_text_batch_worker_stop = False
  53 +_TEXT_MICROBATCH_WINDOW_SEC = max(
  54 + 0.0, float(os.getenv("TEXT_MICROBATCH_WINDOW_MS", "4")) / 1000.0
  55 +)
  56 +_TEXT_REQUEST_TIMEOUT_SEC = max(
  57 + 1.0, float(os.getenv("TEXT_REQUEST_TIMEOUT_SEC", "30"))
  58 +)
  59 +
  60 +
  61 +def _encode_local_st(texts: List[str], normalize_embeddings: bool) -> Any:
  62 + with _text_encode_lock:
  63 + return _text_model.encode_batch(
  64 + texts,
  65 + batch_size=int(CONFIG.TEXT_BATCH_SIZE),
  66 + device=CONFIG.TEXT_DEVICE,
  67 + normalize_embeddings=normalize_embeddings,
  68 + )
  69 +
  70 +
  71 +def _start_text_batch_worker() -> None:
  72 + global _text_batch_worker, _text_batch_worker_stop
  73 + if _text_batch_worker is not None and _text_batch_worker.is_alive():
  74 + return
  75 + _text_batch_worker_stop = False
  76 + _text_batch_worker = threading.Thread(
  77 + target=_text_batch_worker_loop,
  78 + name="embed-text-microbatch-worker",
  79 + daemon=True,
  80 + )
  81 + _text_batch_worker.start()
  82 + logger.info(
  83 + "Started local_st text micro-batch worker | window_ms=%.1f max_batch=%d",
  84 + _TEXT_MICROBATCH_WINDOW_SEC * 1000.0,
  85 + int(CONFIG.TEXT_BATCH_SIZE),
  86 + )
  87 +
  88 +
  89 +def _stop_text_batch_worker() -> None:
  90 + global _text_batch_worker_stop
  91 + with _text_single_queue_cv:
  92 + _text_batch_worker_stop = True
  93 + _text_single_queue_cv.notify_all()
  94 +
  95 +
  96 +def _text_batch_worker_loop() -> None:
  97 + max_batch = max(1, int(CONFIG.TEXT_BATCH_SIZE))
  98 + while True:
  99 + with _text_single_queue_cv:
  100 + while not _text_single_queue and not _text_batch_worker_stop:
  101 + _text_single_queue_cv.wait()
  102 + if _text_batch_worker_stop:
  103 + return
  104 +
  105 + batch: List[_SingleTextTask] = [_text_single_queue.popleft()]
  106 + deadline = time.perf_counter() + _TEXT_MICROBATCH_WINDOW_SEC
  107 +
  108 + while len(batch) < max_batch:
  109 + remaining = deadline - time.perf_counter()
  110 + if remaining <= 0:
  111 + break
  112 + if not _text_single_queue:
  113 + _text_single_queue_cv.wait(timeout=remaining)
  114 + continue
  115 + while _text_single_queue and len(batch) < max_batch:
  116 + batch.append(_text_single_queue.popleft())
  117 +
  118 + try:
  119 + embs = _encode_local_st([task.text for task in batch], normalize_embeddings=False)
  120 + if embs is None or len(embs) != len(batch):
  121 + raise RuntimeError(
  122 + f"Text model response length mismatch in micro-batch: "
  123 + f"expected {len(batch)}, got {0 if embs is None else len(embs)}"
  124 + )
  125 + for task, emb in zip(batch, embs):
  126 + vec = _as_list(emb, normalize=task.normalize)
  127 + if vec is None:
  128 + raise RuntimeError("Text model returned empty embedding in micro-batch")
  129 + task.result = vec
  130 + except Exception as exc:
  131 + for task in batch:
  132 + task.error = exc
  133 + finally:
  134 + for task in batch:
  135 + task.done.set()
  136 +
  137 +
  138 +def _encode_single_text_with_microbatch(text: str, normalize: bool) -> List[float]:
  139 + task = _SingleTextTask(
  140 + text=text,
  141 + normalize=normalize,
  142 + created_at=time.perf_counter(),
  143 + done=threading.Event(),
  144 + )
  145 + with _text_single_queue_cv:
  146 + _text_single_queue.append(task)
  147 + _text_single_queue_cv.notify()
  148 +
  149 + if not task.done.wait(timeout=_TEXT_REQUEST_TIMEOUT_SEC):
  150 + with _text_single_queue_cv:
  151 + try:
  152 + _text_single_queue.remove(task)
  153 + except ValueError:
  154 + pass
  155 + raise RuntimeError(
  156 + f"Timed out waiting for text micro-batch worker ({_TEXT_REQUEST_TIMEOUT_SEC:.1f}s)"
  157 + )
  158 + if task.error is not None:
  159 + raise task.error
  160 + if task.result is None:
  161 + raise RuntimeError("Text micro-batch worker returned empty result")
  162 + return task.result
  163 +
  164 +
36 165 @app.on_event("startup")
37 166 def load_models():
38 167 """Load models at service startup to avoid first-request latency."""
... ... @@ -73,6 +202,7 @@ def load_models():
73 202 )
74 203 logger.info("Loading text backend: local_st (model=%s)", model_id)
75 204 _text_model = Qwen3TextModel(model_id=str(model_id))
  205 + _start_text_batch_worker()
76 206 else:
77 207 raise ValueError(
78 208 f"Unsupported embedding backend: {backend_name}. "
... ... @@ -112,6 +242,11 @@ def load_models():
112 242 logger.info("All embedding models loaded successfully, service ready")
113 243  
114 244  
  245 +@app.on_event("shutdown")
  246 +def stop_workers() -> None:
  247 + _stop_text_batch_worker()
  248 +
  249 +
115 250 def _normalize_vector(vec: np.ndarray) -> np.ndarray:
116 251 norm = float(np.linalg.norm(vec))
117 252 if not np.isfinite(norm) or norm <= 0.0:
... ... @@ -157,8 +292,24 @@ def embed_text(texts: List[str], normalize: Optional[bool] = None) -> List[Optio
157 292 raise HTTPException(status_code=400, detail=f"Invalid text at index {i}: empty string")
158 293 normalized.append(s)
159 294  
  295 + t0 = time.perf_counter()
160 296 try:
161   - with _text_encode_lock:
  297 + # local_st backend uses in-process torch model, keep serialized encode for safety;
  298 + # TEI backend is an HTTP client and supports concurrent requests.
  299 + if _text_backend_name == "local_st":
  300 + if len(normalized) == 1 and _text_batch_worker is not None:
  301 + out = [_encode_single_text_with_microbatch(normalized[0], normalize=effective_normalize)]
  302 + elapsed_ms = (time.perf_counter() - t0) * 1000.0
  303 + logger.info(
  304 + "embed_text done | backend=%s mode=microbatch-single inputs=%d normalize=%s elapsed_ms=%.2f",
  305 + _text_backend_name,
  306 + len(normalized),
  307 + effective_normalize,
  308 + elapsed_ms,
  309 + )
  310 + return out
  311 + embs = _encode_local_st(normalized, normalize_embeddings=False)
  312 + else:
162 313 embs = _text_model.encode_batch(
163 314 normalized,
164 315 batch_size=int(CONFIG.TEXT_BATCH_SIZE),
... ... @@ -182,6 +333,14 @@ def embed_text(texts: List[str], normalize: Optional[bool] = None) -> List[Optio
182 333 if vec is None:
183 334 raise RuntimeError(f"Text model returned empty embedding for index {i}")
184 335 out.append(vec)
  336 + elapsed_ms = (time.perf_counter() - t0) * 1000.0
  337 + logger.info(
  338 + "embed_text done | backend=%s inputs=%d normalize=%s elapsed_ms=%.2f",
  339 + _text_backend_name,
  340 + len(normalized),
  341 + effective_normalize,
  342 + elapsed_ms,
  343 + )
185 344 return out
186 345  
187 346  
... ...
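The micro-batching added to `embeddings/server.py` above can be condensed into a small self-contained sketch: single-text requests go into a queue, a worker drains them within a short time window and encodes them as one batch. `encode_fn` stands in for the model call; names here are illustrative, not the service's API:

```python
import threading
import time
from collections import deque
from dataclasses import dataclass, field

@dataclass
class Task:
    text: str
    done: threading.Event = field(default_factory=threading.Event)
    result: list = None

class MicroBatcher:
    """Collect single-item requests into small batches within a time window."""

    def __init__(self, encode_fn, window_sec=0.004, max_batch=8):
        self._encode_fn = encode_fn
        self._window = window_sec
        self._max_batch = max_batch
        self._queue = deque()
        self._cv = threading.Condition()
        self._stop = False
        threading.Thread(target=self._loop, daemon=True).start()

    def _loop(self):
        while True:
            with self._cv:
                while not self._queue and not self._stop:
                    self._cv.wait()
                if self._stop:
                    return
                batch = [self._queue.popleft()]
                deadline = time.perf_counter() + self._window
                while len(batch) < self._max_batch:
                    remaining = deadline - time.perf_counter()
                    if remaining <= 0:
                        break
                    if not self._queue:
                        self._cv.wait(timeout=remaining)
                        continue
                    batch.append(self._queue.popleft())
            # Encode outside the lock so new requests can keep queueing.
            embs = self._encode_fn([t.text for t in batch])
            for task, emb in zip(batch, embs):
                task.result = emb
                task.done.set()

    def encode_one(self, text, timeout=5.0):
        task = Task(text)
        with self._cv:
            self._queue.append(task)
            self._cv.notify()
        if not task.done.wait(timeout=timeout):
            raise RuntimeError("micro-batch worker timed out")
        return task.result

mb = MicroBatcher(lambda texts: [[float(len(t))] for t in texts])
```

Unlike this sketch, the service version also propagates per-task errors and removes timed-out tasks from the queue, but the window/condition-variable core is the same.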
reranker/backends/qwen3_vllm.py
... ... @@ -9,6 +9,7 @@ from __future__ import annotations
9 9  
10 10 import logging
11 11 import math
  12 +import threading
12 13 import time
13 14 from typing import Any, Dict, List, Optional, Tuple
14 15  
... ... @@ -102,6 +103,9 @@ class Qwen3VLLMRerankerBackend:
102 103 logprobs=20,
103 104 allowed_token_ids=[self._true_token, self._false_token],
104 105 )
  106 + # vLLM generate path is unstable under concurrent calls in this process model.
  107 + # Serialize infer calls to avoid engine-core protocol corruption.
  108 + self._infer_lock = threading.Lock()
105 109  
106 110 self._model_name = model_name
107 111 logger.info("[Qwen3_VLLM] Model ready | model=%s", model_name)
... ... @@ -209,8 +213,9 @@ class Qwen3VLLMRerankerBackend:
209 213 position_to_unique.append(len(unique_texts) - 1)
210 214  
211 215 pairs = [(query, t) for t in unique_texts]
212   - prompts = self._process_inputs(pairs)
213   - unique_scores = self._compute_scores(prompts)
  216 + with self._infer_lock:
  217 + prompts = self._process_inputs(pairs)
  218 + unique_scores = self._compute_scores(prompts)
214 219  
215 220 for (orig_idx, _), unique_idx in zip(indexed, position_to_unique):
216 221 # Score is already P(yes) in [0,1] from yes/(yes+no)
... ...
scripts/perf_api_benchmark.py
... ... @@ -55,6 +55,43 @@ class RequestResult:
55 55 error: str = ""
56 56  
57 57  
  58 +def _is_finite_number(v: Any) -> bool:
  59 + if isinstance(v, bool):
  60 + return False
  61 + if isinstance(v, (int, float)):
  62 + return math.isfinite(float(v))
  63 + return False
  64 +
  65 +
  66 +def validate_response_payload(
  67 + scenario_name: str,
  68 + tpl: RequestTemplate,
  69 + payload: Any,
  70 +) -> Tuple[bool, str]:
  71 + """
  72 + Lightweight payload validation for correctness-aware perf tests.
  73 + Currently strict for embed_text to catch NaN/null vector regressions.
  74 + """
  75 + if scenario_name != "embed_text":
  76 + return True, ""
  77 +
  78 + expected_len = len(tpl.json_body) if isinstance(tpl.json_body, list) else None
  79 + if not isinstance(payload, list):
  80 + return False, "invalid_payload_non_list"
  81 + if expected_len is not None and len(payload) != expected_len:
  82 + return False, "invalid_payload_length"
  83 + if len(payload) == 0:
  84 + return False, "invalid_payload_empty"
  85 +
  86 + for i, vec in enumerate(payload):
  87 + if not isinstance(vec, list) or len(vec) == 0:
  88 + return False, f"invalid_vector_{i}_shape"
  89 + for x in vec:
  90 + if not _is_finite_number(x):
  91 + return False, f"invalid_vector_{i}_non_finite"
  92 + return True, ""
  93 +
  94 +
58 95 def percentile(sorted_values: List[float], p: float) -> float:
59 96 if not sorted_values:
60 97 return 0.0
... ... @@ -259,7 +296,22 @@ async def run_single_scenario(
259 296 )
260 297 status = int(resp.status_code)
261 298 ok = 200 <= status < 300
262   - if not ok:
  299 + if ok:
  300 + try:
  301 + payload = resp.json()
  302 + except Exception:
  303 + ok = False
  304 + err = "invalid_json_response"
  305 + else:
  306 + valid, reason = validate_response_payload(
  307 + scenario_name=scenario.name,
  308 + tpl=tpl,
  309 + payload=payload,
  310 + )
  311 + if not valid:
  312 + ok = False
  313 + err = reason or "invalid_payload"
  314 + if not ok and not err:
263 315 err = f"http_{status}"
264 316 except Exception as e:
265 317 err = type(e).__name__
... ...
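The `embed_text` payload check added to the benchmark above boils down to "non-empty list of non-empty vectors, all entries finite numbers, bools rejected". A standalone version of just that predicate (the function name is illustrative):

```python
import math

def embed_payload_ok(payload) -> bool:
    """True iff payload is a non-empty list of non-empty, all-finite vectors.

    bool is rejected explicitly because it is a subclass of int in Python,
    mirroring _is_finite_number in scripts/perf_api_benchmark.py.
    """
    if not isinstance(payload, list) or not payload:
        return False
    for vec in payload:
        if not isinstance(vec, list) or not vec:
            return False
        for x in vec:
            if isinstance(x, bool) or not isinstance(x, (int, float)):
                return False
            if not math.isfinite(float(x)):
                return False
    return True
```

This is exactly why a 200 response alone is not enough: a body of `[[null, ...]]` passes the HTTP check but fails here.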
scripts/start_embedding_service.sh
... ... @@ -54,6 +54,13 @@ USE_CLIP_AS_SERVICE=$("${PYTHON_BIN}" -c "from embeddings.config import CONFIG;
54 54 CLIP_AS_SERVICE_SERVER=$("${PYTHON_BIN}" -c "from embeddings.config import CONFIG; print(CONFIG.CLIP_AS_SERVICE_SERVER)")
55 55 TEXT_BACKEND=$("${PYTHON_BIN}" -c "from config.services_config import get_embedding_backend_config; print(get_embedding_backend_config()[0])")
56 56 TEI_BASE_URL=$("${PYTHON_BIN}" -c "import os; from config.services_config import get_embedding_backend_config; from embeddings.config import CONFIG; _, cfg = get_embedding_backend_config(); print(os.getenv('TEI_BASE_URL') or cfg.get('base_url') or CONFIG.TEI_BASE_URL)")
  57 +ENABLE_IMAGE_MODEL="${EMBEDDING_ENABLE_IMAGE_MODEL:-true}"
  58 +ENABLE_IMAGE_MODEL="$(echo "${ENABLE_IMAGE_MODEL}" | tr '[:upper:]' '[:lower:]')"
  59 +if [[ "${ENABLE_IMAGE_MODEL}" == "1" || "${ENABLE_IMAGE_MODEL}" == "true" || "${ENABLE_IMAGE_MODEL}" == "yes" ]]; then
  60 + IMAGE_MODEL_ENABLED=1
  61 +else
  62 + IMAGE_MODEL_ENABLED=0
  63 +fi
57 64  
58 65 EMBEDDING_SERVICE_HOST="${EMBEDDING_HOST:-${DEFAULT_EMBEDDING_SERVICE_HOST}}"
59 66 EMBEDDING_SERVICE_PORT="${EMBEDDING_PORT:-${DEFAULT_EMBEDDING_SERVICE_PORT}}"
... ... @@ -66,7 +73,7 @@ if [[ "${TEXT_BACKEND}" == "tei" ]]; then
66 73 fi
67 74 fi
68 75  
69   -if [[ "${USE_CLIP_AS_SERVICE}" == "1" ]]; then
  76 +if [[ "${IMAGE_MODEL_ENABLED}" == "1" && "${USE_CLIP_AS_SERVICE}" == "1" ]]; then
70 77 CLIP_SERVER="${CLIP_AS_SERVICE_SERVER#*://}"
71 78 CLIP_HOST="${CLIP_SERVER%:*}"
72 79 CLIP_PORT="${CLIP_SERVER##*:}"
... ... @@ -102,7 +109,9 @@ echo &quot;Text backend: ${TEXT_BACKEND}&quot;
102 109 if [[ "${TEXT_BACKEND}" == "tei" ]]; then
103 110 echo "TEI URL: ${TEI_BASE_URL}"
104 111 fi
105   -if [[ "${USE_CLIP_AS_SERVICE}" == "1" ]]; then
  112 +if [[ "${IMAGE_MODEL_ENABLED}" == "0" ]]; then
  113 + echo "Image backend: disabled"
  114 +elif [[ "${USE_CLIP_AS_SERVICE}" == "1" ]]; then
106 115 echo "Image backend: clip-as-service (${CLIP_AS_SERVICE_SERVER})"
107 116 fi
108 117 echo
... ... @@ -111,7 +120,16 @@ echo " - Use a single worker (GPU models cannot be safely duplicated across wor
111 120 echo " - Clients can set EMBEDDING_SERVICE_URL=http://localhost:${EMBEDDING_SERVICE_PORT}"
112 121 echo
113 122  
114   -exec "${PYTHON_BIN}" -m uvicorn embeddings.server:app \
115   - --host "${EMBEDDING_SERVICE_HOST}" \
116   - --port "${EMBEDDING_SERVICE_PORT}" \
  123 +UVICORN_LOG_LEVEL="${EMBEDDING_UVICORN_LOG_LEVEL:-info}"
  124 +UVICORN_ACCESS_LOG="${EMBEDDING_UVICORN_ACCESS_LOG:-true}"
  125 +UVICORN_ARGS=(
  126 + --host "${EMBEDDING_SERVICE_HOST}"
  127 + --port "${EMBEDDING_SERVICE_PORT}"
117 128 --workers 1
  129 + --log-level "${UVICORN_LOG_LEVEL}"
  130 +)
  131 +if [[ "${UVICORN_ACCESS_LOG}" == "0" || "${UVICORN_ACCESS_LOG}" == "false" || "${UVICORN_ACCESS_LOG}" == "no" ]]; then
  132 + UVICORN_ARGS+=(--no-access-log)
  133 +fi
  134 +
  135 +exec "${PYTHON_BIN}" -m uvicorn embeddings.server:app "${UVICORN_ARGS[@]}"
... ...
scripts/start_tei_service.sh
... ... @@ -43,8 +43,8 @@ TEI_CONTAINER_NAME="${TEI_CONTAINER_NAME:-saas-search-tei}"
43 43 TEI_PORT="${TEI_PORT:-8080}"
44 44 TEI_MODEL_ID="${TEI_MODEL_ID:-Qwen/Qwen3-Embedding-0.6B}"
45 45 TEI_VERSION="${TEI_VERSION:-1.9}"
46   -TEI_MAX_BATCH_TOKENS="${TEI_MAX_BATCH_TOKENS:-2048}"
47   -TEI_MAX_CLIENT_BATCH_SIZE="${TEI_MAX_CLIENT_BATCH_SIZE:-8}"
  46 +TEI_MAX_BATCH_TOKENS="${TEI_MAX_BATCH_TOKENS:-4096}"
  47 +TEI_MAX_CLIENT_BATCH_SIZE="${TEI_MAX_CLIENT_BATCH_SIZE:-24}"
48 48 TEI_DTYPE="${TEI_DTYPE:-float16}"
49 49 HF_CACHE_DIR="${HF_CACHE_DIR:-$HOME/.cache/huggingface}"
50 50 TEI_HEALTH_TIMEOUT_SEC="${TEI_HEALTH_TIMEOUT_SEC:-300}"
... ... @@ -60,6 +60,18 @@ else
60 60 exit 1
61 61 fi
62 62  
  63 +detect_gpu_tei_image() {
  64 + # Prefer turing image for pre-Ampere GPUs (e.g. Tesla T4, compute capability 7.5).
  65 + local compute_cap major
  66 + compute_cap="$(nvidia-smi --query-gpu=compute_cap --format=csv,noheader 2>/dev/null | head -n1 || true)"
  67 + major="${compute_cap%%.*}"
  68 + if [[ -n "${major}" && "${major}" -lt 8 ]]; then
  69 + echo "ghcr.io/huggingface/text-embeddings-inference:turing-${TEI_VERSION}"
  70 + else
  71 + echo "ghcr.io/huggingface/text-embeddings-inference:cuda-${TEI_VERSION}"
  72 + fi
  73 +}
  74 +
63 75 if [[ "${USE_GPU}" == "1" ]]; then
64 76 if ! command -v nvidia-smi >/dev/null 2>&1 || ! nvidia-smi >/dev/null 2>&1; then
65 77 echo "ERROR: TEI_USE_GPU=1 but NVIDIA GPU is not available. No CPU fallback." >&2
... ... @@ -70,7 +82,7 @@ if [[ "${USE_GPU}" == "1" ]]; then
70 82 echo "Install and configure nvidia-container-toolkit, then restart Docker." >&2
71 83 exit 1
72 84 fi
73   - TEI_IMAGE="${TEI_IMAGE:-ghcr.io/huggingface/text-embeddings-inference:cuda-${TEI_VERSION}}"
  85 + TEI_IMAGE="${TEI_IMAGE:-$(detect_gpu_tei_image)}"
74 86 GPU_ARGS=(--gpus all)
75 87 TEI_MODE="gpu"
76 88 else
... ... @@ -87,28 +99,33 @@ if [[ -n "${existing_id}" ]]; then
87 99 if [[ -n "${running_id}" ]]; then
88 100 current_image="$(docker inspect "${TEI_CONTAINER_NAME}" --format '{{.Config.Image}}' 2>/dev/null || true)"
89 101 device_req="$(docker inspect "${TEI_CONTAINER_NAME}" --format '{{json .HostConfig.DeviceRequests}}' 2>/dev/null || true)"
  102 + current_is_gpu_image=0
  103 + if [[ "${current_image}" == *":cuda-"* || "${current_image}" == *":turing-"* ]]; then
  104 + current_is_gpu_image=1
  105 + fi
90 106 if [[ "${USE_GPU}" == "1" ]]; then
91   - if [[ "${current_image}" != *":cuda-"* ]] || [[ "${device_req}" == "null" ]]; then
92   - echo "ERROR: existing TEI container mode mismatch (need GPU): ${TEI_CONTAINER_NAME}" >&2
93   - echo " image=${current_image:-unknown}" >&2
94   - echo " device_requests=${device_req:-unknown}" >&2
95   - echo "Stop it first: ./scripts/stop_tei_service.sh" >&2
96   - exit 1
  107 + if [[ "${current_is_gpu_image}" -eq 1 ]] && [[ "${device_req}" != "null" ]] && [[ "${current_image}" == "${TEI_IMAGE}" ]]; then
  108 + echo "TEI already running (GPU): ${TEI_CONTAINER_NAME}"
  109 + exit 0
97 110 fi
98   - echo "TEI already running (GPU): ${TEI_CONTAINER_NAME}"
  111 + echo "TEI running with different mode/image; recreating container ${TEI_CONTAINER_NAME}"
  112 + echo " current_image=${current_image:-unknown}"
  113 + echo " target_image=${TEI_IMAGE}"
  114 + docker rm -f "${TEI_CONTAINER_NAME}" >/dev/null 2>&1 || true
99 115 else
100   - if [[ "${current_image}" == *":cuda-"* ]] || [[ "${device_req}" != "null" ]]; then
101   - echo "ERROR: existing TEI container mode mismatch (need CPU): ${TEI_CONTAINER_NAME}" >&2
102   - echo " image=${current_image:-unknown}" >&2
103   - echo " device_requests=${device_req:-unknown}" >&2
104   - echo "Stop it first: ./scripts/stop_tei_service.sh" >&2
105   - exit 1
  116 + if [[ "${current_is_gpu_image}" -eq 0 ]] && [[ "${device_req}" == "null" ]] && [[ "${current_image}" == "${TEI_IMAGE}" ]]; then
  117 + echo "TEI already running (CPU): ${TEI_CONTAINER_NAME}"
  118 + exit 0
106 119 fi
107   - echo "TEI already running (CPU): ${TEI_CONTAINER_NAME}"
  120 + echo "TEI running with different mode/image; recreating container ${TEI_CONTAINER_NAME}"
  121 + echo " current_image=${current_image:-unknown}"
  122 + echo " target_image=${TEI_IMAGE}"
  123 + docker rm -f "${TEI_CONTAINER_NAME}" >/dev/null 2>&1 || true
108 124 fi
109   - exit 0
110 125 fi
111   - docker rm "${TEI_CONTAINER_NAME}" >/dev/null
  126 + if docker ps -aq -f "name=^/${TEI_CONTAINER_NAME}$" | grep -q .; then
  127 + docker rm "${TEI_CONTAINER_NAME}" >/dev/null
  128 + fi
112 129 fi
113 130  
114 131 echo "Starting TEI container: ${TEI_CONTAINER_NAME}"
... ... @@ -132,12 +149,37 @@ docker run -d \
132 149 echo "Waiting for TEI health..."
133 150 for i in $(seq 1 "${TEI_HEALTH_TIMEOUT_SEC}"); do
134 151 if curl -sf "http://127.0.0.1:${TEI_PORT}/health" >/dev/null 2>&1; then
135   - echo "TEI is ready: http://127.0.0.1:${TEI_PORT}"
136   - exit 0
  152 + echo "TEI health is ready: http://127.0.0.1:${TEI_PORT}"
  153 + break
137 154 fi
138 155 sleep 1
  156 + if [[ "${i}" == "${TEI_HEALTH_TIMEOUT_SEC}" ]]; then
  157 + echo "ERROR: TEI failed to become healthy in time." >&2
  158 + docker logs --tail 100 "${TEI_CONTAINER_NAME}" >&2 || true
  159 + exit 1
  160 + fi
  161 +done
  162 +
  163 +echo "Running TEI output probe..."
  164 +for probe_idx in 1 2; do
  165 + probe_resp="$(curl -sf -X POST "http://127.0.0.1:${TEI_PORT}/embed" \
  166 + -H "Content-Type: application/json" \
  167 + -d '{"inputs":["health check","芭比娃娃 儿童玩具"]}' || true)"
  168 + if [[ -z "${probe_resp}" ]]; then
  169 + echo "ERROR: TEI probe ${probe_idx} failed: empty response" >&2
  170 + docker logs --tail 120 "${TEI_CONTAINER_NAME}" >&2 || true
  171 + docker rm -f "${TEI_CONTAINER_NAME}" >/dev/null 2>&1 || true
  172 + exit 1
  173 + fi
  174 + # Detect non-finite-like payloads (observed as null/NaN on incompatible CUDA image + GPU).
  175 + if echo "${probe_resp}" | grep -Eqi '(null|nan|inf)'; then
  176 + echo "ERROR: TEI probe ${probe_idx} detected invalid embedding values (null/NaN/Inf)." >&2
  177 + echo "Response preview: $(echo "${probe_resp}" | head -c 220)" >&2
  178 + docker logs --tail 120 "${TEI_CONTAINER_NAME}" >&2 || true
  179 + docker rm -f "${TEI_CONTAINER_NAME}" >/dev/null 2>&1 || true
  180 + exit 1
  181 + fi
139 182 done
140 183  
141   -echo "ERROR: TEI failed to become healthy in time." >&2
142   -docker logs --tail 100 "${TEI_CONTAINER_NAME}" >&2 || true
143   -exit 1
  184 +echo "TEI is ready and output probe passed: http://127.0.0.1:${TEI_PORT}"
  185 +exit 0
... ...
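The startup probe above pattern-matches the raw JSON for `null`/`NaN`/`Inf` tokens. A stricter equivalent parses the response and verifies every value is a finite number; Python's `json.loads` accepts the non-standard `NaN`/`Infinity` tokens and maps them to floats, so they are caught by the finiteness check. The helper name is illustrative:

```python
import json
import math

def probe_response_ok(raw: str) -> bool:
    """True iff raw parses as a non-empty list of non-empty finite vectors."""
    try:
        payload = json.loads(raw)
    except ValueError:
        # Empty body or malformed JSON: probe fails.
        return False
    if not isinstance(payload, list) or not payload:
        return False
    return all(
        isinstance(vec, list)
        and vec
        and all(
            isinstance(x, (int, float))
            and not isinstance(x, bool)  # bool is an int subclass
            and math.isfinite(x)
            for x in vec
        )
        for vec in payload
    )
```

Parsing avoids a drawback of the grep approach: the shell probe would also reject a perfectly valid payload that merely contains one of those substrings elsewhere (not an issue for pure number arrays, but worth knowing).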
suggestion/service.py
... ... @@ -123,6 +123,7 @@ class SuggestionService:
123 123 size: int = 10,
124 124 ) -> Dict[str, Any]:
125 125 start = time.time()
  126 + query_text = str(query or "").strip()
126 127 resolved_lang = self._resolve_language(tenant_id, language)
127 128 index_name = self._resolve_search_target(tenant_id)
128 129 if not index_name:
... ... @@ -137,6 +138,55 @@ class SuggestionService:
137 138 "took_ms": took_ms,
138 139 }
139 140  
  141 + # Recall path A: completion suggester (fast path, usually enough for short prefix typing)
  142 + t_completion_start = time.time()
  143 + completion_items = self._completion_suggest(
  144 + index_name=index_name,
  145 + query=query_text,
  146 + lang=resolved_lang,
  147 + size=size,
  148 + tenant_id=tenant_id,
  149 + )
  150 + completion_ms = int((time.time() - t_completion_start) * 1000)
  151 +
  152 + suggestions: List[Dict[str, Any]] = []
  153 + seen_text_norm: set = set()
  154 +
  155 + def _norm_text(v: Any) -> str:
  156 + return str(v or "").strip().lower()
  157 +
  158 + def _append_items(items: List[Dict[str, Any]]) -> None:
  159 + for item in items:
  160 + text_val = item.get("text")
  161 + norm = _norm_text(text_val)
  162 + if not norm or norm in seen_text_norm:
  163 + continue
  164 + seen_text_norm.add(norm)
  165 + suggestions.append(dict(item))
  166 +
  167 + _append_items(completion_items)
  168 +
  169 + # Fast path: avoid a second ES query for short prefixes or when completion already full.
  170 + if len(query_text) <= 2 or len(suggestions) >= size:
  171 + took_ms = int((time.time() - start) * 1000)
  172 + logger.info(
  173 + "suggest completion-fast-return | tenant=%s lang=%s q=%s completion=%d took_ms=%d completion_ms=%d",
  174 + tenant_id,
  175 + resolved_lang,
  176 + query_text,
  177 + len(suggestions),
  178 + took_ms,
  179 + completion_ms,
  180 + )
  181 + return {
  182 + "query": query,
  183 + "language": language,
  184 + "resolved_language": resolved_lang,
  185 + "suggestions": suggestions[:size],
  186 + "took_ms": took_ms,
  187 + }
  188 +
  189 + # Recall path B: bool_prefix on search_as_you_type (fallback / extra recall)
140 190 sat_field = f"sat.{resolved_lang}"
141 191 dsl = {
142 192 "track_total_hits": False,
... ... @@ -151,7 +201,7 @@ class SuggestionService:
151 201 "should": [
152 202 {
153 203 "multi_match": {
154   - "query": query,
  204 + "query": query_text,
155 205 "type": "bool_prefix",
156 206 "fields": [sat_field, f"{sat_field}._2gram", f"{sat_field}._3gram"],
157 207 }
... ... @@ -180,7 +230,7 @@ class SuggestionService:
180 230 "lang_conflict",
181 231 ],
182 232 }
183   - # Recall path A: bool_prefix on search_as_you_type
  233 + t_sat_start = time.time()
184 234 es_resp = self.es_client.search(
185 235 index_name=index_name,
186 236 body=dsl,
... ... @@ -188,52 +238,38 @@ class SuggestionService:
188 238 from_=0,
189 239 routing=str(tenant_id),
190 240 )
  241 + sat_ms = int((time.time() - t_sat_start) * 1000)
191 242 hits = es_resp.get("hits", {}).get("hits", []) or []
192 243  
193   - # Recall path B: completion suggester (optional optimization)
194   - completion_items = self._completion_suggest(
195   - index_name=index_name,
196   - query=query,
197   - lang=resolved_lang,
198   - size=size,
199   - tenant_id=tenant_id,
200   - )
201   -
202   - suggestions: List[Dict[str, Any]] = []
203   - seen_text_norm: set = set()
204   -
205   - def _norm_text(v: Any) -> str:
206   - return str(v or "").strip().lower()
207   -
208   - # Put completion results first (usually better prefix UX), then fill with sat results.
209   - for item in completion_items:
210   - text_val = item.get("text")
211   - norm = _norm_text(text_val)
212   - if not norm or norm in seen_text_norm:
213   - continue
214   - seen_text_norm.add(norm)
215   - suggestions.append(dict(item))
216   -
  244 + sat_items: List[Dict[str, Any]] = []
217 245 for hit in hits:
218 246 src = hit.get("_source", {}) or {}
219   - text_val = src.get("text")
220   - norm = _norm_text(text_val)
221   - if not norm or norm in seen_text_norm:
222   - continue
223   - seen_text_norm.add(norm)
224   - item = {
225   - "text": text_val,
226   - "lang": src.get("lang"),
227   - "score": hit.get("_score", 0.0),
228   - "rank_score": src.get("rank_score"),
229   - "sources": src.get("sources", []),
230   - "lang_source": src.get("lang_source"),
231   - "lang_confidence": src.get("lang_confidence"),
232   - "lang_conflict": src.get("lang_conflict", False),
233   - }
234   - suggestions.append(item)
  247 + sat_items.append(
  248 + {
  249 + "text": src.get("text"),
  250 + "lang": src.get("lang"),
  251 + "score": hit.get("_score", 0.0),
  252 + "rank_score": src.get("rank_score"),
  253 + "sources": src.get("sources", []),
  254 + "lang_source": src.get("lang_source"),
  255 + "lang_confidence": src.get("lang_confidence"),
  256 + "lang_conflict": src.get("lang_conflict", False),
  257 + }
  258 + )
  259 + _append_items(sat_items)
235 260  
236 261 took_ms = int((time.time() - start) * 1000)
  262 + logger.info(
  263 + "suggest completion+sat-return | tenant=%s lang=%s q=%s completion=%d sat_hits=%d took_ms=%d completion_ms=%d sat_ms=%d",
  264 + tenant_id,
  265 + resolved_lang,
  266 + query_text,
  267 + len(completion_items),
  268 + len(hits),
  269 + took_ms,
  270 + completion_ms,
  271 + sat_ms,
  272 + )
237 273 return {
238 274 "query": query,
239 275 "language": language,
... ...
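The suggestion rewrite above merges two recall paths in a fixed order (completion-suggester items first, then `search_as_you_type` hits), deduplicating by normalized text. That core can be sketched as a small pure function (the name is illustrative, not the service's API):

```python
def merge_suggestions(completion_items, sat_items, size):
    """Merge two suggestion lists, completion first, deduped by lowercased text."""
    seen = set()
    merged = []
    for item in list(completion_items) + list(sat_items):
        norm = str(item.get("text") or "").strip().lower()
        if not norm or norm in seen:
            continue
        seen.add(norm)
        merged.append(dict(item))  # copy so callers can't mutate inputs
    return merged[:size]
```

Because completion items are appended first, a duplicate surfaced by both paths keeps its completion-path entry, which matches the "completion results first, usually better prefix UX" comment removed by this diff.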