Step 2: Sell-In Processing
Overview
The sell-in processing step extracts internal sales data from SAP systems (via the pl_global table), applies financial transformations such as HFM account aggregation and EUR conversion, enriches the result with master data (MDM) and mapping files, and writes business-ready bronze and gold tables.
Architecture
graph TB
subgraph "Data Sources"
A1[PL Global Table]
A2[MDM Tables]
A3[Mapping Files]
end
subgraph "Processing Components"
B1[SQL Loader]
B2[HFM Processor]
B3[MDM Enricher]
B4[Mapping Processors]
B5[Aggregator]
B6[Standardizer]
end
subgraph "Output"
C1[Bronze Sellin]
C2[Gold Sellin]
end
A1 --> B1
A2 --> B3
A3 --> B4
B1 --> B2
B2 --> B3
B3 --> B4
B4 --> B5
B5 --> B6
B6 --> C1
B6 --> C2
Components
1. SQL Loader (loader.py)
Purpose: Extract and aggregate pl_global data
Dynamic Date Range
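from datetime import datetime

def get_dynamic_date_range():
    """Return (start, end) as YYYYMMDD strings."""
    today = datetime.now()
    # Window runs from January 1 of the previous year through today
    previous_year = today.year - 1
    start_date = datetime(previous_year, 1, 1)
    return start_date.strftime('%Y%m%d'), today.strftime('%Y%m%d')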
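Because get_dynamic_date_range() reads the clock, its output changes from run to run. The sketch below is a hypothetical variant, get_date_range_for, that takes the reference date as a parameter. This makes the window reproducible for tests or backfills. For a run on 2024-09-16 it yields the 20230101-20240916 range that appears in the sample log.

```python
from datetime import date, datetime
from typing import Optional, Tuple


def get_date_range_for(today: Optional[date] = None) -> Tuple[str, str]:
    """Return (start, end) as YYYYMMDD strings: Jan 1 of the previous year through `today`.

    Hypothetical, testable variant of get_dynamic_date_range(); not part of loader.py.
    """
    if today is None:
        today = datetime.now().date()
    start = date(today.year - 1, 1, 1)
    return start.strftime("%Y%m%d"), today.strftime("%Y%m%d")


print(get_date_range_for(date(2024, 9, 16)))  # ('20230101', '20240916')
```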
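SQL Query Construction
-- start_date / end_date come from get_dynamic_date_range(); scope is the reporting unit list
-- Query 1: Valid material_key with Luxury division
SELECT dimensions, metrics
FROM pl_global pl
JOIN material m ON pl.material_key = m.material_key
WHERE m.division = 'Luxury'
AND pl.posting_date BETWEEN start_date AND end_date
AND pl.reporting_unit_key IN (scope)
UNION ALL
-- Query 2: Null/empty material_key
-- Parentheses are required: AND binds tighter than OR, so without them
-- the date filter would only apply to the empty-string branch
SELECT dimensions, metrics
FROM pl_global pl
WHERE (pl.material_key IS NULL OR pl.material_key = '')
AND pl.posting_date BETWEEN start_date AND end_date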
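In SQL, AND binds more tightly than OR. The null/empty material_key test in Query 2 therefore has to be parenthesized, or the date filter applies only to the empty-string branch. The sketch below shows the difference on a tiny in-memory SQLite table. It assumes posting_date is stored as a YYYYMMDD string, and the table holds only the two columns the filter needs.

```python
import sqlite3

# Minimal stand-in for pl_global with only the columns the WHERE clause uses.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE pl_global (material_key TEXT, posting_date TEXT)")
conn.executemany(
    "INSERT INTO pl_global VALUES (?, ?)",
    [
        (None, "20240301"),  # null key, inside the window
        (None, "20190101"),  # null key, outside the window
        ("", "20240301"),    # empty key, inside the window
    ],
)

window = ("20230101", "20240916")

# Parsed as: IS NULL OR (= '' AND BETWEEN ...), so the old null row leaks in.
unparenthesized = conn.execute(
    "SELECT COUNT(*) FROM pl_global "
    "WHERE material_key IS NULL OR material_key = '' "
    "AND posting_date BETWEEN ? AND ?",
    window,
).fetchone()[0]

# The date filter now applies to both the null and the empty-string branch.
parenthesized = conn.execute(
    "SELECT COUNT(*) FROM pl_global "
    "WHERE (material_key IS NULL OR material_key = '') "
    "AND posting_date BETWEEN ? AND ?",
    window,
).fetchone()[0]

print(unparenthesized, parenthesized)  # 3 2
```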
2. HFM Processor (hfm_processor.py)
Purpose: Create financial account aggregations
HFM Account Types
- Direct Accounts
  - 1010: Gross Invoiced Sales
  - 1110: Returns Actual
  - 1120: Returns Accrual
  - 1240: Markdowns Actual
  - 1241: Markdowns Accrual
- PNL Lines
  - 1810: Returns & Markdowns
  - 1820: Discretionary
  - 1830: Trade Terms
  - 9195C: Net Revenues
  - 9230: COGS (with subtraction)
- Trade Terms Groups
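The HFM step groups once and spreads amounts into one column per account and one per PNL line. The stdlib sketch below illustrates that pivot. Any account or line with no data in a group gets 0.0, matching the "create column with zeros" rule under Error Handling. The account-to-line mapping uses only the two sample rows of sellin_hfm_groups.csv. The column names hfm_<acct> and pnl_<line> and the field names hfm_acct_no and amount_lc are hypothetical. The COGS subtraction is not shown, and the real processor works on Spark DataFrames.

```python
from collections import defaultdict
from typing import Dict, Iterable, List, Tuple

DIRECT_ACCOUNTS = ["1010", "1110", "1120", "1240", "1241"]
# Account -> PNL line, from the two sample rows of sellin_hfm_groups.csv.
ACCOUNT_TO_PNL = {"1010": "1800", "1110": "1810"}
PNL_LINES = ["1800", "1810"]


def pivot_hfm(rows: Iterable[dict], dims: Tuple[str, ...]) -> List[dict]:
    """Group by `dims` once; one column per direct account and per PNL line.

    Missing accounts/lines in a group are filled with 0.0 so every column exists.
    """
    totals: Dict[tuple, Dict[str, float]] = defaultdict(lambda: defaultdict(float))
    for row in rows:
        key = tuple(row[d] for d in dims)
        acct, amount = row["hfm_acct_no"], row["amount_lc"]
        if acct in DIRECT_ACCOUNTS:
            totals[key][f"hfm_{acct}"] += amount
        pnl = ACCOUNT_TO_PNL.get(acct)
        if pnl is not None:
            totals[key][f"pnl_{pnl}"] += amount
    out = []
    for key, sums in totals.items():
        record = dict(zip(dims, key))
        for acct in DIRECT_ACCOUNTS:
            record[f"hfm_{acct}"] = sums.get(f"hfm_{acct}", 0.0)
        for line in PNL_LINES:
            record[f"pnl_{line}"] = sums.get(f"pnl_{line}", 0.0)
        out.append(record)
    return out
```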
3. MDM Enricher (mdm_enrichment.py)
Purpose: Add master data attributes
Enrichment Sources
MDM_TABLES = {
"reporting_unit": "gold_mdm.reporting_unit",
"material": "gold_mdm.material",
"customer": "gold_mdm.customer",
"brand": "gold_mdm.brand",
"house": "gold_mdm.house",
"core_category": "gold_mdm.core_category",
"sub_category": "gold_mdm.sub_category",
"sub_division": "gold_mdm.sub_division",
"product_line": "gold_mdm.product_line"
}
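Each MDM table acts as a lookup that is left-joined onto the sell-in rows. The sketch below shows this for a single key, using plain dicts in place of the gold_mdm tables. Unmatched attributes fall back to "Not Assigned", as described under Error Handling. The function also returns the match rate, the kind of figure the sample log reports as "95% material keys matched". The function name enrich and the attribute name brand are illustrative assumptions.

```python
from typing import Dict, List, Tuple

NOT_ASSIGNED = "Not Assigned"


def enrich(rows: List[dict], key: str, lookup: Dict[str, dict],
           attributes: List[str]) -> Tuple[List[dict], float]:
    """Left-join `lookup` onto `rows` by `key`; missing attributes become "Not Assigned".

    Returns the enriched rows and the share of rows whose key was found.
    """
    matched = 0
    out = []
    for row in rows:
        ref = lookup.get(row.get(key))
        if ref is not None:
            matched += 1
        enriched = dict(row)
        for attr in attributes:
            value = ref.get(attr) if ref is not None else None
            enriched[attr] = value if value is not None else NOT_ASSIGNED
        out.append(enriched)
    rate = matched / len(rows) if rows else 0.0
    return out, rate
```

For example, enriching [{"material_key": "M1"}, {"material_key": "M2"}] against {"M1": {"brand": "B1"}} keeps B1 on the first row, sets "Not Assigned" on the second, and reports a 0.5 match rate.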
4. Mapping Processors
Region Mapping
Maps reporting units to regions.
Customer Mapping
Adds customer classifications:
class CustomerMappingProcessor:
    key_column = "customer_hier_l3"
    # Adds: Code_L3, Retailer_Classi_Sk, Channels_Sk, Buying_Group_Sk
Freight Mapping
Applies freight percentages:
class FreightMappingProcessor:
    key_columns = ["fiscal_year", "reporting_unit_key"]
    # Adds: Freight (percentage)
Royalty Mapping
Applies royalty percentages by house:
class RoyaltyMappingProcessor:
    key_columns = ["fiscal_year", "house_transac"]
    # Adds: Royalty (percentage)
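The customer, freight, and royalty processors follow one pattern: build a key from one or more columns and add the mapped columns. The generic sketch below assumes this pattern, with key_columns as in the classes above. For the customer mapping, key_columns is ["customer_hier_l3"]. The helper name apply_key_mapping and the sample values are hypothetical. Unmatched rows get None so that misses can be counted or logged.

```python
from typing import Dict, List, Sequence, Tuple


def apply_key_mapping(rows: List[dict], key_columns: Sequence[str],
                      mapping: Dict[Tuple, dict]) -> List[dict]:
    """Add mapped columns to each row by looking up the tuple of `key_columns`."""
    # Every column any mapping entry provides, e.g. {"Freight"} or {"Royalty"}.
    added = sorted({col for values in mapping.values() for col in values})
    out = []
    for row in rows:
        key = tuple(row.get(c) for c in key_columns)
        match = mapping.get(key, {})
        enriched = dict(row)
        for col in added:
            enriched[col] = match.get(col)  # None when the key is not mapped
        out.append(enriched)
    return out


freight = {(2024, "EU_VD"): {"Freight": 0.03}}  # illustrative value only
rows = [{"fiscal_year": 2024, "reporting_unit_key": "EU_VD"},
        {"fiscal_year": 2024, "reporting_unit_key": "XX"}]
print(apply_key_mapping(rows, ["fiscal_year", "reporting_unit_key"], freight))
```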
FX Rate Mapping
Converts local currency to EUR:
class FXRateMappingProcessor:
    def apply_mapping(self, df):
        # Convert every local-currency amount column (amount_lc);
        # amount_columns is the list of those columns, defined elsewhere
        for col in amount_columns:
            eur_col = f"{col}_eur"
            df[eur_col] = df[col] / df["Toeur"]
        return df
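The sketch below combines the conversion with the "FX Rate Missing" handling from Error Handling: skip the conversion, log the affected records, and flag them for review. It keeps the document's convention of dividing by Toeur. The fx_missing flag name and the function name convert_to_eur are assumptions. A zero rate is treated as missing to avoid division by zero.

```python
import logging
from typing import List, Sequence

logger = logging.getLogger(__name__)


def convert_to_eur(rows: List[dict], amount_columns: Sequence[str],
                   rate_column: str = "Toeur") -> List[dict]:
    """Add <col>_eur = <col> / Toeur; rows without a usable rate are flagged, not converted."""
    out = []
    missing = 0
    for row in rows:
        enriched = dict(row)
        rate = row.get(rate_column)
        ok = rate is not None and rate != 0
        for col in amount_columns:
            enriched[f"{col}_eur"] = row[col] / rate if ok else None
        enriched["fx_missing"] = not ok  # hypothetical review flag
        if not ok:
            missing += 1
        out.append(enriched)
    if missing:
        logger.warning("FX rate missing for %d of %d rows", missing, len(rows))
    return out
```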
Additional Mappings
- Premiumness: Luxury tier classification
- ABC: Product importance levels
- Multiline: Brand multiline flags
5. Aggregator (aggregator.py)
Purpose: Final aggregation with special rules
EU_VD Special Handling
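from pyspark.sql import functions as F

def final_aggregation(self, df):
    # Only EU_VD keeps the L4 customer hierarchy; all other units get ""
    df = df.withColumn(
        "customer_hier_l4_clean",
        F.when(
            F.col("reporting_unit_key") == "EU_VD",
            F.col("customer_hier_l4")
        ).otherwise(F.lit(""))
    )
    # ... final groupBy/agg on the cleaned dimensions (not shown)
    return df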
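Blanking customer_hier_l4 outside EU_VD is what lets the final aggregation shrink the row count: rows that differed only by L4 collapse into one. The stdlib sketch below walks through that reasoning. The dims and measures arguments, the sample rows, and the "FR" unit are illustrative assumptions. The real step runs as a Spark groupBy.

```python
from collections import defaultdict
from typing import Dict, List, Sequence


def final_aggregation(rows: List[dict], dims: Sequence[str],
                      measures: Sequence[str]) -> List[dict]:
    """Blank L4 outside EU_VD, then sum `measures` over `dims` (which should use the _clean column)."""
    totals: Dict[tuple, Dict[str, float]] = defaultdict(lambda: defaultdict(float))
    for row in rows:
        row = dict(row)
        # Only EU_VD keeps the L4 customer hierarchy.
        row["customer_hier_l4_clean"] = (
            row["customer_hier_l4"] if row["reporting_unit_key"] == "EU_VD" else ""
        )
        key = tuple(row[d] for d in dims)
        for m in measures:
            totals[key][m] += row[m]
    out = []
    for key, sums in totals.items():
        record = dict(zip(dims, key))
        record.update({m: sums[m] for m in measures})
        out.append(record)
    return out


rows = [
    {"reporting_unit_key": "EU_VD", "customer_hier_l4": "A", "Gross_Sales": 1.0},
    {"reporting_unit_key": "EU_VD", "customer_hier_l4": "B", "Gross_Sales": 2.0},
    {"reporting_unit_key": "FR", "customer_hier_l4": "A", "Gross_Sales": 3.0},
    {"reporting_unit_key": "FR", "customer_hier_l4": "B", "Gross_Sales": 4.0},
]
result = final_aggregation(rows, ("reporting_unit_key", "customer_hier_l4_clean"), ("Gross_Sales",))
print(len(result))  # 3: the two FR rows collapse into one
```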
6. Standardizer (standardizer.py)
Purpose: Clean and standardize data
Operations
- Replace nulls with "Not Assigned"
- Add cleaning flags
- Filter critical dimensions
- Remove Premiumness = "OUT"
- Reorder columns logically
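Three of the standardizer operations listed above are sketched below with plain dicts: replacing nulls with "Not Assigned", removing Premiumness = "OUT", and reordering columns. The "Filter critical dimensions" step is omitted because the critical set is not documented here. The function name, the dimension_columns and column_order parameters, and the sample values are assumptions. Only None is treated as null, so deliberate empty strings (such as customer_hier_l4_clean outside EU_VD) are left alone.

```python
from typing import List, Sequence

NOT_ASSIGNED = "Not Assigned"


def standardize(rows: List[dict], dimension_columns: Sequence[str],
                column_order: Sequence[str]) -> List[dict]:
    """Drop Premiumness == "OUT", fill null dimensions, and reorder columns."""
    out = []
    for row in rows:
        if row.get("Premiumness") == "OUT":
            continue
        clean = dict(row)
        for col in dimension_columns:
            if col in clean and clean[col] is None:
                clean[col] = NOT_ASSIGNED
        # Listed columns first, remaining columns after them in their original order.
        ordered = {c: clean[c] for c in column_order if c in clean}
        ordered.update({c: v for c, v in clean.items() if c not in ordered})
        out.append(ordered)
    return out
```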
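Cleaning Flags
from pyspark.sql import functions as F

# Gross sales threshold: 0 (flagged) if below -10 billion or positive, else 1 (clean)
cleaning_gross_sales = F.when(
    (F.col("Gross_Sales") < -10_000_000_000) |
    (F.col("Gross_Sales") > 0),
    0
).otherwise(1)
# Combined flag: 1 only if every individual flag is 1
# (cleaning_quantities and cleaning_nis are 0/1 flags defined elsewhere)
cleaning_total = cleaning_gross_sales * cleaning_quantities * cleaning_nis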
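The plain-Python sketch below applies the same Gross_Sales bounds, which makes the boundary behaviour easy to check. Exactly 0 and exactly -10 billion both pass. Any positive value is flagged, so the rule presumably expects sales amounts to carry a negative sign; confirm this against the source data. The combined flag is the product of the 0/1 flags, so a single failing check zeroes it. The function names are hypothetical.

```python
import math


def cleaning_gross_sales(gross_sales: float) -> int:
    """1 = passes the Gross_Sales check, 0 = flagged (same bounds as the Spark expression)."""
    return 0 if gross_sales < -10_000_000_000 or gross_sales > 0 else 1


def cleaning_total(*flags: int) -> int:
    """Product of the individual 0/1 flags: 1 only when every check passes."""
    return math.prod(flags)


print(cleaning_gross_sales(-1_000.0), cleaning_gross_sales(5.0))  # 1 0
```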
Input Data
Reporting Unit Scope
File: sellin_reporting_unit_scope.csv
HFM Groups Mapping
File: sellin_hfm_groups.csv
Hfm_Acct_No;Hfm_Acct_Desc;Pnl_Line_Hfm_Acct_No;Pnl_Line_Desc
1010;Gross Invoiced Sales;1800;Total GTN
1110;Returns Actual;1810;Returns & Markdowns
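The HFM groups file is semicolon-delimited with a header row, so it can be read with the standard csv module once the delimiter is set. The sketch below loads it into an account-to-PNL-line dict, using the two sample rows shown above inline. The function name load_hfm_groups is hypothetical. For the real file, pass the text read from sellin_hfm_groups.csv (opened with newline="").

```python
import csv
import io
from typing import Dict

SAMPLE = """Hfm_Acct_No;Hfm_Acct_Desc;Pnl_Line_Hfm_Acct_No;Pnl_Line_Desc
1010;Gross Invoiced Sales;1800;Total GTN
1110;Returns Actual;1810;Returns & Markdowns
"""


def load_hfm_groups(text: str) -> Dict[str, str]:
    """Map each HFM account number to its PNL line account number."""
    reader = csv.DictReader(io.StringIO(text), delimiter=";")
    # Keep codes as strings: some accounts (e.g. 9195C) are not numeric.
    return {row["Hfm_Acct_No"]: row["Pnl_Line_Hfm_Acct_No"] for row in reader}


print(load_hfm_groups(SAMPLE))  # {'1010': '1800', '1110': '1810'}
```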
Output Tables
Bronze Table: bronze_sellin
Raw aggregated data from SQL:
- All dimensions preserved
- HFM accounts separate
- Local currency amounts
Gold Table: gold_sellin
Business-ready dataset with:
- HFM aggregations by type
- EUR converted amounts
- MDM enrichments
- All mappings applied
- Cleaning flags
Business Logic
Customer Hierarchy Reclassification
CUSTOMER_HIER_RECLASSIFICATION = {
"customer_hier_l4": {
"10514837": {
"column": "Retailer_Classi_Sk",
"value": "LA Makeup"
}
}
}
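One reading of CUSTOMER_HIER_RECLASSIFICATION: the outer key is the column to match, the inner key is the value to match, and the rule names the column to overwrite and its new value. Under that reading, rows with customer_hier_l4 = "10514837" get Retailer_Classi_Sk = "LA Makeup". The sketch below applies the rules that way. The function name is hypothetical, and the interpretation should be confirmed against the pipeline code.

```python
from typing import Dict, List

CUSTOMER_HIER_RECLASSIFICATION = {
    "customer_hier_l4": {
        "10514837": {"column": "Retailer_Classi_Sk", "value": "LA Makeup"},
    }
}


def apply_reclassification(rows: List[dict],
                           rules: Dict[str, Dict[str, dict]]) -> List[dict]:
    """For each matching (column, value) rule, overwrite rule["column"] with rule["value"]."""
    out = []
    for row in rows:
        row = dict(row)
        for match_col, by_value in rules.items():
            rule = by_value.get(row.get(match_col))
            if rule is not None:
                row[rule["column"]] = rule["value"]
        out.append(row)
    return out
```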
Data Quality Thresholds
Performance Optimization
Aggregation Strategy
- Initial SQL aggregation
- HFM groupBy once
- Cache after major joins
- Final aggregation to reduce rows
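Memory Management
import logging

logger = logging.getLogger(__name__)

# Cache the dataframe only if it is large enough to be worth it.
# Note: count() is a Spark action and triggers a full computation of df.
if df.count() > 1000:
    df = df.cache()
    logger.info("Cached dataframe for performance")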
Error Handling
Common Issues
- Missing MDM Data
  - Use "Not Assigned" defaults
  - Log missing enrichments
  - Continue processing
- FX Rate Missing
  - Skip EUR conversion
  - Log affected records
  - Flag for review
- HFM Mapping Issues
  - Create column with zeros
  - Log unmapped accounts
Configuration
Key settings in config.py:
# Direct HFM accounts
SELLIN_DIRECT_HFM_ACCOUNTS = ["1010", "1110", "1120", "1240", "1241"]
# PNL lines
SELLIN_PNL_LINES = ["1810", "1820", "1830", "1800", "9195C", "S510", "9230", "9240C"]
# COGS accounts (for subtraction)
HFM_COGS_ACCOUNTS = ["2042", "2043", "2044", "2047", "2214"]
Usage Example
from src.pipeline.step_02_sellin_processing import process_pipeline
success = process_pipeline(
schema_name="dm_finance_ffg",
region_mapping_path="path/to/region_mapping.csv",
customer_mapping_path="path/to/customer_mapping.csv",
freight_mapping_path="path/to/freight_mapping.csv",
royalties_mapping_path="path/to/royalties_mapping.csv",
fx_rate_mapping_path="path/to/fx_rate_mapping.csv",
hfm_mapping_path="path/to/hfm_mapping.csv",
reporting_unit_scope_path="path/to/scope.csv",
premiumness_mapping_path="path/to/premiumness.csv",
abc_mapping_path="path/to/abc.csv",
multiline_mapping_path="path/to/multiline.csv"
)
Monitoring
Key Metrics
- SQL query execution time
- Row counts by stage
- Mapping match rates
- Aggregation reduction ratios
Sample Log Output
2024-09-16 10:00:00 | INFO | Loading reporting unit scope: 15 units
2024-09-16 10:00:05 | INFO | Executing SQL query for date range: 20230101-20240916
2024-09-16 10:01:00 | INFO | Retrieved 5,000,000 rows from pl_global
2024-09-16 10:02:00 | INFO | HFM aggregation complete: 42 columns created
2024-09-16 10:03:00 | INFO | MDM enrichment: 95% material keys matched
2024-09-16 10:04:00 | INFO | Final aggregation: 5M → 500K rows
2024-09-16 10:05:00 | INFO | Written to gold_sellin table