
Configuration and Pipeline Separation Refactoring

Overview

Implement a clean separation between Search Configuration (customer-facing, focused on ES and search behavior) and the Data Pipeline (internal ETL, controlled by scripts). Configuration files will contain only search engine settings; data source and transformation logic will be controlled entirely by script parameters.

Phase 1: Configuration File Cleanup

1.1 Clean BASE Configuration

File: <code>config/schema/base/config.yaml</code>

Remove (data pipeline concerns):

  • mysql_config section
  • main_table field
  • sku_table field
  • extension_table field
  • source_table in field definitions
  • source_column in field definitions

Keep (search configuration):

  • customer_name
  • es_index_name
  • es_settings
  • fields (simplified, no source mapping)
  • indexes (search domains)
  • query_config
  • function_score
  • rerank
  • spu_config
  • tenant_config (as template)
  • default_facets

Simplify field definitions:

fields:
  - name: "title"
    type: "TEXT"
    analyzer: "chinese_ecommerce"
    boost: 3.0
    index: true
    store: true
    # NO source_table, NO source_column
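The cleanup in 1.1 can be sketched as a small helper that strips the pipeline keys from an already-loaded config dictionary. This is a minimal sketch rather than the project's migration tool: the name `strip_pipeline_keys` is hypothetical, and it assumes the YAML has already been parsed into plain dicts and lists.

```python
from copy import deepcopy

# Keys that section 1.1 lists as data pipeline concerns.
TOP_LEVEL_PIPELINE_KEYS = ("mysql_config", "main_table", "sku_table", "extension_table")
FIELD_PIPELINE_KEYS = ("source_table", "source_column")


def strip_pipeline_keys(config: dict) -> dict:
    """Return a copy of `config` without data pipeline settings (hypothetical helper)."""
    cleaned = deepcopy(config)  # leave the caller's dict untouched
    for key in TOP_LEVEL_PIPELINE_KEYS:
        cleaned.pop(key, None)
    for field_def in cleaned.get("fields", []):
        for key in FIELD_PIPELINE_KEYS:
            field_def.pop(key, None)
    return cleaned


old_config = {
    "es_index_name": "search_base",
    "mysql_config": {"host": "localhost"},
    "main_table": "product",
    "fields": [
        {"name": "title", "type": "TEXT", "source_table": "main", "source_column": "title"},
    ],
}
new_config = strip_pipeline_keys(old_config)
```

Search settings such as `es_index_name` and the field's `name` and `type` pass through unchanged; only the keys listed under "Remove" are dropped.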

1.2 Update Legacy Configuration

File: <code>config/schema/customer1_legacy/config.yaml</code>

Apply the same cleanup as for the BASE config, and mark the file as legacy in its comments.

Phase 2: Transformer Architecture Refactoring

2.1 Create Base Transformer Class

File: <code>indexer/base_transformer.py</code> (NEW)

Create abstract base class with shared logic:

  • __init__ with config, encoders, cache
  • _convert_value() - type conversion (shared)
  • _generate_text_embeddings() - text embedding (shared)
  • _generate_image_embeddings() - image embedding (shared)
  • _inject_tenant_id() - tenant_id injection (shared)
  • @abstractmethod transform() - to be implemented by subclasses
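The shape of the base class can be sketched as follows. It is only an outline under assumptions: the embedding helpers are left out because the encoder interface is not described here, `INT` and `FLOAT` are assumed type names (only `TEXT` appears in this plan), and `tenant_id` is assumed to be set on the config at runtime as described in Phase 4.

```python
from abc import ABC, abstractmethod
from typing import Any, Dict, List


class BaseDataTransformer(ABC):
    """Logic shared by all transformers; subclasses implement transform()."""

    def __init__(self, config: Any, text_encoder: Any = None, image_encoder: Any = None):
        self.config = config
        self.text_encoder = text_encoder
        self.image_encoder = image_encoder
        self._embedding_cache: Dict[str, Any] = {}

    def _convert_value(self, value: Any, field_type: str) -> Any:
        """Convert a raw value to the declared field type.

        Only TEXT appears in the plan; INT and FLOAT are assumed type names.
        """
        if value is None:
            return None
        if field_type == "TEXT":
            return str(value)
        if field_type == "INT":
            return int(value)
        if field_type == "FLOAT":
            return float(value)
        return value  # unknown types pass through unchanged

    def _inject_tenant_id(self, doc: Dict[str, Any]) -> Dict[str, Any]:
        """Stamp the document with the tenant_id set on the config at runtime."""
        tenant_id = getattr(self.config, "tenant_id", None)
        if tenant_id is None:
            raise ValueError("config.tenant_id must be set before transforming")
        doc["tenant_id"] = tenant_id
        return doc

    @abstractmethod
    def transform(self, *args: Any, **kwargs: Any) -> List[Dict[str, Any]]:
        """Turn source data into index documents; the signature varies by subclass."""
```

`transform()` is declared with open arguments because the two planned subclasses take different inputs (one DataFrame with one mapping versus SPU and SKU DataFrames with two mappings).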

2.2 Refactor DataTransformer

File: <code>indexer/data_transformer.py</code>

Changes:

  • Inherit from BaseDataTransformer
  • Remove dependency on source_table, source_column from config
  • Accept field mapping as parameter (from script)
  • Implement transform(df, field_mapping) method

2.3 Refactor SPUDataTransformer

File: <code>indexer/spu_data_transformer.py</code>

Changes:

  • Inherit from BaseDataTransformer
  • Remove dependency on config's table names
  • Accept field mapping as parameter
  • Implement transform(spu_df, sku_df, spu_field_mapping, sku_field_mapping) method
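How a transformer might apply an externally supplied field mapping can be sketched as below. Two assumptions are made here that this plan does not settle: the mapping is read as index field name to source column name, and rows are shown as plain dicts instead of DataFrames to keep the example dependency-free. `apply_field_mapping` is a hypothetical name.

```python
from typing import Any, Dict, Iterable, List


def apply_field_mapping(
    rows: Iterable[Dict[str, Any]],
    field_mapping: Dict[str, str],
) -> List[Dict[str, Any]]:
    """Build index documents from source rows.

    Assumes `field_mapping` maps index field name -> source column name.
    Source columns missing from a row are skipped rather than set to None.
    """
    docs = []
    for row in rows:
        doc = {
            target: row[source]
            for target, source in field_mapping.items()
            if source in row
        }
        docs.append(doc)
    return docs


rows = [{"id": 1, "product_title": "Red shoes", "internal_note": "x"}]
mapping = {"id": "id", "title": "product_title"}
docs = apply_field_mapping(rows, mapping)
```

Columns that the mapping does not mention (`internal_note` above) never reach the index document, so the search config does not need to know they exist.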

2.4 Create Transformer Factory

File: <code>indexer/transformer_factory.py</code> (NEW)

Factory to create appropriate transformer based on parameters:

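from config import CustomerConfig  # assumes CustomerConfig is exported from config/__init__.py
from .base_transformer import BaseDataTransformer
from .data_transformer import DataTransformer
from .spu_data_transformer import SPUDataTransformer


class TransformerFactory:
    """Create the transformer selected by the --transformer script parameter."""

    @staticmethod
    def create(
        transformer_type: str,  # 'sku' or 'spu'
        config: CustomerConfig,
        text_encoder=None,
        image_encoder=None
    ) -> BaseDataTransformer:
        if transformer_type == 'spu':
            return SPUDataTransformer(config, text_encoder, image_encoder)
        elif transformer_type == 'sku':
            return DataTransformer(config, text_encoder, image_encoder)
        else:
            # Fail fast on typos instead of silently picking a default.
            raise ValueError(f"Unknown transformer type: {transformer_type}")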

2.5 Update Package Exports

File: <code>indexer/__init__.py</code>

Export new structure:

from .base_transformer import BaseDataTransformer
from .data_transformer import DataTransformer
from .spu_data_transformer import SPUDataTransformer
from .transformer_factory import TransformerFactory
# BulkIndexer and IndexingPipeline keep their existing imports (not shown in this plan).

__all__ = [
    'BaseDataTransformer',
    'DataTransformer',
    'SPUDataTransformer',
    'TransformerFactory',  # Recommended for new code
    'BulkIndexer',
    'IndexingPipeline',
]

Phase 3: Script Refactoring

3.1 Create Unified Ingestion Script

File: <code>scripts/ingest_universal.py</code> (NEW)

Universal ingestion script with full parameter control:

Parameters:

# Search configuration (pure)
--config base                      # Which search config to use

# Runtime parameters
--tenant-id shop_12345            # REQUIRED tenant identifier
--es-host http://localhost:9200
--es-username elastic
--es-password xxx

# Data source parameters (pipeline concern)
--data-source mysql               # mysql, csv, api, etc.
--mysql-host 120.79.247.228
--mysql-port 3316
--mysql-database saas
--mysql-username saas
--mysql-password xxx

# Transformer parameters (pipeline concern)
--transformer spu                 # spu or sku
--spu-table shoplazza_product_spu
--sku-table shoplazza_product_sku
--shop-id 1                       # Filter by shop_id

# Field mapping (optional, uses defaults if not provided)
--field-mapping mapping.json

# Processing parameters
--batch-size 100
--limit 1000
--skip-embeddings
--recreate-index

Logic:

  1. Load search config (clean, no data source info)
  2. Set tenant_id from parameter
  3. Connect to data source based on --data-source parameter
  4. Load data from tables specified by parameters
  5. Create transformer based on --transformer parameter
  6. Apply field mapping (default or custom)
  7. Transform and index
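The parameter surface above could be declared with `argparse` roughly as follows. This is a sketch, not the final script: the defaults and the restricted `choices` are assumptions, and the ES and MySQL credential flags are omitted for brevity.

```python
import argparse


def build_parser() -> argparse.ArgumentParser:
    parser = argparse.ArgumentParser(description="Universal ingestion script (sketch)")
    # Search configuration (pure)
    parser.add_argument("--config", default="base")
    # Runtime parameters
    parser.add_argument("--tenant-id", required=True)
    parser.add_argument("--es-host", default="http://localhost:9200")
    # Data source parameters (pipeline concern)
    parser.add_argument("--data-source", choices=["mysql", "csv", "api"], default="mysql")
    # Transformer parameters (pipeline concern)
    parser.add_argument("--transformer", choices=["spu", "sku"], default="spu")
    parser.add_argument("--spu-table")
    parser.add_argument("--sku-table")
    parser.add_argument("--shop-id", type=int)
    # Field mapping (optional, defaults are used when omitted)
    parser.add_argument("--field-mapping")
    # Processing parameters
    parser.add_argument("--batch-size", type=int, default=100)
    parser.add_argument("--limit", type=int)
    parser.add_argument("--skip-embeddings", action="store_true")
    parser.add_argument("--recreate-index", action="store_true")
    return parser


args = build_parser().parse_args(
    ["--tenant-id", "shop_12345", "--transformer", "spu", "--spu-table", "shoplazza_product_spu"]
)
```

Marking `--tenant-id` as `required=True` makes the script exit with a usage error before any connection is opened, which matches the REQUIRED note in the parameter list.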

3.2 Update BASE Ingestion Script

File: <code>scripts/ingest_base.py</code>

Update to use script parameters instead of config values:

  • Remove dependency on config.mysql_config
  • Remove dependency on config.main_table, config.sku_table
  • Get all data source info from command-line arguments
  • Use TransformerFactory

3.3 Create Field Mapping Helper

File: <code>scripts/field_mapping_generator.py</code> (NEW)

Helper script to generate default field mappings:

# Generate default mapping for Shoplazza SPU schema
python scripts/field_mapping_generator.py \
  --source shoplazza \
  --level spu \
  --output mappings/shoplazza_spu.json

Output example:

{
  "spu_fields": {
    "id": "id",
    "title": "title",
    "description": "description",
    ...
  },
  "sku_fields": {
    "id": "id",
    "price": "price",
    "sku": "sku",
    ...
  }
}
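On the consuming side, loading a mapping file with a fallback to defaults might look like the sketch below. `load_field_mapping` and `DEFAULT_MAPPING` are hypothetical names, the default values are placeholders (the real defaults would live in the mapping files from Phase 6), and the validation only checks the general shape seen in the output example.

```python
import json
from pathlib import Path
from typing import Dict, Optional

FieldMapping = Dict[str, Dict[str, str]]

# Placeholder defaults for illustration only.
DEFAULT_MAPPING: FieldMapping = {
    "spu_fields": {"id": "id", "title": "title"},
    "sku_fields": {"id": "id", "price": "price"},
}


def load_field_mapping(path: Optional[str] = None) -> FieldMapping:
    """Load a mapping file, falling back to the defaults when no path is given."""
    if path is None:
        # Return a copy so callers cannot mutate the module-level defaults.
        return {section: dict(fields) for section, fields in DEFAULT_MAPPING.items()}
    mapping = json.loads(Path(path).read_text(encoding="utf-8"))
    if not isinstance(mapping, dict) or not all(isinstance(v, dict) for v in mapping.values()):
        raise ValueError(f"{path}: expected an object of sections, each an object of fields")
    return mapping
```

Calling `load_field_mapping()` with no argument corresponds to running the ingestion script without `--field-mapping`.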

Phase 4: Configuration Loader Updates

4.1 Simplify ConfigLoader

File: <code>config/config_loader.py</code>

Changes:

  • Remove parsing of mysql_config
  • Remove parsing of main_table, sku_table, extension_table
  • Remove validation of source_table/source_column in fields
  • Simplify field parsing (no source mapping)
  • Keep validation of ES/search related config

4.2 Update CustomerConfig Model

File: <code>config/__init__.py</code> or wherever CustomerConfig is defined

Remove attributes:

  • mysql_config
  • main_table
  • sku_table
  • extension_table

Add attributes:

  • tenant_id (runtime, default None)

Simplify FieldConfig:

  • Remove source_table
  • Remove source_column
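The slimmed-down models could look roughly like this. It is a sketch under assumptions: the real `CustomerConfig` carries more search settings (`indexes`, `query_config`, and so on) that are omitted here, and the default values are illustrative.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional


@dataclass
class FieldConfig:
    name: str
    type: str
    analyzer: Optional[str] = None
    boost: float = 1.0
    index: bool = True
    store: bool = False
    # source_table and source_column are intentionally absent.


@dataclass
class CustomerConfig:
    customer_name: str
    es_index_name: str
    es_settings: Dict[str, Any] = field(default_factory=dict)
    fields: List[FieldConfig] = field(default_factory=list)
    # Not read from the config file; set at runtime from --tenant-id.
    tenant_id: Optional[str] = None


config = CustomerConfig(
    customer_name="base",
    es_index_name="search_base",
    fields=[FieldConfig(name="title", type="TEXT", analyzer="chinese_ecommerce", boost=3.0, store=True)],
)
config.tenant_id = "shop_12345"  # injected by the ingestion script
```

Keeping `tenant_id` as an optional attribute with a `None` default lets the same loaded config be reused for several tenants by assigning it per run.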

Phase 5: Documentation Updates

5.1 Create Pipeline Guide

File: <code>docs/DATA_PIPELINE_GUIDE.md</code> (NEW)

Document:

  • Separation of concerns (config vs pipeline)
  • How to use ingest_universal.py
  • Default field mappings for common sources
  • Custom field mapping examples
  • Transformer selection guide

5.2 Update BASE Config Guide

File: <code>docs/BASE_CONFIG_GUIDE.md</code>

Update to reflect:

  • Config only contains search settings
  • No data source configuration
  • How tenant_id is injected at runtime
  • Examples of using same config with different data sources

5.3 Update API Documentation

File: <code>API_DOCUMENTATION.md</code>

No changes needed (API layer doesn't know about data pipeline).

5.4 Update Design Documentation

File: <code>设计文档.md</code>

Add section on configuration architecture:

  • Clear separation between search config and pipeline
  • Benefits of this approach
  • How to extend for new data sources

Phase 6: Create Default Field Mappings

6.1 Shoplazza SPU Mapping

File: <code>mappings/shoplazza_spu.json</code> (NEW)

Default field mapping for Shoplazza SPU/SKU tables to BASE config fields.

6.2 Shoplazza SKU Mapping (Legacy)

File: <code>mappings/shoplazza_sku_legacy.json</code> (NEW)

Default field mapping for legacy SKU-level indexing.

6.3 CSV Template Mapping

File: <code>mappings/csv_template.json</code> (NEW)

Example mapping for CSV data sources.

Phase 7: Testing & Validation

7.1 Test Script with Different Sources

Test ingest_universal.py with:

  1. MySQL Shoplazza tables (SPU level)
  2. MySQL Shoplazza tables (SKU level, legacy)
  3. CSV files (if time permits)

7.2 Verify Configuration Portability

Test same BASE config with:

  • Different data sources
  • Different field mappings
  • Different transformers

7.3 Update Test Scripts

File: <code>scripts/test_base.sh</code>

Update to use new script parameters.

Phase 8: Migration & Cleanup

8.1 Create Migration Guide

File: <code>docs/CONFIG_MIGRATION_GUIDE.md</code> (NEW)

Guide for migrating from old config format to new:

  • What changed
  • How to update existing configs
  • How to update ingestion scripts
  • Breaking changes

8.2 Update Example Configs

Update all example configurations to new format.

8.3 Mark Old Scripts as Deprecated

Add deprecation warnings to scripts that still use old config format.
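A deprecation notice of this kind can be emitted with the standard `warnings` module. The helper name and the message wording below are hypothetical; only `scripts/ingest_universal.py` comes from this plan.

```python
import warnings


def warn_legacy_config_script(
    script_name: str,
    replacement: str = "scripts/ingest_universal.py",
) -> None:
    """Emit a DeprecationWarning for scripts that still read the old config format."""
    warnings.warn(
        f"{script_name} reads data source settings from the config file and is deprecated; "
        f"use {replacement} with command-line parameters instead.",
        DeprecationWarning,
        stacklevel=2,  # attribute the warning to the calling script
    )
```

By default Python only displays `DeprecationWarning` when it is attributed to code running in `__main__`, so the call should sit in the old script's entry point; with `stacklevel=2` the warning is then attributed to that script and shown to whoever runs it.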

Key Design Principles

1. Separation of Concerns

Search Configuration (customer-facing):

  • What fields exist in ES
  • How fields are analyzed/indexed
  • Search strategies and ranking
  • Facets and aggregations
  • Query processing rules

Data Pipeline (internal):

  • Where data comes from
  • How to connect to data sources
  • Which tables/files to read
  • How to transform data
  • Field mapping logic

2. Configuration Portability

Same search config can be used with:

  • Different data sources (MySQL, CSV, API)
  • Different schemas (with appropriate mapping)
  • Different transformation strategies
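One way to keep data sources pluggable is a small registry keyed by the `--data-source` value. The sketch below is an assumption about how this could be organized, not a description of existing code: `register_source`, `load_rows`, and `DATA_SOURCES` are hypothetical names, and only a CSV loader from the standard library is shown.

```python
import csv
import io
from typing import Any, Callable, Dict, Iterable, Iterator

Row = Dict[str, Any]
Loader = Callable[..., Iterable[Row]]

# Maps a --data-source value to the function that yields rows for it.
DATA_SOURCES: Dict[str, Loader] = {}


def register_source(name: str) -> Callable[[Loader], Loader]:
    """Decorator that registers a loader under the given source name."""
    def decorator(func: Loader) -> Loader:
        DATA_SOURCES[name] = func
        return func
    return decorator


@register_source("csv")
def load_csv(stream: io.TextIOBase) -> Iterator[Row]:
    """Yield one dict per CSV record, keyed by the header row."""
    yield from csv.DictReader(stream)


def load_rows(source: str, **kwargs: Any) -> Iterable[Row]:
    """Dispatch to the loader registered for `source`."""
    try:
        loader = DATA_SOURCES[source]
    except KeyError:
        raise ValueError(f"Unknown data source: {source}") from None
    return loader(**kwargs)


rows = list(load_rows("csv", stream=io.StringIO("id,title\n1,Red shoes\n")))
```

Adding a MySQL or API source then means registering one more loader; the search config and the transformers are not touched. Note that CSV values arrive as strings, so type conversion remains the transformer's job.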

3. Flexibility

Pipeline decisions (transformer, data source, field mapping) are made at runtime through script parameters, not in the config file.

Migration Path

For Existing Users

  1. Update config files (remove data source settings)
  2. Update ingestion commands (add new parameters)
  3. Optionally create field mapping files for convenience

For New Users

  1. Copy BASE config (already clean)
  2. Run ingest_universal.py with appropriate parameters
  3. Provide custom field mapping if needed

Success Criteria

  • [ ] BASE config contains ZERO data source information
  • [ ] Same config works with MySQL and CSV sources
  • [ ] Pipeline fully controlled by script parameters
  • [ ] Transformers work with external field mapping
  • [ ] Documentation clearly separates concerns
  • [ ] Tests validate portability
  • [ ] Migration guide provided

Estimated Effort

  • Configuration cleanup: 2 hours
  • Transformer refactoring: 4-5 hours
  • Script refactoring: 3-4 hours
  • Config loader updates: 2 hours
  • Documentation: 2-3 hours
  • Testing & validation: 2-3 hours
  • Total: 15-19 hours

Benefits

  • Clean separation of concerns
  • Configuration reusability across data sources
  • Customer doesn't need to understand ETL
  • Easier to add new data sources
  • More flexible pipeline control
  • Reduced configuration complexity

To-dos

  • [ ] Clean BASE and legacy configs: remove mysql_config, table names, source_table/source_column from fields
  • [ ] Create BaseDataTransformer abstract class with shared logic (type conversion, embeddings, tenant_id)
  • [ ] Refactor DataTransformer and SPUDataTransformer to inherit from base, accept field mapping as parameter
  • [ ] Create TransformerFactory for creating transformers based on type parameter
  • [ ] Create ingest_universal.py with full parameter control for data source, transformer, field mapping
  • [ ] Update scripts/ingest_base.py to use parameters instead of config for data source
  • [ ] Create field_mapping_generator.py and default mapping files (shoplazza_spu.json, etc.)
  • [ ] Simplify ConfigLoader to only parse search config, remove data source parsing
  • [ ] Create DATA_PIPELINE_GUIDE.md documenting pipeline approach and config separation
  • [ ] Update BASE_CONFIG_GUIDE.md to reflect config-only-search-settings approach
  • [ ] Create CONFIG_MIGRATION_GUIDE.md for migrating from old to new config format
  • [ ] Test same config with different data sources and validate portability