diff --git a/config/env_config.py b/config/env_config.py
index e77a4f2..fb4afab 100644
--- a/config/env_config.py
+++ b/config/env_config.py
@@ -26,7 +26,14 @@ ES_CONFIG = {
 REDIS_CONFIG = {
     'host': os.getenv('REDIS_HOST', 'localhost'),
     'port': int(os.getenv('REDIS_PORT', 6479)),
-    'password': os.getenv('REDIS_PASSWORD'),
+    'snapshot_db': int(os.getenv('REDIS_SNAPSHOT_DB', 0)),
+    'password': os.getenv('REDIS_PASSWORD', 'BMfv5aI31kgHWtlx'),
+    'socket_timeout': int(os.getenv('REDIS_SOCKET_TIMEOUT', 1)),
+    'socket_connect_timeout': int(os.getenv('REDIS_SOCKET_CONNECT_TIMEOUT', 1)),
+    'retry_on_timeout': os.getenv('REDIS_RETRY_ON_TIMEOUT', 'False').lower() == 'true',
+    'cache_expire_days': int(os.getenv('REDIS_CACHE_EXPIRE_DAYS', 180)),  # 6 months
+    'translation_cache_expire_days': int(os.getenv('REDIS_TRANSLATION_CACHE_EXPIRE_DAYS', 360)),
+    'translation_cache_prefix': os.getenv('REDIS_TRANSLATION_CACHE_PREFIX', 'trans'),
 }
 
 # DeepL API Key
diff --git a/embeddings/text_encoder.py b/embeddings/text_encoder.py
index 43369fb..a27c287 100644
--- a/embeddings/text_encoder.py
+++ b/embeddings/text_encoder.py
@@ -9,11 +9,20 @@ import requests
 import time
 import threading
 import numpy as np
+import pickle
+import redis
+from datetime import timedelta
+from typing import List, Union, Dict, Any, Optional
 import logging
-from typing import List, Union, Dict, Any
 
 logger = logging.getLogger(__name__)
 
+# Try to import REDIS_CONFIG, but allow import to fail
+try:
+    from config.env_config import REDIS_CONFIG
+except ImportError:
+    REDIS_CONFIG = {}
+
 
 class BgeEncoder:
     """
@@ -31,6 +40,26 @@ class BgeEncoder:
             logger.info(f"Creating BgeEncoder instance with service URL: {service_url}")
             cls._instance.service_url = service_url
             cls._instance.endpoint = f"{service_url}/embedding/generate_embeddings"
+
+            # Initialize Redis cache
+            try:
+                cls._instance.redis_client = redis.Redis(
+                    host=REDIS_CONFIG.get('host', 'localhost'),
+                    port=REDIS_CONFIG.get('port', 6479),
+                    password=REDIS_CONFIG.get('password'),
+                    decode_responses=False,  # Keep binary data as is
+                    socket_timeout=REDIS_CONFIG.get('socket_timeout', 1),
+                    socket_connect_timeout=REDIS_CONFIG.get('socket_connect_timeout', 1),
+                    retry_on_timeout=REDIS_CONFIG.get('retry_on_timeout', False),
+                    health_check_interval=10  # 避免复用坏连接
+                )
+                # Test connection
+                cls._instance.redis_client.ping()
+                cls._instance.expire_time = timedelta(days=REDIS_CONFIG.get('cache_expire_days', 180))
+                logger.info("Redis cache initialized for embeddings")
+            except Exception as e:
+                logger.warning(f"Failed to initialize Redis cache for embeddings: {e}, continuing without cache")
+                cls._instance.redis_client = None
         return cls._instance
 
     def _call_service(self, request_data: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
@@ -63,7 +92,7 @@ class BgeEncoder:
         batch_size: int = 32
     ) -> np.ndarray:
         """
-        Encode text into embeddings via network service.
+        Encode text into embeddings via network service with Redis caching.
 
         Args:
             sentences: Single string or list of strings to encode
@@ -78,11 +107,24 @@
         if isinstance(sentences, str):
             sentences = [sentences]
 
-        # Prepare request data
-        request_data = []
+        # Check cache first
+        cached_embeddings = []
+        uncached_indices = []
+        uncached_texts = []
         for i, text in enumerate(sentences):
+            cached = self._get_cached_embedding(text, 'en')  # Use 'en' as default language for title embedding
+            if cached is not None:
+                cached_embeddings.append((i, cached))
+            else:
+                uncached_indices.append(i)
+                uncached_texts.append(text)
+
+        # Prepare request data for uncached texts
+        request_data = []
+        for i, text in enumerate(uncached_texts):
             request_item = {
-                "id": str(i),
+                "id": str(uncached_indices[i]),
                 "name_zh": text
             }
 
             request_data.append(request_item)
 
-        try:
-            # Call service
-            response_data = self._call_service(request_data)
-
-            # Process response
-            embeddings = []
-            for i, text in enumerate(sentences):
-                # Find corresponding response by ID
-                response_item = None
-                for item in response_data:
-                    if str(item.get("id")) == str(i):
-                        response_item = item
-                        break
-
-                if response_item:
-                    # Try Chinese embedding first, then English, then Russian
-                    embedding = None
-                    for lang in ["embedding_zh", "embedding_en", "embedding_ru"]:
-                        if lang in response_item and response_item[lang] is not None:
-                            embedding = response_item[lang]
+        # Process response
+        embeddings = [None] * len(sentences)
+
+        # Fill in cached embeddings
+        for idx, cached_emb in cached_embeddings:
+            embeddings[idx] = cached_emb
+
+        # If there are uncached texts, call service
+        if uncached_texts:
+            try:
+                # Call service
+                response_data = self._call_service(request_data)
+
+                # Process response
+                for i, text in enumerate(uncached_texts):
+                    original_idx = uncached_indices[i]
+                    # Find corresponding response by ID
+                    response_item = None
+                    for item in response_data:
+                        if str(item.get("id")) == str(original_idx):
+                            response_item = item
                             break
 
-                    if embedding is not None:
-                        embeddings.append(embedding)
+                    if response_item:
+                        # Try Chinese embedding first, then English, then Russian
+                        embedding = None
+                        for lang in ["embedding_zh", "embedding_en", "embedding_ru"]:
+                            if lang in response_item and response_item[lang] is not None:
+                                embedding = response_item[lang]
+                                break
+
+                        if embedding is not None:
+                            embedding_array = np.array(embedding, dtype=np.float32)
+                            embeddings[original_idx] = embedding_array
+                            # Cache the embedding
+                            self._set_cached_embedding(text, 'en', embedding_array)
+                        else:
+                            logger.warning(f"No embedding found for text {original_idx}: {text[:50]}...")
+                            embeddings[original_idx] = np.zeros(1024, dtype=np.float32)
                     else:
-                        logger.warning(f"No embedding found for text {i}: {text[:50]}...")
-                        embeddings.append([0.0] * 1024)
-                else:
-                    logger.warning(f"No response found for text {i}")
-                    embeddings.append([0.0] * 1024)
-
-            return np.array(embeddings, dtype=np.float32)
-
-        except Exception as e:
-            logger.error(f"Failed to encode texts: {e}", exc_info=True)
-            # Return zero embeddings as fallback
-            return np.zeros((len(sentences), 1024), dtype=np.float32)
+                        logger.warning(f"No response found for text {original_idx}")
+                        embeddings[original_idx] = np.zeros(1024, dtype=np.float32)
+
+            except Exception as e:
+                logger.error(f"Failed to encode texts: {e}", exc_info=True)
+                # Fill missing embeddings with zeros
+                for idx in uncached_indices:
+                    if embeddings[idx] is None:
+                        embeddings[idx] = np.zeros(1024, dtype=np.float32)
+
+        # Convert to numpy array
+        return np.array(embeddings, dtype=np.float32)
 
     def encode_batch(
         self,
@@ -149,3 +206,48 @@
             numpy array of embeddings
         """
         return self.encode(texts, batch_size=batch_size, device=device)
+
+    def _get_cache_key(self, query: str, language: str) -> str:
+        """Generate a cache key for the query"""
+        return f"embedding:{language}:{query}"
+
+    def _get_cached_embedding(self, query: str, language: str) -> Optional[np.ndarray]:
+        """Get embedding from cache if exists (with sliding expiration)"""
+        if not self.redis_client:
+            return None
+
+        try:
+            cache_key = self._get_cache_key(query, language)
+            cached_data = self.redis_client.get(cache_key)
+            if cached_data:
+                logger.debug(f"Cache hit for embedding: {query}")
+                # Update expiration time on access (sliding expiration)
+                self.redis_client.expire(cache_key, self.expire_time)
+                return pickle.loads(cached_data)
+            return None
+        except Exception as e:
+            logger.error(f"Error retrieving embedding from cache: {e}")
+            return None
+
+    def _set_cached_embedding(self, query: str, language: str, embedding: np.ndarray) -> bool:
+        """Store embedding in cache"""
+        if not self.redis_client:
+            return False
+
+        try:
+            cache_key = self._get_cache_key(query, language)
+            serialized_data = pickle.dumps(embedding)
+            self.redis_client.setex(
+                cache_key,
+                self.expire_time,
+                serialized_data
+            )
+            logger.debug(f"Successfully cached embedding for query: {query}")
+            return True
+        except (redis.exceptions.BusyLoadingError, redis.exceptions.ConnectionError,
+                redis.exceptions.TimeoutError, redis.exceptions.RedisError) as e:
+            logger.warning(f"Redis error storing embedding in cache: {e}")
+            return False
+        except Exception as e:
+            logger.error(f"Error storing embedding in cache: {e}")
+            return False
diff --git a/indexer/document_transformer.py b/indexer/document_transformer.py
index 14b3a75..168b40e 100644
--- a/indexer/document_transformer.py
+++ b/indexer/document_transformer.py
@@ -29,7 +29,9 @@ class SPUDocumentTransformer:
         searchable_option_dimensions: List[str],
         tenant_config: Optional[Dict[str, Any]] = None,
         translator: Optional[Any] = None,
-        translation_prompts: Optional[Dict[str, str]] = None
+        translation_prompts: Optional[Dict[str, str]] = None,
+        encoder: Optional[Any] = None,
+        enable_title_embedding: bool = True
     ):
         """
         初始化文档转换器。
@@ -40,12 +42,16 @@
             tenant_config: 租户配置(包含主语言和翻译配置)
             translator: 翻译器实例(可选,如果提供则启用翻译功能)
             translation_prompts: 翻译提示词配置(可选)
+            encoder: 文本编码器实例(可选,用于生成title_embedding)
+            enable_title_embedding: 是否启用标题向量化(默认True)
         """
         self.category_id_to_name = category_id_to_name
         self.searchable_option_dimensions = searchable_option_dimensions
         self.tenant_config = tenant_config or {}
         self.translator = translator
         self.translation_prompts = translation_prompts or {}
+        self.encoder = encoder
+        self.enable_title_embedding = enable_title_embedding
 
     def transform_spu_to_doc(
         self,
@@ -81,11 +87,13 @@
         # 获取租户配置
         primary_lang = self.tenant_config.get('primary_language', 'zh')
-        translate_to_en = self.tenant_config.get('translate_to_en', True)
-        translate_to_zh = self.tenant_config.get('translate_to_zh', False)
 
-        # 文本字段处理(根据主语言和翻译配置)
-        self._fill_text_fields(doc, spu_row, primary_lang, translate_to_en, translate_to_zh)
+        # 文本字段处理(使用translator的内部逻辑自动处理多语言翻译)
+        self._fill_text_fields(doc, spu_row, primary_lang)
+
+        # 标题向量化处理(如果启用)
+        if self.enable_title_embedding and self.encoder:
+            self._fill_title_embedding(doc)
 
         # Tags
         if pd.notna(spu_row.get('tags')):
@@ -160,114 +168,119 @@
         self,
         doc: Dict[str, Any],
         spu_row: pd.Series,
-        primary_lang: str,
-        translate_to_en: bool,
-        translate_to_zh: bool
+        primary_lang: str
     ):
-        """填充文本字段(根据主语言和翻译配置)。"""
-        # 主语言字段
-        primary_suffix = '_zh' if primary_lang == 'zh' else '_en'
-        secondary_suffix = '_en' if primary_lang == 'zh' else '_zh'
-
+        """
+        填充文本字段(根据主语言自动处理多语言翻译)。
+
+        翻译逻辑在translator内部处理:
+        - 如果店铺语言不等于zh,自动翻译成zh
+        - 如果店铺语言不等于en,自动翻译成en
+        """
         # Title
         if pd.notna(spu_row.get('title')):
             title_text = str(spu_row['title'])
-            doc[f'title{primary_suffix}'] = title_text
-            # 如果需要翻译,调用翻译服务(同步模式)
-            if (primary_lang == 'zh' and translate_to_en) or (primary_lang == 'en' and translate_to_zh):
-                if self.translator:
-                    target_lang = 'en' if primary_lang == 'zh' else 'zh'
-                    # 根据目标语言选择对应的提示词
-                    if target_lang == 'zh':
-                        prompt = self.translation_prompts.get('product_title_zh') or self.translation_prompts.get('default_zh')
-                    else:
-                        prompt = self.translation_prompts.get('product_title_en') or self.translation_prompts.get('default_en')
-                    translated = self.translator.translate(
-                        title_text,
-                        target_lang=target_lang,
-                        source_lang=primary_lang,
-                        prompt=prompt
-                    )
-                    doc[f'title{secondary_suffix}'] = translated if translated else None
-                else:
-                    doc[f'title{secondary_suffix}'] = None  # 无翻译器,设为None
+
+            # 使用translator的translate_for_indexing方法,自动处理多语言翻译
+            if self.translator:
+                # 根据目标语言选择对应的提示词
+                prompt_zh = self.translation_prompts.get('product_title_zh') or self.translation_prompts.get('default_zh')
+                prompt_en = self.translation_prompts.get('product_title_en') or self.translation_prompts.get('default_en')
+
+                # 调用translate_for_indexing,自动处理翻译逻辑
+                translations = self.translator.translate_for_indexing(
+                    title_text,
+                    shop_language=primary_lang,
+                    source_lang=primary_lang,
+                    prompt=prompt_zh if primary_lang == 'zh' else prompt_en
+                )
+
+                # 填充翻译结果
+                doc['title_zh'] = translations.get('zh') or (title_text if primary_lang == 'zh' else None)
+                doc['title_en'] = translations.get('en') or (title_text if primary_lang == 'en' else None)
             else:
-                doc[f'title{secondary_suffix}'] = None
+                # 无翻译器,只填充主语言字段
+                if primary_lang == 'zh':
+                    doc['title_zh'] = title_text
+                    doc['title_en'] = None
+                else:
+                    doc['title_zh'] = None
+                    doc['title_en'] = title_text
         else:
-            doc[f'title{primary_suffix}'] = None
-            doc[f'title{secondary_suffix}'] = None
+            doc['title_zh'] = None
+            doc['title_en'] = None
 
         # Brief
         if pd.notna(spu_row.get('brief')):
             brief_text = str(spu_row['brief'])
-            doc[f'brief{primary_suffix}'] = brief_text
-            if (primary_lang == 'zh' and translate_to_en) or (primary_lang == 'en' and translate_to_zh):
-                if self.translator:
-                    target_lang = 'en' if primary_lang == 'zh' else 'zh'
-                    # 根据目标语言选择对应的提示词
-                    prompt = self.translation_prompts.get(f'default_{target_lang}') or self.translation_prompts.get('default_zh') or self.translation_prompts.get('default_en')
-                    translated = self.translator.translate(
-                        brief_text,
-                        target_lang=target_lang,
-                        source_lang=primary_lang,
-                        prompt=prompt
-                    )
-                    doc[f'brief{secondary_suffix}'] = translated if translated else None
-                else:
-                    doc[f'brief{secondary_suffix}'] = None
+            if self.translator:
+                prompt = self.translation_prompts.get('default_zh') or self.translation_prompts.get('default_en')
+                translations = self.translator.translate_for_indexing(
+                    brief_text,
+                    shop_language=primary_lang,
+                    source_lang=primary_lang,
+                    prompt=prompt
+                )
+                doc['brief_zh'] = translations.get('zh') or (brief_text if primary_lang == 'zh' else None)
+                doc['brief_en'] = translations.get('en') or (brief_text if primary_lang == 'en' else None)
             else:
-                doc[f'brief{secondary_suffix}'] = None
+                if primary_lang == 'zh':
+                    doc['brief_zh'] = brief_text
+                    doc['brief_en'] = None
+                else:
+                    doc['brief_zh'] = None
+                    doc['brief_en'] = brief_text
         else:
-            doc[f'brief{primary_suffix}'] = None
-            doc[f'brief{secondary_suffix}'] = None
+            doc['brief_zh'] = None
+            doc['brief_en'] = None
 
         # Description
         if pd.notna(spu_row.get('description')):
             desc_text = str(spu_row['description'])
-            doc[f'description{primary_suffix}'] = desc_text
-            if (primary_lang == 'zh' and translate_to_en) or (primary_lang == 'en' and translate_to_zh):
-                if self.translator:
-                    target_lang = 'en' if primary_lang == 'zh' else 'zh'
-                    # 根据目标语言选择对应的提示词
-                    prompt = self.translation_prompts.get(f'default_{target_lang}') or self.translation_prompts.get('default_zh') or self.translation_prompts.get('default_en')
-                    translated = self.translator.translate(
-                        desc_text,
-                        target_lang=target_lang,
-                        source_lang=primary_lang,
-                        prompt=prompt
-                    )
-                    doc[f'description{secondary_suffix}'] = translated if translated else None
-                else:
-                    doc[f'description{secondary_suffix}'] = None
+            if self.translator:
+                prompt = self.translation_prompts.get('default_zh') or self.translation_prompts.get('default_en')
+                translations = self.translator.translate_for_indexing(
+                    desc_text,
+                    shop_language=primary_lang,
+                    source_lang=primary_lang,
+                    prompt=prompt
+                )
+                doc['description_zh'] = translations.get('zh') or (desc_text if primary_lang == 'zh' else None)
+                doc['description_en'] = translations.get('en') or (desc_text if primary_lang == 'en' else None)
             else:
-                doc[f'description{secondary_suffix}'] = None
+                if primary_lang == 'zh':
+                    doc['description_zh'] = desc_text
+                    doc['description_en'] = None
+                else:
+                    doc['description_zh'] = None
+                    doc['description_en'] = desc_text
         else:
-            doc[f'description{primary_suffix}'] = None
-            doc[f'description{secondary_suffix}'] = None
+            doc['description_zh'] = None
+            doc['description_en'] = None
 
         # Vendor
         if pd.notna(spu_row.get('vendor')):
             vendor_text = str(spu_row['vendor'])
-            doc[f'vendor{primary_suffix}'] = vendor_text
-            if (primary_lang == 'zh' and translate_to_en) or (primary_lang == 'en' and translate_to_zh):
-                if self.translator:
-                    target_lang = 'en' if primary_lang == 'zh' else 'zh'
-                    # 根据目标语言选择对应的提示词
-                    prompt = self.translation_prompts.get(f'default_{target_lang}') or self.translation_prompts.get('default_zh') or self.translation_prompts.get('default_en')
-                    translated = self.translator.translate(
-                        vendor_text,
-                        target_lang=target_lang,
-                        source_lang=primary_lang,
-                        prompt=prompt
-                    )
-                    doc[f'vendor{secondary_suffix}'] = translated if translated else None
-                else:
-                    doc[f'vendor{secondary_suffix}'] = None
+            if self.translator:
+                prompt = self.translation_prompts.get('default_zh') or self.translation_prompts.get('default_en')
+                translations = self.translator.translate_for_indexing(
+                    vendor_text,
+                    shop_language=primary_lang,
+                    source_lang=primary_lang,
+                    prompt=prompt
+                )
+                doc['vendor_zh'] = translations.get('zh') or (vendor_text if primary_lang == 'zh' else None)
+                doc['vendor_en'] = translations.get('en') or (vendor_text if primary_lang == 'en' else None)
             else:
-                doc[f'vendor{secondary_suffix}'] = None
+                if primary_lang == 'zh':
+                    doc['vendor_zh'] = vendor_text
+                    doc['vendor_en'] = None
+                else:
+                    doc['vendor_zh'] = None
+                    doc['vendor_en'] = vendor_text
         else:
-            doc[f'vendor{primary_suffix}'] = None
-            doc[f'vendor{secondary_suffix}'] = None
+            doc['vendor_zh'] = None
+            doc['vendor_en'] = None
 
     def _fill_category_fields(self, doc: Dict[str, Any], spu_row: pd.Series):
         """填充类目相关字段。"""
@@ -542,4 +555,36 @@ class SPUDocumentTransformer:
             sku_data['image_src'] = str(sku_row['image_src'])
 
         return sku_data
+
+    def _fill_title_embedding(self, doc: Dict[str, Any]) -> None:
+        """
+        填充标题向量化字段。
+
+        使用英文标题(title_en)生成embedding。如果title_en不存在,则使用title_zh。
+
+        Args:
+            doc: ES文档字典
+        """
+        # 优先使用英文标题,如果没有则使用中文标题
+        title_text = doc.get('title_en') or doc.get('title_zh')
+
+        if not title_text or not title_text.strip():
+            logger.debug(f"No title text available for embedding, SPU: {doc.get('spu_id')}")
+            return
+
+        try:
+            # 使用BgeEncoder生成embedding
+            # encode方法返回numpy数组,形状为(n, 1024)
+            embeddings = self.encoder.encode(title_text)
+
+            if embeddings is not None and len(embeddings) > 0:
+                # 取第一个embedding(因为只传了一个文本)
+                embedding = embeddings[0]
+                # 转换为列表格式(ES需要)
+                doc['title_embedding'] = embedding.tolist()
+                logger.debug(f"Generated title_embedding for SPU: {doc.get('spu_id')}, title: {title_text[:50]}...")
+            else:
+                logger.warning(f"Failed to generate embedding for title: {title_text[:50]}...")
+        except Exception as e:
+            logger.error(f"Error generating title_embedding for SPU {doc.get('spu_id')}: {e}", exc_info=True)
diff --git a/indexer/indexing_utils.py b/indexer/indexing_utils.py
index b0e0356..62fce55 100644
--- a/indexer/indexing_utils.py
+++ b/indexer/indexing_utils.py
@@ -56,7 +56,9 @@ def create_document_transformer(
     tenant_id: str,
     searchable_option_dimensions: Optional[list] = None,
     translator: Optional[Any] = None,
-    translation_prompts: Optional[Dict[str, str]] = None
+    translation_prompts: Optional[Dict[str, str]] = None,
+    encoder: Optional[Any] = None,
+    enable_title_embedding: bool = True
 ) -> SPUDocumentTransformer:
     """
     创建文档转换器(统一初始化逻辑)。
@@ -67,6 +69,8 @@
         searchable_option_dimensions: 可搜索的option维度列表(如果为None则从配置加载)
         translator: 翻译器实例(如果为None则根据配置初始化)
         translation_prompts: 翻译提示词配置(如果为None则从配置加载)
+        encoder: 文本编码器实例(如果为None且enable_title_embedding为True则根据配置初始化)
+        enable_title_embedding: 是否启用标题向量化(默认True)
 
     Returns:
         SPUDocumentTransformer实例
@@ -76,7 +80,7 @@
     tenant_config = tenant_config_loader.get_tenant_config(tenant_id)
 
     # 加载搜索配置(如果需要)
-    if searchable_option_dimensions is None or translator is None or translation_prompts is None:
+    if searchable_option_dimensions is None or translator is None or translation_prompts is None or (encoder is None and enable_title_embedding):
         try:
             config_loader = ConfigLoader()
             config = config_loader.load_config()
@@ -95,6 +99,16 @@
             if translation_prompts is None:
                 translation_prompts = config.query_config.translation_prompts
+
+            # 初始化encoder(如果启用标题向量化且未提供encoder)
+            if encoder is None and enable_title_embedding and config.query_config.enable_text_embedding:
+                try:
+                    from embeddings.text_encoder import BgeEncoder
+                    encoder = BgeEncoder()
+                    logger.info("BgeEncoder initialized for title embedding")
+                except Exception as e:
+                    logger.warning(f"Failed to initialize BgeEncoder: {e}, title embedding will be disabled")
+                    enable_title_embedding = False
         except Exception as e:
             logger.warning(f"Failed to load config, using defaults: {e}")
             if searchable_option_dimensions is None:
@@ -107,6 +121,8 @@
         searchable_option_dimensions=searchable_option_dimensions,
         tenant_config=tenant_config,
         translator=translator,
-        translation_prompts=translation_prompts
+        translation_prompts=translation_prompts,
+        encoder=encoder,
+        enable_title_embedding=enable_title_embedding
     )
 
diff --git a/query/translator.py b/query/translator.py
index f323c53..059f72e 100644
--- a/query/translator.py
+++ b/query/translator.py
@@ -13,18 +13,20 @@ https://developers.deepl.com/api-reference/translate/request-translation
 import requests
 import re
+import redis
 from concurrent.futures import ThreadPoolExecutor
+from datetime import timedelta
 from typing import Dict, List, Optional
-from utils.cache import DictCache
 import logging
 
 logger = logging.getLogger(__name__)
 
-# Try to import DEEPL_AUTH_KEY, but allow import to fail
+# Try to import DEEPL_AUTH_KEY and REDIS_CONFIG, but allow import to fail
 try:
-    from config.env_config import DEEPL_AUTH_KEY
+    from config.env_config import DEEPL_AUTH_KEY, REDIS_CONFIG
 except ImportError:
     DEEPL_AUTH_KEY = None
+    REDIS_CONFIG = {}
 
 
 class Translator:
@@ -74,9 +76,30 @@
         self.glossary_id = glossary_id
         self.translation_context = translation_context or "e-commerce product search"
 
+        # Initialize Redis cache if enabled
         if use_cache:
-            self.cache = DictCache(".cache/translations.json")
+            try:
+                self.redis_client = redis.Redis(
+                    host=REDIS_CONFIG.get('host', 'localhost'),
+                    port=REDIS_CONFIG.get('port', 6479),
+                    password=REDIS_CONFIG.get('password'),
+                    decode_responses=True,  # Return str instead of bytes
+                    socket_timeout=REDIS_CONFIG.get('socket_timeout', 1),
+                    socket_connect_timeout=REDIS_CONFIG.get('socket_connect_timeout', 1),
+                    retry_on_timeout=REDIS_CONFIG.get('retry_on_timeout', False),
+                    health_check_interval=10,  # 避免复用坏连接
+                )
+                # Test connection
+                self.redis_client.ping()
+                self.expire_time = timedelta(days=REDIS_CONFIG.get('translation_cache_expire_days', 360))
+                self.cache_prefix = REDIS_CONFIG.get('translation_cache_prefix', 'trans')
+                logger.info("Redis cache initialized for translations")
+            except Exception as e:
+                logger.warning(f"Failed to initialize Redis cache: {e}, falling back to no cache")
+                self.redis_client = None
+                self.cache = None
         else:
+            self.redis_client = None
             self.cache = None
 
         # Thread pool for async translation
@@ -131,8 +154,8 @@ class Translator:
         cache_key = ':'.join(cache_key_parts)
 
         # Check cache (include context and prompt in cache key for accuracy)
-        if self.use_cache:
-            cached = self.cache.get(cache_key, category="translations")
+        if self.use_cache and self.redis_client:
+            cached = self._get_cached_translation_redis(text, target_lang, source_lang, translation_context, prompt)
             if cached:
                 return cached
@@ -155,8 +178,8 @@
             result = text
 
         # Cache result
-        if result and self.use_cache:
-            self.cache.set(cache_key, result, category="translations")
+        if result and self.use_cache and self.redis_client:
+            self._set_cached_translation_redis(text, target_lang, result, source_lang, translation_context, prompt)
 
         return result
 
@@ -395,16 +418,57 @@
         prompt: Optional[str] = None
     ) -> Optional[str]:
         """Get translation from cache if available."""
-        if not self.cache:
+        if not self.redis_client:
+            return None
+        return self._get_cached_translation_redis(text, target_lang, source_lang, context, prompt)
+
+    def _get_cached_translation_redis(
+        self,
+        text: str,
+        target_lang: str,
+        source_lang: Optional[str] = None,
+        context: Optional[str] = None,
+        prompt: Optional[str] = None
+    ) -> Optional[str]:
+        """Get translation from Redis cache with sliding expiration."""
+        if not self.redis_client:
             return None
-        translation_context = context or self.translation_context
-        cache_key_parts = [source_lang or 'auto', target_lang, translation_context]
-        if prompt:
-            cache_key_parts.append(prompt)
-        cache_key_parts.append(text)
-        cache_key = ':'.join(cache_key_parts)
-        return self.cache.get(cache_key, category="translations")
+
+        try:
+            # Build cache key: prefix:target_lang:text
+            # For simplicity, we use target_lang and text as key
+            # Context and prompt are not included in key to maximize cache hits
+            cache_key = f"{self.cache_prefix}:{target_lang.upper()}:{text}"
+            value = self.redis_client.get(cache_key)
+            if value:
+                # Sliding expiration: reset expiration time on access
+                self.redis_client.expire(cache_key, self.expire_time)
+                logger.debug(f"[Translator] Cache hit for translation: {text} -> {target_lang}")
+                return value
+            return None
+        except Exception as e:
+            logger.error(f"[Translator] Redis error during get translation cache: '{text}' {target_lang}: {e}")
+            return None
+
+    def _set_cached_translation_redis(
+        self,
+        text: str,
+        target_lang: str,
+        translation: str,
+        source_lang: Optional[str] = None,
+        context: Optional[str] = None,
+        prompt: Optional[str] = None
+    ) -> None:
+        """Store translation in Redis cache."""
+        if not self.redis_client:
+            return
+
+        try:
+            cache_key = f"{self.cache_prefix}:{target_lang.upper()}:{text}"
+            self.redis_client.setex(cache_key, self.expire_time, translation)
+            logger.debug(f"[Translator] Cached translation: {text} -> {target_lang}: {translation}")
+        except Exception as e:
+            logger.error(f"[Translator] Redis error during set translation cache: '{text}' {target_lang}: {e}")
 
     def _translate_async(
         self,
@@ -507,6 +571,83 @@
         # The user can configure a glossary for better results
         return translated_text
 
+    def translate_for_indexing(
+        self,
+        text: str,
+        shop_language: str,
+        source_lang: Optional[str] = None,
+        context: Optional[str] = None,
+        prompt: Optional[str] = None
+    ) -> Dict[str, Optional[str]]:
+        """
+        Translate text for indexing based on shop language configuration.
+
+        This method automatically handles multi-language translation:
+        - If shop language is not 'zh', translate to Chinese (zh)
+        - If shop language is not 'en', translate to English (en)
+
+        All translation logic is internal - callers don't need to worry about
+        which languages to translate to.
+
+        Args:
+            text: Text to translate
+            shop_language: Shop's configured language (e.g., 'zh', 'en', 'ru')
+            source_lang: Source language code (optional, auto-detect if None)
+            context: Additional context for translation (optional)
+            prompt: Translation prompt/instruction (optional)
+
+        Returns:
+            Dictionary with 'zh' and 'en' keys containing translated text (or None if not needed)
+            Example: {'zh': '中文翻译', 'en': 'English translation'}
+        """
+        if not text or not text.strip():
+            return {'zh': None, 'en': None}
+
+        # Skip translation for symbol-only queries
+        if re.match(r'^[\d\s_-]+$', text):
+            logger.info(f"[Translator] Skip translation for symbol-only query: '{text}'")
+            return {'zh': None, 'en': None}
+
+        results = {'zh': None, 'en': None}
+        shop_lang_lower = shop_language.lower() if shop_language else ""
+
+        # Determine which languages need translation
+        targets = []
+        if "zh" not in shop_lang_lower:
+            targets.append("zh")
+        if "en" not in shop_lang_lower:
+            targets.append("en")
+
+        # If shop language is already zh and en, no translation needed
+        if not targets:
+            # Use original text for both languages
+            if "zh" in shop_lang_lower:
+                results['zh'] = text
+            if "en" in shop_lang_lower:
+                results['en'] = text
+            return results
+
+        # Translate to each target language
+        for target_lang in targets:
+            # Check cache first
+            cached = self._get_cached_translation_redis(text, target_lang, source_lang, context, prompt)
+            if cached:
+                results[target_lang] = cached
+                logger.debug(f"[Translator] Cache hit for indexing: '{text}' -> {target_lang}: {cached}")
+                continue
+
+            # Translate synchronously for indexing (we need the result immediately)
+            translated = self.translate(
+                text,
+                target_lang=target_lang,
+                source_lang=source_lang or shop_language,
+                context=context,
+                prompt=prompt
+            )
+            results[target_lang] = translated
+
+        return results
+
     def get_translation_needs(
         self,
         detected_lang: str,
@@ -524,7 +665,7 @@
         """
         # If detected language is in supported list, translate to others
        if detected_lang in supported_langs:
-            return [lang for lang in supported_langs if lang != detected_lang]
+            return [lang for lang in supported_langs if detected_lang != lang]
 
         # Otherwise, translate to all supported languages
         return supported_langs
--
libgit2 0.21.2
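
A minimal usage sketch of the pieces introduced above, not part of the patch itself. It assumes REDIS_CONFIG points at a reachable Redis instance and that the embedding service behind BgeEncoder is running; the tenant id, the sample strings, and the Translator constructor argument are illustrative assumptions rather than confirmed signatures.

    from embeddings.text_encoder import BgeEncoder
    from query.translator import Translator
    from indexer.indexing_utils import create_document_transformer

    # Assumed constructor argument; enables the Redis-backed translation cache.
    translator = Translator(use_cache=True)

    # translate_for_indexing derives the target languages from the shop language:
    # a 'ru' shop gets both 'zh' and 'en' back, cached under trans:<LANG>:<text>.
    titles = translator.translate_for_indexing("Зимняя куртка", shop_language="ru")
    print(titles.get("zh"), titles.get("en"))

    # BgeEncoder is a singleton with its own Redis cache; encode() returns an
    # (n, 1024) float32 array and stores each vector under embedding:en:<text>.
    encoder = BgeEncoder()
    title_vector = encoder.encode("winter jacket")[0]

    # Wire both into the document transformer so indexed documents carry
    # title_zh/title_en plus a title_embedding built from title_en (or title_zh).
    transformer = create_document_transformer(
        tenant_id="my_tenant",  # placeholder tenant id
        translator=translator,
        encoder=encoder,
        enable_title_embedding=True,
    )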