
Step 2: Sell-In Processing

Overview

The sell-in processing step extracts internal sales data from SAP systems, applies complex financial transformations, enriches with master data, and creates business-ready datasets.

Architecture

graph TB
    subgraph "Data Sources"
        A1[PL Global Table]
        A2[MDM Tables]
        A3[Mapping Files]
    end

    subgraph "Processing Components"
        B1[SQL Loader]
        B2[HFM Processor]
        B3[MDM Enricher]
        B4[Mapping Processors]
        B5[Aggregator]
        B6[Standardizer]
    end

    subgraph "Output"
        C1[Bronze Sellin]
        C2[Gold Sellin]
    end

    A1 --> B1
    A2 --> B3
    A3 --> B4
    B1 --> B2
    B2 --> B3
    B3 --> B4
    B4 --> B5
    B5 --> B6
    B6 --> C1
    B6 --> C2

Components

1. SQL Loader (loader.py)

Purpose: Extract and aggregate pl_global data

Dynamic Date Range

from datetime import datetime

def get_dynamic_date_range():
    """Return (start, end) as YYYYMMDD strings: Jan 1 of last year through today."""
    today = datetime.now()
    start_date = datetime(today.year - 1, 1, 1)
    return start_date.strftime('%Y%m%d'), today.strftime('%Y%m%d')

SQL Query Construction

-- Query 1: Valid material_key with Luxury division
SELECT dimensions, metrics
FROM pl_global pl
JOIN material m ON pl.material_key = m.material_key
WHERE m.division = 'Luxury'
  AND pl.posting_date BETWEEN start AND end
  AND pl.reporting_unit_key IN (scope)

UNION ALL

-- Query 2: Null/empty material_key
-- Parentheses are required: AND binds tighter than OR
SELECT dimensions, metrics
FROM pl_global pl
WHERE (pl.material_key IS NULL OR pl.material_key = '')
  AND pl.posting_date BETWEEN start AND end
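The two queries above can be assembled in Python roughly as follows. This is an illustrative sketch, not the pipeline's actual loader code: `build_sellin_query` is a hypothetical helper, and `dimensions`/`metrics` stand in for the real column lists. Note the parentheses around the null/empty check — SQL's `AND` binds tighter than `OR`, so omitting them would silently change the filter.

```python
def build_sellin_query(start, end, scope):
    """Assemble the two-part UNION ALL query (hypothetical helper)."""
    scope_list = ", ".join(f"'{unit}'" for unit in scope)
    return f"""
    SELECT dimensions, metrics
    FROM pl_global pl
    JOIN material m ON pl.material_key = m.material_key
    WHERE m.division = 'Luxury'
      AND pl.posting_date BETWEEN {start} AND {end}
      AND pl.reporting_unit_key IN ({scope_list})
    UNION ALL
    SELECT dimensions, metrics
    FROM pl_global pl
    WHERE (pl.material_key IS NULL OR pl.material_key = '')
      AND pl.posting_date BETWEEN {start} AND {end}
    """
```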

2. HFM Processor (hfm_processor.py)

Purpose: Create financial account aggregations

HFM Account Types

  1. Direct Accounts

    • 1010: Gross Invoiced Sales
    • 1110: Returns Actual
    • 1120: Returns Accrual
    • 1240: Markdowns Actual
    • 1241: Markdowns Accrual

  2. PNL Lines

    • 1810: Returns & Markdowns
    • 1820: Discretionary
    • 1830: Trade Terms
    • 9195C: Net Revenues
    • 9230: COGS (with subtraction)

  3. Trade Terms Groups

    HFM_TRADE_TERMS_GROUPS = {
        "Activation_and_Data": ["1212", "1262", "1330", ...],
        "COOP": ["1312", "1311", "1310", ...],
        "Growth": ["1271", "1272", "1331"],
        "Promotions": ["1213", "1250", "1251", ...],
        "Efficiency": ["1110", "1120", "1130", ...],
        "Base_Discount": ["1210", "1220", "1289", ...]
    }
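The grouping logic amounts to summing the per-account amounts belonging to each group. The stdlib sketch below illustrates the idea with a subset of the groups (the production code would do the equivalent with Spark aggregations); `aggregate_trade_terms` and its dict-of-totals input are illustrative, not the processor's actual interface.

```python
# Subset of the real mapping, for illustration only
HFM_TRADE_TERMS_GROUPS = {
    "Growth": ["1271", "1272", "1331"],
    "COOP": ["1312", "1311", "1310"],
}

def aggregate_trade_terms(account_totals):
    """Sum per-account amounts into their trade-terms group (illustrative)."""
    return {
        group: sum(account_totals.get(acct, 0.0) for acct in accounts)
        for group, accounts in HFM_TRADE_TERMS_GROUPS.items()
    }
```

Accounts absent from the input simply contribute zero, which mirrors the "create column with zeros" behavior described under Error Handling.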
    

3. MDM Enricher (mdm_enrichment.py)

Purpose: Add master data attributes

Enrichment Sources

MDM_TABLES = {
    "reporting_unit": "gold_mdm.reporting_unit",
    "material": "gold_mdm.material",
    "customer": "gold_mdm.customer",
    "brand": "gold_mdm.brand",
    "house": "gold_mdm.house",
    "core_category": "gold_mdm.core_category",
    "sub_category": "gold_mdm.sub_category",
    "sub_division": "gold_mdm.sub_division",
    "product_line": "gold_mdm.product_line"
}
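Each MDM table is left-joined onto the fact data so that missing master data never drops rows; unmatched keys fall back to "Not Assigned" (see Error Handling). The stdlib sketch below shows that join semantics on plain dicts — the real enricher operates on Spark DataFrames, and `enrich_rows` is a hypothetical illustration.

```python
def enrich_rows(rows, lookup, key, attrs, default="Not Assigned"):
    """Left-join master-data attributes onto each row (illustration only)."""
    out = []
    for row in rows:
        match = lookup.get(row.get(key))
        enriched = dict(row)
        for attr in attrs:
            # Fall back to the default when the key or attribute is missing
            enriched[attr] = match.get(attr, default) if match else default
        out.append(enriched)
    return out
```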

4. Mapping Processors

Region Mapping

Maps reporting units to regions:

class RegionMappingProcessor:
    key_column = "reporting_unit_key"
    # Adds: Region

Customer Mapping

Adds customer classifications:

class CustomerMappingProcessor:
    key_column = "customer_hier_l3"
    # Adds: Code_L3, Retailer_Classi_Sk, Channels_Sk, Buying_Group_Sk

Freight Mapping

Applies freight percentages:

class FreightMappingProcessor:
    key_columns = ["fiscal_year", "reporting_unit_key"]
    # Adds: Freight (percentage)

Royalty Mapping

Applies royalty percentages by house:

class RoyaltyMappingProcessor:
    key_columns = ["fiscal_year", "house_transac"]
    # Adds: Royalty (percentage)
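Freight and royalty follow the same pattern: look up a percentage by a composite key and attach it as a new column. A minimal stdlib sketch of that lookup, assuming a dict keyed by tuples (the actual processors join mapping files in Spark; `apply_percentage_mapping` is hypothetical):

```python
def apply_percentage_mapping(rows, mapping, key_columns, out_column):
    """Look up a percentage by composite key and attach it (illustrative)."""
    for row in rows:
        key = tuple(row[k] for k in key_columns)
        # Unmapped keys default to 0.0 rather than failing the row
        row[out_column] = mapping.get(key, 0.0)
    return rows
```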

FX Rate Mapping

Converts local currency to EUR:

class FXRateMappingProcessor:
    def apply_mapping(self, df):
        # Convert every local-currency amount column to EUR
        for col in amount_columns:
            df = df.withColumn(f"{col}_eur", F.col(col) / F.col("Toeur"))
        return df

Additional Mappings

  • Premiumness: Luxury tier classification
  • ABC: Product importance levels
  • Multiline: Brand multiline flags

5. Aggregator (aggregator.py)

Purpose: Final aggregation with special rules

EU_VD Special Handling

def final_aggregation(self, df):
    # Only EU_VD keeps L4 hierarchy
    df = df.withColumn(
        "customer_hier_l4_clean",
        F.when(
            F.col("reporting_unit_key") == "EU_VD",
            F.col("customer_hier_l4")
        ).otherwise(F.lit(""))
    )
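The effect of the EU_VD rule combined with the final groupBy is to collapse all non-EU_VD rows that differ only in customer_hier_l4 into a single row. A stdlib sketch of that reduction, under the assumption that metrics are simply summed per dimension combination (`final_aggregation` here is an illustration, not the aggregator's actual signature):

```python
from collections import defaultdict

def final_aggregation(rows, dims, metrics):
    """Collapse rows to one per dimension combination, summing metrics (sketch)."""
    totals = defaultdict(lambda: [0.0] * len(metrics))
    for row in rows:
        # Only EU_VD keeps its L4 customer hierarchy; others collapse to ""
        l4 = row["customer_hier_l4"] if row["reporting_unit_key"] == "EU_VD" else ""
        key = tuple(row[d] for d in dims) + (l4,)
        for i, m in enumerate(metrics):
            totals[key][i] += row[m]
    return totals
```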

6. Standardizer (standardizer.py)

Purpose: Clean and standardize data

Operations

  1. Replace nulls with "Not Assigned"
  2. Add cleaning flags
  3. Filter critical dimensions
  4. Remove Premiumness = "OUT"
  5. Reorder columns logically

Cleaning Flags

# Gross sales threshold: flag is 1 only when the value is inside [-10B, 0]
cleaning_gross_sales = F.when(
    (F.col("Gross_Sales") < -10_000_000_000) |
    (F.col("Gross_Sales") > 0),
    0
).otherwise(1)

# Combined flag (cleaning_quantities and cleaning_nis follow the same pattern)
cleaning_total = cleaning_gross_sales * cleaning_quantities * cleaning_nis

Input Data

Reporting Unit Scope

File: sellin_reporting_unit_scope.csv

Reporting_Unit_Key;Description
FR;France
DE;Germany
IT;Italy

HFM Groups Mapping

File: sellin_hfm_groups.csv

Hfm_Acct_No;Hfm_Acct_Desc;Pnl_Line_Hfm_Acct_No;Pnl_Line_Desc
1010;Gross Invoiced Sales;1800;Total GTN
1110;Returns Actual;1810;Returns & Markdowns

Output Tables

Bronze Table: bronze_sellin

Raw aggregated data from SQL:

  • All dimensions preserved
  • HFM accounts separate
  • Local currency amounts

Gold Table: gold_sellin

Business-ready dataset with:

  • HFM aggregations by type
  • EUR converted amounts
  • MDM enrichments
  • All mappings applied
  • Cleaning flags

Business Logic

Customer Hierarchy Reclassification

CUSTOMER_HIER_RECLASSIFICATION = {
    "customer_hier_l4": {
        "10514837": {
            "column": "Retailer_Classi_Sk",
            "value": "LA Makeup"
        }
    }
}
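Applying a reclassification rule means overriding the target column whenever the hierarchy key matches. A row-level sketch of that logic (the pipeline would express this as a Spark `F.when`; `apply_reclassification` is illustrative):

```python
CUSTOMER_HIER_RECLASSIFICATION = {
    "customer_hier_l4": {
        "10514837": {"column": "Retailer_Classi_Sk", "value": "LA Makeup"},
    }
}

def apply_reclassification(row, rules=CUSTOMER_HIER_RECLASSIFICATION):
    """Override a target column when a hierarchy key matches a rule (sketch)."""
    for hier_column, overrides in rules.items():
        rule = overrides.get(row.get(hier_column))
        if rule:
            row[rule["column"]] = rule["value"]
    return row
```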

Data Quality Thresholds

SELLIN_CLEANING_MIN_THRESHOLD = -10_000_000_000  # -10 billion
SELLIN_CLEANING_MAX_THRESHOLD = 0

Performance Optimization

Aggregation Strategy

  1. Initial SQL aggregation
  2. HFM groupBy once
  3. Cache after major joins
  4. Final aggregation to reduce rows

Memory Management

# Cache dataframe if large enough
# (note: count() triggers a full scan, so this check itself has a cost)
if df.count() > 1000:
    df = df.cache()
    logger.info("Cached dataframe for performance")

Error Handling

Common Issues

  1. Missing MDM Data

    • Use "Not Assigned" defaults
    • Log missing enrichments
    • Continue processing

  2. FX Rate Missing

    • Skip EUR conversion
    • Log affected records
    • Flag for review

  3. HFM Mapping Issues

    • Create column with zeros
    • Log unmapped accounts

Configuration

Key settings in config.py:

# Direct HFM accounts
SELLIN_DIRECT_HFM_ACCOUNTS = ["1010", "1110", "1120", "1240", "1241"]

# PNL lines
SELLIN_PNL_LINES = ["1810", "1820", "1830", "1800", "9195C", "S510", "9230", "9240C"]

# COGS accounts (for subtraction)
HFM_COGS_ACCOUNTS = ["2042", "2043", "2044", "2047", "2214"]

Usage Example

from src.pipeline.step_02_sellin_processing import process_pipeline

success = process_pipeline(
    schema_name="dm_finance_ffg",
    region_mapping_path="path/to/region_mapping.csv",
    customer_mapping_path="path/to/customer_mapping.csv",
    freight_mapping_path="path/to/freight_mapping.csv",
    royalties_mapping_path="path/to/royalties_mapping.csv",
    fx_rate_mapping_path="path/to/fx_rate_mapping.csv",
    hfm_mapping_path="path/to/hfm_mapping.csv",
    reporting_unit_scope_path="path/to/scope.csv",
    premiumness_mapping_path="path/to/premiumness.csv",
    abc_mapping_path="path/to/abc.csv",
    multiline_mapping_path="path/to/multiline.csv"
)

Monitoring

Key Metrics

  • SQL query execution time
  • Row counts by stage
  • Mapping match rates
  • Aggregation reduction ratios

Sample Log Output

2024-09-16 10:00:00 | INFO | Loading reporting unit scope: 15 units
2024-09-16 10:00:05 | INFO | Executing SQL query for date range: 20230101-20240916
2024-09-16 10:01:00 | INFO | Retrieved 5,000,000 rows from pl_global
2024-09-16 10:02:00 | INFO | HFM aggregation complete: 42 columns created
2024-09-16 10:03:00 | INFO | MDM enrichment: 95% material keys matched
2024-09-16 10:04:00 | INFO | Final aggregation: 5M → 500K rows
2024-09-16 10:05:00 | INFO | Written to gold_sellin table