building a spark sql udf with scala (using multiple arguments)

Spark SQL offers a DataFrame API approach to building a query as well as a mechanism to simply run good old-fashioned SQL statements. Spark SQL also lets us define our own user-defined scalar functions (UDFs) for when we need to bring our own special sauce to our queries.

The following is a quick example of declaring a Scala function and then elevating it to be usable in both the API and SQL approaches of Spark SQL.

Setting the Stage

For our use case, we will imagine a package delivery company needs to surface its shipment-cost calculation function in Spark SQL. To get started, let’s create some data we can work with.

//define a case class to describe a shipment
case class Shipment(shipId: Int, custId: Int,
        srcZip: String, dstZip: String, weight: Float)

//create a local collection of Shipment objects
val shipmentsLocal = Seq(
        Shipment(99009, 101, "75067", "30004", 3.1f),
        Shipment(99010, 104, "12345", "54321", 7.4f))

//convert it to a DataFrame (the implicits are already
//in scope when running in spark-shell)
import spark.implicits._
val shipmentDF = shipmentsLocal.toDF
shipmentDF.show()

+------+------+------+------+------+
|shipId|custId|srcZip|dstZip|weight|
+------+------+------+------+------+
| 99009|   101| 75067| 30004|   3.1|
| 99010|   104| 12345| 54321|   7.4|
+------+------+------+------+------+

For simplicity’s sake, we can now define a bogus method to calculate our costs.

//create a simple method (the zip codes are ignored in
//this bogus flat-rate calculation)
def calcShipCost(sourcePostalCode: String,
                 destinationPostalCode: String,
                 weightInPounds: Float): Float =
  weightInPounds * 5.1f

//quick test 
print( calcShipCost("bla", "bla", 3.0f) )

15.299999
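The zip code arguments are ignored by this flat-rate stand-in; in a real implementation the route would presumably drive the rate. As a hypothetical sketch of that idea, we could key a rate table on the source/destination pair (both the map and its values are made up for illustration, not part of the original calculation):

```scala
//hypothetical per-route rate table (made-up values)
val ratesPerPound: Map[(String, String), Float] = Map(
  ("75067", "30004") -> 5.1f,
  ("12345", "54321") -> 4.8f)

//fall back to the flat 5.1f rate for unknown routes
def calcShipCostByRoute(srcZip: String,
                        dstZip: String,
                        weight: Float): Float =
  ratesPerPound.getOrElse((srcZip, dstZip), 5.1f) * weight
```

A method like this can be wrapped with `udf(...)` in exactly the same way as the flat-rate version.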

Using the UDF via the API Approach

import org.apache.spark.sql.functions.{col, udf}
//wrap the method so it can be used w/the DataFrame API
val calcShipCostUDF = udf(calcShipCost _)

//use the UDF
shipmentDF.select( col("shipId"), 
    calcShipCostUDF( 
        col("srcZip"), col("dstZip"), col("weight"))
    .as("shipCost")).show()

+------+---------+
|shipId| shipCost|
+------+---------+
| 99009|15.809999|
| 99010|    37.74|
+------+---------+
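If you would rather not enumerate columns inside a select, the same UDF also fits the withColumn style, which appends the computed column to the existing ones before projecting (a sketch, assuming the shipmentDF and calcShipCostUDF values defined above are in scope):

```scala
//append shipCost to the existing columns, then project
shipmentDF
  .withColumn("shipCost",
    calcShipCostUDF(col("srcZip"), col("dstZip"), col("weight")))
  .select("shipId", "shipCost")
  .show()
```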

Leveraging the UDF in a SQL Statement

//give the function a moniker to be used in SQL
spark.udf.register("CALC_SHIP_COST", calcShipCost _)

//give the DF a moniker to be used in SQL, too
shipmentDF.createOrReplaceTempView("SHIPMENTS")

//use the monikers
spark.sql("""SELECT shipId,
            |       CALC_SHIP_COST(
            |         srcZip, dstZip, weight)
            |       AS shipCost
            |  FROM SHIPMENTS""".stripMargin).show()

+------+---------+
|shipId| shipCost|
+------+---------+
| 99009|15.809999|
| 99010|    37.74|
+------+---------+
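Once registered, the SQL name is not limited to spark.sql — it can also be used inside DataFrame expression strings via selectExpr (a sketch, assuming the register call above has already run):

```scala
//the registered name also works in expression strings
shipmentDF.selectExpr(
  "shipId",
  "CALC_SHIP_COST(srcZip, dstZip, weight) AS shipCost"
).show()
```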

Published by lestermartin

Developer advocate, trainer, blogger, and data engineer focused on data lake & streaming frameworks including Trino, Hive, Spark, Flink, Kafka and NiFi.
