> ## Documentation Index
> Fetch the complete documentation index at: https://upstash-vector.mintlify.site/llms.txt
> Use this file to discover all available pages before exploring further.

# Connect Using Kafka Clients

Connecting to Upstash Kafka using any Kafka client is very straightforward. If
you do not have a Kafka cluster and/or topic already, follow
[these steps](../overall/getstarted) to create one.

After creating a cluster and a topic, just go to cluster details page on the
[Upstash Console](https://console.upstash.com) and copy bootstrap endpoint,
username and password.

<Frame>
  <img src="https://mintlify.s3-us-west-1.amazonaws.com/upstash-vector/img/kafka/getting_started/cluster-detail.png" width="100%" />
</Frame>

Then replace following parameters in the code snippets of your favourite Kafka
client or language below.

* `{{ BOOTSTRAP_ENDPOINT }}`
* `{{ UPSTASH_KAFKA_USERNAME }}`
* `{{ UPSTASH_KAFKA_PASSWORD }}`
* `{{ TOPIC_NAME }}`

## Create a Topic

<CodeGroup>
  ```typescript TypeScript
  const { Kafka } = require("kafkajs");

  const kafka = new Kafka({ brokers: ["{{ BOOTSTRAP_ENDPOINT }}"], sasl: {
  mechanism: "scram-sha-512", username: "{{ UPSTASH_KAFKA_USERNAME }}", password:
  "{{ UPSTASH_KAFKA_PASSWORD }}", }, ssl: true, });

  const admin = kafka.admin();

  const createTopic = async () => { await admin.connect(); await
  admin.createTopics({ validateOnly: false, waitForLeaders: true, topics: [ {
  topic: "{{ TOPIC_NAME }}", numPartitions: partitions, replicationFactor:
  replicationFactor, }, ], }); await admin.disconnect(); }; createTopic();

  ```

  ```py Python
  from kafka import KafkaAdminClient
  from kafka.admin import NewTopic

  admin = KafkaAdminClient(
    bootstrap_servers=['{{ BOOTSTRAP_ENDPOINT }}'],
    sasl_mechanism='SCRAM-SHA-512',
    security_protocol='SASL_SSL',
    sasl_plain_username='{{ UPSTASH_KAFKA_USERNAME }}',
    sasl_plain_password='{{ UPSTASH_KAFKA_PASSWORD }}',
  )

  admin.create_topics([NewTopic(name='{{ TOPIC_NAME }}', num_partitions=partitions, replication_factor=replicationFactor)])
  admin.close()
  ```

  ```java Java
  class CreateTopic {
    public static void main(String[] args) throws Exception {
      var props = new Properties();
      props.put("bootstrap.servers", "{{ BOOTSTRAP_ENDPOINT }}");
      props.put("sasl.mechanism", "SCRAM-SHA-512");
      props.put("security.protocol", "SASL_SSL");
      props.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required " +
                  "username=\"{{ UPSTASH_KAFKA_USERNAME }}\" " +
                  "password=\"{{ UPSTASH_KAFKA_PASSWORD }}\";");

      try (var admin = Admin.create(props)) {
          admin.createTopics(
              Set.of(new NewTopic("{{ TOPIC_NAME }}", partitions, replicationFactor))
            ).all().get();
      }
    }
  }
  ```

  ```go Go
  import (
  	"context"
  	"crypto/tls"
  	"log"

  	"github.com/segmentio/kafka-go"
  	"github.com/segmentio/kafka-go/sasl/scram"
  )

  func main() {
    mechanism, err := scram.Mechanism(scram.SHA512,
        "{{ UPSTASH_KAFKA_USERNAME }}", "{{ UPSTASH_KAFKA_PASSWORD }}")
    if err != nil {
      log.Fatalln(err)
    }

    dialer := &kafka.Dialer{
      SASLMechanism: mechanism,
      TLS:           &tls.Config{},
    }

    conn, err := dialer.Dial("tcp", "{{ BOOTSTRAP_ENDPOINT }}")
    if err != nil {
        log.Fatalln(err)
    }
    defer conn.Close()

    controller, err := conn.Controller()
    if err != nil {
        log.Fatalln(err)
    }

    controllerConn, err := dialer.Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port)))
    if err != nil {
        log.Fatalln(err)
    }
    defer controllerConn.Close()

    err = controllerConn.CreateTopics(kafka.TopicConfig{
  		Topic:             "{{ TOPIC_NAME }}",
  		NumPartitions:     partitions,
  		ReplicationFactor: replicationFactor,
  	})
    if err != nil {
        log.Fatalln(err)
    }
  }
  ```
</CodeGroup>

## Produce a Message

<CodeGroup>
  ```typescript TypeScript
  const { Kafka } = require("kafkajs");

  const kafka = new Kafka({
    brokers: ["{{ BOOTSTRAP_ENDPOINT }}"],
    sasl: {
      mechanism: "scram-sha-512",
      username: "{{ UPSTASH_KAFKA_USERNAME }}",
      password: "{{ UPSTASH_KAFKA_PASSWORD }}",
    },
    ssl: true,
  });

  const producer = kafka.producer();

  const produce = async () => {
    await producer.connect();
    await producer.send({
      topic: "{{ TOPIC_NAME }}",
      messages: [{ value: "Hello Upstash!" }],
    });
    await producer.disconnect();
  };
  produce();
  ```

  ```py Python
  from kafka import KafkaProducer

  producer = KafkaProducer(
    bootstrap_servers=['{{ BOOTSTRAP_ENDPOINT }}'],
    sasl_mechanism='SCRAM-SHA-512',
    security_protocol='SASL_SSL',
    sasl_plain_username='{{ UPSTASH_KAFKA_USERNAME }}',
    sasl_plain_password='{{ UPSTASH_KAFKA_PASSWORD }}',
  )

  future = producer.send('{{ TOPIC_NAME }}', b'Hello Upstash!')
  record_metadata = future.get(timeout=10)
  print (record_metadata)
  producer.close()
  ```

  ```java Java
  class Produce {
    public static void main(String[] args) throws Exception {
      var props = new Properties();
      props.put("bootstrap.servers", "{{ BOOTSTRAP_ENDPOINT }}");
      props.put("sasl.mechanism", "SCRAM-SHA-512");
      props.put("security.protocol", "SASL_SSL");
      props.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required " +
                  "username=\"{{ UPSTASH_KAFKA_USERNAME }}\" " +
                  "password=\"{{ UPSTASH_KAFKA_PASSWORD }}\";");
      props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
      props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

      try (var producer = new KafkaProducer<String, String>(props)) {
          producer.send(new ProducerRecord<CodeGroup>("{{ TOPIC_NAME }}", "Hello Upstash!"));
          producer.flush();
      }
    }
  }
  ```

  ```go Go
  import (
  	"context"
  	"crypto/tls"
  	"log"

  	"github.com/segmentio/kafka-go"
  	"github.com/segmentio/kafka-go/sasl/scram"
  )

  func main() {
    mechanism, err := scram.Mechanism(scram.SHA512,
        "{{ UPSTASH_KAFKA_USERNAME }}", "{{ UPSTASH_KAFKA_PASSWORD }}")
    if err != nil {
      log.Fatalln(err)
    }

    dialer := &kafka.Dialer{
      SASLMechanism: mechanism,
      TLS:           &tls.Config{},
    }

    w := kafka.NewWriter(kafka.WriterConfig{
      Brokers:  []string{"{{ BOOTSTRAP_ENDPOINT }}"},
      Topic:    "{{ TOPIC_NAME }}",
      Dialer:   dialer,
    })
    defer w.Close()

    err = w.WriteMessages(context.Background(),
      kafka.Message{
          Value: []byte("Hello Upstash!"),
      },
    )
    if err != nil {
      log.Fatalln("failed to write messages:", err)
    }
  }
  ```
</CodeGroup>

## Consume Messages

<CodeGroup>
  ```typescript TypeScript
  const { Kafka } = require("kafkajs");

  const kafka = new Kafka({
    brokers: ["{{ BOOTSTRAP_ENDPOINT }}"],
    sasl: {
      mechanism: "scram-sha-512",
      username: "{{ UPSTASH_KAFKA_USERNAME }}",
      password: "{{ UPSTASH_KAFKA_PASSWORD }}",
    },
    ssl: true,
  });

  const consumer = kafka.consumer({ groupId: "{{ GROUP_NAME }}" });
  const consume = async () => {
    await consumer.connect();
    await consumer.subscribe({
      topic: "{{ TOPIC_NAME }}",
      fromBeginning: true,
    });

    await consumer.run({
      eachMessage: async ({ topic, partition, message }) => {
        console.log({
          topic: topic,
          partition: partition,
          message: JSON.stringify(message),
        });
      },
    });
  };
  consume();
  ```

  ```py Python
  from kafka import KafkaConsumer

  consumer = KafkaConsumer(
    bootstrap_servers=['{{ BOOTSTRAP_ENDPOINT }}'],
    sasl_mechanism='SCRAM-SHA-512',
    security_protocol='SASL_SSL',
    sasl_plain_username='{{ UPSTASH_KAFKA_USERNAME }}',
    sasl_plain_password='{{ UPSTASH_KAFKA_PASSWORD }}',
    group_id='{{ GROUP_NAME }}',
    auto_offset_reset='earliest',
  )

  consumer.subscribe(['{{ TOPIC_NAME }}'])
  records = consumer.poll(timeout_ms=10000)
  print(records)
  consumer.close()
  ```

  ```java Java
  class Consume {
    public static void main(String[] args) throws Exception {
      var props = new Properties();
      props.put("bootstrap.servers", "{{ BOOTSTRAP_ENDPOINT }}");
      props.put("sasl.mechanism", "SCRAM-SHA-512");
      props.put("security.protocol", "SASL_SSL");
      props.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required " +
                  "username=\"{{ UPSTASH_KAFKA_USERNAME }}\" " +
                  "password=\"{{ UPSTASH_KAFKA_PASSWORD }}\";");

      props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
      props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
      props.put("auto.offset.reset", "earliest");
      props.put("group.id", "{{ GROUP_NAME }}");

      try(var consumer = new KafkaConsumer<String, String>(props)) {
          consumer.subscribe(Collections.singleton("{{ TOPIC_NAME }}"));
          var records = consumer.poll(Duration.ofSeconds(10));
          for (var record : records) {
              System.out.println(record);
          }
      }
    }
  }
  ```

  ```go Go
  import (
  	"context"
  	"crypto/tls"
  	"log"
  	"time"

  	"github.com/segmentio/kafka-go"
  	"github.com/segmentio/kafka-go/sasl/scram"
  )

  func main() {
    mechanism, err := scram.Mechanism(scram.SHA512,
        "{{ UPSTASH_KAFKA_USERNAME }}", "{{ UPSTASH_KAFKA_PASSWORD }}")
    if err != nil {
      log.Fatalln(err)
    }

    dialer := &kafka.Dialer{
      SASLMechanism: mechanism,
      TLS:           &tls.Config{},
    }

    r := kafka.NewReader(kafka.ReaderConfig{
      Brokers:  []string{"{{ BOOTSTRAP_ENDPOINT }}"},
      GroupID: "{{ GROUP_NAME }}",
      Topic:   "{{ TOPIC_NAME }}",
      Dialer:  dialer,
    })
    defer r.Close()

    ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
    defer cancel()

    m, err := r.ReadMessage(ctx)
    if err != nil {
      log.Fatalln(err)
    }
    log.Printf("%+v\n", m)
  }
  ```
</CodeGroup>
