#!/usr/bin/env python3
"""
Temporary script: iterate over the image_url values of a given tenant in ES
and call the image-embedding service in batches.
5 worker processes, at most 8 URLs per request. Logs are printed to stdout.
Usage:
source activate.sh  # loads .env, providing ES_HOST / ES_USERNAME / ES_PASSWORD
python scripts/temp_embed_tenant_image_urls.py
If activate.sh was not sourced, the script also tries to load the
project-root .env.
"""
from __future__ import annotations
import json
import multiprocessing as mp
import os
import sys
import time
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
from urllib.parse import urlencode
import requests
from elasticsearch import Elasticsearch
from elasticsearch.helpers import scan
# Even without sourcing activate.sh, try loading .env from the project root
# (same ES_HOST / ES_USERNAME / ES_PASSWORD variables).
try:
    from dotenv import load_dotenv
    _ROOT = Path(__file__).resolve().parents[1]
    load_dotenv(_ROOT / ".env")
except ImportError:
    pass
# ---------------------------------------------------------------------------
# Configuration (edit as needed; defaults match the ES_* values in .env,
# see config/loader.py)
# ---------------------------------------------------------------------------
# Elasticsearch (read from env vars by default: ES_HOST, ES_USERNAME, ES_PASSWORD)
ES_HOST: str = os.getenv("ES_HOST", "http://localhost:9200")
ES_USERNAME: Optional[str] = os.getenv("ES_USERNAME") or None
ES_PASSWORD: Optional[str] = os.getenv("ES_PASSWORD") or None
ES_INDEX: str = "search_products_tenant_163"
# Tenant (keyword field, string)
TENANT_ID: str = "163"
# Image embedding service (consistent with doc section 7.1.2)
EMBED_BASE_URL: str = "http://localhost:6008"
EMBED_PATH: str = "/embed/image"
EMBED_QUERY: Dict[str, Any] = {
    "normalize": "true",
    "priority": "1",  # matches the integration doc's curl; may be "0" for offline batch runs
}
# Concurrency and batching
WORKER_PROCESSES: int = 5
URLS_PER_REQUEST: int = 8
# HTTP
REQUEST_TIMEOUT_SEC: float = 120.0
# ES scan (elasticsearch-py 8+/ES 9: `scan(..., query=...)` expands into
# `client.search(**kwargs)`, so keys must be valid Search API parameter names,
# e.g. top-level `query` = the DSL query clause; do not pass a bare `match_all`.)
SCROLL_CHUNK_SIZE: int = 500
# ---------------------------------------------------------------------------
@dataclass
class BatchResult:
    """Outcome of one embedding request covering a single batch of URLs."""

    batch_index: int  # position of this batch in the task list
    url_count: int  # number of URLs sent in the request
    ok: bool  # True only for HTTP 200 with a JSON list matching url_count
    status_code: Optional[int]  # HTTP status, or None when no response was received
    elapsed_sec: float  # wall-clock time spent on the request
    error: Optional[str] = None  # short error description when ok is False
def _build_embed_url() -> str:
    """Assemble the full embedding endpoint URL, query string included."""
    base = EMBED_BASE_URL.rstrip("/")
    return "?".join((f"{base}{EMBED_PATH}", urlencode(EMBED_QUERY)))
def _process_batch(payload: Tuple[int, List[str]]) -> BatchResult:
    """POST one batch of image URLs to the embedding service and log the result.

    Args:
        payload: ``(batch_index, urls)`` — the batch's index in the task list
            and the URLs to embed (at most URLS_PER_REQUEST of them).

    Returns:
        A BatchResult. The batch counts as ok only when the service answers
        HTTP 200 with a JSON list whose length equals ``len(urls)``.
    """
    batch_index, urls = payload
    if not urls:
        # Nothing to send; report a trivially successful empty batch.
        return BatchResult(batch_index, 0, True, None, 0.0, None)
    url = _build_embed_url()
    t0 = time.perf_counter()
    try:
        resp = requests.post(
            url,
            headers={"Content-Type": "application/json"},
            data=json.dumps(urls),
            timeout=REQUEST_TIMEOUT_SEC,
        )
        elapsed = time.perf_counter() - t0
        ok = resp.status_code == 200
        err: Optional[str] = None
        if ok:
            # Even a 200 must carry a list with one embedding per input URL.
            try:
                body = resp.json()
                if not isinstance(body, list) or len(body) != len(urls):
                    ok = False
                    err = f"response length mismatch or not list: got {type(body).__name__}"
            except Exception as e:
                ok = False
                err = f"json decode: {e}"
        else:
            err = resp.text[:500] if resp.text else f"HTTP {resp.status_code}"
        worker = mp.current_process().name
        # BUG FIX: was `resp.status_code if resp else None`. requests.Response
        # is falsy for any status >= 400, so the status code was dropped from
        # the log and the BatchResult exactly when the request failed. `resp`
        # is always bound here, so read the status directly.
        status = resp.status_code
        ms = elapsed * 1000.0
        if ok:
            print(
                f"[embed] worker={worker} batch={batch_index} urls={len(urls)} "
                f"http={status} elapsed_ms={ms:.2f} ok",
                flush=True,
            )
        else:
            print(
                f"[embed] worker={worker} batch={batch_index} urls={len(urls)} "
                f"http={status} elapsed_ms={ms:.2f} FAIL err={err}",
                flush=True,
            )
        return BatchResult(batch_index, len(urls), ok, status, elapsed, err)
    except Exception as e:
        # Transport-level failure (connect/timeout/etc.): no response at all.
        elapsed = time.perf_counter() - t0
        worker = mp.current_process().name
        print(
            f"[embed] worker={worker} batch={batch_index} urls={len(urls)} "
            f"http=None elapsed_ms={elapsed * 1000.0:.2f} FAIL err={e}",
            flush=True,
        )
        return BatchResult(batch_index, len(urls), False, None, elapsed, str(e))
def _iter_image_urls(es: Elasticsearch) -> List[str]:
    """Return every non-empty image_url of TENANT_ID found in ES_INDEX."""
    # Equivalent search body: { "query": { "term": { "tenant_id": "..." } } }
    search_kw: Dict[str, Any] = {
        "query": {"term": {"tenant_id": TENANT_ID}},
        "source_includes": ["image_url"],
    }
    collected: List[str] = []
    hits = scan(es, query=search_kw, index=ES_INDEX, size=SCROLL_CHUNK_SIZE)
    for hit in hits:
        raw = (hit.get("_source") or {}).get("image_url")
        if raw is None:
            continue
        trimmed = str(raw).strip()
        if trimmed:
            collected.append(trimmed)
    return collected
def main() -> int:
    """Fetch the tenant's image URLs from ES and embed them via a process pool.

    Returns:
        0 on full success (or nothing to do), 1 if Elasticsearch is
        unreachable, 2 if at least one batch failed.
    """
    t_wall0 = time.perf_counter()
    # Use basic auth only when both credentials are configured.
    auth = None
    if ES_USERNAME and ES_PASSWORD:
        auth = (ES_USERNAME, ES_PASSWORD)
    es = Elasticsearch([ES_HOST], basic_auth=auth)
    if not es.ping():
        print("ERROR: Elasticsearch ping failed", file=sys.stderr)
        return 1
    print(
        f"[main] ES={ES_HOST} basic_auth={'yes' if auth else 'no'} "
        f"index={ES_INDEX} tenant_id={TENANT_ID} "
        f"workers={WORKER_PROCESSES} urls_per_req={URLS_PER_REQUEST}",
        flush=True,
    )
    print(f"[main] embed_url={_build_embed_url()}", flush=True)
    # Phase 1: collect all image URLs for the tenant (timed separately).
    t_fetch0 = time.perf_counter()
    all_urls = _iter_image_urls(es)
    fetch_elapsed = time.perf_counter() - t_fetch0
    print(
        f"[main] collected image_url count={len(all_urls)} es_scan_elapsed_sec={fetch_elapsed:.3f}",
        flush=True,
    )
    # Phase 2: slice the URL list into batches of URLS_PER_REQUEST.
    batches: List[List[str]] = []
    for i in range(0, len(all_urls), URLS_PER_REQUEST):
        batches.append(all_urls[i : i + URLS_PER_REQUEST])
    if not batches:
        print("[main] no URLs to process; done.", flush=True)
        return 0
    tasks = [(idx, batch) for idx, batch in enumerate(batches)]
    print(f"[main] batches={len(tasks)} (parallel processes={WORKER_PROCESSES})", flush=True)
    # Phase 3: fan batches out across WORKER_PROCESSES processes and aggregate.
    t_run0 = time.perf_counter()
    total_urls = 0  # NOTE(review): accumulated but not printed in the summary
    success_urls = 0
    failed_urls = 0
    ok_batches = 0
    fail_batches = 0
    sum_req_sec = 0.0  # sequential sum of per-request times, for reference only
    with mp.Pool(processes=WORKER_PROCESSES) as pool:
        # imap_unordered: tally each BatchResult as soon as it completes.
        for res in pool.imap_unordered(_process_batch, tasks, chunksize=1):
            total_urls += res.url_count
            sum_req_sec += res.elapsed_sec
            if res.ok:
                ok_batches += 1
                success_urls += res.url_count
            else:
                fail_batches += 1
                failed_urls += res.url_count
    wall_total = time.perf_counter() - t_wall0
    run_elapsed = time.perf_counter() - t_run0
    print("---------- summary ----------", flush=True)
    print(f"tenant_id: {TENANT_ID}", flush=True)
    print(f"total documents w/ url: {len(all_urls)}", flush=True)
    print(f"total batches: {len(batches)}", flush=True)
    print(f"batches succeeded: {ok_batches}", flush=True)
    print(f"batches failed: {fail_batches}", flush=True)
    print(f"urls (success path): {success_urls}", flush=True)
    print(f"urls (failed path): {failed_urls}", flush=True)
    print(f"ES scan elapsed (s): {fetch_elapsed:.3f}", flush=True)
    print(f"embed phase wall (s): {run_elapsed:.3f}", flush=True)
    print(f"sum request time (s): {sum_req_sec:.3f} (sequential sum, for reference)", flush=True)
    print(f"total wall time (s): {wall_total:.3f}", flush=True)
    print("-----------------------------", flush=True)
    return 0 if fail_batches == 0 else 2
if __name__ == "__main__":
    # Propagate main()'s return value as the process exit code.
    raise SystemExit(main())