As with many other programming languages and frameworks, writing a toy program in Spark is easy, but making a program perform well is always harder, especially since Spark usually runs in a distributed environment.
Spark provides a rich library of functions to meet various requirements, but you may only need some of them in your daily work. Here are some notes I took while reading the book “High Performance Spark”. With these notes, I hope developers can design their Spark programs better up front instead of learning from their own failures. A key takeaway is that knowing why one approach is better matters more than just knowing which function to use.
PySpark is very popular in the market thanks to the popularity of big data. However, Spark relies heavily on inline function definitions and lambda expressions, which are much more naturally supported in Scala, and its lazy evaluation combined with in-memory computation makes it rather unique. This gives programming Spark in Scala some advantages over Python.
General RDDs
Every RDD has the following properties:
- A list of partition objects,
- A function for computing an iterator of each partition,
- A list of dependencies on other RDDs,
- A partitioner,
- A list of preferred locations
Transformations and actions are the two categories of operations that work on your data:
- A transformation is a function that returns another RDD, e.g. flatMap, map, reduceByKey, groupByKey, etc.
- An action is an operation that returns something other than an RDD.
Actions trigger the scheduler, which builds a directed acyclic graph (called the DAG), based on the dependencies between RDD transformations (I also mentioned this in a previous post).
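To make the transformation/action split concrete, here is a minimal sketch in plain Python. It is a toy model, not Spark code: the LazyDataset class and the traced_double helper are hypothetical names made up for illustration. Transformations only record what should happen, and the action is what finally runs the chain.

```python
class LazyDataset:
    """Toy stand-in for an RDD: records transformations, runs them only on an action."""

    def __init__(self, source, ops=()):
        self._source = source      # any iterable holding the input records
        self._ops = tuple(ops)     # recorded transformations, in order

    def map(self, fn):
        # Transformation: returns a new LazyDataset, computes nothing yet.
        return LazyDataset(self._source, self._ops + (("map", fn),))

    def filter(self, pred):
        # Transformation: also only recorded.
        return LazyDataset(self._source, self._ops + (("filter", pred),))

    def collect(self):
        # Action: only now is the recorded chain applied to the data.
        records = iter(self._source)
        for kind, fn in self._ops:
            if kind == "map":
                records = map(fn, records)
            else:
                records = filter(fn, records)
        return list(records)


calls = []

def traced_double(x):
    calls.append(x)   # side effect lets us observe when work actually happens
    return x * 2

pipeline = LazyDataset(range(5)).map(traced_double).filter(lambda x: x > 2)
calls_before_action = len(calls)   # nothing has run at this point
result = pipeline.collect()        # the action triggers evaluation
```

Until collect() is called, traced_double is never invoked. Real Spark does much more at this point (building the DAG, scheduling tasks), but the deferral is the same idea.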
Note that not all transformations are 100% lazy. sortByKey needs to evaluate the RDD to determine the range of data, so it involves both a transformation and an action.
When persisting RDDs, the default implementation evicts the least recently used partition (called LRU caching) if the space it takes is required to compute or to cache a new partition. However, you can change this behaviour and control Spark’s memory prioritization with the persistencePriority() function in the RDD class.
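The LRU eviction policy described above can be sketched in a few lines of plain Python. This is only a toy model under simplifying assumptions (capacity is counted in partitions rather than bytes, and LruPartitionCache is a hypothetical name), not how Spark's memory manager is implemented.

```python
from collections import OrderedDict


class LruPartitionCache:
    """Toy cache holding at most `capacity` partitions, evicting the least recently used."""

    def __init__(self, capacity):
        self.capacity = capacity
        self._store = OrderedDict()   # oldest entry first, most recently used last
        self.evicted = []             # partition ids evicted so far, for inspection

    def get(self, partition_id):
        if partition_id not in self._store:
            return None               # a miss: the caller would have to recompute
        self._store.move_to_end(partition_id)   # mark as most recently used
        return self._store[partition_id]

    def put(self, partition_id, data):
        if partition_id in self._store:
            self._store.move_to_end(partition_id)
        self._store[partition_id] = data
        while len(self._store) > self.capacity:
            old_id, _ = self._store.popitem(last=False)   # drop least recently used
            self.evicted.append(old_id)


cache = LruPartitionCache(capacity=2)
cache.put(0, [1, 2])
cache.put(1, [3, 4])
cache.get(0)            # partition 0 is now the most recently used
cache.put(2, [5, 6])    # no room left, so partition 1 is evicted
```

Partition 1 is dropped even though it was cached after partition 0, because partition 0 was read more recently. An evicted partition has to be recomputed from its lineage the next time it is needed.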
Knowing your Spark Program
The Spark program itself runs in the driver node and sends some instructions to the executors.
When the SparkContext is started, a driver and a series of executors are started on the worker nodes of the cluster. The SparkContext determines how many resources are allotted to each executor.
With each action in a Spark application, the Spark scheduler builds an execution graph and launches a Spark job. A job is then divided into stages and tasks. Stage boundaries are determined by wide transformations, which require a shuffle (as opposed to narrow transformations, which do not).
Job -> stages -> tasks
The execution of a job is handled by different kinds of schedulers:
DAG Scheduler -> Task Scheduler
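As a rough illustration of how wide transformations cut a job into stages, the sketch below splits a linear chain of transformation names at every shuffle. It is a simplified model in plain Python: the WIDE set and split_into_stages are assumptions for illustration, and real Spark works on a dependency graph rather than a flat list (for example, a join of co-partitioned RDDs may not need a shuffle at all).

```python
# Assumption for illustration: these are treated as wide (shuffle) transformations.
WIDE = {"reduceByKey", "groupByKey", "sortByKey", "join"}


def split_into_stages(transformations):
    """Group a linear chain of transformation names into stages.

    A wide transformation needs shuffled input, so it starts a new stage;
    narrow transformations are pipelined into the current stage.
    """
    stages = [[]]
    for name in transformations:
        if name in WIDE and stages[-1]:
            stages.append([])   # shuffle boundary: close the current stage
        stages[-1].append(name)
    return stages


job = ["map", "filter", "reduceByKey", "map", "sortByKey"]
stages = split_into_stages(job)
```

Here the narrow map and filter are pipelined into one stage, and each wide transformation opens a new one. Within a stage, Spark then runs one task per partition.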
Dataset and DataFrame
DataFrames and Datasets can be read using the Spark SQL equivalent to a SparkContext, the SparkSession.
DataFrames are Datasets of a special Row object, which doesn’t provide any compile-time type checking. DataFrames allow Spark’s optimizer to better understand our code and our data, which enables a new class of optimizations.
The Dataset API is quite different from the general RDD APIs: it has its own optimizer and execution plans.
Jobs are transformed into a logical plan and then into a physical plan, which gives better performance.
Spark ML and Spark MLlib
Spark ML provides a higher-level API than MLlib with the goal of allowing users to more easily create practical machine learning pipelines. Spark MLlib is primarily built on top of RDDs and uses functions from Spark Core, while Spark ML is built on top of Spark SQL DataFrames.
Some optimization tips
RDDs that share the same partitioner and are materialized by the same action end up co-located, which can reduce network traffic.
Joins can be sped up with a broadcast join. Real-world applications are far more complicated than this, and the right strategy depends on the properties of the two datasets you want to join, but if you are joining a big dataset with a small one, you can broadcast the small one.
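The idea behind a broadcast join can be sketched in plain Python as follows. This is a toy model, not the Spark API: broadcast_hash_join and the sample data are hypothetical, and for simplicity it assumes keys are unique in the small dataset and performs an inner join.

```python
def broadcast_hash_join(large_partitions, small_rows):
    """Join (key, value) records without moving the large side.

    large_partitions: list of partitions, each a list of (key, value) pairs.
    small_rows: the small dataset as (key, value) pairs; it must fit in memory.
    """
    # "Broadcast": build one lookup table that every partition can read.
    # Assumption: keys are unique in the small dataset.
    lookup = dict(small_rows)

    joined = []
    for partition in large_partitions:
        # Each partition is joined locally; no record of the large side is shuffled.
        joined.append([(k, (v, lookup[k])) for k, v in partition if k in lookup])
    return joined


orders = [[(1, "book"), (2, "pen")], [(1, "lamp"), (3, "desk")]]   # large side, 2 partitions
customers = [(1, "Ann"), (2, "Bob")]                                # small side
result = broadcast_hash_join(orders, customers)
```

The large dataset keeps its original partitioning, and only the small lookup table is copied around. That is where the saving comes from, and also why the approach only works when the small side fits in memory on every executor.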
Since we can’t control the partitioner for DataFrames or Datasets, we can’t manually avoid shuffles as we can with core Spark joins.
Minimizing object creation in your RDD operations:
- Reuse existing objects, but note that this introduces mutable state, which can lead to subtle bugs.
- Use smaller data structures: prefer primitive types over case classes and objects.
- Reduce setup overhead, e.g. database connections.
- Reuse RDDs: persist, cache, checkpoint, or rely on shuffle files. Note that storing data in memory is space intensive, and serializing and deserializing it takes time.
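The "reduce setup overhead" point is easiest to see with a counter. The sketch below is plain Python with a hypothetical FakeConnection class standing in for an expensive resource such as a database connection. It compares creating the resource once per record with creating it once per partition, which is the pattern a per-partition operation makes possible.

```python
class FakeConnection:
    """Hypothetical stand-in for an expensive resource such as a database connection."""
    opened = 0   # counts how many connections were ever created

    def __init__(self):
        FakeConnection.opened += 1

    def lookup(self, x):
        return x * 10


def per_record(partitions):
    # Anti-pattern: one connection per record.
    return [[FakeConnection().lookup(x) for x in part] for part in partitions]


def per_partition(partitions):
    # Preferred: one connection per partition, reused for every record in it.
    out = []
    for part in partitions:
        conn = FakeConnection()
        out.append([conn.lookup(x) for x in part])
    return out


partitions = [[1, 2, 3], [4, 5]]

FakeConnection.opened = 0
slow = per_record(partitions)
opened_per_record = FakeConnection.opened      # one connection per record

FakeConnection.opened = 0
fast = per_partition(partitions)
opened_per_partition = FakeConnection.opened   # one connection per partition
```

Both versions produce the same output, but the second creates as many connections as there are partitions instead of as many as there are records.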
Some common issues to avoid when doing key-value transformations:
- Memory errors on the Spark driver: avoid calling collect or collectAsMap on large datasets.
- OOM on executors: tune Spark parameters, and reduce the data loaded by filtering or aggregating first.
- Shuffle failures: try to preserve partitioning across narrow transformations to avoid reshuffling data.
- Straggler tasks: make partition sizes even by customizing the partitioner.
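To illustrate the straggler point, here is a small plain-Python sketch of why a custom partitioner can help. It is a toy model with hypothetical helper names: it compares a simple modulo "hash" partitioner with one built from key frequencies, which assigns the heaviest keys first to the currently lightest partition.

```python
from collections import Counter


def partition_sizes(keys, partition_of, num_partitions):
    """Count how many records land in each partition."""
    sizes = [0] * num_partitions
    for k in keys:
        sizes[partition_of(k)] += 1
    return sizes


def build_balanced_partitioner(keys, num_partitions):
    """Assign each distinct key to the currently lightest partition, heaviest keys first."""
    loads = [0] * num_partitions
    assignment = {}
    for key, count in Counter(keys).most_common():
        target = loads.index(min(loads))   # lightest partition so far
        assignment[key] = target
        loads[target] += count
    return assignment.__getitem__


# Skewed input: keys 0 and 2 are frequent and both land in partition 0 when n = 2.
keys = [0] * 6 + [2] * 5 + [1] * 1 + [4] * 2
n = 2

hashed = partition_sizes(keys, lambda k: k % n, n)
balanced = partition_sizes(keys, build_balanced_partitioner(keys, n), n)
```

With the modulo partitioner one partition receives 13 of the 14 records, so its task becomes the straggler. The frequency-aware assignment splits them 7 and 7. Note that no partitioner can split a single hot key across partitions, so a key that dominates the data needs a different remedy.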