3c1f8031
tangwang
api/routes/indexe...
|
1
|
"""增量数据获取服务"""
|
0064e946
tangwang
feat: 增量索引服务、租户配置...
|
2
3
|
import pandas as pd
|
0064e946
tangwang
feat: 增量索引服务、租户配置...
|
4
5
6
|
import logging
from typing import Dict, Any, Optional
from sqlalchemy import text
|
3c1f8031
tangwang
api/routes/indexe...
|
7
|
from indexer.indexing_utils import load_category_mapping, create_document_transformer
|
0064e946
tangwang
feat: 增量索引服务、租户配置...
|
8
9
10
11
12
13
|
# Module-level logger named after this module.
logger = logging.getLogger(__name__)


class IncrementalIndexerService:
    """Incremental indexing service that fetches SPU data for indexing.

    For a single SPU the service reads the SPU row plus its SKU and option
    rows from the database and passes them to a document transformer, which
    produces the ES document.
    """

    def __init__(self, db_engine: Any):
        """Initialize the incremental indexing service.

        Args:
            db_engine: Database engine used for every query. It must expose
                ``connect()`` returning a context-managed connection
                (presumably a SQLAlchemy ``Engine`` -- confirm with caller).
        """
        self.db_engine = db_engine
        # Preload the category mapping once (global, shared by all tenants).
        self.category_id_to_name = load_category_mapping(db_engine)
        logger.info(f"Preloaded {len(self.category_id_to_name)} category mappings")

    def get_spu_document(self, tenant_id: str, spu_id: str) -> Optional[Dict[str, Any]]:
        """Build the ES document for one SPU.

        Args:
            tenant_id: Tenant that owns the SPU.
            spu_id: Value matched against the ``id`` column of the SPU table.

        Returns:
            The document produced by the transformer, or ``None`` when no
            non-deleted SPU row matches or the transformer returns ``None``.

        Raises:
            Exception: Any error raised while loading or transforming is
                logged with its traceback and then re-raised unchanged.
        """
        try:
            spu_row = self._load_single_spu(tenant_id, spu_id)
            if spu_row is None:
                logger.warning(f"SPU {spu_id} not found for tenant_id={tenant_id}")
                return None

            # SKU and option rows are only fetched once the SPU is known to exist.
            skus_df = self._load_skus_for_spu(tenant_id, spu_id)
            options_df = self._load_options_for_spu(tenant_id, spu_id)

            # A transformer is created per call because it is tenant-specific.
            transformer = create_document_transformer(
                category_id_to_name=self.category_id_to_name,
                tenant_id=tenant_id,
            )
            doc = transformer.transform_spu_to_doc(
                tenant_id=tenant_id,
                spu_row=spu_row,
                skus=skus_df,
                options=options_df,
            )
            if doc is None:
                logger.warning(f"Failed to transform SPU {spu_id} for tenant_id={tenant_id}")
            return doc
        except Exception as e:
            # Log at this boundary with the traceback, then let the caller decide.
            logger.exception(
                f"Error getting SPU document for tenant_id={tenant_id}, spu_id={spu_id}: {e}"
            )
            raise

    def _read_spu_scoped(self, sql: str, tenant_id: str, spu_id: str) -> pd.DataFrame:
        """Run *sql* with ``:tenant_id`` and ``:spu_id`` bound; return the rows.

        The connection is opened for this single query and closed on exit.
        """
        bind_params = {"tenant_id": tenant_id, "spu_id": spu_id}
        with self.db_engine.connect() as conn:
            return pd.read_sql(text(sql), conn, params=bind_params)

    def _load_single_spu(self, tenant_id: str, spu_id: str) -> Optional[pd.Series]:
        """Load one non-deleted SPU row.

        Returns:
            The first matching row as a ``pd.Series``, or ``None`` when the
            query returns no rows.
        """
        sql = """
            SELECT
                id, shop_id, shoplazza_id, title, brief, description,
                spu, vendor, vendor_url,
                image_src, image_width, image_height, image_path, image_alt,
                tags, note, category, category_id, category_google_id,
                category_level, category_path,
                fake_sales, display_fake_sales,
                tenant_id, creator, create_time, updater, update_time, deleted
            FROM shoplazza_product_spu
            WHERE tenant_id = :tenant_id AND id = :spu_id AND deleted = 0
            LIMIT 1
        """
        df = self._read_spu_scoped(sql, tenant_id, spu_id)
        return None if df.empty else df.iloc[0]

    def _load_skus_for_spu(self, tenant_id: str, spu_id: str) -> pd.DataFrame:
        """Load every non-deleted SKU row of the given SPU (possibly empty)."""
        sql = """
            SELECT
                id, spu_id, shop_id, shoplazza_id, shoplazza_product_id,
                shoplazza_image_id, title, sku, barcode, position,
                price, compare_at_price, cost_price,
                option1, option2, option3,
                inventory_quantity, weight, weight_unit, image_src,
                wholesale_price, note, extend,
                shoplazza_created_at, shoplazza_updated_at, tenant_id,
                creator, create_time, updater, update_time, deleted
            FROM shoplazza_product_sku
            WHERE tenant_id = :tenant_id AND spu_id = :spu_id AND deleted = 0
        """
        return self._read_spu_scoped(sql, tenant_id, spu_id)

    def _load_options_for_spu(self, tenant_id: str, spu_id: str) -> pd.DataFrame:
        """Load every non-deleted option row of the given SPU, ordered by position."""
        # `values` is backtick-quoted in the original query; kept verbatim.
        sql = """
            SELECT
                id, spu_id, shop_id, shoplazza_id, shoplazza_product_id,
                position, name, `values`, tenant_id,
                creator, create_time, updater, update_time, deleted
            FROM shoplazza_product_option
            WHERE tenant_id = :tenant_id AND spu_id = :spu_id AND deleted = 0
            ORDER BY position
        """
        return self._read_spu_scoped(sql, tenant_id, spu_id)
|