diff --git a/README.md b/README.md index 67cbb92..3fa9fc2 100644 --- a/README.md +++ b/README.md @@ -212,6 +212,17 @@ curl -X POST http://localhost:6002/search/ \ - `scripts/ingest.sh [recreate]`:驱动 `indexer/` 模块写入 `search_products` - 详解:`测试数据指南.md` +- **索引富化 & Java 对接** + - Java 索引程序负责:全量/增量调度 + 从 MySQL 查询 `shoplazza_product_spu/sku/option/...` + - Python `indexer` 模块负责:**MySQL 行 → ES doc** 的全部逻辑(多语言、翻译、向量、规格聚合等) + - 正式对接接口(推荐): + - `POST http://:6004/indexer/build-docs` + - 入参:`tenant_id + items[{spu, skus, options}]` + - 出参:与 `mappings/search_products.json` 完全一致的 `docs` 列表,上游自行写入 ES + - 调试/自测接口(内部使用): + - `POST http://127.0.0.1:6004/indexer/build-docs-from-db`,只需要 `tenant_id + spu_ids`,由服务内部查库并返回 ES doc + - 详解:`indexer/README.md`、`docs/索引字段说明v2.md` + - **搜索服务 & API** - `api/`(FastAPI)承载 REST API,`search/` + `query/` 负责查询解析与下发 - API、分页、过滤、Facet、KNN 等:`搜索API对接指南.md` diff --git a/api/routes/indexer.py b/api/routes/indexer.py index 732cbec..097f879 100644 --- a/api/routes/indexer.py +++ b/api/routes/indexer.py @@ -6,8 +6,8 @@ import asyncio from fastapi import APIRouter, HTTPException -from typing import List -from pydantic import BaseModel +from typing import Any, Dict, List +from pydantic import BaseModel, Field import logging from sqlalchemy import text @@ -38,6 +38,44 @@ class GetDocumentsRequest(BaseModel): spu_ids: List[str] +class BuildDocItem(BaseModel): + """ + 单个 SPU 的原始数据包(由上游从 MySQL 查询得到)。 + + - spu: 一行 SPU 记录,对应 shoplazza_product_spu 表 + - skus: 该 SPU 下的所有 SKU 记录,对应 shoplazza_product_sku 表 + - options: 该 SPU 的所有 Option 记录,对应 shoplazza_product_option 表 + """ + spu: Dict[str, Any] = Field(..., description="单个 SPU 的原始字段(MySQL 行数据)") + skus: List[Dict[str, Any]] = Field(default_factory=list, description="该 SPU 关联的 SKU 列表") + options: List[Dict[str, Any]] = Field(default_factory=list, description="该 SPU 关联的 Option 列表") + + +class BuildDocsRequest(BaseModel): + """ + 基于上游已查询出的 MySQL 原始数据,构建 ES 索引文档(不访问数据库、不写入 ES)。 + + 该接口是 Java 等外部索引程序正式使用的“doc 生成接口”: + - 上游负责:全量 / 增量调度 + 从 MySQL 查询出各表数据 + - 本模块负责:根据配置和算法,将原始行数据转换为与 mappings/search_products.json 一致的 ES 文档 + """ + tenant_id: str = Field(..., description="租户 ID,用于加载租户配置、语言策略等") + items: List[BuildDocItem] = Field(..., description="需要构建 doc 的 SPU 列表(含其 SKUs 和 Options)") + + +class BuildDocsFromDbRequest(BaseModel): + """ + 便捷测试请求:只提供 tenant_id 和 spu_ids,由本服务从 MySQL 查询原始数据, + 然后内部调用 /indexer/build-docs 的同一套逻辑构建 ES doc。 + + 用途: + - 本地/联调时快速验证 doc 结构,无需手工构造庞大的 BuildDocsRequest JSON + - 生产正式使用建议直接走 BuildDocsRequest,由外层(Java)控制 MySQL 查询 + """ + tenant_id: str = Field(..., description="租户 ID") + spu_ids: List[str] = Field(..., description="需要构建 doc 的 SPU ID 列表") + + @router.post("/reindex") async def reindex_all(request: ReindexRequest): """ @@ -139,6 +177,238 @@ async def index_spus(request: IndexSpusRequest): raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}") +@router.post("/build-docs") +async def build_docs(request: BuildDocsRequest): + """ + 构建 ES 文档(不访问数据库、不写入 ES)。 + + 使用场景: + - 上游(例如 Java 索引程序)已经从 MySQL 查询出了 SPU / SKU / Option 等原始行数据 + - 希望复用本项目的全部“索引富化”能力(多语言、翻译、向量、规格聚合等) + - 只需要拿到与 `mappings/search_products.json` 一致的 doc 列表,由上游自行写入 ES + """ + try: + if not request.items: + raise HTTPException(status_code=400, detail="items cannot be empty") + if len(request.items) > 200: + raise HTTPException(status_code=400, detail="Maximum 200 items allowed per request") + + incremental_service = get_incremental_service() + if incremental_service is None: + raise HTTPException(status_code=503, detail="Incremental indexer service is not initialized") + + # 复用增量索引服务中的 transformer 缓存与配置 / 语言 / embedding 初始化逻辑 + transformer, encoder, enable_embedding = incremental_service._get_transformer_bundle( + tenant_id=request.tenant_id + ) + + import pandas as pd + + docs: List[Dict[str, Any]] = [] + failed: List[Dict[str, Any]] = [] + + for item in request.items: + try: + # 将上游传入的 MySQL 行数据转换为 Pandas 结构,复用 SPUDocumentTransformer + spu_df = pd.DataFrame([item.spu]) + spu_row = spu_df.iloc[0] + skus_df = pd.DataFrame(item.skus) if item.skus else pd.DataFrame() + options_df = pd.DataFrame(item.options) if item.options else pd.DataFrame() + + doc = transformer.transform_spu_to_doc( + tenant_id=request.tenant_id, + spu_row=spu_row, + skus=skus_df, + options=options_df, + ) + + if doc is None: + failed.append( + { + "spu_id": str(item.spu.get("id")), + "error": "transform_spu_to_doc returned None", + } + ) + continue + + # 在“构建 doc”接口中,是否补齐 embedding 由内部配置决定(与增量索引一致) + # 此处不强制生成 / 不强制关闭,只复用 transformer_bundle 的 encoder / enable_embedding 设置。 + if enable_embedding and encoder: + title_obj = doc.get("title") or {} + title_text = None + if isinstance(title_obj, dict): + title_text = title_obj.get("en") or title_obj.get("zh") + if not title_text: + for v in title_obj.values(): + if v and str(v).strip(): + title_text = str(v) + break + if title_text and str(title_text).strip(): + try: + embeddings = encoder.encode(title_text) + if embeddings is not None and len(embeddings) > 0: + emb0 = embeddings[0] + import numpy as np + + if isinstance(emb0, np.ndarray): + doc["title_embedding"] = emb0.tolist() + except Exception: + # 构建 doc 接口不因为 embedding 失败而整体失败 + pass + + docs.append(doc) + except Exception as e: + failed.append( + { + "spu_id": str(item.spu.get("id")), + "error": str(e), + } + ) + + return { + "tenant_id": request.tenant_id, + "docs": docs, + "total": len(request.items), + "success_count": len(docs), + "failed_count": len(failed), + "failed": failed, + } + + except HTTPException: + raise + except Exception as e: + logger.error(f"Error building docs for tenant_id={request.tenant_id}: {e}", exc_info=True) + raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}") + + +@router.post("/build-docs-from-db") +async def build_docs_from_db(request: BuildDocsFromDbRequest): + """ + 基于数据库数据构建 ES 文档(测试 / 调试用)。 + + - 入参:tenant_id + spu_ids + - 步骤: + 1. 使用增量索引服务的查询能力,从 MySQL 批量加载 SPU / SKU / Option + 2. 组装为 BuildDocsRequest 的 items + 3. 内部调用与 /indexer/build-docs 相同的构建逻辑,返回 ES-ready docs + + 注意: + - 该接口主要用于本项目自测和调试;正式生产建议由上游(Java)自行查库后调用 /indexer/build-docs + """ + try: + if not request.spu_ids: + raise HTTPException(status_code=400, detail="spu_ids cannot be empty") + if len(request.spu_ids) > 200: + raise HTTPException(status_code=400, detail="Maximum 200 SPU IDs allowed per request") + + incremental_service = get_incremental_service() + if incremental_service is None: + raise HTTPException(status_code=503, detail="Incremental indexer service is not initialized") + + # 直接复用增量服务里的批量查询方法,从 MySQL 拉取原始行数据 + # 只加载未删除的记录(include_deleted=False) + spu_df = incremental_service._load_spus_for_spu_ids( + tenant_id=request.tenant_id, + spu_ids=request.spu_ids, + include_deleted=False + ) + if spu_df.empty: + return { + "tenant_id": request.tenant_id, + "docs": [], + "total": 0, + "success_count": 0, + "failed_count": len(request.spu_ids), + "failed": [ + {"spu_id": spu_id, "error": "SPU not found or deleted"} + for spu_id in request.spu_ids + ], + } + + # 仅对存在的 spu_id 构建 item,避免无效 ID + # _load_skus_for_spu_ids / _load_options_for_spu_ids 会自动过滤不存在的 spu_id + existing_ids = [str(int(i)) for i in spu_df["id"].tolist()] + skus_df = incremental_service._load_skus_for_spu_ids( + tenant_id=request.tenant_id, spu_ids=existing_ids + ) + options_df = incremental_service._load_options_for_spu_ids( + tenant_id=request.tenant_id, spu_ids=existing_ids + ) + + import pandas as pd + + # group by spu_id 方便取子集 + sku_groups = skus_df.groupby("spu_id") if not skus_df.empty else None + option_groups = options_df.groupby("spu_id") if not options_df.empty else None + + items: List[BuildDocItem] = [] + failed: List[Dict[str, Any]] = [] + + for _, spu_row in spu_df.iterrows(): + spu_id = int(spu_row["id"]) + try: + spu_dict = spu_row.to_dict() + skus = ( + sku_groups.get_group(spu_id).to_dict("records") + if sku_groups is not None and spu_id in sku_groups.groups + else [] + ) + options = ( + option_groups.get_group(spu_id).to_dict("records") + if option_groups is not None and spu_id in option_groups.groups + else [] + ) + items.append( + BuildDocItem( + spu=spu_dict, + skus=skus, + options=options, + ) + ) + except Exception as e: + failed.append( + { + "spu_id": str(spu_id), + "error": str(e), + } + ) + + if not items: + return { + "tenant_id": request.tenant_id, + "docs": [], + "total": 0, + "success_count": 0, + "failed_count": len(request.spu_ids), + "failed": failed + or [ + {"spu_id": spu_id, "error": "SPU not found or data load failed"} + for spu_id in request.spu_ids + ], + } + + # 调用与 /indexer/build-docs 相同的构建逻辑 + build_request = BuildDocsRequest(tenant_id=request.tenant_id, items=items) + result = await build_docs(build_request) + + # 合并两层 failed 信息 + merged_failed = list(result.get("failed", [])) if isinstance(result, dict) else [] + merged_failed.extend(failed) + + if isinstance(result, dict): + result["failed"] = merged_failed + # 更新 failed_count + result["failed_count"] = len(merged_failed) + return result + return result + + except HTTPException: + raise + except Exception as e: + logger.error(f"Error building docs from DB for tenant_id={request.tenant_id}: {e}", exc_info=True) + raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}") + + @router.post("/documents") async def get_documents(request: GetDocumentsRequest): """ diff --git a/docs/常用查询 - ES.md b/docs/常用查询 - ES.md index ddf98f4..4be7823 100644 --- a/docs/常用查询 - ES.md +++ b/docs/常用查询 - ES.md @@ -3,6 +3,9 @@ # ====================================== # 租户相关 # ====================================== +# +# 说明:索引已按租户拆分为 search_products_tenant_{tenant_id}, +# 一般情况下不需要在查询中再按 tenant_id 过滤(可选保留用于排查)。 ### 1. 根据 tenant_id / spu_id 查询 curl -u 'essa:4hOaLaf41y2VuI8y' -X GET 'http://localhost:9200/search_products_tenant_170/_search?pretty' -H 'Content-Type: application/json' -d '{ @@ -11,60 +14,54 @@ curl -u 'essa:4hOaLaf41y2VuI8y' -X GET 'http://localhost:9200/search_products_ "query": { "bool": { "filter": [ - { "term": {"spu_id" : 206150} }, - { "term": { "tenant_id": "170" } } + { "term": {"spu_id" : 206150} } ] } } }' -curl -u 'essa:4hOaLaf41y2VuI8y' -X GET 'http://localhost:9200/search_products/_search?pretty' -H 'Content-Type: application/json' -d '{ - "size": 100, - "_source": ["title"], - "query": { - "bool": { - "filter": [ - { "term": { "tenant_id": "170" } } - ] - } - } - }' +curl -u 'essa:4hOaLaf41y2VuI8y' -X GET 'http://localhost:9200/search_products_tenant_170/_search?pretty' -H 'Content-Type: application/json' -d '{ + "size": 100, + "_source": ["title"], + "query": { + "match_all": {} + } +}' curl -u 'essa:4hOaLaf41y2VuI8y' -X GET 'http://localhost:9200/search_products_tenant_170/_search?pretty' -H 'Content-Type: application/json' -d '{ - "size": 5, - "_source": ["title", "keyword", "keyword.zh", "tags"], - "query": { - "bool": { - "filter": [ - { "term": { "spu_id": "223167" } } - ] - } + "size": 5, + "_source": ["title", "keywords", "tags"], + "query": { + "bool": { + "filter": [ + { "term": { "spu_id": "223167" } } + ] } - }' + } +}' curl -u 'essa:4hOaLaf41y2VuI8y' -X GET 'http://localhost:9200/search_products_tenant_170/_search?pretty' -H 'Content-Type: application/json' -d '{ - "size": 1, - "_source": ["title", "keyword", "keyword.zh", "tags"], - "query": { - "bool": { - "must": [ - { - "match": { - "title.en": { - "query": "Floerns Women Gothic Graphic Ribbed Strapless Tube Top Asymmetrical Ruched Bandeau Tops" - } + "size": 1, + "_source": ["title", "keywords", "tags"], + "query": { + "bool": { + "must": [ + { + "match": { + "title.en": { + "query": "Floerns Women Gothic Graphic Ribbed Strapless Tube Top Asymmetrical Ruched Bandeau Tops" } } - ], - "filter": [ - { "term": { "tenant_id": "170" } }, - { "terms": { "tags": ["女装", "派对"] } } - ] - } + } + ], + "filter": [ + { "terms": { "tags": ["女装", "派对"] } } + ] } + } }' @@ -89,17 +86,17 @@ curl -u 'essa:4hOaLaf41y2VuI8y' -X GET 'http://localhost:9200/search_products_ } }' -curl -u 'essa:4hOaLaf41y2VuI8y' -X GET 'http://localhost:9200/search_products/_analyze' -H 'Content-Type: application/json' -d '{ - "analyzer": "icu_analyzer", +Curl -u 'essa:4hOaLaf41y2VuI8y' -X GET 'http://localhost:9200/search_products_tenant_170/_analyze' -H 'Content-Type: application/json' -d '{ + "analyzer": "index_ansj", "text": "14寸第4代-眼珠实身冰雪公仔带手动大推车,搪胶雪宝宝" }' -curl -u 'essa:4hOaLaf41y2VuI8y' -X GET 'http://localhost:9200/search_products/_analyze' -H 'Content-Type: application/json' -d '{ - "analyzer": "hanlp_standard", +curl -u 'essa:4hOaLaf41y2VuI8y' -X GET 'http://localhost:9200/search_products_tenant_170/_analyze' -H 'Content-Type: application/json' -d '{ + "analyzer": "query_ansj", "text": "14寸第4代-眼珠实身冰雪公仔带手动大推车,搪胶雪宝宝" }' -curl -u 'essa:4hOaLaf41y2VuI8y' -X GET 'http://localhost:9200/search_products/_search?pretty' -H 'Content-Type: application/json' -d '{ +curl -u 'essa:4hOaLaf41y2VuI8y' -X GET 'http://localhost:9200/search_products_tenant_170/_search?pretty' -H 'Content-Type: application/json' -d '{ "size": 100, "from": 0, "query": { @@ -127,16 +124,14 @@ curl -u 'essa:4hOaLaf41y2VuI8y' -X GET 'http://localhost:9200/search_products/ ], "filter": [ { - "term": { - "tenant_id": "170" - } + "match_all": {} } ] } } }' -curl -u 'essa:4hOaLaf41y2VuI8y' -X GET 'http://localhost:9200/search_products/_search?pretty' -H 'Content-Type: application/json' -d '{ +curl -u 'essa:4hOaLaf41y2VuI8y' -X GET 'http://localhost:9200/search_products_tenant_170/_search?pretty' -H 'Content-Type: application/json' -d '{ "size": 1, "from": 0, "query": { @@ -163,11 +158,7 @@ curl -u 'essa:4hOaLaf41y2VuI8y' -X GET 'http://localhost:9200/search_products/ } ], "filter": [ - { - "term": { - "tenant_id": "170" - } - } + { "match_all": {} } ] } }, @@ -259,12 +250,10 @@ curl -u 'essa:4hOaLaf41y2VuI8y' -X GET 'http://localhost:9200/search_products/ } }' -GET /search_products/_search +GET /search_products_tenant_2/_search { "query": { - "term": { - "tenant_id": "2" - } + "match_all": {} } } @@ -282,11 +271,9 @@ curl -u 'essa:4hOaLaf41y2VuI8y' -X GET 'http://localhost:9200/search_products/ ### 2. 统计租户的总文档数 -curl -u 'essa:4hOaLaf41y2VuI8y' -X GET 'http://localhost:9200/search_products/_count?pretty' -H 'Content-Type: application/json' -d '{ +curl -u 'essa:4hOaLaf41y2VuI8y' -X GET 'http://localhost:9200/search_products_tenant_170/_count?pretty' -H 'Content-Type: application/json' -d '{ "query": { - "term": { - "tenant_id": "170" - } + "match_all": {} } }' @@ -298,7 +285,7 @@ curl -u 'essa:4hOaLaf41y2VuI8y' -X GET 'http://localhost:9200/search_products/_c ## 1. 检查ES文档的分面字段数据 ### 1.1 查询特定租户的商品,显示分面相关字段 -curl -u 'essa:4hOaLaf41y2VuI8y' -X GET 'http://localhost:9200/search_products/_search?pretty' -H 'Content-Type: application/json' -d '{ +curl -u 'essa:4hOaLaf41y2VuI8y' -X GET 'http://localhost:9200/search_products_tenant_162/_search?pretty' -H 'Content-Type: application/json' -d '{ "query": { "term": { "tenant_id": "162" @@ -319,7 +306,7 @@ curl -u 'essa:4hOaLaf41y2VuI8y' -X GET 'http://localhost:9200/search_products/_s }' ### 1.2 验证category1_name字段是否有数据 -curl -u 'essa:4hOaLaf41y2VuI8y' -X GET 'http://localhost:9200/search_products/_search?pretty' -H 'Content-Type: application/json' -d '{ +curl -u 'essa:4hOaLaf41y2VuI8y' -X GET 'http://localhost:9200/search_products_tenant_162/_search?pretty' -H 'Content-Type: application/json' -d '{ "query": { "bool": { "filter": [ @@ -332,7 +319,7 @@ curl -u 'essa:4hOaLaf41y2VuI8y' -X GET 'http://localhost:9200/search_products/_s }' ### 1.3 验证specifications字段是否有数据 -curl -u 'essa:4hOaLaf41y2VuI8y' -X GET 'http://localhost:9200/search_products/_search?pretty' -H 'Content-Type: application/json' -d '{ +curl -u 'essa:4hOaLaf41y2VuI8y' -X GET 'http://localhost:9200/search_products_tenant_162/_search?pretty' -H 'Content-Type: application/json' -d '{ "query": { "bool": { "filter": [ @@ -347,17 +334,15 @@ curl -u 'essa:4hOaLaf41y2VuI8y' -X GET 'http://localhost:9200/search_products/_s ## 2. 分面聚合查询(Facet Aggregations) ### 2.1 category1_name 分面聚合 -curl -u 'essa:4hOaLaf41y2VuI8y' -X GET 'http://localhost:9200/search_products/_search?pretty' -H 'Content-Type: application/json' -d '{ +curl -u 'essa:4hOaLaf41y2VuI8y' -X GET 'http://localhost:9200/search_products_tenant_162/_search?pretty' -H 'Content-Type: application/json' -d '{ "query": { - "term": { - "tenant_id": "162" - } + "match_all": {} }, "size": 0, "aggs": { "category1_name_facet": { "terms": { - "field": "category1_name.keyword", + "field": "category1_name", "size": 50 } } @@ -365,11 +350,9 @@ curl -u 'essa:4hOaLaf41y2VuI8y' -X GET 'http://localhost:9200/search_products/_s }' ### 2.2 specifications.color 分面聚合 -curl -u 'essa:4hOaLaf41y2VuI8y' -X GET 'http://localhost:9200/search_products/_search?pretty' -H 'Content-Type: application/json' -d '{ +curl -u 'essa:4hOaLaf41y2VuI8y' -X GET 'http://localhost:9200/search_products_tenant_162/_search?pretty' -H 'Content-Type: application/json' -d '{ "query": { - "term": { - "tenant_id": "162" - } + "match_all": {} }, "size": 0, "aggs": { @@ -387,7 +370,7 @@ curl -u 'essa:4hOaLaf41y2VuI8y' -X GET 'http://localhost:9200/search_products/_s "aggs": { "values": { "terms": { - "field": "specifications.value.keyword", + "field": "specifications.value", "size": 50 } } @@ -399,11 +382,9 @@ curl -u 'essa:4hOaLaf41y2VuI8y' -X GET 'http://localhost:9200/search_products/_s }' ### 2.3 specifications.size 分面聚合 -curl -u 'essa:4hOaLaf41y2VuI8y' -X GET 'http://localhost:9200/search_products/_search?pretty' -H 'Content-Type: application/json' -d '{ +curl -u 'essa:4hOaLaf41y2VuI8y' -X GET 'http://localhost:9200/search_products_tenant_162/_search?pretty' -H 'Content-Type: application/json' -d '{ "query": { - "term": { - "tenant_id": "162" - } + "match_all": {} }, "size": 0, "aggs": { @@ -421,7 +402,7 @@ curl -u 'essa:4hOaLaf41y2VuI8y' -X GET 'http://localhost:9200/search_products/_s "aggs": { "values": { "terms": { - "field": "specifications.value.keyword", + "field": "specifications.value", "size": 50 } } @@ -433,11 +414,9 @@ curl -u 'essa:4hOaLaf41y2VuI8y' -X GET 'http://localhost:9200/search_products/_s }' ### 2.4 specifications.material 分面聚合 -curl -u 'essa:4hOaLaf41y2VuI8y' -X GET 'http://localhost:9200/search_products/_search?pretty' -H 'Content-Type: application/json' -d '{ +curl -u 'essa:4hOaLaf41y2VuI8y' -X GET 'http://localhost:9200/search_products_tenant_162/_search?pretty' -H 'Content-Type: application/json' -d '{ "query": { - "term": { - "tenant_id": "162" - } + "match_all": {} }, "size": 0, "aggs": { @@ -455,7 +434,7 @@ curl -u 'essa:4hOaLaf41y2VuI8y' -X GET 'http://localhost:9200/search_products/_s "aggs": { "values": { "terms": { - "field": "specifications.value.keyword", + "field": "specifications.value", "size": 50 } } @@ -467,17 +446,15 @@ curl -u 'essa:4hOaLaf41y2VuI8y' -X GET 'http://localhost:9200/search_products/_s }' ### 2.5 综合分面聚合(category + color + size + material) -curl -u 'essa:4hOaLaf41y2VuI8y' -X GET 'http://localhost:9200/search_products/_search?pretty' -H 'Content-Type: application/json' -d '{ +curl -u 'essa:4hOaLaf41y2VuI8y' -X GET 'http://localhost:9200/search_products_tenant_162/_search?pretty' -H 'Content-Type: application/json' -d '{ "query": { - "term": { - "tenant_id": "162" - } + "match_all": {} }, "size": 0, "aggs": { "category1_name_facet": { "terms": { - "field": "category1_name.keyword", + "field": "category1_name", "size": 50 } }, @@ -495,7 +472,7 @@ curl -u 'essa:4hOaLaf41y2VuI8y' -X GET 'http://localhost:9200/search_products/_s "aggs": { "values": { "terms": { - "field": "specifications.value.keyword", + "field": "specifications.value", "size": 50 } } @@ -517,7 +494,7 @@ curl -u 'essa:4hOaLaf41y2VuI8y' -X GET 'http://localhost:9200/search_products/_s "aggs": { "values": { "terms": { - "field": "specifications.value.keyword", + "field": "specifications.value", "size": 50 } } @@ -539,7 +516,7 @@ curl -u 'essa:4hOaLaf41y2VuI8y' -X GET 'http://localhost:9200/search_products/_s "aggs": { "values": { "terms": { - "field": "specifications.value.keyword", + "field": "specifications.value", "size": 50 } } @@ -594,11 +571,10 @@ curl -u 'essa:4hOaLaf41y2VuI8y' -X GET 'http://localhost:9200/search_products/_s ## 4. 统计查询 ### 4.1 统计有category1_name的文档数量 -curl -u 'essa:4hOaLaf41y2VuI8y' -X GET 'http://localhost:9200/search_products/_count?pretty' -H 'Content-Type: application/json' -d '{ +curl -u 'essa:4hOaLaf41y2VuI8y' -X GET 'http://localhost:9200/search_products_tenant_162/_count?pretty' -H 'Content-Type: application/json' -d '{ "query": { "bool": { "filter": [ - { "term": { "tenant_id": "162" } }, { "exists": { "field": "category1_name" } } ] } @@ -606,11 +582,10 @@ curl -u 'essa:4hOaLaf41y2VuI8y' -X GET 'http://localhost:9200/search_products/_c }' ### 4.2 统计有specifications的文档数量 -curl -u 'essa:4hOaLaf41y2VuI8y' -X GET 'http://localhost:9200/search_products/_count?pretty' -H 'Content-Type: application/json' -d '{ +curl -u 'essa:4hOaLaf41y2VuI8y' -X GET 'http://localhost:9200/search_products_tenant_162/_count?pretty' -H 'Content-Type: application/json' -d '{ "query": { "bool": { "filter": [ - { "term": { "tenant_id": "162" } }, { "exists": { "field": "specifications" } } ] } @@ -621,7 +596,7 @@ curl -u 'essa:4hOaLaf41y2VuI8y' -X GET 'http://localhost:9200/search_products/_c ## 5. 诊断问题场景 ### 5.1 查找没有category1_name但有category的文档(MySQL有数据但ES没有) -curl -u 'essa:4hOaLaf41y2VuI8y' -X GET 'http://localhost:9200/search_products/_search?pretty' -H 'Content-Type: application/json' -d '{ +curl -u 'essa:4hOaLaf41y2VuI8y' -X GET 'http://localhost:9200/search_products_tenant_162/_search?pretty' -H 'Content-Type: application/json' -d '{ "query": { "bool": { "filter": [ @@ -637,7 +612,7 @@ curl -u 'essa:4hOaLaf41y2VuI8y' -X GET 'http://localhost:9200/search_products/_s }' ### 5.2 查找有option但没有specifications的文档(数据转换问题) -curl -u 'essa:4hOaLaf41y2VuI8y' -X GET 'http://localhost:9200/search_products/_search?pretty' -H 'Content-Type: application/json' -d '{ +curl -u 'essa:4hOaLaf41y2VuI8y' -X GET 'http://localhost:9200/search_products_tenant_162/_search?pretty' -H 'Content-Type: application/json' -d '{ "query": { "bool": { "filter": [ @@ -655,7 +630,7 @@ curl -u 'essa:4hOaLaf41y2VuI8y' -X GET 'http://localhost:9200/search_products/_s 重排序: -GET /search_products/_search +GET /search_products_tenant_170/_search { "query": { "match": { diff --git a/docs/搜索API对接指南.md b/docs/搜索API对接指南.md index aebbc09..6d9553d 100644 --- a/docs/搜索API对接指南.md +++ b/docs/搜索API对接指南.md @@ -132,9 +132,11 @@ curl -X POST "http://120.76.41.98:6002/search/" \ | 搜索建议 | GET | `/search/suggestions` | 搜索建议(框架,暂未实现) ⚠️ TODO | | 即时搜索 | GET | `/search/instant` | 边输入边搜索(框架) ⚠️ TODO | | 获取文档 | GET | `/search/{doc_id}` | 获取单个文档 | -| 全量索引 | POST | `/indexer/reindex` | 全量索引接口(导入数据,不删除索引) | -| 增量索引 | POST | `/indexer/index` | 增量索引接口(指定SPU ID列表进行索引,支持自动检测删除和显式删除) | +| 全量索引 | POST | `/indexer/reindex` | 全量索引接口(导入数据,不删除索引,仅推荐自测使用) | +| 增量索引 | POST | `/indexer/index` | 增量索引接口(指定SPU ID列表进行索引,支持自动检测删除和显式删除,仅推荐自测使用) | | 查询文档 | POST | `/indexer/documents` | 查询SPU文档数据(不写入ES) | +| 构建ES文档(正式对接) | POST | `/indexer/build-docs` | 基于上游提供的 MySQL 行数据构建 ES doc,不写入 ES,供 Java 等调用后自行写入 | +| 构建ES文档(测试用) | POST | `/indexer/build-docs-from-db` | 仅在测试/调试时使用,根据 `tenant_id + spu_ids` 内部查库并构建 ES doc | | 索引健康检查 | GET | `/indexer/health` | 检查索引服务状态 | | 健康检查 | GET | `/admin/health` | 服务健康检查 | | 获取配置 | GET | `/admin/config` | 获取租户配置 | @@ -871,7 +873,7 @@ curl "http://localhost:6002/search/12345" -H "X-Tenant-ID: 162" ### 5.1 全量索引接口 - **端点**: `POST /indexer/reindex` -- **描述**: 全量索引,将指定租户的所有SPU数据导入到ES索引(不会删除现有索引) +- **描述**: 全量索引,将指定租户的所有SPU数据导入到ES索引(不会删除现有索引)。**推荐仅用于自测/运维场景**;生产环境下更推荐由 Java 等上游控制调度与写 ES。 #### 请求参数 @@ -977,7 +979,7 @@ cat logs/indexer.log | jq 'select(.operation == "request_complete") | {timestamp ### 5.2 增量索引接口 - **端点**: `POST /indexer/index` -- **描述**: 增量索引接口,根据指定的SPU ID列表进行索引,直接将数据写入ES。用于增量更新指定商品。 +- **描述**: 增量索引接口,根据指定的SPU ID列表进行索引,直接将数据写入ES。用于增量更新指定商品。**推荐仅作为内部/调试入口**;正式对接建议改用 `/indexer/build-docs`,由上游写 ES。 **删除说明**: - `spu_ids`中的SPU:如果数据库`deleted=1`,自动从ES删除,响应状态为`deleted` @@ -1242,6 +1244,101 @@ curl -X POST "http://localhost:6004/indexer/documents" \ } ``` +### 5.5 文档构建接口(正式对接推荐) + +#### 5.5.1 `POST /indexer/build-docs` + +- **描述**: + 基于调用方(通常是 Java 索引程序)提供的 **MySQL 行数据** 构建 ES 文档(doc),**不写入 ES**。 + 由本服务负责“如何构建 doc”(多语言、翻译、向量、规格聚合等),由调用方负责“何时调度 + 如何写 ES”。 + +#### 请求参数 + +```json +{ + "tenant_id": "170", + "items": [ + { + "spu": { "id": 223167, "tenant_id": 170, "title": "..." }, + "skus": [ + { "id": 3988393, "spu_id": 223167, "price": 25.99, "compare_at_price": 25.99 } + ], + "options": [] + } + ] +} +``` + +> `spu` / `skus` / `options` 字段应当直接使用从 `shoplazza_product_spu` / `shoplazza_product_sku` / `shoplazza_product_option` 查询出的行字段。 + +#### 响应示例(节选) + +```json +{ + "tenant_id": "170", + "docs": [ + { + "tenant_id": "170", + "spu_id": "223167", + "title": { "en": "...", "zh": "..." }, + "tags": ["Floerns", "Clothing", "Shoes & Jewelry"], + "skus": [ + { + "sku_id": "3988393", + "price": 25.99, + "compare_at_price": 25.99, + "stock": 100 + } + ], + "min_price": 25.99, + "max_price": 25.99, + "compare_at_price": 25.99, + "total_inventory": 100, + "title_embedding": [/* 1024 维向量 */] + // 其余字段与 mappings/search_products.json 一致 + } + ], + "total": 1, + "success_count": 1, + "failed_count": 0, + "failed": [] +} +``` + +#### 使用建议 + +- **生产环境推荐流程**: + 1. Java 根据业务逻辑决定哪些 SPU 需要(全量/增量)处理; + 2. Java 从 MySQL 查询 SPU/SKU/Option 行,拼成 `items`; + 3. 调用 `/indexer/build-docs` 获取 ES-ready `docs`; + 4. Java 使用自己的 ES 客户端写入 `search_products_tenant_{tenant_id}`。 + +### 5.6 文档构建接口(测试 / 自测) + +#### 5.6.1 `POST /indexer/build-docs-from-db` + +- **描述**: + 仅用于测试/调试:调用方只提供 `tenant_id` 和 `spu_ids`,由 indexer 服务内部从 MySQL 查询 SPU/SKU/Option,然后调用与 `/indexer/build-docs` 相同的文档构建逻辑,返回 ES-ready doc。 + +#### 请求参数 + +```json +{ + "tenant_id": "170", + "spu_ids": ["223167"] +} +``` + +#### 请求示例 + +```bash +curl -X POST "http://127.0.0.1:6004/indexer/build-docs-from-db" \ + -H "Content-Type: application/json" \ + -d '{"tenant_id": "170", "spu_ids": ["223167"]}' +``` + +返回结构与 `/indexer/build-docs` 相同,可直接用于对比 ES 实际文档或调试字段映射问题。 + #### 请求示例 ```bash diff --git a/docs/测试Pipeline说明.md b/docs/测试Pipeline说明.md index 92a9ada..ee254cb 100644 --- a/docs/测试Pipeline说明.md +++ b/docs/测试Pipeline说明.md @@ -145,6 +145,71 @@ pytest tests/integration/ -v pytest tests/integration/test_api_integration.py -v ``` +### 5. 索引 & 文档构建流水线验证(手动) + +除了自动化测试外,推荐在联调/问题排查时手动跑一遍“**从 MySQL 到 ES doc**”的索引流水线,确保字段与 mapping、查询逻辑一致。 + +#### 5.1 启动 Indexer 服务 + +```bash +cd /home/tw/SearchEngine +./scripts/stop.sh # 停掉已有进程(可选) +./scripts/start_indexer.sh # 启动专用 indexer 服务,默认端口 6004 +``` + +#### 5.2 基于数据库构建 ES doc(只看、不写 ES) + +> 场景:已经知道某个 `tenant_id` 和 `spu_id`,想看它在“最新逻辑下”的 ES 文档长什么样。 + +```bash +curl -X POST "http://127.0.0.1:6004/indexer/build-docs-from-db" \ + -H "Content-Type: application/json" \ + -d '{ + "tenant_id": "170", + "spu_ids": ["223167"] + }' +``` + +返回中: + +- `docs[0]` 为当前代码构造出来的完整 ES doc(与 `mappings/search_products.json` 对齐); +- 可以直接比对: + - 索引字段说明:`docs/索引字段说明v2.md` + - 实际 ES 文档:`docs/常用查询 - ES.md` 中的查询示例(按 `spu_id` 过滤)。 + +#### 5.3 与 ES 实际数据对比 + +```bash +curl -u 'essa:***' \ + -X GET 'http://localhost:9200/search_products_tenant_170/_search?pretty' \ + -H 'Content-Type: application/json' \ + -d '{ + "size": 5, + "_source": ["title", "tags"], + "query": { + "bool": { + "filter": [ + { "term": { "spu_id": "223167" } } + ] + } + } + }' +``` + +对比如下内容是否一致: + +- 多语言字段:`title/brief/description/vendor/category_name_text/category_path`; +- 结构字段:`tags/specifications/skus/min_price/max_price/compare_at_price/total_inventory` 等; +- 算法字段:`title_embedding` 是否存在(值不必逐项比对)。 + +如果两边不一致,可以结合: + +- `indexer/document_transformer.py`(文档构造逻辑); +- `indexer/incremental_service.py`(增量索引/查库逻辑); +- `logs/indexer.log`(索引日志) + +逐步缩小问题范围。 + ### 4. 性能测试 (Performance Tests) **目的**: 验证系统性能指标 diff --git a/indexer/README.md b/indexer/README.md new file mode 100644 index 0000000..66170ca --- /dev/null +++ b/indexer/README.md @@ -0,0 +1,584 @@ +## 一、整体架构说明 + +### 1.1 系统角色划分 + +- **Java 索引程序(/home/tw/saas-server)** + - 负责“**什么时候、对哪些 SPU 做索引**”(调度 & 触发)。 + - 负责**商品/店铺/类目等基础数据同步**(写 MySQL)。 + - 负责**多租户环境下的全量/增量索引调度**,但不再关心具体 doc 字段细节。 + +- **Python 索引富化模块(本项目 SearchEngine / indexer)** + - 负责“**如何把 MySQL 基础数据变成符合 ES mapping 的 doc**”,包括: + - 多语言字段组织; + - 翻译调用与缓存; + - 向量生成与(可选)缓存; + - 规格、SKU 聚合、类目路径解析等。 + - 保留当前“**直接写 ES**”能力(BulkIndexingService, IncrementalIndexerService)。 + - **新增:提供 HTTP 接口**,接收 Java 传入的完整 doc/基础数据,返回或直接写入 ES-ready doc,以支持“Java 只调接口、不关心字段细节”的新架构。 + +--- + +## 二、Java 索引程序职责(保留 & 对接) + +### 2.1 现有职责(需保留) + +1. **索引触发与调度** + - 全量: + - `ShoplazzaProductIndexFullJob` → `ProductIndexServiceImpl.fullIndex(...)` + - 按 tenant 分页拉取 SPU,调用批量索引。 + - 增量: + - MQ 消费(`ShoplazzaProductCreateAndUpdateConsumerService`); + - 手工/API 触发增量索引 → `incrementalIndex(tenantId, spuId)`。 + +2. **MySQL 基础数据维护** + - **店铺配置表 `shoplazza_shop_config`**: + - 字段: + - `primary_language`:店铺主语言; + - `translate_to_en`:是否需要翻译成英文; + - `translate_to_zh`:是否需要翻译成中文。 + - 逻辑: + - 每晚商品同步(`ShoplazzaProductSyncServiceImpl`)时,根据店铺 locale/Shoplazza 配置,写入/更新 `primary_language` 与翻译开关字段。 + - **类目表 `shoplazza_product_category`**: + - 同步/修正逻辑封装在 `ProductCategoryService` 中: + - `getProductCategoryByPathIdList(tenantId, categoryIdList)`; + - 当 mapping 对不上时触发 `syncProductCategoryByApi(shopId)` 再重查。 + +3. **Shopify/Shoplazza 商品同步 & 并发控制** + - MQ 等机制用于削峰,避免店匠批量导入商品时压垮服务: + - 同步逻辑在 `ShoplazzaProductSyncServiceImpl`; + - 对接 MQ 消息:商品创建、更新、删除等事件; + - 对高并发导入,拆分为小批次写入 MySQL + 后续异步索引。 + +4. **索引结构调整为 per-tenant** + - 在 Java 中已统一使用: + - `indexName = elasticsearchProperties.buildIndexName(tenantId);` + - 索引命名形如:`search_products_tenant_{tenant_id}`。 + - Python 侧对应 `get_tenant_index_name(tenant_id)`。 + +### 2.2 Java 侧不再深入关心的部分 + +- ES 文档结构 `ProductIndexDocument` 的字段细节(title/brief/description/vendor/category_xxx/tags/specifications/skus/embedding 等)。 +- 翻译、向量等具体算法逻辑。 +- qanchors/keywords 等新特征的计算。 + +**新职责边界**: +Java 只负责“**选出要索引的 SPU + 从 MySQL 拉取原始数据 + 调用 Python 服务**(或交给 Python 做完整索引)”。 + +--- + +## 三、Python 索引富化模块职责 + +### 3.1 职责总览 + +- 输入:**MySQL 基础数据**(`shoplazza_product_spu/sku/option/category/image` 等)。 +- 输出:**符合 `mappings/search_products.json` 的 doc 列表**,包括: + - 多语言文本字段:`title.*`, `brief.*`, `description.*`, `vendor.*`, `category_path.*`, `category_name_text.*`; + - 算法特征:`title_embedding`, `image_embedding`, `qanchors.*`, `keywords.*`(未来扩展); + - 结构化字段:`tags`, `specifications`, `skus`, `min_price`, `max_price`, `compare_at_price`, `total_inventory`, `sales` 等。 +- 附加: + - 翻译调用 & **Redis 缓存**(继承 Java 的 key 组织和 TTL 策略); + - 向量生成(文本 & 图片); + - ES 写入能力(Bulk & Incremental)。 + +### 3.2 当前 Python 模块结构(简述) + +- `indexer/spu_transformer.py`: + - 从 MySQL 读取 SPU/SKU/Option 数据。 +- `indexer/document_transformer.py` (`SPUDocumentTransformer`): + - 把单个 SPU + SKUs + Options 转成 ES 文档(doc)。 +- `indexer/bulk_indexing_service.py`: + - 全量索引服务,调用 `SPUTransformer` → `SPUDocumentTransformer` → `BulkIndexer` 写 ES。 +- `indexer/incremental_service.py`: + - 增量索引服务,按 SPU 列表批量更新/删除 ES 文档。 + +新设计中,本模块还将新增: + +- **HTTP 富化接口**(例如 `POST /index/enrich_docs` / `POST /index/enrich_and_index`); +- **翻译客户端 + Redis 缓存**,按 Java 规则组织 key; +- **(可选)向量缓存**。 + +--- + +## 四、翻译与多语言字段设计(Java → Python 迁移) + +### 4.1 语言决策策略(从 Java 迁移) + +在 Java 中,语言决策逻辑在 `ProductIndexConvert.convertToIndexDocument(...)`,现规则: + +1. **基础配置** + - `primaryLanguage = shopConfig.primaryLanguage`(主语言); + - `translateToEn = shopConfig.translateToEn`; + - `translateToZh = shopConfig.translateToZh`。 + +2. **检测内容语言** + - 标题:`queryTextLang = BaiDuTransApi.queryTextLang(spu.title)`; + - 若检测不到,则视为 `queryTextLang = primaryLanguage`。 + +3. **确定源语言 `defSrcLang` 与目标翻译语言 `defLang`** + - 情况 A:`primaryLanguage == queryTextLang`(不缺主语言) + - `defSrcLang = primaryLanguage`; + - 若 `translateToEn && primaryLanguage != "en"` → `defLang = "en"`; + - 若 `translateToZh && primaryLanguage != "zh"` → `defLang = "zh"`。 + - 情况 B:`primaryLanguage != queryTextLang`(认为“缺主语言”) + - `defSrcLang = queryTextLang`; + - `defIsMissPrimaryLanguage = true`; + - 若 `translateToEn && queryTextLang != "en"` → `defLang = "en"`; + - 若 `translateToZh && queryTextLang != "zh"` → `defLang = "zh"`; + - 若上述都不满足(没有翻到 en/zh),则回退: + - `defIsMissPrimaryLanguage = false`; + - `defLang = primaryLanguage`(翻译回主语言)。 + + - 兜底:若 `defLang` 仍为空,默认 `defLang = "en"`。 + +4. **DocumentTranslation 元数据(用于后续检查/补偿)** + +```java +documentTranslation.setDefSrcLang(defSrcLang); +documentTranslation.setDefLang(defLang); +documentTranslation.setDefQueryTextLang(queryTextLang); +documentTranslation.setDefIsMissPrimaryLanguage(isMissPrimaryLanguage); +``` + +**类目字段**(`category`, `category_path`)有类似一套独立的决策逻辑,写入 `defCategorySrcLang / defCategoryLang / defCategoryQueryTextLang / defCategoryIsMissPrimaryLanguage`。 + +> **Python 需做的**:在 `SPUDocumentTransformer` 内部复刻这套决策逻辑,对 title/brief/description/vendor/keywords & category 字段分别计算源语言 / 目标语言 / 主语言缺失标记,保存在一个等价的结构中(不一定叫 `DocumentTranslation`,但含义相同)。 + +### 4.2 多语言字段填充规则 + +以标题为例(Java 中的 `DocumentTitle`): + +- 原始 title:`spu.title`; +- 多语言写入: + +```java +DocumentTitle title = new DocumentTitle(); +title.set(defLang, translationTitle) // 翻译结果(例如 en 或 zh) + .set(defSrcLang, spu.getTitle()) // 原文 + .set(primaryLanguage, primaryTitle); // 若缺主语言,则从 queryTextLang 翻回主语言 +doc.setTitle(title); +``` + +同样模式适用于: + +- `keywords`:从 `spu.seoKeywords` 翻译生成; +- `brief`:从 `spu.brief` 翻译生成; +- `description`:从清理 HTML 后的 `spu.description` 翻译生成; +- `vendor`:从 `spu.vendor` 翻译生成。 + +**类目字段**: + +- `category_name_text`:基于 `spu.category`; +- `category_path`:基于类目表 `product_category` 的 name 列表拼出的路径字符串 `allPathName`。 + +分别写入: + +```java +categoryNameText.set(categoryLang, translationCategory) + .set(defLang, spu.getCategory()) + .set(primaryLanguage, primaryCategory); +categoryPath.set(categoryLang, translationCategoryPath) + .set(defLang, allPathName) + .set(primaryLanguage, primaryCategoryPath); +``` + +> **Python 需做的**:在构造 doc 时,为各多语言字段生成 dict: +> +> - 至少包含 `{defSrcLang: 原文}`; +> - 如有翻译,加入 `{defLang: 翻译}`; +> - 若 `isMissPrimaryLanguage` 为 true,再加入 `{primaryLanguage: 回译结果}`。 + +--- + +## 五、翻译服务与 Redis 缓存设计(必须继承) + +### 5.1 外部翻译接口 + +你当前要使用的翻译接口(Python 侧): + +```bash +curl -X POST http://120.76.41.98:6006/translate \ + -H "Content-Type: application/json" \ + -d '{"text":"儿童小男孩女孩开学 100 天衬衫短袖 搞笑图案字母印花庆祝上衣", + "target_lang":"en", + "source_lang":"auto"}' +``` + +- 请求参数: + - `text`:待翻译文本; + - `target_lang`:目标语言(如 `"en"`、`"zh"` 等); + - `source_lang`:源语言(支持 `"auto"` 自动检测)。 +- 响应(参考 Java `TranslationServiceImpl.querySaasTranslate`): + - JSON 里包含 `status` 字段,如果是 `"success"`,且 `translated_text` 非空,则返回翻译结果。 + +### 5.2 Redis 缓存 key 规则(与 Java 完全对齐) + +在 `TranslationServiceImpl` 中,缓存 key 定义: + +```java +private static final Integer DEFAULT_TTL_DAYS = 30; + +private String buildCacheKey(Long tenantId, String sourceText, String targetLang) { + String hash = DigestUtils.md5Hex(sourceText); + return String.format("translation:%s:%s:%s", + tenantId, targetLang.toLowerCase(), hash); +} +``` + +- **Key 模式**:`translation:{tenantId}:{targetLangLower}:{md5(sourceText)}`。 +- **Value**:`translatedText`(单纯的翻译结果字符串)。 +- **TTL**:30 天(`DEFAULT_TTL_DAYS = 30`)。 + +缓存读写逻辑: + +```java +// 读 +String cache = queryCacheTranslation(tenantId, text, targetLang); +if (cache != null) { + // 构造 TranslateDTO 返回 +} + +// 写 +saveCacheTranslation(tenantId, text, targetLang, translatedText); +``` + +**你在 Python 侧必须继承的:** + +- 相同的 key 组织规则; +- 相同的 TTL; +- 相同的维度(tenant_id + 目标语言 + 原文 md5)。 + +这样可以**复用以前在 Java 里已经积累的翻译缓存**,也保证后续迁移过程中行为一致。 + +--- + +## 六、向量服务与缓存(扩展设计) + +### 6.1 文本向量(title_embedding) + +Java 侧: + +```java +List titleEmbedding = vectorService.generateTextVector(spu.getTitle()); +if (StrUtil.isNotBlank(spu.getTitle()) && CollUtil.isNotEmpty(titleEmbedding)) { + doc.setTitleEmbedding(titleEmbedding); +} +``` + +你当前 Python 侧已有: + +- `embeddings/text_encoder.py`(BGE-M3 模型); +- `SPUDocumentTransformer._fill_title_embedding` 已封装了调用 encoder 的逻辑。 + +**建议缓存策略(可选,但推荐):** + +- Key:`text_vector:{model_name}:{md5(title)}`; +- Value:向量数组(可序列化成 JSON 或 msgpack); +- TTL:可设为较长时间(例如 30 天或不设置 TTL,由容量控制)。 + +### 6.2 图片向量(image_embedding) + +Java 侧: + +- 对 `ShoplazzaProductImageDO.src` 调用 `vectorService.generateImageVector(image.getSrc())`; +- 写入 `image_embedding.vector`(1024 维)+ `url`。 + +Python 侧已有 `embeddings/clip_encoder.py` 可用 CN-CLIP 模型;缓存策略类似: + +- Key:`image_vector:{model_name}:{md5(image_url)}`。 + +--- + +## 七、doc 组织逻辑迁移(从 Java 的 ProductIndexConvert 到 Python 的 SPUDocumentTransformer) + +### 7.1 需要完整迁移的要点 + +#### 7.1.1 基础字段 + +- `tenant_id`:`spu.tenant_id`; +- `spu_id`:`spu.id`; +- `create_time` / `update_time`:格式化为 ISO 字符串(`yyyy-MM-dd'T'HH:mm:ss`); +- 主图 `image_url`: + - 若 `image_src` 以 `http` 开头 → 直接使用; + - 否则前缀 `//`。 + +#### 7.1.2 多语言字段(title/brief/description/vendor/keywords/category_name_text/category_path) + +- 完整复刻前文第 4 节的逻辑: + - 语言决策; + - 调翻译接口(带缓存); + - 构造多语言对象(Python 中为 dict): + + ```python + title = {} + title[src_lang] = spu.title + if translation_title: title[def_lang] = translation_title + if is_miss_primary and primary_title: title[primary_lang] = primary_title + doc["title"] = title + ``` + +- `keywords` 来源:`spu.seo_keywords`; +- 类目路径需要从 `product_category` 表取 name 列表,按 level 排序后拼成 `allPathName = "一级/二级/三级"`。 + +#### 7.1.3 tags + +- 同 Java 逻辑: + +```python +if spu.tags: + doc["tags"] = [t.strip() for t in spu.tags.split(",") if t.strip()] +``` + +#### 7.1.4 规格、SKU、价格、库存 + +迁移 Java 的 `parseOptions` & `parseSkus` 逻辑: + +- `option1_name`, `option2_name`, `option3_name`: + - 按 `position` 排序 Option 表; + - 取前三个,写 name; + - 每个 Option 的 `values` 去重后写入 `optionX_values`; + - 同时构建 `valueNameMap[value] = optionName`,用于构建 `specifications`。 + +- `specifications`: + - 遍历所有 SKU: + - 若 `option1` 非空:构造 1 条 `{sku_id, name=valueNameMap[option1], value=option1}`; + - 同理 `option2`、`option3`。 + +- `skus`(nested): + - 每条 SKU 映射为: + - `sku_id`, `price`, `compare_at_price`, `sku_code`, `stock`, `weight`, `weight_unit`, `option1_value`, `option2_value`, `option3_value`, `image_src`。 + +- 聚合字段: + - `min_price` / `max_price`:全体 SKU `price` 的最小/最大; + - `compare_at_price`:全体 SKU `compare_at_price` 的最大值(若 SPU 有 compare_at_price 可优先); + - `sku_prices`:所有 SKU price 列表; + - `sku_weights`:所有 SKU weight(long)列表; + - `sku_weight_units`:weight_unit 去重列表; + - `total_inventory`:所有 SKU `inventory_quantity` 总和; + - `sales`:虚拟销量 `spu.fake_sales`。 + +### 7.2 qanchors / keywords 扩展 + +- 当前 Java 中 `qanchors` 字段结构已存在,但未赋值; +- 设计建议: + - 在 Python 侧基于: + - 标题 / brief / description / tags / 类目等,做**查询锚点**抽取; + - 按与 `title/keywords` 类似的多语言结构写入 `qanchors.{lang}`; + - 翻译策略可选: + - 在生成锚点后再调用翻译; + - 或使用原始文本的翻译结果组合。 + +--- + +## 八、接口设计 + +### 8.1 保留的能力:直接写 ES(现有) + +- **全量索引**: + - CLI:`python main.py ingest ...` 或 `scripts/ingest.sh`; + - 入口:`BulkIndexingService.bulk_index(tenant_id, recreate_index, batch_size)`: + - 生成 tenant index 名; + - 如需重建则删除再建索引; + - 从 MySQL 拉数据 → `SPUTransformer.transform_batch()` → `BulkIndexer` 写 ES。 + +- **增量索引**: + - `IncrementalIndexerService.index_spus_to_es(es_client, tenant_id, spu_ids, index_name, batch_size, delete_spu_ids)`: + - 对于 deleted / DB 已无的 SPU,删除 ES 文档; + - 对仍存在的 SPU,从 MySQL 拉数据 → `create_document_transformer` → `SPUDocumentTransformer.transform_spu_to_doc` → 批量写入 ES。 + +### 8.2 新增接口一:文档富化(不写 ES) + +**目的**:供 Java 索引程序调用,仅获取 ES-ready docs,自行写入 ES,或作为后续多用途数据源。 + +- **接口示例**:`POST /index/enrich_docs` +- **入参**(伪 JSON): + +```json +{ + "tenant_id": "123", + "shop_config": { + "primary_language": "en", + "translate_to_en": true, + "translate_to_zh": false + }, + "spus": [ + { + "spu": { /* 映射 shoplazza_product_spu */ }, + "skus": [ /* shoplazza_product_sku 列表 */ ], + "options": [ /* shoplazza_product_option 列表 */ ], + "images": [ /* shoplazza_product_image 列表(可选) */ ] + }, + ... + ] +} +``` + +> 可选:也支持只传 `tenant_id + spu_ids`,由 Python 侧自行查 MySQL(对接现有 `SPUTransformer`),但从职责划分上,更推荐 **Java 查完基础数据再传给 Python**。 + +- **出参**: + +```json +{ + "tenant_id": "123", + "docs": [ + { + "spu_id": "1", + "tenant_id": "123", + "title": { "en": "...", "zh": "...", ... }, + "qanchors": { ... }, + "keywords": { ... }, + "brief": { ... }, + "description": { ... }, + "vendor": { ... }, + "tags": ["xxx","yyy"], + "image_url": "...", + "title_embedding": [ ... 1024 floats ... ], + "image_embedding": [ { "url": "...", "vector": [ ... ] } ], + "category_name_text": { ... }, + "category_path": { ... }, + "category_id": "xxx", + "category1_name": "xxx", + "specifications": [ ... ], + "skus": [ ... ], + "min_price": ..., + "max_price": ..., + "compare_at_price": ..., + "total_inventory": ..., + "sales": ... + }, + ... + ] +} +``` + +### 8.3 新增接口二:富化 + 写 ES + +**目的**:Java 只负责调度,不关心 ES client 细节。 + +- **接口示例**:`POST /index/enrich_and_index` +- **入参**:同上(基础数据 / spu_ids + tenant_id)。 +- **内部逻辑**: + 1. 按 4–7 节的规则构造 docs(含翻译 & 向量 & 缓存); + 2. 使用 `BulkIndexer` 写入 `search_products_tenant_{tenant_id}`; + 3. 返回统计信息。 + +- **出参**(例): + +```json +{ + "tenant_id": "123", + "index_name": "search_products_tenant_123", + "total": 100, + "indexed": 98, + "failed": 2, + "failed_spu_ids": ["456","789"], + "elapsed_ms": 12345 +} +``` + +--- + +## 九、小结 + +这份设计的目标是: + +- **保留现有 Java 调度 & 数据同步能力**,不破坏已有全量/增量任务和 MQ 削峰; +- **把 ES 文档结构、多语言逻辑、翻译与向量等算法能力全部收拢到 Python 索引富化模块**,实现“单一 owner”; +- **完全继承 Java 现有的翻译缓存策略**(Redis key & TTL & 维度),保证行为与性能的一致性; +- **为未来字段扩展(qanchors、更多 tags/特征)预留清晰路径**:仅需在 Python 侧新增逻辑和 mapping,不再拉 Java 入伙。 + +--- + +## 十、实际 HTTP 接口与测试用例(速查) + +### 10.1 端口与服务 + +- `./scripts/start_backend.sh` → `main.py serve` → 端口 `6002`,**没有 `/indexer/*` 路由**。 +- `./scripts/start_indexer.sh` → `main.py serve-indexer` → 端口 `6004`,只暴露 `/indexer/*`。 + +**实际调用索引相关接口时,请始终访问 `6004`。** + +### 10.2 关键接口 + +- **构建文档(正式使用)**:`POST /indexer/build-docs` + - 入参:`tenant_id + items[ { spu, skus, options } ]` + - 输出:`docs` 数组,每个元素是完整 ES doc,不查库、不写 ES。 + +- **构建文档(测试用,内部查库)**:`POST /indexer/build-docs-from-db` + - 入参:`{"tenant_id": "...", "spu_ids": ["..."]}` + - 内部:按 `spu_ids` 从 MySQL 查出 SPU/SKU/Option,再走与 `build-docs` 相同的转换逻辑。 + +- **全量壳**:`POST /indexer/reindex`(查库 + 转 doc + 写 ES,用于自测) +- **增量壳**:`POST /indexer/index`(查库 + 转 doc + 写 ES,用于自测) +- **单文档查看**:`POST /indexer/documents` +- **健康检查**:`GET /indexer/health` + +### 10.3 典型测试流程(以 tenant 170, spu_id 223167 为例) + +1. 启动 indexer 服务: + +```bash +./scripts/stop.sh +./scripts/start_indexer.sh +``` + +2. 构建指定 SPU 的 ES doc: + +```bash +curl -X POST "http://127.0.0.1:6004/indexer/build-docs-from-db" \ + -H "Content-Type: application/json" \ + -d '{"tenant_id": "170", "spu_ids": ["223167"]}' +``` + +3. 预期返回(节选): + +```json +{ + "tenant_id": "170", + "docs": [ + { + "tenant_id": "170", + "spu_id": "223167", + "title": { "en": "...Floerns Women's Gothic...", "zh": "弗洛恩斯 女士哥特风..." }, + "tags": ["Floerns", "Clothing", "Shoes & Jewelry", "..."], + "skus": [ + { + "sku_id": "3988393", + "price": 25.99, + "compare_at_price": 25.99, + "stock": 100 + } + ], + "min_price": 25.99, + "max_price": 25.99, + "compare_at_price": 25.99, + "total_inventory": 100, + "title_embedding": [ /* 1024 维向量 */ ] + } + ], + "total": 1, + "success_count": 1, + "failed_count": 0, + "failed": [] +} +``` + +4. 使用 `docs/常用查询 - ES.md` 中的查询,对应验证 ES 索引中的文档: + +```bash +curl -u 'essa:***' \ + -X GET 'http://localhost:9200/search_products_tenant_170/_search?pretty' \ + -H 'Content-Type: application/json' \ + -d '{ + "size": 5, + "_source": ["title", "tags"], + "query": { + "bool": { + "filter": [ + { "term": { "spu_id": "223167" } } + ] + } + } + }' +``` + +通过这套流程可以完整验证:MySQL → Python 富化 → ES doc → ES 查询 的全链路行为是否符合预期。*** End Patch"""} ***! diff --git a/indexer/document_transformer.py b/indexer/document_transformer.py index 83cf27e..8177fc5 100644 --- a/indexer/document_transformer.py +++ b/indexer/document_transformer.py @@ -141,13 +141,8 @@ class SPUDocumentTransformer: doc['min_price'] = 0.0 doc['max_price'] = 0.0 - # 优先使用 SPU 级 compare_at_price(与索引字段说明v2一致),否则取 SKU 最大值 - if pd.notna(spu_row.get('compare_at_price')): - try: - doc['compare_at_price'] = float(spu_row['compare_at_price']) - except (ValueError, TypeError): - doc['compare_at_price'] = float(max(compare_prices)) if compare_prices else None - elif compare_prices: + # SPU 不再读取 compare_at_price 字段;ES 的 compare_at_price 使用所有 SKU 中的最大对比价 + if compare_prices: doc['compare_at_price'] = float(max(compare_prices)) else: doc['compare_at_price'] = None diff --git a/indexer/incremental_service.py b/indexer/incremental_service.py index 84f3a9f..cd9f49b 100644 --- a/indexer/incremental_service.py +++ b/indexer/incremental_service.py @@ -176,7 +176,7 @@ class IncrementalIndexerService: image_src, image_width, image_height, image_path, image_alt, tags, note, category, category_id, category_google_id, category_level, category_path, - compare_at_price, fake_sales, display_fake_sales, + fake_sales, display_fake_sales, tenant_id, creator, create_time, updater, update_time, deleted FROM shoplazza_product_spu WHERE tenant_id = :tenant_id AND id = :spu_id @@ -191,7 +191,7 @@ class IncrementalIndexerService: image_src, image_width, image_height, image_path, image_alt, tags, note, category, category_id, category_google_id, category_level, category_path, - compare_at_price, fake_sales, display_fake_sales, + fake_sales, display_fake_sales, tenant_id, creator, create_time, updater, update_time, deleted FROM shoplazza_product_spu WHERE tenant_id = :tenant_id AND id = :spu_id AND deleted = 0 @@ -243,7 +243,7 @@ class IncrementalIndexerService: image_src, image_width, image_height, image_path, image_alt, tags, note, category, category_id, category_google_id, category_level, category_path, - compare_at_price, fake_sales, display_fake_sales, + fake_sales, display_fake_sales, tenant_id, creator, create_time, updater, update_time, deleted FROM shoplazza_product_spu WHERE tenant_id = :tenant_id AND id IN :spu_ids @@ -258,7 +258,7 @@ class IncrementalIndexerService: image_src, image_width, image_height, image_path, image_alt, tags, note, category, category_id, category_google_id, category_level, category_path, - compare_at_price, fake_sales, display_fake_sales, + fake_sales, display_fake_sales, tenant_id, creator, create_time, updater, update_time, deleted FROM shoplazza_product_spu WHERE tenant_id = :tenant_id AND deleted = 0 AND id IN :spu_ids diff --git a/indexer/prompts.txt b/indexer/prompts.txt new file mode 100644 index 0000000..24dded0 --- /dev/null +++ b/indexer/prompts.txt @@ -0,0 +1,30 @@ +因为需要组织整个doc,我需要将当前的java程序迁移过来,项目路径在 /home/tw/saas-server +程序相对路径 包括但不限于 module-shoplazza/src/main/java/com/hsyl/saas/module/shoplazza/service/index/ProductIndexServiceImpl.java +请仔细阅读java相关代码,提取相关逻辑,特别是 翻译的相关字段 + + + + + +架构说明: + +java索引程序职责: + +负责增量、全量的触发,调度。 + +包括但不限于: +1、索引结构调整成按tenant_id的结构,并翻译对应的语言shoplazza_shop_config表对应的新增字段primary_language,translate_to_en,translate_to_zh +2、每晚上商品同步时,判断当前店铺主语言是什么,存入primary_language +3、同步店匠的类目shoplazza_product_category +4、加入MQ处理店匠批量导入商品并发太高,服务器承载不了的问题 + + +本模块: +负责 msyql 基础数据 → 索引结构的doc (包括缓存) + +翻译接口: curl -X POST http://120.76.41.98:6006/translate -H "Content-Type: application/json" -d '{"text":"儿童小男孩女孩开学 100 天衬衫短袖 搞笑图案字母印花庆祝上衣","target_lang":"en","source_lang":"auto"}' + +java的组织doc的逻辑都需要迁移过来。 + +当前项目,是直接将doc写入ES,这个功能也保留,但是,也要提供一个接口,输入完整的字段信息 + diff --git a/indexer/spu_transformer.py b/indexer/spu_transformer.py index fdd5624..5c78b34 100644 --- a/indexer/spu_transformer.py +++ b/indexer/spu_transformer.py @@ -45,7 +45,7 @@ class SPUTransformer: image_src, image_width, image_height, image_path, image_alt, tags, note, category, category_id, category_google_id, category_level, category_path, - compare_at_price, fake_sales, display_fake_sales, + fake_sales, display_fake_sales, tenant_id, creator, create_time, updater, update_time, deleted FROM shoplazza_product_spu WHERE tenant_id = :tenant_id AND deleted = 0 -- libgit2 0.21.2