950a640e
tangwang
embeddings
|
1
|
"""Text embedding client for the local embedding HTTP service."""
|
be52af70
tangwang
first commit
|
2
|
|
950a640e
tangwang
embeddings
|
3
|
import logging
|
950a640e
tangwang
embeddings
|
4
5
|
from datetime import timedelta
from typing import Any, List, Optional, Union
|
be52af70
tangwang
first commit
|
6
|
|
be52af70
tangwang
first commit
|
7
|
import numpy as np
|
950a640e
tangwang
embeddings
|
8
|
import requests
|
325eec03
tangwang
1. 日志、配置基础设施,使用优化
|
9
10
|
logger = logging.getLogger(__name__)
|
be52af70
tangwang
first commit
|
11
|
|
86d8358b
tangwang
config optimize
|
12
|
from config.loader import get_app_config
|
7214c2e7
tangwang
mplemented**
|
13
14
|
from config.services_config import get_embedding_text_base_url
from embeddings.cache_keys import build_text_cache_key
|
4a37d233
tangwang
1. embedding cach...
|
15
|
from embeddings.redis_embedding_cache import RedisEmbeddingCache
|
4650fcec
tangwang
日志优化、日志串联(uid rqid)
|
16
|
from request_log_context import build_downstream_request_headers, build_request_log_extra
|
42e3aea6
tangwang
tidy
|
17
|
|
7214c2e7
tangwang
mplemented**
|
18
|
|
950a640e
tangwang
embeddings
|
19
|
class TextEmbeddingEncoder:
    """
    Text embedding encoder using network service.

    Texts are posted to ``<service_url>/embed/text``; vectors returned by the
    service are stored in a ``RedisEmbeddingCache`` and looked up there first
    on later calls.
    """

    def __init__(self, service_url: Optional[str] = None):
        """
        Resolve the service endpoint and create the embedding cache.

        Args:
            service_url: Base URL of the embedding service. When falsy, the
                value returned by ``get_embedding_text_base_url()`` is used.
        """
        resolved_url = service_url or get_embedding_text_base_url()
        redis_config = get_app_config().infrastructure.redis

        # Strip trailing slashes so the endpoint never contains "//".
        self.service_url = str(resolved_url).rstrip("/")
        self.endpoint = f"{self.service_url}/embed/text"
        self.expire_time = timedelta(days=redis_config.cache_expire_days)
        # A blank configured prefix falls back to "embedding".
        self.cache_prefix = str(redis_config.embedding_cache_prefix).strip() or "embedding"
        logger.info("Creating TextEmbeddingEncoder instance with service URL: %s", self.service_url)

        self.cache = RedisEmbeddingCache(
            key_prefix=self.cache_prefix,
            namespace="",
            expire_time=self.expire_time,
        )

    def _call_service(
        self,
        request_data: List[str],
        normalize_embeddings: bool = True,
        priority: int = 0,
        request_id: Optional[str] = None,
        user_id: Optional[str] = None,
    ) -> List[Any]:
        """
        Call the embedding service API.

        Args:
            request_data: List of texts, sent as the JSON request body.
            normalize_embeddings: Sent as the ``normalize`` query parameter
                (``"true"`` / ``"false"``).
            priority: Sent as the ``priority`` query parameter; negative
                values are clamped to 0.
            request_id: Forwarded to ``build_downstream_request_headers`` and
                attached to the failure log record.
            user_id: Forwarded to ``build_downstream_request_headers`` and
                attached to the failure log record.

        Returns:
            The decoded JSON body. Expected to be a list of embeddings
            (list[float]) or nulls (None), aligned to input order.

        Raises:
            requests.exceptions.RequestException: Re-raised after the failure
                has been logged.
        """
        # Kept outside the try block so the handler can inspect a response
        # that arrived but failed raise_for_status() or JSON decoding.
        response = None
        try:
            response = requests.post(
                self.endpoint,
                params={
                    "normalize": "true" if normalize_embeddings else "false",
                    "priority": max(0, int(priority)),
                },
                json=request_data,
                headers=build_downstream_request_headers(request_id=request_id, user_id=user_id),
                timeout=60,
            )
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            body_preview = ""
            if response is not None:
                try:
                    # Only the first 300 characters are logged.
                    body_preview = (response.text or "")[:300]
                except Exception:
                    # Best effort: failing to read the body must not hide
                    # the original request error.
                    body_preview = ""
            logger.error(
                "TextEmbeddingEncoder service request failed | status=%s body=%s error=%s",
                getattr(response, "status_code", "n/a"),
                body_preview,
                e,
                exc_info=True,
                extra=build_request_log_extra(request_id=request_id, user_id=user_id),
            )
            raise

    def encode(
        self,
        sentences: Union[str, List[str]],
        normalize_embeddings: bool = True,
        priority: int = 0,
        device: str = 'cpu',
        batch_size: int = 32,
        request_id: Optional[str] = None,
        user_id: Optional[str] = None,
    ) -> np.ndarray:
        """
        Encode text into embeddings via network service with Redis caching.

        Each text is looked up in the cache first; all cache misses are sent
        to the service in a single request and the valid results are written
        back to the cache.

        Args:
            sentences: Single string or list of strings to encode
            normalize_embeddings: Whether to request normalized embeddings from service
            priority: Request priority forwarded to the service call.
            device: Ignored; accepted for interface compatibility.
            batch_size: Accepted for interface compatibility but not applied:
                every uncached text goes out in one service request.
            request_id: Request id forwarded to the service call.
            user_id: User id forwarded to the service call.

        Returns:
            ``np.array(embeddings, dtype=object)`` with one entry per input,
            in input order. NOTE(review): when all vectors have the same
            length NumPy stacks them into a 2-D ``(n, dim)`` object array
            rather than a 1-D array of vector objects; confirm that callers
            expect this.

        Raises:
            ValueError: If the service reply is not a list, an entry is
                missing or null, or an entry is not a non-empty, finite,
                one-dimensional numeric vector.
            requests.exceptions.RequestException: Propagated from the
                service call.
        """
        # Convert single string to list
        if isinstance(sentences, str):
            sentences = [sentences]

        # Check cache first, remembering where each miss belongs in the output.
        embeddings: List[Optional[np.ndarray]] = [None] * len(sentences)
        uncached_indices: List[int] = []
        uncached_texts: List[str] = []
        for i, text in enumerate(sentences):
            cached = self._get_cached_embedding(text, normalize_embeddings=normalize_embeddings)
            if cached is not None:
                embeddings[i] = cached
            else:
                uncached_indices.append(i)
                uncached_texts.append(text)

        # If there are uncached texts, call service
        if uncached_texts:
            response_data = self._call_service(
                uncached_texts,
                normalize_embeddings=normalize_embeddings,
                priority=priority,
                request_id=request_id,
                user_id=user_id,
            )

            # A JSON object or scalar reply cannot be indexed by position;
            # report it with the same exception type as other bad replies.
            if response_data is not None and not isinstance(response_data, (list, tuple)):
                raise ValueError(
                    "Unexpected response type from embedding service: "
                    f"{type(response_data).__name__}"
                )

            for i, (original_idx, text) in enumerate(zip(uncached_indices, uncached_texts)):
                if response_data and i < len(response_data):
                    embedding = response_data[i]
                else:
                    embedding = None

                if embedding is None:
                    raise ValueError(f"No embedding found for text index {original_idx}: {text[:50]}...")

                try:
                    embedding_array = np.array(embedding, dtype=np.float32)
                except (TypeError, ValueError) as exc:
                    # Non-numeric or ragged payloads cannot become a float32 vector.
                    raise ValueError(
                        f"Invalid embedding returned from service for text index {original_idx}"
                    ) from exc

                if not self._is_valid_embedding(embedding_array):
                    raise ValueError(
                        f"Invalid embedding returned from service for text index {original_idx}"
                    )

                embeddings[original_idx] = embedding_array
                self._set_cached_embedding(
                    text,
                    embedding_array,
                    normalize_embeddings=normalize_embeddings,
                )

        # Return a numpy array (dtype=object); every entry has been filled
        # from the cache or from a validated service result.
        return np.array(embeddings, dtype=object)

    def _is_valid_embedding(self, embedding: np.ndarray) -> bool:
        """
        Check if embedding is valid.

        A valid embedding is a non-empty, one-dimensional ``np.ndarray``
        whose values are all finite (no NaN/Inf).

        Args:
            embedding: Embedding array to validate
        Returns:
            True if valid, False otherwise
        """
        # isinstance also rejects None.
        if not isinstance(embedding, np.ndarray):
            return False
        # One text maps to one flat vector; scalars and nested arrays are rejected.
        if embedding.ndim != 1 or embedding.size == 0:
            return False
        try:
            return bool(np.isfinite(embedding).all())
        except TypeError:
            # np.isfinite is undefined for non-numeric dtypes (e.g. object, str).
            return False

    def _get_cached_embedding(
        self,
        query: str,
        *,
        normalize_embeddings: bool,
    ) -> Optional[np.ndarray]:
        """
        Get embedding from cache if exists.

        The cache key is built from the text and the normalize flag, so
        normalized and raw vectors are stored under different keys. Any
        sliding-expiration refresh would happen inside ``self.cache.get``,
        whose implementation is not shown here.

        Returns:
            Whatever ``self.cache.get`` returns for the key; ``None`` is
            treated as a cache miss by the caller.
        """
        cache_key = build_text_cache_key(query, normalize=normalize_embeddings)
        embedding = self.cache.get(cache_key)
        if embedding is not None:
            logger.debug(
                "Cache hit for text embedding | normalize=%s query=%s key=%s",
                normalize_embeddings,
                query,
                cache_key,
            )
        return embedding

    def _set_cached_embedding(
        self,
        query: str,
        embedding: np.ndarray,
        *,
        normalize_embeddings: bool,
    ) -> bool:
        """
        Store embedding in cache.

        Returns:
            The result of ``self.cache.set`` for the key built from the text
            and the normalize flag.
        """
        ok = self.cache.set(build_text_cache_key(query, normalize=normalize_embeddings), embedding)
        if ok:
            logger.debug(
                "Successfully cached text embedding | normalize=%s query=%s",
                normalize_embeddings,
                query,
            )
        return ok
|