一、整体架构与索引设计
1. 索引命名 & 多租户约定
每个租户的相关索引:
*版本化 suggestion 索引 *
- 单个版本索引名:
[ search_suggestions_tenant_{tenant_id}_v{yyyyMMddHHmmss} ]
示例:
search_suggestions_tenant_1_v20250318123045- 对应的读 alias(线上查询默认走 alias):
[ search_suggestions_tenant_{tenant_id}_current ]
示例:
search_suggestions_tenant_1_current元信息索引(所有租户共用一个索引,每个租户一条文档)
- 名称:
[ search_suggestions_meta ]
- 用于记录每个租户的元信息(
_id = tenant_id): 该索引全局创建一次,每新增一个租户,插入一行,每次为一个租户做完全量,如果成功,更新一下对应的信息。json "settings": { "number_of_shards": 1, "number_of_replicas": 0, "refresh_interval": "1s", }, "mappings": { "properties": { "tenant_id": {"type": "keyword"}, "active_alias": {"type": "keyword"}, "active_index": {"type": "keyword"}, "last_full_build_at": {"type": "date"}, "last_incremental_build_at": {"type": "date"}, "last_incremental_watermark": {"type": "date"}, "updated_at": {"type": "date"}, } } ``-active_alias(当前 alias 名) -active_index(当前实际索引名) -last_full_build_at-last_incremental_build_at-last_incremental_watermark-updated_at`
2. suggestion 索引 mapping 结构
- settings
{
"number_of_shards": 1,
"number_of_replicas": 0,
"refresh_interval": "30s",
"analysis": {
"analyzer": {
"index_ik": {
"type": "custom",
"tokenizer": "ik_max_word",
"filter": ["lowercase", "asciifolding"]
},
"query_ik": {
"type": "custom",
"tokenizer": "ik_smart",
"filter": ["lowercase", "asciifolding"]
}
}
}
}
- mappings.properties
{
"tenant_id": { "type": "keyword" },
"lang": { "type": "keyword" },
"text": { "type": "keyword" }, // 显示给用户的原始文案
"text_norm": { "type": "keyword" }, // 归一化后文本:小写+空白规整
"sources": { "type": "keyword" }, // 来源集合:["title", "qanchor", "tag", "query_log"]
"title_doc_count": { "type": "integer" },
"qanchor_doc_count": { "type": "integer" },
"tag_doc_count": { "type": "integer" },
"query_count_7d": { "type": "integer" },
"query_count_30d": { "type": "integer" },
"rank_score": { "type": "float" }, // 排序打分
"lang_confidence": { "type": "float" },
"lang_source": { "type": "keyword" }, // 语言来源:log_field / request_params / detector / fallback / default
"lang_conflict": { "type": "boolean" }, // 是否存在多来源语言冲突
"status": { "type": "byte" }, // 1 = 有效
"updated_at": { "type": "date" },
"completion": { // completion suggester 用
"properties": {
"<lang>": {
"type": "completion",
"analyzer": "...", // 见下文
"search_analyzer": "..." // zh 使用 query_ik
}
}
},
"sat": { // search_as_you_type 用于 bool_prefix fallback
"properties": {
"<lang>": {
"type": "search_as_you_type",
"analyzer": "..."
}
}
}
}
语言 → analyzer 映射(ANALYZER_BY_LANG)(只列关键):
"zh":index_ik/ 搜索时query_ik"en":english- 其他:按 ES 内建语言 analyzer 映射
构建索引时,必须根据每个租户的
index_languages(如["en", "zh"])动态构造completion.<lang>与sat.<lang>两棵子结构。 index_languages永远包括 en
二、全量构建(Full Rebuild)
对应 SuggestionIndexBuilder.rebuild_tenant_index(...) 和 _build_full_candidates(...)。
1. 输入参数
关键参数:
tenant_id: strdays: int:查询日志回溯天数recreate: bool:仅 legacy 索引模式下生效,是否先删除旧索引batch_size: int:从商品索引扫描商品的分页大小min_query_len: int:过滤短查询publish_alias: bool:是否构建完成后切 alias(只在 versioned 模式下起作用)keep_versions: int:保留多少个最新版本索引
3. 构建目标索引
- 创建索引:
- 索引名称:search_suggestions_tenant_{tenant_id}_v{yyyyMMddHHmmss}
- 结构: search_suggestions.json
4. 构建候选词
4.1 从商品索引收集 title / qanchors / tags(Step 1)
遍历店铺的所有商品:获取每个商品的
"spu_id","title","qanchors","enriched_tags"(按spu_id、id.keyword升序,便于search_after稳定分页)- 对每个商品文档:
- 确定
product_id:spu_id优先,否则id/_id
title字段结构:多语言对象,如:"title": { "en": "Running Shoes", "zh": "跑步鞋" }对于每个
lang in index_languages:
- **title 处理**:
- 从 `title[lang]` 取出字符串
- 经过 `_prepare_title_for_suggest` 做长度与截断控制:
- 若整体长度 ≤ 120,直接使用
- 若过长:尝试按常见分隔符(中文逗号、英文逗号、分号、竖线、斜杠、括号、方括号等)截取第一段;仍然过长则硬截断到 120 字符,并去掉末尾多余分隔符。
- 将结果记为 `text_raw`。若为空,跳过。
- 归一化得到 `text_norm = _normalize_text(text_raw)`:
- `NFKC` 归一化
- `strip()`
- `lower()`
- 连续空白压缩为单个空格
- 若 `_looks_noise(text_norm)` 为真(长度为 0 或 > 120,或全部为非字母数字符号),跳过。
- 使用键 `(lang, text_norm)` 在候选表中查找 / 新建 `SuggestionCandidate`:
- 初始字段:
- `text = text_raw`
- `text_norm = text_norm`
- `lang = lang`
- 调用 `add_product("title", spu_id=product_id)`:
- 记录来源 `"title"`
- 将 `product_id` 加入 `title_spu_ids`
- **qanchors 处理**:
- `qanchors` 字段同样为多语言对象:
```json
"qanchors": { "en": ["slim fit", "sporty casual"], "zh": ["修身", "显瘦"] }
```
- 取 `q_raw = qanchors[lang]`
- 通过 `_split_qanchors(q_raw)` 拆分为若干字符串:
- 若原值为 list → 去空白后直接返回
- 若为字符串 → 按 `[,;|/\n\t]+` 拆分,去空白,保底返回原字符串
- 对每个 `q_text`:
- `text_norm = _normalize_text(q_text)`,再用 `_looks_noise` 过滤
- 同样按 `(lang, text_norm)` 合并为 `SuggestionCandidate`,调用 `add_product("qanchor", spu_id=product_id)`。
- enriched_tags 处理(与
index_languages循环并列):enriched_tags现为多语言对象,例如:json "enriched_tags": { "en": ["Classic", "ribbed neckline"], "zh": ["辣妹风"] }- 优先读取
enriched_tags[lang],每个值可为字符串数组,或逗号等分隔的单个字符串;经_iter_product_tags展开为若干条。 - 对历史旧数据,若
enriched_tags仍是单层字符串 / 数组,则继续走语言检测兜底,并约束在租户的index_languages内。 - 通过
_looks_noise后按(lang, text_norm)合并,调用add_product("tag", spu_id=product_id)。
4.2 从查询日志收集用户 query(Step 2)
对应 _iter_query_log_rows 与 _build_full_candidates 的后半段。
- 时间窗口:
now = 当前 UTCsince = now - dayssince_7d = now - 7天
数据源:MySQL 表
shoplazza_search_log:- 字段:
tenant_idquerylanguagerequest_paramscreate_timedeleted(0 表示有效)- 条件:
tenant_id = :tenant_iddeleted = 0query IS NOT NULL AND query <> ''create_time >= :since AND create_time < :now- 结果按
create_time ASC排序,fetch_size分页流式遍历。
对每一行:
q = row.query.strip()- 若
len(q) < min_query_len跳过。
- 若
语言解析
_resolve_query_language(...):- 输入:
query=qlog_language = row.languagerequest_params = row.request_params(可能是 JSON 字符串,内部再解析language字段)index_languages&primary_language
- 决策顺序:
- 日志字段 request_params.language 有值
- 用检测方法进行检测:
_detect_script_language(query): - 若都失败 → 回退到
primary_language,lang_confidence=0.3,lang_source="default"
- 若日志里的
language与request_params解析出的语言不一致,则lang_conflict=True。
- 输入:
文本归一化:
text_norm = _normalize_text(q)- 若
_looks_noise(text_norm)→ 跳过。
合并到候选表:
- key = `(lang, text_norm)`
- 若不存在,则新建 `SuggestionCandidate`:
- `text = q`
- `lang = lang`
- 其他字段默认。
- 更新:
- `lang_confidence = max(c.lang_confidence, conf)`
- `lang_source`:第一次从 `"default"` 变成具体来源,之后保持已有非 default 值
- `lang_conflict |= conflict`
- 根据 `create_time` 是否在 `since_7d` 之后判断 `is_7d`
- `add_query_log(is_7d)`:
- `query_count_30d += 1`
- 最近 7 天则 `query_count_7d += 1`
5. rank_score 计算与文档成型
- 打分公式(
_compute_rank_score与_compute_rank_score_from_candidate):
[ rank_score = 1.8 \cdot \log(1 + query_count_{30d}) + 1.2 \cdot \log(1 + query_count_{7d}) + 1.0 \cdot \log(1 + qanchor_doc_count) + 0.85 \cdot \log(1 + tag_doc_count) + 0.6 \cdot \log(1 + title_doc_count) ]
构造最终文档(
_candidate_to_doc):_id = f"{tenant_id}|{lang}|{text_norm}"completion字段:
"completion": { "<lang>": { "input": [text], // 原始展示文案 "weight": int(max(rank_score, 1.0) * 100) } }sat字段:
"sat": { "<lang>": text }- 其余字段直接来自
SuggestionCandidate聚合结果(见 mapping 部分)。
将所有 doc 批量写入目标索引,然后执行
refresh。
6. alias 发布与旧版本清理
- 若
publish_alias = true且use_versioned_index = true:
- 通过 alias 名(
get_suggestion_alias_name)查出当前挂载的索引列表current_indices - 构造
update_aliases操作:- 为所有旧索引
removealias - 为新索引
addalias
- 为所有旧索引
- 调用
_cleanup_old_versions:- 通过通配符
get_suggestion_versioned_index_pattern(tenant_id)列出所有版本索引 - 按名称排序,保留最新
keep_versions个 - 删除其余版本索引,但保护当前新索引
- 通过通配符
- 更新 meta 索引:
active_aliasactive_index
7. meta 索引更新
- 无论是否发布 alias,都会更新 meta:
{
"tenant_id": "<tenant_id>",
"last_full_build_at": <now_utc_iso>,
"last_incremental_watermark": <now_utc_iso>,
"active_index": <index_name>, // 若使用 versioned + alias
"active_alias": "<alias_name>" // ditto
}
- 更新行为是「读旧值 → merge patch → index 文档,refresh wait_for」。
三、增量构建(Incremental Update)
对应 incremental_update_tenant_index(...) 与 _build_incremental_deltas(...), _build_incremental_actions(...)。
1. 输入参数
tenant_id: strmin_query_len: int:过滤短 queryfallback_days: int:如果找不到上次水位线,从最近 N 天回补overlap_minutes: int:窗口向前重叠几分钟,防止日志延迟bootstrap_if_missing: bool:没有任何 suggestion 索引时是否先全量构建bootstrap_days: int:bootstrap 全量构建的日志回溯天数batch_size: int:仅在需要 bootstrap 全量构建时使用
2. 目标索引解析
_resolve_incremental_target_index(tenant_id):
- 首选:alias
get_suggestion_alias_name(tenant_id)所指向的索引(一般只会有一个) - 若 alias 不存在:回退到 legacy 索引名
get_suggestion_legacy_index_name(tenant_id),且索引存在 - 若都不存在 → 返回
None
若
target_index is None:- 若
bootstrap_if_missing = false→ 抛错,提示先跑全量或打开 bootstrap。 - 否则执行一次全量构建
rebuild_tenant_index(...): - 参数:
days=bootstrap_days,batch_size=batch_size,min_query_len,publish_alias=true,use_versioned_index=true - 返回
{"mode": "incremental", "bootstrapped": true, "bootstrap_result": ...}
- 若
3. 增量时间窗口计算
- 从 meta 索引
_get_meta(tenant_id)中读取:last_incremental_watermark或last_full_build_at
- 若是 ISO 字符串(可能带
Z后缀)→ 解析为 UTCsince。 - 若解析失败或不存在 →
since = now - fallback_days - 然后:
since = since - overlap_minutes- 但不得早于
now - fallback_days(防止 overlap 过大)
- 最终时间窗口:
[since, now)
时间计算与 UTC 处理逻辑保持一致(字符串格式可兼容 ISO 8601)。
4. 构建 QueryDelta(按 query + 语言累积增量)
_build_incremental_deltas(...)同样从表shoplazza_search_log读取,只是时间窗口换成增量窗口since~now。- 对每一条日志:
q = row.query.strip(),若len(q) < min_query_len→ 跳过。- 仍然通过
_resolve_query_language计算(lang, conf, source, conflict)。 text_norm = _normalize_text(q),_looks_noise过滤。- 键
(lang, text_norm),合并为QueryDelta:- 初始:
tenant_idlangtext = qtext_normlang_confidence = conflang_source = sourcelang_conflict = conflict- 更新逻辑:
delta_30d += 1- 若
create_time >= now - 7d→delta_7d += 1 - 若新的
conf > lang_confidence→ 更新lang_confidence&lang_source lang_conflict |= conflict
5. 构建增量更新动作(Bulk Update with Scripted Upsert)
将每个
QueryDelta转换为 upsert 文档(_delta_to_upsert_doc):- 使用与全量同一套打分公式,但只基于增量的
delta_7d / delta_30d。 - 设置:
sources = ["query_log"]title_doc_count = 0qanchor_doc_count = 0tag_doc_count = 0completion.<lang>.input = [text]completion.<lang>.weight = int(max(rank_score, 1.0) * 100)sat.<lang> = textstatus = 1updated_at = now_iso(本次执行时间)
- 使用与全量同一套打分公式,但只基于增量的
为每个 delta 构造一个 bulk update 动作(
_build_incremental_actions):_op_type = "update"_index = target_index_id = f"{tenant_id}|{lang}|{text_norm}"scripted_upsert = truescript.lang = "painless"script.source为一段 Painless 脚本(见SuggestionIndexBuilder._build_incremental_update_script),其要点:- 若文档不存在 → 直接
ctx._source = params.upsert - 若存在:
- 初始化空字段
query_count_30d += params.delta_30dquery_count_7d += params.delta_7d- 将
"query_log"加入sources lang_conflict与params.lang_conflict取或- 若
params.lang_confidence > ctx._source.lang_confidence则更新lang_confidence和lang_source - 基于更新后的
query_count_7d/30d+qanchor_doc_count+tag_doc_count+title_doc_count重新计算rank_score status = 1updated_at = params.now_iso- 同步更新
text / lang / text_norm - 更新
completion[lang]的input与weight - 更新
sat[lang] = text
script.params包含:delta_30d,delta_7d,lang_confidence,lang_source,lang_conflictnow_iso,lang,text,text_normcompletion_input,completion_weightupsert(完整 upsert 文档)
整个增量批次调
bulk/bulk_actions,执行完成后refresh(target_index)。
6. 更新 meta 索引
- 更新字段:
{
"last_incremental_build_at": now_iso,
"last_incremental_watermark": now_iso,
"active_index": target_index,
"active_alias": "<alias_name>"
}
注意:即便外部不依赖
active_alias,最好仍然保持与现有 Python 实现一致,方便混合环境切换。
四、语言解析与噪音过滤
1. 文本归一化 _normalize_text(value: str)
unicodedata.normalize("NFKC", value or "")strip()- 转小写
- 用正则
\s+将多空白压缩成单个空格
2. 噪音检测 _looks_noise(text_value: str)
任何一条为真即视为噪音,候选会被忽略:
- 空字符串
- 长度 > 120
re.fullmatch(r"[\W_]+", text_value):全是符号/下划线,没有文字
3. 语言标准化 _normalize_lang(lang)
- 输入 lang 为空 → None
- 转小写,
-替换成_ - 若是
"zh_tw"/"pt_br"→ 保留全量 - 其他 → 取
_前缀(例如"en_US"→"en")
4. 查询日志 / tag 的语言回退 _resolve_query_language 与 detect_text_language_for_suggestions
- 日志语言优先级不变:
language字段 →request_params.language→ 语言检测。 - 检测实现为
query.query_parser.detect_text_language_for_suggestions:内部使用与QueryParser相同的LanguageDetector(query/language_detector.py),并将结果约束到租户index_languages(含zh_tw等与检测码的 base 匹配)。 - 在线联想:
SuggestionService在合并 completion 与 SAT 结果后,按ES_score × (1 / sqrt(词元数))排序(词元算法与simple_tokenize_query一致),再以rank_score作次要键,减轻长标题/长短语相对短词根的压制不足问题。