### 一、整体架构与索引设计 #### 1. 索引命名 & 多租户约定 - **每个租户的相关索引:** - **版本化 suggestion 索引 ** - 单个版本索引名: \[ search_suggestions_tenant_{tenant\_id}\_v{yyyyMMddHHmmss} \] 示例:`search_suggestions_tenant_1_v20250318123045` - 对应的读 alias(线上查询默认走 alias): \[ search_suggestions_tenant_{tenant\_id}\_current \] 示例:`search_suggestions_tenant_1_current` - **元信息索引(所有租户共用一个索引,每个租户一条文档)** - 名称: \[ search_suggestions\_meta \] - 用于记录每个租户的元信息(`_id = tenant_id`): 该索引全局创建一次,每新增一个租户,插入一行,每次为一个租户做完全量,如果成功,更新一下对应的信息。 ``` ```json "settings": { "number_of_shards": 1, "number_of_replicas": 0, "refresh_interval": "1s", }, "mappings": { "properties": { "tenant_id": {"type": "keyword"}, "active_alias": {"type": "keyword"}, "active_index": {"type": "keyword"}, "last_full_build_at": {"type": "date"}, "last_incremental_build_at": {"type": "date"}, "last_incremental_watermark": {"type": "date"}, "updated_at": {"type": "date"}, } } ``` - `active_alias`(当前 alias 名) - `active_index`(当前实际索引名) - `last_full_build_at` - `last_incremental_build_at` - `last_incremental_watermark` - `updated_at` #### 2. suggestion 索引 mapping 结构 见 [search_suggestions.json](http://gitlab.essa.top:88/ai-saas/saas-search/blob/master/mappings/search_suggestions.json) - **settings** ```json { "number_of_shards": 1, "number_of_replicas": 0, "refresh_interval": "30s", "analysis": { "analyzer": { "index_ik": { "type": "custom", "tokenizer": "ik_max_word", "filter": ["lowercase", "asciifolding"] }, "query_ik": { "type": "custom", "tokenizer": "ik_smart", "filter": ["lowercase", "asciifolding"] } } } } ``` - **mappings.properties** ```json { "tenant_id": { "type": "keyword" }, "lang": { "type": "keyword" }, "text": { "type": "keyword" }, // 显示给用户的原始文案 "text_norm": { "type": "keyword" }, // 归一化后文本:小写+空白规整 "sources": { "type": "keyword" }, // 来源集合:["title", "qanchor", "tag", "query_log"] "title_doc_count": { "type": "integer" }, "qanchor_doc_count": { "type": "integer" }, "tag_doc_count": { "type": "integer" }, "query_count_7d": { "type": "integer" }, "query_count_30d": { "type": "integer" }, "rank_score": { "type": "float" }, // 排序打分 "lang_confidence": { "type": "float" }, "lang_source": { "type": "keyword" }, // 语言来源:log_field / request_params / detector / fallback / default "lang_conflict": { "type": "boolean" }, // 是否存在多来源语言冲突 "status": { "type": "byte" }, // 1 = 有效 "updated_at": { "type": "date" }, "completion": { // completion suggester 用 "properties": { "": { "type": "completion", "analyzer": "...", // 见下文 "search_analyzer": "..." // zh 使用 query_ik } } }, "sat": { // search_as_you_type 用于 bool_prefix fallback "properties": { "": { "type": "search_as_you_type", "analyzer": "..." } } } } ``` - **语言 → analyzer 映射(ANALYZER_BY_LANG)**(只列关键): - `"zh"`:`index_ik` / 搜索时 `query_ik` - `"en"`:`english` - 其他:按 ES 内建语言 analyzer 映射 > 构建索引时,必须根据每个租户的 `index_languages`(如 `["en", "zh"]`)动态构造 `completion.` 与 `sat.` 两棵子结构。 > index_languages永远包括 en > --- ### 二、全量构建(Full Rebuild) 对应 `SuggestionIndexBuilder.rebuild_tenant_index(...)` 和 `_build_full_candidates(...)`。 #### 1. 输入参数 关键参数: - `tenant_id: str` - `days: int`:查询日志回溯天数 - `recreate: bool`:仅 legacy 索引模式下生效,是否先删除旧索引 - `batch_size: int`:从商品索引扫描商品的分页大小 - `min_query_len: int`:过滤短查询 - `publish_alias: bool`:是否构建完成后切 alias(只在 versioned 模式下起作用) - `keep_versions: int`:保留多少个最新版本索引 #### 3. 构建目标索引 1. 创建索引: - 索引名称:search_suggestions_tenant_{tenant\_id}\_v{yyyyMMddHHmmss} - 结构: search_suggestions.json #### 4. 构建候选词 ##### 4.1 从商品索引收集 title / qanchors / tags(Step 1) - 遍历店铺的所有商品:获取每个商品的 `"spu_id"`, `"title"`, `"qanchors"`, `"tags"`(按 `spu_id`、`id.keyword` 升序,便于 `search_after` 稳定分页) - 对每个商品文档: 1. 确定 `product_id`: - `spu_id` 优先,否则 `id` / `_id` 2. `title` 字段结构:**多语言对象**,如: ```json "title": { "en": "Running Shoes", "zh": "跑步鞋" } ``` 3. 对于每个 `lang in index_languages`: - **title 处理**: - 从 `title[lang]` 取出字符串 - 经过 `_prepare_title_for_suggest` 做长度与截断控制: - 若整体长度 ≤ 120,直接使用 - 若过长:尝试按常见分隔符(中文逗号、英文逗号、分号、竖线、斜杠、括号、方括号等)截取第一段;仍然过长则硬截断到 120 字符,并去掉末尾多余分隔符。 - 将结果记为 `text_raw`。若为空,跳过。 - 归一化得到 `text_norm = _normalize_text(text_raw)`: - `NFKC` 归一化 - `strip()` - `lower()` - 连续空白压缩为单个空格 - 若 `_looks_noise(text_norm)` 为真(长度为 0 或 > 120,或全部为非字母数字符号),跳过。 - 使用键 `(lang, text_norm)` 在候选表中查找 / 新建 `SuggestionCandidate`: - 初始字段: - `text = text_raw` - `text_norm = text_norm` - `lang = lang` - 调用 `add_product("title", spu_id=product_id)`: - 记录来源 `"title"` - 将 `product_id` 加入 `title_spu_ids` - **qanchors 处理**: - `qanchors` 字段同样为多语言对象: ```json "qanchors": { "en": "...", "zh": "..." } ``` - 取 `q_raw = qanchors[lang]` - 通过 `_split_qanchors(q_raw)` 拆分为若干字符串: - 若原值为 list → 去空白后直接返回 - 若为字符串 → 按 `[,;|/\n\t]+` 拆分,去空白,保底返回原字符串 - 对每个 `q_text`: - `text_norm = _normalize_text(q_text)`,再用 `_looks_noise` 过滤 - 同样按 `(lang, text_norm)` 合并为 `SuggestionCandidate`,调用 `add_product("qanchor", spu_id=product_id)`。 4. **tags 处理**(与 `index_languages` 循环并列,每个商品只做一次): - `tags` 可为字符串数组,或逗号等分隔的单个字符串;经 `_iter_product_tags` 展开为若干条。 - 每条 tag **无语言字段**:使用 `query.query_parser.detect_text_language_for_suggestions`(与 `QueryParser` 相同的 `LanguageDetector`)判定语言,并约束在租户的 `index_languages` 内。 - 通过 `_looks_noise` 后按 `(detected_lang, text_norm)` 合并,调用 `add_product("tag", spu_id=product_id)`。 ##### 4.2 从查询日志收集用户 query(Step 2) 对应 `_iter_query_log_rows` 与 `_build_full_candidates` 的后半段。 - 时间窗口: - `now = 当前 UTC` - `since = now - days` - `since_7d = now - 7天` - 数据源:**MySQL 表 `shoplazza_search_log`**: - 字段: - `tenant_id` - `query` - `language` - `request_params` - `create_time` - `deleted`(0 表示有效) - 条件: - `tenant_id = :tenant_id` - `deleted = 0` - `query IS NOT NULL AND query <> ''` - `create_time >= :since AND create_time < :now` - 结果按 `create_time ASC` 排序,`fetch_size` 分页流式遍历。 - 对每一行: 1. `q = row.query.strip()` - 若 `len(q) < min_query_len` 跳过。 2. 语言解析 `_resolve_query_language(...)`: - 输入: - `query=q` - `log_language = row.language` - `request_params = row.request_params`(可能是 JSON 字符串,内部再解析 `language` 字段) - `index_languages` & `primary_language` - 决策顺序: 1. **日志字段 request_params.language 有值** 2. 用检测方法进行检测:`_detect_script_language(query)`: 4. 若都失败 → 回退到 `primary_language`,`lang_confidence=0.3`,`lang_source="default"` - 若日志里的 `language` 与 `request_params` 解析出的语言不一致,则 `lang_conflict=True`。 3. 文本归一化: - `text_norm = _normalize_text(q)` - 若 `_looks_noise(text_norm)` → 跳过。 4. 合并到候选表: - key = `(lang, text_norm)` - 若不存在,则新建 `SuggestionCandidate`: - `text = q` - `lang = lang` - 其他字段默认。 - 更新: - `lang_confidence = max(c.lang_confidence, conf)` - `lang_source`:第一次从 `"default"` 变成具体来源,之后保持已有非 default 值 - `lang_conflict |= conflict` - 根据 `create_time` 是否在 `since_7d` 之后判断 `is_7d` - `add_query_log(is_7d)`: - `query_count_30d += 1` - 最近 7 天则 `query_count_7d += 1` #### 5. rank_score 计算与文档成型 - **打分公式**(`_compute_rank_score` 与 `_compute_rank_score_from_candidate`): \[ rank\_score = 1.8 \cdot \log(1 + query\_count\_{30d}) + 1.2 \cdot \log(1 + query\_count\_{7d}) + 1.0 \cdot \log(1 + qanchor\_doc\_count) + 0.85 \cdot \log(1 + tag\_doc\_count) + 0.6 \cdot \log(1 + title\_doc\_count) \] - 构造最终文档(`_candidate_to_doc`): - `_id = f"{tenant_id}|{lang}|{text_norm}"` - `completion` 字段: ```json "completion": { "": { "input": [text], // 原始展示文案 "weight": int(max(rank_score, 1.0) * 100) } } ``` - `sat` 字段: ```json "sat": { "": text } ``` - 其余字段直接来自 `SuggestionCandidate` 聚合结果(见 mapping 部分)。 - 将所有 doc 批量写入目标索引,然后执行 `refresh`。 #### 6. alias 发布与旧版本清理 - 若 `publish_alias = true` 且 `use_versioned_index = true`: 1. 通过 alias 名(`get_suggestion_alias_name`)查出当前挂载的索引列表 `current_indices` 2. 构造 `update_aliases` 操作: - 为所有旧索引 `remove` alias - 为新索引 `add` alias 3. 调用 `_cleanup_old_versions`: - 通过通配符 `get_suggestion_versioned_index_pattern(tenant_id)` 列出所有版本索引 - 按名称排序,保留最新 `keep_versions` 个 - 删除其余版本索引,但保护当前新索引 4. 更新 meta 索引: - `active_alias` - `active_index` #### 7. meta 索引更新 - 无论是否发布 alias,都会更新 meta: ```json { "tenant_id": "", "last_full_build_at": , "last_incremental_watermark": , "active_index": , // 若使用 versioned + alias "active_alias": "" // ditto } ``` - 更新行为是「读旧值 → merge patch → index 文档,refresh wait_for」。 --- ### 三、增量构建(Incremental Update) 对应 `incremental_update_tenant_index(...)` 与 `_build_incremental_deltas(...)`, `_build_incremental_actions(...)`。 #### 1. 输入参数 - `tenant_id: str` - `min_query_len: int`:过滤短 query - `fallback_days: int`:如果找不到上次水位线,从最近 N 天回补 - `overlap_minutes: int`:窗口向前重叠几分钟,防止日志延迟 - `bootstrap_if_missing: bool`:没有任何 suggestion 索引时是否先全量构建 - `bootstrap_days: int`:bootstrap 全量构建的日志回溯天数 - `batch_size: int`:仅在需要 bootstrap 全量构建时使用 #### 2. 目标索引解析 - `_resolve_incremental_target_index(tenant_id)`: 1. 首选:alias `get_suggestion_alias_name(tenant_id)` 所指向的索引(一般只会有一个) 2. 若 alias 不存在:回退到 legacy 索引名 `get_suggestion_legacy_index_name(tenant_id)`,且索引存在 3. 若都不存在 → 返回 `None` - 若 `target_index is None`: - 若 `bootstrap_if_missing = false` → 抛错,提示先跑全量或打开 bootstrap。 - 否则执行一次全量构建 `rebuild_tenant_index(...)`: - 参数:`days=bootstrap_days`, `batch_size=batch_size`, `min_query_len`, `publish_alias=true`, `use_versioned_index=true` - 返回 `{"mode": "incremental", "bootstrapped": true, "bootstrap_result": ...}` #### 3. 增量时间窗口计算 - 从 meta 索引 `_get_meta(tenant_id)` 中读取: - `last_incremental_watermark` 或 `last_full_build_at` - 若是 ISO 字符串(可能带 `Z` 后缀)→ 解析为 UTC `since`。 - 若解析失败或不存在 → `since = now - fallback_days` - 然后: - `since = since - overlap_minutes` - 但不得早于 `now - fallback_days`(防止 overlap 过大) - 最终时间窗口: ```text [since, now) ``` > 时间计算与 UTC 处理逻辑保持一致(字符串格式可兼容 ISO 8601)。 #### 4. 构建 QueryDelta(按 query + 语言累积增量) - `_build_incremental_deltas(...)` 同样从表 `shoplazza_search_log` 读取,只是时间窗口换成增量窗口 `since` ~ `now`。 - 对每一条日志: 1. `q = row.query.strip()`,若 `len(q) < min_query_len` → 跳过。 2. 仍然通过 `_resolve_query_language` 计算 `(lang, conf, source, conflict)`。 3. `text_norm = _normalize_text(q)`,`_looks_noise` 过滤。 4. 键 `(lang, text_norm)`,合并为 `QueryDelta`: - 初始: - `tenant_id` - `lang` - `text = q` - `text_norm` - `lang_confidence = conf` - `lang_source = source` - `lang_conflict = conflict` - 更新逻辑: - `delta_30d += 1` - 若 `create_time >= now - 7d` → `delta_7d += 1` - 若新的 `conf > lang_confidence` → 更新 `lang_confidence` & `lang_source` - `lang_conflict |= conflict` #### 5. 构建增量更新动作(Bulk Update with Scripted Upsert) - 将每个 `QueryDelta` 转换为 upsert 文档(`_delta_to_upsert_doc`): - 使用与全量同一套打分公式,但只基于增量的 `delta_7d / delta_30d`。 - 设置: - `sources = ["query_log"]` - `title_doc_count = 0` - `qanchor_doc_count = 0` - `tag_doc_count = 0` - `completion..input = [text]` - `completion..weight = int(max(rank_score, 1.0) * 100)` - `sat. = text` - `status = 1` - `updated_at = now_iso`(本次执行时间) - 为每个 delta 构造一个 bulk **update** 动作(`_build_incremental_actions`): - `_op_type = "update"` - `_index = target_index` - `_id = f"{tenant_id}|{lang}|{text_norm}"` - `scripted_upsert = true` - `script.lang = "painless"` - `script.source` 为一段 Painless 脚本(见 `SuggestionIndexBuilder._build_incremental_update_script`),其要点: - 若文档不存在 → 直接 `ctx._source = params.upsert` - 若存在: - 初始化空字段 - `query_count_30d += params.delta_30d` - `query_count_7d += params.delta_7d` - 将 `"query_log"` 加入 `sources` - `lang_conflict` 与 `params.lang_conflict` 取或 - 若 `params.lang_confidence > ctx._source.lang_confidence` 则更新 `lang_confidence` 和 `lang_source` - 基于更新后的 `query_count_7d/30d` + `qanchor_doc_count` + `tag_doc_count` + `title_doc_count` 重新计算 `rank_score` - `status = 1` - `updated_at = params.now_iso` - 同步更新 `text / lang / text_norm` - 更新 `completion[lang]` 的 `input` 与 `weight` - 更新 `sat[lang] = text` - `script.params` 包含: - `delta_30d`, `delta_7d`, `lang_confidence`, `lang_source`, `lang_conflict` - `now_iso`, `lang`, `text`, `text_norm` - `completion_input`, `completion_weight` - `upsert`(完整 upsert 文档) - 整个增量批次调 `bulk` / `bulk_actions`,执行完成后 `refresh(target_index)`。 #### 6. 更新 meta 索引 - 更新字段: ```json { "last_incremental_build_at": now_iso, "last_incremental_watermark": now_iso, "active_index": target_index, "active_alias": "" } ``` > 注意:即便外部不依赖 `active_alias`,最好仍然保持与现有 Python 实现一致,方便混合环境切换。 --- ### 四、语言解析与噪音过滤 #### 1. 文本归一化 `_normalize_text(value: str)` - `unicodedata.normalize("NFKC", value or "")` - `strip()` - 转小写 - 用正则 `\s+` 将多空白压缩成单个空格 #### 2. 噪音检测 `_looks_noise(text_value: str)` - 任何一条为真即视为噪音,候选会被忽略: - 空字符串 - 长度 > 120 - `re.fullmatch(r"[\W_]+", text_value)`:全是符号/下划线,没有文字 #### 3. 语言标准化 `_normalize_lang(lang)` - 输入 lang 为空 → None - 转小写,`-` 替换成 `_` - 若是 `"zh_tw"` / `"pt_br"` → 保留全量 - 其他 → 取 `_` 前缀(例如 `"en_US"` → `"en"`) #### 4. 查询日志 / tag 的语言回退 `_resolve_query_language` 与 `detect_text_language_for_suggestions` - 日志语言优先级不变:`language` 字段 → `request_params.language` → **语言检测**。 - 检测实现为 `query.query_parser.detect_text_language_for_suggestions`:内部使用与 `QueryParser` 相同的 `LanguageDetector`(`query/language_detector.py`),并将结果约束到租户 `index_languages`(含 `zh_tw` 等与检测码的 base 匹配)。 - 在线联想:`SuggestionService` 在合并 completion 与 SAT 结果后,按 `ES_score × (1 / sqrt(词元数))` 排序(词元算法与 `simple_tokenize_query` 一致),再以 `rank_score` 作次要键,减轻长标题/长短语相对短词根的压制不足问题。