# OmniShopAgent Technical Implementation Report
## 1. Project Overview

OmniShopAgent is an autonomous, multimodal fashion-shopping agent built on LangGraph and the ReAct pattern. The system decides on its own which tools to call, maintains conversation state, and determines when to respond, enabling intelligent product discovery and recommendation.
### Core Features

- Autonomous tool selection and execution: the agent chooses and invokes tools based on user intent
- Multimodal search: supports both text search and image search
- Conversation context awareness: retains context across multi-turn dialogues
- Real-time visual analysis: VLM-based image style analysis
## 2. Technology Stack

| Component | Choice |
|---|---|
| Runtime | Python 3.12 |
| Agent framework | LangGraph 1.x |
| LLM framework | LangChain 1.x (any LLM; default gpt-4o-mini) |
| Text embeddings | text-embedding-3-small |
| Image embeddings | CLIP ViT-B/32 |
| Vector database | Milvus |
| Frontend | Streamlit |
| Dataset | Kaggle Fashion Products |
## 3. System Architecture

### 3.1 Architecture Overview

```
┌─────────────────────────────────────────────────────────────────┐
│                  Streamlit Frontend (app.py)                    │
└─────────────────────────────────────────────────────────────────┘
        │
        ▼
┌─────────────────────────────────────────────────────────────────┐
│               ShoppingAgent (shopping_agent.py)                 │
│  ┌───────────────────────────────────────────────────────────┐  │
│  │           LangGraph StateGraph + ReAct Pattern            │  │
│  │  START → Agent → [Has tool_calls?] → Tools → Agent → END  │  │
│  └───────────────────────────────────────────────────────────┘  │
└─────────────────────────────────────────────────────────────────┘
        │                    │                      │
        ▼                    ▼                      ▼
┌──────────────┐   ┌──────────────────┐   ┌─────────────────────┐
│   search_    │   │  search_by_image │   │ analyze_image_style │
│   products   │   │                  │   │   (OpenAI Vision)   │
└──────┬───────┘   └────────┬─────────┘   └──────────┬──────────┘
       │                    │                        │
       ▼                    ▼                        ▼
┌─────────────────────────────────────────────────────────────────┐
│             EmbeddingService (embedding_service.py)             │
│           OpenAI API (text)  │  CLIP Server (image)             │
└─────────────────────────────────────────────────────────────────┘
        │
        ▼
┌─────────────────────────────────────────────────────────────────┐
│                MilvusService (milvus_service.py)                │
│   text_embeddings collection │ image_embeddings collection      │
└─────────────────────────────────────────────────────────────────┘
```
### 3.2 Agent Flow (LangGraph)

```mermaid
graph LR
    START --> Agent
    Agent -->|Has tool_calls| Tools
    Agent -->|No tool_calls| END
    Tools --> Agent
```
## 4. Key Implementation Details

### 4.1 Agent Core (shopping_agent.py)

#### 4.1.1 State Definition

```python
from typing import Optional, Sequence

from langchain_core.messages import BaseMessage
from typing_extensions import Annotated, TypedDict

from langgraph.graph.message import add_messages


class AgentState(TypedDict):
    """State for the shopping agent with message accumulation."""

    messages: Annotated[Sequence[BaseMessage], add_messages]
    current_image_path: Optional[str]  # Track the currently uploaded image
```
- `messages` uses the `add_messages` reducer to accumulate messages, enabling multi-turn conversation.
- `current_image_path` stores the path of the currently uploaded image for tools to use.
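The accumulation behavior can be illustrated with a stand-alone sketch. This is a deliberate simplification of langgraph's real `add_messages` reducer (which works on `BaseMessage` objects and handles more edge cases), using plain `(id, content)` tuples:

```python
# Simplified illustration of the add_messages reducer semantics (NOT the real
# langgraph implementation): new messages are appended, and a message that
# reuses an existing id replaces the old one in place.
from typing import List, Tuple

Message = Tuple[str, str]  # (message id, content)

def add_messages_demo(existing: List[Message], new: List[Message]) -> List[Message]:
    merged = dict(existing)      # insertion order is preserved (Python 3.7+)
    for mid, content in new:
        merged[mid] = content    # unknown id -> append; known id -> replace
    return list(merged.items())

history: List[Message] = []
history = add_messages_demo(history, [("1", "show me red dresses")])
history = add_messages_demo(history, [("2", "Here are 5 red dresses ...")])
history = add_messages_demo(history, [("2", "Here are 5 red dresses.")])  # replaces id "2"
assert len(history) == 2
```

Because each node returns only its new messages and the reducer does the merging, nodes never have to copy or rebuild the whole history.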
#### 4.1.2 Building the LangGraph Graph

```python
def _build_graph(self):
    """Build the LangGraph StateGraph."""

    def agent_node(state: AgentState):
        """Agent decision node - decides which tools to call or when to respond."""
        messages = state["messages"]
        if not any(isinstance(m, SystemMessage) for m in messages):
            messages = [SystemMessage(content=system_prompt)] + list(messages)
        response = self.llm_with_tools.invoke(messages)
        return {"messages": [response]}

    tool_node = ToolNode(self.tools)

    def should_continue(state: AgentState):
        """Determine whether the agent should continue or end."""
        last_message = state["messages"][-1]
        if hasattr(last_message, "tool_calls") and last_message.tool_calls:
            return "tools"
        return END

    workflow = StateGraph(AgentState)
    workflow.add_node("agent", agent_node)
    workflow.add_node("tools", tool_node)
    workflow.add_edge(START, "agent")
    workflow.add_conditional_edges("agent", should_continue, ["tools", END])
    workflow.add_edge("tools", "agent")

    checkpointer = MemorySaver()
    return workflow.compile(checkpointer=checkpointer)
```
Key points:
- `agent_node`: passes the messages to the LLM, which decides whether to call tools.
- `should_continue`: routes to the tool node if the last message carries `tool_calls`; otherwise ends.
- `MemorySaver`: persists conversation state per `thread_id`.
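The routing decision in `should_continue` can be exercised in isolation. The sketch below substitutes a minimal stand-in class for LangChain's `AIMessage` (the `FakeAIMessage` class and the `END` sentinel value are illustrative stand-ins, not the real objects):

```python
# Standalone check of the should_continue routing logic, using a stand-in
# message class instead of LangChain's AIMessage.
from dataclasses import dataclass, field

END = "__end__"  # illustrative sentinel mirroring langgraph.graph.END

@dataclass
class FakeAIMessage:
    content: str = ""
    tool_calls: list = field(default_factory=list)

def should_continue(state: dict) -> str:
    last_message = state["messages"][-1]
    if hasattr(last_message, "tool_calls") and last_message.tool_calls:
        return "tools"
    return END

# A message with pending tool calls routes to the tool node ...
assert should_continue({"messages": [FakeAIMessage(
    tool_calls=[{"name": "search_products"}])]}) == "tools"
# ... while a plain text answer terminates the loop.
assert should_continue({"messages": [FakeAIMessage(content="Here you go")]}) == END
```

Note that the check relies on truthiness: an empty `tool_calls` list also terminates, which is exactly what an LLM response without tool invocations looks like.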
#### 4.1.3 System Prompt Design

```python
system_prompt = """You are an intelligent fashion shopping assistant. You can:
1. Search for products by text description (use search_products)
2. Find visually similar products from images (use search_by_image)
3. Analyze image style and attributes (use analyze_image_style)

When a user asks about products:
- For text queries: use search_products directly
- For image uploads: decide if you need to analyze_image_style first, then search
- You can call multiple tools in sequence if needed
- Always provide helpful, friendly responses

CRITICAL FORMATTING RULES:
When presenting product results, you MUST use this EXACT format for EACH product:

1. [Product Name]
   ID: [Product ID Number]
   Category: [Category]
   Color: [Color]
   Gender: [Gender]
   (Include Season, Usage, Relevance if available)
..."""
```
The system prompt constrains both tool usage and output format, ensuring the frontend can reliably parse product information out of the reply.
#### 4.1.4 Chat Entry Point and Streaming

```python
def chat(self, query: str, image_path: Optional[str] = None) -> dict:
    # Build the input message
    message_content = query
    if image_path:
        message_content = f"{query}\n[User uploaded image: {image_path}]"

    config = {"configurable": {"thread_id": self.session_id}}
    input_state = {
        "messages": [HumanMessage(content=message_content)],
        "current_image_path": image_path,
    }

    tool_calls = []
    for event in self.graph.stream(input_state, config=config):
        if "agent" in event:
            for msg in event["agent"].get("messages", []):
                if hasattr(msg, "tool_calls") and msg.tool_calls:
                    for tc in msg.tool_calls:
                        tool_calls.append({"name": tc["name"], "args": tc.get("args", {})})
        if "tools" in event:
            # Record tool execution results
            ...

    final_state = self.graph.get_state(config)
    response_text = final_state.values["messages"][-1].content
    return {"response": response_text, "tool_calls": tool_calls, "error": False}
```
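The tool-call collection loop above can be checked without a live graph or LLM by replaying it against hand-built fake events. The event dicts and `SimpleNamespace` messages below are illustrative stand-ins shaped like `graph.stream()` output:

```python
# Replay of chat()'s stream loop against fabricated events: each event is a
# {node_name: state_update} dict, as produced by graph.stream().
from types import SimpleNamespace

events = [
    {"agent": {"messages": [SimpleNamespace(
        tool_calls=[{"name": "search_products", "args": {"query": "red dresses"}}])]}},
    {"tools": {"messages": []}},  # tool results; ignored by this loop
    {"agent": {"messages": [SimpleNamespace(tool_calls=[],
                                            content="Found 5 dresses")]}},
]

tool_calls = []
for event in events:
    if "agent" in event:
        for msg in event["agent"].get("messages", []):
            if hasattr(msg, "tool_calls") and msg.tool_calls:
                for tc in msg.tool_calls:
                    tool_calls.append({"name": tc["name"], "args": tc.get("args", {})})

assert tool_calls == [{"name": "search_products", "args": {"query": "red dresses"}}]
```

Only agent turns that actually request tools contribute entries; the final text-only turn is skipped, so `tool_calls` ends up as a clean trace of what the agent invoked.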
### 4.2 Search Tools (search_tools.py)

#### 4.2.1 Semantic Text Search

```python
@tool
def search_products(query: str, limit: int = 5) -> str:
    """Search for fashion products using natural language descriptions."""
    try:
        embedding_service = get_embedding_service()
        milvus_service = get_milvus_service()
        query_embedding = embedding_service.get_text_embedding(query)
        results = milvus_service.search_similar_text(
            query_embedding=query_embedding,
            limit=min(limit, 20),
            filters=None,
            output_fields=[
                "id", "productDisplayName", "gender", "masterCategory",
                "subCategory", "articleType", "baseColour", "season", "usage",
            ],
        )
        if not results:
            return "No products found matching your search."

        output = f"Found {len(results)} product(s):\n\n"
        for idx, product in enumerate(results, 1):
            output += f"{idx}. {product.get('productDisplayName', 'Unknown Product')}\n"
            output += f"   ID: {product.get('id', 'N/A')}\n"
            output += f"   Category: {product.get('masterCategory')} > {product.get('subCategory')} > {product.get('articleType')}\n"
            output += f"   Color: {product.get('baseColour')}\n"
            output += f"   Gender: {product.get('gender')}\n"
            if "distance" in product:
                similarity = 1 - product["distance"]
                output += f"   Relevance: {similarity:.2%}\n"
            output += "\n"
        return output.strip()
    except Exception as e:
        return f"Error searching products: {str(e)}"
```
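The `Relevance` figure is derived as `1 - distance`, which is only meaningful if the collection's metric reports a cosine-style distance in `[0, 2]` (i.e. `1 - cosine similarity`); with raw L2, inner product, or a metric that already returns a similarity score, the conversion would differ. A minimal sketch of that arithmetic:

```python
import math

def cosine_distance(a, b):
    """1 - cosine similarity: 0 for parallel vectors, 2 for opposite ones."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(x * x for x in b))
    return 1.0 - dot / (norm_a * norm_b)

d_same = cosine_distance([1.0, 2.0, 3.0], [2.0, 4.0, 6.0])  # parallel -> ~0.0
d_orth = cosine_distance([1.0, 0.0], [0.0, 1.0])            # orthogonal -> 1.0

print(f"Relevance: {1 - d_same:.2%}")  # Relevance: 100.00%
print(f"Relevance: {1 - d_orth:.2%}")  # Relevance: 0.00%
```

So a distance of 0 renders as 100% relevance and an orthogonal match as 0%, which is what the `{similarity:.2%}` formatting above displays.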
#### 4.2.2 Image Similarity Search

```python
@tool
def search_by_image(image_path: str, limit: int = 5) -> str:
    """Find similar fashion products using an image."""
    if not Path(image_path).exists():
        return f"Error: Image file not found at '{image_path}'"

    embedding_service = get_embedding_service()
    milvus_service = get_milvus_service()
    if not embedding_service.clip_client:
        embedding_service.connect_clip()

    image_embedding = embedding_service.get_image_embedding(image_path)
    results = milvus_service.search_similar_images(
        query_embedding=image_embedding,
        limit=min(limit + 1, 21),  # fetch one extra in case the query itself is a hit
        output_fields=[...],
    )

    # Filter out the query image itself (e.g. when the upload came from the catalog)
    query_id = Path(image_path).stem
    filtered_results = [r for r in results if Path(r.get("image_path", "")).stem != query_id]
    filtered_results = filtered_results[:limit]
```
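The self-filtering step compares filename stems only, so it works when the uploaded file keeps its catalog filename. A quick check of the behavior (the paths and IDs are purely illustrative):

```python
# Demonstrating the stem-based self-filter used by search_by_image.
from pathlib import Path

results = [  # hypothetical Milvus hits
    {"image_path": "data/images/15970.jpg"},
    {"image_path": "data/images/39386.jpg"},
]
query_id = Path("/tmp/uploads/15970.jpg").stem  # "15970" - directory is ignored
filtered = [r for r in results if Path(r.get("image_path", "")).stem != query_id]

assert [Path(r["image_path"]).stem for r in filtered] == ["39386"]
```

If the frontend renames uploads (e.g. to a UUID), the stems never collide and the filter is a harmless no-op, which is why `limit + 1` results are fetched defensively rather than relying on the filter.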
#### 4.2.3 Visual Analysis (VLM)

```python
@tool
def analyze_image_style(image_path: str) -> str:
    """Analyze a fashion product image using AI vision to extract detailed style information."""
    with open(image_path, "rb") as image_file:
        image_data = base64.b64encode(image_file.read()).decode("utf-8")

    prompt = """Analyze this fashion product image and provide a detailed description.
Include:
- Product type (e.g., shirt, dress, shoes, pants, bag)
- Primary colors
- Style/design (e.g., casual, formal, sporty, vintage, modern)
- Pattern or texture (e.g., plain, striped, checked, floral)
- Key features (e.g., collar type, sleeve length, fit)
- Material appearance (if obvious, e.g., denim, cotton, leather)
- Suitable occasion (e.g., office wear, party, casual, sports)

Provide a comprehensive yet concise description (3-4 sentences)."""

    client = get_openai_client()
    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{
            "role": "user",
            "content": [
                {"type": "text", "text": prompt},
                {"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{image_data}", "detail": "high"}},
            ],
        }],
        max_tokens=500,
        temperature=0.3,
    )
    return response.choices[0].message.content.strip()
```
### 4.3 Vector Services

#### 4.3.1 EmbeddingService (embedding_service.py)

```python
class EmbeddingService:
    def get_text_embedding(self, text: str) -> List[float]:
        """OpenAI text-embedding-3-small."""
        response = self.openai_client.embeddings.create(
            input=text, model=self.text_embedding_model
        )
        return response.data[0].embedding

    def get_image_embedding(self, image_path: Union[str, Path]) -> List[float]:
        """CLIP image embedding."""
        if not self.clip_client:
            raise RuntimeError("CLIP client not connected. Call connect_clip() first.")
        result = self.clip_client.encode([str(image_path)])
        if isinstance(result, np.ndarray):
            embedding = result[0].tolist() if len(result.shape) > 1 else result.tolist()
        else:
            embedding = result[0].embedding.tolist()
        return embedding

    def get_text_embeddings_batch(self, texts: List[str], batch_size: int = 100) -> List[List[float]]:
        """Batch text embeddings, used during indexing."""
        all_embeddings = []
        for i in range(0, len(texts), batch_size):
            batch = texts[i : i + batch_size]
            response = self.openai_client.embeddings.create(input=batch, ...)
            embeddings = [item.embedding for item in response.data]
            all_embeddings.extend(embeddings)
        return all_embeddings
```
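The batching loop relies on Python's slicing semantics, which are safe at the tail: the final slice is simply shorter, never an error. A minimal check of the pattern:

```python
# The slice-based batching pattern used by get_text_embeddings_batch,
# isolated so its tail behavior is easy to verify.
def batched(items, batch_size):
    """Yield consecutive slices of at most batch_size items."""
    for i in range(0, len(items), batch_size):
        yield items[i : i + batch_size]

batches = list(batched(list(range(7)), 3))
assert batches == [[0, 1, 2], [3, 4, 5], [6]]
```

Batching matters here because the embeddings endpoint accepts a list of inputs per request, so 45k product texts become a few hundred API calls instead of 45k.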
#### 4.3.2 MilvusService (milvus_service.py)

Text collection schema:

```python
schema = MilvusClient.create_schema(auto_id=False, enable_dynamic_field=True)
schema.add_field(field_name="id", datatype=DataType.INT64, is_primary=True)
schema.add_field(field_name="text", datatype=DataType.VARCHAR, max_length=2000)
schema.add_field(field_name="embedding", datatype=DataType.FLOAT_VECTOR, dim=self.text_dim)  # 1536
schema.add_field(field_name="productDisplayName", datatype=DataType.VARCHAR, max_length=500)
schema.add_field(field_name="gender", datatype=DataType.VARCHAR, max_length=50)
schema.add_field(field_name="masterCategory", datatype=DataType.VARCHAR, max_length=100)
# ... additional metadata fields
```

Image collection schema:

```python
schema.add_field(field_name="id", datatype=DataType.INT64, is_primary=True)
schema.add_field(field_name="image_path", datatype=DataType.VARCHAR, max_length=500)
schema.add_field(field_name="embedding", datatype=DataType.FLOAT_VECTOR, dim=self.image_dim)  # 512
# ... product metadata fields
```
Similarity search:

```python
def search_similar_text(self, query_embedding, limit=10, output_fields=None):
    results = self.client.search(
        collection_name=self.text_collection_name,
        data=[query_embedding],
        limit=limit,
        output_fields=output_fields,
    )
    formatted_results = []
    for hit in results[0]:
        result = {"id": hit.get("id"), "distance": hit.get("distance")}
        entity = hit.get("entity", {})
        for field in output_fields:
            if field in entity:
                result[field] = entity.get(field)
        formatted_results.append(result)
    return formatted_results
```
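What the search computes, in miniature: top-`limit` nearest neighbours by distance. The brute-force sketch below uses an in-memory "collection" and cosine distance purely for illustration; Milvus replaces the linear scan with an ANN index and uses whatever metric the collection was built with, but the contract (ids plus distances, closest first) is the same:

```python
# Brute-force nearest-neighbour search over an in-memory collection,
# illustrating the contract that milvus_service.search_similar_text fulfils.
import math

def cosine_distance(a, b):
    dot = sum(x * y for x, y in zip(a, b))
    return 1.0 - dot / (math.sqrt(sum(x * x for x in a)) *
                        math.sqrt(sum(x * x for x in b)))

def brute_force_search(query, collection, limit=10):
    """collection: list of (id, vector). Returns [(id, distance)] ascending."""
    scored = [(pid, cosine_distance(query, vec)) for pid, vec in collection]
    return sorted(scored, key=lambda t: t[1])[:limit]

collection = [(101, [1.0, 0.0]), (102, [0.0, 1.0]), (103, [0.7, 0.7])]
hits = brute_force_search([1.0, 0.1], collection, limit=2)
assert [pid for pid, _ in hits] == [101, 103]  # closest directions win
```

Keeping the tool layer oblivious to this (it only sees the formatted dicts) is what lets the index type or metric change without touching the agent code.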
### 4.4 Data Indexing Script (index_data.py)

#### 4.4.1 Loading Product Data

```python
def _load_products_from_csv(self) -> Dict[int, Dict[str, Any]]:
    products = {}
    # Load the images.csv filename -> link mapping
    with open(self.images_csv, "r") as f:
        images_dict = {int(row["filename"].split(".")[0]): row["link"] for row in csv.DictReader(f)}
    # Load styles.csv
    with open(self.styles_csv, "r") as f:
        for row in csv.DictReader(f):
            product_id = int(row["id"])
            products[product_id] = {
                "id": product_id,
                "gender": row.get("gender", ""),
                "masterCategory": row.get("masterCategory", ""),
                "subCategory": row.get("subCategory", ""),
                "articleType": row.get("articleType", ""),
                "baseColour": row.get("baseColour", ""),
                "season": row.get("season", ""),
                "usage": row.get("usage", ""),
                "productDisplayName": row.get("productDisplayName", ""),
                "imagePath": f"{product_id}.jpg",
            }
    return products
```
#### 4.4.2 Text Indexing

```python
def _create_product_text(self, product: Dict[str, Any]) -> str:
    """Build the product text that gets embedded."""
    parts = [
        product.get("productDisplayName", ""),
        f"Gender: {product.get('gender', '')}",
        f"Category: {product.get('masterCategory', '')} > {product.get('subCategory', '')}",
        f"Type: {product.get('articleType', '')}",
        f"Color: {product.get('baseColour', '')}",
        f"Season: {product.get('season', '')}",
        f"Usage: {product.get('usage', '')}",
    ]
    return " | ".join([p for p in parts if p and p != "Gender: " and p != "Color: "])
```
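Running this construction on a sample row (field values below are illustrative, shaped like the Kaggle dataset) yields a single pipe-delimited string per product:

```python
# Standalone copy of _create_product_text, applied to an illustrative row.
def create_product_text(product):
    parts = [
        product.get("productDisplayName", ""),
        f"Gender: {product.get('gender', '')}",
        f"Category: {product.get('masterCategory', '')} > {product.get('subCategory', '')}",
        f"Type: {product.get('articleType', '')}",
        f"Color: {product.get('baseColour', '')}",
        f"Season: {product.get('season', '')}",
        f"Usage: {product.get('usage', '')}",
    ]
    return " | ".join([p for p in parts if p and p != "Gender: " and p != "Color: "])

sample = {
    "productDisplayName": "Turtle Check Men Navy Blue Shirt",
    "gender": "Men", "masterCategory": "Apparel", "subCategory": "Topwear",
    "articleType": "Shirts", "baseColour": "Navy Blue",
    "season": "Fall", "usage": "Casual",
}
print(create_product_text(sample))
# Turtle Check Men Navy Blue Shirt | Gender: Men | Category: Apparel > Topwear | Type: Shirts | Color: Navy Blue | Season: Fall | Usage: Casual
```

Note that the empty-value filter only covers the `Gender:` and `Color:` prefixes; a row with an empty season would still emit a bare `Season: ` segment.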
#### 4.4.3 Batch Indexing Flow

```python
# Text indexing
texts = [self._create_product_text(p) for p in products.values()]
embeddings = self.embedding_service.get_text_embeddings_batch(texts, batch_size=50)
milvus_data = [{
    "id": product_id,
    "text": text[:2000],
    "embedding": embedding,
    "productDisplayName": product["productDisplayName"][:500],
    "gender": product["gender"][:50],
    # ... other metadata fields
} for product_id, text, embedding in zip(...)]
self.milvus_service.insert_text_embeddings(milvus_data)

# Image indexing
image_paths = [self.image_dir / p["imagePath"] for p in products.values()]
embeddings = self.embedding_service.get_image_embeddings_batch(image_paths, batch_size=32)
# ... inserted into the image_embeddings collection analogously
```
### 4.5 Streamlit Frontend (app.py)

#### 4.5.1 Session and Agent Initialization

```python
def initialize_session():
    if "session_id" not in st.session_state:
        st.session_state.session_id = str(uuid.uuid4())
    if "shopping_agent" not in st.session_state:
        st.session_state.shopping_agent = ShoppingAgent(session_id=st.session_state.session_id)
    if "messages" not in st.session_state:
        st.session_state.messages = []
    if "uploaded_image" not in st.session_state:
        st.session_state.uploaded_image = None
```
#### 4.5.2 Parsing Product Information

```python
def extract_products_from_response(response: str) -> list:
    """Parse product information out of the agent's reply."""
    products = []
    current_product = None
    for line in response.split("\n"):
        if re.match(r"^\*?\*?\d+\.\s+", line):
            if current_product:
                products.append(current_product)
            current_product = {"name": re.sub(r"^\*?\*?\d+\.\s+", "", line).replace("**", "").strip()}
        elif current_product is None:
            continue  # skip preamble before the first numbered product
        elif "ID:" in line:
            id_match = re.search(r"(?:ID|id):\s*(\d+)", line)
            if id_match:
                current_product["id"] = id_match.group(1)
        elif "Category:" in line:
            cat_match = re.search(r"Category:\s*(.+?)(?:\n|$)", line)
            if cat_match:
                current_product["category"] = cat_match.group(1).strip()
        # ... Color, Gender, Season, Usage, Similarity/Relevance
    if current_product:
        products.append(current_product)  # flush the last product
    return products
```
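The two regular expressions do most of the work here. Their behavior on typical formatted lines (sample strings are illustrative) is worth pinning down, since the parser depends on the system prompt's exact output format:

```python
# Exercising the parser's regexes on representative lines.
import re

header = "**1. Turtle Check Men Navy Blue Shirt**"
# The leading pattern tolerates optional bold markers around the number.
assert re.match(r"^\*?\*?\d+\.\s+", header)
name = re.sub(r"^\*?\*?\d+\.\s+", "", header).replace("**", "").strip()
assert name == "Turtle Check Men Navy Blue Shirt"

id_line = "   ID: 15970"
id_match = re.search(r"(?:ID|id):\s*(\d+)", id_line)
assert id_match and id_match.group(1) == "15970"
```

This tight coupling between prompt format and regexes is why the system prompt's "CRITICAL FORMATTING RULES" section is phrased so strictly.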
#### 4.5.3 Image References in Multi-Turn Conversations

```python
# When the user types e.g. "make them formal" and refers back to an earlier
# upload, reuse the most recent image from the conversation history.
query_lower = query.lower()
image_path = None
if any(ref in query_lower for ref in ["this", "that", "the image", "it"]):
    for msg in reversed(st.session_state.messages):
        if msg.get("role") == "user" and msg.get("image_path"):
            image_path = msg["image_path"]
            break
```
### 4.6 Configuration Management (config.py)

```python
class Settings(BaseSettings):
    openai_api_key: str
    openai_model: str = "gpt-4o-mini"
    openai_embedding_model: str = "text-embedding-3-small"
    clip_server_url: str = "grpc://localhost:51000"
    milvus_uri: str = "http://localhost:19530"
    text_collection_name: str = "text_embeddings"
    image_collection_name: str = "image_embeddings"
    text_dim: int = 1536
    image_dim: int = 512

    @property
    def milvus_uri_absolute(self) -> str:
        """Supports both Milvus Standalone and Milvus Lite."""
        if self.milvus_uri.startswith(("http://", "https://")):
            return self.milvus_uri
        if self.milvus_uri.startswith("./"):
            # base_dir: project root, resolved at module level
            return os.path.join(base_dir, self.milvus_uri[2:])
        return self.milvus_uri

    class Config:
        env_file = ".env"
```
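With `env_file = ".env"`, pydantic's `BaseSettings` populates each field from an environment variable of the same name (matched case-insensitively by default), so a minimal `.env` might look like this (all values are placeholders; fields with defaults can be omitted):

```ini
OPENAI_API_KEY=sk-your-key-here
OPENAI_MODEL=gpt-4o-mini
CLIP_SERVER_URL=grpc://localhost:51000
MILVUS_URI=http://localhost:19530
```

Since `openai_api_key` has no default, instantiating `Settings()` without `OPENAI_API_KEY` set raises a validation error at startup rather than failing later on the first API call.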
## 5. Deployment and Running

### 5.1 Dependent Services

Provided by `docker-compose.yml`:

- etcd: metadata store
- minio: object storage
- milvus-standalone: the vector database
- attu: Milvus admin UI
### 5.2 Startup Sequence

```bash
# 1. Environment
pip install -r requirements.txt
cp .env.example .env  # set OPENAI_API_KEY

# 2. Download the dataset
python scripts/download_dataset.py  # Kaggle Fashion Product Images Dataset

# 3. Start the CLIP server (runs as a separate process)
python -m clip_server

# 4. Start Milvus
docker-compose up

# 5. Index the data
python scripts/index_data.py

# 6. Launch the app
streamlit run app.py
```
## 6. Typical Interaction Flows

| Scenario | User Input | Agent Behavior | Tool Calls |
|---|---|---|---|
| Text search | "winter coats for women" | Direct text search | search_products("winter coats women") |
| Image search | [uploads image] "find similar" | Image similarity search | search_by_image(path) |
| Style analysis + search | [uploads vintage jacket] "what style? find matching pants" | Analyze the style first, then search | analyze_image_style(path) → search_products("vintage pants casual") |
| Multi-turn context | [Turn 1] "show me red dresses" [Turn 2] "make them formal" | Combines conversational context | search_products("red formal dresses") |
## 7. Design Highlights

- ReAct pattern: the agent autonomously decides when to call tools, which tools to call, and whether to keep calling them.
- LangGraph state graph: `START → Agent → [condition] → Tools → Agent → END`, supporting multiple rounds of tool calls.
- Multimodality: text + image + VLM analysis, covering text search, image-to-image search, and style understanding.
- Dual vector collections: `text_embeddings` and `image_embeddings` are stored separately in Milvus, supporting retrieval per modality.
- Session persistence: `MemorySaver` plus `thread_id` provide multi-turn conversational memory.
- Format constraints: the system prompt strictly limits the product output format so the frontend can parse and render it.
## 8. Appendix: Project Structure

```
OmniShopAgent/
├── app/
│   ├── agents/
│   │   └── shopping_agent.py
│   ├── config.py
│   ├── services/
│   │   ├── embedding_service.py
│   │   └── milvus_service.py
│   └── tools/
│       └── search_tools.py
├── scripts/
│   ├── download_dataset.py
│   └── index_data.py
├── app.py
├── docker-compose.yml
└── requirements.txt
```