diff --git a/scripts/build_suggestions.sh b/scripts/build_suggestions.sh index 0a57ad8..09b4934 100755 --- a/scripts/build_suggestions.sh +++ b/scripts/build_suggestions.sh @@ -10,34 +10,34 @@ # ./scripts/build_suggestions.sh --mode incremental # # # full rebuild + incremental + ES/API smoke checks (same as legacy rebuild_suggestions.sh) -# ./scripts/build_suggestions.sh --rebuild-and-verify +# ./scripts/build_suggestions.sh --rebuild # set -euo pipefail if [ $# -lt 1 ]; then - echo "Usage: $0 [--rebuild-and-verify | extra args for main.py build-suggestions...]" - echo "Example (full): $0 162 --mode full --days 30 --publish-alias" - echo "Example (incremental): $0 162 --mode incremental --overlap-minutes 30" - echo "Example (pipeline + smoke): $0 162 --rebuild-and-verify" + echo "Usage: $0 [--rebuild | extra args for main.py build-suggestions...]" + echo "Example (full): $0 163 --mode full --days 30 --publish-alias" + echo "Example (incremental): $0 163 --mode incremental --overlap-minutes 30" + echo "Example (pipeline + smoke): $0 163 --rebuild" exit 1 fi TENANT_ID="$1" shift || true -REBUILD_VERIFY=false +RUN_REBUILD_PIPELINE=false PASSTHROUGH_ARGS=() for arg in "$@"; do - if [ "$arg" = "--rebuild-and-verify" ]; then - REBUILD_VERIFY=true + if [ "$arg" = "--rebuild" ]; then + RUN_REBUILD_PIPELINE=true else PASSTHROUGH_ARGS+=("$arg") fi done -if [ "$REBUILD_VERIFY" = true ] && [ ${#PASSTHROUGH_ARGS[@]} -gt 0 ]; then - echo "Error: --rebuild-and-verify cannot be combined with other build-suggestions arguments." +if [ "$RUN_REBUILD_PIPELINE" = true ] && [ ${#PASSTHROUGH_ARGS[@]} -gt 0 ]; then + echo "Error: --rebuild cannot be combined with other build-suggestions arguments." exit 1 fi @@ -50,11 +50,12 @@ if [ ! -x "$PY_BIN" ]; then PY_BIN="python3" fi -if [ "$REBUILD_VERIFY" = true ]; then - # Fixed smoke-test queries and languages (no CLI args). +if [ "$RUN_REBUILD_PIPELINE" = true ]; then SAMPLE_QUERIES=(s sh dress tshirt) SAMPLE_LANGS=(en zh) - API_BASE="${API_BASE_URL:-http://localhost:6002}" + # This script validates the locally rebuilt index, so default the smoke target + # to the local backend. A remote/public API base must be opted into explicitly. + API_BASE="${SUGGESTIONS_SMOKE_BASE_URL:-http://localhost:6002}" if [ -z "${ES_HOST:-}" ]; then ES_HOST="$("$PY_BIN" - <<'PY' @@ -111,7 +112,7 @@ PY }' echo - echo "[4/4] API smoke test" + echo "[4/4] API smoke test (base=${API_BASE})" for lang in "${SAMPLE_LANGS[@]}"; do for q in "${SAMPLE_QUERIES[@]}"; do echo "--- GET /search/suggestions?q=${q}&language=${lang} ---" diff --git a/scripts/rebuild_suggestions.sh b/scripts/rebuild_suggestions.sh deleted file mode 100755 index 3be168c..0000000 --- a/scripts/rebuild_suggestions.sh +++ /dev/null @@ -1,6 +0,0 @@ -#!/usr/bin/env bash -# Backward-compatible entry: full + incremental + ES/API checks. -# See scripts/build_suggestions.sh --rebuild-and-verify. -set -euo pipefail -SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" -exec "$SCRIPT_DIR/build_suggestions.sh" "${1:?Usage: $0 }" --rebuild-and-verify diff --git a/scripts/service_ctl.sh b/scripts/service_ctl.sh index 4c47425..93a2aa5 100755 --- a/scripts/service_ctl.sh +++ b/scripts/service_ctl.sh @@ -16,9 +16,10 @@ mkdir -p "${LOG_DIR}" source "${PROJECT_ROOT}/scripts/lib/load_env.sh" CORE_SERVICES=("backend" "indexer" "frontend" "eval-web") -OPTIONAL_SERVICES=("tei" "cnclip" "embedding" "embedding-image" "translator" "reranker" "reranker-fine") +# reranker-fine 暂时不用,因此暂时从OPTIONAL_SERVICES中删除 +OPTIONAL_SERVICES=("tei" "cnclip" "embedding" "embedding-image" "translator" "reranker") FULL_SERVICES=("${OPTIONAL_SERVICES[@]}" "${CORE_SERVICES[@]}") -STOP_ORDER_SERVICES=("frontend" "eval-web" "indexer" "backend" "reranker-fine" "reranker" "translator" "embedding-image" "embedding" "cnclip" "tei") +STOP_ORDER_SERVICES=("frontend" "eval-web" "indexer" "backend" "reranker" "translator" "embedding-image" "embedding" "cnclip" "tei") all_services() { echo "${FULL_SERVICES[@]}" diff --git a/suggestion/builder.py b/suggestion/builder.py index fc50a75..5ee53df 100644 --- a/suggestion/builder.py +++ b/suggestion/builder.py @@ -38,7 +38,7 @@ def get_suggestion_alias_name(tenant_id: str) -> str: def get_suggestion_versioned_index_name(tenant_id: str, build_at: Optional[datetime] = None) -> str: """Versioned suggestion index name.""" - ts = (build_at or datetime.now(timezone.utc)).strftime("%Y%m%d%H%M%S") + ts = (build_at or datetime.now(timezone.utc)).strftime("%Y%m%d%H%M%S%f") return f"{_index_prefix()}search_suggestions_tenant_{tenant_id}_v{ts}" @@ -101,6 +101,79 @@ class SuggestionIndexBuilder: self.es_client = es_client self.db_engine = db_engine + def _format_allocation_failure(self, index_name: str) -> str: + health = self.es_client.wait_for_index_ready(index_name=index_name, timeout="5s") + explain = self.es_client.get_allocation_explain(index_name=index_name) + + parts = [ + f"Suggestion index '{index_name}' was created but is not allocatable/readable yet", + f"health_status={health.get('status')}", + f"timed_out={health.get('timed_out')}", + ] + if health.get("error"): + parts.append(f"health_error={health['error']}") + + if explain: + unassigned = explain.get("unassigned_info") or {} + if unassigned.get("reason"): + parts.append(f"unassigned_reason={unassigned['reason']}") + if unassigned.get("last_allocation_status"): + parts.append(f"last_allocation_status={unassigned['last_allocation_status']}") + + for node in explain.get("node_allocation_decisions") or []: + node_name = node.get("node_name") or node.get("node_id") or "unknown-node" + for decider in node.get("deciders") or []: + if decider.get("decision") == "NO": + parts.append( + f"{node_name}:{decider.get('decider')}={decider.get('explanation')}" + ) + return "; ".join(parts) + + return "; ".join(parts) + + def _create_fresh_versioned_index( + self, + tenant_id: str, + mapping: Dict[str, Any], + max_attempts: int = 5, + ) -> str: + for attempt in range(1, max_attempts + 1): + index_name = get_suggestion_versioned_index_name(tenant_id) + if self.es_client.index_exists(index_name): + logger.warning( + "Suggestion index name collision before create for tenant=%s index=%s attempt=%s/%s", + tenant_id, + index_name, + attempt, + max_attempts, + ) + continue + + if self.es_client.create_index(index_name, mapping): + return index_name + + if self.es_client.index_exists(index_name): + logger.warning( + "Suggestion index name collision during create for tenant=%s index=%s attempt=%s/%s", + tenant_id, + index_name, + attempt, + max_attempts, + ) + continue + + raise RuntimeError(f"Failed to create suggestion index: {index_name}") + + raise RuntimeError( + f"Failed to allocate a unique suggestion index name for tenant={tenant_id} after {max_attempts} attempts" + ) + + def _ensure_new_index_ready(self, index_name: str) -> None: + health = self.es_client.wait_for_index_ready(index_name=index_name, timeout="5s") + if health.get("ok"): + return + raise RuntimeError(self._format_allocation_failure(index_name)) + @staticmethod def _to_utc(dt: Any) -> Optional[datetime]: if dt is None: @@ -609,62 +682,65 @@ class SuggestionIndexBuilder: index_languages: List[str] = tenant_cfg.get("index_languages") or ["en", "zh"] primary_language: str = tenant_cfg.get("primary_language") or "en" - # Always write to a fresh versioned index; legacy concrete index is no longer supported. - index_name = get_suggestion_versioned_index_name(tenant_id) - - if self.es_client.index_exists(index_name): - raise RuntimeError(f"Target suggestion index already exists: {index_name}") - - mapping = build_suggestion_mapping(index_languages=index_languages) - if not self.es_client.create_index(index_name, mapping): - raise RuntimeError(f"Failed to create suggestion index: {index_name}") + alias_publish: Optional[Dict[str, Any]] = None + index_name: Optional[str] = None + try: + mapping = build_suggestion_mapping(index_languages=index_languages) + index_name = self._create_fresh_versioned_index( + tenant_id=tenant_id, + mapping=mapping, + ) + self._ensure_new_index_ready(index_name) - key_to_candidate = self._build_full_candidates( - tenant_id=tenant_id, - index_languages=index_languages, - primary_language=primary_language, - days=days, - batch_size=batch_size, - min_query_len=min_query_len, - ) + key_to_candidate = self._build_full_candidates( + tenant_id=tenant_id, + index_languages=index_languages, + primary_language=primary_language, + days=days, + batch_size=batch_size, + min_query_len=min_query_len, + ) - now_iso = datetime.now(timezone.utc).isoformat() - docs = [self._candidate_to_doc(tenant_id, c, now_iso) for c in key_to_candidate.values()] + now_iso = datetime.now(timezone.utc).isoformat() + docs = [self._candidate_to_doc(tenant_id, c, now_iso) for c in key_to_candidate.values()] - if docs: - bulk_result = self.es_client.bulk_index(index_name=index_name, docs=docs) - self.es_client.refresh(index_name) - else: - bulk_result = {"success": 0, "failed": 0, "errors": []} + if docs: + bulk_result = self.es_client.bulk_index(index_name=index_name, docs=docs) + self.es_client.refresh(index_name) + else: + bulk_result = {"success": 0, "failed": 0, "errors": []} - alias_publish: Optional[Dict[str, Any]] = None - if publish_alias: - alias_publish = self._publish_alias( - tenant_id=tenant_id, - index_name=index_name, - keep_versions=keep_versions, - ) + if publish_alias: + alias_publish = self._publish_alias( + tenant_id=tenant_id, + index_name=index_name, + keep_versions=keep_versions, + ) - now_utc = datetime.now(timezone.utc).isoformat() - meta_patch: Dict[str, Any] = { - "last_full_build_at": now_utc, - "last_incremental_watermark": now_utc, - } - if publish_alias: - meta_patch["active_index"] = index_name - meta_patch["active_alias"] = get_suggestion_alias_name(tenant_id) - self._upsert_meta(tenant_id, meta_patch) + now_utc = datetime.now(timezone.utc).isoformat() + meta_patch: Dict[str, Any] = { + "last_full_build_at": now_utc, + "last_incremental_watermark": now_utc, + } + if publish_alias: + meta_patch["active_index"] = index_name + meta_patch["active_alias"] = get_suggestion_alias_name(tenant_id) + self._upsert_meta(tenant_id, meta_patch) - return { - "mode": "full", - "tenant_id": str(tenant_id), - "index_name": index_name, - "alias_published": bool(alias_publish), - "alias_publish": alias_publish, - "total_candidates": len(key_to_candidate), - "indexed_docs": len(docs), - "bulk_result": bulk_result, - } + return { + "mode": "full", + "tenant_id": str(tenant_id), + "index_name": index_name, + "alias_published": bool(alias_publish), + "alias_publish": alias_publish, + "total_candidates": len(key_to_candidate), + "indexed_docs": len(docs), + "bulk_result": bulk_result, + } + except Exception: + if index_name and not alias_publish: + self.es_client.delete_index(index_name) + raise def _build_incremental_deltas( self, diff --git a/tests/test_suggestions.py b/tests/test_suggestions.py index fb23c4a..4e138a6 100644 --- a/tests/test_suggestions.py +++ b/tests/test_suggestions.py @@ -8,6 +8,7 @@ from suggestion.builder import ( QueryDelta, SuggestionIndexBuilder, get_suggestion_alias_name, + get_suggestion_versioned_index_name, ) from suggestion.service import SuggestionService @@ -121,6 +122,16 @@ class FakeESClient: self.indices.add(index_name) return True + def wait_for_index_ready(self, index_name: str, timeout: str = "10s") -> Dict[str, Any]: + self.calls.append({"op": "wait_for_index_ready", "index": index_name, "timeout": timeout}) + return {"ok": True, "status": "green", "timed_out": False} + + def get_allocation_explain(self, index_name: str, shard: int = 0, primary: bool = True) -> Dict[str, Any] | None: + self.calls.append( + {"op": "get_allocation_explain", "index": index_name, "shard": shard, "primary": primary} + ) + return None + def refresh(self, index_name: str) -> bool: self.calls.append({"op": "refresh", "index": index_name}) return True @@ -150,6 +161,67 @@ class FakeESClient: @pytest.mark.unit +def test_versioned_index_name_uses_microseconds(): + build_at = datetime(2026, 4, 7, 3, 52, 26, 123456, tzinfo=timezone.utc) + assert ( + get_suggestion_versioned_index_name("163", build_at) + == "search_suggestions_tenant_163_v20260407035226123456" + ) + + +@pytest.mark.unit +def test_rebuild_cleans_up_unallocatable_new_index(): + fake_es = FakeESClient() + + def _wait_fail(index_name: str, timeout: str = "10s") -> Dict[str, Any]: + fake_es.calls.append({"op": "wait_for_index_ready", "index": index_name, "timeout": timeout}) + return {"ok": False, "status": "red", "timed_out": True} + + def _allocation_explain(index_name: str, shard: int = 0, primary: bool = True) -> Dict[str, Any]: + fake_es.calls.append( + {"op": "get_allocation_explain", "index": index_name, "shard": shard, "primary": primary} + ) + return { + "unassigned_info": {"reason": "INDEX_CREATED", "last_allocation_status": "no"}, + "node_allocation_decisions": [ + { + "node_name": "node-1", + "deciders": [ + { + "decider": "disk_threshold", + "decision": "NO", + "explanation": "node is above high watermark", + } + ], + } + ], + } + + fake_es.wait_for_index_ready = _wait_fail # type: ignore[method-assign] + fake_es.get_allocation_explain = _allocation_explain # type: ignore[method-assign] + + builder = SuggestionIndexBuilder(es_client=fake_es, db_engine=None) + + from config import tenant_config_loader as tcl + + loader = tcl.get_tenant_config_loader() + loader._config = { + "default": {"primary_language": "en", "index_languages": ["en", "zh"]}, + "tenants": { + "163": {"primary_language": "en", "index_languages": ["en", "zh"]}, + }, + } + + with pytest.raises(RuntimeError, match="disk_threshold"): + builder.rebuild_tenant_index(tenant_id="163") + + create_calls = [x for x in fake_es.calls if x.get("op") == "create_index"] + assert len(create_calls) == 1 + created_index = create_calls[0]["index"] + assert created_index not in fake_es.indices + + +@pytest.mark.unit def test_resolve_query_language_prefers_log_field(): fake_es = FakeESClient() builder = SuggestionIndexBuilder(es_client=fake_es, db_engine=None) diff --git a/utils/es_client.py b/utils/es_client.py index 8759589..a894dfa 100644 --- a/utils/es_client.py +++ b/utils/es_client.py @@ -76,13 +76,70 @@ class ESClient: True if successful, False otherwise """ try: - self.client.indices.create(index=index_name, body=body) + client = self.client.options(request_timeout=30, max_retries=0) + client.indices.create( + index=index_name, + body=body, + wait_for_active_shards="0", + ) logger.info(f"Index '{index_name}' created successfully") return True except Exception as e: + if self.index_exists(index_name): + logger.warning( + "Create index request for '%s' raised %s, but the index now exists; treating it as created", + index_name, + type(e).__name__, + exc_info=True, + ) + return True logger.error(f"Failed to create index '{index_name}': {e}", exc_info=True) return False + def wait_for_index_ready(self, index_name: str, timeout: str = "10s") -> Dict[str, Any]: + """Wait until an index primary shard is allocated and searchable.""" + try: + resp = self.client.cluster.health( + index=index_name, + wait_for_status="yellow", + timeout=timeout, + level="indices", + ) + index_info = ((resp.get("indices") or {}).get(index_name) or {}) + status = index_info.get("status") or resp.get("status") + timed_out = bool(resp.get("timed_out")) + return { + "ok": (not timed_out) and status in {"yellow", "green"}, + "status": status, + "timed_out": timed_out, + "response": resp, + } + except Exception as e: + logger.error("Failed waiting for index '%s' readiness: %s", index_name, e, exc_info=True) + return { + "ok": False, + "status": "unknown", + "timed_out": False, + "error": str(e), + } + + def get_allocation_explain(self, index_name: str, shard: int = 0, primary: bool = True) -> Optional[Dict[str, Any]]: + """Explain why a shard can or cannot be allocated.""" + try: + return self.client.cluster.allocation_explain( + body={"index": index_name, "shard": shard, "primary": primary} + ) + except Exception as e: + logger.warning( + "Failed to get allocation explain for index '%s' shard=%s primary=%s: %s", + index_name, + shard, + primary, + e, + exc_info=True, + ) + return None + def put_alias(self, index_name: str, alias_name: str) -> bool: """Add alias for an index.""" try: -- libgit2 0.21.2