After this and that, you might think I’d be done posting PyStarburst DataFrame API examples, but I’m still excited to share a few more. I ended my last PyStarburst post with some examples of Window functions. To help conceptually understand them better, I posted window functions explained. This post will focus on some additional windowing examples with Python and Starburst via the DataFrame API.
Also, like in my last post, I'll share SQL first, followed by a Python approach. For a dataset, I'll keep it super easy. Starburst Galaxy already has a nice little sample catalog whose demo schema offers up some out-of-this-world tables.
For this post, I'll explore the astronauts table exclusively.
Note: Take a look at the full code listing at the end of this post to get the boilerplate code you'll need at the top of your .py file or in your web-based notebook.
10/27/2023 Update: This code is now in a Jupyter notebook available at notebooks/astronaut_windows.ipynb within https://github.com/starburstdata/pystarburst-examples to make it easier to execute.
Check out the table
Let’s just get some rows to look at.
-- SQL soln
SELECT *
FROM sample.demo.astronauts
LIMIT 10;
# Python soln
a = session.table("sample.demo.astronauts")
a.show(10)
Both the SQL and Python outputs show this to be a pretty wide table; so wide that I won't show it here. Instead, we can ask both of them to tell us more about the columns that make it up.
-- SQL soln
DESC sample.demo.astronauts;
# Python soln
for field in a.schema.fields:
    print(field.name + " , " + str(field.datatype))
Note: PySpark has a handy printSchema() function that produces very readable output. At this early stage of PyStarburst it doesn't appear to be implemented yet, but I'm sure it will surface before long.
And again, BOTH of these produce a BUNCH of text, so I’m just showing the web UI’s output as it is more quickly consumable for most of us.
As I know the table pretty well, let’s just run this highly projected & filtered query to simplify the data we are looking at.
-- SQL soln
SELECT name, nationality,
year_of_mission AS m_yr,
hours_mission AS m_hrs
FROM sample.demo.astronauts
WHERE name IN ('Nicollier, Claude', 'Ross, Jerry L.')
ORDER BY name, m_yr;
# Python soln
li = ["Nicollier, Claude", "Ross, Jerry L."]
twoAs = a.select("name", "nationality", \
"year_of_mission", "hours_mission") \
.rename("year_of_mission", "m_yr") \
.rename("hours_mission", "m_hrs") \
.filter(a.name.isin(li)) \
.sort("name", "m_yr")
twoAs.show(20)
What I wanted to show is that this table is NOT a list of astronauts. It is a table of astronauts’ trips to space. These two fellas are represented multiple times; Jerry went to space 7 times and Claude went 4 times.
Window function examples
Single window for all rows
Let’s see how each mission compares with an OVERALL average across ALL missions. Create a single window that encompasses all input rows and calculates a single average for all output rows.
SELECT name,
year_of_mission AS m_yr,
hours_mission AS m_hrs,
AVG(hours_mission)
OVER() -- the WINDOW is ALL rows (NOT typical)
AS avg_all_m_hrs
FROM sample.demo.astronauts
WHERE name IN ('Nicollier, Claude', 'Ross, Jerry L.')
ORDER BY name, m_yr;
Call the over() method with no arguments to create a single window that is used for all input rows.
# trim out the nation column
aDF = twoAs.drop("nationality")
# use an empty parameter over function call
aDF.withColumn("avg_all_m_hrs", F.avg("m_hrs").over()) \
.sort("name", "m_yr").show(20)
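If it helps to see what an empty OVER() means mechanically, here's a minimal pure-Python sketch (plain Python, not PyStarburst). The rows are hypothetical stand-ins, not values from sample.demo.astronauts: one average is computed across every row, then appended to each row unchanged.

```python
# Hypothetical (name, m_yr, m_hrs) rows; NOT real values from sample.demo.astronauts
rows = [
    ("Astronaut A", 2001, 100.0),
    ("Astronaut A", 2002, 200.0),
    ("Astronaut B", 2002, 60.0),
]


def avg_over_all(rows):
    """Mimic AVG(m_hrs) OVER (): a single window containing every input row."""
    overall = sum(hrs for _, _, hrs in rows) / len(rows)
    # every input row survives and gets the same window-wide average appended
    return [(name, yr, hrs, overall) for name, yr, hrs in rows]


for row in avg_over_all(rows):
    print(row)
```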
Window for each distinct value
Keep all the rows, but calculate the aggregate over a separate window per astronaut; each window holds all the input rows that share that astronaut's name.
SELECT name,
year_of_mission AS m_yr,
hours_mission AS m_hrs,
SUM(hours_mission)
-- kinda like a GROUP BY, but you get all rows
OVER (PARTITION BY name)
AS tot_m_hrs
FROM sample.demo.astronauts
WHERE name IN ('Nicollier, Claude', 'Ross, Jerry L.')
ORDER BY name, m_yr;
Create a WindowSpec object with the Window.partition_by() factory method.
from pystarburst.window import Window as W
# define the window specification
w2 = W.partition_by("name")
aDF.withColumn("tot_m_hrs", F.sum("m_hrs").over(w2)) \
.sort("name", "m_yr").show(20)
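The SQL comment above describes PARTITION BY as "kinda like a GROUP BY, but you get all rows." Here's a small pure-Python sketch of that difference, using hypothetical rows: the GROUP BY version returns one result per astronaut, while the windowed version keeps every input row and attaches its partition's total.

```python
from collections import defaultdict

# Hypothetical (name, m_yr, m_hrs) rows, for illustration only
rows = [
    ("Astronaut A", 2001, 100.0),
    ("Astronaut A", 2002, 200.0),
    ("Astronaut B", 2002, 60.0),
]


def group_by_sum(rows):
    """Like SELECT name, SUM(m_hrs) ... GROUP BY name: one result per name."""
    totals = defaultdict(float)
    for name, _, hrs in rows:
        totals[name] += hrs
    return dict(totals)


def sum_over_partition(rows):
    """Like SUM(m_hrs) OVER (PARTITION BY name): every input row is kept."""
    totals = group_by_sum(rows)  # one total per partition (window)
    return [(name, yr, hrs, totals[name]) for name, yr, hrs in rows]


print(group_by_sum(rows))  # 2 entries, one per astronaut
for row in sum_over_partition(rows):
    print(row)  # 3 rows, each carrying its astronaut's total
```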
Multiple aggregations on the same window
You can calculate multiple aggregations on the same window specification.
SELECT name,
year_of_mission AS m_yr,
hours_mission AS m_hrs,
AVG(hours_mission)
OVER (PARTITION BY name)
AS avg_m_hrs,
-- we can have more than 1
SUM(hours_mission)
OVER (PARTITION BY name)
AS tot_m_hrs
FROM sample.demo.astronauts
WHERE name IN ('Nicollier, Claude', 'Ross, Jerry L.')
ORDER BY name, m_yr;
# chain another withColumn method
aDF.withColumn("avg_m_hrs", F.avg("m_hrs").over(w2)) \
.withColumn("tot_m_hrs", F.sum("m_hrs").over(w2)) \
.sort("name", "m_yr").show(20)
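Both aggregates above read from the same w2 partition. As a rough mental model (not a claim about how the engine executes it), this pure-Python sketch with hypothetical rows collects each partition once and derives both the average and the sum from that single list.

```python
from collections import defaultdict

# Hypothetical (name, m_yr, m_hrs) rows, for illustration only
rows = [
    ("Astronaut A", 2001, 100.0),
    ("Astronaut A", 2002, 200.0),
    ("Astronaut B", 2002, 60.0),
]


def avg_and_sum_over_partition(rows):
    """Mimic AVG(m_hrs) OVER w and SUM(m_hrs) OVER w with w = PARTITION BY name."""
    partitions = defaultdict(list)
    for name, _, hrs in rows:
        partitions[name].append(hrs)

    out = []
    for name, yr, hrs in rows:
        window = partitions[name]  # the same window feeds both aggregates
        total = sum(window)
        out.append((name, yr, hrs, total / len(window), total))
    return out


for row in avg_and_sum_over_partition(rows):
    print(row)
```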
Of course, we can also manipulate the values we get back from the window’s aggregate functions, too.
-- determine the percentage of each mission against
-- that astronaut's total
SELECT name,
year_of_mission AS m_yr,
hours_mission AS m_hrs,
ROUND(hours_mission /
SUM(hours_mission) OVER (PARTITION BY name)
* 100.0, -- example: change 0.12 to 12
2) -- round off to 2 decimal places
AS percent_of_tot,
SUM(hours_mission)
OVER (PARTITION BY name)
AS tot_m_hrs
FROM sample.demo.astronauts
WHERE name IN ('Nicollier, Claude', 'Ross, Jerry L.')
ORDER BY name, m_yr;
aDF.withColumn("percent_of_tot",
round(aDF.m_hrs / F.sum("m_hrs").over(w2) * 100,
2)) \
.withColumn("tot_m_hrs", F.sum("m_hrs").over(w2)) \
.sort("name", "m_yr").show(20)
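The percentage arithmetic is easy to sanity-check outside the engine. In this pure-Python sketch with hypothetical hours, each mission's hours are divided by its astronaut's partition total, giving a fraction such as 0.25, which the * 100 turns into 25.0 before rounding to 2 decimal places.

```python
from collections import defaultdict

# Hypothetical (name, m_yr, m_hrs) rows, for illustration only
rows = [
    ("Astronaut A", 2001, 100.0),
    ("Astronaut A", 2002, 300.0),
    ("Astronaut B", 2002, 60.0),
]


def percent_of_total(rows):
    """Mimic ROUND(m_hrs / SUM(m_hrs) OVER (PARTITION BY name) * 100, 2)."""
    totals = defaultdict(float)
    for name, _, hrs in rows:
        totals[name] += hrs
    return [
        (name, yr, hrs, round(hrs / totals[name] * 100, 2), totals[name])
        for name, yr, hrs in rows
    ]


for row in percent_of_total(rows):
    print(row)
```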
Using different columns to create multiple windows
For each row you can create additional windows. In the example below, we add another one made up of all input records that have the same year_of_mission value as the current row.
-- use a second window definition
SELECT name,
year_of_mission AS m_yr,
hours_mission AS m_hrs,
SUM(hours_mission)
OVER (PARTITION BY name)
AS tot_m_hrs,
COUNT(*)
OVER (PARTITION BY year_of_mission)
AS tot_m_yr_for_all
FROM sample.demo.astronauts
WHERE name IN ('Nicollier, Claude', 'Ross, Jerry L.')
ORDER BY name, m_yr;
# define another window specification
w3 = W.partition_by("m_yr")
aDF.withColumn("tot_m_hrs", F.sum("m_hrs").over(w2)) \
.withColumn("tot_m_yr_for_all",
F.count("m_yr").over(w3)) \
.sort("name", "m_yr").show(20)
In this filtered dataset, 1993 is the only year with more than one mission, and it shows up for both astronauts.
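Each window is computed independently over the rows that survive the filter, which is why the per-year count only reflects these two astronauts. Here's a pure-Python sketch with hypothetical rows: one window partitions by name for total hours, another partitions by year for a mission count, and both get attached to the same output rows.

```python
from collections import Counter, defaultdict

# Hypothetical (name, m_yr, m_hrs) rows, already filtered like the WHERE clause
rows = [
    ("Astronaut A", 2001, 100.0),
    ("Astronaut A", 2002, 200.0),
    ("Astronaut B", 2002, 60.0),
]


def two_windows(rows):
    """SUM(m_hrs) OVER (PARTITION BY name) plus COUNT(*) OVER (PARTITION BY m_yr)."""
    hrs_by_name = defaultdict(float)  # window 1: partitioned by name
    for name, _, hrs in rows:
        hrs_by_name[name] += hrs
    missions_by_year = Counter(yr for _, yr, _ in rows)  # window 2: by year
    return [
        (name, yr, hrs, hrs_by_name[name], missions_by_year[yr])
        for name, yr, hrs in rows
    ]


for row in two_windows(rows):
    print(row)
```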
Order the window’s contents
By adding an ORDER BY clause within the window definition, we can perform additional calculations based on the position of the current input row in the window. This example shows how you can create a mission number.
SELECT name,
year_of_mission AS m_yr,
hours_mission AS m_hrs,
ROW_NUMBER() OVER (PARTITION BY name
ORDER BY year_of_mission)
AS m_nbr
FROM sample.demo.astronauts
WHERE name IN ('Nicollier, Claude', 'Ross, Jerry L.')
ORDER BY name, m_yr;
from pystarburst.functions import row_number
w4 = W.partition_by("name").order_by("m_yr")
aDF.withColumn("m_nbr", row_number().over(w4)) \
.sort("name", "m_yr").show(20)
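ROW_NUMBER, RANK, and DENSE_RANK give identical results as long as the ORDER BY column has no ties within a partition; they only diverge if, say, an astronaut had two missions in the same year. This pure-Python sketch shows the three behaviors over a hypothetical, already-sorted list of mission years containing one tie. In SQL, which of two tied rows gets the lower ROW_NUMBER is not guaranteed unless you add a tie-breaking ORDER BY column.

```python
# Hypothetical mission years for one astronaut, already sorted (1993 is a tie)
years = [1990, 1993, 1993, 1996]


def row_number(sorted_keys):
    """ROW_NUMBER(): 1, 2, 3, ... regardless of ties."""
    return list(range(1, len(sorted_keys) + 1))


def rank(sorted_keys):
    """RANK(): ties share a rank, and the next distinct key skips ahead (gaps)."""
    out = []
    for i, key in enumerate(sorted_keys):
        if i > 0 and key == sorted_keys[i - 1]:
            out.append(out[-1])  # tied with the previous row: reuse its rank
        else:
            out.append(i + 1)  # otherwise rank = 1-based position in the window
    return out


def dense_rank(sorted_keys):
    """DENSE_RANK(): ties share a rank, and the next distinct key adds 1 (no gaps)."""
    out = []
    for i, key in enumerate(sorted_keys):
        if i == 0:
            out.append(1)
        elif key == sorted_keys[i - 1]:
            out.append(out[-1])
        else:
            out.append(out[-1] + 1)
    return out


print(row_number(years))  # [1, 2, 3, 4]
print(rank(years))        # [1, 2, 2, 4]
print(dense_rank(years))  # [1, 2, 2, 3]
```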
In addition to the prior ranking function, you can look backward or forward one or more rows to get values for new columns, as shown below.
SELECT name,
year_of_mission AS m_yr,
hours_mission AS m_hrs,
LAG(hours_mission, 1)
OVER (PARTITION BY name
ORDER BY year_of_mission)
AS prev_m_hrs,
LEAD(hours_mission, 1)
OVER (PARTITION BY name
ORDER BY year_of_mission)
AS next_m_hrs
FROM sample.demo.astronauts
WHERE name IN ('Nicollier, Claude', 'Ross, Jerry L.')
ORDER BY name, m_yr;
from pystarburst.functions import lag, lead
# lag and lead functions
aDF.withColumn("prev_m_hrs",
lag(aDF.m_hrs, 1).over(w4)) \
.withColumn("next_m_hrs",
lead(aDF.m_hrs, 1).over(w4)) \
.sort("name", "m_yr").show(20)
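LAG and LEAD simply reach a fixed number of rows backward or forward inside the ordered partition, returning NULL when they run off either end. Here's a pure-Python sketch with hypothetical rows, using None to stand in for NULL.

```python
from collections import defaultdict

# Hypothetical (name, m_yr, m_hrs) rows, deliberately unsorted
rows = [
    ("Astronaut B", 2002, 60.0),
    ("Astronaut A", 2005, 300.0),
    ("Astronaut A", 2001, 100.0),
    ("Astronaut A", 2002, 200.0),
]


def lag_lead(rows, offset=1):
    """Mimic LAG/LEAD(m_hrs, offset) OVER (PARTITION BY name ORDER BY m_yr)."""
    partitions = defaultdict(list)
    # sorting by (name, m_yr) gives each partition its ORDER BY m_yr order
    for row in sorted(rows, key=lambda r: (r[0], r[1])):
        partitions[row[0]].append(row)

    out = []
    for part in partitions.values():
        for i, (name, yr, hrs) in enumerate(part):
            prev_hrs = part[i - offset][2] if i - offset >= 0 else None
            next_hrs = part[i + offset][2] if i + offset < len(part) else None
            out.append((name, yr, hrs, prev_hrs, next_hrs))
    return out


for row in lag_lead(rows):
    print(row)
```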
Create rolling windows
You can bound the window frame using boundaries such as UNBOUNDED PRECEDING, n PRECEDING, CURRENT ROW, n FOLLOWING, and UNBOUNDED FOLLOWING. The example below has a window that includes the current input row and all previous rows based on the sort order.
YES, this is a good time to look at window functions explained again.
--calculate running_total_hours
SELECT name,
year_of_mission AS m_yr,
hours_mission AS m_hrs,
SUM(hours_mission)
OVER (PARTITION BY name
ORDER BY year_of_mission
ROWS BETWEEN
UNBOUNDED PRECEDING
AND CURRENT ROW)
AS running_tot_m_hrs
FROM sample.demo.astronauts
WHERE name IN ('Nicollier, Claude', 'Ross, Jerry L.')
ORDER BY name, m_yr;
w5 = W.partition_by("name").order_by("m_yr") \
.rows_between(W.UNBOUNDED_PRECEDING, W.CURRENT_ROW)
aDF.withColumn("running_tot_m_hrs",
F.sum("m_hrs").over(w5)) \
.sort("name", "m_yr").show(20)
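For a single partition whose values are already in ORDER BY order, a ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW frame is just the slice from the first row through the current one. This pure-Python sketch (hypothetical hours) spells that out and cross-checks it against itertools.accumulate, which computes the same running sum.

```python
from itertools import accumulate

# Hypothetical hours for one astronaut, already sorted by m_yr
hours = [100.0, 200.0, 300.0]


def running_total(values):
    """SUM(x) OVER (ORDER BY ... ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)."""
    # frame for row i = every row from the partition start through row i
    return [sum(values[: i + 1]) for i in range(len(values))]


print(running_total(hours))     # frame-by-frame version
print(list(accumulate(hours)))  # same running sum, computed incrementally
```

The slicing version re-sums each frame, so it is quadratic; it is only there to mirror the frame definition literally.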
Here is another example.
-- rolling average over current record and last two
SELECT name,
year_of_mission AS m_yr,
hours_mission AS m_hrs,
ROUND(
AVG(hours_mission)
OVER (PARTITION BY name
ORDER BY year_of_mission
ROWS BETWEEN
2 PRECEDING
AND CURRENT ROW),
2)
AS avg_this_and_last2
FROM sample.demo.astronauts
WHERE name IN ('Nicollier, Claude', 'Ross, Jerry L.')
ORDER BY name, m_yr;
w6 = W.partition_by("name").order_by("m_yr") \
.rows_between(-2, W.CURRENT_ROW)
# rolling avg over curr rec and last two
aDF.withColumn("avg_this_and_last2",
round(F.avg("m_hrs").over(w6), 2)) \
.sort("name", "m_yr").show(20)
To call out an example from above, look at Jerry's 1998 mission. The avg_this_and_last2 value of 217 is the average of 283, 129, and 239.
Another example is Claude’s 1993 entry. The 225.61 average is only based on 259.97 and 191.25 as there is only one row preceding it.
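Both call-outs follow from how a ROWS BETWEEN 2 PRECEDING AND CURRENT ROW frame gets clipped at the start of the partition. Here's a pure-Python sketch of that frame for one already-sorted partition, using hypothetical hours.

```python
# Hypothetical hours for one astronaut, already sorted by m_yr
hours = [100.0, 200.0, 600.0, 40.0]


def rolling_avg_rows(values, preceding=2):
    """AVG(x) OVER (ORDER BY ... ROWS BETWEEN <preceding> PRECEDING AND CURRENT ROW)."""
    out = []
    for i in range(len(values)):
        # the frame is clipped at the partition start, so early rows see fewer values
        frame = values[max(0, i - preceding): i + 1]
        out.append(round(sum(frame) / len(frame), 2))
    return out


print(rolling_avg_rows(hours))  # [100.0, 150.0, 300.0, 280.0]
```

Feeding in the three hours quoted for Jerry's 1998 frame (283, 129, and 239, in any order) gives 217.0 for the third row, since all three land in its frame.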
Sophisticated rolling windows
In the prior section, the n value used with ROWS is a specific number of physical rows PRECEDING and/or FOLLOWING. When the sort column is a number or date datatype and the RANGE keyword replaces ROWS, n is instead measured against the actual value of the sorting column: the frame holds every row whose sort value falls within n of the current row's value, no matter how many rows that turns out to be.
AGAIN, see window functions explained for some visuals.
SELECT name,
year_of_mission AS m_yr,
hours_mission AS m_hrs,
ROUND(
AVG(hours_mission)
OVER (PARTITION BY name
ORDER BY year_of_mission
-- using RANGE not ROWS
RANGE BETWEEN
2 PRECEDING
AND CURRENT ROW),
2) AS avg_last2_YEARS,
ROUND(
AVG(hours_mission)
OVER (PARTITION BY name
ORDER BY year_of_mission
ROWS BETWEEN
2 PRECEDING
AND CURRENT ROW),
2) AS avg_last2_ROWS
FROM sample.demo.astronauts
WHERE name IN ('Nicollier, Claude', 'Ross, Jerry L.')
ORDER BY name, m_yr;
w7 = W.partition_by("name").order_by("m_yr") \
.range_between(-2, W.CURRENT_ROW)
aDF.withColumn("avg_last2_YEARS",
round(F.avg("m_hrs").over(w7), 2)) \
.withColumn("avg_last2_ROWS",
round(F.avg("m_hrs").over(w6), 2)) \
.sort("name", "m_yr").show(20)
To call out an example from above, look at Claude's 1999 mission. The avg_last2_rows value of 330.94 is the average of 191.18, 541.67, and 259.97. Conversely, his avg_last2_years is just the current value of 191.18, as he flew no other missions in the two years before 1999 (1997 and 1998).
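The difference between the two frames is easier to see side by side. In this pure-Python sketch (hypothetical years and hours for one astronaut, already sorted by year), ROWS counts physical rows while RANGE keeps every row whose year is within 2 of the current row's year. Rows tied with the current year count as peers and are included, which mirrors how RANGE treats CURRENT ROW.

```python
# Hypothetical missions for one astronaut, already sorted by year
years = [1990, 1991, 1995, 1996]
hours = [100.0, 200.0, 300.0, 500.0]


def avg_rows_frame(hours, preceding):
    """ROWS BETWEEN <preceding> PRECEDING AND CURRENT ROW: count physical rows."""
    out = []
    for i in range(len(hours)):
        frame = hours[max(0, i - preceding): i + 1]
        out.append(round(sum(frame) / len(frame), 2))
    return out


def avg_range_frame(years, hours, preceding):
    """RANGE BETWEEN <preceding> PRECEDING AND CURRENT ROW: compare ORDER BY values."""
    out = []
    for current in years:
        # every row whose year lies in [current - preceding, current], peers included
        frame = [h for y, h in zip(years, hours) if current - preceding <= y <= current]
        out.append(round(sum(frame) / len(frame), 2))
    return out


print("ROWS: ", avg_rows_frame(hours, 2))          # [100.0, 150.0, 200.0, 333.33]
print("RANGE:", avg_range_frame(years, hours, 2))  # [100.0, 150.0, 300.0, 400.0]
```

For the 1995 mission, ROWS averages in 1990 and 1991 because they are the two previous rows, while RANGE looks back only to 1993 and finds nothing but the current row, the same effect described for Claude's 1999 mission.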
Or… just run some SQL
While the programmer in me likes the method chaining you see in this post, you could still use the Session object's sql() method to just write some SQL.
Start with SQL…
You could start off with some SQL to get an initial DataFrame and then perform some more method chaining for additional transformations.
twoAs_fromSQL = session.sql(
"SELECT name, "\
" year_of_mission AS m_yr, "\
" hours_mission AS m_hrs "\
" FROM sample.demo.astronauts "\
" WHERE name IN ('Nicollier, Claude', "\
" 'Ross, Jerry L.')")
w8 = W.partition_by("name")
twoAs_fromSQL.withColumn("tot_m_hrs",
F.sum("m_hrs").over(w8)) \
.sort("name", "m_yr").show(20)
Do it all with SQL…
Or just put the whole SQL statement inside the sql() call.
ALL_fromSQL = session.sql(
"SELECT name, "\
" year_of_mission AS m_yr, "\
" hours_mission AS m_hrs, "\
" SUM(hours_mission) "\
" OVER (PARTITION BY name) "\
" AS tot_m_hrs "\
" FROM sample.demo.astronauts "\
" WHERE name IN ('Nicollier, Claude', "\
" 'Ross, Jerry L.')"\
" ORDER BY name, m_yr ")
ALL_fromSQL.show(20)
You definitely have options with the DataFrame API.
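If you want to experiment with window SQL like this without a Starburst cluster, one option is Python's built-in sqlite3 module, which supports window functions when the underlying SQLite library is 3.25 or later. This is a swapped-in engine, not Trino, so dialect details can differ; the astronauts table and its rows below are hypothetical stand-ins for sample.demo.astronauts. A triple-quoted string also sidesteps the backslash-continued string concatenation used above.

```python
import sqlite3

# In-memory stand-in for sample.demo.astronauts; rows are hypothetical
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE astronauts (name TEXT, year_of_mission INTEGER, hours_mission REAL)"
)
conn.executemany(
    "INSERT INTO astronauts VALUES (?, ?, ?)",
    [
        ("Astronaut A", 2001, 100.0),
        ("Astronaut A", 2002, 200.0),
        ("Astronaut B", 2002, 60.0),
        ("Astronaut C", 2003, 10.0),  # filtered out by the WHERE clause
    ],
)

query = """
    SELECT name,
           year_of_mission AS m_yr,
           hours_mission AS m_hrs,
           SUM(hours_mission) OVER (PARTITION BY name) AS tot_m_hrs
      FROM astronauts
     WHERE name IN ('Astronaut A', 'Astronaut B')
     ORDER BY name, m_yr
"""
result = conn.execute(query).fetchall()
for row in result:
    print(row)
conn.close()
```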
The code
Here is all the code in one file, astronauts.py.
import trino
from pystarburst import Session
from pystarburst import functions as F
# note: importing round here shadows Python's built-in round() for the rest of the file
from pystarburst.functions import col, lag, lead, row_number, round
from pystarburst.window import Window as W
db_parameters = {
"host": "lXXXXXXXXXXXXr.trino.galaxy.starburst.io",
"port": 443,
"http_scheme": "https",
"auth": trino.auth.BasicAuthentication("lXXXXXX/XXXXXXn", "<password>")
}
session = Session.builder.configs(db_parameters).create()
print("")
print("---------------------------")
print("Take a peek at a couple of astronauts missions")
a = session.table("sample.demo.astronauts")
a.show(10)
print("")
print("---------------------------")
print("What does the schema look like?")
#Get all column names and their types
for field in a.schema.fields:
    print(field.name + " , " + str(field.datatype))
print("")
print("---------------------------")
print("Apply some projection & filtering")
# identify the two astronauts we want to focus on
li = ["Nicollier, Claude", "Ross, Jerry L."]
twoAs = a.select("name", "nationality", \
"year_of_mission", "hours_mission") \
.rename("year_of_mission", "m_yr") \
.rename("hours_mission", "m_hrs") \
.filter(a.name.isin(li)) \
.sort("name", "m_yr")
twoAs.show(20)
print("")
print("---------------------------")
print("See how each mission compares with an ")
print(" OVERALL average across ALL missions")
# trim out the nation column
aDF = twoAs.drop("nationality")
aDF.withColumn("avg_all_m_hrs", F.avg("m_hrs").over()) \
.sort("name", "m_yr").show(20)
print("")
print("---------------------------")
print("Have a window per person of all their rows")
# define the window specification
w2 = W.partition_by("name")
aDF.withColumn("tot_m_hrs", F.sum("m_hrs").over(w2)) \
.sort("name", "m_yr").show(20)
print("")
print("---------------------------")
print("Multiple aggs for the same window")
# chain another withColumn method
aDF.withColumn("avg_m_hrs", F.avg("m_hrs").over(w2)) \
.withColumn("tot_m_hrs", F.sum("m_hrs").over(w2)) \
.sort("name", "m_yr").show(20)
print("")
print("---------------------------")
print("Another ex: multiple aggs for the same window")
# manipulate the window's agg value
aDF.withColumn("percent_of_tot",
round(aDF.m_hrs / F.sum("m_hrs").over(w2) * 100,
2)) \
.withColumn("tot_m_hrs", F.sum("m_hrs").over(w2)) \
.sort("name", "m_yr").show(20)
print("")
print("---------------------------")
print("Different windows by different partition_by's")
# define another window specification
w3 = W.partition_by("m_yr")
aDF.withColumn("tot_m_hrs", F.sum("m_hrs").over(w2)) \
.withColumn("tot_m_yr_for_all",
F.count("m_yr").over(w3)) \
.sort("name", "m_yr").show(20)
print("")
print("---------------------------")
print("Ordering window contents")
# define another window specification
w4 = W.partition_by("name").order_by("m_yr")
# row_number
aDF.withColumn("m_nbr", row_number().over(w4)) \
.sort("name", "m_yr").show(20)
# lag and lead
aDF.withColumn("prev_m_hrs",
lag(aDF.m_hrs, 1).over(w4)) \
.withColumn("next_m_hrs",
lead(aDF.m_hrs, 1).over(w4)) \
.sort("name", "m_yr").show(20)
print("")
print("---------------------------")
print("Rolling windows")
w5 = W.partition_by("name").order_by("m_yr") \
.rows_between(W.UNBOUNDED_PRECEDING, W.CURRENT_ROW)
aDF.withColumn("running_tot_m_hrs",
F.sum("m_hrs").over(w5)) \
.sort("name", "m_yr").show(20)
w6 = W.partition_by("name").order_by("m_yr") \
.rows_between(-2, W.CURRENT_ROW)
# rolling avg over curr rec and last two
aDF.withColumn("avg_this_and_last2",
round(F.avg("m_hrs").over(w6), 2)) \
.sort("name", "m_yr").show(20)
print("")
print("---------------------------")
print("Sophisticated window ranges")
w7 = W.partition_by("name").order_by("m_yr") \
.range_between(-2, W.CURRENT_ROW)
aDF.withColumn("avg_last2_YEARS",
round(F.avg("m_hrs").over(w7), 2)) \
.withColumn("avg_last2_ROWS",
round(F.avg("m_hrs").over(w6), 2)) \
.sort("name", "m_yr").show(20)
print("")
print("---------------------------")
print("Start with some SQL")
twoAs_fromSQL = session.sql(
"SELECT name, "\
" year_of_mission AS m_yr, "\
" hours_mission AS m_hrs "\
" FROM sample.demo.astronauts "\
" WHERE name IN ('Nicollier, Claude', "\
" 'Ross, Jerry L.')")
w8 = W.partition_by("name")
twoAs_fromSQL.withColumn("tot_m_hrs",
F.sum("m_hrs").over(w8)) \
.sort("name", "m_yr").show(20)
print("")
print("---------------------------")
print("Do it all with SQL")
ALL_fromSQL = session.sql(
"SELECT name, "\
" year_of_mission AS m_yr, "\
" hours_mission AS m_hrs, "\
" SUM(hours_mission) "\
" OVER (PARTITION BY name) "\
" AS tot_m_hrs "\
" FROM sample.demo.astronauts "\
" WHERE name IN ('Nicollier, Claude', "\
" 'Ross, Jerry L.')"\
" ORDER BY name, m_yr ")
ALL_fromSQL.show(20)