Commit 501066e10913edc8e5b5e11f7dd045b22424d1bb

Authored by tangwang
1 parent beef466f

redis 缓存 LLM结果

indexer/document_transformer.py
@@ -512,12 +512,15 @@ class SPUDocumentTransformer: @@ -512,12 +512,15 @@ class SPUDocumentTransformer:
512 "features", 512 "features",
513 ] 513 ]
514 514
  515 + tenant_id = doc.get("tenant_id")
  516 +
515 for lang in llm_langs: 517 for lang in llm_langs:
516 try: 518 try:
517 rows = analyze_products( 519 rows = analyze_products(
518 products=[{"id": spu_id, "title": title}], 520 products=[{"id": spu_id, "title": title}],
519 target_lang=lang, 521 target_lang=lang,
520 batch_size=1, 522 batch_size=1,
  523 + tenant_id=str(tenant_id),
521 ) 524 )
522 except Exception as e: 525 except Exception as e:
523 logger.warning( 526 logger.warning(
indexer/process_products.py
@@ -9,13 +9,18 @@ import os @@ -9,13 +9,18 @@ import os
9 import json 9 import json
10 import logging 10 import logging
11 import time 11 import time
  12 +import hashlib
12 from datetime import datetime 13 from datetime import datetime
13 from typing import List, Dict, Tuple, Any, Optional 14 from typing import List, Dict, Tuple, Any, Optional
  15 +
  16 +import redis
14 import requests 17 import requests
15 from pathlib import Path 18 from pathlib import Path
16 from requests.adapters import HTTPAdapter 19 from requests.adapters import HTTPAdapter
17 from urllib3.util.retry import Retry 20 from urllib3.util.retry import Retry
18 21
  22 +from config.env_config import REDIS_CONFIG
  23 +
19 # 配置 24 # 配置
20 BATCH_SIZE = 20 25 BATCH_SIZE = 20
21 API_BASE_URL = "https://dashscope.aliyuncs.com/compatible-mode/v1" 26 API_BASE_URL = "https://dashscope.aliyuncs.com/compatible-mode/v1"
@@ -51,6 +56,29 @@ logging.basicConfig( @@ -51,6 +56,29 @@ logging.basicConfig(
51 logger = logging.getLogger(__name__) 56 logger = logging.getLogger(__name__)
52 57
53 58
  59 +# Redis 缓存(用于 anchors / 语义属性)
  60 +ANCHOR_CACHE_PREFIX = REDIS_CONFIG.get("anchor_cache_prefix", "product_anchors")
  61 +ANCHOR_CACHE_EXPIRE_DAYS = int(REDIS_CONFIG.get("anchor_cache_expire_days", 30))
  62 +_anchor_redis: Optional[redis.Redis] = None
  63 +
  64 +try:
  65 + _anchor_redis = redis.Redis(
  66 + host=REDIS_CONFIG.get("host", "localhost"),
  67 + port=REDIS_CONFIG.get("port", 6479),
  68 + password=REDIS_CONFIG.get("password"),
  69 + decode_responses=True,
  70 + socket_timeout=REDIS_CONFIG.get("socket_timeout", 1),
  71 + socket_connect_timeout=REDIS_CONFIG.get("socket_connect_timeout", 1),
  72 + retry_on_timeout=REDIS_CONFIG.get("retry_on_timeout", False),
  73 + health_check_interval=10,
  74 + )
  75 + _anchor_redis.ping()
  76 + logger.info("Redis cache initialized for product anchors and semantic attributes")
  77 +except Exception as e:
  78 + logger.warning(f"Failed to initialize Redis for anchors cache: {e}")
  79 + _anchor_redis = None
  80 +
  81 +
54 LANG_LABELS: Dict[str, str] = { 82 LANG_LABELS: Dict[str, str] = {
55 "zh": "中文", 83 "zh": "中文",
56 "en": "英文", 84 "en": "英文",
@@ -93,6 +121,51 @@ SYSTEM_MESSAGES: Dict[str, str] = { @@ -93,6 +121,51 @@ SYSTEM_MESSAGES: Dict[str, str] = {
93 } 121 }
94 122
95 123
  124 +def _make_anchor_cache_key(
  125 + title: str,
  126 + target_lang: str,
  127 + tenant_id: Optional[str] = None,
  128 +) -> str:
  129 + """构造 anchors/语义属性的缓存 key。"""
  130 + base = (tenant_id or "global").strip()
  131 + h = hashlib.md5(title.encode("utf-8")).hexdigest()
  132 + return f"{ANCHOR_CACHE_PREFIX}:{base}:{target_lang}:{h}"
  133 +
  134 +
  135 +def _get_cached_anchor_result(
  136 + title: str,
  137 + target_lang: str,
  138 + tenant_id: Optional[str] = None,
  139 +) -> Optional[Dict[str, Any]]:
  140 + if not _anchor_redis:
  141 + return None
  142 + try:
  143 + key = _make_anchor_cache_key(title, target_lang, tenant_id)
  144 + raw = _anchor_redis.get(key)
  145 + if not raw:
  146 + return None
  147 + return json.loads(raw)
  148 + except Exception as e:
  149 + logger.warning(f"Failed to get anchor cache: {e}")
  150 + return None
  151 +
  152 +
  153 +def _set_cached_anchor_result(
  154 + title: str,
  155 + target_lang: str,
  156 + result: Dict[str, Any],
  157 + tenant_id: Optional[str] = None,
  158 +) -> None:
  159 + if not _anchor_redis:
  160 + return
  161 + try:
  162 + key = _make_anchor_cache_key(title, target_lang, tenant_id)
  163 + ttl = ANCHOR_CACHE_EXPIRE_DAYS * 24 * 3600
  164 + _anchor_redis.setex(key, ttl, json.dumps(result, ensure_ascii=False))
  165 + except Exception as e:
  166 + logger.warning(f"Failed to set anchor cache: {e}")
  167 +
  168 +
96 def create_prompt(products: List[Dict[str, str]], target_lang: str = "zh") -> str: 169 def create_prompt(products: List[Dict[str, str]], target_lang: str = "zh") -> str:
97 """根据目标语言创建 LLM 提示词和表头说明。""" 170 """根据目标语言创建 LLM 提示词和表头说明。"""
98 if target_lang == "en": 171 if target_lang == "en":
@@ -537,6 +610,7 @@ def analyze_products( @@ -537,6 +610,7 @@ def analyze_products(
537 products: List[Dict[str, str]], 610 products: List[Dict[str, str]],
538 target_lang: str = "zh", 611 target_lang: str = "zh",
539 batch_size: Optional[int] = None, 612 batch_size: Optional[int] = None,
  613 + tenant_id: Optional[str] = None,
540 ) -> List[Dict[str, Any]]: 614 ) -> List[Dict[str, Any]]:
541 """ 615 """
542 库调用入口:根据输入+语言,返回锚文本及各维度信息。 616 库调用入口:根据输入+语言,返回锚文本及各维度信息。
@@ -555,6 +629,19 @@ def analyze_products( @@ -555,6 +629,19 @@ def analyze_products(
555 if not products: 629 if not products:
556 return [] 630 return []
557 631
  632 + # 简单路径:索引阶段通常 batch_size=1,这里优先做单条缓存命中
  633 + if len(products) == 1:
  634 + p = products[0]
  635 + title = str(p.get("title") or "").strip()
  636 + if title:
  637 + cached = _get_cached_anchor_result(title, target_lang, tenant_id=tenant_id)
  638 + if cached:
  639 + logger.info(
  640 + f"[analyze_products] Cache hit for title='{title[:50]}...', "
  641 + f"lang={target_lang}, tenant_id={tenant_id or 'global'}"
  642 + )
  643 + return [cached]
  644 +
558 bs = batch_size or BATCH_SIZE 645 bs = batch_size or BATCH_SIZE
559 all_results: List[Dict[str, Any]] = [] 646 all_results: List[Dict[str, Any]] = []
560 total_batches = (len(products) + bs - 1) // bs 647 total_batches = (len(products) + bs - 1) // bs
@@ -569,4 +656,18 @@ def analyze_products( @@ -569,4 +656,18 @@ def analyze_products(
569 batch_results = process_batch(batch, batch_num=batch_num, target_lang=target_lang) 656 batch_results = process_batch(batch, batch_num=batch_num, target_lang=target_lang)
570 all_results.extend(batch_results) 657 all_results.extend(batch_results)
571 658
  659 + # 写入缓存
  660 + for item in batch_results:
  661 + title_input = str(item.get("title_input") or "").strip()
  662 + if not title_input:
  663 + continue
  664 + if item.get("error"):
  665 + # 不缓存错误结果,避免放大临时故障
  666 + continue
  667 + try:
  668 + _set_cached_anchor_result(title_input, target_lang, item, tenant_id=tenant_id)
  669 + except Exception:
  670 + # 已在内部记录 warning
  671 + pass
  672 +
572 return all_results 673 return all_results