be52af70
tangwang
first commit
|
1
2
3
4
5
|
"""
Elasticsearch client wrapper.
"""
from elasticsearch import Elasticsearch
|
325eec03
tangwang
1. 日志、配置基础设施,使用优化
|
6
|
from elasticsearch.helpers import bulk
|
be52af70
tangwang
first commit
|
7
8
|
from typing import Dict, Any, List, Optional
import os
|
325eec03
tangwang
1. 日志、配置基础设施,使用优化
|
9
10
11
12
13
14
15
16
17
|
import logging
# Try to import ES_CONFIG, but allow import to fail
try:
from config.env_config import ES_CONFIG
except ImportError:
ES_CONFIG = None
logger = logging.getLogger(__name__)
|
be52af70
tangwang
first commit
|
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
|
class ESClient:
    """Wrapper for Elasticsearch client with common operations.

    Apart from ``index_exists``, every operation catches exceptions raised by
    the underlying client, logs them with a traceback, and returns a neutral
    fallback value (``False``, ``0``, ``{}`` or an empty search response)
    instead of propagating the error.
    """

    def __init__(
        self,
        hosts: Optional[List[str]] = None,
        username: Optional[str] = None,
        password: Optional[str] = None,
        **kwargs
    ):
        """
        Initialize Elasticsearch client.

        Args:
            hosts: List of ES host URLs. When None, a single host is taken from
                the ES_HOST environment variable (default 'http://localhost:9200').
            username: ES username (optional).
            password: ES password (optional). Basic auth is configured only
                when both username and password are non-empty.
            **kwargs: Additional ES client parameters. They are merged last,
                so they override the defaults set here.
        """
        if hosts is None:
            hosts = [os.getenv('ES_HOST', 'http://localhost:9200')]

        # Build client config.
        # NOTE(review): 'timeout' and 'http_auth' are elasticsearch-py 7.x
        # keyword names (8.x prefers 'request_timeout' / 'basic_auth') —
        # confirm against the installed client version.
        client_config: Dict[str, Any] = {
            'hosts': hosts,
            'timeout': 30,
            'max_retries': 3,
            'retry_on_timeout': True,
        }

        # Add authentication if provided
        if username and password:
            client_config['http_auth'] = (username, password)

        # Merge additional kwargs (caller-supplied values win over defaults)
        client_config.update(kwargs)

        self.client = Elasticsearch(**client_config)

    def ping(self) -> bool:
        """
        Test connection to Elasticsearch.

        Returns:
            True if connected, False otherwise (including when ping raises).
        """
        try:
            return self.client.ping()
        except Exception as e:
            logger.error(f"Failed to ping Elasticsearch: {e}", exc_info=True)
            return False

    def create_index(self, index_name: str, body: Dict[str, Any]) -> bool:
        """
        Create an index.

        Args:
            index_name: Name of the index
            body: Index configuration (settings + mappings)

        Returns:
            True if successful, False otherwise (the error is logged).
        """
        try:
            self.client.indices.create(index=index_name, body=body)
            logger.info(f"Index '{index_name}' created successfully")
            return True
        except Exception as e:
            logger.error(f"Failed to create index '{index_name}': {e}", exc_info=True)
            return False

    def delete_index(self, index_name: str) -> bool:
        """
        Delete an index if it exists.

        Args:
            index_name: Name of the index

        Returns:
            True if the index existed and was deleted; False if it did not
            exist or the deletion raised (the error is logged).
        """
        try:
            if self.client.indices.exists(index=index_name):
                self.client.indices.delete(index=index_name)
                logger.info(f"Index '{index_name}' deleted successfully")
                return True
            else:
                logger.warning(f"Index '{index_name}' does not exist")
                return False
        except Exception as e:
            logger.error(f"Failed to delete index '{index_name}': {e}", exc_info=True)
            return False

    def index_exists(self, index_name: str) -> bool:
        """Check if index exists.

        Unlike the other methods, client errors propagate to the caller.
        """
        # bool() so callers get a real bool rather than a client-specific
        # response object.
        return bool(self.client.indices.exists(index=index_name))

    @staticmethod
    def _build_bulk_actions(index_name: str, docs: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Translate documents into bulk "index" actions without mutating them.

        A document's '_id' key, when present, becomes the action's '_id' and
        is excluded from '_source'. The caller's dicts are left untouched.
        """
        actions = []
        for doc in docs:
            # Shallow copy minus '_id' so the caller's document is not modified.
            source = {key: value for key, value in doc.items() if key != '_id'}
            action: Dict[str, Any] = {
                '_index': index_name,
                '_source': source,
            }
            # If document has _id field, use it as the ES document id
            if '_id' in doc:
                action['_id'] = doc['_id']
            actions.append(action)
        return actions

    def bulk_index(self, index_name: str, docs: List[Dict[str, Any]]) -> Dict[str, Any]:
        """
        Bulk index documents.

        The input documents are not modified; an '_id' key, if present, is
        used as the document id and stripped from the indexed source.

        Args:
            index_name: Name of the index
            docs: List of documents to index

        Returns:
            Dictionary with 'success' (count of indexed docs), 'failed'
            (number of failures) and 'errors' (per-item error details, or
            [str(exception)] if the whole request raised).
        """
        actions = self._build_bulk_actions(index_name, docs)

        try:
            # raise_on_error=False: per-document failures are returned
            # instead of raised.
            success, failed = bulk(self.client, actions, raise_on_error=False)
            return {
                'success': success,
                'failed': len(failed),
                'errors': failed
            }
        except Exception as e:
            logger.error(f"Bulk indexing failed: {e}", exc_info=True)
            return {
                'success': 0,
                'failed': len(docs),
                'errors': [str(e)]
            }

    def search(
        self,
        index_name: str,
        body: Dict[str, Any],
        size: int = 10,
        from_: int = 0
    ) -> Dict[str, Any]:
        """
        Execute search query.

        Args:
            index_name: Name of the index
            body: Search query body. A top-level 'collapse' clause is removed
                (on a copy; the caller's dict is not modified).
            size: Number of results to return
            from_: Offset for pagination

        Returns:
            Search results. If the query raises, an empty result of the shape
            {'hits': {'total': {'value': 0}, 'hits': []}, 'error': str(e)}.
        """
        # Safety guard: collapse is no longer needed (index is already SPU-level).
        # If any caller accidentally adds a collapse clause (e.g. on product_id),
        # strip it here to avoid 400 errors like:
        # "no mapping found for `product_id` in order to collapse on"
        if isinstance(body, dict) and "collapse" in body:
            logger.warning(
                "Removing unsupported 'collapse' clause from ES query body: %s",
                body.get("collapse")
            )
            body = dict(body)  # shallow copy to avoid mutating caller
            body.pop("collapse", None)

        try:
            return self.client.search(
                index=index_name,
                body=body,
                size=size,
                from_=from_
            )
        except Exception as e:
            logger.error(f"Search failed: {e}", exc_info=True)
            return {
                'hits': {
                    'total': {'value': 0},
                    'hits': []
                },
                'error': str(e)
            }

    def get_mapping(self, index_name: str) -> Dict[str, Any]:
        """Get index mapping; returns {} on failure (the error is logged)."""
        try:
            return self.client.indices.get_mapping(index=index_name)
        except Exception as e:
            logger.error(f"Failed to get mapping for '{index_name}': {e}", exc_info=True)
            return {}

    def refresh(self, index_name: str) -> bool:
        """Refresh index to make documents searchable; False on failure."""
        try:
            self.client.indices.refresh(index=index_name)
            return True
        except Exception as e:
            logger.error(f"Failed to refresh index '{index_name}': {e}", exc_info=True)
            return False

    def count(self, index_name: str, body: Optional[Dict[str, Any]] = None) -> int:
        """
        Count documents in index.

        Args:
            index_name: Name of the index
            body: Optional query body; None is passed through to the client.

        Returns:
            Document count, or 0 if the request fails (the error is logged).
        """
        try:
            result = self.client.count(index=index_name, body=body)
            return result['count']
        except Exception as e:
            logger.error(f"Count failed: {e}", exc_info=True)
            return 0
def get_es_client_from_env() -> ESClient:
    """
    Create an ESClient from project configuration, falling back to the environment.

    Source selection:
        - If ``ES_CONFIG`` (imported from ``config.env_config``) is truthy, it
          wins: ``ES_CONFIG['host']`` is required, while 'username' and
          'password' are optional keys.
        - Otherwise (import failed or config empty) these environment
          variables are read:
            ES_HOST: Elasticsearch host URL (default: http://localhost:9200)
            ES_USERNAME: Username (optional)
            ES_PASSWORD: Password (optional)

    Returns:
        ESClient instance
    """
    if not ES_CONFIG:
        # Fallback to env variables
        return ESClient(
            hosts=[os.getenv('ES_HOST', 'http://localhost:9200')],
            username=os.getenv('ES_USERNAME'),
            password=os.getenv('ES_PASSWORD'),
        )

    return ESClient(
        hosts=[ES_CONFIG['host']],
        username=ES_CONFIG.get('username'),
        password=ES_CONFIG.get('password'),
    )
|