joining spark dataframes with identical column names (not just in the join condition)

UPDATE (2024-05-08): Check out joining spark dataframes with identical column names (an easier way), too.

Earlier today I was asked what happens when joining two Spark DataFrames that both have a column (not being used for the join) with the same name. Shame on me as I should have had a quick answer, not just a conjecture. What was my anticipated outcome? I figured that Spark SQL would have either done same swizzling of the duplicate names, or that it would throw a runtime exception complaining about it. Since I have never seen Spark SQL rename any columns before on its own, my money was on the RTE.

Since I didn’t know for sure, I stated that and said “let’s go see!” I decided to try out a couple of other scenarios, as well. Here’s what I found.

The Classic; Join on DataFrames with Identical Column Names

Let’s stand up some simple DFs for customers and orders that we can eventually join together.

case class Customer(custId:Int, name:String)

val customersData = Seq( 
        Customer(101, "Lester"), 
        Customer(102, "Gretchen"),
        Customer(103, "Zoe"),
        Customer(104, "Connor"))
        
val customerDF = customersData.toDF
customerDF.show()

+------+--------+
|custId|    name|
+------+--------+
|   101|  Lester|
|   102|Gretchen|
|   103|     Zoe|
|   104|  Connor|
+------+--------+


case class Order(orderId:Int, custId:Int, totalPrice:Float)

val ordersData = Seq(
        Order(8888, 101, 33.33f),
        Order(8889, 101, 66.66f),
        Order(8890, 102, 101.01f),
        Order(8891, 101, 99.99f))
        
val orderDF = ordersData.toDF
orderDF.show()

+-------+------+----------+
|orderId|custId|totalPrice|
+-------+------+----------+
|   8888|   101|     33.33|
|   8889|   101|     66.66|
|   8890|   102|    101.01|
|   8891|   101|     99.99|
+-------+------+----------+

Let’s join them.

val joinedDF = orderDF.join(customerDF, 
    orderDF("custId") === customerDF("custId"))
joinedDF.show()

+-------+------+----------+------+--------+
|orderId|custId|totalPrice|custId|    name|
+-------+------+----------+------+--------+
|   8891|   101|     99.99|   101|  Lester|
|   8889|   101|     66.66|   101|  Lester|
|   8888|   101|     33.33|   101|  Lester|
|   8890|   102|    101.01|   102|Gretchen|
+-------+------+----------+------+--------+

As you can see in the output above and the schema below, we end up having two columns named custId which hold the exact same values.

joinedDF.printSchema

root
 |-- orderId: integer (nullable = false)
 |-- custId: integer (nullable = false)
 |-- totalPrice: float (nullable = false)
 |-- custId: integer (nullable = false)
 |-- name: string (nullable = true)

This causes us trouble when trying to directly use this duplication of fields with the same names.

joinedDF.where("custId > 100").show

AnalysisException: Reference 'custId' is ambiguous, could be: custId, custId.; line 1 pos 0

A well known fix is documented here and is shown below of only including a single column named custId.

val joinedDF2 = orderDF.join(customerDF,
    Seq("custId"))
joinedDF2.show()

+------+-------+----------+--------+
|custId|orderId|totalPrice|    name|
+------+-------+----------+--------+
|   101|   8891|     99.99|  Lester|
|   101|   8889|     66.66|  Lester|
|   101|   8888|     33.33|  Lester|
|   102|   8890|    101.01|Gretchen|
+------+-------+----------+--------+

Perfect as with that silly “ambiguous” exception early, we could not have prevented both columns from being eliminated if we tried to get rid of one of them. Additionally, both would be renamed if we tried that instead.

joinedDF.drop("custID").show()

+-------+----------+--------+
|orderId|totalPrice|    name|
+-------+----------+--------+
|   8891|     99.99|  Lester|
|   8889|     66.66|  Lester|
|   8888|     33.33|  Lester|
|   8890|    101.01|Gretchen|
+-------+----------+--------+


joinedDF.withColumnRenamed("custID", "custID2").show()

+-------+-------+----------+-------+--------+
|orderId|custID2|totalPrice|custID2|    name|
+-------+-------+----------+-------+--------+
|   8891|    101|     99.99|    101|  Lester|
|   8889|    101|     66.66|    101|  Lester|
|   8888|    101|     33.33|    101|  Lester|
|   8890|    102|    101.01|    102|Gretchen|
+-------+-------+----------+-------+--------+

Good stuff.

What If the Duplicate Column Was Not Being Joined On?

This scenario is actually the one I was being asked about in the first place. Let’s just add a notes column to both of the DFs and jam them with some bogus data.

val customerWithNotesDF = customerDF.withColumn(
        "notes", lit("bogus cust note"))
customerWithNotesDF.show(1)

+------+------+---------------+
|custId|  name|          notes|
+------+------+---------------+
|   101|Lester|bogus cust note|
+------+------+---------------+


val orderWithNotesDF = orderDF.withColumn(
        "notes", lit("bogus order note"))
orderWithNotesDF.show(1)

+-------+------+----------+----------------+
|orderId|custId|totalPrice|           notes|
+-------+------+----------+----------------+
|   8888|   101|     33.33|bogus order note|
+-------+------+----------+----------------+

Now that we know we can multiple DF columns with the same name, we could imagine that notes will be there twice now and only the bogus data values themselves give us a hint as to which notes field came from which DF.

val joinedDF3 = orderWithNotesDF.join(
    customerWithNotesDF, Seq("custId"))
joinedDF3.drop("totalPrice", "name").show(2)

+------+-------+----------------+---------------+
|custId|orderId|           notes|          notes|
+------+-------+----------------+---------------+
|   101|   8888|bogus order note|bogus cust note|
|   101|   8889|bogus order note|bogus cust note|
+------+-------+----------------+---------------+

Like before, this is going to be “ambiguous” again…

joinedDF3.select("custId", "totalPrice", "notes").show()

AnalysisException: Reference 'notes' is ambiguous, could be: notes, notes.;

How do we fix it? Well, we just do a little name swizzling of our own. We could have done this before the join, but probably just easier to do on the fly.

val joinedDF4 = orderWithNotesDF.withColumnRenamed(
    "notes", "orderNotes").join(
        customerWithNotesDF.withColumnRenamed(
            "notes", "custNotes"),
            Seq("custId"))
joinedDF4.drop("totalPrice", "name").show(2)

+------+-------+----------------+---------------+
|custId|orderId|      orderNotes|      custNotes|
+------+-------+----------------+---------------+
|   101|   8888|bogus order note|bogus cust note|
|   101|   8889|bogus order note|bogus cust note|
+------+-------+----------------+---------------+

Easy peasey

A Twist on the Classic; Join on DataFrames with DIFFERENT Column Names

For this scenario, let’s assume there is some naming standard (sounds like they didn’t read my fruITion and recrEAtion (a double-header book review) post) declared that the primary key (yes, we don’t really have PKs here, but you know what I mean) of ever table that uses a surrogate value just be called id. We can just swizzle the original customer and order DFs to conform to the standard.

val customerWithPkOfIdDF = 
    customerDF.withColumnRenamed("custId", "id")
customerWithPkOfIdDF.show(2)

+---+--------+
| id|    name|
+---+--------+
|101|  Lester|
|102|Gretchen|
+---+--------+


val orderWithPkOfIdDF = 
    orderDF.withColumnRenamed("orderId","id")
orderWithPkOfIdDF.show(2)

+----+------+----------+
|  id|custId|totalPrice|
+----+------+----------+
|8888|   101|     33.33|
|8889|   101|     66.66|
+----+------+----------+

Yep, you can guess what this is going to look like I’m betting when we join them!!

val joinedDF5 = orderWithPkOfIdDF.join(
        customerWithPkOfIdDF,
        orderWithPkOfIdDF("custId") === 
        customerWithPkOfIdDF("id"))
joinedDF5.show()

+----+------+----------+---+--------+
|  id|custId|totalPrice| id|    name|
+----+------+----------+---+--------+
|8891|   101|     99.99|101|  Lester|
|8889|   101|     66.66|101|  Lester|
|8888|   101|     33.33|101|  Lester|
|8890|   102|    101.01|102|Gretchen|
+----+------+----------+---+--------+

That first id column is from the orders and the second id column is from customers (and is the same thing as the custId column). What a mess and absolutely something like the following is going to get you another “ambiguous” exception!!

joinedDF5.where("id > 100").show()

AnalysisException: Reference 'id' is ambiguous, could be: id, id.; line 1 pos 0

The fix? Well, we just merge the other two scenarios’ solutions into one! We just rename the customers id field to custID on the fly inside the join operation and then we can use the abbreviated condition of Seq("custId") to ensure we only have one column for the join key.

val joinedDF6 = orderWithPkOfIdDF.join(
        customerWithPkOfIdDF.withColumnRenamed(
            "id", "custId"), Seq("custId"))
joinedDF6.show(3)

+------+----+----------+------+
|custId|  id|totalPrice|  name|
+------+----+----------+------+
|   101|8891|     99.99|Lester|
|   101|8889|     66.66|Lester|
|   101|8888|     33.33|Lester|
+------+----+----------+------+

Wrap-Up

In some ways, all of that might have been harder than it should have been. What could likely have been much easier? Well, just submitting a SQL statement via the SparkSession.sql function, but where’s the fun in that!!

NOTE: I used Zeppelin for testing this all out and I exported the notebook should you want to leverage it.

securing hive entities (ranger and atlas to the rescue)

Check out my latest video showing how to leverage Apache Ranger and Apache Atlas when creating security rules such as the following use cases on Apache Hive entities.

  • Table Restrictions
  • Column Restrictions
  • Row-Level Filtering
  • Column-Level Masking
  • Tag-Based Policies

hive’s merge statement (it drops a lot of acid)

We explored hive acid transactions with partitions (a behind the scenes perspective) to see the various delta & base ORC files that get created when using Hive’s INSERT, UPDATE and DELETE statements. I thought it made sense to present a simple example of the MERGE statement as well; and, of course, to peek behind the glass a bit, but this time not go as far as viewing the content of ORC files (using the Jaava ORC tool jar). We will look at the delta files from a presence perspective to make sure things are working well.

For a test case, I thought I’d pull from the scenario I used when presenting at hadoop summit (archiving evolving databases in hive). That means I need a simple example table for our primary Hive table that queries would be run against and load it with some easy to work with seed data.

create database if not exists merge_example;
drop table if exists merge_example.bogus_info;

create table merge_example.bogus_info (
    bogus_id int,
    field_one string,
    field_two string,
    field_three string)
        partitioned by (date_created string)
        stored as ORC;

insert into merge_example.bogus_info (bogus_id, date_created, field_one, field_two, field_three)
    values
        (11,'2019-09-17','base','base','base'),
        (12,'2019-09-17','base','base','base'),
        (13,'2019-09-17','base','base','base'),
        (14,'2019-09-18','base','base','base'),
        (15,'2019-09-18','base','base','base'),
        (16,'2019-09-18','base','base','base'),
        (17,'2019-09-19','base','base','base'),
        (18,'2019-09-19','base','base','base'),
        (19,'2019-09-19','base','base','base');

As we want to look behind the glass, let’s run hive delta file compaction (minor and major) on these three new partitions.

alter table merge_example.bogus_info partition (date_created='2019-09-17') compact 'major';
alter table merge_example.bogus_info partition (date_created='2019-09-18') compact 'major';
alter table merge_example.bogus_info partition (date_created='2019-09-19') compact 'major';

show compactions;

+---------------+----------------+-------------+--------------------------+--------+------------+-----------+----------------+---------------+-------------------------+
| compactionid  |     dbname     |   tabname   |         partname         |  type  |   state    | workerid  |   starttime    |   duration    |       hadoopjobid       |
+---------------+----------------+-------------+--------------------------+--------+------------+-----------+----------------+---------------+-------------------------+
| CompactionId  | Database       | Table       | Partition                | Type   | State      | Worker    | Start Time     | Duration(ms)  | HadoopJobId             |
| 16            | merge_example  | bogus_info  | date_created=2019-09-17  | MAJOR  | succeeded  |  ---      | 1586814377000  | 299000        | job_1586789130086_0033  |
| 17            | merge_example  | bogus_info  | date_created=2019-09-18  | MAJOR  | succeeded  |  ---      | 1586814378000  | 318000        | job_1586789130086_0034  |
| 18            | merge_example  | bogus_info  | date_created=2019-09-19  | MAJOR  | succeeded  |  ---      | 1586814378000  | 333000        | job_1586789130086_0035  |
+---------------+----------------+-------------+--------------------------+--------+------------+-----------+----------------+---------------+-------------------------+

Wait until they have all succeeded as shown above. Now, we can take a peek at the underlying file system.

$ hdfs dfs -ls -R /warehouse/tablespace/managed/hive/merge_example.db/bogus_info
       877 ...db/bogus_info/date_created=2019-09-17/base_0000001/bucket_00000
       878 ...db/bogus_info/date_created=2019-09-18/base_0000001/bucket_00000
       877 ...db/bogus_info/date_created=2019-09-19/base_0000001/bucket_00000

Note: I deleted the folders and the _metadata_acid & _orc_acid_version files to hone in on the actual ORC datafiles that are present. I also replaced /warehouse/tablespace/managed/hive/merge_example.db with just …db to help readability. Lastly, I removed the posix file permissions & ownership details along with the timestamps; again, to focus in on the datafiles we want to see.

In the HDFS file listing we see that we have a base file for each of the three partitions and they are all about the same size since the amount (and specifics) of the data is almost identical. Let’s make sure it looks good in with a simple query.

select * from merge_example.bogus_info;

+----------------------+-----------------------+-----------------------+-------------------------+--------------------------+
| bogus_info.bogus_id  | bogus_info.field_one  | bogus_info.field_two  | bogus_info.field_three  | bogus_info.date_created  |
+----------------------+-----------------------+-----------------------+-------------------------+--------------------------+
| 11                   | base                  | base                  | base                    | 2019-09-17               |
| 12                   | base                  | base                  | base                    | 2019-09-17               |
| 13                   | base                  | base                  | base                    | 2019-09-17               |
| 14                   | base                  | base                  | base                    | 2019-09-18               |
| 15                   | base                  | base                  | base                    | 2019-09-18               |
| 16                   | base                  | base                  | base                    | 2019-09-18               |
| 17                   | base                  | base                  | base                    | 2019-09-19               |
| 18                   | base                  | base                  | base                    | 2019-09-19               |
| 19                   | base                  | base                  | base                    | 2019-09-19               |
+----------------------+-----------------------+-----------------------+-------------------------+--------------------------+

Now that we have our table we want to merge updates into we have to have a table to merge from. Let’s use this simple solution below that has the same table structure as before. The scenario would be that if a new or updated recorded was created since the last list of deltas, the current state of the record should be brought forward.

drop table if exists merge_example.deltas_recd;

create table merge_example.deltas_recd (
    bogus_id int,
    date_created string,
    field_one string,
    field_two string,
    field_three string)
        tblproperties ('transactional_properties'='insert_only');

insert into merge_example.deltas_recd (bogus_id, date_created, field_one, field_two, field_three)
    values
        (20,'2019-09-20','NEW','base','base'),
        (21,'2019-09-20','NEW','base','base'),
        (22,'2019-09-20','NEW','base','base'),
        (12,'2019-09-17','EXISTS','CHANGED','base'),
        (14,'2019-09-18','EXISTS','CHANGED','base'),
        (16,'2019-09-18','EXISTS','CHANGED','base');
        

select * from merge_example.deltas_recd;

+-----------------------+---------------------------+------------------------+------------------------+--------------------------+
| deltas_recd.bogus_id  | deltas_recd.date_created  | deltas_recd.field_one  | deltas_recd.field_two  | deltas_recd.field_three  |
+-----------------------+---------------------------+------------------------+------------------------+--------------------------+
| 20                    | 2019-09-20                | NEW                    | base                   | base                     |
| 21                    | 2019-09-20                | NEW                    | base                   | base                     |
| 22                    | 2019-09-20                | NEW                    | base                   | base                     |
| 12                    | 2019-09-17                | EXISTS                 | CHANGED                | base                     |
| 14                    | 2019-09-18                | EXISTS                 | CHANGED                | base                     |
| 16                    | 2019-09-18                | EXISTS                 | CHANGED                | base                     |
+-----------------------+---------------------------+------------------------+------------------------+--------------------------+

The current batch of deltas from the source (yes, we did NOT focus on how you populated this) shows that there are three new records, 20-22, that are new for September 20th and that there are changes to three existing records across two existing partitions (the 19th has no changes).

At this point, we can build a MERGE statement that can simply line up the matching record based on their ID and creation date. When there is a match we will treat it as an UPDATE and when there is not we will handle as an INSERT. As this is the exact simple case from presenting at hadoop summit (archiving evolving databases in hive), we are not addressing DELETE operations.

MERGE INTO merge_example.bogus_info  AS B 
     USING merge_example.deltas_recd AS D
     
  ON B.bogus_id     = D.bogus_id 
 AND B.date_created = D.date_created
 
WHEN MATCHED THEN 
    UPDATE SET field_one   = D.field_one, 
               field_two   = D.field_two, 
               field_three = D.field_three
               
WHEN NOT MATCHED THEN 
    INSERT VALUES (D.bogus_id,  D.field_one, 
                   D.field_two, D.field_three, 
                   D.date_created);
                   
select * from merge_example.bogus_info order by bogus_id;

+----------------------+-----------------------+-----------------------+-------------------------+--------------------------+
| bogus_info.bogus_id  | bogus_info.field_one  | bogus_info.field_two  | bogus_info.field_three  | bogus_info.date_created  |
+----------------------+-----------------------+-----------------------+-------------------------+--------------------------+
| 11                   | base                  | base                  | base                    | 2019-09-17               |
| 13                   | base                  | base                  | base                    | 2019-09-17               |
| 12                   | EXISTS                | CHANGED               | base                    | 2019-09-17               |
| 15                   | base                  | base                  | base                    | 2019-09-18               |
| 14                   | EXISTS                | CHANGED               | base                    | 2019-09-18               |
| 16                   | EXISTS                | CHANGED               | base                    | 2019-09-18               |
| 17                   | base                  | base                  | base                    | 2019-09-19               |
| 18                   | base                  | base                  | base                    | 2019-09-19               |
| 19                   | base                  | base                  | base                    | 2019-09-19               |
| 20                   | NEW                   | base                  | base                    | 2019-09-20               |
| 21                   | NEW                   | base                  | base                    | 2019-09-20               |
| 22                   | NEW                   | base                  | base                    | 2019-09-20               |

Looking at the results above you can see:

  • One record from the 17th was updated
  • Two records from the 18th were updated
  • No changes were made to the records from the 19th
  • Three new records were added for the 20th

Inquiring minds might want to see what happened at the filesystem level — let’s check!

$ hdfs dfs -ls -R /warehouse/tablespace/managed/hive/merge_example.db/bogus_info
       877 ...db/bogus_info/date_created=2019-09-17/base_0000001/bucket_00000
       775 ...db/bogus_info/date_created=2019-09-17/delete_delta_0000005_0000005_0001/bucket_00000
       928 ...db/bogus_info/date_created=2019-09-17/delta_0000005_0000005_0001/bucket_00000
       878 ...db/bogus_info/date_created=2019-09-18/base_0000001/bucket_00000
       788 ...db/bogus_info/date_created=2019-09-18/delete_delta_0000005_0000005_0001/bucket_00000
       981 ...db/bogus_info/date_created=2019-09-18/delta_0000005_0000005_0001/bucket_00000
       877 ...db/bogus_info/date_created=2019-09-19/base_0000001/bucket_00000
       914 ...db/bogus_info/date_created=2019-09-20/delta_0000005_0000005_0000/bucket_00000

Here is what all of that says:

  • The 17th partition has the original base file from the initial inserts plus it has a delete_delta file and a delta file for transaction #5 because Hive does not have in-place updates and simply deletes the affected record and inserts a replacement
  • The 18th partition looks the same for the same reasons
  • The 19th still just has its base file from earlier since there were no changes
  • A new partition for the 20th was created to handle the three new records and they are present in its delta file

This clearly was a trivial example of the MERGE command, but I think it will jump-start your efforts at using this powerful feature of Hive. I also hope the peek under the hoods helps you understand better how this is working and how it lines well with what we learned in hive acid transactions with partitions (a behind the scense perspective) with the added benefit of bundling all of the changes created by the MERGE command into a single transaction which will surely aid in query performance when the amount of change is much more expansive than this simple exercise.

moving my tech blog (already missing confluence)

My blogging has sure slowed down lately, but I have challenged myself to get going in earnest again. I have been using Confluence OnDemand personally for years and have been TRYING over the years to allow easy (and consistent) anonymous access to a couple of blogs, including my technical blog that I’m moving here to WordPress.com.

Don’t get me wrong… I absolutely LOVE using Confluence and as called out in enterprise 2.0 book review (using web 20 technologies within organizations) I see it as an incredible enabling tool for Enterprise 2.0 level collaboration. If I was not opening up a JIRA every few months with them to keep fixing things that have worked in the past, then I surely would not be taking this big step. In all fairness, I am using Confluence for more than they are imagining the primary use cases are. For a team, big or small, that needs a highly collaborative wiki for document management I cannot recommend anything other than Confluence.

Meanwhile, I’m going to soldier on and make the move. I’m looking at the Scroll WP Publisher plug-in for Confluence to help with Confluence to WordPress conversions. I had some initial luck with simple text-heavy posts such as give as few orders as possible (encourage autonomy and responsibility) and are you a mort, elvis or einstein (or are these labels nonsense)?, but I started encountering problems with ones like visiting the computer history museum (yes, i’m a geek) which is just pictures and text.

I had to perform quite a bit of editing to get my enterprise 2.0 book review (using web 2.0 technologies within organizations) going because of integrations with YouTube and SlideShare. Even more effort was poured into moving how do i load a fixed-width formatted file into hive? (with a little help from pig); so much that I have opted to just create a PDF of any post that doesn’t migrate with only minimal intervention as well as a link back to the original Confluence posting.

I’m likely going to “launch” this blog site BEFORE finishing all the ports which is yet another task on my personal backlog. As for WordPress, I am definitely a newb and I am VERY open to hearing your suggestions and shared links to help me become proficient and efficient in this well-loved blogging platform.

hive delta file compaction (minor and major)

This is a quick blog post to show how minor and major compaction for Hive transactional tables occurs. Let’s use the situation that the hive acid transactions with partitions (a behind the scenes perspective) post leaves us in. Here it is!

$ hdfs dfs -ls -R /wa/t/m/h/try_it
drwxrwx---+  - hive hadoop          0 2019-12-12 09:57 /wa/t/m/h/try_it/prt=p1
drwxrwx---+  - hive hadoop          0 2019-12-12 09:34 /wa/t/m/h/try_it/prt=p1/delete_delta_0000003_0000003_0000
-rw-rw----+  3 hive hadoop          1 2019-12-12 09:34 /wa/t/m/h/try_it/prt=p1/delete_delta_0000003_0000003_0000/_orc_acid_version
-rw-rw----+  3 hive hadoop        733 2019-12-12 09:34 /wa/t/m/h/try_it/prt=p1/delete_delta_0000003_0000003_0000/bucket_00000
drwxrwx---+  - hive hadoop          0 2019-12-12 09:57 /wa/t/m/h/try_it/prt=p1/delete_delta_0000005_0000005_0000
-rw-rw----+  3 hive hadoop          1 2019-12-12 09:57 /wa/t/m/h/try_it/prt=p1/delete_delta_0000005_0000005_0000/_orc_acid_version
-rw-rw----+  3 hive hadoop        733 2019-12-12 09:57 /wa/t/m/h/try_it/prt=p1/delete_delta_0000005_0000005_0000/bucket_00000
drwxrwx---+  - hive hadoop          0 2019-12-12 07:43 /wa/t/m/h/try_it/prt=p1/delta_0000001_0000001_0000
-rw-rw----+  3 hive hadoop          1 2019-12-12 07:43 /wa/t/m/h/try_it/prt=p1/delta_0000001_0000001_0000/_orc_acid_version
-rw-rw----+  3 hive hadoop        788 2019-12-12 07:43 /wa/t/m/h/try_it/prt=p1/delta_0000001_0000001_0000/bucket_00000
drwxrwx---+  - hive hadoop          0 2019-12-12 09:34 /wa/t/m/h/try_it/prt=p1/delta_0000003_0000003_0000
-rw-rw----+  3 hive hadoop          1 2019-12-12 09:34 /wa/t/m/h/try_it/prt=p1/delta_0000003_0000003_0000/_orc_acid_version
-rw-rw----+  3 hive hadoop        816 2019-12-12 09:34 /wa/t/m/h/try_it/prt=p1/delta_0000003_0000003_0000/bucket_00000
drwxrwx---+  - hive hadoop          0 2019-12-12 09:45 /wa/t/m/h/try_it/prt=p2
drwxrwx---+  - hive hadoop          0 2019-12-12 09:34 /wa/t/m/h/try_it/prt=p2/delete_delta_0000003_0000003_0000
-rw-rw----+  3 hive hadoop          1 2019-12-12 09:34 /wa/t/m/h/try_it/prt=p2/delete_delta_0000003_0000003_0000/_orc_acid_version
-rw-rw----+  3 hive hadoop        727 2019-12-12 09:34 /wa/t/m/h/try_it/prt=p2/delete_delta_0000003_0000003_0000/bucket_00000
drwxrwx---+  - hive hadoop          0 2019-12-12 09:45 /wa/t/m/h/try_it/prt=p2/delete_delta_0000004_0000004_0000
-rw-rw----+  3 hive hadoop          1 2019-12-12 09:44 /wa/t/m/h/try_it/prt=p2/delete_delta_0000004_0000004_0000/_orc_acid_version
-rw-rw----+  3 hive hadoop        733 2019-12-12 09:44 /wa/t/m/h/try_it/prt=p2/delete_delta_0000004_0000004_0000/bucket_00000
drwxrwx---+  - hive hadoop          0 2019-12-12 08:15 /wa/t/m/h/try_it/prt=p2/delta_0000002_0000002_0000
-rw-rw----+  3 hive hadoop          1 2019-12-12 08:15 /wa/t/m/h/try_it/prt=p2/delta_0000002_0000002_0000/_orc_acid_version
-rw-rw----+  3 hive hadoop        788 2019-12-12 08:15 /wa/t/m/h/try_it/prt=p2/delta_0000002_0000002_0000/bucket_00000
drwxrwx---+  - hive hadoop          0 2019-12-12 09:34 /wa/t/m/h/try_it/prt=p2/delta_0000003_0000003_0000
-rw-rw----+  3 hive hadoop          1 2019-12-12 09:34 /wa/t/m/h/try_it/prt=p2/delta_0000003_0000003_0000/_orc_acid_version
-rw-rw----+  3 hive hadoop        816 2019-12-12 09:34 /wa/t/m/h/try_it/prt=p2/delta_0000003_0000003_0000/bucket_00000
drwxrwx---+  - hive hadoop          0 2019-12-12 09:45 /wa/t/m/h/try_it/prt=p2/delta_0000004_0000004_0000
-rw-rw----+  3 hive hadoop          1 2019-12-12 09:44 /wa/t/m/h/try_it/prt=p2/delta_0000004_0000004_0000/_orc_acid_version
-rw-rw----+  3 hive hadoop        815 2019-12-12 09:44 /wa/t/m/h/try_it/prt=p2/delta_0000004_0000004_0000/bucket_00000
drwxrwx---+  - hive hadoop          0 2019-12-12 10:06 /wa/t/m/h/try_it/prt=p3
drwxrwx---+  - hive hadoop          0 2019-12-12 09:34 /wa/t/m/h/try_it/prt=p3/delete_delta_0000003_0000003_0000
-rw-rw----+  3 hive hadoop          1 2019-12-12 09:34 /wa/t/m/h/try_it/prt=p3/delete_delta_0000003_0000003_0000/_orc_acid_version
-rw-rw----+  3 hive hadoop        727 2019-12-12 09:34 /wa/t/m/h/try_it/prt=p3/delete_delta_0000003_0000003_0000/bucket_00000
drwxrwx---+  - hive hadoop          0 2019-12-12 08:15 /wa/t/m/h/try_it/prt=p3/delta_0000002_0000002_0000
-rw-rw----+  3 hive hadoop          1 2019-12-12 08:15 /wa/t/m/h/try_it/prt=p3/delta_0000002_0000002_0000/_orc_acid_version
-rw-rw----+  3 hive hadoop        796 2019-12-12 08:15 /wa/t/m/h/try_it/prt=p3/delta_0000002_0000002_0000/bucket_00000
drwxrwx---+  - hive hadoop          0 2019-12-12 09:34 /wa/t/m/h/try_it/prt=p3/delta_0000003_0000003_0000
-rw-rw----+  3 hive hadoop          1 2019-12-12 09:34 /wa/t/m/h/try_it/prt=p3/delta_0000003_0000003_0000/_orc_acid_version
-rw-rw----+  3 hive hadoop        807 2019-12-12 09:34 /wa/t/m/h/try_it/prt=p3/delta_0000003_0000003_0000/bucket_00000
drwxrwx---+  - hive hadoop          0 2019-12-12 10:06 /wa/t/m/h/try_it/prt=p3/delta_0000006_0000006_0000
-rw-rw----+  3 hive hadoop          1 2019-12-12 10:06 /wa/t/m/h/try_it/prt=p3/delta_0000006_0000006_0000/_orc_acid_version
-rw-rw----+  3 hive hadoop        813 2019-12-12 10:06 /wa/t/m/h/try_it/prt=p3/delta_0000006_0000006_0000/bucket_00000

As that blog post, and the directory listing above, shows, there where a total of six ACID transactions that have occurred across three partitions to get to this point. The table looks like the following.

select * from try_it;
+------------+---------------+---------------+-------------+
| try_it.id  | try_it.a_val  | try_it.b_val  | try_it.prt  |
+------------+---------------+---------------+-------------+
| 2          | noise         | bogus3        | p2          |
| 3          | noise         | bogus2        | p3          |
| 1          | noise         | bogus2        | p3          |
+------------+---------------+---------------+-------------+
3 rows selected (0.307 seconds)

As https://cwiki.apache.org/confluence/display/Hive/Configuration+Properties#ConfigurationProperties-Compactor shows, we need to make sure hive.compactor.initiator.on is set to true for the compactor to be run.

Minor Compaction

This type of compaction is scheduled after the number of delta directories passes the value set in the hive.compactor.delta.num.threshold property, but you can also trigger it to run on-demand.

ALTER TABLE try_it COMPACT 'minor';

ERROR : FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.DDLTask. 
You must specify a partition to compact for partitioned tables

This error helps us by making the point that we must run compaction on a specific partition unless the table is not partitioned. Let’s try it again and be sure to wait until it completes!

ALTER TABLE try_it partition (prt='p1') COMPACT 'minor';

show compactions;
+---------------+-----------+----------+------------+--------+------------+-----------+----------------+---------------+-------------------------+
| compactionid  |  dbname   | tabname  |  partname  |  type  |   state    | workerid  |   starttime    |   duration    |       hadoopjobid       |
+---------------+-----------+----------+------------+--------+------------+-----------+----------------+---------------+-------------------------+
| CompactionId  | Database  | Table    | Partition  | Type   | State      | Worker    | Start Time     | Duration(ms)  | HadoopJobId             |
| 1             | default   | try_it   | prt=p1     | MINOR  | succeeded  |  ---      | 1576145642000  | 179000        | job_1575915931720_0012  |
+---------------+-----------+----------+------------+--------+------------+-----------+----------------+---------------+-------------------------+
2 rows selected (0.031 seconds)

Let’s look at the file system again for the p1 partition.

$ hdfs dfs -ls -R /wa/t/m/h/try_it/prt=p1
drwxrwx---+  - hive hadoop          0 2019-12-12 10:16 /wa/t/m/h/try_it/prt=p1/delete_delta_0000001_0000005
-rw-rw----+  3 hive hadoop          1 2019-12-12 10:16 /wa/t/m/h/try_it/prt=p1/delete_delta_0000001_0000005/_orc_acid_version
-rw-rw----+  3 hive hadoop        654 2019-12-12 10:16 /wa/t/m/h/try_it/prt=p1/delete_delta_0000001_0000005/bucket_00000
drwxrwx---+  - hive hadoop          0 2019-12-12 10:16 /wa/t/m/h/try_it/prt=p1/delta_0000001_0000005
-rw-rw----+  3 hive hadoop          1 2019-12-12 10:16 /wa/t/m/h/try_it/prt=p1/delta_0000001_0000005/_orc_acid_version
-rw-rw----+  3 hive hadoop        670 2019-12-12 10:16 /wa/t/m/h/try_it/prt=p1/delta_0000001_0000005/bucket_00000

Since there were changes in this partition from transaction #s 1, 3, and 5, we now see rolled together versions of the delta directories spanning these transaction #s. Let’s verify that the contents of the files have the rolled up details in a single file for the delta and delete_delta transactions.

$ hdfs dfs -get /wa/t/m/h/try_it/prt=p1/delete_delta_0000001_0000005/bucket_00000 p1Minor-delete_delta
$ java -jar orc-tools-1.5.1-uber.jar data p1Minor-delete_delta 
Processing data file p1Minor-delete_delta [length: 654]
{"operation":2,"originalTransaction":1,"bucket":536870912,"rowId":0,"currentTransaction":3,
"row":null}
{"operation":2,"originalTransaction":3,"bucket":536870912,"rowId":0,"currentTransaction":5,
"row":null}

$ hdfs dfs -get /wa/t/m/h/try_it/prt=p1/delta_0000001_0000005/bucket_00000 p1Minor-delta
$ java -jar orc-tools-1.5.1-uber.jar data p1Minor-delta 
Processing data file p1Minor-delta [length: 670]
{"operation":0,"originalTransaction":1,"bucket":536870912,"rowId":0,"currentTransaction":1,
"row":{"id":1,"a_val":"noise","b_val":"bogus"}}
{"operation":0,"originalTransaction":3,"bucket":536870912,"rowId":0,"currentTransaction":3,
"row":{"id":1,"a_val":"noise","b_val":"bogus2"}}

Even though the minor compacted delete_delta file only shows transaction IDs 3 and 5 (see the currentTransaction attributes) and the delta only includes 1 and 3, both of these compacted files show the comprehensive width of 0000001_0000005.

Now, initiate minor compacting for the the other two partitions.

ALTER TABLE try_it partition (prt='p2') COMPACT 'minor';
ALTER TABLE try_it partition (prt='p3') COMPACT 'minor';

show compactions
INFO  : Starting task [Stage-0:DDL] in serial mode
INFO  : Completed executing command(queryId=hive_20191212201440_3a58f8c2-ebc2-4ebe-96e3-a774bf8bec65); Time taken: 0.002 seconds
INFO  : OK
+---------------+-----------+----------+------------+--------+------------+-------------------------------------------------+----------------+---------------+-------------------------+
| compactionid  |  dbname   | tabname  |  partname  |  type  |   state    |                    workerid                     |   starttime    |   duration    |       hadoopjobid       |
+---------------+-----------+----------+------------+--------+------------+-------------------------------------------------+----------------+---------------+-------------------------+
| CompactionId  | Database  | Table    | Partition  | Type   | State      | Worker                                          | Start Time     | Duration(ms)  | HadoopJobId             |
| 2             | default   | try_it   | prt=p2     | MINOR  | working    | ip-172-30-10-206.us-west-2.compute.internal-37  | 1576181672000  |  ---          | job_1575915931720_0013  |
| 3             | default   | try_it   | prt=p3     | MINOR  | working    | ip-172-30-10-206.us-west-2.compute.internal-36  | 1576181677000  |  ---          | job_1575915931720_0014  |
| 1             | default   | try_it   | prt=p1     | MINOR  | succeeded  |  ---                                            | 1576145642000  | 179000        | job_1575915931720_0012  |
+---------------+-----------+----------+------------+--------+------------+-------------------------------------------------+----------------+---------------+-------------------------+
4 rows selected (0.03 seconds)

Once those two compacting process finish, the table’s underlying filesystem footprint should look like the following.

$ hdfs dfs -ls -R /wa/t/m/h/try_it
drwxrwx---+  - hive hadoop          0 2019-12-12 10:17 /wa/t/m/h/try_it/prt=p1
drwxrwx---+  - hive hadoop          0 2019-12-12 10:16 /wa/t/m/h/try_it/prt=p1/delete_delta_0000001_0000005
-rw-rw----+  3 hive hadoop          1 2019-12-12 10:16 /wa/t/m/h/try_it/prt=p1/delete_delta_0000001_0000005/_orc_acid_version
-rw-rw----+  3 hive hadoop        654 2019-12-12 10:16 /wa/t/m/h/try_it/prt=p1/delete_delta_0000001_0000005/bucket_00000
drwxrwx---+  - hive hadoop          0 2019-12-12 10:16 /wa/t/m/h/try_it/prt=p1/delta_0000001_0000005
-rw-rw----+  3 hive hadoop          1 2019-12-12 10:16 /wa/t/m/h/try_it/prt=p1/delta_0000001_0000005/_orc_acid_version
-rw-rw----+  3 hive hadoop        670 2019-12-12 10:16 /wa/t/m/h/try_it/prt=p1/delta_0000001_0000005/bucket_00000
drwxrwx---+  - hive hadoop          0 2019-12-12 20:14 /wa/t/m/h/try_it/prt=p2
drwxrwx---+  - hive hadoop          0 2019-12-12 20:14 /wa/t/m/h/try_it/prt=p2/delete_delta_0000002_0000004
-rw-rw----+  3 hive hadoop          1 2019-12-12 20:14 /wa/t/m/h/try_it/prt=p2/delete_delta_0000002_0000004/_orc_acid_version
-rw-rw----+  3 hive hadoop        654 2019-12-12 20:14 /wa/t/m/h/try_it/prt=p2/delete_delta_0000002_0000004/bucket_00000
drwxrwx---+  - hive hadoop          0 2019-12-12 20:14 /wa/t/m/h/try_it/prt=p2/delta_0000002_0000004
-rw-rw----+  3 hive hadoop          1 2019-12-12 20:14 /wa/t/m/h/try_it/prt=p2/delta_0000002_0000004/_orc_acid_version
-rw-rw----+  3 hive hadoop        670 2019-12-12 20:14 /wa/t/m/h/try_it/prt=p2/delta_0000002_0000004/bucket_00000
drwxrwx---+  - hive hadoop          0 2019-12-12 20:15 /wa/t/m/h/try_it/prt=p3
drwxrwx---+  - hive hadoop          0 2019-12-12 20:15 /wa/t/m/h/try_it/prt=p3/delete_delta_0000002_0000006
-rw-rw----+  3 hive hadoop          1 2019-12-12 20:15 /wa/t/m/h/try_it/prt=p3/delete_delta_0000002_0000006/_orc_acid_version
-rw-rw----+  3 hive hadoop        650 2019-12-12 20:15 /wa/t/m/h/try_it/prt=p3/delete_delta_0000002_0000006/bucket_00000
drwxrwx---+  - hive hadoop          0 2019-12-12 20:15 /wa/t/m/h/try_it/prt=p3/delta_0000002_0000006
-rw-rw----+  3 hive hadoop          1 2019-12-12 20:15 /wa/t/m/h/try_it/prt=p3/delta_0000002_0000006/_orc_acid_version
-rw-rw----+  3 hive hadoop        678 2019-12-12 20:15 /wa/t/m/h/try_it/prt=p3/delta_0000002_0000006/bucket_00000

Major Compaction

This type of compaction is scheduled based on the value set in the hive.compactor.delta.pct.threshold property whose formal definition expresses it best.

Percentage (fractional) size of the delta files relative to the base that will trigger a major compaction. (1.0 = 100%, so the default 0.1 = 10%.)

And, of course, you can also trigger it to run on-demand as shown below; again, let these processes finish before checking again.

ALTER TABLE try_it partition (prt='p1') COMPACT 'major';
ALTER TABLE try_it partition (prt='p2') COMPACT 'major';
ALTER TABLE try_it partition (prt='p3') COMPACT 'major';

+---------------+-----------+----------+------------+--------+------------+-----------+----------------+---------------+-------------------------+
| compactionid  |  dbname   | tabname  |  partname  |  type  |   state    | workerid  |   starttime    |   duration    |       hadoopjobid       |
+---------------+-----------+----------+------------+--------+------------+-----------+----------------+---------------+-------------------------+
| CompactionId  | Database  | Table    | Partition  | Type   | State      | Worker    | Start Time     | Duration(ms)  | HadoopJobId             |
| 1             | default   | try_it   | prt=p1     | MINOR  | succeeded  |  ---      | 1576145642000  | 179000        | job_1575915931720_0012  |
| 2             | default   | try_it   | prt=p2     | MINOR  | succeeded  |  ---      | 1576181672000  | 22000         | job_1575915931720_0013  |
| 3             | default   | try_it   | prt=p3     | MINOR  | succeeded  |  ---      | 1576181677000  | 37000         | job_1575915931720_0014  |
| 4             | default   | try_it   | prt=p1     | MAJOR  | succeeded  |  ---      | 1576183603000  | 31000         | job_1575915931720_0015  |
| 5             | default   | try_it   | prt=p2     | MAJOR  | succeeded  |  ---      | 1576187575000  | 50000         | job_1575915931720_0017  |
| 6             | default   | try_it   | prt=p3     | MAJOR  | succeeded  |  ---      | 1576187583000  | 57000         | job_1575915931720_0018  |
+---------------+-----------+----------+------------+--------+------------+-----------+----------------+---------------+-------------------------+
7 rows selected (0.029 seconds)

This will build a single “base” file for each partition.

$ hdfs dfs -ls -R /wa/t/m/h/try_it
drwxrwx---+  - hive hadoop          0 2019-12-12 20:47 /wa/t/m/h/try_it/prt=p1
drwxrwx---+  - hive hadoop          0 2019-12-12 20:46 /wa/t/m/h/try_it/prt=p1/base_0000005
-rw-rw----+  3 hive hadoop         48 2019-12-12 20:46 /wa/t/m/h/try_it/prt=p1/base_0000005/_metadata_acid
-rw-rw----+  3 hive hadoop          1 2019-12-12 20:46 /wa/t/m/h/try_it/prt=p1/base_0000005/_orc_acid_version
-rw-rw----+  3 hive hadoop        228 2019-12-12 20:46 /wa/t/m/h/try_it/prt=p1/base_0000005/bucket_00000
drwxrwx---+  - hive hadoop          0 2019-12-12 21:53 /wa/t/m/h/try_it/prt=p2
drwxrwx---+  - hive hadoop          0 2019-12-12 21:53 /wa/t/m/h/try_it/prt=p2/base_0000004
-rw-rw----+  3 hive hadoop         48 2019-12-12 21:53 /wa/t/m/h/try_it/prt=p2/base_0000004/_metadata_acid
-rw-rw----+  3 hive hadoop          1 2019-12-12 21:53 /wa/t/m/h/try_it/prt=p2/base_0000004/_orc_acid_version
-rw-rw----+  3 hive hadoop        815 2019-12-12 21:53 /wa/t/m/h/try_it/prt=p2/base_0000004/bucket_00000
drwxrwx---+  - hive hadoop          0 2019-12-12 21:54 /wa/t/m/h/try_it/prt=p3
drwxrwx---+  - hive hadoop          0 2019-12-12 21:53 /wa/t/m/h/try_it/prt=p3/base_0000006
-rw-rw----+  3 hive hadoop         48 2019-12-12 21:53 /wa/t/m/h/try_it/prt=p3/base_0000006/_metadata_acid
-rw-rw----+  3 hive hadoop          1 2019-12-12 21:53 /wa/t/m/h/try_it/prt=p3/base_0000006/_orc_acid_version
-rw-rw----+  3 hive hadoop        835 2019-12-12 21:53 /wa/t/m/h/try_it/prt=p3/base_0000006/bucket_00000

Looking at p1’s data we make sure there is no data for this partition.

$ hdfs dfs -get /wa/t/m/h/try_it/prt=p1/base_0000005/bucket_00000 p1Base
$ java -jar orc-tools-1.5.1-uber.jar data p1Base 
Processing data file p1Base [length: 228]

We can see that p2 has a single row.

$ hdfs dfs -get /wa/t/m/h/try_it/prt=p2/base_0000004/bucket_00000 p2Base
$ java -jar orc-tools-1.5.1-uber.jar data p2Base 
Processing data file p2Base [length: 815]
{"operation":0,"originalTransaction":4,"bucket":536870912,"rowId":0,"currentTransaction":4,
"row":{"id":2,"a_val":"noise","b_val":"bogus3"}}

And lastly, we verify that p3’s data contains two rows.

$ hdfs dfs -get /wa/t/m/h/try_it/prt=p3/base_0000006/bucket_00000 p3Base
$ java -jar orc-tools-1.5.1-uber.jar data p3Base 
Processing data file p3Base [length: 835]
{"operation":0,"originalTransaction":3,"bucket":536870912,"rowId":0,"currentTransaction":3,
"row":{"id":3,"a_val":"noise","b_val":"bogus2"}}
{"operation":0,"originalTransaction":6,"bucket":536870912,"rowId":0,"currentTransaction":6,
"row":{"id":1,"a_val":"noise","b_val":"bogus2"}}

Again, the contents of the now fully compacted into base files table looks like the following.

select * from try_it;
+------------+---------------+---------------+-------------+
| try_it.id  | try_it.a_val  | try_it.b_val  | try_it.prt  |
+------------+---------------+---------------+-------------+
| 2          | noise         | bogus3        | p2          |
| 3          | noise         | bogus2        | p3          |
| 1          | noise         | bogus2        | p3          |
+------------+---------------+---------------+-------------+
3 rows selected (0.307 seconds)

hive acid transactions with partitions (a behind the scenes perspective)

Ever since Hive Transactions have surfaced, and especially since Apache Hive 3 was released, I’ve been meaning to capture a behind-the-scenes look at the underlying delta ORC files that are created; and yes, compacted. If you are new to Hive’s ACID transactions, then the first link in this post as well as the Understanding Hive ACID Transaction Table blog posting are great places to start.

Bonus points to those who remember what ACID stands for – add a comment at the bottom of this posting if you know! If you don’t it might be time to review RDBMS fundamentals.  😉

Transactional Table DDL

Let’s create a transactional table with some Data Definition Language to test our use cases out on.

CREATE TABLE try_it (id int, a_val string, b_val string)
 PARTITIONED BY (prt string)  STORED AS ORC;
 
desc try_it;
+--------------------------+------------+----------+
|         col_name         | data_type  | comment  |
+--------------------------+------------+----------+
| id                       | int        |          |
| a_val                    | string     |          |
| b_val                    | string     |          |
| prt                      | string     |          |
|                          | NULL       | NULL     |
| # Partition Information  | NULL       | NULL     |
| # col_name               | data_type  | comment  |
| prt                      | string     |          |
+--------------------------+------------+----------+

Check to make sure the HDFS file structure was created.

hdfs dfs -ls /warehouse/tablespace/managed/hive/
drwxrwx---+  - hive hadoop          0 2019-12-12 07:38 /wa/t/m/h/try_it

In the remainder of this blog post the /warehouse/tablespace/managed/hive/ path is abbreviated as /wa/t/m/h/ in commands, but will be removed completely in the output for readability. Additionally, the rwx permission details, owner/group name, replication count and timestamp are also rm’d in the output to further aid in readability.

DML Use Cases

Let’s explore some CRUD (Create, Retrieve, Update, Delete) uses cases as expressed in Data Manipulation Language.

Txn 1: INSERT Single Row

INSERT INTO try_it VALUES (1, 'noise', 'bogus', 'p1');

select * from try_it;
+------------+---------------+---------------+-------------+
| try_it.id  | try_it.a_val  | try_it.b_val  | try_it.prt  |
+------------+---------------+---------------+-------------+
| 1          | noise         | bogus         | p1          |
+------------+---------------+---------------+-------------+
1 row selected (0.515 seconds)

Verify that the p1 partition now has a delta file and that it only includes changes belonging to transaction #1 (see the delta_0000001_0000001 indicator).

hdfs dfs -ls -R /wa/t/m/h/try_it
/prt=p1
/prt=p1/delta_0000001_0000001_0000
/prt=p1/delta_0000001_0000001_0000/_orc_acid_version
/prt=p1/delta_0000001_0000001_0000/bucket_00000

Pull down this delta file and use the knowledge from viewing the content of ORC files (using the Java ORC tool jar) to inspect it.

hdfs dfs -get /wa/t/m/h/try_it/prt=p1/delta_0000001_0000001_0000/bucket_00000 add1
ls -l add*
-rw-r--r--. 1 hive hadoop 788 Dec 12 08:09 add1

java -jar orc-tools-1.5.1-uber.jar data add1
Processing data file add1 [length: 788]
{"operation":0,"originalTransaction":1,"bucket":536870912,"rowId":0,"currentTransaction":1,
"row":{"id":1,"a_val":"noise","b_val":"bogus"}}

You can see the single row we added into the p1 partition is present. You’ll also notice it is annotated as coming from transaction #1 as indicated by "currentTransaction":1.

Txn 2: INSERT Multiple Rows Across Multiple Partitions

Insert statements allow multiple rows to be added at once and they all belong to a single ACID transaction. This use case is to exercise that, but to make it a bit more fun we can span more than one partition.

INSERT INTO try_it VALUES 
(2, 'noise', 'bogus', 'p2'),
(3, 'noise', 'bogus', 'p3');

select * from try_it;
+------------+---------------+---------------+-------------+
| try_it.id  | try_it.a_val  | try_it.b_val  | try_it.prt  |
+------------+---------------+---------------+-------------+
| 1          | noise         | bogus         | p1          |
| 2          | noise         | bogus         | p2          |
| 3          | noise         | bogus         | p3          |
+------------+---------------+---------------+-------------+
3 rows selected (0.193 seconds)

Verify that both p2 and p3 partitions now have delta file directories/files which each contain changes belonging to transaction #2.

hdfs dfs -ls -R /wa/t/m/h/try_it
/prt=p1
/prt=p1/delta_0000001_0000001_0000
/prt=p1/delta_0000001_0000001_0000/_orc_acid_version
/prt=p1/delta_0000001_0000001_0000/bucket_00000
/prt=p2
/prt=p2/delta_0000002_0000002_0000
/prt=p2/delta_0000002_0000002_0000/_orc_acid_version
/prt=p2/delta_0000002_0000002_0000/bucket_00000
/prt=p3
/prt=p3/delta_0000002_0000002_0000
/prt=p3/delta_0000002_0000002_0000/_orc_acid_version
/prt=p3/delta_0000002_0000002_0000/bucket_00000

Pull down the delta files and inspect them.

hdfs dfs -get /wa/t/m/h/try_it/prt=p2/delta_0000002_0000002_0000/bucket_00000 add2-p2
hdfs dfs -get /wa/t/m/h/try_it/prt=p3/delta_0000002_0000002_0000/bucket_00000 add2-p3
ls -l add2-*
-rw-r--r--. 1 hive hadoop 788 Dec 12 09:22 add2-p2
-rw-r--r--. 1 hive hadoop 796 Dec 12 09:22 add2-p3

java -jar orc-tools-1.5.1-uber.jar data add2-p2
Processing data file add2-p2 [length: 788]
{"operation":0,"originalTransaction":2,"bucket":536870912,"rowId":0,"currentTransaction":2,
"row":{"id":2,"a_val":"noise","b_val":"bogus"}}

java -jar orc-tools-1.5.1-uber.jar data add2-p3
Processing data file add2-p3 [length: 796]
{"operation":0,"originalTransaction":2,"bucket":536870912,"rowId":0,"currentTransaction":2,
"row":{"id":3,"a_val":"noise","b_val":"bogus"}}

You can now see that both the p2 and p3 record additions are linked to "currentTransaction":2. To reiterate, transaction #2 spanned multiple partitions and each partition’s delta folder/file was properly aligned with the same transaction.

Txn 3: UPDATE Multiple Rows Across Multiple Partitions

Updates are tricky with Hive Transactions as there is no real in-place update occurring. Basically, Hive deletes the record to be updated and then does a net-new insert to account for what the updated recorded should look like at the end of the SQL statement. The section will show what this looks like behind-the-scenes.

To make the use case more interesting, we’ll make the update span records in multiple partitions so that we can see a similar behavior to the prior use case of a particular transaction number spanning these affected partitions.

Let’s start off with the SQL.

UPDATE try_it SET b_val = 'bogus2' WHERE a_val = 'noise';

select * from try_it;
+------------+---------------+---------------+-------------+
| try_it.id  | try_it.a_val  | try_it.b_val  | try_it.prt  |
+------------+---------------+---------------+-------------+
| 1          | noise         | bogus2        | p1          |
| 2          | noise         | bogus2        | p2          |
| 3          | noise         | bogus2        | p3          |
+------------+---------------+---------------+-------------+
3 rows selected (0.192 seconds)

Verify that all three partitions are modified by each having delete_delta_ and delta_ directories.

hdfs dfs -ls -R /wa/t/m/h/try_it
/prt=p1
/prt=p1/delete_delta_0000003_0000003_0000
/prt=p1/delete_delta_0000003_0000003_0000/_orc_acid_version
/prt=p1/delete_delta_0000003_0000003_0000/bucket_00000
    ... delta_0000001_0000001_ DELETED FOR BRIEVITY ...
/prt=p1/delta_0000003_0000003_0000
/prt=p1/delta_0000003_0000003_0000/_orc_acid_version
/prt=p1/delta_0000003_0000003_0000/bucket_00000
/prt=p2
/prt=p2/delete_delta_0000003_0000003_0000
/prt=p2/delete_delta_0000003_0000003_0000/_orc_acid_version
/prt=p2/delete_delta_0000003_0000003_0000/bucket_00000
    ... delta_0000002_0000002_ DELETED FOR BRIEVITY ...
/prt=p2/delta_0000003_0000003_0000
/prt=p2/delta_0000003_0000003_0000/_orc_acid_version
/prt=p2/delta_0000003_0000003_0000/bucket_00000
/prt=p3
/prt=p3/delete_delta_0000003_0000003_0000
/prt=p3/delete_delta_0000003_0000003_0000/_orc_acid_version
/prt=p3/delete_delta_0000003_0000003_0000/bucket_00000
    ... delta_0000002_0000002_ DELETED FOR BRIEVITY ...
/prt=p3/delta_0000003_0000003_0000
/prt=p3/delta_0000003_0000003_0000/_orc_acid_version
/prt=p3/delta_0000003_0000003_0000/bucket_00000

Let’s just focus on p2’s files as the other two partitions are basically the same thing.

First, look at the delete file which shows that this is "currentTransaction":3 which is a delete of "originalTransaction":2 created earlier in transaction #2.

hdfs dfs -get /wa/t/m/h/try_it/prt=p2/delete_delta_0000003_0000003_0000/bucket_00000 updateAllPartitionsExample-delete_delta
java -jar orc-tools-1.5.1-uber.jar data updateAllPartitionsExample-delete_delta 
Processing data file updateAllPartitionsExample-delete_delta [length: 727]
{"operation":2,"originalTransaction":2,"bucket":536870912,"rowId":0,"currentTransaction":3,
"row":null}

The delta file then shows a new "currentTransaction":3 record which is the projection of what the update statement modified to the record that was just deleted.

hdfs dfs -get  /wa/t/m/h/try_it/prt=p2/delta_0000003_0000003_0000/bucket_00000 updateAllPartitionsExample-delta
java -jar orc-tools-1.5.1-uber.jar data updateAllPartitionsExample-delta 
Processing data file updateAllPartitionsExample-delta [length: 816]
{"operation":0,"originalTransaction":3,"bucket":536870912,"rowId":0,"currentTransaction":3,
"row":{"id":2,"a_val":"noise","b_val":"bogus2"}}

Txn 4: UPDATE Single Row (Leveraging Partitioning)

This use case is just calling out that that we should be using the partitioned virtual column in the update statement as much as possible to make Hive’s just much easier, by only looking in the folders that can possibly be affected instead of walking the full table’s contents.

UPDATE try_it SET b_val = 'bogus3' WHERE b_val = 'bogus2' AND prt = 'p2';

+------------+---------------+---------------+-------------+
| try_it.id  | try_it.a_val  | try_it.b_val  | try_it.prt  |
+------------+---------------+---------------+-------------+
| 1          | noise         | bogus2        | p1          |
| 2          | noise         | bogus3        | p2          |
| 3          | noise         | bogus2        | p3          |
+------------+---------------+---------------+-------------+
3 rows selected (0.201 seconds)

In this example, without the partition condition we would have updated all three partitions again. Make sure only the p2 partition shows delete_delta_0000004_0000004_ and delta_0000004_0000004_ folders.

hdfs dfs -ls -R /wa/t/m/h/try_it
/prt=p1
/prt=p1/delete_delta_0000003_0000003_0000
/prt=p1/delete_delta_0000003_0000003_0000/_orc_acid_version
/prt=p1/delete_delta_0000003_0000003_0000/bucket_00000
/prt=p1/delta_0000001_0000001_0000
/prt=p1/delta_0000001_0000001_0000/_orc_acid_version
/prt=p1/delta_0000001_0000001_0000/bucket_00000
/prt=p1/delta_0000003_0000003_0000/_orc_acid_version
/prt=p1/delta_0000003_0000003_0000/bucket_00000
/prt=p2
/prt=p2/delete_delta_0000003_0000003_0000
/prt=p2/delete_delta_0000003_0000003_0000/_orc_acid_version
/prt=p2/delete_delta_0000003_0000003_0000/bucket_00000
/prt=p2/delete_delta_0000004_0000004_0000
/prt=p2/delete_delta_0000004_0000004_0000/_orc_acid_version
/prt=p2/delete_delta_0000004_0000004_0000/bucket_00000
/prt=p2/delta_0000002_0000002_0000
/prt=p2/wa/t/m/h/try_it/prt=p2/delta_0000002_0000002_0000/bucket_00000
/prt=p2/delta_0000003_0000003_0000
/prt=p2/delta_0000003_0000003_0000/_orc_acid_version
/prt=p2/delta_0000003_0000003_0000/bucket_00000
/prt=p2/delta_0000004_0000004_0000
/prt=p2/delta_0000004_0000004_0000/_orc_acid_version
/prt=p2/delta_0000004_0000004_0000/bucket_00000
/prt=p3
/prt=p3/delete_delta_0000003_0000003_0000
/prt=p3/delete_delta_0000003_0000003_0000/_orc_acid_version
/prt=p3/delete_delta_0000003_0000003_0000/bucket_00000
/prt=p3/delta_0000002_0000002_0000
/prt=p3/delta_0000002_0000002_0000/_orc_acid_version
/prt=p3/delta_0000002_0000002_0000/bucket_00000
/prt=p3/delta_0000003_0000003_0000
/prt=p3/delta_0000003_0000003_0000/_orc_acid_version
/prt=p3/delta_0000003_0000003_0000/bucket_00000

There is no need to look at the ORC files for this change as it is similar to what we saw in transaction #3.

Txn 5 & 6: UPDATE Single Row to Change Partition

A student in a recent class asked if it would be possible to run an update statement that changed the partition virtual column value. I wasn’t sure and could argue it both ways. So, like most things in Hadoop the best way to answer a question like this is to just TRY IT!

UPDATE try_it SET prt = 'p3' WHERE a_val = 'noise' AND prt = 'p1';

Error: Error while compiling statement: FAILED: SemanticException [Error 10292]: 
  Updating values of partition columns is not supported (state=42000,code=10292)

Well, as you can see, NO JOY!! But would could just run to separate transactions; one to delete it and one to add it, but we do not have an atomic transaction with this and the responsibility of the data integrity would fall to the application, or person, to make sure both are committed.

DELETE FROM try_it WHERE a_val = 'noise' AND prt = 'p1';

INSERT INTO try_it VALUES (1,'noise','bogus2','p3');

select * from try_it;
+------------+---------------+---------------+-------------+
| try_it.id  | try_it.a_val  | try_it.b_val  | try_it.prt  |
+------------+---------------+---------------+-------------+
| 2          | noise         | bogus3        | p2          |
| 3          | noise         | bogus2        | p3          |
| 1          | noise         | bogus2        | p3          |
+------------+---------------+---------------+-------------+
3 rows selected (0.227 seconds)

Here are the applicable HDFS details for these two transactions.

/prt=p1/delete_delta_0000005_0000005_0000
/prt=p1/delete_delta_0000005_0000005_0000/_orc_acid_version
/prt=p1/delete_delta_0000005_0000005_0000/bucket_00000
/prt=p3/delta_0000006_0000006_0000
/prt=p3/delta_0000006_0000006_0000/_orc_acid_version
/prt=p3/delta_0000006_0000006_0000/bucket_00000

Parting Thoughts

These ACID transaction capabilities were mentioned way back during my presenting at hadoop summit (archiving evolving databases in hive) talk a few years ago and provide a much cleaner way at develop-time to address this need for allowing mutable data in Hive’s (previously) immutable world.

A concern that surfaces is that small files and hadoop’s hdfs (bonus: an inode formula) causes not just “Namenode pressure”, but also forces the processing to read potentially lots and lots of small(ish) delta files instead of a few big(ger) “base” files. To make it worse, this work cannot be done in parallel for a given partition due to the need to walk these edits files in order so that Hive can present an accurate representation of what the end results should be.

Some of these concerns are addressed with hive delta file compaction (minor and major) processing that exists and another big helper is to leverage the MERGE command to lump many changes into a single transaction. Both of these topics will be presented in future blog posts.

viewing the content of ORC files (using the Java ORC tool jar)

The Apache ORC file format has been used heavily by Apache Hive for many years now, but being a bit of a “binary file format” there just isn’t much we can do with basic tools to see the contents of these files as shown below.

$ cat orcfile 
ORC
P1

       ???>P>??be!Q%~.ע?d!?????T	?;


DoeSmith(P4??be!%..&wG!??       ?
                                 ??'LesterEricJohnSusie	FdEBR	F6PDoeMartinSmithGATXOKMA???
??]?M?Ku??????9?sT?#?ްͲ㖆O:^xh?>??FWe?Pve??桿F?Ӳ?LuS????b?`	`??`???/p?_?]C?8???kQf?kpiqf??PB?K
                  (???쒟
X?X8X?9X?89.?   Ź?????B"$?b4?`X?$???,??(???????#?????"Ŝ??"Ś????*Ś??KKR??8????
             ????b??a%???????Z??,?\Z??*????J?q1???s3K2$4??rVB@q..&wG!?? ???????"
                                                                               (^0??ORC

Fortunately, the ORC project has a couple of options for CLI tools. For this posting, I settled on the Java Tools. Now, you could be a good citizen and build these yourself from source, but I (the lazy programmer that I am) decided to just download a compiled “uber jar” file.

First, I needed to figure out which version of ORC I was using. I am currently using HDP 3.1.0 and I took a peek into the Hive lib folder.

$ ls /usr/hdp/current/hive-client/lib/orc*
/usr/hdp/current/hive-client/lib/orc-core-1.5.1.3.1.0.0-78.jar
/usr/hdp/current/hive-client/lib/orc-shims-1.5.1.3.1.0.0-78.jar

The HDP jar file naming convention let me know I was using ORC 1.5.1, so I surfed over to http://repo1.maven.org/maven2/org/apache/orc/orc-tools/1.5.1/ and then pulled down the appropriate file.

wget https://repo1.maven.org/maven2/org/apache/orc/orc-tools/1.5.1/orc-tools-1.5.1-uber.jar

Now, I’m ready to use the tools, but… I realized I didn’t have an ORC file to test it out with, so I decided I would use Apache Pig to build a small file. I first created a simple CSV file with vi and then pushed it to HDFS. The contents of the file are as follows.

$ hdfs dfs -cat pig/customers.csv
1001,Lester,Martin,GA
1002,Eric,Martin,TX
1003,John,Doe,OK
1004,Susie,Smith,MA

I then wrote a little read & write conversion script and then executed it.

$ cat createORC.pig 
custs = LOAD 'pig/customers.csv' USING PigStorage(',')
          AS (cid:int, fname:chararray, lname:chararray, state:chararray);
STORE custs INTO 'orcfile' USING OrcStorage;
$ pig createORC.pig

As expected, it created a simple little ORC file which I pulled down to my linux home directory.

$ hdfs dfs -ls orcfile
Found 2 items
-rw-r--r--   3 zeppelin hdfs          0 2019-12-12 08:35 orcfile/_SUCCESS
-rw-r--r--   3 zeppelin hdfs        569 2019-12-12 08:35 orcfile/part-v000-o000-r-00000
$ hdfs dfs -get orcfile/part-v000-o000-r-00000 orcfile
$ ls -l orcf*
-rw-r--r--. 1 zeppelin hadoop 569 Dec 12 08:36 orcfile

NOW, we can finally try out the ORC Tools jar. First up, we can look at the metadata of this file.

$ java -jar orc-tools-1.5.1-uber.jar meta orcfile
Processing data file orcfile [length: 569]
Structure for orcfile
File Version: 0.12 with ORC_135
Rows: 4
Compression: ZLIB
Compression size: 262144
Type: struct<cid:int,fname:string,lname:string,state:string>

**** REMOVED CONTENT FROM THESE SECTIONS FOR BREVITY ****
Stripe Statistics:
File Statistics:
Stripes:

File length: 569 bytes
Padding length: 0 bytes
Padding ratio: 0%

That had some interesting info (and I definitely deleted a bunch to not be so verbose), but what this post was really about all this time is to show the contents of the file, so we just switch the subcommand.

$ java -jar orc-tools-1.5.1-uber.jar data orcfile
Processing data file orcfile [length: 569]
{"cid":1001,"fname":"Lester","lname":"Martin","state":"GA"}
{"cid":1002,"fname":"Eric","lname":"Martin","state":"TX"}
{"cid":1003,"fname":"John","lname":"Doe","state":"OK"}
{"cid":1004,"fname":"Susie","lname":"Smith","state":"MA"}

Perfect, we can see the four rows represented as JSON documents which is so much easier to read than that stuff we started out with originally.

topology supervision features of streaming frameworks (or lack thereof)

This blog post introduces the three streaming frameworks that are bundled in the Hortonworks Data Platform (HDP) – Apache Storm, Spark Streaming, and Kafka Streams – and focuses on the supervision features offered to the topologies (aka workflows) running with, or within, these particular frameworks. This post does not attempt to fully described each framework nor does it provide examples of their usage via working code. The goal is to develop an understanding of what, if any, services are available to help with lifecyle events, scalability, management, and monitoring.

The Frameworks

Kafka Streams

Kafka Streams is a client library for building applications and microservices, where the input and output data are stored in Kafka clusters. It combines the simplicity of writing and deploying standard Java and Scala applications on the client side with the benefits of Kafka’s server-side cluster technology. –http://kafka.apache.org/documentation/streams/

Kafka Streams are tightly coupled with Kafka’s messaging platform; especially the streaming input data. Kafka Streams is intentionally designed to fit into any Java or Scala application which gives it plenty of flexibility, but offers no inherent lifecycle, scaling, management, or monitoring features.

Spark Streaming

Spark Streaming is an extension of the core Spark API that enables scalable, high-throughput, fault-tolerant stream processing of live data streams. Data can be ingested from many sources like Kafka, Flume, Kinesis, or TCP sockets, and can be processed using complex algorithms expressed with high-level functions like map, reduce, join and window. Finally, processed data can be pushed out to filesystems, databases, and live dashboards.http://spark.apache.org/docs/latest/streaming-programming-guide.html

Apache Spark’s streaming frameworks allow for a variety of input and output data technologies. Spark Streaming apps are themselves Spark applications who, in a Hadoop cluster at least, run under YARN which provides coverage for many of the lifecycle and management features. The Spark framework addresses a number of the scaling and monitoring needs.

Apache Storm

Apache Storm is a … distributed realtime computation system. Storm makes it easy to reliably process unbounded streams of data, … Storm is simple, can be used with any programming language, … Storm is fast: a benchmark clocked it at over a million tuples processed per second per node. It is scalable, fault-tolerant, guarantees your data will be processed, and is easy to set up and operate.http://storm.apache.org/

Storm topologies are assembled with components that can be built with any language, but primarily Java is used. It includes ready to use components for many queueing and database technologies. Storm is a comprehensive framework that solely focuses on streaming applications and has rich solutions addressing lifecyle events, scalability, management, and topology monitoring.

Feature Analysis

Summary & Recommendations

Let me start by pointing out that it looks like Kafka Streams is “all bad”, but that’s not the case. It is build around the concept of writing and deploying standard applications and consciously does not want be part of a runtime framework. Due to that and the focus of this blog post, it should be obvious why it scored so low on these features. The RYO (Roll Your Own – aka “custom”) callouts I gave are likely a badge of honor to the folks who are bringing us this framework.

Kafka Streams also has a lot of early interest and I surely would not discount it for a second. The biggest issue for those teams who stand up a decent sized Hadoop/Spark cluster is that you don’t get to take advantage of all those nodes to run your Kafka Streams apps on. You’ll need to size out what is needed for each application and ensure that needed resources are available to run your apps on.

On the other end of the spectrum, one would think that will an almost perfect green checkmark score on the features identified that Storm would be a no-brainer. Storm is the grandpa of the streaming engines and its event-level isolation provide something the other microbatch frameworks can’t do. This maturity shines through in all of these supervision features, but on the other hand it is the least “exciting” of the frameworks for folks starting their streaming journey in 2019. If you need to get something into production asap and you just need to know it works – all day long and every day… then go with Storm!

This brings me to my personal recommendation of Spark Streaming. Note that this comes from a guy who really does love Apache Storm and values the simplicity & flexibility of Kafka Streams. There is simply too much excitement & focus around Spark in general and the ability to transition applications between batch and streaming paradigms with minimal coding close the case. It is still maturing, but its alignment with YARN help it score high on many of these supervision-oriented features.

presenting at hadoop summit (archiving evolving databases in hive)

4/13/2020 UPDATE: see hive’s merge statement (it drops a lot of acid) as a much better way to solve this problem.

What a humbling experience to have the opportunity to present at the 2015 Hadoop Summit conference in San Jose.  I’ve done a decent number of user group presentations over the years, and even have presented Hadoop topics to audiences as big as 500, but this is the first time I have talked at a major industry conference and I had a blast.  It was just cool to have a “presenter” badge and to have my name in all of the conference literature.

My topic was Mutable Data in Hive’s Immutable World and here is the synopsis that is visible from the agenda.

Going beyond Hive`s sweet spot of time-series immutable data, another popular utilization is replicating RDBMS schemas. This “active archive” use case`s intention is not to capture every single change, but to update the current view of the source system at regular intervals. This breakout session will compare/contrast full-refresh & delta-processing approaches as well as present advanced strategies for truly “big” data. Those strategies not only parallelize their processing, but leverage Hive`s partitioning to intelligently target the smallest amount of data as possible to improve performance and scalability. Hive 14`s INSERT, UPDATE, and DELETE statements will also be explored.

I’ve loaded my slides up on SlideShare.  Please use http://www.slideshare.net/lestermartin/mutable-data-in-hives-immutable-world if the preview below is having troubles.

I was lucky enough to find out that Jennifer Knight took some “action shots” of myself during my presentation.

Yep, as that last one shows, I was talking about BIG data!

The conference coordinators posted my presentation to YouTube, too.

small files and hadoop’s hdfs (bonus: an inode formula)

This topic is a fairly detailed and better described via sources such as this HDFS Scalability whitepaper, but basically it comes down to the NN needing to keep track of “objects” (i.e. in memory).  These objects are directories, files and blocks.  For example; if you had a tree that only had 3 top-level directories *and* each of these had 3 files *and* each of these took up 3 blocks, then your NN would need to keep track of 39 objects as detailed below.

[hdfs@sandbox ~]$ hdfs dfs -count /
         248          510          560952712 /

1 GB of heap size for every 1 million files

Not to prescribe a setting for your cluster, but at a minimum, even for early POC efforts, increase your NN heap size to, at least, 4096 MB.  If using Ambari, the following screenshot shows you where to find this setting.

So, if we are actively monitoring our NN heap size and keeping it inline with the number of objects the NN is managing we can more accurately fine-tune our expectations for each cluster.

On the flip side, it seems easy enough to manage the amount of disk space we have on HDFS by all the inherent reporting abilities of HDFS (and tools like Ambari), not to mention some very simple math.  I recently did get asked about how inodes themselves can prevent the cluster from allowing new files to be added.  As a refresher, inodes keep track of all the files on a given (physical, not HDFS) file system.  Here is an example of a partition that has 99% of the space allocated showing free, but the inodes are all used up.

Filesystem    1K-blocks     Used  Available Use% Mounted on
/dev/sdf1    3906486416 10586920 3700550300   1% /data04

Filesystem          Inodes  IUsed    IFree IUse% Mounted on
/dev/sdf1           953856 953856        0  100% /data04

Of course, things are often more complicated when we dive deeper.  First, each file will be broken up into appropriately sized blocks and then each of these will have three copies.  So, our example file above with 3 blocks will need to have 9 separate physical files stored by the DNs.  As you peel back the onion, you’ll see there really are two files for each block stored; the block itself and then a second file that contains metadata & checksum values.  In fact, it gets a tiny bit more complicated than that by the DNs needing to have a directory structure so they don’t overrun a flat directory.  Chapter 10 of Hadoop: The Definitive Guide (3rd Edition) has a good write up on this as you can see here which is further visualized by the abbreviated directory listing from a DN’s block data below.

[hdfs@sandbox finalized]$ pwd
/hadoop/hdfs/data/current/BP-1200952396-10.0.2.15-1398089695400/current/finalized
[hdfs@sandbox finalized]$ ls -al
total 49940
drwxr-xr-x 66 hdfs hadoop    12288 Apr 21 07:18 .
drwxr-xr-x  4 hdfs hadoop     4096 Jul  3 12:33 ..
-rw-r--r--  1 hdfs hadoop        7 Apr 21 07:16 blk_1073741825
-rw-r--r--  1 hdfs hadoop       11 Apr 21 07:16 blk_1073741825_1001.meta
-rw-r--r--  1 hdfs hadoop       42 Apr 21 07:16 blk_1073741826
-rw-r--r--  1 hdfs hadoop       11 Apr 21 07:16 blk_1073741826_1002.meta
-rw-r--r--  1 hdfs hadoop   392124 Apr 21 07:18 blk_1073741887
-rw-r--r--  1 hdfs hadoop     3071 Apr 21 07:18 blk_1073741887_1063.meta
-rw-r--r--  1 hdfs hadoop  1363159 Apr 21 07:18 blk_1073741888
-rw-r--r--  1 hdfs hadoop    10659 Apr 21 07:18 blk_1073741888_1064.meta
drwxr-xr-x  2 hdfs hadoop     4096 Apr 21 07:22 subdir0
drwxr-xr-x  2 hdfs hadoop     4096 Jun  3 08:45 subdir1
drwxr-xr-x  2 hdfs hadoop     4096 Apr 21 07:21 subdir63
drwxr-xr-x  2 hdfs hadoop     4096 Apr 21 07:18 subdir9

To start the math, let’s calculate the NbrOfAvailBlocksPerCluster which is simply the inode limit per disk TIMES the number of disks in a DN TIMES the number of DNs in the cluster and then DIVIDE that number by the 6.3 value described above.  For example, the following values surface for a cluster that has DNs with 10 disks each whose JBOD file system partitions can support 1 million inodes.

So, let’s see if we can write that down in an algebraic formula.  Remember, we’re just thinking about inodes here — the disk sizing calculation is MUCH easier.

NbrOfFilesInodeSettingCanSupport = ( ( InodeSettingPerDisk * NbrDisksPerDN * NbrDNs ) / ( ReplFactor * 2.1 ) )  /  AvgNbrBlocksPerFile