
ksqlDB and ksql-cli


Introduction

This example provides a base setup for using ksqlDB (the database purpose-built for stream processing applications) and ksql-cli.

Minimum Software Requirements

  • Docker
  • Docker Compose

Getting Started

Setup

Running the application via docker compose

  ksqldb-server:
    image: confluentinc/ksqldb-server:latest
    container_name: ksqldb-server
    hostname: ksqldb-server
    depends_on: [kafka]
    ports:
      - 8088:8088
    networks:
      - webproxy
    environment:
      KSQL_LISTENERS: http://0.0.0.0:8088
      KSQL_BOOTSTRAP_SERVERS: kafka:9092
      KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: "true"
      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: "true"
      KSQL_KSQL_CONNECT_URL: http://kafka-connect-01:8084
      KSQL_KSQL_SCHEMA_REGISTRY_URL: http://schema-registry:8081

  ksqldb-cli:
    image: confluentinc/ksqldb-cli:latest
    container_name: ksqldb-cli
    networks:
      - webproxy
    depends_on: [kafka, ksqldb-server]
    entrypoint: /bin/sh
    tty: true
    environment:
      KSQL_KSQL_CONNECT_URL: http://kafka-connect-01:8084
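
The two services above reference a kafka broker (and, via KSQL_KSQL_CONNECT_URL and KSQL_KSQL_SCHEMA_REGISTRY_URL, a Kafka Connect worker and a Schema Registry) that are defined elsewhere in the compose file. As a rough orientation only, a minimal single-broker setup on the same webproxy network, assuming the Confluent cp-zookeeper and cp-kafka images, could look like this:

  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    container_name: zookeeper
    networks:
      - webproxy
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181

  kafka:
    image: confluentinc/cp-kafka:latest
    container_name: kafka
    hostname: kafka
    depends_on: [zookeeper]
    networks:
      - webproxy
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

Adjust the listener configuration if the broker also needs to be reachable from the host machine.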

Pull all required docker images

$ docker-compose pull

Start up the environment

The first time that you do this, the Docker images will be pulled down from the remote server. This may take a while!

$ docker-compose up
Creating ksqldb-server      ... done
Creating ksqldb-cli         ... done

Accessing ksqlDB via ksqldb-cli

$ docker exec -it ksqldb-cli ksql http://ksqldb-server:8088
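
To confirm that the server is up independently of the CLI, you can also hit its REST API on the exposed port 8088; the /info endpoint should return the server version and the id of the Kafka cluster it is connected to:

$ curl -s http://localhost:8088/info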


Check topics, streams and tables

SHOW TOPICS;
SHOW STREAMS;
SHOW TABLES;
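
Individual topics can also be inspected directly from the CLI before any streams are declared over them. For example, assuming the Debezium-style MySQL topic used in the stream declarations below already contains data:

PRINT 'mysql.streaming_etl_db.person' FROM BEGINNING LIMIT 1;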

Streams

  • Declare Streams
SET 'auto.offset.reset' = 'earliest';
CREATE STREAM PERSON_STREAM (id BIGINT, uuid VARCHAR, created_date_time TIMESTAMP, last_modified_date_time TIMESTAMP, name VARCHAR, username VARCHAR, address_id BIGINT) WITH (KAFKA_TOPIC='mysql.streaming_etl_db.person', VALUE_FORMAT='JSON');
CREATE STREAM ADDRESS_STREAM (id BIGINT, uuid VARCHAR, created_date_time TIMESTAMP, last_modified_date_time TIMESTAMP, city VARCHAR, street VARCHAR, suite VARCHAR, zipcode VARCHAR, geo_id BIGINT) WITH (KAFKA_TOPIC='mysql.streaming_etl_db.address', VALUE_FORMAT='JSON');
  • Queries
DESCRIBE PERSON_STREAM;
SELECT * FROM PERSON_STREAM;
SELECT * FROM PERSON_STREAM EMIT CHANGES LIMIT 1;
+-------------------------+-------------------------+-------------------------+-------------------------+-------------------------+-------------------------+-------------------------+
|ID                       |UUID                     |CREATED_DATE_TIME        |LAST_MODIFIED_DATE_TIME  |NAME                     |USERNAME                 |ADDRESS_ID               |
+-------------------------+-------------------------+-------------------------+-------------------------+-------------------------+-------------------------+-------------------------+
|1                        |ce8d2120-1f93-11ed-8647-0|2022-08-19T13:22:00.000  |2022-08-19T13:22:00.000  |d14                      |dbz14                    |1                        |
|                         |c9a3cfadc50              |                         |                         |                         |                         |                         |
Limit Reached
Query terminated
  • Stream-stream join
CREATE STREAM PERSON_ADDRESS_ENRICHED_STREAM WITH (FORMAT='JSON', KAFKA_TOPIC='person_address_enriched', PARTITIONS=1, REPLICAS=1) AS 
SELECT
  P.ID P_ID,
  A.ID A_ID,
  P.NAME NAME,
  A.CITY CITY
FROM PERSON_STREAM P
LEFT OUTER JOIN ADDRESS_STREAM A WITHIN 1 HOURS GRACE PERIOD 30 MINUTES ON ((A.ID = P.ADDRESS_ID))
EMIT CHANGES;
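
The persistent query behind this statement keeps writing join results to the person_address_enriched topic, so the enriched stream can be queried like any other stream:

SELECT * FROM PERSON_ADDRESS_ENRICHED_STREAM EMIT CHANGES LIMIT 1;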

Tables

  • Declare Tables
CREATE TABLE PERSON (id BIGINT PRIMARY KEY, uuid VARCHAR, created_date_time TIMESTAMP, last_modified_date_time TIMESTAMP, name VARCHAR, username VARCHAR, address_id BIGINT) WITH (KAFKA_TOPIC='mysql.streaming_etl_db.person', VALUE_FORMAT='JSON');
CREATE TABLE ADDRESS (id BIGINT PRIMARY KEY, uuid VARCHAR, created_date_time TIMESTAMP, last_modified_date_time TIMESTAMP, city VARCHAR, street VARCHAR, suite VARCHAR, zipcode VARCHAR, geo_id BIGINT) WITH (KAFKA_TOPIC='mysql.streaming_etl_db.address', VALUE_FORMAT='JSON');
SELECT * FROM PERSON EMIT CHANGES LIMIT 1;
SELECT * FROM ADDRESS EMIT CHANGES LIMIT 1;
  • Table Joins
SELECT 
	P.NAME,
	A.CITY
FROM PERSON P
LEFT JOIN ADDRESS A ON A.id = P.address_id
EMIT CHANGES 
LIMIT 1;
SELECT 
  P.NAME, 
  A.CITY
FROM PERSON P
INNER JOIN ADDRESS A
ON A.id = P.address_id
EMIT CHANGES
LIMIT 1;
CREATE TABLE PERSON_ADDRESS_ENRICHED (A_ID BIGINT PRIMARY KEY, P_ID BIGINT, NAME VARCHAR, CITY VARCHAR) WITH (KAFKA_TOPIC='person_address_enriched', KEY_FORMAT='JSON', VALUE_FORMAT='JSON');
  • Others
DROP TABLE IF EXISTS PERSON;
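
Objects backed by a running persistent query, such as PERSON_ADDRESS_ENRICHED_STREAM created above, can only be dropped after that query has been terminated. A sketch of the usual sequence follows; the query id shown is hypothetical and should be replaced with the one reported by SHOW QUERIES:

SHOW QUERIES;
TERMINATE CSAS_PERSON_ADDRESS_ENRICHED_STREAM_0; -- hypothetical id, copy the real one from SHOW QUERIES
DROP STREAM IF EXISTS PERSON_ADDRESS_ENRICHED_STREAM;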

Tear down the stack

$ docker-compose down
Stopping ksqldb-cli       ... done
Stopping ksqldb-server    ... done
Removing ksqldb-cli         ... done
Removing ksqldb-server      ... done

If you want to preserve the state of all containers, run docker-compose stop instead.
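
For example, to pause and later resume the same containers without losing their state:

$ docker-compose stop
$ docker-compose start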
