diff --git a/app.py b/app.py index 95f4c8c..a3e3f0c 100644 --- a/app.py +++ b/app.py @@ -668,6 +668,41 @@ def render_message_with_refs( st.caption(f"[搜索结果 {ref_id} 不可用]") +def render_debug_steps_panel(debug_steps: list[dict], expanded: bool = True) -> None: + """Render debug steps with thinking/tool details.""" + with st.expander("思考 & 工具调用详细过程", expanded=expanded): + for idx, step in enumerate(debug_steps, 1): + node = step.get("node", "unknown") + st.markdown(f"**Step {idx} – {node}**") + + if node == "agent": + msgs = step.get("messages", []) + if msgs: + st.markdown("**Agent Messages**") + for m in msgs: + st.markdown(f"- `{m.get('type', 'assistant')}`: {m.get('content', '')}") + if m.get("thinking"): + st.markdown(" - `thinking`:") + st.code(m.get("thinking", ""), language="text") + + tcs = step.get("tool_calls", []) + if tcs: + st.markdown("**Planned Tool Calls**") + for j, tc in enumerate(tcs, 1): + st.markdown(f"- **{j}. {tc.get('name')}**") + st.code(tc.get("args", {}), language="json") + + elif node == "tools": + results = step.get("results", []) + if results: + st.markdown("**Tool Results**") + for j, r in enumerate(results, 1): + st.markdown(f"- **Result {j}:**") + st.code(r.get("content", ""), language="text") + + st.markdown("---") + + def display_message(message: dict, msg_index: int = 0): """Display a chat message. 
msg_index keeps widget keys unique across messages.""" role = message["role"] @@ -698,34 +733,7 @@ def display_message(message: dict, msg_index: int = 0): # Debug panel if debug_steps and st.session_state.get("show_debug"): - with st.expander("思考 & 工具调用详细过程", expanded=False): - for idx, step in enumerate(debug_steps, 1): - node = step.get("node", "unknown") - st.markdown(f"**Step {idx} – {node}**") - - if node == "agent": - msgs = step.get("messages", []) - if msgs: - st.markdown("**Agent Messages**") - for m in msgs: - st.markdown(f"- `{m.get('type', 'assistant')}`: {m.get('content', '')}") - - tcs = step.get("tool_calls", []) - if tcs: - st.markdown("**Planned Tool Calls**") - for j, tc in enumerate(tcs, 1): - st.markdown(f"- **{j}. {tc.get('name')}**") - st.code(tc.get("args", {}), language="json") - - elif node == "tools": - results = step.get("results", []) - if results: - st.markdown("**Tool Results**") - for j, r in enumerate(results, 1): - st.markdown(f"- **Result {j}:**") - st.code(r.get("content", ""), language="text") - - st.markdown("---") + render_debug_steps_panel(debug_steps, expanded=True) # Render message: expand [SEARCH_RESULTS_REF:ref_id] tokens into product card blocks session_id = st.session_state.get("session_id", "") @@ -1150,11 +1158,61 @@ def main(): try: shopping_agent = st.session_state.shopping_agent - # Process with agent - result = shopping_agent.chat( - query=agent_query, - image_path=image_path, - ) + # Stream assistant updates to UI immediately + with messages_container: + live_container = st.container() + with live_container: + live_tool_caption = st.empty() + live_debug_placeholder = st.empty() + live_response_placeholder = st.empty() + + live_response = "" + live_tool_calls: list[dict] = [] + live_debug_steps: list[dict] = [] + result = None + + def _render_live() -> None: + if live_tool_calls: + tool_names = [tc.get("name", "") for tc in live_tool_calls if tc.get("name")] + live_tool_caption.caption(" → ".join(tool_names)) + 
else: + live_tool_caption.empty() + + if st.session_state.get("show_debug") and live_debug_steps: + with live_debug_placeholder.container(): + render_debug_steps_panel(live_debug_steps, expanded=True) + else: + live_debug_placeholder.empty() + + if live_response: + live_response_placeholder.markdown(live_response) + else: + live_response_placeholder.markdown("…") + + for event in shopping_agent.chat_stream(query=agent_query, image_path=image_path): + event_type = event.get("type") + if event_type in {"debug_update", "response_delta", "response_replace"}: + if "tool_calls" in event: + live_tool_calls = event.get("tool_calls", live_tool_calls) + if "debug_steps" in event: + live_debug_steps = event.get("debug_steps", live_debug_steps) + if event_type == "response_delta": + live_response = event.get("response", live_response) + elif event_type == "response_replace": + live_response = event.get("response", live_response) + _render_live() + elif event_type == "done": + result = event.get("result") + + if not result: + result = { + "response": live_response or "抱歉,处理您的请求时未返回结果。", + "tool_calls": live_tool_calls, + "debug_steps": live_debug_steps, + "search_refs": {}, + "error": True, + } + response = result["response"] tool_calls = result.get("tool_calls", []) debug_steps = result.get("debug_steps", []) diff --git a/app/agents/shopping_agent.py b/app/agents/shopping_agent.py index 55b162b..cfa186f 100644 --- a/app/agents/shopping_agent.py +++ b/app/agents/shopping_agent.py @@ -14,7 +14,7 @@ import re from urllib.parse import urlparse from datetime import datetime from pathlib import Path -from typing import Any, Optional, Sequence +from typing import Any, Iterator, Optional, Sequence from langchain_core.messages import BaseMessage, HumanMessage, SystemMessage from langchain_core.outputs import ChatResult @@ -44,39 +44,22 @@ SYSTEM_PROMPT = f""" 角色定义 一些原则: 1. 价值提供与信息收集的原则: 1. 
兼顾价值提供和需求澄清:适时的提供有价值的信息,如商品推荐、穿搭建议、趋势信息,在推荐方向上有需求缺口、需要明确的重要信息时,要适时的做“信息收集”,引导式的澄清需求、提高商品发现的效率,形成“提供-反馈”的良性循环。 - 2. 意图判断-缺口大:当无法从对话中确定关键变量(如使用对象不明确、无法判断男性或女性使用、品类细分不清等)时,从“使用对象”、“品类细分”、“使用场景”、“风格效果”等高层意图维度切入,提供方向性选项 + 1–2个关键问题,引导用户做选择(以下仅提供参考思路,具体话术不要照搬): + 2. 意图判断-缺口大:当无法从对话中确定关键变量(如使用对象不明确、无法判断男性或女性使用、品类细分不清等)时,从“使用对象”、“品类细分”、“使用场景”、“风格效果”等高层意图维度切入,提供方向性选项 + 1–3个关键问题,引导用户做选择(以下仅提供参考思路,具体话术不要照搬): 1. 人群不明确时(如果从对话中无法确认用使用人群,比如搜索意图是男女都可以消费的品类比如T恤、裤子):男款、女款,还是中性风都可以? 2. 确定是女性、但是风格不明确时:你想穿出哪种感觉?职场干练 松弛自在 活力元气 温柔知性 3. 使用场景不明确时:平时通勤场合多吗?还是更喜欢生活化穿搭? - 4. 如上此类,存在大的需求缺口,则需要先进行澄清、不要调用商品搜索。 + 4. 如上此类,存在大的需求缺口,则务必先问清楚,直接提问即可,而不是擅自假设、调用工具、擅自搜索商品和推荐商品。 3. 意图判断-缺口小:直接检索+方案呈现,根据情况,可以考虑该方向下重要的决策因素(思考哪些维度最可能影响推荐结果),进行提议和问题收集,让用户既得到相关信息、又得到下一步的方向引导、同时也有机会修正或者细化诉求。 4. 选项驱动式澄清:推荐几个清晰的方向,呈现方案或商品搜索结果,再做澄清 5. 单轮对话最好只提1-2个问题。 6. 站在用户立场思考:比如询问用户期待的效果或感觉、使用的场合、想解决的问题,而不是询问具体的款式、参数,你需要将用户表达的需求翻译为具体可检索的商品特征(版型、材质、设计元素、风格标签等),并据此筛选商品、组织推荐逻辑。 2. 如何使用search_products: - 1. 可以生成多个query进行搜索:在需要搜索商品的时候,可以将需求分解为 2-4 个搜索查询,每个 query 聚焦一个明确的商品子类或搜索角度。 + 1. 在需要搜索商品的时候,可以将需求分解为 2-4 个query,每个 query 聚焦一个明确的商品子类或搜索角度,每个query对应一个工具调用。 2. 可以根据搜索结果调整搜索策略:每次调用 search_products 后,工具会返回搜索结果的相关性的判断、以及搜索结果的topN的title,你需要决策是否要调整搜索策略,比如结果质量太差,可能需要调整搜索词、或者加大试探的query数量(不要超过3-5个)。结果太差的原因有可能是你生成的query不合理、请根据你看到的商品名称的构成组织搜索关键词。 3. 在最终回复中使用 [SEARCH_RESULTS_REF:ref_id] 内联引用搜索结果: 1. 搜索工具会返回一个结果引用标识[SEARCH_RESULTS_REF:ref_id],撰写最终答复的时候请直接引用 [SEARCH_RESULTS_REF:ref_id] ,系统会自动在该位置渲染对应的商品卡片列表,无需复述搜索结果。 2. 因为系统会自动将[SEARCH_RESULTS_REF:ref_id]渲染为搜索结果,所以[SEARCH_RESULTS_REF:ref_id]必须独占一行,且只在需要渲染该query完整的搜索结果时才进行引用,同一个结果不要重复引用。 - """ - -SYSTEM_PROMPT___2 = """ 角色定义 - 你是我们店铺的一名专业的电商导购,是一个善于倾听、主动引导、懂得搭配的“时尚顾问”,通过有温度的对话,给用户提供有价值的信息,包括需求引导、方案推荐、搜索结果推荐,最终促成满意的购物决策或转化行为。 - 作为我们店铺的一名专业的销售,除了本店铺的商品的推荐,你可以给用户提供有帮助的信息,但是不要虚构商品、提供本商店搜索结果以外的商品。 - - 一些原则: - 1. 价值提供与信息收集的原则: - 1. 优先价值提供:适时的提供有价值的信息,如商品推荐、穿搭建议、趋势信息,在推荐方向上有需求缺口、需要明确的重要信息时,要适时的做“信息收集”,引导式的澄清需求、提高商品发现的效率,形成“提供-反馈”的良性循环。 - 2. 
缺口大(比如品类或者使用人群都不能确定)→ 给出方案推荐 + 1-2个关键问题让用户选择;缺口小→直接检索+方案呈现,根据情况,可以考虑该方向下重要的决策因素,进行提议和问题收集,让用户既得到相关信息、又得到下一步的方向引导、同时也有机会修正或者细化诉求。 - 3. 选项驱动式澄清:推荐几个清晰的方向,呈现方案或商品搜索结果,再做澄清 - 4. 单轮对话最好只提一个问题,最多两个,禁止多问题堆叠。 - 5. 站在用户立场思考:比如询问用户期待的效果或感觉、使用的场合、想解决的问题,而不是询问具体的款式、参数,你需要将用户表达的需求翻译为具体可检索的商品特征(版型、材质、设计元素、风格标签等),并据此筛选商品、组织推荐逻辑。 - 2. 如何使用make_search_products_tool: - 1. 可以生成多个query进行搜索:在需要搜索商品的时候,可以将需求分解为 2-4 个搜索查询,每个 query 聚焦一个明确的商品子类或搜索角度。 - 2. 可以根据搜索结果调整搜索策略:每次调用 search_products 后,工具会返回搜索结果的相关性的判断、以及搜索结果的topN的title,你需要决策是否要调整搜索策略,比如结果质量太差,可能需要调整搜索词、或者加大试探的query数量(不要超过3-5个)。 - 3. 使用 [SEARCH_RESULTS_REF:ref_id] 内联引用搜索结果:搜索工具会返回一个结果引用标识[SEARCH_RESULTS_REF:ref_id],撰写最终答复的时候可以直接引用将 [SEARCH_RESULTS_REF:ref_id] ,系统会自动在该位置渲染对应的商品卡片列表,无需复述搜索结果。 - 4. 因为系统会自动将[SEARCH_RESULTS_REF:ref_id]渲染为搜索结果,所以只在需要渲染该query完整的搜索结果时才进行引用,同一个结果不要重复引用。 +4. 今天是{datetime.now().strftime("%Y-%m-%d")},所有与当前时间(比如天气、最新或即将发生的事件)相关的问题,都要使用web_search工具。 """ @@ -254,6 +237,14 @@ def _message_for_log(msg: BaseMessage, include_thinking: bool = False) -> dict: return out +def _iter_text_chunks(text: str, chunk_size: int = 24) -> Iterator[str]: + """Yield text in small chunks for UI-friendly streaming.""" + if not text: + return + for i in range(0, len(text), chunk_size): + yield text[i : i + chunk_size] + + # ── DashScope thinking 支持 ───────────────────────────────────────────────────── # LangChain 解析 chat completion 时不会把 API 返回的 reasoning_content 写入 message, # 子类在 _create_chat_result 中把 reasoning_content 注入到 additional_kwargs,便于日志打印。 @@ -370,26 +361,31 @@ class ShoppingAgent: return workflow.compile(checkpointer=MemorySaver()) - def chat(self, query: str, image_path: Optional[str] = None) -> dict: + def chat_stream(self, query: str, image_path: Optional[str] = None) -> Iterator[dict]: """ - Process a user query and return the agent response with metadata. + Stream this turn as incremental events for frontend rendering. 
- Returns: - dict with keys: - response – final AI message text (may contain [SEARCH_RESULTS_REF:ref_id] tokens) - tool_calls – list of {name, args, result_preview} - debug_steps – detailed per-node step log - search_refs – dict[ref_id → SearchResult] for all searches this turn - error – bool + Yield event dicts: + - debug_update: contains latest tool_calls/debug_steps snapshot + - response_delta: contains incremental assistant response text + - response_replace: replace streamed response when reconciliation is needed + - done: final payload compatible with chat() """ try: - logger.info(f"[{self.session_id}] chat: {query!r} image={bool(image_path)}") + logger.info(f"[{self.session_id}] chat(stream): {query!r} image={bool(image_path)}") if image_path and not Path(image_path).exists(): - return { - "response": f"错误:图片文件不存在:{image_path}", - "error": True, + yield { + "type": "done", + "result": { + "response": f"错误:图片文件不存在:{image_path}", + "tool_calls": [], + "debug_steps": [], + "search_refs": {}, + "error": True, + }, } + return # Snapshot registry before the turn so we can report new additions registry_before = set(global_registry.get_all(self.session_id).keys()) @@ -406,6 +402,7 @@ class ShoppingAgent: tool_calls: list[dict] = [] debug_steps: list[dict] = [] + streamed_response = "" for event in self.graph.stream(input_state, config=config): logger.debug(f"[{self.session_id}] event keys: {list(event.keys())}") @@ -414,20 +411,54 @@ class ShoppingAgent: agent_out = event["agent"] step_msgs: list[dict] = [] step_tcs: list[dict] = [] + final_candidate_text = "" for msg in agent_out.get("messages", []): text = _extract_message_text(msg) - step_msgs.append({ + thinking = _extract_thinking(msg) + step_entry = { "type": getattr(msg, "type", "assistant"), "content": text[:500], - }) - if hasattr(msg, "tool_calls") and msg.tool_calls: + } + if thinking: + step_entry["thinking"] = thinking[:500] + step_msgs.append(step_entry) + + has_tool_calls = bool(hasattr(msg, 
"tool_calls") and msg.tool_calls) + if has_tool_calls: for tc in msg.tool_calls: entry = {"name": tc.get("name"), "args": tc.get("args", {})} tool_calls.append(entry) step_tcs.append(entry) - - debug_steps.append({"node": "agent", "messages": step_msgs, "tool_calls": step_tcs}) + else: + role = getattr(msg, "type", "") + formal = _extract_formal_reply(msg) or _extract_message_text(msg) + if role in ("ai", "assistant") and formal.strip(): + final_candidate_text = formal.strip() + + debug_steps.append( + {"node": "agent", "messages": step_msgs, "tool_calls": step_tcs} + ) + yield { + "type": "debug_update", + "tool_calls": tool_calls, + "debug_steps": debug_steps, + } + + # When final assistant text is produced in this node, stream it immediately. + if final_candidate_text: + pending = final_candidate_text + if pending.startswith(streamed_response): + pending = pending[len(streamed_response) :] + for delta in _iter_text_chunks(pending): + streamed_response += delta + yield { + "type": "response_delta", + "delta": delta, + "response": streamed_response, + "tool_calls": tool_calls, + "debug_steps": debug_steps, + } if "tools" in event: tools_out = event["tools"] @@ -443,7 +474,12 @@ class ShoppingAgent: unresolved[i]["result"] = preview tc_name = unresolved[i].get("name", "") tc_args = unresolved[i].get("args", {}) - result_log = text if len(text) <= _LOG_TOOL_RESULT_MAX else text[:_LOG_TOOL_RESULT_MAX] + f"... [truncated total {len(text)}]" + result_log = ( + text + if len(text) <= _LOG_TOOL_RESULT_MAX + else text[:_LOG_TOOL_RESULT_MAX] + + f"... 
[truncated total {len(text)}]" + ) logger.info( "[%s] TOOL_CALL_RESULT name=%s args=%s result=%s", self.session_id, @@ -454,11 +490,48 @@ class ShoppingAgent: step_results.append({"content": preview}) debug_steps.append({"node": "tools", "results": step_results}) + yield { + "type": "debug_update", + "tool_calls": tool_calls, + "debug_steps": debug_steps, + } final_state = self.graph.get_state(config) final_msg = final_state.values["messages"][-1] response_text = _extract_formal_reply(final_msg) or _extract_message_text(final_msg) + # Reconcile streamed text with canonical final response. + if response_text and not streamed_response: + for delta in _iter_text_chunks(response_text): + streamed_response += delta + yield { + "type": "response_delta", + "delta": delta, + "response": streamed_response, + "tool_calls": tool_calls, + "debug_steps": debug_steps, + } + elif response_text and response_text != streamed_response: + if response_text.startswith(streamed_response): + pending = response_text[len(streamed_response) :] + for delta in _iter_text_chunks(pending): + streamed_response += delta + yield { + "type": "response_delta", + "delta": delta, + "response": streamed_response, + "tool_calls": tool_calls, + "debug_steps": debug_steps, + } + else: + streamed_response = response_text + yield { + "type": "response_replace", + "response": streamed_response, + "tool_calls": tool_calls, + "debug_steps": debug_steps, + } + # Collect new SearchResults added during this turn registry_after = global_registry.get_all(self.session_id) new_refs = { @@ -471,24 +544,54 @@ class ShoppingAgent: f"[{self.session_id}] done — tool_calls={len(tool_calls)}, new_refs={list(new_refs.keys())}" ) - return { - "response": response_text, - "tool_calls": tool_calls, - "debug_steps": debug_steps, - "search_refs": new_refs, - "error": False, + yield { + "type": "done", + "result": { + "response": response_text, + "tool_calls": tool_calls, + "debug_steps": debug_steps, + "search_refs": new_refs, + 
"error": False, + }, } except Exception as e: - logger.error(f"[{self.session_id}] chat error: {e}", exc_info=True) - return { - "response": f"抱歉,处理您的请求时遇到错误:{e}", - "tool_calls": [], - "debug_steps": [], - "search_refs": {}, - "error": True, + logger.error(f"[{self.session_id}] chat stream error: {e}", exc_info=True) + yield { + "type": "done", + "result": { + "response": f"抱歉,处理您的请求时遇到错误:{e}", + "tool_calls": [], + "debug_steps": [], + "search_refs": {}, + "error": True, + }, } + def chat(self, query: str, image_path: Optional[str] = None) -> dict: + """ + Process a user query and return the agent response with metadata. + + Returns: + dict with keys: + response – final AI message text (may contain [SEARCH_RESULTS_REF:ref_id] tokens) + tool_calls – list of {name, args, result_preview} + debug_steps – detailed per-node step log + search_refs – dict[ref_id → SearchResult] for all searches this turn + error – bool + """ + result: Optional[dict] = None + for event in self.chat_stream(query=query, image_path=image_path): + if event.get("type") == "done": + result = event.get("result") + return result or { + "response": "抱歉,处理您的请求时未返回结果。", + "tool_calls": [], + "debug_steps": [], + "search_refs": {}, + "error": True, + } + def get_conversation_history(self) -> list: try: config = {"configurable": {"thread_id": self.session_id}} -- libgit2 0.21.2