
UPDATE (2024-05-08): Check out joining spark dataframes with identical column names (an easier way), too.
Earlier today I was asked what happens when joining two Spark DataFrames that both have a column (not being used for the join) with the same name. Shame on me as I should have had a quick answer, not just a conjecture. What was my anticipated outcome? I figured that Spark SQL would either have done some swizzling of the duplicate names, or that it would throw a runtime exception complaining about it. Since I have never seen Spark SQL rename any columns on its own before, my money was on the RTE.
Since I didn’t know for sure, I stated that and said “let’s go see!” I decided to try out a couple of other scenarios, as well. Here’s what I found.
The Classic; Join on DataFrames with Identical Column Names
Let’s stand up some simple DFs for customers and orders that we can eventually join together.
case class Customer(custId:Int, name:String)
val customersData = Seq(
  Customer(101, "Lester"),
  Customer(102, "Gretchen"),
  Customer(103, "Zoe"),
  Customer(104, "Connor"))
val customerDF = customersData.toDF
customerDF.show()
+------+--------+
|custId| name|
+------+--------+
| 101| Lester|
| 102|Gretchen|
| 103| Zoe|
| 104| Connor|
+------+--------+
case class Order(orderId:Int, custId:Int, totalPrice:Float)
val ordersData = Seq(
  Order(8888, 101, 33.33f),
  Order(8889, 101, 66.66f),
  Order(8890, 102, 101.01f),
  Order(8891, 101, 99.99f))
val orderDF = ordersData.toDF
orderDF.show()
+-------+------+----------+
|orderId|custId|totalPrice|
+-------+------+----------+
| 8888| 101| 33.33|
| 8889| 101| 66.66|
| 8890| 102| 101.01|
| 8891| 101| 99.99|
+-------+------+----------+
Let’s join them.
val joinedDF = orderDF.join(customerDF,
  orderDF("custId") === customerDF("custId"))
joinedDF.show()
+-------+------+----------+------+--------+
|orderId|custId|totalPrice|custId| name|
+-------+------+----------+------+--------+
| 8891| 101| 99.99| 101| Lester|
| 8889| 101| 66.66| 101| Lester|
| 8888| 101| 33.33| 101| Lester|
| 8890| 102| 101.01| 102|Gretchen|
+-------+------+----------+------+--------+
As you can see in the output above and the schema below, we end up having two columns named custId which hold the exact same values.
joinedDF.printSchema
root
|-- orderId: integer (nullable = false)
|-- custId: integer (nullable = false)
|-- totalPrice: float (nullable = false)
|-- custId: integer (nullable = false)
|-- name: string (nullable = true)
This causes us trouble when we try to reference either of these identically named fields directly by name.
joinedDF.where("custId > 100").show
AnalysisException: Reference 'custId' is ambiguous, could be: custId, custId.; line 1 pos 0
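One way to sidestep the exception without changing the join itself is to reference the column through the DataFrame it came from rather than by its bare name. The following is a minimal sketch, not the approach used in the rest of this post; it swaps the case classes for tuples and builds a local SparkSession so it can run on its own (in spark-shell or Zeppelin the session already exists, and the local master setting is an assumption).

```scala
import org.apache.spark.sql.SparkSession

// Assumption: a local session for a standalone run; getOrCreate() reuses an existing one.
val spark = SparkSession.builder().master("local[*]").appName("dup-cols").getOrCreate()
import spark.implicits._

// Trimmed-down copies of the customer and order data.
val customerDF = Seq((101, "Lester"), (102, "Gretchen")).toDF("custId", "name")
val orderDF = Seq((8888, 101, 33.33f), (8889, 101, 66.66f), (8890, 102, 101.01f))
  .toDF("orderId", "custId", "totalPrice")

val joinedDF = orderDF.join(customerDF,
  orderDF("custId") === customerDF("custId"))

// orderDF("custId") points at the order-side column specifically, so the
// reference is no longer ambiguous the way the string "custId > 101" would be.
val filtered = joinedDF.where(orderDF("custId") > 101)
filtered.show()
```

This keeps both custId columns in the result, so it only helps when you are willing to carry the parent DataFrames around for every later reference.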
A well-known fix, documented here and shown below, is to pass the join column name as a Seq, which leaves only a single column named custId in the result.
val joinedDF2 = orderDF.join(customerDF,
  Seq("custId"))
joinedDF2.show()
+------+-------+----------+--------+
|custId|orderId|totalPrice| name|
+------+-------+----------+--------+
| 101| 8891| 99.99| Lester|
| 101| 8889| 66.66| Lester|
| 101| 8888| 33.33| Lester|
| 102| 8890| 101.01|Gretchen|
+------+-------+----------+--------+
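Perfect! This matters because, with the duplicated columns behind that silly “ambiguous” exception earlier, trying to get rid of one of them by name would have eliminated both. Likewise, both would be renamed if we tried that instead. (Column names are matched case-insensitively by default, which is why custID finds custId below.)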
joinedDF.drop("custID").show()
+-------+----------+--------+
|orderId|totalPrice| name|
+-------+----------+--------+
| 8891| 99.99| Lester|
| 8889| 66.66| Lester|
| 8888| 33.33| Lester|
| 8890| 101.01|Gretchen|
+-------+----------+--------+
joinedDF.withColumnRenamed("custID", "custID2").show()
+-------+-------+----------+-------+--------+
|orderId|custID2|totalPrice|custID2| name|
+-------+-------+----------+-------+--------+
| 8891| 101| 99.99| 101| Lester|
| 8889| 101| 66.66| 101| Lester|
| 8888| 101| 33.33| 101| Lester|
| 8890| 102| 101.01| 102|Gretchen|
+-------+-------+----------+-------+--------+
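The drop and rename calls above work on names, which is why they hit both columns. As an alternative sketch, drop also accepts a Column, and select lets you alias a single column, so passing a reference taken from the parent DataFrame affects only that side of the join. The sample data is trimmed, tuples replace the case classes, and the local SparkSession setup and the custId2 name are assumptions made so the snippet stands alone.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: a local session for a standalone run; getOrCreate() reuses an existing one.
val spark = SparkSession.builder().master("local[*]").appName("dup-cols").getOrCreate()
import spark.implicits._

val customerDF = Seq((101, "Lester"), (102, "Gretchen")).toDF("custId", "name")
val orderDF = Seq((8888, 101, 33.33f), (8890, 102, 101.01f))
  .toDF("orderId", "custId", "totalPrice")

val joinedDF = orderDF.join(customerDF,
  orderDF("custId") === customerDF("custId"))

// Drop only the customer-side copy by handing drop a Column, not a String.
val onlyOne = joinedDF.drop(customerDF("custId"))
onlyOne.printSchema()

// Rename only the customer-side copy by aliasing it in an explicit select.
val renamedOne = joinedDF.select(
  orderDF("orderId"),
  orderDF("custId"),
  orderDF("totalPrice"),
  customerDF("custId").as("custId2"),
  customerDF("name"))
renamedOne.printSchema()
```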
Good stuff.
What If the Duplicate Column Was Not Being Joined On?
This scenario is actually the one I was being asked about in the first place. Let’s just add a notes column to both of the DFs and jam them with some bogus data.
import org.apache.spark.sql.functions.lit
val customerWithNotesDF = customerDF.withColumn(
  "notes", lit("bogus cust note"))
customerWithNotesDF.show(1)
+------+------+---------------+
|custId| name| notes|
+------+------+---------------+
| 101|Lester|bogus cust note|
+------+------+---------------+
val orderWithNotesDF = orderDF.withColumn(
  "notes", lit("bogus order note"))
orderWithNotesDF.show(1)
+-------+------+----------+----------------+
|orderId|custId|totalPrice| notes|
+-------+------+----------+----------------+
| 8888| 101| 33.33|bogus order note|
+-------+------+----------+----------------+
Now that we know we can have multiple DF columns with the same name, we can expect notes to show up twice, with only the bogus data values themselves giving us a hint as to which notes field came from which DF.
val joinedDF3 = orderWithNotesDF.join(
  customerWithNotesDF, Seq("custId"))
joinedDF3.drop("totalPrice", "name").show(2)
+------+-------+----------------+---------------+
|custId|orderId| notes| notes|
+------+-------+----------------+---------------+
| 101| 8888|bogus order note|bogus cust note|
| 101| 8889|bogus order note|bogus cust note|
+------+-------+----------------+---------------+
Like before, this is going to be “ambiguous” again…
joinedDF3.select("custId", "totalPrice", "notes").show()
AnalysisException: Reference 'notes' is ambiguous, could be: notes, notes.;
How do we fix it? Well, we just do a little name swizzling of our own. We could have done this before the join, but it is probably just easier to do on the fly.
val joinedDF4 = orderWithNotesDF.withColumnRenamed(
    "notes", "orderNotes").join(
  customerWithNotesDF.withColumnRenamed(
    "notes", "custNotes"),
  Seq("custId"))
joinedDF4.drop("totalPrice", "name").show(2)
+------+-------+----------------+---------------+
|custId|orderId| orderNotes| custNotes|
+------+-------+----------------+---------------+
| 101| 8888|bogus order note|bogus cust note|
| 101| 8889|bogus order note|bogus cust note|
+------+-------+----------------+---------------+
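Renaming is not the only option here. A hedged alternative is to alias each DataFrame and pick the columns with qualified names, renaming only in the final select. This sketch uses trimmed data, tuples instead of the case classes, an expression join instead of Seq("custId"), and a local SparkSession, all of which are assumptions made to keep it standalone; the aliases o and c are arbitrary.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, lit}

// Assumption: a local session for a standalone run; getOrCreate() reuses an existing one.
val spark = SparkSession.builder().master("local[*]").appName("dup-cols").getOrCreate()
import spark.implicits._

val customerWithNotesDF = Seq((101, "Lester"), (102, "Gretchen"))
  .toDF("custId", "name")
  .withColumn("notes", lit("bogus cust note"))
val orderWithNotesDF = Seq((8888, 101, 33.33f), (8890, 102, 101.01f))
  .toDF("orderId", "custId", "totalPrice")
  .withColumn("notes", lit("bogus order note"))

// Give each side an alias so its columns can be addressed as o.<name> / c.<name>.
val o = orderWithNotesDF.as("o")
val c = customerWithNotesDF.as("c")

val aliasedJoin = o.join(c, col("o.custId") === col("c.custId"))
  .select(
    col("o.custId"),
    col("o.orderId"),
    col("o.notes").as("orderNotes"),
    col("c.notes").as("custNotes"))
aliasedJoin.show()
```

The trade-off is that you must list the columns you want in the select, whereas withColumnRenamed keeps everything else untouched.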
Easy peasy.
A Twist on the Classic; Join on DataFrames with DIFFERENT Column Names
For this scenario, let’s assume there is some naming standard (sounds like they didn’t read my fruITion and recrEAtion (a double-header book review) post) declaring that the primary key (yes, we don’t really have PKs here, but you know what I mean) of every table that uses a surrogate value just be called id. We can just swizzle the original customer and order DFs to conform to the standard.
val customerWithPkOfIdDF =
  customerDF.withColumnRenamed("custId", "id")
customerWithPkOfIdDF.show(2)
+---+--------+
| id| name|
+---+--------+
|101| Lester|
|102|Gretchen|
+---+--------+
val orderWithPkOfIdDF =
  orderDF.withColumnRenamed("orderId", "id")
orderWithPkOfIdDF.show(2)
+----+------+----------+
| id|custId|totalPrice|
+----+------+----------+
|8888| 101| 33.33|
|8889| 101| 66.66|
+----+------+----------+
Yep, I’m betting you can guess what this is going to look like when we join them!!
val joinedDF5 = orderWithPkOfIdDF.join(
  customerWithPkOfIdDF,
  orderWithPkOfIdDF("custId") ===
    customerWithPkOfIdDF("id"))
joinedDF5.show()
+----+------+----------+---+--------+
| id|custId|totalPrice| id| name|
+----+------+----------+---+--------+
|8891| 101| 99.99|101| Lester|
|8889| 101| 66.66|101| Lester|
|8888| 101| 33.33|101| Lester|
|8890| 102| 101.01|102|Gretchen|
+----+------+----------+---+--------+
That first id column is from the orders and the second id column is from customers (and is the same thing as the custId column). What a mess, and something like the following is absolutely going to get you another “ambiguous” exception!!
joinedDF5.where("id > 100").show()
AnalysisException: Reference 'id' is ambiguous, could be: id, id.; line 1 pos 0
The fix? Well, we just merge the other two scenarios’ solutions into one! We rename the customer DF’s id field to custId on the fly inside the join operation, and then we can use the abbreviated condition of Seq("custId") to ensure we only have one column for the join key.
val joinedDF6 = orderWithPkOfIdDF.join(
  customerWithPkOfIdDF.withColumnRenamed(
    "id", "custId"), Seq("custId"))
joinedDF6.show(3)
+------+----+----------+------+
|custId| id|totalPrice| name|
+------+----+----------+------+
| 101|8891| 99.99|Lester|
| 101|8889| 66.66|Lester|
| 101|8888| 33.33|Lester|
+------+----+----------+------+
Wrap-Up
In some ways, all of that might have been harder than it should have been. What could likely have been much easier? Well, just submitting a SQL statement via the SparkSession.sql function, but where’s the fun in that!!
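For comparison, here is roughly what the SQL route could look like. It is a minimal sketch with trimmed data and tuples instead of the case classes; the view names customers and orders, the table aliases, and the local SparkSession setup are all assumptions for illustration.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: a local session for a standalone run; getOrCreate() reuses an existing one.
val spark = SparkSession.builder().master("local[*]").appName("dup-cols").getOrCreate()
import spark.implicits._

val customerDF = Seq((101, "Lester"), (102, "Gretchen")).toDF("custId", "name")
val orderDF = Seq((8888, 101, 33.33f), (8890, 102, 101.01f))
  .toDF("orderId", "custId", "totalPrice")

// Register the DataFrames as temp views (names are hypothetical).
customerDF.createOrReplaceTempView("customers")
orderDF.createOrReplaceTempView("orders")

// The select list names each column once, qualified by table alias,
// so the result never contains two columns called custId.
val sqlJoinedDF = spark.sql("""
  SELECT o.orderId, o.custId, o.totalPrice, c.name
  FROM orders o
  JOIN customers c ON o.custId = c.custId
""")
sqlJoinedDF.show()
```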
NOTE: I used Zeppelin for testing this all out and I exported the notebook should you want to leverage it.
Well, feel free to check out joining spark dataframes with identical column names (an easier way) for, as it says, an “easier way” to do this. ;)