# Pipeline Steps Overview

## Processing Pipeline

The FFG pipeline consists of six sequential processing steps, each building on the previous one to form the complete data processing workflow.
### Pipeline Execution Flow

```mermaid
flowchart TB
    A[Start Pipeline] --> B[Load Mappings]
    B --> C{Files Available?}
    C -->|Yes| D[Step 1: Sell-Out]
    C -->|No| E[Skip to Sell-In]
    D --> F[Step 2: Sell-In]
    E --> F
    F --> G[Step 3: RSP]
    G --> H[Step 4: Ventilation]
    H --> I[Step 5: Consolidation]
    I --> J[Step 6: Corridor Analysis]
    J --> K[Archive Files]
    K --> L[Send Notifications]
    L --> M[End Pipeline]
```
## Step Descriptions

### Step 1: Sell-Out Processing

**Purpose:** Process retail sales data from partners.

Key operations:

- Load Excel source files
- Apply retailer, company, and product mappings
- Calculate discount metrics
- Create bronze and gold layers
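As a rough illustration of these operations, the sketch below loads a sell-out Excel extract, joins a retailer mapping, and derives a simple discount metric before writing bronze and gold tables. The variables `source_path` and `schema`, and all column and mapping-table names, are assumptions for illustration, not the actual schema.

```python
import pandas as pd
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()

# Read the Excel extract via pandas, then hand it to Spark.
sellout_pd = pd.read_excel(source_path)
sellout_raw = spark.createDataFrame(sellout_pd)

# Bronze layer: raw rows enriched with the retailer mapping (names assumed).
retailer_map = spark.table(f"{schema}.mapping_retailer")
bronze = sellout_raw.join(retailer_map, on="retailer_raw", how="left")
bronze.write.mode("overwrite").saveAsTable(f"{schema}.bronze_sellout")

# Gold layer: add a simple discount metric derived from gross vs. net price.
gold = bronze.withColumn(
    "discount_pct",
    (F.col("gross_price") - F.col("net_price")) / F.col("gross_price"),
)
gold.write.mode("overwrite").saveAsTable(f"{schema}.gold_sellout")
```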
### Step 2: Sell-In Processing

**Purpose:** Process internal sales data from SAP.

Key operations:

- Query pl_global table
- Apply HFM aggregations
- Enrich with MDM attributes
- Apply business mappings
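A minimal sketch of this flow is shown below, assuming `pl_global` is readable as a table; the grouping columns, metric column, and MDM table name are illustrative assumptions.

```python
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()

# Query the SAP-sourced pl_global table (column names are assumptions).
sellin = spark.table("pl_global")

# Aggregate to the HFM reporting grain.
sellin_agg = sellin.groupBy("hfm_entity", "product_code", "period") \
    .agg(F.sum("net_sales").alias("net_sales"))

# Enrich with MDM product attributes and business mappings.
mdm = spark.table(f"{schema}.mdm_product_attributes")
gold_sellin = sellin_agg.join(mdm, on="product_code", how="left")
gold_sellin.write.mode("overwrite").saveAsTable(f"{schema}.gold_sellin")
```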
### Step 3: RSP Processing

**Purpose:** Process recommended selling prices.

Key operations:

- Load price database
- Apply market mappings
- Convert currencies
- Expand date ranges
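The currency conversion and date-range expansion can be pictured as in the sketch below; the FX mapping table and the column names (`price_local`, `fx_rate`, `valid_from`, `valid_to`) are assumptions for illustration.

```python
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()

rsp = spark.table(f"{schema}.bronze_rsp")
fx = spark.table(f"{schema}.mapping_fx_rates")

# Convert local-currency prices into a single reference currency.
rsp_converted = rsp.join(fx, on="currency", how="left") \
    .withColumn("price_eur", F.col("price_local") * F.col("fx_rate"))

# Expand each price validity range into one row per month.
rsp_expanded = rsp_converted.withColumn(
    "month",
    F.explode(F.expr("sequence(valid_from, valid_to, interval 1 month)")),
)
rsp_expanded.write.mode("overwrite").saveAsTable(f"{schema}.gold_rsp")
```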
### Step 4: Ventilation & Allocation

**Purpose:** Allocate financial metrics across dimensions.

Key operations:

- Create house-level aggregations
- Calculate product-level allocations
- Apply business rules
- Handle exceptions
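One common way to implement this kind of allocation is to weight each product by its share of house-level sales and distribute a house-level metric accordingly; the sketch below shows that pattern. The source table `house_level_spend` and the column names are hypothetical.

```python
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.window import Window

spark = SparkSession.builder.getOrCreate()

gold_sellin = spark.table(f"{schema}.gold_sellin")
house_spend = spark.table(f"{schema}.house_level_spend")  # hypothetical source

# Weight each product by its share of house net sales, then ventilate the
# house-level spend down to product level.
w = Window.partitionBy("house", "period")
product_level = (
    gold_sellin
    .withColumn("sales_share", F.col("net_sales") / F.sum("net_sales").over(w))
    .join(house_spend, on=["house", "period"], how="left")
    .withColumn("ventilated_spend", F.col("sales_share") * F.col("house_spend"))
)
product_level.write.mode("overwrite").saveAsTable(f"{schema}.product_level_ventilated")
```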
### Step 5: Master Consolidation

**Purpose:** Combine all data sources into the master table.

Key operations:

- Join ventilated sell-in with sell-out
- Integrate RSP data
- Calculate combined KPIs
- Apply VAT adjustments
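A hedged sketch of the consolidation join is given below; the join keys, KPI formulas, and the `vat_rate` column are assumptions used only to illustrate the shape of the step.

```python
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()

sellin = spark.table(f"{schema}.product_level_ventilated")
sellout = spark.table(f"{schema}.gold_sellout")
rsp = spark.table(f"{schema}.gold_rsp")

# Combine all sources on a shared grain (keys assumed for illustration).
keys = ["market", "product_code", "period"]
master = sellin.join(sellout, on=keys, how="full_outer") \
               .join(rsp, on=keys, how="left")

# Example combined KPI and VAT adjustment (column names assumed).
master = master \
    .withColumn("sellout_net_of_vat", F.col("sellout_value") / (1 + F.col("vat_rate"))) \
    .withColumn("sellin_to_sellout_ratio", F.col("net_sales") / F.col("sellout_net_of_vat"))

master.write.mode("overwrite").saveAsTable(f"{schema}.master_sellin_consolidated")
```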
### Step 6: Corridor Analysis

**Purpose:** Analyze price corridors and opportunities.

Key operations:

- Calculate price corridors
- Identify opportunities
- Assess risks
- Generate recommendations
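As one illustrative interpretation, a price corridor can be computed as a percentile band of net price per product across markets, flagging prices outside the band; the percentile choice and column names below are assumptions, not the project's actual rules.

```python
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()

master = spark.table(f"{schema}.master_sellin_consolidated")

# Corridor bounds per product and period (25th/75th percentile, assumed).
corridor = master.groupBy("product_code", "period").agg(
    F.expr("percentile_approx(net_price, 0.25)").alias("corridor_low"),
    F.expr("percentile_approx(net_price, 0.75)").alias("corridor_high"),
)

# Flag each market price relative to the corridor.
analysis = master.join(corridor, on=["product_code", "period"]) \
    .withColumn(
        "corridor_flag",
        F.when(F.col("net_price") < F.col("corridor_low"), "below")
         .when(F.col("net_price") > F.col("corridor_high"), "above")
         .otherwise("within"),
    )
analysis.write.mode("overwrite").saveAsTable(f"{schema}.gold_corridor_analysis")
```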
## Dependencies

### Step Dependencies

Each step depends on the successful completion of the steps before it:

```mermaid
graph LR
    A[Mappings] --> B[Sell-Out]
    A --> C[Sell-In]
    A --> D[RSP]
    C --> E[Ventilation]
    E --> F[Consolidation]
    B --> F
    D --> F
    F --> G[Corridor Analysis]
```
### Data Dependencies

| Step | Required Inputs | Produces |
|---|---|---|
| Sell-Out | Excel files, mappings | bronze_sellout, gold_sellout |
| Sell-In | SQL tables, mappings, MDM | bronze_sellin, gold_sellin |
| RSP | Excel files, mappings | bronze_rsp, gold_rsp |
| Ventilation | gold_sellin | house_level_ventilated, product_level_ventilated |
| Consolidation | All gold tables | master_sellin_consolidated |
| Corridor | master_sellin_consolidated | gold_corridor_analysis |
## Execution Modes

### Full Pipeline
Execute all steps in sequence:
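A minimal sketch of a full run is shown below, assuming a top-level orchestrator module; the module name `main_pipeline` and its `run_all_steps` function are hypothetical, not confirmed parts of the codebase.

```python
# Hypothetical orchestrator call: module and function names are assumptions.
from src.pipeline import main_pipeline

success = main_pipeline.run_all_steps(schema_name=schema)
```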
### Individual Steps

Execute specific steps:

```python
from src.pipeline import step_01_sellout_processing

success = step_01_sellout_processing.process_pipeline(
    source_path=source_path,
    retailer_mapping_path=retailer_path,
    company_mapping_path=company_path,
    product_mapping_path=product_path,
    schema_name=schema,
)
```
### Recovery Mode

Resume from a specific step after a failure:

```python
# Resume from ventilation after fixing sell-in issues
from src.pipeline import step_04_ventilation_allocation

success = step_04_ventilation_allocation.process_pipeline()
```
## Configuration

### Pipeline Configuration

All pipeline parameters are centralized in `config.py`:

```python
# Schema configuration
SCHEMA_NAME = "dm_finance_ffg"

# Table names
BRONZE_SELLOUT = "bronze_sellout"
GOLD_SELLOUT = "gold_sellout"

# File paths
SELLOUT_SOURCE_FILE = "Coty_DSA_Price_Promo_Data.xlsx"
RSP_SOURCE_FILE = "SRP_database.xlsx"
```
### Business Rules

Configurable thresholds and rules:

```python
# Cleaning thresholds
SELLIN_CLEANING_MIN_THRESHOLD = -10_000_000_000
SELLIN_CLEANING_MAX_THRESHOLD = 0

# Ventilation parameters
VENTILATION_DOUGLAS_THRESHOLD = 300000
VENTILATION_MIN_NP = 1
VENTILATION_MAX_NP = 300
```
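For context, a hedged sketch of how the cleaning thresholds might be applied when filtering sell-in records; the import path for `config`, the table name `bronze_sellin`, and the column name `net_sales` are assumptions.

```python
from pyspark.sql import SparkSession, functions as F
import config  # the project's config.py; exact import path is an assumption

spark = SparkSession.builder.getOrCreate()

# Keep only rows whose value falls inside the configured cleaning window.
bronze_sellin = spark.table(f"{config.SCHEMA_NAME}.bronze_sellin")
cleaned_sellin = bronze_sellin.filter(
    (F.col("net_sales") >= config.SELLIN_CLEANING_MIN_THRESHOLD)
    & (F.col("net_sales") <= config.SELLIN_CLEANING_MAX_THRESHOLD)
)
```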
## Error Handling

### Step-Level Error Handling

Each step implements comprehensive error handling:

- **Input Validation**: Verify that required files and tables exist
- **Processing Errors**: Catch and log transformation errors
- **Output Validation**: Verify that output meets expectations
- **Recovery**: Attempt recovery or fail gracefully
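As an illustration of the input-validation part, a minimal sketch that checks whether required tables exist before a step runs; the helper name and behaviour are assumptions, not the actual implementation.

```python
def validate_inputs(spark, logger, schema_name, required_tables):
    """Return True only if every required table exists in the target schema."""
    # Catalog.tableExists is available in recent PySpark versions (3.3+).
    missing = [
        table for table in required_tables
        if not spark.catalog.tableExists(f"{schema_name}.{table}")
    ]
    if missing:
        logger.error(f"Missing input tables: {', '.join(missing)}")
        return False
    return True
```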
### Pipeline-Level Error Handling

The main orchestrator manages overall execution:

```python
try:
    # Process each step in turn
    if not process_step():
        logger.error(f"Step failed: {step_name}")
        # Decide whether to continue or abort
except Exception as e:
    logger.error(f"Pipeline failed: {str(e)}")
    # Clean up and send notifications
```
## Monitoring

### Logging

Structured logging is used throughout the pipeline:

- Section markers for tracking
- Row counts at each stage
- Performance metrics
- Error details
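A hedged sketch of the kind of section-marker and row-count logging described above; the helper names are illustrative, not the project's actual utilities.

```python
import logging

logger = logging.getLogger("ffg_pipeline")

def log_section(logger, title):
    """Emit a visually distinct section marker for easier log scanning."""
    logger.info("=" * 60)
    logger.info(title)
    logger.info("=" * 60)

def log_row_count(logger, df, label):
    """Record the row count of a DataFrame at a given pipeline stage."""
    logger.info(f"{label}: {df.count():,} rows")
```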
### Notifications

Power Automate integration covers:

- Success notifications
- Failure alerts
- Performance warnings
- Data quality issues
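Power Automate flows are commonly triggered over an HTTP endpoint; a minimal sketch of posting a pipeline status message is shown below. The flow URL and payload fields are placeholders, not the project's actual integration.

```python
import requests

# Placeholder URL: the real flow endpoint is configured elsewhere.
FLOW_URL = "https://prod-00.westeurope.logic.azure.com/workflows/<flow-id>/triggers/manual/paths/invoke"

def notify(status, step_name, details=""):
    """Post a pipeline status message to a Power Automate HTTP trigger."""
    payload = {"status": status, "step": step_name, "details": details}
    response = requests.post(FLOW_URL, json=payload, timeout=30)
    response.raise_for_status()
```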
## Performance

### Optimization Techniques

- **Caching**: Frequently used DataFrames
- **Broadcast Joins**: Small lookup tables
- **Composite Keys**: Efficient joins
- **Cleanup**: Memory management between steps
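The first three techniques can be combined in a single join, as in the sketch below; the table and column names are assumptions used only to show the pattern.

```python
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()

# Cache a DataFrame that several later steps reuse (names assumed).
gold_sellin = spark.table("dm_finance_ffg.gold_sellin").cache()
retailer_map = spark.table("dm_finance_ffg.mapping_retailer")

# Broadcast the small lookup table so the join avoids a shuffle,
# and join on a composite key.
enriched = gold_sellin.join(
    F.broadcast(retailer_map),
    on=["market", "retailer_code"],
    how="left",
)
```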
### Resource Management

```python
def cleanup_after_step(spark, logger, step_name):
    """Release cached data and nudge the JVM garbage collector between steps."""
    # Drop all cached DataFrames and tables from the Spark cache.
    spark.catalog.clearCache()
    # Ask the driver JVM to run garbage collection via the py4j gateway.
    runtime = spark.sparkContext._jvm.Runtime.getRuntime()
    runtime.gc()
    logger.info(f"Cleaned up after {step_name}")
```
## Best Practices

### Development Guidelines

- **Modular Design**: Each step is self-contained
- **Configuration-Driven**: Externalize parameters
- **Comprehensive Logging**: Track all operations
- **Error Recovery**: Implement retry logic
- **Documentation**: Comment complex logic

### Testing Strategy

- **Unit Tests**: Individual components
- **Integration Tests**: Step interactions
- **Data Quality Tests**: Validation rules
- **Performance Tests**: Scalability checks
- **End-to-End Tests**: Full pipeline runs

### Deployment Process

1. **Development**: Feature branches
2. **Testing**: Staging environment
3. **Review**: Code review and approval
4. **Deployment**: Production release
5. **Monitoring**: Post-deployment validation