#!/usr/bin/env python3
"""
临时脚本:从 ES 遍历指定租户的 image_url,批量调用图片 embedding 服务。
5 进程并发,每请求最多 8 个 URL。日志打印到标准输出。

用法:
  source activate.sh   # 会加载 .env,提供 ES_HOST / ES_USERNAME / ES_PASSWORD
  python scripts/maintenance/embed_tenant_image_urls.py

未 source 时脚本也会尝试加载项目根目录 .env
"""
  
  from __future__ import annotations
  
  import json
  import multiprocessing as mp
  import os
  import sys
  import time
  from dataclasses import dataclass
  from pathlib import Path
  from typing import Any, Dict, List, Optional, Tuple
  from urllib.parse import urlencode
  
  import requests
  from elasticsearch import Elasticsearch
  from elasticsearch.helpers import scan
  
  # 未 source activate.sh 时仍可从项目根 .env 加载(与 ES_HOST / ES_USERNAME / ES_PASSWORD 一致)
  try:
      from dotenv import load_dotenv
  
32e9b30c   tangwang   scripts/ 根目录主要保留启...
33
      _ROOT = Path(__file__).resolve().parents[2]
8c8b9d84   tangwang   ES 拉取 coarse_rank...
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
      load_dotenv(_ROOT / ".env")
  except ImportError:
      pass
  
# ---------------------------------------------------------------------------
# Configuration (edit as needed; defaults match the ES_* vars in .env, see config/loader.py)
# ---------------------------------------------------------------------------

# Elasticsearch (read from environment by default: ES_HOST, ES_USERNAME, ES_PASSWORD)
ES_HOST: str = os.getenv("ES_HOST", "http://localhost:9200")
ES_USERNAME: Optional[str] = os.getenv("ES_USERNAME") or None
ES_PASSWORD: Optional[str] = os.getenv("ES_PASSWORD") or None
ES_INDEX: str = "search_products_tenant_163"

# Tenant id (a keyword field in ES, hence a string)
TENANT_ID: str = "163"

# Image embedding service (matches integration doc section 7.1.2)
EMBED_BASE_URL: str = "http://localhost:6008"
EMBED_PATH: str = "/embed/image"
EMBED_QUERY: Dict[str, Any] = {
    "normalize": "true",
    "priority": "1",  # matches the doc's curl example; offline batch runs may use "0"
}

# Concurrency and batching
WORKER_PROCESSES: int = 5
URLS_PER_REQUEST: int = 8

# HTTP
REQUEST_TIMEOUT_SEC: float = 120.0

# ES scan (elasticsearch-py 8+/ES 9: `scan(..., query=...)` expands into
# `client.search(**kwargs)`, so pass Search-API parameter names — e.g. a top-level
# `query` holding the DSL query clause; do not pass a bare `match_all`.)
SCROLL_CHUNK_SIZE: int = 500
  
  # ---------------------------------------------------------------------------
  
  
@dataclass
class BatchResult:
    """Outcome of one embedding request covering a batch of image URLs."""

    batch_index: int  # ordinal of this batch within the run
    url_count: int  # number of URLs sent in this batch
    ok: bool  # True only for HTTP 200 with a JSON list matching the URL count
    status_code: Optional[int]  # HTTP status, or None when the request itself raised
    elapsed_sec: float  # wall-clock duration of the HTTP round trip
    error: Optional[str] = None  # short error description when ok is False
  
  
  def _build_embed_url() -> str:
      q = urlencode(EMBED_QUERY)
      return f"{EMBED_BASE_URL.rstrip('/')}{EMBED_PATH}?{q}"
  
  
  def _process_batch(payload: Tuple[int, List[str]]) -> BatchResult:
      batch_index, urls = payload
      if not urls:
          return BatchResult(batch_index, 0, True, None, 0.0, None)
  
      url = _build_embed_url()
      t0 = time.perf_counter()
      try:
          resp = requests.post(
              url,
              headers={"Content-Type": "application/json"},
              data=json.dumps(urls),
              timeout=REQUEST_TIMEOUT_SEC,
          )
          elapsed = time.perf_counter() - t0
          ok = resp.status_code == 200
          err: Optional[str] = None
          if ok:
              try:
                  body = resp.json()
                  if not isinstance(body, list) or len(body) != len(urls):
                      ok = False
                      err = f"response length mismatch or not list: got {type(body).__name__}"
              except Exception as e:
                  ok = False
                  err = f"json decode: {e}"
          else:
              err = resp.text[:500] if resp.text else f"HTTP {resp.status_code}"
  
          worker = mp.current_process().name
          status = resp.status_code if resp else None
          ms = elapsed * 1000.0
          if ok:
              print(
                  f"[embed] worker={worker} batch={batch_index} urls={len(urls)} "
                  f"http={status} elapsed_ms={ms:.2f} ok",
                  flush=True,
              )
          else:
              print(
                  f"[embed] worker={worker} batch={batch_index} urls={len(urls)} "
                  f"http={status} elapsed_ms={ms:.2f} FAIL err={err}",
                  flush=True,
              )
          return BatchResult(batch_index, len(urls), ok, status, elapsed, err)
      except Exception as e:
          elapsed = time.perf_counter() - t0
          worker = mp.current_process().name
          print(
              f"[embed] worker={worker} batch={batch_index} urls={len(urls)} "
              f"http=None elapsed_ms={elapsed * 1000.0:.2f} FAIL err={e}",
              flush=True,
          )
          return BatchResult(batch_index, len(urls), False, None, elapsed, str(e))
  
  
  def _iter_image_urls(es: Elasticsearch) -> List[str]:
      # 对应 search body: { "query": { "term": { "tenant_id": "..." } } }
      search_kw: Dict[str, Any] = {
          "query": {"term": {"tenant_id": TENANT_ID}},
          "source_includes": ["image_url"],
      }
      urls: List[str] = []
      for hit in scan(
          es,
          query=search_kw,
          index=ES_INDEX,
          size=SCROLL_CHUNK_SIZE,
      ):
          src = hit.get("_source") or {}
          u = src.get("image_url")
          if u is None:
              continue
          s = str(u).strip()
          if not s:
              continue
          urls.append(s)
      return urls
  
  
  def main() -> int:
      t_wall0 = time.perf_counter()
  
      auth = None
      if ES_USERNAME and ES_PASSWORD:
          auth = (ES_USERNAME, ES_PASSWORD)
  
      es = Elasticsearch([ES_HOST], basic_auth=auth)
      if not es.ping():
          print("ERROR: Elasticsearch ping failed", file=sys.stderr)
          return 1
  
      print(
          f"[main] ES={ES_HOST} basic_auth={'yes' if auth else 'no'} "
          f"index={ES_INDEX} tenant_id={TENANT_ID} "
          f"workers={WORKER_PROCESSES} urls_per_req={URLS_PER_REQUEST}",
          flush=True,
      )
      print(f"[main] embed_url={_build_embed_url()}", flush=True)
  
      t_fetch0 = time.perf_counter()
      all_urls = _iter_image_urls(es)
      fetch_elapsed = time.perf_counter() - t_fetch0
      print(
          f"[main] collected image_url count={len(all_urls)} es_scan_elapsed_sec={fetch_elapsed:.3f}",
          flush=True,
      )
  
      batches: List[List[str]] = []
      for i in range(0, len(all_urls), URLS_PER_REQUEST):
          batches.append(all_urls[i : i + URLS_PER_REQUEST])
  
      if not batches:
          print("[main] no URLs to process; done.", flush=True)
          return 0
  
      tasks = [(idx, batch) for idx, batch in enumerate(batches)]
      print(f"[main] batches={len(tasks)} (parallel processes={WORKER_PROCESSES})", flush=True)
  
      t_run0 = time.perf_counter()
      total_urls = 0
      success_urls = 0
      failed_urls = 0
      ok_batches = 0
      fail_batches = 0
      sum_req_sec = 0.0
  
      with mp.Pool(processes=WORKER_PROCESSES) as pool:
          for res in pool.imap_unordered(_process_batch, tasks, chunksize=1):
              total_urls += res.url_count
              sum_req_sec += res.elapsed_sec
              if res.ok:
                  ok_batches += 1
                  success_urls += res.url_count
              else:
                  fail_batches += 1
                  failed_urls += res.url_count
  
      wall_total = time.perf_counter() - t_wall0
      run_elapsed = time.perf_counter() - t_run0
  
      print("---------- summary ----------", flush=True)
      print(f"tenant_id:              {TENANT_ID}", flush=True)
      print(f"total documents w/ url: {len(all_urls)}", flush=True)
      print(f"total batches:          {len(batches)}", flush=True)
      print(f"batches succeeded:      {ok_batches}", flush=True)
      print(f"batches failed:         {fail_batches}", flush=True)
      print(f"urls (success path):    {success_urls}", flush=True)
      print(f"urls (failed path):     {failed_urls}", flush=True)
      print(f"ES scan elapsed (s):    {fetch_elapsed:.3f}", flush=True)
      print(f"embed phase wall (s):   {run_elapsed:.3f}", flush=True)
      print(f"sum request time (s):   {sum_req_sec:.3f}  (sequential sum, for reference)", flush=True)
      print(f"total wall time (s):    {wall_total:.3f}", flush=True)
      print("-----------------------------", flush=True)
      return 0 if fail_batches == 0 else 2
  
  
# Script entry point; exit code 0 = success, 1 = ES ping failed, 2 = some batches failed.
if __name__ == "__main__":
    raise SystemExit(main())