Step 1: Sell-Out Processing¶
Overview¶
The sell-out processing step handles retail sales data from partners, applying mappings and calculating key metrics for business analysis.
Architecture¶
graph TB
subgraph "Input"
A1[Excel Source File]
A2[Retailer Mapping]
A3[Company Mapping]
A4[Product Mapping]
end
subgraph "Processing"
B1[Data Loader]
B2[Data Enricher]
B3[Mapping Processors]
B4[Gold Layer Preparer]
end
subgraph "Output"
C1[Bronze Sellout]
C2[Gold Sellout]
end
A1 --> B1
A2 --> B3
A3 --> B3
A4 --> B3
B1 --> B2
B2 --> B3
B3 --> B4
B4 --> C1
B4 --> C2
Components¶
1. Data Loader (loader.py)¶
Purpose: Load and clean Excel source files
Key Features¶
- Dynamic header detection
- Column name standardization
- Data type inference
- Binary file reading from Azure
Implementation¶
class SelloutDataLoader:
    def load_and_clean_source(self, source_path: str):
        # Read the Excel file as binary from Azure
        binary_df = spark.read.format("binaryFile").load(source_path)
        excel_buffer = io.BytesIO(binary_df.select("content").first()["content"])
        # Detect the header row from a headerless sample read
        df_sample = pd.read_excel(excel_buffer, header=None, nrows=10)
        header_row = self._detect_header_row_from_dataframe(df_sample)
        # Load with pandas using the detected header
        excel_buffer.seek(0)
        df_pd = pd.read_excel(excel_buffer, header=header_row)
        # Clean column names
        df_pd = self._clean_column_names(df_pd)
        # Convert to Spark
        return self._convert_to_spark(df_pd)
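The header-detection helper referenced above is not shown in the snippet. A minimal sketch of how such a scan might work, assuming a pandas DataFrame read with `header=None` and a set of marker column names (the keyword set below is a hypothetical illustration, not the actual detection patterns):

```python
import pandas as pd

# Hypothetical marker columns that identify the header row
EXPECTED_KEYWORDS = {"brand", "retailer", "upc"}

def detect_header_row(df_sample: pd.DataFrame, max_rows: int = 10) -> int:
    """Scan the first rows for one containing all expected column keywords."""
    for i in range(min(max_rows, len(df_sample))):
        cells = {str(c).strip().lower() for c in df_sample.iloc[i]}
        if EXPECTED_KEYWORDS <= cells:
            return i
    return 0  # fallback: assume the first row is the header
```

Returning 0 as the fallback matches the "fallback to first row" behavior described under Error Handling below.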
2. Data Enricher (enrichment.py)¶
Purpose: Calculate derived metrics and add business logic
Calculations¶
- Discount Amount: Regular_Price - Final_Price_After_Discount
- Discount Percent: Discount_Amount / Regular_Price
- Data Validity Flags: Quality indicators
- Promo Frame: Discount percentage buckets
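The two discount formulas above can be expressed as a small pure function; this is a plain-Python sketch of the arithmetic only (the enricher itself presumably builds equivalent Spark column expressions):

```python
def discount_metrics(regular_price: float, final_price: float) -> tuple[float, float]:
    """Return (discount_amount, discount_percent) per the formulas above."""
    discount_amount = regular_price - final_price
    # Guard against division by zero for invalid prices
    discount_percent = discount_amount / regular_price if regular_price > 0 else 0.0
    return discount_amount, discount_percent
```

For example, `discount_metrics(10.0, 7.5)` returns `(2.5, 0.25)`, i.e. a 25% discount.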
Promo Frame Bins¶
promo_frame_bins = [
    (0, 0, '0%'),
    (0.001, 0.10, '0-10%'),
    (0.11, 0.20, '11-20%'),
    (0.21, 0.30, '21-30%'),
    # ... up to 91-100%
]
3. Mapping Processors¶
Base Mapping Processor (mappings/base.py)¶
Abstract base class for all mapping processors:
- Load mapping from Azure
- Save to bronze table
- Apply mapping with validation
- Check for data inflation
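A sketch of what such a base class might look like; the method names and validation details here are assumptions for illustration, not the actual contents of `mappings/base.py`:

```python
from abc import ABC

class MappingProcessor(ABC):
    """Base for mapping processors: validate columns and guard against inflation."""
    key_column: str = ""
    expected_columns: list[str] = []

    def validate_columns(self, columns: list[str]) -> bool:
        """Check that the loaded mapping file has every expected column."""
        return set(self.expected_columns) <= set(columns)

    def check_inflation(self, rows_before: int, rows_after: int) -> None:
        """A join against a mapping must not multiply rows."""
        if rows_after > rows_before:
            raise ValueError(
                f"Mapping on '{self.key_column}' inflated rows: "
                f"{rows_before} -> {rows_after}"
            )
```

Each concrete subclass (retailer, company, product) then only has to declare its `key_column` and `expected_columns`, as shown below.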
Retailer Mapping (mappings/retailer.py)¶
Maps retailers to customers and countries:
class RetailerMappingProcessor(MappingProcessor):
    key_column = "Retailer"
    expected_columns = ['Retailer', 'Customer', 'Country_Sk', 'Country_Sell_Out']
Company Mapping (mappings/company.py)¶
Maps brands to parent companies:
class CompanyMappingProcessor(MappingProcessor):
    key_column = "Brand"
    expected_columns = ['Brand', 'Company']
Product Mapping (mappings/product.py)¶
Enriches with product attributes via UPC:
class ProductMappingProcessor(MappingProcessor):
    key_column = "Upc"  # Dynamic key detection
    expected_columns = ['Product_Name', 'Upc', 'Type', 'Size']
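Applying any of these mappings amounts to a left join on the key column plus the inflation check mentioned above. A pandas sketch of the idea (the pipeline itself joins in Spark; this is only an illustration of the validation logic):

```python
import pandas as pd

def apply_mapping(df: pd.DataFrame, mapping: pd.DataFrame, key: str) -> pd.DataFrame:
    """Left-join mapping attributes onto df, refusing duplicate mapping keys."""
    if mapping[key].duplicated().any():
        raise ValueError(f"Duplicate keys in mapping on '{key}' would inflate rows")
    # Unmatched rows keep nulls so processing can continue (see Error Handling)
    return df.merge(mapping, on=key, how="left")
```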
4. Gold Layer Preparer (gold_layer.py)¶
Purpose: Final transformations for analytics-ready data
Operations¶
- Round numeric columns (4 decimal places)
- Add fiscal year calculation
- Add metadata (source file, timestamp)
- Cache for performance
Fiscal Year Logic¶
# Coty fiscal year: July-June
fiscal_year = F.when(
    F.month("Date") >= 7,   # July-December
    F.year("Date") + 1      # Next year
).otherwise(
    F.year("Date")          # Same year
)
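The same rule expressed as a plain function, useful for sanity-checking individual dates (a July-June fiscal year named for its ending calendar year):

```python
from datetime import date

def coty_fiscal_year(d: date) -> int:
    """July-December belongs to the next calendar year's fiscal year."""
    return d.year + 1 if d.month >= 7 else d.year
```

For example, `coty_fiscal_year(date(2024, 8, 15))` returns `2025`, and so does `coty_fiscal_year(date(2025, 3, 1))`: both dates fall in the fiscal year running July 2024 through June 2025.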
5. Orchestrator (orchestrator.py)¶
Purpose: Coordinate all processing steps
Process Flow¶
- Load and clean source data
- Enrich with calculations
- Apply all mappings
- Prepare gold layer
- Write to data lake
- Cleanup resources
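The flow above can be sketched as a thin coordinator over the components already described; the class and method names here are illustrative assumptions, not the actual `orchestrator.py` API:

```python
class SelloutOrchestrator:
    """Runs the sell-out steps in order, cleaning up even on failure."""

    def __init__(self, loader, enricher, mappers, gold_preparer, writer):
        self.loader = loader
        self.enricher = enricher
        self.mappers = mappers          # retailer, company, product processors
        self.gold_preparer = gold_preparer
        self.writer = writer

    def run(self, source_path: str) -> bool:
        try:
            df = self.loader.load_and_clean_source(source_path)  # 1. load/clean
            df = self.enricher.enrich(df)                        # 2. enrich
            for mapper in self.mappers:                          # 3. mappings
                df = mapper.apply(df)
            gold = self.gold_preparer.prepare(df)                # 4. gold layer
            self.writer.write(gold)                              # 5. write
            return True
        finally:
            self.cleanup()                                       # 6. cleanup

    def cleanup(self) -> None:
        pass  # release cached DataFrames, close handles, etc.
```

Running cleanup in a `finally` block ensures cached resources are released even when a step fails partway through.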
Input Files¶
Source Data Format¶
File: Coty_DSA_Price_Promo_Data.xlsx
Expected columns:
- Sr_No or ID
- Brand
- Retailer
- Week/Date
- Upc
- Regular_Price
- Final_Price_After_Discount
Mapping Files¶
Retailer Mapping¶
File: sellout_retailer_mapping.csv
Retailer;Customer;Country_Sk;Country_Sell_Out
Amazon;Amazon Corp;US;United States
Boots;Boots UK;GB;United Kingdom
Company Mapping¶
File: sellout_company_mapping.csv
Product Mapping¶
File: sellout_product_mapping.csv
Product_Name;Upc;Type;Size
Boss Bottled EDT;3614229823653;Fragrance;100ml
Burberry Her EDP;3614229823660;Fragrance;50ml
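Note that the mapping files are semicolon-delimited, and UPCs should be kept as strings to avoid losing leading zeros or precision. A sketch of reading one with pandas (the pipeline may read these via Spark instead):

```python
import io
import pandas as pd

def read_mapping_csv(buffer) -> pd.DataFrame:
    """Read a ;-delimited mapping file, keeping all columns as strings."""
    return pd.read_csv(buffer, sep=";", dtype=str)

# Example using the product mapping rows shown above
sample = io.StringIO(
    "Product_Name;Upc;Type;Size\n"
    "Boss Bottled EDT;3614229823653;Fragrance;100ml\n"
    "Burberry Her EDP;3614229823660;Fragrance;50ml\n"
)
products = read_mapping_csv(sample)
```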
Output Tables¶
Bronze Table: bronze_sellout¶
Raw data with minimal transformations:
- Original columns preserved
- Processing timestamp added
- Data types standardized
Gold Table: gold_sellout¶
Business-ready dataset:
- All mappings applied
- Calculated metrics included
- Fiscal year added
- Rounded numerics
- Metadata columns
Data Quality Checks¶
Validation Rules¶
- Price Validation: Regular_Price > 0
- Discount Logic: Discount_Percent between 0 and 1
- Date Validation: Valid date ranges
- Mapping Coverage: Track unmatched records
Quality Flags¶
# Data validity flag
Data_Valid = F.when(F.col("Regular_Price") > 0, "Y").otherwise("N")
# Week flag
Week_Sk = F.when(F.col("Data_Valid") == "Y", 1).otherwise(0)
Performance Considerations¶
Optimization Techniques¶
- Binary file reading for large Excel files
- Broadcast joins for mappings
- DataFrame caching after enrichment
- Cleanup after processing
Memory Management¶
Error Handling¶
Common Issues and Solutions¶
- Missing Headers
  - Detection patterns for various formats
  - Fallback to first row
- Data Type Issues
  - Force string conversion for IDs
  - Handle mixed types gracefully
- Mapping Mismatches
  - Log unmatched values
  - Continue processing with nulls
- File Format Errors
  - Multiple read attempts
  - Different encoding options
Configuration¶
Key configuration in config.py:
# File names
SELLOUT_SOURCE_FILE = "Coty_DSA_Price_Promo_Data.xlsx"
# Mapping files
SELLOUT_RETAILER_MAPPING_FILE = "sellout_retailer_mapping.csv"
SELLOUT_COMPANY_MAPPING_FILE = "sellout_company_mapping.csv"
SELLOUT_PRODUCT_MAPPING_FILE = "sellout_product_mapping.csv"
# Table names
BRONZE_SELLOUT = "bronze_sellout"
GOLD_SELLOUT = "gold_sellout"
Usage Example¶
from src.pipeline.step_01_sellout_processing import process_pipeline
success = process_pipeline(
    source_path="abfss://domain-finance-ffg@storage.dfs.core.windows.net/sellout_data/incoming/source.xlsx",
    retailer_mapping_path="path/to/retailer_mapping.csv",
    company_mapping_path="path/to/company_mapping.csv",
    product_mapping_path="path/to/product_mapping.csv",
    schema_name="dm_finance_ffg"
)
if success:
    print("Sell-out processing completed successfully")
Monitoring¶
Key Metrics¶
- Row counts at each stage
- Mapping match rates
- Processing time
- Data quality scores
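A sketch of how per-stage counts, timings, and match rates might be collected; the class and field names are illustrative, not an existing monitoring API in this pipeline:

```python
import time
from dataclasses import dataclass, field

@dataclass
class StageMetrics:
    """Collect per-stage row counts and timings for monitoring."""
    counts: dict = field(default_factory=dict)
    timings: dict = field(default_factory=dict)

    def record(self, stage: str, row_count: int, started: float) -> None:
        """Record a stage's row count and elapsed time since `started`."""
        self.counts[stage] = row_count
        self.timings[stage] = time.monotonic() - started

    def match_rate(self, stage: str, matched: int) -> float:
        """Mapping match rate relative to a recorded stage's row count."""
        total = self.counts.get(stage, 0)
        return matched / total if total else 0.0
```

Recording counts after each stage also makes the data-inflation check above easy to verify end to end.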