PepsiCo Data Engineer

This guide features 10 challenging Data Engineer interview questions for PepsiCo (4-6+ years experience), covering SQL optimization, Python ETL, PySpark big data processing, data modeling, cloud architecture (Azure/AWS), supply chain analytics, performance tuning, and production-grade data pipeline challenges at PepsiCo’s global scale (60+ PB data, 150K+ daily jobs).

1. Top N Products by Sales Per Region with Tie Handling

Difficulty Level: Medium

Engineering Level: Mid-Senior Data Engineer (4-5 YOE)

Source: LinkedIn (Kishore Mohammed, Gerryson, Rahul Patel - Sept-Oct 2025)

Team: Data Platform Engineering / Business Intelligence

Interview Round: Coding Round 1 (SQL)

Technology Focus: SQL Window Functions, Ranking Logic

Question: “Write a SQL query to find the top 3-5 products by total sales for each region in the last 6 months. Ensure that ties are handled properly—if two products have the same sales, both should appear in the top 3.”


Answer Framework

Why PepsiCo Asks This:
- Operates across 200+ countries with regional sales data for beverages and snacks
- Identifies top-performing SKUs by geography for supply chain optimization and marketing
- Tests understanding of window functions, ranking vs row numbering, and edge case handling

Key Technical Concepts:
- RANK() vs ROW_NUMBER(): RANK() allows ties (essential for this question), ROW_NUMBER() does not
- PARTITION BY region: Creates separate ranking windows per region
- ORDER BY sales DESC: Ranks highest sales first
- Filtering ranks ≤ 3: Returns top 3 including ties

Solution

SQL Approach (Using RANK for Tie Handling):

WITH regional_sales AS (
    SELECT
        r.region_name,
        p.product_id,
        p.product_name,
        SUM(s.sale_amount) as total_sales
    FROM sales s
    JOIN products p ON s.product_id = p.product_id
    JOIN regions r ON s.region_id = r.region_id
    WHERE s.sale_date >= DATE_SUB(CURDATE(), INTERVAL 6 MONTH)
    GROUP BY r.region_name, p.product_id, p.product_name
),
ranked_products AS (
    SELECT
        region_name,
        product_id,
        product_name,
        total_sales,
        RANK() OVER (PARTITION BY region_name ORDER BY total_sales DESC) as sales_rank
    FROM regional_sales
)
SELECT
    region_name,
    product_id,
    product_name,
    total_sales,
    sales_rank
FROM ranked_products
WHERE sales_rank <= 3
ORDER BY region_name, sales_rank, product_name;

Explanation:

  1. CTE regional_sales: Aggregates 6-month sales by region and product
  2. CTE ranked_products: Applies RANK() window function partitioned by region
  3. Filter sales_rank <= 3: Returns top 3, including ties (e.g., if 3 products tied at #2, all appear)
  4. ORDER BY: Ensures consistent output ordering

Why RANK() Not ROW_NUMBER():
- ROW_NUMBER() assigns unique ranks (1,2,3,4,5) even for ties → Would arbitrarily pick one tied product
- RANK() allows ties (1,2,2,4,5) → Both products with same sales get same rank

Edge Case Example:

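| Region | Product  | Sales | ROW_NUMBER | RANK      |
|--------|----------|-------|------------|-----------|
| APAC   | Pepsi    | 500K  | 1          | 1         |
| APAC   | Lays     | 400K  | 2          | 2         |
| APAC   | Doritos  | 400K  | 3          | 2 ← Tie!  |
| APAC   | Gatorade | 350K  | 4          | 4         |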

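With WHERE RANK <= 3: Returns Pepsi, Lays, and Doritos (the tied products share rank 2, so both are kept).

With WHERE ROW_NUMBER <= 3: Returns the same three rows here, but only because the tie falls inside the cutoff. With a cutoff of 2, ROW_NUMBER <= 2 would return Pepsi plus an arbitrary one of Lays or Doritos (incorrect!), while RANK <= 2 would still return all three.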
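
The difference between the three ranking functions can be checked outside a database. The following is a minimal sketch in plain Python that reproduces ROW_NUMBER, RANK, and DENSE_RANK semantics for the APAC rows above; the helper name `rank_rows` and the dictionary layout are assumptions made for illustration, not part of any SQL engine.

```python
def rank_rows(rows, key):
    """Return (product, row_number, rank, dense_rank) tuples, highest `key` first."""
    ordered = sorted(rows, key=lambda r: r[key], reverse=True)
    result = []
    rank = dense = 0
    previous = object()  # sentinel that never equals a real sales value
    for row_number, row in enumerate(ordered, start=1):
        if row[key] != previous:
            rank = row_number  # RANK jumps to the current position after a tie
            dense += 1         # DENSE_RANK only ever increases by one
            previous = row[key]
        result.append((row["product"], row_number, rank, dense))
    return result


apac = [
    {"product": "Pepsi", "sales": 500_000},
    {"product": "Lays", "sales": 400_000},
    {"product": "Doritos", "sales": 400_000},
    {"product": "Gatorade", "sales": 350_000},
]

ranked = rank_rows(apac, "sales")
for product, row_number, rank, dense in ranked:
    print(f"{product:<9} row_number={row_number} rank={rank} dense_rank={dense}")

# A cutoff of 2 exposes the tie: RANK keeps both tied products, ROW_NUMBER drops one.
top2_by_rank = [p for p, _, rank, _ in ranked if rank <= 2]
top2_by_row_number = [p for p, rn, _, _ in ranked if rn <= 2]
```

Here ROW_NUMBER keeps Lays only because Python's sort is stable; a database makes no such promise for tied rows, which is why the result is described as arbitrary.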

Performance Optimization for PepsiCo Scale:
- Index on sale_date, region_id, product_id for fast filtering
- Partition sales table by month for faster 6-month scans
- Pre-aggregate daily sales into monthly summaries to reduce row scans

Expected Follow-Up Questions:
- “What if you want DENSE_RANK instead?” (1,2,2,3 vs 1,2,2,4)
- “How would this scale with billions of rows?” (Partitioning, indexing strategy)
- “What if sales data arrives late?” (Handle late-arriving facts in data warehouse design)


2. Identify Orders with Shipping Delays Beyond Monthly Averages

Difficulty Level: Medium-High

Engineering Level: Mid Data Engineer (4-5 YOE)

Source: LinkedIn (Gerryson, Rahul Patel, Vishakha Singhal - Aug-Nov 2025)

Team: Supply Chain Analytics / Enterprise Data Management

Interview Round: Coding Round 1 (SQL)

Technology Focus: Window Functions, Date Arithmetic, Supply Chain Metrics

Question: “Given a sales table with order_date and ship_date columns, find all orders where shipping took longer than the average shipping time for that month. Return order ID, customer ID, shipping days, and monthly average.”


Answer Framework

Why PepsiCo Asks This:
- PepsiCo’s distribution network covers 1.3 billion miles annually worldwide
- Identifies logistics bottlenecks affecting delivery SLAs
- Critical for warehouse location optimization and carrier performance analysis
- Supply chain efficiency directly impacts customer satisfaction and costs

Key Technical Concepts:
- Window functions with aggregates: AVG() OVER (PARTITION BY month)
- Date arithmetic: DATEDIFF() or date subtraction
- Self-comparison filtering: WHERE shipping_days > monthly_avg within same query
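- Month extraction: Partition on both the year and the month of order_date (e.g., DATE_PART or YEAR()/MONTH()) so the same calendar month in different years is averaged separately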

Solution

SQL Approach (Window Function with Filtering):

WITH shipping_metrics AS (
    SELECT
        order_id,
        customer_id,
        order_date,
        ship_date,
        DATE_PART('month', order_date) as order_month,
        DATE_PART('year', order_date) as order_year,
        (ship_date - order_date) as shipping_days,
        AVG(ship_date - order_date) OVER (
            PARTITION BY DATE_PART('year', order_date),
                         DATE_PART('month', order_date)
        ) as monthly_avg_shipping_days
    FROM orders
    WHERE ship_date IS NOT NULL  -- Exclude unshipped orders
)
SELECT
    order_id,
    customer_id,
    order_date,
    ship_date,
    shipping_days,
    ROUND(monthly_avg_shipping_days, 2) as monthly_avg_shipping_days,
    ROUND(shipping_days - monthly_avg_shipping_days, 2) as delay_vs_avg,
    order_month,
    order_year
FROM shipping_metrics
WHERE shipping_days > monthly_avg_shipping_days
ORDER BY delay_vs_avg DESC, order_date;

Explanation:

  1. CTE shipping_metrics: Calculates shipping days and monthly average in single pass
  2. PARTITION BY year + month: Creates separate averages per month (Jan 2024 ≠ Jan 2025)
  3. shipping_days > monthly_avg: Filters only delayed orders
  4. delay_vs_avg: Shows how many days beyond average (useful for prioritization)
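
To make the logic concrete, here is a small plain-Python sketch of the same two steps (average per year and month, then compare each order against its own month). The function name and the sample orders are hypothetical; only the column names come from the question.

```python
from collections import defaultdict
from datetime import date


def orders_slower_than_monthly_avg(orders):
    """Return orders whose shipping days exceed the average for their order month."""
    shipped = [o for o in orders if o["ship_date"] is not None]  # skip unshipped orders

    # Step 1: collect shipping days per (year, month) of order_date.
    days_by_month = defaultdict(list)
    for o in shipped:
        key = (o["order_date"].year, o["order_date"].month)
        days_by_month[key].append((o["ship_date"] - o["order_date"]).days)
    monthly_avg = {k: sum(v) / len(v) for k, v in days_by_month.items()}

    # Step 2: keep only orders that are slower than their month's average.
    delayed = []
    for o in shipped:
        key = (o["order_date"].year, o["order_date"].month)
        days = (o["ship_date"] - o["order_date"]).days
        if days > monthly_avg[key]:
            delayed.append({
                "order_id": o["order_id"],
                "customer_id": o["customer_id"],
                "shipping_days": days,
                "monthly_avg": round(monthly_avg[key], 2),
                "delay_vs_avg": round(days - monthly_avg[key], 2),
            })
    return sorted(delayed, key=lambda r: r["delay_vs_avg"], reverse=True)


orders = [
    {"order_id": "A1", "customer_id": "C1", "order_date": date(2024, 10, 1), "ship_date": date(2024, 10, 13)},
    {"order_id": "A2", "customer_id": "C2", "order_date": date(2024, 10, 5), "ship_date": date(2024, 10, 8)},
    {"order_id": "A3", "customer_id": "C3", "order_date": date(2024, 10, 9), "ship_date": date(2024, 10, 12)},
    {"order_id": "A4", "customer_id": "C4", "order_date": date(2025, 10, 2), "ship_date": date(2025, 10, 4)},
    {"order_id": "A5", "customer_id": "C5", "order_date": date(2024, 10, 20), "ship_date": None},
]
delayed = orders_slower_than_monthly_avg(orders)
```

In this sample, October 2024 averages 6 days, so only A1 (12 days) is returned. A4 sits alone in October 2025, equals its own average, and is not flagged, which shows why the year belongs in the partition key.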

Alternative: Using JOIN Instead of Window Function:

WITH monthly_averages AS (
    SELECT
        DATE_PART('year', order_date) as order_year,
        DATE_PART('month', order_date) as order_month,
        AVG(ship_date - order_date) as avg_shipping_days
    FROM orders
    WHERE ship_date IS NOT NULL
    GROUP BY DATE_PART('year', order_date), DATE_PART('month', order_date)
)
SELECT
    o.order_id,
    o.customer_id,
    (o.ship_date - o.order_date) as shipping_days,
    ma.avg_shipping_days as monthly_avg,
    (o.ship_date - o.order_date) - ma.avg_shipping_days as delay_vs_avg
FROM orders o
JOIN monthly_averages ma
    ON DATE_PART('year', o.order_date) = ma.order_year
    AND DATE_PART('month', o.order_date) = ma.order_month
WHERE (o.ship_date - o.order_date) > ma.avg_shipping_days
ORDER BY delay_vs_avg DESC;

Window Function vs JOIN Approach:
- Window function: More concise, better for complex partitioning
- JOIN approach: Better for reusing monthly averages in multiple queries

Real-World PepsiCo Context:

Example output showing delayed shipments:

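| Order ID  | Customer | Shipping Days | Monthly Avg | Delay vs Avg | Month    |
|-----------|----------|---------------|-------------|--------------|----------|
| ORD-12345 | Walmart  | 12            | 5.2         | +6.8         | Oct 2024 |
| ORD-12389 | Target   | 9             | 5.2         | +3.8         | Oct 2024 |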

Insights: Orders with +5 day delays → investigate carrier issues, warehouse staffing, or SKU-specific problems

Performance Optimization:
- Index: (order_date, ship_date) composite index
- Partition: Partition orders table by month for faster scans
- Materialized view: Pre-compute monthly averages if queried frequently

Expected Follow-Up Questions:
- “How would you handle NULL ship_date?” (Unshipped orders → separate analysis or exclude)
- “What if you need regional averages?” (Add PARTITION BY region to window function)
- “How would this work in Spark/PySpark?” (Use window functions: pyspark.sql.window)


3. Recursive CTE for Organizational Hierarchy

Difficulty Level: High

Engineering Level: Mid-Senior Data Engineer (4-5 YOE)

Source: LinkedIn (Kishore Mohammed, Gerryson, Rahul Patel - Sept-Oct 2025)

Team: Enterprise Data Management / Data Governance

Interview Round: Coding Round 1 (SQL)

Technology Focus: Recursive CTEs, Hierarchical Data, Data Governance

Question: “You have an employee table with employee_id and manager_id columns. Write a recursive query to list all subordinates (direct and indirect) for a given manager. Include the hierarchy level and reporting chain.”


Answer Framework

Why PepsiCo Asks This:
- PepsiCo has 200,000+ employees globally across complex org structures
- Required for Databricks Unity Catalog access control (1,500+ active data users across 30+ teams)
- Enables org-wide reporting, compliance tracking, and data governance
- Used in building role-based access control (RBAC) for data assets

Key Technical Concepts:
- Recursive CTEs: WITH RECURSIVE enables hierarchical queries
- Anchor member: Base case (starting manager)
- Recursive member: Joins to find next level of subordinates
- Termination condition: Prevents infinite recursion

Solution

SQL Approach (Recursive CTE with Hierarchy Tracking):

WITH RECURSIVE org_hierarchy AS (
    -- Anchor member: Start with given manager
    SELECT
        employee_id,
        employee_name,
        manager_id,
        job_title,
        1 as hierarchy_level,
        CAST(employee_id AS VARCHAR(1000)) as reporting_chain,
        CAST(employee_name AS VARCHAR(1000)) as reporting_chain_names
    FROM employees
    WHERE employee_id = @ManagerID  -- Parameter: starting manager

    UNION ALL

    -- Recursive member: Find subordinates at each level
    SELECT
        e.employee_id,
        e.employee_name,
        e.manager_id,
        e.job_title,
        oh.hierarchy_level + 1,
        oh.reporting_chain || ' -> ' || CAST(e.employee_id AS VARCHAR),
        oh.reporting_chain_names || ' -> ' || e.employee_name
    FROM employees e
    INNER JOIN org_hierarchy oh
        ON e.manager_id = oh.employee_id
    WHERE oh.hierarchy_level < 10  -- Prevent infinite recursion
)
SELECT
    employee_id,
    employee_name,
    job_title,
    hierarchy_level,
    reporting_chain_names,
    manager_id
FROM org_hierarchy
ORDER BY hierarchy_level, employee_name;

Explanation:

  1. Anchor: Starts with @ManagerID (e.g., VP of Engineering)
  2. Recursive: Finds direct reports, then their reports, iteratively
  3. hierarchy_level: Tracks depth (1 = VP, 2 = Directors, 3 = Managers, etc.)
  4. reporting_chain: Shows full path (e.g., “VP -> Director -> Manager -> IC”)
  5. Termination: Recursion stops when no further subordinates are found; WHERE hierarchy_level < 10 additionally caps the depth so a circular reference cannot recurse forever (it bounds a cycle rather than detecting it)
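
The recursive pattern can be tried end to end with Python's built-in sqlite3 module, which supports WITH RECURSIVE. This is a sketch under assumptions: SQLite stands in for the production database, the named parameter `:manager_id` replaces @ManagerID, and the table holds only the sample employees from the example output in this section.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE employees ("
    "employee_id INTEGER PRIMARY KEY, employee_name TEXT, manager_id INTEGER)"
)
conn.executemany(
    "INSERT INTO employees VALUES (?, ?, ?)",
    [
        (1000, "John Doe", None),
        (2001, "Jane Smith", 1000),
        (2002, "Bob Lee", 1000),
        (3001, "Alice Wang", 2001),
        (4001, "Chris Park", 3001),
    ],
)

query = """
WITH RECURSIVE org_hierarchy AS (
    -- Anchor member: the starting manager
    SELECT employee_id, employee_name, 1 AS hierarchy_level,
           employee_name AS reporting_chain
    FROM employees
    WHERE employee_id = :manager_id
    UNION ALL
    -- Recursive member: employees whose manager is already in the result
    SELECT e.employee_id, e.employee_name, oh.hierarchy_level + 1,
           oh.reporting_chain || ' -> ' || e.employee_name
    FROM employees e
    JOIN org_hierarchy oh ON e.manager_id = oh.employee_id
    WHERE oh.hierarchy_level < 10
)
SELECT employee_id, hierarchy_level, reporting_chain
FROM org_hierarchy
ORDER BY hierarchy_level, employee_name
"""

rows = conn.execute(query, {"manager_id": 2001}).fetchall()
for row in rows:
    print(row)
```

Starting from Jane Smith (2001) returns her own row plus Alice Wang and Chris Park; Bob Lee is excluded because he reports to the CEO, not into this subtree.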

Example Output:

Given CEO (ID: 1000):

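| Employee ID | Name       | Title    | Level | Reporting Chain                          |
|-------------|------------|----------|-------|------------------------------------------|
| 1000        | John Doe   | CEO      | 1     | John Doe                                 |
| 2001        | Jane Smith | CTO      | 2     | John Doe -> Jane Smith                   |
| 2002        | Bob Lee    | CFO      | 2     | John Doe -> Bob Lee                      |
| 3001        | Alice Wang | VP Eng   | 3     | John Doe -> Jane Smith -> Alice Wang     |
| 4001        | Chris Park | Dir Data | 4     | …Jane Smith -> Alice Wang -> Chris Park  |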

Handling Circular References (Edge Case):

Problem: Employee A reports to B, B reports to A → infinite loop

Solution: Add cycle detection:

WITH RECURSIVE org_hierarchy AS (
    SELECT
        employee_id,
        employee_name,
        manager_id,
        1 as hierarchy_level,
        ARRAY[employee_id] as path  -- Track visited employees
    FROM employees
    WHERE employee_id = @ManagerID

    UNION ALL

    SELECT
        e.employee_id,
        e.employee_name,
        e.manager_id,
        oh.hierarchy_level + 1,
        oh.path || e.employee_id
    FROM employees e
    INNER JOIN org_hierarchy oh ON e.manager_id = oh.employee_id
    WHERE NOT (e.employee_id = ANY(oh.path))  -- Prevent cycles
        AND oh.hierarchy_level < 15
)
SELECT * FROM org_hierarchy;
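
The path-based cycle check is easier to reason about when written as an explicit traversal. Below is a minimal Python sketch of the same idea: each queue entry carries the path taken so far, and an employee already on that path is skipped. The function name, the `(employee_id, manager_id)` tuple layout, and the sample data are assumptions for illustration.

```python
from collections import defaultdict, deque


def subordinates(employees, manager_id, max_depth=15):
    """Breadth-first walk of the reporting tree with per-path cycle detection.

    employees: iterable of (employee_id, manager_id) pairs.
    Returns a list of (employee_id, level, path) tuples, starting manager first.
    """
    reports = defaultdict(list)
    for emp_id, mgr_id in employees:
        reports[mgr_id].append(emp_id)

    result = []
    queue = deque([(manager_id, 1, (manager_id,))])
    while queue:
        emp_id, level, path = queue.popleft()
        result.append((emp_id, level, path))
        if level >= max_depth:  # depth cap, like hierarchy_level < 15
            continue
        for child in reports[emp_id]:
            if child not in path:  # same role as NOT (employee_id = ANY(path))
                queue.append((child, level + 1, path + (child,)))
    return result


# A reports to B and B reports to A (a cycle); C also reports to B.
edges = [("A", "B"), ("B", "A"), ("C", "B")]
found = subordinates(edges, "A")
```

Without the `child not in path` check, A and B would keep re-adding each other until the depth cap stopped the walk; with it, each employee appears once.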

Alternative: Finding All Managers Above an Employee (Upward Traversal):

WITH RECURSIVE manager_chain AS (
    -- Start with specific employee
    SELECT
        employee_id,
        employee_name,
        manager_id,
        1 as level
    FROM employees
    WHERE employee_id = @EmployeeID

    UNION ALL

    -- Climb up the hierarchy
    SELECT
        e.employee_id,
        e.employee_name,
        e.manager_id,
        mc.level + 1
    FROM employees e
    INNER JOIN manager_chain mc ON e.employee_id = mc.manager_id
    WHERE mc.level < 10
)
SELECT * FROM manager_chain
ORDER BY level;

Real-World PepsiCo Use Case:

Data Governance Scenario:
- VP of Data needs to see all team members with database access
- Recursive query finds all subordinates (200+ people across 5 levels)
- Unity Catalog uses this for cascading permissions

Performance Considerations:
- Max recursion depth: Typical org is 5-7 levels; set limit to 10-15
- Index: (manager_id) index critical for fast recursive joins
- Materialized path: For frequent queries, denormalize hierarchy into single column

Expected Follow-Up Questions:
- “What if multiple employees report to same manager?” (Already handled by join logic)
- “How would you find total team size for each manager?” (COUNT(*) grouped by starting manager)
- “What if you need sibling relationships?” (Add PARTITION BY manager_id logic)


4. Deduplicate Large Tables While Preserving Latest Record

Difficulty Level: Medium

Engineering Level: Mid Data Engineer (4-5 YOE)

Source: LinkedIn (Kishore Mohammed, Gerryson, Rahul Patel, Ritik Jain - Sept-Oct 2025)

Team: Data Quality / Cloud Data Solutions

Interview Round: Coding Round 1 (SQL)

Technology Focus: Data Quality, Window Functions, ETL Optimization

Question: “You have a huge table with millions of duplicate records. You need to remove duplicates while keeping only the most recent record based on a timestamp column. How would you approach this efficiently?”


Answer Framework

Why PepsiCo Asks This:
- Data sprawl across 60+ PB globally from multiple ERP systems (SAP, Oracle, custom)
- Enterprise data platform consolidates data with inevitable duplicates
- Critical for building unified data layer across 30+ digital products
- Data quality is core pillar of PepsiCo’s modern architecture

Key Technical Concepts:
- ROW_NUMBER() window function: Assigns unique rank to each duplicate group
- PARTITION BY natural key: Groups duplicates together
- ORDER BY timestamp DESC: Ensures latest record gets ROW_NUMBER = 1
- Performance: Must handle billions of rows efficiently

Solution

Approach 1: Using ROW_NUMBER (Most Common & Efficient):

-- Step 1: Identify duplicates and keep latest
WITH deduped AS (
    SELECT *,
        ROW_NUMBER() OVER (
            PARTITION BY customer_id, product_id  -- Natural key (adjust per table)
            ORDER BY updated_timestamp DESC        -- Latest record first
        ) as rn
    FROM customer_orders
)
SELECT
    order_id,
    customer_id,
    product_id,
    order_amount,
    updated_timestamp
FROM deduped
WHERE rn = 1;  -- Keep only the latest record per group

Explanation:
- PARTITION BY: Groups rows with same natural key (customer_id + product_id)
- ORDER BY DESC: Latest timestamp gets rn=1, older records get rn=2,3,…
- WHERE rn=1: Filters to keep only latest
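
For intuition, the same keep-the-latest rule can be written as a single pass over the rows in plain Python. This is only a sketch: the function name `keep_latest` and the sample rows are hypothetical, and timestamps are assumed to be ISO-style strings that sort chronologically.

```python
def keep_latest(rows, key_fields, ts_field):
    """Keep one row per natural key: the one with the greatest timestamp."""
    latest = {}
    for row in rows:
        key = tuple(row[f] for f in key_fields)
        current = latest.get(key)
        # Strict '>' keeps the first-seen row on a timestamp tie; a real job
        # should add a deterministic tie-breaker column instead.
        if current is None or row[ts_field] > current[ts_field]:
            latest[key] = row
    return list(latest.values())


rows = [
    {"order_id": 1, "customer_id": "C1", "product_id": "P1", "updated_timestamp": "2024-12-01 08:00"},
    {"order_id": 2, "customer_id": "C1", "product_id": "P1", "updated_timestamp": "2024-12-08 14:00"},
    {"order_id": 3, "customer_id": "C2", "product_id": "P1", "updated_timestamp": "2024-12-02 09:00"},
]
deduped = keep_latest(rows, ("customer_id", "product_id"), "updated_timestamp")
```

The dictionary plays the role of PARTITION BY, and the comparison plays the role of ORDER BY ... DESC with rn = 1. It needs memory proportional to the number of distinct keys, which is why the SQL or Spark version is preferred at large scale.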

Approach 2: DELETE Duplicates In-Place (For Permanent Cleanup):

-- Warning: Test on small dataset first!
WITH deduped AS (
    SELECT
        ctid,  -- PostgreSQL row identifier (use ROWID in Oracle)
        ROW_NUMBER() OVER (
            PARTITION BY customer_id, product_id
            ORDER BY updated_timestamp DESC
        ) as rn
    FROM customer_orders
)
DELETE FROM customer_orders
WHERE ctid IN (
    SELECT ctid FROM deduped WHERE rn > 1
);

Approach 3: Using DISTINCT ON (PostgreSQL Specific - Most Efficient):

-- PostgreSQL shorthand for deduplication
SELECT DISTINCT ON (customer_id, product_id)
    order_id,
    customer_id,
    product_id,
    order_amount,
    updated_timestamp
FROM customer_orders
ORDER BY customer_id, product_id, updated_timestamp DESC;

Approach 4: Using a Correlated Subquery (Less Efficient, Avoid for Large Tables):

-- Not recommended for billions of rows (very slow)
SELECT *
FROM customer_orders o1
WHERE updated_timestamp = (
    SELECT MAX(updated_timestamp)
    FROM customer_orders o2
    WHERE o1.customer_id = o2.customer_id
        AND o1.product_id = o2.product_id
);

Performance Comparison:

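| Approach                 | Performance (1M rows) | Performance (1B rows) | Notes                                          |
|--------------------------|-----------------------|-----------------------|------------------------------------------------|
| ROW_NUMBER               | ~5 sec                | ~2 min                | Requires temp space for window function        |
| DISTINCT ON (PostgreSQL) | ~3 sec                | ~1.5 min              | Fastest, but PostgreSQL only                   |
| NOT IN                   | ~30 sec               | Hours                 | Cartesian product issues, avoid!               |
| DELETE with CTE          | ~8 sec                | ~3 min                | In-place modification, can’t rollback easily   |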

Real-World PepsiCo Scenario:

Problem: Customer master data synced from SAP (daily) and Oracle (hourly) creates duplicates:

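| Customer ID | Name        | Email         | Source | Updated Timestamp | Keep?     |
|-------------|-------------|---------------|--------|-------------------|-----------|
| CUST-001    | Walmart     | old@email.com | SAP    | 2024-12-01 08:00  | Duplicate |
| CUST-001    | Walmart Inc | new@email.com | Oracle | 2024-12-08 14:00  | Latest    |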

Deduplication Query:

WITH deduped_customers AS (
    SELECT *,
        ROW_NUMBER() OVER (
            PARTITION BY customer_id
            ORDER BY updated_timestamp DESC,
                     CASE WHEN source = 'Oracle' THEN 1 ELSE 2 END  -- Prefer Oracle if timestamps tie
        ) as rn
    FROM customer_master
)
SELECT
    customer_id,
    customer_name,
    email,
    source,
    updated_timestamp
FROM deduped_customers
WHERE rn = 1;

Performance Optimization for PepsiCo Scale:

  1. Partitioning: Partition table by date to limit scan scope
    WHERE updated_timestamp >= '2024-01-01'  -- Only dedupe recent data
  2. Indexing: Create index on natural key + timestamp
    CREATE INDEX idx_customer_dedup
    ON customer_orders(customer_id, product_id, updated_timestamp DESC);
  3. Incremental Deduplication: Process in batches
    -- Dedupe only today's data
    WHERE DATE(updated_timestamp) = CURRENT_DATE
  4. Use Delta Lake (PepsiCo’s Databricks Stack):
    MERGE INTO customer_master_clean target
    USING (
        SELECT * FROM deduped WHERE rn = 1
    ) source
    ON target.customer_id = source.customer_id
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *;

Expected Follow-Up Questions:
- “What if two records have the same timestamp?” (Add secondary sort column)
- “How do you handle this in Spark/PySpark?” (Use window functions: pyspark.sql.window)
- “What if you need to keep historical duplicates?” (SCD Type 2 approach)


5. Efficiently Process Large CSV Files (20GB+) with Python ETL

Difficulty Level: Medium-High

Engineering Level: Mid-Senior Data Engineer (4-5 YOE)

Source: LinkedIn (Kishore Mohammed, Gerryson, Rahul Patel, Vishakha Singhal - Sept-Nov 2025)

Team: Data Platform Engineering / ETL Optimization

Interview Round: Coding Round 2 (Python/Spark)

Technology Focus: Python ETL, Memory Management, Spark, Data Pipeline Optimization

Question: “You receive a large CSV file (20GB+) daily. You need to read, transform, and write it to a database efficiently using Python. How would you approach this without running out of memory? What technologies would you use?”


Answer Framework

Why PepsiCo Asks This:
- Daily batch processing of millions of supply chain transactions
- PepsiCo processes 150,000+ daily jobs in modern data platform
- Must handle real-time inventory, sales, and demand data efficiently
- Optimization directly impacts infrastructure costs and SLAs

Key Decision Framework:
- File size <5GB: Pandas with chunking
- File size 5-20GB: Pandas chunking OR PySpark (depending on infra)
- File size >20GB: PySpark required (distributed processing)

Solution

Approach 1: Pandas with Chunked Reading (5-10GB files):

import pandas as pd
from sqlalchemy import create_engine
import logging

def process_large_csv_pandas(file_path, chunk_size=50000):
    """
    Process large CSV in chunks to avoid memory overflow
    """
    # Database connection
    engine = create_engine('postgresql://user:password@localhost:5432/pepsico_db')
    total_rows_processed = 0
    try:
        # Read CSV in chunks
        for chunk_num, chunk in enumerate(pd.read_csv(file_path, chunksize=chunk_size), start=1):
            # Transform
            chunk['processed_date'] = pd.Timestamp.now()
            chunk['sales_amount'] = pd.to_numeric(chunk['sales_amount'], errors='coerce')
            chunk['quantity'] = chunk['quantity'].fillna(0).astype(int)
            # Data quality: Remove invalid rows
            chunk = chunk[chunk['sales_amount'] > 0]
            # Load to database (append mode)
            chunk.to_sql(
                'sales_transactions',
                con=engine,
                if_exists='append',
                index=False,
                method='multi'  # Faster bulk insert
            )
            total_rows_processed += len(chunk)
            logging.info(f"Processed chunk {chunk_num}: {len(chunk)} rows (Total: {total_rows_processed})")
    except Exception as e:
        logging.error(f"Error processing file: {str(e)}")
        raise
    finally:
        engine.dispose()
    return total_rows_processed


# Usage
rows = process_large_csv_pandas('daily_sales_20GB.csv', chunk_size=100000)
print(f"Total rows processed: {rows}")

Approach 2: PySpark for 20GB+ Files (Recommended for PepsiCo Scale):

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_timestamp, current_timestamp, when
from pyspark.sql.types import DoubleType, IntegerType

def process_large_csv_spark(file_path, output_jdbc_url):
    """
    Process large CSV using PySpark for distributed processing
    """
    # Initialize Spark session
    # (parenthesized chains are used because a comment after a trailing
    # backslash is a syntax error in Python)
    spark = (
        SparkSession.builder
        .appName("PepsiCo_LargeCSV_ETL")
        .config("spark.executor.memory", "8g")
        .config("spark.driver.memory", "4g")
        .config("spark.sql.shuffle.partitions", "200")
        .getOrCreate()
    )
    try:
        # Read CSV (distributed across Spark cluster)
        df = (
            spark.read
            .option("header", "true")
            .option("inferSchema", "false")   # Faster, define schema manually
            .option("mode", "DROPMALFORMED")  # Skip corrupted rows
            .csv(file_path)
        )
        # Transform
        df = (
            df.withColumn('processed_date', current_timestamp())
            .withColumn('sales_amount', col('sales_amount').cast(DoubleType()))
            .withColumn('quantity', col('quantity').cast(IntegerType()))
            .withColumn('quantity', when(col('quantity').isNull(), 0).otherwise(col('quantity')))
        )
        # Data quality: Filter invalid rows
        df = df.filter(col('sales_amount') > 0)
        # Write to database (JDBC)
        (
            df.write
            .format("jdbc")
            .option("url", output_jdbc_url)
            .option("dbtable", "sales_transactions")
            .option("user", "pepsico_user")
            .option("password", "password")
            .option("batchsize", 10000)  # Bulk insert size
            .mode("append")
            .save()
        )
        # Note: count() triggers a second pass over the source data
        row_count = df.count()
        print(f"Successfully processed {row_count} rows")
        return row_count
    finally:
        spark.stop()


# Usage
process_large_csv_spark(
    file_path='s3://pepsico-data/daily_sales_20GB.csv',
    output_jdbc_url='jdbc:postgresql://prod-db.pepsico.com:5432/sales_db'
)

Approach 3: PySpark with Schema Definition (Faster):

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType, DateType

def process_with_schema(file_path):
    """
    Define schema explicitly for faster processing (no inference)
    """
    spark = SparkSession.builder.appName("PepsiCo_ETL").getOrCreate()
    # Define schema explicitly (avoids inferSchema scan)
    schema = StructType([
        StructField("order_id", StringType(), False),
        StructField("customer_id", StringType(), False),
        StructField("product_id", StringType(), False),
        StructField("sales_amount", DoubleType(), True),
        StructField("quantity", IntegerType(), True),
        StructField("order_date", DateType(), True),
        StructField("region", StringType(), True)
    ])
    # Read with predefined schema (much faster)
    df = spark.read \
        .schema(schema) \
        .option("header", "true") \
        .csv(file_path)
    # Transform and load...
    return df

Real-World PepsiCo Pipeline:

from pyspark.sql import SparkSession
from pyspark.sql.functions import sum as _sum, avg as _avg

def pepsico_daily_sales_etl(date_str):
    """
    Production ETL pipeline for PepsiCo daily sales data
    """
    spark = SparkSession.builder \
        .appName(f"Sales_ETL_{date_str}") \
        .config("spark.databricks.delta.optimizeWrite.enabled", "true") \
        .getOrCreate()
    # Read from Azure Data Lake Storage (ADLS)
    input_path = f"abfss://raw-data@pepsico.dfs.core.windows.net/sales/{date_str}/*.csv"
    df = spark.read \
        .option("header", "true") \
        .csv(input_path)
    # Transform
    df_aggregated = df.groupBy("region", "product_id") \
        .agg(
            _sum("sales_amount").alias("total_sales"),
            _avg("sales_amount").alias("avg_sales"),
            _sum("quantity").alias("total_quantity")
        )
    # Write to Delta Lake (PepsiCo uses Databricks Delta)
    df_aggregated.write \
        .format("delta") \
        .mode("overwrite") \
        .partitionBy("region") \
        .save(f"abfss://curated@pepsico.dfs.core.windows.net/sales_daily/{date_str}")
    print(f"ETL completed for {date_str}")


# Schedule with Airflow/Databricks Jobs
# pepsico_daily_sales_etl('2024-12-08')

Performance Best Practices:

  1. Partition large files:
    • Instead of 1×20GB file → 20×1GB files (parallel processing)
  2. Use binary formats for storage:
    • CSV → Parquet (columnar and compressed: typically much smaller on disk and faster to read, especially when a query needs only a few columns)
  3. Database connection pooling:
    from sqlalchemy import create_engine
    engine = create_engine('postgresql://...', pool_size=10, max_overflow=20)
  4. Monitor memory usage:
    import psutil
    print(f"Memory usage: {psutil.virtual_memory().percent}%")

Expected Follow-Up Questions:
- “What if CSV has inconsistent schemas?” (Schema evolution handling)
- “How would you parallelize Pandas processing?” (Use Dask or multiprocessing)
- “What about corrupted CSV rows?” (Error handling, dead letter queue)
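
The chunking idea and the dead-letter follow-up can be combined in one small sketch that uses only the standard library. It streams the CSV in fixed-size batches, applies the same checks as the Pandas version (numeric sales_amount, missing quantity treated as 0, positive amounts only), and sets rejected rows aside instead of dropping them silently. The function name, the three-column table, the tiny batch size, and the in-memory SQLite target are all assumptions chosen to keep the example runnable.

```python
import csv
import io
import sqlite3
from itertools import islice


def load_csv_in_batches(lines, conn, batch_size=2):
    """Stream CSV rows into SQLite in batches; return (rows_loaded, dead_letters)."""
    reader = csv.DictReader(lines)
    dead_letters = []
    loaded = 0
    while True:
        batch = list(islice(reader, batch_size))  # at most batch_size rows in memory
        if not batch:
            break
        good = []
        for row in batch:
            try:
                amount = float(row["sales_amount"])
                quantity = int(row["quantity"] or 0)  # empty or missing quantity -> 0
                if amount <= 0:
                    raise ValueError("non-positive sales_amount")
                good.append((row["order_id"], amount, quantity))
            except (TypeError, ValueError) as exc:
                dead_letters.append((row, str(exc)))  # keep the row and the reason
        conn.executemany("INSERT INTO sales_transactions VALUES (?, ?, ?)", good)
        conn.commit()
        loaded += len(good)
    return loaded, dead_letters


sample = io.StringIO(
    "order_id,sales_amount,quantity\n"
    "O1,10.5,2\n"
    "O2,abc,1\n"
    "O3,7,\n"
    "O4,-3,1\n"
    "O5,4.25,3\n"
)
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE sales_transactions (order_id TEXT, sales_amount REAL, quantity INTEGER)"
)
loaded, dead_letters = load_csv_in_batches(sample, conn)
print(loaded, [row["order_id"] for row, _ in dead_letters])
```

In a real pipeline the dead-letter list would more likely be written to a separate table or file for later inspection, and the batch size would be in the tens of thousands rather than 2.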


6. Detect Anomalies in Sales Data Using Statistical Methods

Difficulty Level: Medium-High

Engineering Level: Mid-Senior Data Engineer (4-5 YOE)

Source: LinkedIn (Kishore Mohammed, Gerryson, Rahul Patel - Sept-Oct 2025)

Team: Analytics Infrastructure / Real-time Alerts

Interview Round: Coding Round 2 (Python)

Technology Focus: Statistical Analysis, Python Data Science, Anomaly Detection

Question: “Write a Python function to detect anomalies in daily sales data. An anomaly is defined as any value that deviates more than 3 standard deviations (3σ) from the mean. Return flagged records with anomaly scores.”


Answer Framework

Why PepsiCo Asks This:
- Real-time monitoring of 1 billion+ daily consumer interactions globally
- Detects supply chain disruptions (unusual demand spikes/drops)
- Critical for inventory management across 30+ digital products
- Enables proactive alerts for demand forecasting errors and quality issues

Key Technical Concepts:
- Z-score (3-sigma rule): Measures how many standard deviations a value is from mean
- Statistical anomaly detection: Values beyond 3σ are statistically rare (99.7% within 3σ)
- Grouped anomaly detection: Per-product analysis (Pepsi anomalies ≠ Doritos anomalies)
- Time-series consideration: Rolling windows vs static mean/std

Solution

import numpy as np
import pandas as pd
from scipy import stats

def detect_sales_anomalies(sales_df, threshold=3):
    """
    Detect anomalies using 3-sigma rule (z-score method)

    Args:
        sales_df: DataFrame with columns ['date', 'product_id', 'sales_amount']
        threshold: Number of standard deviations (default: 3)

    Returns:
        DataFrame with anomalies flagged
    """
    sales_df = sales_df.copy()
    # Calculate z-score per product
    sales_df['z_score'] = sales_df.groupby('product_id')['sales_amount'].transform(
        lambda x: np.abs(stats.zscore(x, nan_policy='omit'))
    )
    # Flag anomalies
    sales_df['is_anomaly'] = sales_df['z_score'] > threshold
    sales_df['anomaly_severity'] = sales_df['z_score']
    # Return only anomalies
    anomalies = sales_df[sales_df['is_anomaly']].copy()
    return anomalies[['date', 'product_id', 'sales_amount', 'z_score', 'anomaly_severity']]


# Alternative: Rolling window for time-series
def detect_anomalies_rolling(sales_df, window=30, threshold=3):
    """
    Detect anomalies using rolling statistics (better for time-series)
    """
    sales_df = sales_df.sort_values(['product_id', 'date']).copy()
    sales_df['rolling_mean'] = sales_df.groupby('product_id')['sales_amount'].transform(
        lambda x: x.rolling(window=window, min_periods=1).mean()
    )
    sales_df['rolling_std'] = sales_df.groupby('product_id')['sales_amount'].transform(
        lambda x: x.rolling(window=window, min_periods=1).std()
    )
    # Z-score relative to rolling window
    sales_df['z_score'] = (sales_df['sales_amount'] - sales_df['rolling_mean']) / sales_df['rolling_std']
    sales_df['is_anomaly'] = np.abs(sales_df['z_score']) > threshold
    return sales_df[sales_df['is_anomaly']]
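
A dependency-free sketch of the same 3σ check helps show one practical limit of the method. It uses the population standard deviation (the default in scipy.stats.zscore), under which a single point's z-score can never exceed sqrt(n - 1). With 10 or fewer observations per product the rule therefore cannot fire, however extreme the value. The function name and the sample figures are made up for illustration.

```python
from statistics import mean, pstdev


def zscore_anomalies(values, threshold=3.0):
    """Return (index, value, |z|) for values more than `threshold` sigmas from the mean."""
    mu = mean(values)
    sigma = pstdev(values)  # population standard deviation (ddof = 0)
    if sigma == 0:
        return []  # all values identical: nothing can be an outlier
    return [
        (i, v, round(abs(v - mu) / sigma, 2))
        for i, v in enumerate(values)
        if abs(v - mu) / sigma > threshold
    ]


# 19 ordinary days around 100 units and one spike of 500.
daily_sales = [100, 102, 98, 101, 99, 100, 103, 97, 100, 101,
               99, 100, 102, 98, 100, 101, 99, 100, 100, 500]
flagged = zscore_anomalies(daily_sales)
print(flagged)

# With only 10 observations the spike reaches exactly 3 sigmas, not more,
# so the strict '>' comparison does not flag it.
too_few = zscore_anomalies([100] * 9 + [10_000])
print(too_few)
```

This is one reason the rolling-window variant, or a robust statistic such as the median absolute deviation, is often preferred when each product has a short history.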

7. Group Transactions by Customer and Find Top N Without Pandas

Difficulty Level: Medium

Engineering Level: Mid Data Engineer (4-5 YOE)

Source: LinkedIn (Kishore Mohammed, Gerryson, Rahul Patel - Sept-Oct 2025)

Team: Data Platform Engineering

Interview Round: Coding Round 2 (Python Core)

Technology Focus: Python Core, Data Structures, Algorithm Design

Question: “Given a list of transactions with customer_id, transaction_amount, and date, write a Python function to group by customer and return the top N customers by purchase frequency without using Pandas or built-in aggregation libraries. Use only standard Python.”


Answer Framework

Why PepsiCo Asks This:
- Tests fundamental Python programming skills without library dependencies
- Assesses algorithmic thinking and data structure knowledge
- Important for environments where libraries may be restricted
- Evaluates ability to optimize code manually (time/space complexity)

Key Technical Concepts:
- Dictionary aggregation: Efficient O(1) key lookups for grouping
- Sorting with lambda: Custom sort keys (frequency descending, then amount descending)
- List slicing: [:n] for top N selection
- Time complexity: O(n) for aggregation + O(n log n) for sorting

Solution

def find_top_n_customers(transactions, n=10):
    """
    Find top N customers by purchase frequency using only Python core.

    Args:
        transactions: List of dicts
            [{'customer_id': 'C1', 'transaction_amount': 100, 'date': '2024-01-01'}, ...]
        n: Number of top customers to return

    Returns:
        List of dicts with customer stats
    """
    # Step 1: Group by customer_id
    customer_stats = {}
    for trans in transactions:
        customer_id = trans['customer_id']
        amount = trans['transaction_amount']
        if customer_id not in customer_stats:
            customer_stats[customer_id] = {
                'frequency': 0,
                'total_amount': 0,
                'transactions': []
            }
        customer_stats[customer_id]['frequency'] += 1
        customer_stats[customer_id]['total_amount'] += amount
        customer_stats[customer_id]['transactions'].append(trans)

    # Step 2: Sort by frequency (descending), then by total amount
    sorted_customers = sorted(
        customer_stats.items(),
        key=lambda x: (-x[1]['frequency'], -x[1]['total_amount'])
    )

    # Step 3: Get top N
    top_n = sorted_customers[:n]

    # Step 4: Format output
    result = []
    for customer_id, stats in top_n:
        result.append({
            'customer_id': customer_id,
            'purchase_frequency': stats['frequency'],
            'total_spent': stats['total_amount'],
            'avg_transaction': round(stats['total_amount'] / stats['frequency'], 2)
        })
    return result


# Example usage
transactions = [
    {'customer_id': 'CUST-001', 'transaction_amount': 150, 'date': '2024-01-01'},
    {'customer_id': 'CUST-002', 'transaction_amount': 200, 'date': '2024-01-02'},
    {'customer_id': 'CUST-001', 'transaction_amount': 120, 'date': '2024-01-03'},
]
top_10 = find_top_n_customers(transactions, n=10)
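
When n is much smaller than the number of distinct customers, a full sort does more work than needed. If standard-library imports are allowed in the interview, one possible alternative is heapq.nlargest, which keeps only n candidates at a time. This is a sketch, not part of the original answer; the function name top_n_by_frequency is hypothetical.

```python
import heapq


def top_n_by_frequency(transactions, n=10):
    """Return (customer_id, frequency, total_amount) tuples for the top n customers."""
    freq = {}
    total = {}
    for t in transactions:
        cid = t["customer_id"]
        freq[cid] = freq.get(cid, 0) + 1
        total[cid] = total.get(cid, 0) + t["transaction_amount"]
    # nlargest tracks only the n best customers instead of sorting all of them.
    best = heapq.nlargest(n, freq, key=lambda cid: (freq[cid], total[cid]))
    return [(cid, freq[cid], total[cid]) for cid in best]


sample = [
    {"customer_id": "CUST-001", "transaction_amount": 150},
    {"customer_id": "CUST-002", "transaction_amount": 200},
    {"customer_id": "CUST-001", "transaction_amount": 120},
]
print(top_n_by_frequency(sample, n=1))
```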

8. Solve the Small File Problem in ADLS/S3 with PySpark

Difficulty Level: High

Engineering Level: Senior Data Engineer (5+ YOE)

Source: LinkedIn (Kishore Mohammed, Gerryson, Rahul Patel, Ritik Jain - Sept-Oct 2025)

Team: Data Platform Engineering / Cloud Infrastructure

Interview Round: Technical Round 2 (System Design)

Technology Focus: PySpark, Azure Data Lake Storage, Spark Performance Tuning

Question: “You have thousands of small CSV files (5MB-100MB each) scattered across Azure Data Lake Storage (ADLS). When you try to read them into PySpark, performance is terrible. How would you solve the ‘small file problem’ and optimize for parallel processing?”


Answer Framework

Why PepsiCo Asks This:
- PepsiCo stores 60+ PB of data across Azure Data Lake Storage Gen2
- Data ingestion from multiple systems creates thousands of small fragmented files
- Small files cause excessive Spark overhead (metadata operations, task scheduling)
- Central challenge in PepsiCo’s Databricks-powered modern data architecture

Key Technical Concepts:
- Small file problem: Too many small files (< 128MB) cause poor Spark performance
- Coalesce vs repartition: coalesce() reduces the partition count without a full shuffle; repartition() performs a shuffle and can either increase or decrease the partition count
- Optimal partition size: 128-256MB per partition for HDFS/S3/ADLS
- File format optimization: Parquet/Delta Lake vs CSV for storage efficiency
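
The partition-size guideline reduces to simple arithmetic. The sketch below is plain Python, independent of Spark; the function name target_partition_count and the file counts are assumptions chosen for illustration.

```python
import math


def target_partition_count(total_bytes, target_mb=128):
    """Number of partitions so each holds roughly target_mb of data (at least 1)."""
    target_bytes = target_mb * 1024 * 1024
    return max(1, math.ceil(total_bytes / target_bytes))


# Hypothetical input: 4,000 CSV files averaging 20 MB each.
total = 4000 * 20 * 1024 * 1024
partitions_128 = target_partition_count(total)        # 128 MB target
partitions_256 = target_partition_count(total, 256)   # 256 MB target
print(partitions_128, partitions_256)
```

In this example, 4,000 input files collapse to a few hundred output partitions, which is the reduction in task count that the compaction job is after.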

Solution

from pyspark.sql import SparkSession


def solve_small_file_problem(path_to_files, output_path):
    """
    Solve small file problem by coalescing and converting to Parquet
    """
    spark = SparkSession.builder \
        .appName("SmallFileOptimization") \
        .config("spark.sql.adaptive.enabled", "true") \
        .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
        .getOrCreate()

    # Read all small CSV files
    df = spark.read \
        .option("header", "true") \
        .csv(f"{path_to_files}/*.csv")
    initial_partitions = df.rdd.getNumPartitions()

    # Calculate optimal partitions (1 partition per 128MB data).
    # This is a rough estimate based on row string lengths and scans the data once.
    total_size_mb = df.rdd.map(lambda x: len(str(x))).sum() / (1024 * 1024)
    optimal_partitions = max(1, int(total_size_mb / 128))

    # Coalesce to reduce partition count
    df_optimized = df.coalesce(optimal_partitions)

    # Write as partitioned Parquet (larger, more efficient files).
    # Assumes the data contains region, year, and month columns.
    df_optimized.write \
        .mode("overwrite") \
        .partitionBy("region", "year", "month") \
        .parquet(output_path)

    print(f"Optimized from {initial_partitions} to {optimal_partitions} partitions")


# Usage
solve_small_file_problem(
    path_to_files="abfss://raw@pepsico.dfs.core.windows.net/sales_csv",
    output_path="abfss://curated@pepsico.dfs.core.windows.net/sales_parquet")

9. Design Efficient PySpark Join for Billions of Rows

Difficulty Level: Very High

Engineering Level: Senior Data Engineer (5+ YOE)

Source: LinkedIn (Kishore Mohammed, Gerryson, Rahul Patel - Sept-Oct 2025)

Team: Data Platform Engineering / Performance Optimization

Interview Round: Technical Round 2 (System Design)

Technology Focus: PySpark Join Optimization, Spark Performance

Question: “You need to join two very large datasets (billions of rows each) efficiently in PySpark. How would you approach this? What optimizations would you apply?”


Answer Framework

Why PepsiCo Asks This:
- PepsiCo joins massive datasets daily (sales × customer × product × time dimensions)
- Typical scenario: customer transactions (10B rows) × customer master (100M rows)
- Critical for building unified reporting layer across 30+ digital products
- Join optimization directly impacts query time (hours vs minutes) and costs

Key Technical Concepts:
- Broadcast join: Small table (<1GB) is broadcast to all executors, so the large table is joined without a shuffle
- Sort-merge join: Default for large-large joins, requires shuffle and sort
- Bucketing: Pre-partitions tables by join key, eliminates shuffle for recurring joins
- Join strategies: Broadcast (fastest) > Bucketed > Shuffle hash > Sort-merge
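
The idea behind a broadcast join can be shown without Spark: index the small side in memory once, then stream the large side past it. The following is a plain-Python sketch of that build-and-probe pattern, not Spark's implementation; broadcast_hash_join is a hypothetical name, and it assumes the key is unique on the small side.

```python
def broadcast_hash_join(large_rows, small_rows, key):
    """Left-join large_rows to small_rows on `key` using an in-memory lookup."""
    # Build phase: index the small side once (this is the part that is "broadcast").
    lookup = {row[key]: row for row in small_rows}
    # Probe phase: stream the large side; it is never sorted or regrouped.
    joined = []
    for row in large_rows:
        match = lookup.get(row[key], {})  # no match: keep only the left-side columns
        joined.append({**match, **row})
    return joined


transactions = [
    {"customer_id": 1, "amount": 10},
    {"customer_id": 2, "amount": 5},
    {"customer_id": 1, "amount": 7},
]
customers = [{"customer_id": 1, "segment": "retail"}]

result = broadcast_hash_join(transactions, customers, "customer_id")
print(result)
```

Because the large side is only iterated, its rows never move between workers. That is why a broadcast join avoids the shuffle that a sort-merge join needs.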

Solution

from pyspark.sql.functions import broadcast


def efficient_large_join(spark, large_df_path, small_df_path):
    """
    Efficiently join large datasets using broadcast or bucketing
    """
    df_large = spark.read.parquet(large_df_path)  # 10B rows
    df_small = spark.read.parquet(small_df_path)  # 100M rows

    # Approach 1: Broadcast join (if one table < 1GB).
    # Row count is used here as a rough proxy for table size.
    if df_small.count() < 10_000_000:
        result = df_large.join(broadcast(df_small), "customer_id", "left")
    else:
        # Approach 2: Regular join (Spark plans a sort-merge join).
        # The shuffle is avoided only when both sides are read as tables
        # bucketed on customer_id (see create_bucketed_tables below).
        result = df_large.join(df_small, "customer_id", "left")
    return result


# Bucketing for repeated joins (one-time setup).
# bucketBy() is only supported with saveAsTable(), so the data is written as a
# table stored at output_path; table_name is a placeholder argument.
def create_bucketed_tables(df, table_name, output_path, num_buckets=256):
    df.write \
        .mode("overwrite") \
        .format("parquet") \
        .bucketBy(num_buckets, "customer_id") \
        .sortBy("customer_id") \
        .option("path", output_path) \
        .saveAsTable(table_name)
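
Why bucketing removes the shuffle can be illustrated in plain Python: if both tables assign rows to buckets with the same function of the join key, bucket i of one table only ever needs bucket i of the other. In this sketch CRC32 stands in for Spark's own hash function, and the names bucket_of and bucketize are hypothetical.

```python
import zlib


def bucket_of(key, num_buckets):
    """Deterministic bucket id for a key (CRC32 stands in for Spark's hash)."""
    return zlib.crc32(str(key).encode("utf-8")) % num_buckets


def bucketize(rows, key, num_buckets):
    """Distribute rows into num_buckets lists according to bucket_of(row[key])."""
    buckets = [[] for _ in range(num_buckets)]
    for row in rows:
        buckets[bucket_of(row[key], num_buckets)].append(row)
    return buckets


orders = [{"customer_id": c, "amount": a} for c, a in [(1, 10), (2, 5), (1, 7), (3, 2)]]
customers = [{"customer_id": c} for c in (1, 2, 3)]

order_buckets = bucketize(orders, "customer_id", 4)
customer_buckets = bucketize(customers, "customer_id", 4)

# Every order finds its customer in the bucket with the same index,
# so each pair of buckets can be joined independently.
for i in range(4):
    ids_here = {c["customer_id"] for c in customer_buckets[i]}
    assert all(o["customer_id"] in ids_here for o in order_buckets[i])
```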

10. Handle Data Skew in Spark Joins

Difficulty Level: Very High

Engineering Level: Senior Data Engineer (5+ YOE)

Source: LinkedIn (Kishore Mohammed, Gerryson, Rahul Patel - Sept-Oct 2025)

Team: Data Platform Engineering / Advanced Spark Tuning

Interview Round: Technical Round 2 (System Design)

Technology Focus: Data Skew, Salting Technique, Spark Optimization

Question: “You’re joining two large Spark DataFrames, but one join key has significantly more data than others (e.g., ‘Other’ category represents 70% of rows). Your Spark job is extremely slow. How would you handle this data skew?”


Answer Framework

Why PepsiCo Asks This:
- Real-world data is inherently skewed (Pepsi brand >> niche brands, major warehouses >> small ones)
- Supply chain data has hot keys (popular products, major distribution centers)
- Skew causes uneven Spark task distribution and OOM (out-of-memory) errors
- Critical for handling PepsiCo’s 30+ digital products with varying data volumes

Key Technical Concepts:
- Data skew: Uneven distribution where one key has disproportionate data (70%+ in one partition)
- Salting technique: Add random suffix to skewed keys to distribute across multiple partitions
- Isolated join: Handle skewed keys separately with broadcast, normal keys with regular join
- Adaptive Query Execution (AQE): Spark 3.0+ automatically detects and handles some skew
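
The effect of salting on partition sizes can be simulated without a cluster. The sketch below is plain Python under stated assumptions: CRC32 stands in for Spark's partitioner hash, the brand names and row counts are invented, and partition_sizes is a hypothetical helper.

```python
import random
import zlib
from collections import Counter


def partition_sizes(keys, num_partitions, hot_key=None, salt_buckets=1, seed=0):
    """Count rows per partition; rows of hot_key get a random salt suffix."""
    rng = random.Random(seed)
    sizes = Counter()
    for k in keys:
        salt = rng.randrange(salt_buckets) if k == hot_key else 0
        salted_key = f"{k}_{salt}"
        sizes[zlib.crc32(salted_key.encode("utf-8")) % num_partitions] += 1
    return sizes


# Invented distribution: one key holds 70% of the rows.
keys = ["Other"] * 700 + ["Cola"] * 100 + ["Chips"] * 100 + ["Water"] * 100

plain = partition_sizes(keys, num_partitions=8)
salted = partition_sizes(keys, num_partitions=8, hot_key="Other", salt_buckets=10)

print("largest partition without salting:", max(plain.values()))
print("largest partition with salting:   ", max(salted.values()))
```

Without salting, all 700 "Other" rows land in one partition. With salting they are spread over up to ten salted keys, so the largest partition shrinks.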

Solution

from pyspark.sql.functions import array, col, explode, lit, rand


def handle_data_skew(df1, df2, join_key, skew_threshold=0.7, salt_buckets=10):
    """
    Handle data skew using salting technique
    """
    # Step 1: Identify skewed keys (count df1 once and reuse the total)
    total_rows = df1.count()
    key_distribution = df1.groupBy(join_key).count() \
        .withColumn("percentage", col("count") / total_rows)
    skewed_keys = [
        row[join_key]
        for row in key_distribution.filter(col("percentage") > skew_threshold).collect()
    ]

    # Step 2: Split into skewed and normal.
    # Note: rows with a NULL join key match neither filter and need separate handling.
    df1_skewed = df1.filter(col(join_key).isin(skewed_keys))
    df1_normal = df1.filter(~col(join_key).isin(skewed_keys))

    # Step 3: Apply salting to skewed portion (salt is an int in [0, salt_buckets))
    df1_skewed = df1_skewed.withColumn("salt", (rand() * salt_buckets).cast("int"))

    # Step 4: Replicate df2 for skewed keys, one copy per salt value
    df2_salted = df2.filter(col(join_key).isin(skewed_keys)) \
        .withColumn("salt", explode(array(*[lit(i) for i in range(salt_buckets)])))

    # Step 5: Join skewed and normal separately.
    # Joining on (join_key, salt) avoids duplicate columns in the result.
    result_skewed = df1_skewed.join(df2_salted, [join_key, "salt"], "left").drop("salt")
    result_normal = df1_normal.join(df2.filter(~col(join_key).isin(skewed_keys)), join_key, "left")

    # Step 6: Combine results
    result = result_skewed.unionByName(result_normal)
    return result
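
Before resorting to manual salting, it is usually worth checking whether Adaptive Query Execution already handles the skew. The sketch below lists the relevant Spark 3.x settings as a plain dictionary; the values are illustrative rather than recommendations, and apply_settings is a hypothetical helper that expects an existing SparkSession.

```python
# AQE skew-join settings (Spark 3.0+). Values are illustrative, not tuned advice.
AQE_SKEW_SETTINGS = {
    "spark.sql.adaptive.enabled": "true",
    "spark.sql.adaptive.skewJoin.enabled": "true",
    # A partition counts as skewed when it is this many times larger than the
    # median partition size and also exceeds the byte threshold below.
    "spark.sql.adaptive.skewJoin.skewedPartitionFactor": "5",
    "spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes": "256MB",
}


def apply_settings(spark, settings=AQE_SKEW_SETTINGS):
    """Apply each setting through spark.conf.set on an existing SparkSession."""
    for name, value in settings.items():
        spark.conf.set(name, value)
```

AQE splits oversized partitions only in sort-merge joins, so salting remains useful where the skew survives these settings.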