apache-spark-data-processing

Complete guide for Apache Spark data processing including RDDs, DataFrames, Spark SQL, streaming, MLlib, and production deployment

About apache-spark-data-processing

apache-spark-data-processing is a Claude AI skill developed by manutej. Complete guide for Apache Spark data processing including RDDs, DataFrames, Spark SQL, streaming, MLlib, and production deployment This powerful Claude Code plugin helps developers automate workflows and enhance productivity with intelligent AI assistance.

0Stars
0Forks
2025-10-19

Why use apache-spark-data-processing? With 0 stars on GitHub, this skill has been trusted by developers worldwide. Install this Claude skill instantly to enhance your development workflow with AI-powered automation.

nameapache-spark-data-processing
descriptionComplete guide for Apache Spark data processing including RDDs, DataFrames, Spark SQL, streaming, MLlib, and production deployment
tags["spark","big-data","distributed-computing","dataframes","streaming","machine-learning"]
tiertier-1

Apache Spark Data Processing

A comprehensive skill for mastering Apache Spark data processing, from basic RDD operations to advanced streaming, SQL, and machine learning workflows. Learn to build scalable, distributed data pipelines and analytics systems.

When to Use This Skill

Use Apache Spark when you need to:

  • Process Large-Scale Data: Handle datasets too large for single-machine processing (TB to PB scale)
  • Perform Distributed Computing: Execute parallel computations across cluster nodes
  • Real-Time Stream Processing: Process continuous data streams with low latency
  • Complex Data Analytics: Run sophisticated analytics, aggregations, and transformations
  • Machine Learning at Scale: Train ML models on massive datasets
  • ETL/ELT Pipelines: Build robust data transformation and loading workflows
  • Interactive Data Analysis: Perform exploratory analysis on large datasets
  • Unified Data Processing: Combine batch and streaming workloads in one framework

Not Ideal For:

  • Small datasets (<100 GB) that fit in memory on a single machine
  • Simple CRUD operations (use traditional databases)
  • Ultra-low latency requirements (<10ms) where specialized stream processors excel
  • Workflows requiring strong ACID transactions across distributed data

Core Concepts

Resilient Distributed Datasets (RDDs)

RDDs are Spark's fundamental data abstraction - immutable, distributed collections of objects that can be processed in parallel.

Key Characteristics:

  • Resilient: Fault-tolerant through lineage tracking
  • Distributed: Partitioned across cluster nodes
  • Immutable: Transformations create new RDDs, not modify existing ones
  • Lazy Evaluation: Transformations build computation graph; actions trigger execution
  • In-Memory Computing: Cache intermediate results for iterative algorithms

RDD Operations:

  • Transformations: Lazy operations that return new RDDs (map, filter, flatMap, reduceByKey)
  • Actions: Operations that trigger computation and return values (collect, count, reduce, saveAsTextFile)

When to Use RDDs:

  • Low-level control over data distribution and partitioning
  • Custom partitioning schemes required
  • Working with unstructured data (text files, binary data)
  • Migrating legacy code from early Spark versions

Prefer DataFrames/Datasets when possible - they provide automatic optimization via Catalyst optimizer.

DataFrames and Datasets

DataFrames are distributed collections of data organized into named columns - similar to a database table or pandas DataFrame, but with powerful optimizations.

DataFrames:

  • Structured data with schema
  • Automatic query optimization (Catalyst)
  • Cross-language support (Python, Scala, Java, R)
  • Rich API for SQL-like operations

Datasets (Scala/Java only):

  • Typed DataFrames with compile-time type safety
  • Best performance in Scala due to JVM optimization
  • Combine RDD type safety with DataFrame optimizations

Key Advantages Over RDDs:

  • Query Optimization: Catalyst optimizer rewrites queries for efficiency
  • Tungsten Execution: Optimized CPU and memory usage
  • Columnar Storage: Efficient data representation
  • Code Generation: Compile-time bytecode generation for faster execution

Lazy Evaluation

Spark uses lazy evaluation to optimize execution:

  1. Transformations build a Directed Acyclic Graph (DAG) of operations
  2. Actions trigger execution of the DAG
  3. Spark's optimizer analyzes the entire DAG and creates an optimized execution plan
  4. Work is distributed across cluster nodes

Benefits:

  • Minimize data movement across network
  • Combine multiple operations into single stage
  • Eliminate unnecessary computations
  • Optimize memory usage

Partitioning

Data is divided into partitions for parallel processing:

  • Default Partitioning: Typically based on HDFS block size or input source
  • Hash Partitioning: Distribute data by key hash (used by groupByKey, reduceByKey)
  • Range Partitioning: Distribute data by key ranges (useful for sorted data)
  • Custom Partitioning: Define your own partitioning logic

Partition Count Considerations:

  • Too few partitions: Underutilized cluster, large task execution time
  • Too many partitions: Scheduling overhead, small task execution time
  • General rule: 2-4 partitions per CPU core in cluster
  • Use repartition() or coalesce() to adjust partition count

Caching and Persistence

Cache frequently accessed data in memory for performance:

# Cache DataFrame in memory df.cache() # Shorthand for persist(StorageLevel.MEMORY_AND_DISK) # Different storage levels df.persist(StorageLevel.MEMORY_ONLY) # Fast but may lose data if evicted df.persist(StorageLevel.MEMORY_AND_DISK) # Spill to disk if memory full df.persist(StorageLevel.DISK_ONLY) # Store only on disk df.persist(StorageLevel.MEMORY_ONLY_SER) # Serialized in memory (more compact) # Unpersist when done df.unpersist()

When to Cache:

  • Data used multiple times in workflow
  • Iterative algorithms (ML training)
  • Interactive analysis sessions
  • Expensive transformations reused downstream

When Not to Cache:

  • Data used only once
  • Very large datasets that exceed cluster memory
  • Streaming applications with continuous new data

Spark SQL

Spark SQL allows you to query structured data using SQL or DataFrame API:

  • Execute SQL queries on DataFrames and tables
  • Register DataFrames as temporary views
  • Join structured and semi-structured data
  • Connect to Hive metastore for table metadata
  • Support for various data sources (Parquet, ORC, JSON, CSV, JDBC)

Performance Features:

  • Catalyst Optimizer: Rule-based and cost-based query optimization
  • Tungsten Execution Engine: Whole-stage code generation, vectorized processing
  • Adaptive Query Execution (AQE): Runtime optimization based on statistics
  • Dynamic Partition Pruning: Skip irrelevant partitions during execution

Broadcast Variables and Accumulators

Shared variables for efficient distributed computing:

Broadcast Variables:

  • Read-only variables cached on each node
  • Efficient for sharing large read-only data (lookup tables, ML models)
  • Avoid sending large data with every task
# Broadcast a lookup table lookup_table = {"key1": "value1", "key2": "value2"} broadcast_lookup = sc.broadcast(lookup_table) # Use in transformations rdd.map(lambda x: broadcast_lookup.value.get(x, "default"))

Accumulators:

  • Write-only variables for aggregating values across tasks
  • Used for counters and sums in distributed operations
  • Only driver can read final accumulated value
# Create accumulator error_count = sc.accumulator(0) # Increment in tasks rdd.foreach(lambda x: error_count.add(1) if is_error(x) else None) # Read final value in driver print(f"Total errors: {error_count.value}")

Spark SQL Deep Dive

DataFrame Creation

Create DataFrames from various sources:

from pyspark.sql import SparkSession spark = SparkSession.builder.appName("SparkSQLExample").getOrCreate() # From structured data data = [("Alice", 1), ("Bob", 2), ("Charlie", 3)] columns = ["name", "id"] df = spark.createDataFrame(data, columns) # From files df_json = spark.read.json("path/to/file.json") df_parquet = spark.read.parquet("path/to/file.parquet") df_csv = spark.read.option("header", "true").csv("path/to/file.csv") # From JDBC sources df_jdbc = spark.read \ .format("jdbc") \ .option("url", "jdbc:postgresql://host:port/database") \ .option("dbtable", "table_name") \ .option("user", "username") \ .option("password", "password") \ .load()

DataFrame Operations

Common DataFrame transformations:

# Select columns df.select("name", "age").show() # Filter rows df.filter(df.age > 21).show() df.where(df["age"] > 21).show() # Alternative syntax # Add/modify columns from pyspark.sql.functions import col, lit df.withColumn("age_plus_10", col("age") + 10).show() df.withColumn("country", lit("USA")).show() # Aggregations df.groupBy("department").count().show() df.groupBy("department").agg({"salary": "avg", "age": "max"}).show() # Sorting df.orderBy("age").show() df.orderBy(col("age").desc()).show() # Joins df1.join(df2, df1.id == df2.user_id, "inner").show() df1.join(df2, "id", "left_outer").show() # Unions df1.union(df2).show()

SQL Queries

Execute SQL on DataFrames:

# Register DataFrame as temporary view df.createOrReplaceTempView("people") # Run SQL queries sql_result = spark.sql("SELECT name FROM people WHERE age > 21") sql_result.show() # Complex queries result = spark.sql(""" SELECT department, COUNT(*) as employee_count, AVG(salary) as avg_salary, MAX(age) as max_age FROM people WHERE age > 25 GROUP BY department HAVING COUNT(*) > 5 ORDER BY avg_salary DESC """) result.show()

Data Sources

Spark SQL supports multiple data formats:

Parquet (Recommended for Analytics):

  • Columnar storage format
  • Excellent compression and query performance
  • Schema embedded in file
  • Supports predicate pushdown and column pruning
# Write df.write.parquet("output/path", mode="overwrite", compression="snappy") # Read with partition pruning df = spark.read.parquet("output/path").filter(col("date") == "2025-01-01")

ORC (Optimized Row Columnar):

  • Similar to Parquet with slightly better compression
  • Preferred for Hive integration
  • Built-in indexes for faster queries
df.write.orc("output/path", mode="overwrite") df = spark.read.orc("output/path")

JSON (Semi-Structured Data):

  • Human-readable but less efficient
  • Schema inference on read
  • Good for nested/complex data
# Read with explicit schema from pyspark.sql.types import StructType, StructField, StringType, IntegerType schema = StructType([ StructField("name", StringType(), True), StructField("age", IntegerType(), True) ]) df = spark.read.schema(schema).json("data.json")

CSV (Legacy/Simple Data):

  • Widely compatible but slow
  • Requires header inference or explicit schema
  • Minimal compression benefits
df.write.csv("output.csv", header=True, mode="overwrite") df = spark.read.option("header", "true").option("inferSchema", "true").csv("data.csv")

Window Functions

Advanced analytics with window functions:

from pyspark.sql.window import Window from pyspark.sql.functions import row_number, rank, dense_rank, lag, lead, sum, avg # Define window specification window_spec = Window.partitionBy("department").orderBy(col("salary").desc()) # Ranking functions df.withColumn("rank", rank().over(window_spec)).show() df.withColumn("row_num", row_number().over(window_spec)).show() df.withColumn("dense_rank", dense_rank().over(window_spec)).show() # Aggregate functions over window df.withColumn("dept_avg_salary", avg("salary").over(window_spec)).show() df.withColumn("running_total", sum("salary").over(window_spec.rowsBetween(Window.unboundedPreceding, Window.currentRow))).show() # Offset functions df.withColumn("prev_salary", lag("salary", 1).over(window_spec)).show() df.withColumn("next_salary", lead("salary", 1).over(window_spec)).show()

User-Defined Functions (UDFs)

Create custom transformations:

from pyspark.sql.functions import udf from pyspark.sql.types import StringType, IntegerType # Python UDF (slower due to serialization overhead) def categorize_age(age): if age < 18: return "Minor" elif age < 65: return "Adult" else: return "Senior" categorize_udf = udf(categorize_age, StringType()) df.withColumn("age_category", categorize_udf(col("age"))).show() # Pandas UDF (vectorized, faster for large datasets) from pyspark.sql.functions import pandas_udf import pandas as pd @pandas_udf(IntegerType()) def square(series: pd.Series) -> pd.Series: return series ** 2 df.withColumn("age_squared", square(col("age"))).show()

UDF Performance Tips:

  • Use built-in Spark functions when possible (always faster)
  • Prefer Pandas UDFs over Python UDFs for better performance
  • Use Scala UDFs for maximum performance (no serialization overhead)
  • Cache DataFrames before applying UDFs if used multiple times

Transformations and Actions

Common Transformations

map: Apply function to each element

# RDD rdd = sc.parallelize([1, 2, 3, 4, 5]) squared = rdd.map(lambda x: x * 2) # [2, 4, 6, 8, 10] # DataFrame (use select with functions) from pyspark.sql.functions import col df.select(col("value") * 2).show()

filter: Select elements matching predicate

# RDD rdd.filter(lambda x: x > 2).collect() # [3, 4, 5] # DataFrame df.filter(col("age") > 25).show()

flatMap: Map and flatten results

# RDD - Split text into words lines = sc.parallelize(["hello world", "apache spark"]) words = lines.flatMap(lambda line: line.split(" ")) # ["hello", "world", "apache", "spark"]

reduceByKey: Aggregate values by key

# Word count example words = sc.parallelize(["apple", "banana", "apple", "cherry", "banana", "apple"]) word_pairs = words.map(lambda word: (word, 1)) word_counts = word_pairs.reduceByKey(lambda a, b: a + b) # Result: [("apple", 3), ("banana", 2), ("cherry", 1)]

groupByKey: Group values by key (avoid when possible - use reduceByKey instead)

# Less efficient than reduceByKey word_pairs.groupByKey().mapValues(list).collect() # Result: [("apple", [1, 1, 1]), ("banana", [1, 1]), ("cherry", [1])]

join: Combine datasets by key

# RDD join users = sc.parallelize([("user1", "Alice"), ("user2", "Bob")]) orders = sc.parallelize([("user1", 100), ("user2", 200), ("user1", 150)]) users.join(orders).collect() # Result: [("user1", ("Alice", 100)), ("user1", ("Alice", 150)), ("user2", ("Bob", 200))] # DataFrame join (more efficient) df_users.join(df_orders, "user_id", "inner").show()

distinct: Remove duplicates

# RDD rdd.distinct().collect() # DataFrame df.distinct().show() df.dropDuplicates(["user_id"]).show() # Drop based on specific columns

coalesce/repartition: Change partition count

# Reduce partitions (no shuffle, more efficient) df.coalesce(1).write.parquet("output") # Increase/decrease partitions (involves shuffle) df.repartition(10).write.parquet("output") df.repartition(10, "user_id").write.parquet("output") # Partition by column

Common Actions

collect: Retrieve all data to driver

results = rdd.collect() # Returns list # WARNING: Only use on small datasets that fit in driver memory

count: Count elements

total = df.count() # Number of rows

first/take: Get first N elements

first_elem = rdd.first() first_five = rdd.take(5)

reduce: Aggregate all elements

total_sum = rdd.reduce(lambda a, b: a + b)

foreach: Execute function on each element

# Side effects only (no return value) rdd.foreach(lambda x: print(x))

saveAsTextFile: Write to file system

rdd.saveAsTextFile("hdfs://path/to/output")

show: Display DataFrame rows (action)

df.show(20, truncate=False) # Show 20 rows, don't truncate columns

Structured Streaming

Process continuous data streams using DataFrame API.

Core Concepts

Streaming DataFrame:

  • Unbounded table that grows continuously
  • Same operations as batch DataFrames
  • Micro-batch processing (default) or continuous processing

Input Sources:

  • File sources (JSON, Parquet, CSV, ORC, text)
  • Kafka
  • Socket (for testing)
  • Rate source (for testing)
  • Custom sources

Output Modes:

  • Append: Only new rows added to result table
  • Complete: Entire result table written every trigger
  • Update: Only updated rows written

Output Sinks:

  • File sinks (Parquet, ORC, JSON, CSV, text)
  • Kafka
  • Console (for debugging)
  • Memory (for testing)
  • Foreach/ForeachBatch (custom logic)

Basic Streaming Example

from pyspark.sql import SparkSession from pyspark.sql.functions import col spark = SparkSession.builder.appName("StreamingExample").getOrCreate() # Read stream from JSON files input_stream = spark.readStream \ .format("json") \ .schema(schema) \ .option("maxFilesPerTrigger", 1) \ .load("input/directory") # Transform streaming data processed = input_stream \ .filter(col("value") > 10) \ .select("id", "value", "timestamp") # Write stream to Parquet query = processed.writeStream \ .format("parquet") \ .option("path", "output/directory") \ .option("checkpointLocation", "checkpoint/directory") \ .outputMode("append") \ .start() # Wait for termination query.awaitTermination()

Stream-Static Joins

Join streaming data with static reference data:

# Static DataFrame (loaded once) static_df = spark.read.parquet("reference/data") # Streaming DataFrame streaming_df = spark.readStream.format("kafka").load() # Inner join (supported) joined = streaming_df.join(static_df, "type") # Left outer join (supported) joined = streaming_df.join(static_df, "type", "left_outer") # Write result joined.writeStream \ .format("parquet") \ .option("path", "output") \ .option("checkpointLocation", "checkpoint") \ .start()

Windowed Aggregations

Aggregate data over time windows:

from pyspark.sql.functions import window, col, count # 10-minute tumbling window windowed_counts = streaming_df \ .groupBy( window(col("timestamp"), "10 minutes"), col("word") ) \ .count() # 10-minute sliding window with 5-minute slide windowed_counts = streaming_df \ .groupBy( window(col("timestamp"), "10 minutes", "5 minutes"), col("word") ) \ .count() # Write to console for debugging query = windowed_counts.writeStream \ .outputMode("complete") \ .format("console") \ .option("truncate", "false") \ .start()

Watermarking for Late Data

Handle late-arriving data with watermarks:

from pyspark.sql.functions import window # Define watermark (10 minutes tolerance for late data) windowed_counts = streaming_df \ .withWatermark("timestamp", "10 minutes") \ .groupBy( window(col("timestamp"), "10 minutes"), col("word") ) \ .count() # Data arriving more than 10 minutes late will be dropped

Watermark Benefits:

  • Limit state size by dropping old aggregation state
  • Handle late data within tolerance window
  • Improve performance by not maintaining infinite state

Session Windows

Group events into sessions based on inactivity gaps:

from pyspark.sql.functions import session_window, when # Dynamic session window based on user session_window_spec = session_window( col("timestamp"), when(col("userId") == "user1", "5 seconds") .when(col("userId") == "user2", "20 seconds") .otherwise("5 minutes") ) sessionized_counts = streaming_df \ .withWatermark("timestamp", "10 minutes") \ .groupBy(session_window_spec, col("userId")) \ .count()

Stateful Stream Processing

Maintain state across micro-batches:

from pyspark.sql.functions import expr # Deduplication using state deduplicated = streaming_df \ .withWatermark("timestamp", "1 hour") \ .dropDuplicates(["user_id", "event_id"]) # Stream-stream joins (stateful) stream1 = spark.readStream.format("kafka").option("subscribe", "topic1").load() stream2 = spark.readStream.format("kafka").option("subscribe", "topic2").load() joined = stream1 \ .withWatermark("timestamp", "10 minutes") \ .join( stream2.withWatermark("timestamp", "20 minutes"), expr("stream1.user_id = stream2.user_id AND stream1.timestamp >= stream2.timestamp AND stream1.timestamp <= stream2.timestamp + interval 15 minutes"), "inner" )

Checkpointing

Ensure fault tolerance with checkpoints:

# Checkpoint location stores: # - Stream metadata (offsets, configuration) # - State information (for stateful operations) # - Write-ahead logs query = streaming_df.writeStream \ .format("parquet") \ .option("path", "output") \ .option("checkpointLocation", "checkpoint/dir") # REQUIRED for production \ .start() # Recovery: Restart query with same checkpoint location # Spark will resume from last committed offset

Checkpoint Best Practices:

  • Always set checkpointLocation for production streams
  • Use reliable distributed storage (HDFS, S3) for checkpoints
  • Don't delete checkpoint directory while stream is running
  • Back up checkpoints for disaster recovery

Machine Learning with MLlib

Spark's scalable machine learning library.

Core Components

MLlib Features:

  • ML Pipelines: Chain transformations and models
  • Featurization: Vector assemblers, scalers, encoders
  • Classification & Regression: Linear models, tree-based models, neural networks
  • Clustering: K-means, Gaussian Mixture, LDA
  • Collaborative Filtering: ALS (Alternating Least Squares)
  • Dimensionality Reduction: PCA, SVD
  • Model Selection: Cross-validation, train-test split, parameter tuning

ML Pipelines

Chain transformations and estimators:

from pyspark.ml import Pipeline from pyspark.ml.feature import VectorAssembler, StandardScaler from pyspark.ml.classification import LogisticRegression # Load data df = spark.read.format("libsvm").load("data/sample_libsvm_data.txt") # Define pipeline stages assembler = VectorAssembler( inputCols=["feature1", "feature2", "feature3"], outputCol="features" ) scaler = StandardScaler( inputCol="features", outputCol="scaled_features", withStd=True, withMean=True ) lr = LogisticRegression( featuresCol="scaled_features", labelCol="label", maxIter=10, regParam=0.01 ) # Create pipeline pipeline = Pipeline(stages=[assembler, scaler, lr]) # Split data train_df, test_df = df.randomSplit([0.8, 0.2], seed=42) # Train model model = pipeline.fit(train_df) # Make predictions predictions = model.transform(test_df) predictions.select("label", "prediction", "probability").show()

Feature Engineering

Transform raw data into features:

from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, MinMaxScaler # Categorical encoding indexer = StringIndexer(inputCol="category", outputCol="category_index") encoder = OneHotEncoder(inputCol="category_index", outputCol="category_vec") # Numerical scaling scaler = MinMaxScaler(inputCol="features", outputCol="scaled_features") # Assemble features assembler = VectorAssembler( inputCols=["category_vec", "numeric_feature1", "numeric_feature2"], outputCol="features" ) # Text processing from pyspark.ml.feature import Tokenizer, HashingTF, IDF tokenizer = Tokenizer(inputCol="text", outputCol="words") hashing_tf = HashingTF(inputCol="words", outputCol="raw_features", numFeatures=10000) idf = IDF(inputCol="raw_features", outputCol="features")

Streaming Linear Regression

Train models on streaming data:

from pyspark.mllib.regression import LabeledPoint from pyspark.streaming import StreamingContext from pyspark.streaming.ml import StreamingLinearRegressionWithSGD # Create StreamingContext ssc = StreamingContext(sc, batchDuration=1) # Define data streams training_stream = ssc.textFileStream("training/data/path") testing_stream = ssc.textFileStream("testing/data/path") # Parse streams into LabeledPoint objects def parse_point(line): values = [float(x) for x in line.strip().split(',')] return LabeledPoint(values[0], values[1:]) parsed_training = training_stream.map(parse_point) parsed_testing = testing_stream.map(parse_point) # Initialize model num_features = 3 model = StreamingLinearRegressionWithSGD(initialWeights=[0.0] * num_features) # Train and predict model.trainOn(parsed_training) predictions = model.predictOnValues(parsed_testing.map(lambda lp: (lp.label, lp.features))) # Print predictions predictions.pprint() # Start streaming ssc.start() ssc.awaitTermination()

Model Evaluation

Evaluate model performance:

from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator, RegressionEvaluator # Binary classification binary_evaluator = BinaryClassificationEvaluator( labelCol="label", rawPredictionCol="rawPrediction", metricName="areaUnderROC" ) auc = binary_evaluator.evaluate(predictions) print(f"AUC: {auc}") # Multiclass classification multi_evaluator = MulticlassClassificationEvaluator( labelCol="label", predictionCol="prediction", metricName="accuracy" ) accuracy = multi_evaluator.evaluate(predictions) print(f"Accuracy: {accuracy}") # Regression regression_evaluator = RegressionEvaluator( labelCol="label", predictionCol="prediction", metricName="rmse" ) rmse = regression_evaluator.evaluate(predictions) print(f"RMSE: {rmse}")

Hyperparameter Tuning

Optimize model parameters with cross-validation:

from pyspark.ml.tuning import ParamGridBuilder, CrossValidator from pyspark.ml.classification import RandomForestClassifier from pyspark.ml.evaluation import MulticlassClassificationEvaluator # Define model rf = RandomForestClassifier(labelCol="label", featuresCol="features") # Build parameter grid param_grid = ParamGridBuilder() \ .addGrid(rf.numTrees, [10, 20, 50]) \ .addGrid(rf.maxDepth, [5, 10, 15]) \ .addGrid(rf.minInstancesPerNode, [1, 5, 10]) \ .build() # Define evaluator evaluator = MulticlassClassificationEvaluator(metricName="accuracy") # Cross-validation cv = CrossValidator( estimator=rf, estimatorParamMaps=param_grid, evaluator=evaluator, numFolds=5, parallelism=4 ) # Train cv_model = cv.fit(train_df) # Best model best_model = cv_model.bestModel print(f"Best numTrees: {best_model.getNumTrees}") print(f"Best maxDepth: {best_model.getMaxDepth()}") # Evaluate on test set predictions = cv_model.transform(test_df) accuracy = evaluator.evaluate(predictions) print(f"Test Accuracy: {accuracy}")

Distributed Matrix Operations

MLlib provides distributed matrix representations:

from pyspark.mllib.linalg.distributed import RowMatrix, IndexedRowMatrix, CoordinateMatrix from pyspark.mllib.linalg import Vectors # RowMatrix: Distributed matrix without row indices rows = sc.parallelize([ Vectors.dense([1.0, 2.0, 3.0]), Vectors.dense([4.0, 5.0, 6.0]), Vectors.dense([7.0, 8.0, 9.0]) ]) row_matrix = RowMatrix(rows) # Compute statistics print(f"Rows: {row_matrix.numRows()}") print(f"Cols: {row_matrix.numCols()}") print(f"Column means: {row_matrix.computeColumnSummaryStatistics().mean()}") # IndexedRowMatrix: Matrix with row indices from pyspark.mllib.linalg.distributed import IndexedRow indexed_rows = sc.parallelize([ IndexedRow(0, Vectors.dense([1.0, 2.0, 3.0])), IndexedRow(1, Vectors.dense([4.0, 5.0, 6.0])) ]) indexed_matrix = IndexedRowMatrix(indexed_rows) # CoordinateMatrix: Sparse matrix using (row, col, value) entries from pyspark.mllib.linalg.distributed import MatrixEntry entries = sc.parallelize([ MatrixEntry(0, 0, 1.0), MatrixEntry(0, 2, 3.0), MatrixEntry(1, 1, 5.0) ]) coord_matrix = CoordinateMatrix(entries)

Stratified Sampling

Sample data while preserving class distribution:

# Scala/Java approach data = [("a", 1), ("b", 2), ("a", 3), ("b", 4), ("a", 5), ("c", 6)] rdd = sc.parallelize(data) # Define sampling fractions per key fractions = {"a": 0.5, "b": 0.5, "c": 0.5} # Approximate sample (faster, one pass) sampled_rdd = rdd.sampleByKey(withReplacement=False, fractions=fractions) # Exact sample (slower, guaranteed exact counts) exact_sampled = rdd.sampleByKeyExact(withReplacement=False, fractions=fractions) print(sampled_rdd.collect())

Performance Tuning

Memory Management

Memory Breakdown:

  • Execution Memory: Used for shuffles, joins, sorts, aggregations
  • Storage Memory: Used for caching and broadcast variables
  • User Memory: Used for user data structures and UDFs
  • Reserved Memory: Reserved for Spark internal operations

Configuration:

spark = SparkSession.builder \ .appName("MemoryTuning") \ .config("spark.executor.memory", "4g") \ .config("spark.driver.memory", "2g") \ .config("spark.memory.fraction", "0.6") # Fraction for execution + storage \ .config("spark.memory.storageFraction", "0.5") # Fraction of above for storage \ .getOrCreate()

Memory Best Practices:

  • Monitor memory usage via Spark UI
  • Use appropriate storage levels for caching
  • Avoid collecting large datasets to driver
  • Increase executor memory for memory-intensive operations
  • Use kryo serialization for better memory efficiency

Shuffle Optimization

Shuffles are expensive operations - minimize them:

Causes of Shuffles:

  • groupByKey, reduceByKey, aggregateByKey
  • join, cogroup
  • repartition, coalesce (with increase)
  • distinct, intersection
  • sortByKey

Optimization Strategies:

# 1. Use reduceByKey instead of groupByKey # Bad: groupByKey shuffles all data word_pairs.groupByKey().mapValues(sum) # Good: reduceByKey combines locally before shuffle word_pairs.reduceByKey(lambda a, b: a + b) # 2. Broadcast small tables in joins from pyspark.sql.functions import broadcast large_df.join(broadcast(small_df), "key") # 3. Partition data appropriately df.repartition(200, "user_id") # Partition by key for subsequent aggregations # 4. Coalesce instead of repartition when reducing partitions df.coalesce(10) # No shuffle, just merge partitions # 5. Tune shuffle partitions spark.conf.set("spark.sql.shuffle.partitions", 200) # Default is 200

Shuffle Configuration:

spark = SparkSession.builder \ .config("spark.sql.shuffle.partitions", 200) \ .config("spark.default.parallelism", 200) \ .config("spark.sql.adaptive.enabled", "true") # Enable AQE \ .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \ .getOrCreate()

Partitioning Strategies

Partition Count Guidelines:

  • Too few: Underutilized cluster, OOM errors
  • Too many: Task scheduling overhead
  • Sweet spot: 2-4x number of CPU cores
  • For large shuffles: 100-200+ partitions

Partition by Column:

# Partition writes by date for easy filtering df.write.partitionBy("date", "country").parquet("output") # Read with partition pruning (only reads relevant partitions) spark.read.parquet("output").filter(col("date") == "2025-01-15").show()

Custom Partitioning:

from pyspark.rdd import portable_hash # Custom partitioner for RDD def custom_partitioner(key): return portable_hash(key) % 100 rdd.partitionBy(100, custom_partitioner)

Caching Strategies

When to Cache:

# Iterative algorithms (ML) training_data.cache() for i in range(num_iterations): model = train_model(training_data) # Multiple aggregations on same data base_df.cache() result1 = base_df.groupBy("country").count() result2 = base_df.groupBy("city").avg("sales") # Interactive analysis df.cache() df.filter(condition1).show() df.filter(condition2).show() df.groupBy("category").count().show()

Storage Levels:

from pyspark import StorageLevel # Memory only (fastest, but may lose data) df.persist(StorageLevel.MEMORY_ONLY) # Memory and disk (spill to disk if needed) df.persist(StorageLevel.MEMORY_AND_DISK) # Serialized in memory (more compact, slower access) df.persist(StorageLevel.MEMORY_ONLY_SER) # Disk only (slowest, but always available) df.persist(StorageLevel.DISK_ONLY) # Replicated (fault tolerance) df.persist(StorageLevel.MEMORY_AND_DISK_2) # 2 replicas

Broadcast Joins

Optimize joins with small tables:

from pyspark.sql.functions import broadcast # Automatic broadcast (tables < spark.sql.autoBroadcastJoinThreshold) spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 10 * 1024 * 1024) # 10 MB # Explicit broadcast hint large_df.join(broadcast(small_df), "key") # Benefits: # - No shuffle of large table # - Small table sent to all executors once # - Much faster for small dimension tables

Adaptive Query Execution (AQE)

Enable runtime query optimization:

spark.conf.set("spark.sql.adaptive.enabled", "true") spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true") spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true") # AQE Benefits: # - Dynamically coalesce partitions after shuffle # - Handle skewed joins by splitting large partitions # - Optimize join strategy at runtime

Data Format Selection

Performance Comparison:

  1. Parquet (Best for analytics): Columnar, compressed, fast queries
  2. ORC (Best for Hive): Similar to Parquet, slightly better compression
  3. Avro (Best for row-oriented): Good for write-heavy workloads
  4. JSON (Slowest): Human-readable but inefficient
  5. CSV (Legacy): Compatible but slow and no schema

Recommendation:

  • Use Parquet for most analytics workloads
  • Enable compression (snappy, gzip, lzo)
  • Partition by commonly filtered columns
  • Use columnar formats for read-heavy workloads

Catalyst Optimizer

Understand query optimization:

# View physical plan df.explain(mode="extended") # Optimizations include: # - Predicate pushdown: Push filters to data source # - Column pruning: Read only required columns # - Constant folding: Evaluate constants at compile time # - Join reordering: Optimize join order # - Partition pruning: Skip irrelevant partitions

Production Deployment

Cluster Managers

Standalone:

  • Simple, built-in cluster manager
  • Easy setup for development and small clusters
  • No resource sharing with other frameworks
# Start master $SPARK_HOME/sbin/start-master.sh # Start workers $SPARK_HOME/sbin/start-worker.sh spark://master:7077 # Submit application spark-submit --master spark://master:7077 app.py

YARN:

  • Hadoop's resource manager
  • Share cluster resources with MapReduce, Hive, etc.
  • Two modes: cluster (driver on YARN) and client (driver on local machine)
# Cluster mode (driver runs on YARN) spark-submit --master yarn --deploy-mode cluster app.py # Client mode (driver runs locally) spark-submit --master yarn --deploy-mode client app.py

Kubernetes:

  • Modern container orchestration
  • Dynamic resource allocation
  • Cloud-native deployments
spark-submit \ --master k8s://https://k8s-master:443 \ --deploy-mode cluster \ --name spark-app \ --conf spark.executor.instances=5 \ --conf spark.kubernetes.container.image=spark:latest \ app.py

Mesos:

  • General-purpose cluster manager
  • Fine-grained or coarse-grained resource sharing

Application Submission

Basic spark-submit:

spark-submit \ --master yarn \ --deploy-mode cluster \ --driver-memory 4g \ --executor-memory 8g \ --executor-cores 4 \ --num-executors 10 \ --conf spark.sql.shuffle.partitions=200 \ --py-files dependencies.zip \ --files config.json \ application.py

Configuration Options:

  • --master: Cluster manager URL
  • --deploy-mode: Where to run driver (client or cluster)
  • --driver-memory: Memory for driver process
  • --executor-memory: Memory per executor
  • --executor-cores: Cores per executor
  • --num-executors: Number of executors
  • --conf: Spark configuration properties
  • --py-files: Python dependencies
  • --files: Additional files to distribute

Resource Allocation

General Guidelines:

  • Driver Memory: 1-4 GB (unless collecting large results)
  • Executor Memory: 4-16 GB per executor
  • Executor Cores: 4-5 cores per executor (diminishing returns beyond 5)
  • Number of Executors: Fill cluster capacity, leave resources for OS/other services
  • Parallelism: 2-4x total cores

Example Calculations:

Cluster: 10 nodes, 32 cores each, 128 GB RAM each

Option 1: Many small executors
  - 30 executors (3 per node)
  - 10 cores per executor
  - 40 GB memory per executor
  - Total: 300 cores

Option 2: Fewer large executors (RECOMMENDED)
  - 50 executors (5 per node)
  - 5 cores per executor
  - 24 GB memory per executor
  - Total: 250 cores

Dynamic Allocation

Automatically scale executors based on workload:

spark = SparkSession.builder \ .appName("DynamicAllocation") \ .config("spark.dynamicAllocation.enabled", "true") \ .config("spark.dynamicAllocation.minExecutors", 2) \ .config("spark.dynamicAllocation.maxExecutors", 100) \ .config("spark.dynamicAllocation.initialExecutors", 10) \ .config("spark.dynamicAllocation.executorIdleTimeout", "60s") \ .getOrCreate()

Benefits:

  • Better resource utilization
  • Automatic scaling for varying workloads
  • Reduced costs in cloud environments

Monitoring and Logging

Spark UI:

  • Web UI at http://driver:4040
  • Stages, tasks, storage, environment, executors
  • SQL query plans and execution details
  • Identify bottlenecks and performance issues

History Server:

# Start history server $SPARK_HOME/sbin/start-history-server.sh # Configure event logging spark.conf.set("spark.eventLog.enabled", "true") spark.conf.set("spark.eventLog.dir", "hdfs://namenode/spark-logs")

Metrics:

# Enable metrics collection spark.conf.set("spark.metrics.conf.*.sink.console.class", "org.apache.spark.metrics.sink.ConsoleSink") spark.conf.set("spark.metrics.conf.*.sink.console.period", 10)

Logging:

# Configure log level spark.sparkContext.setLogLevel("WARN") # ERROR, WARN, INFO, DEBUG # Custom logging import logging logger = logging.getLogger(__name__) logger.info("Custom log message")

Fault Tolerance

Automatic Recovery:

  • Task failures: Automatically retry failed tasks
  • Executor failures: Reschedule tasks on other executors
  • Driver failures: Restore from checkpoint (streaming)
  • Node failures: Recompute lost partitions from lineage

Checkpointing:

# Set checkpoint directory spark.sparkContext.setCheckpointDir("hdfs://namenode/checkpoints") # Checkpoint RDD (breaks lineage for very long chains) rdd.checkpoint() # Streaming checkpoint (required for production) query = streaming_df.writeStream \ .option("checkpointLocation", "hdfs://namenode/streaming-checkpoint") \ .start()

Speculative Execution:

# Enable speculative execution for slow tasks spark.conf.set("spark.speculation", "true") spark.conf.set("spark.speculation.multiplier", 1.5) spark.conf.set("spark.speculation.quantile", 0.75)

Data Locality

Optimize data placement for performance:

Locality Levels:

  1. PROCESS_LOCAL: Data in same JVM as task (fastest)
  2. NODE_LOCAL: Data on same node, different process
  3. RACK_LOCAL: Data on same rack
  4. ANY: Data on different rack (slowest)

Improve Locality:

# Increase locality wait time spark.conf.set("spark.locality.wait", "10s") spark.conf.set("spark.locality.wait.node", "5s") spark.conf.set("spark.locality.wait.rack", "3s") # Partition data to match cluster topology df.repartition(num_nodes * cores_per_node)

Best Practices

Code Organization

  1. Modular Design: Separate data loading, transformation, and output logic
  2. Configuration Management: Externalize configuration (use config files)
  3. Error Handling: Implement robust error handling and logging
  4. Testing: Unit test transformations, integration test pipelines
  5. Documentation: Document complex transformations and business logic

Performance

  1. Avoid Shuffles: Use reduceByKey instead of groupByKey
  2. Cache Wisely: Only cache data reused multiple times
  3. Broadcast Small Tables: Use broadcast joins for small reference data
  4. Partition Appropriately: 2-4x CPU cores, partition by frequently filtered columns
  5. Use Parquet: Columnar format for analytical workloads
  6. Enable AQE: Leverage adaptive query execution for runtime optimization
  7. Tune Memory: Balance executor memory and cores
  8. Monitor: Use Spark UI to identify bottlenecks

Development Workflow

  1. Start Small: Develop with sample data locally
  2. Profile Early: Monitor performance from the start
  3. Iterate: Optimize incrementally based on metrics
  4. Test at Scale: Validate with production-sized data before deployment
  5. Version Control: Track code, configurations, and schemas

Data Quality

  1. Schema Validation: Enforce schemas on read/write
  2. Null Handling: Explicitly handle null values
  3. Data Validation: Check for expected ranges, formats, constraints
  4. Deduplication: Remove duplicates based on business logic
  5. Audit Logging: Track data lineage and transformations

Security

  1. Authentication: Enable Kerberos for YARN/HDFS
  2. Authorization: Use ACLs for data access control
  3. Encryption: Encrypt data at rest and in transit
  4. Secrets Management: Use secure credential providers
  5. Audit Trails: Log data access and modifications

Cost Optimization

  1. Right-Size Resources: Don't over-provision executors
  2. Dynamic Allocation: Scale executors based on workload
  3. Spot Instances: Use spot/preemptible instances in cloud
  4. Data Compression: Use efficient formats (Parquet, ORC)
  5. Partitioning: Prune unnecessary data reads
  6. Auto-Shutdown: Terminate idle clusters

Common Patterns

ETL Pipeline Pattern

def etl_pipeline(spark, input_path, output_path): # Extract raw_df = spark.read.parquet(input_path) # Transform cleaned_df = raw_df \ .dropDuplicates(["id"]) \ .filter(col("value").isNotNull()) \ .withColumn("processed_date", current_date()) # Enrich enriched_df = cleaned_df.join(broadcast(reference_df), "key") # Aggregate aggregated_df = enriched_df \ .groupBy("category", "date") \ .agg( count("*").alias("count"), sum("amount").alias("total_amount"), avg("value").alias("avg_value") ) # Load aggregated_df.write \ .partitionBy("date") \ .mode("overwrite") \ .parquet(output_path)

Incremental Processing Pattern

def incremental_process(spark, input_path, output_path, checkpoint_path): # Read last processed timestamp last_timestamp = read_checkpoint(checkpoint_path) # Read new data new_data = spark.read.parquet(input_path) \ .filter(col("timestamp") > last_timestamp) # Process processed = transform(new_data) # Write processed.write.mode("append").parquet(output_path) # Update checkpoint max_timestamp = new_data.agg(max("timestamp")).collect()[0][0] write_checkpoint(checkpoint_path, max_timestamp)

Slowly Changing Dimension (SCD) Pattern

def scd_type2_upsert(spark, dimension_df, updates_df): # Mark existing records as inactive if updated inactive_records = dimension_df \ .join(updates_df, "business_key") \ .select( dimension_df["*"], lit(False).alias("is_active"), current_date().alias("end_date") ) # Add new records new_records = updates_df \ .withColumn("is_active", lit(True)) \ .withColumn("start_date", current_date()) \ .withColumn("end_date", lit(None)) # Union unchanged, inactive, and new records result = dimension_df \ .join(updates_df, "business_key", "left_anti") \ .union(inactive_records) \ .union(new_records) return result

Window Analytics Pattern

def calculate_running_metrics(df): from pyspark.sql.window import Window from pyspark.sql.functions import row_number, lag, sum, avg # Define window window_spec = Window.partitionBy("user_id").orderBy("timestamp") # Calculate metrics result = df \ .withColumn("row_num", row_number().over(window_spec)) \ .withColumn("prev_value", lag("value", 1).over(window_spec)) \ .withColumn("running_total", sum("value").over(window_spec.rowsBetween(Window.unboundedPreceding, Window.currentRow))) \ .withColumn("moving_avg", avg("value").over(window_spec.rowsBetween(-2, 0))) return result

Troubleshooting

Out of Memory Errors

Symptoms:

  • java.lang.OutOfMemoryError
  • Executor failures
  • Slow garbage collection

Solutions:

# Increase executor memory spark.conf.set("spark.executor.memory", "8g") # Increase driver memory (if collecting data) spark.conf.set("spark.driver.memory", "4g") # Reduce memory pressure df.persist(StorageLevel.MEMORY_AND_DISK) # Spill to disk df.coalesce(100) # Reduce partition count spark.conf.set("spark.sql.shuffle.partitions", 400) # Increase shuffle partitions # Avoid collect() on large datasets # Use take() or limit() instead df.take(100)

Shuffle Performance Issues

Symptoms:

  • Long shuffle read/write times
  • Skewed partition sizes
  • Task stragglers

Solutions:

# Increase shuffle partitions spark.conf.set("spark.sql.shuffle.partitions", 400) # Handle skew with salting df_salted = df.withColumn("salt", (rand() * 10).cast("int")) result = df_salted.groupBy("key", "salt").agg(...) # Use broadcast for small tables large_df.join(broadcast(small_df), "key") # Enable AQE for automatic optimization spark.conf.set("spark.sql.adaptive.enabled", "true") spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")

Streaming Job Failures

Symptoms:

  • Streaming query stopped
  • Checkpoint corruption
  • Processing lag increasing

Solutions:

# Increase executor memory for stateful operations spark.conf.set("spark.executor.memory", "8g") # Tune watermark for late data .withWatermark("timestamp", "15 minutes") # Increase trigger interval to reduce micro-batch overhead .trigger(processingTime="30 seconds") # Monitor lag and adjust parallelism spark.conf.set("spark.sql.shuffle.partitions", 200) # Recover from checkpoint corruption # Delete checkpoint directory and restart (data loss possible) # Or implement custom state recovery logic

Data Skew

Symptoms:

  • Few tasks take much longer than others
  • Unbalanced partition sizes
  • Executor OOM errors

Solutions:

# 1. Salting technique (add random prefix to keys) from pyspark.sql.functions import concat, lit, rand df_salted = df.withColumn("salted_key", concat(col("key"), lit("_"), (rand() * 10).cast("int"))) result = df_salted.groupBy("salted_key").agg(...) # 2. Repartition by skewed column df.repartition(200, "skewed_column") # 3. Isolate skewed keys skewed_keys = df.groupBy("key").count().filter(col("count") > threshold).select("key") skewed_df = df.join(broadcast(skewed_keys), "key") normal_df = df.join(broadcast(skewed_keys), "key", "left_anti") # Process separately skewed_result = process_with_salting(skewed_df) normal_result = process_normally(normal_df) final = skewed_result.union(normal_result)

Context7 Code Integration

This skill integrates real-world code examples from Apache Spark's official repository. All code snippets in the EXAMPLES.md file are sourced from Context7's Apache Spark library documentation, ensuring production-ready patterns and best practices.

Version and Compatibility

  • Apache Spark Version: 3.x (compatible with 2.4+)
  • Python: 3.7+
  • Scala: 2.12+
  • Java: 8+
  • R: 3.5+

References


Skill Version: 1.0.0 Last Updated: October 2025 Skill Category: Big Data, Distributed Computing, Data Engineering, Machine Learning Context7 Integration: /apache/spark with 8000 tokens of documentation

manutej

manutej

luxor-claude-marketplace

View on GitHub

Download Skill Files

View Installation Guide

Download the complete skill directory including SKILL.md and all related files