From d6606d7aa4638d454684920b84db9d933951a9cf Mon Sep 17 00:00:00 2001
From: tangwang
Date: Fri, 19 Dec 2025 08:57:36 +0800
Subject: [PATCH] Clean up legacy code

Specifically:

1. Remove the IndexingPipeline class
   File: indexer/bulk_indexer.py
   Removed: the IndexingPipeline class (lines 201-259)
   Removed: the no-longer-needed load_mapping import

2. Remove legacy code from main.py
   Removed: the cmd_ingest() function (entire function)
   Removed: the ingest subcommand definition
   Removed: the handling of the ingest command in main()
   Removed: the no-longer-needed pandas import
   Updated: the module docstring, dropping the ingest command description

3. Remove the old data ingestion script
   Removed: data/customer1/ingest_customer1.py (it depends on the deprecated
   DataTransformer and IndexingPipeline)
---
 CLAUDE.md                          |   2 +-
 data/customer1/ingest_customer1.py | 198 --------------------------------
 docs/常用查询 - ES.md               |   2 +-
 indexer/bulk_indexer.py            |  63 +----------
 main.py                            |  65 ++-----------
 5 files changed, 6 insertions(+), 324 deletions(-)
 delete mode 100755 data/customer1/ingest_customer1.py

diff --git a/CLAUDE.md b/CLAUDE.md
index 02e1276..78a26b6 100644
--- a/CLAUDE.md
+++ b/CLAUDE.md
@@ -132,7 +132,7 @@ python main.py search "query" --tenant-id 1 --size 10
 
 2. **Indexing Layer** (`indexer/`):
    - Reads from MySQL, applies transformations with embeddings
-   - Uses `DataTransformer` and `IndexingPipeline` for batch processing
+   - Uses `SPUTransformer`, `BulkIndexingService`, and `IncrementalIndexerService` for batch processing
    - Supports both full and incremental indexing with embedding caching
 
 3. **Query Processing Layer** (`query/`):
diff --git a/data/customer1/ingest_customer1.py b/data/customer1/ingest_customer1.py
deleted file mode 100755
index 27a48d3..0000000
--- a/data/customer1/ingest_customer1.py
+++ /dev/null
@@ -1,198 +0,0 @@
-#!/usr/bin/env python3
-"""
-Customer1 data ingestion script.
-
-Loads data from CSV and indexes into Elasticsearch with embeddings.
-"""
-
-import sys
-import os
-import pandas as pd
-import argparse
-from typing import Optional
-
-# Add parent directory to path (go up 3 levels: customer1 -> data -> SearchEngine -> root)
-project_root = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
-sys.path.insert(0, project_root)
-
-from config import ConfigLoader
-from utils import ESClient, get_connection_from_config
-from indexer import DataTransformer, IndexingPipeline
-from embeddings import BgeEncoder, CLIPImageEncoder
-
-
-def load_csv_data(csv_path: str, limit: Optional[int] = None) -> pd.DataFrame:
-    """
-    Load data from CSV file.
-
-    Args:
-        csv_path: Path to CSV file
-        limit: Maximum number of rows to load (None for all)
-
-    Returns:
-        DataFrame with data
-    """
-    print(f"[Ingestion] Loading data from: {csv_path}")
-
-    df = pd.read_csv(csv_path)
-
-    if limit:
-        df = df.head(limit)
-
-    print(f"[Ingestion] Loaded {len(df)} rows")
-    print(f"[Ingestion] Columns: {df.columns.tolist()}")
-
-    return df
-
-
-def main():
-    """Main ingestion function."""
-    parser = argparse.ArgumentParser(description='Ingest customer1 data into Elasticsearch')
-    parser.add_argument('--config', default='customer1', help='Customer config name')
-    parser.add_argument('--csv', default='data/customer1/goods_with_pic.5years_congku.csv.shuf.1w',
-                        help='Path to CSV data file')
-    parser.add_argument('--limit', type=int, help='Limit number of documents to index')
-    parser.add_argument('--batch-size', type=int, default=100, help='Batch size for processing')
-    parser.add_argument('--recreate-index', action='store_true', help='Recreate index if exists')
-    parser.add_argument('--es-host', default='http://localhost:9200', help='Elasticsearch host')
-    parser.add_argument('--es-username', default=None, help='Elasticsearch username (or set ES_USERNAME env var)')
-    parser.add_argument('--es-password', default=None, help='Elasticsearch password (or set ES_PASSWORD env var)')
-    parser.add_argument('--skip-embeddings', action='store_true', help='Skip embedding generation')
-    args = parser.parse_args()
-
-    print("="*60)
-    print("Customer1 Data Ingestion")
-    print("="*60)
-
-    # Load configuration
-    print(f"\n[1/6] Loading configuration: {args.config}")
-    config_loader = ConfigLoader("config/schema")
-    config = config_loader.load_customer_config(args.config)
-
-    # Validate configuration
-    errors = config_loader.validate_config(config)
-    if errors:
-        print(f"Configuration validation failed:")
-        for error in errors:
-            print(f"  - {error}")
-        return 1
-
-    print(f"Configuration loaded successfully")
-    print(f"  - Index: {config.es_index_name}")
-    print(f"  - Fields: {len(config.fields)}")
-    print(f"  - Indexes: {len(config.indexes)}")
-
-    # Initialize Elasticsearch client
-    print(f"\n[2/6] Connecting to Elasticsearch: {args.es_host}")
-
-    # Get credentials: prioritize command-line args, then environment variables, then .env file
-    es_username = args.es_username
-    es_password = args.es_password
-
-    # If not provided via args, try to load from .env file via env_config
-    if not es_username or not es_password:
-        try:
-            from config.env_config import get_es_config
-            es_config = get_es_config()
-            es_username = es_username or es_config.get('username')
-            es_password = es_password or es_config.get('password')
-        except Exception:
-            # Fallback to environment variables
-            es_username = es_username or os.getenv('ES_USERNAME')
-            es_password = es_password or os.getenv('ES_PASSWORD')
-
-    # Create ES client with credentials if available
-    if es_username and es_password:
-        print(f"  Using authentication: {es_username}")
-        es_client = ESClient(hosts=[args.es_host], username=es_username, password=es_password)
-    else:
-        print(f"  Warning: No authentication credentials found")
-        print(f"  Attempting connection without authentication (will fail if ES requires auth)")
-        es_client = ESClient(hosts=[args.es_host])
-
-    if not es_client.ping():
-        print("Failed to connect to Elasticsearch")
-        print("\nTroubleshooting:")
-        print("  1. Check if Elasticsearch is running: curl http://localhost:9200")
-        print("  2. If ES requires authentication, provide credentials:")
-        print("     - Use --es-username and --es-password arguments, or")
-        print("     - Set ES_USERNAME and ES_PASSWORD environment variables")
-        print("  3. Verify the host URL is correct: --es-host")
-        return 1
-
-    print("Connected to Elasticsearch successfully")
-
-    # Load data
-    print(f"\n[3/6] Loading data from CSV")
-    df = load_csv_data(args.csv, limit=args.limit)
-
-    # Initialize encoders (if not skipping embeddings)
-    text_encoder = None
-    image_encoder = None
-
-    if not args.skip_embeddings:
-        print(f"\n[4/6] Initializing embedding encoders")
-        print("This may take a few minutes on first run (downloading models)...")
-
-        try:
-            text_encoder = BgeEncoder()
-            print("Text encoder initialized")
-        except Exception as e:
-            print(f"Warning: Failed to initialize text encoder: {e}")
-            print("Continuing without text embeddings...")
-
-        try:
-            image_encoder = CLIPImageEncoder()
-            print("Image encoder initialized")
-        except Exception as e:
-            print(f"Warning: Failed to initialize image encoder: {e}")
-            print("Continuing without image embeddings...")
-    else:
-        print(f"\n[4/6] Skipping embedding generation (--skip-embeddings)")
-
-    # Initialize data transformer
-    print(f"\n[5/6] Initializing data transformation pipeline")
-    transformer = DataTransformer(
-        config=config,
-        text_encoder=text_encoder,
-        image_encoder=image_encoder,
-        use_cache=True
-    )
-
-    # Run indexing pipeline
-    print(f"\n[6/6] Starting indexing pipeline")
-    pipeline = IndexingPipeline(
-        config=config,
-        es_client=es_client,
-        data_transformer=transformer,
-        recreate_index=args.recreate_index
-    )
-
-    results = pipeline.run(df, batch_size=args.batch_size)
-
-    # Print summary
-    print("\n" + "="*60)
-    print("Ingestion Complete!")
-    print("="*60)
-    print(f"Total documents: {results['total']}")
-    print(f"Successfully indexed: {results['success']}")
-    print(f"Failed: {results['failed']}")
-    print(f"Time elapsed: {results['elapsed_time']:.2f}s")
-    print(f"Throughput: {results['docs_per_second']:.2f} docs/s")
-
-    if results['errors']:
-        print(f"\nFirst few errors:")
-        for error in results['errors'][:5]:
-            print(f"  - {error}")
-
-    # Verify index
-    print(f"\nVerifying index...")
-    doc_count = es_client.count(config.es_index_name)
-    print(f"Documents in index: {doc_count}")
-
-    print("\nIngestion completed successfully!")
-    return 0
-
-
-if __name__ == "__main__":
-    sys.exit(main())
diff --git a/docs/常用查询 - ES.md b/docs/常用查询 - ES.md
index 3e20d15..f8c7065 100644
--- a/docs/常用查询 - ES.md
+++ b/docs/常用查询 - ES.md
@@ -233,7 +233,7 @@ curl -u 'essa:4hOaLaf41y2VuI8y' -X GET 'http://localhost:9200/search_products/
 curl -u 'essa:4hOaLaf41y2VuI8y' -X GET 'http://localhost:9200/search_products/_count?pretty' -H 'Content-Type: application/json' -d '{
   "query": {
     "term": {
-      "tenant_id": "162"
+      "tenant_id": "170"
     }
   }
 }'
diff --git a/indexer/bulk_indexer.py b/indexer/bulk_indexer.py
index 61321d6..754162a 100644
--- a/indexer/bulk_indexer.py
+++ b/indexer/bulk_indexer.py
@@ -7,7 +7,7 @@ Handles batch indexing of documents with progress tracking and error handling.
 from typing import List, Dict, Any, Optional
 from elasticsearch.helpers import bulk, BulkIndexError
 from utils.es_client import ESClient
-from indexer.mapping_generator import load_mapping, DEFAULT_INDEX_NAME
+from indexer.mapping_generator import DEFAULT_INDEX_NAME
 import time
 
@@ -196,64 +196,3 @@ class BulkIndexer:
         except Exception as e:
             print(f"[BulkIndexer] Update by query failed: {e}")
             return 0
-
-
-class IndexingPipeline:
-    """Complete indexing pipeline from source data to ES."""
-
-    def __init__(
-        self,
-        es_client: ESClient,
-        data_transformer,
-        index_name: str = None,
-        recreate_index: bool = False
-    ):
-        """
-        Initialize indexing pipeline.
-
-        Args:
-            es_client: Elasticsearch client
-            data_transformer: Data transformer instance
-            index_name: Index name (defaults to DEFAULT_INDEX_NAME)
-            recreate_index: Whether to recreate index if exists
-        """
-        self.es_client = es_client
-        self.transformer = data_transformer
-        self.index_name = index_name or DEFAULT_INDEX_NAME
-        self.recreate_index = recreate_index
-
-    def run(self, df, batch_size: int = 100) -> Dict[str, Any]:
-        """
-        Run complete indexing pipeline.
-
-        Args:
-            df: Source dataframe
-            batch_size: Batch size for processing
-
-        Returns:
-            Indexing statistics
-        """
-        # Load and create index
-        mapping = load_mapping()
-
-        if self.recreate_index:
-            if self.es_client.index_exists(self.index_name):
-                print(f"[IndexingPipeline] Deleting existing index: {self.index_name}")
-                self.es_client.delete_index(self.index_name)
-
-        if not self.es_client.index_exists(self.index_name):
-            print(f"[IndexingPipeline] Creating index: {self.index_name}")
-            self.es_client.create_index(self.index_name, mapping)
-        else:
-            print(f"[IndexingPipeline] Using existing index: {self.index_name}")
-
-        # Transform data
-        print(f"[IndexingPipeline] Transforming {len(df)} documents...")
-        documents = self.transformer.transform_batch(df, batch_size=batch_size)
-        print(f"[IndexingPipeline] Transformed {len(documents)} documents")
-
-        # Bulk index
-        indexer = BulkIndexer(self.es_client, self.index_name, batch_size=500)
-        results = indexer.index_documents(documents, id_field="skuId")
-
-        return results
diff --git a/main.py b/main.py
index 90f36eb..f1614f6 100755
--- a/main.py
+++ b/main.py
@@ -3,8 +3,8 @@ Main entry point for SearchEngine operations.
 
 Provides a unified CLI for common operations:
-- ingest: Ingest data into Elasticsearch
-- serve: Start API service
+- serve: Start API service (search + admin routes)
+- serve-indexer: Start dedicated Indexer API service
 - search: Test search from command line
 """
 
@@ -12,7 +12,6 @@ import sys
 import os
 import argparse
 import json
-import pandas as pd
 import uvicorn
 
 # Add parent directory to path
@@ -23,53 +22,6 @@ from utils import ESClient
 from search import Searcher
 
 
-def cmd_ingest(args):
-    """Run data ingestion."""
-    # Local imports to avoid hard dependency at module import time
-    import pandas as pd
-    from embeddings import BgeEncoder, CLIPImageEncoder
-    from indexer.bulk_indexer import IndexingPipeline
-    # NOTE: DataTransformer was referenced historically, but the concrete
-    # implementation is now provided via customer-specific scripts
-    # (e.g. data/customer1/ingest_customer1.py). If you still need a generic
-    # ingestion pipeline here, you can wire your own transformer.
-    from indexer.spu_transformer import SPUTransformer as DataTransformer
-    print("Starting data ingestion")
-
-    # Load config
-    config_loader = ConfigLoader("config/config.yaml")
-    config = config_loader.load_config()
-
-    # Initialize ES
-    es_client = ESClient(hosts=[args.es_host])
-    if not es_client.ping():
-        print(f"ERROR: Cannot connect to Elasticsearch at {args.es_host}")
-        return 1
-
-    # Load data
-    df = pd.read_csv(args.csv_file)
-    if args.limit:
-        df = df.head(args.limit)
-    print(f"Loaded {len(df)} documents")
-
-    # Initialize encoders
-    text_encoder = None if args.skip_embeddings else BgeEncoder()
-    image_encoder = None if args.skip_embeddings else CLIPImageEncoder()
-
-    # Transform and index
-    transformer = DataTransformer(config, text_encoder, image_encoder, use_cache=True)
-    pipeline = IndexingPipeline(config, es_client, transformer, recreate_index=args.recreate)
-
-    results = pipeline.run(df, batch_size=args.batch_size)
-
-    print(f"\nIngestion complete:")
-    print(f"  Success: {results['success']}")
-    print(f"  Failed: {results['failed']}")
-    print(f"  Time: {results['elapsed_time']:.2f}s")
-
-    return 0
-
-
 def cmd_serve(args):
     """Start API service."""
     os.environ['ES_HOST'] = args.es_host
@@ -154,15 +106,6 @@ def main():
 
     subparsers = parser.add_subparsers(dest='command', help='Command to execute')
 
-    # Ingest command
-    ingest_parser = subparsers.add_parser('ingest', help='Ingest data into Elasticsearch')
-    ingest_parser.add_argument('csv_file', help='Path to CSV data file')
-    ingest_parser.add_argument('--es-host', default='http://localhost:9200', help='Elasticsearch host')
-    ingest_parser.add_argument('--limit', type=int, help='Limit number of documents')
-    ingest_parser.add_argument('--batch-size', type=int, default=100, help='Batch size')
-    ingest_parser.add_argument('--recreate', action='store_true', help='Recreate index')
-    ingest_parser.add_argument('--skip-embeddings', action='store_true', help='Skip embeddings')
-
     # Serve command
     serve_parser = subparsers.add_parser('serve', help='Start API service (multi-tenant)')
     serve_parser.add_argument('--host', default='0.0.0.0', help='Host to bind to')
@@ -197,9 +140,7 @@ def main():
         return 1
 
     # Execute command
-    if args.command == 'ingest':
-        return cmd_ingest(args)
-    elif args.command == 'serve':
+    if args.command == 'serve':
         return cmd_serve(args)
     elif args.command == 'serve-indexer':
         return cmd_serve_indexer(args)
--
libgit2 0.21.2
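
With IndexingPipeline, cmd_ingest(), and ingest_customer1.py removed, a one-off ingestion now has to be wired by hand from the remaining building blocks. The sketch below shows one minimal way to do that, reusing only names visible in this patch (ESClient, BulkIndexer, load_mapping, DEFAULT_INDEX_NAME, SPUTransformer); the SPUTransformer constructor and its transform_batch() signature are assumptions carried over from the old DataTransformer, and the newer BulkIndexingService / IncrementalIndexerService interfaces are not covered here.

# Minimal ingestion sketch replacing the removed IndexingPipeline
# (interfaces marked below as assumed are not confirmed by this patch).
import pandas as pd

from utils import ESClient
from indexer.bulk_indexer import BulkIndexer
from indexer.mapping_generator import load_mapping, DEFAULT_INDEX_NAME
from indexer.spu_transformer import SPUTransformer


def ingest_dataframe(df: pd.DataFrame, es_client: ESClient, config,
                     recreate_index: bool = False) -> dict:
    """Transform a dataframe and bulk-index it into the default index."""
    index_name = DEFAULT_INDEX_NAME

    # Recreate the index from the generated mapping if requested.
    if recreate_index and es_client.index_exists(index_name):
        es_client.delete_index(index_name)
    if not es_client.index_exists(index_name):
        es_client.create_index(index_name, load_mapping())

    # Turn source rows into ES documents (constructor and transform_batch
    # are assumed to follow the old DataTransformer interface).
    transformer = SPUTransformer(config)
    documents = transformer.transform_batch(df, batch_size=100)

    # Bulk index, keyed by skuId as the removed IndexingPipeline did.
    indexer = BulkIndexer(es_client, index_name, batch_size=500)
    return indexer.index_documents(documents, id_field="skuId")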