Commit 9f33fe3c6c81ec9250df2aaaa8428a6a8c3a9619

Authored by tangwang
1 parent e50924ed

fix suggestion rebuild flow and es index creation

- consolidate suggestion rebuild flow into build_suggestions.sh via --rebuild and remove the redundant rebuild_suggestions.sh wrapper
- make suggestion versioned index names use microseconds and handle index-create retries/timeouts without false already_exists failures
- treat create requests as successful when the index was created server-side, then explicitly wait for shard readiness and surface allocation diagnostics
- clean up freshly created suggestion indices on rebuild failure to avoid leaving red orphan indices behind
- make rebuild smoke tests target the local backend by default, with SUGGESTIONS_SMOKE_BASE_URL as the explicit override
- add unit coverage for microsecond versioned index names and cleanup on unallocatable index failures
scripts/build_suggestions.sh
... ... @@ -10,34 +10,34 @@
10 10 # ./scripts/build_suggestions.sh <tenant_id> --mode incremental
11 11 #
12 12 # # full rebuild + incremental + ES/API smoke checks (same as legacy rebuild_suggestions.sh)
13   -# ./scripts/build_suggestions.sh <tenant_id> --rebuild-and-verify
  13 +# ./scripts/build_suggestions.sh <tenant_id> --rebuild
14 14 #
15 15  
16 16 set -euo pipefail
17 17  
18 18 if [ $# -lt 1 ]; then
19   - echo "Usage: $0 <tenant_id> [--rebuild-and-verify | extra args for main.py build-suggestions...]"
20   - echo "Example (full): $0 162 --mode full --days 30 --publish-alias"
21   - echo "Example (incremental): $0 162 --mode incremental --overlap-minutes 30"
22   - echo "Example (pipeline + smoke): $0 162 --rebuild-and-verify"
  19 + echo "Usage: $0 <tenant_id> [--rebuild | extra args for main.py build-suggestions...]"
  20 + echo "Example (full): $0 163 --mode full --days 30 --publish-alias"
  21 + echo "Example (incremental): $0 163 --mode incremental --overlap-minutes 30"
  22 + echo "Example (pipeline + smoke): $0 163 --rebuild"
23 23 exit 1
24 24 fi
25 25  
26 26 TENANT_ID="$1"
27 27 shift || true
28 28  
29   -REBUILD_VERIFY=false
  29 +RUN_REBUILD_PIPELINE=false
30 30 PASSTHROUGH_ARGS=()
31 31 for arg in "$@"; do
32   - if [ "$arg" = "--rebuild-and-verify" ]; then
33   - REBUILD_VERIFY=true
  32 + if [ "$arg" = "--rebuild" ]; then
  33 + RUN_REBUILD_PIPELINE=true
34 34 else
35 35 PASSTHROUGH_ARGS+=("$arg")
36 36 fi
37 37 done
38 38  
39   -if [ "$REBUILD_VERIFY" = true ] && [ ${#PASSTHROUGH_ARGS[@]} -gt 0 ]; then
40   - echo "Error: --rebuild-and-verify cannot be combined with other build-suggestions arguments."
  39 +if [ "$RUN_REBUILD_PIPELINE" = true ] && [ ${#PASSTHROUGH_ARGS[@]} -gt 0 ]; then
  40 + echo "Error: --rebuild cannot be combined with other build-suggestions arguments."
41 41 exit 1
42 42 fi
43 43  
... ... @@ -50,11 +50,12 @@ if [ ! -x "$PY_BIN" ]; then
50 50 PY_BIN="python3"
51 51 fi
52 52  
53   -if [ "$REBUILD_VERIFY" = true ]; then
54   - # Fixed smoke-test queries and languages (no CLI args).
  53 +if [ "$RUN_REBUILD_PIPELINE" = true ]; then
55 54 SAMPLE_QUERIES=(s sh dress tshirt)
56 55 SAMPLE_LANGS=(en zh)
57   - API_BASE="${API_BASE_URL:-http://localhost:6002}"
  56 + # This script validates the locally rebuilt index, so default the smoke target
  57 + # to the local backend. A remote/public API base must be opted into explicitly.
  58 + API_BASE="${SUGGESTIONS_SMOKE_BASE_URL:-http://localhost:6002}"
58 59  
59 60 if [ -z "${ES_HOST:-}" ]; then
60 61 ES_HOST="$("$PY_BIN" - <<'PY'
... ... @@ -111,7 +112,7 @@ PY
111 112 }'
112 113 echo
113 114  
114   - echo "[4/4] API smoke test"
  115 + echo "[4/4] API smoke test (base=${API_BASE})"
115 116 for lang in "${SAMPLE_LANGS[@]}"; do
116 117 for q in "${SAMPLE_QUERIES[@]}"; do
117 118 echo "--- GET /search/suggestions?q=${q}&language=${lang} ---"
... ...
scripts/rebuild_suggestions.sh deleted
... ... @@ -1,6 +0,0 @@
1   -#!/usr/bin/env bash
2   -# Backward-compatible entry: full + incremental + ES/API checks.
3   -# See scripts/build_suggestions.sh --rebuild-and-verify.
4   -set -euo pipefail
5   -SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
6   -exec "$SCRIPT_DIR/build_suggestions.sh" "${1:?Usage: $0 <tenant_id>}" --rebuild-and-verify
scripts/service_ctl.sh
... ... @@ -16,9 +16,10 @@ mkdir -p "${LOG_DIR}"
16 16 source "${PROJECT_ROOT}/scripts/lib/load_env.sh"
17 17  
18 18 CORE_SERVICES=("backend" "indexer" "frontend" "eval-web")
19   -OPTIONAL_SERVICES=("tei" "cnclip" "embedding" "embedding-image" "translator" "reranker" "reranker-fine")
  19 +# reranker-fine ๆš‚ๆ—ถไธ็”จ๏ผŒๅ› ๆญคๆš‚ๆ—ถไปŽOPTIONAL_SERVICESไธญๅˆ ้™ค
  20 +OPTIONAL_SERVICES=("tei" "cnclip" "embedding" "embedding-image" "translator" "reranker")
20 21 FULL_SERVICES=("${OPTIONAL_SERVICES[@]}" "${CORE_SERVICES[@]}")
21   -STOP_ORDER_SERVICES=("frontend" "eval-web" "indexer" "backend" "reranker-fine" "reranker" "translator" "embedding-image" "embedding" "cnclip" "tei")
  22 +STOP_ORDER_SERVICES=("frontend" "eval-web" "indexer" "backend" "reranker" "translator" "embedding-image" "embedding" "cnclip" "tei")
22 23  
23 24 all_services() {
24 25 echo "${FULL_SERVICES[@]}"
... ...
suggestion/builder.py
... ... @@ -38,7 +38,7 @@ def get_suggestion_alias_name(tenant_id: str) -> str:
38 38  
39 39 def get_suggestion_versioned_index_name(tenant_id: str, build_at: Optional[datetime] = None) -> str:
40 40 """Versioned suggestion index name."""
41   - ts = (build_at or datetime.now(timezone.utc)).strftime("%Y%m%d%H%M%S")
  41 + ts = (build_at or datetime.now(timezone.utc)).strftime("%Y%m%d%H%M%S%f")
42 42 return f"{_index_prefix()}search_suggestions_tenant_{tenant_id}_v{ts}"
43 43  
44 44  
... ... @@ -101,6 +101,79 @@ class SuggestionIndexBuilder:
101 101 self.es_client = es_client
102 102 self.db_engine = db_engine
103 103  
  104 + def _format_allocation_failure(self, index_name: str) -> str:
  105 + health = self.es_client.wait_for_index_ready(index_name=index_name, timeout="5s")
  106 + explain = self.es_client.get_allocation_explain(index_name=index_name)
  107 +
  108 + parts = [
  109 + f"Suggestion index '{index_name}' was created but is not allocatable/readable yet",
  110 + f"health_status={health.get('status')}",
  111 + f"timed_out={health.get('timed_out')}",
  112 + ]
  113 + if health.get("error"):
  114 + parts.append(f"health_error={health['error']}")
  115 +
  116 + if explain:
  117 + unassigned = explain.get("unassigned_info") or {}
  118 + if unassigned.get("reason"):
  119 + parts.append(f"unassigned_reason={unassigned['reason']}")
  120 + if unassigned.get("last_allocation_status"):
  121 + parts.append(f"last_allocation_status={unassigned['last_allocation_status']}")
  122 +
  123 + for node in explain.get("node_allocation_decisions") or []:
  124 + node_name = node.get("node_name") or node.get("node_id") or "unknown-node"
  125 + for decider in node.get("deciders") or []:
  126 + if decider.get("decision") == "NO":
  127 + parts.append(
  128 + f"{node_name}:{decider.get('decider')}={decider.get('explanation')}"
  129 + )
  130 + return "; ".join(parts)
  131 +
  132 + return "; ".join(parts)
  133 +
  134 + def _create_fresh_versioned_index(
  135 + self,
  136 + tenant_id: str,
  137 + mapping: Dict[str, Any],
  138 + max_attempts: int = 5,
  139 + ) -> str:
  140 + for attempt in range(1, max_attempts + 1):
  141 + index_name = get_suggestion_versioned_index_name(tenant_id)
  142 + if self.es_client.index_exists(index_name):
  143 + logger.warning(
  144 + "Suggestion index name collision before create for tenant=%s index=%s attempt=%s/%s",
  145 + tenant_id,
  146 + index_name,
  147 + attempt,
  148 + max_attempts,
  149 + )
  150 + continue
  151 +
  152 + if self.es_client.create_index(index_name, mapping):
  153 + return index_name
  154 +
  155 + if self.es_client.index_exists(index_name):
  156 + logger.warning(
  157 + "Suggestion index name collision during create for tenant=%s index=%s attempt=%s/%s",
  158 + tenant_id,
  159 + index_name,
  160 + attempt,
  161 + max_attempts,
  162 + )
  163 + continue
  164 +
  165 + raise RuntimeError(f"Failed to create suggestion index: {index_name}")
  166 +
  167 + raise RuntimeError(
  168 + f"Failed to allocate a unique suggestion index name for tenant={tenant_id} after {max_attempts} attempts"
  169 + )
  170 +
  171 + def _ensure_new_index_ready(self, index_name: str) -> None:
  172 + health = self.es_client.wait_for_index_ready(index_name=index_name, timeout="5s")
  173 + if health.get("ok"):
  174 + return
  175 + raise RuntimeError(self._format_allocation_failure(index_name))
  176 +
104 177 @staticmethod
105 178 def _to_utc(dt: Any) -> Optional[datetime]:
106 179 if dt is None:
... ... @@ -609,62 +682,65 @@ class SuggestionIndexBuilder:
609 682 index_languages: List[str] = tenant_cfg.get("index_languages") or ["en", "zh"]
610 683 primary_language: str = tenant_cfg.get("primary_language") or "en"
611 684  
612   - # Always write to a fresh versioned index; legacy concrete index is no longer supported.
613   - index_name = get_suggestion_versioned_index_name(tenant_id)
614   -
615   - if self.es_client.index_exists(index_name):
616   - raise RuntimeError(f"Target suggestion index already exists: {index_name}")
617   -
618   - mapping = build_suggestion_mapping(index_languages=index_languages)
619   - if not self.es_client.create_index(index_name, mapping):
620   - raise RuntimeError(f"Failed to create suggestion index: {index_name}")
  685 + alias_publish: Optional[Dict[str, Any]] = None
  686 + index_name: Optional[str] = None
  687 + try:
  688 + mapping = build_suggestion_mapping(index_languages=index_languages)
  689 + index_name = self._create_fresh_versioned_index(
  690 + tenant_id=tenant_id,
  691 + mapping=mapping,
  692 + )
  693 + self._ensure_new_index_ready(index_name)
621 694  
622   - key_to_candidate = self._build_full_candidates(
623   - tenant_id=tenant_id,
624   - index_languages=index_languages,
625   - primary_language=primary_language,
626   - days=days,
627   - batch_size=batch_size,
628   - min_query_len=min_query_len,
629   - )
  695 + key_to_candidate = self._build_full_candidates(
  696 + tenant_id=tenant_id,
  697 + index_languages=index_languages,
  698 + primary_language=primary_language,
  699 + days=days,
  700 + batch_size=batch_size,
  701 + min_query_len=min_query_len,
  702 + )
630 703  
631   - now_iso = datetime.now(timezone.utc).isoformat()
632   - docs = [self._candidate_to_doc(tenant_id, c, now_iso) for c in key_to_candidate.values()]
  704 + now_iso = datetime.now(timezone.utc).isoformat()
  705 + docs = [self._candidate_to_doc(tenant_id, c, now_iso) for c in key_to_candidate.values()]
633 706  
634   - if docs:
635   - bulk_result = self.es_client.bulk_index(index_name=index_name, docs=docs)
636   - self.es_client.refresh(index_name)
637   - else:
638   - bulk_result = {"success": 0, "failed": 0, "errors": []}
  707 + if docs:
  708 + bulk_result = self.es_client.bulk_index(index_name=index_name, docs=docs)
  709 + self.es_client.refresh(index_name)
  710 + else:
  711 + bulk_result = {"success": 0, "failed": 0, "errors": []}
639 712  
640   - alias_publish: Optional[Dict[str, Any]] = None
641   - if publish_alias:
642   - alias_publish = self._publish_alias(
643   - tenant_id=tenant_id,
644   - index_name=index_name,
645   - keep_versions=keep_versions,
646   - )
  713 + if publish_alias:
  714 + alias_publish = self._publish_alias(
  715 + tenant_id=tenant_id,
  716 + index_name=index_name,
  717 + keep_versions=keep_versions,
  718 + )
647 719  
648   - now_utc = datetime.now(timezone.utc).isoformat()
649   - meta_patch: Dict[str, Any] = {
650   - "last_full_build_at": now_utc,
651   - "last_incremental_watermark": now_utc,
652   - }
653   - if publish_alias:
654   - meta_patch["active_index"] = index_name
655   - meta_patch["active_alias"] = get_suggestion_alias_name(tenant_id)
656   - self._upsert_meta(tenant_id, meta_patch)
  720 + now_utc = datetime.now(timezone.utc).isoformat()
  721 + meta_patch: Dict[str, Any] = {
  722 + "last_full_build_at": now_utc,
  723 + "last_incremental_watermark": now_utc,
  724 + }
  725 + if publish_alias:
  726 + meta_patch["active_index"] = index_name
  727 + meta_patch["active_alias"] = get_suggestion_alias_name(tenant_id)
  728 + self._upsert_meta(tenant_id, meta_patch)
657 729  
658   - return {
659   - "mode": "full",
660   - "tenant_id": str(tenant_id),
661   - "index_name": index_name,
662   - "alias_published": bool(alias_publish),
663   - "alias_publish": alias_publish,
664   - "total_candidates": len(key_to_candidate),
665   - "indexed_docs": len(docs),
666   - "bulk_result": bulk_result,
667   - }
  730 + return {
  731 + "mode": "full",
  732 + "tenant_id": str(tenant_id),
  733 + "index_name": index_name,
  734 + "alias_published": bool(alias_publish),
  735 + "alias_publish": alias_publish,
  736 + "total_candidates": len(key_to_candidate),
  737 + "indexed_docs": len(docs),
  738 + "bulk_result": bulk_result,
  739 + }
  740 + except Exception:
  741 + if index_name and not alias_publish:
  742 + self.es_client.delete_index(index_name)
  743 + raise
668 744  
669 745 def _build_incremental_deltas(
670 746 self,
... ...
tests/test_suggestions.py
... ... @@ -8,6 +8,7 @@ from suggestion.builder import (
8 8 QueryDelta,
9 9 SuggestionIndexBuilder,
10 10 get_suggestion_alias_name,
  11 + get_suggestion_versioned_index_name,
11 12 )
12 13 from suggestion.service import SuggestionService
13 14  
... ... @@ -121,6 +122,16 @@ class FakeESClient:
121 122 self.indices.add(index_name)
122 123 return True
123 124  
  125 + def wait_for_index_ready(self, index_name: str, timeout: str = "10s") -> Dict[str, Any]:
  126 + self.calls.append({"op": "wait_for_index_ready", "index": index_name, "timeout": timeout})
  127 + return {"ok": True, "status": "green", "timed_out": False}
  128 +
  129 + def get_allocation_explain(self, index_name: str, shard: int = 0, primary: bool = True) -> Dict[str, Any] | None:
  130 + self.calls.append(
  131 + {"op": "get_allocation_explain", "index": index_name, "shard": shard, "primary": primary}
  132 + )
  133 + return None
  134 +
124 135 def refresh(self, index_name: str) -> bool:
125 136 self.calls.append({"op": "refresh", "index": index_name})
126 137 return True
... ... @@ -150,6 +161,67 @@ class FakeESClient:
150 161  
151 162  
152 163 @pytest.mark.unit
  164 +def test_versioned_index_name_uses_microseconds():
  165 + build_at = datetime(2026, 4, 7, 3, 52, 26, 123456, tzinfo=timezone.utc)
  166 + assert (
  167 + get_suggestion_versioned_index_name("163", build_at)
  168 + == "search_suggestions_tenant_163_v20260407035226123456"
  169 + )
  170 +
  171 +
  172 +@pytest.mark.unit
  173 +def test_rebuild_cleans_up_unallocatable_new_index():
  174 + fake_es = FakeESClient()
  175 +
  176 + def _wait_fail(index_name: str, timeout: str = "10s") -> Dict[str, Any]:
  177 + fake_es.calls.append({"op": "wait_for_index_ready", "index": index_name, "timeout": timeout})
  178 + return {"ok": False, "status": "red", "timed_out": True}
  179 +
  180 + def _allocation_explain(index_name: str, shard: int = 0, primary: bool = True) -> Dict[str, Any]:
  181 + fake_es.calls.append(
  182 + {"op": "get_allocation_explain", "index": index_name, "shard": shard, "primary": primary}
  183 + )
  184 + return {
  185 + "unassigned_info": {"reason": "INDEX_CREATED", "last_allocation_status": "no"},
  186 + "node_allocation_decisions": [
  187 + {
  188 + "node_name": "node-1",
  189 + "deciders": [
  190 + {
  191 + "decider": "disk_threshold",
  192 + "decision": "NO",
  193 + "explanation": "node is above high watermark",
  194 + }
  195 + ],
  196 + }
  197 + ],
  198 + }
  199 +
  200 + fake_es.wait_for_index_ready = _wait_fail # type: ignore[method-assign]
  201 + fake_es.get_allocation_explain = _allocation_explain # type: ignore[method-assign]
  202 +
  203 + builder = SuggestionIndexBuilder(es_client=fake_es, db_engine=None)
  204 +
  205 + from config import tenant_config_loader as tcl
  206 +
  207 + loader = tcl.get_tenant_config_loader()
  208 + loader._config = {
  209 + "default": {"primary_language": "en", "index_languages": ["en", "zh"]},
  210 + "tenants": {
  211 + "163": {"primary_language": "en", "index_languages": ["en", "zh"]},
  212 + },
  213 + }
  214 +
  215 + with pytest.raises(RuntimeError, match="disk_threshold"):
  216 + builder.rebuild_tenant_index(tenant_id="163")
  217 +
  218 + create_calls = [x for x in fake_es.calls if x.get("op") == "create_index"]
  219 + assert len(create_calls) == 1
  220 + created_index = create_calls[0]["index"]
  221 + assert created_index not in fake_es.indices
  222 +
  223 +
  224 +@pytest.mark.unit
153 225 def test_resolve_query_language_prefers_log_field():
154 226 fake_es = FakeESClient()
155 227 builder = SuggestionIndexBuilder(es_client=fake_es, db_engine=None)
... ...
utils/es_client.py
... ... @@ -76,13 +76,70 @@ class ESClient:
76 76 True if successful, False otherwise
77 77 """
78 78 try:
79   - self.client.indices.create(index=index_name, body=body)
  79 + client = self.client.options(request_timeout=30, max_retries=0)
  80 + client.indices.create(
  81 + index=index_name,
  82 + body=body,
  83 + wait_for_active_shards="0",
  84 + )
80 85 logger.info(f"Index '{index_name}' created successfully")
81 86 return True
82 87 except Exception as e:
  88 + if self.index_exists(index_name):
  89 + logger.warning(
  90 + "Create index request for '%s' raised %s, but the index now exists; treating it as created",
  91 + index_name,
  92 + type(e).__name__,
  93 + exc_info=True,
  94 + )
  95 + return True
83 96 logger.error(f"Failed to create index '{index_name}': {e}", exc_info=True)
84 97 return False
85 98  
  99 + def wait_for_index_ready(self, index_name: str, timeout: str = "10s") -> Dict[str, Any]:
  100 + """Wait until an index primary shard is allocated and searchable."""
  101 + try:
  102 + resp = self.client.cluster.health(
  103 + index=index_name,
  104 + wait_for_status="yellow",
  105 + timeout=timeout,
  106 + level="indices",
  107 + )
  108 + index_info = ((resp.get("indices") or {}).get(index_name) or {})
  109 + status = index_info.get("status") or resp.get("status")
  110 + timed_out = bool(resp.get("timed_out"))
  111 + return {
  112 + "ok": (not timed_out) and status in {"yellow", "green"},
  113 + "status": status,
  114 + "timed_out": timed_out,
  115 + "response": resp,
  116 + }
  117 + except Exception as e:
  118 + logger.error("Failed waiting for index '%s' readiness: %s", index_name, e, exc_info=True)
  119 + return {
  120 + "ok": False,
  121 + "status": "unknown",
  122 + "timed_out": False,
  123 + "error": str(e),
  124 + }
  125 +
  126 + def get_allocation_explain(self, index_name: str, shard: int = 0, primary: bool = True) -> Optional[Dict[str, Any]]:
  127 + """Explain why a shard can or cannot be allocated."""
  128 + try:
  129 + return self.client.cluster.allocation_explain(
  130 + body={"index": index_name, "shard": shard, "primary": primary}
  131 + )
  132 + except Exception as e:
  133 + logger.warning(
  134 + "Failed to get allocation explain for index '%s' shard=%s primary=%s: %s",
  135 + index_name,
  136 + shard,
  137 + primary,
  138 + e,
  139 + exc_info=True,
  140 + )
  141 + return None
  142 +
86 143 def put_alias(self, index_name: str, alias_name: str) -> bool:
87 144 """Add alias for an index."""
88 145 try:
... ...