Spark Optimization - Bucketing



Complex data computation is part of every Big Data project.
Do you have a business scenario that requires a complex join on data with billions or trillions of values?
If you are using Apache Spark, stay tuned for the next few minutes: this article addresses that expensive join problem using a Spark optimization technique called bucketing.

Before reading this article, I recommend going through my previous articles on Spark's Sort Merge Join and Broadcast Hash Join. They lay the foundation for understanding the bucketing technique. This is recommended but not mandatory, as you will still be able to grasp the core concept.


When to use Bucketing?

Use bucketing for any business problem where we need to join tables with very high cardinality on the join column (I repeat, very high), say in the millions, billions or even trillions, and where this join needs to happen multiple times.

Let's understand bucketing using the data of a hypothetical service provider, ABC Corporation.

Data:

Our data consists of accounts of different businesses which signed up with ABC Corporation to use its services. 
Following are the two datasets we'll use to explain this concept:

Business Accounts data -
This consists of the details of the accounts of customers who signed up with ABC Corporation to use its service. The schema of this dataset consists of around 28 columns and the join column we'll use is accountId.

Device data - Details of the devices used by customers to use ABC Corporation's services. 

Problem statement - We want to know the devices which different businesses used to access ABC Corporation's website. 

For this we need to join the business accounts and device datasets on accountId and fetch business details from business account dataset and the corresponding device details from device dataset.
You may wonder why we don't join on deviceId, as would usually be the case. I have deliberately normalized the data differently, choosing the high-cardinality column accountId as the join key, to explain the bucketing concept. The cardinality of this data is only in the millions, though.

Our goal is to perform the join in the most optimized way. So we'll perform it with and without bucketing and analyze the Spark DAG in both cases. This will help us see the benefit of bucketing in practice.

I'll show you how bucketing will reduce the execution time to 1/10th of the time consumed without bucketing.


Dataset S3 locations:

business_account - s3://akshay-spark-bucket/data/business_account/business_account.csv


business_device - s3://akshay-spark-bucket/data/business_device/business_device.csv


We will join these datasets on the accountId column, first without bucketing and then with bucketing, to see the performance gain.

The versions of EMR, Spark and Scala I used at the time of writing are as follows:

Cluster - EMR (release label v5.12.1), used to process these datasets
Spark version - v2.2.1
Scala version - v2.11.8

Let's start the Spark shell and create dataframes on top of our datasets:

I am naming the dataframe on business account data as ra_src. Here ra stands for revenue account, as these accounts generate revenue for ABC Corporation.
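
A minimal sketch of this step in the Spark shell could look like the following. It assumes both CSV files have a header row, and `ra_dvc_src` is a hypothetical name for the device dataframe, since the article only names `ra_src`.

```scala
// `spark` is the SparkSession that the Spark shell creates on startup.

// Assumption: the first row of each CSV holds the column names.
// Without schema inference, every column is read as a string.
val ra_src = spark.read
  .option("header", "true")
  .csv("s3://akshay-spark-bucket/data/business_account/business_account.csv")

// `ra_dvc_src` is a hypothetical name for the device dataframe.
val ra_dvc_src = spark.read
  .option("header", "true")
  .csv("s3://akshay-spark-bucket/data/business_device/business_device.csv")
```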

Now we'll create temporary views from these dataframes, to run our Spark SQL join queries.
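
Registering the views might be sketched as below. The view names `ra_view` and `ra_dvc_view` are hypothetical, as is `ra_dvc_src`, an assumed name for the device dataframe.

```scala
// Assumes `ra_src` (accounts) and `ra_dvc_src` (devices, hypothetical name)
// are the dataframes created in the Spark shell.
// View names are illustrative; any valid identifier would do.
ra_src.createOrReplaceTempView("ra_view")
ra_dvc_src.createOrReplaceTempView("ra_dvc_view")
```

A temporary view lives only as long as the Spark session, so nothing is written to storage at this point.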

Record counts:
Running a count action on the account and device dataframes gives:


Business accounts - 3975735 (~4 million)
Business device - 4390242 (~4.4 million)


Join without bucketing:

Now let's run a join query on the dataframes and observe the Spark application's details:
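
As a rough sketch, the join could be written with Spark SQL as follows. It assumes temporary views named `ra_view` and `ra_dvc_view` (hypothetical names) have been registered over the account and device dataframes; the selected columns and the name `joined_df` are illustrative as well.

```scala
// Assumes temp views `ra_view` and `ra_dvc_view` (hypothetical names) exist.
val joined_df = spark.sql("""
  SELECT ra.*, dvc.*
  FROM ra_view ra
  JOIN ra_dvc_view dvc
    ON ra.accountId = dvc.accountId
""")

// Nothing has executed yet: the join is only planned at this point.
// explain() prints the physical plan that an action would run.
joined_df.explain()
```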

Now let's run a show() action on this result dataframe to view the results.

I am using the YARN cluster manager to execute Spark applications. This show() action materializes the DAG (Directed Acyclic Graph) and creates the required job for this action in my YARN application. This job is Job 6.

YARN application details

Job 6 of the YARN application, created for this action, executed in 3 stages.

DAG Visualization details:

The DAG shows that stage 6 and stage 7 first mapped the data and did a shuffle write (in yellow), serializing the data. The Exchange phase then hash-partitions the rows on the join key column and moves them over the network. Stage 8 reads the shuffled data, deserializes it, sorts it on the join key and performs the sort merge join, as laid out in the optimizer's plan.

SQL Query plan:


The execution details explained above are clearly visible in this plan produced by the Catalyst optimizer, where the Exchange is the biggest barrier.

Bottleneck in the long application time - The join application took a total of 30 seconds to complete, and the majority of that time was spent in the shuffling stages: stage 6 took 26 seconds before its shuffle write, and stage 7, running in parallel, took 20 seconds before its shuffle write.

Question 1 - Is this a bottleneck? Yes, it is. 26 seconds may look small, but only because the data here is really small. This time would multiply when the data volume and cardinality increase to billions or even trillions, and again when the join needs to happen multiple times.

Question 2 - Can this be overcome? Yes, of course. That is where bucketing plays its key role in optimization.




Let's tighten our belts for the optimization ride.

Join with bucketing:

Now let's bucket both dataframes on the join column accountId, also sort them on accountId, and save them as tables ra_bucketed_50 and ra_dvc_bucketed_50. I have bucketed both dataframes into 50 buckets:
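
One way to express this write with the DataFrameWriter API is sketched below. `ra_dvc_src` is a hypothetical name for the device dataframe; the table names and the bucket count of 50 come from the text above.

```scala
// Assumes `ra_src` (accounts) and `ra_dvc_src` (devices, hypothetical name) exist.
ra_src.write
  .bucketBy(50, "accountId")   // hash accountId into 50 buckets
  .sortBy("accountId")         // sort rows inside each bucket file
  .saveAsTable("ra_bucketed_50")

ra_dvc_src.write
  .bucketBy(50, "accountId")   // same column and bucket count as the other side
  .sortBy("accountId")
  .saveAsTable("ra_dvc_bucketed_50")
```

Bucketing metadata is kept in the table catalog, which is why the write ends in saveAsTable rather than a plain path-based save. Using the same bucket count and column on both sides is what lets matching buckets be joined directly.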


Benefit of the above bucketing operation

The benefit of bucketing the two dataframes on the accountId column is that we pre-partition the data on the join column and sort it in advance, which avoids the overhead of the exchange and sort phases in the actual job. We'll shortly see the huge performance gain when we perform the same join between these new bucketed tables.
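
To make the pre-partitioning idea concrete, here is a small plain-Scala illustration that does not use Spark. The account ids are made up, and `hashCode` is only a stand-in: Spark uses its own Murmur3-based hash, but the principle of "hash the key, take it modulo the bucket count" is the same.

```scala
// Stand-in for bucket assignment: hash the key, then take a non-negative modulo.
// Spark uses a Murmur3-based hash; `hashCode` is used here only for illustration.
def bucketOf(accountId: String, numBuckets: Int): Int =
  java.lang.Math.floorMod(accountId.hashCode, numBuckets)

val numBuckets = 50
val accountRows = Seq("A100", "A200", "A300")          // hypothetical account ids
val deviceRows  = Seq("A300", "A100", "A100", "A200")  // same keys, different order

// Group the account side by bucket number, as a bucketed write would on disk.
val accountBuckets = accountRows.groupBy(id => bucketOf(id, numBuckets))

// Each device row finds its matching account in the bucket with the same number,
// so bucket i of one table only ever needs bucket i of the other.
val allCoLocated = deviceRows.forall { id =>
  accountBuckets(bucketOf(id, numBuckets)).contains(id)
}
println(allCoLocated)
```

Because the bucket number depends only on the key and the bucket count, both tables must use the same count for this pairing to hold.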

Let's store the result of the join on the bucketed dataframes in a new dataframe, bucketed_df:
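
A possible form of this join, using the DataFrame API on the saved tables, is sketched here. The article's own query may have been written in Spark SQL instead; only the table names and `bucketed_df` are taken from the text.

```scala
// Read the bucketed tables back through the catalog so Spark sees the bucketing metadata.
val bucketed_df = spark.table("ra_bucketed_50")
  .join(spark.table("ra_dvc_bucketed_50"), Seq("accountId"))
```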


As earlier, we'll run show() on this dataframe to materialize the DAG and then analyze it:

Wow, this show action completed in just 3 seconds.

Let's observe the YARN application to see the performance gain.

YARN application details

The show() action created Job 7.
This job executed in just one stage (Stage ID 9, Description - show at <console>) and took just 3 seconds.
This means the data required to perform the join operation was available locally and no shuffling was required. Let's confirm this by analyzing the job DAG.

DAG Visualization details:




As the DAG visualization shows, this job created just one stage, Stage 9. No other stage was required, so no shuffling happened. This is also visible in the Shuffle Read and Shuffle Write columns, which are blank. Bucketing thus proved its efficiency by completely eliminating the shuffle.
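
The same check can be tried on a small scale without the UI by looking for an Exchange operator in the physical plan. The sketch below is a stand-alone experiment, not the article's job: the data, table names and bucket count of 4 are made up, it runs in local mode, and broadcast joins are switched off so that the tiny tables still go through a sort merge join.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[2]")
  .appName("bucketing-plan-check")
  .getOrCreate()
import spark.implicits._

// Tiny tables would otherwise be broadcast; disable that to force a sort merge join.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")

// Hypothetical stand-in data.
val accounts = Seq((1L, "Acme"), (2L, "Globex"), (3L, "Initech")).toDF("accountId", "name")
val devices  = Seq((1L, "phone"), (2L, "laptop"), (3L, "tablet")).toDF("accountId", "device")

// Plain join: each side needs an Exchange (shuffle) on accountId.
val plainPlan = accounts.join(devices, Seq("accountId"))
  .queryExecution.executedPlan.toString

// Bucket both sides the same way, then join the saved tables.
accounts.write.mode("overwrite")
  .bucketBy(4, "accountId").sortBy("accountId").saveAsTable("demo_accounts_b")
devices.write.mode("overwrite")
  .bucketBy(4, "accountId").sortBy("accountId").saveAsTable("demo_devices_b")

val bucketedPlan = spark.table("demo_accounts_b")
  .join(spark.table("demo_devices_b"), Seq("accountId"))
  .queryExecution.executedPlan.toString

println(s"plain join has Exchange:    ${plainPlan.contains("Exchange")}")
println(s"bucketed join has Exchange: ${bucketedPlan.contains("Exchange")}")
```

If bucketing is picked up as expected, only the plain join should report an Exchange. Whether a Sort operator also disappears depends on the Spark version and on how many files each bucket has, so this sketch deliberately checks the shuffle only.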

We now know the benefit of bucketing for join scenarios where the same data, with high cardinality on the join column, is used multiple times for joins. I hope you found this article useful. Thanks for taking the time to read it.


Comments

  1. Should bucketing be used everywhere joins are used? Is there any particular case where we should or should not use bucketing?

    Replies
    1. As explained at the beginning of this article, in the "When to use Bucketing" section, prefer bucketing in scenarios with high cardinality on the join columns where your query also repeats that join operation, to gain the benefit.
