# Indexer 模块 负责 MySQL → ES 文档富化(多语言、翻译、向量、规格聚合)。**请求示例**见 `docs/QUICKSTART.md` §3.2。 --- ## 一、整体架构说明 ### 1.1 系统角色划分 - **Java 索引程序** - 负责“**什么时候、对哪些 SPU 做索引**”(调度 & 触发)。 - 负责**商品/店铺/类目等基础数据同步**(写 MySQL)。 - 负责**多租户环境下的全量/增量索引调度**,但不再关心具体 doc 字段细节。 - **Python 索引富化模块(本项目 saas-search / indexer)** - 负责“**如何把 MySQL 基础数据变成符合 ES mapping 的 doc**”,包括: - 多语言字段组织; - 翻译调用与缓存; - 向量生成与(可选)缓存; - 规格、SKU 聚合、类目路径解析等。 - 保留当前“**直接写 ES**”能力(BulkIndexingService, IncrementalIndexerService)。 - **新增:提供 HTTP 接口**,接收 Java 传入的完整 doc/基础数据,返回或直接写入 ES-ready doc,以支持“Java 只调接口、不关心字段细节”的新架构。 --- ## 二、Java 索引程序职责(保留 & 对接) ### 2.1 现有职责(需保留) 1. **索引触发与调度** - 全量: - `ShoplazzaProductIndexFullJob` → `ProductIndexServiceImpl.fullIndex(...)` - 按 tenant 分页拉取 SPU,调用批量索引。 - 增量: - MQ 消费(`ShoplazzaProductCreateAndUpdateConsumerService`); - 手工/API 触发增量索引 → `incrementalIndex(tenantId, spuId)`。 2. **MySQL 基础数据维护** - **店铺配置表 `shoplazza_shop_config`**: - 字段: - `primary_language`:店铺主语言; - `translate_to_en`:是否需要翻译成英文; - `translate_to_zh`:是否需要翻译成中文。 - 逻辑: - 每晚商品同步(`ShoplazzaProductSyncServiceImpl`)时,根据店铺 locale/Shoplazza 配置,写入/更新 `primary_language` 与翻译开关字段。 - **类目表 `shoplazza_product_category`**: - 同步/修正逻辑封装在 `ProductCategoryService` 中: - `getProductCategoryByPathIdList(tenantId, categoryIdList)`; - 当 mapping 对不上时触发 `syncProductCategoryByApi(shopId)` 再重查。 3. **Shopify/Shoplazza 商品同步 & 并发控制** - MQ 等机制用于削峰,避免店匠批量导入商品时压垮服务: - 同步逻辑在 `ShoplazzaProductSyncServiceImpl`; - 对接 MQ 消息:商品创建、更新、删除等事件; - 对高并发导入,拆分为小批次写入 MySQL + 后续异步索引。 4. **索引结构调整为 per-tenant** - 在 Java 中已统一使用: - `indexName = elasticsearchProperties.buildIndexName(tenantId);` - 索引命名形如:`{namespace}search_products_tenant_{tenant_id}`,其中 `namespace` 用于区分 prod/uat/test 等环境。 - Python 侧通过 `indexer/mapping_generator.get_tenant_index_name(tenant_id)` 统一生成索引名: - 索引命名规则为:`{ES_INDEX_NAMESPACE}search_products_tenant_{tenant_id}`; - `ES_INDEX_NAMESPACE` 来源于 `config.env_config`,默认 prod 环境为空字符串,其它环境可设置为 `uat_`、`test_` 等。 ### 2.2 Java 侧不再深入关心的部分 - ES 文档结构 `ProductIndexDocument` 的字段细节(title/brief/description/vendor/category_xxx/tags/specifications/skus/embedding 等)。 - 翻译、向量等具体算法逻辑。 - `qanchors` 等外部内容理解字段的生成。 **新职责边界**: Java 只负责“**选出要索引的 SPU + 从 MySQL 拉取原始数据 + 调用 Python 服务**(或交给 Python 做完整索引)”。 --- ## 三、Python 索引富化模块职责 ### 3.1 职责总览 - 输入:**MySQL 基础数据**(`shoplazza_product_spu/sku/option/category/image` 等)。 - 输出:**符合 `mappings/search_products.json` 的 doc 列表**,包括: - 多语言文本字段:`title.*`, `brief.*`, `description.*`, `vendor.*`, `category_path.*`, `category_name_text.*`; - 算法特征:`title_embedding`, `image_embedding`; - 结构化字段:`tags`, `specifications`, `skus`, `min_price`, `max_price`, `compare_at_price`, `total_inventory`, `sales` 等。 - 附加: - 翻译调用 & **Redis 缓存**(继承 Java 的 key 组织和 TTL 策略); - 向量生成(文本 & 图片); - ES 写入能力(Bulk & Incremental)。 ### 3.2 当前 Python 模块结构(简述) - `indexer/spu_transformer.py`: - 从 MySQL 读取 SPU/SKU/Option 数据。 - `indexer/document_transformer.py` (`SPUDocumentTransformer`): - 把单个 SPU + SKUs + Options 转成 ES 文档(doc)。 - `indexer/bulk_indexing_service.py`: - 全量索引服务,调用 `SPUTransformer` → `SPUDocumentTransformer` → `BulkIndexer` 写 ES。 - `indexer/incremental_service.py`: - 增量索引服务,按 SPU 列表批量更新/删除 ES 文档。 新设计中,本模块还将新增: - **HTTP 富化接口**(例如 `POST /index/enrich_docs` / `POST /index/enrich_and_index`); - **翻译客户端 + Redis 缓存**,按 Java 规则组织 key; - **(可选)向量缓存**。 --- ## 四、翻译与多语言字段设计(Java → Python 迁移) ### 4.1 语言决策策略(从 Java 迁移) 在 Java 中,语言决策逻辑在 `ProductIndexConvert.convertToIndexDocument(...)`,现规则: 1. **基础配置** - `primaryLanguage = shopConfig.primaryLanguage`(主语言); - `translateToEn = shopConfig.translateToEn`; - `translateToZh = shopConfig.translateToZh`。 2. **检测内容语言** - 标题:`queryTextLang = BaiDuTransApi.queryTextLang(spu.title)`; - 若检测不到,则视为 `queryTextLang = primaryLanguage`。 3. **确定源语言 `defSrcLang` 与目标翻译语言 `defLang`** - 情况 A:`primaryLanguage == queryTextLang`(不缺主语言) - `defSrcLang = primaryLanguage`; - 若 `translateToEn && primaryLanguage != "en"` → `defLang = "en"`; - 若 `translateToZh && primaryLanguage != "zh"` → `defLang = "zh"`。 - 情况 B:`primaryLanguage != queryTextLang`(认为“缺主语言”) - `defSrcLang = queryTextLang`; - `defIsMissPrimaryLanguage = true`; - 若 `translateToEn && queryTextLang != "en"` → `defLang = "en"`; - 若 `translateToZh && queryTextLang != "zh"` → `defLang = "zh"`; - 若上述都不满足(没有翻到 en/zh),则回退: - `defIsMissPrimaryLanguage = false`; - `defLang = primaryLanguage`(翻译回主语言)。 - 兜底:若 `defLang` 仍为空,默认 `defLang = "en"`。 4. **DocumentTranslation 元数据(用于后续检查/补偿)** ```java documentTranslation.setDefSrcLang(defSrcLang); documentTranslation.setDefLang(defLang); documentTranslation.setDefQueryTextLang(queryTextLang); documentTranslation.setDefIsMissPrimaryLanguage(isMissPrimaryLanguage); ``` **类目字段**(`category`, `category_path`)有类似一套独立的决策逻辑,写入 `defCategorySrcLang / defCategoryLang / defCategoryQueryTextLang / defCategoryIsMissPrimaryLanguage`。 > **Python 需做的**:在 `SPUDocumentTransformer` 内部复刻这套决策逻辑,对 title/brief/description/vendor/keywords & category 字段分别计算源语言 / 目标语言 / 主语言缺失标记,保存在一个等价的结构中(不一定叫 `DocumentTranslation`,但含义相同)。 ### 4.2 多语言字段填充规则 以标题为例(Java 中的 `DocumentTitle`): - 原始 title:`spu.title`; - 多语言写入: ```java DocumentTitle title = new DocumentTitle(); title.set(defLang, translationTitle) // 翻译结果(例如 en 或 zh) .set(defSrcLang, spu.getTitle()) // 原文 .set(primaryLanguage, primaryTitle); // 若缺主语言,则从 queryTextLang 翻回主语言 doc.setTitle(title); ``` 同样模式适用于: - `keywords`:从 `spu.seoKeywords` 翻译生成; - `brief`:从 `spu.brief` 翻译生成; - `description`:从清理 HTML 后的 `spu.description` 翻译生成; - `vendor`:从 `spu.vendor` 翻译生成。 **类目字段**: - `category_name_text`:基于 `spu.category`; - `category_path`:基于类目表 `product_category` 的 name 列表拼出的路径字符串 `allPathName`。 分别写入: ```java categoryNameText.set(categoryLang, translationCategory) .set(defLang, spu.getCategory()) .set(primaryLanguage, primaryCategory); categoryPath.set(categoryLang, translationCategoryPath) .set(defLang, allPathName) .set(primaryLanguage, primaryCategoryPath); ``` > **Python 需做的**:在构造 doc 时,为各多语言字段生成 dict: > > - 至少包含 `{defSrcLang: 原文}`; > - 如有翻译,加入 `{defLang: 翻译}`; > - 若 `isMissPrimaryLanguage` 为 true,再加入 `{primaryLanguage: 回译结果}`。 --- ## 五、翻译服务与 Redis 缓存设计(必须继承) ### 5.1 外部翻译接口 你当前要使用的翻译接口(Python 侧): ```bash curl -X POST http://127.0.0.1:6006/translate \ -H "Content-Type: application/json" \ -d '{"text":"儿童小男孩女孩开学 100 天衬衫短袖 搞笑图案字母印花庆祝上衣", "target_lang":"en", "source_lang":"zh", "model":"qwen-mt", "scene":"sku_name"}' ``` - 请求参数: - `text`:待翻译文本; - `target_lang`:目标语言(如 `"en"`、`"zh"` 等); - `source_lang`:源语言; - `model`:启用的翻译能力名称; - `scene`:翻译场景(如 `sku_name`、`general`)。 - 响应(参考 Java `TranslationServiceImpl.querySaasTranslate`): - JSON 里包含 `status` 字段,如果是 `"success"`,且 `translated_text` 非空,则返回翻译结果。 ### 5.2 Redis 缓存 key 规则(与 Java 完全对齐) 在 `TranslationServiceImpl` 中,缓存 key 定义: ```java private static final Integer DEFAULT_TTL_DAYS = 30; private String buildCacheKey(Long tenantId, String sourceText, String targetLang) { String hash = DigestUtils.md5Hex(sourceText); return String.format("translation:%s:%s:%s", tenantId, targetLang.toLowerCase(), hash); } ``` - **Key 模式**:`translation:{tenantId}:{targetLangLower}:{md5(sourceText)}`。 - **Value**:`translatedText`(单纯的翻译结果字符串)。 - **TTL**:30 天(`DEFAULT_TTL_DAYS = 30`)。 缓存读写逻辑: ```java // 读 String cache = queryCacheTranslation(tenantId, text, targetLang); if (cache != null) { // 构造 TranslateDTO 返回 } // 写 saveCacheTranslation(tenantId, text, targetLang, translatedText); ``` **你在 Python 侧必须继承的:** - 相同的 key 组织规则; - 相同的 TTL; - 相同的维度(tenant_id + 目标语言 + 原文 md5)。 这样可以**复用以前在 Java 里已经积累的翻译缓存**,也保证后续迁移过程中行为一致。 --- ## 六、向量服务与缓存(扩展设计) ### 6.1 文本向量(title_embedding) Java 侧: ```java List titleEmbedding = vectorService.generateTextVector(spu.getTitle()); if (StrUtil.isNotBlank(spu.getTitle()) && CollUtil.isNotEmpty(titleEmbedding)) { doc.setTitleEmbedding(titleEmbedding); } ``` 你当前 Python 侧已有: - `embeddings/text_encoder.py`(通过 6005 向量服务调用 Qwen3-Embedding-0.6B); - `SPUDocumentTransformer._fill_title_embedding` 已封装了调用 encoder 的逻辑。 **建议缓存策略(可选,但推荐):** - Key:`text_vector:{model_name}:{md5(title)}`; - Value:向量数组(可序列化成 JSON 或 msgpack); - TTL:可设为较长时间(例如 30 天或不设置 TTL,由容量控制)。 ### 6.2 图片向量(image_embedding) Java 侧: - 对 `ShoplazzaProductImageDO.src` 调用 `vectorService.generateImageVector(image.getSrc())`; - 写入 `image_embedding.vector`(1024 维)+ `url`。 Python 侧已有 `embeddings/clip_encoder.py` 可用 CN-CLIP 模型;缓存策略类似: - Key:`image_vector:{model_name}:{md5(image_url)}`。 --- ## 七、doc 组织逻辑迁移(从 Java 的 ProductIndexConvert 到 Python 的 SPUDocumentTransformer) ### 7.1 需要完整迁移的要点 #### 7.1.1 基础字段 - `tenant_id`:`spu.tenant_id`; - `spu_id`:`spu.id`; - `create_time` / `update_time`:格式化为 ISO 字符串(`yyyy-MM-dd'T'HH:mm:ss`); - 主图 `image_url`: - 若 `image_src` 以 `http` 开头 → 直接使用; - 否则前缀 `//`。 #### 7.1.2 多语言字段(title/brief/description/vendor/keywords/category_name_text/category_path) - 完整复刻前文第 4 节的逻辑: - 语言决策; - 调翻译接口(带缓存); - 构造多语言对象(Python 中为 dict): ```python title = {} title[src_lang] = spu.title if translation_title: title[def_lang] = translation_title if is_miss_primary and primary_title: title[primary_lang] = primary_title doc["title"] = title ``` - `keywords` 来源:`spu.seo_keywords`; - 类目路径需要从 `product_category` 表取 name 列表,按 level 排序后拼成 `allPathName = "一级/二级/三级"`。 #### 7.1.3 tags - 同 Java 逻辑: ```python if spu.tags: doc["tags"] = [t.strip() for t in spu.tags.split(",") if t.strip()] ``` #### 7.1.4 规格、SKU、价格、库存 迁移 Java 的 `parseOptions` & `parseSkus` 逻辑: - `option1_name`, `option2_name`, `option3_name`: - 按 `position` 排序 Option 表; - 取前三个,写 name; - 每个 Option 的 `values` 去重后写入 `optionX_values`; - 同时构建 `valueNameMap[value] = optionName`,用于构建 `specifications`。 - `specifications`: - 遍历所有 SKU: - 若 `option1` 非空:构造 1 条 `{sku_id, name=valueNameMap[option1], value=option1}`; - 同理 `option2`、`option3`。 - `skus`(nested): - 每条 SKU 映射为: - `sku_id`, `price`, `compare_at_price`, `sku_code`, `stock`, `weight`, `weight_unit`, `option1_value`, `option2_value`, `option3_value`, `image_src`。 - 聚合字段: - `min_price` / `max_price`:全体 SKU `price` 的最小/最大; - `compare_at_price`:全体 SKU `compare_at_price` 的最大值(若 SPU 有 compare_at_price 可优先); - `sku_prices`:所有 SKU price 列表; - `sku_weights`:所有 SKU weight(long)列表; - `sku_weight_units`:weight_unit 去重列表; - `total_inventory`:所有 SKU `inventory_quantity` 总和; - `sales`:虚拟销量 `spu.fake_sales`。 ### 7.2 qanchors / keywords 扩展 该能力已迁移到独立内容理解服务。本仓库仍保留字段模型与消费侧能力,但不再负责在 indexer 内部生成 `qanchors` / `enriched_*`。 --- ## 八、接口设计 ### 8.1 保留的能力:直接写 ES(现有) - **全量索引**: - CLI:`python main.py ingest ...` 或 `scripts/ingest.sh`; - 入口:`BulkIndexingService.bulk_index(tenant_id, recreate_index, batch_size)`: - 生成 tenant index 名; - 如需重建则删除再建索引; - 从 MySQL 拉数据 → `SPUTransformer.transform_batch()` → `BulkIndexer` 写 ES。 - **增量索引**: - `IncrementalIndexerService.index_spus_to_es(es_client, tenant_id, spu_ids, index_name, batch_size, delete_spu_ids)`: - 对于 deleted / DB 已无的 SPU,删除 ES 文档; - 对仍存在的 SPU,从 MySQL 拉数据 → `create_document_transformer` → `SPUDocumentTransformer.transform_spu_to_doc` → 批量写入 ES。 ### 8.2 新增接口一:文档富化(不写 ES) **目的**:供 Java 索引程序调用,仅获取 ES-ready docs,自行写入 ES,或作为后续多用途数据源。 - **接口示例**:`POST /index/enrich_docs` - **入参**(伪 JSON): ```json { "tenant_id": "123", "shop_config": { "primary_language": "en", "translate_to_en": true, "translate_to_zh": false }, "spus": [ { "spu": { /* 映射 shoplazza_product_spu */ }, "skus": [ /* shoplazza_product_sku 列表 */ ], "options": [ /* shoplazza_product_option 列表 */ ], "images": [ /* shoplazza_product_image 列表(可选) */ ] }, ... ] } ``` > 可选:也支持只传 `tenant_id + spu_ids`,由 Python 侧自行查 MySQL(对接现有 `SPUTransformer`),但从职责划分上,更推荐 **Java 查完基础数据再传给 Python**。 - **出参**: ```json { "tenant_id": "123", "docs": [ { "spu_id": "1", "tenant_id": "123", "title": { "en": "...", "zh": "...", ... }, "brief": { ... }, "description": { ... }, "vendor": { ... }, "tags": ["xxx","yyy"], "image_url": "...", "title_embedding": [ ... 1024 floats ... ], "image_embedding": [ { "url": "...", "vector": [ ... ] } ], "category_name_text": { ... }, "category_path": { ... }, "category_id": "xxx", "category1_name": "xxx", "specifications": [ ... ], "skus": [ ... ], "min_price": ..., "max_price": ..., "compare_at_price": ..., "total_inventory": ..., "sales": ... }, ... ] } ``` ### 8.3 新增接口二:富化 + 写 ES **目的**:Java 只负责调度,不关心 ES client 细节。 - **接口示例**:`POST /index/enrich_and_index` - **入参**:同上(基础数据 / spu_ids + tenant_id)。 - **内部逻辑**: 1. 按 4–7 节的规则构造 docs(含翻译 & 向量 & 缓存); 2. 使用 `BulkIndexer` 写入 `search_products_tenant_{tenant_id}`; 3. 返回统计信息。 - **出参**(例): ```json { "tenant_id": "123", "index_name": "search_products_tenant_123", "total": 100, "indexed": 98, "failed": 2, "failed_spu_ids": ["456","789"], "elapsed_ms": 12345 } ``` --- ## 九、小结 这份设计的目标是: - **保留现有 Java 调度 & 数据同步能力**,不破坏已有全量/增量任务和 MQ 削峰; - **把 ES 文档结构、多语言逻辑、翻译与向量等算法能力全部收拢到 Python 索引富化模块**,实现“单一 owner”; - **完全继承 Java 现有的翻译缓存策略**(Redis key & TTL & 维度),保证行为与性能的一致性; - **为未来字段扩展(包括外部内容理解字段接入)预留清晰路径**:字段模型可继续保留,但生成职责可独立演进。 --- ## 十、实际 HTTP 接口与测试用例(速查) ### 10.1 端口与服务 - `./scripts/start_backend.sh` → `main.py serve` → 端口 `6002`,**没有 `/indexer/*` 路由**。 - `./scripts/start_indexer.sh` → `main.py serve-indexer` → 端口 `6004`,只暴露 `/indexer/*`。 **实际调用索引相关接口时,请始终访问 `6004`。** ### 10.2 关键接口 - **构建文档(正式使用)**:`POST /indexer/build-docs` - 入参:`tenant_id + items[ { spu, skus, options } ]` - 输出:`docs` 数组,每个元素是完整 ES doc,不查库、不写 ES。 - 注意:当前不再内置生成 `qanchors` / `enriched_*`;如需这些字段,请由独立内容理解服务生成后自行合并。 - **构建文档(测试用,内部查库)**:`POST /indexer/build-docs-from-db` - 入参:`{"tenant_id": "...", "spu_ids": ["..."]}` - 内部:按 `spu_ids` 从 MySQL 查出 SPU/SKU/Option,再走与 `build-docs` 相同的转换逻辑。 - **全量壳**:`POST /indexer/reindex`(查库 + 转 doc + 写 ES,用于自测) - **增量壳**:`POST /indexer/index`(查库 + 转 doc + 写 ES,用于自测) - **单文档查看**:`POST /indexer/documents` - **健康检查**:`GET /indexer/health` ### 10.3 典型测试流程(以 tenant 170, spu_id 223167 为例,支持多环境) 1. 启动 indexer 服务(以本地或 prod 环境为例,.env 中 RUNTIME_ENV=prod, ES_INDEX_NAMESPACE=''): ```bash ./scripts/stop.sh ./scripts/start_indexer.sh ``` 2. 构建指定 SPU 的 ES doc: ```bash curl -X POST "http://127.0.0.1:6004/indexer/build-docs-from-db" \ -H "Content-Type: application/json" \ -d '{"tenant_id": "170", "spu_ids": ["223167"]}' ``` 3. 预期返回(节选): ```json { "tenant_id": "170", "docs": [ { "tenant_id": "170", "spu_id": "223167", "title": { "en": "...Floerns Women's Gothic...", "zh": "弗洛恩斯 女士哥特风..." }, "tags": ["Floerns", "Clothing", "Shoes & Jewelry", "..."], "skus": [ { "sku_id": "3988393", "price": 25.99, "compare_at_price": 25.99, "stock": 100 } ], "min_price": 25.99, "max_price": 25.99, "compare_at_price": 25.99, "total_inventory": 100, "title_embedding": [ /* 1024 维向量 */ ] } ], "total": 1, "success_count": 1, "failed_count": 0, "failed": [] } ``` 4. 使用 `docs/常用查询 - ES.md` 中的查询,对应验证 ES 索引中的文档(注意索引名前缀): ```bash # prod / 本地环境:ES_INDEX_NAMESPACE 为空,索引名为 search_products_tenant_170 curl -u 'essa:***' \ -X GET 'http://localhost:9200/search_products_tenant_170/_search?pretty' \ -H 'Content-Type: application/json' \ -d '{ "size": 5, "_source": ["title", "tags"], "query": { "bool": { "filter": [ { "term": { "spu_id": "223167" } } ] } } }' # UAT 环境:假设 .env 中配置 ES_INDEX_NAMESPACE=uat_, # 则索引名为 uat_search_products_tenant_170 curl -u 'essa:***' \ -X GET 'http://localhost:9200/uat_search_products_tenant_170/_search?pretty' \ -H 'Content-Type: application/json' \ -d '{ "size": 5, "_source": ["title", "tags"], "query": { "bool": { "filter": [ { "term": { "spu_id": "223167" } } ] } } }' ``` 通过这套流程可以完整验证:MySQL → Python 富化 → ES doc → ES 查询 的全链路行为是否符合预期。*** End Patch"""} ***!