Blame view

embeddings/server.py 5.36 KB
7bfb9946   tangwang   向量化模块
1
2
3
4
5
6
7
8
  """
  Embedding service (FastAPI).
  
  API (simple list-in, list-out; aligned by index; failures -> null):
  - POST /embed/text   body: ["text1", "text2", ...] -> [[...], null, ...]
  - POST /embed/image  body: ["url_or_path1", ...]  -> [[...], null, ...]
  """
  
0a3764c4   tangwang   优化embedding模型加载
9
  import logging
7bfb9946   tangwang   向量化模块
10
11
12
13
14
15
16
17
18
  import threading
  from typing import Any, Dict, List, Optional
  
  import numpy as np
  from fastapi import FastAPI
  
  from embeddings.config import CONFIG
  from embeddings.bge_model import BgeTextModel
  from embeddings.clip_model import ClipImageModel
c10f90fe   tangwang   cnclip
19
20
  from embeddings.clip_as_service_encoder import ClipAsServiceImageEncoder
  from embeddings.protocols import ImageEncoderProtocol
7bfb9946   tangwang   向量化模块
21
  
0a3764c4   tangwang   优化embedding模型加载
22
  logger = logging.getLogger(__name__)
7bfb9946   tangwang   向量化模块
23
  
a7920e17   tangwang   项目名称和部署路径修改
24
  app = FastAPI(title="saas-search Embedding Service", version="1.0.0")
7bfb9946   tangwang   向量化模块
25
  
0a3764c4   tangwang   优化embedding模型加载
26
27
# Models are loaded at startup, not lazily
# Populated by load_models() at FastAPI startup; None means "not (yet) loaded".
_text_model: Optional[BgeTextModel] = None
_image_model: Optional[ImageEncoderProtocol] = None
# Feature flags: set to False to skip loading the corresponding model at startup.
open_text_model = True
open_image_model = True  # Enable image embedding when using clip-as-service

# One lock per modality so a text request never blocks on an image encode.
# NOTE(review): serializing encode calls suggests the underlying models are
# assumed not thread-safe under concurrent requests — confirm per backend.
_text_encode_lock = threading.Lock()
_image_encode_lock = threading.Lock()
  
  
0a3764c4   tangwang   优化embedding模型加载
36
37
38
39
  @app.on_event("startup")
  def load_models():
      """Load models at service startup to avoid first-request latency."""
      global _text_model, _image_model
7bfb9946   tangwang   向量化模块
40
  
0a3764c4   tangwang   优化embedding模型加载
41
      logger.info("Loading embedding models at startup...")
7bfb9946   tangwang   向量化模块
42
  
0a3764c4   tangwang   优化embedding模型加载
43
      # Load text model
40f1e391   tangwang   cnclip
44
45
46
47
48
49
50
51
52
      if open_text_model:
          try:
              logger.info(f"Loading text model: {CONFIG.TEXT_MODEL_DIR}")
              _text_model = BgeTextModel(model_dir=CONFIG.TEXT_MODEL_DIR)
              logger.info("Text model loaded successfully")
          except Exception as e:
              logger.error(f"Failed to load text model: {e}", exc_info=True)
              raise
      
0a3764c4   tangwang   优化embedding模型加载
53
  
c10f90fe   tangwang   cnclip
54
      # Load image model: clip-as-service (recommended) or local CN-CLIP
40f1e391   tangwang   cnclip
55
56
      if open_image_model:
          try:
c10f90fe   tangwang   cnclip
57
58
59
60
61
62
63
64
65
66
67
68
69
70
              if CONFIG.USE_CLIP_AS_SERVICE:
                  logger.info(f"Loading image encoder via clip-as-service: {CONFIG.CLIP_AS_SERVICE_SERVER}")
                  _image_model = ClipAsServiceImageEncoder(
                      server=CONFIG.CLIP_AS_SERVICE_SERVER,
                      batch_size=CONFIG.IMAGE_BATCH_SIZE,
                  )
                  logger.info("Image model (clip-as-service) loaded successfully")
              else:
                  logger.info(f"Loading local image model: {CONFIG.IMAGE_MODEL_NAME} (device: {CONFIG.IMAGE_DEVICE})")
                  _image_model = ClipImageModel(
                      model_name=CONFIG.IMAGE_MODEL_NAME,
                      device=CONFIG.IMAGE_DEVICE,
                  )
                  logger.info("Image model (local CN-CLIP) loaded successfully")
40f1e391   tangwang   cnclip
71
72
73
          except Exception as e:
              logger.error(f"Failed to load image model: {e}", exc_info=True)
              raise
0a3764c4   tangwang   优化embedding模型加载
74
75
  
      logger.info("All embedding models loaded successfully, service ready")
7bfb9946   tangwang   向量化模块
76
77
78
79
80
81
82
83
84
85
86
87
88
89
  
  
  def _as_list(embedding: Optional[np.ndarray]) -> Optional[List[float]]:
      if embedding is None:
          return None
      if not isinstance(embedding, np.ndarray):
          embedding = np.array(embedding, dtype=np.float32)
      if embedding.ndim != 1:
          embedding = embedding.reshape(-1)
      return embedding.astype(np.float32).tolist()
  
  
  @app.get("/health")
  def health() -> Dict[str, Any]:
0a3764c4   tangwang   优化embedding模型加载
90
91
92
93
94
95
      """Health check endpoint. Returns status and model loading state."""
      return {
          "status": "ok",
          "text_model_loaded": _text_model is not None,
          "image_model_loaded": _image_model is not None,
      }
7bfb9946   tangwang   向量化模块
96
97
98
99
  
  
  @app.post("/embed/text")
  def embed_text(texts: List[str]) -> List[Optional[List[float]]]:
0a3764c4   tangwang   优化embedding模型加载
100
101
      if _text_model is None:
          raise RuntimeError("Text model not loaded")
7bfb9946   tangwang   向量化模块
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
      out: List[Optional[List[float]]] = [None] * len(texts)
  
      indexed_texts: List[tuple] = []
      for i, t in enumerate(texts):
          if t is None:
              continue
          if not isinstance(t, str):
              t = str(t)
          t = t.strip()
          if not t:
              continue
          indexed_texts.append((i, t))
  
      if not indexed_texts:
          return out
  
      batch_texts = [t for _, t in indexed_texts]
      try:
          with _text_encode_lock:
0a3764c4   tangwang   优化embedding模型加载
121
              embs = _text_model.encode_batch(
7bfb9946   tangwang   向量化模块
122
123
124
125
126
127
128
129
130
131
132
133
                  batch_texts, batch_size=int(CONFIG.TEXT_BATCH_SIZE), device=CONFIG.TEXT_DEVICE
              )
          for j, (idx, _t) in enumerate(indexed_texts):
              out[idx] = _as_list(embs[j])
      except Exception:
          # keep Nones
          pass
      return out
  
  
  @app.post("/embed/image")
  def embed_image(images: List[str]) -> List[Optional[List[float]]]:
0a3764c4   tangwang   优化embedding模型加载
134
135
      if _image_model is None:
          raise RuntimeError("Image model not loaded")
7bfb9946   tangwang   向量化模块
136
137
      out: List[Optional[List[float]]] = [None] * len(images)
  
c10f90fe   tangwang   cnclip
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
      # Normalize inputs
      urls = []
      indices = []
      for i, url_or_path in enumerate(images):
          if url_or_path is None:
              continue
          if not isinstance(url_or_path, str):
              url_or_path = str(url_or_path)
          url_or_path = url_or_path.strip()
          if url_or_path:
              urls.append(url_or_path)
              indices.append(i)
  
      if not urls:
          return out
  
7bfb9946   tangwang   向量化模块
154
      with _image_encode_lock:
c10f90fe   tangwang   cnclip
155
156
157
158
159
160
161
162
          try:
              # Both ClipAsServiceImageEncoder and ClipImageModel implement encode_image_urls(urls, batch_size)
              vectors = _image_model.encode_image_urls(urls, batch_size=CONFIG.IMAGE_BATCH_SIZE)
              for j, idx in enumerate(indices):
                  out[idx] = _as_list(vectors[j] if j < len(vectors) else None)
          except Exception:
              for idx in indices:
                  out[idx] = None
7bfb9946   tangwang   向量化模块
163
      return out