sql window functions explained (as transparently as possible)

SQL has had “windowing functions” for a long time, but not everyone has explored them. They definitely fall into the analytical query family and, TBH, the first half of my 30-year career was focused on OLTP application development, where CRUD programmers usually don’t need these fancy critters.

If you ALREADY know all about window functions then this isn’t the article for you, but I do welcome your comments at the bottom of the post if there are better ways to introduce these to newbs. If you are new to them, let’s see if I can help you understand them.

What are window functions?

In SQL, a window function or analytic function is a function which uses values from one or multiple rows to return a value for each row. (This contrasts with an aggregate function, which returns a single value for multiple rows.) Window functions have an OVER clause; any function without an OVER clause is not a window function, but rather an aggregate or single-row (scalar) function.

https://en.wikipedia.org/wiki/Window_function_(SQL)

That’s a lot at once. What if I told you they are kinda like a GROUP BY, but keeping the row-granularity in the result set so that each row can have its own aggregated value?
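To make that analogy a bit more concrete, here’s an illustrative contrast (it only assumes the same orders table used in the example below): a GROUP BY collapses the rows down to one per group, while the same aggregate with an OVER clause keeps every row and simply tags each one with its group’s value.

-- one row per customer
SELECT custkey, avg(totalprice) AS avg_price
  FROM orders
 GROUP BY custkey;

-- every order row kept, each with its customer's average
SELECT custkey, totalprice,
       avg(totalprice) OVER (PARTITION BY custkey) AS avg_price
  FROM orders;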

Hmmm… if that’s still a bit fuzzy, let’s see if the canonical example helps any.

Canonical example is a running average

SELECT
  avg(totalprice) OVER (
    PARTITION BY
      custkey
    ORDER BY
      orderdate 
        ROWS 
          BETWEEN UNBOUNDED PRECEDING
            AND 
          CURRENT ROW
  )
FROM
  orders;

Basically, the figurative example above shows

  • That all rows are still present
  • Each row has a logical “window” (or collection of records) that can be used for computations
  • That window contains the current row and all other orders with the same custkey that were placed prior to the current one
  • Each row has a new column that is the average of all the totalprice values for the given window

The ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW keywords in the query above define which records belong to the window used to calculate the aggregated value added to each record. The left side of the AND is the lower_bound and the right side is the upper_bound. The bounds can be any of these options (a short example follows the list).

  • UNBOUNDED PRECEDING – all rows before the current row (the default lower_bound)
  • n PRECEDING – n rows before the current row
  • CURRENT ROW – the current row (the default upper_bound)
  • n FOLLOWING – n rows after the current row
  • UNBOUNDED FOLLOWING – all rows after the current row
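For example, a frame that averages each order’s totalprice with its immediate neighbors (an illustrative frame choice, not one used elsewhere in this post) would look like this:

SELECT
  avg(totalprice) OVER (
    PARTITION BY
      custkey
    ORDER BY
      orderdate
        ROWS
          BETWEEN 1 PRECEDING
            AND
          1 FOLLOWING
  )
FROM
  orders;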

Supports sophisticated range definitions

When the ORDER BY column is a numeric or a date/time datatype, you can swap the ROWS keyword for RANGE to create a more sophisticated window definition; one in which the n PRECEDING and/or n FOLLOWING options are not a fixed number (n) of rows, but a logical watermark based on the value of the ORDER BY column.

I’m from Missouri… SHOW-ME!

SELECT
  avg(totalprice) OVER (
    PARTITION BY
      custkey
    ORDER BY
      orderdate 
        RANGE 
          BETWEEN interval '1' month PRECEDING
            AND 
          CURRENT ROW
  )
FROM
  orders;

Looking at the visual above and concentrating on the cust_1 records, the first three records (sorted by orderdate) end up having the same average as in the first example. This is because, for each of these rows, all the preceding ones were placed within the single-month lower_bound.

When the 2022-12-25 order is processed, its window only includes itself, as all of the preceding orders were placed more than a month earlier.

I’m hopeful this helps get you started on your window functions journey and if there is still any confusion, please add a comment below and I’ll try to help you out. I might even be able to update the post to make things even more clear if needed.

pystarburst analytics examples (querying aviation data part deux)

I had so much fun publishing pystarburst (the dataframe api) and running it in Starburst Galaxy that I wanted to share some more examples. Can you tell I’m pretty fired-up?!?! This time, I’m using the datasets and analytical questions posed in querying aviation data in the cloud (leveraging starburst galaxy) previously.

I encourage you to at least take a quick look through that last post, but I’ll provide a brief introduction of the datasets in this post. To set up tables for yourself so that you can run the PyStarburst example code, you’ll definitely want to follow the steps presented there.

More about DataFrames

In my first PyStarburst post I explicitly stated I was NOT “attempting to teach you everything you need to know about the DataFrame API” (and I’m still NOT trying to do that), but I’m realizing I should be a tiny bit nicer than that. Here’s a bit from the source to get you started.

A DataFrame is a Dataset organized into named columns. It is conceptually equivalent to a table in a relational database or a data frame in R/Python, but with richer optimizations under the hood.

https://spark.apache.org/docs/latest/sql-programming-guide.html

The PyStarburst DataFrame API itself is the list of objects and methods available to Python programmers to work with this VERY INTERESTING collection of rows. What makes it so interesting? Well… the collections are NOT collections of data. They are instructions for how to build (in a highly parallelized cluster) the DataFrames when, and only when, they are needed.

Yep, that makes about ZERO SENSE the first time you hear it. This is part of the “lazy execution” phrase you may have heard about regarding DataFrames. Basically, PyStarburst (and PySpark, for that matter, on which this implementation’s API is based) lets you write as much code as you want with the API (and create as many DataFrame objects as you need), but… no real data is read or written until an operation/function is called that requires an ACTION to be performed. Generally that means only when you want to retrieve or persist results.

Yes, it is STILL A LOT, but that’s all I’m going to tell you now. I just want you to TRUST ME that in the code below when you see an object that is a DataFrame, do NOT assume that any real work was done to fetch that data and bring it back to the Python program. I’ll mostly be using the show() function when I need to see some results. That is one of those “action” operations I mentioned above. The other functions are called “transformations” and create additional DataFrames (again, that just means they don’t really do any heavy lifting).

If you don’t feel TOTALLY LOST, feel free to check out my functional programming and big data APIs posts for more on this whole topic. 🙂 In fairness, it was CLEAR AS MUD for me when I first started working with Spark.

Create & load the tables

All the details for getting Starburst Galaxy with the tables you need are documented here. The following ERD should give you a rough idea of the tables and their logical relationships for this aviation-oriented domain.

Analyze the data

I’m going to port the 7 analytical questions raised here as well as the SQL used in that post. This time, I’ll do them with the DataFrame API. Let’s jump in!

Like before, I’m just running this from the CLI. I’m saving my code in a file called aviation.py and then running it by entering python3 aviation.py each time I want to run my code.

Here’s the boilerplate code again, including all the imports you’ll need for the code to come.

import trino
from pystarburst import Session
from pystarburst import functions as f
from pystarburst.functions import col, lag, round, row_number
from pystarburst.window import Window

db_parameters = {
    "host": "lXXXXXXXXXXXXr.trino.galaxy.starburst.io",
    "port": 443,
    "http_scheme": "https",
    "auth": trino.auth.BasicAuthentication("lXXXXXX/XXXXXXn", "<password>")
}
session = Session.builder.configs(db_parameters).create()

Q1: How many rows in the flight table?

SQL solution

SELECT count(*) 
  FROM mycloud.aviation.raw_flight;

Python solution

This is super simple. Just retrieve the raw_flight table as a DataFrame and then call the count() function (which returns an integer).

allFs = session.table("mycloud.aviation.raw_flight")
print(allFs.count())

Results

2056494

Q2: What country are most of the airports located in?

SQL solution

SELECT country, count(*) AS num_airports
  FROM mycloud.aviation.raw_airport
 GROUP BY country
 ORDER BY num_airports DESC;

Python solution

This one is pretty straightforward, too. After getting hold of the raw_airport table, I’m using a group_by() function and, on those results, performing a count() function on the aggregated rows. Finally, I just order the results by the number of rows for each country and show a single result.

# get the whole table, aggregate & sort
mostAs = session \
    .table("mycloud.aviation.raw_airport") \
	.group_by("country").count() \
	.sort("count", ascending=False)
mostAs.show(1)

Results

-----------------------
|"country"  |"count"  |
-----------------------
|USA        |3363     |
-----------------------

Q3: What are the top 5 airline codes with the most number of flights?

SQL solution

SELECT unique_carrier, count(*) as num_flights
  FROM mycloud.aviation.raw_flight
 GROUP BY unique_carrier 
 ORDER BY num_flights DESC
 LIMIT 5;

Python solution

This is VERY SIMILAR to Q2.

# get the whole table, aggregate & sort
mostFs = session \
    .table("mycloud.aviation.raw_flight") \
	.group_by("unique_carrier").count() \
	.rename("unique_carrier", "carr") \
	.sort("count", ascending=False)
mostFs.show(5)

Results

--------------------
|"carr"  |"count"  |
--------------------
|WN      |356167   |
|AA      |175969   |
|OO      |166445   |
|MQ      |141178   |
|US      |133403   |
--------------------

Q4: Same question, but show the airline carrier’s name.

SQL solution

SELECT c.description, count(*) as num_flights
  FROM mycloud.aviation.raw_flight  f 
  JOIN mycloud.aviation.raw_carrier c
    ON (f.unique_carrier = c.code)
 GROUP BY c.description 
 ORDER BY num_flights DESC
 LIMIT 5;

Python solution

You can create a DataFrame for the raw_carrier table to join on later. Then, just pick up where you left off in Q3 by chaining a few more methods on it; namely the join().

# get all of the carriers
allCs = session.table("mycloud.aviation.raw_carrier")

# repurpose mostFs from above (or chain on it) 
#   to join the 2 DFs and sort the results that
#   have already been grouped
top5CarrNm = mostFs \
    .join(allCs, mostFs.carr == allCs.code) \
    .drop("code") \
	.sort("count", ascending=False)
top5CarrNm.show(5, 30)

Results

-----------------------------------------------------
|"carr"  |"count"  |"description"                   |
-----------------------------------------------------
|WN      |356167   |Southwest Airlines Co.          |
|AA      |175969   |American Airlines Inc.          |
|OO      |166445   |Skywest Airlines Inc.           |
|MQ      |141178   |American Eagle Airlines Inc.    |
|US      |133403   |US Airways Inc. (Merged wit...  |
-----------------------------------------------------

Q5: What are the most common airplane models for flights over 1500 miles?

SQL solution

SELECT p.model, count(*) as num_flights
  FROM mycloud.aviation.raw_flight f 
  JOIN mycloud.aviation.raw_plane  p
    ON (f.tail_number = p.tail_number)
 WHERE f.distance > 1500
   AND p.model IS NOT NULL
 GROUP BY p.model
 ORDER BY num_flights desc
 LIMIT 10;

Python solution

# trimFs are flights projected & filtered
trimFs = session.table("mycloud.aviation.raw_flight") \
	.rename("tail_number", "tNbr") \
	.select("tNbr", "distance") \
	.filter(col("distance") > 1500) 

# trimPs are planes table projected & filtered
trimPs = session.table("mycloud.aviation.raw_plane") \
	.select("tail_number", "model") \
	.filter("model is not null")

# join, group & sort
q5Answer = trimFs \
	.join(trimPs, trimFs.tNbr == trimPs.tail_number) \
	.drop("tail_number") \
	.group_by("model").count() \
	.sort("count", ascending=False)	
q5Answer.show()

Results

----------------------
|"model"   |"count"  |
----------------------
|A320-232  |28926    |
|737-7H4   |21597    |
|757-222   |14609    |
|757-232   |12972    |
|737-824   |10789    |
|737-832   |9393     |
|A319-131  |5881     |
|A321-211  |4921     |
|767-332   |4522     |
|A319-132  |4480     |
----------------------

Q6: What is the month over month percentage change of number of flights departing from each airport?

SQL solution

This solution leveraged Common Table Expressions (CTEs), which you can conceptualize as temporary tables. I’ll follow this general approach in the Python solution, where I explain the code a bit more.

WITH agg_flights AS (
SELECT origination, month, 
       COUNT(*) AS num_flights
  FROM mycloud.aviation.raw_flight
 GROUP BY 1,2
),
 
change_flights AS (
SELECT origination, month, num_flights,
       LAG(num_flights, 1)
         OVER(PARTITION BY origination
                ORDER BY month ASC) 
           AS num_flights_before
  FROM agg_flights
)
 
SELECT origination, month, num_flights, num_flights_before,
       ROUND((1.0 * (num_flights - num_flights_before)) / 
             (1.0 * (num_flights_before)), 2)
          AS perc_change
  FROM change_flights;

Python solution

This first bit emulates the creation of the agg_flights CTE above.

# temp DF holds counts for each originating airport 
#   by month
aggFlights = session.table("mycloud.aviation.raw_flight") \
	.select("origination", "month") \
	.rename("origination", "orig") \
	.group_by("orig", "month").count() \
	.rename("count", "num_fs")

Then I created a Window definition that helps create a new column holding the number of flights from the prior record in the month-sorted list for each specific originating airport.

# define a window specification
w1 = Window.partition_by("orig").order_by("month")

# add col to grab the prior row's nbr flights
changeFlights = aggFlights \
	.withColumn("num_fs_b4", \
		lag("num_fs",1).over(w1))

Lastly, I determined the percentage change in the number of flights from the prior month.

# add col for the percentage change
q6Answer = changeFlights \
	.withColumn("perc_chg", \
		round((1.0 * (col("num_fs") - col("num_fs_b4")) / \
		      (1.0 * col("num_fs_b4"))), 1))
q6Answer.show()

Results

----------------------------------------------------------
|"orig"  |"month"  |"num_fs"  |"num_fs_b4"  |"perc_chg"  |
----------------------------------------------------------
|ABE     |1        |99        |NULL         |NULL        |
|ABE     |2        |111       |99           |0.1         |
|ABE     |3        |127       |111          |0.1         |
|ABE     |4        |142       |127          |0.1         |
|ABE     |5        |137       |142          |-0.0        |
|ABE     |6        |116       |137          |-0.2        |
|ABE     |7        |113       |116          |-0.0        |
|ABE     |8        |106       |113          |-0.1        |
|ABE     |9        |94        |106          |-0.1        |
|ABE     |10       |140       |94           |0.5         |
----------------------------------------------------------

Q7: Determine the top 3 routes departing from each airport.

SQL solution

This is another CTE solution and as in Q6, I’ll follow this approach in the Python solution.

WITH popular_routes AS (
SELECT origination, destination,
       COUNT(*) AS num_flights
  FROM raw_flight
 GROUP BY 1, 2
),
 
ranked_routes AS (
SELECT origination, destination,
       ROW_NUMBER() 
         OVER(PARTITION BY origination 
               ORDER BY num_flights DESC) 
           AS rank
  FROM popular_routes
)
 
SELECT origination, destination, rank
  FROM ranked_routes
  WHERE rank <= 3
  ORDER BY origination, rank;

Python solution

This first bit emulates the creation of the popular_routes CTE above.

# determine counts from orig>dest pairs
popularRoutes = session \
	.table("mycloud.aviation.raw_flight") \
	.rename("origination", "orig") \
	.rename("destination", "dest") \
	.group_by("orig", "dest").count() \
	.rename("count", "num_fs")

Then I created a Window definition that helps create a ranking value for each originating airport’s routes, sorted by the number of flights for each origination/destination combination.

# define a window specification
w2 = Window.partition_by("orig") \
	.order_by(col("num_fs").desc())

# add col to put the curr row's ranking in
rankedRoutes = popularRoutes \
	.withColumn("rank", \
		row_number().over(w2))

Lastly, I just tossed out any ranking greater than 3 and sorted to show the top values for each originating airport.

# just show up to 3 for each orig airport
q7Answer = rankedRoutes \
	.filter(col("rank") <= 3) \
	.sort("orig", "rank")
q7Answer.show(17)

Results

---------------------------------------
|"orig"  |"dest"  |"num_fs"  |"rank"  |
---------------------------------------
|ABE     |ORD     |420       |1       |
|ABE     |DTW     |282       |2       |
|ABE     |ATL     |247       |3       |
|ABI     |DFW     |773       |1       |
|ABQ     |PHX     |1619      |1       |
|ABQ     |DEN     |1254      |2       |
|ABQ     |DAL     |951       |3       |
|ABY     |ATL     |338       |1       |
|ACK     |EWR     |62        |1       |
|ACK     |JFK     |58        |2       |
|ACT     |DFW     |567       |1       |
|ACV     |SFO     |705       |1       |
|ACV     |SMF     |175       |2       |
|ACV     |SLC     |134       |3       |
|ACY     |ATL     |34        |1       |
|ACY     |LGA     |1         |2       |
|ACY     |JFK     |1         |3       |
---------------------------------------

The code

Here is the code all in one file, aviation.py.

import trino
from pystarburst import Session
from pystarburst import functions as f
from pystarburst.functions import col, lag, round, row_number
from pystarburst.window import Window

db_parameters = {
    "host": "lXXXXXXXXXXXXr.trino.galaxy.starburst.io",
    "port": 443,
    "http_scheme": "https",
    "auth": trino.auth.BasicAuthentication("lXXXXXX/XXXXXXn", "<password>")
}
session = Session.builder.configs(db_parameters).create()


print("")
print("Q1 ---------------------------")
print("How many rows in the flight table?")

allFs = session.table("mycloud.aviation.raw_flight")
print(allFs.count())


print("")
print("Q2 ---------------------------")
print("What country are most of the airports")
print("  located in?")

# get the whole table, aggregate & sort
mostAs = session \
    .table("mycloud.aviation.raw_airport") \
	.group_by("country").count() \
	.sort("count", ascending=False)
mostAs.show(1)


print("")
print("Q3 ---------------------------")
print("What are the top 5 airline codes with ")
print("  the most number of flights?")

# get the whole table, aggregate & sort
mostFs = session \
    .table("mycloud.aviation.raw_flight") \
	.group_by("unique_carrier").count() \
	.rename("unique_carrier", "carr") \
	.sort("count", ascending=False)
mostFs.show(5)


print("")
print("Q4 ---------------------------")
print("Same question, but show the airline ") 
print("  carrier's name.")

# get all of the carriers
allCs = session.table("mycloud.aviation.raw_carrier")

# repurpose mostFs from above (or chain on it) 
#   to join the 2 DFs and sort the results that
#   have already been grouped
top5CarrNm = mostFs \
    .join(allCs, mostFs.carr == allCs.code) \
    .drop("code") \
	.sort("count", ascending=False)
top5CarrNm.show(5, 30)


print("")
print("Q5 ---------------------------")
print("What are the most common airplane models ") 
print("  for flights over 1500 miles?")

# trimFs are flights projected & filtered
trimFs = session.table("mycloud.aviation.raw_flight") \
	.rename("tail_number", "tNbr") \
	.select("tNbr", "distance") \
	.filter(col("distance") > 1500) 

# trimPs are planes table projected & filtered
trimPs = session.table("mycloud.aviation.raw_plane") \
	.select("tail_number", "model") \
	.filter("model is not null")

# join, group & sort
q5Answer = trimFs \
	.join(trimPs, trimFs.tNbr == trimPs.tail_number) \
	.drop("tail_number") \
	.group_by("model").count() \
	.sort("count", ascending=False)	
q5Answer.show()


print("")
print("Q6 ---------------------------")
print("What is the month over month percentage ")
print("  change of number of flights departing ")
print("  from each airport?")

# temp DF holds counts for each originating airport 
#   by month
aggFlights = session.table("mycloud.aviation.raw_flight") \
	.select("origination", "month") \
	.rename("origination", "orig") \
	.group_by("orig", "month").count() \
	.rename("count", "num_fs")

# define a window specification
w1 = Window.partition_by("orig").order_by("month")

# add col to grab the prior row's nbr flights
changeFlights = aggFlights \
	.withColumn("num_fs_b4", \
		lag("num_fs",1).over(w1))
	
# add col for the percentage change
q6Answer = changeFlights \
	.withColumn("perc_chg", \
		round((1.0 * (col("num_fs") - col("num_fs_b4")) / \
		      (1.0 * col("num_fs_b4"))), 1))
q6Answer.show()


print("")
print("Q7 ---------------------------")
print("Determine the top 3 routes departing from ")
print("  each airport. ")

# determine counts from orig>dest pairs
popularRoutes = session \
	.table("mycloud.aviation.raw_flight") \
	.rename("origination", "orig") \
	.rename("destination", "dest") \
	.group_by("orig", "dest").count() \
	.rename("count", "num_fs")

# define a window specification
w2 = Window.partition_by("orig") \
	.order_by(col("num_fs").desc())

# add col to put the curr row's ranking in
rankedRoutes = popularRoutes \
	.withColumn("rank", \
		row_number().over(w2))

# just show up to 3 for each orig airport
q7Answer = rankedRoutes \
	.filter(col("rank") <= 3) \
	.sort("orig", "rank")
q7Answer.show(17)

wait… what? (a video game named lester)

When I was a kid I absolutely hated my name. In fact, if you knew my full name (I’m a Junior and it is a full “NASCAR name” indeed) you might easily understand why I hated it as well.

It wasn’t until college that I realized having a somewhat unique name was a good thing. It helps separate you from all the Toms, Dicks, and Harrys. Not that there’s anything wrong with those names!!

I have a little running joke with a couple of Lester Martins I’ve connected with on LinkedIn over the years saying that we should have a dedicated Lester Martin group.

During a short DM interaction with one of them this morning, I decided today is the day. I finally created our little group. Please forward it to ANY Lester Martin you might know. Oh, and since I don’t imagine you know any, please share in the comments section if you do!

What’s the banner above trying to say? Here it is in its entirety. And YES, I’m actually considering changing l11n to L-3573-R.

This reminds me of the kind of stuff I’d see on an early video game like we had back in the 1980s. The cool thing is that it IS from a video game: a new one created this year, but built as a Commodore 64 game. Of all things, it is called Lester and it was created by knifegrinder.

I still love my Lester Stickers and my Lester Skateboards the most, but this sure tickled me pink this morning, or maybe… just maybe… it turned me into…

THE ANDROID L-3573-R THE FIRST OF A NEW GENERATION OF GUARDIAN DROIDS INDEPENDENT FROM A CENTRAL AI.

hive acid transactions work on trino (can even update a partitioned column)

Ever since I joined Starburst, I’ve had to push back on my fellow All Stars when they told me Apache Hive does NOT allow for INSERT/UPDATE/DELETE/MERGE operations. I let them know that I was using Hive ACID for years at Hortonworks/Cloudera. This blog post is here to set the record straight on two important points.

  • Hive ACID does allow for INSERT/UPDATE/DELETE/MERGE operations
  • Probably more cool to me personally, Trino (and Starburst Galaxy/Enterprise) works very well with Hive ACID thanks to the base Hive Connector‘s functionality

That said…

Just because you can, doesn’t mean you should.

Sherrilyn Kenyon, William C. Taylor, Scott Bedbury, and just about everyone else…

I tossed out that age-old quote to make the point that I’m NOT actually recommending that new efforts should use Hive ACID. Modern table formats, my favorite Apache Iceberg for example, do all the cool things Hive ACID does and much more — including versioning and its benefits of time-travel querying and table rollbacks.

CRUD operations

This section shows the output of walking through the same use cases as my previous hive acid transactions with partitions post. This time, of course, I’m using Starburst instead of Hive. I usually test with Starburst Galaxy, but this time I’m using Starburst Enterprise.

Why? To make this work you do have to use an actual Hive MetaStore (HMS), not AWS Glue or the internal Starburst metastore implementation, and well… I didn’t have one set up for my Galaxy tenant and I was already in Starburst Enterprise with a Hive catalog using HMS.

And, of course, this functionality exists in the base Hive connector code in the Trino project for those running just Trino (or Presto for that matter).

Transactional table DDL

Here is the Trino version of the same DDL in the original post.

CREATE TABLE try_it (
    id int, a_val varchar, b_val varchar,
    prt varchar
)
WITH (
    format = 'ORC',
    transactional = true,
    partitioned_by = ARRAY['prt']
);

I highly encourage you to at least skim my previous hive acid transactions with partitions post as there is a lot of “behind the scenes” information that I’ll assume you know. I’m talking about HOW Hive ACID works down at the data lake directory & file level.

Even if you don’t take my recommendation, this blog post will clearly show that INSERT/UPDATE/DELETE/MERGE operations do exist in Apache Hive AND can be leveraged from Trino.

DML use cases

Let’s explore some CRUD (Create, Retrieve, Update, Delete) use cases as expressed in Data Manipulation Language (DML).

Txn 1: INSERT single row

INSERT INTO try_it 
VALUES (1, 'noise', 'bogus', 'p1');

Like in the other post, verify that the p1 partition has a delta file and that it only includes changes belonging to transaction #1 (see the delta_0000001_... indicator).

NOTE: I’m not going to be exploring all the actual ORC files like I did in the posts I am reproducing now, but rest assured, the values are the same.

Txn 2: INSERT multiple rows across multiple partitions

Insert statements allow multiple rows to be added at once and they all belong to a single ACID transaction. This use case is to exercise that, but to make it a bit more fun we can span more than one partition.

INSERT INTO try_it VALUES
(2, 'noise', 'bogus', 'p2'),
(3, 'noise', 'bogus', 'p3');

Both the p2 and p3 partitions are present on the object store and they have delta directories & files each containing changes belonging to transaction #2.

NOTE: Again, I’m not going to be exploring all the actual delta directories and ORC files like I did in the posts I am reproducing now. Again, rest assured that the values are the same. I also promise to stop making this point. 😉

Txn 3: UPDATE multiple rows across multiple partitions

UPDATE try_it 
  SET b_val = 'bogus2' 
WHERE a_val = 'noise';

All three partitions are modified; each now has delete_delta_ and delta_ directories.

Txn 4: UPDATE single row (leveraging partitioning)

This use case is just calling out that we should include the partition column in the UPDATE statement whenever possible to make Trino’s job a bit easier: it only has to look in the folders that could possibly be affected instead of walking the full table’s contents.

UPDATE try_it 
   SET b_val = 'bogus3'
 WHERE b_val = 'bogus2' 
   AND prt = 'p2';

In this example, without the partition condition we would have updated all three partitions again. Only the p2 partition has a delete_delta_0000004_0000004_ and delta_0000004_0000004_ folder.

Txn 5 (6 is not needed): UPDATE single row to change partition

UPDATE try_it 
   SET prt = 'p3' 
 WHERE a_val = 'noise' 
   AND prt = 'p1';

I discovered a VERY cool thing when trying to see what the equivalent Trino error message was going to be for this one…

Error: Error while compiling statement: FAILED:
 
   Updating values of partition columns is not supported 

What error message surfaced? NONE! IT WORKED!!

Check out the original blog post to see that with Hive I had to run two statements: the first to delete the record and a second one to add it back with the change to the partition column’s value. If anyone knows that Hive NOW handles this in a single statement as well, please leave a comment below, as it has been a couple of years since I wrote that blog post.

What about the delta file compactions?

My prior hive delta file compaction post walked through the minor & major file compaction processes. More importantly, it explained WHY they are needed. Since Trino is writing the very same directories and files while performing CRUD operations, this is STILL needed.

The bad news is that Trino cannot trigger either of these specialized processes, nor does it have its own compaction process for these Hive ACID tables.

What does this mean? It means that you still need to have Hive around to run the minor & major compactions, even if all other CRUD operations and queries run solely on Trino. This is only one reason I would NOT suggest you create new tables with this advanced Hive feature.

That said, if you are still running a Hadoop/Hive cluster (likely where these tables were created and initially populated from) then you can easily run compactions as needed. If you are going to move away from Hadoop, I’d stop accessing the table, perform a final minor and then major compaction, and then migrate your Hive tables to Apache Iceberg.
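For reference, triggering those compactions from the Hive side looks roughly like this (a sketch using this post’s try_it table; these statements are run in Hive itself, not Trino):

ALTER TABLE try_it PARTITION (prt = 'p2') COMPACT 'minor';

ALTER TABLE try_it PARTITION (prt = 'p2') COMPACT 'major';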

MERGE works, too?

Of course it does! I’m also a huge fan of the MERGE statement and its ability to bundle all the changes it makes as a single transaction. This section just replays the scenario and solution I documented in hive’s merge statement so that you can see it works with Hive ACID operations running in Trino.

Create and populate a base table

CREATE TABLE bogus_info (
    bogus_id int,
    field_one varchar,
    field_two varchar,
    field_three varchar,
    date_created varchar
)
WITH (
    format = 'ORC',
    transactional = true,
    partitioned_by = ARRAY['date_created']
);

insert into bogus_info 
(bogus_id, date_created, field_one, field_two, field_three)
    values
        (11,'2019-09-17','base','base','base'),
        (12,'2019-09-17','base','base','base'),
        (13,'2019-09-17','base','base','base'),
        (14,'2019-09-18','base','base','base'),
        (15,'2019-09-18','base','base','base'),
        (16,'2019-09-18','base','base','base'),
        (17,'2019-09-19','base','base','base'),
        (18,'2019-09-19','base','base','base'),
        (19,'2019-09-19','base','base','base');

Create and populate a table of changes

CREATE TABLE deltas_recd (
    bogus_id int,
    date_created varchar,
    field_one varchar,
    field_two varchar,
    field_three varchar
)
WITH (
    format = 'ORC',
    transactional = true
);

insert into deltas_recd 
(bogus_id, date_created, field_one, field_two, field_three)
    values
        (20,'2019-09-20','NEW','base','base'),
        (21,'2019-09-20','NEW','base','base'),
        (22,'2019-09-20','NEW','base','base'),
        (12,'2019-09-17','EXISTS','CHANGED','base'),
        (14,'2019-09-18','EXISTS','CHANGED','base'),
        (16,'2019-09-18','EXISTS','CHANGED','base');

This shows us that the new changes include 3 totally new records (20, 21, and 22). We can also see that records 12, 14, and 16 need to be modified.

Create and execute the MERGE statement

The MERGE statement below lines up the matching records based on their ID and creation date. When there is a match it is treated as an UPDATE and when there is not it is handled as an INSERT. In this simple case, I am not addressing DELETE operations.

MERGE INTO bogus_info  AS B 
     USING deltas_recd AS D
      
  ON B.bogus_id     = D.bogus_id 
 AND B.date_created = D.date_created
  
WHEN MATCHED THEN
    UPDATE SET field_one   = D.field_one, 
               field_two   = D.field_two, 
               field_three = D.field_three
                
WHEN NOT MATCHED THEN
    INSERT VALUES (D.bogus_id,  D.field_one, 
                   D.field_two, D.field_three, 
                   D.date_created);

Gosh darn it, I <3 MERGE!!

Final thoughts

As I said in the beginning, Hive ACID works solidly on Trino clusters. Again, I also said I would NOT start there today, but if you have some of these already in production don’t feel you can’t work with them. You can!

In all fairness, people don’t give Hive the love and accolades it deserves. Hive ACID has been out there for years and, IMHO, it was the archetype of the modern table formats. Should we move on from it and embrace these new approaches? Absolutely, but that doesn’t mean we can’t appreciate the past as well!

I’d even suggest you read my comparison of hive, trino & spark features post for some more thoughts on why Hive deserves our respect; even if not our go-forward approach. Makes me think of Teachers by Daft Punk. Enjoy!

configuring the cache service (starburst enterprise)

My awesome teammates at Starburst Academy posted some of my rough-cut videos again. This playlist focuses on configuring the Starburst Enterprise cache service, which enables table scan redirection & materialized views. It is also a prerequisite for Data Products on this platform.

pystarburst (the dataframe api)

I’m so excited about the Starburst blog Introducing Python DataFrames in Starburst Galaxy and I wanted to show a few examples of this exciting new feature set.

NOTE: This blog post is NOT attempting to teach you everything you need to know about the DataFrame API, but it will provide some insight into this rich subject matter.

The real goal is to see it in action!!

Set up your environment

As the Py in PyStarburst suggests, you clearly need Python installed. For my Mac, I set this up with brew a long time ago. For your environment, you may do something different.

$ brew install python
    ... many lines rm'd ...
$ python3 --version
Python 3.10.9

I then needed to get pip set up. Here’s what I did.

$ python3 -m ensurepip
    ... many lines rm'd ...
$ python3 -m pip install --upgrade pip
    ... many lines rm'd ...
$ pip --version
pip 23.2.1 from ... (python 3.10)

At this point you can get some more help from Starburst Galaxy by visiting Partner connect >> Drivers & Clients >> PyStarburst which surfaces a pop-up like the following. Use the Select cluster pulldown to align with the cluster you want to run some PyStarburst code against.

Click on the Download connection file button to get something like the following (file is named main.py) which has everything filled in, except the password. I masked out the values from my orange strike-outs above, too.

import trino
from pystarburst import Session

db_parameters = {
    "host": "tXXXXXXXXXXe.trino.galaxy.starburst.io",
    "port": 443,
    "http_scheme": "https",
    # Setup authentication through login or password or any other supported authentication methods
    # See docs: https://github.com/trinodb/trino-python-client#authentication-mechanisms
    "auth": trino.auth.BasicAuthentication("lXXXXXX/XXXXXXn", "<password>")
}
session = Session.builder.configs(db_parameters).create()
session.sql("SELECT * FROM system.runtime.nodes").collect()

Just to clean that up and make things go a bit smoother, delete lines 8 & 9 and then add the following two lines after line 2.

from pystarburst import functions as f
from pystarburst.functions import col

Lastly, replace the last line with the following (assuming you are using the TPCH catalog on the cluster you selected earlier).

session.table("tpch.tiny.region").show()

Back in the pop-up from earlier, there is a link to the PyStarburst docs site. From there, run the pip install command listed in the Install the library section. There is also some boilerplate code that you already have manipulated above.

Test the boilerplate code

The docs site above also points to an example Jupyter notebook, which suggests you should be using Jupyter or another web-based notebook tool. That’s a great path to go down, but I’m going to keep it a bit simpler and just run my code from the CLI.

$ python3 main.py
----------------------------------------------------------------------------------
|"regionkey"  |"name"       |"comment"                                           |
----------------------------------------------------------------------------------
|0            |AFRICA       |lar deposits. blithely final packages cajole. r...  |
|1            |AMERICA      |hs use ironic, even requests. s                     |
|2            |ASIA         |ges. thinly even pinto beans ca                     |
|3            |EUROPE       |ly final courts cajole furiously final excuse       |
|4            |MIDDLE EAST  |uickly special accounts cajole carefully blithe...  |
----------------------------------------------------------------------------------

Awesome! We used the API to basically run a SELECT statement, which verified we can create a DataFrame with code that ran in Starburst Galaxy. In fact, you can see in Query history that it was run.

Explore the API

The docs page from above has a link to the detailed PyStarburst DataFrame API documentation site. As mentioned at the start of this post, I am NOT going to try to teach you Spark’s DataFrame API here. If this is totally new to you, one place you might start is this programming guide on the Apache Spark website.

I’ll be building some training around PyStarburst and it will surely start from the basics of what a DataFrame is and build from there. Ping me if you’re interested in such a class. Of course, I’ll let you know what the code below is doing — at least at a high-level.

Select a full table

Add these next lines to the end of your Python source file. They use the table() function to grab hold of the customer table and then display the first 10 rows (the show() function, without an integer argument, defaults to 10). Then run it with python3 main.py as shown above.

custDF = session.table("tpch.tiny.customer")
custDF.show()

Here is the output.

------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|"custkey"  |"name"              |"address"                              |"nationkey"  |"phone"          |"acctbal"  |"mktsegment"  |"comment"                                           |
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|1          |Customer#000000001  |IVhzIApeRb ot,c,E                      |15           |25-989-741-2988  |711.56     |BUILDING      |to the even, regular platelets. regular, ironic...  |
|2          |Customer#000000002  |XSTf4,NCwDVaWNe6tEgvwfmRchLXak         |13           |23-768-687-3665  |121.65     |AUTOMOBILE    |l accounts. blithely ironic theodolites integra...  |
|3          |Customer#000000003  |MG9kdTD2WBHm                           |1            |11-719-748-3364  |7498.12    |AUTOMOBILE    | deposits eat slyly ironic, even instructions. ...  |
|4          |Customer#000000004  |XxVSJsLAGtn                            |4            |14-128-190-5944  |2866.83    |MACHINERY     | requests. final, regular ideas sleep final accou   |
|5          |Customer#000000005  |KvpyuHCplrB84WgAiGV6sYpZq7Tj           |3            |13-750-942-6364  |794.47     |HOUSEHOLD     |n accounts will have to unwind. foxes cajole accor  |
|6          |Customer#000000006  |sKZz0CsnMD7mp4Xd0YrBvx,LREYKUWAh yVn   |20           |30-114-968-4951  |7638.57    |AUTOMOBILE    |tions. even deposits boost according to the sly...  |
|7          |Customer#000000007  |TcGe5gaZNgVePxU5kRrvXBfkasDTea         |18           |28-190-982-9759  |9561.95    |AUTOMOBILE    |ainst the ironic, express theodolites. express,...  |
|8          |Customer#000000008  |I0B10bB0AymmC, 0PrRYBCP1yGJ8xcBPmWhl5  |17           |27-147-574-9335  |6819.74    |BUILDING      |among the slyly regular theodolites kindle blit...  |
|9          |Customer#000000009  |xKiAFTjUsCuxfeleNqefumTrjS             |8            |18-338-906-3675  |8324.07    |FURNITURE     |r theodolites according to the requests wake th...  |
|10         |Customer#000000010  |6LrEaV6KR6PLVcgl2ArL Q3rqzLzcT1 v2     |5            |15-741-346-9870  |2753.54    |HOUSEHOLD     |es regular deposits haggle. fur                     |
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

That is quite busy in the CLI, but probably looks good in a notebook since it won’t wrap the text.

Use projection

We really only need a couple of columns, so we can use the select() method on the existing DataFrame to identify the ones we really want. There is a complementary drop() function that would be better if we wanted to keep most of the columns and only remove a few.

projectedDF = custDF.select(custDF.name, custDF.acctbal, custDF.nationkey)
projectedDF.show()

Here is the output after adding those lines of code above and running your Python program again. It looks a bit more manageable.

------------------------------------------------
|"name"              |"acctbal"  |"nationkey"  |
------------------------------------------------
|Customer#000000751  |2130.98    |0            |
|Customer#000000752  |8363.66    |8            |
|Customer#000000753  |8114.44    |17           |
|Customer#000000754  |-566.86    |0            |
|Customer#000000755  |7631.94    |16           |
|Customer#000000756  |8116.99    |14           |
|Customer#000000757  |9334.82    |3            |
|Customer#000000758  |6352.14    |17           |
|Customer#000000759  |3477.59    |1            |
|Customer#000000760  |2883.24    |2            |
------------------------------------------------

Again, the show() command without an argument is displaying only 10 rows.

Filter the rows

Well-named, the filter() function does exactly what we need it to do. In this example, we are trying to limit to the customer records with the highest account balance values. Add these next lines to the end of your Python source file and run it again.

filteredDF = projectedDF.filter(projectedDF.acctbal > 9900.0)
filteredDF.show(100)

Here is the output.

------------------------------------------------
|"name"              |"acctbal"  |"nationkey"  |
------------------------------------------------
|Customer#000001106  |9977.62    |21           |
|Customer#000000043  |9904.28    |19           |
|Customer#000000045  |9983.38    |9            |
|Customer#000000140  |9963.15    |4            |
|Customer#000000200  |9967.6     |16           |
|Customer#000000213  |9987.71    |24           |
|Customer#000000381  |9931.71    |5            |
------------------------------------------------

Notice that even though 100 records were requested to be displayed, there are only 7 records that meet this criterion.

Select a second table

Later, we are going to join our customer records to the nation table to get the name of the country, not just a key value for it. In the example below, we are chaining methods together instead of assigning each output to a distinct variable as we have done up until now.

nationDF = session.table("tpch.tiny.nation") \
                  .drop("regionkey", "comment") \
                  .rename("name", "nation_name") \
                  .rename("nationkey", "n_nationkey")
nationDF.show()

We have already presented table() and drop(). The rename() function simply changes a column’s name to something else as you can see in the output.

---------------------------------
|"n_nationkey"  |"nation_name"  |
---------------------------------
|0              |ALGERIA        |
|1              |ARGENTINA      |
|2              |BRAZIL         |
|3              |CANADA         |
|4              |EGYPT          |
|5              |ETHIOPIA       |
|6              |FRANCE         |
|7              |GERMANY        |
|8              |INDIA          |
|9              |INDONESIA      |
---------------------------------

Join the tables

Now we can join() the two DataFrames using their nationkey values.

joinedDF = filteredDF.join(nationDF, filteredDF.nationkey == nationDF.n_nationkey)
joinedDF.show()

Here is the output.

--------------------------------------------------------------------------------
|"name"              |"acctbal"  |"nationkey"  |"n_nationkey"  |"nation_name"  |
--------------------------------------------------------------------------------
|Customer#000000140  |9963.15    |4            |4              |EGYPT          |
|Customer#000000381  |9931.71    |5            |5              |ETHIOPIA       |
|Customer#000000045  |9983.38    |9            |9              |INDONESIA      |
|Customer#000000200  |9967.6     |16           |16             |MOZAMBIQUE     |
|Customer#000000043  |9904.28    |19           |19             |ROMANIA        |
|Customer#000001106  |9977.62    |21           |21             |VIETNAM        |
|Customer#000000213  |9987.71    |24           |24             |UNITED STATES  |
--------------------------------------------------------------------------------

As you can see, the join() function did not get rid of any unwanted columns; we still have all of them from both DataFrames.

Project the joined result

How do we clean up those unwanted columns? Exactly right, we talked about this before!

projectedJoinDF = joinedDF.drop("nationkey").drop("n_nationkey")
projectedJoinDF.show()
--------------------------------------------------
|"name"              |"acctbal"  |"nation_name"  |
--------------------------------------------------
|Customer#000000140  |9963.15    |EGYPT          |
|Customer#000000381  |9931.71    |ETHIOPIA       |
|Customer#000000045  |9983.38    |INDONESIA      |
|Customer#000000200  |9967.6     |MOZAMBIQUE     |
|Customer#000000043  |9904.28    |ROMANIA        |
|Customer#000001106  |9977.62    |VIETNAM        |
|Customer#000000213  |9987.71    |UNITED STATES  |
--------------------------------------------------

Apply a sort

I love it when the methods do what they say; sort() is no different.

orderedDF = projectedJoinDF.sort(col("acctbal"), ascending=False)
orderedDF.show()
--------------------------------------------------
|"name"              |"acctbal"  |"nation_name"  |
--------------------------------------------------
|Customer#000000213  |9987.71    |UNITED STATES  |
|Customer#000000045  |9983.38    |INDONESIA      |
|Customer#000001106  |9977.62    |VIETNAM        |
|Customer#000000200  |9967.6     |MOZAMBIQUE     |
|Customer#000000140  |9963.15    |EGYPT          |
|Customer#000000381  |9931.71    |ETHIOPIA       |
|Customer#000000043  |9904.28    |ROMANIA        |
--------------------------------------------------

Put it all together

While the creation of multiple DataFrame objects was used above, in practice (as discussed when fetching the nation table) most DataFrame API programmers chain many methods together to look a bit more like this.

nationDF = session.table("tpch.tiny.nation") \
            .drop("regionkey", "comment") \
            .rename("name", "nation_name") \
            .rename("nationkey", "n_nationkey")

apiSQL = session.table("tpch.tiny.customer") \
            .select("name", "acctbal", "nationkey") \
            .filter(col("acctbal") > 9900.0) \
            .join(nationDF, col("nationkey") == nationDF.n_nationkey) \
            .drop("nationkey").drop("n_nationkey") \
            .sort(col("acctbal"), ascending=False)
apiSQL.show()

This produces the same result as before. There is a lot more going on with the PyStarburst implementation including the lazy execution model that the DataFrame API is known for. In a nutshell, this simply means that the program waits until it absolutely needs to run some code on the Trino engine that Starburst Galaxy is built on top of.

If only these lines of code were run after the session object was created in the boilerplate source, then ultimately only a single SQL statement was sent to Starburst Galaxy, which, again, you can find in the Query history page.

The generated SQL

SELECT "name" , "acctbal" , "nation_name" FROM ( SELECT "name" , "acctbal" , "n_nationkey" , "nation_name" FROM ( SELECT * FROM (( SELECT "name" "name" , "acctbal" "acctbal" , "nationkey" "nationkey" FROM ( SELECT * FROM ( SELECT "name" , "acctbal" , "nationkey" FROM ( SELECT * FROM tpch.tiny.customer ) ) WHERE ("acctbal" > DOUBLE '9900.0') ) ) INNER JOIN ( SELECT "n_nationkey" "n_nationkey" , "nation_name" "nation_name" FROM ( SELECT "nationkey" "n_nationkey" , "nation_name" FROM ( SELECT "nationkey" , "name" "nation_name" FROM ( SELECT "nationkey" , "name" FROM ( SELECT * FROM tpch.tiny.nation ) ) ) ) ) ON ("nationkey" = "n_nationkey")) ) ) ORDER BY "acctbal" DESC NULLS LAST OFFSET 0 ROWS LIMIT 10

The generated SQL above is clearly something a program would have created and in fairness it is walking the PyStarburst function calls and building some pretty ugly SQL. The good news is the cost-based optimizer (CBO) inside Trino deciphered it all and broke it down into a very efficient 3 stage job that utilized a broadcast join as seen in this eye exam of a visualization from the directed acyclic graph (DAG).

If all that CBO and DAG talk was mumbo-jumbo, and you want to learn more, check out these free training modules from Starburst Academy.

Or… just run some SQL

I’ll be honest, I actually LIKE the code above chaining methods together, all while looking back and forth into the API doc, but I’m a programmer. If you were following the code along the way, you realized we were just building the equivalent of a rather simple SQL statement doing filtering & projection, joining two tables, and sorting the results.

Are you wondering if, instead of using the Session object’s table() function to start our efforts, there is a way to just run some SQL?

Well, yes, there is. It is called the sql() method and here is an example of its use with the hand-crafted, rather simple, SQL statement that is doing the same thing as the rest of this post.

dfSQL = session.sql("SELECT c.name, c.acctbal, n.name "\
                    "  FROM tpch.tiny.customer c "\
                    "  JOIN tpch.tiny.nation n "\
                    "    ON c.nationkey = n.nationkey "\
                    " WHERE c.acctbal > 9900.0 "\
                    " ORDER BY c.acctbal DESC ")
dfSQL.show()

Probably a bit more obvious than before is the generated SQL that you can find in the Query history page on Starburst Galaxy.

The generated SQL

SELECT c.name , c.acctbal , n.name FROM (tpch.tiny.customer c INNER JOIN tpch.tiny.nation n ON (c.nationkey = n.nationkey)) WHERE (c.acctbal > DECIMAL '9900.0') ORDER BY c.acctbal DESC OFFSET 0 ROWS LIMIT 10

If you look closely, you’ll see that the SQL was modified a bit such as adding the INNER keyword for the join type and a LIMIT 10 clause due to the show() function’s default behavior. It is not simply “passing through” the query.

More interesting is that the same 3 stage job with a broadcast join was run with the same text and visual query plan being created from the DAG.

Wrap up

You’ve had a quick tour of the DataFrame API implementation with Python that runs the code ultimately as SQL on Starburst Galaxy.

We’ve seen just a tiny bit of the rich API that is available to data engineers who prefer to write programs over SQL. We also saw that we can often just replace the “neat” function calls with hard-coded SQL and, in all fairness, it is a great idea for code maintainability to use the sql() function to generate DataFrames when we can.

I hope you are as excited as I am to experiment more with PyStarburst!

building a sql-based data pipeline with trino & starburst (5 slick videos)

Evan Smith has posted the YouTube video series below, which is part of the FREE on-demand Exploring data pipelines course available via Starburst Academy. I figured they fit together nicely with a wrapper blog post as well. Oh, and yes, that’s my soothing voice on the videos, too. 😉

Assessing the requirements

Creating the land layer

Creating the structure layer

Creating the consume layer

Automation with Starburst and dbt

better iceberg materialized views in galaxy (no staleness check)

Ok, YOU try to find a cool “staleness” image to put at the top of YOUR blog post about how Apache Iceberg materialized views handle potential staleness of underlying datasets they are created from. Yep… not so easy!!

The image above comes from a blog post titled How to Make Bread Stale Fast?, and it is appropriate for this post as it calls out that “staleness” (even with bread) doesn’t always mean “bad”. Alright, let’s go a bit deeper into my topic for today as this is not a cooking show by any means. I do make a mean lettuce wrap appetizer, but I digress.

Recently, I published starburst galaxy’s materialized views (using apache iceberg) where I made a big deal about how “staleness” was addressed. In this context, I’m talking about when the situation arises (and it does VERY OFTEN) where the underlying dataset(s) that were used to populate a materialized view are updated. Materialized views are still relatively new in the big data world and generally they don’t (today at least!) automatically update any time a change occurs to the underlying dataset(s) they are populated from.

That said, in the Addressing Staleness section of the post linked above, I called out that the underlying Trino Iceberg Connector documentation describes in detail how it DOES actively check for such a situation and then treats the materialized view as a basic view (i.e. it queries the underlying table(s) instead).

The incredible Tom Nats recently pointed out to me that this wasn’t acting that way any longer and I had a mixed bag of test results when I did a double-check. Then when the tenacious Kyle (Major) Payne pinged me about it this morning I decided to take yet another look.

Lo and behold, the Apache Iceberg materialized views running on Starburst Galaxy are NOT checking for staleness and are always returning the values from the last time the REFRESH command was run!!

This, my friends, is a GOOD thing in my book. Maybe I’ll write a blog post on why I feel that way, but the nutshell is that data changes fast in the big data world and refreshing materialized views isn’t free. My personal belief is that the data engineering pipeline populating/modifying the data is the best place to include, at appropriate ingestion points, the directive to rebuild the materialized view (a tiny sketch of that idea follows). Comments ALWAYS welcomed. 😉
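To sketch that idea (reusing the dune_characters table and mat_view materialized view from the walkthrough below), an ingestion step could simply end with the refresh directive:

-- new data lands first (these values are purely illustrative)
INSERT INTO dune_characters (id, name, notes)
VALUES (104, 'Chani', 'Fremen of Sietch Tabr');

-- then the pipeline rebuilds the materialized view
REFRESH MATERIALIZED VIEW mat_view;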

If that’s enough of an explanation, then thanks for attending my TED Talk and I’ll see you next time. If you wanna SEE this in action, then come on in… the water feels fine!

Behind the Curtain

I was able to perform the same steps from starburst galaxy’s materialized views (using apache iceberg) and see the staleness condition ignored. Much of what’s below is the same stuff from that post.

I created a couple of schemas then created, populated, and queried an Iceberg table.

CREATE SCHEMA mycloud.vmv_storage;
CREATE SCHEMA mycloud.vmv;
USE mycloud.vmv;

CREATE TABLE dune_characters (
  id integer,
  name varchar(55),
  notes varchar(255)
)
WITH (type = 'iceberg');
 
INSERT INTO dune_characters 
  (id, name, notes)
VALUES
  (101, 'Leto', 'Ruler of House Atreides'),
  (102, 'Jessica', 'Concubine of the Duke');

SELECT name, notes 
  FROM dune_characters;

Looking into the query details you can see these 2 rows came from the base table as you would expect.

Next, I created a materialized view, but did not perform an initial REFRESH on it before I queried it.

CREATE MATERIALIZED VIEW mat_view
WITH (storage_schema = 'vmv_storage')
AS SELECT name, notes 
     FROM dune_characters;

SELECT * FROM mat_view;

Looking into the query details you can see the engine was aware of mat_view, but ended up querying the dune_characters table for these initial 2 rows.

I refreshed the materialized view with the command below and queried it again to see the same (i.e. only) 2 rows.

REFRESH MATERIALIZED VIEW mat_view;

Notice below that the results actually came from the materialized view’s storage table.

Now, the big test. I added another row and queried the materialized view again.

INSERT INTO dune_characters 
  (id, name, notes)
VALUES
  (103, 'Paul', 'Son of Leto');
 
SELECT * FROM mat_view;

Like before, I only received the original 2 records and could verify the storage table was read. I added another record and checked again.

INSERT INTO dune_characters 
  (id, name, notes)
VALUES
  (103, 'Paul', 'Son of Leto');

Again, just got the 2 records and same read on the storage table. Final test is to refresh the materialized view and query it again.

REFRESH MATERIALIZED VIEW mat_view;

SELECT * FROM mat_view;

And yep, now all 4 rows are returned!!

I’m GLAD this change was made as the “check for staleness” was technically cool, but practically made Iceberg materialized views worthless since the underlying source table(s) are very likely changing all the time.

determining # of splits w/trino/starburst/galaxy (iceberg table format)

At the end of determining # of splits w/trino/starburst/galaxy (hive table format), I raised the question of how Trino decides the number of splits for the Apache Iceberg table format. I was hoping to find some cool properties like with Hive, but found this instead…

Splits-size while querying Iceberg data is static #10874

Based on what I discovered in the Hive blog post, this suggests that any file <= 128MB will be a single split and any file > 128MB will be broken up into chunks of that increment (plus whatever is left). Let’s verify.

Here is the S3 contents of an Iceberg table that I created and loaded some data into.

There are a total of 15 files — 4 of which are bigger than 128MB. That suggests that there would be 19 total splits when this table is queried since the ones > 128MB (and < 256MB) would be broken into two splits (i.e. 15 original files + 4 more splits = 19 total). Let’s verify.

Here is the Trino console query details page that indicates the 4 workers in my cluster operated on a total of 19 splits.

To further verify this, I added some more data to the table which drove some more files on both sides of the 128MB split-size and ended up with 29 files that you can see the sizes of below along with the per-file number of splits and the overall total number of splits.

Here is the Trino console query details page that indicates the 4 workers in my cluster operated on these 51 splits.

After seeing all of that, my initial suggestion is that you might be best served having files just under that split limit; let’s just say 110-120MB might be your best bet if you are lucky enough to control file sizes at that level.

delta lake in starburst galaxy (intro & integration)

What is Delta Lake? I guess the project page says it best.

Delta Lake is an open-source storage framework that enables building a Lakehouse architecture with compute engines that include Trino

https://delta.io/

Yes, many other engines (including Spark and Hive) and APIs (notably Python and Java) are available to utilize this popular modern table format.

Wondering what a “table format” is? Think of Hive, but with more features such as versioning and storing metadata in the data lake alongside the data files. Still wondering? If so, check out the following video from my colleague Evan Smith.

OK, where were we? Oh yeah, learning a bit about the Delta Lake table format. Delta Lake was initially developed by Databricks and became an open source project in 2019. It has evolved since then to have some strong key features.

Delta Lake allows classical Data Manipulation Language (DML) statements (INSERT, UPDATE, DELETE, and MERGE) and manages the commit of those operations as an ACID-compliant transaction. It does this by maintaining a transaction log of changes (also known as the DeltaLog), which itself is the enabler for time-travel querying since every change creates a new version of the table.

For a deep-dive on the metadata and DeltaLog that lives on the data lake along with the table’s datasets, as well as how this information is leveraged to maintain table versioning, check out the following video.

That is all cool stuff, but let’s get our hands dirty. Delta Lake integrates nicely with Trino via the Delta Lake Connector. Of course, what’s the fastest way to get going with Trino? Since you asked… it is Starburst Galaxy! Not only is it already running in the cloud, but you can get started for free!!

You can then leverage its Great Lakes connectivity to use one of the popular public cloud object stores. For a little help with that, check out my querying aviation data in the cloud (leveraging starburst galaxy) post that sets up an Amazon S3 catalog.

Here I am ready to go now! How about you?

As you might be able to see above (if you have stronger glasses than me!), I ran a couple of statements to create a schema to play in and then to root myself to that schema.

-- my catalog is named 'mycloud', so update accordingly
CREATE SCHEMA mycloud.dlblog1;
USE mycloud.dlblog1;

Let’s create a new table. Notice the type property in the WITH clause: Great Lakes connectivity allows multiple table formats to be created and used, and I’m just telling it to leverage Delta Lake.

CREATE TABLE dune_characters (
  id integer,
  name varchar(55),
  notes varchar(255)
)
WITH (type = 'delta');

Now, add a few records.

INSERT INTO dune_characters 
  (id, name, notes)
VALUES
  (101, 'Leto', 'Ruler of House Atreides'),
  (102, 'Jessica', 'Concubine of the Duke');

Verify these records are present.

Ooof! I forgot the star of the book/movie!! Let’s get him inserted, too.

INSERT INTO dune_characters 
  (id, name, notes)
VALUES
  (103, 'Paul', 'Son of Leto');

Instead of just querying the table again to verify it has 3 rows now, here is a peek into the S3 bucket where the table’s data is stored. You can see 2 datasets there (1 from the first INSERT and the 2nd from the next one).

And that _delta_log folder? Yep, it is the mythical DeltaLog we touched upon earlier. Let’s take a peek into it!

Basically, we have 3 different JSON files. One for each version. Yes, the CREATE TABLE created the 000 version. Here are a few relevant snippets from each of these files.

Table Version    Relevant Snippets
...000.json      "operation":"CREATE TABLE"
...001.json      "operation":"WRITE", \"numRecords\":2
...002.json      "operation":"WRITE", \"numRecords\":1

Sparked (pun definitely intended) your interest? Go back up and watch that hour-long video for more. I promise it can be fun if you’re the science fair kind of person that enjoys seeing the gory details. For the rest of us, this is plenty for now!

In summary, Starburst Galaxy is the fastest way to get going with Trino. It also has great support for the Delta Lake table format. Stay tuned as I have a few more posts in this series I will be publishing soon.