# Apache Spark

> This tutorial shows how to integrate Upstash Kafka with Apache Spark.

[Apache Spark](https://spark.apache.org/) is a multi-language engine for
executing data engineering, data science, and machine learning on single-node
machines or clusters.

### Upstash Kafka Setup

Create a Kafka cluster using [Upstash Console](https://console.upstash.com/) or
[Upstash CLI](https://github.com/upstash/cli) by following
[Getting Started](https://docs.upstash.com/kafka).

Create a topic by following the
[topic creation steps](https://docs.upstash.com/kafka#create-a-topic). Let’s name the topic
“sentence”.

### Project Setup

<Info>
  If you already have a project and want to implement Upstash Kafka and Apache
  Spark integration into it, you can skip this section and continue with [Add
  Spark and Kafka into the Project](#add-spark-and-kafka-into-the-project).
</Info>

Install Maven on your machine by following the [Maven Installation Guide](https://maven.apache.org/guides/getting-started/maven-in-five-minutes.html).

Run `mvn -version` in a terminal or command prompt to make sure Maven is
installed.

It should print the installed Maven version:

```
Apache Maven 3.6.3 (cecedd343002696d0abb50b32b541b8a6ba2883f)
Maven home: D:\apache-maven-3.6.3\apache-maven\bin\..
Java version: 1.8.0_232, vendor: AdoptOpenJDK, runtime: C:\Program Files\AdoptOpenJDK\jdk-8.0.232.09-hotspot\jre
Default locale: en_US, platform encoding: Cp1250
OS name: "windows 10", version: "10.0", arch: "amd64", family: "windows"
```

To create the Maven project, go to the folder where you want to create the
project by running `cd <folder path>` in your terminal or command prompt.

Then run the following command:

```
mvn archetype:generate -DgroupId=com.kafkasparkinteg.app -DartifactId=kafkasparkinteg-app -DarchetypeArtifactId=maven-archetype-quickstart -DarchetypeVersion=1.4 -DinteractiveMode=false
```

### Add Spark and Kafka into the Project

Open the project folder in an IDE that has a Maven plugin, such as IntelliJ,
Visual Studio, or Eclipse. Add the following Spark dependencies inside the
`<dependencies>` tag of the `pom.xml` file.

```xml
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.12</artifactId>
  <version>3.3.1</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming -->
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming_2.12</artifactId>
  <version>3.3.1</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql -->
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql_2.12</artifactId>
  <version>3.3.1</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka-0-10 -->
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
  <version>3.3.1</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql-kafka-0-10 -->
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql-kafka-0-10_2.12</artifactId>
  <version>3.3.1</version>
</dependency>
```

### Using Apache Spark as Producer

Import the following packages first:

```java
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.*;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;
import java.util.*;
```

To send messages to Kafka from Spark, use the following code (for example, in
the `main` method of the `App` class generated by the archetype) after replacing
the `UPSTASH-KAFKA-*` placeholders with your cluster information:

```java
SparkSession spark = SparkSession.builder()
  .appName("quickstart")
  .config("spark.master", "local")
  .getOrCreate();

StructType structType = new StructType();
structType = structType.add("key", DataTypes.StringType, false);
structType = structType.add("value", DataTypes.StringType, false);

List<Row> rows = new ArrayList<Row>();
rows.add(RowFactory.create("test key", "This is an example sentence"));

Dataset<Row> sentenceDF = spark.createDataFrame(rows, structType);

sentenceDF.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .write()
  .format("kafka")
  .option("kafka.bootstrap.servers", "UPSTASH-KAFKA-ENDPOINT:9092")
  .option("kafka.sasl.mechanism", "SCRAM-SHA-256")
  .option("kafka.security.protocol", "SASL_SSL")
  .option("kafka.sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"UPSTASH-KAFKA-USERNAME\" password=\"UPSTASH-KAFKA-PASSWORD\";")
  .option("topic", "sentence")
  .save();
```

Before running the project, open the topic's messages page in the
[console](https://console.upstash.com).

When you run your project, you can observe the new message arriving in the
topic on the Upstash console.
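
The producer above passes four `kafka.*` connection options, and the consumer in the next section repeats them. As a minimal sketch, these options could be collected in one place. The class `UpstashKafkaOptions`, its method names, and the `UPSTASH_KAFKA_*` environment variable names are hypothetical and not part of Spark or Upstash. The sketch also assumes that the username and password contain no double quotes or backslashes, since it does not escape them.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper (not part of Spark or Upstash): gathers the Kafka
// connection options used in this tutorial into a single map.
final class UpstashKafkaOptions {

    // Builds the SCRAM JAAS configuration string used in the tutorial.
    // Assumption: username and password need no escaping.
    static String jaasConfig(String username, String password) {
        return "org.apache.kafka.common.security.scram.ScramLoginModule required "
            + "username=\"" + username + "\" password=\"" + password + "\";";
    }

    // Returns the four connection options shared by the producer and consumer.
    static Map<String, String> connectionOptions(String endpoint, String username, String password) {
        Map<String, String> options = new LinkedHashMap<>();
        options.put("kafka.bootstrap.servers", endpoint);
        options.put("kafka.sasl.mechanism", "SCRAM-SHA-256");
        options.put("kafka.security.protocol", "SASL_SSL");
        options.put("kafka.sasl.jaas.config", jaasConfig(username, password));
        return options;
    }

    public static void main(String[] args) {
        // The environment variable names below are assumptions for illustration;
        // the fallbacks are the placeholders used in this tutorial.
        Map<String, String> env = System.getenv();
        Map<String, String> options = connectionOptions(
            env.getOrDefault("UPSTASH_KAFKA_ENDPOINT", "UPSTASH-KAFKA-ENDPOINT:9092"),
            env.getOrDefault("UPSTASH_KAFKA_USERNAME", "UPSTASH-KAFKA-USERNAME"),
            env.getOrDefault("UPSTASH_KAFKA_PASSWORD", "UPSTASH-KAFKA-PASSWORD"));

        // Print only the option names so that the password is not logged.
        options.keySet().forEach(System.out::println);
    }
}
```

The resulting map could then be passed to Spark with `.options(...)`, which both `DataFrameReader` and `DataFrameWriter` accept with a `java.util.Map<String, String>`, in place of the four individual `.option(...)` calls.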

### Using Apache Spark as Consumer

If the following packages are not imported, import them first:

```java
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.*;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

import java.util.*;
```

To read the messages from the Kafka topic with Apache Spark and process them,
use the following code after replacing the `UPSTASH-KAFKA-*` placeholders with
your cluster information:

```java
SparkSession spark = SparkSession.builder()
  .appName("quickstart")
  .config("spark.master", "local")
  .getOrCreate();

Dataset<Row> lines = spark
  .read()
  .format("kafka")
  .option("kafka.bootstrap.servers", "UPSTASH-KAFKA-ENDPOINT:9092")
  .option("kafka.sasl.mechanism", "SCRAM-SHA-256")
  .option("kafka.security.protocol", "SASL_SSL")
  .option("kafka.sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"UPSTASH-KAFKA-USERNAME\" password=\"UPSTASH-KAFKA-PASSWORD\";")
  .option("startingOffsets", "earliest")
  .option("subscribe", "sentence")
  .load()
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");

// PROCESS RECEIVED MESSAGE - Word counting part
Dataset<String> words = lines.select("value")
  .as(Encoders.STRING())
  .flatMap(new FlatMapFunction<String, String>() {
    @Override
    public Iterator<String> call(String x) {
      // Split each sentence into words
      return Arrays.asList(x.split(" ")).iterator();
    }
  }, Encoders.STRING());

// Count the occurrences of each word and print the result
Dataset<Row> wordCounts = words.groupBy("value").count();
wordCounts.show();
```

You can verify that the sentence you sent appears on your console, along with
the number of occurrences of each word:

```
+--------+-----+
|   value|count|
+--------+-----+
| example|    1|
|      is|    1|
|sentence|    1|
|      an|    1|
|    This|    1|
+--------+-----+
```
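
To make the word-counting step easier to follow, here is a plain-Java sketch of the same logic without Spark: split the sentence on spaces, group identical words, and count each group. The class name `WordCountSketch` is hypothetical. A `TreeMap` is used only to make the print order deterministic, so the order differs from the Spark output above.

```java
import java.util.Arrays;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical illustration of the flatMap + groupBy + count steps in plain Java.
final class WordCountSketch {

    // Splits the sentence on single spaces and counts how often each word occurs.
    static Map<String, Long> countWords(String sentence) {
        return Arrays.stream(sentence.split(" "))
            .collect(Collectors.groupingBy(
                Function.identity(),   // group identical words together
                TreeMap::new,          // sorted map for a stable print order
                Collectors.counting()  // count the words in each group
            ));
    }

    public static void main(String[] args) {
        // Same sentence as the one sent by the producer in this tutorial.
        countWords("This is an example sentence")
            .forEach((word, count) -> System.out.println(word + " " + count));
    }
}
```

Unlike this sketch, the Spark version distributes the split and count work across the data set, but the per-word result for a single sentence is the same.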
