"""
Indexer API routes.

Exposes full (bulk) and incremental indexing endpoints, plus document-building
and content-enrichment helpers, for external (Java) indexer programs to call.

All potentially blocking work (MySQL queries, Elasticsearch writes, embedding
and LLM calls) is dispatched to the default thread pool via
``loop.run_in_executor`` so that one slow request never stalls the event loop.
"""

import asyncio
import logging
import re
from typing import Any, Dict, List, Optional, Tuple

from fastapi import APIRouter, HTTPException
from pydantic import BaseModel, Field
from sqlalchemy import text

# Indexer routes depend on services provided by api/indexer_app.py via this registry.
from ..service_registry import get_incremental_service, get_bulk_indexing_service, get_es_client

logger = logging.getLogger(__name__)

router = APIRouter(prefix="/indexer", tags=["indexer"])

# Separators used to split multi-valued LLM attribute strings in /enrich-content.
_ATTR_SPLIT_RE = re.compile(r"[,;|/\n\t]+")

# LLM output columns aggregated into semantic_attributes (in this order) by /enrich-content.
_ENRICH_DIM_KEYS = (
    "tags",
    "target_audience",
    "usage_scene",
    "season",
    "key_attributes",
    "material",
    "features",
)


class ReindexRequest(BaseModel):
    """Request body for a full reindex of one tenant."""

    tenant_id: str
    batch_size: int = 500


class IndexSpusRequest(BaseModel):
    """Request body for incremental indexing of an explicit list of SPUs."""

    tenant_id: str
    spu_ids: List[str]
    # Optional list of SPU IDs to delete from ES explicitly.
    delete_spu_ids: List[str] = Field(default_factory=list)


class GetDocumentsRequest(BaseModel):
    """Request body for previewing ES documents (nothing is written to ES)."""

    tenant_id: str
    spu_ids: List[str]


class BuildDocItem(BaseModel):
    """
    Raw data bundle for one SPU, as queried from MySQL by the upstream caller.

    - spu: one SPU row (table ``shoplazza_product_spu``)
    - skus: all SKU rows of this SPU (table ``shoplazza_product_sku``)
    - options: all option rows of this SPU (table ``shoplazza_product_option``)
    """

    spu: Dict[str, Any] = Field(..., description="单个 SPU 的原始字段(MySQL 行数据)")
    skus: List[Dict[str, Any]] = Field(default_factory=list, description="该 SPU 关联的 SKU 列表")
    options: List[Dict[str, Any]] = Field(default_factory=list, description="该 SPU 关联的 Option 列表")


class BuildDocsRequest(BaseModel):
    """
    Build ES documents from MySQL rows already fetched upstream (no DB access, no ES writes).

    This is the "doc generation" endpoint intended for production use by the
    Java (or other external) indexer:
    - Upstream is responsible for scheduling full / incremental runs and for
      querying the per-table rows from MySQL.
    - This module converts those raw rows, according to tenant configuration
      and the enrichment pipeline, into ES documents consistent with
      ``mappings/search_products.json``.
    """

    tenant_id: str = Field(..., description="租户 ID,用于加载租户配置、语言策略等")
    items: List[BuildDocItem] = Field(..., description="需要构建 doc 的 SPU 列表(含其 SKUs 和 Options)")


class BuildDocsFromDbRequest(BaseModel):
    """
    Convenience/testing request: only tenant_id and spu_ids are supplied; this
    service loads the raw rows from MySQL itself and then runs the same logic
    as ``/indexer/build-docs``.

    Intended use:
    - Quickly verify document structure locally or during integration without
      hand-crafting a large ``BuildDocsRequest`` JSON payload.
    - Production callers should use ``BuildDocsRequest`` directly and keep the
      MySQL querying on the (Java) upstream side.
    """

    tenant_id: str = Field(..., description="租户 ID")
    spu_ids: List[str] = Field(..., description="需要构建 doc 的 SPU ID 列表")


class EnrichContentItem(BaseModel):
    """One product to enrich with content-understanding fields (only spu_id + title needed)."""

    spu_id: str = Field(..., description="SPU ID")
    title: str = Field(..., description="商品标题,用于 LLM 分析生成 qanchors / tags 等")


class EnrichContentRequest(BaseModel):
    """
    Content-understanding request: generate qanchors, semantic_attributes and
    tags in batch from product titles.

    Meant for external indexers that assemble documents themselves, alongside
    the translation and embedding microservices.
    """

    tenant_id: str = Field(..., description="租户 ID,用于缓存隔离")
    items: List[EnrichContentItem] = Field(..., description="待分析的 SPU 列表(spu_id + title)")
    languages: List[str] = Field(
        default_factory=lambda: ["zh", "en"],
        description="目标语言列表,需在支持范围内(zh/en/de/ru/fr),默认 zh, en",
    )


@router.post("/reindex")
async def reindex_all(request: ReindexRequest):
    """
    Full reindex endpoint.

    Re-indexes every SPU of the given tenant into ES.

    Note: this endpoint does NOT drop the existing index; it calls
    ``bulk_index`` with ``recreate_index=False``. To rebuild the index
    structure (drop and recreate), use the ``scripts/create_tenant_index.sh``
    script.

    Note: a full reindex is long-running, so it executes in the thread pool and
    does not block other requests; full and incremental indexing may run in
    parallel.
    """
    try:
        service = get_bulk_indexing_service()
        if service is None:
            raise HTTPException(status_code=503, detail="Bulk indexing service is not initialized")

        # Run the synchronous, blocking bulk job in the default thread pool so the
        # event loop stays free; this is what lets full and incremental indexing overlap.
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            None,
            lambda: service.bulk_index(
                tenant_id=request.tenant_id,
                recreate_index=False,
                batch_size=request.batch_size,
            ),
        )
    except HTTPException:
        raise
    except Exception as e:
        logger.error("Error in reindex for tenant_id=%s: %s", request.tenant_id, e, exc_info=True)
        raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}") from e


@router.post("/index")
async def index_spus(request: IndexSpusRequest):
    """
    Incremental indexing endpoint.

    Indexes the given SPU IDs into ES; used to refresh specific products.

    Two deletion modes are supported:
    1. **Automatic detection**: if an SPU is marked ``deleted=1`` in the
       database, its ES document is removed.
    2. **Explicit deletion**: SPUs listed in ``delete_spu_ids`` are removed
       regardless of database state.

    Deletion policy:
    - The database is the single source of truth.
    - Automatic detection: an SPU seen with ``deleted=1`` while querying the
      database is deleted from ES.
    - Explicit deletion: the caller already knows which SPUs to delete and
      they are deleted directly (suited to bulk-delete scenarios).

    Response format:
    - spu_ids: one entry per requested spu_id with ``spu_id`` and ``status``
      (indexed/deleted/failed)
    - delete_spu_ids: one entry per delete_spu_id with ``spu_id`` and
      ``status`` (deleted/not_found/failed)
    - entries in ``failed`` status carry a ``msg`` field explaining why
    - overall statistics follow: total, success_count, failed_count, etc.

    Note: runs in the thread pool and may execute in parallel with a full reindex.
    Both lists are capped at 100 IDs per request.
    """
    try:
        if not request.spu_ids and not request.delete_spu_ids:
            raise HTTPException(status_code=400, detail="spu_ids and delete_spu_ids cannot both be empty")
        if request.spu_ids and len(request.spu_ids) > 100:
            raise HTTPException(status_code=400, detail="Maximum 100 SPU IDs allowed per request for indexing")
        if request.delete_spu_ids and len(request.delete_spu_ids) > 100:
            raise HTTPException(status_code=400, detail="Maximum 100 SPU IDs allowed per request for deletion")

        service = get_incremental_service()
        if service is None:
            raise HTTPException(status_code=503, detail="Incremental indexer service is not initialized")

        es_client = get_es_client()
        if es_client is None:
            raise HTTPException(status_code=503, detail="Elasticsearch client is not initialized")

        # Blocking DB + ES work goes to the thread pool (see /reindex).
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            None,
            lambda: service.index_spus_to_es(
                es_client=es_client,
                tenant_id=request.tenant_id,
                spu_ids=request.spu_ids or [],
                delete_spu_ids=request.delete_spu_ids or None,
            ),
        )
    except HTTPException:
        raise
    except Exception as e:
        logger.error("Error indexing SPUs for tenant_id=%s: %s", request.tenant_id, e, exc_info=True)
        raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}") from e


def _title_text_for_embedding(doc: Dict[str, Any]) -> Any:
    """
    Pick the title text to embed from a built document.

    Prefers ``title["en"]``, then ``title["zh"]``, then the first other
    non-blank language value. Returns None when ``doc["title"]`` is missing or
    is not a dict.
    """
    title_obj = doc.get("title") or {}
    if not isinstance(title_obj, dict):
        return None
    title_text = title_obj.get("en") or title_obj.get("zh")
    if title_text:
        return title_text
    for value in title_obj.values():
        if value and str(value).strip():
            return str(value)
    return title_text


def _encode_title_embedding(encoder: Any, title_text: Any, spu_id: Any) -> List[float]:
    """
    Encode ``title_text`` with ``encoder`` and return the first vector as a float list.

    Raises:
        RuntimeError: if the encoder returns nothing, or the first vector is not
            a non-empty 1-D array of finite values.
    """
    import numpy as np

    embeddings = encoder.encode(title_text)
    if embeddings is None or len(embeddings) == 0:
        raise RuntimeError(f"title_embedding empty for spu_id={spu_id}")
    emb0 = np.asarray(embeddings[0], dtype=np.float32)
    if emb0.ndim != 1 or emb0.size == 0 or not np.isfinite(emb0).all():
        raise RuntimeError(f"title_embedding invalid for spu_id={spu_id}")
    return emb0.tolist()


def _build_docs_sync(incremental_service: Any, tenant_id: str, items: List[BuildDocItem]) -> Dict[str, Any]:
    """
    Synchronously build ES documents for ``items`` (run via ``run_in_executor``).

    Per-item failures (transform returning None, transform/embedding errors)
    are collected in ``failed`` instead of aborting the batch. The batched LLM
    fill is best-effort: a failure there is only logged as a warning.

    Returns:
        The /build-docs response dict: tenant_id, docs, total, success_count,
        failed_count and failed.
    """
    import pandas as pd

    # Reuse the incremental service's cached transformer together with its
    # config / language / embedding setup.
    transformer, encoder, enable_embedding = incremental_service._get_transformer_bundle(tenant_id=tenant_id)

    docs: List[Dict[str, Any]] = []
    doc_spu_rows: List[pd.Series] = []
    failed: List[Dict[str, Any]] = []

    for item in items:
        try:
            # Convert the upstream MySQL rows to pandas structures so that
            # SPUDocumentTransformer can be reused unchanged.
            spu_row = pd.DataFrame([item.spu]).iloc[0]
            skus_df = pd.DataFrame(item.skus) if item.skus else pd.DataFrame()
            options_df = pd.DataFrame(item.options) if item.options else pd.DataFrame()

            doc = transformer.transform_spu_to_doc(
                tenant_id=tenant_id,
                spu_row=spu_row,
                skus=skus_df,
                options=options_df,
                fill_llm_attributes=False,
            )
            if doc is None:
                failed.append(
                    {
                        "spu_id": str(item.spu.get("id")),
                        "error": "transform_spu_to_doc returned None",
                    }
                )
                continue

            # Whether to add title_embedding follows the transformer bundle's
            # encoder / enable_embedding settings (same as incremental indexing);
            # it is neither forced on nor forced off here.
            if enable_embedding and encoder:
                title_text = _title_text_for_embedding(doc)
                if title_text and str(title_text).strip():
                    doc["title_embedding"] = _encode_title_embedding(encoder, title_text, doc.get("spu_id"))

            docs.append(doc)
            doc_spu_rows.append(spu_row)
        except Exception as e:
            failed.append(
                {
                    "spu_id": str(item.spu.get("id")),
                    "error": str(e),
                }
            )

    # Fill LLM fields in one batched call; failures are only warnings and do
    # not affect the main build-docs result.
    try:
        if docs and doc_spu_rows:
            transformer.fill_llm_attributes_batch(docs, doc_spu_rows)
    except Exception as e:
        logger.warning("Batch LLM fill failed in build-docs (tenant_id=%s): %s", tenant_id, e)

    return {
        "tenant_id": tenant_id,
        "docs": docs,
        "total": len(items),
        "success_count": len(docs),
        "failed_count": len(failed),
        "failed": failed,
    }


@router.post("/build-docs")
async def build_docs(request: BuildDocsRequest):
    """
    Build ES documents (no database access, no ES writes).

    Use case:
    - The upstream (e.g. the Java indexer) has already queried the raw SPU /
      SKU / Option rows from MySQL.
    - It wants to reuse this project's full enrichment pipeline (multi-language,
      translation, embeddings, spec aggregation, ...).
    - It only needs a list of docs consistent with ``mappings/search_products.json``
      and writes them to ES itself.

    At most 200 items per request. The build (transforms, embeddings, LLM
    calls) runs in the thread pool so it does not block the event loop.
    """
    try:
        if not request.items:
            raise HTTPException(status_code=400, detail="items cannot be empty")
        if len(request.items) > 200:
            raise HTTPException(status_code=400, detail="Maximum 200 items allowed per request")

        incremental_service = get_incremental_service()
        if incremental_service is None:
            raise HTTPException(status_code=503, detail="Incremental indexer service is not initialized")

        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            None,
            lambda: _build_docs_sync(incremental_service, request.tenant_id, request.items),
        )
    except HTTPException:
        raise
    except Exception as e:
        logger.error("Error building docs for tenant_id=%s: %s", request.tenant_id, e, exc_info=True)
        raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}") from e


def _normalize_spu_id(spu_id: Any) -> str:
    """Return ``spu_id`` as a canonical string (integer form when it parses as an int)."""
    raw = str(spu_id).strip()
    try:
        return str(int(raw))
    except ValueError:
        return raw


def _load_build_items_from_db(
    incremental_service: Any, tenant_id: str, spu_ids: List[str]
) -> Tuple[List[BuildDocItem], List[Dict[str, Any]]]:
    """
    Load SPU / SKU / Option rows for ``spu_ids`` from MySQL and group them into BuildDocItems.

    Runs synchronously (intended for ``run_in_executor``).

    Returns:
        ``(items, failed)``. ``failed`` lists rows whose grouping raised, followed
        by requested IDs that have no non-deleted SPU row ("SPU not found or deleted").
    """
    # Only non-deleted records are loaded (include_deleted=False).
    spu_df = incremental_service._load_spus_for_spu_ids(
        tenant_id=tenant_id, spu_ids=spu_ids, include_deleted=False
    )
    if spu_df.empty:
        return [], [{"spu_id": spu_id, "error": "SPU not found or deleted"} for spu_id in spu_ids]

    # Only IDs that actually exist are passed on to the SKU / option loaders.
    existing_ids = [str(int(i)) for i in spu_df["id"].tolist()]
    existing_id_set = set(existing_ids)
    not_found = [
        {"spu_id": spu_id, "error": "SPU not found or deleted"}
        for spu_id in spu_ids
        if _normalize_spu_id(spu_id) not in existing_id_set
    ]

    skus_df = incremental_service._load_skus_for_spu_ids(tenant_id=tenant_id, spu_ids=existing_ids)
    options_df = incremental_service._load_options_for_spu_ids(tenant_id=tenant_id, spu_ids=existing_ids)

    # Group by spu_id so each SPU's subset can be fetched directly.
    sku_groups = skus_df.groupby("spu_id") if not skus_df.empty else None
    option_groups = options_df.groupby("spu_id") if not options_df.empty else None

    items: List[BuildDocItem] = []
    failed: List[Dict[str, Any]] = []
    for _, spu_row in spu_df.iterrows():
        spu_id = int(spu_row["id"])
        try:
            spu_dict = spu_row.to_dict()
            skus = (
                sku_groups.get_group(spu_id).to_dict("records")
                if sku_groups is not None and spu_id in sku_groups.groups
                else []
            )
            options = (
                option_groups.get_group(spu_id).to_dict("records")
                if option_groups is not None and spu_id in option_groups.groups
                else []
            )
            items.append(BuildDocItem(spu=spu_dict, skus=skus, options=options))
        except Exception as e:
            failed.append({"spu_id": str(spu_id), "error": str(e)})

    return items, failed + not_found


@router.post("/build-docs-from-db")
async def build_docs_from_db(request: BuildDocsFromDbRequest):
    """
    Build ES documents from database data (testing / debugging).

    - Input: tenant_id + spu_ids (at most 200)
    - Steps:
      1. Load SPU / SKU / Option rows from MySQL in batch using the
         incremental indexing service's query helpers.
      2. Assemble them into BuildDocsRequest items.
      3. Run the same build logic as ``/indexer/build-docs`` and return ES-ready docs.

    Requested IDs that are missing or deleted are reported in ``failed``, and
    ``total`` is the number of requested IDs.

    Note:
    - This endpoint is mainly for self-testing and debugging; production
      callers should query the database upstream (Java) and call
      ``/indexer/build-docs``.
    """
    try:
        if not request.spu_ids:
            raise HTTPException(status_code=400, detail="spu_ids cannot be empty")
        if len(request.spu_ids) > 200:
            raise HTTPException(status_code=400, detail="Maximum 200 SPU IDs allowed per request")

        incremental_service = get_incremental_service()
        if incremental_service is None:
            raise HTTPException(status_code=503, detail="Incremental indexer service is not initialized")

        # The MySQL loads are blocking, so they run in the thread pool.
        loop = asyncio.get_running_loop()
        items, load_failed = await loop.run_in_executor(
            None,
            lambda: _load_build_items_from_db(incremental_service, request.tenant_id, request.spu_ids),
        )
        total = len(request.spu_ids)

        if not items:
            failed = load_failed or [
                {"spu_id": spu_id, "error": "SPU not found or data load failed"}
                for spu_id in request.spu_ids
            ]
            return {
                "tenant_id": request.tenant_id,
                "docs": [],
                "total": total,
                "success_count": 0,
                "failed_count": len(failed),
                "failed": failed,
            }

        # Same build logic as /indexer/build-docs.
        result = await build_docs(BuildDocsRequest(tenant_id=request.tenant_id, items=items))

        # Merge build-level failures with load-level failures.
        merged_failed = list(result.get("failed", []))
        merged_failed.extend(load_failed)
        result["failed"] = merged_failed
        result["failed_count"] = len(merged_failed)
        result["total"] = total
        return result
    except HTTPException:
        raise
    except Exception as e:
        logger.error("Error building docs from DB for tenant_id=%s: %s", request.tenant_id, e, exc_info=True)
        raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}") from e


def _run_enrich_content(tenant_id: str, items: List[Dict[str, str]], languages: List[str]) -> List[Dict[str, Any]]:
    """
    Run content understanding synchronously (intended for ``run_in_executor``).

    Calls ``indexer.process_products.analyze_products`` once per supported
    language (batch_size=20), then aggregates the rows into per-SPU
    ``qanchors`` (lang -> anchor text), ``semantic_attributes`` and ``tags``.
    A failure for one language is recorded in each affected SPU's ``error``
    field and the remaining languages are still processed.
    """
    from indexer.process_products import analyze_products, SUPPORTED_LANGS

    llm_langs = [lang for lang in languages if lang in SUPPORTED_LANGS]
    if not llm_langs:
        return [
            {
                "spu_id": it["spu_id"],
                "qanchors": {},
                "semantic_attributes": [],
                "tags": [],
                "error": "no supported languages (supported: %s)" % sorted(SUPPORTED_LANGS),
            }
            for it in items
        ]

    products = [{"id": it["spu_id"], "title": (it.get("title") or "").strip()} for it in items]

    # Aggregate per spu_id: qanchors[lang], semantic_attributes[], tags[].
    by_spu: Dict[str, Dict[str, Any]] = {
        str(it["spu_id"]): {"qanchors": {}, "semantic_attributes": [], "tags": []} for it in items
    }

    for lang in llm_langs:
        try:
            rows = analyze_products(
                products=products,
                target_lang=lang,
                batch_size=20,
                tenant_id=tenant_id,
            )
        except Exception as e:
            logger.warning("enrich-content analyze_products failed for lang=%s: %s", lang, e)
            for it in items:
                sid = str(it["spu_id"])
                # Keep the first error reported for this SPU.
                if "error" not in by_spu[sid]:
                    by_spu[sid]["error"] = str(e)
            continue

        for row in rows:
            spu_id = str(row.get("id") or "")
            rec = by_spu.get(spu_id)
            if rec is None:
                continue
            if row.get("error"):
                rec["error"] = row["error"]
                continue

            anchor_text = str(row.get("anchor_text") or "").strip()
            if anchor_text:
                rec["qanchors"][lang] = anchor_text

            for name in _ENRICH_DIM_KEYS:
                raw = row.get(name)
                if not raw:
                    continue
                for part in _ATTR_SPLIT_RE.split(str(raw)):
                    value = part.strip()
                    if not value:
                        continue
                    rec["semantic_attributes"].append({"lang": lang, "name": name, "value": value})
                    if name == "tags":
                        rec["tags"].append(value)

    out = []
    for it in items:
        sid = str(it["spu_id"])
        rec = by_spu[sid]
        out.append({
            "spu_id": sid,
            "qanchors": rec["qanchors"],
            "semantic_attributes": rec["semantic_attributes"],
            # De-duplicate tags while preserving first-seen order.
            "tags": list(dict.fromkeys(rec["tags"])),
            **({"error": rec["error"]} if rec.get("error") else {}),
        })
    return out


@router.post("/enrich-content")
async def enrich_content(request: EnrichContentRequest):
    """
    Content-understanding endpoint: generate qanchors, semantic_attributes and
    tags in batch from product titles.

    Use cases:
    - An external indexer that assembles documents itself from microservices
      can call this endpoint to get LLM-generated anchor text and semantic
      attributes, then merge them with translation / embedding results before
      writing to ES.
    - Decoupled from ``/indexer/build-docs`` so that slow LLM calls do not block
      doc building; callers can take docs without qanchors/tags first and fill
      in this endpoint's results later (asynchronously or offline).

    Delegates to ``indexer.process_products.analyze_products``. At most 50
    items per request.
    """
    try:
        if not request.items:
            raise HTTPException(status_code=400, detail="items cannot be empty")
        if len(request.items) > 50:
            raise HTTPException(
                status_code=400,
                detail="Maximum 50 items per request for enrich-content (LLM batch limit)",
            )

        items_payload = [{"spu_id": it.spu_id, "title": it.title or ""} for it in request.items]

        loop = asyncio.get_running_loop()
        result = await loop.run_in_executor(
            None,
            lambda: _run_enrich_content(
                tenant_id=request.tenant_id,
                items=items_payload,
                languages=request.languages or ["zh", "en"],
            ),
        )
        return {
            "tenant_id": request.tenant_id,
            "results": result,
            "total": len(result),
        }
    except HTTPException:
        raise
    except RuntimeError as e:
        message = str(e)
        # Compare against a lower-case needle: the haystack is lower-cased.
        if "DASHSCOPE_API_KEY" in message or "cannot call llm" in message.lower():
            logger.warning("enrich-content unavailable for tenant_id=%s: %s", request.tenant_id, e)
            raise HTTPException(
                status_code=503,
                detail="Content understanding service unavailable: DASHSCOPE_API_KEY not set",
            ) from e
        logger.error("Error in enrich-content for tenant_id=%s: %s", request.tenant_id, e, exc_info=True)
        raise HTTPException(status_code=500, detail=message) from e
    except Exception as e:
        logger.error("Error in enrich-content for tenant_id=%s: %s", request.tenant_id, e, exc_info=True)
        raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}") from e


def _get_documents_sync(
    service: Any, tenant_id: str, spu_ids: List[str]
) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]]]:
    """
    Fetch one document per SPU via ``service.get_spu_document`` (blocking).

    Returns ``(success_list, failed_list)``; a falsy document is reported as
    "SPU not found or deleted" and exceptions are reported with their message.
    """
    success_list: List[Dict[str, Any]] = []
    failed_list: List[Dict[str, Any]] = []
    for spu_id in spu_ids:
        try:
            doc = service.get_spu_document(tenant_id=tenant_id, spu_id=spu_id)
        except Exception as e:
            failed_list.append({"spu_id": spu_id, "error": str(e)})
            continue
        if doc:
            success_list.append({"spu_id": spu_id, "document": doc})
        else:
            failed_list.append({"spu_id": spu_id, "error": "SPU not found or deleted"})
    return success_list, failed_list


@router.post("/documents")
async def get_documents(request: GetDocumentsRequest):
    """
    Document preview endpoint.

    Returns the ES document for each requested SPU ID without writing to ES;
    used to inspect, debug or verify SPU data. At most 100 IDs per request.
    """
    try:
        if not request.spu_ids:
            raise HTTPException(status_code=400, detail="spu_ids cannot be empty")
        if len(request.spu_ids) > 100:
            raise HTTPException(status_code=400, detail="Maximum 100 SPU IDs allowed per request")

        service = get_incremental_service()
        if service is None:
            raise HTTPException(status_code=503, detail="Incremental indexer service is not initialized")

        # Per-SPU lookups hit the database, so run them off the event loop.
        loop = asyncio.get_running_loop()
        success_list, failed_list = await loop.run_in_executor(
            None,
            lambda: _get_documents_sync(service, request.tenant_id, request.spu_ids),
        )

        return {
            "success": success_list,
            "failed": failed_list,
            "total": len(request.spu_ids),
            "success_count": len(success_list),
            "failed_count": len(failed_list),
        }
    except HTTPException:
        raise
    except Exception as e:
        logger.error("Error getting documents for tenant_id=%s: %s", request.tenant_id, e, exc_info=True)
        raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}") from e


def _ping_database(service: Any) -> str:
    """Run ``SELECT 1`` on ``service.db_engine``; return "connected" or "disconnected: <error>"."""
    try:
        with service.db_engine.connect() as conn:
            conn.execute(text("SELECT 1"))
        return "connected"
    except Exception as e:
        return f"disconnected: {str(e)}"


@router.get("/health")
async def indexer_health_check():
    """Report indexer service health: availability, DB connectivity and preloaded-data sizes."""
    try:
        service = get_incremental_service()
        if service is None:
            return {"status": "unavailable", "database": "unknown", "preloaded_data": {"category_mappings": 0}}

        # A hanging DB connection must not freeze the event loop.
        loop = asyncio.get_running_loop()
        db_status = await loop.run_in_executor(None, _ping_database, service)

        return {
            "status": "available",
            "database": db_status,
            "preloaded_data": {"category_mappings": len(service.category_id_to_name)},
        }
    except Exception as e:
        logger.error("Error checking indexer health: %s", e, exc_info=True)
        return {"status": "error", "message": str(e)}