
“Functional Programming” and “Big Data” (let’s just call them FP\BD) are both pretty comprehensive topics on their own, but let’s see if we can make some elementary sense of them. I started my Hadoop journey with Java MapReduce, Pig and Hive (see my Open Georgia Analysis series for more on those technologies) and I have to admit that when I first encountered Spark I was a bit confused about immutable datasets going through transformations and creating new (still immutable) datasets. Probably the fact that it was also my introduction to RDD programming didn’t help either.
In my defense, programming with the Resilient Distributed Dataset (RDD) API probably does look a bit odd when you come from the imperative programming (IP) and object-oriented programming (OOP) paradigms (check out Compare Functional Programming, Imperative Programming and Object Oriented Programming for more info).
val textFile = sc.textFile("hdfs://...")
val counts = textFile.flatMap(line => line.split(" "))
                     .map(word => (word, 1))
                     .reduceByKey(_ + _)
counts.saveAsTextFile("hdfs://...")
A seasoned Spark programmer can understand this code in a few seconds. It is the canonical “hello world” example of big data: word counting. It is also a good working example for explaining what this whole FP\BD intersection is all about. This post presents the idea at a high level and backs it up with the actual working example from above for those from the “Show-Me” state.
Functional Programming Implications
First, we should review a few of the key principles of functional programming that really matter in the big data context. These concepts allow programmers to focus on describing WHAT needs to get done, not on exactly HOW to do it.
- Immutable data: Datasets cannot be changed at all; DS1A can be transformed into a (new) DS1B, but an individual element within DS1A cannot be independently modified
- No state or side effects: No interaction with, or modification of, any values or properties outside of a function
- Behavioral consistency: If you pass the same value into a function multiple times, you will always get the same result
- Functions as arguments: Functions themselves (including anonymous functions) can be passed as input/arguments to other functions
- Lazy evaluation: Programs wait as long as they can before doing actual work, usually until they encounter a line of code that outputs results
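To make these principles a little more concrete, here is a minimal sketch in plain Scala (no Spark required). The object name FpPrinciplesSketch and the helpers timesTwo and applyTwice are made up for illustration; a local Iterator stands in for a lazily evaluated dataset, and the counter is a deliberate side effect used only to observe when work actually happens.

```scala
object FpPrinciplesSketch {
  // Immutable data: a Vector cannot be modified in place.
  val ds1a: Vector[Int] = Vector(1, 2, 3)
  // A transformation returns a NEW dataset; ds1a is left untouched.
  val ds1b: Vector[Int] = ds1a.map(_ * 10)

  // No side effects and behavioral consistency:
  // the result depends only on the argument.
  def timesTwo(n: Int): Int = n * 2

  // Functions as arguments: applyTwice receives a function as a parameter.
  def applyTwice(f: Int => Int, n: Int): Int = f(f(n))

  // Lazy evaluation: mapping over an Iterator does no work until a result is demanded.
  // The counter below exists only so we can see when elements are processed.
  var evaluated = 0
  val pending: Iterator[Int] = ds1a.iterator.map { n => evaluated += 1; n * 10 }
  val evaluatedBeforeSum: Int = evaluated // nothing has been processed yet
  val total: Int = pending.sum            // asking for a result forces the work
  val evaluatedAfterSum: Int = evaluated  // all 3 elements have now been processed

  def main(args: Array[String]): Unit = {
    println(s"ds1a = $ds1a, ds1b = $ds1b")
    println(s"applyTwice(timesTwo, 3) = ${applyTwice(timesTwo, 3)}")
    println(s"evaluated before sum = $evaluatedBeforeSum, after sum = $evaluatedAfterSum")
  }
}
```

Spark's RDD transformations behave in a similar spirit: they describe work, and an action such as count() or collect() triggers it.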
Use Case
Our simple use case is to determine, from an input text file, a distinct list of words along with the number of instances of each word. We can use the following text file for our exploration.
$ hdfs dfs -cat input.txt
we can go
we can go away in a boat
away in a big blue boat
This short file can be processed quickly with nothing more than pen & paper to find answers such as: there are 2 instances of each of the words we and boat. However, if this were a much bigger file (or bunches of big files), it would be next to impossible to hand-crank this in any timely fashion.
On that note, when we use a FP\BD framework such as Spark, the logical operations in the following steps occur in parallel. Again, the intention of this post is to provide a logical view, so for now you can imagine the processing occurring in a single-threaded manner. Obviously, that would never scale, but explaining parallel processing fundamentals, such as the MapReduce paradigm executing on many servers at once, is more than we need to tackle right now.
Step 1: Read the File
Initially, we just need to read the text file into a dataset. Our text file above becomes a dataset with 3 elements, each representing a string of text. This operation creates a dataset rather than transforming one, but the resulting dataset is still immutable.
| we can go |
| we can go away in a boat |
| away in a big blue boat |
“Show-Me”…
scala> val textFile = sc.textFile("input.txt")
scala> textFile.count()
res0: Long = 3
scala> textFile.collect()
res1: Array[String] = Array(we can go, we can go away in a boat, away in a big blue boat)
The code above reads the input.txt file into a dataset named textFile, which you can think of as an array of 3 strings.
Step 2: Split into Words
Our first true transformation operation is to loop through the 3-element dataset above and tokenize the long strings (which are space-delimited) into individual words. FP\BD frameworks often call this mapping or projecting. If we used one of these basic methods (such as Spark’s map), we would end up only halfway there with another 3-element dataset, because these mappers take one element from the original dataset and create exactly one element in the new dataset. This would look something like the following.
| Array(we, can, go) |
| Array(we, can, go, away, in, a, boat) |
| Array(away, in, a, big, blue, boat) |
For our use case, this list of arrays, each holding a different number of the same kind of data (i.e. words), is not all that valuable. Fortunately, these frameworks have many transformation operations, such as flat-mapping, which is essentially the same processing except that it flattens the results so that each array member becomes an independent element of the overall dataset, as shown below.
| we |
| can |
| go |
| we |
| can |
| go |
| away |
| in |
| a |
| boat |
| away |
| in |
| a |
| big |
| blue |
| boat |
“Show-Me”…
scala> val words = textFile.flatMap(line => line.split(" "))
scala> words.count()
res2: Long = 16
scala> words.collect()
res3: Array[String] = Array(we, can, go, we, can, go, away, in, a, boat, away, in, a, big, blue, boat)
The code above executes a flatMap transformation on all 3 elements of textFile, which creates a new immutable dataset called words whose 16 elements are the individual words.
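If the difference between map and flatMap is still fuzzy, the following sketch replays it on an ordinary Scala Vector instead of an RDD. The object name SplitSketch is made up for illustration, and the local collection is only a stand-in for the distributed dataset.

```scala
object SplitSketch {
  // Local stand-in for the 3-element dataset read from input.txt.
  val lines: Vector[String] = Vector(
    "we can go",
    "we can go away in a boat",
    "away in a big blue boat"
  )

  // map: one output element per input element, so we still have 3 elements,
  // each of which is a collection of words.
  val nested: Vector[Vector[String]] = lines.map(line => line.split(" ").toVector)

  // flatMap: the same split, but the per-line results are flattened
  // into one dataset of individual words.
  val words: Vector[String] = lines.flatMap(line => line.split(" ").toVector)

  def main(args: Array[String]): Unit = {
    println(s"map gives ${nested.size} elements")
    println(s"flatMap gives ${words.size} elements")
  }
}
```

Note that lines is unchanged after both calls; each transformation produced a new collection.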
Step 3: Prepare for Aggregation
Now we need to modify our dataset so that we can aggregate on it. A lot of the let’s-not-talk-about-it-now MapReduce paradigm way down at the lowest levels depends on key-value pairs (KVP). Why? Because the frameworks use that to gather up all the like-minded data into one place for some final rollup activity. For our case, we want to transform the dataset of 16 words into a dataset of 16 KVPs where the key is the word itself and the value is just a hard-coded number 1. Yep… that’ll make more sense in the next step…
| (we, 1) |
| (can, 1) |
| (go, 1) |
| (we, 1) |
| (can, 1) |
| (go, 1) |
| (away, 1) |
| (in, 1) |
| (a, 1) |
| (boat, 1) |
| (away, 1) |
| (in, 1) |
| (a, 1) |
| (big, 1) |
| (blue, 1) |
| (boat, 1) |
“Show-Me”…
scala> val KVPs = words.map(word => (word,1))
scala> KVPs.count()
res4: Long = 16
scala> KVPs.take(2)
res5: Array[(String, Int)] = Array((we,1), (can,1))
The code above executes a map transformation on the 16-element words dataset and creates a simple KVP for each word in the new KVPs dataset. It still has 16 elements; only the first 2 are shown for readability.
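To see why pairing every word with a 1 is useful, here is a small plain-Scala sketch (again, no Spark) of what "gathering like-minded data into one place" looks like. The object name PairSketch is made up, and groupBy on a local Vector is only an approximation of what a framework does across servers.

```scala
object PairSketch {
  val words: Vector[String] =
    "we can go we can go away in a boat away in a big blue boat".split(" ").toVector

  // Same idea as words.map(word => (word, 1)) in the Spark shell.
  val kvps: Vector[(String, Int)] = words.map(word => (word, 1))

  // Gather all the values that share a key into one place.
  val grouped: Map[String, Vector[Int]] =
    kvps
      .groupBy { case (word, _) => word }
      .map { case (word, pairs) => word -> pairs.map(_._2) }

  def main(args: Array[String]): Unit = {
    // Each key now owns a list of 1s, ready to be added up in the next step.
    grouped.toVector.sortBy(_._1).foreach(println)
  }
}
```

Once every key owns its own list of 1s, counting the instances of a word is just a matter of adding up that list.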
Step 4: Group and Count
As hinted at above, we are now transitioning into the “reduce” side of the house. This allows us to work on all the values for a given key at one time, and all of the processing for the distinct keys can be parallelized. We usually have at least two ways to do something like this (hint: for those Missouri citizens, search for “MapReduce reduce vs combine”). Regardless of how you get there, we want to end up with the following new dataset.
| (boat, 2) |
| (away, 2) |
| (can, 2) |
| (big, 1) |
| (we, 2) |
| (blue, 1) |
| (go, 2) |
| (a, 2) |
| (in, 2) |
“Show-Me”…
scala> val wordCounts = KVPs.reduceByKey(_ + _)
scala> wordCounts.count()
res6: Long = 9
scala> wordCounts.collect()
res7: Array[(String, Int)] = Array((boat,2), (away,2), (can,2), (big,1), (we,2), (blue,1), (go,2), (a,2), (in,2))
Yep… let’s just say that reduceByKey builds in an optimization called a combiner, which pre-aggregates the values for each key locally before the final reduce… and we just don’t want to get into Scala’s shorthand for an anonymous function’s arguments… 😉
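For the curious, here is a hedged plain-Scala sketch of both ideas. The helper reduceByKeyLocal is hypothetical (it is not a Spark API, just a local imitation), and the two "partitions" are simply the dataset cut in half to mimic data living on two servers.

```scala
object ReduceSketch {
  type WordPair = (String, Int)

  // Hypothetical local imitation of a per-key reduction:
  // group the pairs by key, then fold each key's values together with f.
  def reduceByKeyLocal(pairs: Seq[WordPair])(f: (Int, Int) => Int): Map[String, Int] =
    pairs.groupBy(_._1).map { case (key, vs) => key -> vs.map(_._2).reduce(f) }

  val kvps: Vector[WordPair] =
    "we can go we can go away in a boat away in a big blue boat"
      .split(" ").toVector.map(word => (word, 1))

  // `_ + _` is shorthand for a two-argument anonymous function.
  val shorthand: Map[String, Int] = reduceByKeyLocal(kvps)(_ + _)
  val explicit: Map[String, Int] = reduceByKeyLocal(kvps)((a, b) => a + b)

  // Combiner idea: pre-aggregate within each (simulated) partition first,
  // then merge the much smaller partial results.
  val (partitionA, partitionB) = kvps.splitAt(8)
  val partialA: Map[String, Int] = reduceByKeyLocal(partitionA)(_ + _)
  val partialB: Map[String, Int] = reduceByKeyLocal(partitionB)(_ + _)
  val merged: Map[String, Int] = reduceByKeyLocal(partialA.toSeq ++ partialB.toSeq)(_ + _)

  def main(args: Array[String]): Unit = {
    println(s"shorthand and explicit agree: ${shorthand == explicit}")
    println(s"combined result matches: ${merged == shorthand}")
    merged.toVector.sortBy(_._1).foreach(println)
  }
}
```

The pre-aggregation is safe here because addition gives the same answer no matter how the values are grouped, which is why partial sums per partition can be merged afterwards.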
Step 5: Output the Results
We did it! All we have left to do is output the results somewhere from the final dataset above. We should see something like the following in the persisted results.
$ hdfs dfs -cat output/*
(boat,2)
(away,2)
(can,2)
(big,1)
(we,2)
(blue,1)
(go,2)
(a,2)
(in,2)
“Show-Me”…
scala> wordCounts.saveAsTextFile("output")
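As a recap, the whole pipeline can be sketched end to end on a local collection. This is an approximation under stated assumptions: the object name WordCountLocalSketch is made up, groupBy plus sum stands in for reduceByKey, and the output lines are built by calling toString on each pair, which mirrors how saveAsTextFile turns each element into a line of text. The results are sorted here only to make the order predictable; Spark makes no such promise about ordering.

```scala
object WordCountLocalSketch {
  // Step 1: the input, one element per line of text.
  val lines: Vector[String] = Vector(
    "we can go",
    "we can go away in a boat",
    "away in a big blue boat"
  )

  val wordCounts: Map[String, Int] =
    lines
      .flatMap(line => line.split(" ").toVector)                 // Step 2: split into words
      .map(word => (word, 1))                                    // Step 3: prepare for aggregation
      .groupBy(_._1)                                             // Step 4: group...
      .map { case (word, pairs) => word -> pairs.map(_._2).sum } // ...and count

  // Step 5: render each (word, count) pair as one line of text.
  val outputLines: Vector[String] = wordCounts.toVector.sortBy(_._1).map(_.toString)

  def main(args: Array[String]): Unit = outputLines.foreach(println)
}
```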
Happy Functional Programming with Big Data and let me know if any of this made any sense at all in the comments section below.