mv’ing batch flink to streaming (easy breezy)

This is the follow-up post mentioned in hello world with flink (from scratch) that shows how to take that simple batch application and modify it to be a real-time streaming job.

Create a Source

Streaming apps need a continuous source of data to process. This could be any number of things, including Apache Kafka, but for our example we are sticking with the KISS principle. We will just connect to a socket and expect strings of words to be sent to us. To stand up a manual test-harness generator, just run the following command.

nc -l 9999

This will just put the cursor on the following line and you can type data and hit the <ENTER> key to submit that string out over port 9999.

NOTE: You might simply <CTRL-Z> this until you are ready to submit your job (and then fg to bring it back).
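If nc isn't handy on your machine, here is a minimal plain-Java stand-in (a hypothetical helper, not part of the project) that replays a fixed set of lines to whichever client connects, mimicking nc -l fed from a script instead of the keyboard.

```java
import java.io.IOException;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;

public class LineReplayServer {

    // Listen on a port and serve a fixed list of lines to the first
    // client that connects -- a one-shot stand-in for `nc -l`.
    static void serve(int port, String... lines) throws IOException {
        try (ServerSocket server = new ServerSocket(port);
             Socket client = server.accept();
             PrintWriter out = new PrintWriter(client.getOutputStream(), true)) {
            for (String line : lines) {
                out.println(line);
            }
        }
    }

    public static void main(String[] args) throws IOException {
        serve(9999,
              "now is the time for all good programmers to learn flink",
              "flink is now all the time and programmers love to learn");
    }
}
```

Note that this sends its lines and then closes the connection, so it is only good for a quick one-shot test; nc keeps the socket open for as long as you like, which is what you want for interactive testing.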

Modify the Code

If you just completed the code in hello world with flink, then you are ready to go. If not, you can clone the blog-wc-batch branch of my flink-exploration github project.

MBP15:blog2 lmartin$ pwd
/Users/lmartin/blog2
MBP15:blog2 lmartin$ git clone --branch blog-wc-batch --single-branch https://github.com/lestermartin/flink-exploration.git
MBP15:blog2 lmartin$ tree
.
└── flink-exploration
    ├── README.md
    ├── pom.xml
    └── src
        ├── main
        │   └── java
        │       └── wordcount
        │           └── WordCountBatch.java
        └── test
            └── resources
                ├── BitOfGreenEggsAndHam.txt
                └── GreenEggsAndHam.txt

7 directories, 5 files
MBP15:blog2 lmartin$ 

NOTE: You could also just pull down the blog-wc-streaming branch if you don’t want to make the changes yourself. 😉

Step 1: Duplicate the Batch Class

Make a copy of WordCountBatch.java and call it WordCountStreaming.java.

Step 2: Update the Imports

You could do this while modifying the new streaming class, but I’m just trying to help you out here!! Add the following two lines.

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

Once the changes below are made, you can safely remove the DataSet and ExecutionEnvironment import statements.

Step 3: Adjust the Boilerplate

Update the first line of the boilerplate code to use the new “environment” class just imported so it looks like the following.

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

Step 4: Stream Instead of Read

Change this line that was reading a file.

// read the input file
DataSet<String> lines =
  env.readTextFile(params.get("input"));

To now listen to the socket for continuous feeds of data.

// listen to a socket
DataStream<String> lines = 
  env.socketTextStream("localhost", 9999);

Notice that we not only called a different method on the env object, we are now creating a DataStream instead of a DataSet.

Step 5: Adapt to DataStream

The wordcounts streaming collection should now be of type DataStream and the groupBy() method after the flatMap() needs to be changed to a keyBy(). This code block should look similar to the following.

        //calculate word counts
        DataStream<Tuple2<String, Integer>> wordcounts =
                // split up the lines in pairs (2-tuples) containing: (word,1)
                lines.flatMap(new Tokenizer())
                        // group by the tuple field "0" and sum up tuple field "1"
                        .keyBy(0).sum(1);

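The Tokenizer itself carries over from the batch class unchanged. As a refresher, its per-line logic boils down to something like this plain-Java sketch (the real class implements Flink's FlatMapFunction interface, and its exact splitting rules may differ slightly):

```java
import java.util.ArrayList;
import java.util.List;

public class TokenizerSketch {

    // Roughly what the Tokenizer's flatMap does for each line:
    // lowercase it, split on non-word characters, and keep every
    // non-empty token (each of which becomes a ("word", 1) tuple).
    static List<String> tokenize(String line) {
        List<String> words = new ArrayList<>();
        for (String token : line.toLowerCase().split("\\W+")) {
            if (!token.isEmpty()) {
                words.add(token);
            }
        }
        return words;
    }

    public static void main(String[] args) {
        System.out.println(tokenize("Now is the TIME!"));
        // prints [now, is, the, time]
    }
}
```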
Step 6: Simplify the Output

In a “real” application, we’ll likely need a sink that persists some/all of the streaming data somewhere, but let’s replace the whole // output the dataset section with just these two lines.

        wordcounts.print();
        env.execute("WordCount Streaming Example");

This will simply send our results to “standard out” which we can view in the Flink UI once we start our continuous job running.

Here is a CLI diff of the changes.

MBP15:wordcount lmartin$ pwd
/Users/lmartin/blog/flink-exploration/src/main/java/wordcount
MBP15:wordcount lmartin$ ls
WordCountBatch.java	WordCountStreaming.java
MBP15:wordcount lmartin$ diff WordCountBatch.java WordCountStreaming.java 
4,5d3
< import org.apache.flink.api.java.DataSet;
< import org.apache.flink.api.java.ExecutionEnvironment;
7a6,7
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
10c10
< public class WordCountBatch
---
> public class WordCountStreaming
16c16
<         ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
---
>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
20,21c20,21
<         // read the input file
<         DataSet<String> lines = env.readTextFile(params.get("input"));
---
>         // listen to a socket
>         DataStream<String> lines = env.socketTextStream("localhost", 9999);
24c24
<         DataSet<Tuple2<String, Integer>> wordcounts =
---
>         DataStream<Tuple2<String, Integer>> wordcounts =
28c28
<                         .groupBy(0).sum(1);
---
>                         .keyBy(0).sum(1);
30,36c30,31
<         // output the dataset
<         if (params.has("output"))
<         {
<             wordcounts.writeAsCsv(params.get("output"), "\n", " ");
<             // the next line triggers the lazy execution to begin
<             env.execute("WordCount Batch Example");
<         }
---
>         wordcounts.print();
>         env.execute("WordCount Streaming Example");
MBP15:wordcount lmartin$ 

Build and Submit

Use Maven to package up a jar file again. Our command-line submission doesn't need any parameters passed into the class, so it can be much more succinct.

MBP15:bin lmartin$ ./flink run \
> --class wordcount.WordCountStreaming \
> ~/blog/flink-exploration/target/flink-exploration-0.0.1-SNAPSHOT.jar 
Job has been submitted with JobID 851364dd2d8197f3d717e9349437ff7e

We should be able to see the running job in the http://localhost:8081 UI.

As an alternative, we could also set up the jar to be used for job submission from the UI. There is a “Submit New Job” link in the left nav, and the rendered page has a “+ Add New” button in the upper-right of the page. Once you select your jar file from the upload dialog box, you can click on the name of the jar file in the UI to see some additional submit options.

For our simple job, we only need to update the “Entry Class” box with wordcount.WordCountStreaming and push the “Submit” button. This should send us straight to the “Overview” tab of this newly launched job.

Test It Out

One way or another, we should have the job running now. If it failed, go back and make sure the nc command is still running. If it is, you should be able to add some lines of text such as the following.

MBP15:~ lmartin$ nc -l 9999
now is the time for all good programmers to learn flink
flink is now all the time and programmers love to learn

While we can dive through the running job’s UI pages to find the “standard out” data, in this simple setup it will be much easier to just click on “Task Managers” on the left nav.

Click on the link just below the “Path, ID” heading, then on the newly rendered page select the “Stdout” tab to see what is being counted by the job.

Looks about right as we have 2 instances of both programmers and learn showing up.
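You can sanity-check those totals offline. Tallying the two test lines with a simple whitespace split (a quick standalone check, not part of the Flink job) confirms the final counts the job's keyBy/sum should converge to:

```java
import java.util.HashMap;
import java.util.Map;

public class CountCheck {

    // Tally every word across all lines, mirroring the job's keyBy/sum.
    static Map<String, Integer> countWords(String[] lines) {
        Map<String, Integer> counts = new HashMap<>();
        for (String line : lines) {
            for (String word : line.split("\\s+")) {
                counts.merge(word, 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        Map<String, Integer> counts = countWords(new String[] {
            "now is the time for all good programmers to learn flink",
            "flink is now all the time and programmers love to learn"
        });
        System.out.println("programmers=" + counts.get("programmers"));
        System.out.println("learn=" + counts.get("learn"));
        // prints programmers=2 and learn=2
    }
}
```

Keep in mind the streaming job prints a running total every time a word arrives, so you will see intermediate counts like (programmers,1) before the final (programmers,2) shows up.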

Obviously, we are barely scratching the surface of Flink. Fortunately, moving a batch application to streaming keeps the bulk of the core code the same, with most of the updates focused on adapting the source and, possibly, the sink.

Tear It Down!

Click on the “Cancel Job” link in the upper-right corner of the job detail page.

Perform a <CTRL-Z> on the nc command.

MBP15:~ lmartin$ nc -l 9999
now is the time for all good programmers to learn flink
flink is now all the time and programmers love to learn
^Z
[5]+  Stopped                 nc -l 9999
MBP15:~ lmartin$ 

You could always shut down Flink on your laptop if you’re done.

MBP15:bin lmartin$ pwd
/Users/lmartin/blog/flink-1.12.2/bin
lmartin-MBP15-8694:bin lmartin$ ./stop-cluster.sh 
Stopping taskexecutor daemon (pid: 76383) on host lmartin-MBP15-8694.local.
Stopping standalonesession daemon (pid: 76140) on host lmartin-MBP15-8694.local.
MBP15:bin lmartin$

Published by lestermartin

Developer advocate, trainer, blogger, and data engineer focused on data lake & streaming frameworks including Trino, Hive, Spark, Flink, Kafka and NiFi.
