PepsiCo Data Engineer
This guide features 10 challenging Data Engineer interview questions for PepsiCo (4-6+ years experience), covering SQL optimization, Python ETL, PySpark big data processing, data modeling, cloud architecture (Azure/AWS), supply chain analytics, performance tuning, and production-grade data pipeline challenges at PepsiCo’s global scale (60+ PB data, 150K+ daily jobs).
1. Top N Products by Sales Per Region with Tie Handling
Difficulty Level: Medium
Engineering Level: Mid-Senior Data Engineer (4-5 YOE)
Source: LinkedIn (Kishore Mohammed, Gerryson, Rahul Patel - Sept-Oct 2025)
Team: Data Platform Engineering / Business Intelligence
Interview Round: Coding Round 1 (SQL)
Technology Focus: SQL Window Functions, Ranking Logic
Question: “Write a SQL query to find the top 3-5 products by total sales for each region in the last 6 months. Ensure that ties are handled properly—if two products have the same sales, both should appear in the top 3.”
Answer Framework
Why PepsiCo Asks This:
- Operates across 200+ countries with regional sales data for beverages and snacks
- Identifies top-performing SKUs by geography for supply chain optimization and marketing
- Tests understanding of window functions, ranking vs row numbering, and edge case handling
Key Technical Concepts:
- RANK() vs ROW_NUMBER(): RANK() allows ties (essential for this question), ROW_NUMBER() does not
- PARTITION BY region: Creates separate ranking windows per region
- ORDER BY sales DESC: Ranks highest sales first
- Filtering ranks ≤ 3: Returns top 3 including ties
Solution
SQL Approach (Using RANK for Tie Handling):
WITH regional_sales AS (
SELECT
r.region_name,
p.product_id,
p.product_name,
SUM(s.sale_amount) as total_sales
FROM sales s
JOIN products p ON s.product_id = p.product_id
JOIN regions r ON s.region_id = r.region_id
WHERE s.sale_date >= DATE_SUB(CURDATE(), INTERVAL 6 MONTH)
GROUP BY r.region_name, p.product_id, p.product_name
),
ranked_products AS (
SELECT
region_name,
product_id,
product_name,
total_sales,
RANK() OVER (PARTITION BY region_name ORDER BY total_sales DESC) as sales_rank
FROM regional_sales
)
SELECT
region_name,
product_id,
product_name,
total_sales,
sales_rank
FROM ranked_products
WHERE sales_rank <= 3
ORDER BY region_name, sales_rank, product_name;
Explanation:
- CTE regional_sales: Aggregates 6-month sales by region and product
- CTE ranked_products: Applies the RANK() window function partitioned by region
- Filter sales_rank <= 3: Returns the top 3, including ties (e.g., if 3 products tied at #2, all appear)
- ORDER BY: Ensures consistent output ordering
Why RANK() Not ROW_NUMBER():
- ROW_NUMBER() assigns unique ranks (1,2,3,4,5) even for ties → Would arbitrarily pick one tied product
- RANK() allows ties (1,2,2,4,5) → Both products with same sales get same rank
Edge Case Example:
| Region | Product | Sales | ROW_NUMBER | RANK |
|---|---|---|---|---|
| APAC | Pepsi | 500K | 1 | 1 |
| APAC | Lays | 400K | 2 | 2 |
| APAC | Doritos | 400K | 3 | 2 ← Tie! |
| APAC | Gatorade | 350K | 4 | 4 |
With WHERE sales_rank <= 3 using RANK(): Returns Pepsi, Lays, and Doritos, with the tied products correctly sharing rank 2 (correct!)
With ROW_NUMBER(): Happens to return the same three rows here, but the tie is broken arbitrarily—if the tie fell at the cutoff (e.g., Gatorade also at 400K), one tied product would be silently dropped (incorrect!)
Performance Optimization for PepsiCo Scale:
- Index on sale_date, region_id, product_id for fast filtering
- Partition sales table by month for faster 6-month scans
- Pre-aggregate daily sales into monthly summaries to reduce row scans
Expected Follow-Up Questions:
- “What if you want DENSE_RANK instead?” (1,2,2,3 vs 1,2,2,4)
- “How would this scale with billions of rows?” (Partitioning, indexing strategy)
- “What if sales data arrives late?” (Handle late-arriving facts in data warehouse design)
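For the "billions of rows" follow-up, the same ranking logic translates directly to PySpark window functions. A minimal sketch, assuming a pre-joined DataFrame sales_df with illustrative columns region_name, product_id, product_name, sale_amount, and sale_date:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

def top_products_per_region(sales_df, top_n=3):
    # Aggregate the last 6 months of sales per region and product
    regional = (sales_df
                .filter(F.col("sale_date") >= F.add_months(F.current_date(), -6))
                .groupBy("region_name", "product_id", "product_name")
                .agg(F.sum("sale_amount").alias("total_sales")))
    # rank() keeps ties, mirroring RANK() in the SQL solution
    w = Window.partitionBy("region_name").orderBy(F.desc("total_sales"))
    return (regional
            .withColumn("sales_rank", F.rank().over(w))
            .filter(F.col("sales_rank") <= top_n))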
2. Identify Orders with Shipping Delays Beyond Monthly Averages
Difficulty Level: Medium-High
Engineering Level: Mid Data Engineer (4-5 YOE)
Source: LinkedIn (Gerryson, Rahul Patel, Vishakha Singhal - Aug-Nov 2025)
Team: Supply Chain Analytics / Enterprise Data Management
Interview Round: Coding Round 1 (SQL)
Technology Focus: Window Functions, Date Arithmetic, Supply Chain Metrics
Question: “Given a sales table with order_date and ship_date columns, find all orders where shipping took longer than the average shipping time for that month. Return order ID, customer ID, shipping days, and monthly average.”
Answer Framework
Why PepsiCo Asks This:
- PepsiCo's products travel roughly 1.3 billion miles annually across its global distribution network
- Identifies logistics bottlenecks affecting delivery SLAs
- Critical for warehouse location optimization and carrier performance analysis
- Supply chain efficiency directly impacts customer satisfaction and costs
Key Technical Concepts:
- Window functions with aggregates: AVG() OVER (PARTITION BY month)
- Date arithmetic: DATEDIFF() or date subtraction
- Self-comparison filtering: WHERE shipping_days > monthly_avg within same query
- Month extraction: MONTH(order_date) for partitioning
Solution
SQL Approach (Window Function with Filtering):
WITH shipping_metrics AS (
SELECT
order_id,
customer_id,
order_date,
ship_date,
DATE_PART('month', order_date) as order_month,
DATE_PART('year', order_date) as order_year,
(ship_date - order_date) as shipping_days,
AVG(ship_date - order_date) OVER (
PARTITION BY DATE_PART('year', order_date),
DATE_PART('month', order_date)
) as monthly_avg_shipping_days
FROM orders
WHERE ship_date IS NOT NULL -- Exclude unshipped orders
)
SELECT
order_id,
customer_id,
order_date,
ship_date,
shipping_days,
ROUND(monthly_avg_shipping_days, 2) as monthly_avg_shipping_days,
ROUND(shipping_days - monthly_avg_shipping_days, 2) as delay_vs_avg,
order_month,
order_year
FROM shipping_metrics
WHERE shipping_days > monthly_avg_shipping_days
ORDER BY delay_vs_avg DESC, order_date;
Explanation:
- CTE shipping_metrics: Calculates shipping days and the monthly average in a single pass
- PARTITION BY year + month: Creates separate averages per month (Jan 2024 ≠ Jan 2025)
- Filter shipping_days > monthly_avg: Returns only delayed orders
- delay_vs_avg: Shows how many days beyond the average (useful for prioritization)
Alternative: Using JOIN Instead of Window Function:
WITH monthly_averages AS (
SELECT
DATE_PART('year', order_date) as order_year,
DATE_PART('month', order_date) as order_month,
AVG(ship_date - order_date) as avg_shipping_days
FROM orders
WHERE ship_date IS NOT NULL
GROUP BY DATE_PART('year', order_date), DATE_PART('month', order_date)
)
SELECT
o.order_id,
o.customer_id,
(o.ship_date - o.order_date) as shipping_days,
ma.avg_shipping_days as monthly_avg,
(o.ship_date - o.order_date) - ma.avg_shipping_days as delay_vs_avg
FROM orders o
JOIN monthly_averages ma
ON DATE_PART('year', o.order_date) = ma.order_year
AND DATE_PART('month', o.order_date) = ma.order_month
WHERE (o.ship_date - o.order_date) > ma.avg_shipping_days
ORDER BY delay_vs_avg DESC;
Window Function vs JOIN Approach:
- Window function: More concise, better for complex partitioning
- JOIN approach: Better for reusing monthly averages in multiple queries
Real-World PepsiCo Context:
Example output showing delayed shipments:
| Order ID | Customer | Shipping Days | Monthly Avg | Delay vs Avg | Month |
|---|---|---|---|---|---|
| ORD-12345 | Walmart | 12 | 5.2 | +6.8 | Oct 2024 |
| ORD-12389 | Target | 9 | 5.2 | +3.8 | Oct 2024 |
Insights: Orders with +5 day delays → investigate carrier issues, warehouse staffing, or SKU-specific problems
Performance Optimization:
- Index: (order_date, ship_date) composite index
- Partition: Partition orders table by month for faster scans
- Materialized view: Pre-compute monthly averages if queried frequently
Expected Follow-Up Questions:
- “How would you handle NULL ship_date?” (Unshipped orders → separate analysis or exclude)
- “What if you need regional averages?” (Add PARTITION BY region to window function)
- “How would this work in Spark/PySpark?” (Use window functions: pyspark.sql.window)
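Picking up the last follow-up: a hedged PySpark sketch of the same window-function approach, assuming an orders DataFrame with order_id, customer_id, order_date, and ship_date columns (names mirror the SQL above):
from pyspark.sql import functions as F
from pyspark.sql.window import Window

def delayed_orders(orders_df):
    # Shipping days plus year/month buckets for the monthly average
    df = (orders_df
          .filter(F.col("ship_date").isNotNull())           # exclude unshipped orders
          .withColumn("shipping_days", F.datediff("ship_date", "order_date"))
          .withColumn("order_year", F.year("order_date"))
          .withColumn("order_month", F.month("order_date")))
    w = Window.partitionBy("order_year", "order_month")
    return (df
            .withColumn("monthly_avg_shipping_days", F.avg("shipping_days").over(w))
            .filter(F.col("shipping_days") > F.col("monthly_avg_shipping_days")))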
3. Recursive CTE for Organizational Hierarchy
Difficulty Level: High
Engineering Level: Mid-Senior Data Engineer (4-5 YOE)
Source: LinkedIn (Kishore Mohammed, Gerryson, Rahul Patel - Sept-Oct 2025)
Team: Enterprise Data Management / Data Governance
Interview Round: Coding Round 1 (SQL)
Technology Focus: Recursive CTEs, Hierarchical Data, Data Governance
Question: “You have an employee table with employee_id and manager_id columns. Write a recursive query to list all subordinates (direct and indirect) for a given manager. Include the hierarchy level and reporting chain.”
Answer Framework
Why PepsiCo Asks This:
- PepsiCo has 200,000+ employees globally across complex org structures
- Required for Databricks Unity Catalog access control (1,500+ active data users across 30+ teams)
- Enables org-wide reporting, compliance tracking, and data governance
- Used in building role-based access control (RBAC) for data assets
Key Technical Concepts:
- Recursive CTEs: WITH RECURSIVE enables hierarchical queries
- Anchor member: Base case (starting manager)
- Recursive member: Joins to find next level of subordinates
- Termination condition: Prevents infinite recursion
Solution
SQL Approach (Recursive CTE with Hierarchy Tracking):
WITH RECURSIVE org_hierarchy AS (
-- Anchor member: Start with given manager
SELECT
employee_id,
employee_name,
manager_id,
job_title,
1 as hierarchy_level,
CAST(employee_id AS VARCHAR(1000)) as reporting_chain,
CAST(employee_name AS VARCHAR(1000)) as reporting_chain_names
FROM employees
WHERE employee_id = @ManagerID -- Parameter: starting manager
UNION ALL
-- Recursive member: Find subordinates at each level
SELECT
e.employee_id,
e.employee_name,
e.manager_id,
e.job_title,
oh.hierarchy_level + 1,
oh.reporting_chain || ' -> ' || CAST(e.employee_id AS VARCHAR),
oh.reporting_chain_names || ' -> ' || e.employee_name
FROM employees e
INNER JOIN org_hierarchy oh
ON e.manager_id = oh.employee_id
WHERE oh.hierarchy_level < 10 -- Prevent infinite recursion
)
SELECT
employee_id,
employee_name,
job_title,
hierarchy_level,
reporting_chain_names,
manager_id
FROM org_hierarchy
ORDER BY hierarchy_level, employee_name;
Explanation:
- Anchor: Starts with @ManagerID (e.g., VP of Engineering)
- Recursive: Finds direct reports, then their reports, iteratively
- hierarchy_level: Tracks depth (1 = VP, 2 = Directors, 3 = Managers, etc.)
- reporting_chain: Shows full path (e.g., “VP -> Director -> Manager -> IC”)
- Termination: WHERE hierarchy_level < 10 prevents infinite recursion from circular references
Example Output:
Given CEO (ID: 1000):
| Employee ID | Name | Title | Level | Reporting Chain |
|---|---|---|---|---|
| 1000 | John Doe | CEO | 1 | John Doe |
| 2001 | Jane Smith | CTO | 2 | John Doe -> Jane Smith |
| 2002 | Bob Lee | CFO | 2 | John Doe -> Bob Lee |
| 3001 | Alice Wang | VP Eng | 3 | John Doe -> Jane Smith -> Alice Wang |
| 4001 | Chris Park | Dir Data | 4 | …Jane Smith -> Alice Wang -> Chris Park |
Handling Circular References (Edge Case):
Problem: Employee A reports to B, B reports to A → infinite loop
Solution: Add cycle detection:
WITH RECURSIVE org_hierarchy AS (
SELECT
employee_id,
employee_name,
manager_id,
1 as hierarchy_level,
ARRAY[employee_id] as path -- Track visited employees
FROM employees
WHERE employee_id = @ManagerID
UNION ALL
SELECT
e.employee_id,
e.employee_name,
e.manager_id,
oh.hierarchy_level + 1,
oh.path || e.employee_id
FROM employees e
INNER JOIN org_hierarchy oh ON e.manager_id = oh.employee_id
WHERE NOT (e.employee_id = ANY(oh.path)) -- Prevent cycles
AND oh.hierarchy_level < 15
)
SELECT * FROM org_hierarchy;
Alternative: Finding All Managers Above an Employee (Upward Traversal):
WITH RECURSIVE manager_chain AS (
-- Start with specific employee
SELECT
employee_id,
employee_name,
manager_id,
1 as level
FROM employees
WHERE employee_id = @EmployeeID
UNION ALL
-- Climb up the hierarchy
SELECT
e.employee_id,
e.employee_name,
e.manager_id,
mc.level + 1
FROM employees e
INNER JOIN manager_chain mc ON e.employee_id = mc.manager_id
WHERE mc.level < 10
)
SELECT * FROM manager_chain
ORDER BY level;
Real-World PepsiCo Use Case:
Data Governance Scenario:
- VP of Data needs to see all team members with database access
- Recursive query finds all subordinates (200+ people across 5 levels)
- Unity Catalog uses this for cascading permissions
Performance Considerations:
- Max recursion depth: Typical org is 5-7 levels; set limit to 10-15
- Index: (manager_id) index critical for fast recursive joins
- Materialized path: For frequent queries, denormalize hierarchy into single column
Expected Follow-Up Questions:
- “What if multiple employees report to same manager?” (Already handled by join logic)
- “How would you find total team size for each manager?” (COUNT(*) grouped by starting manager)
- “What if you need sibling relationships?” (Add PARTITION BY manager_id logic)
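Since Spark SQL does not support recursive CTEs, the PySpark equivalent on a Databricks stack is typically an iterative loop of self-joins. A minimal sketch, assuming an employees DataFrame with employee_id and manager_id columns (illustrative, not from the original question):
from pyspark.sql import functions as F

def subordinates_of(employees_df, manager_id, max_depth=10):
    # Level 1: direct reports of the starting manager
    frontier = (employees_df
                .filter(F.col("manager_id") == manager_id)
                .withColumn("hierarchy_level", F.lit(1)))
    result = frontier
    for level in range(2, max_depth + 1):
        # Next level: employees whose manager is in the current frontier
        frontier = (employees_df.alias("e")
                    .join(frontier.select("employee_id").alias("f"),
                          F.col("e.manager_id") == F.col("f.employee_id"))
                    .select("e.*")
                    .withColumn("hierarchy_level", F.lit(level)))
        if frontier.rdd.isEmpty():  # no deeper reports: stop early
            break
        result = result.unionByName(frontier)
    return result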
4. Deduplicate Large Tables While Preserving Latest Record
Difficulty Level: Medium
Engineering Level: Mid Data Engineer (4-5 YOE)
Source: LinkedIn (Kishore Mohammed, Gerryson, Rahul Patel, Ritik Jain - Sept-Oct 2025)
Team: Data Quality / Cloud Data Solutions
Interview Round: Coding Round 1 (SQL)
Technology Focus: Data Quality, Window Functions, ETL Optimization
Question: “You have a huge table with millions of duplicate records. You need to remove duplicates while keeping only the most recent record based on a timestamp column. How would you approach this efficiently?”
Answer Framework
Why PepsiCo Asks This:
- Data sprawl across 60+ PB globally from multiple ERP systems (SAP, Oracle, custom)
- Enterprise data platform consolidates data with inevitable duplicates
- Critical for building unified data layer across 30+ digital products
- Data quality is core pillar of PepsiCo’s modern architecture
Key Technical Concepts:
- ROW_NUMBER() window function: Assigns unique rank to each duplicate group
- PARTITION BY natural key: Groups duplicates together
- ORDER BY timestamp DESC: Ensures latest record gets ROW_NUMBER = 1
- Performance: Must handle billions of rows efficiently
Solution
Approach 1: Using ROW_NUMBER (Most Common & Efficient):
-- Step 1: Identify duplicates and keep latest
WITH deduped AS (
SELECT *,
ROW_NUMBER() OVER (
PARTITION BY customer_id, product_id -- Natural key (adjust per table)
ORDER BY updated_timestamp DESC -- Latest record first
) as rn
FROM customer_orders
)
SELECT
order_id,
customer_id,
product_id,
order_amount,
updated_timestamp
FROM deduped
WHERE rn = 1; -- Keep only the latest record per group
Explanation:
- PARTITION BY: Groups rows with same natural key (customer_id + product_id)
- ORDER BY DESC: Latest timestamp gets rn=1, older records get rn=2,3,…
- WHERE rn=1: Filters to keep only latest
Approach 2: DELETE Duplicates In-Place (For Permanent Cleanup):
-- Warning: Test on small dataset first!
WITH deduped AS (
SELECT
ctid, -- PostgreSQL row identifier (use ROWID in Oracle)
ROW_NUMBER() OVER (
PARTITION BY customer_id, product_id
ORDER BY updated_timestamp DESC
) as rn
FROM customer_orders
)
DELETE FROM customer_orders
WHERE ctid IN (
SELECT ctid FROM deduped WHERE rn > 1
);
Approach 3: Using DISTINCT ON (PostgreSQL Specific - Most Efficient):
-- PostgreSQL shorthand for deduplication
SELECT DISTINCT ON (customer_id, product_id)
order_id,
customer_id,
product_id,
order_amount,
updated_timestamp
FROM customer_orders
ORDER BY customer_id, product_id, updated_timestamp DESC;
Approach 4: Using a Correlated Subquery (Less Efficient, Avoid for Large Tables):
-- Not recommended for billions of rows (very slow)
SELECT *
FROM customer_orders o1
WHERE updated_timestamp = (
SELECT MAX(updated_timestamp)
FROM customer_orders o2
WHERE o1.customer_id = o2.customer_id
AND o1.product_id = o2.product_id
);
Performance Comparison:
| Approach | Performance (1M rows) | Performance (1B rows) | Notes |
|---|---|---|---|
| ROW_NUMBER | ~5 sec | ~2 min | Requires temp space for window function |
| DISTINCT ON (PostgreSQL) | ~3 sec | ~1.5 min | Fastest, but PostgreSQL only |
| Correlated subquery | ~30 sec | Hours | Re-scans the table for every row, avoid! |
| DELETE with CTE | ~8 sec | ~3 min | In-place modification, can’t rollback easily |
Real-World PepsiCo Scenario:
Problem: Customer master data synced from SAP (daily) and Oracle (hourly) creates duplicates:
| Customer ID | Name | Email | Source | Updated Timestamp | Keep? |
|---|---|---|---|---|---|
| CUST-001 | Walmart | old@email.com | SAP | 2024-12-01 08:00 | Duplicate |
| CUST-001 | Walmart Inc | new@email.com | Oracle | 2024-12-08 14:00 | Latest |
Deduplication Query:
WITH deduped_customers AS (
SELECT *,
ROW_NUMBER() OVER (
PARTITION BY customer_id
ORDER BY updated_timestamp DESC,
CASE WHEN source = 'Oracle' THEN 1 ELSE 2 END -- Prefer Oracle if timestamps tie
) as rn
FROM customer_master
)
SELECT
customer_id,
customer_name,
email,
source,
updated_timestamp
FROM deduped_customers
WHERE rn = 1;
Performance Optimization for PepsiCo Scale:
- Partitioning: Partition table by date to limit scan scope
WHERE updated_timestamp >= '2024-01-01' -- Only dedupe recent data
- Indexing: Create index on natural key + timestamp
CREATE INDEX idx_customer_dedup ON customer_master(customer_id, product_id, updated_timestamp DESC);
- Incremental Deduplication: Process in batches
-- Dedupe only today's data
WHERE DATE(updated_timestamp) = CURRENT_DATE
- Use Delta Lake (PepsiCo’s Databricks Stack):
MERGE INTO customer_master_clean target
USING (SELECT * FROM deduped WHERE rn = 1) source
ON target.customer_id = source.customer_id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;
Expected Follow-Up Questions:
- “What if two records have the same timestamp?” (Add secondary sort column)
- “How do you handle this in Spark/PySpark?” (Use window functions: pyspark.sql.window)
- “What if you need to keep historical duplicates?” (SCD Type 2 approach)
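For the Spark follow-up, the same keep-latest pattern is a window function in PySpark. A minimal hedged sketch (DataFrame and column names are illustrative):
from pyspark.sql import functions as F
from pyspark.sql.window import Window

def dedupe_latest(df, key_cols, ts_col="updated_timestamp"):
    # Rank records within each natural-key group, newest first, and keep rank 1
    w = Window.partitionBy(*key_cols).orderBy(F.col(ts_col).desc())
    return (df
            .withColumn("rn", F.row_number().over(w))
            .filter(F.col("rn") == 1)
            .drop("rn"))

# Example: dedupe_latest(customer_orders_df, ["customer_id", "product_id"])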
5. Efficiently Process Large CSV Files (20GB+) with Python ETL
Difficulty Level: Medium-High
Engineering Level: Mid-Senior Data Engineer (4-5 YOE)
Source: LinkedIn (Kishore Mohammed, Gerryson, Rahul Patel, Vishakha Singhal - Sept-Nov 2025)
Team: Data Platform Engineering / ETL Optimization
Interview Round: Coding Round 2 (Python/Spark)
Technology Focus: Python ETL, Memory Management, Spark, Data Pipeline Optimization
Question: “You receive a large CSV file (20GB+) daily. You need to read, transform, and write it to a database efficiently using Python. How would you approach this without running out of memory? What technologies would you use?”
Answer Framework
Why PepsiCo Asks This:
- Daily batch processing of millions of supply chain transactions
- PepsiCo processes 150,000+ daily jobs in modern data platform
- Must handle real-time inventory, sales, and demand data efficiently
- Optimization directly impacts infrastructure costs and SLAs
Key Decision Framework:
- File size <5GB: Pandas with chunking
- File size 5-20GB: Pandas chunking OR PySpark (depending on infra)
- File size >20GB: PySpark required (distributed processing)
Solution
Approach 1: Pandas with Chunked Reading (5-10GB files):
import pandas as pd
from sqlalchemy import create_engine
import logging
def process_large_csv_pandas(file_path, chunk_size=50000):
""" Process large CSV in chunks to avoid memory overflow """ # Database connection engine = create_engine('postgresql://user:password@localhost:5432/pepsico_db')
total_rows_processed = 0 try:
# Read CSV in chunks for chunk_num, chunk in enumerate(pd.read_csv(file_path, chunksize=chunk_size), start=1):
# Transform chunk['processed_date'] = pd.Timestamp.now()
chunk['sales_amount'] = pd.to_numeric(chunk['sales_amount'], errors='coerce')
chunk['quantity'] = chunk['quantity'].fillna(0).astype(int)
# Data quality: Remove invalid rows chunk = chunk[chunk['sales_amount'] > 0]
# Load to database (append mode) chunk.to_sql(
'sales_transactions',
con=engine,
if_exists='append',
index=False,
method='multi' # Faster bulk insert )
total_rows_processed += len(chunk)
logging.info(f"Processed chunk {chunk_num}: {len(chunk)} rows (Total: {total_rows_processed})")
except Exception as e:
logging.error(f"Error processing file: {str(e)}")
raise finally:
engine.dispose()
return total_rows_processed
# Usagerows = process_large_csv_pandas('daily_sales_20GB.csv', chunk_size=100000)
print(f"Total rows processed: {rows}")Approach 2: PySpark for 20GB+ Files (Recommended for PepsiCo Scale):
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_timestamp, current_timestamp, when
from pyspark.sql.types import DoubleType, IntegerType
def process_large_csv_spark(file_path, output_jdbc_url):
""" Process large CSV using PySpark for distributed processing """ # Initialize Spark session spark = SparkSession.builder \ .appName("PepsiCo_LargeCSV_ETL") \ .config("spark.executor.memory", "8g") \ .config("spark.driver.memory", "4g") \ .config("spark.sql.shuffle.partitions", "200") \ .getOrCreate()
try:
# Read CSV (distributed across Spark cluster) df = spark.read \ .option("header", "true") \ .option("inferSchema", "false") \ # Faster, define schema manually .option("mode", "DROPMALFORMED") \ # Skip corrupted rows .csv(file_path)
# Transform df = df.withColumn('processed_date', current_timestamp()) \ .withColumn('sales_amount', col('sales_amount').cast(DoubleType())) \ .withColumn('quantity', col('quantity').cast(IntegerType())) \ .withColumn('quantity', when(col('quantity').isNull(), 0).otherwise(col('quantity')))
# Data quality: Filter invalid rows df = df.filter(col('sales_amount') > 0)
# Write to database (JDBC) df.write \ .format("jdbc") \ .option("url", output_jdbc_url) \ .option("dbtable", "sales_transactions") \ .option("user", "pepsico_user") \ .option("password", "password") \ .option("batchsize", 10000) \ # Bulk insert size .mode("append") \ .save()
row_count = df.count()
print(f"Successfully processed {row_count} rows")
return row_count
finally:
spark.stop()
# Usageprocess_large_csv_spark(
file_path='s3://pepsico-data/daily_sales_20GB.csv',
output_jdbc_url='jdbc:postgresql://prod-db.pepsico.com:5432/sales_db')Approach 3: PySpark with Schema Definition (Faster):
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType, DateType
def process_with_schema(file_path):
""" Define schema explicitly for faster processing (no inference) """ spark = SparkSession.builder.appName("PepsiCo_ETL").getOrCreate()
# Define schema explicitly (avoids inferSchema scan) schema = StructType([
StructField("order_id", StringType(), False),
StructField("customer_id", StringType(), False),
StructField("product_id", StringType(), False),
StructField("sales_amount", DoubleType(), True),
StructField("quantity", IntegerType(), True),
StructField("order_date", DateType(), True),
StructField("region", StringType(), True)
])
# Read with predefined schema (much faster) df = spark.read \ .schema(schema) \ .option("header", "true") \ .csv(file_path)
# Transform and load... return dfReal-World PepsiCo Pipeline:
def pepsico_daily_sales_etl(date_str):
""" Production ETL pipeline for PepsiCo daily sales data """ spark = SparkSession.builder \ .appName(f"Sales_ETL_{date_str}") \ .config("spark.databricks.delta.optimizeWrite.enabled", "true") \ .getOrCreate()
# Read from Azure Data Lake Storage (ADLS) input_path = f"abfss://raw-data@pepsico.dfs.core.windows.net/sales/{date_str}/*.csv" df = spark.read \ .option("header", "true") \ .csv(input_path)
# Transform from pyspark.sql.functions import sum as _sum, avg as _avg
df_aggregated = df.groupBy("region", "product_id") \ .agg(
_sum("sales_amount").alias("total_sales"),
_avg("sales_amount").alias("avg_sales"),
_sum("quantity").alias("total_quantity")
)
# Write to Delta Lake (PepsiCo uses Databricks Delta) df_aggregated.write \ .format("delta") \ .mode("overwrite") \ .partitionBy("region") \ .save(f"abfss://curated@pepsico.dfs.core.windows.net/sales_daily/{date_str}")
print(f"ETL completed for {date_str}")
# Schedule with Airflow/Databricks Jobs# pepsico_daily_sales_etl('2024-12-08')Performance Best Practices:
- Partition large files: instead of one 20GB file, process twenty 1GB files in parallel
- Use binary formats for storage: CSV → Parquet (10x smaller, 100x faster reads)
- Database connection pooling:
from sqlalchemy import create_engine
engine = create_engine('postgresql://...', pool_size=10, max_overflow=20)
- Monitor memory usage:
import psutil
print(f"Memory usage: {psutil.virtual_memory().percent}%")
Expected Follow-Up Questions:
- “What if CSV has inconsistent schemas?” (Schema evolution handling)
- “How would you parallelize Pandas processing?” (Use Dask or multiprocessing)
- “What about corrupted CSV rows?” (Error handling, dead letter queue)
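On the Dask follow-up: a minimal hedged sketch of the same chunked-CSV pipeline parallelized with Dask (file path, column names, and block size are illustrative assumptions):
import dask.dataframe as dd

def process_with_dask(file_path, output_path):
    # Dask reads the CSV lazily in ~256MB blocks and processes them across cores
    df = dd.read_csv(file_path, blocksize="256MB")
    df = df[df["sales_amount"] > 0]                          # data-quality filter
    df["quantity"] = df["quantity"].fillna(0).astype("int64")
    df.to_parquet(output_path, write_index=False)            # triggers the computation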
6. Detect Anomalies in Sales Data Using Statistical Methods
Difficulty Level: Medium-High
Engineering Level: Mid-Senior Data Engineer (4-5 YOE)
Source: LinkedIn (Kishore Mohammed, Gerryson, Rahul Patel - Sept-Oct 2025)
Team: Analytics Infrastructure / Real-time Alerts
Interview Round: Coding Round 2 (Python)
Technology Focus: Statistical Analysis, Python Data Science, Anomaly Detection
Question: “Write a Python function to detect anomalies in daily sales data. An anomaly is defined as any value that deviates more than 3 standard deviations (3σ) from the mean. Return flagged records with anomaly scores.”
Answer Framework
Why PepsiCo Asks This:
- Real-time monitoring of 1 billion+ daily consumer interactions globally
- Detects supply chain disruptions (unusual demand spikes/drops)
- Critical for inventory management across 30+ digital products
- Enables proactive alerts for demand forecasting errors and quality issues
Key Technical Concepts:
- Z-score (3-sigma rule): Measures how many standard deviations a value is from mean
- Statistical anomaly detection: Values beyond 3σ are statistically rare (99.7% within 3σ)
- Grouped anomaly detection: Per-product analysis (Pepsi anomalies ≠ Doritos anomalies)
- Time-series consideration: Rolling windows vs static mean/std
Solution
import numpy as np
import pandas as pd
from scipy import stats
def detect_sales_anomalies(sales_df, threshold=3):
""" Detect anomalies using 3-sigma rule (z-score method) Args: sales_df: DataFrame with columns ['date', 'product_id', 'sales_amount'] threshold: Number of standard deviations (default: 3) Returns: DataFrame with anomalies flagged """ sales_df = sales_df.copy()
# Calculate z-score per product sales_df['z_score'] = sales_df.groupby('product_id')['sales_amount'].transform(
lambda x: np.abs(stats.zscore(x, nan_policy='omit'))
)
# Flag anomalies sales_df['is_anomaly'] = sales_df['z_score'] > threshold
sales_df['anomaly_severity'] = sales_df['z_score']
# Return only anomalies anomalies = sales_df[sales_df['is_anomaly']].copy()
return anomalies[['date', 'product_id', 'sales_amount', 'z_score', 'anomaly_severity']]
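A quick way to sanity-check the function is with synthetic data (the product ID and values below are illustrative, not PepsiCo figures):
rng = np.random.default_rng(42)
amounts = rng.normal(100, 5, size=30)
amounts[15] = 400  # inject an obvious spike
sample = pd.DataFrame({
    'date': pd.date_range('2024-01-01', periods=30, freq='D'),
    'product_id': ['PEPSI-12OZ'] * 30,
    'sales_amount': amounts,
})
print(detect_sales_anomalies(sample, threshold=3))  # flags only the injected spike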
# Alternative: Rolling window for time-series
def detect_anomalies_rolling(sales_df, window=30, threshold=3):
    """Detect anomalies using rolling statistics (better for time-series)"""
    sales_df = sales_df.sort_values(['product_id', 'date']).copy()
    sales_df['rolling_mean'] = sales_df.groupby('product_id')['sales_amount'].transform(
        lambda x: x.rolling(window=window, min_periods=1).mean()
    )
    sales_df['rolling_std'] = sales_df.groupby('product_id')['sales_amount'].transform(
        lambda x: x.rolling(window=window, min_periods=1).std()
    )
    # Z-score relative to rolling window
    sales_df['z_score'] = (sales_df['sales_amount'] - sales_df['rolling_mean']) / sales_df['rolling_std']
    sales_df['is_anomaly'] = np.abs(sales_df['z_score']) > threshold
    return sales_df[sales_df['is_anomaly']]
7. Group Transactions by Customer and Find Top N Without Pandas
Difficulty Level: Medium
Engineering Level: Mid Data Engineer (4-5 YOE)
Source: LinkedIn (Kishore Mohammed, Gerryson, Rahul Patel - Sept-Oct 2025)
Team: Data Platform Engineering
Interview Round: Coding Round 2 (Python Core)
Technology Focus: Python Core, Data Structures, Algorithm Design
Question: “Given a list of transactions with customer_id, transaction_amount, and date, write a Python function to group by customer and return the top N customers by purchase frequency without using Pandas or built-in aggregation libraries. Use only standard Python.”
Answer Framework
Why PepsiCo Asks This:
- Tests fundamental Python programming skills without library dependencies
- Assesses algorithmic thinking and data structure knowledge
- Important for environments where libraries may be restricted
- Evaluates ability to optimize code manually (time/space complexity)
Key Technical Concepts:
- Dictionary aggregation: Efficient O(1) key lookups for grouping
- Sorting with lambda: Custom sort keys (frequency descending, then amount descending)
- List slicing: [:n] for top N selection
- Time complexity: O(n) for aggregation + O(n log n) for sorting
Solution
def find_top_n_customers(transactions, n=10):
""" Find top N customers by purchase frequency using only Python core Args: transactions: List of dicts [{'customer_id': 'C1', 'transaction_amount': 100, 'date': '2024-01-01'}, ...] n: Number of top customers to return Returns: List of dicts with customer stats """ # Step 1: Group by customer_id customer_stats = {}
for trans in transactions:
customer_id = trans['customer_id']
amount = trans['transaction_amount']
if customer_id not in customer_stats:
customer_stats[customer_id] = {
'frequency': 0,
'total_amount': 0,
'transactions': []
}
customer_stats[customer_id]['frequency'] += 1 customer_stats[customer_id]['total_amount'] += amount
customer_stats[customer_id]['transactions'].append(trans)
# Step 2: Sort by frequency (descending), then by total amount sorted_customers = sorted(
customer_stats.items(),
key=lambda x: (-x[1]['frequency'], -x[1]['total_amount'])
)
# Step 3: Get top N top_n = sorted_customers[:n]
# Step 4: Format output result = []
for customer_id, stats in top_n:
result.append({
'customer_id': customer_id,
'purchase_frequency': stats['frequency'],
'total_spent': stats['total_amount'],
'avg_transaction': round(stats['total_amount'] / stats['frequency'], 2)
})
return result
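A common follow-up is avoiding the full sort when only the top N are needed; heapq.nlargest from the standard library selects them in O(n log N). A minimal hedged sketch of a frequency-only variant:
import heapq

def top_n_by_frequency(transactions, n=10):
    # One-pass frequency count with a plain dict, then heap-based top-N selection
    freq = {}
    for t in transactions:
        freq[t['customer_id']] = freq.get(t['customer_id'], 0) + 1
    return heapq.nlargest(n, freq.items(), key=lambda kv: kv[1])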
# Example usage
transactions = [
{'customer_id': 'CUST-001', 'transaction_amount': 150, 'date': '2024-01-01'},
{'customer_id': 'CUST-002', 'transaction_amount': 200, 'date': '2024-01-02'},
{'customer_id': 'CUST-001', 'transaction_amount': 120, 'date': '2024-01-03'},
]
top_10 = find_top_n_customers(transactions, n=10)
8. Solve the Small File Problem in ADLS/S3 with PySpark
Difficulty Level: High
Engineering Level: Senior Data Engineer (5+ YOE)
Source: LinkedIn (Kishore Mohammed, Gerryson, Rahul Patel, Ritik Jain - Sept-Oct 2025)
Team: Data Platform Engineering / Cloud Infrastructure
Interview Round: Technical Round 2 (System Design)
Technology Focus: PySpark, Azure Data Lake Storage, Spark Performance Tuning
Question: “You have thousands of small CSV files (5MB-100MB each) scattered across Azure Data Lake Storage (ADLS). When you try to read them into PySpark, performance is terrible. How would you solve the ‘small file problem’ and optimize for parallel processing?”
Answer Framework
Why PepsiCo Asks This:
- PepsiCo stores 60+ PB of data across Azure Data Lake Storage Gen2
- Data ingestion from multiple systems creates thousands of small fragmented files
- Small files cause excessive Spark overhead (metadata operations, task scheduling)
- Central challenge in PepsiCo’s Databricks-powered modern data architecture
Key Technical Concepts:
- Small file problem: Too many small files (< 128MB) cause poor Spark performance
- Coalesce vs repartition: coalesce merges down the partition count without a full shuffle; repartition triggers a full shuffle and can increase or decrease the count
- Optimal partition size: 128-256MB per partition for HDFS/S3/ADLS
- File format optimization: Parquet/Delta Lake vs CSV for storage efficiency
Solution
from pyspark.sql import SparkSession
def solve_small_file_problem(path_to_files, output_path):
""" Solve small file problem by coalescing and converting to Parquet """ spark = SparkSession.builder \ .appName("SmallFileOptimization") \ .config("spark.sql.adaptive.enabled", "true") \ .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \ .getOrCreate()
# Read all small CSV files df = spark.read \ .option("header", "true") \ .csv(f"{path_to_files}/*.csv")
# Calculate optimal partitions (1 partition per 128MB data) total_size_mb = df.rdd.map(lambda x: len(str(x))).sum() / (1024 * 1024)
optimal_partitions = max(1, int(total_size_mb / 128))
# Coalesce to reduce partition count df_optimized = df.coalesce(optimal_partitions)
# Write as partitioned Parquet (larger, more efficient files) df_optimized.write \ .mode("overwrite") \ .partitionBy("region", "year", "month") \ .parquet(output_path)
print(f"Optimized from {df.rdd.getNumPartitions()} to {optimal_partitions} partitions")
# Usage
solve_small_file_problem(
    path_to_files="abfss://raw@pepsico.dfs.core.windows.net/sales_csv",
    output_path="abfss://curated@pepsico.dfs.core.windows.net/sales_parquet")
9. Design Efficient PySpark Join for Billions of Rows
Difficulty Level: Very High
Engineering Level: Senior Data Engineer (5+ YOE)
Source: LinkedIn (Kishore Mohammed, Gerryson, Rahul Patel - Sept-Oct 2025)
Team: Data Platform Engineering / Performance Optimization
Interview Round: Technical Round 2 (System Design)
Technology Focus: PySpark Join Optimization, Spark Performance
Question: “You need to join two very large datasets (billions of rows each) efficiently in PySpark. How would you approach this? What optimizations would you apply?”
Answer Framework
Why PepsiCo Asks This:
- PepsiCo joins massive datasets daily (sales × customer × product × time dimensions)
- Typical scenario: customer transactions (10B rows) × customer master (100M rows)
- Critical for building unified reporting layer across 30+ digital products
- Join optimization directly impacts query time (hours vs minutes) and costs
Key Technical Concepts:
- Broadcast join: Small table (<1GB) broadcasted to all executors, no shuffle
- Sort-merge join: Default for large-large joins, requires shuffle and sort
- Bucketing: Pre-partitions tables by join key, eliminates shuffle for recurring joins
- Join strategies: Broadcast (fastest) > Bucketed > Shuffle hash > Sort-merge
Solution
from pyspark.sql.functions import broadcast
def efficient_large_join(spark, large_df_path, small_df_path):
""" Efficiently join large datasets using broadcast or bucketing """ df_large = spark.read.parquet(large_df_path) # 10B rows df_small = spark.read.parquet(small_df_path) # 100M rows # Approach 1: Broadcast join (if one table < 1GB) if df_small.rdd.mapPartitions(lambda x: [sum(1 for _ in x)]).sum() < 10_000_000:
result = df_large.join(broadcast(df_small), "customer_id", "left")
else:
# Approach 2: Bucketed join (for recurring joins) result = df_large.join(df_small, "customer_id", "left")
return result
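Spark will also broadcast automatically below spark.sql.autoBroadcastJoinThreshold; a minimal hedged sketch for tuning it and verifying which strategy the optimizer chose (the 512MB value is illustrative, and df_large/df_small refer to the DataFrames above):
# Raise the auto-broadcast threshold (default ~10MB) and inspect the chosen strategy
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", str(512 * 1024 * 1024))
joined = df_large.join(broadcast(df_small), "customer_id", "left")
joined.explain()  # look for BroadcastHashJoin vs SortMergeJoin in the physical plan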
# Bucketing for repeated joins (one-time setup)
def create_bucketed_tables(df, table_name, num_buckets=256):
    # bucketBy only works with saveAsTable, not a plain path write
    df.write \
        .mode("overwrite") \
        .bucketBy(num_buckets, "customer_id") \
        .sortBy("customer_id") \
        .format("parquet") \
        .saveAsTable(table_name)
10. Handle Data Skew in Spark Joins
Difficulty Level: Very High
Engineering Level: Senior Data Engineer (5+ YOE)
Source: LinkedIn (Kishore Mohammed, Gerryson, Rahul Patel - Sept-Oct 2025)
Team: Data Platform Engineering / Advanced Spark Tuning
Interview Round: Technical Round 2 (System Design)
Technology Focus: Data Skew, Salting Technique, Spark Optimization
Question: “You’re joining two large Spark DataFrames, but one join key has significantly more data than others (e.g., ‘Other’ category represents 70% of rows). Your Spark job is extremely slow. How would you handle this data skew?”
Answer Framework
Why PepsiCo Asks This:
- Real-world data is inherently skewed (Pepsi brand >> niche brands, major warehouses >> small ones)
- Supply chain data has hot keys (popular products, major distribution centers)
- Skew causes uneven Spark task distribution and OOM (out-of-memory) errors
- Critical for handling PepsiCo’s 30+ digital products with varying data volumes
Key Technical Concepts:
- Data skew: Uneven distribution where one key has disproportionate data (70%+ in one partition)
- Salting technique: Add random suffix to skewed keys to distribute across multiple partitions
- Isolated join: Handle skewed keys separately with broadcast, normal keys with regular join
- Adaptive Query Execution (AQE): Spark 3.0+ automatically detects and handles some skew
Solution
from pyspark.sql.functions import rand, when, col, concat, lit
def handle_data_skew(df1, df2, join_key, skew_threshold=0.7, salt_buckets=10):
""" Handle data skew using salting technique """ # Step 1: Identify skewed keys key_distribution = df1.groupBy(join_key).count() \ .withColumn("percentage", col("count") / df1.count())
skewed_keys = key_distribution.filter(col("percentage") > skew_threshold) \ .select(join_key).rdd.flatMap(lambda x: x).collect()
# Step 2: Split into skewed and normal df1_skewed = df1.filter(col(join_key).isin(skewed_keys))
df1_normal = df1.filter(~col(join_key).isin(skewed_keys))
# Step 3: Apply salting to skewed portion df1_skewed = df1_skewed.withColumn("salt", (rand() * salt_buckets).cast("int")) \ .withColumn("salted_key", concat(col(join_key), lit("_"), col("salt")))
# Step 4: Replicate df2 for skewed keys df2_replicated = df2.filter(col(join_key).isin(skewed_keys))
df2_salted_list = []
for i in range(salt_buckets):
df2_salted_list.append(
df2_replicated.withColumn("salt", lit(i)) \ .withColumn("salted_key", concat(col(join_key), lit("_"), col("salt")))
)
from functools import reduce from pyspark.sql import DataFrame
df2_salted = reduce(DataFrame.unionAll, df2_salted_list)
# Step 5: Join skewed and normal separately result_skewed = df1_skewed.join(df2_salted, "salted_key", "left")
result_normal = df1_normal.join(df2.filter(~col(join_key).isin(skewed_keys)), join_key, "left")
# Step 6: Combine results result = result_skewed.drop("salt", "salted_key") \ .unionByName(result_normal)
return result
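On Spark 3.x, much of this is handled by Adaptive Query Execution before manual salting is needed. A minimal hedged config sketch (the factor and threshold values are illustrative):
# Let AQE detect and split skewed partitions at runtime (Spark 3.0+)
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionFactor", "5")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "256MB")
result = df1.join(df2, join_key, "left")  # the plan is rewritten at runtime if skew is detected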