Step 1: Sell-Out Processing

Overview

The sell-out processing step handles retail sales data from partners, applying mappings and calculating key metrics for business analysis.

Architecture

graph TB
    subgraph "Input"
        A1[Excel Source File]
        A2[Retailer Mapping]
        A3[Company Mapping]
        A4[Product Mapping]
    end

    subgraph "Processing"
        B1[Data Loader]
        B2[Data Enricher]
        B3[Mapping Processors]
        B4[Gold Layer Preparer]
    end

    subgraph "Output"
        C1[Bronze Sellout]
        C2[Gold Sellout]
    end

    A1 --> B1
    A2 --> B3
    A3 --> B3
    A4 --> B3
    B1 --> B2
    B2 --> B3
    B3 --> B4
    B4 --> C1
    B4 --> C2

Components

1. Data Loader (loader.py)

Purpose: Load and clean Excel source files

Key Features

  • Dynamic header detection
  • Column name standardization
  • Data type inference
  • Binary file reading from Azure

Implementation

class SelloutDataLoader:
    def load_and_clean_source(self, source_path: str):
        # Read the Excel file as binary from Azure
        binary_df = spark.read.format("binaryFile").load(source_path)
        excel_buffer = io.BytesIO(binary_df.head()["content"])

        # Detect the header row from a small sample
        df_sample = pd.read_excel(excel_buffer, header=None, nrows=10)
        header_row = self._detect_header_row_from_dataframe(df_sample)

        # Reload with the detected header row
        excel_buffer.seek(0)
        df_pd = pd.read_excel(excel_buffer, header=header_row)

        # Standardize column names
        df_pd = self._clean_column_names(df_pd)

        # Convert to a Spark DataFrame
        return self._convert_to_spark(df_pd)

2. Data Enricher (enrichment.py)

Purpose: Calculate derived metrics and add business logic

Calculations

  • Discount Amount: Regular_Price - Final_Price_After_Discount
  • Discount Percent: Discount_Amount / Regular_Price
  • Data Validity Flags: Quality indicators
  • Promo Frame: Discount percentage buckets

Promo Frame Bins

promo_frame_bins = [
    (0, 0, '0%'),
    (0.001, 0.10, '0-10%'),
    (0.11, 0.20, '11-20%'),
    (0.21, 0.30, '21-30%'),
    # ... up to 91-100%
]
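As a plain-Python illustration, the bins can be applied with a small lookup helper (the function name `promo_frame` is an assumption, not from the source; only the documented bins are reproduced):

```python
# Discount-percentage buckets; the tail of the list is elided in the
# source, so only the documented bins appear here.
promo_frame_bins = [
    (0, 0, '0%'),
    (0.001, 0.10, '0-10%'),
    (0.11, 0.20, '11-20%'),
    (0.21, 0.30, '21-30%'),
]

def promo_frame(discount_percent: float) -> str:
    """Return the promo-frame label whose bounds contain the discount."""
    for low, high, label in promo_frame_bins:
        if low <= discount_percent <= high:
            return label
    return 'Unknown'
```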

3. Mapping Processors

Base Mapping Processor (mappings/base.py)

Abstract base class for all mapping processors:

  • Load mapping from Azure
  • Save to bronze table
  • Apply mapping with validation
  • Check for data inflation
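A minimal sketch of what that base class might look like; everything beyond the `key_column` and `expected_columns` attributes shown later is an assumption:

```python
from abc import ABC, abstractmethod

class MappingProcessor(ABC):
    """Illustrative sketch of the base mapping processor; method names
    other than key_column/expected_columns are assumptions."""

    key_column: str = ""
    expected_columns: list = []

    @abstractmethod
    def load_mapping(self, path: str):
        """Load the mapping file from Azure storage."""

    def validate_columns(self, columns) -> bool:
        # Every expected mapping column must be present in the file
        return set(self.expected_columns).issubset(set(columns))

    def check_inflation(self, rows_before: int, rows_after: int) -> bool:
        # A lookup join must not add rows; growth after the join points
        # to duplicate keys in the mapping file
        return rows_after > rows_before
```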

Retailer Mapping (mappings/retailer.py)

Maps retailers to customers and countries:

class RetailerMappingProcessor(MappingProcessor):
    key_column = "Retailer"
    expected_columns = ['Retailer', 'Customer', 'Country_Sk', 'Country_Sell_Out']

Company Mapping (mappings/company.py)

Maps brands to parent companies:

class CompanyMappingProcessor(MappingProcessor):
    key_column = "Brand"
    expected_columns = ['Brand', 'Company']

Product Mapping (mappings/product.py)

Enriches with product attributes via UPC:

class ProductMappingProcessor(MappingProcessor):
    key_column = "Upc"  # Dynamic key detection
    expected_columns = ['Product_Name', 'Upc', 'Type', 'Size']
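For illustration, applying any of these mappings amounts to a left join on the key column with an inflation check; a simplified pandas sketch (the production code uses Spark, and the helper name `apply_mapping` is an assumption):

```python
import pandas as pd

def apply_mapping(df: pd.DataFrame, mapping: pd.DataFrame, key: str) -> pd.DataFrame:
    """Left-join a mapping onto the data and verify no row inflation."""
    before = len(df)
    enriched = df.merge(mapping, on=key, how="left")
    if len(enriched) > before:
        # Duplicate keys in the mapping would silently inflate the data
        raise ValueError(f"Mapping on '{key}' inflated row count")
    return enriched
```

Unmatched keys simply receive nulls in the mapped columns, which matches the "continue processing with nulls" behavior described under Error Handling.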

4. Gold Layer Preparer (gold_layer.py)

Purpose: Final transformations for analytics-ready data

Operations

  • Round numeric columns (4 decimal places)
  • Add fiscal year calculation
  • Add metadata (source file, timestamp)
  • Cache for performance

Fiscal Year Logic

# Coty fiscal year: July-June
fiscal_year = F.when(
    F.month("Date") >= 7,  # July-December
    F.year("Date") + 1     # Next year
).otherwise(
    F.year("Date")         # Same year
)
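The same rule expressed as a plain Python function, for illustration:

```python
from datetime import date

def coty_fiscal_year(d: date) -> int:
    """Coty fiscal year runs July-June: dates in July-December belong
    to the next calendar year's fiscal year."""
    return d.year + 1 if d.month >= 7 else d.year
```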

5. Orchestrator (orchestrator.py)

Purpose: Coordinate all processing steps

Process Flow

  1. Load and clean source data
  2. Enrich with calculations
  3. Apply all mappings
  4. Prepare gold layer
  5. Write to data lake
  6. Cleanup resources
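The flow above can be sketched as a sequence of callables run in order; this is an illustrative structure, not the orchestrator's actual API:

```python
def run_pipeline(source, steps) -> bool:
    """Run each processing step in order, passing the result along.
    Any step returning None aborts the pipeline (illustrative sketch)."""
    data = source
    for step in steps:
        data = step(data)
        if data is None:
            return False
    return True
```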

Input Files

Source Data Format

File: Coty_DSA_Price_Promo_Data.xlsx

Expected columns:

  • Sr_No or ID
  • Brand
  • Retailer
  • Week/Date
  • Upc
  • Regular_Price
  • Final_Price_After_Discount

Mapping Files

Retailer Mapping

File: sellout_retailer_mapping.csv

Retailer;Customer;Country_Sk;Country_Sell_Out
Amazon;Amazon Corp;US;United States
Boots;Boots UK;GB;United Kingdom
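Note that the mapping files are semicolon-delimited, so a plain `read_csv` call needs an explicit separator; a pandas sketch (the inline sample mirrors the rows above):

```python
import io
import pandas as pd

# Sample content mirroring sellout_retailer_mapping.csv
csv_text = (
    "Retailer;Customer;Country_Sk;Country_Sell_Out\n"
    "Amazon;Amazon Corp;US;United States\n"
    "Boots;Boots UK;GB;United Kingdom\n"
)

# sep=";" is required; the default comma would yield a single column
retailer_mapping = pd.read_csv(io.StringIO(csv_text), sep=";")
```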

Company Mapping

File: sellout_company_mapping.csv

Brand;Company
Hugo Boss;Coty Prestige
Burberry;Coty Luxury

Product Mapping

File: sellout_product_mapping.csv

Product_Name;Upc;Type;Size
Boss Bottled EDT;3614229823653;Fragrance;100ml
Burberry Her EDP;3614229823660;Fragrance;50ml

Output Tables

Bronze Table: bronze_sellout

Raw data with minimal transformations:

  • Original columns preserved
  • Processing timestamp added
  • Data types standardized

Gold Table: gold_sellout

Business-ready dataset:

  • All mappings applied
  • Calculated metrics included
  • Fiscal year added
  • Rounded numerics
  • Metadata columns

Data Quality Checks

Validation Rules

  1. Price Validation: Regular_Price > 0
  2. Discount Logic: Discount_Percent between 0 and 1
  3. Date Validation: Valid date ranges
  4. Mapping Coverage: Track unmatched records
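Mapping coverage can be measured as the share of rows whose join key found a match, i.e. where the mapped column is non-null; a pandas sketch under that assumption (the helper name `match_rate` is illustrative):

```python
import pandas as pd

def match_rate(df: pd.DataFrame, mapped_column: str) -> float:
    """Fraction of rows where the mapping filled the target column."""
    if len(df) == 0:
        return 0.0
    return float(df[mapped_column].notna().mean())
```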

Quality Flags

# Data validity flag
df = df.withColumn(
    "Data_Valid",
    F.when(F.col("Regular_Price") > 0, "Y").otherwise("N")
)

# Week flag
df = df.withColumn(
    "Week_Sk",
    F.when(F.col("Data_Valid") == "Y", 1).otherwise(0)
)

Performance Considerations

Optimization Techniques

  • Binary file reading for large Excel files
  • Broadcast joins for mappings
  • DataFrame caching after enrichment
  • Cleanup after processing

Memory Management

def _cleanup_dataframes(self, *dataframes):
    for df in dataframes:
        if df is not None:
            df.unpersist()

Error Handling

Common Issues and Solutions

  1. Missing Headers
     • Detection patterns for various formats
     • Fallback to first row

  2. Data Type Issues
     • Force string conversion for IDs
     • Handle mixed types gracefully

  3. Mapping Mismatches
     • Log unmatched values
     • Continue processing with nulls

  4. File Format Errors
     • Multiple read attempts
     • Different encoding options

Configuration

Key configuration in config.py:

# File names
SELLOUT_SOURCE_FILE = "Coty_DSA_Price_Promo_Data.xlsx"

# Mapping files
SELLOUT_RETAILER_MAPPING_FILE = "sellout_retailer_mapping.csv"
SELLOUT_COMPANY_MAPPING_FILE = "sellout_company_mapping.csv"
SELLOUT_PRODUCT_MAPPING_FILE = "sellout_product_mapping.csv"

# Table names
BRONZE_SELLOUT = "bronze_sellout"
GOLD_SELLOUT = "gold_sellout"

Usage Example

from src.pipeline.step_01_sellout_processing import process_pipeline

success = process_pipeline(
    source_path="abfss://domain-finance-ffg@storage.dfs.core.windows.net/sellout_data/incoming/source.xlsx",
    retailer_mapping_path="path/to/retailer_mapping.csv",
    company_mapping_path="path/to/company_mapping.csv",
    product_mapping_path="path/to/product_mapping.csv",
    schema_name="dm_finance_ffg"
)

if success:
    print("Sell-out processing completed successfully")

Monitoring

Key Metrics

  • Row counts at each stage
  • Mapping match rates
  • Processing time
  • Data quality scores

Logging Output

2024-09-16 10:00:00 | INFO | Loading source file from Azure
2024-09-16 10:00:05 | INFO | Detected header row at index 2
2024-09-16 10:00:10 | INFO | Loaded 50000 rows
2024-09-16 10:00:15 | INFO | Applied retailer mapping: 95% match rate
2024-09-16 10:00:20 | INFO | Written to gold_sellout table