Apache Spark - Sort Merge Join









Apache Spark is a well-known open-source cluster computing system for large-scale data processing. If you are into Big Data and Machine Learning, Apache Spark probably needs no introduction.

Its capabilities include in-memory processing, distributed SQL analytics, real-time stream processing, graph processing, machine learning, support for several programming languages, and much more.


"What happens under the hood, when we execute a Spark application?".  








This is a very common question that crops up in every Apache Spark user's mind.

If you have the same question, I'll answer it using Spark SQL by executing one of the most common data processing operations, i.e. a join.

Spark being a vast technology, it is nearly impossible to cover all of its details in a single article. And we all learn something new every time we use a technology to solve a new challenge. However, the details I'll cover here should be enough to give you an idea of what goes on behind the scenes in a Spark application.


Prerequisite - This article is going to be technical and will uncover the deeper details of a Spark application. Hence some basic familiarity with Spark and distributed processing is a prerequisite.
That said, if you are new to Spark, I cover some basics in the "Spark fundamentals" section of this article.


Let's get started 🚀

Let us consider a company named A2ZMoney. A2ZMoney provides online financial services such as money transfer, business accounting and tax filing.
A2ZMoney is completely web based and has a global customer base, with thousands of businesses signed up to use its services.


Business use case: 

A2ZMoney wants to grow its revenue, and to do so it is laser focused on achieving the following 2 goals:

1. Increase its mobile app usage. To achieve this goal, A2ZMoney wants to convert its desktop users to mobile app users, as mobile app usage leads to more engagement and hence more business.

2. Send promotional offers to its users who are already on the mobile platform. These promotional offers would lead to more transactions and hence generate more revenue.

To fulfill these two goals, A2ZMoney first would like to know about its customers and, most importantly, the devices they use. This would help it target customers accordingly.

So the business requirement is to fetch the customers and the corresponding device used by them. 


Data:

We'll use the following two datasets, one with the details of customers (businesses) and the other with the details of the devices used by them.

Business Accounts data (Parquet files) - It consists of the data of the customers' online accounts used to access the services offered by A2ZMoney. The schema of this dataset consists of several columns, but the key column of interest for us is the business's accountId.

Device data (Parquet files) - It consists of the details of the devices used by the customers (businesses). The primary columns of interest are the business's accountId, deviceId and date_of_use (the date on which the device was used).

To fetch the required customer and device details, one of the very first approaches would be to join the business and device datasets.


Infrastructure we'll use:

We'll use AWS Cloud for this exercise.

Processing Cluster - EMR (v5.12.1), a 5-node cluster of r4.8xlarge instances.

Data storage - S3


Datasets location in S3:

I have created a dedicated S3 bucket named spark-buckt to load these datasets:

Data file paths in S3:

business_account -> s3://spark-buckt/src/business_account/
business_device -> s3://spark-buckt/src/business_device/

Note - If at any time while reading this article you need better image resolution, please click on the images to enlarge them.


Some data analysis before performing the join:

Spark has a concept of dataframes (similar to Pandas dataframes), which are distributed collections of data organized into named columns. Hence, to perform any processing on these two datasets present in S3, we'll first create a dataframe on top of each.

But before we can create a Spark dataframe, we need a SparkSession object, which is the entry point to all Spark functionality and is the first key entity to be created. The Application code section below shows the creation of the SparkSession and the dataframes on our business accounts and device data.

Before processing the data through our application, let us first count the number of records in each of the two dataframes separately in a notebook. I am using a Zeppelin notebook for this, with Spark's read transformation and count action, as follows (we'll learn about Spark transformations and actions in the "Spark fundamentals" section below):
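For reference, here is a minimal sketch of what those notebook cells could look like (Scala; in Zeppelin the SparkSession is already available as the variable spark, and the dataframe names are my own):

// Dataframes on top of the two Parquet datasets in S3.
val businessAccountDF = spark.read.parquet("s3://spark-buckt/src/business_account/")
val businessDeviceDF  = spark.read.parquet("s3://spark-buckt/src/business_device/")

// count() is an action, so these lines actually trigger the reads.
businessAccountDF.count()   // ~ 10 million records
businessDeviceDF.count()    // ~ 3 million records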

Business_account - count ~ 10 million

Business_device - count ~ 3 million

Now we need to join these two datasets, to retrieve the customer details and the device used by them. The Spark application code is as follows:


Application code:


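The code itself was shown here as a screenshot. Below is a minimal Scala sketch of the same flow; the source paths and the join key accountId come from the article, while the object name, variable names and output path are my own assumptions:

import org.apache.spark.sql.SparkSession

object BusinessDeviceJoin {

  def main(args: Array[String]): Unit = {

    // SparkSession - the entry point to all Spark functionality.
    val spark = SparkSession.builder()
      .appName("BusinessDeviceJoin")
      .getOrCreate()

    // Dataframes on top of the business account and device datasets in S3.
    val businessAccountDF = spark.read.parquet("s3://spark-buckt/src/business_account/")
    val businessDeviceDF  = spark.read.parquet("s3://spark-buckt/src/business_device/")

    // Inner join on the matching join key accountId (a transformation, hence lazy).
    val joinedDF = businessAccountDF.join(businessDeviceDF, Seq("accountId"), "inner")

    // write is the action that triggers the actual execution.
    // The output path below is hypothetical.
    joinedDF.write
      .mode("overwrite")
      .parquet("s3://spark-buckt/output/business_device_join/")

    spark.stop()
  }
}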


Before we execute this Spark application, let's understand the code.

We first create two dataframes, one for the business dataset and the other for the device dataset stored in S3. Then we join these two dataframes on the matching join key accountId and write the output data files back to S3.


Spark fundamentals:

Spark works on the concept of lazy evaluation using transformations and actions. 

Transformation - An operation which transforms a dataset and produces a new dataset. In our code, join is that transformation.

Action - An operation which performs some computation and returns a result. In our code, write is that action.

Transformations are always lazy, meaning they do not apply their computation until an action is executed or there is a need for some data to be returned. 
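For example, in a notebook you can observe this laziness yourself (reusing the dataframes from the sketches above):

// join is a transformation - this line returns immediately;
// Spark only records it in the plan, no data is read yet.
val joinedDF = businessAccountDF.join(businessDeviceDF, Seq("accountId"))

// explain() just prints the plan built so far - still no job is run.
joinedDF.explain()

// Only an action such as count() or write() triggers the actual computation.
joinedDF.count()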




To achieve this lazy evaluation, Spark creates a directed graph called a DAG (Directed Acyclic Graph) behind the scenes. A DAG is a directed graph with no cycles or loops.

In a Spark DAG, vertices reflect the data and edges reflect the operations performed on that data. 

A DAG is entirely logical and Spark does not materialize a DAG until an action is performed. In our code, write() is that action, which triggers the materialization of our application's DAG. We'll look at the DAG shortly to understand the details.

Every action triggers a Spark job. A job is further subdivided into sets of tasks called stages, and the DAG gives us the details of these stages. Within a stage, tasks work on co-located data partitions; across stages, data travels over the network.

Both of these datasets are larger than 10 MB in size, hence by default Spark executes this join as a Sort Merge Join.

Why is that? Because the default maximum size for a table to participate in a broadcast hash join is 10 MB, as shown below (for Spark v2.3.0).
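You can check, and if needed change, this threshold through the spark.sql.autoBroadcastJoinThreshold setting. A small sketch (values are in bytes):

// Default in Spark 2.3.0: 10485760 bytes, i.e. 10 MB.
spark.conf.get("spark.sql.autoBroadcastJoinThreshold")

// Tables estimated to be smaller than this are broadcast; larger ones
// fall back to a Sort Merge Join by default. Raising it to 100 MB:
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "104857600")

// Setting it to -1 disables broadcast hash joins altogether.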



Let us now look at the Spark application execution details.


Execution details of the application:


Execution time = 47 s (as shown below):

(Screenshot: Spark UI showing the application's execution time of 47 s)



(Screenshot: Spark UI Jobs page listing the jobs triggered by the application and their durations)


As we can see, the last job, corresponding to the parquet action, took most of the execution time (31 s). We'll dive into the details of this job and analyze its stages:

(Screenshot: details of this job, showing its DAG visualization and 3 stages)


As shown above, this job executed in 3 stages, with the exchange phases shuffling data over the network as required for the join operation.

The first two stages perform the shuffle write for the business account and device datasets respectively. The last stage reads all of the shuffled data written by the previous two stages and performs the join. Let's take a look at what exactly is happening in these stages by opening the query plan.
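One way to open the plan programmatically is the explain() method on the joined dataframe. The sketch below shows the call together with an abbreviated, illustrative shape of the physical plan for a sort merge join; the exact output varies with Spark version and data:

// Prints the physical plan chosen for the join.
joinedDF.explain()

// Illustrative shape of the output (abbreviated, not the exact text):
// Project [accountId, ..., deviceId, date_of_use]
// +- SortMergeJoin [accountId], [accountId], Inner
//    :- Sort [accountId ASC NULLS FIRST]
//    :  +- Exchange hashpartitioning(accountId, 200)
//    :     +- FileScan parquet [business_account columns]
//    +- Sort [accountId ASC NULLS FIRST]
//       +- Exchange hashpartitioning(accountId, 200)
//          +- FileScan parquet [business_device columns]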





If you are familiar with Map Reduce, then this would be easy to understand. 

For this join, the data of the two datasets, business_account and business_device, first gets scanned and mapped on the join key column accountId. Then, in the exchange phase, the data is hash partitioned on the join key, and in the sort phase it is sorted within each partition on the join key. Once the required partitioning and sorting is done, the data from the two tables is joined on the join key accountId in the SortMergeJoin phase (an inner join here).

Then, in the final Project phase, the required output columns are selected from this result. Finally, the output data files of the join result are written in Parquet format to the S3 output location (as shown below).



(Screenshot: the join output files written in Parquet format to the S3 output location)
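If you want to sanity check the result, you can read the joined output back from S3 (the path here matches the hypothetical output path used in the application sketch above):

// Read the join output back and inspect it.
val resultDF = spark.read.parquet("s3://spark-buckt/output/business_device_join/")
resultDF.printSchema()
resultDF.count()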




I hope this article gave you some understanding of the fundamentals of Spark and of how the Spark engine formulates the DAG, then translates and performs the computation.

   Thanks for taking the time to read this article.


