Step 3: RSP Processing

Overview

The RSP (Recommended Selling Price) processing step handles price data, applies market mappings, converts currencies, manages inheritance rules, and expands date ranges for daily price tracking.

Architecture

graph TB
    subgraph "Input"
        A1[SRP Database Excel]
        A2[Market Mapping]
        A3[Inheritance Mapping]
        A4[FX Rates Table]
    end

    subgraph "Processing"
        B1[Data Loader]
        B2[Market Processor]
        B3[FX Converter]
        B4[Inheritance Processor]
        B5[Date Expander]
        B6[Gold Layer Preparer]
    end

    subgraph "Output"
        C1[Bronze RSP]
        C2[Gold RSP Expanded]
    end

    A1 --> B1
    A2 --> B2
    A3 --> B4
    A4 --> B3
    B1 --> B2
    B2 --> B3
    B3 --> B4
    B4 --> B5
    B5 --> B6
    B6 --> C1
    B6 --> C2

Components

1. Data Loader (loader.py)

Purpose: Load and validate RSP Excel files

Required Columns

required_columns = ['EAN', 'Market', 'SRP', 'Valid_From']
optional_columns = ['House', 'Item_Name', 'LP']
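
The column lists above imply a validation step before any processing. A minimal sketch of such a check in plain Python follows; the helper name `check_columns` is hypothetical and is not taken from loader.py.

```python
REQUIRED_COLUMNS = ['EAN', 'Market', 'SRP', 'Valid_From']
OPTIONAL_COLUMNS = ['House', 'Item_Name', 'LP']


def check_columns(columns):
    """Return (missing_required, present_optional) for a list of column names.

    Hypothetical helper: the real loader may validate differently.
    """
    present = set(columns)
    missing_required = [c for c in REQUIRED_COLUMNS if c not in present]
    present_optional = [c for c in OPTIONAL_COLUMNS if c in present]
    return missing_required, present_optional


# A file without the SRP column is reported as incomplete
missing, optional = check_columns(['EAN', 'Market', 'Valid_From', 'House'])
```

A loader could raise an error when `missing` is non-empty and simply skip the optional columns that are absent.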

Data Cleaning

def load_and_clean_source(self, source_path):
    # Requires: import io; import pandas as pd
    # Read the Excel file as binary content through Spark
    binary_df = spark.read.format("binaryFile").load(source_path)
    excel_buffer = io.BytesIO(binary_df.select("content").first()["content"])

    # Convert to pandas, reading EAN as string so it is not parsed as a number
    dtype_spec = {'EAN': str, 'Ean': str, 'ean': str}
    df_pd = pd.read_excel(excel_buffer, dtype=dtype_spec)

    # Clean column names
    df_pd = self._clean_column_names(df_pd)

    # Convert to Spark and validate
    return self._convert_to_spark(df_pd)

2. Market Mapping Processor (mappings/market.py)

Purpose: Convert Market to reporting_unit_key

Mapping Structure

Market;reporting_unit_key;reporting_unit_description
France;FR;France
Germany;DE;Germany
Belgium;BE;Belgium
Netherlands;NL;Netherlands

Implementation

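class MarketMappingProcessor:
    def apply_mapping(self, df):
        # mapping_df: the market mapping loaded from rsp_market_mapping.csv
        # Left join on Market so unmatched rows are kept with a null
        # reporting_unit_key; the small mapping table is broadcast
        df_joined = df.join(
            F.broadcast(mapping_df),
            on="Market",
            how="left"
        )
        return df_joined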
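
To make the left-join behaviour concrete, here is a small sketch in plain Python (not Spark) that parses the semicolon-separated mapping shown above and applies it to two rows. The function names and the unmapped market "Spain" are illustrative assumptions.

```python
import csv
import io

MAPPING_CSV = """Market;reporting_unit_key;reporting_unit_description
France;FR;France
Germany;DE;Germany
Belgium;BE;Belgium
Netherlands;NL;Netherlands
"""


def load_market_mapping(text):
    """Parse the semicolon-separated mapping into a dict keyed by Market."""
    reader = csv.DictReader(io.StringIO(text), delimiter=";")
    return {row["Market"]: row for row in reader}


def map_markets(rows, mapping):
    """Left-join semantics: keep every row; unmatched rows get None keys."""
    out = []
    for row in rows:
        match = mapping.get(row["Market"])
        out.append({
            **row,
            "reporting_unit_key": match["reporting_unit_key"] if match else None,
            "reporting_unit_description":
                match["reporting_unit_description"] if match else None,
        })
    return out


mapping = load_market_mapping(MAPPING_CSV)
rows = [
    {"EAN": "3614229823653", "Market": "France"},
    {"EAN": "3614229823653", "Market": "Spain"},  # hypothetical unmapped market
]
mapped = map_markets(rows, mapping)
unmatched = [r["Market"] for r in mapped if r["reporting_unit_key"] is None]
```

Rows that end up in `unmatched` correspond to the null `reporting_unit_key` values a left join would produce in Spark.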

3. FX Rate Converter (enrichment.py)

Purpose: Convert local currency to EUR

FX Conversion Logic

def apply_fx_rate_conversion(self, df):
    # Extract fiscal year
    df = df.withColumn(
        "fiscal_year",
        F.year("Valid_From").cast(StringType())
    )

    # Query FX rates
    fx_rates = execute_sql_query("""
        SELECT Fiscal_Year, Reporting_Unit_Key, Toeur
        FROM bronze_sellin_fx_rate
    """)

    # Attach the rate for each row's fiscal year and reporting unit;
    # rows without a matching rate get a null Toeur
    df = df.join(
        fx_rates,
        on=["fiscal_year", "reporting_unit_key"],
        how="left"
    )

    # Convert to EUR
    df = df.withColumn(
        "SRP_eur",
        F.col("SRP") / F.col("Toeur")
    )

    # Also convert LP if exists
    if "Lp" in df.columns:
        df = df.withColumn(
            "LP_eur",
            F.col("Lp") / F.col("Toeur")
        )

    return df
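
The arithmetic of the conversion can be illustrated without Spark. The sketch below follows the logic above (fiscal year taken from `Valid_From`, price divided by `Toeur`); the rates, the reporting unit "XX", and the helper name `to_eur` are made up for illustration and are not real FX data.

```python
from datetime import date

# Hypothetical rates keyed by (fiscal_year, reporting_unit_key); not real FX data
FX_RATES = {
    ("2024", "FR"): 1.0,
    ("2024", "XX"): 1.25,
}


def to_eur(row, fx_rates):
    """Return the row with fiscal_year and SRP_eur, or None if no rate exists."""
    fiscal_year = str(row["Valid_From"].year)
    rate = fx_rates.get((fiscal_year, row["reporting_unit_key"]))
    if rate is None:
        # Mirrors the "missing FX rate" case: the row cannot be converted
        return None
    return {**row, "fiscal_year": fiscal_year, "SRP_eur": row["SRP"] / rate}


converted = to_eur(
    {"reporting_unit_key": "XX", "SRP": 100.0, "Valid_From": date(2024, 1, 1)},
    FX_RATES,
)
no_rate = to_eur(
    {"reporting_unit_key": "XX", "SRP": 100.0, "Valid_From": date(2023, 1, 1)},
    FX_RATES,
)
```

With the made-up rate of 1.25, a local price of 100.0 becomes 80.0 EUR, while the 2023 row has no rate and yields `None`.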

4. Inheritance Processor (mappings/inheritance.py)

Purpose: Create inherited RSP entries

Inheritance Rules

Reporting_Unit_Key;Inherits_From;reporting_unit_description
EU_EX;DE;Europe Export inherits from Germany
EU_TR;DE;Europe Travel Retail inherits from Germany

Implementation

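def apply_inheritance(self, df):
    # Requires: from functools import reduce
    # inheritance_rules: rows of rsp_inheritance_mapping.csv
    # Flag original rows so every DataFrame in the union has the same columns
    df = df.withColumn("is_inherited", F.lit(False))
    inherited_dfs = []

    for rule in inheritance_rules:
        inheritor = rule["Reporting_Unit_Key"]
        parent = rule["Inherits_From"]

        # Filter parent rows
        parent_rows = df.filter(
            F.col("Reporting_Unit_Key") == parent
        )

        # Create inherited rows
        inherited = parent_rows.withColumn(
            "Reporting_Unit_Key", F.lit(inheritor)
        ).withColumn(
            "is_inherited", F.lit(True)
        )

        inherited_dfs.append(inherited)

    # Union all DataFrames by column name
    return reduce(lambda a, b: a.unionByName(b), [df] + inherited_dfs)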
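
As a worked example of the rules above, the following plain-Python sketch copies the Germany row to the two inheriting reporting units. The prices reuse the sample SRP values from this page and are assumed to be in EUR already; the function is an illustration, not the code in mappings/inheritance.py.

```python
RULES = [
    {"Reporting_Unit_Key": "EU_EX", "Inherits_From": "DE"},
    {"Reporting_Unit_Key": "EU_TR", "Inherits_From": "DE"},
]


def apply_inheritance(rows, rules):
    """Return the original rows plus one copy of each parent row per rule."""
    result = [{**r, "is_inherited": False} for r in rows]
    for rule in rules:
        for r in rows:
            if r["reporting_unit_key"] == rule["Inherits_From"]:
                result.append({
                    **r,
                    "reporting_unit_key": rule["Reporting_Unit_Key"],
                    "is_inherited": True,
                })
    return result


rows = [
    {"reporting_unit_key": "FR", "EAN": "3614229823653", "SRP_eur": 89.90},
    {"reporting_unit_key": "DE", "EAN": "3614229823653", "SRP_eur": 85.00},
]
out = apply_inheritance(rows, RULES)
inherited_keys = sorted(r["reporting_unit_key"] for r in out if r["is_inherited"])
```

The two input rows become four: FR and DE unchanged, plus EU_EX and EU_TR carrying the German price with `is_inherited` set.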

5. Date Expander (expander.py)

Purpose: Expand RSP to daily granularity

Expansion Logic

def expand_date_ranges(self, df):
    # Find next Valid_From for each EAN/RU
    df = df.withColumn(
        "next_valid_from",
        F.lead("Valid_From").over(
            Window.partitionBy("reporting_unit_key", "EAN")
                  .orderBy("Valid_From")
        )
    )

    # End each price the day before the next one starts;
    # prices with no successor run until the current date
    df = df.withColumn(
        "end_date",
        F.coalesce(
            F.date_sub("next_valid_from", 1),
            F.current_date()
        )
    )

    # Create date sequence from the earliest Valid_From to the latest end_date
    min_date, max_date = df.agg(F.min("Valid_From"), F.max("end_date")).first()
    date_df = spark.sql(f"""
        SELECT explode(sequence(
            to_date('{min_date}'),
            to_date('{max_date}'),
            interval 1 day
        )) as Date
    """)

    # Join each price to the dates inside its validity window
    expanded = df.join(
        date_df,
        (date_df.Date >= df.Valid_From) &
        (date_df.Date <= df.end_date)
    )
    return expanded
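
The expansion rule can be checked by hand on the two France prices from the sample file. The sketch below is plain Python rather than Spark, and it takes an explicit `as_of` date in place of the current date so the result is reproducible; both the function name and that parameter are assumptions for illustration.

```python
from datetime import date, timedelta


def expand_prices(prices, as_of):
    """Expand (valid_from, price) pairs for one EAN and reporting unit to days.

    Each price runs until the day before the next Valid_From; the last
    price runs until as_of (standing in for the current date).
    """
    prices = sorted(prices)
    daily = {}
    for i, (valid_from, price) in enumerate(prices):
        if i + 1 < len(prices):
            end = prices[i + 1][0] - timedelta(days=1)
        else:
            end = as_of
        day = valid_from
        while day <= end:
            daily[day] = price
            day += timedelta(days=1)
    return daily


daily = expand_prices(
    [(date(2024, 1, 1), 89.90), (date(2024, 7, 1), 94.90)],
    as_of=date(2024, 7, 3),
)
```

Here the first price covers 2024-01-01 through 2024-06-30 and the second covers 2024-07-01 through the `as_of` date, with exactly one price per day.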

6. Gold Layer Preparer (gold_layer.py)

Purpose: Final transformations for analytics

Operations

  • Round prices to 2 decimals
  • Add Year and Month columns
  • Add metadata columns
  • Reorder columns logically
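
A rough sketch of these operations on a single expanded row, in plain Python, might look as follows. The function name, the value used for `Source`, and the exact column order are assumptions; gold_layer.py may differ.

```python
from datetime import date, datetime, timezone


def prepare_gold_row(row, source="SRP_database.xlsx"):
    """Round the price, derive Year/Month, add metadata, order the columns."""
    return {
        "reporting_unit_key": row["reporting_unit_key"],
        "EAN": row["EAN"],
        "Date": row["Date"],
        "Year": row["Date"].year,
        "Month": row["Date"].month,
        "SRP_eur": round(row["SRP_eur"], 2),
        "Source": source,  # assumed meaning: name of the input file
        "gold_processing_timestamp": datetime.now(timezone.utc),
    }


gold = prepare_gold_row({
    "reporting_unit_key": "DE",
    "EAN": "3614229823653",
    "Date": date(2024, 7, 1),
    "SRP_eur": 84.99612,
})
```

Building the output as a new dict in a fixed key order covers the reordering step, since Python dicts preserve insertion order.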

Input Files

RSP Source File

File: SRP_database.xlsx

Example structure:

EAN          | Market    | SRP    | Valid_From | House      | Item_Name
3614229823653| France    | 89.90  | 2024-01-01 | Hugo Boss  | Boss Bottled EDT
3614229823653| Germany   | 85.00  | 2024-01-01 | Hugo Boss  | Boss Bottled EDT
3614229823653| France    | 94.90  | 2024-07-01 | Hugo Boss  | Boss Bottled EDT

Market Mapping

File: rsp_market_mapping.csv

Market;reporting_unit_key;reporting_unit_description
France;FR;France
Germany;DE;Germany
Belgium;BE;Belgium
Netherlands;NL;Netherlands
Europe Export;EU_EX;Europe Export
Europe Travel;EU_TR;Europe Travel Retail

Inheritance Mapping

File: rsp_inheritance_mapping.csv

Reporting_Unit_Key;Inherits_From;reporting_unit_description
EU_EX;DE;Europe Export
EU_TR;DE;Europe Travel Retail
CH;DE;Switzerland

Output Tables

Bronze Table: bronze_rsp

Point-in-time price records:

  • Original Valid_From dates
  • Prices in EUR
  • Inheritance flags
  • Processing metadata

Schema:

reporting_unit_key: string
reporting_unit_description: string
EAN: string
Valid_From: date
SRP_eur: double
LP_eur: double
Market: string
House: string
Item_Name: string
is_inherited: boolean
fiscal_year: string
processing_timestamp: timestamp

Gold Table: gold_rsp

Daily expanded price records:

  • One row per EAN per day
  • Prices carried forward
  • Year/Month columns
  • Business metadata

Schema:

reporting_unit_key: string
reporting_unit_description: string
EAN: string
Date: date
Year: int
Month: int
SRP_eur: double
LP_eur: double
Market: string
House: string
Item_Name: string
is_inherited: boolean
Source: string
gold_processing_timestamp: timestamp

Business Logic

Price Validity Rules

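  1. Price Duration: A price is valid from its Valid_From date until the day before the next Valid_From for the same EAN and reporting unit, or until the current date if there is no later price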
  2. Inheritance: Some markets inherit prices from others
  3. Currency: All prices converted to EUR
  4. Daily Granularity: Expanded for daily reporting

Special Cases

  • Missing FX Rate: Rows without a matching FX rate cannot be converted to EUR and are filtered out
  • No Next Price: Use current date as end
  • Inherited Prices: Mark with is_inherited flag

Performance Considerations

Optimization Strategies

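  1. Binary Reading: The Excel file is read once as binary content and parsed with pandas
  2. Broadcast Joins: Small mapping tables are broadcast to every executor
  3. Window Functions: F.lead finds the next Valid_From per EAN and reporting unit without a self-join
  4. Date Explosion: The date range is generated with sequence and explode in Spark SQL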

Memory Management

# Cache before expensive operations
df = df.cache()

# Unpersist after use
df.unpersist()

Error Handling

Common Issues

  1. Invalid Dates

    df = df.filter(F.col("Valid_From").isNotNull())
    

  2. Missing Markets

    unmatched = df.filter(
        F.col("reporting_unit_key").isNull()
    )
    logger.warning(f"Unmatched markets: {unmatched.count()}")
    

  3. FX Rate Missing ```python df = df.