Empower Real-Time Stream Processing with ksqlDB and ksql-cli
This guide walks through setting up ksqlDB, a database purpose-built for stream processing applications, together with its command-line interface, ksql-cli. With both running, you can explore, query, and transform streaming data in real time.
Introduction
ksqlDB is a database designed specifically for stream processing applications. It lets you process and analyze streaming data with SQL statements and respond to events as they occur. ksql-cli is the command-line interface for a ksqlDB server; it is used here to declare streams and tables and to run queries interactively.
Minimum Software Requirements
Make sure the following software is installed before you begin:
- Docker (for ksqldb-server, ksqldb-cli)
- Docker Compose
Getting Started
Setup
Running the application via docker compose
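Use Docker Compose to run the ksqlDB server and the CLI. The fragment below belongs under the services key of your Compose file. It assumes that a kafka service and a webproxy network are defined in the same file, and that Kafka Connect and Schema Registry are reachable at the URLs shown: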
ksqldb-server:
  image: confluentinc/ksqldb-server:latest
  container_name: ksqldb-server
  hostname: ksqldb-server
  depends_on: [kafka]
  ports:
    - 8088:8088
  networks:
    - webproxy
  environment:
    KSQL_LISTENERS: http://0.0.0.0:8088
    KSQL_BOOTSTRAP_SERVERS: kafka:9092
    KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: "true"
    KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: "true"
    KSQL_KSQL_CONNECT_URL: http://kafka-connect-01:8084
    KSQL_KSQL_SCHEMA_REGISTRY_URL: http://schema-registry:8081
ksqldb-cli:
  image: confluentinc/ksqldb-cli:latest
  container_name: ksqldb-cli
  networks:
    - webproxy
  depends_on: [kafka, ksqldb-server]
  entrypoint: /bin/sh
  tty: true
  environment:
    KSQL_KSQL_CONNECT_URL: http://kafka-connect-01:8084
Pull all required Docker images
$ docker-compose pull
Start up the environment
Start the Docker containers to bring up the ksqlDB environment.
If you skipped the pull step, the Docker images are downloaded from the remote registry the first time you do this. This may take a while!
$ docker-compose up
Creating ksqldb-server ... done
Creating ksqldb-cli ... done
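The containers can be reported as created before the ksqlDB server is ready to accept connections. The following is a minimal sketch of a readiness check, assuming it runs on the host, that curl is installed, and that port 8088 is published as in the Compose fragment. It polls the server's /info endpoint; the function name wait_for_ksqldb and the KSQLDB_URL variable are illustrative and not part of ksqlDB.

```shell
#!/bin/sh
# Assumption: the server is reachable from the host on the published port 8088.
KSQLDB_URL="${KSQLDB_URL:-http://localhost:8088}"

# Poll GET /info until the server answers or the attempts run out.
#   $1 = number of attempts (default 30)
#   $2 = seconds to sleep between attempts (default 2)
wait_for_ksqldb() {
  attempts="${1:-30}"
  delay="${2:-2}"
  i=1
  while [ "$i" -le "$attempts" ]; do
    # -f makes curl return a non-zero status on HTTP errors.
    if curl -fsS "$KSQLDB_URL/info" >/dev/null 2>&1; then
      echo "ksqlDB is up at $KSQLDB_URL"
      return 0
    fi
    sleep "$delay"
    i=$((i + 1))
  done
  echo "ksqlDB did not respond after $attempts attempts" >&2
  return 1
}

# Usage, from a second terminal once docker-compose up is running:
#   wait_for_ksqldb 30 2
```

Defining the check as a function keeps it easy to reuse in a startup script before the CLI is opened.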
Accessing ksqlDB via ksqldb-cli
Open an interactive ksql session inside the ksqldb-cli container and point it at the server:
$ docker exec -it ksqldb-cli ksql http://ksqldb-server:8088
Check topics, streams and tables
show topics;
show streams;
show tables;
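The same statements can also be sent without an interactive session, through the server's REST API. The sketch below assumes curl on the host and the published port 8088; it posts a single-line statement to the /ksql endpoint. The helper names ksql_payload and ksql_run are hypothetical, and the escaping only covers backslashes and double quotes, so multi-line statements would need extra handling.

```shell
#!/bin/sh
# Assumption: the server is reachable from the host on the published port 8088.
KSQLDB_URL="${KSQLDB_URL:-http://localhost:8088}"

# Build the JSON body for the /ksql endpoint from one single-line statement.
ksql_payload() {
  # Escape backslashes and double quotes so the statement fits in a JSON string.
  stmt=$(printf '%s' "$1" | sed -e 's/\\/\\\\/g' -e 's/"/\\"/g')
  printf '{"ksql": "%s", "streamsProperties": {}}' "$stmt"
}

# Send the statement to the server and print the JSON response.
ksql_run() {
  curl -sS -X POST "$KSQLDB_URL/ksql" \
    -H 'Content-Type: application/vnd.ksql.v1+json; charset=utf-8' \
    -d "$(ksql_payload "$1")"
}

# Usage:
#   ksql_run 'SHOW TOPICS;'
#   ksql_run 'SHOW STREAMS;'
#   ksql_run 'SHOW TABLES;'
```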
Streams
- Declare Streams
SET 'auto.offset.reset' = 'earliest';
CREATE STREAM PERSON_STREAM (id bigint,uuid VARCHAR,created_date_time TIMESTAMP,last_modified_date_time TIMESTAMP,name VARCHAR,username VARCHAR,address_id bigint) WITH (KAFKA_TOPIC='mysql.streaming_etl_db.person',VALUE_FORMAT='JSON');
CREATE STREAM ADDRESS_STREAM (id bigint,uuid VARCHAR,created_date_time TIMESTAMP,last_modified_date_time TIMESTAMP,city VARCHAR,street VARCHAR,suite VARCHAR,zipcode VARCHAR,geo_id bigint) WITH (KAFKA_TOPIC='mysql.streaming_etl_db.address',VALUE_FORMAT='JSON');
- Queries
DESCRIBE PERSON_STREAM;
SELECT * FROM PERSON_STREAM;
SELECT * FROM PERSON_STREAM EMIT CHANGES LIMIT 1;
+-------------------------+-------------------------+-------------------------+-------------------------+-------------------------+-------------------------+-------------------------+
|ID |UUID |CREATED_DATE_TIME |LAST_MODIFIED_DATE_TIME |NAME |USERNAME |ADDRESS_ID |
+-------------------------+-------------------------+-------------------------+-------------------------+-------------------------+-------------------------+-------------------------+
|1 |ce8d2120-1f93-11ed-8647-0|2022-08-19T13:22:00.000 |2022-08-19T13:22:00.000 |d14 |dbz14 |1 |
| |c9a3cfadc50 | | | | | |
Limit Reached
Query terminated
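Outside the CLI, the equivalent of SET 'auto.offset.reset' = 'earliest' can be passed per request in the streamsProperties field of the REST API. A minimal sketch, assuming curl on the host and the published port 8088; query_payload and push_query are hypothetical helper names, and the statement must not contain double quotes because no JSON escaping is done here.

```shell
#!/bin/sh
# Assumption: the server is reachable from the host on the published port 8088.
KSQLDB_URL="${KSQLDB_URL:-http://localhost:8088}"

# Build a /query request body that reads the topic from the beginning.
query_payload() {
  printf '{"ksql": "%s", "streamsProperties": {"ksql.streams.auto.offset.reset": "earliest"}}' "$1"
}

# Run a push query; with a LIMIT clause the response ends once the limit is hit.
push_query() {
  curl -sS -X POST "$KSQLDB_URL/query" \
    -H 'Content-Type: application/vnd.ksql.v1+json; charset=utf-8' \
    -d "$(query_payload "$1")"
}

# Usage:
#   push_query 'SELECT * FROM PERSON_STREAM EMIT CHANGES LIMIT 1;'
```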
- stream-stream join
CREATE STREAM PERSON_ADDRESS_ENRICHED_STREAM WITH (FORMAT='JSON', KAFKA_TOPIC='person_address_enriched', PARTITIONS=1, REPLICAS=1) AS
SELECT
P.ID P_ID,
A.ID A_ID,
P.NAME NAME,
A.CITY CITY
FROM PERSON_STREAM P
LEFT OUTER JOIN ADDRESS_STREAM A WITHIN 1 HOURS GRACE PERIOD 30 MINUTES ON ((A.ID = P.ADDRESS_ID))
EMIT CHANGES;
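The WITHIN 1 HOURS clause limits which records can be paired: a person record and an address record are only joined when their timestamps are at most one hour apart, in either direction. The arithmetic can be sketched as follows, with timestamps given in epoch seconds. The function within_window is purely illustrative and not part of ksqlDB, and the GRACE PERIOD clause, which bounds how late out-of-order records may still arrive, is not modeled.

```shell
#!/bin/sh
# Succeeds (exit status 0) when two event timestamps are close enough
# to be joined under WITHIN <window> seconds.
#   $1 = left record timestamp, $2 = right record timestamp, $3 = window size
within_window() {
  left_ts=$1
  right_ts=$2
  window=$3
  diff=$((left_ts - right_ts))
  # Take the absolute value: the window applies before and after.
  [ "$diff" -lt 0 ] && diff=$((0 - diff))
  [ "$diff" -le "$window" ]
}

# Usage: one hour is 3600 seconds.
#   within_window 1660915320 1660917000 3600 && echo "joinable"
```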
Tables
- Declare Tables
CREATE TABLE PERSON (id bigint PRIMARY KEY,uuid VARCHAR,created_date_time TIMESTAMP,last_modified_date_time TIMESTAMP,name VARCHAR,username VARCHAR,address_id bigint) WITH (KAFKA_TOPIC='mysql.streaming_etl_db.person',VALUE_FORMAT='JSON');
CREATE TABLE ADDRESS (id bigint PRIMARY KEY,uuid VARCHAR,created_date_time TIMESTAMP,last_modified_date_time TIMESTAMP,city VARCHAR,street VARCHAR,suite VARCHAR,zipcode VARCHAR,geo_id bigint) WITH (KAFKA_TOPIC='mysql.streaming_etl_db.address',VALUE_FORMAT='JSON');
- Query Tables
SELECT * FROM PERSON EMIT CHANGES LIMIT 1;
SELECT * FROM ADDRESS EMIT CHANGES LIMIT 1;
- Table Joins
SELECT
P.NAME,
A.CITY
FROM PERSON P
LEFT JOIN ADDRESS A ON A.id = P.address_id
EMIT CHANGES
LIMIT 1;
SELECT
P.NAME,
A.CITY
FROM PERSON P
INNER JOIN ADDRESS A
ON A.id = P.address_id
EMIT CHANGES
LIMIT 1;
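-- ksqlDB requires a PRIMARY KEY on every table. A_ID is assumed to be the topic key here, because the join that writes person_address_enriched is keyed on the address id.
CREATE TABLE PERSON_ADDRESS_ENRICHED (P_ID bigint,A_ID bigint PRIMARY KEY,NAME VARCHAR,CITY VARCHAR) WITH (KAFKA_TOPIC='person_address_enriched',VALUE_FORMAT='JSON');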
- Others
DROP TABLE IF EXISTS PERSON;
Tear down the stack
When done, tear down the ksqlDB stack:
$ docker-compose down
Stopping ksqldb-cli ... done
Stopping ksqldb-server ... done
Removing ksqldb-cli ... done
Removing ksqldb-server ... done
To stop the containers without removing them, so that their state is preserved, run docker-compose stop instead.
With these steps you have a running ksqlDB server and CLI, streams and tables declared over the source topics, and joins that enrich person records with address data in real time.