Commit 8dcb1dc08318529602a58cd0059be394a55376cd

Authored by tangwang
1 parent 363578ca

feat: stream thinking and tool debug to UI

Made-with: Cursor
Showing 2 changed files with 248 additions and 87 deletions   Show diff stats
... ... @@ -668,6 +668,41 @@ def render_message_with_refs(
668 668 st.caption(f"[搜索结果 {ref_id} 不可用]")
669 669  
670 670  
  671 +def render_debug_steps_panel(debug_steps: list[dict], expanded: bool = True) -> None:
  672 + """Render debug steps with thinking/tool details."""
  673 + with st.expander("思考 & 工具调用详细过程", expanded=expanded):
  674 + for idx, step in enumerate(debug_steps, 1):
  675 + node = step.get("node", "unknown")
  676 + st.markdown(f"**Step {idx} – {node}**")
  677 +
  678 + if node == "agent":
  679 + msgs = step.get("messages", [])
  680 + if msgs:
  681 + st.markdown("**Agent Messages**")
  682 + for m in msgs:
  683 + st.markdown(f"- `{m.get('type', 'assistant')}`: {m.get('content', '')}")
  684 + if m.get("thinking"):
  685 + st.markdown(" - `thinking`:")
  686 + st.code(m.get("thinking", ""), language="text")
  687 +
  688 + tcs = step.get("tool_calls", [])
  689 + if tcs:
  690 + st.markdown("**Planned Tool Calls**")
  691 + for j, tc in enumerate(tcs, 1):
  692 + st.markdown(f"- **{j}. {tc.get('name')}**")
  693 + st.code(tc.get("args", {}), language="json")
  694 +
  695 + elif node == "tools":
  696 + results = step.get("results", [])
  697 + if results:
  698 + st.markdown("**Tool Results**")
  699 + for j, r in enumerate(results, 1):
  700 + st.markdown(f"- **Result {j}:**")
  701 + st.code(r.get("content", ""), language="text")
  702 +
  703 + st.markdown("---")
  704 +
  705 +
671 706 def display_message(message: dict, msg_index: int = 0):
672 707 """Display a chat message. msg_index keeps widget keys unique across messages."""
673 708 role = message["role"]
... ... @@ -698,34 +733,7 @@ def display_message(message: dict, msg_index: int = 0):
698 733  
699 734 # Debug panel
700 735 if debug_steps and st.session_state.get("show_debug"):
701   - with st.expander("思考 & 工具调用详细过程", expanded=False):
702   - for idx, step in enumerate(debug_steps, 1):
703   - node = step.get("node", "unknown")
704   - st.markdown(f"**Step {idx} – {node}**")
705   -
706   - if node == "agent":
707   - msgs = step.get("messages", [])
708   - if msgs:
709   - st.markdown("**Agent Messages**")
710   - for m in msgs:
711   - st.markdown(f"- `{m.get('type', 'assistant')}`: {m.get('content', '')}")
712   -
713   - tcs = step.get("tool_calls", [])
714   - if tcs:
715   - st.markdown("**Planned Tool Calls**")
716   - for j, tc in enumerate(tcs, 1):
717   - st.markdown(f"- **{j}. {tc.get('name')}**")
718   - st.code(tc.get("args", {}), language="json")
719   -
720   - elif node == "tools":
721   - results = step.get("results", [])
722   - if results:
723   - st.markdown("**Tool Results**")
724   - for j, r in enumerate(results, 1):
725   - st.markdown(f"- **Result {j}:**")
726   - st.code(r.get("content", ""), language="text")
727   -
728   - st.markdown("---")
  736 + render_debug_steps_panel(debug_steps, expanded=True)
729 737  
730 738 # Render message: expand [SEARCH_RESULTS_REF:ref_id] tokens into product card blocks
731 739 session_id = st.session_state.get("session_id", "")
... ... @@ -1150,11 +1158,61 @@ def main():
1150 1158 try:
1151 1159 shopping_agent = st.session_state.shopping_agent
1152 1160  
1153   - # Process with agent
1154   - result = shopping_agent.chat(
1155   - query=agent_query,
1156   - image_path=image_path,
1157   - )
  1161 + # Stream assistant updates to UI immediately
  1162 + with messages_container:
  1163 + live_container = st.container()
  1164 + with live_container:
  1165 + live_tool_caption = st.empty()
  1166 + live_debug_placeholder = st.empty()
  1167 + live_response_placeholder = st.empty()
  1168 +
  1169 + live_response = ""
  1170 + live_tool_calls: list[dict] = []
  1171 + live_debug_steps: list[dict] = []
  1172 + result = None
  1173 +
  1174 + def _render_live() -> None:
  1175 + if live_tool_calls:
  1176 + tool_names = [tc.get("name", "") for tc in live_tool_calls if tc.get("name")]
  1177 + live_tool_caption.caption(" → ".join(tool_names))
  1178 + else:
  1179 + live_tool_caption.empty()
  1180 +
  1181 + if st.session_state.get("show_debug") and live_debug_steps:
  1182 + with live_debug_placeholder.container():
  1183 + render_debug_steps_panel(live_debug_steps, expanded=True)
  1184 + else:
  1185 + live_debug_placeholder.empty()
  1186 +
  1187 + if live_response:
  1188 + live_response_placeholder.markdown(live_response)
  1189 + else:
  1190 + live_response_placeholder.markdown("…")
  1191 +
  1192 + for event in shopping_agent.chat_stream(query=agent_query, image_path=image_path):
  1193 + event_type = event.get("type")
  1194 + if event_type in {"debug_update", "response_delta", "response_replace"}:
  1195 + if "tool_calls" in event:
  1196 + live_tool_calls = event.get("tool_calls", live_tool_calls)
  1197 + if "debug_steps" in event:
  1198 + live_debug_steps = event.get("debug_steps", live_debug_steps)
  1199 + if event_type == "response_delta":
  1200 + live_response = event.get("response", live_response)
  1201 + elif event_type == "response_replace":
  1202 + live_response = event.get("response", live_response)
  1203 + _render_live()
  1204 + elif event_type == "done":
  1205 + result = event.get("result")
  1206 +
  1207 + if not result:
  1208 + result = {
  1209 + "response": live_response or "抱歉,处理您的请求时未返回结果。",
  1210 + "tool_calls": live_tool_calls,
  1211 + "debug_steps": live_debug_steps,
  1212 + "search_refs": {},
  1213 + "error": True,
  1214 + }
  1215 +
1158 1216 response = result["response"]
1159 1217 tool_calls = result.get("tool_calls", [])
1160 1218 debug_steps = result.get("debug_steps", [])
... ...
app/agents/shopping_agent.py
... ... @@ -14,7 +14,7 @@ import re
14 14 from urllib.parse import urlparse
15 15 from datetime import datetime
16 16 from pathlib import Path
17   -from typing import Any, Optional, Sequence
  17 +from typing import Any, Iterator, Optional, Sequence
18 18  
19 19 from langchain_core.messages import BaseMessage, HumanMessage, SystemMessage
20 20 from langchain_core.outputs import ChatResult
... ... @@ -44,39 +44,22 @@ SYSTEM_PROMPT = f""" 角色定义
44 44 一些原则:
45 45 1. 价值提供与信息收集的原则:
46 46 1. 兼顾价值提供和需求澄清:适时的提供有价值的信息,如商品推荐、穿搭建议、趋势信息,在推荐方向上有需求缺口、需要明确的重要信息时,要适时的做“信息收集”,引导式的澄清需求、提高商品发现的效率,形成“提供-反馈”的良性循环。
47   - 2. 意图判断-缺口大:当无法从对话中确定关键变量(如使用对象不明确、无法判断男性或女性使用、品类细分不清等)时,从“使用对象”、“品类细分”、“使用场景”、“风格效果”等高层意图维度切入,提供方向性选项 + 1–2个关键问题,引导用户做选择(以下仅提供参考思路,具体话术不要照搬):
  47 + 2. 意图判断-缺口大:当无法从对话中确定关键变量(如使用对象不明确、无法判断男性或女性使用、品类细分不清等)时,从“使用对象”、“品类细分”、“使用场景”、“风格效果”等高层意图维度切入,提供方向性选项 + 1–3个关键问题,引导用户做选择(以下仅提供参考思路,具体话术不要照搬):
48 48 1. 人群不明确时(如果从对话中无法确认用使用人群,比如搜索意图是男女都可以消费的品类比如T恤、裤子):男款、女款,还是中性风都可以?
49 49 2. 确定是女性、但是风格不明确时:你想穿出哪种感觉?职场干练 松弛自在 活力元气 温柔知性
50 50 3. 使用场景不明确时:平时通勤场合多吗?还是更喜欢生活化穿搭?
51   - 4. 如上此类,存在大的需求缺口,则需要先进行澄清、不要调用商品搜索
  51 + 4. 如上此类,存在大的需求缺口,则务必先问清楚,直接提问即可,而不是擅自假设、调用工具、擅自搜索商品和推荐商品
52 52 3. 意图判断-缺口小:直接检索+方案呈现,根据情况,可以考虑该方向下重要的决策因素(思考哪些维度最可能影响推荐结果),进行提议和问题收集,让用户既得到相关信息、又得到下一步的方向引导、同时也有机会修正或者细化诉求。
53 53 4. 选项驱动式澄清:推荐几个清晰的方向,呈现方案或商品搜索结果,再做澄清
54 54 5. 单轮对话最好只提1-2个问题。
55 55 6. 站在用户立场思考:比如询问用户期待的效果或感觉、使用的场合、想解决的问题,而不是询问具体的款式、参数,你需要将用户表达的需求翻译为具体可检索的商品特征(版型、材质、设计元素、风格标签等),并据此筛选商品、组织推荐逻辑。
56 56 2. 如何使用search_products:
57   - 1. 可以生成多个query进行搜索:在需要搜索商品的时候,可以将需求分解为 2-4 个搜索查询,每个 query 聚焦一个明确的商品子类或搜索角度
  57 + 1. 在需要搜索商品的时候,可以将需求分解为 2-4 个query,每个 query 聚焦一个明确的商品子类或搜索角度,每个query对应一个工具调用
58 58 2. 可以根据搜索结果调整搜索策略:每次调用 search_products 后,工具会返回搜索结果的相关性的判断、以及搜索结果的topN的title,你需要决策是否要调整搜索策略,比如结果质量太差,可能需要调整搜索词、或者加大试探的query数量(不要超过3-5个)。结果太差的原因有可能是你生成的query不合理、请根据你看到的商品名称的构成组织搜索关键词。
59 59 3. 在最终回复中使用 [SEARCH_RESULTS_REF:ref_id] 内联引用搜索结果:
60 60 1. 搜索工具会返回一个结果引用标识[SEARCH_RESULTS_REF:ref_id],撰写最终答复的时候请直接引用 [SEARCH_RESULTS_REF:ref_id] ,系统会自动在该位置渲染对应的商品卡片列表,无需复述搜索结果。
61 61 2. 因为系统会自动将[SEARCH_RESULTS_REF:ref_id]渲染为搜索结果,所以[SEARCH_RESULTS_REF:ref_id]必须独占一行,且只在需要渲染该query完整的搜索结果时才进行引用,同一个结果不要重复引用。
62   - """
63   -
64   -SYSTEM_PROMPT___2 = """ 角色定义
65   - 你是我们店铺的一名专业的电商导购,是一个善于倾听、主动引导、懂得搭配的“时尚顾问”,通过有温度的对话,给用户提供有价值的信息,包括需求引导、方案推荐、搜索结果推荐,最终促成满意的购物决策或转化行为。
66   - 作为我们店铺的一名专业的销售,除了本店铺的商品的推荐,你可以给用户提供有帮助的信息,但是不要虚构商品、提供本商店搜索结果以外的商品。
67   -
68   - 一些原则:
69   - 1. 价值提供与信息收集的原则:
70   - 1. 优先价值提供:适时的提供有价值的信息,如商品推荐、穿搭建议、趋势信息,在推荐方向上有需求缺口、需要明确的重要信息时,要适时的做“信息收集”,引导式的澄清需求、提高商品发现的效率,形成“提供-反馈”的良性循环。
71   - 2. 缺口大(比如品类或者使用人群都不能确定)→ 给出方案推荐 + 1-2个关键问题让用户选择;缺口小→直接检索+方案呈现,根据情况,可以考虑该方向下重要的决策因素,进行提议和问题收集,让用户既得到相关信息、又得到下一步的方向引导、同时也有机会修正或者细化诉求。
72   - 3. 选项驱动式澄清:推荐几个清晰的方向,呈现方案或商品搜索结果,再做澄清
73   - 4. 单轮对话最好只提一个问题,最多两个,禁止多问题堆叠。
74   - 5. 站在用户立场思考:比如询问用户期待的效果或感觉、使用的场合、想解决的问题,而不是询问具体的款式、参数,你需要将用户表达的需求翻译为具体可检索的商品特征(版型、材质、设计元素、风格标签等),并据此筛选商品、组织推荐逻辑。
75   - 2. 如何使用make_search_products_tool:
76   - 1. 可以生成多个query进行搜索:在需要搜索商品的时候,可以将需求分解为 2-4 个搜索查询,每个 query 聚焦一个明确的商品子类或搜索角度。
77   - 2. 可以根据搜索结果调整搜索策略:每次调用 search_products 后,工具会返回搜索结果的相关性的判断、以及搜索结果的topN的title,你需要决策是否要调整搜索策略,比如结果质量太差,可能需要调整搜索词、或者加大试探的query数量(不要超过3-5个)。
78   - 3. 使用 [SEARCH_RESULTS_REF:ref_id] 内联引用搜索结果:搜索工具会返回一个结果引用标识[SEARCH_RESULTS_REF:ref_id],撰写最终答复的时候可以直接引用将 [SEARCH_RESULTS_REF:ref_id] ,系统会自动在该位置渲染对应的商品卡片列表,无需复述搜索结果。
79   - 4. 因为系统会自动将[SEARCH_RESULTS_REF:ref_id]渲染为搜索结果,所以只在需要渲染该query完整的搜索结果时才进行引用,同一个结果不要重复引用。
  62 +4. 今天是{datetime.now().strftime("%Y-%m-%d")},所有与当前时间(比如天气、最新或即将发生的事件)相关的问题,都要使用web_search工具)。
80 63 """
81 64  
82 65  
... ... @@ -254,6 +237,14 @@ def _message_for_log(msg: BaseMessage, include_thinking: bool = False) -> dict:
254 237 return out
255 238  
256 239  
  240 +def _iter_text_chunks(text: str, chunk_size: int = 24) -> Iterator[str]:
  241 + """Yield text in small chunks for UI-friendly streaming."""
  242 + if not text:
  243 + return
  244 + for i in range(0, len(text), chunk_size):
  245 + yield text[i : i + chunk_size]
  246 +
  247 +
257 248 # ── DashScope thinking 支持 ─────────────────────────────────────────────────────
258 249 # LangChain 解析 chat completion 时不会把 API 返回的 reasoning_content 写入 message,
259 250 # 子类在 _create_chat_result 中把 reasoning_content 注入到 additional_kwargs,便于日志打印。
... ... @@ -370,26 +361,31 @@ class ShoppingAgent:
370 361  
371 362 return workflow.compile(checkpointer=MemorySaver())
372 363  
373   - def chat(self, query: str, image_path: Optional[str] = None) -> dict:
  364 + def chat_stream(self, query: str, image_path: Optional[str] = None) -> Iterator[dict]:
374 365 """
375   - Process a user query and return the agent response with metadata.
  366 + Stream this turn as incremental events for frontend rendering.
376 367  
377   - Returns:
378   - dict with keys:
379   - response – final AI message text (may contain [SEARCH_RESULTS_REF:ref_id] tokens)
380   - tool_calls – list of {name, args, result_preview}
381   - debug_steps – detailed per-node step log
382   - search_refs – dict[ref_id → SearchResult] for all searches this turn
383   - error – bool
  368 + Yield event dicts:
  369 + - debug_update: contains latest tool_calls/debug_steps snapshot
  370 + - response_delta: contains incremental assistant response text
  371 + - response_replace: replace streamed response when reconciliation is needed
  372 + - done: final payload compatible with chat()
384 373 """
385 374 try:
386   - logger.info(f"[{self.session_id}] chat: {query!r} image={bool(image_path)}")
  375 + logger.info(f"[{self.session_id}] chat(stream): {query!r} image={bool(image_path)}")
387 376  
388 377 if image_path and not Path(image_path).exists():
389   - return {
390   - "response": f"错误:图片文件不存在:{image_path}",
391   - "error": True,
  378 + yield {
  379 + "type": "done",
  380 + "result": {
  381 + "response": f"错误:图片文件不存在:{image_path}",
  382 + "tool_calls": [],
  383 + "debug_steps": [],
  384 + "search_refs": {},
  385 + "error": True,
  386 + },
392 387 }
  388 + return
393 389  
394 390 # Snapshot registry before the turn so we can report new additions
395 391 registry_before = set(global_registry.get_all(self.session_id).keys())
... ... @@ -406,6 +402,7 @@ class ShoppingAgent:
406 402  
407 403 tool_calls: list[dict] = []
408 404 debug_steps: list[dict] = []
  405 + streamed_response = ""
409 406  
410 407 for event in self.graph.stream(input_state, config=config):
411 408 logger.debug(f"[{self.session_id}] event keys: {list(event.keys())}")
... ... @@ -414,20 +411,54 @@ class ShoppingAgent:
414 411 agent_out = event["agent"]
415 412 step_msgs: list[dict] = []
416 413 step_tcs: list[dict] = []
  414 + final_candidate_text = ""
417 415  
418 416 for msg in agent_out.get("messages", []):
419 417 text = _extract_message_text(msg)
420   - step_msgs.append({
  418 + thinking = _extract_thinking(msg)
  419 + step_entry = {
421 420 "type": getattr(msg, "type", "assistant"),
422 421 "content": text[:500],
423   - })
424   - if hasattr(msg, "tool_calls") and msg.tool_calls:
  422 + }
  423 + if thinking:
  424 + step_entry["thinking"] = thinking[:500]
  425 + step_msgs.append(step_entry)
  426 +
  427 + has_tool_calls = bool(hasattr(msg, "tool_calls") and msg.tool_calls)
  428 + if has_tool_calls:
425 429 for tc in msg.tool_calls:
426 430 entry = {"name": tc.get("name"), "args": tc.get("args", {})}
427 431 tool_calls.append(entry)
428 432 step_tcs.append(entry)
429   -
430   - debug_steps.append({"node": "agent", "messages": step_msgs, "tool_calls": step_tcs})
  433 + else:
  434 + role = getattr(msg, "type", "")
  435 + formal = _extract_formal_reply(msg) or _extract_message_text(msg)
  436 + if role in ("ai", "assistant") and formal.strip():
  437 + final_candidate_text = formal.strip()
  438 +
  439 + debug_steps.append(
  440 + {"node": "agent", "messages": step_msgs, "tool_calls": step_tcs}
  441 + )
  442 + yield {
  443 + "type": "debug_update",
  444 + "tool_calls": tool_calls,
  445 + "debug_steps": debug_steps,
  446 + }
  447 +
  448 + # When final assistant text is produced in this node, stream it immediately.
  449 + if final_candidate_text:
  450 + pending = final_candidate_text
  451 + if pending.startswith(streamed_response):
  452 + pending = pending[len(streamed_response) :]
  453 + for delta in _iter_text_chunks(pending):
  454 + streamed_response += delta
  455 + yield {
  456 + "type": "response_delta",
  457 + "delta": delta,
  458 + "response": streamed_response,
  459 + "tool_calls": tool_calls,
  460 + "debug_steps": debug_steps,
  461 + }
431 462  
432 463 if "tools" in event:
433 464 tools_out = event["tools"]
... ... @@ -443,7 +474,12 @@ class ShoppingAgent:
443 474 unresolved[i]["result"] = preview
444 475 tc_name = unresolved[i].get("name", "")
445 476 tc_args = unresolved[i].get("args", {})
446   - result_log = text if len(text) <= _LOG_TOOL_RESULT_MAX else text[:_LOG_TOOL_RESULT_MAX] + f"... [truncated total {len(text)}]"
  477 + result_log = (
  478 + text
  479 + if len(text) <= _LOG_TOOL_RESULT_MAX
  480 + else text[:_LOG_TOOL_RESULT_MAX]
  481 + + f"... [truncated total {len(text)}]"
  482 + )
447 483 logger.info(
448 484 "[%s] TOOL_CALL_RESULT name=%s args=%s result=%s",
449 485 self.session_id,
... ... @@ -454,11 +490,48 @@ class ShoppingAgent:
454 490 step_results.append({"content": preview})
455 491  
456 492 debug_steps.append({"node": "tools", "results": step_results})
  493 + yield {
  494 + "type": "debug_update",
  495 + "tool_calls": tool_calls,
  496 + "debug_steps": debug_steps,
  497 + }
457 498  
458 499 final_state = self.graph.get_state(config)
459 500 final_msg = final_state.values["messages"][-1]
460 501 response_text = _extract_formal_reply(final_msg) or _extract_message_text(final_msg)
461 502  
  503 + # Reconcile streamed text with canonical final response.
  504 + if response_text and not streamed_response:
  505 + for delta in _iter_text_chunks(response_text):
  506 + streamed_response += delta
  507 + yield {
  508 + "type": "response_delta",
  509 + "delta": delta,
  510 + "response": streamed_response,
  511 + "tool_calls": tool_calls,
  512 + "debug_steps": debug_steps,
  513 + }
  514 + elif response_text and response_text != streamed_response:
  515 + if response_text.startswith(streamed_response):
  516 + pending = response_text[len(streamed_response) :]
  517 + for delta in _iter_text_chunks(pending):
  518 + streamed_response += delta
  519 + yield {
  520 + "type": "response_delta",
  521 + "delta": delta,
  522 + "response": streamed_response,
  523 + "tool_calls": tool_calls,
  524 + "debug_steps": debug_steps,
  525 + }
  526 + else:
  527 + streamed_response = response_text
  528 + yield {
  529 + "type": "response_replace",
  530 + "response": streamed_response,
  531 + "tool_calls": tool_calls,
  532 + "debug_steps": debug_steps,
  533 + }
  534 +
462 535 # Collect new SearchResults added during this turn
463 536 registry_after = global_registry.get_all(self.session_id)
464 537 new_refs = {
... ... @@ -471,24 +544,54 @@ class ShoppingAgent:
471 544 f"[{self.session_id}] done — tool_calls={len(tool_calls)}, new_refs={list(new_refs.keys())}"
472 545 )
473 546  
474   - return {
475   - "response": response_text,
476   - "tool_calls": tool_calls,
477   - "debug_steps": debug_steps,
478   - "search_refs": new_refs,
479   - "error": False,
  547 + yield {
  548 + "type": "done",
  549 + "result": {
  550 + "response": response_text,
  551 + "tool_calls": tool_calls,
  552 + "debug_steps": debug_steps,
  553 + "search_refs": new_refs,
  554 + "error": False,
  555 + },
480 556 }
481 557  
482 558 except Exception as e:
483   - logger.error(f"[{self.session_id}] chat error: {e}", exc_info=True)
484   - return {
485   - "response": f"抱歉,处理您的请求时遇到错误:{e}",
486   - "tool_calls": [],
487   - "debug_steps": [],
488   - "search_refs": {},
489   - "error": True,
  559 + logger.error(f"[{self.session_id}] chat stream error: {e}", exc_info=True)
  560 + yield {
  561 + "type": "done",
  562 + "result": {
  563 + "response": f"抱歉,处理您的请求时遇到错误:{e}",
  564 + "tool_calls": [],
  565 + "debug_steps": [],
  566 + "search_refs": {},
  567 + "error": True,
  568 + },
490 569 }
491 570  
  571 + def chat(self, query: str, image_path: Optional[str] = None) -> dict:
  572 + """
  573 + Process a user query and return the agent response with metadata.
  574 +
  575 + Returns:
  576 + dict with keys:
  577 + response – final AI message text (may contain [SEARCH_RESULTS_REF:ref_id] tokens)
  578 + tool_calls – list of {name, args, result_preview}
  579 + debug_steps – detailed per-node step log
  580 + search_refs – dict[ref_id → SearchResult] for all searches this turn
  581 + error – bool
  582 + """
  583 + result: Optional[dict] = None
  584 + for event in self.chat_stream(query=query, image_path=image_path):
  585 + if event.get("type") == "done":
  586 + result = event.get("result")
  587 + return result or {
  588 + "response": "抱歉,处理您的请求时未返回结果。",
  589 + "tool_calls": [],
  590 + "debug_steps": [],
  591 + "search_refs": {},
  592 + "error": True,
  593 + }
  594 +
492 595 def get_conversation_history(self) -> list:
493 596 try:
494 597 config = {"configurable": {"thread_id": self.session_id}}
... ...