Blame view

app/agents/shopping_agent.py 29.4 KB
e7f2b240   tangwang   first commit
1
2
  """
  Conversational Shopping Agent with LangGraph
66442668   tangwang   feat: 搜索结果引用与并行搜索...
3
4
5
6
  
  Architecture:
  - ReAct-style agent: plan → search → evaluate → re-plan or respond
  - search_products is session-bound, writing curated results to SearchResultRegistry
897b5ca9   tangwang   perf: 前端性能优化 + 搜索...
7
  - Final AI message references results via [SEARCH_RESULTS_REF:ref_id] tokens instead of
66442668   tangwang   feat: 搜索结果引用与并行搜索...
8
    re-listing product details; the UI renders product cards from the registry
e7f2b240   tangwang   first commit
9
10
  """
  
825828c4   tangwang   fix: search image...
11
  import json
e7f2b240   tangwang   first commit
12
  import logging
621b6925   tangwang   up
13
  import re
363578ca   tangwang   **feat: robust th...
14
  from urllib.parse import urlparse
621b6925   tangwang   up
15
  from datetime import datetime
e7f2b240   tangwang   first commit
16
  from pathlib import Path
8dcb1dc0   tangwang   feat: stream thin...
17
  from typing import Any, Iterator, Optional, Sequence
e7f2b240   tangwang   first commit
18
19
  
  from langchain_core.messages import BaseMessage, HumanMessage, SystemMessage
363578ca   tangwang   **feat: robust th...
20
  from langchain_core.outputs import ChatResult
e7f2b240   tangwang   first commit
21
22
23
24
25
26
27
28
  from langchain_openai import ChatOpenAI
  from langgraph.checkpoint.memory import MemorySaver
  from langgraph.graph import END, START, StateGraph
  from langgraph.graph.message import add_messages
  from langgraph.prebuilt import ToolNode
  from typing_extensions import Annotated, TypedDict
  
  from app.config import settings
66442668   tangwang   feat: 搜索结果引用与并行搜索...
29
  from app.search_registry import global_registry
e7f2b240   tangwang   first commit
30
31
32
33
  from app.tools.search_tools import get_all_tools
  
  logger = logging.getLogger(__name__)
  
66442668   tangwang   feat: 搜索结果引用与并行搜索...
34
35
36
37
38
  # ── System prompt ──────────────────────────────────────────────────────────────
  # Universal: works for any e-commerce vertical (fashion, electronics, home, etc.)
  # Key design decisions:
  #   1. Guides multi-query search planning with explicit evaluate-and-decide loop
  #   2. Forbids re-listing product details in the final response
897b5ca9   tangwang   perf: 前端性能优化 + 搜索...
39
  #   3. Mandates [SEARCH_RESULTS_REF:ref_id] inline citation as the only product presentation mechanism
187f3a6a   tangwang   prompt
40
41
42
43
44
  SYSTEM_PROMPT = f"""  角色定义
    你是我们店铺的一名专业的电商导购,是一个善于倾听、主动引导、懂得搭配的“时尚顾问”,通过有温度的对话,给用户提供有价值的信息,包括需求引导、方案推荐、搜索结果推荐,最终促成满意的购物决策或转化行为。
    作为我们店铺的一名专业的销售,除了本店铺的商品的推荐,你可以给用户提供有帮助的信息,但是不要虚构商品、提供本商店搜索结果以外的商品。
    
    一些原则:
621b6925   tangwang   up
45
  1. 价值提供与信息收集的原则:
187f3a6a   tangwang   prompt
46
    1. 兼顾价值提供和需求澄清:适时的提供有价值的信息,如商品推荐、穿搭建议、趋势信息,在推荐方向上有需求缺口、需要明确的重要信息时,要适时的做“信息收集”,引导式的澄清需求、提高商品发现的效率,形成“提供-反馈”的良性循环。
8dcb1dc0   tangwang   feat: stream thin...
47
    2. 意图判断-缺口大:当无法从对话中确定关键变量(如使用对象不明确、无法判断男性或女性使用、品类细分不清等)时,从“使用对象”、“品类细分”、“使用场景”、“风格效果”等高层意图维度切入,提供方向性选项 + 13个关键问题,引导用户做选择(以下仅提供参考思路,具体话术不要照搬):
187f3a6a   tangwang   prompt
48
49
50
      1. 人群不明确时(如果从对话中无法确认用使用人群,比如搜索意图是男女都可以消费的品类比如T恤、裤子):男款、女款,还是中性风都可以?
      2. 确定是女性、但是风格不明确时:你想穿出哪种感觉?职场干练 松弛自在 活力元气 温柔知性
      3. 使用场景不明确时:平时通勤场合多吗?还是更喜欢生活化穿搭?
8dcb1dc0   tangwang   feat: stream thin...
51
      4. 如上此类,存在大的需求缺口,则务必先问清楚,直接提问即可,而不是擅自假设、调用工具、擅自搜索商品和推荐商品。
621b6925   tangwang   up
52
53
    3. 意图判断-缺口小:直接检索+方案呈现,根据情况,可以考虑该方向下重要的决策因素(思考哪些维度最可能影响推荐结果),进行提议和问题收集,让用户既得到相关信息、又得到下一步的方向引导、同时也有机会修正或者细化诉求。
    4. 选项驱动式澄清:推荐几个清晰的方向,呈现方案或商品搜索结果,再做澄清
187f3a6a   tangwang   prompt
54
    5. 单轮对话最好只提1-2个问题。
621b6925   tangwang   up
55
    6. 站在用户立场思考:比如询问用户期待的效果或感觉、使用的场合、想解决的问题,而不是询问具体的款式、参数,你需要将用户表达的需求翻译为具体可检索的商品特征(版型、材质、设计元素、风格标签等),并据此筛选商品、组织推荐逻辑。
897b5ca9   tangwang   perf: 前端性能优化 + 搜索...
56
  2. 如何使用search_products
8dcb1dc0   tangwang   feat: stream thin...
57
    1. 在需要搜索商品的时候,可以将需求分解为 2-4 query,每个 query 聚焦一个明确的商品子类或搜索角度,每个query对应一个工具调用。
621b6925   tangwang   up
58
    2. 可以根据搜索结果调整搜索策略:每次调用 search_products 后,工具会返回搜索结果的相关性的判断、以及搜索结果的topNtitle,你需要决策是否要调整搜索策略,比如结果质量太差,可能需要调整搜索词、或者加大试探的query数量(不要超过3-5个)。结果太差的原因有可能是你生成的query不合理、请根据你看到的商品名称的构成组织搜索关键词。
897b5ca9   tangwang   perf: 前端性能优化 + 搜索...
59
60
61
  3. 在最终回复中使用 [SEARCH_RESULTS_REF:ref_id] 内联引用搜索结果:
    1. 搜索工具会返回一个结果引用标识[SEARCH_RESULTS_REF:ref_id],撰写最终答复的时候请直接引用 [SEARCH_RESULTS_REF:ref_id] ,系统会自动在该位置渲染对应的商品卡片列表,无需复述搜索结果。
    2. 因为系统会自动将[SEARCH_RESULTS_REF:ref_id]渲染为搜索结果,所以[SEARCH_RESULTS_REF:ref_id]必须独占一行,且只在需要渲染该query完整的搜索结果时才进行引用,同一个结果不要重复引用。
8dcb1dc0   tangwang   feat: stream thin...
62
  4. 今天是{datetime.now().strftime("%Y-%m-%d")},所有与当前时间(比如天气、最新或即将发生的事件)相关的问题,都要使用web_search工具)。
66442668   tangwang   feat: 搜索结果引用与并行搜索...
63
64
65
66
67
68
69
70
71
72
73
  """
  
  
  # ── Agent state ────────────────────────────────────────────────────────────────
  
  class AgentState(TypedDict):
      """State carried through the agent graph for one conversation thread."""

      # Conversation messages; annotated with langgraph's add_messages reducer.
      messages: Annotated[Sequence[BaseMessage], add_messages]
      # Path of the image passed with the current turn, or None when no image was given.
      current_image_path: Optional[str]
  
  
  # ── Helper ─────────────────────────────────────────────────────────────────────
e7f2b240   tangwang   first commit
74
  
825828c4   tangwang   fix: search image...
75
76
77
78
79
  # Maximum number of characters of message content (or of the serialized request JSON)
  # written to one log entry; longer values are truncated to avoid huge logs.
  _LOG_CONTENT_MAX = 8000
  # Maximum number of characters of a single tool result written to one log entry.
  _LOG_TOOL_RESULT_MAX = 4000
  
  
e7f2b240   tangwang   first commit
80
  def _extract_message_text(msg) -> str:
66442668   tangwang   feat: 搜索结果引用与并行搜索...
81
      """Extract plain text from a LangChain message (handles str or content_blocks)."""
e7f2b240   tangwang   first commit
82
83
84
85
86
87
88
      content = getattr(msg, "content", "")
      if isinstance(content, str):
          return content
      if isinstance(content, list):
          parts = []
          for block in content:
              if isinstance(block, dict):
66442668   tangwang   feat: 搜索结果引用与并行搜索...
89
                  parts.append(block.get("text") or block.get("content") or "")
e7f2b240   tangwang   first commit
90
91
92
93
94
95
              else:
                  parts.append(str(block))
          return "".join(str(p) for p in parts)
      return str(content) if content else ""
  
  
621b6925   tangwang   up
96
97
  # Some APIs (e.g. DeepSeek) return <think>...</think> blocks inside content; this pattern
  # matches a whole block so it can be removed, leaving only the formal reply.
  _RE_THINK_TAGS = re.compile(r"<think>.*?<\/think>", re.DOTALL | re.IGNORECASE)
363578ca   tangwang   **feat: robust th...
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
  # Captures only the text inside a <think>...</think> block (used to extract thinking text).
  _RE_THINK_INNER = re.compile(r"<think>(.*?)<\/think>", re.DOTALL | re.IGNORECASE)
  
  
  def _normalize_base_url(base_url: Optional[str]) -> str:
      return (base_url or "").strip().rstrip("/")
  
  
  def _is_openai_official_base_url(base_url: Optional[str]) -> bool:
      """Tell whether ``base_url`` points at the official OpenAI API host.

      Args:
          base_url: Configured API base URL; may be ``None`` or empty.

      Returns:
          ``True`` when the URL's hostname is ``api.openai.com`` or a subdomain of
          it; ``False`` otherwise, including when the URL is unset or has no
          parseable hostname.
      """
      normalized = _normalize_base_url(base_url)
      if not normalized:
          return False
      hostname = (urlparse(normalized).hostname or "").lower()
      # Match on a DNS label boundary: a bare suffix test would also accept hosts
      # such as "notapi.openai.com".
      return hostname == "api.openai.com" or hostname.endswith(".api.openai.com")
  
  
  def _is_dashscope_base_url(base_url: Optional[str]) -> bool:
      """Tell whether the hostname of ``base_url`` contains ``"dashscope"``.

      Returns ``False`` when the URL is unset/empty or has no hostname.
      """
      cleaned = _normalize_base_url(base_url)
      if cleaned:
          host = urlparse(cleaned).hostname
          return "dashscope" in (host or "").lower()
      return False
  
  
  def _coerce_reasoning_text(value: Any) -> str:
      """Best-effort conversion from reasoning payload to plain text."""
      if value is None:
          return ""
      if isinstance(value, str):
          return value.strip()
      if isinstance(value, dict):
          parts: list[str] = []
          for key in ("content", "summary", "text", "reasoning_content"):
              item = value.get(key)
              if isinstance(item, str) and item.strip():
                  parts.append(item.strip())
              elif isinstance(item, list):
                  for sub in item:
                      s = _coerce_reasoning_text(sub)
                      if s:
                          parts.append(s)
          if parts:
              return "\n".join(parts).strip()
          try:
              return json.dumps(value, ensure_ascii=False)
          except Exception:
              return str(value).strip()
      if isinstance(value, list):
          parts = [_coerce_reasoning_text(v) for v in value]
          joined = "\n".join(p for p in parts if p)
          if joined:
              return joined.strip()
          try:
              return json.dumps(value, ensure_ascii=False)
          except Exception:
              return str(value).strip()
      return str(value).strip()
621b6925   tangwang   up
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
  
  
  def _extract_formal_reply(msg) -> str:
      """
      只截取大模型回复中的「正式结果」,去掉 thinking/reasoning 内容。
      -  content  list(如 Responses API):只取 type  output_text/text 的块,跳过 reasoning
      -  content  str:去掉 think 标签及其内容。
      """
      content = getattr(msg, "content", "")
      if isinstance(content, list):
          parts = []
          for block in content:
              if not isinstance(block, dict):
                  continue
              block_type = (block.get("type") or "").lower()
              if block_type in ("reasoning",):
                  continue
              text = block.get("text") or block.get("content") or ""
              if text:
                  parts.append(text)
          return "".join(str(p) for p in parts).strip()
      if isinstance(content, str):
          return _RE_THINK_TAGS.sub("", content).strip()
      return str(content).strip() if content else ""
  
  
363578ca   tangwang   **feat: robust th...
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
  def _extract_thinking(msg) -> str:
      """Extract the thinking/reasoning text of a model reply.

      Sources are tried in order and the first non-empty one wins:
      ``additional_kwargs["reasoning_content"]`` (injected for DashScope-style
      compatible endpoints by ChatOpenAIWithReasoningContent),
      ``additional_kwargs["reasoning"]`` (Responses-API style), reasoning-type
      blocks in list content, then the first ``<think>...</think>`` section of
      string content. Returns ``""`` when none is found.
      """
      extra = getattr(msg, "additional_kwargs", None) or {}
      for field in ("reasoning_content", "reasoning"):
          found = _coerce_reasoning_text(extra.get(field))
          if found:
              return found

      body = getattr(msg, "content", "")
      if isinstance(body, str):
          match = _RE_THINK_INNER.search(body)
          return match.group(1).strip() if match else ""
      if isinstance(body, list):
          pieces = []
          for item in body:
              if not isinstance(item, dict):
                  continue
              kind = (item.get("type") or "").lower()
              if kind in ("reasoning", "reasoning_content", "thinking"):
                  # Fall back to the whole block when it has no text/content field.
                  piece = _coerce_reasoning_text(item.get("text") or item.get("content") or item)
                  if piece:
                      pieces.append(piece)
          if pieces:
              return "".join(pieces).strip()
      return ""
  
  
  def _message_for_log(msg: BaseMessage, include_thinking: bool = False) -> dict:
      """Serialize a message for structured logging (content truncated).

      Args:
          msg: The message to serialize.
          include_thinking: When true, also include the extracted thinking text
              under ``"thinking"`` (if any).

      Returns:
          A dict with ``"type"`` and ``"content"``, plus ``"thinking"`` and
          ``"tool_calls"`` (a list of ``{"name", "args"}``) when present. Content
          and thinking are each cut to ``_LOG_CONTENT_MAX`` characters with a
          truncation marker appended.
      """
      msg_kwargs = getattr(msg, "additional_kwargs", None) or {}
      if any(k in msg_kwargs for k in ("reasoning", "reasoning_content")):
          # The message carries reasoning metadata: log only the formal reply as content.
          text = _extract_formal_reply(msg) or _extract_message_text(msg)
      else:
          text = _extract_message_text(msg)
      if len(text) > _LOG_CONTENT_MAX:
          text = text[:_LOG_CONTENT_MAX] + f"... [truncated, total {len(text)} chars]"
      out: dict[str, Any] = {
          "type": getattr(msg, "type", "unknown"),
          "content": text,
      }
      if include_thinking:
          thinking = _extract_thinking(msg)
          if thinking:
              if len(thinking) > _LOG_CONTENT_MAX:
                  thinking = thinking[:_LOG_CONTENT_MAX] + f"... [truncated, total {len(thinking)} chars]"
              out["thinking"] = thinking
      tool_calls = getattr(msg, "tool_calls", None)
      if tool_calls:
          out["tool_calls"] = [
              {"name": tc.get("name"), "args": tc.get("args", {})}
              for tc in tool_calls
          ]
      return out
  
  
8dcb1dc0   tangwang   feat: stream thin...
240
241
242
243
244
245
246
247
  def _iter_text_chunks(text: str, chunk_size: int = 24) -> Iterator[str]:
      """Yield text in small chunks for UI-friendly streaming."""
      if not text:
          return
      for i in range(0, len(text), chunk_size):
          yield text[i : i + chunk_size]
  
  
363578ca   tangwang   **feat: robust th...
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
  # ── DashScope thinking support ──────────────────────────────────────────────────
  # When LangChain parses a chat completion it does not copy the reasoning_content returned by
  # the API onto the message; the subclass below injects reasoning_content into
  # additional_kwargs inside _create_chat_result so that it can be logged.
  
  class ChatOpenAIWithReasoningContent(ChatOpenAI):
      """ChatOpenAI subclass that copies the API's ``reasoning_content`` into
      ``message.additional_kwargs``."""

      def _create_chat_result(
          self,
          response: Any,
          generation_info: Optional[dict] = None,
      ) -> ChatResult:
          """Build the chat result as usual, then attach each choice's reasoning_content."""
          result = super()._create_chat_result(response, generation_info)

          # Work on a plain dict: either the response itself or its model_dump().
          if isinstance(response, dict):
              payload = response
          else:
              dumper = getattr(response, "model_dump", None)
              payload = dumper() if callable(dumper) else {}
          if not payload:
              return result

          # Choices and generations are paired by position; extras on either side are ignored.
          for generation, choice in zip(result.generations, payload.get("choices") or []):
              message_payload = choice.get("message") or {}
              if not isinstance(message_payload, dict):
                  continue
              reasoning = message_payload.get("reasoning_content")
              if reasoning and isinstance(generation.message, BaseMessage):
                  generation.message.additional_kwargs["reasoning_content"] = reasoning
          return result
  
  
66442668   tangwang   feat: 搜索结果引用与并行搜索...
280
  # ── Agent class ────────────────────────────────────────────────────────────────
e7f2b240   tangwang   first commit
281
  
e7f2b240   tangwang   first commit
282
  class ShoppingAgent:
66442668   tangwang   feat: 搜索结果引用与并行搜索...
283
      """ReAct shopping agent with search-evaluate-decide loop and registry-based result referencing."""
e7f2b240   tangwang   first commit
284
285
286
287
  
      def __init__(self, session_id: Optional[str] = None):
          """Build the LLM client, the session-bound tools and the compiled agent graph.

          Args:
              session_id: Identifier passed to the tools/registry and used as the
                  graph thread id; falls back to ``"default"`` when omitted or empty.
          """
          self.session_id = session_id or "default"

          llm_kwargs: dict[str, Any] = {
              "model": settings.openai_model,
              "temperature": settings.openai_temperature,
              "api_key": settings.openai_api_key,
          }
          base_url = _normalize_base_url(settings.openai_api_base_url)
          if base_url:
              llm_kwargs["base_url"] = base_url

          is_official = _is_openai_official_base_url(base_url)
          use_reasoning = getattr(settings, "openai_use_reasoning", False)
          if use_reasoning and (not base_url or is_official):
              # Official OpenAI endpoint: use the Responses API "reasoning" parameter.
              llm_kwargs["use_responses_api"] = True
              effort = getattr(settings, "openai_reasoning_effort", "medium") or "medium"
              # NOTE(review): confirm "none" is an accepted value for reasoning.summary
              # on the targeted API version.
              llm_kwargs["model_kwargs"] = {"reasoning": {"effort": effort, "summary": "none"}}
          elif use_reasoning and _is_dashscope_base_url(base_url):
              # DashScope-compatible endpoint: thinking is switched on through extra_body
              # and comes back as reasoning_content.
              llm_kwargs["extra_body"] = {"enable_thinking": True}
          elif use_reasoning and base_url:
              logger.info(
                  "Reasoning requested but base_url is non-OpenAI/non-DashScope; "
                  "skipping provider-specific reasoning params. base_url=%s",
                  base_url,
              )

          # Non-official endpoints get the subclass that surfaces reasoning_content.
          llm_class = (
              ChatOpenAIWithReasoningContent
              if base_url and not is_official
              else ChatOpenAI
          )
          self.llm = llm_class(**llm_kwargs)

          # Tools are session-bound so search_products writes to the right registry partition
          self.tools = get_all_tools(session_id=self.session_id, registry=global_registry)
          self.llm_with_tools = self.llm.bind_tools(self.tools)

          self.graph = self._build_graph()
          logger.info("ShoppingAgent ready — session=%s", self.session_id)
e7f2b240   tangwang   first commit
327
328
  
      def _build_graph(self):
          """Assemble and compile the agent/tools workflow.

          The graph starts at the ``agent`` node (LLM call with tools bound). When the
          last message carries tool calls, control moves to the ``tools`` node and then
          back to ``agent``; otherwise the run ends.

          Returns:
              The compiled graph, checkpointed with a ``MemorySaver``.
          """

          def agent_node(state: AgentState):
              """Call the LLM on the conversation, prepending the system prompt if absent."""
              messages = state["messages"]
              if not any(isinstance(m, SystemMessage) for m in messages):
                  messages = [SystemMessage(content=SYSTEM_PROMPT)] + list(messages)

              # Serializing the whole history is only worth doing when it will be logged.
              if logger.isEnabledFor(logging.INFO):
                  req_json = json.dumps(
                      [_message_for_log(m) for m in messages], ensure_ascii=False
                  )
                  if len(req_json) > _LOG_CONTENT_MAX:
                      req_json = req_json[:_LOG_CONTENT_MAX] + f"... [truncated total {len(req_json)}]"
                  logger.info("[%s] LLM_REQUEST messages=%s", self.session_id, req_json)

              response = self.llm_with_tools.invoke(messages)

              if logger.isEnabledFor(logging.INFO):
                  response_log = _message_for_log(response, include_thinking=True)
                  logger.info(
                      "[%s] LLM_RESPONSE %s",
                      self.session_id,
                      json.dumps(response_log, ensure_ascii=False),
                  )
              return {"messages": [response]}

          def should_continue(state: AgentState):
              """Route to the tools node when the last message requests tool calls, else end."""
              last = state["messages"][-1]
              if getattr(last, "tool_calls", None):
                  return "tools"
              return END

          tool_node = ToolNode(self.tools)

          workflow = StateGraph(AgentState)
          workflow.add_node("agent", agent_node)
          workflow.add_node("tools", tool_node)
          workflow.add_edge(START, "agent")
          workflow.add_conditional_edges("agent", should_continue, ["tools", END])
          workflow.add_edge("tools", "agent")

          return workflow.compile(checkpointer=MemorySaver())
e7f2b240   tangwang   first commit
363
  
8dcb1dc0   tangwang   feat: stream thin...
364
      def chat_stream(self, query: str, image_path: Optional[str] = None) -> Iterator[dict]:
          """
          Stream this turn as incremental events for frontend rendering.

          Args:
              query: The user's message for this turn.
              image_path: Optional path of an uploaded image. When given but missing
                  on disk, a single error ``done`` event is yielded.

          Yield event dicts:
            - debug_update: contains latest tool_calls/debug_steps snapshot
            - response_delta: contains incremental assistant response text
            - response_replace: replace streamed response when reconciliation is needed
            - done: final payload compatible with chat()

          Exceptions raised while processing are logged and reported as a final
          ``done`` event with ``error=True`` instead of propagating.
          """
          try:
              logger.info(
                  "[%s] chat(stream): %r image=%s", self.session_id, query, bool(image_path)
              )

              if image_path and not Path(image_path).exists():
                  yield {
                      "type": "done",
                      "result": {
                          "response": f"错误:图片文件不存在:{image_path}",
                          "tool_calls": [],
                          "debug_steps": [],
                          "search_refs": {},
                          "error": True,
                      },
                  }
                  return

              # Snapshot registry before the turn so we can report new additions
              registry_before = set(global_registry.get_all(self.session_id).keys())

              message_content = query
              if image_path:
                  message_content = f"{query}\n[用户上传了图片:{image_path}]"

              config = {"configurable": {"thread_id": self.session_id}}
              input_state = {
                  "messages": [HumanMessage(content=message_content)],
                  "current_image_path": image_path,
              }

              tool_calls: list[dict] = []
              debug_steps: list[dict] = []
              streamed_response = ""

              def response_deltas(pending: str) -> Iterator[dict]:
                  """Append ``pending`` to the streamed text chunk by chunk, one event per chunk."""
                  nonlocal streamed_response
                  for delta in _iter_text_chunks(pending):
                      streamed_response += delta
                      yield {
                          "type": "response_delta",
                          "delta": delta,
                          "response": streamed_response,
                          "tool_calls": tool_calls,
                          "debug_steps": debug_steps,
                      }

              for event in self.graph.stream(input_state, config=config):
                  logger.debug("[%s] event keys: %s", self.session_id, list(event.keys()))

                  if "agent" in event:
                      agent_out = event["agent"]
                      step_msgs: list[dict] = []
                      step_tcs: list[dict] = []
                      final_candidate_text = ""

                      for msg in agent_out.get("messages", []):
                          text = _extract_message_text(msg)
                          thinking = _extract_thinking(msg)
                          step_entry = {
                              "type": getattr(msg, "type", "assistant"),
                              "content": text[:500],
                          }
                          if thinking:
                              step_entry["thinking"] = thinking[:500]
                          step_msgs.append(step_entry)

                          msg_tool_calls = getattr(msg, "tool_calls", None)
                          if msg_tool_calls:
                              for tc in msg_tool_calls:
                                  # The same dict goes into both lists so a result attached
                                  # later is visible from either.
                                  entry = {"name": tc.get("name"), "args": tc.get("args", {})}
                                  tool_calls.append(entry)
                                  step_tcs.append(entry)
                          else:
                              # An AI message without tool calls is a candidate final answer.
                              role = getattr(msg, "type", "")
                              formal = _extract_formal_reply(msg) or _extract_message_text(msg)
                              if role in ("ai", "assistant") and formal.strip():
                                  final_candidate_text = formal.strip()

                      debug_steps.append(
                          {"node": "agent", "messages": step_msgs, "tool_calls": step_tcs}
                      )
                      yield {
                          "type": "debug_update",
                          "tool_calls": tool_calls,
                          "debug_steps": debug_steps,
                      }

                      # When final assistant text is produced in this node, stream it immediately.
                      if final_candidate_text:
                          pending = final_candidate_text
                          if pending.startswith(streamed_response):
                              pending = pending[len(streamed_response) :]
                          yield from response_deltas(pending)

                  if "tools" in event:
                      tools_out = event["tools"]
                      step_results: list[dict] = []
                      msgs = tools_out.get("messages", [])

                      # Match results back to tool_calls by position within this event
                      unresolved = [tc for tc in tool_calls if "result" not in tc]
                      for i, msg in enumerate(msgs):
                          text = _extract_message_text(msg)
                          preview = text[:600] + ("…" if len(text) > 600 else "")
                          if i < len(unresolved):
                              unresolved[i]["result"] = preview
                              tc_name = unresolved[i].get("name", "")
                              tc_args = unresolved[i].get("args", {})
                              result_log = (
                                  text
                                  if len(text) <= _LOG_TOOL_RESULT_MAX
                                  else text[:_LOG_TOOL_RESULT_MAX]
                                  + f"... [truncated total {len(text)}]"
                              )
                              logger.info(
                                  "[%s] TOOL_CALL_RESULT name=%s args=%s result=%s",
                                  self.session_id,
                                  tc_name,
                                  json.dumps(tc_args, ensure_ascii=False),
                                  result_log,
                              )
                          step_results.append({"content": preview})

                      debug_steps.append({"node": "tools", "results": step_results})
                      yield {
                          "type": "debug_update",
                          "tool_calls": tool_calls,
                          "debug_steps": debug_steps,
                      }

              final_state = self.graph.get_state(config)
              final_msg = final_state.values["messages"][-1]
              response_text = _extract_formal_reply(final_msg) or _extract_message_text(final_msg)

              # Reconcile streamed text with canonical final response.
              if response_text and response_text != streamed_response:
                  if response_text.startswith(streamed_response):
                      # Nothing or only a prefix was streamed: send the remainder.
                      yield from response_deltas(response_text[len(streamed_response) :])
                  else:
                      streamed_response = response_text
                      yield {
                          "type": "response_replace",
                          "response": streamed_response,
                          "tool_calls": tool_calls,
                          "debug_steps": debug_steps,
                      }

              # Collect new SearchResults added during this turn
              registry_after = global_registry.get_all(self.session_id)
              new_refs = {
                  ref_id: result
                  for ref_id, result in registry_after.items()
                  if ref_id not in registry_before
              }

              logger.info(
                  "[%s] done — tool_calls=%s, new_refs=%s",
                  self.session_id,
                  len(tool_calls),
                  list(new_refs.keys()),
              )

              yield {
                  "type": "done",
                  "result": {
                      "response": response_text,
                      "tool_calls": tool_calls,
                      "debug_steps": debug_steps,
                      "search_refs": new_refs,
                      "error": False,
                  },
              }

          except Exception as e:
              # Top-level boundary of the turn: log with traceback and report to the caller.
              logger.exception("[%s] chat stream error: %s", self.session_id, e)
              yield {
                  "type": "done",
                  "result": {
                      "response": f"抱歉,处理您的请求时遇到错误:{e}",
                      "tool_calls": [],
                      "debug_steps": [],
                      "search_refs": {},
                      "error": True,
                  },
              }
  
8dcb1dc0   tangwang   feat: stream thin...
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
      def chat(self, query: str, image_path: Optional[str] = None) -> dict:
          """
          Run one conversational turn to completion and return its final result.

          This is the blocking counterpart of ``chat_stream``: the stream is
          consumed in full and only the payload carried by a ``"done"`` event is
          kept (if several arrive, the last one wins).

          Args:
              query: the user's message text, forwarded to ``chat_stream``.
              image_path: optional image path, forwarded to ``chat_stream``.

          Returns:
              dict with keys:
                response       final AI message text (may contain [SEARCH_RESULTS_REF:ref_id] tokens)
                tool_calls     list of {name, args, result_preview}
                debug_steps    detailed per-node step log
                search_refs    dict[ref_id -> SearchResult] for all searches this turn
                error          bool
              When the stream produces no usable ``"done"`` payload, a fallback
              dict with ``error=True`` and empty collections is returned.
          """
          final_payload: Optional[dict] = None
          stream = self.chat_stream(query=query, image_path=image_path)
          for evt in stream:
              # Intermediate events (deltas, tool updates, ...) are irrelevant here.
              if evt.get("type") != "done":
                  continue
              final_payload = evt.get("result")

          if final_payload:
              return final_payload

          # The stream ended without a (non-empty) terminal result.
          return {
              "response": "抱歉,处理您的请求时未返回结果。",
              "tool_calls": [],
              "debug_steps": [],
              "search_refs": {},
              "error": True,
          }
  
e7f2b240   tangwang   first commit
595
      def get_conversation_history(self) -> list:
          """
          Return this session's visible conversation as ``{"role", "content"}`` dicts.

          The graph state is looked up with ``self.session_id`` as the thread id.
          System and tool messages are left out. Messages whose ``type`` is
          ``"human"`` get the role ``"user"``; everything else is reported as
          ``"assistant"``. For assistant messages the text from
          ``_extract_formal_reply`` is preferred, falling back to
          ``_extract_message_text`` when that is empty.

          Returns:
              A list of dicts in stored order, or ``[]`` when there is no state,
              no messages, or any exception occurs (the error is logged).
          """
          try:
              config = {"configurable": {"thread_id": self.session_id}}
              snapshot = self.graph.get_state(config)
              messages = snapshot.values.get("messages") if snapshot else None
              if not messages:
                  return []

              history = []
              for message in messages:
                  # Internal plumbing (system prompts, tool outputs) is not shown.
                  is_hidden = isinstance(message, SystemMessage) or getattr(
                      message, "type", None
                  ) in ("system", "tool")
                  if is_hidden:
                      continue

                  if message.type == "human":
                      role = "user"
                      content = _extract_message_text(message)
                  else:
                      role = "assistant"
                      content = _extract_formal_reply(message) or _extract_message_text(message)
                  history.append({"role": role, "content": content})
              return history
          except Exception as exc:
              logger.error(f"get_conversation_history error: {exc}")
              return []
  
      def clear_history(self):
          """
          Record that a history reset was requested for this session.

          Only an info-level log entry is written here; as the message itself
          says, a full reset is obtained by switching to a new ``session_id``.
          """
          notice = f"[{self.session_id}] clear requested (use new session_id to fully reset)"
          logger.info(notice)
e7f2b240   tangwang   first commit
618
619
620
  
  
  def create_shopping_agent(session_id: Optional[str] = None) -> ShoppingAgent:
      """
      Build a ``ShoppingAgent`` for the given session.

      Args:
          session_id: optional session identifier, passed through unchanged to
              the ``ShoppingAgent`` constructor.

      Returns:
          The newly constructed ``ShoppingAgent`` instance.
      """
      agent = ShoppingAgent(session_id=session_id)
      return agent