How to
Connect with upstash-kafka
upstash-kafka is an HTTP/REST based Kafka client built on top of Upstash Kafka REST API.
It is the only connectionless (HTTP based) Kafka client and designed to work with:
- Serverless functions (AWS Lambda …)
- Cloudflare Workers (see the example)
- Fastly Compute@Edge
- Next.js Edge, Remix, Nuxt …
- Client side web/mobile applications
- WebAssembly
- and other environments where HTTP is preferred over TCP.
Quick Start
Install
npm install @upstash/kafka
Authenticate
Copy URL, username and password from Upstash Console
import { Kafka } from "@upstash/kafka";
const kafka = new Kafka({
url: "<UPSTASH_KAFKA_REST_URL>",
username: "<UPSTASH_KAFKA_REST_USERNAME>",
password: "<UPSTASH_KAFKA_REST_PASSWORD>",
});
Produce
const p = kafka.producer();
const message = { hello: "world" }; // Objects will get serialized using `JSON.stringify`
const response = await p.produce("TOPIC", message);
const response2 = await p.produce("TOPIC", message, {
partition: 1,
timestamp: 4567,
key: "KEY",
headers: [{ key: "TRACE-ID", value: "32h67jk" }],
});
Produce Many
const p = kafka.producer();
const res = await p.produceMany([
{
topic: "TOPIC",
value: "MESSAGE",
// ...options
},
{
topic: "TOPIC-2",
value: "MESSAGE-2",
// ...options
},
]);
Consume
When a new consumer instance is created, it may return empty messages until consumer group coordination is completed.
const c = kafka.consumer();
const messages = await c.consume({
consumerGroupId: "group_1",
instanceId: "instance_1",
topics: ["test.topic"],
autoOffsetReset: "earliest",
});
Commit
While consume
commits automatically, you can commit manually as below:
const consumerGroupId = "mygroup";
const instanceId = "myinstance";
const topic = "my.topic";
const c = kafka.consumer();
const messages = await c.consume({
consumerGroupId,
instanceId,
topics: [topic],
autoCommit: false,
});
for (const message of messages) {
// message handling logic
await c.commit({
consumerGroupId,
instanceId,
offset: {
topic: message.topic,
partition: message.partition,
offset: message.offset,
},
});
}
Fetch
const c = kafka.consumer();
const messages = await c.fetch({
topic: "greeting",
partition: 3,
offset: 42,
timeout: 1000,
});
Examples
See here for more examples.
Was this page helpful?