Consumer APIs
Consumer APIs in Kafka are used for fetching and consuming messages from Kafka topics. Similar to Kafka clients, there are two mechanisms for consuming messages: manual offset seeking and the use of consumer groups.
Manual offset seeking allows consumers to specify the desired offset from which they want to consume messages, providing precise control over the consumption process.
Consumer groups, on the other hand, manage offsets automatically within a dedicated Kafka topic. They enable multiple consumers to work together in a coordinated manner, where each consumer within the group is assigned a subset of partitions from the Kafka topic. This automatic offset management simplifies the consumption process and facilitates efficient and parallel message processing across the consumer group.
We call the first mechanism the Fetch API and the second the Consume API. The Consume API has additional methods for committing offsets manually.
Both the Fetch API and the Consume API return an array of messages as JSON. The message structure is as follows:
Message {
topic: String,
partition: Int,
offset: Long,
timestamp: Long,
key: String,
value: String,
headers: Array<Header>
}
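As a rough illustration of the structure above, the following Python sketch parses a JSON array of messages into typed objects. The sample payload is invented for the example, the Header type is not defined in this section so headers are kept as raw values, and treating key and value as possibly absent is an assumption.

```python
import json
from dataclasses import dataclass, field
from typing import Any, List, Optional


@dataclass
class Message:
    topic: str
    partition: int
    offset: int
    timestamp: int
    key: Optional[str]      # assumption: may be missing or null
    value: Optional[str]    # assumption: may be missing or null
    headers: List[Any] = field(default_factory=list)  # Header shape not specified here


def parse_messages(body: str) -> List[Message]:
    """Parse the JSON array returned by the Fetch or Consume API."""
    return [
        Message(
            topic=m["topic"],
            partition=m["partition"],
            offset=m["offset"],
            timestamp=m["timestamp"],
            key=m.get("key"),
            value=m.get("value"),
            headers=m.get("headers") or [],
        )
        for m in json.loads(body)
    ]


# Hypothetical response body, made up for illustration.
sample = (
    '[{"topic": "greetings", "partition": 3, "offset": 11, '
    '"timestamp": 1700000000000, "key": "k1", "value": "hello", "headers": []}]'
)
messages = parse_messages(sample)
print(messages[0].topic, messages[0].partition, messages[0].offset)
```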
Fetch API
[GET | POST] /fetch
Fetches the message(s) starting with a given offset inside the partition. This API doesn’t use consumer groups. A FetchRequest should be sent via the request body as JSON. The structure of the FetchRequest is:
FetchRequest{
topic: String,
partition: Int,
offset: Long,
topicPartitionOffsets: Set<TopicPartitionOffset>,
timeout: Long
}
TopicPartitionOffset{
topic: String,
partition: Int,
offset: Long
}
It’s possible to send a fetch request for only a single <topic, partition, offset> triple or for a set of them using topicPartitionOffsets. The timeout field defines the maximum time to wait for the fetch request, in milliseconds. It’s optional and its default value is 1000.
- Fetch from a single <topic, partition, offset>:
curl https://tops-stingray-7863-eu1-rest-kafka.upstash.io/fetch -u myuser:mypass \
-d '{"topic": "greetings", "partition": 3, "offset": 11, "timeout": 1000}'
- Fetch from multiple <topic, partition, offset> triples:
curl https://tops-stingray-7863-eu1-rest-kafka.upstash.io/fetch -u myuser:mypass \
-d '{
"topicPartitionOffsets": [
{"topic": "greetings", "partition": 1, "offset": 1},
{"topic": "greetings", "partition": 2, "offset": 1},
{"topic": "greetings", "partition": 3, "offset": 1},
{"topic": "cities", "partition": 1, "offset": 10},
{"topic": "cities", "partition": 2, "offset": 20}
],
"timeout": 5000
}'
- You can even combine both:
curl https://tops-stingray-7863-eu1-rest-kafka.upstash.io/fetch -u myuser:mypass \
-d '{
"topic": "words", "partition": 0, "offset": 0,
"topicPartitionOffsets": [
{"topic": "cities", "partition": 1, "offset": 10},
{"topic": "cities", "partition": 2, "offset": 20}
],
"timeout": 5000
}'
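For callers that build the request body in code rather than by hand, the three shapes shown above could be produced by one helper. This is only a sketch: the function name build_fetch_body and its parameters are hypothetical, and the check that at least one triple is given is a client-side convenience, not documented server behavior.

```python
import json
from typing import Optional, Sequence, Tuple

# (topic, partition, offset)
Triple = Tuple[str, int, int]


def build_fetch_body(
    single: Optional[Triple] = None,
    many: Sequence[Triple] = (),
    timeout_ms: Optional[int] = None,
) -> str:
    """Build a FetchRequest JSON body from a single triple, a set of triples, or both."""
    if single is None and not many:
        raise ValueError("provide at least one <topic, partition, offset> triple")
    body = {}
    if single is not None:
        topic, partition, offset = single
        body.update({"topic": topic, "partition": partition, "offset": offset})
    if many:
        body["topicPartitionOffsets"] = [
            {"topic": t, "partition": p, "offset": o} for t, p, o in many
        ]
    if timeout_ms is not None:
        # Omitting the field leaves the documented default of 1000 ms in effect.
        body["timeout"] = timeout_ms
    return json.dumps(body)


print(build_fetch_body(single=("greetings", 3, 11), timeout_ms=1000))
print(build_fetch_body(many=[("cities", 1, 10), ("cities", 2, 20)], timeout_ms=5000))
```

The resulting string can be passed as the request body of a POST to /fetch, in the same way the curl examples use -d.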
Consume API
Fetches the message(s) using the Kafka consumer group mechanism and may commit the offsets automatically. The Consume API has two variants:
GET /consume/$CONSUMER_GROUP/$INSTANCE_ID/$TOPIC?timeout=$TIMEOUT
[GET | POST] /consume/$CONSUMER_GROUP/$INSTANCE_ID
$CONSUMER_GROUP is the name of the consumer group, which is used as the Kafka consumer group id.
$INSTANCE_ID is used to identify Kafka consumer instances in the same consumer group. It’s used as the Kafka consumer instance id. Each consumer instance is handled by a separate consumer client.
In the second variant, a ConsumeRequest should be sent via the request body as JSON. The structure of the ConsumeRequest is:
ConsumeRequest{
topic: String,
topics: Set<String>,
timeout: Long
}
It’s possible to send a consume request for only a single topic or for multiple topics. The timeout field defines the maximum time to wait for the consume request, in milliseconds. It’s optional and its default value is 1000.
Consumer group instances will be closed after some idle time. So consume requests should be sent periodically to keep them alive.
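The two Consume API variants described above can be sketched in Python as follows. The example only constructs urllib request objects and never sends them. The helper names are hypothetical, the base URL and credentials are the placeholder values used in this page's curl examples, and HTTP Basic authentication is assumed because the examples use curl -u.

```python
import base64
import json
from typing import Iterable, Optional
from urllib.parse import quote, urlencode
from urllib.request import Request

# Placeholder endpoint taken from the curl examples on this page.
BASE_URL = "https://tops-stingray-7863-eu1-rest-kafka.upstash.io"


def basic_auth(user: str, password: str) -> str:
    """Build the Authorization header value that curl -u would send."""
    token = base64.b64encode(f"{user}:{password}".encode()).decode()
    return f"Basic {token}"


def consume_by_path(group: str, instance: str, topic: str,
                    auth: str, timeout_ms: Optional[int] = None) -> Request:
    """First variant: the topic is part of the path, the timeout a query parameter."""
    url = f"{BASE_URL}/consume/{quote(group, safe='')}/{quote(instance, safe='')}/{quote(topic, safe='')}"
    if timeout_ms is not None:
        url += "?" + urlencode({"timeout": timeout_ms})
    return Request(url, method="GET", headers={"Authorization": auth})


def consume_by_body(group: str, instance: str, topics: Iterable[str],
                    auth: str, timeout_ms: Optional[int] = None) -> Request:
    """Second variant: a ConsumeRequest is sent as the JSON request body."""
    url = f"{BASE_URL}/consume/{quote(group, safe='')}/{quote(instance, safe='')}"
    body = {"topics": list(topics)}
    if timeout_ms is not None:
        body["timeout"] = timeout_ms
    return Request(url, data=json.dumps(body).encode(), method="POST",
                   headers={"Authorization": auth})


auth = basic_auth("myuser", "mypass")
by_path = consume_by_path("mygroup", "myconsumer", "greetings", auth, timeout_ms=10000)
by_body = consume_by_body("mygroup", "myconsumer", ["greetings", "cities"], auth, timeout_ms=1000)
print(by_path.get_method(), by_path.full_url)
print(by_body.get_method(), by_body.full_url, by_body.data.decode())
```

Passing either object to urllib.request.urlopen would perform the call. Because idle consumer instances are closed, a long-running client would presumably issue such requests on a regular schedule.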
Request Headers
There are three request headers to configure the Kafka consumer instance. These headers are only required for the very first request, which creates and initializes the consumer. It’s fine to send them with every request; doing so has no further effect.
- Kafka-Enable-Auto-Commit: If true, the consumer’s offset will be periodically committed in the background. Valid values are <true, false>. Default is true.
- Kafka-Auto-Commit-Interval: The frequency in milliseconds that the consumer offsets are auto-committed to Kafka if auto commit is enabled. Default is 5000.
- Kafka-Auto-Offset-Reset: What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server. Default value is latest.
  - earliest: Automatically reset the offset to the earliest offset
  - latest: Automatically reset the offset to the latest offset
  - none: Throw exception to the consumer if no previous offset is found for the consumer’s group.
If all or some of these headers are missing in the consume request, default values will be used.
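A client might assemble these headers with a small helper like the one below. The function name and parameters are hypothetical, and the validation of the offset reset value happens on the client side only; any header left out simply falls back to the server default described above.

```python
from typing import Dict, Optional

VALID_OFFSET_RESETS = ("earliest", "latest", "none")


def consumer_headers(
    enable_auto_commit: Optional[bool] = None,
    auto_commit_interval_ms: Optional[int] = None,
    auto_offset_reset: Optional[str] = None,
) -> Dict[str, str]:
    """Return only the headers that were explicitly requested."""
    headers: Dict[str, str] = {}
    if enable_auto_commit is not None:
        headers["Kafka-Enable-Auto-Commit"] = "true" if enable_auto_commit else "false"
    if auto_commit_interval_ms is not None:
        headers["Kafka-Auto-Commit-Interval"] = str(auto_commit_interval_ms)
    if auto_offset_reset is not None:
        if auto_offset_reset not in VALID_OFFSET_RESETS:
            raise ValueError(f"auto_offset_reset must be one of {VALID_OFFSET_RESETS}")
        headers["Kafka-Auto-Offset-Reset"] = auto_offset_reset
    return headers


print(consumer_headers(enable_auto_commit=False))
print(consumer_headers(enable_auto_commit=True, auto_commit_interval_ms=3000))
print(consumer_headers(auto_offset_reset="earliest"))
```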
The first time a consumer is created, it needs to find the group coordinator by asking the Kafka brokers and then join the consumer group. This process takes some time to complete. That’s why a newly created consumer instance may return empty messages until consumer group coordination is completed.
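One way a client could cope with these initial empty responses is to retry a few times before giving up. The sketch below is an assumption about client-side handling, not documented behavior: consume_once stands for any hypothetical callable that performs one consume request and returns the decoded message list, and the attempt count and delay are arbitrary. An empty result can also simply mean that no new messages exist.

```python
import time
from typing import Callable, List


def consume_until_ready(
    consume_once: Callable[[], List[dict]],
    max_attempts: int = 5,
    delay_s: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> List[dict]:
    """Call consume_once until it returns messages or attempts run out."""
    for attempt in range(max_attempts):
        messages = consume_once()
        if messages:
            return messages
        # Pause between attempts, but not after the last one.
        if attempt < max_attempts - 1:
            sleep(delay_s)
    return []


# Stand-in for real requests: two empty responses, then one message.
fake_responses = iter([[], [], [{"topic": "greetings", "partition": 0, "offset": 0}]])
print(consume_until_ready(lambda: next(fake_responses), delay_s=0))
```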
- Consume from a single topic:
curl https://tops-stingray-7863-eu1-rest-kafka.upstash.io/consume/mygroup/myconsumer/greetings -u myuser:mypass
- Consume from a single topic with timeout:
curl 'https://tops-stingray-7863-eu1-rest-kafka.upstash.io/consume/mygroup/myconsumer/greetings?timeout=10000' \
-u myuser:mypass
- Consume from a single topic via request body:
curl https://tops-stingray-7863-eu1-rest-kafka.upstash.io/consume/mygroup/myconsumer -u myuser:mypass \
-d '{"topic": "greetings", "timeout": 1000}'
- Consume from multiple topics:
curl https://tops-stingray-7863-eu1-rest-kafka.upstash.io/consume/mygroup/myconsumer -u myuser:mypass \
-d '{"topics": ["greetings", "cities", "words"], "timeout": 1000}'
- Consume from topics without auto commit:
curl https://tops-stingray-7863-eu1-rest-kafka.upstash.io/consume/mygroup/myconsumer -u myuser:mypass \
-H "Kafka-Enable-Auto-Commit: false" \
-d '{"topics": ["greetings", "cities", "words"], "timeout": 1000}'
- Consume from topics starting from the earliest message:
curl https://tops-stingray-7863-eu1-rest-kafka.upstash.io/consume/mygroup/myconsumer -u myuser:mypass \
-H "Kafka-Auto-Offset-Reset: earliest" \
-d '{"topics": ["greetings", "cities", "words"], "timeout": 1000}'
- Consume from topics with custom auto commit interval:
curl https://tops-stingray-7863-eu1-rest-kafka.upstash.io/consume/mygroup/myconsumer -u myuser:mypass \
-H "Kafka-Enable-Auto-Commit: true" \
-H "Kafka-Auto-Commit-Interval: 3000" \
-d '{"topics": ["greetings", "cities", "words"], "timeout": 1000}'
The maximum number of consumer group instances is limited to the total partition count of all topics.
Commit Consumer API
[GET | POST] /commit/$CONSUMER_GROUP/$INSTANCE_ID
Commits the fetched message offsets. The Commit API should be used alongside the Consume API, especially when auto commit is disabled. The request body should be a single TopicPartitionOffset object or an array of TopicPartitionOffset objects as JSON.
TopicPartitionOffset{topic: String, partition: Int, offset: Long}
When the body is empty (or an empty array), the consumer commits the offsets of the last consumed messages.
- Commit single topic partition offset:
curl https://tops-stingray-7863-eu1-rest-kafka.upstash.io/commit/mygroup/myconsumer -u myuser:mypass \
-d '{"topic": "cities", "partition": 1, "offset": 10}'
- Commit multiple topic partition offsets:
curl https://tops-stingray-7863-eu1-rest-kafka.upstash.io/commit/mygroup/myconsumer -u myuser:mypass \
-d '[
{"topic": "cities", "partition": 0, "offset": 13},
{"topic": "cities", "partition": 1, "offset": 37},
{"topic": "greetings", "partition": 0, "offset": 19}
]'
- Commit all latest consumed message offsets:
curl https://tops-stingray-7863-eu1-rest-kafka.upstash.io/commit/mygroup/myconsumer -u myuser:mypass
Response:
When commit is completed, a success JSON result will be returned:
{ "result": "Success", "status": 200 }
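The three commit body shapes and the success check could be handled in client code roughly as follows. Both function names are hypothetical. Offsets are passed through unchanged, since this section does not state how a committed offset relates to the last processed message, and the shape of an error response is not documented here, so anything other than the success result shown above is treated as a failure.

```python
import json
from typing import Sequence, Tuple

# (topic, partition, offset)
Triple = Tuple[str, int, int]


def build_commit_body(offsets: Sequence[Triple] = ()) -> bytes:
    """Empty body commits the last consumed messages; otherwise send one object or an array."""
    items = [{"topic": t, "partition": p, "offset": o} for t, p, o in offsets]
    if not items:
        return b""
    if len(items) == 1:
        return json.dumps(items[0]).encode()
    return json.dumps(items).encode()


def commit_succeeded(response_body: str) -> bool:
    """Check for the documented success result."""
    result = json.loads(response_body)
    return result.get("result") == "Success" and result.get("status") == 200


print(build_commit_body([("cities", 1, 10)]))
print(build_commit_body([("cities", 0, 13), ("cities", 1, 37), ("greetings", 0, 19)]))
print(commit_succeeded('{ "result": "Success", "status": 200 }'))
```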
Remove Consumer Instance
[POST|DELETE] /delete-consumer/$CONSUMER_GROUP/$INSTANCE_ID
Stops and removes a previously created consumer group instance.
Response:
When deletion is completed, a success JSON result will be returned:
{ "result": "Success", "status": 200 }