From 501066e10913edc8e5b5e11f7dd045b22424d1bb Mon Sep 17 00:00:00 2001 From: tangwang Date: Tue, 3 Mar 2026 19:26:51 +0800 Subject: [PATCH] redis 缓存 LLM结果 --- indexer/document_transformer.py | 3 +++ indexer/process_products.py | 101 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 104 insertions(+), 0 deletions(-) diff --git a/indexer/document_transformer.py b/indexer/document_transformer.py index 9522842..0b9059c 100644 --- a/indexer/document_transformer.py +++ b/indexer/document_transformer.py @@ -512,12 +512,15 @@ class SPUDocumentTransformer: "features", ] + tenant_id = doc.get("tenant_id") + for lang in llm_langs: try: rows = analyze_products( products=[{"id": spu_id, "title": title}], target_lang=lang, batch_size=1, + tenant_id=str(tenant_id), ) except Exception as e: logger.warning( diff --git a/indexer/process_products.py b/indexer/process_products.py index 6033113..d9ca462 100644 --- a/indexer/process_products.py +++ b/indexer/process_products.py @@ -9,13 +9,18 @@ import os import json import logging import time +import hashlib from datetime import datetime from typing import List, Dict, Tuple, Any, Optional + +import redis import requests from pathlib import Path from requests.adapters import HTTPAdapter from urllib3.util.retry import Retry +from config.env_config import REDIS_CONFIG + # 配置 BATCH_SIZE = 20 API_BASE_URL = "https://dashscope.aliyuncs.com/compatible-mode/v1" @@ -51,6 +56,29 @@ logging.basicConfig( logger = logging.getLogger(__name__) +# Redis 缓存(用于 anchors / 语义属性) +ANCHOR_CACHE_PREFIX = REDIS_CONFIG.get("anchor_cache_prefix", "product_anchors") +ANCHOR_CACHE_EXPIRE_DAYS = int(REDIS_CONFIG.get("anchor_cache_expire_days", 30)) +_anchor_redis: Optional[redis.Redis] = None + +try: + _anchor_redis = redis.Redis( + host=REDIS_CONFIG.get("host", "localhost"), + port=REDIS_CONFIG.get("port", 6479), + password=REDIS_CONFIG.get("password"), + decode_responses=True, + socket_timeout=REDIS_CONFIG.get("socket_timeout", 1), + socket_connect_timeout=REDIS_CONFIG.get("socket_connect_timeout", 1), + retry_on_timeout=REDIS_CONFIG.get("retry_on_timeout", False), + health_check_interval=10, + ) + _anchor_redis.ping() + logger.info("Redis cache initialized for product anchors and semantic attributes") +except Exception as e: + logger.warning(f"Failed to initialize Redis for anchors cache: {e}") + _anchor_redis = None + + LANG_LABELS: Dict[str, str] = { "zh": "中文", "en": "英文", @@ -93,6 +121,51 @@ SYSTEM_MESSAGES: Dict[str, str] = { } +def _make_anchor_cache_key( + title: str, + target_lang: str, + tenant_id: Optional[str] = None, +) -> str: + """构造 anchors/语义属性的缓存 key。""" + base = (tenant_id or "global").strip() + h = hashlib.md5(title.encode("utf-8")).hexdigest() + return f"{ANCHOR_CACHE_PREFIX}:{base}:{target_lang}:{h}" + + +def _get_cached_anchor_result( + title: str, + target_lang: str, + tenant_id: Optional[str] = None, +) -> Optional[Dict[str, Any]]: + if not _anchor_redis: + return None + try: + key = _make_anchor_cache_key(title, target_lang, tenant_id) + raw = _anchor_redis.get(key) + if not raw: + return None + return json.loads(raw) + except Exception as e: + logger.warning(f"Failed to get anchor cache: {e}") + return None + + +def _set_cached_anchor_result( + title: str, + target_lang: str, + result: Dict[str, Any], + tenant_id: Optional[str] = None, +) -> None: + if not _anchor_redis: + return + try: + key = _make_anchor_cache_key(title, target_lang, tenant_id) + ttl = ANCHOR_CACHE_EXPIRE_DAYS * 24 * 3600 + _anchor_redis.setex(key, ttl, json.dumps(result, ensure_ascii=False)) + except Exception as e: + logger.warning(f"Failed to set anchor cache: {e}") + + def create_prompt(products: List[Dict[str, str]], target_lang: str = "zh") -> str: """根据目标语言创建 LLM 提示词和表头说明。""" if target_lang == "en": @@ -537,6 +610,7 @@ def analyze_products( products: List[Dict[str, str]], target_lang: str = "zh", batch_size: Optional[int] = None, + tenant_id: Optional[str] = None, ) -> List[Dict[str, Any]]: """ 库调用入口:根据输入+语言,返回锚文本及各维度信息。 @@ -555,6 +629,19 @@ def analyze_products( if not products: return [] + # 简单路径:索引阶段通常 batch_size=1,这里优先做单条缓存命中 + if len(products) == 1: + p = products[0] + title = str(p.get("title") or "").strip() + if title: + cached = _get_cached_anchor_result(title, target_lang, tenant_id=tenant_id) + if cached: + logger.info( + f"[analyze_products] Cache hit for title='{title[:50]}...', " + f"lang={target_lang}, tenant_id={tenant_id or 'global'}" + ) + return [cached] + bs = batch_size or BATCH_SIZE all_results: List[Dict[str, Any]] = [] total_batches = (len(products) + bs - 1) // bs @@ -569,4 +656,18 @@ def analyze_products( batch_results = process_batch(batch, batch_num=batch_num, target_lang=target_lang) all_results.extend(batch_results) + # 写入缓存 + for item in batch_results: + title_input = str(item.get("title_input") or "").strip() + if not title_input: + continue + if item.get("error"): + # 不缓存错误结果,避免放大临时故障 + continue + try: + _set_cached_anchor_result(title_input, target_lang, item, tenant_id=tenant_id) + except Exception: + # 已在内部记录 warning + pass + return all_results -- libgit2 0.21.2