diff --git a/config/config.yaml b/config/config.yaml index 16976dc..301846f 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -209,6 +209,17 @@ services: cache: ttl_seconds: 62208000 sliding_expiration: true + # When false, cache keys are exact-match per request model only (ignores model_quality_tiers for lookups). + enable_model_quality_tier_cache: true + # Higher tier = better quality. Multiple models may share one tier (same-tier peers). + # A request for model A may reuse Redis keys written by any model whose tier is >= A's tier (never from lower tiers). + model_quality_tiers: + deepl: 30 + qwen-mt: 30 + llm: 30 + nllb-200-distilled-600m: 20 + opus-mt-zh-en: 10 + opus-mt-en-zh: 10 capabilities: qwen-mt: enabled: true diff --git a/docs/TODO.txt b/docs/TODO.txt index bf977ed..1372230 100644 --- a/docs/TODO.txt +++ b/docs/TODO.txt @@ -89,32 +89,6 @@ image_embedding改为,一个spu有多个sku向量,每个向量内部properti -2026-03-21 10:29:23,698 - elastic_transport.transport - INFO - POST http://localhost:9200/search_products_tenant_163/_search?include_named_queries_score=false [status:200 duration:0.009s] -2026-03-21 10:29:23,700 - request_context - INFO - 分页详情回填 | ids=20 | filled=20 | took=7ms -2026-03-21 10:29:23,700 - request_context - INFO - 重排分页切片 | from=20, size=20, 返回=20条 -2026-03-21 10:29:23,720 - embeddings.text_encoder - ERROR - TextEmbeddingEncoder service request failed: 502 Server Error: Bad Gateway for url: http://127.0.0.1:6005/embed/text?normalize=true&priority=1 -Traceback (most recent call last): - File "/data/saas-search/embeddings/text_encoder.py", line 63, in _call_service - response.raise_for_status() - File "/data/saas-search/.venv/lib/python3.12/site-packages/requests/models.py", line 1026, in raise_for_status - raise HTTPError(http_error_msg, response=self) -requests.exceptions.HTTPError: 502 Server Error: Bad Gateway for url: http://127.0.0.1:6005/embed/text?normalize=true&priority=1 -2026-03-21 10:29:23,720 - search.searcher - WARNING - Failed to encode SKU option1 values for final-page 
sorting: 502 Server Error: Bad Gateway for url: http://127.0.0.1:6005/embed/text?normalize=true&priority=1 -Traceback (most recent call last): - File "/data/saas-search/search/searcher.py", line 448, in _apply_sku_sorting_for_page_hits - encoded_option_vectors = text_encoder.encode(option1_values_to_encode, priority=1) - ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ - File "/data/saas-search/embeddings/text_encoder.py", line 112, in encode - response_data = self._call_service( - ^^^^^^^^^^^^^^^^^^^ - File "/data/saas-search/embeddings/text_encoder.py", line 63, in _call_service - response.raise_for_status() - File "/data/saas-search/.venv/lib/python3.12/site-packages/requests/models.py", line 1026, in raise_for_status - raise HTTPError(http_error_msg, response=self) -requests.exceptions.HTTPError: 502 Server Error: Bad Gateway for url: http://127.0.0.1:6005/embed/text?normalize=true&priority=1 -2026-03-21 10:29:23,721 - request_context - WARNING - SKU option embedding failed: 502 Server Error: Bad Gateway for url: http://127.0.0.1:6005/embed/text?normalize=true&priority=1 - - 是否需要: diff --git a/tests/test_translator_failure_semantics.py b/tests/test_translator_failure_semantics.py index 22643be..bfa924e 100644 --- a/tests/test_translator_failure_semantics.py +++ b/tests/test_translator_failure_semantics.py @@ -1,5 +1,7 @@ import logging +import pytest + from translation.cache import TranslationCache from translation.logging_utils import ( TranslationRequestFilter, @@ -7,6 +9,7 @@ from translation.logging_utils import ( reset_translation_request_id, ) from translation.service import TranslationService +from translation.settings import build_translation_config, translation_cache_probe_models class _FakeCache: @@ -16,7 +19,8 @@ class _FakeCache: self.get_calls = [] self.set_calls = [] - def get(self, *, model, target_lang, source_text): + def get(self, *, model, target_lang, source_text, log_lookup=True): + del log_lookup self.get_calls.append((model, 
target_lang, source_text)) return self.storage.get((model, target_lang, source_text)) @@ -191,3 +195,262 @@ def test_translation_route_log_focuses_on_routing_decision(monkeypatch, caplog): assert route_messages == [ "Translation route | backend=llm request_type=single use_cache=True cache_available=False" ] + + +def test_translation_cache_probe_models_order(): + cfg = {"cache": {"model_quality_tiers": {"low": 10, "high": 50, "mid": 30}}} + assert translation_cache_probe_models(cfg, "low") == ["high", "mid", "low"] + assert translation_cache_probe_models(cfg, "mid") == ["high", "mid"] + assert translation_cache_probe_models(cfg, "high") == ["high"] + assert translation_cache_probe_models(cfg, "unknown") == ["unknown"] + + +def test_translation_cache_probe_models_respects_enable_switch(): + cfg = { + "cache": { + "enable_model_quality_tier_cache": False, + "model_quality_tiers": {"peer-a": 50, "peer-b": 50, "top": 100}, + } + } + assert translation_cache_probe_models(cfg, "peer-a") == ["peer-a"] + + +def test_translation_cache_probe_models_same_tier_included(): + """Same numeric tier: all peers are probed (higher tier first, then name order).""" + cfg = {"cache": {"model_quality_tiers": {"peer-a": 50, "peer-b": 50, "top": 100}}} + assert translation_cache_probe_models(cfg, "peer-a") == ["top", "peer-a", "peer-b"] + assert translation_cache_probe_models(cfg, "peer-b") == ["top", "peer-b", "peer-a"] + + +def test_model_quality_tiers_unknown_capability_raises(): + with pytest.raises(ValueError, match="unknown capability"): + build_translation_config( + { + "service_url": "http://127.0.0.1:6006", + "timeout_sec": 10.0, + "default_model": "llm", + "default_scene": "general", + "cache": { + "ttl_seconds": 60, + "sliding_expiration": True, + "model_quality_tiers": {"ghost": 1}, + }, + "capabilities": { + "llm": { + "enabled": True, + "backend": "llm", + "model": "dummy-llm", + "base_url": "https://example.com", + "timeout_sec": 10.0, + "use_cache": True, + } + }, + } + ) + 
+ +def test_tiered_cache_reuses_higher_tier_entry(monkeypatch): + monkeypatch.setattr(TranslationCache, "_init_redis_client", staticmethod(lambda: None)) + translate_calls = [] + + def _fake_create_backend(self, *, name, backend_type, cfg): + del self, backend_type, cfg + + class _Backend: + model = name + + @property + def supports_batch(self): + return True + + def translate(self, text, target_lang, source_lang=None, scene=None): + del target_lang, source_lang, scene + translate_calls.append((name, text)) + if isinstance(text, list): + return [f"{name}:{item}" for item in text] + return f"{name}:{text}" + + return _Backend() + + monkeypatch.setattr(TranslationService, "_create_backend", _fake_create_backend) + config = { + "service_url": "http://127.0.0.1:6006", + "timeout_sec": 10.0, + "default_model": "opus-mt-zh-en", + "default_scene": "general", + "capabilities": { + "deepl": { + "enabled": True, + "backend": "deepl", + "api_url": "https://api.deepl.com/v2/translate", + "timeout_sec": 10.0, + "use_cache": True, + }, + "opus-mt-zh-en": { + "enabled": True, + "backend": "local_marian", + "model_id": "dummy", + "model_dir": "dummy", + "device": "cpu", + "torch_dtype": "float32", + "batch_size": 8, + "max_input_length": 16, + "max_new_tokens": 16, + "num_beams": 1, + "use_cache": True, + }, + }, + "cache": { + "ttl_seconds": 60, + "sliding_expiration": True, + "model_quality_tiers": {"deepl": 100, "opus-mt-zh-en": 40}, + }, + } + + service = TranslationService(config) + fake_cache = _FakeCache() + fake_cache.storage[("deepl", "en", "商品标题")] = "from-deepl" + service._translation_cache = fake_cache + + out = service.translate("商品标题", target_lang="en", source_lang="zh", model="opus-mt-zh-en") + assert out == "from-deepl" + assert translate_calls == [] + assert fake_cache.get_calls == [("deepl", "en", "商品标题")] + + +def test_tiered_cache_reuses_same_tier_peer(monkeypatch): + """Model A may use cache written under model B when both share the same tier.""" + 
monkeypatch.setattr(TranslationCache, "_init_redis_client", staticmethod(lambda: None)) + translate_calls = [] + + def _fake_create_backend(self, *, name, backend_type, cfg): + del self, backend_type, cfg + + class _Backend: + model = name + + @property + def supports_batch(self): + return True + + def translate(self, text, target_lang, source_lang=None, scene=None): + del target_lang, source_lang, scene + translate_calls.append((name, text)) + if isinstance(text, list): + return [f"{name}:{item}" for item in text] + return f"{name}:{text}" + + return _Backend() + + monkeypatch.setattr(TranslationService, "_create_backend", _fake_create_backend) + marian_cap = { + "enabled": True, + "backend": "local_marian", + "model_id": "dummy", + "model_dir": "dummy", + "device": "cpu", + "torch_dtype": "float32", + "batch_size": 8, + "max_input_length": 16, + "max_new_tokens": 16, + "num_beams": 1, + "use_cache": True, + } + config = { + "service_url": "http://127.0.0.1:6006", + "timeout_sec": 10.0, + "default_model": "opus-mt-en-zh", + "default_scene": "general", + "capabilities": { + "opus-mt-zh-en": dict(marian_cap), + "opus-mt-en-zh": dict(marian_cap), + }, + "cache": { + "ttl_seconds": 60, + "sliding_expiration": True, + "model_quality_tiers": {"opus-mt-zh-en": 50, "opus-mt-en-zh": 50}, + }, + } + + service = TranslationService(config) + fake_cache = _FakeCache() + fake_cache.storage[("opus-mt-zh-en", "en", "hello")] = "from-zh-en" + service._translation_cache = fake_cache + + out = service.translate("hello", target_lang="en", source_lang="zh", model="opus-mt-en-zh") + assert out == "from-zh-en" + assert translate_calls == [] + assert fake_cache.get_calls == [ + ("opus-mt-en-zh", "en", "hello"), + ("opus-mt-zh-en", "en", "hello"), + ] + + +def test_tiered_cache_switch_off_uses_exact_model_only(monkeypatch): + monkeypatch.setattr(TranslationCache, "_init_redis_client", staticmethod(lambda: None)) + translate_calls = [] + + def _fake_create_backend(self, *, name, 
backend_type, cfg): + del self, backend_type, cfg + + class _Backend: + model = name + + @property + def supports_batch(self): + return True + + def translate(self, text, target_lang, source_lang=None, scene=None): + del target_lang, source_lang, scene + translate_calls.append((name, text)) + if isinstance(text, list): + return [f"{name}:{item}" for item in text] + return f"{name}:{text}" + + return _Backend() + + monkeypatch.setattr(TranslationService, "_create_backend", _fake_create_backend) + config = { + "service_url": "http://127.0.0.1:6006", + "timeout_sec": 10.0, + "default_model": "opus-mt-zh-en", + "default_scene": "general", + "capabilities": { + "deepl": { + "enabled": True, + "backend": "deepl", + "api_url": "https://api.deepl.com/v2/translate", + "timeout_sec": 10.0, + "use_cache": True, + }, + "opus-mt-zh-en": { + "enabled": True, + "backend": "local_marian", + "model_id": "dummy", + "model_dir": "dummy", + "device": "cpu", + "torch_dtype": "float32", + "batch_size": 8, + "max_input_length": 16, + "max_new_tokens": 16, + "num_beams": 1, + "use_cache": True, + }, + }, + "cache": { + "ttl_seconds": 60, + "sliding_expiration": True, + "enable_model_quality_tier_cache": False, + "model_quality_tiers": {"deepl": 100, "opus-mt-zh-en": 40}, + }, + } + + service = TranslationService(config) + fake_cache = _FakeCache() + fake_cache.storage[("deepl", "en", "商品标题")] = "from-deepl" + service._translation_cache = fake_cache + + out = service.translate("商品标题", target_lang="en", source_lang="zh", model="opus-mt-zh-en") + assert out == "opus-mt-zh-en:商品标题" + assert translate_calls == [("opus-mt-zh-en", "商品标题")] + assert fake_cache.get_calls == [("opus-mt-zh-en", "en", "商品标题")] diff --git a/translation/cache.py b/translation/cache.py index da292e2..23bc550 100644 --- a/translation/cache.py +++ b/translation/cache.py @@ -36,7 +36,13 @@ class TranslationCache: digest = hashlib.sha256(text.encode("utf-8")).hexdigest() return 
f"trans:{normalized_model}:{normalized_target_lang}:{text_prefix}{digest}" - def get(self, *, model: str, target_lang: str, source_text: str) -> Optional[str]: + def get( + self, + *, + model: str, + target_lang: str, + source_text: str + ) -> Optional[str]: if self.redis_client is None: return None key = self.build_key(model=model, target_lang=target_lang, source_text=source_text) diff --git a/translation/service.py b/translation/service.py index 456898b..354b558 100644 --- a/translation/service.py +++ b/translation/service.py @@ -3,7 +3,7 @@ from __future__ import annotations import logging -from typing import Dict, List, Optional +from typing import Dict, List, Optional, Tuple from config.loader import get_app_config from config.schema import AppConfig @@ -15,6 +15,7 @@ from translation.settings import ( get_translation_capability, normalize_translation_model, normalize_translation_scene, + translation_cache_probe_models, ) logger = logging.getLogger(__name__) @@ -247,7 +248,11 @@ class TranslationService: ) -> Optional[str]: if not text.strip(): return text - cached = self._translation_cache.get(model=model, target_lang=target_lang, source_text=text) + cached, _served = self._tiered_cache_get( + request_model=model, + target_lang=target_lang, + source_text=text, + ) if cached is not None: logger.info( "Translation cache served | request_type=single text_len=%s", @@ -279,6 +284,30 @@ class TranslationService: ) return translated + def _tiered_cache_get( + self, + *, + request_model: str, + target_lang: str, + source_text: str, + ) -> Tuple[Optional[str], Optional[str]]: + """Redis lookup: cache from higher-tier or **same-tier** models may satisfy A. + + Lower-tier entries are never read. Returns ``(translated, served_model)``. 
+ """ + probe_models = translation_cache_probe_models(self.config, request_model) + + for probe_model in probe_models: + hit = self._translation_cache.get( + model=probe_model, + target_lang=target_lang, + source_text=source_text, + ) + if hit is not None: + return hit, probe_model + + return None, None + def _translate_batch_with_cache( self, *, @@ -300,8 +329,8 @@ class TranslationService: if not normalized_text.strip(): results[idx] = normalized_text continue - cached = self._translation_cache.get( - model=model, + cached, _served = self._tiered_cache_get( + request_model=model, target_lang=target_lang, source_text=normalized_text, ) diff --git a/translation/settings.py b/translation/settings.py index 0c40885..70d0fb9 100644 --- a/translation/settings.py +++ b/translation/settings.py @@ -2,7 +2,7 @@ from __future__ import annotations -from typing import Any, Dict, List, Mapping, Optional +from typing import Any, Dict, List, Mapping, Optional, Tuple from translation.scenes import normalize_scene_name @@ -38,6 +38,7 @@ def build_translation_config(raw_cfg: Mapping[str, Any]) -> TranslationConfig: if not get_enabled_translation_models(config): raise ValueError("At least one translation capability must be enabled") + _validate_model_quality_tiers(config) return config @@ -86,18 +87,107 @@ def get_translation_cache(config: Mapping[str, Any]) -> Dict[str, Any]: return dict(cache) +def translation_cache_probe_models(config: Mapping[str, Any], request_model: str) -> List[str]: + """Redis cache key models to try. + + Sort order: (1) **tier** descending (higher quality first); (2) within the same tier, + the **request model** before other peers; (3) remaining ties by model name. + + For a request to model A with tier T, probes every configured model whose tier is + **greater than or equal to** T. Lower tiers are never used. + + When ``enable_model_quality_tier_cache`` is false, only the request model is probed. 
+ + When ``model_quality_tiers`` is empty or ``request_model`` is not listed, only the + request model is probed (legacy exact-match behavior). + """ + rm = str(request_model or "").strip().lower() + cache = config.get("cache") + if not isinstance(cache, Mapping): + return [rm] + if not bool(cache.get("enable_model_quality_tier_cache", True)): + return [rm] + tiers = cache.get("model_quality_tiers") + if not isinstance(tiers, Mapping) or not tiers: + return [rm] + if rm not in tiers: + return [rm] + threshold = int(tiers[rm]) + scored: List[Tuple[int, str]] = [] + for name, tier_val in tiers.items(): + n = str(name).strip().lower() + t = int(tier_val) + if t >= threshold: + scored.append((t, n)) + scored.sort( + key=lambda item: ( + -item[0], + 0 if item[1] == rm else 1, + item[1], + ) + ) + out: List[str] = [] + seen: set[str] = set() + for _t, n in scored: + if n not in seen: + seen.add(n) + out.append(n) + return out + + def _build_cache_config(raw_cache: Any) -> Dict[str, Any]: if not isinstance(raw_cache, Mapping): raise ValueError("services.translation.cache must be a mapping") + if "enable_model_quality_tier_cache" in raw_cache: + enable_tier_cache = _require_bool( + raw_cache["enable_model_quality_tier_cache"], + "services.translation.cache.enable_model_quality_tier_cache", + ) + else: + enable_tier_cache = True return { "ttl_seconds": _require_positive_int(raw_cache.get("ttl_seconds"), "services.translation.cache.ttl_seconds"), "sliding_expiration": _require_bool( raw_cache.get("sliding_expiration"), "services.translation.cache.sliding_expiration", ), + "enable_model_quality_tier_cache": enable_tier_cache, + "model_quality_tiers": _build_model_quality_tiers(raw_cache.get("model_quality_tiers")), } +def _build_model_quality_tiers(raw: Any) -> Dict[str, int]: + if raw is None: + return {} + if not isinstance(raw, Mapping): + raise ValueError("services.translation.cache.model_quality_tiers must be a mapping") + resolved: Dict[str, int] = {} + for name, 
tier_val in raw.items(): + cap = _require_string(name, "services.translation.cache.model_quality_tiers key").lower() + field = f"services.translation.cache.model_quality_tiers.{cap}" + resolved[cap] = _require_non_negative_int(tier_val, field) + return resolved + + +def _validate_model_quality_tiers(config: TranslationConfig) -> None: + tiers = config["cache"].get("model_quality_tiers") + if not isinstance(tiers, Mapping) or not tiers: + return + caps = config["capabilities"] + for name in tiers: + if name not in caps: + raise ValueError( + f"services.translation.cache.model_quality_tiers references unknown capability '{name}'" + ) + + +def _require_non_negative_int(value: Any, field_name: str) -> int: + parsed = _require_int(value, field_name) + if parsed < 0: + raise ValueError(f"{field_name} must be >= 0") + return parsed + + def _build_capabilities(raw_capabilities: Any) -> Dict[str, Dict[str, Any]]: if not isinstance(raw_capabilities, Mapping): raise ValueError("services.translation.capabilities must be a mapping") -- libgit2 0.21.2