7bfb9946
tangwang
向量化模块
|
1
2
3
|
"""
Embedding service (FastAPI).
|
ed948666
tangwang
tidy
|
4
5
6
|
API (simple list-in, list-out; aligned by index):
- POST /embed/text body: ["text1", "text2", ...] -> [[...], ...]
- POST /embed/image body: ["url_or_path1", ...] -> [[...], ...]
|
7bfb9946
tangwang
向量化模块
|
7
8
|
"""
|
0a3764c4
tangwang
优化embedding模型加载
|
9
|
import logging
|
07cf5a93
tangwang
START_EMBEDDING=...
|
10
|
import os
|
7bfb9946
tangwang
向量化模块
|
11
|
import threading
|
efd435cf
tangwang
tei性能调优:
|
12
13
14
|
import time
from collections import deque
from dataclasses import dataclass
|
7bfb9946
tangwang
向量化模块
|
15
16
17
|
from typing import Any, Dict, List, Optional
import numpy as np
|
ed948666
tangwang
tidy
|
18
|
from fastapi import FastAPI, HTTPException
|
7bfb9946
tangwang
向量化模块
|
19
20
|
from embeddings.config import CONFIG
|
c10f90fe
tangwang
cnclip
|
21
|
from embeddings.protocols import ImageEncoderProtocol
|
07cf5a93
tangwang
START_EMBEDDING=...
|
22
|
from config.services_config import get_embedding_backend_config
|
7bfb9946
tangwang
向量化模块
|
23
|
|
0a3764c4
tangwang
优化embedding模型加载
|
24
|
logger = logging.getLogger(__name__)
|
7bfb9946
tangwang
向量化模块
|
25
|
|
a7920e17
tangwang
项目名称和部署路径修改
|
26
|
app = FastAPI(title="saas-search Embedding Service", version="1.0.0")
|
7bfb9946
tangwang
向量化模块
|
27
|
|
0a3764c4
tangwang
优化embedding模型加载
|
28
|
# Models are loaded at startup, not lazily
|
950a640e
tangwang
embeddings
|
29
|
_text_model: Optional[Any] = None
|
c10f90fe
tangwang
cnclip
|
30
|
_image_model: Optional[ImageEncoderProtocol] = None
|
07cf5a93
tangwang
START_EMBEDDING=...
|
31
|
_text_backend_name: str = ""
|
efd435cf
tangwang
tei性能调优:
|
32
33
|
# Feature switches read once at import time. A model is enabled when its
# environment variable is "1", "true" or "yes" (case-insensitive); an unset
# variable defaults to "true". load_models() skips loading a disabled model.
open_text_model = os.getenv("EMBEDDING_ENABLE_TEXT_MODEL", "true").lower() in ("1", "true", "yes")
open_image_model = os.getenv("EMBEDDING_ENABLE_IMAGE_MODEL", "true").lower() in ("1", "true", "yes")
|
7bfb9946
tangwang
向量化模块
|
34
35
36
37
38
|
# _text_encode_lock is held by _encode_local_st() around the in-process text
# model call; _image_encode_lock is held by embed_image() around image encoding.
_text_encode_lock = threading.Lock()
_image_encode_lock = threading.Lock()
|
efd435cf
tangwang
tei性能调优:
|
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
|
@dataclass
class _SingleTextTask:
    """One single-text embedding request handed to the micro-batch worker.

    The request thread creates the task, enqueues it and waits on ``done``;
    the worker fills in ``result`` or ``error`` and then sets ``done``.
    """

    # Text to embed (already stripped by the caller).
    text: str
    # Whether the worker should L2-normalize this task's vector.
    normalize: bool
    # time.perf_counter() timestamp taken when the task was created.
    created_at: float
    # Set by the worker once result/error has been populated.
    done: threading.Event
    # Embedding as a plain list of floats, or None until the worker succeeds.
    result: Optional[List[float]] = None
    # Exception recorded by the worker when encoding failed.
    error: Optional[Exception] = None
# Pending single-text tasks: request threads append on the right, the worker
# pops from the left.
_text_single_queue: "deque[_SingleTextTask]" = deque()
# Condition guarding the queue and the stop flag; also used to wake the worker.
_text_single_queue_cv = threading.Condition()
# Background worker thread, created by _start_text_batch_worker().
_text_batch_worker: Optional[threading.Thread] = None
# Set to True by _stop_text_batch_worker() to make the worker loop exit.
_text_batch_worker_stop = False
# How long the worker waits for more tasks after taking the first one.
# Read from TEXT_MICROBATCH_WINDOW_MS (default 4 ms), converted to seconds
# and clamped to be non-negative.
_TEXT_MICROBATCH_WINDOW_SEC = max(
    0.0, float(os.getenv("TEXT_MICROBATCH_WINDOW_MS", "4")) / 1000.0
)
# How long a request waits for the worker before giving up.
# Read from TEXT_REQUEST_TIMEOUT_SEC (default 30 s), with a floor of 1 second.
_TEXT_REQUEST_TIMEOUT_SEC = max(
    1.0, float(os.getenv("TEXT_REQUEST_TIMEOUT_SEC", "30"))
)
def _encode_local_st(texts: List[str], normalize_embeddings: bool) -> Any:
    """Encode ``texts`` with the in-process text model, one caller at a time.

    The call is made while holding ``_text_encode_lock`` so concurrent
    requests never enter the model simultaneously. Batch size and device come
    from ``CONFIG``; the return value is whatever ``encode_batch`` returns.
    """
    with _text_encode_lock:
        encode = _text_model.encode_batch
        batch_size = int(CONFIG.TEXT_BATCH_SIZE)
        return encode(
            texts,
            batch_size=batch_size,
            device=CONFIG.TEXT_DEVICE,
            normalize_embeddings=normalize_embeddings,
        )
def _start_text_batch_worker() -> None:
    """Start the micro-batch worker thread unless one is already alive.

    Clears the stop flag, spawns a daemon thread running
    ``_text_batch_worker_loop`` and logs the configured batching window and
    maximum batch size.
    """
    global _text_batch_worker, _text_batch_worker_stop

    current = _text_batch_worker
    already_running = current is not None and current.is_alive()
    if already_running:
        return

    _text_batch_worker_stop = False
    worker = threading.Thread(
        target=_text_batch_worker_loop,
        name="embed-text-microbatch-worker",
        daemon=True,
    )
    # Publish the thread before starting it, as callers test this global.
    _text_batch_worker = worker
    worker.start()

    window_ms = _TEXT_MICROBATCH_WINDOW_SEC * 1000.0
    logger.info(
        "Started local_st text micro-batch worker | window_ms=%.1f max_batch=%d",
        window_ms,
        int(CONFIG.TEXT_BATCH_SIZE),
    )
def _stop_text_batch_worker() -> None:
    """Ask the micro-batch worker to exit.

    Sets the stop flag while holding the queue condition and wakes every
    waiter so a worker blocked on an empty queue notices the flag. The thread
    is not joined here.
    """
    global _text_batch_worker_stop

    cv = _text_single_queue_cv
    with cv:
        _text_batch_worker_stop = True
        cv.notify_all()
def _text_batch_worker_loop() -> None:
    """Worker loop that groups single-text requests into model batches.

    Each iteration:

    1. Blocks on the queue condition until a task is queued or the stop flag
       is set (the stop flag makes the loop return).
    2. Takes the first task, then keeps collecting tasks until either
       ``CONFIG.TEXT_BATCH_SIZE`` tasks are gathered or the micro-batch window
       (``_TEXT_MICROBATCH_WINDOW_SEC``) has elapsed.
    3. Encodes the batch outside the condition so producers can keep
       enqueueing, converts each vector with the task's own ``normalize``
       setting, and signals every task's ``done`` event.

    Error handling: a failure of the batch encode (or a length mismatch) is
    recorded on every task in the batch. A failure while converting or
    normalizing a single task's vector is recorded on that task only, so one
    bad vector does not fail unrelated requests sharing the batch.
    """
    max_batch = max(1, int(CONFIG.TEXT_BATCH_SIZE))
    while True:
        with _text_single_queue_cv:
            while not _text_single_queue and not _text_batch_worker_stop:
                _text_single_queue_cv.wait()
            if _text_batch_worker_stop:
                return

            batch: List[_SingleTextTask] = [_text_single_queue.popleft()]
            deadline = time.perf_counter() + _TEXT_MICROBATCH_WINDOW_SEC
            while len(batch) < max_batch:
                remaining = deadline - time.perf_counter()
                if remaining <= 0:
                    break
                if not _text_single_queue:
                    # Sleep until a producer notifies or the window closes.
                    _text_single_queue_cv.wait(timeout=remaining)
                    continue
                while _text_single_queue and len(batch) < max_batch:
                    batch.append(_text_single_queue.popleft())

        try:
            # Normalization is applied per task below, not by the model.
            embs = _encode_local_st([task.text for task in batch], normalize_embeddings=False)
            if embs is None or len(embs) != len(batch):
                raise RuntimeError(
                    f"Text model response length mismatch in micro-batch: "
                    f"expected {len(batch)}, got {0 if embs is None else len(embs)}"
                )
            for task, emb in zip(batch, embs):
                # Isolate per-task failures (e.g. an invalid norm) so they are
                # not propagated to the other tasks in this batch.
                try:
                    vec = _as_list(emb, normalize=task.normalize)
                    if vec is None:
                        raise RuntimeError("Text model returned empty embedding in micro-batch")
                    task.result = vec
                except Exception as exc:
                    task.error = exc
        except Exception as exc:
            # Batch-level failure: every waiter receives the same error.
            for task in batch:
                task.error = exc
        finally:
            # Always release the waiting request threads.
            for task in batch:
                task.done.set()
def _encode_single_text_with_microbatch(text: str, normalize: bool) -> List[float]:
    """Embed one text through the micro-batch worker and wait for the result.

    Enqueues a task, wakes the worker and blocks up to
    ``_TEXT_REQUEST_TIMEOUT_SEC`` for completion.

    Raises:
        RuntimeError: if the worker does not finish in time (the task is then
            removed from the queue if it is still there), or if the worker
            finished without producing a result.
        Exception: whatever error the worker recorded on the task.
    """
    completion = threading.Event()
    job = _SingleTextTask(
        text=text,
        normalize=normalize,
        created_at=time.perf_counter(),
        done=completion,
    )

    with _text_single_queue_cv:
        _text_single_queue.append(job)
        _text_single_queue_cv.notify()

    finished = completion.wait(timeout=_TEXT_REQUEST_TIMEOUT_SEC)
    if finished:
        if job.error is not None:
            raise job.error
        if job.result is None:
            raise RuntimeError("Text micro-batch worker returned empty result")
        return job.result

    # Timed out: withdraw the task if the worker has not picked it up yet.
    with _text_single_queue_cv:
        try:
            _text_single_queue.remove(job)
        except ValueError:
            pass
    raise RuntimeError(
        f"Timed out waiting for text micro-batch worker ({_TEXT_REQUEST_TIMEOUT_SEC:.1f}s)"
    )
|
0a3764c4
tangwang
优化embedding模型加载
|
165
166
167
|
@app.on_event("startup")
def load_models():
    """Load models at service startup to avoid first-request latency.

    Text model (only when ``open_text_model`` is true): the backend name and
    settings come from ``get_embedding_backend_config()``.

    * ``tei``: builds a ``TEITextModel``. The base URL and timeout are
      resolved from the environment first, then the backend config, then
      ``CONFIG``.
    * ``local_st``: builds a ``Qwen3TextModel`` (the model id is resolved in
      the same env -> backend config -> ``CONFIG`` order) and starts the
      micro-batch worker.
    * anything else raises ``ValueError``.

    Image model (only when ``open_image_model`` is true): uses
    ``ClipAsServiceImageEncoder`` when ``CONFIG.USE_CLIP_AS_SERVICE`` is
    truthy, otherwise the local ``ClipImageModel``.

    Any loading failure is logged with its traceback and re-raised.
    """
    global _text_model, _image_model, _text_backend_name

    logger.info("Loading embedding models at startup...")

    # Load text model
    if open_text_model:
        try:
            backend_name, backend_cfg = get_embedding_backend_config()
            _text_backend_name = backend_name
            if backend_name == "tei":
                from embeddings.tei_model import TEITextModel

                base_url = (
                    os.getenv("TEI_BASE_URL")
                    or backend_cfg.get("base_url")
                    or CONFIG.TEI_BASE_URL
                )
                timeout_sec = int(
                    os.getenv("TEI_TIMEOUT_SEC")
                    or backend_cfg.get("timeout_sec")
                    or CONFIG.TEI_TIMEOUT_SEC
                )
                logger.info("Loading text backend: tei (base_url=%s)", base_url)
                _text_model = TEITextModel(
                    base_url=str(base_url),
                    timeout_sec=timeout_sec,
                )
            elif backend_name == "local_st":
                from embeddings.qwen3_model import Qwen3TextModel

                model_id = (
                    os.getenv("TEXT_MODEL_ID")
                    or backend_cfg.get("model_id")
                    or CONFIG.TEXT_MODEL_ID
                )
                logger.info("Loading text backend: local_st (model=%s)", model_id)
                _text_model = Qwen3TextModel(model_id=str(model_id))
                # Single-text requests are served through the micro-batch worker.
                _start_text_batch_worker()
            else:
                raise ValueError(
                    f"Unsupported embedding backend: {backend_name}. "
                    "Supported: tei, local_st"
                )
            logger.info("Text backend loaded successfully: %s", _text_backend_name)
        except Exception as e:
            # Lazy %-style args, consistent with the other log calls here.
            logger.error("Failed to load text model: %s", e, exc_info=True)
            raise

    # Load image model: clip-as-service (recommended) or local CN-CLIP
    if open_image_model:
        try:
            if CONFIG.USE_CLIP_AS_SERVICE:
                from embeddings.clip_as_service_encoder import ClipAsServiceImageEncoder

                logger.info(
                    "Loading image encoder via clip-as-service: %s",
                    CONFIG.CLIP_AS_SERVICE_SERVER,
                )
                _image_model = ClipAsServiceImageEncoder(
                    server=CONFIG.CLIP_AS_SERVICE_SERVER,
                    batch_size=CONFIG.IMAGE_BATCH_SIZE,
                )
                logger.info("Image model (clip-as-service) loaded successfully")
            else:
                from embeddings.clip_model import ClipImageModel

                logger.info(
                    "Loading local image model: %s (device: %s)",
                    CONFIG.IMAGE_MODEL_NAME,
                    CONFIG.IMAGE_DEVICE,
                )
                _image_model = ClipImageModel(
                    model_name=CONFIG.IMAGE_MODEL_NAME,
                    device=CONFIG.IMAGE_DEVICE,
                )
                logger.info("Image model (local CN-CLIP) loaded successfully")
        except Exception as e:
            logger.error("Failed to load image model: %s", e, exc_info=True)
            raise

    logger.info("All embedding models loaded successfully, service ready")
|
7bfb9946
tangwang
向量化模块
|
243
244
|
|
efd435cf
tangwang
tei性能调优:
|
245
246
247
248
249
|
@app.on_event("shutdown")
def stop_workers() -> None:
    """Shutdown hook: signal the text micro-batch worker to stop."""
    _stop_text_batch_worker()
|
200fdddf
tangwang
embed norm
|
250
251
252
253
254
255
256
257
|
def _normalize_vector(vec: np.ndarray) -> np.ndarray:
    """Return ``vec`` scaled to unit L2 norm.

    Raises:
        RuntimeError: if the norm is zero, negative, NaN or infinite.
    """
    magnitude = float(np.linalg.norm(vec))
    if np.isfinite(magnitude) and magnitude > 0.0:
        return vec / magnitude
    raise RuntimeError("Embedding vector has invalid norm (must be > 0)")
def _as_list(embedding: Optional[np.ndarray], normalize: bool = False) -> Optional[List[float]]:
    """Convert one embedding into a flat list of Python floats.

    ``None`` is passed through unchanged. Non-ndarray inputs are converted to
    a float32 array, multi-dimensional arrays are flattened, and the values
    are cast to float32. When ``normalize`` is true the vector is scaled to
    unit L2 norm first (which raises ``RuntimeError`` on an invalid norm).
    """
    if embedding is None:
        return None

    arr = embedding if isinstance(embedding, np.ndarray) else np.array(embedding, dtype=np.float32)
    flat = arr if arr.ndim == 1 else arr.reshape(-1)
    flat = flat.astype(np.float32, copy=False)

    if not normalize:
        return flat.tolist()
    return _normalize_vector(flat).astype(np.float32, copy=False).tolist()
|
7bfb9946
tangwang
向量化模块
|
268
269
270
271
|
@app.get("/health")
def health() -> Dict[str, Any]:
    """Health check endpoint. Returns status and model loading state.

    The status is always "ok"; the remaining fields report whether each model
    object has been created and which text backend name is configured.
    """
    text_ready = _text_model is not None
    image_ready = _image_model is not None
    return {
        "status": "ok",
        "text_model_loaded": text_ready,
        "text_backend": _text_backend_name,
        "image_model_loaded": image_ready,
    }
|
7bfb9946
tangwang
向量化模块
|
279
280
281
|
@app.post("/embed/text")
def embed_text(texts: List[str], normalize: Optional[bool] = None) -> List[Optional[List[float]]]:
    """Embed a list of texts; the output is aligned with the input by index.

    Args:
        texts: texts to embed. Each entry must be a string that is non-empty
            after stripping, otherwise HTTP 400 is raised.
        normalize: whether to L2-normalize the vectors. ``None`` falls back
            to ``CONFIG.TEXT_NORMALIZE_EMBEDDINGS``.

    Raises:
        RuntimeError: if the text model is not loaded, or the backend returns
            a different number of vectors than inputs or an empty vector.
        HTTPException: 400 for invalid input, 502 when the backend call fails.
    """
    if _text_model is None:
        raise RuntimeError("Text model not loaded")

    if normalize is None:
        effective_normalize = bool(CONFIG.TEXT_NORMALIZE_EMBEDDINGS)
    else:
        effective_normalize = bool(normalize)

    cleaned: List[str] = []
    for idx, raw in enumerate(texts):
        if not isinstance(raw, str):
            raise HTTPException(status_code=400, detail=f"Invalid text at index {idx}: must be string")
        stripped = raw.strip()
        if not stripped:
            raise HTTPException(status_code=400, detail=f"Invalid text at index {idx}: empty string")
        cleaned.append(stripped)

    started = time.perf_counter()
    try:
        # local_st backend uses in-process torch model, keep serialized encode for safety;
        # TEI backend is an HTTP client and supports concurrent requests.
        if _text_backend_name != "local_st":
            embs = _text_model.encode_batch(
                cleaned,
                batch_size=int(CONFIG.TEXT_BATCH_SIZE),
                device=CONFIG.TEXT_DEVICE,
                normalize_embeddings=effective_normalize,
            )
        elif len(cleaned) == 1 and _text_batch_worker is not None:
            # A lone text joins the micro-batch worker so that concurrent
            # single-text requests can share one model call.
            single = [_encode_single_text_with_microbatch(cleaned[0], normalize=effective_normalize)]
            single_elapsed_ms = (time.perf_counter() - started) * 1000.0
            logger.info(
                "embed_text done | backend=%s mode=microbatch-single inputs=%d normalize=%s elapsed_ms=%.2f",
                _text_backend_name,
                len(cleaned),
                effective_normalize,
                single_elapsed_ms,
            )
            return single
        else:
            embs = _encode_local_st(cleaned, normalize_embeddings=False)
    except Exception as e:
        logger.error("Text embedding backend failure: %s", e, exc_info=True)
        raise HTTPException(
            status_code=502,
            detail=f"Text embedding backend failure: {e}",
        ) from e

    if embs is None or len(embs) != len(cleaned):
        raise RuntimeError(
            f"Text model response length mismatch: expected {len(cleaned)}, "
            f"got {0 if embs is None else len(embs)}"
        )

    vectors: List[Optional[List[float]]] = []
    for idx, emb in enumerate(embs):
        converted = _as_list(emb, normalize=effective_normalize)
        if converted is None:
            raise RuntimeError(f"Text model returned empty embedding for index {idx}")
        vectors.append(converted)

    total_elapsed_ms = (time.perf_counter() - started) * 1000.0
    logger.info(
        "embed_text done | backend=%s inputs=%d normalize=%s elapsed_ms=%.2f",
        _text_backend_name,
        len(cleaned),
        effective_normalize,
        total_elapsed_ms,
    )
    return vectors
@app.post("/embed/image")
def embed_image(images: List[str], normalize: Optional[bool] = None) -> List[Optional[List[float]]]:
    """Embed a list of image URLs/paths; the output is aligned with the input by index.

    Args:
        images: image URLs or paths. Each entry must be a string that is
            non-empty after stripping, otherwise HTTP 400 is raised.
        normalize: whether to L2-normalize the vectors. ``None`` falls back
            to ``CONFIG.IMAGE_NORMALIZE_EMBEDDINGS``.

    Raises:
        RuntimeError: if the image model is not loaded, or the backend
            returns a different number of vectors than inputs or an empty
            vector.
        HTTPException: 400 for invalid input; 502 when the image backend call
            fails (logged and mapped the same way as in ``embed_text``).
    """
    if _image_model is None:
        raise RuntimeError("Image model not loaded")

    effective_normalize = bool(CONFIG.IMAGE_NORMALIZE_EMBEDDINGS) if normalize is None else bool(normalize)

    urls: List[str] = []
    for i, url_or_path in enumerate(images):
        if not isinstance(url_or_path, str):
            raise HTTPException(status_code=400, detail=f"Invalid image at index {i}: must be string URL/path")
        s = url_or_path.strip()
        if not s:
            raise HTTPException(status_code=400, detail=f"Invalid image at index {i}: empty URL/path")
        urls.append(s)

    try:
        # Encoding is serialized: only one request enters the image model at a time.
        with _image_encode_lock:
            vectors = _image_model.encode_image_urls(
                urls,
                batch_size=CONFIG.IMAGE_BATCH_SIZE,
                normalize_embeddings=effective_normalize,
            )
    except Exception as e:
        # Log the failure and report it as a gateway error instead of letting
        # it escape as an unlogged generic 500.
        logger.error("Image embedding backend failure: %s", e, exc_info=True)
        raise HTTPException(
            status_code=502,
            detail=f"Image embedding backend failure: {e}",
        ) from e

    if vectors is None or len(vectors) != len(urls):
        raise RuntimeError(
            f"Image model response length mismatch: expected {len(urls)}, "
            f"got {0 if vectors is None else len(vectors)}"
        )

    out: List[Optional[List[float]]] = []
    for i, vec in enumerate(vectors):
        out_vec = _as_list(vec, normalize=effective_normalize)
        if out_vec is None:
            raise RuntimeError(f"Image model returned empty embedding for index {i}")
        out.append(out_vec)
    return out
|