RisingWave
This tutorial shows how to integrate Upstash Kafka with RisingWave.
RisingWave is a distributed SQL streaming database that enables simple, efficient, and reliable processing of streaming data.
Upstash Kafka Setup
Create a Kafka cluster using the Upstash Console or the Upstash CLI by following Getting Started.
Create two topics by following the creating topic steps. Name the first topic risingwave_input, since we are going to stream from this topic to RisingWave. Name the second topic risingwave_output; it will receive the stream from RisingWave.
RisingWave Setup
RisingWave provides RisingWave Cloud, a fully managed and scalable stream processing platform.
To use RisingWave Cloud, create an account first.
After creating the account, navigate to Clusters in the right bar. Click Create Cluster and select your plan and cluster configuration.
Creating the cluster takes a few minutes.
Once the cluster is created, open it and navigate to the Query page.
Before you can log in to the cluster on RisingWave Cloud, you need to create a user. The user will be a superuser by default.
You now have the RisingWave setup required to connect to Upstash Kafka.
Create Source
A source streams data from an external database or pipeline into RisingWave.
Once you create a source on RisingWave Cloud, the messages you add to the Upstash Kafka topic are streamed to the RisingWave database.
Create a source from the RisingWave Cloud console by running the following command:
CREATE SOURCE <source-name> (
    name VARCHAR,
    city VARCHAR
)
WITH (
    connector = 'kafka',
    topic = 'risingwave_input',
    properties.bootstrap.server = '<UPSTASH-KAFKA-ENDPOINT>',
    scan.startup.mode = 'latest',
    properties.sasl.mechanism = 'SCRAM-SHA-512',
    properties.security.protocol = 'SASL_SSL',
    properties.sasl.username = '<UPSTASH-KAFKA-USERNAME>',
    properties.sasl.password = '<UPSTASH-KAFKA-PASSWORD>'
) FORMAT PLAIN ENCODE JSON;
Replace the UPSTASH-KAFKA-* placeholders with the credentials from the Upstash Kafka console.
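If you prefer not to paste credentials into the statement by hand, the placeholders can be filled in programmatically. The following is a minimal Python sketch, not part of RisingWave or Upstash tooling: the environment variable names (UPSTASH_KAFKA_ENDPOINT, UPSTASH_KAFKA_USERNAME, UPSTASH_KAFKA_PASSWORD) and the helper names are assumptions made for illustration. It only renders the SQL text, which you would then run in the RisingWave Cloud console.

```python
import os

# Hypothetical environment variable names; adapt them to your own setup.
ENV_VARS = {
    "endpoint": "UPSTASH_KAFKA_ENDPOINT",
    "username": "UPSTASH_KAFKA_USERNAME",
    "password": "UPSTASH_KAFKA_PASSWORD",
}

# Same statement as in this tutorial, with the placeholders turned into fields.
TEMPLATE = """CREATE SOURCE {source_name} (
    name VARCHAR,
    city VARCHAR
)
WITH (
    connector = 'kafka',
    topic = 'risingwave_input',
    properties.bootstrap.server = {endpoint},
    scan.startup.mode = 'latest',
    properties.sasl.mechanism = 'SCRAM-SHA-512',
    properties.security.protocol = 'SASL_SSL',
    properties.sasl.username = {username},
    properties.sasl.password = {password}
) FORMAT PLAIN ENCODE JSON;"""


def sql_literal(value: str) -> str:
    """Quote a value as a SQL string literal, doubling embedded single quotes."""
    return "'" + value.replace("'", "''") + "'"


def render_create_source(source_name: str, env=os.environ) -> str:
    """Fill the template with credentials looked up in `env`.

    The source name is inserted as-is because it is an identifier, not a literal.
    A missing variable raises KeyError.
    """
    values = {key: sql_literal(env[var]) for key, var in ENV_VARS.items()}
    return TEMPLATE.format(source_name=source_name, **values)


# Demo with made-up values instead of real credentials.
demo_env = {
    "UPSTASH_KAFKA_ENDPOINT": "example-endpoint:9092",
    "UPSTASH_KAFKA_USERNAME": "example-user",
    "UPSTASH_KAFKA_PASSWORD": "example-password",
}
print(render_create_source("my_source", demo_env))
```

Calling render_create_source("my_source") without the second argument reads the real environment instead of the demo dictionary.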
This query creates a source on RisingWave. The source appears on the left side of the console and in the Sources tab.
To test, go to your Upstash console and open the risingwave_input topic in your Kafka cluster.
Produce a message to this topic in JSON format. The message should include the fields defined in the source creation query.
{
    "name": "Noah",
    "city": "London"
}
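Before producing a message, it can help to confirm that it is valid JSON and carries every column the source declares. The snippet below is a small Python sketch using only the standard library; the names SOURCE_COLUMNS, build_message, and missing_columns are made up for this example and mirror the name and city columns from the source creation query.

```python
import json

# Columns declared in the CREATE SOURCE statement of this tutorial.
SOURCE_COLUMNS = ("name", "city")


def build_message(name: str, city: str) -> str:
    """Serialize one row as the JSON text to produce to risingwave_input."""
    return json.dumps({"name": name, "city": city})


def missing_columns(raw: str) -> list[str]:
    """Return the source columns that are absent from a JSON message."""
    record = json.loads(raw)
    return [col for col in SOURCE_COLUMNS if col not in record]


message = build_message("Noah", "London")
print(message)                               # the text to paste into the console
print(missing_columns(message))              # empty list: all columns present
print(missing_columns('{"name": "Noah"}'))   # lists the column that is missing
```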
After producing the message, go back to the RisingWave console and run the following query to see the streamed data.
SELECT * FROM <source-name>;
Create Sink
A sink streams data from the RisingWave database to external data stores or pipelines.
Once you create a sink, the data you insert into the RisingWave table, or the data streamed in through the source, is streamed to the Upstash Kafka topic.
For testing purposes, let’s create a new table by running the following query. This table will be streamed to the Upstash Kafka sink topic.
CREATE TABLE <table-name> (name VARCHAR, city VARCHAR);
Create a sink from the RisingWave Cloud console by running the following command:
CREATE SINK <sink-name> FROM <table-name>
WITH (
    connector = 'kafka',
    properties.bootstrap.server = '<UPSTASH-KAFKA-ENDPOINT>',
    properties.sasl.mechanism = 'SCRAM-SHA-512',
    properties.security.protocol = 'SASL_SSL',
    properties.sasl.username = '<UPSTASH-KAFKA-USERNAME>',
    properties.sasl.password = '<UPSTASH-KAFKA-PASSWORD>',
    topic = 'risingwave_output',
    properties.message.max.bytes = 2000
)
FORMAT PLAIN ENCODE JSON (
    force_append_only = 'true'
);
Replace the UPSTASH-KAFKA-* placeholders with the credentials from the Upstash Kafka console.
To test this sink, go to your Upstash console and open the risingwave_output topic in your Kafka cluster. Open the Messages tab to see incoming messages.
Now insert a new row into the table to be streamed:
INSERT INTO <table-name> VALUES ('Noah', 'Manchester');
You can see this row streamed to the Upstash Kafka output topic on your Upstash console.