By Jove Zhong

Top 10 Streaming SQL Patterns

Everyone knows SQL. But did you know that by extending SQL with streaming analytics patterns, you can conduct powerful real-time analytics with little additional work? We’ve seen our customers substantially reduce development effort by switching real-time microservices to streaming SQL. In this blog, we’ll show you the top 10 streaming SQL patterns that can help you rapidly build up your real-time capabilities.


 

Why Streaming SQL

At Timeplus, we love simplicity. So when building our streaming analytics engine, we decided to leverage the well-known and popular SQL. A traditional SQL query is bounded: you send the SQL to the database, it scans data with the conditions you specify, and it returns the result as a fixed number of rows and columns. When you process streaming data, you need the latest results without constantly re-submitting the SQL. Streaming SQL is built upon SQL, adding two key capabilities:

  1. Streaming SQL is long running. It continuously shows the latest results to the client, unless the user cancels the query.

  2. In addition to processing real-time data, streaming SQL supports streaming analytics semantics, such as detecting late events and various time windows (a minimal example follows).
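For example, here is a minimal long-running query (against the slack_events stream introduced below): rather than returning a single result and exiting, it keeps emitting the updated count as new events arrive.


SELECT count(1) FROM slack_events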

To demonstrate the power of streaming SQL, I built a demo with the Slack API and Timeplus streaming SQL. The demo continuously gets chat activities via the Slack Real Time Messaging (RTM) API and sends them to data streams in Timeplus. I defined a few streaming SQL queries as pattern detection rules. Those queries can trigger AWS Lambda (via the Timeplus built-in webhook sink) to delete/update/post messages in real time.


The system overview:


A short demo video:


As demonstrated in the video, with streaming SQL and our Timeplus engine, the chat messages are processed in real time, and actions are taken immediately. The end-to-end latency is just a few milliseconds.


We’ll use this example to help showcase the top 10 patterns for streaming SQL.


 

1. Streaming Tail & Filter

Before defining any meaningful streaming processing rule, you first need to understand what the real-time data looks like. You can simply run `SELECT * FROM stream` to list all live data from the stream, without applying any filter or aggregation.


Timeplus provides a single-click experience to run a streaming tail for any data stream.

When the data arrives, it will show up on the results page within the same second. We also built a real-time table header to help you understand the common values and distribution for each column.


After verifying the streaming data, you can apply filters to take actions for certain types of events.


In this real-time Slack demo, for example, you can add a real-time rule that deletes the message if it contains certain keywords, such as confidential or password.


You can just run:


SELECT * FROM slack_events WHERE text = 'password'


In order to take real-time action on the matching results, we add two more columns: one for the type of action to take, and one for a message to post in the channel as a reminder.


SELECT *, 'delete' AS tp_action, 'Do not send messages with potential confidential information' AS tp_message FROM slack_events WHERE text = 'password'


The Lambda function gets the entire row from the webhook payload, takes the corresponding action based on the tp_action column, and posts a message in the channel.


However, WHERE text = 'password' only works when the user types literally 'password' in the message and nothing else. You can enhance the filter logic as follows (a combined sketch follows the list):

  • Check whether a certain keyword is part of the message: WHERE text LIKE '%password%'

  • Check whether any of a set of keywords exists: WHERE multi_search_any(text, ['password','token','secret'])

  • Check whether the message matches a certain pattern, for example, whether the user mentions an AWS KMS key: WHERE match(text, 'arn:aws:kms:us-east-1:\d{12}:key/.{36}')

  • Check whether the message is too long: WHERE length(text) > 100
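Putting a few of these together, here is a sketch of a stricter deletion rule, reusing the tp_action/tp_message convention from above:


SELECT *, 'delete' AS tp_action, 'Do not send messages with potential confidential information' AS tp_message FROM slack_events WHERE multi_search_any(text, ['password','token','secret']) OR match(text, 'arn:aws:kms:us-east-1:\d{12}:key/.{36}')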

2. Tumble Window Aggregation

The tumble window, or fixed window, is a common time window for processing streaming data. One typical use is downsampling. For example, sensors on IoT devices send data a couple of times a minute, without a fixed interval. To store the data more efficiently and make analysis easier, the data can be grouped into one-minute buckets, keeping only the min, max, avg, sum, and count information. The following streaming SQL demonstrates downsampling for IoT data:


SELECT window_start, window_end, device_id, count(1), sum(v), avg(v), min(v), max(v), p90(v) FROM tumble(iot_data, 1m) GROUP BY window_start, window_end, device_id

The tumble() function takes at least 2 parameters: one is the data stream name, the other is the window size. In this case, 1m means “one minute”. The following diagram illustrates how events from different devices, with different event times, are grouped by the tumble window.

In the Slack example, we can send out an hourly message showing how many messages were sent in the past hour, sorted by the most active users.


SELECT window_start, user, count(1) AS msg_cnt FROM tumble(slack_events, 1h) GROUP BY window_start, user ORDER BY msg_cnt DESC


3. Late Event Detection

Before we talk about other common time windows, we need to cover an interesting topic in stream processing: events can be “fashionably late”.

Source: https://www.amazon.com/exec/obidos/ASIN/074349959X

There could be many reasons why streaming data arrives late, such as network latency, or the clock on the device running behind the wall clock. If we are building a tumble window for each second, we don’t want to keep waiting for late events and delay the aggregation. That’s why modern stream processing engines introduce a “watermark”.


A full explanation of watermarks is beyond the scope of this blog; I’ll be sure to address that topic in a future post. The following diagram illustrates how watermarks help detect late events.

At Timeplus, we do our best to hide such complexity from the end user. If you are okay with events being up to 2 seconds late, you can define the rule in your streaming SQL as follows:


SELECT .. FROM tumble(..) GROUP BY .. EMIT AFTER WATERMARK AND DELAY 2s
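For example, applying this to the earlier IoT downsampling query (a sketch, assuming the same iot_data stream):


SELECT window_start, device_id, count(1), avg(v) FROM tumble(iot_data, 1m) GROUP BY window_start, device_id EMIT AFTER WATERMARK AND DELAY 2s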


4. Hopping Window Aggregation

Let’s continue exploring other common time windows.


The hopping window, a.k.a. sliding window, is another window with a fixed size, but unlike tumble windows, hopping windows can overlap each other.


One picture is worth a thousand words:

In the Slack example, you can check whether someone sends over 20 messages within one minute in a channel. If so, it could be a signal that the user is disturbing the flow of the discussion. You probably don’t want to use a tumble window, since the window start and end would have to align with the beginning of the minute, and you may want to send a reminder to the user as soon as possible. One solution is to use hop(.., 1s, 1m):


SELECT channel, window_end, user_id, count(1) AS msg_cnt, 'nop' AS tp_action, concat('<@', user_id, '>, do not send messages too frequently') AS tp_message FROM hop(slack_events, 1s, 1m) GROUP BY channel, window_end, user_id HAVING msg_cnt > 20


5. Stateful Processing

The previous SQL notifies users if they send too many messages in a short time period; however, the side effect is that multiple notifications will be sent. For example, suppose a user sends too many messages from 12:01:00 to 12:02:00. The first notification goes out at 12:02:01. But even if the user stops sending messages immediately, another notification will most likely go out at 12:02:02, since the count remains high in that hop window.


To solve this problem, we can use the lag function to access the previous streaming results. For example:


SELECT channel, window_end, user_id, count(1) AS msg_cnt, 'nop' AS tp_action, concat('<@', user_id, '>, do not send messages too frequently') AS tp_message FROM hop(slack_events, 1s, 1m) GROUP BY channel, window_end, user_id HAVING msg_cnt > 20 AND lag(user_id) != user_id


The lag(user_id) function gets the user_id from the last emitted result, so we can check whether we are about to notify the same user again. If so, we skip the notification.

The lag function is also very useful for “minute over minute”, “hour over hour”, or similar comparisons. For example, you can compare the total revenue in this hour with the last hour:


SELECT window_start, sum(amount) AS revenue, lag(revenue) AS last_hour, revenue - last_hour AS gap FROM tumble(revenue, 1h) GROUP BY window_start


Or even compare the revenue for this hour with the same hour yesterday:


SELECT window_start, sum(amount) AS revenue, revenue - lag(revenue, 24) AS gap FROM tumble(revenue, 1h) GROUP BY window_start


Timeplus supports stateful stream processing. You can use streaming SQL to access information across multiple events. Some examples of stateful operations are:

  • The lag, latest, and earliest functions allow you to access events in the data stream with certain patterns.

  • The tumble/hop/session windows hold the pending aggregations per second/minute/etc.

  • The global aggregation holds state from when you start the query until you cancel it.

  • When training a machine learning model over a stream of data points, the state holds the current version of the model parameters.

You can also easily access historical data via the table function and the seek_to time travel capability (a sketch of the table function follows; seek_to is covered in pattern 7).
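As an example of the table function, here is a sketch, assuming table() turns a streaming query into a bounded one over historical data:


SELECT count(1) FROM table(slack_events)


Unlike the long-running streaming count shown earlier, this returns a fixed result over the data stored so far and then exits.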


6. Session Window Aggregation

There are many other time windows. The last common one I’d like to highlight is the session window. This is not the same as the session concept in HTTP web servers. A session window tracks whether the same entity is actively sending events, and closes the session if the time gap between two events exceeds the specified limit.

Unlike tumble or hop windows, the window size is not fixed, and different group keys may have different windows (for example, device1’s three windows can be very different from device2’s windows).


In the Slack example, if two messages from the same user (in the same channel) are within the same minute, we consider the user to be discussing a topic. If there are over 60 seconds between messages from the same user, we consider the discussion over. We may want to send a reminder asking them to meet in person or via a huddle/meeting if there is too much back-and-forth discussion in Slack (sorry, this may not be a good example).


SELECT 'nop' AS tp_action, concat('<@', user_id, '>, start a meeting?') AS tp_message FROM session(slack_events, user_id, 1m) GROUP BY window_start, window_end, user_id HAVING date_diff('minute', window_start, window_end) > 2


session(slack_events, user_id, 1m) means we group the Slack events by user_id. Messages from the same user_id will be put in the same session window if the gap between two messages is less than 1 minute (we also call this the idle time). If the user keeps sending messages frequently, the session window will expand. By default, the maximum session window is 5 times the idle time; in this example, 5 minutes. We can compare window_start and window_end to know how long the session window is and remind the user/administrator.


7. Time Travel

Streaming SQL is mainly about processing new, incoming events. However, sometimes you may want to look back at events that just happened. For example, when you come to work at 9 am, you may want to check the server spike at midnight. Or, after fixing data issues or program defects, you may want to re-analyze a past moment or a specific time range. The traditional re-indexing and re-computing approach requires a lot of effort and long wait times to get new analytic results.


Luckily, Timeplus provides an easy way to go back to the past with some simple variations (a complete example follows the list):

  • You can go back to the earliest event via SETTINGS seek_to='earliest'

  • You can also go back to 2 hours ago via SETTINGS seek_to='-2h'

  • Or you can go back to a specific timestamp via SETTINGS seek_to='2022-04-12 06:00:00.000'
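For example, here is a sketch that replays the Slack events from the last two hours and then keeps processing new ones as they arrive:


SELECT * FROM slack_events SETTINGS seek_to='-2h'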

Since Timeplus unifies storage and computing for both streaming and historical data, you can use such queries to go back to the past, start the aggregation process, and keep processing the new events.


8. User-Defined Functions

It’s a hot topic whether SQL is the right tool for streaming analytics. A common concern is that SQL offers only a limited number of functions and expressions, making it hard or impossible to write custom logic. At Timeplus, we are listening to these concerns. That’s why we invested a lot of time in our User-Defined Functions (UDF) framework and sample functions.


We are working with a few early adopters to test streaming UDFs, so that customers can build their own analytics logic with familiar programming languages and libraries and apply it to streaming data. We also eat our own dog food: our machine-learning-based predictive analytics are implemented on this same UDF framework.
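To give a flavor of how a UDF fits into streaming SQL, here is a hypothetical sketch; predict_value is a made-up function name for illustration, not a built-in or sample Timeplus function:


SELECT device_id, v, predict_value(v) AS predicted FROM iot_data


The idea is that once a custom function is registered through the UDF framework, it can be called like any built-in function in a streaming query.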


9. Streaming Data Transformation

The last two common patterns for streaming SQL are data transformation and join.

In the examples above, we keep querying the slack_events stream. Actually, slack_events is not a raw stream: the raw data from the Slack RTM API is in JSON format, and we save that JSON directly in the timeplus_slack_events stream, with a single column for the raw data.


Then we define our streaming transformation logic as a streaming view:


SELECT
  _tp_time,
  _tp_time AS ingest_time,
  json_extract_string(message, 'ts') AS ts,
  to_datetime64(ts, 6) AS event_time,
  date_diff('second', event_time, ingest_time) AS second,
  json_extract_string(message, 'type') AS type,
  json_value(message, '$.blocks[*].type') AS block_type,
  json_value(message, '$.files[*].preview') AS file_preview,
  json_extract_string(message, 'channel') AS channel,
  json_extract_string(message, 'user') AS user_id,
  json_extract_string(message, 'text') AS text,
  json_extract_string(message, 'client_msg_id') AS id
FROM default.timeplus_slack_events
WHERE length(id) > 0


Via this view, we extract values from the JSON document and apply certain filters. In upcoming Timeplus beta releases, we will introduce an even simpler way to process JSON, so that you won’t need those json_extract_ functions.
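With this view in place, all the earlier patterns can query the extracted columns directly. For example, here is a simple filter on the transformed stream, reusing columns defined in the view above:


SELECT user_id, text FROM slack_events WHERE type = 'message'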