Consume Messages Using REST API
If you do not have a Kafka cluster and/or topic already, follow these steps to create one.
In the cluster details section of the Upstash Console, scroll down to the REST API section and copy UPSTASH_KAFKA_REST_URL, UPSTASH_KAFKA_REST_USERNAME, and UPSTASH_KAFKA_REST_PASSWORD using the copy icons next to them.
We will use a Node.js sample to show how to consume messages using the REST API. The sample uses a topic named cities and consumes previously produced city names from this topic using Kafka consumer groups and automatic offset committing.
Replace the following parameters in the code snippets below with your actual values.
const address = "https://tops-stingray-7863-eu1-rest-kafka.upstash.io";
const user = "G9wcy1zdGluZ3JheS03ODYzJMUX";
const pass = "eUmYCkAlxEhihIc7Hooi2IA2pz2fw==";
const auth = Buffer.from(`${user}:${pass}`).toString("base64");
const topic = "cities";
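Instead of hardcoding credentials, the values copied from the console can be read from environment variables. The following is a minimal sketch, assuming the three variables are exported under the same names the console shows; `loadKafkaConfig` is a hypothetical helper and not part of any Upstash library.

```javascript
// Hypothetical helper: builds connection settings from an env-like object.
// Assumes the variables use the names shown in the Upstash Console.
function loadKafkaConfig(env = process.env) {
  const address = env.UPSTASH_KAFKA_REST_URL;
  const user = env.UPSTASH_KAFKA_REST_USERNAME;
  const pass = env.UPSTASH_KAFKA_REST_PASSWORD;
  if (!address || !user || !pass) {
    throw new Error("Missing UPSTASH_KAFKA_REST_* environment variables");
  }
  // HTTP Basic authentication expects base64("username:password").
  const auth = Buffer.from(`${user}:${pass}`).toString("base64");
  return { address, auth };
}
```

With this in place, `const { address, auth } = loadKafkaConfig();` could stand in for the hardcoded constants above.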
The following code consumes city names from the topic using the mygroup consumer group id and the myconsumer consumer id, starting from the latest offset, and prints the consumed messages and their offsets to the console:
async function consumeTopic(groupId, consumerId, topic) {
  const response = await fetch(
    `${address}/consume/${groupId}/${consumerId}/${topic}`,
    {
      headers: { Authorization: `Basic ${auth}` },
    }
  );
  const messages = await response.json();
  messages.forEach((m) => {
    console.log(`Message: ${m.value}, Offset: ${m.offset}`);
  });
}

consumeTopic("mygroup", "myconsumer", topic);
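A single call returns only one batch of messages. To keep reading, the same request can be repeated in a loop. The sketch below is one way to do this, assuming it is acceptable to stop after a fixed number of polls. `pollTopic`, `maxPolls`, and the `fetchFn` parameter are illustrative names, and the HTTP status check is an added precaution that the sample above does not perform.

```javascript
// Illustrative polling loop around the consume endpoint.
// `fetchFn` defaults to the global fetch (Node.js 18+) and can be swapped in tests.
async function pollTopic(
  address,
  auth,
  groupId,
  consumerId,
  topic,
  maxPolls,
  fetchFn = fetch
) {
  const received = [];
  for (let i = 0; i < maxPolls; i++) {
    const response = await fetchFn(
      `${address}/consume/${groupId}/${consumerId}/${topic}`,
      { headers: { Authorization: `Basic ${auth}` } }
    );
    // Fail early instead of trying to parse an error body as messages.
    if (!response.ok) {
      throw new Error(`Consume request failed with HTTP status ${response.status}`);
    }
    const messages = await response.json();
    received.push(...messages);
  }
  return received;
}
```

A call such as `pollTopic(address, auth, "mygroup", "myconsumer", topic, 5)` would issue five consume requests and return all collected messages in one array.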
By default, the consume API starts consuming from the latest offset. It’s also possible to start from the earliest offset by passing the Kafka-Auto-Offset-Reset: earliest request header:
async function consumeTopic(groupId, consumerId, topic, offsetReset) {
  const response = await fetch(
    `${address}/consume/${groupId}/${consumerId}/${topic}`,
    {
      headers: {
        Authorization: `Basic ${auth}`,
        "Kafka-Auto-Offset-Reset": offsetReset,
      },
    }
  );
  const messages = await response.json();
  messages.forEach((m) => {
    console.log(`Message: ${m.value}, Offset: ${m.offset}`);
  });
}

consumeTopic("mygroup", "myconsumer", topic, "earliest");
We can also go further and turn off the consumer's auto-commit behaviour so that offsets are committed manually later. To turn off auto-commit, send the Kafka-Enable-Auto-Commit: false header. This allows us to commit the offsets only when all messages have been processed successfully.
async function consumeTopicWithoutCommit(
  groupId,
  consumerId,
  topic,
  offsetReset
) {
  const response = await fetch(
    `${address}/consume/${groupId}/${consumerId}/${topic}`,
    {
      headers: {
        Authorization: `Basic ${auth}`,
        "Kafka-Auto-Offset-Reset": offsetReset,
        "Kafka-Enable-Auto-Commit": "false",
      },
    }
  );
  const messages = await response.json();
  messages.forEach((m) => {
    console.log(`Message: ${m.value}, Offset: ${m.offset}`);
  });
}

async function commitOffsetsFor(groupId, consumerId) {
  const response = await fetch(`${address}/commit/${groupId}/${consumerId}`, {
    headers: { Authorization: `Basic ${auth}` },
  });
  const resp = await response.json();
  console.log(
    `Result: ${resp.result}, Error: ${resp.error}, Status: ${resp.status}`
  );
}

// Commit only after the consume call has completed. Without chaining,
// the commit request could be sent before any messages are fetched.
consumeTopicWithoutCommit("mygroup", "myconsumer", topic, "earliest").then(() =>
  commitOffsetsFor("mygroup", "myconsumer")
);
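The consume-then-commit flow can also be wrapped so that the commit request is sent only if every message was handled without error. This is a sketch under assumptions: `processThenCommit`, `cfg`, and `handler` are hypothetical names, `fetchFn` is injectable for testing, and the endpoints and headers are the same ones used in the snippets above.

```javascript
// Hypothetical wrapper: consume with auto-commit disabled, run `handler`
// on each message, and commit only if no handler call throws.
async function processThenCommit(
  cfg,
  groupId,
  consumerId,
  topic,
  handler,
  fetchFn = fetch
) {
  const headers = { Authorization: `Basic ${cfg.auth}` };
  const consumeResp = await fetchFn(
    `${cfg.address}/consume/${groupId}/${consumerId}/${topic}`,
    { headers: { ...headers, "Kafka-Enable-Auto-Commit": "false" } }
  );
  const messages = await consumeResp.json();
  // If handler throws, the function exits here and nothing is committed.
  for (const m of messages) {
    await handler(m);
  }
  const commitResp = await fetchFn(
    `${cfg.address}/commit/${groupId}/${consumerId}`,
    { headers }
  );
  return commitResp.json();
}
```

If `handler` throws for any message, the error propagates to the caller and the offsets of that batch are left uncommitted.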
For more information about using the REST API, see the Kafka REST Consume API section.