From e7a2c0b73eb8429700d0fa9d30505605bcb100a4 Mon Sep 17 00:00:00 2001 From: tangwang Date: Mon, 9 Mar 2026 10:25:44 +0800 Subject: [PATCH] img encode --- embeddings/image_encoder.py | 17 +++++++++++++++++ indexer/document_transformer.py | 64 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++-- indexer/incremental_service.py | 9 +++++++++ indexer/indexing_utils.py | 12 +++++++++--- scripts/create_tenant_index.sh | 1 - scripts/test_build_docs_api.py | 155 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ tests/ci/test_service_api_contracts.py | 60 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 7 files changed, 312 insertions(+), 6 deletions(-) create mode 100644 scripts/test_build_docs_api.py diff --git a/embeddings/image_encoder.py b/embeddings/image_encoder.py index d92c578..93cf0e6 100644 --- a/embeddings/image_encoder.py +++ b/embeddings/image_encoder.py @@ -149,3 +149,20 @@ class CLIPImageEncoder: results[batch_indices[j]] = None return results + + def encode_image_urls( + self, + urls: List[str], + batch_size: Optional[int] = None, + ) -> List[Optional[np.ndarray]]: + """ + 与 ClipImageModel / ClipAsServiceImageEncoder 一致的接口,供索引器 document_transformer 调用。 + + Args: + urls: 图片 URL 列表 + batch_size: 批大小(默认 8) + + Returns: + 与 urls 等长的向量列表,失败为 None + """ + return self.encode_batch(urls, batch_size=batch_size or 8) diff --git a/indexer/document_transformer.py b/indexer/document_transformer.py index 0b9059c..d04433b 100644 --- a/indexer/document_transformer.py +++ b/indexer/document_transformer.py @@ -5,7 +5,7 @@ SPU文档转换器 - 公共转换逻辑。 输出文档结构与 mappings/search_products.json 及 索引字段说明v2 一致, 供 search/searcher 与 search/es_query_builder 使用。 - 多语言字段:title, brief, description, vendor, category_path, category_name_text -- 嵌套:specifications, skus;向量:title_embedding(image_embedding 由 Java 索引或外部填充) +- 嵌套:specifications, skus;向量:title_embedding、image_embedding(可选,需提供 image_encoder) """ import pandas as pd @@ -38,7 +38,9 @@ class SPUDocumentTransformer: translator: Optional[Any] = None, translation_prompts: Optional[Dict[str, str]] = None, encoder: Optional[Any] = None, - enable_title_embedding: bool = True + enable_title_embedding: bool = True, + image_encoder: Optional[Any] = None, + enable_image_embedding: bool = False, ): """ 初始化文档转换器。 @@ -51,6 +53,8 @@ class SPUDocumentTransformer: translation_prompts: 翻译提示词配置(可选) encoder: 文本编码器实例(可选,用于生成title_embedding) enable_title_embedding: 是否启用标题向量化(默认True) + image_encoder: 图片编码器实例(可选,需实现 encode_image_urls(urls) -> List[Optional[np.ndarray]]) + enable_image_embedding: 是否启用图片向量化(默认False) """ self.category_id_to_name = category_id_to_name self.searchable_option_dimensions = searchable_option_dimensions @@ -59,6 +63,8 @@ class SPUDocumentTransformer: self.translation_prompts = translation_prompts or {} self.encoder = encoder self.enable_title_embedding = enable_title_embedding + self.image_encoder = image_encoder + self.enable_image_embedding = bool(enable_image_embedding and image_encoder is not None) def transform_spu_to_doc( self, @@ -116,6 +122,10 @@ class SPUDocumentTransformer: # Image URL self._fill_image_url(doc, spu_row) + # Image embedding(与 mappings/search_products.json 中 image_embedding 嵌套结构一致) + if self.enable_image_embedding: + self._fill_image_embedding(doc, spu_row, skus) + # Sales (fake_sales) if pd.notna(spu_row.get('fake_sales')): try: @@ -364,6 +374,56 @@ class SPUDocumentTransformer: image_src = f"//{image_src}" if not image_src.startswith('//') else image_src doc['image_url'] = image_src + def _fill_image_embedding( + self, doc: Dict[str, Any], spu_row: pd.Series, skus: pd.DataFrame + ) -> None: + """ + 填充 image_embedding 嵌套字段,与 mappings/search_products.json 一致: + [{ "vector": [float, ...], "url": "..." }, ...] + 收集 SPU 主图 + SKU 图片 URL,去重后调用 image_encoder 生成向量。 + """ + urls: List[str] = [] + seen: set = set() + + def _add(url: str) -> None: + if not url or not str(url).strip(): + return + u = str(url).strip() + if u.startswith("//"): + u = "https:" + u + if u not in seen: + seen.add(u) + urls.append(u) + + if doc.get("image_url"): + _add(doc["image_url"]) + if pd.notna(spu_row.get("image_src")): + _add(str(spu_row["image_src"])) + if not skus.empty and "image_src" in skus.columns: + for _, row in skus.iterrows(): + if pd.notna(row.get("image_src")): + _add(str(row["image_src"])) + + if not urls: + return + try: + vectors = self.image_encoder.encode_image_urls(urls, batch_size=8) + if not vectors or len(vectors) != len(urls): + return + out = [] + for url, vec in zip(urls, vectors): + if vec is None: + continue + if isinstance(vec, np.ndarray): + vec = vec.astype(np.float32) + out.append({"vector": vec.tolist(), "url": url}) + elif hasattr(vec, "tolist"): + out.append({"vector": vec.tolist(), "url": url}) + if out: + doc["image_embedding"] = out + except Exception as e: + logger.warning("Failed to generate image_embedding for SPU %s: %s", doc.get("spu_id"), e) + def _process_skus( self, skus: pd.DataFrame, diff --git a/indexer/incremental_service.py b/indexer/incremental_service.py index cd9f49b..cac7710 100644 --- a/indexer/incremental_service.py +++ b/indexer/incremental_service.py @@ -74,11 +74,20 @@ class IncrementalIndexerService: encoder = None enable_embedding = False + image_encoder: Optional[Any] = None + try: + from embeddings.image_encoder import CLIPImageEncoder + image_encoder = CLIPImageEncoder() + except Exception as e: + logger.debug("Image encoder not available for indexer: %s", e) + transformer = create_document_transformer( category_id_to_name=self.category_id_to_name, tenant_id=tenant_id, encoder=encoder, enable_title_embedding=False, # batch fill later + image_encoder=image_encoder, + enable_image_embedding=(image_encoder is not None), config=config, ) diff --git a/indexer/indexing_utils.py b/indexer/indexing_utils.py index 4693269..5c800fa 100644 --- a/indexer/indexing_utils.py +++ b/indexer/indexing_utils.py @@ -59,11 +59,13 @@ def create_document_transformer( translation_prompts: Optional[Dict[str, str]] = None, encoder: Optional[Any] = None, enable_title_embedding: bool = True, + image_encoder: Optional[Any] = None, + enable_image_embedding: bool = False, config: Optional[Any] = None, ) -> SPUDocumentTransformer: """ 创建文档转换器(统一初始化逻辑)。 - + Args: category_id_to_name: 分类ID到名称的映射 tenant_id: 租户ID @@ -72,7 +74,9 @@ def create_document_transformer( translation_prompts: 翻译提示词配置(如果为None则从配置加载) encoder: 文本编码器实例(如果为None且enable_title_embedding为True则根据配置初始化) enable_title_embedding: 是否启用标题向量化(默认True) - + image_encoder: 图片编码器(可选,需实现 encode_image_urls(urls)) + enable_image_embedding: 是否启用 image_embedding 填充(默认False) + Returns: SPUDocumentTransformer实例 """ @@ -128,6 +132,8 @@ def create_document_transformer( translator=translator, translation_prompts=translation_prompts, encoder=encoder, - enable_title_embedding=enable_title_embedding + enable_title_embedding=enable_title_embedding, + image_encoder=image_encoder, + enable_image_embedding=enable_image_embedding, ) diff --git a/scripts/create_tenant_index.sh b/scripts/create_tenant_index.sh index cde2b4a..d0a4e44 100755 --- a/scripts/create_tenant_index.sh +++ b/scripts/create_tenant_index.sh @@ -61,7 +61,6 @@ echo echo "删除索引: $ES_INDEX" echo curl -X DELETE "${ES_HOST}/${ES_INDEX}" $AUTH_PARAM -s -o /dev/null -w "HTTP状态码: %{http_code}\n" - echo echo "创建索引: $ES_INDEX" echo diff --git a/scripts/test_build_docs_api.py b/scripts/test_build_docs_api.py new file mode 100644 index 0000000..1a37f5a --- /dev/null +++ b/scripts/test_build_docs_api.py @@ -0,0 +1,155 @@ +#!/usr/bin/env python3 +""" +测试 POST /indexer/build-docs 接口:构造请求数据、调用接口、打印完整响应。 + +用法: + 1. 先启动 Indexer 服务: ./scripts/start_indexer.sh (或 uvicorn api.indexer_app:app --port 6004) + 2. 执行: python scripts/test_build_docs_api.py + + 也可指定地址: INDEXER_URL=http://localhost:6004 python scripts/test_build_docs_api.py +""" + +import json +import os +import sys +from datetime import datetime, timezone + +# 项目根目录 +ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) +sys.path.insert(0, ROOT) + +# 默认使用 requests 调真实服务;若未安装则回退到 TestClient +try: + import requests + HAS_REQUESTS = True +except ImportError: + HAS_REQUESTS = False + + +def build_sample_request(): + """构造一条完整的 build-docs 请求体(对应 shoplazza_product_spu / sku / option 表结构)。""" + now = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") + return { + "tenant_id": "162", + "items": [ + { + "spu": { + "id": 10001, + "tenant_id": "162", + "title": "测试T恤 纯棉短袖", + "brief": "舒适纯棉,多色可选", + "description": "这是一款适合日常穿着的纯棉T恤,透气吸汗。", + "vendor": "测试品牌", + "category": "服装/上衣/T恤", + "category_id": 100, + "category_level": 2, + "category_path": "服装/上衣/T恤", + "fake_sales": 1280, + "image_src": "https://example.com/img/tshirt.jpg", + "tags": "T恤,纯棉,短袖,夏季", + "create_time": now, + "update_time": now, + }, + "skus": [ + { + "id": 20001, + "spu_id": 10001, + "price": 99.0, + "compare_at_price": 129.0, + "sku": "SKU-TSHIRT-001", + "inventory_quantity": 50, + "option1": "黑色", + "option2": "M", + "option3": None, + }, + { + "id": 20002, + "spu_id": 10001, + "price": 99.0, + "compare_at_price": 129.0, + "sku": "SKU-TSHIRT-002", + "inventory_quantity": 30, + "option1": "白色", + "option2": "L", + "option3": None, + }, + ], + "options": [ + {"id": 1, "position": 1, "name": "颜色"}, + {"id": 2, "position": 2, "name": "尺码"}, + ], + } + ], + } + + +def call_via_http(base_url: str, body: dict): + """通过 HTTP 调用 build-docs。""" + url = f"{base_url.rstrip('/')}/indexer/build-docs" + r = requests.post(url, json=body, timeout=30) + return r.status_code, r.text, r.json() if r.headers.get("content-type", "").startswith("application/json") else None + + +def call_via_test_client(body: dict): + """通过 FastAPI TestClient 调用(不依赖已启动服务,但需 DB/ES 已配置)。""" + from fastapi.testclient import TestClient + import api.indexer_app as indexer_app + + with TestClient(indexer_app.app) as client: + r = client.post("/indexer/build-docs", json=body) + return r.status_code, r.text, r.json() if r.headers.get("content-type", "").startswith("application/json") else None + + +def main(): + body = build_sample_request() + + print("=" * 60) + print("【请求】POST /indexer/build-docs") + print("=" * 60) + print(json.dumps(body, ensure_ascii=False, indent=2)) + + base_url = os.getenv("INDEXER_URL", "http://localhost:6004") + use_http = HAS_REQUESTS and (os.getenv("USE_TEST_CLIENT", "").lower() not in ("1", "true", "yes")) + + if use_http: + try: + status, raw, data = call_via_http(base_url, body) + except requests.RequestException as e: + print("\n[错误] 无法连接 Indexer 服务:", e) + print("请先启动: ./scripts/start_indexer.sh 或 uvicorn api.indexer_app:app --port 6004") + if HAS_REQUESTS: + print("或使用进程内测试: USE_TEST_CLIENT=1 python scripts/test_build_docs_api.py") + sys.exit(1) + else: + if not use_http and not HAS_REQUESTS: + print("\n[提示] 未安装 requests,使用 TestClient 调用(需配置 DB/ES)。") + else: + print("\n[提示] 使用 TestClient 调用(USE_TEST_CLIENT=1)。") + try: + status, raw, data = call_via_test_client(body) + except Exception as e: + print("\n[错误] TestClient 调用失败:", e) + print("请确保已 source activate.sh 且 DB/ES 环境变量正确,或先启动 Indexer 再用 HTTP 调用。") + sys.exit(1) + + print("\n" + "=" * 60) + print("【响应】HTTP status =", status) + print("=" * 60) + if data is not None: + print(json.dumps(data, ensure_ascii=False, indent=2, default=str)) + if data.get("docs"): + doc = data["docs"][0] + print("\n" + "=" * 60) + print("【返回 doc 顶层字段】共 {} 个".format(len(doc))) + print("=" * 60) + for k in sorted(doc.keys()): + print(" ", k) + else: + print(raw) + + if status != 200: + sys.exit(1) + + +if __name__ == "__main__": + main() diff --git a/tests/ci/test_service_api_contracts.py b/tests/ci/test_service_api_contracts.py index bd78df3..ffe522a 100644 --- a/tests/ci/test_service_api_contracts.py +++ b/tests/ci/test_service_api_contracts.py @@ -1,5 +1,7 @@ from __future__ import annotations +import json +from pathlib import Path from types import SimpleNamespace from typing import Any, Dict, List @@ -219,6 +221,64 @@ def test_indexer_build_docs_contract(indexer_client: TestClient): assert data["docs"][0]["spu_id"] == "1" +def test_indexer_build_docs_show_request_response(indexer_client: TestClient): + """ + 调用「输入 SPU 详情、输出 ES doc」的 build-docs 接口,并打印完整请求与响应, + 便于核对返回的 ES 文档字段是否齐全。 + 运行: pytest tests/ci/test_service_api_contracts.py::test_indexer_build_docs_show_request_response -v -s + """ + request_body = { + "tenant_id": "162", + "items": [ + { + "spu": {"id": 1, "title": "T-shirt", "brief": "A simple T-shirt"}, + "skus": [], + "options": [], + } + ], + } + print("\n" + "=" * 60) + print("【请求】POST /indexer/build-docs") + print("=" * 60) + print(json.dumps(request_body, ensure_ascii=False, indent=2)) + + response = indexer_client.post("/indexer/build-docs", json=request_body) + data = response.json() + + print("\n" + "=" * 60) + print("【响应】status_code =", response.status_code) + print("=" * 60) + print(json.dumps(data, ensure_ascii=False, indent=2, default=str)) + + if data.get("docs"): + doc = data["docs"][0] + doc_keys = sorted(doc.keys()) + print("\n" + "=" * 60) + print("【返回 doc 顶层字段】共 {} 个".format(len(doc_keys))) + print("=" * 60) + for k in doc_keys: + print(" ", k) + + # 与 ES mapping 顶层字段对比 + mapping_path = Path(__file__).resolve().parents[2] / "mappings" / "search_products.json" + if mapping_path.exists(): + with open(mapping_path) as f: + mapping = json.load(f) + expected_top_level = set(mapping.get("mappings", {}).get("properties", {}).keys()) + returned = set(doc_keys) + missing = expected_top_level - returned + extra = returned - expected_top_level + print("\n" + "=" * 60) + print("【与 mappings/search_products.json 对比】") + print("=" * 60) + print(" mapping 中应有、当前 doc 未返回:", sorted(missing) if missing else "(无)") + print(" 当前 doc 多出:", sorted(extra) if extra else "(无)") + + assert response.status_code == 200 + assert data["success_count"] == 1 + assert data["docs"][0]["spu_id"] == "1" + + def test_indexer_build_docs_from_db_contract(indexer_client: TestClient): """POST /indexer/build-docs-from-db: tenant_id + spu_ids, returns same shape as build-docs.""" response = indexer_client.post( -- libgit2 0.21.2