# Cloudflare Workers

As a tutorial for this integration, we'll implement a real-time analytics system. We'll stream traffic (click) events from our web application to Upstash Kafka. The goal is to be able to answer a simple query such as the following:

```sql
SELECT city, count() FROM kafka_topic_page_views WHERE timestamp > now() - INTERVAL 15 MINUTE GROUP BY city
```

That is, we will query the number of page views from different cities in the last 15 minutes. We keep the query and scenario intentionally simple to make the series easy to follow, but you can extend the model to more complex real-time analytics scenarios.
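To make the semantics of the query concrete, here is a minimal in-memory sketch of the same aggregation in TypeScript. The `PageView` shape and the `countViewsByCity` helper are hypothetical names introduced for illustration only; in the tutorial the events live in a Kafka topic, not in an array.

```ts
// Hypothetical event shape; only `city` and `timestamp` matter for this query.
interface PageView {
  city: string;
  timestamp: number; // milliseconds since the Unix epoch
}

// Count page views per city within a trailing window (15 minutes by default).
function countViewsByCity(
  events: PageView[],
  now: number,
  windowMs: number = 15 * 60 * 1000
): Map<string, number> {
  const counts = new Map<string, number>();
  for (const event of events) {
    // Mirrors `timestamp > now() - INTERVAL 15 MINUTE` from the query.
    if (event.timestamp > now - windowMs) {
      counts.set(event.city, (counts.get(event.city) ?? 0) + 1);
    }
  }
  return counts;
}
```

Calling `countViewsByCity(events, Date.now())` returns one entry per city, which corresponds to the `GROUP BY city` clause.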

We'll use Cloudflare Workers to intercept incoming requests to the website and run a serverless function on each one.

### Kafka Setup

Create an Upstash Kafka cluster and a topic as explained
[here](https://docs.upstash.com/kafka).

### Project Setup

We will use the **C3 (create-cloudflare-cli)** command-line tool to create our application. Open a new terminal window and run C3 using one of the commands below.

<CodeGroup>
  ```shell npm
  npm create cloudflare@latest
  ```

  ```shell yarn
  yarn create cloudflare@latest
  ```
</CodeGroup>

This will install the `create-cloudflare` package and lead you through the setup. C3 also installs Wrangler in projects by default, which helps us test and deploy the application.

```text
➜  npm create cloudflare@latest
Need to install the following packages:
  create-cloudflare@2.1.0
Ok to proceed? (y) y

using create-cloudflare version 2.1.0

╭ Create an application with Cloudflare Step 1 of 3
│
├ In which directory do you want to create your application?
│ dir ./cloudflare_starter
│
├ What type of application do you want to create?
│ type "Hello World" Worker
│
├ Do you want to use TypeScript?
│ yes typescript
│
├ Copying files from "hello-world" template
│
├ Retrieving current workerd compatibility date
│ compatibility date 2023-08-07
│
├ Do you want to use git for version control?
│ yes git
│
╰ Application created
```

We will also install the **Upstash Kafka SDK** to connect to Kafka.

```bash
npm install @upstash/kafka
```

### The Code

You can update the `src/index.ts` file with the code below:

<CodeGroup>
  ```ts src/index.ts
  import { Kafka } from "@upstash/kafka";

  export interface Env {
    UPSTASH_KAFKA_REST_URL: string;
    UPSTASH_KAFKA_REST_USERNAME: string;
    UPSTASH_KAFKA_REST_PASSWORD: string;
  }

  export default {
    async fetch(
      request: Request,
      env: Env,
      ctx: ExecutionContext
    ): Promise<Response> {
      if (new URL(request.url).pathname === "/favicon.ico") {
        return new Response(null, { status: 200 });
      }

      const message = {
        country: request.cf?.country,
        city: request.cf?.city,
        region: request.cf?.region,
        url: request.url,
        ip: request.headers.get("x-real-ip"),
        mobile: request.headers.get("sec-ch-ua-mobile"),
        platform: request.headers.get("sec-ch-ua-platform"),
        useragent: request.headers.get("user-agent"),
      };

      const kafka = new Kafka({
        url: env.UPSTASH_KAFKA_REST_URL,
        username: env.UPSTASH_KAFKA_REST_USERNAME,
        password: env.UPSTASH_KAFKA_REST_PASSWORD,
      });

      const p = kafka.producer();
      // Please update the topic according to your configuration
      const topic = "mytopic";

      ctx.waitUntil(p.produce(topic, JSON.stringify(message)));
      // if you use CF Workers to intercept your existing site, uncomment below
      // return await fetch(request);
      return new Response("My website");
    },
  };
  ```

  ```js src/index.js
  import { Kafka } from "@upstash/kafka";

  export default {
    async fetch(request, env, ctx) {
      if (new URL(request.url).pathname === "/favicon.ico") {
        return new Response(null, { status: 200 });
      }

      const message = {
        country: request.cf?.country,
        city: request.cf?.city,
        region: request.cf?.region,
        url: request.url,
        ip: request.headers.get("x-real-ip"),
        mobile: request.headers.get("sec-ch-ua-mobile"),
        platform: request.headers.get("sec-ch-ua-platform"),
        useragent: request.headers.get("user-agent"),
      };

      const kafka = new Kafka({
        url: env.UPSTASH_KAFKA_REST_URL,
        username: env.UPSTASH_KAFKA_REST_USERNAME,
        password: env.UPSTASH_KAFKA_REST_PASSWORD,
      });

      const p = kafka.producer();
      // Please update the topic according to your configuration
      const topic = "mytopic";

      ctx.waitUntil(p.produce(topic, JSON.stringify(message)));
      // if you use CF Workers to intercept your existing site, uncomment below
      // return await fetch(request);
      return new Response("My website");
    },
  };
  ```
</CodeGroup>

Above, we simply parse the request object and send the useful information to Upstash Kafka. You may add/remove information depending on your own requirements.
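As one way to add or remove fields, the payload construction could be pulled into a small helper. The sketch below is illustrative only: `buildPageViewMessage`, `HeaderReader`, and `GeoInfo` are hypothetical names, and the explicit `timestamp` field is an assumption about what downstream consumers might want, not something the code above sends.

```ts
// Minimal structural types so the helper can be exercised outside a Worker.
interface HeaderReader {
  get(name: string): string | null;
}

interface GeoInfo {
  country?: string;
  city?: string;
  region?: string;
}

// Hypothetical helper: builds the event payload and drops empty fields.
function buildPageViewMessage(
  url: string,
  headers: HeaderReader,
  geo: GeoInfo | undefined,
  now: Date = new Date()
): Record<string, string> {
  const candidate: Record<string, string | null | undefined> = {
    country: geo?.country,
    city: geo?.city,
    region: geo?.region,
    url,
    ip: headers.get("x-real-ip"),
    useragent: headers.get("user-agent"),
    // Assumption: an explicit event time is useful to downstream consumers.
    timestamp: now.toISOString(),
  };

  const message: Record<string, string> = {};
  for (const [key, value] of Object.entries(candidate)) {
    // Skip headers or geo fields that were not present on the request.
    if (value !== null && value !== undefined) {
      message[key] = value;
    }
  }
  return message;
}
```

Inside the Worker, this could be called with `request.url`, `request.headers`, and `request.cf`, and the result passed to `JSON.stringify` before producing to the topic.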

### Configure Credentials

There are two ways to set up the credentials for the Upstash Kafka client. The recommended way is to use the Cloudflare Upstash Integration. Alternatively, you can add the credentials manually.

#### Using the Cloudflare Integration

Open the [Cloudflare Dashboard](https://dash.cloudflare.com) and log in with the same account that you used while setting up the Worker application. Then, navigate to the **Workers & Pages > Overview** section in the sidebar. Here, you'll find your application listed.

<Frame>
  <img src="https://mintlify.s3-us-west-1.amazonaws.com/upstash-vector/img/cloudflare-integration/overview.png" />
</Frame>

Clicking on the application takes you to the application details page, where you can perform the integration process. Switch to the **Settings** tab in the application details, and go to the **Integrations** section. You will see various Worker integrations listed. To proceed, click the **Add Integration** button for Upstash Kafka.

<Frame>
  <img src="https://mintlify.s3-us-west-1.amazonaws.com/upstash-vector/img/cloudflare-integration/kafka-add-integration.png" />
</Frame>

On the Integration page, connect to your Upstash account. Then, select the relevant cluster from the dropdown menu. Finalize the process by pressing the **Add Integration** button.

<Frame>
  <img src="https://mintlify.s3-us-west-1.amazonaws.com/upstash-vector/img/cloudflare-integration/kafka-credentials.png" />
</Frame>

#### Setting up Manually

Navigate to the [Upstash Console](https://console.upstash.com) and copy your `UPSTASH_KAFKA_REST_URL`, `UPSTASH_KAFKA_REST_USERNAME` and `UPSTASH_KAFKA_REST_PASSWORD` credentials into your `wrangler.toml` as shown below.

```toml
[vars]
UPSTASH_KAFKA_REST_URL="REPLACE_HERE"
UPSTASH_KAFKA_REST_USERNAME="REPLACE_HERE"
UPSTASH_KAFKA_REST_PASSWORD="REPLACE_HERE"
```
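Because a forgotten `REPLACE_HERE` placeholder only surfaces when the first request fails, it can help to check the variables early. The following is a minimal sketch under that assumption; `KafkaEnv` and `missingKafkaCredentials` are hypothetical names and not part of the Upstash SDK or Wrangler.

```ts
// The three variables defined under [vars]; optional here because any may be unset.
interface KafkaEnv {
  UPSTASH_KAFKA_REST_URL?: string;
  UPSTASH_KAFKA_REST_USERNAME?: string;
  UPSTASH_KAFKA_REST_PASSWORD?: string;
}

// Hypothetical helper: returns the names of variables that are unset,
// empty, or still hold the "REPLACE_HERE" placeholder.
function missingKafkaCredentials(env: KafkaEnv): string[] {
  const required: (keyof KafkaEnv)[] = [
    "UPSTASH_KAFKA_REST_URL",
    "UPSTASH_KAFKA_REST_USERNAME",
    "UPSTASH_KAFKA_REST_PASSWORD",
  ];
  return required.filter((name) => {
    const value = env[name];
    return !value || value === "REPLACE_HERE";
  });
}
```

In the Worker's `fetch` handler, a non-empty result could be turned into an early `500` response with a descriptive message instead of attempting to produce to Kafka.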

### Test and Deploy

You can test the function locally with `npx wrangler dev`.

Deploy your function to Cloudflare with `npx wrangler deploy`.

Once the deployment is done, the endpoint of the function will be provided to you.

You can check whether the events are being collected in Kafka by copying the `curl` command from the Upstash Console:

```shell
curl https://<UPSTASH_KAFKA_REST_URL>/consume/GROUP_NAME/GROUP_INSTANCE_NAME/TOPIC \
  -H "Kafka-Auto-Offset-Reset: earliest" -u \
  REPLACE_HERE
```
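The same check can be scripted. The sketch below only translates the `curl` command above into a URL and headers for `fetch`; `buildConsumeRequest` is a hypothetical helper, and it assumes that `restUrl` already includes the `https://` scheme and that the value passed to `-u` is `username:password`.

```ts
interface ConsumeRequest {
  url: string;
  headers: Record<string, string>;
}

// Hypothetical helper mirroring the curl command: same path layout,
// same Kafka-Auto-Offset-Reset header, and Basic auth in place of `-u`.
function buildConsumeRequest(
  restUrl: string,
  username: string,
  password: string,
  group: string,
  instance: string,
  topic: string
): ConsumeRequest {
  // Drop trailing slashes so the path is not doubled up.
  const base = restUrl.replace(/\/+$/, "");
  const path = [group, instance, topic]
    .map((segment) => encodeURIComponent(segment))
    .join("/");
  return {
    url: `${base}/consume/${path}`,
    headers: {
      "Kafka-Auto-Offset-Reset": "earliest",
      // Equivalent of curl's `-u username:password`.
      Authorization: `Basic ${btoa(`${username}:${password}`)}`,
    },
  };
}
```

The result can be passed to `fetch(req.url, { headers: req.headers })`; the group, instance, and topic names are whatever you chose for your cluster.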
