diff --git a/api/indexer_app.py b/api/indexer_app.py index d5544eb..41ad323 100644 --- a/api/indexer_app.py +++ b/api/indexer_app.py @@ -44,14 +44,21 @@ from utils import ESClient # noqa: E402 from utils.db_connector import create_db_connection # noqa: E402 from indexer.incremental_service import IncrementalIndexerService # noqa: E402 from indexer.bulk_indexing_service import BulkIndexingService # noqa: E402 +from suggestion import SuggestionIndexBuilder # noqa: E402 from .routes import indexer as indexer_routes # noqa: E402 -from .service_registry import set_es_client, set_indexer_services # noqa: E402 +from .routes import suggestion_indexer as suggestion_indexer_routes # noqa: E402 +from .service_registry import ( + set_es_client, + set_indexer_services, + set_suggestion_builder, +) # noqa: E402 _es_client: Optional[ESClient] = None _config = None _incremental_service: Optional[IncrementalIndexerService] = None _bulk_indexing_service: Optional[BulkIndexingService] = None +_suggestion_builder: Optional[SuggestionIndexBuilder] = None def init_indexer_service(es_host: str = "http://localhost:9200"): @@ -61,7 +68,7 @@ def init_indexer_service(es_host: str = "http://localhost:9200"): This mirrors the indexing-related initialization logic in api.app.init_service but without search-related components. """ - global _es_client, _config, _incremental_service, _bulk_indexing_service + global _es_client, _config, _incremental_service, _bulk_indexing_service, _suggestion_builder start_time = time.time() logger.info("Initializing Indexer service") @@ -108,10 +115,12 @@ def init_indexer_service(es_host: str = "http://localhost:9200"): _incremental_service = IncrementalIndexerService(db_engine) _bulk_indexing_service = BulkIndexingService(db_engine, _es_client) + _suggestion_builder = SuggestionIndexBuilder(es_client=_es_client, db_engine=db_engine) set_indexer_services( incremental_service=_incremental_service, bulk_indexing_service=_bulk_indexing_service, ) + set_suggestion_builder(_suggestion_builder) logger.info("Indexer services initialized (incremental + bulk)") else: missing = [ @@ -242,6 +251,8 @@ async def health_check(): # Mount the single source of truth indexer routes app.include_router(indexer_routes.router) +# Mount suggestion indexing routes (full + incremental) +app.include_router(suggestion_indexer_routes.router) if __name__ == "__main__": diff --git a/api/routes/suggestion_indexer.py b/api/routes/suggestion_indexer.py new file mode 100644 index 0000000..447ffac --- /dev/null +++ b/api/routes/suggestion_indexer.py @@ -0,0 +1,134 @@ +from typing import Any, Dict, Optional + +import asyncio +import logging + +from fastapi import APIRouter, HTTPException +from pydantic import BaseModel, Field + +from ..service_registry import get_suggestion_builder + + +logger = logging.getLogger(__name__) + +router = APIRouter(prefix="/suggestions", tags=["suggestions"]) + + +class FullBuildRequest(BaseModel): + """全量构建 suggestion 索引""" + + tenant_id: str = Field(..., description="租户 ID") + days: int = Field(360, description="查询日志回溯天数") + batch_size: int = Field(500, description="商品扫描 batch 大小") + min_query_len: int = Field(1, description="最小查询长度过滤") + publish_alias: bool = Field( + True, + description="是否在构建完成后发布 alias 到新版本索引", + ) + keep_versions: int = Field( + 2, + description="保留的最新版本索引数量", + ) + + +class IncrementalBuildRequest(BaseModel): + """增量更新 suggestion 索引""" + + tenant_id: str = Field(..., description="租户 ID") + min_query_len: int = Field(1, description="最小查询长度过滤") + fallback_days: int = Field( + 7, + description="当没有增量水位线时,默认从最近多少天的查询日志开始补", + ) + overlap_minutes: int = Field( + 30, + description="增量窗口向前重叠的分钟数,用于防止日志延迟写入导致的遗漏", + ) + bootstrap_if_missing: bool = Field( + True, + description="当当前没有可用 suggestion 索引时,是否先做一次带 bootstrap_days 的全量构建", + ) + bootstrap_days: int = Field( + 30, + description="bootstrap 全量构建时的查询日志回溯天数", + ) + batch_size: int = Field( + 500, + description="当需要 bootstrap 全量构建时使用的商品扫描 batch 大小", + ) + + +async def _run_in_executor(func, *args, **kwargs) -> Dict[str, Any]: + loop = asyncio.get_event_loop() + return await loop.run_in_executor(None, lambda: func(*args, **kwargs)) + + +@router.post("/full") +async def build_full_suggestions(request: FullBuildRequest) -> Dict[str, Any]: + """ + 全量构建/重建指定租户的 suggestion 索引。 + + 该接口会: + - 从商品索引 + 查询日志中构建候选词 + - 写入新的 suggestion 索引(使用 versioned 命名) + - 根据配置决定是否切换 alias 并清理旧版本 + """ + builder = get_suggestion_builder() + if builder is None: + raise HTTPException(status_code=503, detail="Suggestion builder is not initialized") + + try: + result = await _run_in_executor( + builder.rebuild_tenant_index, + tenant_id=request.tenant_id, + days=request.days, + batch_size=request.batch_size, + min_query_len=request.min_query_len, + publish_alias=request.publish_alias, + keep_versions=request.keep_versions, + ) + return result + except Exception as e: + logger.error( + "Error in full suggestion rebuild for tenant_id=%s: %s", + request.tenant_id, + e, + exc_info=True, + ) + raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}") + + +@router.post("/incremental") +async def build_incremental_suggestions(request: IncrementalBuildRequest) -> Dict[str, Any]: + """ + 增量更新指定租户的 suggestion 索引。 + + - 从上次增量水位线(或全量构建时间)开始,到当前时间为止扫描查询日志 + - 根据查询频次对 suggestion 索引做增量 upsert + - 如当前不存在任何 suggestion 索引且 bootstrap_if_missing=true,则会先做一次 bootstrap 全量构建 + """ + builder = get_suggestion_builder() + if builder is None: + raise HTTPException(status_code=503, detail="Suggestion builder is not initialized") + + try: + result = await _run_in_executor( + builder.incremental_update_tenant_index, + tenant_id=request.tenant_id, + min_query_len=request.min_query_len, + fallback_days=request.fallback_days, + overlap_minutes=request.overlap_minutes, + bootstrap_if_missing=request.bootstrap_if_missing, + bootstrap_days=request.bootstrap_days, + batch_size=request.batch_size, + ) + return result + except Exception as e: + logger.error( + "Error in incremental suggestion update for tenant_id=%s: %s", + request.tenant_id, + e, + exc_info=True, + ) + raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}") + diff --git a/api/service_registry.py b/api/service_registry.py index 4f79b1e..c05f1fe 100644 --- a/api/service_registry.py +++ b/api/service_registry.py @@ -15,6 +15,7 @@ from typing import Any, Optional _es_client: Optional[Any] = None _incremental_service: Optional[Any] = None _bulk_indexing_service: Optional[Any] = None +_suggestion_builder: Optional[Any] = None def set_es_client(es_client: Any) -> None: @@ -41,3 +42,12 @@ def get_incremental_service() -> Optional[Any]: def get_bulk_indexing_service() -> Optional[Any]: return _bulk_indexing_service + +def set_suggestion_builder(builder: Any) -> None: + global _suggestion_builder + _suggestion_builder = builder + + +def get_suggestion_builder() -> Optional[Any]: + return _suggestion_builder + diff --git a/docs/TODO.txt b/docs/TODO.txt index eb9ef40..dcbd517 100644 --- a/docs/TODO.txt +++ b/docs/TODO.txt @@ -63,7 +63,7 @@ When sorting on a field, scores are not computed. By setting track_scores to tru - +provider backend 两者的关系,如何配合。 translator的设计 : QueryParser 里面 并不是调用的6006,目前是把6006做了一个provider,然后translate的总体配置又有6006的baseurl,很混乱!!! @@ -119,6 +119,9 @@ translation: +suggest 索引,现在是全量脚本,要交给金伟 + + 翻译,增加facebook/nllb-200-distilled-600M https://blog.csdn.net/qq_42746084/article/details/154947534 @@ -153,6 +156,16 @@ https://modelscope.cn/models/dengcao/Qwen3-Reranker-4B-GGUF/summary +reranker 补充:nvidia/llama-nemotron-rerank-1b-v2 +encoder架构。 +比较新。 +性能更好。 +亚马逊 电商搜索数据集比qwen-reranker-4b更好。 +支持vLLM。 + + + + 查看翻译的缓存情况 diff --git a/docs/suggestion索引构建.md b/docs/suggestion索引构建.md new file mode 100644 index 0000000..19ba9b7 --- /dev/null +++ b/docs/suggestion索引构建.md @@ -0,0 +1,503 @@ + +### 一、整体架构与索引设计 + +#### 1. 索引命名 & 多租户约定 + +- **每个租户的相关索引:** + +- **版本化 suggestion 索引 ** + + - 单个版本索引名: + + \[ + search_suggestions_tenant_{tenant\_id}\_v{yyyyMMddHHmmss} + \] + + 示例:`search_suggestions_tenant_1_v20250318123045` + + - 对应的读 alias(线上查询默认走 alias): + + \[ + search_suggestions_tenant_{tenant\_id}\_current + \] + + 示例:`search_suggestions_tenant_1_current` + +- **元信息索引(所有租户共用)** + + - 名称: + + \[ + search_suggestions\_meta + \] + + - 用于记录每个租户的: + - `active_alias`(当前 alias 名) + - `active_index`(当前实际索引名) + - `last_full_build_at` + - `last_incremental_build_at` + - `last_incremental_watermark` + - `updated_at` + + +#### 2. suggestion 索引 mapping 结构 + +出自 `suggestion/mapping.py` → `build_suggestion_mapping(index_languages)`: + +- **settings** + + ```json + { + "number_of_shards": 1, + "number_of_replicas": 0, + "refresh_interval": "30s", + "analysis": { + "analyzer": { + "index_ik": { + "type": "custom", + "tokenizer": "ik_max_word", + "filter": ["lowercase", "asciifolding"] + }, + "query_ik": { + "type": "custom", + "tokenizer": "ik_smart", + "filter": ["lowercase", "asciifolding"] + } + } + } + } + ``` + +- **mappings.properties** + + ```json + { + "tenant_id": { "type": "keyword" }, + "lang": { "type": "keyword" }, + "text": { "type": "keyword" }, // 显示给用户的原始文案 + "text_norm": { "type": "keyword" }, // 归一化后文本:小写+空白规整 + "sources": { "type": "keyword" }, // 来源集合:["title", "qanchor", "query_log"] + "title_doc_count": { "type": "integer" }, + "qanchor_doc_count": { "type": "integer" }, + "query_count_7d": { "type": "integer" }, + "query_count_30d": { "type": "integer" }, + "rank_score": { "type": "float" }, // 排序打分 + "lang_confidence": { "type": "float" }, + "lang_source": { "type": "keyword" }, // 语言来源:log_field / request_params / script / default + "lang_conflict": { "type": "boolean" }, // 是否存在多来源语言冲突 + "status": { "type": "byte" }, // 1 = 有效 + "updated_at": { "type": "date" }, + + "completion": { // completion suggester 用 + "properties": { + "": { + "type": "completion", + "analyzer": "...", // 见下文 + "search_analyzer": "..." // zh 使用 query_ik + } + } + }, + + "sat": { // search_as_you_type 用于 bool_prefix fallback + "properties": { + "": { + "type": "search_as_you_type", + "analyzer": "..." + } + } + } + } + ``` + + +- **语言 → analyzer 映射(ANALYZER_BY_LANG)**(只列关键): + + - `"zh"`:`index_ik` / 搜索时 `query_ik` + - `"en"`:`english` + - 其他:按 ES 内建语言 analyzer 映射 + +> 构建索引时,必须根据每个租户的 `index_languages`(如 `["en", "zh"]`)动态构造 `completion.` 与 `sat.` 两棵子结构。 +> index_languages永远包括 en +> + +--- + +### 二、全量构建(Full Rebuild) + +对应 `SuggestionIndexBuilder.rebuild_tenant_index(...)` 和 `_build_full_candidates(...)`。 + +#### 1. 输入参数 + +关键参数: + +- `tenant_id: str` +- `days: int`:查询日志回溯天数 +- `recreate: bool`:仅 legacy 索引模式下生效,是否先删除旧索引 +- `batch_size: int`:从商品索引扫描商品的分页大小 +- `min_query_len: int`:过滤短查询 +- `publish_alias: bool`:是否构建完成后切 alias(只在 versioned 模式下起作用) +- `keep_versions: int`:保留多少个最新版本索引 +- `use_versioned_index: bool`:true 使用 `*_v{timestamp}` 版本索引;false 使用 legacy 索引 + +#### 2. 每租户语言配置 + +- 通过 `tenant_config`(`get_tenant_config_loader().get_tenant_config(tenant_id)`)拿到: + - `index_languages: List[str]`,默认 `["en", "zh"]` + - `primary_language: str`,默认 `"en"` + +Java 端需要自己的 tenant 配置源,但**要保持同样字段含义与默认值**。 + +#### 3. 构建目标索引 + +1. 如果 `use_versioned_index = true`: + - 新索引名:`get_suggestion_versioned_index_name(tenant_id)`,即带时间戳后缀 +2. 否则(legacy 模式): + - 索引名:`get_suggestion_legacy_index_name(tenant_id)` + - 若 `recreate = true` 且索引已存在,先删除再重建 +3. 若目标索引已经存在 → 抛错(防止误覆盖) + +4. 按上文 mapping 创建索引。 + +#### 4. 构建候选词 + +##### 4.1 从商品索引收集 title / qanchors(Step 1) + +对应 `_iter_products` 与 `_build_full_candidates` 的前半段。 + +- 数据源:**商品 ES 索引**(SPU 索引) + - 获取当前租户商品索引名称:通过 `indexer.mapping_generator.get_tenant_index_name(tenant_id)` 获取 + - 遍历店铺的所有商品:获取每个商品的 `"spu_id"`, `"title"`, `"qanchors"` 3个字段(按`spu_id`升序) + + +- 对每个商品文档: + + 1. 确定 `product_id`: + - `spu_id` 优先,否则 `id` / `_id` + 2. `title` 字段结构:**多语言对象**,如: + + ```json + "title": { "en": "Running Shoes", "zh": "跑步鞋" } + ``` + + 3. 对于每个 `lang in index_languages`: + + - **title 处理**: + - 从 `title[lang]` 取出字符串 + - 经过 `_prepare_title_for_suggest` 做长度与截断控制: + - 若整体长度 ≤ 120,直接使用 + - 若过长:尝试按常见分隔符(中文逗号、英文逗号、分号、竖线、斜杠、括号、方括号等)截取第一段;仍然过长则硬截断到 120 字符,并去掉末尾多余分隔符。 + - 将结果记为 `text_raw`。若为空,跳过。 + - 归一化得到 `text_norm = _normalize_text(text_raw)`: + - `NFKC` 归一化 + - `strip()` + - `lower()` + - 连续空白压缩为单个空格 + - 若 `_looks_noise(text_norm)` 为真(长度为 0 或 > 120,或全部为非字母数字符号),跳过。 + - 使用键 `(lang, text_norm)` 在候选表中查找 / 新建 `SuggestionCandidate`: + - 初始字段: + - `text = text_raw` + - `text_norm = text_norm` + - `lang = lang` + - 调用 `add_product("title", spu_id=product_id)`: + - 记录来源 `"title"` + - 将 `product_id` 加入 `title_spu_ids` + + - **qanchors 处理**: + - `qanchors` 字段同样为多语言对象: + ```json + "qanchors": { "en": "...", "zh": "..." } + ``` + - 取 `q_raw = qanchors[lang]` + - 通过 `_split_qanchors(q_raw)` 拆分为若干字符串: + - 若原值为 list → 去空白后直接返回 + - 若为字符串 → 按 `[,;|/\n\t]+` 拆分,去空白,保底返回原字符串 + - 对每个 `q_text`: + - `text_norm = _normalize_text(q_text)`,再用 `_looks_noise` 过滤 + - 同样按 `(lang, text_norm)` 合并为 `SuggestionCandidate`,调用 `add_product("qanchor", spu_id=product_id)`。 + +##### 4.2 从查询日志收集用户 query(Step 2) + +对应 `_iter_query_log_rows` 与 `_build_full_candidates` 的后半段。 + +- 时间窗口: + - `now = 当前 UTC` + - `since = now - days` + - `since_7d = now - 7天` +- 数据源:**MySQL 表 `shoplazza_search_log`**: + - 字段: + - `tenant_id` + - `query` + - `language` + - `request_params` + - `create_time` + - `deleted`(0 表示有效) + - 条件: + - `tenant_id = :tenant_id` + - `deleted = 0` + - `query IS NOT NULL AND query <> ''` + - `create_time >= :since AND create_time < :now` + - 结果按 `create_time ASC` 排序,`fetch_size` 分页流式遍历。 + +- 对每一行: + + 1. `q = row.query.strip()` + - 若 `len(q) < min_query_len` 跳过。 + 2. 语言解析 `_resolve_query_language(...)`: + - 输入: + - `query=q` + - `log_language = row.language` + - `request_params = row.request_params`(可能是 JSON 字符串,内部再解析 `language` 字段) + - `index_languages` & `primary_language` + - 决策顺序: + 1. **日志字段 request_params.language 有值** + 2. 用检测方法进行检测:`_detect_script_language(query)`: + 4. 若都失败 → 回退到 `primary_language`,`lang_confidence=0.3`,`lang_source="default"` + - 若日志里的 `language` 与 `request_params` 解析出的语言不一致,则 `lang_conflict=True`。 + + 3. 文本归一化: + - `text_norm = _normalize_text(q)` + - 若 `_looks_noise(text_norm)` → 跳过。 + + 4. 合并到候选表: + + - key = `(lang, text_norm)` + - 若不存在,则新建 `SuggestionCandidate`: + - `text = q` + - `lang = lang` + - 其他字段默认。 + - 更新: + - `lang_confidence = max(c.lang_confidence, conf)` + - `lang_source`:第一次从 `"default"` 变成具体来源,之后保持已有非 default 值 + - `lang_conflict |= conflict` + - 根据 `create_time` 是否在 `since_7d` 之后判断 `is_7d` + - `add_query_log(is_7d)`: + - `query_count_30d += 1` + - 最近 7 天则 `query_count_7d += 1` + +#### 5. rank_score 计算与文档成型 + +- **打分公式**(`_compute_rank_score` 与 `_compute_rank_score_from_candidate`): + + \[ + rank\_score = + 1.8 \cdot \log(1 + query\_count\_{30d}) + + 1.2 \cdot \log(1 + query\_count\_{7d}) + + 1.0 \cdot \log(1 + qanchor\_doc\_count) + + 0.6 \cdot \log(1 + title\_doc\_count) + \] + +- 构造最终文档(`_candidate_to_doc`): + + - `_id = f"{tenant_id}|{lang}|{text_norm}"` + - `completion` 字段: + + ```json + "completion": { + "": { + "input": [text], // 原始展示文案 + "weight": int(max(rank_score, 1.0) * 100) + } + } + ``` + + - `sat` 字段: + + ```json + "sat": { + "": text + } + ``` + + - 其余字段直接来自 `SuggestionCandidate` 聚合结果(见 mapping 部分)。 + +- 将所有 doc 批量写入目标索引,然后执行 `refresh`。 + +#### 6. alias 发布与旧版本清理 + +- 若 `publish_alias = true` 且 `use_versioned_index = true`: + + 1. 通过 alias 名(`get_suggestion_alias_name`)查出当前挂载的索引列表 `current_indices` + 2. 构造 `update_aliases` 操作: + - 为所有旧索引 `remove` alias + - 为新索引 `add` alias + 3. 调用 `_cleanup_old_versions`: + - 通过通配符 `get_suggestion_versioned_index_pattern(tenant_id)` 列出所有版本索引 + - 按名称排序,保留最新 `keep_versions` 个 + - 删除其余版本索引,但保护当前新索引 + 4. 更新 meta 索引: + - `active_alias` + - `active_index` + +#### 7. meta 索引更新 + +- 无论是否发布 alias,都会更新 meta: + + ```json + { + "tenant_id": "", + "last_full_build_at": , + "last_incremental_watermark": , + "active_index": , // 若使用 versioned + alias + "active_alias": "" // ditto + } + ``` + +- 更新行为是「读旧值 → merge patch → index 文档,refresh wait_for」。 + +--- + +### 三、增量构建(Incremental Update) + +对应 `incremental_update_tenant_index(...)` 与 `_build_incremental_deltas(...)`, `_build_incremental_actions(...)`。 + +#### 1. 输入参数 + +- `tenant_id: str` +- `min_query_len: int`:过滤短 query +- `fallback_days: int`:如果找不到上次水位线,从最近 N 天回补 +- `overlap_minutes: int`:窗口向前重叠几分钟,防止日志延迟 +- `bootstrap_if_missing: bool`:没有任何 suggestion 索引时是否先全量构建 +- `bootstrap_days: int`:bootstrap 全量构建的日志回溯天数 +- `batch_size: int`:仅在需要 bootstrap 全量构建时使用 + +#### 2. 目标索引解析 + +- `_resolve_incremental_target_index(tenant_id)`: + + 1. 首选:alias `get_suggestion_alias_name(tenant_id)` 所指向的索引(一般只会有一个) + 2. 若 alias 不存在:回退到 legacy 索引名 `get_suggestion_legacy_index_name(tenant_id)`,且索引存在 + 3. 若都不存在 → 返回 `None` + +- 若 `target_index is None`: + + - 若 `bootstrap_if_missing = false` → 抛错,提示先跑全量或打开 bootstrap。 + - 否则执行一次全量构建 `rebuild_tenant_index(...)`: + - 参数:`days=bootstrap_days`, `batch_size=batch_size`, `min_query_len`, `publish_alias=true`, `use_versioned_index=true` + - 返回 `{"mode": "incremental", "bootstrapped": true, "bootstrap_result": ...}` + +#### 3. 增量时间窗口计算 + +- 从 meta 索引 `_get_meta(tenant_id)` 中读取: + - `last_incremental_watermark` 或 `last_full_build_at` +- 若是 ISO 字符串(可能带 `Z` 后缀)→ 解析为 UTC `since`。 +- 若解析失败或不存在 → `since = now - fallback_days` +- 然后: + - `since = since - overlap_minutes` + - 但不得早于 `now - fallback_days`(防止 overlap 过大) +- 最终时间窗口: + + ```text + [since, now) + ``` + +> 时间计算与 UTC 处理逻辑保持一致(字符串格式可兼容 ISO 8601)。 + +#### 4. 构建 QueryDelta(按 query + 语言累积增量) + +- `_build_incremental_deltas(...)` 同样从表 `shoplazza_search_log` 读取,只是时间窗口换成增量窗口 `since` ~ `now`。 +- 对每一条日志: + 1. `q = row.query.strip()`,若 `len(q) < min_query_len` → 跳过。 + 2. 仍然通过 `_resolve_query_language` 计算 `(lang, conf, source, conflict)`。 + 3. `text_norm = _normalize_text(q)`,`_looks_noise` 过滤。 + 4. 键 `(lang, text_norm)`,合并为 `QueryDelta`: + - 初始: + - `tenant_id` + - `lang` + - `text = q` + - `text_norm` + - `lang_confidence = conf` + - `lang_source = source` + - `lang_conflict = conflict` + - 更新逻辑: + - `delta_30d += 1` + - 若 `create_time >= now - 7d` → `delta_7d += 1` + - 若新的 `conf > lang_confidence` → 更新 `lang_confidence` & `lang_source` + - `lang_conflict |= conflict` + +#### 5. 构建增量更新动作(Bulk Update with Scripted Upsert) + +- 将每个 `QueryDelta` 转换为 upsert 文档(`_delta_to_upsert_doc`): + - 使用与全量同一套打分公式,但只基于增量的 `delta_7d / delta_30d`。 + - 设置: + - `sources = ["query_log"]` + - `title_doc_count = 0` + - `qanchor_doc_count = 0` + - `completion..input = [text]` + - `completion..weight = int(max(rank_score, 1.0) * 100)` + - `sat. = text` + - `status = 1` + - `updated_at = now_iso`(本次执行时间) + +- 为每个 delta 构造一个 bulk **update** 动作(`_build_incremental_actions`): + + - `_op_type = "update"` + - `_index = target_index` + - `_id = f"{tenant_id}|{lang}|{text_norm}"` + - `scripted_upsert = true` + - `script.lang = "painless"` + - `script.source` 为一段 Painless 脚本(见 `SuggestionIndexBuilder._build_incremental_update_script`),其要点: + + - 若文档不存在 → 直接 `ctx._source = params.upsert` + - 若存在: + - 初始化空字段 + - `query_count_30d += params.delta_30d` + - `query_count_7d += params.delta_7d` + - 将 `"query_log"` 加入 `sources` + - `lang_conflict` 与 `params.lang_conflict` 取或 + - 若 `params.lang_confidence > ctx._source.lang_confidence` 则更新 `lang_confidence` 和 `lang_source` + - 基于更新后的 `query_count_7d/30d` + `qanchor_doc_count` + `title_doc_count` 重新计算 `rank_score` + - `status = 1` + - `updated_at = params.now_iso` + - 同步更新 `text / lang / text_norm` + - 更新 `completion[lang]` 的 `input` 与 `weight` + - 更新 `sat[lang] = text` + + - `script.params` 包含: + - `delta_30d`, `delta_7d`, `lang_confidence`, `lang_source`, `lang_conflict` + - `now_iso`, `lang`, `text`, `text_norm` + - `completion_input`, `completion_weight` + - `upsert`(完整 upsert 文档) + +- 整个增量批次调 `bulk` / `bulk_actions`,执行完成后 `refresh(target_index)`。 + +#### 6. 更新 meta 索引 + +- 更新字段: + + ```json + { + "last_incremental_build_at": now_iso, + "last_incremental_watermark": now_iso, + "active_index": target_index, + "active_alias": "" + } + ``` + +> 注意:即便外部不依赖 `active_alias`,最好仍然保持与现有 Python 实现一致,方便混合环境切换。 + +--- + +### 四、语言解析与噪音过滤 + +#### 1. 文本归一化 `_normalize_text(value: str)` + +- `unicodedata.normalize("NFKC", value or "")` +- `strip()` +- 转小写 +- 用正则 `\s+` 将多空白压缩成单个空格 + +#### 2. 噪音检测 `_looks_noise(text_value: str)` + +- 任何一条为真即视为噪音,候选会被忽略: + + - 空字符串 + - 长度 > 120 + - `re.fullmatch(r"[\W_]+", text_value)`:全是符号/下划线,没有文字 + +#### 3. 语言标准化 `_normalize_lang(lang)` + +- 输入 lang 为空 → None +- 转小写,`-` 替换成 `_` +- 若是 `"zh_tw"` / `"pt_br"` → 保留全量 +- 其他 → 取 `_` 前缀(例如 `"en_US"` → `"en"`) + diff --git a/frontend/index.html b/frontend/index.html index 20eceee..ede46f0 100644 --- a/frontend/index.html +++ b/frontend/index.html @@ -281,7 +281,10 @@ return; } - const url = new URL(SUGGEST_API); + // SUGGEST_API 可能是相对路径(例如 "/search/suggestions"), + // 直接 new URL(SUGGEST_API) 在浏览器环境下会因缺少 base URL 报错。 + // 显式指定 window.location.origin 作为 base,保证同源调用正常工作。 + const url = new URL(SUGGEST_API, window.location.origin); url.searchParams.set('q', query); url.searchParams.set('size', '40'); url.searchParams.set('language', getSelectedLang()); diff --git a/frontend/static/js/app.js b/frontend/static/js/app.js index 4f45b22..7875a39 100644 --- a/frontend/static/js/app.js +++ b/frontend/static/js/app.js @@ -30,7 +30,7 @@ function getTenantId() { if (tenantSelect) { return tenantSelect.value.trim(); } - return '170'; // Default fallback + return ''; } // Get sku_filter_dimension (as list) from input diff --git a/main.py b/main.py index 4bb7e9c..36aabe0 100755 --- a/main.py +++ b/main.py @@ -138,12 +138,10 @@ def cmd_build_suggestions(args): result = builder.rebuild_tenant_index( tenant_id=args.tenant_id, days=args.days, - recreate=args.recreate, batch_size=args.batch_size, min_query_len=args.min_query_len, publish_alias=args.publish_alias, keep_versions=args.keep_versions, - use_versioned_index=not args.no_versioned_index, ) else: result = builder.incremental_update_tenant_index( @@ -224,16 +222,6 @@ def main(): help='For full mode: keep latest N versioned indices', ) suggest_build_parser.add_argument( - '--no-versioned-index', - action='store_true', - help='For full mode: write to legacy concrete index (not recommended)', - ) - suggest_build_parser.add_argument( - '--recreate', - action='store_true', - help='For legacy concrete index mode: delete and recreate target index before build', - ) - suggest_build_parser.add_argument( '--incremental-fallback-days', type=int, default=7, diff --git a/suggestion/builder.py b/suggestion/builder.py index 90731c0..96f27c0 100644 --- a/suggestion/builder.py +++ b/suggestion/builder.py @@ -30,13 +30,8 @@ def _index_prefix() -> str: return ES_INDEX_NAMESPACE or "" -def get_suggestion_legacy_index_name(tenant_id: str) -> str: - """Legacy concrete index name (Phase1 compatibility).""" - return f"{_index_prefix()}search_suggestions_tenant_{tenant_id}" - - def get_suggestion_alias_name(tenant_id: str) -> str: - """Read alias for suggestion index (Phase2 default search target).""" + """Read alias for suggestion index (single source of truth).""" return f"{_index_prefix()}search_suggestions_tenant_{tenant_id}_current" @@ -54,15 +49,6 @@ def get_suggestion_meta_index_name() -> str: return f"{_index_prefix()}search_suggestions_meta" -def get_suggestion_index_name(tenant_id: str) -> str: - """ - Search target for suggestion query. - - Phase2 uses alias by default. - """ - return get_suggestion_alias_name(tenant_id) - - @dataclass class SuggestionCandidate: text: str @@ -431,15 +417,12 @@ class SuggestionIndexBuilder: } def _resolve_incremental_target_index(self, tenant_id: str) -> Optional[str]: + """Resolve active suggestion index for incremental updates (alias only).""" alias_name = get_suggestion_alias_name(tenant_id) aliased = self.es_client.get_alias_indices(alias_name) if aliased: # alias should map to one index in this design return sorted(aliased)[-1] - - legacy = get_suggestion_legacy_index_name(tenant_id) - if self.es_client.index_exists(legacy): - return legacy return None def _build_full_candidates( @@ -556,12 +539,10 @@ class SuggestionIndexBuilder: self, tenant_id: str, days: int = 365, - recreate: bool = False, batch_size: int = 500, min_query_len: int = 1, publish_alias: bool = True, keep_versions: int = 2, - use_versioned_index: bool = True, ) -> Dict[str, Any]: """ Full rebuild. @@ -575,13 +556,8 @@ class SuggestionIndexBuilder: index_languages: List[str] = tenant_cfg.get("index_languages") or ["en", "zh"] primary_language: str = tenant_cfg.get("primary_language") or "en" - if use_versioned_index: - index_name = get_suggestion_versioned_index_name(tenant_id) - else: - index_name = get_suggestion_legacy_index_name(tenant_id) - if recreate and self.es_client.index_exists(index_name): - logger.info("Deleting existing suggestion index: %s", index_name) - self.es_client.delete_index(index_name) + # Always write to a fresh versioned index; legacy concrete index is no longer supported. + index_name = get_suggestion_versioned_index_name(tenant_id) if self.es_client.index_exists(index_name): raise RuntimeError(f"Target suggestion index already exists: {index_name}") @@ -609,7 +585,7 @@ class SuggestionIndexBuilder: bulk_result = {"success": 0, "failed": 0, "errors": []} alias_publish: Optional[Dict[str, Any]] = None - if publish_alias and use_versioned_index: + if publish_alias: alias_publish = self._publish_alias( tenant_id=tenant_id, index_name=index_name, @@ -621,7 +597,7 @@ class SuggestionIndexBuilder: "last_full_build_at": now_utc, "last_incremental_watermark": now_utc, } - if publish_alias and use_versioned_index: + if publish_alias: meta_patch["active_index"] = index_name meta_patch["active_alias"] = get_suggestion_alias_name(tenant_id) self._upsert_meta(tenant_id, meta_patch) diff --git a/suggestion/service.py b/suggestion/service.py index 5e0900a..8211426 100644 --- a/suggestion/service.py +++ b/suggestion/service.py @@ -7,11 +7,7 @@ import time from typing import Any, Dict, List, Optional from config.tenant_config_loader import get_tenant_config_loader -from suggestion.builder import ( - get_suggestion_alias_name, - get_suggestion_index_name, - get_suggestion_legacy_index_name, -) +from suggestion.builder import get_suggestion_alias_name from utils.es_client import ESClient logger = logging.getLogger(__name__) @@ -40,16 +36,6 @@ class SuggestionService: alias_name = get_suggestion_alias_name(tenant_id) if self.es_client.alias_exists(alias_name): return alias_name - - # Fallback for pre-Phase2 deployments - legacy = get_suggestion_legacy_index_name(tenant_id) - if self.es_client.index_exists(legacy): - return legacy - - # Last fallback: current naming helper - candidate = get_suggestion_index_name(tenant_id) - if self.es_client.index_exists(candidate): - return candidate return None def _completion_suggest( -- libgit2 0.21.2