a10a89a3
tangwang
构造测试数据用于测试分类 和 三种...
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
|
#!/usr/bin/env python3
"""
Diagnostic script: verify that category and option (spec) data in the MySQL
data source is populated correctly.

Checks:
1. Whether the category_path field has a value.
2. Whether category_path has the expected format (it should be parseable
   into category1_name).
3. Whether the name field of the shoplazza_product_option table has a value
   (expected to be "color", "size", "material").
4. Whether the option1/2/3 fields of the shoplazza_product_sku table have
   values.
"""
import sys
import argparse
from pathlib import Path
from sqlalchemy import create_engine, text
# Put the directory one level above this script's folder at the front of
# sys.path so the `utils.db_connector` import below resolves when the file is
# run directly as a script.
sys.path.insert(0, str(Path(__file__).parent.parent))
from utils.db_connector import create_db_connection
def check_category_path(db_engine, tenant_id: str):
    """Report how the ``category`` and ``category_path`` columns are populated.

    Prints, for the non-deleted rows of ``shoplazza_product_spu`` belonging to
    ``tenant_id``: the total row count, how many rows have / lack each of the
    two columns, up to 5 sample rows with a ``category`` (including the first
    '/'-separated segment, shown as ``category1_name``), and up to 3 sample
    rows with a ``category_path``.

    The counts rely on SQL ``COUNT(column)``, so only NULL is treated as
    missing; an empty string still counts as "has a value".

    Args:
        db_engine: Object whose ``connect()`` returns a context-managed
            connection with ``execute()`` (presumably a SQLAlchemy Engine;
            confirm against ``create_db_connection``).
        tenant_id: Tenant whose rows are inspected; passed as a bound
            parameter, never interpolated into the SQL text.

    Returns:
        None. All results are written to stdout.
    """
    print("\n" + "="*60)
    print("1. 检查 category_path 和 category 字段")
    print("="*60)

    params = {"tenant_id": tenant_id}
    query = text("""
        SELECT
            COUNT(*) as total,
            COUNT(category_path) as has_category_path,
            COUNT(*) - COUNT(category_path) as null_category_path,
            COUNT(category) as has_category,
            COUNT(*) - COUNT(category) as null_category
        FROM shoplazza_product_spu
        WHERE tenant_id = :tenant_id AND deleted = 0
    """)

    with db_engine.connect() as conn:
        # An aggregate query without GROUP BY always yields exactly one row.
        (
            total,
            has_category_path,
            null_category_path,
            has_category,
            null_category,
        ) = conn.execute(query, params).fetchone()

        print(f"总SPU数: {total}")
        print(f"有 category_path 的SPU: {has_category_path}")
        print(f"category_path 为空的SPU: {null_category_path}")
        print(f"有 category 的SPU: {has_category}")
        print(f"category 为空的SPU: {null_category}")

        # Show a few rows that carry a category value.
        if has_category > 0:
            sample_query = text("""
                SELECT id, title, category_path, category, category_id, category_level
                FROM shoplazza_product_spu
                WHERE tenant_id = :tenant_id
                AND deleted = 0
                AND category IS NOT NULL
                LIMIT 5
            """)
            samples = conn.execute(sample_query, params).fetchall()
            print("\n示例数据(前5条有 category 的记录):")
            for spu_id, title, category_path, category, category_id, category_level in samples:
                shown_title = title[:50] if title else ''
                print(f" SPU ID: {spu_id}, Title: {shown_title}")
                print(f" category_path: {category_path}")
                print(f" category: '{category}'")
                print(f" category_id: {category_id}, category_level: {category_level}")
                # Derive category1_name from the category value: the first
                # '/'-separated segment, or the whole value if there is no '/'.
                if category:
                    category_text = str(category)
                    if '/' in category_text:
                        # str.split always returns at least one element, so
                        # index 0 is safe without a length check.
                        path_parts = category_text.split('/')
                        print(f" 解析后(按'/'分割): {path_parts}")
                        print(f" → category1_name: '{path_parts[0].strip()}'")
                    else:
                        print(f" → category1_name: '{category_text.strip()}'(直接作为category1_name)")
        else:
            print("\n⚠️ 警告: 没有SPU有 category 值!")

        # Show a few rows that carry a category_path value, if any exist.
        if has_category_path > 0:
            sample_query = text("""
                SELECT id, title, category_path, category
                FROM shoplazza_product_spu
                WHERE tenant_id = :tenant_id
                AND deleted = 0
                AND category_path IS NOT NULL
                LIMIT 3
            """)
            samples = conn.execute(sample_query, params).fetchall()
            print("\n示例数据(有 category_path 的记录):")
            for spu_id, title, category_path, category in samples:
                shown_title = title[:50] if title else ''
                print(f" SPU ID: {spu_id}, Title: {shown_title}")
                print(f" category_path: '{category_path}'")
                print(f" category: '{category}'")
                # Commas without any '/' suggest a comma-separated ID list
                # rather than a '/'-separated path.
                if category_path:
                    path_text = str(category_path)
                    if ',' in path_text and '/' not in path_text:
                        print(" ⚠️ 注意: category_path是ID列表格式(逗号分隔),不是路径格式")
def check_options(db_engine, tenant_id: str):
    """Summarise the ``name`` column of ``shoplazza_product_option``.

    Prints the number of non-deleted option rows for ``tenant_id``, the number
    of distinct names and the number of SPUs that have options. When rows
    exist it also prints every (position, name) pair with its row count and up
    to 10 sample rows; otherwise it prints a warning.

    Args:
        db_engine: Object whose ``connect()`` returns a context-managed
            connection with ``execute()``.
        tenant_id: Tenant whose rows are inspected (bound parameter).

    Returns:
        None. All results are written to stdout.
    """
    rule = "=" * 60
    print("\n" + rule)
    print("2. 检查 shoplazza_product_option 表的 name 字段")
    print(rule)

    bind = {"tenant_id": tenant_id}
    totals_sql = text("""
        SELECT
            COUNT(*) as total_options,
            COUNT(DISTINCT name) as distinct_names,
            COUNT(DISTINCT spu_id) as spus_with_options
        FROM shoplazza_product_option
        WHERE tenant_id = :tenant_id AND deleted = 0
    """)

    with db_engine.connect() as conn:
        option_rows, name_kinds, spu_count = conn.execute(totals_sql, bind).fetchone()
        print(f"总 option 记录数: {option_rows}")
        print(f"不同的 name 数量: {name_kinds}")
        print(f"有 option 定义的 SPU 数量: {spu_count}")

        # Nothing to list when the tenant has no option rows at all.
        if option_rows <= 0:
            print("\n⚠️ 警告: 没有 option 记录!")
            return

        # Break the rows down by (name, position).
        names_sql = text("""
            SELECT DISTINCT name, position, COUNT(*) as count
            FROM shoplazza_product_option
            WHERE tenant_id = :tenant_id AND deleted = 0
            GROUP BY name, position
            ORDER BY position, name
        """)
        name_rows = conn.execute(names_sql, bind).fetchall()
        print("\n不同的 name 值:")
        for opt_name, position, occurrences in name_rows:
            print(f" position={position}, name='{opt_name}', count={occurrences}")

        # A handful of raw rows for eyeballing.
        examples_sql = text("""
            SELECT spu_id, position, name, `values`
            FROM shoplazza_product_option
            WHERE tenant_id = :tenant_id AND deleted = 0
            ORDER BY spu_id, position
            LIMIT 10
        """)
        example_rows = conn.execute(examples_sql, bind).fetchall()
        print("\n示例数据(前10条 option 记录):")
        for spu_id, position, opt_name, opt_values in example_rows:
            print(f" SPU ID: {spu_id}, position: {position}, name: '{opt_name}', values: {opt_values}")
def check_sku_options(db_engine, tenant_id: str):
    """Summarise the ``option1``/``option2``/``option3`` columns of the SKU table.

    Prints, for the non-deleted rows of ``shoplazza_product_sku`` belonging to
    ``tenant_id``: the SKU count, how many SKUs have a non-NULL value in each
    option column and the number of distinct SPUs. When SKUs exist it also
    prints up to 10 sample rows; otherwise it prints a warning.

    Args:
        db_engine: Object whose ``connect()`` returns a context-managed
            connection with ``execute()``.
        tenant_id: Tenant whose rows are inspected (bound parameter).

    Returns:
        None. All results are written to stdout.
    """
    rule = "=" * 60
    print("\n" + rule)
    print("3. 检查 shoplazza_product_sku 表的 option1/2/3 字段")
    print(rule)

    bind = {"tenant_id": tenant_id}
    totals_sql = text("""
        SELECT
            COUNT(*) as total_skus,
            COUNT(option1) as has_option1,
            COUNT(option2) as has_option2,
            COUNT(option3) as has_option3,
            COUNT(DISTINCT spu_id) as distinct_spus
        FROM shoplazza_product_sku
        WHERE tenant_id = :tenant_id AND deleted = 0
    """)

    with db_engine.connect() as conn:
        sku_count, with_opt1, with_opt2, with_opt3, spu_count = conn.execute(
            totals_sql, bind
        ).fetchone()
        print(f"总 SKU 数: {sku_count}")
        print(f"有 option1 的 SKU: {with_opt1}")
        print(f"有 option2 的 SKU: {with_opt2}")
        print(f"有 option3 的 SKU: {with_opt3}")
        print(f"不同的 SPU 数量: {spu_count}")

        # Nothing to sample when the tenant has no SKU rows.
        if sku_count <= 0:
            print("\n⚠️ 警告: 没有 SKU 记录!")
            return

        # A handful of raw rows for eyeballing.
        examples_sql = text("""
            SELECT spu_id, id, option1, option2, option3
            FROM shoplazza_product_sku
            WHERE tenant_id = :tenant_id AND deleted = 0
            ORDER BY spu_id, id
            LIMIT 10
        """)
        example_rows = conn.execute(examples_sql, bind).fetchall()
        print("\n示例数据(前10条 SKU 记录):")
        for spu_id, sku_id, opt1, opt2, opt3 in example_rows:
            print(f" SPU ID: {spu_id}, SKU ID: {sku_id}")
            print(f" option1: '{opt1}', option2: '{opt2}', option3: '{opt3}'")
def check_spu_summary(db_engine, tenant_id: str):
    """Print overall SPU / SKU / option counts for one tenant.

    A single query joins the non-deleted SPUs of ``tenant_id`` with their
    non-deleted SKUs and options, then prints the number of SPUs, SKUs and
    option rows, the number of SPUs with a non-NULL ``category_path`` and the
    number of SPUs that have at least one option row.

    Args:
        db_engine: Object whose ``connect()`` returns a context-managed
            connection with ``execute()``.
        tenant_id: Tenant whose rows are inspected (bound parameter).

    Returns:
        None. All results are written to stdout.
    """
    rule = "=" * 60
    print("\n" + rule)
    print("4. SPU 汇总信息")
    print(rule)

    # Both LEFT JOINs hang off the same SPU, so each SPU yields
    # (its SKUs x its options) joined rows; COUNT(DISTINCT ...) is what keeps
    # the totals correct despite that fan-out.
    summary_sql = text("""
        SELECT
            COUNT(DISTINCT spu.id) as total_spus,
            COUNT(DISTINCT sku.id) as total_skus,
            COUNT(DISTINCT opt.id) as total_options,
            COUNT(DISTINCT CASE WHEN spu.category_path IS NOT NULL THEN spu.id END) as spus_with_category_path,
            COUNT(DISTINCT opt.spu_id) as spus_with_options
        FROM shoplazza_product_spu spu
        LEFT JOIN shoplazza_product_sku sku ON spu.id = sku.spu_id AND sku.tenant_id = :tenant_id AND sku.deleted = 0
        LEFT JOIN shoplazza_product_option opt ON spu.id = opt.spu_id AND opt.tenant_id = :tenant_id AND opt.deleted = 0
        WHERE spu.tenant_id = :tenant_id AND spu.deleted = 0
    """)

    with db_engine.connect() as conn:
        summary = conn.execute(summary_sql, {"tenant_id": tenant_id}).fetchone()
        spu_count, sku_count, option_count, with_path, with_options = summary
        print(f"总 SPU 数: {spu_count}")
        print(f"总 SKU 数: {sku_count}")
        print(f"总 option 记录数: {option_count}")
        print(f"有 category_path 的 SPU: {with_path}")
        print(f"有 option 定义的 SPU: {with_options}")
def main():
    """Parse CLI arguments, connect to MySQL and run all diagnostic checks.

    Each connection setting is taken from its ``--db-*`` option and falls back
    to the matching environment variable (``DB_HOST``, ``DB_PORT``,
    ``DB_DATABASE``, ``DB_USERNAME``, ``DB_PASSWORD``). The port defaults to
    3306 when neither source provides one.

    Returns:
        int: 0 after all checks have run; 1 when the connection settings are
        incomplete, ``DB_PORT`` is not an integer, or creating the database
        connection raised an exception.
    """
    import os

    parser = argparse.ArgumentParser(description='检查MySQL数据源中的分类和规格信息')
    parser.add_argument('--tenant-id', required=True, help='Tenant ID')
    parser.add_argument('--db-host', help='MySQL host (或使用环境变量 DB_HOST)')
    parser.add_argument('--db-port', type=int, help='MySQL port (或使用环境变量 DB_PORT, 默认: 3306)')
    parser.add_argument('--db-database', help='MySQL database (或使用环境变量 DB_DATABASE)')
    parser.add_argument('--db-username', help='MySQL username (或使用环境变量 DB_USERNAME)')
    parser.add_argument('--db-password', help='MySQL password (或使用环境变量 DB_PASSWORD)')
    args = parser.parse_args()

    # Resolve connection settings: the CLI option wins, then the environment.
    db_host = args.db_host or os.environ.get('DB_HOST')
    db_database = args.db_database or os.environ.get('DB_DATABASE')
    db_username = args.db_username or os.environ.get('DB_USERNAME')
    db_password = args.db_password or os.environ.get('DB_PASSWORD')

    if args.db_port:
        db_port = args.db_port
    else:
        # Treat an unset or empty DB_PORT as "use the default", and report a
        # non-numeric value cleanly instead of crashing with a traceback.
        raw_port = os.environ.get('DB_PORT') or '3306'
        try:
            db_port = int(raw_port)
        except ValueError:
            print(f"错误: 环境变量 DB_PORT 不是有效的端口号: {raw_port!r}")
            return 1

    if not all([db_host, db_database, db_username, db_password]):
        print("错误: MySQL连接参数不完整")
        print("请提供 --db-host, --db-database, --db-username, --db-password")
        print("或设置环境变量: DB_HOST, DB_DATABASE, DB_USERNAME, DB_PASSWORD")
        return 1

    print(f"连接MySQL: {db_host}:{db_port}/{db_database}")
    print(f"Tenant ID: {args.tenant_id}")

    # NOTE(review): whether create_db_connection actually opens a connection
    # here is not visible from this file; the success message only means the
    # helper returned without raising.
    try:
        db_engine = create_db_connection(
            host=db_host,
            port=db_port,
            database=db_database,
            username=db_username,
            password=db_password
        )
        print("✓ MySQL连接成功\n")
    except Exception as e:
        print(f"✗ 连接MySQL失败: {e}")
        return 1

    # Run the checks, releasing the engine's connections even if one fails.
    try:
        check_spu_summary(db_engine, args.tenant_id)
        check_category_path(db_engine, args.tenant_id)
        check_options(db_engine, args.tenant_id)
        check_sku_options(db_engine, args.tenant_id)
    finally:
        # Presumably a SQLAlchemy Engine; only call dispose() if it exists so
        # a different connection object does not break the script.
        dispose = getattr(db_engine, "dispose", None)
        if callable(dispose):
            dispose()

    print("\n" + "="*60)
    print("检查完成")
    print("="*60)
    return 0
if __name__ == '__main__':
    # Hand main()'s status code back to the shell as the process exit code.
    exit_status = main()
    sys.exit(exit_status)
|