""" ShopAgent - Streamlit UI Multi-modal fashion shopping assistant with conversational AI """ import logging import re import uuid from pathlib import Path from typing import Optional import streamlit as st from PIL import Image, ImageOps from app.agents.shopping_agent import ShoppingAgent from app.search_registry import ProductItem, SearchResult, global_registry # Matches [SEARCH_REF:sr_xxxxxxxx] tokens embedded in AI responses. # Case-insensitive, optional spaces around the id. SEARCH_REF_PATTERN = re.compile(r"\[SEARCH_REF:\s*([a-zA-Z0-9_]+)\s*\]", re.IGNORECASE) # Configure logging logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", ) logger = logging.getLogger(__name__) # Page config st.set_page_config( page_title="ShopAgent", page_icon="👗", layout="centered", initial_sidebar_state="collapsed", ) # Custom CSS - ChatGPT-like style st.markdown( """ """, unsafe_allow_html=True, ) # Initialize session state def initialize_session(): """Initialize session state variables""" if "session_id" not in st.session_state: st.session_state.session_id = str(uuid.uuid4()) if "shopping_agent" not in st.session_state: st.session_state.shopping_agent = ShoppingAgent( session_id=st.session_state.session_id ) if "messages" not in st.session_state: st.session_state.messages = [] if "uploaded_image" not in st.session_state: st.session_state.uploaded_image = None if "show_image_upload" not in st.session_state: st.session_state.show_image_upload = False # Debug panel toggle if "show_debug" not in st.session_state: st.session_state.show_debug = False def save_uploaded_image(uploaded_file) -> Optional[str]: """Save uploaded image to temp directory""" if uploaded_file is None: return None try: temp_dir = Path("temp_uploads") temp_dir.mkdir(exist_ok=True) image_path = temp_dir / f"{st.session_state.session_id}_{uploaded_file.name}" with open(image_path, "wb") as f: f.write(uploaded_file.getbuffer()) logger.info(f"Saved uploaded image to {image_path}") return str(image_path) except Exception as e: logger.error(f"Error saving uploaded image: {e}") st.error(f"Failed to save image: {str(e)}") return None def _load_product_image(product: ProductItem) -> Optional[Image.Image]: """Try to load a product image: image_url from API → local data/images → None.""" if product.image_url: try: import requests resp = requests.get(product.image_url, timeout=10) if resp.status_code == 200: import io return Image.open(io.BytesIO(resp.content)) except Exception as e: logger.debug(f"Remote image fetch failed for {product.spu_id}: {e}") local = Path(f"data/images/{product.spu_id}.jpg") if local.exists(): try: return Image.open(local) except Exception as e: logger.debug(f"Local image load failed {local}: {e}") return None def display_product_card_from_item(product: ProductItem) -> None: """Render a single product card from a ProductItem (registry entry).""" img = _load_product_image(product) if img: target = (220, 220) try: img = ImageOps.fit(img, target, method=Image.Resampling.LANCZOS) except AttributeError: img = ImageOps.fit(img, target, method=Image.LANCZOS) st.image(img, use_container_width=True) else: st.markdown( '
🛍️
', unsafe_allow_html=True, ) title = product.title or "未知商品" st.markdown(f"**{title[:40]}**" + ("…" if len(title) > 40 else "")) if product.price is not None: st.caption(f"¥{product.price:.2f}") label_style = "⭐" if product.match_label == "完美匹配" else "✦" st.caption(f"{label_style} {product.match_label}") def render_search_result_block(result: SearchResult) -> None: """ Render a full search result block in place of a [SEARCH_REF:xxx] token. Shows: - A styled header with query text + quality verdict + match counts - A grid of product cards (perfect matches first, then partial; max 6) """ verdict_icon = {"优质": "✅", "一般": "〰️", "较差": "⚠️"}.get(result.quality_verdict, "🔍") header_html = ( f'
' f'' f'🔍 {result.query}' f'  {verdict_icon} {result.quality_verdict}' f' · 完美匹配 {result.perfect_count} 件' f' · 相关 {result.partial_count} 件' f'
' ) st.markdown(header_html, unsafe_allow_html=True) # Perfect matches first, fall back to partials if none perfect = [p for p in result.products if p.match_label == "完美匹配"] partial = [p for p in result.products if p.match_label == "部分匹配"] to_show = (perfect + partial)[:6] if perfect else partial[:6] if not to_show: st.caption("(本次搜索未找到可展示的商品)") return cols = st.columns(min(len(to_show), 3)) for i, product in enumerate(to_show): with cols[i % 3]: display_product_card_from_item(product) def render_message_with_refs(content: str, session_id: str) -> None: """ Render an assistant message that may contain [SEARCH_REF:xxx] tokens. Text segments are rendered as markdown. [SEARCH_REF:xxx] tokens are replaced with full product card blocks loaded from the global registry. """ # re.split with a capture group alternates: [text, ref_id, text, ref_id, ...] parts = SEARCH_REF_PATTERN.split(content) for i, segment in enumerate(parts): if i % 2 == 0: # Text segment text = segment.strip() if text: st.markdown(text) else: # ref_id segment ref_id = segment.strip() result = global_registry.get(session_id, ref_id) if result: render_search_result_block(result) else: # ref not found (e.g. old session after restart) st.caption(f"[搜索结果 {ref_id} 不可用]") def display_message(message: dict): """Display a chat message""" role = message["role"] content = message["content"] image_path = message.get("image_path") tool_calls = message.get("tool_calls", []) debug_steps = message.get("debug_steps", []) if role == "user": st.markdown('
', unsafe_allow_html=True) if image_path and Path(image_path).exists(): try: img = Image.open(image_path) st.image(img, width=200) except Exception: logger.warning(f"Failed to load user uploaded image: {image_path}") st.markdown(content) st.markdown("
", unsafe_allow_html=True) else: # assistant # Tool call breadcrumb if tool_calls: tool_names = [tc["name"] for tc in tool_calls] st.caption(" → ".join(tool_names)) st.markdown("") # Debug panel if debug_steps and st.session_state.get("show_debug"): with st.expander("思考 & 工具调用详细过程", expanded=False): for idx, step in enumerate(debug_steps, 1): node = step.get("node", "unknown") st.markdown(f"**Step {idx} – {node}**") if node == "agent": msgs = step.get("messages", []) if msgs: st.markdown("**Agent Messages**") for m in msgs: st.markdown(f"- `{m.get('type', 'assistant')}`: {m.get('content', '')}") tcs = step.get("tool_calls", []) if tcs: st.markdown("**Planned Tool Calls**") for j, tc in enumerate(tcs, 1): st.markdown(f"- **{j}. {tc.get('name')}**") st.code(tc.get("args", {}), language="json") elif node == "tools": results = step.get("results", []) if results: st.markdown("**Tool Results**") for j, r in enumerate(results, 1): st.markdown(f"- **Result {j}:**") st.code(r.get("content", ""), language="text") st.markdown("---") # Render message: expand [SEARCH_REF:xxx] tokens into product card blocks session_id = st.session_state.get("session_id", "") render_message_with_refs(content, session_id) st.markdown("", unsafe_allow_html=True) def display_welcome(): """Display welcome screen""" col1, col2, col3, col4 = st.columns(4) with col1: st.markdown( """
💗
懂你
能记住你的偏好,给你推荐适合的
""", unsafe_allow_html=True, ) with col2: st.markdown( """
🛍️
懂商品
深度理解店铺内所有商品,智能匹配你的需求
""", unsafe_allow_html=True, ) with col3: st.markdown( """
💭
贴心
任意聊
""", unsafe_allow_html=True, ) with col4: st.markdown( """
👗
懂时尚
穿搭顾问 + 轻松对比
""", unsafe_allow_html=True, ) st.markdown("

", unsafe_allow_html=True) def main(): """Main Streamlit app""" initialize_session() # Header st.markdown( """
👗 ShopAgent
AI Fashion Shopping Assistant
""", unsafe_allow_html=True, ) # Sidebar (collapsed by default, but accessible) with st.sidebar: st.markdown("### ⚙️ Settings") if st.button("🗑️ Clear Chat", use_container_width=True): if "shopping_agent" in st.session_state: st.session_state.shopping_agent.clear_history() # Clear search result registry for this session session_id = st.session_state.get("session_id", "") if session_id: global_registry.clear_session(session_id) st.session_state.messages = [] st.session_state.uploaded_image = None st.rerun() # Debug toggle st.markdown("---") st.checkbox( "显示调试过程 (debug)", key="show_debug", value=True, help="展开后可查看中间思考过程及工具调用详情", ) st.markdown("---") st.caption(f"Session: `{st.session_state.session_id[:8]}...`") # Chat messages container messages_container = st.container() with messages_container: if not st.session_state.messages: display_welcome() else: for message in st.session_state.messages: display_message(message) # Fixed input area at bottom (using container to simulate fixed position) st.markdown('
', unsafe_allow_html=True) input_container = st.container() with input_container: # Image upload area (shown when + is clicked) if st.session_state.show_image_upload: uploaded_file = st.file_uploader( "Choose an image", type=["jpg", "jpeg", "png"], key="file_uploader", ) if uploaded_file: st.session_state.uploaded_image = uploaded_file # Show preview col1, col2 = st.columns([1, 4]) with col1: img = Image.open(uploaded_file) st.image(img, width=100) with col2: if st.button("❌ Remove"): st.session_state.uploaded_image = None st.session_state.show_image_upload = False st.rerun() # Input row col1, col2 = st.columns([1, 12]) with col1: # Image upload toggle button if st.button("➕", help="Add image", use_container_width=True): st.session_state.show_image_upload = ( not st.session_state.show_image_upload ) st.rerun() with col2: # Text input user_query = st.chat_input( "Ask about fashion products...", key="chat_input", ) st.markdown("
", unsafe_allow_html=True) # Process user input if user_query: # Ensure shopping agent is initialized if "shopping_agent" not in st.session_state: st.error("Session not initialized. Please refresh the page.") st.stop() # Save uploaded image if present, or get from recent history image_path = None if st.session_state.uploaded_image: # User explicitly uploaded an image for this query image_path = save_uploaded_image(st.session_state.uploaded_image) else: # Check if query refers to a previous image query_lower = user_query.lower() if any( ref in query_lower for ref in [ "this", "that", "the image", "the shirt", "the product", "it", ] ): # Find the most recent message with an image for msg in reversed(st.session_state.messages): if msg.get("role") == "user" and msg.get("image_path"): image_path = msg["image_path"] logger.info(f"Using image from previous message: {image_path}") break # Add user message st.session_state.messages.append( { "role": "user", "content": user_query, "image_path": image_path, } ) # Display user message immediately with messages_container: display_message(st.session_state.messages[-1]) # Process with shopping agent try: shopping_agent = st.session_state.shopping_agent # Handle greetings without invoking the agent query_lower = user_query.lower().strip() # Process with agent result = shopping_agent.chat( query=user_query, image_path=image_path, ) response = result["response"] tool_calls = result.get("tool_calls", []) debug_steps = result.get("debug_steps", []) # Add assistant message st.session_state.messages.append( { "role": "assistant", "content": response, "tool_calls": tool_calls, "debug_steps": debug_steps, } ) # Clear uploaded image and hide upload area after sending st.session_state.uploaded_image = None st.session_state.show_image_upload = False # Auto-scroll to bottom with JavaScript st.markdown( """ """, unsafe_allow_html=True, ) except Exception as e: logger.error(f"Error processing query: {e}", exc_info=True) error_msg = f"I apologize, I encountered an error: {str(e)}" st.session_state.messages.append( { "role": "assistant", "content": error_msg, } ) # Rerun to update UI st.rerun() if __name__ == "__main__": main()