Commit d6606d7aa4638d454684920b84db9d933951a9cf

Authored by tangwang
1 parent 5ac64fc7

Clean up legacy code, specifically:

1. Remove the IndexingPipeline class
File: indexer/bulk_indexer.py
Removed: the IndexingPipeline class (lines 201-259)
Removed: the load_mapping import, which is no longer needed
2. Remove legacy code from main.py
Removed: the cmd_ingest() function (the entire function)
Removed: the ingest subcommand definition
Removed: the handling of the ingest command in main()
Removed: the pandas import, which is no longer needed
Updated: the module docstring, removing the ingest command description
3. Remove the old data ingestion script
Removed: data/customer1/ingest_customer1.py (it depended on the deprecated DataTransformer and IndexingPipeline)
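For reference, below is a minimal sketch of how the flow covered by the removed IndexingPipeline.run() can still be wired by hand, using the BulkIndexer that remains in indexer/bulk_indexer.py together with a transformer. The class names, constructor arguments, and result keys follow the code deleted in this commit (cmd_ingest() aliased SPUTransformer as DataTransformer); treat them as assumptions rather than the current service API, which is expected to go through BulkIndexingService / IncrementalIndexerService per the CLAUDE.md change below.

```python
# Sketch only: reproduces the deleted IndexingPipeline.run() flow by hand.
# All names/signatures are taken from the code removed in this commit;
# verify against BulkIndexingService / IncrementalIndexerService before use.
import pandas as pd

from config import ConfigLoader
from utils import ESClient
from indexer.bulk_indexer import BulkIndexer
from indexer.mapping_generator import load_mapping, DEFAULT_INDEX_NAME
from indexer.spu_transformer import SPUTransformer

config = ConfigLoader("config/config.yaml").load_config()  # as in the old cmd_ingest()
es_client = ESClient(hosts=["http://localhost:9200"])

# Create the index from the shared mapping if it does not exist yet.
if not es_client.index_exists(DEFAULT_INDEX_NAME):
    es_client.create_index(DEFAULT_INDEX_NAME, load_mapping())

# Transform source rows into ES documents. Encoders are optional; the old
# script used BgeEncoder / CLIPImageEncoder from embeddings when enabled.
text_encoder = None
image_encoder = None
df = pd.read_csv("path/to/source.csv")  # placeholder path
transformer = SPUTransformer(config, text_encoder, image_encoder, use_cache=True)
documents = transformer.transform_batch(df, batch_size=100)

# Bulk index, keyed by skuId exactly as the removed pipeline did.
indexer = BulkIndexer(es_client, DEFAULT_INDEX_NAME, batch_size=500)
results = indexer.index_documents(documents, id_field="skuId")
print(f"indexed: {results['success']}, failed: {results['failed']}")
```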
CLAUDE.md
... ... @@ -132,7 +132,7 @@ python main.py search "query" --tenant-id 1 --size 10
132 132  
133 133 2. **Indexing Layer** (`indexer/`):
134 134 - Reads from MySQL, applies transformations with embeddings
135   - - Uses `DataTransformer` and `IndexingPipeline` for batch processing
  135 + - Uses `SPUTransformer`, `BulkIndexingService`, and `IncrementalIndexerService` for batch processing
136 136 - Supports both full and incremental indexing with embedding caching
137 137  
138 138 3. **Query Processing Layer** (`query/`):
... ...
data/customer1/ingest_customer1.py deleted
... ... @@ -1,198 +0,0 @@
1   -#!/usr/bin/env python3
2   -"""
3   -Customer1 data ingestion script.
4   -
5   -Loads data from CSV and indexes into Elasticsearch with embeddings.
6   -"""
7   -
8   -import sys
9   -import os
10   -import pandas as pd
11   -import argparse
12   -from typing import Optional
13   -
14   -# Add parent directory to path (go up 3 levels: customer1 -> data -> SearchEngine -> root)
15   -project_root = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
16   -sys.path.insert(0, project_root)
17   -
18   -from config import ConfigLoader
19   -from utils import ESClient, get_connection_from_config
20   -from indexer import DataTransformer, IndexingPipeline
21   -from embeddings import BgeEncoder, CLIPImageEncoder
22   -
23   -
24   -def load_csv_data(csv_path: str, limit: Optional[int] = None) -> pd.DataFrame:
25   - """
26   - Load data from CSV file.
27   -
28   - Args:
29   - csv_path: Path to CSV file
30   - limit: Maximum number of rows to load (None for all)
31   -
32   - Returns:
33   - DataFrame with data
34   - """
35   - print(f"[Ingestion] Loading data from: {csv_path}")
36   -
37   - df = pd.read_csv(csv_path)
38   -
39   - if limit:
40   - df = df.head(limit)
41   -
42   - print(f"[Ingestion] Loaded {len(df)} rows")
43   - print(f"[Ingestion] Columns: {df.columns.tolist()}")
44   -
45   - return df
46   -
47   -
48   -def main():
49   - """Main ingestion function."""
50   - parser = argparse.ArgumentParser(description='Ingest customer1 data into Elasticsearch')
51   - parser.add_argument('--config', default='customer1', help='Customer config name')
52   - parser.add_argument('--csv', default='data/customer1/goods_with_pic.5years_congku.csv.shuf.1w',
53   - help='Path to CSV data file')
54   - parser.add_argument('--limit', type=int, help='Limit number of documents to index')
55   - parser.add_argument('--batch-size', type=int, default=100, help='Batch size for processing')
56   - parser.add_argument('--recreate-index', action='store_true', help='Recreate index if exists')
57   - parser.add_argument('--es-host', default='http://localhost:9200', help='Elasticsearch host')
58   - parser.add_argument('--es-username', default=None, help='Elasticsearch username (or set ES_USERNAME env var)')
59   - parser.add_argument('--es-password', default=None, help='Elasticsearch password (or set ES_PASSWORD env var)')
60   - parser.add_argument('--skip-embeddings', action='store_true', help='Skip embedding generation')
61   - args = parser.parse_args()
62   -
63   - print("="*60)
64   - print("Customer1 Data Ingestion")
65   - print("="*60)
66   -
67   - # Load configuration
68   - print(f"\n[1/6] Loading configuration: {args.config}")
69   - config_loader = ConfigLoader("config/schema")
70   - config = config_loader.load_customer_config(args.config)
71   -
72   - # Validate configuration
73   - errors = config_loader.validate_config(config)
74   - if errors:
75   - print(f"Configuration validation failed:")
76   - for error in errors:
77   - print(f" - {error}")
78   - return 1
79   -
80   - print(f"Configuration loaded successfully")
81   - print(f" - Index: {config.es_index_name}")
82   - print(f" - Fields: {len(config.fields)}")
83   - print(f" - Indexes: {len(config.indexes)}")
84   -
85   - # Initialize Elasticsearch client
86   - print(f"\n[2/6] Connecting to Elasticsearch: {args.es_host}")
87   -
88   - # Get credentials: prioritize command-line args, then environment variables, then .env file
89   - es_username = args.es_username
90   - es_password = args.es_password
91   -
92   - # If not provided via args, try to load from .env file via env_config
93   - if not es_username or not es_password:
94   - try:
95   - from config.env_config import get_es_config
96   - es_config = get_es_config()
97   - es_username = es_username or es_config.get('username')
98   - es_password = es_password or es_config.get('password')
99   - except Exception:
100   - # Fallback to environment variables
101   - es_username = es_username or os.getenv('ES_USERNAME')
102   - es_password = es_password or os.getenv('ES_PASSWORD')
103   -
104   - # Create ES client with credentials if available
105   - if es_username and es_password:
106   - print(f" Using authentication: {es_username}")
107   - es_client = ESClient(hosts=[args.es_host], username=es_username, password=es_password)
108   - else:
109   - print(f" Warning: No authentication credentials found")
110   - print(f" Attempting connection without authentication (will fail if ES requires auth)")
111   - es_client = ESClient(hosts=[args.es_host])
112   -
113   - if not es_client.ping():
114   - print("Failed to connect to Elasticsearch")
115   - print("\nTroubleshooting:")
116   - print(" 1. Check if Elasticsearch is running: curl http://localhost:9200")
117   - print(" 2. If ES requires authentication, provide credentials:")
118   - print(" - Use --es-username and --es-password arguments, or")
119   - print(" - Set ES_USERNAME and ES_PASSWORD environment variables")
120   - print(" 3. Verify the host URL is correct: --es-host")
121   - return 1
122   -
123   - print("Connected to Elasticsearch successfully")
124   -
125   - # Load data
126   - print(f"\n[3/6] Loading data from CSV")
127   - df = load_csv_data(args.csv, limit=args.limit)
128   -
129   - # Initialize encoders (if not skipping embeddings)
130   - text_encoder = None
131   - image_encoder = None
132   -
133   - if not args.skip_embeddings:
134   - print(f"\n[4/6] Initializing embedding encoders")
135   - print("This may take a few minutes on first run (downloading models)...")
136   -
137   - try:
138   - text_encoder = BgeEncoder()
139   - print("Text encoder initialized")
140   - except Exception as e:
141   - print(f"Warning: Failed to initialize text encoder: {e}")
142   - print("Continuing without text embeddings...")
143   -
144   - try:
145   - image_encoder = CLIPImageEncoder()
146   - print("Image encoder initialized")
147   - except Exception as e:
148   - print(f"Warning: Failed to initialize image encoder: {e}")
149   - print("Continuing without image embeddings...")
150   - else:
151   - print(f"\n[4/6] Skipping embedding generation (--skip-embeddings)")
152   -
153   - # Initialize data transformer
154   - print(f"\n[5/6] Initializing data transformation pipeline")
155   - transformer = DataTransformer(
156   - config=config,
157   - text_encoder=text_encoder,
158   - image_encoder=image_encoder,
159   - use_cache=True
160   - )
161   -
162   - # Run indexing pipeline
163   - print(f"\n[6/6] Starting indexing pipeline")
164   - pipeline = IndexingPipeline(
165   - config=config,
166   - es_client=es_client,
167   - data_transformer=transformer,
168   - recreate_index=args.recreate_index
169   - )
170   -
171   - results = pipeline.run(df, batch_size=args.batch_size)
172   -
173   - # Print summary
174   - print("\n" + "="*60)
175   - print("Ingestion Complete!")
176   - print("="*60)
177   - print(f"Total documents: {results['total']}")
178   - print(f"Successfully indexed: {results['success']}")
179   - print(f"Failed: {results['failed']}")
180   - print(f"Time elapsed: {results['elapsed_time']:.2f}s")
181   - print(f"Throughput: {results['docs_per_second']:.2f} docs/s")
182   -
183   - if results['errors']:
184   - print(f"\nFirst few errors:")
185   - for error in results['errors'][:5]:
186   - print(f" - {error}")
187   -
188   - # Verify index
189   - print(f"\nVerifying index...")
190   - doc_count = es_client.count(config.es_index_name)
191   - print(f"Documents in index: {doc_count}")
192   -
193   - print("\nIngestion completed successfully!")
194   - return 0
195   -
196   -
197   -if __name__ == "__main__":
198   - sys.exit(main())
docs/常用查询 - ES.md
... ... @@ -233,7 +233,7 @@ curl -u 'essa:4hOaLaf41y2VuI8y' -X GET 'http://localhost:9200/search_products/
233 233 curl -u 'essa:4hOaLaf41y2VuI8y' -X GET 'http://localhost:9200/search_products/_count?pretty' -H 'Content-Type: application/json' -d '{
234 234 "query": {
235 235 "term": {
236   - "tenant_id": "162"
  236 + "tenant_id": "170"
237 237 }
238 238 }
239 239 }'
... ...
indexer/bulk_indexer.py
... ... @@ -7,7 +7,7 @@ Handles batch indexing of documents with progress tracking and error handling.
7 7 from typing import List, Dict, Any, Optional
8 8 from elasticsearch.helpers import bulk, BulkIndexError
9 9 from utils.es_client import ESClient
10   -from indexer.mapping_generator import load_mapping, DEFAULT_INDEX_NAME
  10 +from indexer.mapping_generator import DEFAULT_INDEX_NAME
11 11 import time
12 12  
13 13  
... ... @@ -196,64 +196,3 @@ class BulkIndexer:
196 196 except Exception as e:
197 197 print(f"[BulkIndexer] Update by query failed: {e}")
198 198 return 0
199   -
200   -
201   -class IndexingPipeline:
202   - """Complete indexing pipeline from source data to ES."""
203   -
204   - def __init__(
205   - self,
206   - es_client: ESClient,
207   - data_transformer,
208   - index_name: str = None,
209   - recreate_index: bool = False
210   - ):
211   - """
212   - Initialize indexing pipeline.
213   -
214   - Args:
215   - es_client: Elasticsearch client
216   - data_transformer: Data transformer instance
217   - index_name: Index name (defaults to DEFAULT_INDEX_NAME)
218   - recreate_index: Whether to recreate index if exists
219   - """
220   - self.es_client = es_client
221   - self.transformer = data_transformer
222   - self.index_name = index_name or DEFAULT_INDEX_NAME
223   - self.recreate_index = recreate_index
224   -
225   - def run(self, df, batch_size: int = 100) -> Dict[str, Any]:
226   - """
227   - Run complete indexing pipeline.
228   -
229   - Args:
230   - df: Source dataframe
231   - batch_size: Batch size for processing
232   -
233   - Returns:
234   - Indexing statistics
235   - """
236   - # Load and create index
237   - mapping = load_mapping()
238   -
239   - if self.recreate_index:
240   - if self.es_client.index_exists(self.index_name):
241   - print(f"[IndexingPipeline] Deleting existing index: {self.index_name}")
242   - self.es_client.delete_index(self.index_name)
243   -
244   - if not self.es_client.index_exists(self.index_name):
245   - print(f"[IndexingPipeline] Creating index: {self.index_name}")
246   - self.es_client.create_index(self.index_name, mapping)
247   - else:
248   - print(f"[IndexingPipeline] Using existing index: {self.index_name}")
249   -
250   - # Transform data
251   - print(f"[IndexingPipeline] Transforming {len(df)} documents...")
252   - documents = self.transformer.transform_batch(df, batch_size=batch_size)
253   - print(f"[IndexingPipeline] Transformed {len(documents)} documents")
254   -
255   - # Bulk index
256   - indexer = BulkIndexer(self.es_client, self.index_name, batch_size=500)
257   - results = indexer.index_documents(documents, id_field="skuId")
258   -
259   - return results
... ...
main.py
... ... @@ -3,8 +3,8 @@
3 3 Main entry point for SearchEngine operations.
4 4  
5 5 Provides a unified CLI for common operations:
6   -- ingest: Ingest data into Elasticsearch
7   -- serve: Start API service
  6 +- serve: Start API service (search + admin routes)
  7 +- serve-indexer: Start dedicated Indexer API service
8 8 - search: Test search from command line
9 9 """
10 10  
... ... @@ -12,7 +12,6 @@ import sys
12 12 import os
13 13 import argparse
14 14 import json
15   -import pandas as pd
16 15 import uvicorn
17 16  
18 17 # Add parent directory to path
... ... @@ -23,53 +22,6 @@ from utils import ESClient
23 22 from search import Searcher
24 23  
25 24  
26   -def cmd_ingest(args):
27   - """Run data ingestion."""
28   - # Local imports to avoid hard dependency at module import time
29   - import pandas as pd
30   - from embeddings import BgeEncoder, CLIPImageEncoder
31   - from indexer.bulk_indexer import IndexingPipeline
32   - # NOTE: DataTransformer was referenced historically, but the concrete
33   - # implementation is now provided via customer-specific scripts
34   - # (e.g. data/customer1/ingest_customer1.py). If you still need a generic
35   - # ingestion pipeline here, you can wire your own transformer.
36   - from indexer.spu_transformer import SPUTransformer as DataTransformer
37   - print("Starting data ingestion")
38   -
39   - # Load config
40   - config_loader = ConfigLoader("config/config.yaml")
41   - config = config_loader.load_config()
42   -
43   - # Initialize ES
44   - es_client = ESClient(hosts=[args.es_host])
45   - if not es_client.ping():
46   - print(f"ERROR: Cannot connect to Elasticsearch at {args.es_host}")
47   - return 1
48   -
49   - # Load data
50   - df = pd.read_csv(args.csv_file)
51   - if args.limit:
52   - df = df.head(args.limit)
53   - print(f"Loaded {len(df)} documents")
54   -
55   - # Initialize encoders
56   - text_encoder = None if args.skip_embeddings else BgeEncoder()
57   - image_encoder = None if args.skip_embeddings else CLIPImageEncoder()
58   -
59   - # Transform and index
60   - transformer = DataTransformer(config, text_encoder, image_encoder, use_cache=True)
61   - pipeline = IndexingPipeline(config, es_client, transformer, recreate_index=args.recreate)
62   -
63   - results = pipeline.run(df, batch_size=args.batch_size)
64   -
65   - print(f"\nIngestion complete:")
66   - print(f" Success: {results['success']}")
67   - print(f" Failed: {results['failed']}")
68   - print(f" Time: {results['elapsed_time']:.2f}s")
69   -
70   - return 0
71   -
72   -
73 25 def cmd_serve(args):
74 26 """Start API service."""
75 27 os.environ['ES_HOST'] = args.es_host
... ... @@ -154,15 +106,6 @@ def main():
154 106  
155 107 subparsers = parser.add_subparsers(dest='command', help='Command to execute')
156 108  
157   - # Ingest command
158   - ingest_parser = subparsers.add_parser('ingest', help='Ingest data into Elasticsearch')
159   - ingest_parser.add_argument('csv_file', help='Path to CSV data file')
160   - ingest_parser.add_argument('--es-host', default='http://localhost:9200', help='Elasticsearch host')
161   - ingest_parser.add_argument('--limit', type=int, help='Limit number of documents')
162   - ingest_parser.add_argument('--batch-size', type=int, default=100, help='Batch size')
163   - ingest_parser.add_argument('--recreate', action='store_true', help='Recreate index')
164   - ingest_parser.add_argument('--skip-embeddings', action='store_true', help='Skip embeddings')
165   -
166 109 # Serve command
167 110 serve_parser = subparsers.add_parser('serve', help='Start API service (multi-tenant)')
168 111 serve_parser.add_argument('--host', default='0.0.0.0', help='Host to bind to')
... ... @@ -197,9 +140,7 @@ def main():
197 140 return 1
198 141  
199 142 # Execute command
200   - if args.command == 'ingest':
201   - return cmd_ingest(args)
202   - elif args.command == 'serve':
  143 + if args.command == 'serve':
203 144 return cmd_serve(args)
204 145 elif args.command == 'serve-indexer':
205 146 return cmd_serve_indexer(args)
... ...