We explored hive acid transactions with partitions (a behind the scenes perspective) to see the various delta and base ORC files that get created when using Hive’s INSERT, UPDATE, and DELETE statements. I thought it made sense to present a simple example of the MERGE statement as well and, of course, to peek behind the glass a bit, though this time without going as far as viewing the contents of the ORC files (with the Java ORC tools jar). We will look at the delta files simply to verify they are present and that things are working as expected.
For a test case, I thought I’d pull from the scenario I used when presenting at hadoop summit (archiving evolving databases in hive). That means I need a simple example table to serve as the primary Hive table that queries would be run against, loaded with some easy-to-work-with seed data.
create database if not exists merge_example;
drop table if exists merge_example.bogus_info;
create table merge_example.bogus_info (
bogus_id int,
field_one string,
field_two string,
field_three string)
partitioned by (date_created string)
stored as ORC;
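One caveat worth calling out: MERGE (like UPDATE and DELETE) only works against a full ACID, transactional table. The paths shown later in this post come from a Hive 3 environment, where managed ORC tables are transactional by default; if your defaults differ, you can confirm the setting or declare it explicitly. A minimal sketch, assuming those defaults:
-- confirm the table is full ACID: look for transactional=true in the table parameters
describe formatted merge_example.bogus_info;
-- if managed ORC tables are not ACID by default in your environment, the property
-- can be declared explicitly at creation time with
-- tblproperties ('transactional'='true');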
insert into merge_example.bogus_info (bogus_id, date_created, field_one, field_two, field_three)
values
(11,'2019-09-17','base','base','base'),
(12,'2019-09-17','base','base','base'),
(13,'2019-09-17','base','base','base'),
(14,'2019-09-18','base','base','base'),
(15,'2019-09-18','base','base','base'),
(16,'2019-09-18','base','base','base'),
(17,'2019-09-19','base','base','base'),
(18,'2019-09-19','base','base','base'),
(19,'2019-09-19','base','base','base');
As we want to look behind the glass, let’s run Hive’s delta file compaction (a major compaction in this case) on these three new partitions.
alter table merge_example.bogus_info partition (date_created='2019-09-17') compact 'major';
alter table merge_example.bogus_info partition (date_created='2019-09-18') compact 'major';
alter table merge_example.bogus_info partition (date_created='2019-09-19') compact 'major';
show compactions;
+---------------+----------------+-------------+--------------------------+--------+------------+-----------+----------------+---------------+-------------------------+
| compactionid | dbname | tabname | partname | type | state | workerid | starttime | duration | hadoopjobid |
+---------------+----------------+-------------+--------------------------+--------+------------+-----------+----------------+---------------+-------------------------+
| CompactionId | Database | Table | Partition | Type | State | Worker | Start Time | Duration(ms) | HadoopJobId |
| 16 | merge_example | bogus_info | date_created=2019-09-17 | MAJOR | succeeded | --- | 1586814377000 | 299000 | job_1586789130086_0033 |
| 17 | merge_example | bogus_info | date_created=2019-09-18 | MAJOR | succeeded | --- | 1586814378000 | 318000 | job_1586789130086_0034 |
| 18 | merge_example | bogus_info | date_created=2019-09-19 | MAJOR | succeeded | --- | 1586814378000 | 333000 | job_1586789130086_0035 |
+---------------+----------------+-------------+--------------------------+--------+------------+-----------+----------------+---------------+-------------------------+
Wait until they have all succeeded as shown above. Now, we can take a peek at the underlying file system.
$ hdfs dfs -ls -R /warehouse/tablespace/managed/hive/merge_example.db/bogus_info
877 ...db/bogus_info/date_created=2019-09-17/base_0000001/bucket_00000
878 ...db/bogus_info/date_created=2019-09-18/base_0000001/bucket_00000
877 ...db/bogus_info/date_created=2019-09-19/base_0000001/bucket_00000
Note: I removed the directory entries and the _metadata_acid & _orc_acid_version files from the listing to home in on the actual ORC data files that are present. I also replaced /warehouse/tablespace/managed/hive/merge_example.db with just …db to help readability. Lastly, I removed the POSIX file permissions and ownership details along with the timestamps; again, to focus on the data files we want to see.
In the HDFS file listing we see that we have a base file for each of the three partitions, and they are all about the same size since the amount (and specifics) of the data is nearly identical. Let’s make sure it looks good with a simple query.
select * from merge_example.bogus_info;
+----------------------+-----------------------+-----------------------+-------------------------+--------------------------+
| bogus_info.bogus_id | bogus_info.field_one | bogus_info.field_two | bogus_info.field_three | bogus_info.date_created |
+----------------------+-----------------------+-----------------------+-------------------------+--------------------------+
| 11 | base | base | base | 2019-09-17 |
| 12 | base | base | base | 2019-09-17 |
| 13 | base | base | base | 2019-09-17 |
| 14 | base | base | base | 2019-09-18 |
| 15 | base | base | base | 2019-09-18 |
| 16 | base | base | base | 2019-09-18 |
| 17 | base | base | base | 2019-09-19 |
| 18 | base | base | base | 2019-09-19 |
| 19 | base | base | base | 2019-09-19 |
+----------------------+-----------------------+-----------------------+-------------------------+--------------------------+
Now that we have the table we want to merge updates into, we need a table to merge from. Let’s use the simple table below, which has the same columns as before (though here date_created is a regular column rather than a partition key). The scenario is that if a new or updated record was created since the last batch of deltas, the current state of that record should be brought forward.
drop table if exists merge_example.deltas_recd;
create table merge_example.deltas_recd (
bogus_id int,
date_created string,
field_one string,
field_two string,
field_three string)
tblproperties ('transactional_properties'='insert_only');
insert into merge_example.deltas_recd (bogus_id, date_created, field_one, field_two, field_three)
values
(20,'2019-09-20','NEW','base','base'),
(21,'2019-09-20','NEW','base','base'),
(22,'2019-09-20','NEW','base','base'),
(12,'2019-09-17','EXISTS','CHANGED','base'),
(14,'2019-09-18','EXISTS','CHANGED','base'),
(16,'2019-09-18','EXISTS','CHANGED','base');
select * from merge_example.deltas_recd;
+-----------------------+---------------------------+------------------------+------------------------+--------------------------+
| deltas_recd.bogus_id | deltas_recd.date_created | deltas_recd.field_one | deltas_recd.field_two | deltas_recd.field_three |
+-----------------------+---------------------------+------------------------+------------------------+--------------------------+
| 20 | 2019-09-20 | NEW | base | base |
| 21 | 2019-09-20 | NEW | base | base |
| 22 | 2019-09-20 | NEW | base | base |
| 12 | 2019-09-17 | EXISTS | CHANGED | base |
| 14 | 2019-09-18 | EXISTS | CHANGED | base |
| 16 | 2019-09-18 | EXISTS | CHANGED | base |
+-----------------------+---------------------------+------------------------+------------------------+--------------------------+
The current batch of deltas from the source (yes, we did NOT focus on how you populated this) shows three new records, 20 through 22, for September 20th, plus changes to three existing records across two existing partitions (the 19th has no changes).
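Before running the MERGE, it can be reassuring to preview which of those delta rows will match an existing row and which will not. This is purely an optional sanity check; a sketch using a LEFT JOIN on the same keys the MERGE will use:
select d.bogus_id,
       d.date_created,
       case when b.bogus_id is null then 'WILL INSERT' else 'WILL UPDATE' end as merge_action
from merge_example.deltas_recd d
left join merge_example.bogus_info b
  on b.bogus_id = d.bogus_id
 and b.date_created = d.date_created;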
At this point, we can build a MERGE statement that simply lines up matching records based on their ID and creation date. When there is a match we will treat it as an UPDATE, and when there is not we will handle it as an INSERT. Note that the INSERT clause lists its values in the target table’s column order, with the partition column (date_created) coming last. As this is the exact simple case from presenting at hadoop summit (archiving evolving databases in hive), we are not addressing DELETE operations.
MERGE INTO merge_example.bogus_info AS B
USING merge_example.deltas_recd AS D
ON B.bogus_id = D.bogus_id
AND B.date_created = D.date_created
WHEN MATCHED THEN
UPDATE SET field_one = D.field_one,
field_two = D.field_two,
field_three = D.field_three
WHEN NOT MATCHED THEN
INSERT VALUES (D.bogus_id, D.field_one,
D.field_two, D.field_three,
D.date_created);
select * from merge_example.bogus_info order by bogus_id;
+----------------------+-----------------------+-----------------------+-------------------------+--------------------------+
| bogus_info.bogus_id | bogus_info.field_one | bogus_info.field_two | bogus_info.field_three | bogus_info.date_created |
+----------------------+-----------------------+-----------------------+-------------------------+--------------------------+
| 11 | base | base | base | 2019-09-17 |
| 12 | EXISTS | CHANGED | base | 2019-09-17 |
| 13 | base | base | base | 2019-09-17 |
| 14 | EXISTS | CHANGED | base | 2019-09-18 |
| 15 | base | base | base | 2019-09-18 |
| 16 | EXISTS | CHANGED | base | 2019-09-18 |
| 17 | base | base | base | 2019-09-19 |
| 18 | base | base | base | 2019-09-19 |
| 19 | base | base | base | 2019-09-19 |
| 20 | NEW | base | base | 2019-09-20 |
| 21 | NEW | base | base | 2019-09-20 |
| 22 | NEW | base | base | 2019-09-20 |
+----------------------+-----------------------+-----------------------+-------------------------+--------------------------+
Looking at the results above (and confirmed by the quick aggregate query after this list), you can see:
- One record from the 17th was updated
- Two records from the 18th were updated
- No changes were made to the records from the 19th
- Three new records were added for the 20th
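Those counts can also be double-checked straight from the table by grouping on the partition and on whether field_one still holds the seed value; a quick sketch:
select date_created,
       field_one,
       count(*) as record_count
from merge_example.bogus_info
group by date_created, field_one
order by date_created, field_one;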
Inquiring minds might want to see what happened at the filesystem level — let’s check!
$ hdfs dfs -ls -R /warehouse/tablespace/managed/hive/merge_example.db/bogus_info
877 ...db/bogus_info/date_created=2019-09-17/base_0000001/bucket_00000
775 ...db/bogus_info/date_created=2019-09-17/delete_delta_0000005_0000005_0001/bucket_00000
928 ...db/bogus_info/date_created=2019-09-17/delta_0000005_0000005_0001/bucket_00000
878 ...db/bogus_info/date_created=2019-09-18/base_0000001/bucket_00000
788 ...db/bogus_info/date_created=2019-09-18/delete_delta_0000005_0000005_0001/bucket_00000
981 ...db/bogus_info/date_created=2019-09-18/delta_0000005_0000005_0001/bucket_00000
877 ...db/bogus_info/date_created=2019-09-19/base_0000001/bucket_00000
914 ...db/bogus_info/date_created=2019-09-20/delta_0000005_0000005_0000/bucket_00000
Here is what all of that says:
- The partition for the 17th has the original base file from the initial inserts, plus a delete_delta file and a delta file for transaction #5; Hive does not do in-place updates, so it deletes the affected record and inserts a replacement
- The partition for the 18th looks the same, for the same reasons
- The partition for the 19th still has just its base file from earlier since there were no changes
- A new partition for the 20th was created to hold the three new records, which are present in its delta file
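As before, these new delete_delta and delta files can be folded back into base files by requesting another round of major compactions on the affected partitions (the 19th needs nothing); a sketch, reusing the same syntax from earlier:
alter table merge_example.bogus_info partition (date_created='2019-09-17') compact 'major';
alter table merge_example.bogus_info partition (date_created='2019-09-18') compact 'major';
alter table merge_example.bogus_info partition (date_created='2019-09-20') compact 'major';
show compactions;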
This clearly was a trivial example of the MERGE command, but I think it will jump-start your efforts at using this powerful feature of Hive. I also hope the peek under the hood helps you better understand how this works and how it lines up with what we learned in hive acid transactions with partitions (a behind the scenes perspective), with the added benefit that all of the changes created by the MERGE command are bundled into a single transaction, which will surely aid query performance when the amount of change is much more expansive than in this simple exercise.
This is great, thanks for putting it together. I am also interested in how to handle replicating deletes…
Thanks, Jon. There is no one-size-fits-all MERGE statement; it relies heavily on the type of delta information you have. Having a status indicator such as new, changed, or removed would be extremely helpful. I was trying to keep to the simple scenario I had done a while back, long before we had MERGE, and just wanted to tie them together well. Here’s an example of one way DELETEs could be addressed with MERGE: https://community.cloudera.com/t5/Community-Articles/Hive-ACID-Merge-by-Example/ta-p/245402
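For illustration only, here is a minimal sketch of how a delete could be folded into the same MERGE pattern. It assumes the deltas table carried a hypothetical status column (call it op_type) with 'D' marking a removed record; that column does not exist in the example above, so treat this as a shape rather than a drop-in statement:
MERGE INTO merge_example.bogus_info AS B
USING merge_example.deltas_recd AS D
ON B.bogus_id = D.bogus_id
AND B.date_created = D.date_created
-- a DELETE clause with an extra condition handles rows flagged as removed (hypothetical op_type column)
WHEN MATCHED AND D.op_type = 'D' THEN DELETE
WHEN MATCHED THEN
UPDATE SET field_one = D.field_one,
field_two = D.field_two,
field_three = D.field_three
WHEN NOT MATCHED THEN
INSERT VALUES (D.bogus_id, D.field_one,
D.field_two, D.field_three,
D.date_created);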