UPSERT in Hive (3-Step Process)


In this post we'll learn an efficient 3-step process for performing an UPSERT in Hive on a large table containing the entire history.
For readers not familiar with UPSERT: it is a combination of UPDATE and INSERT. If, on a table containing history data, we receive new data that needs to be inserted as well as data that is an update to existing records, we have to perform an UPSERT operation to handle both.

Prerequisite – The history table, being very large, should be partitioned. This is also a best practice for efficient data storage when keeping large volumes of data in any Big Data warehouse.

Business scenario – Let's take the example of clickstream data for a website, gathered from the different browsers of visitors to the site. The site_view_hist table contains the click and page-impression counts from different browsers, and the table is partitioned on hit_date (the date on which the visitor hit, i.e. visited, the website).
Clicks – number of clicks (e.g. on displayed advertisements) made by the visitor on a website page.
Impressions – number of times the website's pages or different sections were viewed by the visitor.

Problem statement – If we receive corrections to the number of clicks and impressions as recorded by a browser, we need to update them in the history table and also insert any new records we receive.
Let's dive into it:
In the history table, browser_name and hit_date form a composite key which remains constant, and we receive updates to the values of the clicks_count and impressions_count columns.
DDL of history table
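A minimal sketch of such a DDL, with assumed column types (only the column and table names are given in the post), could be:

-- Sketch of the history table DDL; column types are assumed
CREATE TABLE site_view_hist (
  browser_name      STRING,
  clicks_count      INT,
  impressions_count INT
)
PARTITIONED BY (hit_date STRING);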

Data:

Now suppose we receive updated records for the date 2016-01-01 (marked in blue) for the firefox and chrome browsers, and we also receive a new record (iexplorer) for 2016-01-31. Let us store these new and updated records in the following raw table:
DDL of Raw table
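A plausible sketch, assuming the raw table mirrors the history table's columns and is also partitioned on hit_date, could be:

-- Sketch of the raw table DDL; column types are assumed
CREATE TABLE site_view_raw (
  browser_name      STRING,
  clicks_count      INT,
  impressions_count INT
)
PARTITIONED BY (hit_date STRING);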

Data

Now we need an UPSERT solution that updates the records of the site_view_hist table for hit_date 2016-01-01 and inserts the new record for 2016-01-31.
SOLUTION (3 STEPS):
To achieve this efficiently, we will use the following 3-step process:
Prep step – We first get those partitions from the history table which need to be updated. So we create a temp table, site_view_temp1, which contains the rows from the history table whose hit_date equals a hit_date present in the raw table.
This brings in all the hit_date partitions from the history table for which at least one updated record exists in the raw table.
NOTE – Instead of a table, we can also create a view here for efficient processing and to save storage space.
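A sketch of this prep step, using a LEFT SEMI JOIN on the table and column names above (not necessarily the exact original query), might look like:

-- Prep step: fetch only the history partitions having at least one record in the raw table
CREATE TABLE site_view_temp1 AS
SELECT hist.browser_name,
       hist.clicks_count,
       hist.impressions_count,
       hist.hit_date
FROM site_view_hist hist
LEFT SEMI JOIN site_view_raw rw
  ON (hist.hit_date = rw.hit_date);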


Data of site_view_temp1 table:

Step 1 – From these fetched partitions we separate out the old, unchanged rows, i.e. the rows in which there is no change in the clicks and impressions counts. For this we create a temp table site_view_temp2 as follows:
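One way to keep only the unchanged rows is an anti-join on the composite key; a sketch (assuming the names used above):

-- Step 1: keep rows from the fetched partitions that have no updated version in the raw table
CREATE TABLE site_view_temp2 AS
SELECT t1.browser_name,
       t1.clicks_count,
       t1.impressions_count,
       t1.hit_date
FROM site_view_temp1 t1
LEFT OUTER JOIN site_view_raw rw
  ON (t1.browser_name = rw.browser_name AND t1.hit_date = rw.hit_date)
WHERE rw.browser_name IS NULL;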

Data of site_view_temp2 table:

Step 2 – Now we insert into this new temp table all the rows from the raw table. This step brings in the updated rows as well as any new rows. Since site_view_temp2 already contained the old unchanged rows, it will now have all the rows: new, updated, and unchanged. The following query does this:
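A sketch of this insert could be:

-- Step 2: add the updated rows and the new rows from the raw table
INSERT INTO TABLE site_view_temp2
SELECT browser_name,
       clicks_count,
       impressions_count,
       hit_date
FROM site_view_raw;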

New Data of site_view_temp2 table

Step 3 – Now simply insert-overwriting the site_view_hist table with the site_view_temp2 table gives us the required output, including the two updated rows for 2016-01-01 and the one newly inserted row for 2016-01-31.
CATCH – Since the history table is partitioned on hit_date, only the respective partitions will be overwritten, as follows:
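With dynamic partitioning enabled, the final overwrite would look roughly like this (a sketch; the partition column must come last in the SELECT list):

-- Step 3: overwrite only the affected hit_date partitions of the history table
SET hive.exec.dynamic.partition=true;
SET hive.exec.dynamic.partition.mode=nonstrict;

INSERT OVERWRITE TABLE site_view_hist PARTITION (hit_date)
SELECT browser_name,
       clicks_count,
       impressions_count,
       hit_date
FROM site_view_temp2;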

Final history table with updated and inserted rows:

Benefits of this approach:
  1. In the prep step itself, since we fetch just the partitions we need to update, we do not scan the whole history table. This makes the processing faster.
  2. In the final step, as we insert-overwrite the history table with the temp table, we touch only the partitions we want to update, along with a new partition created for the new record. This gives a large performance gain, as I saw for my production process on a 6.7 TB history table with over 5 billion records: since my 3-step process (contained in one Hive script) touched only a few partitions of a few thousand rows each, the process completed swiftly.

Comments

  1. Hi Akshay,

    What if we receive null in "hit_date" on the first day and then later receive the actual date? How do we handle such a scenario?

    1. Hi Chirag,
      Ideally the hit_date column should never be NULL, since it is part of the composite key and also the partition column as per the table's design. If we still receive a null value for hit_date, the data should first be cleansed to delete this record, as it represents corrupt data; later, with the actual date, it would come in as an insert record. With the upcoming Hive version 3.0 I expect this scenario to be taken care of, as I have read that version 3.0 will allow us to apply NOT NULL constraints on columns like hit_date in the table definition itself.

  2. Hi Akshay,

    I have a slightly different use case. Let's say I have a table T1 with attributes A1, A2, A3, PARTITION_KEY.
    My PARTITION_KEY is dynamically generated for a given day, so the same row might have been inserted for a previous day and re-published today; this data then has a different PARTITION_KEY but is the same data that I need to update. Also, overwriting a partition might wipe out other data sitting in the old partition.

    One solution I can think of is to fetch the PARTITION_KEY for the data to be updated, then fetch all data for those PARTITION_KEYs and continue with the steps you mentioned. Do you think this is feasible, or can it be optimized in a better way?

    1. Hi Prits. The core point is that the row which needs to be updated should have the same values for the composite key columns as the new incoming updated row, and the partition key should be part of the composite key. If your partition key is also a date, then your data-flow scenario is different, because the composite key itself changes when the same data arrives on a new date. If date is your partition key, I am curious to know from the business side why exactly this is happening: why is the same duplicate data arriving on a different day, and if it is, do you not have a cleanup process to remove such duplicate rows? If, on the other hand, date is not your partition column, I would like to understand your table's partitioning strategy. The optimization and efficiency of the processing depend entirely on how far back in time and how frequently we receive updated records. A rule of thumb: the fewer the partitions we update (by overwriting), the faster the processing will be.

  3. Excellent article. Very interesting to read. I really love to read such a nice article. Thanks! keep rocking. Big data Hadoop online Training India

  4. I think Upsert functions such as Update and Delete are always the best when it comes to providing some information about database functions.

    SSIS Upsert

  5. Hi James Zicrov. Can you please elaborate on your comment? This article, which I published in 2016, focuses on Hive (Hadoop's data warehouse solution), which has started supporting UPDATE and DELETE, but with several restrictions for transaction support, ranging from the required file format to mandatory bucketing of the data. I published a separate article on how to perform UPDATE and DELETE in Hive; you can find it in the blog archive at the top right of this page. But UPDATE and DELETE operations in Hive come with several restrictions.
    This approach achieves UPSERT efficiently by utilizing the partitioned storage of data in HDFS (or any other file system), does so irrespective of the underlying file format of the data, and overcomes the other restrictions as well.
    This article focuses on the Big Data warehouse solution Hive, not SSIS, hence I would appreciate it if you could elaborate on your point.

  6. How did you load data into the site_view_raw table?

    1. The site_view_raw table is the table which contains the incoming data, with both updated and newly inserted records. Please don't be confused by the name containing the keyword 'raw'. In my use case I had an unpartitioned table with this source data of update and insert rows, from which I loaded the site_view_raw table using an insert overwrite command, while ensuring that dynamic partitioning was enabled by setting the dynamic partitioning mode to nonstrict. I hope this helps, and sorry for the slight delay in my response.

