viewing astronauts thru windows (more pystarburst examples)

After this and that, you might think I’d be done posting PyStarburst DataFrame API examples, but I’m still excited to share a few more. I ended my last PyStarburst post with some examples of window functions. To help you understand them conceptually, I posted window functions explained. This post will focus on some additional windowing examples with Python and Starburst via the DataFrame API.

Also like in my last post, I’ll share SQL first, followed by a Python approach. For a dataset, I’ll make it super easy. Starburst Galaxy already has a nice little sample catalog whose demo schema offers up some out-of-this-world tables.

For this post, I’ll explore the astronauts table exclusively.

Note: Take a look at the full code listing at the end of this post to get the boilerplate code you’ll need at the top of your .py file or in your web-based notebook.

10/27/2023 Update: This code is now in a Jupyter notebook available at notebooks/astronaut_windows.ipynb within https://github.com/starburstdata/pystarburst-examples to make it easier to execute.

  1. Check out the table
  2. Window function examples
    1. Single window for all rows
    2. Window for each distinct value
    3. Multiple aggregations on the same window
    4. Using different columns to create multiple windows
    5. Order the window’s contents
    6. Create rolling windows
    7. Sophisticated rolling windows
  3. Or… just run some SQL
    1. Start with SQL…
    2. Do it all with SQL…
  4. The code

Check out the table

Let’s just get some rows to look at.

-- SQL soln

SELECT * 
  FROM sample.demo.astronauts 
 LIMIT 10;
# Python soln

a = session.table("sample.demo.astronauts")

a.show(10)

Both the SQL and Python outputs show this to be a pretty wide table. So wide, I don’t want to show it here. We can ask both to tell us more about the columns that make up the table.

-- SQL soln

DESC sample.demo.astronauts;
# Python soln

for field in a.schema.fields:
    print(field.name +" , "+str(field.datatype))

Note: PySpark has a nice printSchema() function that produces a very readable output. At this early stage of PyStarburst it doesn’t seem to have been implemented yet, but I’m sure it will surface before long.
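In the meantime, a minimal stand-in is easy to build on top of the schema.fields loop shown above. This is just a sketch, assuming only that the DataFrame exposes schema.fields objects with name and datatype attributes; the mock classes below are hypothetical stand-ins so the snippet runs without a session.

```python
from collections import namedtuple

def schema_lines(df):
    """Build printSchema()-style output lines from any object
    exposing df.schema.fields with .name and .datatype."""
    return ["root"] + [f" |-- {f.name}: {f.datatype}"
                       for f in df.schema.fields]

def print_schema(df):
    print("\n".join(schema_lines(df)))

# Hypothetical mocks so this runs standalone; with PyStarburst
# you would pass the DataFrame itself, e.g. print_schema(a).
Field = namedtuple("Field", ["name", "datatype"])
Schema = namedtuple("Schema", ["fields"])
Mock = namedtuple("Mock", ["schema"])

demo = Mock(Schema([Field("name", "StringType()"),
                    Field("year_of_mission", "LongType()")]))
print_schema(demo)
```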

And again, BOTH of these produce a BUNCH of text, so I’m just showing the web UI’s output as it is more quickly consumable for most of us.

As I know the table pretty well, let’s just run this highly projected & filtered query to simplify the data we are looking at.

-- SQL soln

SELECT name, nationality, 
       year_of_mission AS m_yr, 
       hours_mission AS m_hrs
  FROM sample.demo.astronauts
 WHERE name IN ('Nicollier, Claude', 'Ross, Jerry L.')  
 ORDER BY name, m_yr;
# Python soln

li = ["Nicollier, Claude", "Ross, Jerry L."]

twoAs = a.select("name", "nationality", \
				 "year_of_mission", "hours_mission") \
	.rename("year_of_mission", "m_yr") \
	.rename("hours_mission", "m_hrs") \
	.filter(a.name.isin(li)) \
	.sort("name", "m_yr")

twoAs.show(20)

What I wanted to show is that this table is NOT a list of astronauts. It is a table of astronauts’ trips to space. These two fellas are represented multiple times; Jerry went to space 7 times and Claude went 4 times.

Window function examples

Single window for all rows

Let’s see how each mission compares with an OVERALL average across ALL missions. Create a single window that encompasses all input rows and calculates a single average for all output rows.

SELECT name,
       year_of_mission AS m_yr, 
       hours_mission AS m_hrs,
       AVG(hours_mission)        
            OVER()         -- the WINDOW is ALL rows
       AS avg_all_m_hrs    --   NOT typical ^^^^^^^^
  FROM sample.demo.astronauts
 WHERE name IN ('Nicollier, Claude', 'Ross, Jerry L.') 
 ORDER BY name, m_yr;

Introduce the over() function without any parameters to create a single window to be used for all input rows.

# trim out the nation column
aDF = twoAs.drop("nationality")

# use an empty parameter over function call
aDF.withColumn("avg_all_m_hrs", F.avg("m_hrs").over()) \
	.sort("name", "m_yr").show(20)

Window for each distinct value

Keep all the rows, but calculate an aggregate over a window made up of all input rows for the same astronaut.

SELECT name,
       year_of_mission AS m_yr, 
       hours_mission AS m_hrs,
       SUM(hours_mission) 
          -- kinda like a GROUP BY, but you get all rows
          OVER (PARTITION BY name)  
       AS tot_m_hrs  
  FROM sample.demo.astronauts
 WHERE name IN ('Nicollier, Claude', 'Ross, Jerry L.')  
ORDER BY name, m_yr;

Create a WindowSpec object based on the Window.partition_by() factory method.

from pystarburst.window import Window as W

# define the window specification
w2 = W.partition_by("name")

aDF.withColumn("tot_m_hrs", F.sum("m_hrs").over(w2)) \
	.sort("name", "m_yr").show(20)

Multiple aggregations on the same window

You can calculate multiple aggregations on the same window specification.

SELECT name,
       year_of_mission AS m_yr, 
       hours_mission AS m_hrs,
       AVG(hours_mission)        
          OVER (PARTITION BY name)  
       AS avg_m_hrs,  
       -- we can have more than 1
       SUM(hours_mission) 
          OVER (PARTITION BY name)  
       AS tot_m_hrs
  FROM sample.demo.astronauts
 WHERE name IN ('Nicollier, Claude', 'Ross, Jerry L.') 
 ORDER BY name, m_yr;
# chain another withColumn method
aDF.withColumn("avg_m_hrs", F.avg("m_hrs").over(w2)) \
	.withColumn("tot_m_hrs", F.sum("m_hrs").over(w2)) \
	.sort("name", "m_yr").show(20)

Of course, we can also manipulate the values we get back from the window’s aggregate functions, too.

-- determine the percentage of each mission against 
--   that astronaut's total
SELECT name,
       year_of_mission AS m_yr, 
       hours_mission AS m_hrs,
       ROUND(hours_mission /
            SUM(hours_mission) OVER (PARTITION BY name)
            * 100.0,  -- example: change 0.12 to 12
            2)        -- round to 2 decimal places
       AS percent_of_tot,
       SUM(hours_mission) 
          OVER (PARTITION BY name)  
       AS tot_m_hrs
  FROM sample.demo.astronauts
 WHERE name IN ('Nicollier, Claude', 'Ross, Jerry L.') 
 ORDER BY name, m_yr;
aDF.withColumn("percent_of_tot", 
		round(aDF.m_hrs / F.sum("m_hrs").over(w2) * 100, 
			2)) \
	.withColumn("tot_m_hrs", F.sum("m_hrs").over(w2)) \
	.sort("name", "m_yr").show(20)

Using different columns to create multiple windows

For each row you can create additional windows. In the example below, we are adding another one based on all input records that have the same year_of_mission value as the current row.

-- use a second window definition
SELECT name,
       year_of_mission AS m_yr, 
       hours_mission AS m_hrs,
       SUM(hours_mission) 
          OVER (PARTITION BY name)  
       AS tot_m_hrs,
       COUNT(*) 
          OVER (PARTITION BY year_of_mission)
       AS tot_m_yr_for_all
  FROM sample.demo.astronauts
 WHERE name IN ('Nicollier, Claude', 'Ross, Jerry L.') 
 ORDER BY name, m_yr;
# define another window specification
w3 = W.partition_by("m_yr")

aDF.withColumn("tot_m_hrs", F.sum("m_hrs").over(w2)) \
	.withColumn("tot_m_yr_for_all", 
		F.count("m_yr").over(w3)) \
	.sort("name", "m_yr").show(20)

Only 1993 had more than one space flight and we see it for both astronauts in our dataset.

Order the window’s contents

By adding an ORDER BY clause within the window definition, we can perform some additional calculations based on the position of the current input row in the window. This example shows how you can create a mission number.

SELECT name,
       year_of_mission AS m_yr, 
       hours_mission AS m_hrs,
       ROW_NUMBER() OVER (PARTITION BY name
                    ORDER BY year_of_mission)
       AS m_nbr
  FROM sample.demo.astronauts
 WHERE name IN ('Nicollier, Claude', 'Ross, Jerry L.') 
 ORDER BY name, m_yr;
from pystarburst.functions import row_number

w4 = W.partition_by("name").order_by("m_yr")

aDF.withColumn("m_nbr", row_number().over(w4)) \
	.sort("name", "m_yr").show(20)

In addition to the prior ranking function, you can look forward or backward one or more records to get values to populate new columns, as shown below.

SELECT name,
       year_of_mission AS m_yr, 
       hours_mission AS m_hrs,
       LAG(hours_mission, 1)
          OVER (PARTITION BY name 
                ORDER BY year_of_mission)
       AS prev_m_hrs,  
       LEAD(hours_mission, 1)
          OVER (PARTITION BY name 
                ORDER BY year_of_mission)
       AS next_m_hrs  
  FROM sample.demo.astronauts
 WHERE name IN ('Nicollier, Claude', 'Ross, Jerry L.') 
 ORDER BY name, m_yr;
from pystarburst.functions import lag, lead

# lag and lead functions
aDF.withColumn("prev_m_hrs", 
		lag(aDF.m_hrs, 1).over(w4)) \
	.withColumn("next_m_hrs", 
		lead(aDF.m_hrs, 1).over(w4)) \
	.sort("name", "m_yr").show(20)

Create rolling windows

You can bound the window using boundaries such as UNBOUNDED PRECEDING, n PRECEDING, CURRENT ROW, n FOLLOWING, and UNBOUNDED FOLLOWING. The example below has a window that includes the current input row and all previous rows based on the sort order.

YES, this is a good time to look at window functions explained again.

--calculate running_total_hours
SELECT name,
       year_of_mission AS m_yr, 
       hours_mission AS m_hrs,
       SUM(hours_mission) 
          OVER (PARTITION BY name
                ORDER BY year_of_mission
                     ROWS BETWEEN
                     UNBOUNDED PRECEDING
                     AND CURRENT ROW)
       AS running_tot_m_hrs
  FROM sample.demo.astronauts
 WHERE name IN ('Nicollier, Claude', 'Ross, Jerry L.')  
 ORDER BY name, m_yr;
w5 = W.partition_by("name").order_by("m_yr") \
	.rows_between(W.UNBOUNDED_PRECEDING, W.CURRENT_ROW)

aDF.withColumn("running_tot_m_hrs", 
		F.sum("m_hrs").over(w5)) \
	.sort("name", "m_yr").show(20)

Here is another example.

-- rolling average over current record and last two
SELECT name,
       year_of_mission AS m_yr, 
       hours_mission AS m_hrs,
       ROUND(
       AVG(hours_mission) 
          OVER (PARTITION BY name
                ORDER BY year_of_mission
                     ROWS BETWEEN
                     2 PRECEDING
                     AND CURRENT ROW),
       2)
       AS avg_this_and_last2
  FROM sample.demo.astronauts
 WHERE name IN ('Nicollier, Claude', 'Ross, Jerry L.')  
 ORDER BY name, m_yr;
w6 = W.partition_by("name").order_by("m_yr") \
	.rows_between(-2, W.CURRENT_ROW)
	
# rolling avg over curr rec and last two
aDF.withColumn("avg_this_and_last2", 
		round(F.avg("m_hrs").over(w6), 2)) \
	.sort("name", "m_yr").show(20)

To call out an example from above, look at Jerry’s 1998 mission. The avg_this_and_last2 value of 217 is the average of 283, 129, and 239.

Another example is Claude’s 1993 entry. The 225.61 average is only based on 259.97 and 191.25 as there is only one row preceding it.
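To make the frame arithmetic concrete without a session, here is a plain-Python sketch (mine, not part of the post’s listing) of what ROWS BETWEEN 2 PRECEDING AND CURRENT ROW does to a sorted list of mission hours:

```python
def rolling_avg(hours, preceding=2):
    """ROWS BETWEEN <preceding> PRECEDING AND CURRENT ROW:
    each output value averages the current value plus up to
    <preceding> earlier values."""
    out = []
    for i in range(len(hours)):
        frame = hours[max(0, i - preceding): i + 1]
        out.append(round(sum(frame) / len(frame), 2))
    return out

# Jerry's 1998 example from above: the third value averages all three
print(rolling_avg([283, 129, 239]))   # [283.0, 206.0, 217.0]
# Claude's 1993 example: only one preceding row exists
print(rolling_avg([259.97, 191.25]))  # [259.97, 225.61]
```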

Sophisticated rolling windows

In the prior section, the n value used with ROWS is a specific number of rows PRECEDING and/or FOLLOWING. When the column being sorted on is a numeric or date datatype and the RANGE keyword replaces ROWS, the frame boundaries are based on the actual value of the sorting column instead of row positions.

AGAIN, see window functions explained for some visuals.

SELECT name,
       year_of_mission AS m_yr, 
       hours_mission AS m_hrs,
       ROUND(
       AVG(hours_mission) 
          OVER (PARTITION BY name
                ORDER BY year_of_mission
                     -- using RANGE not ROWS
                     RANGE BETWEEN  
                     2 PRECEDING
                     AND CURRENT ROW),
       2) AS avg_last2_YEARS,  
       ROUND(
       AVG(hours_mission) 
          OVER (PARTITION BY name
                ORDER BY year_of_mission
                     ROWS BETWEEN
                     2 PRECEDING
                     AND CURRENT ROW),
       2) AS avg_last2_ROWS
  FROM sample.demo.astronauts
 WHERE name IN ('Nicollier, Claude', 'Ross, Jerry L.')  
ORDER BY name, m_yr;
w7 = W.partition_by("name").order_by("m_yr") \
	.range_between(-2, W.CURRENT_ROW)
	
aDF.withColumn("avg_last2_YEARS", 
		round(F.avg("m_hrs").over(w7), 2)) \
	.withColumn("avg_last2_ROWS", 
		round(F.avg("m_hrs").over(w6), 2)) \
	.sort("name", "m_yr").show(20)

To call out an example from above, look at Claude’s 1999 mission. The avg_last2_rows value of 330.94 is the average of 191.18, 541.67, and 259.97. Conversely, his avg_last2_years is only the current value of 191.18 as there were no other missions looking backwards 2 years.
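To see the ROWS/RANGE difference outside of SQL, here is a small plain-Python sketch of the RANGE frame; the (year, hours) pairs are made up for illustration, with a gap before the last mission:

```python
def range_avg(missions, span=2):
    """RANGE BETWEEN <span> PRECEDING AND CURRENT ROW: the frame is
    every row whose ordering value (the year) falls within <span>
    of the current row's year -- row position is irrelevant."""
    out = []
    for year, _ in missions:
        frame = [h for y, h in missions if year - span <= y <= year]
        out.append(round(sum(frame) / len(frame), 2))
    return out

# hypothetical (year, hours) pairs with a 6-year gap at the end
missions = [(1992, 100.0), (1993, 200.0), (1999, 300.0)]
print(range_avg(missions))  # [100.0, 150.0, 300.0]
# a ROWS-based 2-preceding frame would have averaged all three
# values for the 1999 row (200.0) instead of just its own (300.0)
```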

Or… just run some SQL

While the programmer in me likes the method chaining code you see in this post, you could still use the Session object’s sql() function to just write some SQL.

Start with SQL…

You could start off with some SQL to get an initial DataFrame and then perform some more method chaining for additional transformations.

twoAs_fromSQL = session.sql(
    "SELECT name, "\
    "       year_of_mission AS m_yr, "\
    "       hours_mission AS m_hrs "\
    "  FROM sample.demo.astronauts "\
    " WHERE name IN ('Nicollier, Claude', "\
    "                'Ross, Jerry L.')")

w8 = W.partition_by("name")

twoAs_fromSQL.withColumn("tot_m_hrs", 
		F.sum("m_hrs").over(w8)) \
	.sort("name", "m_yr").show(20)

Do it all with SQL…

Or just put the whole SQL statement inside the sql() call.

ALL_fromSQL = session.sql(
    "SELECT name, "\
    "       year_of_mission AS m_yr, "\
    "       hours_mission AS m_hrs, "\
    "       SUM(hours_mission) "\
    "          OVER (PARTITION BY name) "\
    "       AS tot_m_hrs "\
    "  FROM sample.demo.astronauts "\
    " WHERE name IN ('Nicollier, Claude', "\
    "                'Ross, Jerry L.')"\
    " ORDER BY name, m_yr ")

ALL_fromSQL.show(20)

You definitely have some optionality with the DataFrame API.

The code

Here is the code all in one file, astronauts.py.

import trino
from pystarburst import Session
from pystarburst import functions as F
from pystarburst.functions import col, lag, lead, row_number, round
from pystarburst.window import Window as W

db_parameters = {
    "host": "lXXXXXXXXXXXXr.trino.galaxy.starburst.io",
    "port": 443,
    "http_scheme": "https",
    "auth": trino.auth.BasicAuthentication("lXXXXXX/XXXXXXn", "<password>")
}
session = Session.builder.configs(db_parameters).create()


print("")
print("---------------------------")
print("Take a peek at a couple of astronauts missions")

a = session.table("sample.demo.astronauts")
a.show(10)


print("")
print("---------------------------")
print("What does the schema look like?")

# Get all column names and their types
for field in a.schema.fields:
    print(field.name +" , "+str(field.datatype))


print("")
print("---------------------------")
print("Apply some projection & filtering")

# identify the two astronauts we want to focus on
li = ["Nicollier, Claude", "Ross, Jerry L."]
twoAs = a.select("name", "nationality", \
				 "year_of_mission", "hours_mission") \
	.rename("year_of_mission", "m_yr") \
	.rename("hours_mission", "m_hrs") \
	.filter(a.name.isin(li)) \
	.sort("name", "m_yr")
twoAs.show(20)


print("")
print("---------------------------")
print("See how each mission compares with an ")
print("  OVERALL average across ALL missions")

# trim out the nation column
aDF = twoAs.drop("nationality")

aDF.withColumn("avg_all_m_hrs", F.avg("m_hrs").over()) \
	.sort("name", "m_yr").show(20)


print("")
print("---------------------------")
print("Have a window per person of all their rows")

# define the window specification
w2 = W.partition_by("name")

aDF.withColumn("tot_m_hrs", F.sum("m_hrs").over(w2)) \
	.sort("name", "m_yr").show(20)


print("")
print("---------------------------")
print("Multiple aggs for the same window")

# chain another withColumn method
aDF.withColumn("avg_m_hrs", F.avg("m_hrs").over(w2)) \
	.withColumn("tot_m_hrs", F.sum("m_hrs").over(w2)) \
	.sort("name", "m_yr").show(20)


print("")
print("---------------------------")
print("Another ex: multiple aggs for the same window")

# manipulate the window's agg value
aDF.withColumn("percent_of_tot", 
		round(aDF.m_hrs / F.sum("m_hrs").over(w2) * 100, 
			2)) \
	.withColumn("tot_m_hrs", F.sum("m_hrs").over(w2)) \
	.sort("name", "m_yr").show(20)


print("")
print("---------------------------")
print("Different windows by different partition_by's")

# define another window specification
w3 = W.partition_by("m_yr")

aDF.withColumn("tot_m_hrs", F.sum("m_hrs").over(w2)) \
	.withColumn("tot_m_yr_for_all", 
		F.count("m_yr").over(w3)) \
	.sort("name", "m_yr").show(20)


print("")
print("---------------------------")
print("Ordering window contents")

# define another window specification
w4 = W.partition_by("name").order_by("m_yr")

# row_number
aDF.withColumn("m_nbr", row_number().over(w4)) \
	.sort("name", "m_yr").show(20)

# lag and lead
aDF.withColumn("prev_m_hrs", 
		lag(aDF.m_hrs, 1).over(w4)) \
	.withColumn("next_m_hrs", 
		lead(aDF.m_hrs, 1).over(w4)) \
	.sort("name", "m_yr").show(20)


print("")
print("---------------------------")
print("Rolling windows")

w5 = W.partition_by("name").order_by("m_yr") \
	.rows_between(W.UNBOUNDED_PRECEDING, W.CURRENT_ROW)

aDF.withColumn("running_tot_m_hrs", 
		F.sum("m_hrs").over(w5)) \
	.sort("name", "m_yr").show(20)

w6 = W.partition_by("name").order_by("m_yr") \
	.rows_between(-2, W.CURRENT_ROW)
	
# rolling avg over curr rec and last two
aDF.withColumn("avg_this_and_last2", 
		round(F.avg("m_hrs").over(w6), 2)) \
	.sort("name", "m_yr").show(20)
		
	
print("")
print("---------------------------")
print("Sophisticated window ranges")

w7 = W.partition_by("name").order_by("m_yr") \
	.range_between(-2, W.CURRENT_ROW)
	
aDF.withColumn("avg_last2_YEARS", 
		round(F.avg("m_hrs").over(w7), 2)) \
	.withColumn("avg_last2_ROWS", 
		round(F.avg("m_hrs").over(w6), 2)) \
	.sort("name", "m_yr").show(20)


print("")
print("---------------------------")
print("Start with some SQL")

twoAs_fromSQL = session.sql(
    "SELECT name, "\
    "       year_of_mission AS m_yr, "\
    "       hours_mission AS m_hrs "\
    "  FROM sample.demo.astronauts "\
    " WHERE name IN ('Nicollier, Claude', "\
    "                'Ross, Jerry L.')")

w8 = W.partition_by("name")

twoAs_fromSQL.withColumn("tot_m_hrs", 
		F.sum("m_hrs").over(w8)) \
	.sort("name", "m_yr").show(20)


print("")
print("---------------------------")
print("Do it all with SQL")

ALL_fromSQL = session.sql(
    "SELECT name, "\
    "       year_of_mission AS m_yr, "\
    "       hours_mission AS m_hrs, "\
    "       SUM(hours_mission) "\
    "          OVER (PARTITION BY name) "\
    "       AS tot_m_hrs "\
    "  FROM sample.demo.astronauts "\
    " WHERE name IN ('Nicollier, Claude', "\
    "                'Ross, Jerry L.')"\
    " ORDER BY name, m_yr ")

ALL_fromSQL.show(20)

Published by lestermartin

Software development & data engineering trainer/evangelist/consultant currently focused on data lake frameworks including Hive, Spark, Kafka, Flink, NiFi, and Trino/Starburst.
