
Spark SQL offers a function-based API for building queries as well as a mechanism to simply run good old-fashioned SQL statements. Spark SQL also lets us write our own user-defined scalar functions (UDFs) for when we need to bring our own special sauce to our queries.
The following is a quick example of declaring a Scala function and then elevating it so it can be used in both the API and SQL approaches of Spark SQL.
Setting the Stage
For our use case, we will imagine a package delivery company that needs to surface its shipment cost calculation function in Spark SQL. To get started, let's create some data we can work with.
//the toDF conversion below needs the session's implicits
import spark.implicits._
//define a class
case class Shipment(shipId: Int, custId: Int,
  srcZip: String, dstZip: String, weight: Float)
//create a local collection of Shipment objects
val shipmentsLocal = Seq(
  Shipment(99009, 101, "75067", "30004", 3.1f),
  Shipment(99010, 104, "12345", "54321", 7.4f))
//convert it to a DataFrame
val shipmentDF = shipmentsLocal.toDF
shipmentDF.show()
+------+------+------+------+------+
|shipId|custId|srcZip|dstZip|weight|
+------+------+------+------+------+
| 99009| 101| 75067| 30004| 3.1|
| 99010| 104| 12345| 54321| 7.4|
+------+------+------+------+------+
For simplicity’s sake, we can now define a bogus method to calculate our costs.
//create a simple method (the last expression is the
//return value, so no explicit return is needed)
def calcShipCost(sourcePostalCode: String,
    destinationPostalCode: String,
    weightInPounds: Float): Float = {
  weightInPounds * 5.1f
}
//quick test
print( calcShipCost("bla", "bla", 3.0f) )
15.299999
Using the UDF via the API Approach
import org.apache.spark.sql.functions.{col, udf}
//wrap the method so it can be used w/the DF API
val calcShipCostUDF = udf(calcShipCost _)
//use the UDF
shipmentDF.select(col("shipId"),
  calcShipCostUDF(
    col("srcZip"), col("dstZip"), col("weight"))
  .as("shipCost")).show()
+------+---------+
|shipId| shipCost|
+------+---------+
| 99009|15.809999|
| 99010| 37.74|
+------+---------+
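As a side note, when no standalone method exists, the same wrapper can be built from an anonymous function passed straight to udf. This is a minimal sketch, assuming the same Spark session and shipmentDF as above; calcShipCostLambda is a name introduced here for illustration.

```scala
import org.apache.spark.sql.functions.{col, udf}

//define the UDF inline as a lambda instead of
//wrapping a previously declared method
val calcShipCostLambda = udf(
  (srcZip: String, dstZip: String, weight: Float) => weight * 5.1f)

//use it exactly like the wrapped-method version
shipmentDF.select(col("shipId"),
  calcShipCostLambda(
    col("srcZip"), col("dstZip"), col("weight"))
  .as("shipCost")).show()
```

Either style produces the same column; the lambda form just skips the separate method declaration.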
Leveraging the UDF in a SQL Statement
//give the function a moniker to be used in SQL
spark.udf.register("CALC_SHIP_COST", calcShipCost _)
//give the DF a moniker to be used in SQL, too
shipmentDF.createOrReplaceTempView("SHIPMENTS")
//use the monikers
spark.sql("""SELECT shipId,
             CALC_SHIP_COST(srcZip, dstZip, weight) AS shipCost
             FROM SHIPMENTS""").show()
+------+---------+
|shipId| shipCost|
+------+---------+
| 99009|15.809999|
| 99010| 37.74|
+------+---------+
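One more convenience worth knowing: once a UDF is registered for SQL, it can also be reached from the DataFrame API by embedding a SQL fragment with expr. This is a small sketch, assuming the registration and temp view from the steps above are already in place.

```scala
import org.apache.spark.sql.functions.{col, expr}

//expr() parses a SQL expression, so the registered
//CALC_SHIP_COST name is usable from the API, too
shipmentDF.select(col("shipId"),
  expr("CALC_SHIP_COST(srcZip, dstZip, weight)").as("shipCost"))
  .show()
```

This keeps one registered function usable from both worlds rather than maintaining parallel API and SQL wrappers.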