incremental_service.py
8.39 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
"""
Incremental data retrieval service.

Provides an interface for fetching the data of a single SPU, used to
incrementally update the ES index. Shared data such as the category mapping
is preloaded when the service is created, to improve performance.
"""
import pandas as pd
import numpy as np
import logging
from typing import Dict, Any, Optional
from sqlalchemy import text
from config import ConfigLoader
from config.tenant_config_loader import get_tenant_config_loader
from indexer.document_transformer import SPUDocumentTransformer
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
class IncrementalIndexerService:
    """Incremental indexing service that builds the ES document for one SPU.

    The category-ID-to-name mapping is loaded once in ``__init__`` and reused
    for every request; SPU, SKU and option rows are read from the database on
    each call to :meth:`get_spu_document`.
    """

    def __init__(self, db_engine: Any):
        """Initialize the service and preload the shared category mapping.

        Args:
            db_engine: SQLAlchemy database engine used for all queries.
        """
        self.db_engine = db_engine

        # The category mapping query is not filtered by tenant, so it is
        # loaded once here and shared by all tenants.
        self.category_id_to_name = self._load_category_mapping()
        logger.info("Preloaded %s category mappings", len(self.category_id_to_name))

        # Tenant config loader; the per-tenant config itself is fetched on
        # demand in get_spu_document.
        self.tenant_config_loader = get_tenant_config_loader()

    def _read_dataframe(self, query: Any, params: Dict[str, Any]) -> pd.DataFrame:
        """Run ``query`` with bound ``params`` on a fresh connection.

        Args:
            query: SQL statement accepted by ``pandas.read_sql``.
            params: Values for the named bind parameters in ``query``.

        Returns:
            The result set as a DataFrame (empty when no rows match).
        """
        with self.db_engine.connect() as conn:
            return pd.read_sql(query, conn, params=params)

    def _load_category_mapping(self) -> Dict[str, str]:
        """Load the category-ID-to-name mapping shared by all tenants.

        Rows with a category ID that cannot be converted to an integer, or
        with an empty/blank name, are skipped with a warning. A failure of the
        query itself is logged and whatever was collected so far (possibly an
        empty dict) is returned, so construction of the service never fails
        because of this preload.

        Returns:
            Dictionary mapping category_id (as a string) to category name.
        """
        query = text("""
            SELECT DISTINCT
                category_id,
                category
            FROM shoplazza_product_spu
            WHERE deleted = 0 AND category_id IS NOT NULL
        """)
        mapping: Dict[str, str] = {}
        try:
            with self.db_engine.connect() as conn:
                result = conn.execute(query)
                for row in result:
                    # Normalize the ID via int() so the key is a plain
                    # integer string. A single malformed ID must not abort
                    # the whole preload, so it is skipped individually.
                    try:
                        category_id = str(int(row.category_id))
                    except (TypeError, ValueError, OverflowError):
                        logger.warning(
                            "Category ID %r is not a valid integer, skipping",
                            row.category_id,
                        )
                        continue

                    category_name = row.category
                    if not category_name or not category_name.strip():
                        logger.warning(
                            "Category ID %s has empty name, skipping", category_id
                        )
                        continue
                    mapping[category_id] = category_name
        except Exception as e:
            # Best effort: log and fall through with what has been collected.
            logger.error("Failed to load category mapping: %s", e, exc_info=True)
        return mapping

    def _load_search_settings(self) -> tuple:
        """Load searchable option dimensions and the optional translator.

        Falls back to defaults (all three option dimensions, no translator,
        no prompts) for every value that could not be read; a failure is
        logged as a warning and never raised.

        NOTE(review): this runs on every get_spu_document call, so the config
        is re-read and, when translation is enabled, a new Translator is built
        per request. Consider caching if this shows up in profiles.

        Returns:
            Tuple ``(searchable_option_dimensions, translator,
            translation_prompts)``.
        """
        searchable_option_dimensions = ['option1', 'option2', 'option3']
        translator = None
        translation_prompts = {}
        try:
            config = ConfigLoader().load_config()
            searchable_option_dimensions = config.spu_config.searchable_option_dimensions

            # Initialize the translator only if translation is enabled.
            if config.query_config.enable_translation:
                from query.translator import Translator
                translator = Translator(
                    api_key=config.query_config.translation_api_key,
                    # Use the cache while indexing to avoid repeated translation.
                    use_cache=True,
                    glossary_id=config.query_config.translation_glossary_id,
                    translation_context=config.query_config.translation_context,
                )
                translation_prompts = config.query_config.translation_prompts
        except Exception as e:
            logger.warning("Failed to load config, using default: %s", e)
        return searchable_option_dimensions, translator, translation_prompts

    def get_spu_document(self, tenant_id: str, spu_id: str) -> Optional[Dict[str, Any]]:
        """Build the ES document for a single SPU.

        Args:
            tenant_id: Tenant ID.
            spu_id: SPU ID.

        Returns:
            The ES document dict, or None when the SPU does not exist, is
            marked deleted, or the transformer returns None.

        Raises:
            Exception: Any error raised while loading or transforming the data
                is logged with its traceback and re-raised unchanged.
        """
        try:
            # Load the SPU row; without it there is nothing to index.
            spu_row = self._load_single_spu(tenant_id, spu_id)
            if spu_row is None:
                logger.warning("SPU %s not found for tenant_id=%s", spu_id, tenant_id)
                return None

            # Load the SKU and option rows belonging to this SPU.
            skus_df = self._load_skus_for_spu(tenant_id, spu_id)
            options_df = self._load_options_for_spu(tenant_id, spu_id)

            # Fetch the tenant configuration.
            tenant_config = self.tenant_config_loader.get_tenant_config(tenant_id)

            # Load the search configuration (with defaults on failure).
            (
                searchable_option_dimensions,
                translator,
                translation_prompts,
            ) = self._load_search_settings()

            # Create the document transformer.
            transformer = SPUDocumentTransformer(
                category_id_to_name=self.category_id_to_name,
                searchable_option_dimensions=searchable_option_dimensions,
                tenant_config=tenant_config,
                translator=translator,
                translation_prompts=translation_prompts,
            )

            # Convert the rows into an ES document.
            doc = transformer.transform_spu_to_doc(
                tenant_id=tenant_id,
                spu_row=spu_row,
                skus=skus_df,
                options=options_df,
            )
            if doc is None:
                logger.warning(
                    "Failed to transform SPU %s for tenant_id=%s", spu_id, tenant_id
                )
                return None
            return doc
        except Exception as e:
            logger.error(
                "Error getting SPU document for tenant_id=%s, spu_id=%s: %s",
                tenant_id,
                spu_id,
                e,
                exc_info=True,
            )
            raise

    def _load_single_spu(self, tenant_id: str, spu_id: str) -> Optional[pd.Series]:
        """Load the row of a single, non-deleted SPU.

        Args:
            tenant_id: Tenant ID.
            spu_id: SPU ID.

        Returns:
            The SPU row as a Series, or None if no matching row exists.
        """
        query = text("""
            SELECT
                id, shop_id, shoplazza_id, title, brief, description,
                spu, vendor, vendor_url,
                image_src, image_width, image_height, image_path, image_alt,
                tags, note, category, category_id, category_google_id,
                category_level, category_path,
                fake_sales, display_fake_sales,
                tenant_id, creator, create_time, updater, update_time, deleted
            FROM shoplazza_product_spu
            WHERE tenant_id = :tenant_id AND id = :spu_id AND deleted = 0
            LIMIT 1
        """)
        df = self._read_dataframe(query, {"tenant_id": tenant_id, "spu_id": spu_id})
        if df.empty:
            return None
        return df.iloc[0]

    def _load_skus_for_spu(self, tenant_id: str, spu_id: str) -> pd.DataFrame:
        """Load all non-deleted SKU rows of the given SPU.

        Args:
            tenant_id: Tenant ID.
            spu_id: SPU ID.

        Returns:
            DataFrame of SKU rows (empty when the SPU has no SKUs).
        """
        query = text("""
            SELECT
                id, spu_id, shop_id, shoplazza_id, shoplazza_product_id,
                shoplazza_image_id, title, sku, barcode, position,
                price, compare_at_price, cost_price,
                option1, option2, option3,
                inventory_quantity, weight, weight_unit, image_src,
                wholesale_price, note, extend,
                shoplazza_created_at, shoplazza_updated_at, tenant_id,
                creator, create_time, updater, update_time, deleted
            FROM shoplazza_product_sku
            WHERE tenant_id = :tenant_id AND spu_id = :spu_id AND deleted = 0
        """)
        return self._read_dataframe(query, {"tenant_id": tenant_id, "spu_id": spu_id})

    def _load_options_for_spu(self, tenant_id: str, spu_id: str) -> pd.DataFrame:
        """Load all non-deleted option rows of the given SPU, ordered by position.

        Args:
            tenant_id: Tenant ID.
            spu_id: SPU ID.

        Returns:
            DataFrame of option rows (empty when the SPU has no options).
        """
        query = text("""
            SELECT
                id, spu_id, shop_id, shoplazza_id, shoplazza_product_id,
                position, name, `values`, tenant_id,
                creator, create_time, updater, update_time, deleted
            FROM shoplazza_product_option
            WHERE tenant_id = :tenant_id AND spu_id = :spu_id AND deleted = 0
            ORDER BY position
        """)
        return self._read_dataframe(query, {"tenant_id": tenant_id, "spu_id": spu_id})