Pipeline Steps Overview

Processing Pipeline

The FFG pipeline consists of six sequential processing steps, each building upon the previous to create a comprehensive data processing workflow.

Pipeline Execution Flow

flowchart TB
    A[Start Pipeline] --> B[Load Mappings]
    B --> C{Files Available?}
    C -->|Yes| D[Step 1: Sell-Out]
    C -->|No| E[Skip to Sell-In]
    D --> F[Step 2: Sell-In]
    E --> F
    F --> G[Step 3: RSP]
    G --> H[Step 4: Ventilation]
    H --> I[Step 5: Consolidation]
    I --> J[Step 6: Corridor Analysis]
    J --> K[Archive Files]
    K --> L[Send Notifications]
    L --> M[End Pipeline]

Step Descriptions

Step 1: Sell-Out Processing

Purpose: Process retail sales data from partners

Key operations:

  • Load Excel source files
  • Apply retailer, company, and product mappings
  • Calculate discount metrics
  • Create bronze and gold layers
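
The full logic lives in step_01_sellout_processing; the snippet below is only a minimal PySpark sketch of the mapping and discount calculation, assuming the Excel extract has already landed in the bronze table. The mapping table and column names (mapping_retailer, retailer_name, gross_price, net_price) are illustrative, not the actual schema.

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()

# Start from the bronze layer (the landed Excel extract).
bronze = spark.table("dm_finance_ffg.bronze_sellout")
retailer_map = spark.table("dm_finance_ffg.mapping_retailer")  # assumed mapping table

# Apply the retailer mapping and derive a simple discount metric.
gold = (
    bronze
    .join(F.broadcast(retailer_map), on="retailer_name", how="left")
    .withColumn(
        "discount_pct",
        (F.col("gross_price") - F.col("net_price")) / F.col("gross_price"),
    )
)
gold.write.mode("overwrite").saveAsTable("dm_finance_ffg.gold_sellout")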

Step 2: Sell-In Processing

Purpose: Process internal sales data from SAP

Key operations:

  • Query pl_global table
  • Apply HFM aggregations
  • Enrich with MDM attributes
  • Apply business mappings
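
As an illustration of this pattern (not the production code), the sketch below aggregates pl_global and joins MDM attributes; the grouping columns and the MDM table name are assumptions.

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()

pl_global = spark.table("pl_global")                          # SAP P&L source
mdm = spark.table("dm_finance_ffg.mdm_product_attributes")    # assumed table name

sellin = (
    pl_global
    .groupBy("hfm_entity", "product_code", "period")          # HFM-level aggregation
    .agg(F.sum("amount").alias("net_revenue"))
    .join(mdm, on="product_code", how="left")                  # enrich with MDM attributes
)
sellin.write.mode("overwrite").saveAsTable("dm_finance_ffg.gold_sellin")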

Step 3: RSP Processing

Purpose: Process recommended selling prices

Key operations:

  • Load price database
  • Apply market mappings
  • Convert currencies
  • Expand date ranges
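
The date-range expansion and currency conversion can be sketched as follows; the validity columns (valid_from, valid_to), the FX mapping table, and the target currency are assumptions for illustration.

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()

rsp = spark.table("dm_finance_ffg.bronze_rsp")
fx = spark.table("dm_finance_ffg.mapping_fx_rates")           # assumed: currency, rate_to_eur

gold_rsp = (
    rsp
    # Expand each price's validity window into one row per month.
    .withColumn(
        "month",
        F.explode(
            F.sequence(
                F.trunc("valid_from", "month"),
                F.trunc("valid_to", "month"),
                F.expr("interval 1 month"),
            )
        ),
    )
    # Convert local-currency prices to the reporting currency.
    .join(F.broadcast(fx), on="currency", how="left")
    .withColumn("rsp_eur", F.col("rsp_local") * F.col("rate_to_eur"))
)
gold_rsp.write.mode("overwrite").saveAsTable("dm_finance_ffg.gold_rsp")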

Step 4: Ventilation & Allocation

Purpose: Allocate financial metrics across dimensions

Key operations:

  • Create house-level aggregations
  • Calculate product-level allocations
  • Apply business rules
  • Handle exceptions
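
The core idea is a pro-rata allocation from house level down to product level. The sketch below uses a window over an assumed house/period key and an assumed net-price allocation driver; the real business rules and exception handling are not shown.

from pyspark.sql import SparkSession, functions as F
from pyspark.sql.window import Window

spark = SparkSession.builder.getOrCreate()

sellin = spark.table("dm_finance_ffg.gold_sellin")

# House-level totals, then a pro-rata allocation key per product.
house = Window.partitionBy("house", "period")
ventilated = (
    sellin
    .withColumn("house_total_np", F.sum("net_price").over(house))
    .withColumn("allocation_key", F.col("net_price") / F.col("house_total_np"))
    .withColumn("allocated_amount", F.col("house_level_amount") * F.col("allocation_key"))
)
ventilated.write.mode("overwrite").saveAsTable("dm_finance_ffg.product_level_ventilated")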

Step 5: Master Consolidation

Purpose: Combine all data sources into master table

Key operations:

  • Join ventilated sell-in with sell-out
  • Integrate RSP data
  • Calculate combined KPIs
  • Apply VAT adjustments
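
A minimal sketch of the consolidation joins; the composite key and the VAT column are assumptions, and the actual KPI calculations are richer than shown.

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()

ventilated = spark.table("dm_finance_ffg.product_level_ventilated")
sellout = spark.table("dm_finance_ffg.gold_sellout")
rsp = spark.table("dm_finance_ffg.gold_rsp")

keys = ["market", "product_code", "period"]                   # assumed composite key
master = (
    ventilated
    .join(sellout, on=keys, how="left")
    .join(rsp, on=keys, how="left")
    # Example combined KPI with a VAT adjustment on the sell-out side.
    .withColumn("sellout_net_of_vat", F.col("sellout_value") / (1 + F.col("vat_rate")))
)
master.write.mode("overwrite").saveAsTable("dm_finance_ffg.master_sellin_consolidated")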

Step 6: Corridor Analysis

Purpose: Analyze price corridors and opportunities

Key operations:

  • Calculate price corridors
  • Identify opportunities
  • Assess risks
  • Generate recommendations
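
One way to sketch the corridor logic is a per-product price band around the cross-market median; the ±10% band, the column names, and the flag definitions below are illustrative assumptions, not the actual business rules.

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()

master = spark.table("dm_finance_ffg.master_sellin_consolidated")

# Corridor = band around the median net price observed per product and period.
corridor = (
    master
    .groupBy("product_code", "period")
    .agg(F.expr("percentile_approx(net_price, 0.5)").alias("corridor_median"))
    .withColumn("corridor_floor", F.col("corridor_median") * 0.9)    # assumed -10% band
    .withColumn("corridor_ceiling", F.col("corridor_median") * 1.1)  # assumed +10% band
)

analysis = (
    master
    .join(corridor, on=["product_code", "period"], how="left")
    .withColumn("pricing_opportunity", F.col("net_price") < F.col("corridor_floor"))
    .withColumn("pricing_risk", F.col("net_price") > F.col("corridor_ceiling"))
)
analysis.write.mode("overwrite").saveAsTable("dm_finance_ffg.gold_corridor_analysis")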

Dependencies

Step Dependencies

Each step depends on the successful completion of its upstream steps:

graph LR
    A[Mappings] --> B[Sell-Out]
    A --> C[Sell-In]
    A --> D[RSP]
    C --> E[Ventilation]
    E --> F[Consolidation]
    B --> F
    D --> F
    F --> G[Corridor Analysis]

Data Dependencies

| Step | Required Inputs | Produces |
| --- | --- | --- |
| Sell-Out | Excel files, Mappings | bronze_sellout, gold_sellout |
| Sell-In | SQL tables, Mappings, MDM | bronze_sellin, gold_sellin |
| RSP | Excel files, Mappings | bronze_rsp, gold_rsp |
| Ventilation | gold_sellin | house_level_ventilated, product_level_ventilated |
| Consolidation | All gold tables | master_sellin_consolidated |
| Corridor | master_sellin_consolidated | gold_corridor_analysis |

Execution Modes

Full Pipeline

Execute all steps in sequence:

from main import run_pipeline
success = run_pipeline()

Individual Steps

Execute specific steps:

from src.pipeline import step_01_sellout_processing

success = step_01_sellout_processing.process_pipeline(
    source_path=source_path,
    retailer_mapping_path=retailer_path,
    company_mapping_path=company_path,
    product_mapping_path=product_path,
    schema_name=schema
)

Recovery Mode

Resume from a specific step after a failure:

# Resume from ventilation after fixing sell-in issues
from src.pipeline import step_04_ventilation_allocation
success = step_04_ventilation_allocation.process_pipeline()

Configuration

Pipeline Configuration

All pipeline parameters are centralized in config.py:

# Schema configuration
SCHEMA_NAME = "dm_finance_ffg"

# Table names
BRONZE_SELLOUT = "bronze_sellout"
GOLD_SELLOUT = "gold_sellout"

# File paths
SELLOUT_SOURCE_FILE = "Coty_DSA_Price_Promo_Data.xlsx"
RSP_SOURCE_FILE = "SRP_database.xlsx"
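
Downstream modules import these constants rather than hard-coding names; a typical usage pattern might look like the following (the variable names are illustrative):

from pyspark.sql import SparkSession
from config import SCHEMA_NAME, GOLD_SELLOUT

spark = SparkSession.builder.getOrCreate()

# Resolve the fully qualified table name from configuration.
gold_sellout = spark.table(f"{SCHEMA_NAME}.{GOLD_SELLOUT}")   # dm_finance_ffg.gold_sellout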

Business Rules

Configurable thresholds and rules:

# Cleaning thresholds
SELLIN_CLEANING_MIN_THRESHOLD = -10_000_000_000
SELLIN_CLEANING_MAX_THRESHOLD = 0

# Ventilation parameters
VENTILATION_DOUGLAS_THRESHOLD = 300000
VENTILATION_MIN_NP = 1
VENTILATION_MAX_NP = 300
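
As a hedged example of how such thresholds could be applied during sell-in cleaning (the column name amount is an assumption):

from pyspark.sql import functions as F
from config import SELLIN_CLEANING_MIN_THRESHOLD, SELLIN_CLEANING_MAX_THRESHOLD

def clean_sellin(df):
    # Keep only rows whose value falls inside the configured range.
    return df.filter(
        (F.col("amount") >= SELLIN_CLEANING_MIN_THRESHOLD)
        & (F.col("amount") <= SELLIN_CLEANING_MAX_THRESHOLD)
    )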

Error Handling

Step-Level Error Handling

Each step implements comprehensive error handling:

  1. Input Validation: Verify required files/tables exist
  2. Processing Errors: Catch and log transformation errors
  3. Output Validation: Verify output meets expectations
  4. Recovery: Attempt recovery or graceful failure
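
A minimal sketch of that pattern as a step wrapper; the helper names (validate_inputs, process, validate_output) are illustrative and do not reflect the actual module API.

import logging

logger = logging.getLogger("ffg_pipeline")

def run_step(step_name, validate_inputs, process, validate_output):
    """Illustrative wrapper: validate inputs, run the step, validate the output."""
    try:
        if not validate_inputs():
            logger.error(f"{step_name}: required inputs missing")
            return False
        result = process()
        if not validate_output(result):
            logger.error(f"{step_name}: output failed validation")
            return False
        return True
    except Exception:
        logger.exception(f"{step_name}: processing error")
        return False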

Pipeline-Level Error Handling

The main orchestrator manages overall execution:

try:
    # Process each step
    if not process_step():
        logger.error(f"Step failed: {step_name}")
        # Decide whether to continue or abort
except Exception as e:
    logger.error(f"Pipeline failed: {str(e)}")
    # Cleanup and notification

Monitoring

Logging

Structured logging is used throughout the pipeline:

  • Section markers for tracking
  • Row counts at each stage
  • Performance metrics
  • Error details
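
Illustrative helpers for the first two points (the actual logging utilities may differ):

import logging

logger = logging.getLogger("ffg_pipeline")

def log_section(name):
    # Visual marker so a step is easy to find in the run log.
    logger.info("=" * 60)
    logger.info(f"SECTION: {name}")
    logger.info("=" * 60)

def log_row_count(df, label):
    # Record the row count of a DataFrame at a given stage.
    logger.info(f"{label}: {df.count():,} rows")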

Notifications

Power Automate integration covers:

  • Success notifications
  • Failure alerts
  • Performance warnings
  • Data quality issues
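
One common pattern is posting a status payload to an HTTP-triggered flow; the webhook URL and payload fields below are placeholders, not the real endpoint.

import requests

POWER_AUTOMATE_WEBHOOK = "https://example.invalid/power-automate-flow"  # placeholder URL

def notify(status, step_name, details=""):
    # Send a pipeline status message to the Power Automate flow.
    payload = {"status": status, "step": step_name, "details": details}
    response = requests.post(POWER_AUTOMATE_WEBHOOK, json=payload, timeout=30)
    response.raise_for_status()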

Performance

Optimization Techniques

  • Caching: Frequently used DataFrames
  • Broadcast Joins: Small lookup tables
  • Composite Keys: Efficient joins
  • Cleanup: Memory management between steps
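
The first two techniques might look like the following in practice (table and column names are placeholders):

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()

fact = spark.table("dm_finance_ffg.gold_sellin").cache()       # reused across several joins
lookup = spark.table("dm_finance_ffg.mapping_retailer")         # small lookup table

# Broadcasting the small mapping avoids shuffling the large fact table.
enriched = fact.join(F.broadcast(lookup), on="retailer_name", how="left")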

Resource Management

def cleanup_after_step(spark, logger, step_name):
    # Drop all DataFrames/tables cached in this Spark session
    spark.catalog.clearCache()
    # Ask the driver JVM to run a garbage-collection pass
    runtime = spark.sparkContext._jvm.Runtime.getRuntime()
    runtime.gc()
    logger.info(f"Cleaned up after {step_name}")

Best Practices

Development Guidelines

  1. Modular Design: Each step is self-contained
  2. Configuration-Driven: Externalize parameters
  3. Comprehensive Logging: Track all operations
  4. Error Recovery: Implement retry logic
  5. Documentation: Comment complex logic

Testing Strategy

  1. Unit Tests: Individual components
  2. Integration Tests: Step interactions
  3. Data Quality Tests: Validation rules
  4. Performance Tests: Scalability checks
  5. End-to-End Tests: Full pipeline runs

Deployment Process

  1. Development: Feature branches
  2. Testing: Staging environment
  3. Review: Code review and approval
  4. Deployment: Production release
  5. Monitoring: Post-deployment validation