"""
Bulk indexer for Elasticsearch.

Handles batch indexing of documents with progress tracking and error handling.
"""

import time
from typing import Any, Dict, List, Optional, Tuple

from elasticsearch.helpers import bulk, BulkIndexError

from utils.es_client import ESClient
from indexer.mapping_generator import load_mapping, DEFAULT_INDEX_NAME


class BulkIndexer:
    """Bulk indexer for Elasticsearch with batching, retries and error handling."""

    # Maximum number of error entries collected in memory and returned in the
    # ``errors`` field of the statistics produced by ``index_documents``.
    MAX_REPORTED_ERRORS = 10

    def __init__(
        self,
        es_client: ESClient,
        index_name: str,
        batch_size: int = 500,
        max_retries: int = 3
    ):
        """
        Initialize bulk indexer.

        Args:
            es_client: Elasticsearch client wrapper; its ``client`` attribute is
                handed to ``elasticsearch.helpers.bulk`` and its ``refresh``
                method is called after indexing
            index_name: Target index name
            batch_size: Number of documents per batch (one bulk request each)
            max_retries: Maximum attempts per batch when the bulk request itself
                fails; values below 1 still make a single attempt

        Raises:
            ValueError: If ``batch_size`` is smaller than 1.
        """
        if batch_size < 1:
            raise ValueError(f"batch_size must be >= 1, got {batch_size}")
        self.es_client = es_client
        self.index_name = index_name
        self.batch_size = batch_size
        self.max_retries = max_retries

    def index_documents(
        self,
        documents: List[Dict[str, Any]],
        id_field: str = "skuId",
        show_progress: bool = True
    ) -> Dict[str, Any]:
        """
        Index documents in bulk.

        Documents are sent in batches of ``batch_size``. A batch whose bulk
        request fails outright (connection error, timeout, rejected request)
        is retried up to ``max_retries`` times. Per-document rejections are
        counted as failures and are not retried. The index is refreshed once
        all batches have been processed.

        Args:
            documents: List of documents to index
            id_field: Field to use as document ID; documents without it are
                sent without an explicit ``_id``
            show_progress: Whether to print per-batch progress

        Returns:
            Dictionary with keys ``total``, ``success``, ``failed``,
            ``elapsed_time`` (seconds), ``docs_per_second`` and ``errors``
            (at most ``MAX_REPORTED_ERRORS`` entries).
        """
        total_docs = len(documents)
        total_batches = (total_docs + self.batch_size - 1) // self.batch_size
        success_count = 0
        failed_count = 0
        errors: List[Any] = []

        print(f"[BulkIndexer] Starting bulk indexing of {total_docs} documents...")
        start_time = time.perf_counter()  # monotonic, unaffected by clock changes

        batch_starts = range(0, total_docs, self.batch_size)
        for batch_num, offset in enumerate(batch_starts, start=1):
            batch = documents[offset:offset + self.batch_size]

            if show_progress:
                print(f"[BulkIndexer] Processing batch {batch_num}/{total_batches} "
                      f"({len(batch)} documents)...")

            actions = self._build_actions(batch, id_field)
            batch_ok, batch_failed, batch_errors = self._send_batch(actions, batch_num)
            success_count += batch_ok
            failed_count += batch_failed

            # Only keep as many errors as will be reported, to bound memory use.
            room = self.MAX_REPORTED_ERRORS - len(errors)
            if room > 0:
                errors.extend(batch_errors[:room])

        elapsed_time = time.perf_counter() - start_time

        # Refresh index to make documents searchable
        self.es_client.refresh(self.index_name)

        results = {
            'total': total_docs,
            'success': success_count,
            'failed': failed_count,
            'elapsed_time': elapsed_time,
            'docs_per_second': total_docs / elapsed_time if elapsed_time > 0 else 0,
            'errors': errors
        }

        print("[BulkIndexer] Indexing complete!")
        print(f"  - Total: {total_docs}")
        print(f"  - Success: {success_count}")
        print(f"  - Failed: {failed_count}")
        print(f"  - Time: {elapsed_time:.2f}s")
        print(f"  - Speed: {results['docs_per_second']:.2f} docs/s")

        return results

    def _build_actions(
        self,
        batch: List[Dict[str, Any]],
        id_field: Optional[str]
    ) -> List[Dict[str, Any]]:
        """Turn raw documents into bulk index actions targeting this index."""
        actions = []
        for doc in batch:
            action = {
                '_index': self.index_name,
                '_source': doc
            }
            # Use specified field as document ID if present
            if id_field and id_field in doc:
                action['_id'] = doc[id_field]
            actions.append(action)
        return actions

    def _send_batch(
        self,
        actions: List[Dict[str, Any]],
        batch_num: int
    ) -> Tuple[int, int, List[Any]]:
        """
        Send one batch through the bulk helper, retrying request-level failures.

        Args:
            actions: Bulk actions for a single batch
            batch_num: 1-based batch number, used in log messages and errors

        Returns:
            ``(succeeded, failed, errors)`` for this batch.
        """
        attempts = max(1, self.max_retries)  # never skip a batch silently
        last_error: Optional[Exception] = None

        for attempt in range(1, attempts + 1):
            try:
                success, failed = bulk(
                    self.es_client.client,
                    actions,
                    # One request per batch, so a retry never re-sends a
                    # sub-chunk that was already accepted.
                    chunk_size=len(actions),
                    # Per-document errors are returned in ``failed``; resending
                    # the batch would not fix e.g. mapping errors.
                    raise_on_error=False,
                    # Request-level failures raise so they can be retried here
                    # instead of being silently reported as item failures.
                    raise_on_exception=True
                )
            except Exception as e:  # any request-level failure is retried
                last_error = e
                if attempt < attempts:
                    print(f"[BulkIndexer] Batch {batch_num} failed, retrying... "
                          f"(attempt {attempt}/{attempts})")
                    time.sleep(1)
            else:
                return success, len(failed), list(failed)

        print(f"[BulkIndexer] Batch {batch_num} failed after "
              f"{attempts} attempts: {last_error}")
        return 0, len(actions), [{
            'batch': batch_num,
            'error': str(last_error)
        }]

    def delete_by_query(self, query: Dict[str, Any]) -> int:
        """
        Delete documents matching a query.

        Failures are logged and reported as zero deletions rather than raised.

        Args:
            query: ES query DSL

        Returns:
            Number of documents deleted (0 on failure)
        """
        try:
            response = self.es_client.client.delete_by_query(
                index=self.index_name,
                body={"query": query}
            )
            deleted = response.get('deleted', 0)
            print(f"[BulkIndexer] Deleted {deleted} documents")
            return deleted
        except Exception as e:
            print(f"[BulkIndexer] Delete by query failed: {e}")
            return 0

    def update_by_query(self, query: Dict[str, Any], script: Dict[str, Any]) -> int:
        """
        Update documents matching a query.

        Failures are logged and reported as zero updates rather than raised.

        Args:
            query: ES query DSL
            script: Update script

        Returns:
            Number of documents updated (0 on failure)
        """
        try:
            response = self.es_client.client.update_by_query(
                index=self.index_name,
                body={
                    "query": query,
                    "script": script
                }
            )
            updated = response.get('updated', 0)
            print(f"[BulkIndexer] Updated {updated} documents")
            return updated
        except Exception as e:
            print(f"[BulkIndexer] Update by query failed: {e}")
            return 0


class IndexingPipeline:
    """Complete indexing pipeline from source data to ES."""

    def __init__(
        self,
        es_client: ESClient,
        data_transformer,
        index_name: Optional[str] = None,
        recreate_index: bool = False
    ):
        """
        Initialize indexing pipeline.

        Args:
            es_client: Elasticsearch client
            data_transformer: Data transformer instance; must provide
                ``transform_batch(df, batch_size=...)``
            index_name: Index name (defaults to DEFAULT_INDEX_NAME)
            recreate_index: Whether to recreate index if exists
        """
        self.es_client = es_client
        self.transformer = data_transformer
        self.index_name = index_name or DEFAULT_INDEX_NAME
        self.recreate_index = recreate_index

    def run(
        self,
        df,
        batch_size: int = 100,
        index_batch_size: int = 500
    ) -> Dict[str, Any]:
        """
        Run complete indexing pipeline.

        Ensures the index exists (optionally recreating it), transforms the
        source data and bulk-indexes the resulting documents keyed by ``skuId``.

        Args:
            df: Source dataframe
            batch_size: Batch size passed to the data transformer
            index_batch_size: Number of documents per bulk indexing request

        Returns:
            Indexing statistics as returned by ``BulkIndexer.index_documents``
        """
        # Load the mapping before touching the index, so a broken mapping
        # aborts the run before an existing index could be deleted.
        mapping = load_mapping()

        if self.recreate_index and self.es_client.index_exists(self.index_name):
            print(f"[IndexingPipeline] Deleting existing index: {self.index_name}")
            self.es_client.delete_index(self.index_name)

        if not self.es_client.index_exists(self.index_name):
            print(f"[IndexingPipeline] Creating index: {self.index_name}")
            self.es_client.create_index(self.index_name, mapping)
        else:
            print(f"[IndexingPipeline] Using existing index: {self.index_name}")

        # Transform data
        print(f"[IndexingPipeline] Transforming {len(df)} documents...")
        documents = self.transformer.transform_batch(df, batch_size=batch_size)
        print(f"[IndexingPipeline] Transformed {len(documents)} documents")

        # Bulk index
        indexer = BulkIndexer(self.es_client, self.index_name, batch_size=index_batch_size)
        return indexer.index_documents(documents, id_field="skuId")