Real-time data analytics has grown exponentially, becoming the new normal. Increasingly, more data is now stored or transported via data streaming platforms such as Apache Kafka, Apache Pulsar, or Amazon Kinesis. In a traditional data stack, streaming data gets imported into a database or data warehouse for users to further analyze or process. However, it is actually possible to run SQL directly on streaming data. In the recent Kafka meetup in Vancouver, my colleague Jove and I presented the talk Running Streaming SQL on Kafka. Why, How, and What's Next?, exploring different options for running SQL Kafka and doing the analysis directly.
In this blog, I will share how to use different tools to query data in a Kafka topic with SQL, perform some basic analysis, and then send the analysis results to another Kafka topic. The slides deck and query notebook are available at the end of this blog.
A Kafka cluster with three nodes has been set up. Using data simulation tools, we have created a topic and simulated a car sharing scenario where thousands of cars are reporting their status into this Kafka topic.
Here is a sample event in json format:
{
"cid":"c00517",
"gas_percent":74.38581147266994,
"in_use":true,
"latitude":83.76161946072543,
"locked":false,
"longitude":35.090340891241965,
"speed_kmh":53,
"time":"2022-12-14T21:22:48.516342615Z",
"total_km":81.26434736984858
}
Flink
Apache Flink is the most popular streaming processing tool. In most cases, the user will need to write some Java/Scala code for stream processing, though it also has a SQL mode where the user can run streaming SQL for data processing directly, without writing code.
When using Flink SQL, the user can create a Table which connects to different data sources. This connection is bidirectional, meaning the user can use a SELECT query statement to read data or use INSERT statement to write data. While the Flink community has many connectors that support connecting different data with Flink Table, Kafka is the most popular, as most streaming data comes from Kafka.
First, Flink SQL supports different query result modes. In the demo, we use `tableau` mode, which is similar to other SQL clients.
-- set result mode
SET 'sql-client.execution.result-mode' = 'tableau'
Next, a Table is created with the connector to the Kafka topic.
-- create table
CREATE TABLE car_live_data (
cid STRING,
locked BOOLEAN,
latitude DOUBLE,
gas_percent DOUBLE,
total_km DOUBLE,
in_use BOOLEAN,
longitude DOUBLE,
speed_kmh DOUBLE,
`time` VARCHAR,
`record_time` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp',
WATERMARK FOR `record_time` AS `record_time` - INTERVAL '1' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'car_live_data',
'scan.startup.mode' = 'latest-offset',
'properties.bootstrap.servers' = 'kafka:9092',
'value.format' = 'json'
)
Note that we are using record_time fields to calculate watermark–the record_time is an internal field appended by the Kafka broker to the event, which has the timestamp_ltz(3) type, since the watermark will be used to decide the end of the window when running a window-based query. Using customer time fields to calculate watermark is supported by Flink, but in my case, the event time field’s format is `2022-12-14T21:22:48.516342615Z`, which cannot be easily transformed into the timestamp type, (refer to https://stackoverflow.com/questions/70385766/how-to-convert-a-tz-datetime-to-timestamp3-in-flink-sql) Thus, in this example, I am using record_time instead.
Here is the result if you call TO_TIMESTAMP using FlinkSQL to convert the time string to timestamp:
Flink SQL> select TO_TIMESTAMP('2022-12-14T21:22:48.516342615Z');
+----+-------------------------+
| op | EXPR$0 |
+----+-------------------------+
| +I | <NULL> |
+----+-------------------------+
Received a total of 1 row
You can run a SQL query to query the data from the created table. As a Flink query is an unbounded streaming query, the new query result will be pushed to the user.
-- simple stream query
select cid, `time`, speed_kmh from car_live_data;
-- 5 seconds max speed
SELECT window_start, window_end, MAX(speed_kmh) as max_speed
FROM TABLE(
TUMBLE(TABLE car_live_data, DESCRIPTOR(`record_time`), INTERVAL '5' SECONDS))
GROUP BY window_start, window_end
The above query is a tumble window-based aggregation, which calculates the max car speed for each 5 second window.
-- create a temp table for data sink
CREATE TEMPORARY TABLE car_live_data_sink (
window_start TIMESTAMP_LTZ(3),
window_end TIMESTAMP_LTZ(3),
max_speed DOUBLE,
cid STRING
) WITH (
'connector' = 'kafka',
'topic' = 'flinksql_car_live_data_sink',
'properties.bootstrap.servers' = 'kafka:9092',
'format' = 'json'
)
As mentioned, the Flink-Kafka connector also supports writing data back to Kafka. In the above query, a table is created that is connected to a Kafka topic, and we can insert the query result from the previous tumble window aggregation into that topic. This query will be running in the background and is continuously writing query results into the connected Kafka topic. This is a typical use case when the user wants a downsample from an existing data stream, so the downstream doesn’t have to handle large amounts of raw data.
Here are the pros and cons of using FlinkSQL to query Kafka data streams:
Pros:
Easy to connect to Kafka data using Kafka connector with bidirectional read/write.
Query result is pushing to the user in real-time.
Provides streaming SQL that support time window-based aggregation and watermark-based late event handling.
Supports high availability and scalable cluster.
Cons:
Flink clusters are hard to manage and maintain. the user has to make sure the Java, Scala, Flink, and 3rd party libraries versions are completely matched. If any of those versions are mismatched, the cluster will be broken.
It will take some time to submit the Flink job, so when a user runs the query, it usually takes some time to return the first result, even if the query is a very simple one.
By default, all the states are in memory, which means, when a user restarts their Flink cluster, those tables will be gone and the user needs to create those resources.
Trino
Trino is an open-source distributed SQL query engine designed to query large data sets distributed over one or more heterogeneous data sources. (A side note: the story between Trino and Presto is an interesting one, and you can check out the relationship between these two projects if you’re curious.)
With Trino, you can run SQL query to your database, data lakes, and of course, Apache Kafka. The concept mode is very similar to Flink where a connector is configured for a table which can be used to read/write Kafka data. While one big difference is that Flink is running in streaming mode, and Trino is running in a batch mode. This means Trino will load the Kafka data into memory, processing the data and returning the query result in a batch.
To configure the Kafka connector, the user has to manually update the configuration, adding the following config to `/ect/catalog/kafka.properties` which will create a catalog called Kafka:
connector.name=kafka
kafka.default-schema=default
kafka.nodes=kafka:9092
kafka.table-names=car_live_data
kafka.hide-internal-columns=true
To support the json to table mapping, the user also needs to add the following config to `etc/trino/kafka` .
{
"tableName": "car_live_data",
"schemaName": "default",
"topicName": "car_live_data",
"key": {
"dataFormat": "raw",
"fields": [
{
"name": "kafka_key",
"dataFormat": "LONG",
"type": "BIGINT",
"hidden": "false"
}
]
},
"message": {
"dataFormat": "json",
"fields": [
{
"name": "cid",
"mapping": "cid",
"type": "VARCHAR"
},
{
"name": "in_use",
"mapping": "in_use",
"type": "BOOLEAN"
},
{
"name": "longitude",
"mapping": "longitude",
"type": "DOUBLE"
},
{
"name": "latitude",
"mapping": "latitude",
"type": "DOUBLE"
},
{
"name": "speed_kmh",
"mapping": "speed_kmh",
"type": "BIGINT"
},
{
"name": "gas_percent",
"mapping": "gas_percent",
"type": "DOUBLE"
},
{
"name": "total_km",
"mapping": "total_km",
"type": "DOUBLE"
},
{
"name": "locked",
"mapping": "locked",
"type": "BOOLEAN"
},
{
"name": "time",
"mapping": "time",
"type": "VARCHAR"
}
]
}
}
Run `trino --catalog=kafka --schema=default` will bring you Trino’s interactive SQL shell.
Then we can run a simple query:
-- simple query
select cid, time, speed_kmh from car_live_data;
As you can see, Trino’s query will return data in a batch mode, which means when there is no data pushed to Kafka, it will not impact the query result.
Since Trino is not running in streaming mode, there is no window aggregation function provided, but we can run aggregation on all available data.
-- a simple agg query , no window agg
SELECT count(*) as count, max(speed_kmh) as max_speed, cid
FROM car_live_data
GROUP BY cid
ORDER BY cid
It takes some time for Trino to run the batch aggregation since it will need to load all the data from the very beginning from the topic.
Then we can call insert statements to write query results to a pre-configured table which connects to a Kafka topic.
Here is the sink table configuration, at `etc/trino/kafka/default.trino_car_live_data_sink.json`
{
"tableName": "trino_car_live_data_sink",
"schemaName": "default",
"topicName": "trino_car_live_data_sink",
"key": {
"dataFormat": "raw",
"fields": [
{
"name": "kafka_key",
"dataFormat": "LONG",
"type": "BIGINT",
"hidden": "true"
}
]
},
"message": {
"dataFormat": "json",
"fields": [
{
"name": "count",
"mapping": "count",
"type": "BIGINT"
},
{
"name": "cid",
"mapping": "cid",
"type": "VARCHAR"
}
]
}
}
-- insert is supported only for avro encoding
INSERT INTO trino_car_live_data_sink (count, cid)
SELECT count(*) as count, cid from car_live_data group by cid
Trino is a great SQL engine in case you want to run a federated query on different data sources. But when the use case is to query Kafka data, Trino is not a good choice, especially compared to Flink. Trino’s Kafka connection has to be manually configured, the user cannot dynamically create a new table connected to a Kafka topic. Trino does not support streaming processing, so it will scan all the data on kafka, as Kafka is an append only log, there is no effective way to skip data which means the data loading could be a performance concern.
ksqlDB
ksqlDB is a distributed SQL engine for Apache Kafka. It allows users to query, process, and store data in Kafka using a SQL-like syntax, making it easier to work with streaming data. ksqlDB is built on top of Kafka streams, so it is scalable and fault-tolerant.
As ksqlDB is tightly integrated with Kafka, it’s designed to query data on Kafka. ksqlDB has two major concepts: table and stream. To learn more, check out this paper: Streams and Tables: Two Sides of the Same Coin
First, let’s create a stream from the Kafka topic:
-- create stream
CREATE STREAM car_live_data
(cid STRING, locked BOOLEAN, latitude DOUBLE,
gas_percent DOUBLE, total_km DOUBLE, time STRING,
in_use BOOLEAN, longitude DOUBLE, speed_kmh DOUBLE)
WITH (kafka_topic='car_live_data', value_format='json', partitions=1)
Then, we run a simple query:
select cid, time, speed_kmh from car_live_data
Then, run a simple window-based aggregation query:
-- a simple window agg query
SELECT windowstart as ws, cid, avg(gas_percent) as avg_gas, max(speed_kmh) as max_speed from car_live_data
WINDOW TUMBLING (SIZE 60 SECONDS)
GROUP BY cid
EMIT CHANGES
Finally, we create a sink table and insert the aggregation result into that table. This query will be running in the background, which is also called a materialized view.
-- select into a newly created table
CREATE TABLE ksql_car_live_data_sink
WITH (kafka_topic='ksql_car_live_data_sink', value_format='json', partitions=1) AS
SELECT cid, max(speed_kmh) as speed from car_live_data group by cid
emit changes
ksqlDB is integrated with the Confluent platform and is tightly integrated with Kafka, so it works smoothly while querying Kafka data. Unlike the Flink job, the interactive query has no latency waiting for job submission. ksqlDB’s stream query supports the time window and pushes data to the client.
Since ksqlDB is pre-configured with a Kafka cluster, it cannot directly query data on other Kafka clusters. With ksqlDB, user has to understand some implementation details of kafka, for example, when creating streams, user need to specific partition, when joining data, the operation is limited by co-partition limitations, all of these would be hard for those who does not understand how Kafka works.
In case you have already deployed a Confluent platform or are using a Confluent Cloud, ksqlDB can be a good tool to support analysis data on Kafka.
Materialize
Materialize is a distributed database that provides real-time, scalable data processing and fast query performance. It is designed to make it easy to work with streaming data, and provides a SQL-like query language for running queries on streaming data in real-time. Materialize is built on top of a distributed, in-memory data store, which allows it to process data quickly and support high-throughput workloads.
The core concept is the materialized view, which can be considered a background long running stream processing query. Source and sink are components which read data from and write data to external systems.
With a materialized view, the first thing to do is create a Kafka source:
-- create source
CREATE SOURCE car_live_data
FROM KAFKA BROKER 'kafka:9092' TOPIC 'car_live_data'
WITH (start_offset=0)
FORMAT BYTES
Then, the user can create a materialized view, which extracts all the fields from the json data.
-- create mv , json to table data conversion
CREATE MATERIALIZED VIEW car_live_data_view AS
SELECT
(data->>'cid') AS cid,
(data->>'gas_percent')::float AS gas_percent,
(data->>'in_use')::boolean AS in_use,
CAST((data->>'time') as timestamp) AS time,
(data->>'latitude')::float AS latitude,
(data->>'longitude')::float AS longitude,
(data->>'locked')::boolean AS locked,
(data->>'speed_kmh')::int AS speed_kmh,
(data->>'total_km')::float AS total_km
FROM (
SELECT CAST(data AS JSONB) AS data
FROM (
SELECT CONVERT_FROM(data, 'utf8') AS data
FROM car_live_data
)
)
The aggregation query can be another materialized view. All the materialized view is updated at background automatically.
-- tumble window in mz
CREATE MATERIALIZED VIEW car_live_data_agg_view AS
SELECT cid, max(speed_kmh) as max_speed
FROM car_live_data_view
WHERE mz_logical_timestamp() < extract(epoch from time) * 10000000000 + 100000000000
GROUP BY cid
Materialize does not provide window aggregation functions like tumble or hop, the user can use the mz_logical_timestamp function to calculate the current data belonging to which window. This works but is not so convenient and the user has to learn how to use it.
You can run following query to subscribe the query result in real-time:
-- TAIL result
COPY (tail car_live_data_agg_view) TO STDOUT
At last, we can create a sink using the materialized view:
-- sink into kafka
CREATE SINK car_live_data_agg_view_sink
FROM car_live_data_agg_view
INTO KAFKA BROKER 'kafka:9092' TOPIC 'materialize_car_live_data_agg_view_sink'
FORMAT JSON
Note, in this sample, v0.26.5 is used, which is not exactly same as materialize cloud, if you are materialize cloud user, the query statement might be different.
Materialize provides a fully compatible interface of postgresql, which means users can use existing ecosystem resources that support postgresql. Materialize is developed with Rust which is a high performance compared with Java. Materialize does not support a time window by default, using mz_logical_timestamp manually is kind of hard to use. Materialize put everything in memory, in case restart or crash, user has to created all resources (for cloud, it could be different case).
One other stream database which is very similar to materialize called RisingWave provide similar functions and concept.
Timeplus
Timeplus is a cloud-based streaming analytics platform, with a streaming database under the hood to provide SQL-based data processing and analytics. Why SQL? Because SQL is a declarative programming language and it's one of the best practices to create Data-as-a-product. The UI also provides interactive SQL capabilities, dashboards, and visualizations for a better understanding of streaming analytic results. As a result, we achieved Data democratization, lowering the technical barrier around data at every scale.
The stream is the core concept of Timeplus, which represents the constantly changing streaming data. Source and sink are used to read/write data from/to external systems. Materialized view is used to run a long running query in the background and store those query results in an internal stream.
First, the user can set up a Kafka source in the Timeplus console UI.
Then run a simple query in Timeplus:
SELECT cid, window_start, max(speed_kmh) AS max_speed
FROM tumble(car_live_data, 3s)
GROUP by cid, window_start
The user can explore the different analysis results and then if the query works as expected, he can create a Kafka sink to write the query result to a specific Kafka topic.
Timeplus is a powerful tool that can be used to query Kafka data streams. It provides very low latency with high throughput (refer to this blog). Timeplus has full streaming semantic SQL support including window aggregation, late event handling and push results to the client to make sure the query result is consumed in real-time.
Summary
For an overview of major differences, here are all the products mentioned, along with key characteristics.
In this blog, we only covered a very basic flow to run some simple down sampling query from a stream data on Kafka, and then export the result to another Kafka topic. There are lots of other functions we did not cover, such as how to join two streams, how to handle late events, how to analyze large numbers of sub-streams using Partition By, and how to extend using User Defined Functions/Aggregate Functions. We will have more blogs to talk about these functions in the future.
As more and more data is moved to event streams, querying data on Kafka and getting real-time analytic results are becoming more and more valuable. All these tools we talked about today can be used to query data on Kafka, I hope it will help you find the right one to use:
If there is no need to return real-time results and the data volume is small, Trino can be considered.
If you have a technical team that has the capability to manage Flink clusters, and you don't need to run a lot of Ad-hoc queries, you can consider using Flink.
If you are using Confluent Platform or Confluent Cloud, and have no other external Kafka data to query, ksqlDB is a good choice as part of the Confluent platform.
If you want to embed some streaming database into your own application, Rust based programming might be a good choice.
If you need one unified real-time analytics platform to query from multiple streaming or historical data sources, or you want to have end-to-end insights like visualization or alerting functions beyond querying the data only, Timeplus is the one you should consider.
All DDL and SQL statements on this blog and talk are available in this public Hyperquery notebook. To get a copy of the slides deck, please visit https://web.timeplus.com/20221213kafka