Hello World with Flink (from Scratch)

If you have heard about Apache Flink but have not gotten your hands dirty yet, then this is the blog post for you. We will stand up a local development environment, write some simple code (there is much more to Flink than we will get into in this post), and then test it out.

And… yes, just like my earlier post, functional programming and big data api's look a lot alike (what a pair of topics; it has code comparisons with Flink, Kafka, Spark, Trident and Pig), we'll use the darn word count example. Also, it won't be as robust as the one from the project's source code.

Setup Environment

<shamelessProductPlug>
  If you already have a Flink runtime environment, 
  such as Cloudera Data Platform (CDP),
  then you are ALREADY there!!
</shamelessProductPlug>

If not, or if you just want to set something small up on your workstation, let's keep it simple (KISS) and just head over to Flink's download page and select an appropriate version. Most often the latest version is best, and at the time of this article I went with 1.12.2. Specifically, I downloaded Apache Flink 1.12.2 for Scala 2.11. Then we just need to unpack it and start it up.

MBP15:blog lmartin$ pwd
/Users/lmartin/blog
MBP15:blog lmartin$ ls
flink-1.12.2-bin-scala_2.11.tgz
MBP15:blog lmartin$ tar -xvf flink-1.12.2-bin-scala_2.11.tgz 
    ... lines removed ...
MBP15:blog lmartin$ tree -L 2
.
├── flink-1.12.2
│   ├── LICENSE
│   ├── NOTICE
│   ├── README.txt
│   ├── bin
│   ├── conf
│   ├── examples
│   ├── lib
│   ├── licenses
│   ├── log
│   ├── opt
│   └── plugins
└── flink-1.12.2-bin-scala_2.11.tgz

9 directories, 4 files
MBP15:blog lmartin$ cd flink-1.12.2/bin
MBP15:bin lmartin$ ./start-cluster.sh 
Starting cluster.
Starting standalonesession daemon on host MBP15.local.
Starting taskexecutor daemon on host MBP15.local.
MBP15:bin lmartin$ 

Now we can point our browser to http://localhost:8081 to see the Flink UI.

That wasn’t too hard, now was it?

Code It!

In your favorite IDE, create a Maven Java project. Here’s how I identified my project, but call yours whatever you would like.

    <groupId>com.github.lestermartin</groupId>
    <artifactId>flink-exploration</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

And, yes… for those front-row kids, that suggests that all of this code is on GitHub (it is!). This post assumes you are building this by following my notes, but you could obviously just clone or download my blog-wc-batch branch instead.

You will also need some dependencies, so add these to your pom.xml. Be mindful of the Flink version, and of the Scala version suffix on the artifact names, which should match the Scala 2.11 distribution we downloaded earlier.

    <properties>
        <flink.version>1.12.2</flink.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>

Next up, create a wordcount Java package and within that a new Java class named WordCountBatch that has a main() method in it.

NOTE: This is a batch program — I will publish a follow-up blog post of modifying this same app to be a streaming solution. Fortunately, only a few lines of code will change and I thought it would be easier for this first attempt at coding a Flink job if we stuck to that KISS principle we used earlier.

Let’s just blast in some imports that will help us along the way.

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.util.Collector;

Before we do anything else, let's just make sure we can build our empty class with mvn clean install. We can verify the jar file is present, too.

MBP15:target lmartin$ pwd
/Users/lmartin/blog/flink-exploration/target
MBP15:target lmartin$ ls *.jar
flink-exploration-0.0.1-SNAPSHOT.jar
MBP15:target lmartin$ 

At the beginning of our main() method, add a few lines of boilerplate code to deal with whatever execution environment we find ourselves in during job submission, and to access any command-line arguments that were passed in (e.g. the input file and output location).

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
ParameterTool params = ParameterTool.fromArgs(args);
env.getConfig().setGlobalJobParameters(params);

This line will allow us to read in a text file from a command-line argument.

DataSet<String> lines = env.readTextFile(params.get("input"));

To help us, add the following inner class. It breaks a space-delimited string into single words and then morphs each word into a key-value pair of (word, 1), which we will use in a transformation of the lines dataset in just a minute.

// user-defined function
public static final class Tokenizer 
    implements FlatMapFunction<String, Tuple2<String, Integer>> {

    @Override
    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
        // normalize and split the line
        String[] tokens = value.toLowerCase().split("\\W+");

        // emit the pairs
        for (String token : tokens) {
            if (token.length() > 0) {
                out.collect(new Tuple2<>(token, 1));
            }
        }
    }
}
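If you are curious what that normalize-and-split step actually produces, here is a quick plain-Java check (no Flink required). The class name TokenizerCheck and the sample line are just my own illustration:

```java
import java.util.Arrays;

public class TokenizerCheck {
    public static void main(String[] args) {
        // same normalization the Tokenizer uses: lowercase, then split on runs of non-word characters
        String line = "I DO NOT LIKE THEM SAM I AM";
        String[] tokens = line.toLowerCase().split("\\W+");
        System.out.println(Arrays.toString(tokens));
        // prints: [i, do, not, like, them, sam, i, am]
        // each non-empty token would then be emitted by the Tokenizer as a (token, 1) tuple
    }
}
```

Note the length check in the Tokenizer matters because split("\\W+") can produce an empty leading token when a line starts with punctuation or whitespace.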

With the heavy-lifting code now available as a user-defined function, the rest of the word count processing is easily taken care of by the Flink API itself.

DataSet<Tuple2<String, Integer>> wordcounts =
        // split up the lines in pairs (2-tuples) containing: (word,1)
        lines.flatMap(new Tokenizer())
                // group by the tuple field "0" and sum up tuple field "1"
                .groupBy(0).sum(1);

Lastly, we can write the wordcounts dataset out to disk to review the calculations.

wordcounts.writeAsCsv(params.get("output"), "\n", " ");
// the next line triggers the lazy execution to begin
env.execute("WordCount Batch Example");

Package up the jar file again with the mvn install command.

Run It!

Create a simple text file to test with. In my github project referenced earlier, this is in src/test/resources/BitOfGreenEggsAndHam.txt.

DO WOULD YOU LIKE GREEN EGGS AND HAM
I DO NOT LIKE THEM SAM I AM
I DO NOT LIKE GREEN EGGS AND HAM
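Before submitting the job, we can predict what it should produce with a plain-Java analogue of the same pipeline: tokenize each line, then group and sum. This is just a sketch, with a TreeMap standing in for Flink's groupBy(0).sum(1) so we can eyeball the counts (sorted alphabetically for readability):

```java
import java.util.Map;
import java.util.TreeMap;

public class WordCountPreview {
    public static void main(String[] args) {
        String[] lines = {
            "DO WOULD YOU LIKE GREEN EGGS AND HAM",
            "I DO NOT LIKE THEM SAM I AM",
            "I DO NOT LIKE GREEN EGGS AND HAM"
        };
        // TreeMap keeps the words alphabetically sorted for easy reading
        Map<String, Integer> counts = new TreeMap<>();
        for (String line : lines) {
            // same tokenization as the Tokenizer UDF above
            for (String token : line.toLowerCase().split("\\W+")) {
                if (token.length() > 0) {
                    // merge plays the role of groupBy(0).sum(1)
                    counts.merge(token, 1, Integer::sum);
                }
            }
        }
        counts.forEach((word, count) -> System.out.println(word + " " + count));
    }
}
```

The counts this prints (do 3, like 3, green 2, and so on) are what we should find in the job's output file shortly.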

Submit the job from the CLI. Be mindful of your class name, jar file location, and the input and output file names so they fit your specific setup.

MBP15:bin lmartin$ pwd
/Users/lmartin/blog/flink-1.12.2/bin
MBP15:bin lmartin$ ./flink run --class wordcount.WordCountBatch \
> ~/blog/flink-exploration/target/flink-exploration-0.0.1-SNAPSHOT.jar \
> --input file:///Users/lmartin/blog/flink-exploration/src/test/resources/BitOfGreenEggsAndHam.txt \
> --output file:///Users/lmartin/blog/data/wc-tinyBit-OUT
Job has been submitted with JobID c2079da12c2c0e89a56a1b325f75e5dc
Program execution finished
Job with JobID c2079da12c2c0e89a56a1b325f75e5dc has finished.
Job Runtime: 241 ms

MBP15:bin lmartin$ 

This batch job took less than a second to complete since it was so small. You can find the job in the Flink UI under the list of completed jobs.

As expected, the output file is even smaller.

MBP15:data lmartin$ pwd
/Users/lmartin/blog/data
MBP15:data lmartin$ ls
wc-tinyBit-OUT
MBP15:data lmartin$ cat wc-tinyBit-OUT 
am 1
and 2
do 3
eggs 2
green 2
ham 2
i 3
like 3
not 2
sam 1
them 1
would 1
you 1
MBP15:data lmartin$

Remember, you can always pull down the blog-wc-batch branch of the flink-exploration GitHub project if that would be easier. There is even a complete text version of Green Eggs and Ham to word count on. If you run it, comment below with the number of times green, eggs, and ham were present. To help make sure it all ran correctly: there were 82 instances of not in the text.

Published by lestermartin

Developer advocate, trainer, blogger, and data engineer focused on data lake & streaming frameworks including Trino, Hive, Spark, Flink, Kafka and NiFi.
