Commit 453992a87dd623affa8475b9d7fdbc7abaadf431

Authored by tangwang
1 parent b735cced

需求:

索引的两项功能:
1. 多语言。 店铺配置的语言如果不等于zh,那么要调用翻译 获得中文翻译结果,同时 如果不等于en,要翻译en的结果。
要缓存到redis。 先查询缓存,没命中缓存再调用翻译,然后存入redis缓存起来。
这些逻辑应该是 @query/translator.py 内部的,不需要调用的地方关心。但是现在是  DictCache,直接改掉,改为redis的缓存

2. 填充 标题的向量化字段。如果该店铺的标题向量化打开,那么应该请求向量化模型根据英文的title得到embedding。使用 BgeEncoder.

以上两个模块的缓存,过期时间都是 最近多长时间内没有访问过。

feat:
1. 更新 REDIS_CONFIG 配置
在 config/env_config.py 中添加了用户提供的配置项(snapshot_db, translation_cache_expire_days, translation_cache_prefix 等)
2. 修改 query/translator.py
将 DictCache 改为 Redis 缓存
实现了 translate_for_indexing 方法,自动处理多语言翻译:
如果店铺语言不等于 zh,自动翻译成 zh
如果店铺语言不等于 en,自动翻译成 en
翻译逻辑封装在 translator.py 内部,调用方无需关心
3. 修改 embeddings/text_encoder.py
在 BgeEncoder 中添加了 Redis 缓存
实现了滑动过期策略(每次访问时重置过期时间)
缓存逻辑参考了提供的 CacheManager 对象
4. 修改 indexer/document_transformer.py
添加了 encoder 和 enable_title_embedding 参数
实现了 _fill_title_embedding 方法,使用英文标题(title_en)生成 embedding
更新了 _fill_text_fields 方法,使用新的 translate_for_indexing 方法
5. 更新 indexer/indexing_utils.py
更新了 create_document_transformer 函数,支持新的 encoder 和 enable_title_embedding 参数
如果启用标题向量化且未提供 encoder,会自动初始化 BgeEncoder
config/env_config.py
... ... @@ -26,7 +26,14 @@ ES_CONFIG = {
26 26 REDIS_CONFIG = {
27 27 'host': os.getenv('REDIS_HOST', 'localhost'),
28 28 'port': int(os.getenv('REDIS_PORT', 6479)),
29   - 'password': os.getenv('REDIS_PASSWORD'),
  29 + 'snapshot_db': int(os.getenv('REDIS_SNAPSHOT_DB', 0)),
  30 + 'password': os.getenv('REDIS_PASSWORD', 'BMfv5aI31kgHWtlx'),
  31 + 'socket_timeout': int(os.getenv('REDIS_SOCKET_TIMEOUT', 1)),
  32 + 'socket_connect_timeout': int(os.getenv('REDIS_SOCKET_CONNECT_TIMEOUT', 1)),
  33 + 'retry_on_timeout': os.getenv('REDIS_RETRY_ON_TIMEOUT', 'False').lower() == 'true',
  34 + 'cache_expire_days': int(os.getenv('REDIS_CACHE_EXPIRE_DAYS', 180)), # 6 months
  35 + 'translation_cache_expire_days': int(os.getenv('REDIS_TRANSLATION_CACHE_EXPIRE_DAYS', 360)),
  36 + 'translation_cache_prefix': os.getenv('REDIS_TRANSLATION_CACHE_PREFIX', 'trans'),
30 37 }
31 38  
32 39 # DeepL API Key
... ...
embeddings/text_encoder.py
... ... @@ -9,11 +9,20 @@ import requests
9 9 import time
10 10 import threading
11 11 import numpy as np
  12 +import pickle
  13 +import redis
  14 +from datetime import timedelta
  15 +from typing import List, Union, Dict, Any, Optional
12 16 import logging
13   -from typing import List, Union, Dict, Any
14 17  
15 18 logger = logging.getLogger(__name__)
16 19  
  20 +# Try to import REDIS_CONFIG, but allow import to fail
  21 +try:
  22 + from config.env_config import REDIS_CONFIG
  23 +except ImportError:
  24 + REDIS_CONFIG = {}
  25 +
17 26  
18 27 class BgeEncoder:
19 28 """
... ... @@ -31,6 +40,26 @@ class BgeEncoder:
31 40 logger.info(f"Creating BgeEncoder instance with service URL: {service_url}")
32 41 cls._instance.service_url = service_url
33 42 cls._instance.endpoint = f"{service_url}/embedding/generate_embeddings"
  43 +
  44 + # Initialize Redis cache
  45 + try:
  46 + cls._instance.redis_client = redis.Redis(
  47 + host=REDIS_CONFIG.get('host', 'localhost'),
  48 + port=REDIS_CONFIG.get('port', 6479),
  49 + password=REDIS_CONFIG.get('password'),
  50 + decode_responses=False, # Keep binary data as is
  51 + socket_timeout=REDIS_CONFIG.get('socket_timeout', 1),
  52 + socket_connect_timeout=REDIS_CONFIG.get('socket_connect_timeout', 1),
  53 + retry_on_timeout=REDIS_CONFIG.get('retry_on_timeout', False),
  54 + health_check_interval=10 # 避免复用坏连接
  55 + )
  56 + # Test connection
  57 + cls._instance.redis_client.ping()
  58 + cls._instance.expire_time = timedelta(days=REDIS_CONFIG.get('cache_expire_days', 180))
  59 + logger.info("Redis cache initialized for embeddings")
  60 + except Exception as e:
  61 + logger.warning(f"Failed to initialize Redis cache for embeddings: {e}, continuing without cache")
  62 + cls._instance.redis_client = None
34 63 return cls._instance
35 64  
36 65 def _call_service(self, request_data: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
... ... @@ -63,7 +92,7 @@ class BgeEncoder:
63 92 batch_size: int = 32
64 93 ) -> np.ndarray:
65 94 """
66   - Encode text into embeddings via network service.
  95 + Encode text into embeddings via network service with Redis caching.
67 96  
68 97 Args:
69 98 sentences: Single string or list of strings to encode
... ... @@ -78,11 +107,24 @@ class BgeEncoder:
78 107 if isinstance(sentences, str):
79 108 sentences = [sentences]
80 109  
81   - # Prepare request data
82   - request_data = []
  110 + # Check cache first
  111 + cached_embeddings = []
  112 + uncached_indices = []
  113 + uncached_texts = []
  114 +
83 115 for i, text in enumerate(sentences):
  116 + cached = self._get_cached_embedding(text, 'en') # Use 'en' as default language for title embedding
  117 + if cached is not None:
  118 + cached_embeddings.append((i, cached))
  119 + else:
  120 + uncached_indices.append(i)
  121 + uncached_texts.append(text)
  122 +
  123 + # Prepare request data for uncached texts
  124 + request_data = []
  125 + for i, text in enumerate(uncached_texts):
84 126 request_item = {
85   - "id": str(i),
  127 + "id": str(uncached_indices[i]),
86 128 "name_zh": text
87 129 }
88 130  
... ... @@ -93,43 +135,58 @@ class BgeEncoder:
93 135  
94 136 request_data.append(request_item)
95 137  
96   - try:
97   - # Call service
98   - response_data = self._call_service(request_data)
99   -
100   - # Process response
101   - embeddings = []
102   - for i, text in enumerate(sentences):
103   - # Find corresponding response by ID
104   - response_item = None
105   - for item in response_data:
106   - if str(item.get("id")) == str(i):
107   - response_item = item
108   - break
109   -
110   - if response_item:
111   - # Try Chinese embedding first, then English, then Russian
112   - embedding = None
113   - for lang in ["embedding_zh", "embedding_en", "embedding_ru"]:
114   - if lang in response_item and response_item[lang] is not None:
115   - embedding = response_item[lang]
  138 + # Process response
  139 + embeddings = [None] * len(sentences)
  140 +
  141 + # Fill in cached embeddings
  142 + for idx, cached_emb in cached_embeddings:
  143 + embeddings[idx] = cached_emb
  144 +
  145 + # If there are uncached texts, call service
  146 + if uncached_texts:
  147 + try:
  148 + # Call service
  149 + response_data = self._call_service(request_data)
  150 +
  151 + # Process response
  152 + for i, text in enumerate(uncached_texts):
  153 + original_idx = uncached_indices[i]
  154 + # Find corresponding response by ID
  155 + response_item = None
  156 + for item in response_data:
  157 + if str(item.get("id")) == str(original_idx):
  158 + response_item = item
116 159 break
117 160  
118   - if embedding is not None:
119   - embeddings.append(embedding)
  161 + if response_item:
  162 + # Try Chinese embedding first, then English, then Russian
  163 + embedding = None
  164 + for lang in ["embedding_zh", "embedding_en", "embedding_ru"]:
  165 + if lang in response_item and response_item[lang] is not None:
  166 + embedding = response_item[lang]
  167 + break
  168 +
  169 + if embedding is not None:
  170 + embedding_array = np.array(embedding, dtype=np.float32)
  171 + embeddings[original_idx] = embedding_array
  172 + # Cache the embedding
  173 + self._set_cached_embedding(text, 'en', embedding_array)
  174 + else:
  175 + logger.warning(f"No embedding found for text {original_idx}: {text[:50]}...")
  176 + embeddings[original_idx] = np.zeros(1024, dtype=np.float32)
120 177 else:
121   - logger.warning(f"No embedding found for text {i}: {text[:50]}...")
122   - embeddings.append([0.0] * 1024)
123   - else:
124   - logger.warning(f"No response found for text {i}")
125   - embeddings.append([0.0] * 1024)
126   -
127   - return np.array(embeddings, dtype=np.float32)
128   -
129   - except Exception as e:
130   - logger.error(f"Failed to encode texts: {e}", exc_info=True)
131   - # Return zero embeddings as fallback
132   - return np.zeros((len(sentences), 1024), dtype=np.float32)
  178 + logger.warning(f"No response found for text {original_idx}")
  179 + embeddings[original_idx] = np.zeros(1024, dtype=np.float32)
  180 +
  181 + except Exception as e:
  182 + logger.error(f"Failed to encode texts: {e}", exc_info=True)
  183 + # Fill missing embeddings with zeros
  184 + for idx in uncached_indices:
  185 + if embeddings[idx] is None:
  186 + embeddings[idx] = np.zeros(1024, dtype=np.float32)
  187 +
  188 + # Convert to numpy array
  189 + return np.array(embeddings, dtype=np.float32)
133 190  
134 191 def encode_batch(
135 192 self,
... ... @@ -149,3 +206,48 @@ class BgeEncoder:
149 206 numpy array of embeddings
150 207 """
151 208 return self.encode(texts, batch_size=batch_size, device=device)
  209 +
  210 + def _get_cache_key(self, query: str, language: str) -> str:
  211 + """Generate a cache key for the query"""
  212 + return f"embedding:{language}:{query}"
  213 +
  214 + def _get_cached_embedding(self, query: str, language: str) -> Optional[np.ndarray]:
  215 + """Get embedding from cache if exists (with sliding expiration)"""
  216 + if not self.redis_client:
  217 + return None
  218 +
  219 + try:
  220 + cache_key = self._get_cache_key(query, language)
  221 + cached_data = self.redis_client.get(cache_key)
  222 + if cached_data:
  223 + logger.debug(f"Cache hit for embedding: {query}")
  224 + # Update expiration time on access (sliding expiration)
  225 + self.redis_client.expire(cache_key, self.expire_time)
  226 + return pickle.loads(cached_data)
  227 + return None
  228 + except Exception as e:
  229 + logger.error(f"Error retrieving embedding from cache: {e}")
  230 + return None
  231 +
  232 + def _set_cached_embedding(self, query: str, language: str, embedding: np.ndarray) -> bool:
  233 + """Store embedding in cache"""
  234 + if not self.redis_client:
  235 + return False
  236 +
  237 + try:
  238 + cache_key = self._get_cache_key(query, language)
  239 + serialized_data = pickle.dumps(embedding)
  240 + self.redis_client.setex(
  241 + cache_key,
  242 + self.expire_time,
  243 + serialized_data
  244 + )
  245 + logger.debug(f"Successfully cached embedding for query: {query}")
  246 + return True
  247 + except (redis.exceptions.BusyLoadingError, redis.exceptions.ConnectionError,
  248 + redis.exceptions.TimeoutError, redis.exceptions.RedisError) as e:
  249 + logger.warning(f"Redis error storing embedding in cache: {e}")
  250 + return False
  251 + except Exception as e:
  252 + logger.error(f"Error storing embedding in cache: {e}")
  253 + return False
... ...
indexer/document_transformer.py
... ... @@ -29,7 +29,9 @@ class SPUDocumentTransformer:
29 29 searchable_option_dimensions: List[str],
30 30 tenant_config: Optional[Dict[str, Any]] = None,
31 31 translator: Optional[Any] = None,
32   - translation_prompts: Optional[Dict[str, str]] = None
  32 + translation_prompts: Optional[Dict[str, str]] = None,
  33 + encoder: Optional[Any] = None,
  34 + enable_title_embedding: bool = True
33 35 ):
34 36 """
35 37 初始化文档转换器。
... ... @@ -40,12 +42,16 @@ class SPUDocumentTransformer:
40 42 tenant_config: 租户配置(包含主语言和翻译配置)
41 43 translator: 翻译器实例(可选,如果提供则启用翻译功能)
42 44 translation_prompts: 翻译提示词配置(可选)
  45 + encoder: 文本编码器实例(可选,用于生成title_embedding)
  46 + enable_title_embedding: 是否启用标题向量化(默认True)
43 47 """
44 48 self.category_id_to_name = category_id_to_name
45 49 self.searchable_option_dimensions = searchable_option_dimensions
46 50 self.tenant_config = tenant_config or {}
47 51 self.translator = translator
48 52 self.translation_prompts = translation_prompts or {}
  53 + self.encoder = encoder
  54 + self.enable_title_embedding = enable_title_embedding
49 55  
50 56 def transform_spu_to_doc(
51 57 self,
... ... @@ -81,11 +87,13 @@ class SPUDocumentTransformer:
81 87  
82 88 # 获取租户配置
83 89 primary_lang = self.tenant_config.get('primary_language', 'zh')
84   - translate_to_en = self.tenant_config.get('translate_to_en', True)
85   - translate_to_zh = self.tenant_config.get('translate_to_zh', False)
86 90  
87   - # 文本字段处理(根据主语言和翻译配置)
88   - self._fill_text_fields(doc, spu_row, primary_lang, translate_to_en, translate_to_zh)
  91 + # 文本字段处理(使用translator的内部逻辑自动处理多语言翻译)
  92 + self._fill_text_fields(doc, spu_row, primary_lang)
  93 +
  94 + # 标题向量化处理(如果启用)
  95 + if self.enable_title_embedding and self.encoder:
  96 + self._fill_title_embedding(doc)
89 97  
90 98 # Tags
91 99 if pd.notna(spu_row.get('tags')):
... ... @@ -160,114 +168,119 @@ class SPUDocumentTransformer:
160 168 self,
161 169 doc: Dict[str, Any],
162 170 spu_row: pd.Series,
163   - primary_lang: str,
164   - translate_to_en: bool,
165   - translate_to_zh: bool
  171 + primary_lang: str
166 172 ):
167   - """填充文本字段(根据主语言和翻译配置)。"""
168   - # 主语言字段
169   - primary_suffix = '_zh' if primary_lang == 'zh' else '_en'
170   - secondary_suffix = '_en' if primary_lang == 'zh' else '_zh'
171   -
  173 + """
  174 + 填充文本字段(根据主语言自动处理多语言翻译)。
  175 +
  176 + 翻译逻辑在translator内部处理:
  177 + - 如果店铺语言不等于zh,自动翻译成zh
  178 + - 如果店铺语言不等于en,自动翻译成en
  179 + """
172 180 # Title
173 181 if pd.notna(spu_row.get('title')):
174 182 title_text = str(spu_row['title'])
175   - doc[f'title{primary_suffix}'] = title_text
176   - # 如果需要翻译,调用翻译服务(同步模式)
177   - if (primary_lang == 'zh' and translate_to_en) or (primary_lang == 'en' and translate_to_zh):
178   - if self.translator:
179   - target_lang = 'en' if primary_lang == 'zh' else 'zh'
180   - # 根据目标语言选择对应的提示词
181   - if target_lang == 'zh':
182   - prompt = self.translation_prompts.get('product_title_zh') or self.translation_prompts.get('default_zh')
183   - else:
184   - prompt = self.translation_prompts.get('product_title_en') or self.translation_prompts.get('default_en')
185   - translated = self.translator.translate(
186   - title_text,
187   - target_lang=target_lang,
188   - source_lang=primary_lang,
189   - prompt=prompt
190   - )
191   - doc[f'title{secondary_suffix}'] = translated if translated else None
192   - else:
193   - doc[f'title{secondary_suffix}'] = None # 无翻译器,设为None
  183 +
  184 + # 使用translator的translate_for_indexing方法,自动处理多语言翻译
  185 + if self.translator:
  186 + # 根据目标语言选择对应的提示词
  187 + prompt_zh = self.translation_prompts.get('product_title_zh') or self.translation_prompts.get('default_zh')
  188 + prompt_en = self.translation_prompts.get('product_title_en') or self.translation_prompts.get('default_en')
  189 +
  190 + # 调用translate_for_indexing,自动处理翻译逻辑
  191 + translations = self.translator.translate_for_indexing(
  192 + title_text,
  193 + shop_language=primary_lang,
  194 + source_lang=primary_lang,
  195 + prompt=prompt_zh if primary_lang == 'zh' else prompt_en
  196 + )
  197 +
  198 + # 填充翻译结果
  199 + doc['title_zh'] = translations.get('zh') or (title_text if primary_lang == 'zh' else None)
  200 + doc['title_en'] = translations.get('en') or (title_text if primary_lang == 'en' else None)
194 201 else:
195   - doc[f'title{secondary_suffix}'] = None
  202 + # 无翻译器,只填充主语言字段
  203 + if primary_lang == 'zh':
  204 + doc['title_zh'] = title_text
  205 + doc['title_en'] = None
  206 + else:
  207 + doc['title_zh'] = None
  208 + doc['title_en'] = title_text
196 209 else:
197   - doc[f'title{primary_suffix}'] = None
198   - doc[f'title{secondary_suffix}'] = None
  210 + doc['title_zh'] = None
  211 + doc['title_en'] = None
199 212  
200 213 # Brief
201 214 if pd.notna(spu_row.get('brief')):
202 215 brief_text = str(spu_row['brief'])
203   - doc[f'brief{primary_suffix}'] = brief_text
204   - if (primary_lang == 'zh' and translate_to_en) or (primary_lang == 'en' and translate_to_zh):
205   - if self.translator:
206   - target_lang = 'en' if primary_lang == 'zh' else 'zh'
207   - # 根据目标语言选择对应的提示词
208   - prompt = self.translation_prompts.get(f'default_{target_lang}') or self.translation_prompts.get('default_zh') or self.translation_prompts.get('default_en')
209   - translated = self.translator.translate(
210   - brief_text,
211   - target_lang=target_lang,
212   - source_lang=primary_lang,
213   - prompt=prompt
214   - )
215   - doc[f'brief{secondary_suffix}'] = translated if translated else None
216   - else:
217   - doc[f'brief{secondary_suffix}'] = None
  216 + if self.translator:
  217 + prompt = self.translation_prompts.get('default_zh') or self.translation_prompts.get('default_en')
  218 + translations = self.translator.translate_for_indexing(
  219 + brief_text,
  220 + shop_language=primary_lang,
  221 + source_lang=primary_lang,
  222 + prompt=prompt
  223 + )
  224 + doc['brief_zh'] = translations.get('zh') or (brief_text if primary_lang == 'zh' else None)
  225 + doc['brief_en'] = translations.get('en') or (brief_text if primary_lang == 'en' else None)
218 226 else:
219   - doc[f'brief{secondary_suffix}'] = None
  227 + if primary_lang == 'zh':
  228 + doc['brief_zh'] = brief_text
  229 + doc['brief_en'] = None
  230 + else:
  231 + doc['brief_zh'] = None
  232 + doc['brief_en'] = brief_text
220 233 else:
221   - doc[f'brief{primary_suffix}'] = None
222   - doc[f'brief{secondary_suffix}'] = None
  234 + doc['brief_zh'] = None
  235 + doc['brief_en'] = None
223 236  
224 237 # Description
225 238 if pd.notna(spu_row.get('description')):
226 239 desc_text = str(spu_row['description'])
227   - doc[f'description{primary_suffix}'] = desc_text
228   - if (primary_lang == 'zh' and translate_to_en) or (primary_lang == 'en' and translate_to_zh):
229   - if self.translator:
230   - target_lang = 'en' if primary_lang == 'zh' else 'zh'
231   - # 根据目标语言选择对应的提示词
232   - prompt = self.translation_prompts.get(f'default_{target_lang}') or self.translation_prompts.get('default_zh') or self.translation_prompts.get('default_en')
233   - translated = self.translator.translate(
234   - desc_text,
235   - target_lang=target_lang,
236   - source_lang=primary_lang,
237   - prompt=prompt
238   - )
239   - doc[f'description{secondary_suffix}'] = translated if translated else None
240   - else:
241   - doc[f'description{secondary_suffix}'] = None
  240 + if self.translator:
  241 + prompt = self.translation_prompts.get('default_zh') or self.translation_prompts.get('default_en')
  242 + translations = self.translator.translate_for_indexing(
  243 + desc_text,
  244 + shop_language=primary_lang,
  245 + source_lang=primary_lang,
  246 + prompt=prompt
  247 + )
  248 + doc['description_zh'] = translations.get('zh') or (desc_text if primary_lang == 'zh' else None)
  249 + doc['description_en'] = translations.get('en') or (desc_text if primary_lang == 'en' else None)
242 250 else:
243   - doc[f'description{secondary_suffix}'] = None
  251 + if primary_lang == 'zh':
  252 + doc['description_zh'] = desc_text
  253 + doc['description_en'] = None
  254 + else:
  255 + doc['description_zh'] = None
  256 + doc['description_en'] = desc_text
244 257 else:
245   - doc[f'description{primary_suffix}'] = None
246   - doc[f'description{secondary_suffix}'] = None
  258 + doc['description_zh'] = None
  259 + doc['description_en'] = None
247 260  
248 261 # Vendor
249 262 if pd.notna(spu_row.get('vendor')):
250 263 vendor_text = str(spu_row['vendor'])
251   - doc[f'vendor{primary_suffix}'] = vendor_text
252   - if (primary_lang == 'zh' and translate_to_en) or (primary_lang == 'en' and translate_to_zh):
253   - if self.translator:
254   - target_lang = 'en' if primary_lang == 'zh' else 'zh'
255   - # 根据目标语言选择对应的提示词
256   - prompt = self.translation_prompts.get(f'default_{target_lang}') or self.translation_prompts.get('default_zh') or self.translation_prompts.get('default_en')
257   - translated = self.translator.translate(
258   - vendor_text,
259   - target_lang=target_lang,
260   - source_lang=primary_lang,
261   - prompt=prompt
262   - )
263   - doc[f'vendor{secondary_suffix}'] = translated if translated else None
264   - else:
265   - doc[f'vendor{secondary_suffix}'] = None
  264 + if self.translator:
  265 + prompt = self.translation_prompts.get('default_zh') or self.translation_prompts.get('default_en')
  266 + translations = self.translator.translate_for_indexing(
  267 + vendor_text,
  268 + shop_language=primary_lang,
  269 + source_lang=primary_lang,
  270 + prompt=prompt
  271 + )
  272 + doc['vendor_zh'] = translations.get('zh') or (vendor_text if primary_lang == 'zh' else None)
  273 + doc['vendor_en'] = translations.get('en') or (vendor_text if primary_lang == 'en' else None)
266 274 else:
267   - doc[f'vendor{secondary_suffix}'] = None
  275 + if primary_lang == 'zh':
  276 + doc['vendor_zh'] = vendor_text
  277 + doc['vendor_en'] = None
  278 + else:
  279 + doc['vendor_zh'] = None
  280 + doc['vendor_en'] = vendor_text
268 281 else:
269   - doc[f'vendor{primary_suffix}'] = None
270   - doc[f'vendor{secondary_suffix}'] = None
  282 + doc['vendor_zh'] = None
  283 + doc['vendor_en'] = None
271 284  
272 285 def _fill_category_fields(self, doc: Dict[str, Any], spu_row: pd.Series):
273 286 """填充类目相关字段。"""
... ... @@ -542,4 +555,36 @@ class SPUDocumentTransformer:
542 555 sku_data['image_src'] = str(sku_row['image_src'])
543 556  
544 557 return sku_data
  558 +
  559 + def _fill_title_embedding(self, doc: Dict[str, Any]) -> None:
  560 + """
  561 + 填充标题向量化字段。
  562 +
  563 + 使用英文标题(title_en)生成embedding。如果title_en不存在,则使用title_zh。
  564 +
  565 + Args:
  566 + doc: ES文档字典
  567 + """
  568 + # 优先使用英文标题,如果没有则使用中文标题
  569 + title_text = doc.get('title_en') or doc.get('title_zh')
  570 +
  571 + if not title_text or not title_text.strip():
  572 + logger.debug(f"No title text available for embedding, SPU: {doc.get('spu_id')}")
  573 + return
  574 +
  575 + try:
  576 + # 使用BgeEncoder生成embedding
  577 + # encode方法返回numpy数组,形状为(n, 1024)
  578 + embeddings = self.encoder.encode(title_text)
  579 +
  580 + if embeddings is not None and len(embeddings) > 0:
  581 + # 取第一个embedding(因为只传了一个文本)
  582 + embedding = embeddings[0]
  583 + # 转换为列表格式(ES需要)
  584 + doc['title_embedding'] = embedding.tolist()
  585 + logger.debug(f"Generated title_embedding for SPU: {doc.get('spu_id')}, title: {title_text[:50]}...")
  586 + else:
  587 + logger.warning(f"Failed to generate embedding for title: {title_text[:50]}...")
  588 + except Exception as e:
  589 + logger.error(f"Error generating title_embedding for SPU {doc.get('spu_id')}: {e}", exc_info=True)
545 590  
... ...
indexer/indexing_utils.py
... ... @@ -56,7 +56,9 @@ def create_document_transformer(
56 56 tenant_id: str,
57 57 searchable_option_dimensions: Optional[list] = None,
58 58 translator: Optional[Any] = None,
59   - translation_prompts: Optional[Dict[str, str]] = None
  59 + translation_prompts: Optional[Dict[str, str]] = None,
  60 + encoder: Optional[Any] = None,
  61 + enable_title_embedding: bool = True
60 62 ) -> SPUDocumentTransformer:
61 63 """
62 64 创建文档转换器(统一初始化逻辑)。
... ... @@ -67,6 +69,8 @@ def create_document_transformer(
67 69 searchable_option_dimensions: 可搜索的option维度列表(如果为None则从配置加载)
68 70 translator: 翻译器实例(如果为None则根据配置初始化)
69 71 translation_prompts: 翻译提示词配置(如果为None则从配置加载)
  72 + encoder: 文本编码器实例(如果为None且enable_title_embedding为True则根据配置初始化)
  73 + enable_title_embedding: 是否启用标题向量化(默认True)
70 74  
71 75 Returns:
72 76 SPUDocumentTransformer实例
... ... @@ -76,7 +80,7 @@ def create_document_transformer(
76 80 tenant_config = tenant_config_loader.get_tenant_config(tenant_id)
77 81  
78 82 # 加载搜索配置(如果需要)
79   - if searchable_option_dimensions is None or translator is None or translation_prompts is None:
  83 + if searchable_option_dimensions is None or translator is None or translation_prompts is None or (encoder is None and enable_title_embedding):
80 84 try:
81 85 config_loader = ConfigLoader()
82 86 config = config_loader.load_config()
... ... @@ -95,6 +99,16 @@ def create_document_transformer(
95 99  
96 100 if translation_prompts is None:
97 101 translation_prompts = config.query_config.translation_prompts
  102 +
  103 + # 初始化encoder(如果启用标题向量化且未提供encoder)
  104 + if encoder is None and enable_title_embedding and config.query_config.enable_text_embedding:
  105 + try:
  106 + from embeddings.text_encoder import BgeEncoder
  107 + encoder = BgeEncoder()
  108 + logger.info("BgeEncoder initialized for title embedding")
  109 + except Exception as e:
  110 + logger.warning(f"Failed to initialize BgeEncoder: {e}, title embedding will be disabled")
  111 + enable_title_embedding = False
98 112 except Exception as e:
99 113 logger.warning(f"Failed to load config, using defaults: {e}")
100 114 if searchable_option_dimensions is None:
... ... @@ -107,6 +121,8 @@ def create_document_transformer(
107 121 searchable_option_dimensions=searchable_option_dimensions,
108 122 tenant_config=tenant_config,
109 123 translator=translator,
110   - translation_prompts=translation_prompts
  124 + translation_prompts=translation_prompts,
  125 + encoder=encoder,
  126 + enable_title_embedding=enable_title_embedding
111 127 )
112 128  
... ...
query/translator.py
... ... @@ -13,18 +13,20 @@ https://developers.deepl.com/api-reference/translate/request-translation
13 13  
14 14 import requests
15 15 import re
  16 +import redis
16 17 from concurrent.futures import ThreadPoolExecutor
  18 +from datetime import timedelta
17 19 from typing import Dict, List, Optional
18   -from utils.cache import DictCache
19 20 import logging
20 21  
21 22 logger = logging.getLogger(__name__)
22 23  
23   -# Try to import DEEPL_AUTH_KEY, but allow import to fail
  24 +# Try to import DEEPL_AUTH_KEY and REDIS_CONFIG, but allow import to fail
24 25 try:
25   - from config.env_config import DEEPL_AUTH_KEY
  26 + from config.env_config import DEEPL_AUTH_KEY, REDIS_CONFIG
26 27 except ImportError:
27 28 DEEPL_AUTH_KEY = None
  29 + REDIS_CONFIG = {}
28 30  
29 31  
30 32 class Translator:
... ... @@ -74,9 +76,30 @@ class Translator:
74 76 self.glossary_id = glossary_id
75 77 self.translation_context = translation_context or "e-commerce product search"
76 78  
  79 + # Initialize Redis cache if enabled
77 80 if use_cache:
78   - self.cache = DictCache(".cache/translations.json")
  81 + try:
  82 + self.redis_client = redis.Redis(
  83 + host=REDIS_CONFIG.get('host', 'localhost'),
  84 + port=REDIS_CONFIG.get('port', 6479),
  85 + password=REDIS_CONFIG.get('password'),
  86 + decode_responses=True, # Return str instead of bytes
  87 + socket_timeout=REDIS_CONFIG.get('socket_timeout', 1),
  88 + socket_connect_timeout=REDIS_CONFIG.get('socket_connect_timeout', 1),
  89 + retry_on_timeout=REDIS_CONFIG.get('retry_on_timeout', False),
  90 + health_check_interval=10, # 避免复用坏连接
  91 + )
  92 + # Test connection
  93 + self.redis_client.ping()
  94 + self.expire_time = timedelta(days=REDIS_CONFIG.get('translation_cache_expire_days', 360))
  95 + self.cache_prefix = REDIS_CONFIG.get('translation_cache_prefix', 'trans')
  96 + logger.info("Redis cache initialized for translations")
  97 + except Exception as e:
  98 + logger.warning(f"Failed to initialize Redis cache: {e}, falling back to no cache")
  99 + self.redis_client = None
  100 + self.cache = None
79 101 else:
  102 + self.redis_client = None
80 103 self.cache = None
81 104  
82 105 # Thread pool for async translation
... ... @@ -131,8 +154,8 @@ class Translator:
131 154 cache_key = ':'.join(cache_key_parts)
132 155  
133 156 # Check cache (include context and prompt in cache key for accuracy)
134   - if self.use_cache:
135   - cached = self.cache.get(cache_key, category="translations")
  157 + if self.use_cache and self.redis_client:
  158 + cached = self._get_cached_translation_redis(text, target_lang, source_lang, translation_context, prompt)
136 159 if cached:
137 160 return cached
138 161  
... ... @@ -155,8 +178,8 @@ class Translator:
155 178 result = text
156 179  
157 180 # Cache result
158   - if result and self.use_cache:
159   - self.cache.set(cache_key, result, category="translations")
  181 + if result and self.use_cache and self.redis_client:
  182 + self._set_cached_translation_redis(text, target_lang, result, source_lang, translation_context, prompt)
160 183  
161 184 return result
162 185  
... ... @@ -395,16 +418,57 @@ class Translator:
395 418 prompt: Optional[str] = None
396 419 ) -> Optional[str]:
397 420 """Get translation from cache if available."""
398   - if not self.cache:
  421 + if not self.redis_client:
  422 + return None
  423 + return self._get_cached_translation_redis(text, target_lang, source_lang, context, prompt)
  424 +
  425 + def _get_cached_translation_redis(
  426 + self,
  427 + text: str,
  428 + target_lang: str,
  429 + source_lang: Optional[str] = None,
  430 + context: Optional[str] = None,
  431 + prompt: Optional[str] = None
  432 + ) -> Optional[str]:
  433 + """Get translation from Redis cache with sliding expiration."""
  434 + if not self.redis_client:
399 435 return None
400 436  
401   - translation_context = context or self.translation_context
402   - cache_key_parts = [source_lang or 'auto', target_lang, translation_context]
403   - if prompt:
404   - cache_key_parts.append(prompt)
405   - cache_key_parts.append(text)
406   - cache_key = ':'.join(cache_key_parts)
407   - return self.cache.get(cache_key, category="translations")
  437 + try:
  438 + # Build cache key: prefix:target_lang:text
  439 + # For simplicity, we use target_lang and text as key
  440 + # Context and prompt are not included in key to maximize cache hits
  441 + cache_key = f"{self.cache_prefix}:{target_lang.upper()}:{text}"
  442 + value = self.redis_client.get(cache_key)
  443 + if value:
  444 + # Sliding expiration: reset expiration time on access
  445 + self.redis_client.expire(cache_key, self.expire_time)
  446 + logger.debug(f"[Translator] Cache hit for translation: {text} -> {target_lang}")
  447 + return value
  448 + return None
  449 + except Exception as e:
  450 + logger.error(f"[Translator] Redis error during get translation cache: '{text}' {target_lang}: {e}")
  451 + return None
  452 +
  453 + def _set_cached_translation_redis(
  454 + self,
  455 + text: str,
  456 + target_lang: str,
  457 + translation: str,
  458 + source_lang: Optional[str] = None,
  459 + context: Optional[str] = None,
  460 + prompt: Optional[str] = None
  461 + ) -> None:
  462 + """Store translation in Redis cache."""
  463 + if not self.redis_client:
  464 + return
  465 +
  466 + try:
  467 + cache_key = f"{self.cache_prefix}:{target_lang.upper()}:{text}"
  468 + self.redis_client.setex(cache_key, self.expire_time, translation)
  469 + logger.debug(f"[Translator] Cached translation: {text} -> {target_lang}: {translation}")
  470 + except Exception as e:
  471 + logger.error(f"[Translator] Redis error during set translation cache: '{text}' {target_lang}: {e}")
408 472  
409 473 def _translate_async(
410 474 self,
... ... @@ -507,6 +571,83 @@ class Translator:
507 571 # The user can configure a glossary for better results
508 572 return translated_text
509 573  
  574 + def translate_for_indexing(
  575 + self,
  576 + text: str,
  577 + shop_language: str,
  578 + source_lang: Optional[str] = None,
  579 + context: Optional[str] = None,
  580 + prompt: Optional[str] = None
  581 + ) -> Dict[str, Optional[str]]:
  582 + """
  583 + Translate text for indexing based on shop language configuration.
  584 +
  585 + This method automatically handles multi-language translation:
  586 + - If shop language is not 'zh', translate to Chinese (zh)
  587 + - If shop language is not 'en', translate to English (en)
  588 +
  589 + All translation logic is internal - callers don't need to worry about
  590 + which languages to translate to.
  591 +
  592 + Args:
  593 + text: Text to translate
  594 + shop_language: Shop's configured language (e.g., 'zh', 'en', 'ru')
  595 + source_lang: Source language code (optional, auto-detect if None)
  596 + context: Additional context for translation (optional)
  597 + prompt: Translation prompt/instruction (optional)
  598 +
  599 + Returns:
  600 + Dictionary with 'zh' and 'en' keys containing translated text (or None if not needed)
  601 + Example: {'zh': '中文翻译', 'en': 'English translation'}
  602 + """
  603 + if not text or not text.strip():
  604 + return {'zh': None, 'en': None}
  605 +
  606 + # Skip translation for symbol-only queries
  607 + if re.match(r'^[\d\s_-]+$', text):
  608 + logger.info(f"[Translator] Skip translation for symbol-only query: '{text}'")
  609 + return {'zh': None, 'en': None}
  610 +
  611 + results = {'zh': None, 'en': None}
  612 + shop_lang_lower = shop_language.lower() if shop_language else ""
  613 +
  614 + # Determine which languages need translation
  615 + targets = []
  616 + if "zh" not in shop_lang_lower:
  617 + targets.append("zh")
  618 + if "en" not in shop_lang_lower:
  619 + targets.append("en")
  620 +
  621 + # If shop language is already zh and en, no translation needed
  622 + if not targets:
  623 + # Use original text for both languages
  624 + if "zh" in shop_lang_lower:
  625 + results['zh'] = text
  626 + if "en" in shop_lang_lower:
  627 + results['en'] = text
  628 + return results
  629 +
  630 + # Translate to each target language
  631 + for target_lang in targets:
  632 + # Check cache first
  633 + cached = self._get_cached_translation_redis(text, target_lang, source_lang, context, prompt)
  634 + if cached:
  635 + results[target_lang] = cached
  636 + logger.debug(f"[Translator] Cache hit for indexing: '{text}' -> {target_lang}: {cached}")
  637 + continue
  638 +
  639 + # Translate synchronously for indexing (we need the result immediately)
  640 + translated = self.translate(
  641 + text,
  642 + target_lang=target_lang,
  643 + source_lang=source_lang or shop_language,
  644 + context=context,
  645 + prompt=prompt
  646 + )
  647 + results[target_lang] = translated
  648 +
  649 + return results
  650 +
510 651 def get_translation_needs(
511 652 self,
512 653 detected_lang: str,
... ... @@ -524,7 +665,7 @@ class Translator:
524 665 """
525 666 # If detected language is in supported list, translate to others
526 667 if detected_lang in supported_langs:
527   - return [lang for lang in supported_langs if lang != detected_lang]
  668 + return [lang for lang in supported_langs if detected_lang != lang]
528 669  
529 670 # Otherwise, translate to all supported languages
530 671 return supported_langs
... ...