bulk_indexer.py
"""
Bulk indexer for Elasticsearch.

Handles batch indexing of documents with progress tracking and error handling.
"""

import time
from typing import Any, Dict, List

from elasticsearch.helpers import bulk

from utils.es_client import ESClient


class BulkIndexer:
    """Bulk indexer for Elasticsearch with batching and error handling."""

    def __init__(
        self,
        es_client: ESClient,
        index_name: str,
        batch_size: int = 500,
        max_retries: int = 3
    ):
        """
        Initialize bulk indexer.

        Args:
            es_client: Elasticsearch client
            index_name: Target index name
            batch_size: Number of documents per batch
            max_retries: Maximum retry attempts for failed batches
        """
        self.es_client = es_client
        self.index_name = index_name
        self.batch_size = batch_size
        self.max_retries = max_retries

    def index_documents(
        self,
        documents: List[Dict[str, Any]],
        id_field: str = "skuId",
        show_progress: bool = True
    ) -> Dict[str, Any]:
        """
        Index documents in bulk.

        Args:
            documents: List of documents to index
            id_field: Field to use as document ID
            show_progress: Whether to print progress

        Returns:
            Dictionary with indexing statistics
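
        Example result (illustrative values only):
            {'total': 1000, 'success': 998, 'failed': 2, 'elapsed_time': 4.2,
             'docs_per_second': 238.1, 'errors': [...]}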
        """
        total_docs = len(documents)
        success_count = 0
        failed_count = 0
        errors = []

        print(f"[BulkIndexer] Starting bulk indexing of {total_docs} documents...")
        start_time = time.time()

        # Process in batches
        for i in range(0, total_docs, self.batch_size):
            batch = documents[i:i + self.batch_size]
            batch_num = (i // self.batch_size) + 1
            total_batches = (total_docs + self.batch_size - 1) // self.batch_size

            if show_progress:
                print(f"[BulkIndexer] Processing batch {batch_num}/{total_batches} "
                      f"({len(batch)} documents)...")

            # Prepare actions for bulk API
            actions = []
            for doc in batch:
                action = {
                    '_index': self.index_name,
                    '_source': doc
                }

                # Use specified field as document ID if present
                if id_field and id_field in doc:
                    action['_id'] = doc[id_field]

                actions.append(action)

            # Index the batch, retrying on transport-level failures. With
            # raise_on_error/raise_on_exception disabled, per-document failures
            # are returned in `failed` rather than raised, so only connection
            # or other transport errors reach the except branch below.
            for attempt in range(self.max_retries):
                try:
                    success, failed = bulk(
                        self.es_client.client,
                        actions,
                        raise_on_error=False,
                        raise_on_exception=False
                    )

                    success_count += success
                    if failed:
                        failed_count += len(failed)
                        errors.extend(failed)
                    break

                except Exception as e:
                    if attempt < self.max_retries - 1:
                        print(f"[BulkIndexer] Batch {batch_num} failed, retrying... "
                              f"(attempt {attempt + 1}/{self.max_retries})")
                        time.sleep(1)
                    else:
                        print(f"[BulkIndexer] Batch {batch_num} failed after "
                              f"{self.max_retries} attempts: {e}")
                        failed_count += len(batch)
                        errors.append({
                            'batch': batch_num,
                            'error': str(e)
                        })

        elapsed_time = time.time() - start_time

        # Refresh index to make documents searchable
        self.es_client.refresh(self.index_name)

        results = {
            'total': total_docs,
            'success': success_count,
            'failed': failed_count,
            'elapsed_time': elapsed_time,
            'docs_per_second': total_docs / elapsed_time if elapsed_time > 0 else 0,
            'errors': errors[:10]  # Keep only first 10 errors
        }

        print(f"[BulkIndexer] Indexing complete!")
        print(f"  - Total: {total_docs}")
        print(f"  - Success: {success_count}")
        print(f"  - Failed: {failed_count}")
        print(f"  - Time: {elapsed_time:.2f}s")
        print(f"  - Speed: {results['docs_per_second']:.2f} docs/s")

        return results

    def delete_by_query(self, query: Dict[str, Any]) -> int:
        """
        Delete documents matching a query.

        Args:
            query: ES query DSL

        Returns:
            Number of documents deleted
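
        Example (hypothetical field name):
            indexer.delete_by_query({"term": {"brand": "acme"}})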
        """
        try:
            response = self.es_client.client.delete_by_query(
                index=self.index_name,
                body={"query": query}
            )
            deleted = response.get('deleted', 0)
            print(f"[BulkIndexer] Deleted {deleted} documents")
            return deleted
        except Exception as e:
            print(f"[BulkIndexer] Delete by query failed: {e}")
            return 0

    def update_by_query(self, query: Dict[str, Any], script: Dict[str, Any]) -> int:
        """
        Update documents matching a query.

        Args:
            query: ES query DSL
            script: Update script

        Returns:
            Number of documents updated
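
        Example (hypothetical field name, Painless script):
            indexer.update_by_query(
                {"term": {"brand": "acme"}},
                {"source": "ctx._source.in_stock = false", "lang": "painless"}
            )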
        """
        try:
            response = self.es_client.client.update_by_query(
                index=self.index_name,
                body={
                    "query": query,
                    "script": script
                }
            )
            updated = response.get('updated', 0)
            print(f"[BulkIndexer] Updated {updated} documents")
            return updated
        except Exception as e:
            print(f"[BulkIndexer] Update by query failed: {e}")
            return 0


class IndexingPipeline:
    """Complete indexing pipeline from source data to ES."""

    def __init__(
        self,
        config,
        es_client: ESClient,
        data_transformer,
        recreate_index: bool = False
    ):
        """
        Initialize indexing pipeline.

        Args:
            config: Customer configuration
            es_client: Elasticsearch client
            data_transformer: Data transformer instance
            recreate_index: Whether to delete and recreate the index if it already exists
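
        Note:
            The config is expected to expose es_index_name, and the transformer
            a transform_batch(df, batch_size) method; both are used by run() below.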
        """
        self.config = config
        self.es_client = es_client
        self.transformer = data_transformer
        self.recreate_index = recreate_index

    def run(self, df, batch_size: int = 100) -> Dict[str, Any]:
        """
        Run complete indexing pipeline.

        Args:
            df: Source dataframe
            batch_size: Batch size for the data transformation step (bulk
                indexing below uses the BulkIndexer batch size of 500)

        Returns:
            Indexing statistics
        """
        from indexer.mapping_generator import MappingGenerator

        # Generate and create index
        mapping_gen = MappingGenerator(self.config)
        mapping = mapping_gen.generate_mapping()

        index_name = self.config.es_index_name

        if self.recreate_index:
            if self.es_client.index_exists(index_name):
                print(f"[IndexingPipeline] Deleting existing index: {index_name}")
                self.es_client.delete_index(index_name)

        if not self.es_client.index_exists(index_name):
            print(f"[IndexingPipeline] Creating index: {index_name}")
            self.es_client.create_index(index_name, mapping)
        else:
            print(f"[IndexingPipeline] Using existing index: {index_name}")

        # Transform data
        print(f"[IndexingPipeline] Transforming {len(df)} documents...")
        documents = self.transformer.transform_batch(df, batch_size=batch_size)
        print(f"[IndexingPipeline] Transformed {len(documents)} documents")

        # Bulk index
        indexer = BulkIndexer(self.es_client, index_name, batch_size=500)
        results = indexer.index_documents(documents, id_field="skuId")

        return results
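

# Minimal usage sketch, not part of the pipeline itself. The ESClient
# constructor arguments below are an assumption about utils.es_client; adapt
# them to the real client wrapper before running.
if __name__ == "__main__":
    # Hypothetical connection settings and index name.
    client = ESClient(host="localhost", port=9200)
    indexer = BulkIndexer(client, index_name="products", batch_size=500)

    # A couple of hand-written documents keyed by skuId, matching the default
    # id_field used by index_documents().
    docs = [
        {"skuId": "SKU-001", "title": "Red running shoe", "price": 59.99},
        {"skuId": "SKU-002", "title": "Blue running shoe", "price": 64.99},
    ]

    stats = indexer.index_documents(docs, id_field="skuId")
    print(f"Indexed {stats['success']} of {stats['total']} documents")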