suggestion索引构建.md 18.6 KB

一、整体架构与索引设计

1. 索引命名 & 多租户约定

  • 每个租户的相关索引:

  • *版本化 suggestion 索引 *

    • 单个版本索引名:

    [ search_suggestions_tenant_{tenant_id}_v{yyyyMMddHHmmss} ]

    示例:search_suggestions_tenant_1_v20250318123045

    • 对应的读 alias(线上查询默认走 alias):

    [ search_suggestions_tenant_{tenant_id}_current ]

    示例:search_suggestions_tenant_1_current

  • 元信息索引(所有租户共用一个索引,每个租户一条文档)

    • 名称:

    [ search_suggestions_meta ]

    • 用于记录每个租户的元信息(_id = tenant_id): 该索引全局创建一次,每新增一个租户,插入一行,每次为一个租户做完全量,如果成功,更新一下对应的信息。 json "settings": { "number_of_shards": 1, "number_of_replicas": 0, "refresh_interval": "1s", }, "mappings": { "properties": { "tenant_id": {"type": "keyword"}, "active_alias": {"type": "keyword"}, "active_index": {"type": "keyword"}, "last_full_build_at": {"type": "date"}, "last_incremental_build_at": {"type": "date"}, "last_incremental_watermark": {"type": "date"}, "updated_at": {"type": "date"}, } } `` -active_alias(当前 alias 名) -active_index(当前实际索引名) -last_full_build_at -last_incremental_build_at -last_incremental_watermark -updated_at`

2. suggestion 索引 mapping 结构

search_suggestions.json

  • settings
  {
    "number_of_shards": 1,
    "number_of_replicas": 0,
    "refresh_interval": "30s",
    "analysis": {
      "analyzer": {
        "index_ik": {
          "type": "custom",
          "tokenizer": "ik_max_word",
          "filter": ["lowercase", "asciifolding"]
        },
        "query_ik": {
          "type": "custom",
          "tokenizer": "ik_smart",
          "filter": ["lowercase", "asciifolding"]
        }
      }
    }
  }
  • mappings.properties
  {
    "tenant_id":         { "type": "keyword" },
    "lang":              { "type": "keyword" },
    "text":              { "type": "keyword" },       // 显示给用户的原始文案
    "text_norm":         { "type": "keyword" },       // 归一化后文本:小写+空白规整
    "sources":           { "type": "keyword" },       // 来源集合:["title", "qanchor", "tag", "query_log"]
    "title_doc_count":   { "type": "integer" },
    "qanchor_doc_count": { "type": "integer" },
    "tag_doc_count":     { "type": "integer" },
    "query_count_7d":    { "type": "integer" },
    "query_count_30d":   { "type": "integer" },
    "rank_score":        { "type": "float" },         // 排序打分
    "lang_confidence":   { "type": "float" },
    "lang_source":       { "type": "keyword" },       // 语言来源:log_field / request_params / detector / fallback / default
    "lang_conflict":     { "type": "boolean" },       // 是否存在多来源语言冲突
    "status":            { "type": "byte" },          // 1 = 有效
    "updated_at":        { "type": "date" },

    "completion": {       // completion suggester 
      "properties": {
        "<lang>": {
          "type": "completion",
          "analyzer": "...",        // 见下文
          "search_analyzer": "..."  // zh 使用 query_ik
        }
      }
    },

    "sat": {              // search_as_you_type 用于 bool_prefix fallback
      "properties": {
        "<lang>": {
          "type": "search_as_you_type",
          "analyzer": "..."
        }
      }
    }
  }
  • 语言 → analyzer 映射(ANALYZER_BY_LANG)(只列关键):

    • "zh"index_ik / 搜索时 query_ik
    • "en"english
    • 其他:按 ES 内建语言 analyzer 映射

构建索引时,必须根据每个租户的 index_languages(如 ["en", "zh"])动态构造 completion.<lang>sat.<lang> 两棵子结构。 index_languages永远包括 en


二、全量构建(Full Rebuild)

对应 SuggestionIndexBuilder.rebuild_tenant_index(...)_build_full_candidates(...)

1. 输入参数

关键参数:

  • tenant_id: str
  • days: int:查询日志回溯天数
  • recreate: bool:仅 legacy 索引模式下生效,是否先删除旧索引
  • batch_size: int:从商品索引扫描商品的分页大小
  • min_query_len: int:过滤短查询
  • publish_alias: bool:是否构建完成后切 alias(只在 versioned 模式下起作用)
  • keep_versions: int:保留多少个最新版本索引

3. 构建目标索引

  1. 创建索引:
    • 索引名称:search_suggestions_tenant_{tenant_id}_v{yyyyMMddHHmmss}
    • 结构: search_suggestions.json

4. 构建候选词

4.1 从商品索引收集 title / qanchors / tags(Step 1)
  • 遍历店铺的所有商品:获取每个商品的 "spu_id", "title", "qanchors", "enriched_tags"(按 spu_idid.keyword 升序,便于 search_after 稳定分页)

    • 对每个商品文档:
  1. 确定 product_id
    • spu_id 优先,否则 id / _id
  2. title 字段结构:多语言对象,如:

     "title": { "en": "Running Shoes", "zh": "跑步鞋" }
    
  3. 对于每个 lang in index_languages

 - **title 处理**:
   - 从 `title[lang]` 取出字符串
   - 经过 `_prepare_title_for_suggest` 做长度与截断控制:
     - 若整体长度 ≤ 120,直接使用
     - 若过长:尝试按常见分隔符(中文逗号、英文逗号、分号、竖线、斜杠、括号、方括号等)截取第一段;仍然过长则硬截断到 120 字符,并去掉末尾多余分隔符。
   - 将结果记为 `text_raw`。若为空,跳过。
   - 归一化得到 `text_norm = _normalize_text(text_raw)`:
     - `NFKC` 归一化
     - `strip()`
     - `lower()`
     - 连续空白压缩为单个空格
   - 若 `_looks_noise(text_norm)` 为真(长度为 0 或 > 120,或全部为非字母数字符号),跳过。
   - 使用键 `(lang, text_norm)` 在候选表中查找 / 新建 `SuggestionCandidate`:
     - 初始字段:
       - `text = text_raw`
       - `text_norm = text_norm`
       - `lang = lang`
     - 调用 `add_product("title", spu_id=product_id)`:
       - 记录来源 `"title"`
       - 将 `product_id` 加入 `title_spu_ids`

 - **qanchors 处理**:
   - `qanchors` 字段同样为多语言对象:
     ```json
     "qanchors": { "en": ["slim fit", "sporty casual"], "zh": ["修身", "显瘦"] }
     ```
   - 取 `q_raw = qanchors[lang]`
   - 通过 `_split_qanchors(q_raw)` 拆分为若干字符串:
     - 若原值为 list → 去空白后直接返回
     - 若为字符串 → 按 `[,;|/\n\t]+` 拆分,去空白,保底返回原字符串
   - 对每个 `q_text`:
     - `text_norm = _normalize_text(q_text)`,再用 `_looks_noise` 过滤
     - 同样按 `(lang, text_norm)` 合并为 `SuggestionCandidate`,调用 `add_product("qanchor", spu_id=product_id)`。
  1. enriched_tags 处理(与 index_languages 循环并列):
    • enriched_tags 现为多语言对象,例如: json "enriched_tags": { "en": ["Classic", "ribbed neckline"], "zh": ["辣妹风"] }
    • 优先读取 enriched_tags[lang],每个值可为字符串数组,或逗号等分隔的单个字符串;经 _iter_product_tags 展开为若干条。
    • 对历史旧数据,若 enriched_tags 仍是单层字符串 / 数组,则继续走语言检测兜底,并约束在租户的 index_languages 内。
    • 通过 _looks_noise 后按 (lang, text_norm) 合并,调用 add_product("tag", spu_id=product_id)
4.2 从查询日志收集用户 query(Step 2)

对应 _iter_query_log_rows_build_full_candidates 的后半段。

  • 时间窗口:
    • now = 当前 UTC
    • since = now - days
    • since_7d = now - 7天
  • 数据源:MySQL 表 shoplazza_search_log

    • 字段:
    • tenant_id
    • query
    • language
    • request_params
    • create_time
    • deleted(0 表示有效)
    • 条件:
    • tenant_id = :tenant_id
    • deleted = 0
    • query IS NOT NULL AND query <> ''
    • create_time >= :since AND create_time < :now
    • 结果按 create_time ASC 排序,fetch_size 分页流式遍历。
  • 对每一行:

  1. q = row.query.strip()
    • len(q) < min_query_len 跳过。
  2. 语言解析 _resolve_query_language(...)

    • 输入:
      • query=q
      • log_language = row.language
      • request_params = row.request_params(可能是 JSON 字符串,内部再解析 language 字段)
      • index_languages & primary_language
    • 决策顺序:
      1. 日志字段 request_params.language 有值
      2. 用检测方法进行检测:_detect_script_language(query)
      3. 若都失败 → 回退到 primary_languagelang_confidence=0.3lang_source="default"
    • 若日志里的 languagerequest_params 解析出的语言不一致,则 lang_conflict=True
  3. 文本归一化:

    • text_norm = _normalize_text(q)
    • _looks_noise(text_norm) → 跳过。
  4. 合并到候选表:

 - key = `(lang, text_norm)`
 - 若不存在,则新建 `SuggestionCandidate`:
   - `text = q`
   - `lang = lang`
   - 其他字段默认。
 - 更新:
   - `lang_confidence = max(c.lang_confidence, conf)`
   - `lang_source`:第一次从 `"default"` 变成具体来源,之后保持已有非 default 值
   - `lang_conflict |= conflict`
   - 根据 `create_time` 是否在 `since_7d` 之后判断 `is_7d`
   - `add_query_log(is_7d)`:
     - `query_count_30d += 1`
     - 最近 7 天则 `query_count_7d += 1`

5. rank_score 计算与文档成型

  • 打分公式_compute_rank_score_compute_rank_score_from_candidate):

[ rank_score = 1.8 \cdot \log(1 + query_count_{30d}) + 1.2 \cdot \log(1 + query_count_{7d}) + 1.0 \cdot \log(1 + qanchor_doc_count) + 0.85 \cdot \log(1 + tag_doc_count) + 0.6 \cdot \log(1 + title_doc_count) ]

  • 构造最终文档(_candidate_to_doc):

    • _id = f"{tenant_id}|{lang}|{text_norm}"
    • completion 字段:
    "completion": {
      "<lang>": {
        "input": [text],                // 原始展示文案
        "weight": int(max(rank_score, 1.0) * 100)
      }
    }
    
    • sat 字段:
    "sat": {
      "<lang>": text
    }
    
    • 其余字段直接来自 SuggestionCandidate 聚合结果(见 mapping 部分)。
  • 将所有 doc 批量写入目标索引,然后执行 refresh

6. alias 发布与旧版本清理

  • publish_alias = trueuse_versioned_index = true
  1. 通过 alias 名(get_suggestion_alias_name)查出当前挂载的索引列表 current_indices
  2. 构造 update_aliases 操作:
    • 为所有旧索引 remove alias
    • 为新索引 add alias
  3. 调用 _cleanup_old_versions
    • 通过通配符 get_suggestion_versioned_index_pattern(tenant_id) 列出所有版本索引
    • 按名称排序,保留最新 keep_versions
    • 删除其余版本索引,但保护当前新索引
  4. 更新 meta 索引:
    • active_alias
    • active_index

7. meta 索引更新

  • 无论是否发布 alias,都会更新 meta:
  {
    "tenant_id": "<tenant_id>",
    "last_full_build_at": <now_utc_iso>,
    "last_incremental_watermark": <now_utc_iso>,
    "active_index": <index_name>,              // 若使用 versioned + alias
    "active_alias": "<alias_name>"             // ditto
  }
  • 更新行为是「读旧值 → merge patch → index 文档,refresh wait_for」。

三、增量构建(Incremental Update)

对应 incremental_update_tenant_index(...)_build_incremental_deltas(...), _build_incremental_actions(...)

1. 输入参数

  • tenant_id: str
  • min_query_len: int:过滤短 query
  • fallback_days: int:如果找不到上次水位线,从最近 N 天回补
  • overlap_minutes: int:窗口向前重叠几分钟,防止日志延迟
  • bootstrap_if_missing: bool:没有任何 suggestion 索引时是否先全量构建
  • bootstrap_days: int:bootstrap 全量构建的日志回溯天数
  • batch_size: int:仅在需要 bootstrap 全量构建时使用

2. 目标索引解析

  • _resolve_incremental_target_index(tenant_id)
  1. 首选:alias get_suggestion_alias_name(tenant_id) 所指向的索引(一般只会有一个)
  2. 若 alias 不存在:回退到 legacy 索引名 get_suggestion_legacy_index_name(tenant_id),且索引存在
  3. 若都不存在 → 返回 None
  • target_index is None

    • bootstrap_if_missing = false → 抛错,提示先跑全量或打开 bootstrap。
    • 否则执行一次全量构建 rebuild_tenant_index(...)
    • 参数:days=bootstrap_days, batch_size=batch_size, min_query_len, publish_alias=true, use_versioned_index=true
    • 返回 {"mode": "incremental", "bootstrapped": true, "bootstrap_result": ...}

3. 增量时间窗口计算

  • 从 meta 索引 _get_meta(tenant_id) 中读取:
    • last_incremental_watermarklast_full_build_at
  • 若是 ISO 字符串(可能带 Z 后缀)→ 解析为 UTC since
  • 若解析失败或不存在 → since = now - fallback_days
  • 然后:
    • since = since - overlap_minutes
    • 但不得早于 now - fallback_days(防止 overlap 过大)
  • 最终时间窗口:
  [since, now)

时间计算与 UTC 处理逻辑保持一致(字符串格式可兼容 ISO 8601)。

4. 构建 QueryDelta(按 query + 语言累积增量)

  • _build_incremental_deltas(...) 同样从表 shoplazza_search_log 读取,只是时间窗口换成增量窗口 sincenow
  • 对每一条日志:
    1. q = row.query.strip(),若 len(q) < min_query_len → 跳过。
    2. 仍然通过 _resolve_query_language 计算 (lang, conf, source, conflict)
    3. text_norm = _normalize_text(q)_looks_noise 过滤。
    4. (lang, text_norm),合并为 QueryDelta
      • 初始:
      • tenant_id
      • lang
      • text = q
      • text_norm
      • lang_confidence = conf
      • lang_source = source
      • lang_conflict = conflict
      • 更新逻辑:
      • delta_30d += 1
      • create_time >= now - 7ddelta_7d += 1
      • 若新的 conf > lang_confidence → 更新 lang_confidence & lang_source
      • lang_conflict |= conflict

5. 构建增量更新动作(Bulk Update with Scripted Upsert)

  • 将每个 QueryDelta 转换为 upsert 文档(_delta_to_upsert_doc):

    • 使用与全量同一套打分公式,但只基于增量的 delta_7d / delta_30d
    • 设置:
    • sources = ["query_log"]
    • title_doc_count = 0
    • qanchor_doc_count = 0
    • tag_doc_count = 0
    • completion.<lang>.input = [text]
    • completion.<lang>.weight = int(max(rank_score, 1.0) * 100)
    • sat.<lang> = text
    • status = 1
    • updated_at = now_iso(本次执行时间)
  • 为每个 delta 构造一个 bulk update 动作(_build_incremental_actions):

    • _op_type = "update"
    • _index = target_index
    • _id = f"{tenant_id}|{lang}|{text_norm}"
    • scripted_upsert = true
    • script.lang = "painless"
    • script.source 为一段 Painless 脚本(见 SuggestionIndexBuilder._build_incremental_update_script),其要点:
    • 若文档不存在 → 直接 ctx._source = params.upsert
    • 若存在:
      • 初始化空字段
      • query_count_30d += params.delta_30d
      • query_count_7d += params.delta_7d
      • "query_log" 加入 sources
      • lang_conflictparams.lang_conflict 取或
      • params.lang_confidence > ctx._source.lang_confidence 则更新 lang_confidencelang_source
      • 基于更新后的 query_count_7d/30d + qanchor_doc_count + tag_doc_count + title_doc_count 重新计算 rank_score
      • status = 1
      • updated_at = params.now_iso
      • 同步更新 text / lang / text_norm
      • 更新 completion[lang]inputweight
      • 更新 sat[lang] = text
    • script.params 包含:
    • delta_30d, delta_7d, lang_confidence, lang_source, lang_conflict
    • now_iso, lang, text, text_norm
    • completion_input, completion_weight
    • upsert(完整 upsert 文档)
  • 整个增量批次调 bulk / bulk_actions,执行完成后 refresh(target_index)

6. 更新 meta 索引

  • 更新字段:
  {
    "last_incremental_build_at": now_iso,
    "last_incremental_watermark": now_iso,
    "active_index": target_index,
    "active_alias": "<alias_name>"
  }

注意:即便外部不依赖 active_alias,最好仍然保持与现有 Python 实现一致,方便混合环境切换。


四、语言解析与噪音过滤

1. 文本归一化 _normalize_text(value: str)

  • unicodedata.normalize("NFKC", value or "")
  • strip()
  • 转小写
  • 用正则 \s+ 将多空白压缩成单个空格

2. 噪音检测 _looks_noise(text_value: str)

  • 任何一条为真即视为噪音,候选会被忽略:

    • 空字符串
    • 长度 > 120
    • re.fullmatch(r"[\W_]+", text_value):全是符号/下划线,没有文字

3. 语言标准化 _normalize_lang(lang)

  • 输入 lang 为空 → None
  • 转小写,- 替换成 _
  • 若是 "zh_tw" / "pt_br" → 保留全量
  • 其他 → 取 _ 前缀(例如 "en_US""en"

4. 查询日志 / tag 的语言回退 _resolve_query_languagedetect_text_language_for_suggestions

  • 日志语言优先级不变:language 字段 → request_params.language语言检测
  • 检测实现为 query.query_parser.detect_text_language_for_suggestions:内部使用与 QueryParser 相同的 LanguageDetectorquery/language_detector.py),并将结果约束到租户 index_languages(含 zh_tw 等与检测码的 base 匹配)。
  • 在线联想:SuggestionService 在合并 completion 与 SAT 结果后,按 ES_score × (1 / sqrt(词元数)) 排序(词元算法与 simple_tokenize_query 一致),再以 rank_score 作次要键,减轻长标题/长短语相对短词根的压制不足问题。