Indexer 模块
负责 MySQL → ES 文档富化(多语言、翻译、向量、规格聚合)。请求示例见 docs/QUICKSTART.md §3.2。
一、整体架构说明
1.1 系统角色划分
Java 索引程序(/home/tw/saas-server)
- 负责“什么时候、对哪些 SPU 做索引”(调度 & 触发)。
- 负责商品/店铺/类目等基础数据同步(写 MySQL)。
- 负责多租户环境下的全量/增量索引调度,但不再关心具体 doc 字段细节。
Python 索引富化模块(本项目 saas-search / indexer)
- 负责“如何把 MySQL 基础数据变成符合 ES mapping 的 doc”,包括:
- 多语言字段组织;
- 翻译调用与缓存;
- 向量生成与(可选)缓存;
- 规格、SKU 聚合、类目路径解析等。
- 保留当前“直接写 ES”能力(BulkIndexingService, IncrementalIndexerService)。
- 新增:提供 HTTP 接口,接收 Java 传入的完整 doc/基础数据,返回或直接写入 ES-ready doc,以支持“Java 只调接口、不关心字段细节”的新架构。
二、Java 索引程序职责(保留 & 对接)
2.1 现有职责(需保留)
索引触发与调度
- 全量:
ShoplazzaProductIndexFullJob→ProductIndexServiceImpl.fullIndex(...)- 按 tenant 分页拉取 SPU,调用批量索引。
- 增量:
- MQ 消费(
ShoplazzaProductCreateAndUpdateConsumerService); - 手工/API 触发增量索引 →
incrementalIndex(tenantId, spuId)。
- MQ 消费(
- 全量:
MySQL 基础数据维护
- 店铺配置表
shoplazza_shop_config:- 字段:
primary_language:店铺主语言;translate_to_en:是否需要翻译成英文;translate_to_zh:是否需要翻译成中文。- 逻辑:
- 每晚商品同步(
ShoplazzaProductSyncServiceImpl)时,根据店铺 locale/Shoplazza 配置,写入/更新primary_language与翻译开关字段。
- 类目表
shoplazza_product_category:- 同步/修正逻辑封装在
ProductCategoryService中: getProductCategoryByPathIdList(tenantId, categoryIdList);- 当 mapping 对不上时触发
syncProductCategoryByApi(shopId)再重查。
- 同步/修正逻辑封装在
- 店铺配置表
Shopify/Shoplazza 商品同步 & 并发控制
- MQ 等机制用于削峰,避免店匠批量导入商品时压垮服务:
- 同步逻辑在
ShoplazzaProductSyncServiceImpl; - 对接 MQ 消息:商品创建、更新、删除等事件;
- 对高并发导入,拆分为小批次写入 MySQL + 后续异步索引。
- 同步逻辑在
- MQ 等机制用于削峰,避免店匠批量导入商品时压垮服务:
索引结构调整为 per-tenant
- 在 Java 中已统一使用:
indexName = elasticsearchProperties.buildIndexName(tenantId);- 索引命名形如:
{namespace}search_products_tenant_{tenant_id},其中namespace用于区分 prod/uat/test 等环境。
- Python 侧通过
indexer/mapping_generator.get_tenant_index_name(tenant_id)统一生成索引名:- 索引命名规则为:
{ES_INDEX_NAMESPACE}search_products_tenant_{tenant_id}; ES_INDEX_NAMESPACE来源于config.env_config,默认 prod 环境为空字符串,其它环境可设置为uat_、test_等。
- 索引命名规则为:
- 在 Java 中已统一使用:
2.2 Java 侧不再深入关心的部分
- ES 文档结构
ProductIndexDocument的字段细节(title/brief/description/vendor/category_xxx/tags/specifications/skus/embedding 等)。 - 翻译、向量等具体算法逻辑。
- qanchors/keywords 等新特征的计算。
新职责边界:
Java 只负责“选出要索引的 SPU + 从 MySQL 拉取原始数据 + 调用 Python 服务(或交给 Python 做完整索引)”。
三、Python 索引富化模块职责
3.1 职责总览
- 输入:MySQL 基础数据(
shoplazza_product_spu/sku/option/category/image等)。 - 输出:符合
mappings/search_products.json的 doc 列表,包括:- 多语言文本字段:
title.*,brief.*,description.*,vendor.*,category_path.*,category_name_text.*; - 算法特征:
title_embedding,image_embedding,qanchors.*,keywords.*(未来扩展); - 结构化字段:
tags,specifications,skus,min_price,max_price,compare_at_price,total_inventory,sales等。
- 多语言文本字段:
- 附加:
- 翻译调用 & Redis 缓存(继承 Java 的 key 组织和 TTL 策略);
- 向量生成(文本 & 图片);
- ES 写入能力(Bulk & Incremental)。
3.2 当前 Python 模块结构(简述)
indexer/spu_transformer.py:- 从 MySQL 读取 SPU/SKU/Option 数据。
indexer/document_transformer.py(SPUDocumentTransformer):- 把单个 SPU + SKUs + Options 转成 ES 文档(doc)。
indexer/bulk_indexing_service.py:- 全量索引服务,调用
SPUTransformer→SPUDocumentTransformer→BulkIndexer写 ES。
- 全量索引服务,调用
indexer/incremental_service.py:- 增量索引服务,按 SPU 列表批量更新/删除 ES 文档。
新设计中,本模块还将新增:
- HTTP 富化接口(例如
POST /index/enrich_docs/POST /index/enrich_and_index); - 翻译客户端 + Redis 缓存,按 Java 规则组织 key;
- (可选)向量缓存。
四、翻译与多语言字段设计(Java → Python 迁移)
4.1 语言决策策略(从 Java 迁移)
在 Java 中,语言决策逻辑在 ProductIndexConvert.convertToIndexDocument(...),现规则:
基础配置
primaryLanguage = shopConfig.primaryLanguage(主语言);translateToEn = shopConfig.translateToEn;translateToZh = shopConfig.translateToZh。
检测内容语言
- 标题:
queryTextLang = BaiDuTransApi.queryTextLang(spu.title); - 若检测不到,则视为
queryTextLang = primaryLanguage。
- 标题:
确定源语言
defSrcLang与目标翻译语言defLang- 情况 A:
primaryLanguage == queryTextLang(不缺主语言)defSrcLang = primaryLanguage;- 若
translateToEn && primaryLanguage != "en"→defLang = "en"; - 若
translateToZh && primaryLanguage != "zh"→defLang = "zh"。
- 情况 B:
primaryLanguage != queryTextLang(认为“缺主语言”)defSrcLang = queryTextLang;defIsMissPrimaryLanguage = true;- 若
translateToEn && queryTextLang != "en"→defLang = "en"; - 若
translateToZh && queryTextLang != "zh"→defLang = "zh"; - 若上述都不满足(没有翻到 en/zh),则回退:
defIsMissPrimaryLanguage = false;defLang = primaryLanguage(翻译回主语言)。
- 情况 A:
- 兜底:若
defLang仍为空,默认defLang = "en"。
- DocumentTranslation 元数据(用于后续检查/补偿)
documentTranslation.setDefSrcLang(defSrcLang);
documentTranslation.setDefLang(defLang);
documentTranslation.setDefQueryTextLang(queryTextLang);
documentTranslation.setDefIsMissPrimaryLanguage(isMissPrimaryLanguage);
类目字段(category, category_path)有类似一套独立的决策逻辑,写入 defCategorySrcLang / defCategoryLang / defCategoryQueryTextLang / defCategoryIsMissPrimaryLanguage。
Python 需做的:在
SPUDocumentTransformer内部复刻这套决策逻辑,对 title/brief/description/vendor/keywords & category 字段分别计算源语言 / 目标语言 / 主语言缺失标记,保存在一个等价的结构中(不一定叫DocumentTranslation,但含义相同)。
4.2 多语言字段填充规则
以标题为例(Java 中的 DocumentTitle):
- 原始 title:
spu.title; - 多语言写入:
DocumentTitle title = new DocumentTitle();
title.set(defLang, translationTitle) // 翻译结果(例如 en 或 zh)
.set(defSrcLang, spu.getTitle()) // 原文
.set(primaryLanguage, primaryTitle); // 若缺主语言,则从 queryTextLang 翻回主语言
doc.setTitle(title);
同样模式适用于:
keywords:从spu.seoKeywords翻译生成;brief:从spu.brief翻译生成;description:从清理 HTML 后的spu.description翻译生成;vendor:从spu.vendor翻译生成。
类目字段:
category_name_text:基于spu.category;category_path:基于类目表product_category的 name 列表拼出的路径字符串allPathName。
分别写入:
categoryNameText.set(categoryLang, translationCategory)
.set(defLang, spu.getCategory())
.set(primaryLanguage, primaryCategory);
categoryPath.set(categoryLang, translationCategoryPath)
.set(defLang, allPathName)
.set(primaryLanguage, primaryCategoryPath);
Python 需做的:在构造 doc 时,为各多语言字段生成 dict:
- 至少包含
{defSrcLang: 原文};- 如有翻译,加入
{defLang: 翻译};- 若
isMissPrimaryLanguage为 true,再加入{primaryLanguage: 回译结果}。
五、翻译服务与 Redis 缓存设计(必须继承)
5.1 外部翻译接口
你当前要使用的翻译接口(Python 侧):
curl -X POST http://127.0.0.1:6006/translate \
-H "Content-Type: application/json" \
-d '{"text":"儿童小男孩女孩开学 100 天衬衫短袖 搞笑图案字母印花庆祝上衣",
"target_lang":"en",
"source_lang":"zh",
"model":"qwen-mt",
"scene":"sku_name"}'
- 请求参数:
text:待翻译文本;target_lang:目标语言(如"en"、"zh"等);source_lang:源语言;model:启用的翻译能力名称;scene:翻译场景(如sku_name、general)。
- 响应(参考 Java
TranslationServiceImpl.querySaasTranslate):- JSON 里包含
status字段,如果是"success",且translated_text非空,则返回翻译结果。
- JSON 里包含
5.2 Redis 缓存 key 规则(与 Java 完全对齐)
在 TranslationServiceImpl 中,缓存 key 定义:
private static final Integer DEFAULT_TTL_DAYS = 30;
private String buildCacheKey(Long tenantId, String sourceText, String targetLang) {
String hash = DigestUtils.md5Hex(sourceText);
return String.format("translation:%s:%s:%s",
tenantId, targetLang.toLowerCase(), hash);
}
- Key 模式:
translation:{tenantId}:{targetLangLower}:{md5(sourceText)}。 - Value:
translatedText(单纯的翻译结果字符串)。 - TTL:30 天(
DEFAULT_TTL_DAYS = 30)。
缓存读写逻辑:
// 读
String cache = queryCacheTranslation(tenantId, text, targetLang);
if (cache != null) {
// 构造 TranslateDTO 返回
}
// 写
saveCacheTranslation(tenantId, text, targetLang, translatedText);
你在 Python 侧必须继承的:
- 相同的 key 组织规则;
- 相同的 TTL;
- 相同的维度(tenant_id + 目标语言 + 原文 md5)。
这样可以复用以前在 Java 里已经积累的翻译缓存,也保证后续迁移过程中行为一致。
六、向量服务与缓存(扩展设计)
6.1 文本向量(title_embedding)
Java 侧:
List<Float> titleEmbedding = vectorService.generateTextVector(spu.getTitle());
if (StrUtil.isNotBlank(spu.getTitle()) && CollUtil.isNotEmpty(titleEmbedding)) {
doc.setTitleEmbedding(titleEmbedding);
}
你当前 Python 侧已有:
embeddings/text_encoder.py(通过 6005 向量服务调用 Qwen3-Embedding-0.6B);SPUDocumentTransformer._fill_title_embedding已封装了调用 encoder 的逻辑。
建议缓存策略(可选,但推荐):
- Key:
text_vector:{model_name}:{md5(title)}; - Value:向量数组(可序列化成 JSON 或 msgpack);
- TTL:可设为较长时间(例如 30 天或不设置 TTL,由容量控制)。
6.2 图片向量(image_embedding)
Java 侧:
- 对
ShoplazzaProductImageDO.src调用vectorService.generateImageVector(image.getSrc()); - 写入
image_embedding.vector(1024 维)+url。
Python 侧已有 embeddings/clip_encoder.py 可用 CN-CLIP 模型;缓存策略类似:
- Key:
image_vector:{model_name}:{md5(image_url)}。
七、doc 组织逻辑迁移(从 Java 的 ProductIndexConvert 到 Python 的 SPUDocumentTransformer)
7.1 需要完整迁移的要点
7.1.1 基础字段
tenant_id:spu.tenant_id;spu_id:spu.id;create_time/update_time:格式化为 ISO 字符串(yyyy-MM-dd'T'HH:mm:ss);- 主图
image_url:- 若
image_src以http开头 → 直接使用; - 否则前缀
//。
- 若
7.1.2 多语言字段(title/brief/description/vendor/keywords/category_name_text/category_path)
完整复刻前文第 4 节的逻辑:
- 语言决策;
- 调翻译接口(带缓存);
- 构造多语言对象(Python 中为 dict):
title = {} title[src_lang] = spu.title if translation_title: title[def_lang] = translation_title if is_miss_primary and primary_title: title[primary_lang] = primary_title doc["title"] = titlekeywords来源:spu.seo_keywords;类目路径需要从
product_category表取 name 列表,按 level 排序后拼成allPathName = "一级/二级/三级"。
7.1.3 tags
- 同 Java 逻辑:
if spu.tags:
doc["tags"] = [t.strip() for t in spu.tags.split(",") if t.strip()]
7.1.4 规格、SKU、价格、库存
迁移 Java 的 parseOptions & parseSkus 逻辑:
option1_name,option2_name,option3_name:- 按
position排序 Option 表; - 取前三个,写 name;
- 每个 Option 的
values去重后写入optionX_values; - 同时构建
valueNameMap[value] = optionName,用于构建specifications。
- 按
specifications:- 遍历所有 SKU:
- 若
option1非空:构造 1 条{sku_id, name=valueNameMap[option1], value=option1}; - 同理
option2、option3。
skus(nested):- 每条 SKU 映射为:
sku_id,price,compare_at_price,sku_code,stock,weight,weight_unit,option1_value,option2_value,option3_value,image_src。
聚合字段:
min_price/max_price:全体 SKUprice的最小/最大;compare_at_price:全体 SKUcompare_at_price的最大值(若 SPU 有 compare_at_price 可优先);sku_prices:所有 SKU price 列表;sku_weights:所有 SKU weight(long)列表;sku_weight_units:weight_unit 去重列表;total_inventory:所有 SKUinventory_quantity总和;sales:虚拟销量spu.fake_sales。
7.2 qanchors / keywords 扩展
- 当前 Java 中
qanchors字段结构已存在,但未赋值; - 设计建议:
- 在 Python 侧基于:
- 标题 / brief / description / tags / 类目等,做查询锚点抽取;
- 按与
title/keywords类似的多语言结构写入qanchors.{lang}; - 翻译策略可选:
- 在生成锚点后再调用翻译;
- 或使用原始文本的翻译结果组合。
八、接口设计
8.1 保留的能力:直接写 ES(现有)
全量索引:
- CLI:
python main.py ingest ...或scripts/ingest.sh; - 入口:
BulkIndexingService.bulk_index(tenant_id, recreate_index, batch_size): - 生成 tenant index 名;
- 如需重建则删除再建索引;
- 从 MySQL 拉数据 →
SPUTransformer.transform_batch()→BulkIndexer写 ES。
- CLI:
增量索引:
IncrementalIndexerService.index_spus_to_es(es_client, tenant_id, spu_ids, index_name, batch_size, delete_spu_ids):- 对于 deleted / DB 已无的 SPU,删除 ES 文档;
- 对仍存在的 SPU,从 MySQL 拉数据 →
create_document_transformer→SPUDocumentTransformer.transform_spu_to_doc→ 批量写入 ES。
8.2 新增接口一:文档富化(不写 ES)
目的:供 Java 索引程序调用,仅获取 ES-ready docs,自行写入 ES,或作为后续多用途数据源。
- 接口示例:
POST /index/enrich_docs - 入参(伪 JSON):
{
"tenant_id": "123",
"shop_config": {
"primary_language": "en",
"translate_to_en": true,
"translate_to_zh": false
},
"spus": [
{
"spu": { /* 映射 shoplazza_product_spu */ },
"skus": [ /* shoplazza_product_sku 列表 */ ],
"options": [ /* shoplazza_product_option 列表 */ ],
"images": [ /* shoplazza_product_image 列表(可选) */ ]
},
...
]
}
可选:也支持只传
tenant_id + spu_ids,由 Python 侧自行查 MySQL(对接现有SPUTransformer),但从职责划分上,更推荐 Java 查完基础数据再传给 Python。
- 出参:
{
"tenant_id": "123",
"docs": [
{
"spu_id": "1",
"tenant_id": "123",
"title": { "en": "...", "zh": "...", ... },
"qanchors": { ... },
"keywords": { ... },
"brief": { ... },
"description": { ... },
"vendor": { ... },
"tags": ["xxx","yyy"],
"image_url": "...",
"title_embedding": [ ... 1024 floats ... ],
"image_embedding": [ { "url": "...", "vector": [ ... ] } ],
"category_name_text": { ... },
"category_path": { ... },
"category_id": "xxx",
"category1_name": "xxx",
"specifications": [ ... ],
"skus": [ ... ],
"min_price": ...,
"max_price": ...,
"compare_at_price": ...,
"total_inventory": ...,
"sales": ...
},
...
]
}
8.3 新增接口二:富化 + 写 ES
目的:Java 只负责调度,不关心 ES client 细节。
- 接口示例:
POST /index/enrich_and_index - 入参:同上(基础数据 / spu_ids + tenant_id)。
内部逻辑:
- 按 4–7 节的规则构造 docs(含翻译 & 向量 & 缓存);
- 使用
BulkIndexer写入search_products_tenant_{tenant_id}; - 返回统计信息。
出参(例):
{
"tenant_id": "123",
"index_name": "search_products_tenant_123",
"total": 100,
"indexed": 98,
"failed": 2,
"failed_spu_ids": ["456","789"],
"elapsed_ms": 12345
}
九、小结
这份设计的目标是:
- 保留现有 Java 调度 & 数据同步能力,不破坏已有全量/增量任务和 MQ 削峰;
- 把 ES 文档结构、多语言逻辑、翻译与向量等算法能力全部收拢到 Python 索引富化模块,实现“单一 owner”;
- 完全继承 Java 现有的翻译缓存策略(Redis key & TTL & 维度),保证行为与性能的一致性;
- 为未来字段扩展(qanchors、更多 tags/特征)预留清晰路径:仅需在 Python 侧新增逻辑和 mapping,不再拉 Java 入伙。
十、实际 HTTP 接口与测试用例(速查)
10.1 端口与服务
./scripts/start_backend.sh→main.py serve→ 端口6002,没有/indexer/*路由。./scripts/start_indexer.sh→main.py serve-indexer→ 端口6004,只暴露/indexer/*。
实际调用索引相关接口时,请始终访问 6004。
10.2 关键接口
构建文档(正式使用):
POST /indexer/build-docs- 入参:
tenant_id + items[ { spu, skus, options } ] - 输出:
docs数组,每个元素是完整 ES doc,不查库、不写 ES。
- 入参:
构建文档(测试用,内部查库):
POST /indexer/build-docs-from-db- 入参:
{"tenant_id": "...", "spu_ids": ["..."]} - 内部:按
spu_ids从 MySQL 查出 SPU/SKU/Option,再走与build-docs相同的转换逻辑。
- 入参:
全量壳:
POST /indexer/reindex(查库 + 转 doc + 写 ES,用于自测)增量壳:
POST /indexer/index(查库 + 转 doc + 写 ES,用于自测)单文档查看:
POST /indexer/documents健康检查:
GET /indexer/health
10.3 典型测试流程(以 tenant 170, spu_id 223167 为例,支持多环境)
- 启动 indexer 服务(以本地或 prod 环境为例,.env 中 RUNTIME_ENV=prod, ES_INDEX_NAMESPACE=''):
./scripts/stop.sh
./scripts/start_indexer.sh
- 构建指定 SPU 的 ES doc:
curl -X POST "http://127.0.0.1:6004/indexer/build-docs-from-db" \
-H "Content-Type: application/json" \
-d '{"tenant_id": "170", "spu_ids": ["223167"]}'
- 预期返回(节选):
{
"tenant_id": "170",
"docs": [
{
"tenant_id": "170",
"spu_id": "223167",
"title": { "en": "...Floerns Women's Gothic...", "zh": "弗洛恩斯 女士哥特风..." },
"tags": ["Floerns", "Clothing", "Shoes & Jewelry", "..."],
"skus": [
{
"sku_id": "3988393",
"price": 25.99,
"compare_at_price": 25.99,
"stock": 100
}
],
"min_price": 25.99,
"max_price": 25.99,
"compare_at_price": 25.99,
"total_inventory": 100,
"title_embedding": [ /* 1024 维向量 */ ]
}
],
"total": 1,
"success_count": 1,
"failed_count": 0,
"failed": []
}
- 使用
docs/常用查询 - ES.md中的查询,对应验证 ES 索引中的文档(注意索引名前缀):
# prod / 本地环境:ES_INDEX_NAMESPACE 为空,索引名为 search_products_tenant_170
curl -u 'essa:***' \
-X GET 'http://localhost:9200/search_products_tenant_170/_search?pretty' \
-H 'Content-Type: application/json' \
-d '{
"size": 5,
"_source": ["title", "tags"],
"query": {
"bool": {
"filter": [
{ "term": { "spu_id": "223167" } }
]
}
}
}'
# UAT 环境:假设 .env 中配置 ES_INDEX_NAMESPACE=uat_,
# 则索引名为 uat_search_products_tenant_170
curl -u 'essa:***' \
-X GET 'http://localhost:9200/uat_search_products_tenant_170/_search?pretty' \
-H 'Content-Type: application/json' \
-d '{
"size": 5,
"_source": ["title", "tags"],
"query": {
"bool": {
"filter": [
{ "term": { "spu_id": "223167" } }
]
}
}
}'
通过这套流程可以完整验证:MySQL → Python 富化 → ES doc → ES 查询 的全链路行为是否符合预期。*** End Patch"""} ***!