Step 3: RSP Processing
Overview
The RSP (Recommended Selling Price) processing step loads price data, maps markets to reporting units, converts prices to EUR, applies inheritance rules, and expands each price's validity range into one row per day for daily price tracking.
Architecture

```mermaid
graph TB
    subgraph "Input"
        A1[SRP Database Excel]
        A2[Market Mapping]
        A3[Inheritance Mapping]
        A4[FX Rates Table]
    end
    subgraph "Processing"
        B1[Data Loader]
        B2[Market Processor]
        B3[FX Converter]
        B4[Inheritance Processor]
        B5[Date Expander]
        B6[Gold Layer Preparer]
    end
    subgraph "Output"
        C1[Bronze RSP]
        C2[Gold RSP Expanded]
    end
    A1 --> B1
    A2 --> B2
    A3 --> B4
    A4 --> B3
    B1 --> B2
    B2 --> B3
    B3 --> B4
    B4 --> B5
    B5 --> B6
    B6 --> C1
    B6 --> C2
```
Components
1. Data Loader (loader.py)
Purpose: Load and validate RSP Excel files
Required Columns

```python
required_columns = ['EAN', 'Market', 'SRP', 'Valid_From']
optional_columns = ['House', 'Item_Name', 'LP']
```
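A minimal sketch of the required-column check, assuming column names are compared case-insensitively after trimming whitespace. The loader's actual validation code is not shown here; `find_missing_columns` and `validate_columns` are hypothetical helpers.

```python
# Hypothetical helpers: the loader's real validation code is not shown in this document.
REQUIRED_COLUMNS = ["EAN", "Market", "SRP", "Valid_From"]
OPTIONAL_COLUMNS = ["House", "Item_Name", "LP"]


def find_missing_columns(columns, required=REQUIRED_COLUMNS):
    """Return the required columns absent from `columns` (case-insensitive, trimmed)."""
    present = {str(c).strip().lower() for c in columns}
    return [c for c in required if c.lower() not in present]


def validate_columns(columns):
    """Raise if a required column is missing; otherwise return the optional columns found."""
    missing = find_missing_columns(columns)
    if missing:
        raise ValueError(f"RSP source is missing required columns: {missing}")
    present = {str(c).strip().lower() for c in columns}
    # Knowing which optional columns exist tells later steps whether LP can be converted
    return [c for c in OPTIONAL_COLUMNS if c.lower() in present]
```

Returning the optional columns that were found, instead of only a pass/fail result, lets later steps such as the LP conversion decide what to compute.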
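Data Cleaning

```python
import io

import pandas as pd


def load_and_clean_source(self, source_path):
    # Read the workbook as raw bytes through Spark (`spark` is the active SparkSession)
    binary_df = spark.read.format("binaryFile").load(source_path)
    excel_buffer = io.BytesIO(binary_df.select("content").first()[0])
    # Parse with pandas, forcing EAN to string so codes are not read as numbers
    dtype_spec = {'EAN': str, 'Ean': str, 'ean': str}
    df_pd = pd.read_excel(excel_buffer, dtype=dtype_spec)
    # Clean column names
    df_pd = self._clean_column_names(df_pd)
    # Convert to Spark and validate
    return self._convert_to_spark(df_pd)
```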
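The column cleaner itself is not shown. Here is a hypothetical `clean_column_names`, assuming that cleaning means trimming whitespace, turning spaces and hyphens into underscores, and restoring the canonical spellings of the expected columns:

```python
import re

# Hypothetical sketch: the real _clean_column_names is not shown in this document.
CANONICAL = ["EAN", "Market", "SRP", "Valid_From", "House", "Item_Name", "LP"]


def clean_column_names(columns):
    """Trim names, collapse spaces/hyphens to underscores, and restore canonical casing."""
    lookup = {name.lower(): name for name in CANONICAL}
    cleaned = []
    for col in columns:
        name = re.sub(r"[\s\-]+", "_", str(col).strip())
        # Known columns get their canonical spelling; anything else is kept as cleaned
        cleaned.append(lookup.get(name.lower(), name))
    return cleaned
```

Applied to the pandas frame before conversion (for example `df_pd.columns = clean_column_names(df_pd.columns)`), this would let variants such as "Ean" or "Valid From" match the required-column list.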
2. Market Mapping Processor (mappings/market.py)
Purpose: Convert Market to reporting_unit_key
Mapping Structure

```csv
Market;reporting_unit_key;reporting_unit_description
France;FR;France
Germany;DE;Germany
Belgium;BE;Belgium
Netherlands;NL;Netherlands
```
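Implementation

```python
from pyspark.sql import functions as F


class MarketMappingProcessor:
    def __init__(self, mapping_df):
        # Small lookup table: Market -> reporting_unit_key, reporting_unit_description
        self.mapping_df = mapping_df

    def apply_mapping(self, df):
        # Broadcast the small mapping table and left-join on Market;
        # rows with an unmapped Market keep a null reporting_unit_key
        return df.join(
            F.broadcast(self.mapping_df),
            on="Market",
            how="left"
        )
```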
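The left join keeps every price row, including rows whose Market has no mapping entry. The pure-Python illustration below shows that behavior and a hypothetical `unmapped_markets` check, which is worth running because those rows carry a null reporting_unit_key into the later steps. "Austria" is made-up input for the example.

```python
import csv
import io


def load_market_mapping(csv_text):
    """Parse the semicolon-delimited mapping into {Market: (key, description)}."""
    reader = csv.DictReader(io.StringIO(csv_text), delimiter=";")
    return {row["Market"]: (row["reporting_unit_key"], row["reporting_unit_description"])
            for row in reader}


def apply_mapping(records, mapping):
    """Left join on Market: unmapped rows are kept with None keys, like the Spark left join."""
    joined = []
    for rec in records:
        key, description = mapping.get(rec["Market"], (None, None))
        joined.append({**rec, "reporting_unit_key": key,
                       "reporting_unit_description": description})
    return joined


def unmapped_markets(joined):
    """Hypothetical check: Markets that found no mapping row."""
    return sorted({rec["Market"] for rec in joined if rec["reporting_unit_key"] is None})


MAPPING_CSV = """Market;reporting_unit_key;reporting_unit_description
France;FR;France
Germany;DE;Germany
"""
mapping = load_market_mapping(MAPPING_CSV)
rows = apply_mapping([{"Market": "France"}, {"Market": "Austria"}], mapping)
```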
3. FX Rate Converter (enrichment.py)
Purpose: Convert local currency to EUR
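FX Conversion Logic

```python
from pyspark.sql import functions as F
from pyspark.sql.types import StringType


def apply_fx_rate_conversion(self, df):
    # Derive the fiscal year as the calendar year of Valid_From
    df = df.withColumn(
        "fiscal_year",
        F.year("Valid_From").cast(StringType())
    )
    # Query FX rates (execute_sql_query is assumed to return a Spark DataFrame),
    # aligning column names and types with df
    fx_rates = execute_sql_query("""
        SELECT CAST(Fiscal_Year AS STRING) AS fiscal_year,
               Reporting_Unit_Key AS reporting_unit_key,
               Toeur
        FROM bronze_sellin_fx_rate
    """)
    # Inner join: rows with no FX rate for their fiscal year and unit are filtered out
    df = df.join(fx_rates, on=["fiscal_year", "reporting_unit_key"], how="inner")
    # Convert to EUR (dividing implies Toeur is local currency per 1 EUR)
    df = df.withColumn("SRP_eur", F.col("SRP") / F.col("Toeur"))
    # Also convert the list price if the source provided one (case-insensitive check)
    if any(c.lower() == "lp" for c in df.columns):
        df = df.withColumn("LP_eur", F.col("LP") / F.col("Toeur"))
    return df
```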
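To make the arithmetic concrete, here is a pure-Python sketch of the same lookup and division. It uses made-up Toeur values and assumes, as the division implies, that Toeur is quoted as local-currency units per 1 EUR. Rows with no rate are dropped, matching the "Missing FX Rate" rule under Special Cases.

```python
from datetime import date


def apply_fx_rate_conversion(records, fx_rates):
    """fx_rates maps (fiscal_year, reporting_unit_key) -> Toeur (local currency per EUR)."""
    converted = []
    for rec in records:
        # Fiscal year taken as the calendar year of Valid_From, as in the Spark code
        fiscal_year = str(rec["Valid_From"].year)
        toeur = fx_rates.get((fiscal_year, rec["reporting_unit_key"]))
        if toeur is None:
            continue  # no rate: the row cannot be converted, so it is filtered out
        out = {**rec, "fiscal_year": fiscal_year, "SRP_eur": rec["SRP"] / toeur}
        if rec.get("LP") is not None:
            out["LP_eur"] = rec["LP"] / toeur
        converted.append(out)
    return converted


# Illustrative rates only, not real FX data
rates = {("2024", "FR"): 1.0, ("2024", "CH"): 0.95}
records = [
    {"reporting_unit_key": "FR", "Valid_From": date(2024, 1, 1), "SRP": 89.90, "LP": None},
    {"reporting_unit_key": "CH", "Valid_From": date(2024, 3, 1), "SRP": 95.0, "LP": 76.0},
    {"reporting_unit_key": "XX", "Valid_From": date(2024, 1, 1), "SRP": 10.0},
]
result = apply_fx_rate_conversion(records, rates)
```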
4. Inheritance Processor (mappings/inheritance.py)
Purpose: Create inherited RSP entries
Inheritance Rules

```csv
Reporting_Unit_Key;Inherits_From;reporting_unit_description
EU_EX;DE;Europe Export inherits from Germany
EU_TR;DE;Europe Travel Retail inherits from Germany
```
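Implementation

```python
from functools import reduce

from pyspark.sql import DataFrame
from pyspark.sql import functions as F


def apply_inheritance(self, df):
    # Flag own prices as not inherited so every frame in the union has the same schema
    df = df.withColumn("is_inherited", F.lit(False))
    inherited_dfs = []
    # self.inheritance_rules: rows of the inheritance mapping file as dicts
    for rule in self.inheritance_rules:
        inheritor = rule["Reporting_Unit_Key"]
        parent = rule["Inherits_From"]
        # Copy the parent unit's rows and relabel them for the inheriting unit
        inherited = (
            df.filter(F.col("reporting_unit_key") == parent)
            .withColumn("reporting_unit_key", F.lit(inheritor))
            .withColumn("reporting_unit_description",
                        F.lit(rule["reporting_unit_description"]))
            .withColumn("is_inherited", F.lit(True))
        )
        inherited_dfs.append(inherited)
    # Union all DataFrames by column name
    return reduce(DataFrame.unionByName, [df] + inherited_dfs)
```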
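The same rule application in pure Python, using the rules from the inheritance mapping file listed under Input Files. This is a sketch for illustration, not the module's code:

```python
RULES = [
    {"Reporting_Unit_Key": "EU_EX", "Inherits_From": "DE", "reporting_unit_description": "Europe Export"},
    {"Reporting_Unit_Key": "EU_TR", "Inherits_From": "DE", "reporting_unit_description": "Europe Travel Retail"},
    {"Reporting_Unit_Key": "CH", "Inherits_From": "DE", "reporting_unit_description": "Switzerland"},
]


def apply_inheritance(records, rules):
    """Append a relabelled copy of each parent row for every inheriting reporting unit."""
    base = [{**r, "is_inherited": False} for r in records]
    inherited = []
    for rule in rules:
        for r in base:
            # Only original rows are copied, so inheritance is a single level deep
            if r["reporting_unit_key"] == rule["Inherits_From"]:
                inherited.append({
                    **r,
                    "reporting_unit_key": rule["Reporting_Unit_Key"],
                    "reporting_unit_description": rule["reporting_unit_description"],
                    "is_inherited": True,
                })
    return base + inherited


prices = [
    {"reporting_unit_key": "DE", "reporting_unit_description": "Germany", "EAN": "3614229823653", "SRP_eur": 85.0},
    {"reporting_unit_key": "FR", "reporting_unit_description": "France", "EAN": "3614229823653", "SRP_eur": 89.9},
]
result = apply_inheritance(prices, RULES)
```

Because each rule filters the original rows only, a chain such as A inherits from B inherits from C would not propagate C's prices to A. The Spark version behaves the same way, since it filters the pre-union `df`.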
5. Date Expander (expander.py)
Purpose: Expand RSP to daily granularity
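Expansion Logic

```python
from pyspark.sql import Window
from pyspark.sql import functions as F


def expand_date_ranges(self, df):
    # Find next Valid_From for each EAN/RU
    w = Window.partitionBy("reporting_unit_key", "EAN").orderBy("Valid_From")
    df = df.withColumn("next_valid_from", F.lead("Valid_From").over(w))
    # A price is valid until the day before the next price, or until today
    df = df.withColumn(
        "end_date",
        F.coalesce(F.date_sub("next_valid_from", 1), F.current_date())
    )
    # Build one calendar covering the whole span of the data
    bounds = df.agg(
        F.min("Valid_From").alias("min_date"),
        F.max("end_date").alias("max_date"),
    ).first()
    date_df = spark.sql(f"""
        SELECT explode(sequence(
            to_date('{bounds.min_date}'),
            to_date('{bounds.max_date}'),
            interval 1 day
        )) AS Date
    """)
    # Range join: keep each calendar day inside a price's validity window
    expanded = df.join(
        date_df,
        (date_df.Date >= df.Valid_From) & (date_df.Date <= df.end_date),
        "inner",
    )
    return expanded
```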
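A pure-Python sketch of the same expansion rule, applied to the two France rows from the example source file with "today" assumed to be 2024-07-03. This sketch builds each row's own day range directly, instead of joining against a shared calendar, and both approaches yield the same rows. In Spark, the per-row variant could be written as `F.explode(F.sequence("Valid_From", "end_date"))`; that is an alternative, not what expander.py does.

```python
from datetime import date, timedelta
from itertools import groupby


def unit_key(row):
    return (row["reporting_unit_key"], row["EAN"])


def expand_date_ranges(records, today):
    """Emit one row per day from Valid_From to the day before the next price (or today)."""
    ordered = sorted(records, key=lambda r: (unit_key(r), r["Valid_From"]))
    expanded = []
    for _, group in groupby(ordered, key=unit_key):
        rows = list(group)
        for i, row in enumerate(rows):
            # Same rule as lead("Valid_From") - 1 day, falling back to today
            if i + 1 < len(rows):
                end = rows[i + 1]["Valid_From"] - timedelta(days=1)
            else:
                end = today
            day = row["Valid_From"]
            while day <= end:
                expanded.append({**row, "Date": day})
                day += timedelta(days=1)
    return expanded


france = [
    {"reporting_unit_key": "FR", "EAN": "3614229823653", "Valid_From": date(2024, 1, 1), "SRP": 89.90},
    {"reporting_unit_key": "FR", "EAN": "3614229823653", "Valid_From": date(2024, 7, 1), "SRP": 94.90},
]
daily = expand_date_ranges(france, today=date(2024, 7, 3))
```

The 89.90 price covers 2024-01-01 through 2024-06-30 (182 days in the leap year 2024), and the 94.90 price covers 2024-07-01 through "today".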
6. Gold Layer Preparer (gold_layer.py)
Purpose: Final transformations for analytics
Operations
- Round prices to 2 decimals
- Add Year and Month columns
- Add metadata columns
- Reorder columns logically
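A per-row sketch of these operations, assuming the column order of the gold_rsp schema listed under Output Tables. `prepare_gold_row` is a hypothetical name, and the Source value is only an example.

```python
from datetime import date, datetime, timezone

# Column order taken from the gold_rsp schema in this document
GOLD_COLUMNS = ["reporting_unit_key", "reporting_unit_description", "EAN", "Date",
                "Year", "Month", "SRP_eur", "LP_eur", "Market", "House", "Item_Name",
                "is_inherited", "Source", "gold_processing_timestamp"]


def prepare_gold_row(row, source, processed_at):
    """Round prices, add Year/Month and metadata, and return columns in gold order."""
    out = dict(row)
    for price in ("SRP_eur", "LP_eur"):
        if out.get(price) is not None:
            out[price] = round(out[price], 2)
    out["Year"] = out["Date"].year
    out["Month"] = out["Date"].month
    out["Source"] = source
    out["gold_processing_timestamp"] = processed_at
    # Keep only gold columns, in a fixed order; absent optional columns become None
    return {col: out.get(col) for col in GOLD_COLUMNS}


gold = prepare_gold_row(
    {"reporting_unit_key": "FR", "EAN": "3614229823653", "Date": date(2024, 7, 1),
     "SRP_eur": 94.899999, "LP_eur": None, "is_inherited": False},
    source="SRP_database.xlsx",
    processed_at=datetime(2024, 7, 2, tzinfo=timezone.utc),
)
```

Python's `round` rounds half to even on binary floats, while Spark's `F.round` uses HALF_UP, so values sitting exactly on half a cent could differ between this sketch and a Spark implementation.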
Input Files
RSP Source File
File: SRP_database.xlsx
Example structure:

| EAN | Market | SRP | Valid_From | House | Item_Name |
|---|---|---|---|---|---|
| 3614229823653 | France | 89.90 | 2024-01-01 | Hugo Boss | Boss Bottled EDT |
| 3614229823653 | Germany | 85.00 | 2024-01-01 | Hugo Boss | Boss Bottled EDT |
| 3614229823653 | France | 94.90 | 2024-07-01 | Hugo Boss | Boss Bottled EDT |
Market Mapping
File: rsp_market_mapping.csv

```csv
Market;reporting_unit_key;reporting_unit_description
France;FR;France
Germany;DE;Germany
Belgium;BE;Belgium
Netherlands;NL;Netherlands
Europe Export;EU_EX;Europe Export
Europe Travel;EU_TR;Europe Travel Retail
```
Inheritance Mapping
File: rsp_inheritance_mapping.csv

```csv
Reporting_Unit_Key;Inherits_From;reporting_unit_description
EU_EX;DE;Europe Export
EU_TR;DE;Europe Travel Retail
CH;DE;Switzerland
```
Output Tables
Bronze Table: bronze_rsp
Point-in-time price records:
- Original Valid_From dates
- Prices in EUR
- Inheritance flags
- Processing metadata

Schema:

```text
reporting_unit_key: string
reporting_unit_description: string
EAN: string
Valid_From: date
SRP_eur: double
LP_eur: double
Market: string
House: string
Item_Name: string
is_inherited: boolean
fiscal_year: string
processing_timestamp: timestamp
```
Gold Table: gold_rsp
Daily expanded price records:
- One row per EAN, reporting unit, and day
- Prices carried forward
- Year/Month columns
- Business metadata

Schema:

```text
reporting_unit_key: string
reporting_unit_description: string
EAN: string
Date: date
Year: int
Month: int
SRP_eur: double
LP_eur: double
Market: string
House: string
Item_Name: string
is_inherited: boolean
Source: string
gold_processing_timestamp: timestamp
```
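Business Logic
Price Validity Rules
- Price Duration: A price is valid from its Valid_From date until the day before the next Valid_From for the same EAN and reporting unit, or until the current date if there is no later price
- Inheritance: Reporting units listed in the inheritance mapping (e.g. EU_EX, EU_TR, CH) receive copies of their parent unit's prices
- Currency: All prices are converted to EUR
- Daily Granularity: Prices are expanded to one row per day for daily reporting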
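Special Cases
- Missing FX Rate: Rows without an FX rate for their fiscal year and reporting unit cannot be converted to EUR and are filtered out
- No Next Price: The current date is used as the end date
- Inherited Prices: Marked with is_inherited = True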
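Performance Considerations
Optimization Strategies
- Binary Reading: The workbook is read as raw bytes through Spark's binaryFile source and parsed in memory with pandas
- Broadcast Joins: Mapping tables are small, so they are broadcast to the executors instead of shuffling the price data
- Window Functions: lead() over each (reporting_unit_key, EAN) partition finds the next Valid_From without a self-join
- Date Explosion: A single calendar is generated with sequence() and range-joined to each price's validity window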
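Because every validity window becomes one row per day, the size of the gold table is easy to estimate before running the expansion. A small sketch, using the two France windows from the example source file and assuming "today" is 2024-12-31 (`expanded_row_count` is a hypothetical helper):

```python
from datetime import date


def expanded_row_count(windows):
    """Rows produced by daily expansion: one per day in each [Valid_From, end_date] window."""
    return sum((end - start).days + 1 for start, end in windows if end >= start)


france = [(date(2024, 1, 1), date(2024, 6, 30)), (date(2024, 7, 1), date(2024, 12, 31))]
print(expanded_row_count(france))  # 366
```

The output therefore grows roughly as (number of EAN and reporting-unit pairs) × (days covered), and every inheriting unit adds another full copy of its parent's rows.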
Memory Management
Error Handling
Common Issues
- Invalid Dates
- Missing Markets
- FX Rate Missing

```python
df = df.