Name Last Update
..
ANCHORS_AND_SEMANTIC_ATTRIBUTES.md Loading commit data...
README.md Loading commit data...
Untitled Loading commit data...
__init__.py Loading commit data...
bulk_indexer.py Loading commit data...
bulk_indexing_service.py Loading commit data...
document_transformer.py Loading commit data...
incremental_service.py Loading commit data...
indexer_logger.py Loading commit data...
indexing_utils.py Loading commit data...
mapping_generator.py Loading commit data...
spu_transformer.py Loading commit data...
taxonomy.md Loading commit data...
test_indexing.py Loading commit data...

README.md

Indexer 模块

负责 MySQL → ES 文档富化(多语言、翻译、向量、规格聚合)。请求示例docs/QUICKSTART.md §3.2。


一、整体架构说明

1.1 系统角色划分

  • Java 索引程序

    • 负责“什么时候、对哪些 SPU 做索引”(调度 & 触发)。
    • 负责商品/店铺/类目等基础数据同步(写 MySQL)。
    • 负责多租户环境下的全量/增量索引调度,但不再关心具体 doc 字段细节。
  • Python 索引富化模块(本项目 saas-search / indexer)

    • 负责“如何把 MySQL 基础数据变成符合 ES mapping 的 doc”,包括:
    • 多语言字段组织;
    • 翻译调用与缓存;
    • 向量生成与(可选)缓存;
    • 规格、SKU 聚合、类目路径解析等。
    • 保留当前“直接写 ES”能力(BulkIndexingService, IncrementalIndexerService)。
    • 新增:提供 HTTP 接口,接收 Java 传入的完整 doc/基础数据,返回或直接写入 ES-ready doc,以支持“Java 只调接口、不关心字段细节”的新架构。

二、Java 索引程序职责(保留 & 对接)

2.1 现有职责(需保留)

  1. 索引触发与调度

    • 全量:
      • ShoplazzaProductIndexFullJobProductIndexServiceImpl.fullIndex(...)
      • 按 tenant 分页拉取 SPU,调用批量索引。
    • 增量:
      • MQ 消费(ShoplazzaProductCreateAndUpdateConsumerService);
      • 手工/API 触发增量索引 → incrementalIndex(tenantId, spuId)
  2. MySQL 基础数据维护

    • 店铺配置表 shoplazza_shop_config
      • 字段:
      • primary_language:店铺主语言;
      • translate_to_en:是否需要翻译成英文;
      • translate_to_zh:是否需要翻译成中文。
      • 逻辑:
      • 每晚商品同步(ShoplazzaProductSyncServiceImpl)时,根据店铺 locale/Shoplazza 配置,写入/更新 primary_language 与翻译开关字段。
    • 类目表 shoplazza_product_category
      • 同步/修正逻辑封装在 ProductCategoryService 中:
      • getProductCategoryByPathIdList(tenantId, categoryIdList)
      • 当 mapping 对不上时触发 syncProductCategoryByApi(shopId) 再重查。
  3. Shopify/Shoplazza 商品同步 & 并发控制

    • MQ 等机制用于削峰,避免店匠批量导入商品时压垮服务:
      • 同步逻辑在 ShoplazzaProductSyncServiceImpl
      • 对接 MQ 消息:商品创建、更新、删除等事件;
      • 对高并发导入,拆分为小批次写入 MySQL + 后续异步索引。
  4. 索引结构调整为 per-tenant

    • 在 Java 中已统一使用:
      • indexName = elasticsearchProperties.buildIndexName(tenantId);
      • 索引命名形如:{namespace}search_products_tenant_{tenant_id},其中 namespace 用于区分 prod/uat/test 等环境。
    • Python 侧通过 indexer/mapping_generator.get_tenant_index_name(tenant_id) 统一生成索引名:
      • 索引命名规则为:{ES_INDEX_NAMESPACE}search_products_tenant_{tenant_id}
      • ES_INDEX_NAMESPACE 来源于 config.env_config,默认 prod 环境为空字符串,其它环境可设置为 uat_test_ 等。

2.2 Java 侧不再深入关心的部分

  • ES 文档结构 ProductIndexDocument 的字段细节(title/brief/description/vendor/category_xxx/tags/specifications/skus/embedding 等)。
  • 翻译、向量等具体算法逻辑。
  • qanchors 等外部内容理解字段的生成。

新职责边界
Java 只负责“选出要索引的 SPU + 从 MySQL 拉取原始数据 + 调用 Python 服务(或交给 Python 做完整索引)”。


三、Python 索引富化模块职责

3.1 职责总览

  • 输入:MySQL 基础数据shoplazza_product_spu/sku/option/category/image 等)。
  • 输出:符合 mappings/search_products.json 的 doc 列表,包括:
    • 多语言文本字段:title.*, brief.*, description.*, vendor.*, category_path.*, category_name_text.*
    • 算法特征:title_embedding, image_embedding
    • 结构化字段:tags, specifications, skus, min_price, max_price, compare_at_price, total_inventory, sales 等。
  • 附加:
    • 翻译调用 & Redis 缓存(继承 Java 的 key 组织和 TTL 策略);
    • 向量生成(文本 & 图片);
    • ES 写入能力(Bulk & Incremental)。

3.2 当前 Python 模块结构(简述)

  • indexer/spu_transformer.py
    • 从 MySQL 读取 SPU/SKU/Option 数据。
  • indexer/document_transformer.py (SPUDocumentTransformer):
    • 把单个 SPU + SKUs + Options 转成 ES 文档(doc)。
  • indexer/bulk_indexing_service.py
    • 全量索引服务,调用 SPUTransformerSPUDocumentTransformerBulkIndexer 写 ES。
  • indexer/incremental_service.py
    • 增量索引服务,按 SPU 列表批量更新/删除 ES 文档。

新设计中,本模块还将新增:

  • HTTP 富化接口(例如 POST /index/enrich_docs / POST /index/enrich_and_index);
  • 翻译客户端 + Redis 缓存,按 Java 规则组织 key;
  • (可选)向量缓存

四、翻译与多语言字段设计(Java → Python 迁移)

4.1 语言决策策略(从 Java 迁移)

在 Java 中,语言决策逻辑在 ProductIndexConvert.convertToIndexDocument(...),现规则:

  1. 基础配置

    • primaryLanguage = shopConfig.primaryLanguage(主语言);
    • translateToEn = shopConfig.translateToEn
    • translateToZh = shopConfig.translateToZh
  2. 检测内容语言

    • 标题:queryTextLang = BaiDuTransApi.queryTextLang(spu.title)
    • 若检测不到,则视为 queryTextLang = primaryLanguage
  3. 确定源语言 defSrcLang 与目标翻译语言 defLang

    • 情况 A:primaryLanguage == queryTextLang(不缺主语言)
      • defSrcLang = primaryLanguage
      • translateToEn && primaryLanguage != "en"defLang = "en"
      • translateToZh && primaryLanguage != "zh"defLang = "zh"
    • 情况 B:primaryLanguage != queryTextLang(认为“缺主语言”)
      • defSrcLang = queryTextLang
      • defIsMissPrimaryLanguage = true
      • translateToEn && queryTextLang != "en"defLang = "en"
      • translateToZh && queryTextLang != "zh"defLang = "zh"
      • 若上述都不满足(没有翻到 en/zh),则回退:
      • defIsMissPrimaryLanguage = false
      • defLang = primaryLanguage(翻译回主语言)。
  • 兜底:若 defLang 仍为空,默认 defLang = "en"
  1. DocumentTranslation 元数据(用于后续检查/补偿)
documentTranslation.setDefSrcLang(defSrcLang);
documentTranslation.setDefLang(defLang);
documentTranslation.setDefQueryTextLang(queryTextLang);
documentTranslation.setDefIsMissPrimaryLanguage(isMissPrimaryLanguage);

类目字段category, category_path)有类似一套独立的决策逻辑,写入 defCategorySrcLang / defCategoryLang / defCategoryQueryTextLang / defCategoryIsMissPrimaryLanguage

Python 需做的:在 SPUDocumentTransformer 内部复刻这套决策逻辑,对 title/brief/description/vendor/keywords & category 字段分别计算源语言 / 目标语言 / 主语言缺失标记,保存在一个等价的结构中(不一定叫 DocumentTranslation,但含义相同)。

4.2 多语言字段填充规则

以标题为例(Java 中的 DocumentTitle):

  • 原始 title:spu.title
  • 多语言写入:
DocumentTitle title = new DocumentTitle();
title.set(defLang,      translationTitle)     // 翻译结果(例如 en 或 zh)
     .set(defSrcLang,  spu.getTitle())       // 原文
     .set(primaryLanguage, primaryTitle);    // 若缺主语言,则从 queryTextLang 翻回主语言
doc.setTitle(title);

同样模式适用于:

  • keywords:从 spu.seoKeywords 翻译生成;
  • brief:从 spu.brief 翻译生成;
  • description:从清理 HTML 后的 spu.description 翻译生成;
  • vendor:从 spu.vendor 翻译生成。

类目字段

  • category_name_text:基于 spu.category
  • category_path:基于类目表 product_category 的 name 列表拼出的路径字符串 allPathName

分别写入:

categoryNameText.set(categoryLang, translationCategory)
                .set(defLang,      spu.getCategory())
                .set(primaryLanguage, primaryCategory);
categoryPath.set(categoryLang, translationCategoryPath)
            .set(defLang,      allPathName)
            .set(primaryLanguage, primaryCategoryPath);

Python 需做的:在构造 doc 时,为各多语言字段生成 dict:

  • 至少包含 {defSrcLang: 原文}
  • 如有翻译,加入 {defLang: 翻译}
  • isMissPrimaryLanguage 为 true,再加入 {primaryLanguage: 回译结果}

五、翻译服务与 Redis 缓存设计(必须继承)

5.1 外部翻译接口

你当前要使用的翻译接口(Python 侧):

curl -X POST http://127.0.0.1:6006/translate \
  -H "Content-Type: application/json" \
  -d '{"text":"儿童小男孩女孩开学 100 天衬衫短袖 搞笑图案字母印花庆祝上衣",
       "target_lang":"en",
       "source_lang":"zh",
       "model":"qwen-mt",
       "scene":"sku_name"}'
  • 请求参数:
    • text:待翻译文本;
    • target_lang:目标语言(如 "en""zh" 等);
    • source_lang:源语言;
    • model:启用的翻译能力名称;
    • scene:翻译场景(如 sku_namegeneral)。
  • 响应(参考 Java TranslationServiceImpl.querySaasTranslate):
    • JSON 里包含 status 字段,如果是 "success",且 translated_text 非空,则返回翻译结果。

5.2 Redis 缓存 key 规则(与 Java 完全对齐)

TranslationServiceImpl 中,缓存 key 定义:

private static final Integer DEFAULT_TTL_DAYS = 30;

private String buildCacheKey(Long tenantId, String sourceText, String targetLang) {
    String hash = DigestUtils.md5Hex(sourceText);
    return String.format("translation:%s:%s:%s",
            tenantId, targetLang.toLowerCase(), hash);
}
  • Key 模式translation:{tenantId}:{targetLangLower}:{md5(sourceText)}
  • ValuetranslatedText(单纯的翻译结果字符串)。
  • TTL:30 天(DEFAULT_TTL_DAYS = 30)。

缓存读写逻辑:

// 读
String cache = queryCacheTranslation(tenantId, text, targetLang);
if (cache != null) {
    // 构造 TranslateDTO 返回
}

// 写
saveCacheTranslation(tenantId, text, targetLang, translatedText);

你在 Python 侧必须继承的:

  • 相同的 key 组织规则;
  • 相同的 TTL;
  • 相同的维度(tenant_id + 目标语言 + 原文 md5)。

这样可以复用以前在 Java 里已经积累的翻译缓存,也保证后续迁移过程中行为一致。


六、向量服务与缓存(扩展设计)

6.1 文本向量(title_embedding)

Java 侧:

List<Float> titleEmbedding = vectorService.generateTextVector(spu.getTitle());
if (StrUtil.isNotBlank(spu.getTitle()) && CollUtil.isNotEmpty(titleEmbedding)) {
    doc.setTitleEmbedding(titleEmbedding);
}

你当前 Python 侧已有:

  • embeddings/text_encoder.py(通过 6005 向量服务调用 Qwen3-Embedding-0.6B);
  • SPUDocumentTransformer._fill_title_embedding 已封装了调用 encoder 的逻辑。

建议缓存策略(可选,但推荐):

  • Key:text_vector:{model_name}:{md5(title)}
  • Value:向量数组(可序列化成 JSON 或 msgpack);
  • TTL:可设为较长时间(例如 30 天或不设置 TTL,由容量控制)。

6.2 图片向量(image_embedding)

Java 侧:

  • ShoplazzaProductImageDO.src 调用 vectorService.generateImageVector(image.getSrc())
  • 写入 image_embedding.vector(1024 维)+ url

Python 侧已有 embeddings/clip_encoder.py 可用 CN-CLIP 模型;缓存策略类似:

  • Key:image_vector:{model_name}:{md5(image_url)}

七、doc 组织逻辑迁移(从 Java 的 ProductIndexConvert 到 Python 的 SPUDocumentTransformer)

7.1 需要完整迁移的要点

7.1.1 基础字段

  • tenant_idspu.tenant_id
  • spu_idspu.id
  • create_time / update_time:格式化为 ISO 字符串(yyyy-MM-dd'T'HH:mm:ss);
  • 主图 image_url
    • image_srchttp 开头 → 直接使用;
    • 否则前缀 //

7.1.2 多语言字段(title/brief/description/vendor/keywords/category_name_text/category_path)

  • 完整复刻前文第 4 节的逻辑:

    • 语言决策;
    • 调翻译接口(带缓存);
    • 构造多语言对象(Python 中为 dict):
    title = {}
    title[src_lang] = spu.title
    if translation_title: title[def_lang] = translation_title
    if is_miss_primary and primary_title: title[primary_lang] = primary_title
    doc["title"] = title
    
  • keywords 来源:spu.seo_keywords

  • 类目路径需要从 product_category 表取 name 列表,按 level 排序后拼成 allPathName = "一级/二级/三级"

7.1.3 tags

  • 同 Java 逻辑:
if spu.tags:
    doc["tags"] = [t.strip() for t in spu.tags.split(",") if t.strip()]

7.1.4 规格、SKU、价格、库存

迁移 Java 的 parseOptions & parseSkus 逻辑:

  • option1_name, option2_name, option3_name

    • position 排序 Option 表;
    • 取前三个,写 name;
    • 每个 Option 的 values 去重后写入 optionX_values
    • 同时构建 valueNameMap[value] = optionName,用于构建 specifications
  • specifications

    • 遍历所有 SKU:
    • option1 非空:构造 1 条 {sku_id, name=valueNameMap[option1], value=option1}
    • 同理 option2option3
  • skus(nested):

    • 每条 SKU 映射为:
    • sku_id, price, compare_at_price, sku_code, stock, weight, weight_unit, option1_value, option2_value, option3_value, image_src
  • 聚合字段:

    • min_price / max_price:全体 SKU price 的最小/最大;
    • compare_at_price:全体 SKU compare_at_price 的最大值(若 SPU 有 compare_at_price 可优先);
    • sku_prices:所有 SKU price 列表;
    • sku_weights:所有 SKU weight(long)列表;
    • sku_weight_units:weight_unit 去重列表;
    • total_inventory:所有 SKU inventory_quantity 总和;
    • sales:虚拟销量 spu.fake_sales

7.2 qanchors / keywords 扩展

该能力已迁移到独立内容理解服务。本仓库仍保留字段模型与消费侧能力,但不再负责在 indexer 内部生成 qanchors / enriched_*


八、接口设计

8.1 保留的能力:直接写 ES(现有)

  • 全量索引

    • CLI:python main.py ingest ...scripts/ingest.sh
    • 入口:BulkIndexingService.bulk_index(tenant_id, recreate_index, batch_size)
    • 生成 tenant index 名;
    • 如需重建则删除再建索引;
    • 从 MySQL 拉数据 → SPUTransformer.transform_batch()BulkIndexer 写 ES。
  • 增量索引

    • IncrementalIndexerService.index_spus_to_es(es_client, tenant_id, spu_ids, index_name, batch_size, delete_spu_ids)
    • 对于 deleted / DB 已无的 SPU,删除 ES 文档;
    • 对仍存在的 SPU,从 MySQL 拉数据 → create_document_transformerSPUDocumentTransformer.transform_spu_to_doc → 批量写入 ES。

8.2 新增接口一:文档富化(不写 ES)

目的:供 Java 索引程序调用,仅获取 ES-ready docs,自行写入 ES,或作为后续多用途数据源。

  • 接口示例POST /index/enrich_docs
  • 入参(伪 JSON):
{
  "tenant_id": "123",
  "shop_config": {
    "primary_language": "en",
    "translate_to_en": true,
    "translate_to_zh": false
  },
  "spus": [
    {
      "spu": { /* 映射 shoplazza_product_spu */ },
      "skus": [ /* shoplazza_product_sku 列表 */ ],
      "options": [ /* shoplazza_product_option 列表 */ ],
      "images": [ /* shoplazza_product_image 列表(可选) */ ]
    },
    ...
  ]
}

可选:也支持只传 tenant_id + spu_ids,由 Python 侧自行查 MySQL(对接现有 SPUTransformer),但从职责划分上,更推荐 Java 查完基础数据再传给 Python

  • 出参
{
  "tenant_id": "123",
  "docs": [
    {
      "spu_id": "1",
      "tenant_id": "123",
      "title": { "en": "...", "zh": "...", ... },
      "brief": { ... },
      "description": { ... },
      "vendor": { ... },
      "tags": ["xxx","yyy"],
      "image_url": "...",
      "title_embedding": [ ... 1024 floats ... ],
      "image_embedding": [ { "url": "...", "vector": [ ... ] } ],
      "category_name_text": { ... },
      "category_path": { ... },
      "category_id": "xxx",
      "category1_name": "xxx",
      "specifications": [ ... ],
      "skus": [ ... ],
      "min_price": ...,
      "max_price": ...,
      "compare_at_price": ...,
      "total_inventory": ...,
      "sales": ...
    },
    ...
  ]
}

8.3 新增接口二:富化 + 写 ES

目的:Java 只负责调度,不关心 ES client 细节。

  • 接口示例POST /index/enrich_and_index
  • 入参:同上(基础数据 / spu_ids + tenant_id)。
  • 内部逻辑

    1. 按 4–7 节的规则构造 docs(含翻译 & 向量 & 缓存);
    2. 使用 BulkIndexer 写入 search_products_tenant_{tenant_id}
    3. 返回统计信息。
  • 出参(例):

{
  "tenant_id": "123",
  "index_name": "search_products_tenant_123",
  "total": 100,
  "indexed": 98,
  "failed": 2,
  "failed_spu_ids": ["456","789"],
  "elapsed_ms": 12345
}

九、小结

这份设计的目标是:

  • 保留现有 Java 调度 & 数据同步能力,不破坏已有全量/增量任务和 MQ 削峰;
  • 把 ES 文档结构、多语言逻辑、翻译与向量等算法能力全部收拢到 Python 索引富化模块,实现“单一 owner”;
  • 完全继承 Java 现有的翻译缓存策略(Redis key & TTL & 维度),保证行为与性能的一致性;
  • 为未来字段扩展(包括外部内容理解字段接入)预留清晰路径:字段模型可继续保留,但生成职责可独立演进。

十、实际 HTTP 接口与测试用例(速查)

10.1 端口与服务

  • ./scripts/start_backend.shmain.py serve → 端口 6002没有 /indexer/* 路由
  • ./scripts/start_indexer.shmain.py serve-indexer → 端口 6004,只暴露 /indexer/*

实际调用索引相关接口时,请始终访问 6004

10.2 关键接口

  • 构建文档(正式使用)POST /indexer/build-docs

    • 入参:tenant_id + items[ { spu, skus, options } ]
    • 输出:docs 数组,每个元素是完整 ES doc,不查库、不写 ES。
    • 注意:当前不再内置生成 qanchors / enriched_*;如需这些字段,请由独立内容理解服务生成后自行合并。
  • 构建文档(测试用,内部查库)POST /indexer/build-docs-from-db

    • 入参:{"tenant_id": "...", "spu_ids": ["..."]}
    • 内部:按 spu_ids 从 MySQL 查出 SPU/SKU/Option,再走与 build-docs 相同的转换逻辑。
  • 全量壳POST /indexer/reindex(查库 + 转 doc + 写 ES,用于自测)

  • 增量壳POST /indexer/index(查库 + 转 doc + 写 ES,用于自测)

  • 单文档查看POST /indexer/documents

  • 健康检查GET /indexer/health

10.3 典型测试流程(以 tenant 170, spu_id 223167 为例,支持多环境)

  1. 启动 indexer 服务(以本地或 prod 环境为例,.env 中 RUNTIME_ENV=prod, ES_INDEX_NAMESPACE=''):
./scripts/stop.sh
./scripts/start_indexer.sh
  1. 构建指定 SPU 的 ES doc:
curl -X POST "http://127.0.0.1:6004/indexer/build-docs-from-db" \
  -H "Content-Type: application/json" \
  -d '{"tenant_id": "170", "spu_ids": ["223167"]}'
  1. 预期返回(节选):
{
  "tenant_id": "170",
  "docs": [
    {
      "tenant_id": "170",
      "spu_id": "223167",
      "title": { "en": "...Floerns Women's Gothic...", "zh": "弗洛恩斯 女士哥特风..." },
      "tags": ["Floerns", "Clothing", "Shoes & Jewelry", "..."],
      "skus": [
        {
          "sku_id": "3988393",
          "price": 25.99,
          "compare_at_price": 25.99,
          "stock": 100
        }
      ],
      "min_price": 25.99,
      "max_price": 25.99,
      "compare_at_price": 25.99,
      "total_inventory": 100,
      "title_embedding": [ /* 1024 维向量 */ ]
    }
  ],
  "total": 1,
  "success_count": 1,
  "failed_count": 0,
  "failed": []
}
  1. 使用 docs/常用查询 - ES.md 中的查询,对应验证 ES 索引中的文档(注意索引名前缀):
# prod / 本地环境:ES_INDEX_NAMESPACE 为空,索引名为 search_products_tenant_170
curl -u 'essa:***' \
  -X GET 'http://localhost:9200/search_products_tenant_170/_search?pretty' \
  -H 'Content-Type: application/json' \
  -d '{
    "size": 5,
    "_source": ["title", "tags"],
    "query": {
      "bool": {
        "filter": [
          { "term": { "spu_id": "223167" } }
        ]
      }
    }
  }'

# UAT 环境:假设 .env 中配置 ES_INDEX_NAMESPACE=uat_,
# 则索引名为 uat_search_products_tenant_170
curl -u 'essa:***' \
  -X GET 'http://localhost:9200/uat_search_products_tenant_170/_search?pretty' \
  -H 'Content-Type: application/json' \
  -d '{
    "size": 5,
    "_source": ["title", "tags"],
    "query": {
      "bool": {
        "filter": [
          { "term": { "spu_id": "223167" } }
        ]
      }
    }
  }'

通过这套流程可以完整验证:MySQL → Python 富化 → ES doc → ES 查询 的全链路行为是否符合预期。*** End Patch"""} ***!