Consumer APIs in Kafka are used for fetching and consuming messages from Kafka topics. Similar to Kafka clients, there are two mechanisms for consuming messages: manual offset seeking and the use of consumer groups.

Manual offset seeking allows consumers to specify the desired offset from which they want to consume messages, providing precise control over the consumption process.

Consumer groups, on the other hand, manage offsets automatically within a dedicated Kafka topic. They enable multiple consumers to work together in a coordinated manner, where each consumer within the group is assigned a subset of partitions from the Kafka topic. This automatic offset management simplifies the consumption process and facilitates efficient and parallel message processing across the consumer group.

We call the first one as Fetch API and the second one as Consume API. Consume API has some additional methods if you wish to commit offsets manually.

Both Fetch API and Consume API return array of messages as JSON. Message structure is as following:

Message {
    topic: String,
    partition: Int,
    offset: Long,
    timestamp: Long,
    key: String,
    value: String,
    headers: Array<Header>
}

Fetch API

[GET | POST] /fetch:

Fetches the message(s) starting with a given offset inside the partition. This API doesn’t use consumer groups. A FetchRequest should be sent via request body as JSON. Structure of the FetchRequest is:

FetchRequest{
    topic: String,
    partition: Int,
    offset: Long,
    topicPartitionOffsets: Set<TopicPartitionOffset>,
    timeout: Long
}

TopicPartitionOffset{
    topic: String,
    partition: Int,
    offset: Long
}

It’s possible to send a fetch request for only a single <topic, partition, offset> or a set of them using topicPartitionOffsets.

timeout field defines the time to wait at most for the fetch request in milliseconds. It’s optional and its default value 1000.

  • Fetch from a single <topic, partition, offset>:
curl https://tops-stingray-7863-eu1-rest-kafka.upstash.io/fetch -u myuser:mypass \
    -d '{"topic": "greetings", "partition": 3, "offset": 11, "timeout": 1000}'
  • Fetch from multiple <topic, partition, offset> triples:
curl https://tops-stingray-7863-eu1-rest-kafka.upstash.io/fetch -u myuser:mypass \
    -d '{
        "topicPartitionOffsets": [
            {"topic": "greetings", "partition": 1, "offset": 1},
            {"topic": "greetings", "partition": 2, "offset": 1},
            {"topic": "greetings", "partition": 3, "offset": 1},
            {"topic": "cities", "partition": 1, "offset": 10},
            {"topic": "cities", "partition": 2, "offset": 20}
        ],
        "timeout": 5000
    }'
  • You can even combine both:
curl https://tops-stingray-7863-eu1-rest-kafka.upstash.io/fetch -u myuser:mypass \
    -d '{
        "topic": "words", "partition": 0, "offset": 0,
        "topicPartitionOffsets": [
            {"topic": "cities", "partition": 1, "offset": 10},
            {"topic": "cities", "partition": 2, "offset": 20}
        ],
        "timeout": 5000
    }'

Consume API

Fetches the message(s) using Kafka consumer group mechanism and may commit the offsets automatically. Consume API has two variants:

  • GET /consume/$CONSUMER_GROUP/$INSTANCE_ID/$TOPIC?timeout=$TIMEOUT
  • [GET | POST] /consume/$CONSUMER_GROUP/$INSTANCE_ID

$CONSUMER_GROUP is the name of the consumer group which is used as Kafka consumer group id. $INSTANCE_ID is used identify Kafka consumer instances in the same consumer group. It’s used as Kafka consumer instance id. Each consumer instance is handled by a separate consumer client.

In the second variant, a ConsumeRequest should be sent via request body as JSON. Structure of the ConsumeRequest is:

ConsumeRequest{
    topic: String,
    topics: Set<String>,
    timeout: Long
}

It’s possible to send a consume request for only a single topic or multiple topics. timeout field defines the time to wait at most for the consume request in milliseconds. It’s optional and its default value is 1000.

Consumer group instances will be closed after some idle time. So consume requests should be sent periodically to keep them alive.

Request Headers

There are three request headers to configure Kafka consumer instance. These headers are only required for the very first request which creates and initializes the consumer but it’s fine to send them with every request and has no further effect.

  • Kafka-Enable-Auto-Commit: If true, the consumer’s offset will be periodically committed in the background. Valid values are <true, false>. Default is true.
  • Kafka-Auto-Commit-Interval: The frequency in milliseconds that the consumer offsets are auto-committed to Kafka if auto commit is enabled. Default is 5000.
  • Kafka-Auto-Offset-Reset: What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server. Default value is latest.
    • earliest: Automatically reset the offset to the earliest offset
    • latest: Automatically reset the offset to the latest offset
    • none: Throw exception to the consumer if no previous offset is found for the consumer’s group.

If all or some of these headers are missing in the consume request, default values will be used.

The first time a consumer is created, it needs to figure out the group coordinator by asking the Kafka brokers and joins the consumer group. This process takes some time to complete. That’s why when a consumer instance is created first time, it may return empty messages until consumer group coordination is completed.

  • Consume from a single topic:
curl https://tops-stingray-7863-eu1-rest-kafka.upstash.io/consume/mygroup/myconsumer/greetings -u myuser:mypass
  • Consume from a single topic with timeout:
curl https://tops-stingray-7863-eu1-rest-kafka.upstash.io/consume/mygroup/myconsumer/greetings?timeout=10000 \
    -u myuser:mypass
  • Consume from a single topic via request body:
curl https://tops-stingray-7863-eu1-rest-kafka.upstash.io/consume/mygroup/myconsumer -u myuser:mypass \
    -d '{"topic": "greetings", "timeout": 1000}'
  • Consume from multiple topics:
curl https://tops-stingray-7863-eu1-rest-kafka.upstash.io/consume/mygroup/myconsumer -u myuser:mypass \
    -d '{"topics": ["greetings", "cities", "words"], "timeout": 1000}'
  • Consume from topics without auto commit:
curl https://tops-stingray-7863-eu1-rest-kafka.upstash.io/consume/mygroup/myconsumer -u myuser:mypass \
    -H "Kafka-Enable-Auto-Commit: false" \
    -d '{"topics": ["greetings", "cities", "words"], "timeout": 1000}'
  • Consume from topics starting from the earliest message:
curl https://tops-stingray-7863-eu1-rest-kafka.upstash.io/consume/mygroup/myconsumer -u myuser:mypass \
    -H "Kafka-Auto-Offset-Reset: earliest" \
    -d '{"topics": ["greetings", "cities", "words"], "timeout": 1000}'
  • Consume from topics with custom auto commit interval:
curl https://tops-stingray-7863-eu1-rest-kafka.upstash.io/consume/mygroup/myconsumer -u myuser:mypass \
    -H "Kafka-Enable-Auto-Commit: true" \
    -H "Kafka-Auto-Commit-Interval: 3000" \
    -d '{"topics": ["greetings", "cities", "words"], "timeout": 1000}'

Maximum number of consumer group instances is limited to total partition count of all topics.

Commit Consumer API

[GET | POST] /commit/$CONSUMER_GROUP/$INSTANCE_ID

Commits the fetched message offsets. Commit API should be used alongside Consume API, especially when auto commit is disabled. Request body should be a single TopicPartitionOffset object or an array of TopicPartitionOffsets as JSON.

TopicPartitionOffset{topic: String, partition: Int, offset: Long}

When the body is empty (or an empty array), then the consumer will commit the last consumed messages.

  • Commit single topic partition offset:
curl https://tops-stingray-7863-eu1-rest-kafka.upstash.io/commit/mygroup/myconsumer -u myuser:mypass \
    -d '{"topic": "cities", "partition": 1, "offset": 10}'
  • Commit multiple topic partition offsets:
curl https://tops-stingray-7863-eu1-rest-kafka.upstash.io/commit/mygroup/myconsumer -u myuser:mypass \
    -d '[
        {"topic": "cities", "partition": 0, "offset": 13},
        {"topic": "cities", "partition": 1, "offset": 37},
        {"topic": "greetings", "partition": 0, "offset": 19}
    ]'
  • Commit all latest consumed message offsets:
curl https://tops-stingray-7863-eu1-rest-kafka.upstash.io/commit/mygroup/myconsumer -u myuser:mypass

Response:

When commit is completed, a success JSON result will be returned:

{ "result": "Success", "status": 200 }

Remove Consumer Instance

[POST|DELETE] /delete-consumer/$CONSUMER_GROUP/$INSTANCE_ID

Stops and removes a previously created consumer group instance.

Response:

When deletion is completed, a success JSON result will be returned:

{ "result": "Success", "status": 200 }