const { Kafka } = require("kafkajs");
const kafka = new Kafka({
brokers: ["{{ BOOTSTRAP_ENDPOINT }}"],
sasl: {
mechanism: "scram-sha-512",
username: "{{ UPSTASH_KAFKA_USERNAME }}",
password: "{{ UPSTASH_KAFKA_PASSWORD }}",
},
ssl: true,
});
const consumer = kafka.consumer({ groupId: "{{ GROUP_NAME }}" });
const consume = async () => {
await consumer.connect();
await consumer.subscribe({
topic: "{{ TOPIC_NAME }}",
fromBeginning: true,
});
await consumer.run({
eachMessage: async ({ topic, partition, message }) => {
console.log({
topic: topic,
partition: partition,
message: JSON.stringify(message),
});
},
});
};
consume();