7bfb9946
tangwang
向量化模块
|
1
2
3
|
"""
Embedding service (FastAPI).

API (simple list-in, list-out; aligned by index):
- POST /embed/text body: ["text1", "text2", ...] -> [[...], ...]
- POST /embed/image body: ["url_or_path1", ...] -> [[...], ...]
"""
|
0a3764c4
tangwang
优化embedding模型加载
|
9
|
import logging
|
07cf5a93
tangwang
START_EMBEDDING=...
|
10
|
import os
|
7bfb9946
tangwang
向量化模块
|
11
|
import threading
|
efd435cf
tangwang
tei性能调优:
|
12
13
14
|
import time
from collections import deque
from dataclasses import dataclass
|
7bfb9946
tangwang
向量化模块
|
15
16
17
|
from typing import Any, Dict, List, Optional
import numpy as np
|
ed948666
tangwang
tidy
|
18
|
from fastapi import FastAPI, HTTPException
|
7bfb9946
tangwang
向量化模块
|
19
20
|
from embeddings.config import CONFIG
|
c10f90fe
tangwang
cnclip
|
21
|
from embeddings.protocols import ImageEncoderProtocol
|
07cf5a93
tangwang
START_EMBEDDING=...
|
22
|
from config.services_config import get_embedding_backend_config
|
7bfb9946
tangwang
向量化模块
|
23
|
|
0a3764c4
tangwang
优化embedding模型加载
|
24
|
logger = logging.getLogger(__name__)

app = FastAPI(title="saas-search Embedding Service", version="1.0.0")

# Models are loaded at startup, not lazily
_text_model: Optional[Any] = None
_image_model: Optional[ImageEncoderProtocol] = None
# Name of the text backend chosen at startup; stays "" until load_models() sets it.
_text_backend_name: str = ""
# Feature switches read from the environment; "1", "true" or "yes" (case-insensitive) enable a model.
open_text_model = os.getenv("EMBEDDING_ENABLE_TEXT_MODEL", "true").lower() in ("1", "true", "yes")
open_image_model = os.getenv("EMBEDDING_ENABLE_IMAGE_MODEL", "true").lower() in ("1", "true", "yes")

# Locks held around the local text encode call and the image encode call respectively.
_text_encode_lock = threading.Lock()
_image_encode_lock = threading.Lock()
|
efd435cf
tangwang
tei性能调优:
|
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
|
@dataclass
class _SingleTextTask:
    """One single-text encode request handed to the micro-batch worker."""

    text: str  # input text to embed
    normalize: bool  # whether the resulting vector should be normalized to unit length
    created_at: float  # time.perf_counter() value taken when the task was created
    done: threading.Event  # set by the worker once result/error has been filled in
    result: Optional[List[float]] = None  # embedding as a plain list, on success
    error: Optional[Exception] = None  # exception recorded by the worker, on failure
# FIFO of pending single-text requests; producers and the worker touch it under
# _text_single_queue_cv.
_text_single_queue: "deque[_SingleTextTask]" = deque()
_text_single_queue_cv = threading.Condition()
# Background thread that drains the queue; created by _start_text_batch_worker().
_text_batch_worker: Optional[threading.Thread] = None
# Set to True by _stop_text_batch_worker() to ask the worker loop to exit.
_text_batch_worker_stop = False
# How long the worker keeps collecting further requests after the first one.
# TEXT_MICROBATCH_WINDOW_MS is given in milliseconds; negative values clamp to 0.
_TEXT_MICROBATCH_WINDOW_SEC = max(
    0.0, float(os.getenv("TEXT_MICROBATCH_WINDOW_MS", "4")) / 1000.0
)
# Maximum time a request waits for the worker; never less than one second.
_TEXT_REQUEST_TIMEOUT_SEC = max(
    1.0, float(os.getenv("TEXT_REQUEST_TIMEOUT_SEC", "30"))
)
|
28e57bb1
tangwang
日志体系优化
|
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
|
# Request-log preview limits, overridable through the environment.
# At least one input is previewed per request.
_LOG_PREVIEW_COUNT = max(1, int(os.getenv("EMBEDDING_LOG_PREVIEW_COUNT", "3")))
# Per-input character caps for text and image previews; each is floored at 32.
_LOG_TEXT_PREVIEW_CHARS = max(32, int(os.getenv("EMBEDDING_LOG_TEXT_PREVIEW_CHARS", "120")))
_LOG_IMAGE_PREVIEW_CHARS = max(32, int(os.getenv("EMBEDDING_LOG_IMAGE_PREVIEW_CHARS", "180")))
def _compact_preview(text: str, max_chars: int) -> str:
    """Collapse whitespace runs in *text* and cap the result at *max_chars*.

    A falsy *text* yields an empty string. When the collapsed text is longer
    than *max_chars* it is cut there and "..." is appended.
    """
    words = text.split() if text else []
    flattened = " ".join(words)
    if len(flattened) > max_chars:
        return flattened[:max_chars] + "..."
    return flattened
def _preview_inputs(items: List[str], max_items: int, max_chars: int) -> List[Dict[str, Any]]:
    """Describe the first *max_items* inputs for logging.

    Each entry carries the input's position ("idx"), its raw length ("len")
    and a whitespace-compacted preview capped at *max_chars* ("preview").
    """
    head = items[:max_items]
    return [
        {
            "idx": position,
            "len": len(raw),
            "preview": _compact_preview(raw, max_chars),
        }
        for position, raw in enumerate(head)
    ]
|
efd435cf
tangwang
tei性能调优:
|
82
83
84
85
|
def _encode_local_st(texts: List[str], normalize_embeddings: bool) -> Any:
    """Encode *texts* with the loaded text model while holding the text encode lock.

    Batch size and device come from CONFIG; the model's return value is
    passed back unchanged.
    """
    _text_encode_lock.acquire()
    try:
        embeddings = _text_model.encode(
            texts,
            batch_size=int(CONFIG.TEXT_BATCH_SIZE),
            device=CONFIG.TEXT_DEVICE,
            normalize_embeddings=normalize_embeddings,
        )
        return embeddings
    finally:
        _text_encode_lock.release()
def _start_text_batch_worker() -> None:
    """Start the daemon micro-batch worker thread unless one is already alive."""
    global _text_batch_worker, _text_batch_worker_stop

    existing = _text_batch_worker
    already_running = existing is not None and existing.is_alive()
    if already_running:
        return

    # Clear any stop request left over from a previous worker before launching.
    _text_batch_worker_stop = False
    worker = threading.Thread(
        target=_text_batch_worker_loop,
        name="embed-text-microbatch-worker",
        daemon=True,
    )
    _text_batch_worker = worker
    worker.start()

    window_ms = _TEXT_MICROBATCH_WINDOW_SEC * 1000.0
    max_batch = int(CONFIG.TEXT_BATCH_SIZE)
    logger.info(
        "Started local_st text micro-batch worker | window_ms=%.1f max_batch=%d",
        window_ms,
        max_batch,
    )
def _stop_text_batch_worker() -> None:
    """Ask the micro-batch worker to exit.

    Sets the stop flag and wakes every thread waiting on the queue condition.
    The worker thread is not joined here.
    """
    global _text_batch_worker_stop
    with _text_single_queue_cv:
        _text_batch_worker_stop = True
        _text_single_queue_cv.notify_all()
def _text_batch_worker_loop() -> None:
    """Worker-thread body: gather queued single-text tasks into micro-batches and encode them.

    After the first task arrives, further tasks are collected for up to
    ``_TEXT_MICROBATCH_WINDOW_SEC`` or until ``CONFIG.TEXT_BATCH_SIZE`` tasks
    are gathered. The batch is encoded after the queue condition has been
    released, so request threads can keep enqueueing while the model runs.

    Failure handling:
    - An encode failure (or a length mismatch) is recorded on every task of
      the batch, since no task has a usable vector.
    - A failure while converting or normalizing one task's vector is recorded
      on that task only; the other tasks of the batch still get their results.
    - When the stop flag is set, tasks still queued are failed immediately so
      their callers do not wait out the full request timeout.
    """
    max_batch = max(1, int(CONFIG.TEXT_BATCH_SIZE))
    while True:
        with _text_single_queue_cv:
            while not _text_single_queue and not _text_batch_worker_stop:
                _text_single_queue_cv.wait()
            if _text_batch_worker_stop:
                # Release anything still queued instead of leaving callers to time out.
                while _text_single_queue:
                    orphan = _text_single_queue.popleft()
                    orphan.error = RuntimeError(
                        "Text micro-batch worker stopped before the request was processed"
                    )
                    orphan.done.set()
                return
            batch: List[_SingleTextTask] = [_text_single_queue.popleft()]
            deadline = time.perf_counter() + _TEXT_MICROBATCH_WINDOW_SEC
            while len(batch) < max_batch:
                remaining = deadline - time.perf_counter()
                if remaining <= 0:
                    break
                if not _text_single_queue:
                    # Sleep until a new task arrives or the window closes.
                    _text_single_queue_cv.wait(timeout=remaining)
                    continue
                while _text_single_queue and len(batch) < max_batch:
                    batch.append(_text_single_queue.popleft())
        try:
            # Ask for raw vectors; normalization is applied per task below
            # because tasks in one batch may request different settings.
            embs = _encode_local_st([task.text for task in batch], normalize_embeddings=False)
            if embs is None or len(embs) != len(batch):
                raise RuntimeError(
                    f"Text model response length mismatch in micro-batch: "
                    f"expected {len(batch)}, got {0 if embs is None else len(embs)}"
                )
            for task, emb in zip(batch, embs):
                try:
                    vec = _as_list(emb, normalize=task.normalize)
                    if vec is None:
                        raise RuntimeError("Text model returned empty embedding in micro-batch")
                    task.result = vec
                except Exception as exc:
                    # Keep one bad vector from failing the rest of the batch.
                    task.error = exc
        except Exception as exc:
            for task in batch:
                task.error = exc
        finally:
            # Always wake the waiting request threads, whatever happened above.
            for task in batch:
                task.done.set()
def _encode_single_text_with_microbatch(text: str, normalize: bool) -> List[float]:
    """Queue one text for the micro-batch worker and block until its vector is ready.

    Raises RuntimeError when the worker does not answer within
    ``_TEXT_REQUEST_TIMEOUT_SEC`` or answers without a result; re-raises the
    exception the worker recorded on the task, if any.
    """
    request = _SingleTextTask(
        text=text,
        normalize=normalize,
        created_at=time.perf_counter(),
        done=threading.Event(),
    )
    with _text_single_queue_cv:
        _text_single_queue.append(request)
        _text_single_queue_cv.notify()

    finished = request.done.wait(timeout=_TEXT_REQUEST_TIMEOUT_SEC)
    if finished:
        if request.error is not None:
            raise request.error
        if request.result is None:
            raise RuntimeError("Text micro-batch worker returned empty result")
        return request.result

    # Timed out: withdraw the request if the worker has not picked it up yet.
    with _text_single_queue_cv:
        try:
            _text_single_queue.remove(request)
        except ValueError:
            pass
    raise RuntimeError(
        f"Timed out waiting for text micro-batch worker ({_TEXT_REQUEST_TIMEOUT_SEC:.1f}s)"
    )
|
0a3764c4
tangwang
优化embedding模型加载
|
188
189
190
|
@app.on_event("startup")
def load_models():
    """Load models at service startup to avoid first-request latency.

    Text side (when ``open_text_model`` is true): the backend name and its
    settings come from ``get_embedding_backend_config()``. For "tei" and
    "local_st" each setting is resolved as environment variable first, then
    backend config, then ``CONFIG``. The "local_st" backend also starts the
    micro-batch worker. Any other backend name raises ValueError.

    Image side (when ``open_image_model`` is true): uses the clip-as-service
    encoder when ``CONFIG.USE_CLIP_AS_SERVICE`` is set, otherwise the local
    CLIP model.

    Any load failure is logged with its traceback and re-raised.
    """
    global _text_model, _image_model, _text_backend_name

    logger.info("Loading embedding models at startup...")

    # Load text model
    if open_text_model:
        try:
            backend_name, backend_cfg = get_embedding_backend_config()
            _text_backend_name = backend_name
            if backend_name == "tei":
                from embeddings.text_embedding_tei import TEITextModel

                base_url = (
                    os.getenv("TEI_BASE_URL")
                    or backend_cfg.get("base_url")
                    or CONFIG.TEI_BASE_URL
                )
                timeout_sec = int(
                    os.getenv("TEI_TIMEOUT_SEC")
                    or backend_cfg.get("timeout_sec")
                    or CONFIG.TEI_TIMEOUT_SEC
                )
                logger.info("Loading text backend: tei (base_url=%s)", base_url)
                _text_model = TEITextModel(
                    base_url=str(base_url),
                    timeout_sec=timeout_sec,
                )
            elif backend_name == "local_st":
                from embeddings.text_embedding_sentence_transformers import Qwen3TextModel

                model_id = (
                    os.getenv("TEXT_MODEL_ID")
                    or backend_cfg.get("model_id")
                    or CONFIG.TEXT_MODEL_ID
                )
                logger.info("Loading text backend: local_st (model=%s)", model_id)
                _text_model = Qwen3TextModel(model_id=str(model_id))
                # Single-text requests on this backend go through the micro-batch worker.
                _start_text_batch_worker()
            else:
                raise ValueError(
                    f"Unsupported embedding backend: {backend_name}. "
                    "Supported: tei, local_st"
                )
            logger.info("Text backend loaded successfully: %s", _text_backend_name)
        except Exception as e:
            logger.error("Failed to load text model: %s", e, exc_info=True)
            raise

    # Load image model: clip-as-service (recommended) or local CN-CLIP
    if open_image_model:
        try:
            if CONFIG.USE_CLIP_AS_SERVICE:
                from embeddings.clip_as_service_encoder import ClipAsServiceImageEncoder

                logger.info(
                    "Loading image encoder via clip-as-service: %s",
                    CONFIG.CLIP_AS_SERVICE_SERVER,
                )
                _image_model = ClipAsServiceImageEncoder(
                    server=CONFIG.CLIP_AS_SERVICE_SERVER,
                    batch_size=CONFIG.IMAGE_BATCH_SIZE,
                )
                logger.info("Image model (clip-as-service) loaded successfully")
            else:
                from embeddings.clip_model import ClipImageModel

                logger.info(
                    "Loading local image model: %s (device: %s)",
                    CONFIG.IMAGE_MODEL_NAME,
                    CONFIG.IMAGE_DEVICE,
                )
                _image_model = ClipImageModel(
                    model_name=CONFIG.IMAGE_MODEL_NAME,
                    device=CONFIG.IMAGE_DEVICE,
                )
                logger.info("Image model (local CN-CLIP) loaded successfully")
        except Exception as e:
            logger.error("Failed to load image model: %s", e, exc_info=True)
            raise

    logger.info("All embedding models loaded successfully, service ready")
|
7bfb9946
tangwang
向量化模块
|
266
267
|
|
efd435cf
tangwang
tei性能调优:
|
268
269
270
271
272
|
@app.on_event("shutdown")
def stop_workers() -> None:
    """Shutdown hook: signal the text micro-batch worker to stop."""
    _stop_text_batch_worker()
|
200fdddf
tangwang
embed norm
|
273
274
275
276
277
278
279
280
|
def _normalize_vector(vec: np.ndarray) -> np.ndarray:
    """Return *vec* divided by its norm.

    Raises RuntimeError when the norm is zero, negative, NaN or infinite.
    """
    length = float(np.linalg.norm(vec))
    usable = np.isfinite(length) and length > 0.0
    if usable:
        return vec / length
    raise RuntimeError("Embedding vector has invalid norm (must be > 0)")
def _as_list(embedding: Optional[np.ndarray], normalize: bool = False) -> Optional[List[float]]:
    """Convert one embedding to a flat list of float32-precision values.

    ``None`` is passed through. Anything that is not already an ndarray is
    converted to a float32 array, and multi-dimensional input is flattened.
    With ``normalize=True`` the vector is scaled to unit norm first.
    """
    if embedding is None:
        return None

    if isinstance(embedding, np.ndarray):
        arr = embedding
    else:
        arr = np.array(embedding, dtype=np.float32)

    flat = arr if arr.ndim == 1 else arr.reshape(-1)
    flat = flat.astype(np.float32, copy=False)
    if normalize:
        # Division may widen the dtype, so cast back to float32 afterwards.
        flat = _normalize_vector(flat).astype(np.float32, copy=False)
    return flat.tolist()
|
7bfb9946
tangwang
向量化模块
|
291
292
293
294
|
@app.get("/health")
def health() -> Dict[str, Any]:
    """Health check endpoint. Returns status and model loading state."""
    text_ready = _text_model is not None
    image_ready = _image_model is not None
    report: Dict[str, Any] = {"status": "ok"}
    report["text_model_loaded"] = text_ready
    report["text_backend"] = _text_backend_name
    report["image_model_loaded"] = image_ready
    return report
|
7bfb9946
tangwang
向量化模块
|
302
303
304
|
@app.post("/embed/text")
def embed_text(texts: List[str], normalize: Optional[bool] = None) -> List[Optional[List[float]]]:
    """Embed a list of texts and return one vector per input, aligned by index.

    ``normalize`` overrides ``CONFIG.TEXT_NORMALIZE_EMBEDDINGS`` when given.
    Non-string or blank entries are rejected with HTTP 400. An exception raised
    while calling the backend is logged and reported as HTTP 502. A missing
    model or a malformed backend response raises RuntimeError.
    """
    if _text_model is None:
        raise RuntimeError("Text model not loaded")

    if normalize is None:
        effective_normalize = bool(CONFIG.TEXT_NORMALIZE_EMBEDDINGS)
    else:
        effective_normalize = bool(normalize)

    # Validate and strip every input up front so indices in errors match the request.
    normalized: List[str] = []
    for i, raw in enumerate(texts):
        if not isinstance(raw, str):
            raise HTTPException(status_code=400, detail=f"Invalid text at index {i}: must be string")
        stripped = raw.strip()
        if not stripped:
            raise HTTPException(status_code=400, detail=f"Invalid text at index {i}: empty string")
        normalized.append(stripped)

    logger.info(
        "embed_text request | backend=%s inputs=%d normalize=%s preview=%s",
        _text_backend_name,
        len(normalized),
        effective_normalize,
        _preview_inputs(normalized, _LOG_PREVIEW_COUNT, _LOG_TEXT_PREVIEW_CHARS),
    )
    started = time.perf_counter()
    try:
        # local_st backend uses in-process torch model, keep serialized encode for safety;
        # TEI backend is an HTTP client and supports concurrent requests.
        if _text_backend_name == "local_st":
            use_worker = len(normalized) == 1 and _text_batch_worker is not None
            if use_worker:
                single = [_encode_single_text_with_microbatch(normalized[0], normalize=effective_normalize)]
                elapsed_ms = (time.perf_counter() - started) * 1000.0
                logger.info(
                    "embed_text done | backend=%s mode=microbatch-single inputs=%d normalize=%s dim=%d elapsed_ms=%.2f",
                    _text_backend_name,
                    len(normalized),
                    effective_normalize,
                    len(single[0]) if single and single[0] is not None else 0,
                    elapsed_ms,
                )
                return single
            embs = _encode_local_st(normalized, normalize_embeddings=False)
        else:
            embs = _text_model.encode(
                normalized,
                batch_size=int(CONFIG.TEXT_BATCH_SIZE),
                device=CONFIG.TEXT_DEVICE,
                normalize_embeddings=effective_normalize,
            )
    except Exception as e:
        logger.error("Text embedding backend failure: %s", e, exc_info=True)
        raise HTTPException(
            status_code=502,
            detail=f"Text embedding backend failure: {e}",
        ) from e

    if embs is None or len(embs) != len(normalized):
        raise RuntimeError(
            f"Text model response length mismatch: expected {len(normalized)}, "
            f"got {0 if embs is None else len(embs)}"
        )

    vectors: List[Optional[List[float]]] = []
    for i, emb in enumerate(embs):
        converted = _as_list(emb, normalize=effective_normalize)
        if converted is None:
            raise RuntimeError(f"Text model returned empty embedding for index {i}")
        vectors.append(converted)

    elapsed_ms = (time.perf_counter() - started) * 1000.0
    logger.info(
        "embed_text done | backend=%s inputs=%d normalize=%s dim=%d elapsed_ms=%.2f",
        _text_backend_name,
        len(normalized),
        effective_normalize,
        len(vectors[0]) if vectors and vectors[0] is not None else 0,
        elapsed_ms,
    )
    return vectors
@app.post("/embed/image")
def embed_image(images: List[str], normalize: Optional[bool] = None) -> List[Optional[List[float]]]:
    """Embed a list of image URLs/paths and return one vector per input, aligned by index.

    ``normalize`` overrides ``CONFIG.IMAGE_NORMALIZE_EMBEDDINGS`` when given.
    Non-string or blank entries are rejected with HTTP 400. An exception raised
    by the image encoder is logged and reported as HTTP 502, matching the text
    endpoint. A missing model or a malformed encoder response raises
    RuntimeError.
    """
    if _image_model is None:
        raise RuntimeError("Image model not loaded")
    effective_normalize = bool(CONFIG.IMAGE_NORMALIZE_EMBEDDINGS) if normalize is None else bool(normalize)

    # Validate and strip every input up front so indices in errors match the request.
    urls: List[str] = []
    for i, url_or_path in enumerate(images):
        if not isinstance(url_or_path, str):
            raise HTTPException(status_code=400, detail=f"Invalid image at index {i}: must be string URL/path")
        s = url_or_path.strip()
        if not s:
            raise HTTPException(status_code=400, detail=f"Invalid image at index {i}: empty URL/path")
        urls.append(s)

    logger.info(
        "embed_image request | inputs=%d normalize=%s preview=%s",
        len(urls),
        effective_normalize,
        _preview_inputs(urls, _LOG_PREVIEW_COUNT, _LOG_IMAGE_PREVIEW_CHARS),
    )
    t0 = time.perf_counter()
    try:
        with _image_encode_lock:
            vectors = _image_model.encode_image_urls(
                urls,
                batch_size=CONFIG.IMAGE_BATCH_SIZE,
                normalize_embeddings=effective_normalize,
            )
    except Exception as e:
        # Report encoder failures the same way the text endpoint does,
        # rather than letting them surface as an unlogged bare 500.
        logger.error("Image embedding backend failure: %s", e, exc_info=True)
        raise HTTPException(
            status_code=502,
            detail=f"Image embedding backend failure: {e}",
        ) from e

    if vectors is None or len(vectors) != len(urls):
        raise RuntimeError(
            f"Image model response length mismatch: expected {len(urls)}, "
            f"got {0 if vectors is None else len(vectors)}"
        )
    out: List[Optional[List[float]]] = []
    for i, vec in enumerate(vectors):
        out_vec = _as_list(vec, normalize=effective_normalize)
        if out_vec is None:
            raise RuntimeError(f"Image model returned empty embedding for index {i}")
        out.append(out_vec)

    elapsed_ms = (time.perf_counter() - t0) * 1000.0
    logger.info(
        "embed_image done | inputs=%d normalize=%s dim=%d elapsed_ms=%.2f",
        len(urls),
        effective_normalize,
        len(out[0]) if out and out[0] is not None else 0,
        elapsed_ms,
    )
    return out
|