> ## Documentation Index
> Fetch the complete documentation index at: https://upstash-vector.mintlify.site/llms.txt
> Use this file to discover all available pages before exploring further.

# Kafka Streams

> This tutorial shows how to integrate Upstash Kafka with Kafka Streams

[Kafka Streams](https://kafka.apache.org/documentation/streams/) is a client
library, which streams data from one Kafka topic to another.

### Upstash Kafka Setup

Create a Kafka cluster using [Upstash Console](https://console.upstash.com) or
[Upstash CLI](https://github.com/upstash/cli) by following
[Getting Started](https://docs.upstash.com/kafka).

Create two topics by following the creating topic
[steps](https://docs.upstash.com/kafka#create-a-topic). Let’s name first topic
“input”, since we are going to stream this topic to other one, which we can name
it as “output”.

### Project Setup

<Note>
  If you already have a project and want to use Kafka Streams with Upstash Kafka
  in it, you can skip this section and continue with [Add Kafka Streams into the
  Project](#add-kafka-streams-into-the-project).
</Note>

Install Maven to your machine by following [Maven Installation Guide](https://maven.apache.org/guides/getting-started/maven-in-five-minutes.html).

Run `mvn –version` in a terminal or in a command prompt to make sure you have
Maven downloaded.

It should print out the version of the Maven you have:

```
Apache Maven 3.6.3 (cecedd343002696d0abb50b32b541b8a6ba2883f)
Maven home: D:\apache-maven-3.6.3\apache-maven\bin\..
Java version: 1.8.0_232, vendor: AdoptOpenJDK, runtime: C:\Program Files\AdoptOpenJDK\jdk-8.0.232.09-hotspot\jre
Default locale: en_US, platform encoding: Cp1250
OS name: "windows 10", version: "10.0", arch: "amd64", family: "windows"
```

To create the Maven project;

Go into the folder that you want to create the project in your terminal or
command prompt by running `cd <folder path>`

Run the following command:

```
mvn archetype:generate -DgroupId=com.kafkastreamsinteg.app -DartifactId=kafkastreamsinteg-app -DarchetypeArtifactId=maven-archetype-quickstart -DarchetypeVersion=1.4 -DinteractiveMode=false
```

### Add Kafka Streams into the Project

Open the project folder by using an IDE which has maven plugin such as Intellij,
Visual Studio, Eclipse etc. Add following dependencies into the dependencies tag
in `pom.xml` file.

```xml
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-streams</artifactId>
  <version>3.3.1</version>
</dependency>

<dependency>
  <groupId>org.slf4j</groupId>
  <artifactId>slf4j-reload4j</artifactId>
  <version>2.0.3</version>
</dependency>
```

### Streaming From One Topic to Another Topic

Import the following packages first:

```java
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.KeyValueStore;

import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.regex.Pattern;
```

Define the names of the topics you are going to work on:

```java
String inputTopic = "input";
String outputTopic = "output";
```

Create the following properties for Kafka Streams and replace UPSTASH-KAFKA-\*
placeholders with your cluster information.

```java
final Properties props = new Properties();
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "UPSTASH-KAFKA-ENDPOINT:9092");
props.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-256");
props.put(StreamsConfig.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
props.put(StreamsConfig.APPLICATION_ID_CONFIG,"myLastNewProject");
props.put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.scram.ScramLoginModule username=\"UPSTASH-KAFKA-USERNAME\" password=\"UPSTASH-KAFKA-PASSWORD\";");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.topicPrefix(TopicConfig.RETENTION_MS_CONFIG), 604800000); // 7 days for internal repartition topic retention period
props.put(StreamsConfig.topicPrefix(TopicConfig.CLEANUP_POLICY_CONFIG), TopicConfig.CLEANUP_POLICY_DELETE); // delete cleanup policy for internal repartition topic
props.put(StreamsConfig.topicPrefix(TopicConfig.RETENTION_BYTES_CONFIG), 268435456); // 256 MB for internal repartition topic retention size
```

Start the builder for streaming and assign input topic as the source:

```java
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> source = builder.stream(inputTopic);
```

Apply the following steps to count the words in the sentence sent to input topic
and stream the results to the output topic:

```java
Pattern pattern = Pattern.compile("\\W+", Pattern.UNICODE_CHARACTER_CLASS);
Materialized<String, Long, KeyValueStore<Bytes, byte[]>> materialized = Materialized.as("countMapping");
materialized.withLoggingDisabled();
source.flatMapValues(value -> Arrays.asList(pattern.split(value.toLowerCase())))
        .groupBy((key, word) -> word, Grouped.as("groupMapping"))
        .count(materialized).toStream().mapValues(Object::toString)
        .to(outputTopic, Produced.with(Serdes.String(), Serdes.String()));
```

Since “groupby” function causing repartition and creation of a new internal
topic to store the groups intermediately, be sure that there is enough partition
capacity on your Upstash Kafka. For detailed information about the max partition
capacity of Kafka cluster, check [plans](https://upstash.com/#section-pricing).

Just to be sure, you can check from topic section on
[console](https://console.upstash.com) if the internal repartition topic created
successfully when you run your application and send data to input topic. For
reference, naming convention for internal repartition topics:

```
<application id><group name><repartition>
```

Next, finalize and build the streams builder. Create a topology of your process.
It can be viewed by printing.

```java
final Topology topology = builder.build();
System.out.println(topology.describe());
```

Here is the example topology in this scenario:

```
Topologies:

Sub-topology: 0
Source: KSTREAM-SOURCE-0000000000 (topics: [input])
--> KSTREAM-FLATMAPVALUES-0000000001
Processor: KSTREAM-FLATMAPVALUES-0000000001 (stores: [])
--> groupMapping
<-- KSTREAM-SOURCE-0000000000
Processor: groupMapping (stores: [])
--> groupMapping-repartition-filter
<-- KSTREAM-FLATMAPVALUES-0000000001
Processor: groupMapping-repartition-filter (stores: [])
--> groupMapping-repartition-sink
<-- groupMapping
Sink: groupMapping-repartition-sink (topic: groupMapping-repartition)
<-- groupMapping-repartition-filter

Sub-topology: 1
Source: groupMapping-repartition-source (topics: [groupMapping-repartition])
--> KSTREAM-AGGREGATE-0000000003
Processor: KSTREAM-AGGREGATE-0000000003 (stores: [countMapping])
--> KTABLE-TOSTREAM-0000000007
<-- groupMapping-repartition-source
Processor: KTABLE-TOSTREAM-0000000007 (stores: [])
--> KSTREAM-MAPVALUES-0000000008
<-- KSTREAM-AGGREGATE-0000000003
Processor: KSTREAM-MAPVALUES-0000000008 (stores: [])
--> KSTREAM-SINK-0000000009
<-- KTABLE-TOSTREAM-0000000007
Sink: KSTREAM-SINK-0000000009 (topic: output)
<-- KSTREAM-MAPVALUES-0000000008
```

Finally, start the Kafka Streams that was built and run it.

```java
final KafkaStreams streams = new KafkaStreams(topology, props);
final CountDownLatch latch = new CountDownLatch(1);
try {
	streams.start();
	System.out.println("streams started");
	latch.await();
} catch (final Throwable e) {
    System.exit(1);
}

Runtime.getRuntime().addShutdownHook(new Thread("streams-word-count") {
    @Override
	public void run() {
	    streams.close();
		latch.countDown();
	}
});
```
