e7a2c0b7
tangwang
img encode
|
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
|
return {
"tenant_id": "162",
"items": [
{
"spu": {
"id": 10001,
"tenant_id": "162",
"title": "测试T恤 纯棉短袖",
"brief": "舒适纯棉,多色可选",
"description": "这是一款适合日常穿着的纯棉T恤,透气吸汗。",
"vendor": "测试品牌",
"category": "服装/上衣/T恤",
"category_id": 100,
"category_level": 2,
"category_path": "服装/上衣/T恤",
"fake_sales": 1280,
|
e7a2c0b7
tangwang
img encode
|
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
|
"tags": "T恤,纯棉,短袖,夏季",
"create_time": now,
"update_time": now,
},
"skus": [
{
"id": 20001,
"spu_id": 10001,
"price": 99.0,
"compare_at_price": 129.0,
"sku": "SKU-TSHIRT-001",
"inventory_quantity": 50,
"option1": "黑色",
"option2": "M",
"option3": None,
},
{
"id": 20002,
"spu_id": 10001,
"price": 99.0,
"compare_at_price": 129.0,
"sku": "SKU-TSHIRT-002",
"inventory_quantity": 30,
"option1": "白色",
"option2": "L",
"option3": None,
},
],
"options": [
{"id": 1, "position": 1, "name": "颜色"},
{"id": 2, "position": 2, "name": "尺码"},
],
}
],
}
def call_via_http(base_url: str, body: dict):
"""通过 HTTP 调用 build-docs。"""
url = f"{base_url.rstrip('/')}/indexer/build-docs"
r = requests.post(url, json=body, timeout=30)
return r.status_code, r.text, r.json() if r.headers.get("content-type", "").startswith("application/json") else None
def call_via_test_client(body: dict):
"""通过 FastAPI TestClient 调用(不依赖已启动服务,但需 DB/ES 已配置)。"""
from fastapi.testclient import TestClient
import api.indexer_app as indexer_app
with TestClient(indexer_app.app) as client:
r = client.post("/indexer/build-docs", json=body)
return r.status_code, r.text, r.json() if r.headers.get("content-type", "").startswith("application/json") else None
def main():
body = build_sample_request()
print("=" * 60)
print("【请求】POST /indexer/build-docs")
print("=" * 60)
print(json.dumps(body, ensure_ascii=False, indent=2))
base_url = os.getenv("INDEXER_URL", "http://localhost:6004")
use_http = HAS_REQUESTS and (os.getenv("USE_TEST_CLIENT", "").lower() not in ("1", "true", "yes"))
if use_http:
try:
status, raw, data = call_via_http(base_url, body)
except requests.RequestException as e:
print("\n[错误] 无法连接 Indexer 服务:", e)
print("请先启动: ./scripts/start_indexer.sh 或 uvicorn api.indexer_app:app --port 6004")
if HAS_REQUESTS:
|
e7a2c0b7
tangwang
img encode
|
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
|
sys.exit(1)
else:
if not use_http and not HAS_REQUESTS:
print("\n[提示] 未安装 requests,使用 TestClient 调用(需配置 DB/ES)。")
else:
print("\n[提示] 使用 TestClient 调用(USE_TEST_CLIENT=1)。")
try:
status, raw, data = call_via_test_client(body)
except Exception as e:
print("\n[错误] TestClient 调用失败:", e)
print("请确保已 source activate.sh 且 DB/ES 环境变量正确,或先启动 Indexer 再用 HTTP 调用。")
sys.exit(1)
print("\n" + "=" * 60)
print("【响应】HTTP status =", status)
print("=" * 60)
if data is not None:
print(json.dumps(data, ensure_ascii=False, indent=2, default=str))
if data.get("docs"):
doc = data["docs"][0]
print("\n" + "=" * 60)
print("【返回 doc 顶层字段】共 {} 个".format(len(doc)))
print("=" * 60)
for k in sorted(doc.keys()):
print(" ", k)
else:
print(raw)
if status != 200:
sys.exit(1)
if __name__ == "__main__":
main()
|