Commit c797ba2b655647f0ab5333f5ccb1442e37ed7055

Authored by tangwang
1 parent f54b3854

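1. Incremental index endpoint: adjust the response format now that delete operations are supported

The request now takes two lists (`spu_ids` and `delete_spu_ids`),

so the response returns two matching lists. The first holds one result per ID in `spu_ids`, with status `indexed`, `deleted`, or `failed`; a `failed` entry also carries a `msg`.

The second holds one result per ID in `delete_spu_ids`, with status `deleted`, `not_found`, or `failed`.

2. Update the API documentation accordingly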
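As a rough illustration of the two-list response described above, the sketch below collects every failed entry from both lists. `collect_failures` is a hypothetical helper and not part of this commit; the field names (`spu_ids`, `delete_spu_ids`, `status`, `msg`) follow the commit message and the updated API guide.

```python
from typing import Any, Dict, List


def collect_failures(response: Dict[str, Any]) -> List[Dict[str, str]]:
    """Return one record per failed entry, tagged with the list it came from."""
    failures = []
    for list_name in ("spu_ids", "delete_spu_ids"):
        for item in response.get(list_name, []):
            if item.get("status") == "failed":
                failures.append({
                    "list": list_name,
                    "spu_id": item["spu_id"],
                    # msg is only expected on failed entries; default defensively
                    "msg": item.get("msg", ""),
                })
    return failures


sample = {
    "spu_ids": [
        {"spu_id": "123", "status": "indexed"},
        {"spu_id": "789", "status": "failed", "msg": "SPU not found (unexpected)"},
    ],
    "delete_spu_ids": [
        {"spu_id": "100", "status": "deleted"},
        {"spu_id": "101", "status": "not_found"},
    ],
}
print(collect_failures(sample))
```

Because every ID gets exactly one entry in its own list, a caller can retry only the IDs returned here instead of resubmitting the whole batch.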
api/routes/indexer.py
@@ -73,6 +73,12 @@ async def index_spus(request: IndexSpusRequest): @@ -73,6 +73,12 @@ async def index_spus(request: IndexSpusRequest):
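73 - The database is the Single Source of Truth 73 - The database is the Single Source of Truth
74 - Auto-detection: if a database query finds deleted=1, the document is removed from ES automatically 74 - Auto-detection: if a database query finds deleted=1, the document is removed from ES automatically
75 - Explicit deletion: the caller already knows which SPUs to delete and removes them directly (suited to bulk deletion) 75 - Explicit deletion: the caller already knows which SPUs to delete and removes them directly (suited to bulk deletion)
  76 +
  77 + Response format:
  78 + - spu_ids: results for the requested spu_ids; each element has spu_id and status (indexed/deleted/failed)
  79 + - delete_spu_ids: results for the requested delete_spu_ids; each element has spu_id and status (deleted/not_found/failed)
  80 + - elements with status failed also include a msg field giving the failure reason
  81 + - followed by overall statistics: total, success_count, failed_count, etc.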
76 """ 82 """
77 try: 83 try:
78 from ..app import get_incremental_service, get_es_client 84 from ..app import get_incremental_service, get_es_client
docs/搜索API对接指南.md
@@ -131,7 +131,7 @@ curl -X POST "http://120.76.41.98:6002/search/" \ @@ -131,7 +131,7 @@ curl -X POST "http://120.76.41.98:6002/search/" \
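131 | Instant search | GET | `/search/instant` | Search-as-you-type (skeleton) ⚠️ TODO | 131 | Instant search | GET | `/search/instant` | Search-as-you-type (skeleton) ⚠️ TODO |
132 | Get document | GET | `/search/{doc_id}` | Fetch a single document | 132 | Get document | GET | `/search/{doc_id}` | Fetch a single document |
133 | Full reindex | POST | `/indexer/reindex` | Full index rebuild endpoint | 133 | Full reindex | POST | `/indexer/reindex` | Full index rebuild endpoint |
134 -| Incremental index | POST | `/indexer/index` | Incremental index endpoint (indexes the given SPU ID list) | 134 +| Incremental index | POST | `/indexer/index` | Incremental index endpoint (indexes the given SPU ID list; supports auto-detected and explicit deletion) |
135 | Query documents | POST | `/indexer/documents` | Query SPU document data (no write to ES) | 135 | Query documents | POST | `/indexer/documents` | Query SPU document data (no write to ES) |
136 | Index health check | GET | `/indexer/health` | Check the indexing service status | 136 | Index health check | GET | `/indexer/health` | Check the indexing service status |
137 | Health check | GET | `/admin/health` | Service health check | 137 | Health check | GET | `/admin/health` | Service health check |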
@@ -918,54 +918,107 @@ cat logs/indexer.log | jq 'select(.operation == "request_complete") | {timestamp @@ -918,54 +918,107 @@ cat logs/indexer.log | jq 'select(.operation == "request_complete") | {timestamp
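918 - **Endpoint**: `POST /indexer/index` 918 - **Endpoint**: `POST /indexer/index`
919 - **Description**: Incremental index endpoint. Indexes the given list of SPU IDs and writes the data directly to ES. Use it to update specific products incrementally. 919 - **Description**: Incremental index endpoint. Indexes the given list of SPU IDs and writes the data directly to ES. Use it to update specific products incrementally.
920 920
  921 +**Two deletion modes are supported**:
  922 +1. **Auto-detected deletion**: if an SPU is marked `deleted=1` in the database, its document is removed from ES automatically
  923 +2. **Explicit deletion**: the `delete_spu_ids` parameter names the SPUs to delete (regardless of their database state)
  924 +
  925 +**Deletion strategy**:
  926 +- The database is the Single Source of Truth
  927 +- Auto-detection: if a database query finds `deleted=1`, the document is removed from ES automatically
  928 +- Explicit deletion: the caller already knows which SPUs to delete and removes them directly (suited to bulk deletion)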
  929 +
921 #### Request parameters 930 #### Request parameters
922 931
923 ```json 932 ```json
924 { 933 {
925 "tenant_id": "162", 934 "tenant_id": "162",
926 - "spu_ids": ["123", "456", "789"] 935 + "spu_ids": ["123", "456", "789"],
  936 + "delete_spu_ids": ["100", "101", "102"]
927 } 937 }
928 ``` 938 ```
929 939
930 | Parameter | Type | Required | Description | 940 | Parameter | Type | Required | Description |
931 |------|------|------|------| 941 |------|------|------|------|
932 | `tenant_id` | string | Y | Tenant ID | 942 | `tenant_id` | string | Y | Tenant ID |
933 -| `spu_ids` | array[string] | Y | List of SPU IDs (1-100) | 943 +| `spu_ids` | array[string] | N | List of SPU IDs to index (up to 100). If empty, only deletions are performed |
  944 +| `delete_spu_ids` | array[string] | N | Optional list of SPU IDs to delete explicitly (up to 100). These SPUs are removed from ES regardless of their database state |
  945 +
  946 +**Note**:
  947 +- `spu_ids` and `delete_spu_ids` must not both be empty
  948 +- Each list accepts at most 100 SPU IDs
  949 +- If an SPU in `spu_ids` has `deleted=1` in the database, it is removed from ES automatically (auto-detected deletion)
934 950
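The constraints in the notes above can be checked on the client before a request is sent. The following is a minimal sketch under that assumption; `build_index_request` and `MAX_IDS_PER_LIST` are hypothetical names, and the server's own validation is not shown in this diff and may differ.

```python
from typing import Any, Dict, List, Optional

MAX_IDS_PER_LIST = 100  # per-list limit stated in the notes above


def build_index_request(
    tenant_id: str,
    spu_ids: Optional[List[str]] = None,
    delete_spu_ids: Optional[List[str]] = None,
) -> Dict[str, Any]:
    """Build a request body, rejecting inputs that the notes above rule out."""
    spu_ids = list(spu_ids or [])
    delete_spu_ids = list(delete_spu_ids or [])
    if not spu_ids and not delete_spu_ids:
        raise ValueError("spu_ids and delete_spu_ids must not both be empty")
    for name, ids in (("spu_ids", spu_ids), ("delete_spu_ids", delete_spu_ids)):
        if len(ids) > MAX_IDS_PER_LIST:
            raise ValueError(
                f"{name} accepts at most {MAX_IDS_PER_LIST} IDs, got {len(ids)}"
            )
    return {
        "tenant_id": tenant_id,
        "spu_ids": spu_ids,
        "delete_spu_ids": delete_spu_ids,
    }
```

For example, `build_index_request("162", [], ["100", "101"])` yields a delete-only body, while calling it with both lists empty raises `ValueError`.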
935 #### Response format 951 #### Response format
936 952
937 ```json 953 ```json
938 { 954 {
939 - "success": [ 955 + "spu_ids": [
940 { 956 {
941 "spu_id": "123", 957 "spu_id": "123",
942 - "document": {  
943 - "tenant_id": "162",  
944 - "spu_id": "123",  
945 - "title_zh": "商品标题",  
946 - ...  
947 - } 958 + "status": "indexed"
948 }, 959 },
949 { 960 {
950 "spu_id": "456", 961 "spu_id": "456",
951 - "document": {...} 962 + "status": "deleted"
  963 + },
  964 + {
  965 + "spu_id": "789",
  966 + "status": "failed",
  967 + "msg": "SPU not found (unexpected)"
952 } 968 }
953 ], 969 ],
954 - "failed": [ 970 + "delete_spu_ids": [
955 { 971 {
956 - "spu_id": "789",  
957 - "error": "SPU not found or deleted" 972 + "spu_id": "100",
  973 + "status": "deleted"
  974 + },
  975 + {
  976 + "spu_id": "101",
  977 + "status": "not_found"
  978 + },
  979 + {
  980 + "spu_id": "102",
  981 + "status": "failed",
  982 + "msg": "Failed to delete from ES: Connection timeout"
958 } 983 }
959 ], 984 ],
960 - "total": 3,  
961 - "success_count": 2,  
962 - "failed_count": 1 985 + "total": 6,
  986 + "success_count": 4,
  987 + "failed_count": 2,
  988 + "elapsed_time": 1.23,
  989 + "index_name": "search_products",
  990 + "tenant_id": "162"
963 } 991 }
964 ``` 992 ```
965 993
  994 +| Field | Type | Description |
  995 +|------|------|------|
  996 +| `spu_ids` | array | Results for the requested `spu_ids`; each element has `spu_id` and `status` |
  997 +| `spu_ids[].status` | string | Status: `indexed`, `deleted` (auto-detected deletion), or `failed` |
  998 +| `spu_ids[].msg` | string | Failure reason; present only when status is `failed` |
  999 +| `delete_spu_ids` | array | Results for the requested `delete_spu_ids`; each element has `spu_id` and `status` |
  1000 +| `delete_spu_ids[].status` | string | Status: `deleted`, `not_found` (not present in ES), or `failed` |
  1001 +| `delete_spu_ids[].msg` | string | Failure reason; present only when status is `failed` |
  1002 +| `total` | integer | Total items processed (number of `spu_ids` plus number of `delete_spu_ids`) |
  1003 +| `success_count` | integer | Number of successes (`indexed` + `deleted` + `not_found`) |
  1004 +| `failed_count` | integer | Number of failures |
  1005 +| `elapsed_time` | float | Elapsed time in seconds |
  1006 +| `index_name` | string | Index name |
  1007 +| `tenant_id` | string | Tenant ID |
  1008 +
  1009 +**Status values**:
  1010 +- Statuses in `spu_ids`:
  1011 + - `indexed`: the SPU was indexed into ES successfully
  1012 + - `deleted`: the SPU is marked deleted=1 in the database and was removed from ES (auto-detected)
  1013 + - `failed`: processing failed; the `msg` field gives the reason
  1014 +- Statuses in `delete_spu_ids`:
  1015 + - `deleted`: the SPU was removed from ES successfully
  1016 + - `not_found`: the SPU does not exist in ES (counts as success; it may already have been deleted)
  1017 + - `failed`: deletion failed; the `msg` field gives the reason
  1018 +
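To make the counting rule concrete, the sketch below recomputes `total`, `success_count` and `failed_count` from the two result lists, using the entries from the example response above. `recompute_counts` is a hypothetical client-side helper, not code from this commit.

```python
from collections import Counter
from typing import Any, Dict

# Statuses that the field table counts as success.
SUCCESS_STATUSES = {"indexed", "deleted", "not_found"}


def recompute_counts(response: Dict[str, Any]) -> Dict[str, int]:
    """Derive the summary counters from the per-ID result lists."""
    items = response.get("spu_ids", []) + response.get("delete_spu_ids", [])
    by_status = Counter(item["status"] for item in items)
    success = sum(n for status, n in by_status.items() if status in SUCCESS_STATUSES)
    return {
        "total": len(items),
        "success_count": success,
        "failed_count": by_status["failed"],
    }


example = {
    "spu_ids": [
        {"spu_id": "123", "status": "indexed"},
        {"spu_id": "456", "status": "deleted"},
        {"spu_id": "789", "status": "failed", "msg": "SPU not found (unexpected)"},
    ],
    "delete_spu_ids": [
        {"spu_id": "100", "status": "deleted"},
        {"spu_id": "101", "status": "not_found"},
        {"spu_id": "102", "status": "failed", "msg": "Failed to delete from ES: Connection timeout"},
    ],
}
print(recompute_counts(example))  # {'total': 6, 'success_count': 4, 'failed_count': 2}
```

The result matches the `total`, `success_count` and `failed_count` values shown in the example response.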
966 #### Request examples 1019 #### Request examples
967 1020
968 -**SPU incremental index**: 1021 +**Example 1: regular incremental index (auto-detected deletion)**:
969 ```bash 1022 ```bash
970 curl -X POST "http://localhost:6002/indexer/index" \ 1023 curl -X POST "http://localhost:6002/indexer/index" \
971 -H "Content-Type: application/json" \ 1024 -H "Content-Type: application/json" \
@@ -974,18 +1027,81 @@ curl -X POST "http://localhost:6002/indexer/index" \ @@ -974,18 +1027,81 @@ curl -X POST "http://localhost:6002/indexer/index" \
974 "spu_ids": ["123", "456", "789"] 1027 "spu_ids": ["123", "456", "789"]
975 }' 1028 }'
976 ``` 1029 ```
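  1030 +Note: if SPU 456 has `deleted=1` in the database, it is removed from ES automatically and reported in the `spu_ids` list with status `deleted`.
  1031 +
  1032 +**Example 2: explicit deletion (bulk deletion)**: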
  1033 +```bash
  1034 +curl -X POST "http://localhost:6002/indexer/index" \
  1035 + -H "Content-Type: application/json" \
  1036 + -d '{
  1037 + "tenant_id": "162",
  1038 + "spu_ids": ["123", "456"],
  1039 + "delete_spu_ids": ["100", "101", "102"]
  1040 + }'
  1041 +```
  1042 +Note: SPUs 100, 101 and 102 are deleted explicitly, regardless of their database state.
  1043 +
  1044 +**Example 3: delete only (no indexing)**:
  1045 +```bash
  1046 +curl -X POST "http://localhost:6002/indexer/index" \
  1047 + -H "Content-Type: application/json" \
  1048 + -d '{
  1049 + "tenant_id": "162",
  1050 + "spu_ids": [],
  1051 + "delete_spu_ids": ["100", "101"]
  1052 + }'
  1053 +```
  1054 +Note: only deletions are performed; nothing is indexed.
  1055 +
  1056 +**Example 4: mixed operation (index + delete)**:
  1057 +```bash
  1058 +curl -X POST "http://localhost:6002/indexer/index" \
  1059 + -H "Content-Type: application/json" \
  1060 + -d '{
  1061 + "tenant_id": "162",
  1062 + "spu_ids": ["123", "456", "789"],
  1063 + "delete_spu_ids": ["100", "101"]
  1064 + }'
  1065 +```
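  1066 +Note: indexing and deletion run in the same request.
  1067 +
  1068 +#### Deletion best practices
  1069 +
  1070 +1. **Prefer auto-detected deletion** (recommended)
  1071 + - The database is the single source of truth, which keeps data consistent
  1072 + - The caller needs no extra handling; the system detects and deletes automatically
  1073 + - Suited to routine incremental updates
  1074 +
  1075 +2. **Use explicit deletion for special cases**
  1076 + - Bulk deletion
  1077 + - Immediate deletion (without waiting for the database update)
  1078 + - Special business logic
  1079 +
  1080 +3. **Monitor deletion results**
  1081 + - Track deletions through the entries with status `deleted` in the response lists
  1082 + - Explicit deletions appear under `delete_spu_ids` and auto-detected ones under `spu_ids`, which helps troubleshooting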
977 1083
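978 #### Log details 1084 #### Log details
979 1085
980 All key information about incremental index operations is written to `logs/indexer.log` (JSON format), including: 1086 All key information about incremental index operations is written to `logs/indexer.log` (JSON format), including:
981 - Request start and end times 1087 - Request start and end times
982 -- Processing status of each SPU (fetch, transform, index) 1088 +- Processing status of each SPU (fetch, transform, index, delete)
983 - ES bulk write results 1089 - ES bulk write results
984 - Success/failure statistics 1090 - Success/failure statistics
  1091 +- Deletion statistics (number of deletions)
985 - Detailed error messages 1092 - Detailed error messages
986 1093
987 For how to query the logs, see [Section 5.1, viewing index logs](#51-全量重建索引接口). 1094 For how to query the logs, see [Section 5.1, viewing index logs](#51-全量重建索引接口).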
988 1095
  1096 +**Deletion log examples**:
  1097 +```bash
  1098 +# Show deletion log entries
  1099 +cat logs/indexer.log | jq 'select(.status == "deleted" or .status == "auto_deleted")'
  1100 +
  1101 +# Show request-complete entries that include deletion statistics
  1102 +cat logs/indexer.log | jq 'select(.operation == "request_complete" and .deleted_count > 0)'
  1103 +```
  1104 +
989 ### 5.3 Query documents endpoint 1105 ### 5.3 Query documents endpoint
990 1106
991 - **Endpoint**: `POST /indexer/documents` 1107 - **Endpoint**: `POST /indexer/documents`
indexer/incremental_service.py
@@ -141,93 +141,46 @@ class IncrementalIndexerService: @@ -141,93 +141,46 @@ class IncrementalIndexerService:
141 return deleted == b'\x01' or deleted == 1 141 return deleted == b'\x01' or deleted == 1
142 return bool(deleted) 142 return bool(deleted)
143 143
144 - def delete_spus_from_es( 144 + def _delete_spu_from_es(
145 self, 145 self,
146 es_client, 146 es_client,
147 tenant_id: str, 147 tenant_id: str,
148 - spu_ids: List[str],  
149 - index_name: str = DEFAULT_INDEX_NAME 148 + spu_id: str,
  149 + index_name: str,
  150 + log_prefix: str = ""
150 ) -> Dict[str, Any]: 151 ) -> Dict[str, Any]:
151 """ 152 """
152 - Bulk-delete SPU documents from ES  
153 -  
154 - Args:  
155 - es_client: Elasticsearch client  
156 - tenant_id: tenant ID  
157 - spu_ids: list of SPU IDs to delete  
158 - index_name: index name 153 + Delete a single SPU document from ES (shared helper)
159 154
160 Returns: 155 Returns:
161 - dict containing the deletion results 156 + {"status": "deleted|not_found|failed", "msg": "error message (optional)"}
162 """ 157 """
163 - if not spu_ids:  
164 - return {  
165 - "deleted": [],  
166 - "not_found": [],  
167 - "failed": [],  
168 - "total": 0,  
169 - "deleted_count": 0,  
170 - "not_found_count": 0,  
171 - "failed_count": 0  
172 - }  
173 -  
174 - deleted_list = []  
175 - not_found_list = []  
176 - failed_list = []  
177 -  
178 - logger.info(f"[IncrementalDeletion] Starting deletion for tenant_id={tenant_id}, spu_count={len(spu_ids)}")  
179 -  
180 - for spu_id in spu_ids:  
181 - try:  
182 - # Delete the document with the ES delete API  
183 - # ES document ID format: usually spu_id, but the actual format should be confirmed  
184 - # index_spus_to_es uses spu_id as the document ID  
185 - try:  
186 - response = es_client.client.delete(  
187 - index=index_name,  
188 - id=str(spu_id),  
189 - ignore=[404] # ignore document-not-found errors  
190 - )  
191 -  
192 - if response.get('result') == 'deleted':  
193 - deleted_list.append({"spu_id": spu_id, "status": "deleted"})  
194 - log_spu_processing(indexer_logger, tenant_id, spu_id, 'deleted')  
195 - elif response.get('result') == 'not_found':  
196 - not_found_list.append({"spu_id": spu_id, "status": "not_found"})  
197 - logger.debug(f"[IncrementalDeletion] SPU {spu_id} not found in ES")  
198 - else:  
199 - failed_list.append({"spu_id": spu_id, "error": f"Unexpected result: {response.get('result')}"})  
200 -  
201 - except Exception as e:  
202 - # Handle 404 errors (document not found)  
203 - if hasattr(e, 'status_code') and e.status_code == 404:  
204 - not_found_list.append({"spu_id": spu_id, "status": "not_found"})  
205 - else:  
206 - error_msg = str(e)  
207 - logger.error(f"[IncrementalDeletion] Error deleting SPU {spu_id}: {e}", exc_info=True)  
208 - failed_list.append({"spu_id": spu_id, "error": error_msg})  
209 - log_spu_processing(indexer_logger, tenant_id, spu_id, 'delete_failed', error_msg)  
210 -  
211 - except Exception as e:  
212 - error_msg = str(e)  
213 - logger.error(f"[IncrementalDeletion] Unexpected error deleting SPU {spu_id}: {e}", exc_info=True)  
214 - failed_list.append({"spu_id": spu_id, "error": error_msg})  
215 -  
216 - logger.info(  
217 - f"[IncrementalDeletion] Completed for tenant_id={tenant_id}: "  
218 - f"total={len(spu_ids)}, deleted={len(deleted_list)}, "  
219 - f"not_found={len(not_found_list)}, failed={len(failed_list)}"  
220 - )  
221 -  
222 - return {  
223 - "deleted": deleted_list,  
224 - "not_found": not_found_list,  
225 - "failed": failed_list,  
226 - "total": len(spu_ids),  
227 - "deleted_count": len(deleted_list),  
228 - "not_found_count": len(not_found_list),  
229 - "failed_count": len(failed_list)  
230 - } 158 + try:
  159 + response = es_client.client.delete(
  160 + index=index_name,
  161 + id=str(spu_id),
  162 + ignore=[404]
  163 + )
  164 +
  165 + result = response.get('result')
  166 + if result == 'deleted':
  167 + log_spu_processing(indexer_logger, tenant_id, spu_id, 'deleted', log_prefix)
  168 + return {"status": "deleted"}
  169 + elif result == 'not_found':
  170 + return {"status": "not_found"}
  171 + else:
  172 + msg = f"Unexpected result: {result}"
  173 + log_spu_processing(indexer_logger, tenant_id, spu_id, 'delete_failed', msg)
  174 + return {"status": "failed", "msg": msg}
  175 +
  176 + except Exception as e:
  177 + if hasattr(e, 'status_code') and e.status_code == 404:
  178 + return {"status": "not_found"}
  179 + else:
  180 + msg = str(e)
  181 + logger.error(f"[IncrementalDeletion] Error deleting SPU {spu_id}: {e}", exc_info=True)
  182 + log_spu_processing(indexer_logger, tenant_id, spu_id, 'delete_failed', msg)
  183 + return {"status": "failed", "msg": msg}
231 184
232 def _load_skus_for_spu(self, tenant_id: str, spu_id: str) -> pd.DataFrame: 185 def _load_skus_for_spu(self, tenant_id: str, spu_id: str) -> pd.DataFrame:
233 """Load all SKU data for the given SPU.""" 186 """Load all SKU data for the given SPU."""
@@ -296,11 +249,14 @@ class IncrementalIndexerService: @@ -296,11 +249,14 @@ class IncrementalIndexerService:
296 """ 249 """
297 start_time = time.time() 250 start_time = time.time()
298 total_count = len(spu_ids) 251 total_count = len(spu_ids)
299 - success_list = []  
300 - failed_list = [] 252 + delete_count = len(delete_spu_ids) if delete_spu_ids else 0
  253 +
  254 + # Results for spu_ids (statuses: indexed, deleted, failed)
  255 + spu_results = []
  256 + # Results for delete_spu_ids (statuses: deleted, not_found, failed)
  257 + delete_results = []
  258 +
301 documents = [] 259 documents = []
302 - deleted_list = []  
303 - auto_deleted_list = []  
304 260
305 # Log the start of the request 261 # Log the start of the request
306 log_index_request( 262 log_index_request(
@@ -309,7 +265,7 @@ class IncrementalIndexerService: @@ -309,7 +265,7 @@ class IncrementalIndexerService:
309 tenant_id=tenant_id, 265 tenant_id=tenant_id,
310 request_params={ 266 request_params={
311 'spu_count': total_count, 267 'spu_count': total_count,
312 - 'delete_count': len(delete_spu_ids) if delete_spu_ids else 0, 268 + 'delete_count': delete_count,
313 'index_name': index_name, 269 'index_name': index_name,
314 'batch_size': batch_size 270 'batch_size': batch_size
315 } 271 }
@@ -317,58 +273,33 @@ class IncrementalIndexerService: @@ -317,58 +273,33 @@ class IncrementalIndexerService:
317 273
318 logger.info( 274 logger.info(
319 f"[IncrementalIndexing] Starting bulk index for tenant_id={tenant_id}, " 275 f"[IncrementalIndexing] Starting bulk index for tenant_id={tenant_id}, "
320 - f"spu_count={total_count}, delete_count={len(delete_spu_ids) if delete_spu_ids else 0}" 276 + f"spu_count={total_count}, delete_count={delete_count}"
321 ) 277 )
322 278
323 - # Step 0: handle explicit deletion requests 279 + # Step 0: handle explicit deletion requests (delete_spu_ids)
324 if delete_spu_ids: 280 if delete_spu_ids:
325 logger.info(f"[IncrementalIndexing] Processing explicit deletions: {len(delete_spu_ids)} SPUs") 281 logger.info(f"[IncrementalIndexing] Processing explicit deletions: {len(delete_spu_ids)} SPUs")
326 - delete_result = self.delete_spus_from_es(  
327 - es_client=es_client,  
328 - tenant_id=tenant_id,  
329 - spu_ids=delete_spu_ids,  
330 - index_name=index_name  
331 - )  
332 - deleted_list = delete_result.get('deleted', [])  
333 - logger.info(f"[IncrementalIndexing] Explicitly deleted {len(deleted_list)} SPUs from ES") 282 + for spu_id in delete_spu_ids:
  283 + result = self._delete_spu_from_es(es_client, tenant_id, spu_id, index_name, "explicit")
  284 + delete_results.append({"spu_id": spu_id, **result})
334 285
335 - # Step 1: fetch all SPU documents and auto-detect deletions 286 + # Step 1: handle index requests (spu_ids) and auto-detect deletions
336 for spu_id in spu_ids: 287 for spu_id in spu_ids:
337 try: 288 try:
338 log_spu_processing(indexer_logger, tenant_id, spu_id, 'fetching') 289 log_spu_processing(indexer_logger, tenant_id, spu_id, 'fetching')
339 290
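340 # First check whether the SPU is marked as deleted in the database 291 # First check whether the SPU is marked as deleted in the database
341 - is_deleted = self.check_spu_deleted(tenant_id, spu_id)  
342 - if is_deleted: 292 + if self.check_spu_deleted(tenant_id, spu_id):
343 # The SPU is deleted; remove its document from ES 293 # The SPU is deleted; remove its document from ES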
344 logger.info(f"[IncrementalIndexing] SPU {spu_id} is deleted in DB, removing from ES") 294 logger.info(f"[IncrementalIndexing] SPU {spu_id} is deleted in DB, removing from ES")
345 - try:  
346 - response = es_client.client.delete(  
347 - index=index_name,  
348 - id=str(spu_id),  
349 - ignore=[404]  
350 - )  
351 - if response.get('result') == 'deleted':  
352 - auto_deleted_list.append({  
353 - "spu_id": spu_id,  
354 - "status": "auto_deleted",  
355 - "reason": "deleted in database"  
356 - })  
357 - log_spu_processing(indexer_logger, tenant_id, spu_id, 'auto_deleted', "deleted in database")  
358 - elif response.get('result') == 'not_found':  
359 - # Not present in ES; still counts as success (it may already have been deleted)  
360 - auto_deleted_list.append({  
361 - "spu_id": spu_id,  
362 - "status": "auto_deleted",  
363 - "reason": "deleted in database (not found in ES)"  
364 - })  
365 - except Exception as e:  
366 - error_msg = f"Failed to delete from ES: {str(e)}"  
367 - logger.error(f"[IncrementalIndexing] Error deleting SPU {spu_id} from ES: {e}", exc_info=True)  
368 - failed_list.append({  
369 - "spu_id": spu_id,  
370 - "error": error_msg  
371 - }) 295 + result = self._delete_spu_from_es(es_client, tenant_id, spu_id, index_name, "auto")
  296 + # Normalize the status: deleted and not_found both count as deleted; failed stays failed
  297 + status = "deleted" if result["status"] != "failed" else "failed"
  298 + spu_results.append({
  299 + "spu_id": spu_id,
  300 + "status": status,
  301 + **({"msg": result["msg"]} if status == "failed" else {})
  302 + })
372 continue 303 continue
373 304
374 # The SPU is not deleted; fetch its document as usual 305 # The SPU is not deleted; fetch its document as usual
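The status normalization in this hunk can be read in isolation as a small pure function. The sketch below is illustrative only: `normalize_auto_delete` is a hypothetical name, and its input is assumed to be the dict returned by `_delete_spu_from_es` as documented in that method's docstring.

```python
from typing import Dict


def normalize_auto_delete(spu_id: str, delete_result: Dict[str, str]) -> Dict[str, str]:
    """Map a single-document delete result onto the spu_ids status vocabulary."""
    if delete_result["status"] == "failed":
        return {
            "spu_id": spu_id,
            "status": "failed",
            "msg": delete_result.get("msg", ""),
        }
    # Both "deleted" and "not_found" mean the document is absent from ES now,
    # so either one is reported as "deleted" for an auto-detected deletion.
    return {"spu_id": spu_id, "status": "deleted"}


print(normalize_auto_delete("456", {"status": "not_found"}))
```

Collapsing `not_found` into `deleted` keeps the `spu_ids` list to three statuses, whereas `delete_spu_ids` entries keep `not_found` as a distinct value.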
@@ -380,22 +311,24 @@ class IncrementalIndexerService: @@ -380,22 +311,24 @@ class IncrementalIndexerService:
380 error_msg = "SPU not found (unexpected)" 311 error_msg = "SPU not found (unexpected)"
381 logger.warning(f"[IncrementalIndexing] SPU {spu_id} not found after deleted check") 312 logger.warning(f"[IncrementalIndexing] SPU {spu_id} not found after deleted check")
382 log_spu_processing(indexer_logger, tenant_id, spu_id, 'failed', error_msg) 313 log_spu_processing(indexer_logger, tenant_id, spu_id, 'failed', error_msg)
383 - failed_list.append({ 314 + spu_results.append({
384 "spu_id": spu_id, 315 "spu_id": spu_id,
385 - "error": error_msg 316 + "status": "failed",
  317 + "msg": error_msg
386 }) 318 })
387 continue 319 continue
388 320
389 log_spu_processing(indexer_logger, tenant_id, spu_id, 'transforming') 321 log_spu_processing(indexer_logger, tenant_id, spu_id, 'transforming')
390 - documents.append(doc) 322 + documents.append((spu_id, doc)) # keep the spu_id-to-doc pairing
391 323
392 except Exception as e: 324 except Exception as e:
393 error_msg = str(e) 325 error_msg = str(e)
394 logger.error(f"[IncrementalIndexing] Error processing SPU {spu_id}: {e}", exc_info=True) 326 logger.error(f"[IncrementalIndexing] Error processing SPU {spu_id}: {e}", exc_info=True)
395 log_spu_processing(indexer_logger, tenant_id, spu_id, 'failed', error_msg) 327 log_spu_processing(indexer_logger, tenant_id, spu_id, 'failed', error_msg)
396 - failed_list.append({ 328 + spu_results.append({
397 "spu_id": spu_id, 329 "spu_id": spu_id,
398 - "error": error_msg 330 + "status": "failed",
  331 + "msg": error_msg
399 }) 332 })
400 333
401 logger.info(f"[IncrementalIndexing] Transformed {len(documents)}/{total_count} documents") 334 logger.info(f"[IncrementalIndexing] Transformed {len(documents)}/{total_count} documents")
@@ -403,43 +336,42 @@ class IncrementalIndexerService: @@ -403,43 +336,42 @@ class IncrementalIndexerService:
403 # Step 2: bulk write to ES 336 # Step 2: bulk write to ES
404 if documents: 337 if documents:
405 try: 338 try:
406 - logger.info(f"[IncrementalIndexing] Indexing {len(documents)} documents to ES (batch_size={batch_size})") 339 + # Extract the doc list for the bulk write
  340 + doc_list = [doc for _, doc in documents]
  341 + logger.info(f"[IncrementalIndexing] Indexing {len(doc_list)} documents to ES (batch_size={batch_size})")
407 indexer = BulkIndexer(es_client, index_name, batch_size=batch_size, max_retries=3) 342 indexer = BulkIndexer(es_client, index_name, batch_size=batch_size, max_retries=3)
408 bulk_results = indexer.index_documents( 343 bulk_results = indexer.index_documents(
409 - documents, 344 + doc_list,
410 id_field="spu_id", 345 id_field="spu_id",
411 show_progress=False 346 show_progress=False
412 ) 347 )
413 348
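414 - # Update the success list from the result returned by ES  
415 - # Note: BulkIndexer returns aggregate statistics, so the update has to follow the actual failures  
416 - # If part of the ES bulk write fails, we need to find out which documents failed 349 + # Update spu_results from the result returned by ES
417 es_success_count = bulk_results.get('success', 0) 350 es_success_count = bulk_results.get('success', 0)
418 es_failed_count = bulk_results.get('failed', 0) 351 es_failed_count = bulk_results.get('failed', 0)
419 352
420 - # Because we cannot tell exactly which documents failed, we assume: 353 + # Because BulkIndexer returns aggregate statistics, we assume:
421 # - if the ES success count equals the document count, every document succeeded 354 # - if the ES success count equals the document count, every document succeeded
422 # - otherwise the failed documents may appear in the ES error info, but we cannot map them exactly 355 # - otherwise the failed documents may appear in the ES error info, but we cannot map them exactly
423 - # Simplification: add the documents written to ES to the success list 356 + # Simplification: mark the documents written to ES as indexed
424 if es_failed_count == 0: 357 if es_failed_count == 0:
425 # All succeeded 358 # All succeeded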
426 - for doc in documents:  
427 - success_list.append({  
428 - "spu_id": doc.get('spu_id'), 359 + for spu_id, doc in documents:
  360 + spu_results.append({
  361 + "spu_id": spu_id,
429 "status": "indexed" 362 "status": "indexed"
430 }) 363 })
431 else: 364 else:
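432 # Some writes failed; processed documents are marked as successful, although unprocessed ones may have failed 365 # Some writes failed; processed documents are marked as successful, although unprocessed ones may have failed
433 - # This is a simplification; the detailed ES error info should be used instead  
434 logger.warning(f"[IncrementalIndexing] ES bulk index had {es_failed_count} failures") 366 logger.warning(f"[IncrementalIndexing] ES bulk index had {es_failed_count} failures")
435 - for doc in documents: 367 + for spu_id, doc in documents:
436 # We cannot tell exactly which ones failed, so we assume all succeeded (this should be improved) 368 # We cannot tell exactly which ones failed, so we assume all succeeded (this should be improved)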
437 - success_list.append({  
438 - "spu_id": doc.get('spu_id'), 369 + spu_results.append({
  370 + "spu_id": spu_id,
439 "status": "indexed" 371 "status": "indexed"
440 }) 372 })
441 373
442 - # If ES reported errors, record them in the failed list (without specific spu_ids) 374 + # If ES reported errors, log them
443 if bulk_results.get('errors'): 375 if bulk_results.get('errors'):
444 logger.error(f"[IncrementalIndexing] ES errors: {bulk_results['errors'][:5]}") 376 logger.error(f"[IncrementalIndexing] ES errors: {bulk_results['errors'][:5]}")
445 377
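The comments in this hunk acknowledge that aggregate counts cannot identify which documents failed. One possible refinement, assuming the raw Elasticsearch Bulk API response body is available to the caller (this diff does not show whether `BulkIndexer` exposes it), is to read the per-document entries in its `items` array. `results_from_bulk_items` is a hypothetical helper, and it relies on the document `_id` being the `spu_id`, as the `id_field="spu_id"` argument suggests.

```python
from typing import Any, Dict, List


def results_from_bulk_items(items: List[Dict[str, Any]]) -> List[Dict[str, str]]:
    """Turn Bulk API response items into per-SPU status entries."""
    results = []
    for item in items:
        # Each item has a single key naming the action ("index", "create", ...).
        action, detail = next(iter(item.items()))
        spu_id = str(detail.get("_id"))
        if 200 <= detail.get("status", 0) < 300:
            results.append({"spu_id": spu_id, "status": "indexed"})
        else:
            error = detail.get("error") or {}
            reason = error.get("reason", "unknown error") if isinstance(error, dict) else str(error)
            results.append({
                "spu_id": spu_id,
                "status": "failed",
                "msg": f"ES {action} failed: {reason}",
            })
    return results


bulk_items = [
    {"index": {"_id": "123", "status": 201, "result": "created"}},
    {"index": {"_id": "456", "status": 400,
               "error": {"type": "mapper_parsing_exception", "reason": "failed to parse"}}},
]
print(results_from_bulk_items(bulk_items))
```

With a mapping like this, a partially failed bulk write could report `failed` for the affected SPUs only, instead of marking every document as `indexed`.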
@@ -447,60 +379,56 @@ class IncrementalIndexerService: @@ -447,60 +379,56 @@ class IncrementalIndexerService:
447 error_msg = f"ES bulk index failed: {str(e)}" 379 error_msg = f"ES bulk index failed: {str(e)}"
448 logger.error(f"[IncrementalIndexing] {error_msg}", exc_info=True) 380 logger.error(f"[IncrementalIndexing] {error_msg}", exc_info=True)
449 # All documents failed 381 # All documents failed
450 - for doc in documents:  
451 - failed_list.append({  
452 - "spu_id": doc.get('spu_id'),  
453 - "error": error_msg  
454 - })  
455 - documents = [] # Clear to avoid processing twice 382 + for spu_id, doc in documents:
  383 + # Check whether the SPU is already in spu_results (it may have been marked failed earlier)
  384 + existing = next((r for r in spu_results if r.get('spu_id') == spu_id), None)
  385 + if existing:
  386 + # Already present: update its status
  387 + existing['status'] = 'failed'
  388 + existing['msg'] = error_msg
  389 + else:
  390 + spu_results.append({
  391 + "spu_id": spu_id,
  392 + "status": "failed",
  393 + "msg": error_msg
  394 + })
456 else: 395 else:
457 logger.warning(f"[IncrementalIndexing] No documents to index for tenant_id={tenant_id}") 396 logger.warning(f"[IncrementalIndexing] No documents to index for tenant_id={tenant_id}")
458 397
459 elapsed_time = time.time() - start_time 398 elapsed_time = time.time() - start_time
460 - success_count = len(success_list)  
461 - failed_count = len(failed_list)  
462 399
463 - # Log the final result (including deletion statistics) 400 + # Compute the statistics (simplified)
  401 + total_processed = total_count + delete_count
  402 + total_success = len([r for r in spu_results + delete_results if r.get('status') in ('indexed', 'deleted', 'not_found')])
  403 + total_failed = len([r for r in spu_results + delete_results if r.get('status') == 'failed'])
  404 +
  405 + # Log the final result
  406 + deleted_count = len([r for r in spu_results + delete_results if r.get('status') == 'deleted'])
464 log_index_result( 407 log_index_result(
465 indexer_logger, 408 indexer_logger,
466 index_type='incremental', 409 index_type='incremental',
467 tenant_id=tenant_id, 410 tenant_id=tenant_id,
468 - total_count=total_count,  
469 - success_count=success_count,  
470 - failed_count=failed_count, 411 + total_count=total_processed,
  412 + success_count=total_success,
  413 + failed_count=total_failed,
471 elapsed_time=elapsed_time, 414 elapsed_time=elapsed_time,
472 index_name=index_name, 415 index_name=index_name,
473 - errors=[item.get('error') for item in failed_list[:10]] if failed_list else None,  
474 - deleted_count=total_deleted_count,  
475 - explicit_deleted_count=explicit_deleted_count,  
476 - auto_deleted_count=auto_deleted_count 416 + errors=[r.get('msg') for r in spu_results + delete_results if r.get('status') == 'failed'][:10],
  417 + deleted_count=deleted_count
477 ) 418 )
478 419
479 - # Count the deletions  
480 - explicit_deleted_count = len(deleted_list) if delete_spu_ids else 0  
481 - auto_deleted_count = len(auto_deleted_list)  
482 - total_deleted_count = explicit_deleted_count + auto_deleted_count  
483 -  
484 logger.info( 420 logger.info(
485 f"[IncrementalIndexing] Completed for tenant_id={tenant_id}: " 421 f"[IncrementalIndexing] Completed for tenant_id={tenant_id}: "
486 - f"total={total_count}, success={success_count}, failed={failed_count}, "  
487 - f"explicit_deleted={explicit_deleted_count}, auto_deleted={auto_deleted_count}, " 422 + f"total={total_processed}, success={total_success}, failed={total_failed}, "
488 f"elapsed={elapsed_time:.2f}s" 423 f"elapsed={elapsed_time:.2f}s"
489 ) 424 )
490 425
491 return { 426 return {
492 - "success": success_list,  
493 - "failed": failed_list,  
494 - "deleted": {  
495 - "explicit": deleted_list if delete_spu_ids else [],  
496 - "auto": auto_deleted_list,  
497 - "total_count": total_deleted_count,  
498 - "explicit_count": explicit_deleted_count,  
499 - "auto_count": auto_deleted_count  
500 - },  
501 - "total": total_count,  
502 - "success_count": success_count,  
503 - "failed_count": failed_count, 427 + "spu_ids": spu_results, # spu_ids对应的响应列表
  428 + "delete_spu_ids": delete_results, # delete_spu_ids对应的响应列表
  429 + "total": total_processed,
  430 + "success_count": total_success,
  431 + "failed_count": total_failed,
504 "elapsed_time": elapsed_time, 432 "elapsed_time": elapsed_time,
505 "index_name": index_name, 433 "index_name": index_name,
506 "tenant_id": tenant_id 434 "tenant_id": tenant_id