How to
The schema registry can be used in various scenarios. This page lists the configurations for different use cases.
You can find the parameters needed in these configurations in the Upstash Console.
Scroll down to the REST API section to find the values you need:
UPSTASH_KAFKA_REST_URL
UPSTASH_KAFKA_REST_USERNAME
UPSTASH_KAFKA_REST_PASSWORD
Producer/Consumer
Producer
To configure your producers to use the schema registry, add the following properties to the producer properties in addition to the broker configurations. Note that the selected serializer needs to be schema-registry aware.
Properties props = new Properties();
// ... other configurations such as bootstrap.servers and broker authentication are skipped
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
props.put("schema.registry.url", UPSTASH_KAFKA_REST_URL + "/schema-registry");
props.put("basic.auth.credentials.source", "USER_INFO");
props.put("basic.auth.user.info", UPSTASH_KAFKA_REST_USERNAME + ":" + UPSTASH_KAFKA_REST_PASSWORD);
try (var producer = new KafkaProducer<String, org.apache.avro.GenericRecord>(props)) {
// ...
}
Consumer
To configure your consumers to use the schema registry, add the following properties to the consumer properties in addition to the broker configurations. Note that the selected deserializer needs to be schema-registry aware.
Properties props = new Properties();
// ... other configurations such as bootstrap.servers and broker authentication are skipped
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
props.put("schema.registry.url", UPSTASH_KAFKA_REST_URL + "/schema-registry");
props.put("basic.auth.credentials.source", "USER_INFO");
props.put("basic.auth.user.info", UPSTASH_KAFKA_REST_USERNAME + ":" + UPSTASH_KAFKA_REST_PASSWORD);
try (var consumer = new KafkaConsumer<String, org.apache.avro.GenericRecord>(props)) {
// ...
}
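The producer and consumer snippets above share the same three schema registry properties. As a minimal sketch, they could be factored into one helper. The class name `SchemaRegistryProps`, the method `schemaRegistryProps`, and the placeholder URL and credentials are hypothetical and used only for illustration; the trailing-slash handling is an extra precaution, not something the configuration requires.

```java
import java.util.Properties;

class SchemaRegistryProps {
    // Hypothetical helper: returns only the schema registry settings shown above.
    static Properties schemaRegistryProps(String restUrl, String username, String password) {
        Properties props = new Properties();
        // Drop a trailing slash so the URL does not end up with "//schema-registry".
        String base = restUrl.endsWith("/") ? restUrl.substring(0, restUrl.length() - 1) : restUrl;
        props.put("schema.registry.url", base + "/schema-registry");
        props.put("basic.auth.credentials.source", "USER_INFO");
        props.put("basic.auth.user.info", username + ":" + password);
        return props;
    }

    public static void main(String[] args) {
        // Placeholder values; the real ones come from the Upstash Console.
        Properties props = schemaRegistryProps("https://example.invalid/", "user", "pass");
        // Serializer (or deserializer) and broker settings are added on top as usual.
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
        System.out.println(props.getProperty("schema.registry.url"));
    }
}
```

The returned `Properties` object can serve as the starting point for either a producer or a consumer configuration, so the registry URL and credentials are assembled in one place.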
Connectors
Some connectors force you to use a STRUCT as the key or value, which means that you need a schema and a schema-aware converter to use with the connector. In this case, you can add the following configurations to your connector.
{
"name": "myConnector",
"properties": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
// other configurations are skipped.
"key.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.upstash.schema.registry.enable": "true",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.upstash.schema.registry.enable": "true"
}
}
The configuration above is shorthand for the following.
{
"name": "myConnector",
"properties": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
// other configurations are skipped.
"key.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.basic.auth.credentials.source": "USER_INFO",
"key.converter.basic.auth.user.info": "UPSTASH_KAFKA_REST_USERNAME:UPSTASH_KAFKA_REST_PASSWORD",
"key.converter.schema.registry.url": "UPSTASH_KAFKA_REST_URL/schema-registry",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.basic.auth.credentials.source": "USER_INFO",
"value.converter.basic.auth.user.info": "UPSTASH_KAFKA_REST_USERNAME:UPSTASH_KAFKA_REST_PASSWORD",
"value.converter.schema.registry.url": "UPSTASH_KAFKA_REST_URL/schema-registry"
}
}
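To make the relationship between the two forms concrete, the sketch below generates the three long-form entries for a given converter prefix. It only illustrates the equivalence described above and is not how the connector framework performs the expansion; the class `ConverterConfig`, the method `expand`, and the placeholder values are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class ConverterConfig {
    // Hypothetical helper: builds the long-form entries that correspond to
    // "<prefix>.upstash.schema.registry.enable": "true".
    static Map<String, String> expand(String prefix, String restUrl, String username, String password) {
        Map<String, String> out = new LinkedHashMap<>();
        out.put(prefix + ".basic.auth.credentials.source", "USER_INFO");
        out.put(prefix + ".basic.auth.user.info", username + ":" + password);
        out.put(prefix + ".schema.registry.url", restUrl + "/schema-registry");
        return out;
    }

    public static void main(String[] args) {
        Map<String, String> properties = new LinkedHashMap<>();
        properties.put("key.converter", "io.confluent.connect.avro.AvroConverter");
        properties.putAll(expand("key.converter", "https://example.invalid", "user", "pass"));
        properties.put("value.converter", "io.confluent.connect.avro.AvroConverter");
        properties.putAll(expand("value.converter", "https://example.invalid", "user", "pass"));
        // Print the entries in insertion order, one per line.
        properties.forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```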
Third-party UI tools
You can also use third-party UI tools to work with the schema registry.
We have schema registry configuration examples in our Monitoring section.
SchemaRegistryClient
SchemaRegistryClient can be used to access the schema registry programmatically. In this case, you can configure it as follows:
Map<String, String> configs = new HashMap<>();
configs.put(SchemaRegistryClientConfig.BASIC_AUTH_CREDENTIALS_SOURCE, "USER_INFO");
configs.put(SchemaRegistryClientConfig.USER_INFO_CONFIG, UPSTASH_KAFKA_REST_USERNAME + ":" + UPSTASH_KAFKA_REST_PASSWORD);
var client = new CachedSchemaRegistryClient(UPSTASH_KAFKA_REST_URL + "/schema-registry", 100, configs);
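With `USER_INFO` as the credentials source, the client authenticates using HTTP Basic auth derived from the `username:password` pair. As a rough illustration of what that amounts to, the sketch below builds (but does not send) an equivalent request with the JDK's `java.net.http` API. The `/subjects` path is the Confluent Schema Registry REST endpoint for listing subjects; that it is served under `/schema-registry` here is an assumption, and the class `RegistryRequest`, its methods, and the placeholder values are hypothetical.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;
import java.util.Base64;

class RegistryRequest {
    // Encodes "username:password" as an HTTP Basic Authorization header value.
    static String basicAuthHeader(String username, String password) {
        String userInfo = username + ":" + password;
        return "Basic " + Base64.getEncoder().encodeToString(userInfo.getBytes(StandardCharsets.UTF_8));
    }

    // Builds a GET request for the subject list; the path is an assumption (see above).
    static HttpRequest listSubjects(String restUrl, String username, String password) {
        return HttpRequest.newBuilder(URI.create(restUrl + "/schema-registry/subjects"))
                .header("Authorization", basicAuthHeader(username, password))
                .GET()
                .build();
    }

    public static void main(String[] args) {
        // Placeholder values; nothing is sent over the network here.
        HttpRequest request = listSubjects("https://example.invalid", "user", "pass");
        System.out.println(request.method() + " " + request.uri());
    }
}
```

In practice the `CachedSchemaRegistryClient` shown above handles authentication and caching for you; a hand-built request like this is mainly useful for checking credentials or debugging.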