Avoid shuffling at all cost. Think about ways to leverage existing partitions. Leverage partial aggregation to reduce data transfer.
Shuffling is a process of redistributing data across partitions (aka repartitioning) that may or may not cause moving data across JVM processes or even over the wire (between executors on separate machines). Shuffling is the process of data transfer between stages. By default, shuffling doesn’t change the number of partitions, but their content.
RDD is created by loading a file from HDFS, or reading a file in local storage. Spark has no control over what bits of data are distributed in which partitions.
The data is read and partitioned in an RDD, and when an “action” function is called, Spark sends out tasks to the worker nodes. If the action is a reduction, data shuffling takes place.
This becomes a problem for key-value RDDs: these often require knowing where occurrences of a particular key are, for instance to perform a join. If the key can occur anywhere in the RDD, we have to look through every partition to find the key.
Avoid groupByKey and use reduceByKey or combineByKey instead.
groupByKey shuffles all the data, which is slow.
reduceByKey shuffles only the results of sub-aggregations in each partition of the data.
PairRDD offers join transformation that when called on datasets of type (K, V) and (K, W), returns a dataset of (K, (V, W)) pairs with all pairs of elements for each key
when shuffling is triggered on Spark? Any join, cogroup, or ByKey operation involves holding objects in hashmaps or in-memory buffers to group or sort. join, cogroup, and groupByKey use these data structures in the tasks for the stages that are on the fetching side of the shuffles they trigger.
distinct creates a shuffle
reduceByKey and aggregateByKey use data structures in the tasks for the stages on both sides of the shuffles they trigger.
How to check for shuffle?
toDebugString returns “A description of this RDD and its recursive dependencies for debugging.” So it will include possible shuffles from prior transformations if they exist, even if the most recent transformation does not incur shuffle.
scala> val a = sc.parallelize(Array(1,2,3)).distinct scala> a.toDebugString MappedRDD at distinct at <console>:12 (1 partitions) MapPartitionsRDD at distinct at <console>:12 (1 partitions) **ShuffledRDD at distinct at <console>:12 (1 partitions)** MapPartitionsRDD at distinct at <console>:12 (1 partitions) MappedRDD at distinct at <console>:12 (1 partitions) ParallelCollectionRDD at parallelize at <console>:12 (1 partitions)
There are situations where a shuffle will be required or not required for a certain function. For example join usually requires a shuffle but if you join two RDD’s that branch from the same RDD, spark can sometimes elide the shuffle.