""" Search Tools for Product Discovery - search_products is created via make_search_products_tool(session_id, registry). - After search API, an LLM labels each result as Relevant / Partially Relevant / Irrelevant; we count and store the curated list in the registry, return [SEARCH_REF:ref_id] + quality counts + top10 titles. """ import base64 import json import logging import os from pathlib import Path from typing import Optional import requests from langchain_core.tools import tool from openai import OpenAI from app.config import settings from app.search_registry import ( ProductItem, SearchResult, SearchResultRegistry, global_registry, new_ref_id, ) logger = logging.getLogger(__name__) _openai_client: Optional[OpenAI] = None def _normalize_image_url(url: Optional[str]) -> Optional[str]: """Normalize image_url from API (e.g. ////cnres.appracle.com/... → https://cnres.appracle.com/...).""" if not url or not isinstance(url, str): return None url = url.strip() if not url: return None if url.startswith("https://") or url.startswith("http://"): return url # // or ////host/path → https://host/path (exactly one "//" after scheme) if url.startswith("/"): return "https://" + url.lstrip("/") return "https://" + url def get_openai_client() -> OpenAI: global _openai_client if _openai_client is None: kwargs = {"api_key": settings.openai_api_key} if settings.openai_api_base_url: kwargs["base_url"] = settings.openai_api_base_url _openai_client = OpenAI(**kwargs) return _openai_client # ── LLM quality assessment ───────────────────────────────────────────────────── def _assess_search_quality(query: str, raw_products: list) -> tuple[list[str], str]: """ Use LLM to label each search result and write a short quality_summary. Returns (labels, quality_summary). labels: one per product; quality_summary: 1–2 sentences. """ n = len(raw_products) if n == 0: return [], "" lines = [] for i, p in enumerate(raw_products, 1): title = (p.get("title") or "")[:60] lines.append(f"{i}. {title}") product_text = "\n".join(lines) prompt = f"""评估以下搜索结果与用户查询的匹配程度,完成两件事: 1. 为每条结果打一个等级:Relevant / Partially Relevant / Irrelevant。 2. 写一段 quality_summary(1–2 句话):简要说明搜索结果主要包含哪些商品、是否基本满足搜索意图、整体匹配度如何。 用户查询:{query} 搜索结果(共 {n} 条): {product_text} 等级说明:Relevant=完全符合查询意图;Partially Relevant=基本相关(如品类等主需求匹配但部分属性不完全符合);Irrelevant=不相关。 请严格按以下 JSON 输出,仅输出 JSON,无其他内容: {{"labels": ["Relevant", "Partially Relevant", "Irrelevant", ...], "quality_summary": "你的1-2句总结"}} labels 数组长度必须等于 {n}。""" try: client = get_openai_client() resp = client.chat.completions.create( model=settings.openai_model, messages=[{"role": "user", "content": prompt}], max_tokens=1200, temperature=0.1, ) raw = resp.choices[0].message.content.strip() if raw.startswith("```"): raw = raw.split("```")[1] if raw.startswith("json"): raw = raw[4:] raw = raw.strip() data = json.loads(raw) labels = data.get("labels", []) valid = {"Relevant", "Partially Relevant", "Irrelevant"} labels = [l if l in valid else "Partially Relevant" for l in labels] while len(labels) < n: labels.append("Partially Relevant") quality_summary = (data.get("quality_summary") or "").strip() or "" return labels[:n], quality_summary except Exception as e: logger.warning(f"Quality assessment failed: {e}; using fallback.") return ["Partially Relevant"] * n, "" # ── Tool factory ─────────────────────────────────────────────────────────────── def make_search_products_tool( session_id: str, registry: SearchResultRegistry, ): """ Return a search_products tool bound to a specific session and registry. The tool: 1. Calls the product search API. 2. Runs LLM quality assessment on up to 20 results. 3. Stores a SearchResult in the registry. 4. Returns a concise quality summary + [SEARCH_REF:ref_id]. """ @tool def search_products(query: str, limit: int = 20) -> str: """搜索商品库并做质量评估:LLM 为每条结果打等级(Relevant / Partially Relevant / Irrelevant),返回引用与 top10 标题。 Args: query: 自然语言商品描述 limit: 最多返回条数(1-20) Returns: 【搜索完成】+ 结果引用 [SEARCH_REF:ref_id] + 质量情况(评估条数、Relevant/Partially Relevant 数)+ results list(top10 标题) """ try: logger.info(f"[{session_id}] search_products: query={query!r} limit={limit}") url = f"{settings.search_api_base_url.rstrip('/')}/search/" headers = { "Content-Type": "application/json", "X-Tenant-ID": settings.search_api_tenant_id, } payload = { "query": query, "size": min(max(limit, 1), 20), "from": 0, "language": "zh", "enable_rerank": True, "rerank_query_template": query, "rerank_doc_template": "{title}", } resp = requests.post(url, json=payload, headers=headers, timeout=60) if resp.status_code != 200: logger.error(f"Search API error {resp.status_code}: {resp.text[:300]}") return f"搜索失败:API 返回状态码 {resp.status_code},请稍后重试。" data = resp.json() raw_results: list = data.get("results", []) total_hits: int = data.get("total", 0) if not raw_results: return ( f"【搜索完成】query='{query}'\n" "未找到匹配商品,建议换用更宽泛或不同角度的关键词重新搜索。" ) labels, quality_summary = _assess_search_quality(query, raw_results) perfect_count = sum(1 for l in labels if l == "Relevant") partial_count = sum(1 for l in labels if l == "Partially Relevant") irrelevant_count = len(labels) - perfect_count - partial_count products: list[ProductItem] = [] for raw, label in zip(raw_results, labels): if label not in ("Relevant", "Partially Relevant"): continue products.append( ProductItem( spu_id=str(raw.get("spu_id", "")), title=raw.get("title") or "", price=raw.get("price"), category_path=( raw.get("category_path") or raw.get("category_name") ), vendor=raw.get("vendor"), image_url=_normalize_image_url(raw.get("image_url")), relevance_score=raw.get("relevance_score"), match_label=label, tags=raw.get("tags") or [], specifications=raw.get("specifications") or [], ) ) ref_id = new_ref_id() result = SearchResult( ref_id=ref_id, query=query, total_api_hits=total_hits, returned_count=len(raw_results), perfect_count=perfect_count, partial_count=partial_count, irrelevant_count=irrelevant_count, quality_summary=quality_summary, products=products, ) registry.register(session_id, result) assessed_n = len(raw_results) logger.info( "[%s] Registered %s: query=%s assessed=%s perfect=%s partial=%s", session_id, ref_id, query, assessed_n, perfect_count, partial_count, ) top10_titles = [ (raw.get("title") or "未知")[:80] for raw in raw_results[:10] ] results_list = "\n".join(f"{i}. {t}" for i, t in enumerate(top10_titles, 1)) return ( f"【搜索完成】query='{query}'\n" f"结果引用:[SEARCH_REF:{ref_id}]\n" f"搜索结果质量情况:评估总条数{assessed_n}条,Relevant {perfect_count} 条,Partially Relevant {partial_count} 条。\n" f"results list:\n{results_list}" ) except requests.exceptions.RequestException as e: logger.error(f"[{session_id}] Search network error: {e}", exc_info=True) return f"搜索失败(网络错误):{e}" except Exception as e: logger.error(f"[{session_id}] Search error: {e}", exc_info=True) return f"搜索失败:{e}" return search_products # ── Standalone tools (no session binding needed) ─────────────────────────────── @tool def web_search(query: str) -> str: """使用 Tavily 进行通用 Web 搜索,补充外部/实时知识。 触发场景: - 需要**外部知识**:流行趋势、品牌、搭配文化、节日习俗等 - 需要**实时/及时信息**:所有与天气相关的问题、当季流行元素、某地近期或者未来的事件、所有依赖当前时间相关的信息 - 需要**宏观参考**:不同场合/国家的穿着建议、选购攻略 Args: query: 要搜索的问题,自然语言描述 Returns: 总结后的回答 + 若干参考来源链接 """ try: api_key = os.getenv("TAVILY_API_KEY") if not api_key: return ( "无法调用外部 Web 搜索:未检测到 TAVILY_API_KEY 环境变量。\n" "请在运行环境中配置 TAVILY_API_KEY 后再重试。" ) logger.info(f"web_search: {query!r}") url = "https://api.tavily.com/search" headers = { "Authorization": f"Bearer {api_key}", "Content-Type": "application/json", } payload = { "query": query, "search_depth": "advanced", "include_answer": True, } response = requests.post(url, json=payload, headers=headers, timeout=60) if response.status_code != 200: return f"调用外部 Web 搜索失败:Tavily 返回状态码 {response.status_code}" data = response.json() answer = data.get("answer") or "(Tavily 未返回直接回答,仅返回了搜索结果。)" results = data.get("results") or [] output_lines = [ "【外部 Web 搜索结果(Tavily)】", "", "回答摘要:", answer.strip(), ] if results: output_lines.append("") output_lines.append("参考来源(部分):") for idx, item in enumerate(results[:5], 1): title = item.get("title") or "无标题" link = item.get("url") or "" output_lines.append(f"{idx}. {title}") if link: output_lines.append(f" 链接: {link}") return "\n".join(output_lines).strip() except requests.exceptions.RequestException as e: logger.error("web_search network error: %s", e, exc_info=True) return f"调用外部 Web 搜索失败(网络错误):{e}" except Exception as e: logger.error("web_search error: %s", e, exc_info=True) return f"调用外部 Web 搜索失败:{e}" @tool def analyze_image_style(image_path: str) -> str: """分析用户上传的商品图片,提取视觉风格属性,用于后续商品搜索。 适用场景: - 用户上传图片,想找相似商品 - 需要理解图片中商品的风格、颜色、材质等属性 Args: image_path: 图片文件路径 Returns: 商品视觉属性的详细文字描述,可直接作为 search_products 的 query """ try: logger.info(f"analyze_image_style: {image_path!r}") img_path = Path(image_path) if not img_path.exists(): return f"错误:图片文件不存在:{image_path}" with open(img_path, "rb") as f: image_data = base64.b64encode(f.read()).decode("utf-8") prompt = """请分析这张商品图片,提供详细的视觉属性描述,用于商品搜索。 请包含: - 商品类型(如:连衣裙、运动鞋、双肩包、西装等) - 主要颜色 - 风格定位(如:休闲、正式、运动、复古、现代简约等) - 图案/纹理(如:纯色、条纹、格纹、碎花、几何图案等) - 关键设计特征(如:领型、袖长、版型、材质外观等) - 适用场合(如:办公、户外、度假、聚会、运动等) 输出格式:3-4句自然语言描述,可直接用作搜索关键词。""" client = get_openai_client() response = client.chat.completions.create( model=settings.openai_vision_model, messages=[ { "role": "user", "content": [ {"type": "text", "text": prompt}, { "type": "image_url", "image_url": { "url": f"data:image/jpeg;base64,{image_data}", "detail": "high", }, }, ], } ], max_tokens=800, temperature=0.3, ) analysis = response.choices[0].message.content.strip() logger.info("Image analysis completed.") return analysis except Exception as e: logger.error(f"analyze_image_style error: {e}", exc_info=True) return f"图片分析失败:{e}" # ── Tool list factory ────────────────────────────────────────────────────────── def get_all_tools( session_id: str = "default", registry: Optional[SearchResultRegistry] = None, ) -> list: """ Return all agent tools. search_products is session-bound (factory); other tools are stateless. """ if registry is None: registry = global_registry return [ make_search_products_tool(session_id, registry), analyze_image_style, web_search, ]