Photo by Tim Mossholder / Unsplash

Empower Real-Time Stream Processing with ksqlDB and ksql-cli

Apache Kafka Nov 4, 2022

In this guide, we'll delve into setting up ksqlDB—a powerful database purpose-built for stream processing applications—alongside its command-line interface, ksql-cli. With these tools, you can seamlessly explore, query, and transform streaming data to derive valuable insights in real-time.

Introduction

ksqlDB stands as a cutting-edge database designed specifically for stream processing applications. It offers a streamlined platform for processing and analyzing streaming data, enabling users to unlock insights and respond to events as they occur. Coupled with ksql-cli, a command-line interface for interacting with ksqlDB, you gain unparalleled flexibility and control over your stream processing workflows.

Minimum Software Requirements

Ensure you have the following software components installed to kickstart your ksqlDB journey:

Getting Started

Setup

Running the application via docker compose

Leverage Docker Compose to orchestrate the deployment of ksqlDB and ksql-cli:

 ksqldb-server:
    image: confluentinc/ksqldb-server:latest
    container_name: ksqldb-server
    hostname: ksqldb-server
    depends_on: [kafka]
    ports:
      - 8088:8088
    networks:
      - webproxy
    environment:
      KSQL_LISTENERS: http://0.0.0.0:8088
      KSQL_BOOTSTRAP_SERVERS: kafka:9092
      KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: "true"
      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: "true"
      KSQL_KSQL_CONNECT_URL: http://kafka-connect-01:8084
      KSQL_KSQL_SCHEMA_REGISTRY_URL: http://schema-registry:8081

  ksqldb-cli:
    image: confluentinc/ksqldb-cli:latest
    container_name: ksqldb-cli
    networks:
      - webproxy
    depends_on: [kafka, ksqldb-server]
    entrypoint: /bin/sh
    tty: true
    environment:
      KSQL_KSQL_CONNECT_URL: http://kafka-connect-01:8084

Pull all required docker images

$ docker-compose pull

Start up the environment

Initiate the Docker containers to start up the ksqlDB environment.

The first time that you do this, the Docker images will be pulled down from the remote server. This may take a while!

$ docker-compose up
Creating ksqldb-server      ... done
Creating ksqldb-cli         ... done

Accessing ksqlDb via ksqldb-cli

Interact with ksqlDB using ksql-cli:

$ docker exec -it ksqldb-cli ksql http://ksqldb-server:8088

10-ksql-db-initial

Check topics, streams and tables

show topics;
show streams;
show tables;

Streams

  • Declare Streams
SET 'auto.offset.reset' = 'earliest';
CREATE STREAM PERSON_STREAM (id bigint,uuid VARCHAR,created_date_time TIMESTAMP,last_modified_date_time TIMESTAMP,name VARCHAR,username VARCHAR,address_id bigint) WITH (KAFKA_TOPIC='mysql.streaming_etl_db.person',VALUE_FORMAT='JSON');
CREATE STREAM ADDRESS_STREAM (id bigint,uuid VARCHAR,created_date_time TIMESTAMP,last_modified_date_time TIMESTAMP,city VARCHAR,street VARCHAR,suite VARCHAR,zipcode VARCHAR,geo_id bigint) WITH (KAFKA_TOPIC='mysql.streaming_etl_db.address',VALUE_FORMAT='JSON');
  • Queries
DESCRIBE PERSON_STREAM;
select * from PERSON_STREAM;
SELECT * FROM PERSON_STREAM EMIT CHANGES LIMIT 1;
+-------------------------+-------------------------+-------------------------+-------------------------+-------------------------+-------------------------+-------------------------+
|ID                       |UUID                     |CREATED_DATE_TIME        |LAST_MODIFIED_DATE_TIME  |NAME                     |USERNAME                 |ADDRESS_ID               |
+-------------------------+-------------------------+-------------------------+-------------------------+-------------------------+-------------------------+-------------------------+
|1                        |ce8d2120-1f93-11ed-8647-0|2022-08-19T13:22:00.000  |2022-08-19T13:22:00.000  |d14                      |dbz14                    |1                        |
|                         |c9a3cfadc50              |                         |                         |                         |                         |                         |
Limit Reached
Query terminated
  • stream-stream join
CREATE STREAM PERSON_ADDRESS_ENRICHED_STREAM WITH (FORMAT='JSON', KAFKA_TOPIC='person_address_enriched', PARTITIONS=1, REPLICAS=1) AS 
SELECT
  P.ID P_ID,
  A.ID A_ID,
  P.NAME NAME,
  A.CITY CITY
FROM PERSON_STREAM P
LEFT OUTER JOIN ADDRESS_STREAM A WITHIN 1 HOURS GRACE PERIOD 30 MINUTES ON ((A.ID = P.ADDRESS_ID))
EMIT CHANGES;

Tables

  • Declare Tables
CREATE TABLE PERSON (id bigint PRIMARY KEY,uuid VARCHAR,created_date_time TIMESTAMP,last_modified_date_time TIMESTAMP,name VARCHAR,username VARCHAR,address_id bigint) WITH (KAFKA_TOPIC='mysql.streaming_etl_db.person',VALUE_FORMAT='JSON');
CREATE TABLE ADDRESS (id bigint PRIMARY KEY,uuid VARCHAR,created_date_time TIMESTAMP,last_modified_date_time TIMESTAMP,city VARCHAR,street VARCHAR,suite VARCHAR,zipcode VARCHAR,geo_id bigint) WITH (KAFKA_TOPIC='mysql.streaming_etl_db.address',VALUE_FORMAT='JSON');
  • Query Tables
SELECT * FROM PERSON EMIT CHANGES LIMIT 1;
SELECT * FROM ADDRESS EMIT CHANGES LIMIT 1;
  • Table Joins
SELECT 
	P.NAME,
	A.CITY
FROM PERSON P
LEFT JOIN ADDRESS A on A.id = P.address_id
EMIT CHANGES 
LIMIT 1;
SELECT 
  P.NAME, 
  A.CITY
FROM PERSON P
INNER JOIN ADDRESS A
ON A.id = P.address_id
EMIT CHANGES
LIMIT 1;
CREATE TABLE PERSON_ADDRESS_ENRICHED (P_ID bigint,A_ID bigint,NAME VARCHAR,CITY VARCHAR) WITH (KAFKA_TOPIC='person_address_enriched',VALUE_FORMAT='JSON');
  • Others
DROP TABLE IF EXISTS PERSON;

Tear down the stack

When done, tear down the ksqlDB stack:

$ docker compose-down
Stopping ksqldb-cli       ... done
Stopping ksqldb-server    ... done
Removing ksqldb-cli         ... done
Removing ksqldb-server      ... done

If you want to preserve the state of all containers, run docker-compose stop instead.

By following these steps, you can harness the power of ksqlDB and ksql-cli to perform real-time stream processing, enabling you to extract insights and derive value from streaming data with ease and efficiency.

Tags

Anantha Raju C

| Poetry | Music | Cinema | Books | Visual Art | Software Engineering |