Apache Flink
This tutorial shows how to integrate Upstash Kafka with Apache Flink.
Apache Flink is a distributed processing engine that can process streaming data.
Upstash Kafka Setup
Create a Kafka cluster using the Upstash Console or the Upstash CLI by following Getting Started.
Create two topics by following the creating topic steps. Name the first topic “input” and the second topic “output”, since the pipeline streams data from the first topic to the second.
Project Setup
If you already have a project and want to implement Upstash Kafka and Apache Flink integration into it, you can skip this section and continue with Add Apache Flink and Kafka into the Project.
Install Maven on your machine by following the Maven Installation Guide.
Run mvn -version in a terminal or command prompt to make sure that Maven is installed.
It should print the installed Maven version:
Apache Maven 3.6.3 (cecedd343002696d0abb50b32b541b8a6ba2883f)
Maven home: D:\apache-maven-3.6.3\apache-maven\bin\..
Java version: 1.8.0_232, vendor: AdoptOpenJDK, runtime: C:\Program Files\AdoptOpenJDK\jdk-8.0.232.09-hotspot\jre
Default locale: en_US, platform encoding: Cp1250
OS name: "windows 10", version: "10.0", arch: "amd64", family: "windows"
To create the Maven project:
Go to the folder in which you want to create the project by running cd <folder path> in your terminal or command prompt.
Run the following command:
mvn archetype:generate -DgroupId=com.kafkaflinkinteg.app -DartifactId=kafkaflinkinteg-app -DarchetypeArtifactId=maven-archetype-quickstart -DarchetypeVersion=1.4 -DinteractiveMode=false
Add Apache Flink and Kafka into the Project
Open the project folder in an IDE that has a Maven plugin, such as IntelliJ, Visual Studio, or Eclipse. Add the following Apache Flink dependencies inside the dependencies tag of the pom.xml file.
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka</artifactId>
    <version>1.16.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-base -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-base</artifactId>
    <version>1.16.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java</artifactId>
    <version>1.16.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients</artifactId>
    <version>1.16.0</version>
</dependency>
Streaming From One Topic to Another Topic
You need to create two more classes (LineSplitter and CustomSerializationSchema) for the word count example.
LineSplitter
This class is a custom implementation of FlatMapFunction from Apache Flink. It takes a sentence, splits it into words, and emits each word as a two-field tuple (Tuple2) in the format (<word>, 1).
Create the LineSplitter class as follows:
package com.kafkaflinkinteg.app;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

/**
 * Implements the string tokenizer that splits sentences into words as a user-defined
 * FlatMapFunction. The function takes a line (String) and splits it into
 * multiple pairs in the form of "(word,1)" (Tuple2<String, Integer>).
 */
public class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
    @Override
    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
        // normalize and split the line
        String[] tokens = value.toLowerCase().split("\\W+");
        // emit the pairs
        for (String token : tokens) {
            if (token.length() > 0) {
                out.collect(new Tuple2<>(token, 1));
            }
        }
    }
}
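To see why the token.length() > 0 check matters, the tokenizing step can be tried outside Flink. The following is a minimal plain-Java sketch, not part of the tutorial project; the class name TokenizeSketch and the helper tokenize are hypothetical and exist only for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper that mirrors the splitting logic of LineSplitter without any Flink types.
class TokenizeSketch {

    // Lowercases the line, splits on runs of non-word characters, and drops empty tokens.
    static List<String> tokenize(String line) {
        List<String> words = new ArrayList<>();
        for (String token : line.toLowerCase().split("\\W+")) {
            // split() yields a leading empty string when the line starts with a non-word character
            if (!token.isEmpty()) {
                words.add(token);
            }
        }
        return words;
    }

    public static void main(String[] args) {
        System.out.println(tokenize("This is an example sentence."));
        System.out.println(tokenize("  ...leading punctuation"));
    }
}
```

Without the emptiness check, a line that starts with spaces or punctuation would emit an empty word with a count of 1.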
CustomSerializationSchema
This class is a custom implementation of KafkaRecordSerializationSchema from the Apache Flink Kafka connector library. It serializes each two-field tuple (Tuple2) produced by the word count into a Kafka record, using the word as the key and the count as the value.
Create the CustomSerializationSchema class as follows:
package com.kafkaflinkinteg.app;

import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.producer.ProducerRecord;

public class CustomSerializationSchema implements KafkaRecordSerializationSchema<Tuple2<String, Integer>> {
    // Name of the Kafka topic that the records are written to
    private final String topic;
    // Created lazily on the first call to serialize()
    private ObjectMapper mapper;

    public CustomSerializationSchema(String topic) {
        this.topic = topic;
    }

    @Override
    public void open(SerializationSchema.InitializationContext context, KafkaSinkContext sinkContext) throws Exception {
        KafkaRecordSerializationSchema.super.open(context, sinkContext);
    }

    @Override
    public ProducerRecord<byte[], byte[]> serialize(Tuple2<String, Integer> element, KafkaSinkContext context, Long timestamp) {
        if (mapper == null) {
            mapper = new ObjectMapper();
        }
        try {
            // the word becomes the record key, the count becomes the record value
            byte[] key = mapper.writeValueAsBytes(element.f0);
            byte[] value = mapper.writeValueAsBytes(element.f1);
            return new ProducerRecord<>(topic, key, value);
        } catch (JsonProcessingException e) {
            // fail loudly instead of silently writing a record with a null key and value
            throw new IllegalStateException("Could not serialize record: " + element, e);
        }
    }
}
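Because the key and value are written with Jackson's writeValueAsBytes, a record on the output topic holds JSON text: the key is the word wrapped in double quotes, and the value is the count as decimal digits. The sketch below shows how a downstream consumer might decode such bytes using only the JDK. It assumes the keys contain no JSON escape sequences, which holds for LineSplitter tokens because they consist solely of ASCII letters, digits, and underscores. The class and method names are hypothetical.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical decoder for records written by CustomSerializationSchema.
class OutputRecordDecoder {

    // Key bytes are a JSON string such as "flink" (quotes included); strip the quotes.
    static String decodeKey(byte[] key) {
        String json = new String(key, StandardCharsets.UTF_8);
        if (json.length() < 2 || json.charAt(0) != '"' || json.charAt(json.length() - 1) != '"') {
            throw new IllegalArgumentException("Not a JSON string: " + json);
        }
        // Assumption: no escape sequences, so the text between the quotes is the word itself.
        return json.substring(1, json.length() - 1);
    }

    // Value bytes are a JSON number such as 3; parse it as an int.
    static int decodeValue(byte[] value) {
        return Integer.parseInt(new String(value, StandardCharsets.UTF_8).trim());
    }

    public static void main(String[] args) {
        byte[] key = "\"flink\"".getBytes(StandardCharsets.UTF_8);
        byte[] value = "3".getBytes(StandardCharsets.UTF_8);
        System.out.println(decodeKey(key) + " -> " + decodeValue(value));
    }
}
```

For keys that may contain arbitrary text, a real JSON parser such as Jackson would be the safer choice.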
Integration
Import the following packages first:
package com.kafkaflinkinteg.app;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import java.util.Properties;
Define the names of the topics you are going to work on:
String inputTopic = "input";
String outputTopic = "output";
Create the following properties for the Apache Flink Kafka connector and replace the UPSTASH-KAFKA-* placeholders with your cluster information.
Properties props = new Properties();
props.put("transaction.timeout.ms", "90000"); // 90 seconds
props.put("bootstrap.servers", "UPSTASH-KAFKA-ENDPOINT:9092");
props.put("sasl.mechanism", "SCRAM-SHA-256");
props.put("security.protocol", "SASL_SSL");
props.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"UPSTASH-KAFKA-USERNAME\" password=\"UPSTASH-KAFKA-PASSWORD\";");
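Hard-coding credentials is fine for a quick test, but they are often read from the environment instead. The following sketch builds the same properties from three values. The environment variable names (UPSTASH_KAFKA_ENDPOINT, UPSTASH_KAFKA_USERNAME, UPSTASH_KAFKA_PASSWORD) and the class name KafkaProps are assumptions made for this example, not names defined by Upstash or Flink.

```java
import java.util.Properties;

// Hypothetical helper that assembles the connector properties used in this tutorial.
class KafkaProps {

    // endpoint is expected in host:port form, for example "UPSTASH-KAFKA-ENDPOINT:9092".
    static Properties build(String endpoint, String username, String password) {
        Properties props = new Properties();
        props.setProperty("transaction.timeout.ms", "90000"); // 90 seconds
        props.setProperty("bootstrap.servers", endpoint);
        props.setProperty("sasl.mechanism", "SCRAM-SHA-256");
        props.setProperty("security.protocol", "SASL_SSL");
        // Assumption: username and password contain no double quotes or backslashes.
        props.setProperty("sasl.jaas.config", String.format(
                "org.apache.kafka.common.security.scram.ScramLoginModule required "
                        + "username=\"%s\" password=\"%s\";", username, password));
        return props;
    }

    // Reads the three values from environment variables (names are assumptions).
    static Properties fromEnvironment() {
        return build(
                requireEnv("UPSTASH_KAFKA_ENDPOINT"),
                requireEnv("UPSTASH_KAFKA_USERNAME"),
                requireEnv("UPSTASH_KAFKA_PASSWORD"));
    }

    private static String requireEnv(String name) {
        String value = System.getenv(name);
        if (value == null || value.isEmpty()) {
            throw new IllegalStateException("Missing environment variable: " + name);
        }
        return value;
    }
}
```

With such a helper, Properties props = KafkaProps.fromEnvironment(); could stand in for the hard-coded block.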
Get the stream execution environment, in which the pipeline is created and executed.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Create the Kafka source that consumes the input topic.
KafkaSource<String> source = KafkaSource.<String>builder()
    .setStartingOffsets(OffsetsInitializer.earliest())
    .setProperties(props)
    .setTopics(inputTopic)
    .setGroupId("my-group")
    .setValueOnlyDeserializer(new SimpleStringSchema())
    .build();
Implement the stream processing part, which takes each input sentence from the source, splits it into words, and counts the words in 5-second windows.
DataStream<Tuple2<String, Integer>> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source")
    .flatMap(new LineSplitter())
    .keyBy(value -> value.f0)
    .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
    .sum(1);
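To make the windowing step concrete: keyBy groups the pairs by word, and a 5-second tumbling window cuts each group into fixed, non-overlapping 5-second buckets whose counts are summed independently. The plain-Java sketch below imitates that behaviour on a small in-memory list of words with arrival times. It illustrates the idea only and is not how Flink implements windows; TumblingWindowSketch, countPerWindow, and the timestamps are invented for the example.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, Flink-free imitation of keyBy + 5-second tumbling window + sum.
class TumblingWindowSketch {
    static final long WINDOW_MS = 5_000L;

    // Returns window start (ms) -> word -> count. Assumes non-negative arrival times.
    static Map<Long, Map<String, Integer>> countPerWindow(long[] arrivalTimesMs, String[] words) {
        Map<Long, Map<String, Integer>> result = new TreeMap<>();
        for (int i = 0; i < words.length; i++) {
            // Every arrival time falls into exactly one window: [start, start + 5000)
            long windowStart = arrivalTimesMs[i] - (arrivalTimesMs[i] % WINDOW_MS);
            result.computeIfAbsent(windowStart, start -> new TreeMap<>())
                  .merge(words[i], 1, Integer::sum);
        }
        return result;
    }

    public static void main(String[] args) {
        long[] arrivalTimesMs = {1_000, 2_000, 4_999, 5_000, 7_500};
        String[] words = {"flink", "kafka", "flink", "flink", "kafka"};
        // prints {0={flink=2, kafka=1}, 5000={flink=1, kafka=1}}
        System.out.println(countPerWindow(arrivalTimesMs, words));
    }
}
```

The same word is counted separately in each window, so a word that keeps arriving is reported once per 5-second window rather than as one running total.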
You can see the output by printing the data stream.
stream.print();
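If you produce a message such as “This is an example sentence” to the input topic from your console, you will see output like the following. The number before > is the index of the parallel subtask that printed the record, and the words are lowercase because LineSplitter normalizes them.
2> (this,1)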
1> (an,1)
3> (is,1)
2> (sentence,1)
4> (example,1)
Next, create a Kafka sink that writes the data stream to the output topic.
KafkaSink<Tuple2<String, Integer>> sink = KafkaSink.<Tuple2<String, Integer>>builder()
    .setKafkaProducerConfig(props)
    .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
    .setTransactionalIdPrefix("integ")
    .setRecordSerializer(new CustomSerializationSchema(outputTopic))
    .build();
stream.sinkTo(sink);
Finally, call execute on the stream execution environment to run the pipeline.
env.execute();