Complex Event Processing Made Easy with Streaming SQL + UDF
- Gang Tao
- 4 days ago
How to Build CEP Applications with Timeplus
Complex Event Processing (CEP) is a technology that enables real-time analysis and detection of patterns, relationships, and trends across multiple streams of data events. It goes beyond simple event handling by correlating events from different sources, identifying meaningful patterns, and triggering actions based on complex event combinations.
CEP systems can detect patterns such as:
sequences – "A followed by B within 10 minutes"
spatial relationships – "events occurring in the same geographic region"
statistical patterns – "more than 100 transactions per second from the same user"
They operate on continuous data streams with low latency, making them ideal for scenarios requiring immediate responses.
CEP is widely used in financial trading for algorithmic decision-making, in IoT systems for predictive maintenance, in cybersecurity for threat detection, and in business process monitoring for identifying bottlenecks or compliance violations.
In this blog, I'll walk you through a simple CEP example using an e-commerce fraud detection scenario. Let's say we want to detect potential credit card fraud by identifying suspicious purchasing patterns.
Sample Data Streams
The following diagram illustrates a sequence of user activities that we will use to identify fraudulent behavior with CEP.

Stream 1: Purchase Events
{eventType: "purchase", userId: "user123", amount: 50, location: "New York", timestamp: "2025-06-27T10:00:00Z"}
{eventType: "purchase", userId: "user123", amount: 1200, location: "London", timestamp: "2025-06-27T10:05:00Z"}
{eventType: "purchase", userId: "user123", amount: 800, location: "Tokyo", timestamp: "2025-06-27T10:07:00Z"}
Stream 2: Login Events
{eventType: "login", userId: "user123", location: "New York", timestamp: "2025-06-27T09:55:00Z"}
{eventType: "login", userId: "user123", location: "Berlin", timestamp: "2025-06-27T10:01:00Z"}
CEP Rules Definition
We define the fraud detection rule below in pseudo business language to describe the kind of pattern we want CEP to detect.
RULE: "Suspicious Geographic Activity"
IF (
User makes purchases from 2+ different continents
WITHIN 10 minutes
AND total purchase amount > $1000
)
THEN trigger fraud alert
Processing Steps
Step 1: Event Ingestion
CEP engine receives events from both streams in real-time
Events are tagged with timestamps and indexed by userId
Step 2: Pattern Matching
Engine tracks purchase locations per user within sliding time windows
For user123: detects purchases in New York (10:00), London (10:05), Tokyo (10:07)
Calculates geographic distances and time differences
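Steps 1 and 2 come down to keeping, for each user, only the purchases that fall inside the rule's time window. The JavaScript sketch below illustrates that idea under stated assumptions: events arrive in timestamp order, the window is measured back from the newest event, and UserWindows is a hypothetical class, not part of any CEP engine.

```javascript
// Hypothetical per-user sliding window. Assumes events arrive in timestamp order.
class UserWindows {
  constructor(windowMs) {
    this.windowMs = windowMs;
    this.byUser = new Map(); // userId -> purchases still inside the window
  }

  // Add one event and return that user's purchases in the current window.
  add(event) {
    const now = Date.parse(event.timestamp);
    const events = (this.byUser.get(event.userId) || []).concat(event);
    // Evict purchases older than windowMs relative to the newest event.
    const live = events.filter((e) => now - Date.parse(e.timestamp) <= this.windowMs);
    this.byUser.set(event.userId, live);
    return live;
  }
}

const purchases = [
  { eventType: "purchase", userId: "user123", amount: 50, location: "New York", timestamp: "2025-06-27T10:00:00Z" },
  { eventType: "purchase", userId: "user123", amount: 1200, location: "London", timestamp: "2025-06-27T10:05:00Z" },
  { eventType: "purchase", userId: "user123", amount: 800, location: "Tokyo", timestamp: "2025-06-27T10:07:00Z" },
];

const tracker = new UserWindows(10 * 60 * 1000); // 10-minute window
let current = [];
for (const p of purchases) current = tracker.add(p);
console.log(current.length); // 3
```

Keying the state by userId is what lets the engine evaluate the rule per user without scanning other users' events.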
Step 3: Rule Evaluation
Checks if pattern matches defined rule:
✓ 3 different continents (North America, Europe, Asia)
✓ Within 7 minutes (less than 10-minute window)
✓ Total amount: $2050 (exceeds $1000 threshold)
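The three checks in Step 3 fit in one small function. This is a sketch, not how any particular engine evaluates rules: the CONTINENT lookup table is an assumption (the example does not say how locations map to continents), and the window is checked as the time between the first and last purchase.

```javascript
// Assumed lookup table; locations missing from it are ignored.
const CONTINENT = { "New York": "North America", London: "Europe", Berlin: "Europe", Tokyo: "Asia" };

// Evaluate "2+ continents WITHIN 10 minutes AND total > $1000" on one user's purchases.
function evaluateGeoRule(purchases, { windowMs = 10 * 60 * 1000, minContinents = 2, minTotal = 1000 } = {}) {
  if (purchases.length === 0) return { continents: 0, spanMinutes: 0, total: 0, matched: false };
  const times = purchases.map((p) => Date.parse(p.timestamp));
  const spanMs = Math.max(...times) - Math.min(...times);
  const continents = new Set(purchases.map((p) => CONTINENT[p.location]).filter(Boolean));
  const total = purchases.reduce((sum, p) => sum + p.amount, 0);
  return {
    continents: continents.size,
    spanMinutes: spanMs / 60000,
    total,
    matched: continents.size >= minContinents && spanMs <= windowMs && total > minTotal,
  };
}

const purchases = [
  { userId: "user123", amount: 50, location: "New York", timestamp: "2025-06-27T10:00:00Z" },
  { userId: "user123", amount: 1200, location: "London", timestamp: "2025-06-27T10:05:00Z" },
  { userId: "user123", amount: 800, location: "Tokyo", timestamp: "2025-06-27T10:07:00Z" },
];

console.log(evaluateGeoRule(purchases)); // { continents: 3, spanMinutes: 7, total: 2050, matched: true }
```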
Step 4: Complex Event Generation
{
eventType: "FRAUD_ALERT",
userId: "user123",
reason: "Geographic anomaly detected",
triggeringEvents: [purchase_ny, purchase_london, purchase_tokyo],
riskScore: 0.95,
timestamp: "2025-06-27T10:07:00Z"
}
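A complex event is simply a new event derived from the events that triggered it. The sketch below builds the FRAUD_ALERT record from the triggering purchases. buildFraudAlert is a hypothetical helper, and the risk score is passed in by the caller because the example does not say how 0.95 is computed. The alert takes the timestamp of the latest triggering event, which matches the 10:07 timestamp above.

```javascript
// Hypothetical helper: derive one FRAUD_ALERT complex event from the
// purchases that triggered the rule. riskScore is supplied by the caller.
function buildFraudAlert(userId, triggeringEvents, riskScore) {
  // Use the timestamp of the most recent triggering event.
  const latest = triggeringEvents.reduce((a, b) =>
    Date.parse(a.timestamp) >= Date.parse(b.timestamp) ? a : b);
  return {
    eventType: "FRAUD_ALERT",
    userId,
    reason: "Geographic anomaly detected",
    triggeringEvents,
    riskScore,
    timestamp: latest.timestamp,
  };
}

const purchases = [
  { userId: "user123", amount: 50, location: "New York", timestamp: "2025-06-27T10:00:00Z" },
  { userId: "user123", amount: 1200, location: "London", timestamp: "2025-06-27T10:05:00Z" },
  { userId: "user123", amount: 800, location: "Tokyo", timestamp: "2025-06-27T10:07:00Z" },
];

const fraudAlert = buildFraudAlert("user123", purchases, 0.95);
console.log(fraudAlert.timestamp); // 2025-06-27T10:07:00Z
```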
Step 5: Action Execution
Sends alert to fraud investigation team
Temporarily freezes the user's account
Logs incident for compliance reporting
There are several open source and commercial options for building CEP applications, but today I am going to show you how to use Timeplus to implement CEP.
Streaming SQL Solution
Less complex CEP problems can be solved purely with SQL. For the previous example, we can run the following streaming SQL to detect potential fraud:
-- Create separate streams for different event types
CREATE RANDOM STREAM login_events (
eventType string DEFAULT array_element(['login', 'logout'], (rand() % 2) + 1),
userId string DEFAULT array_element(['user123', 'user456'], (rand() % 2) + 1),
location string DEFAULT array_element(['New York', 'Berlin','Vancouver'], (rand() % 3) + 1),
timestamp datetime64(3) DEFAULT now64()
)
SETTINGS eps = 5;
CREATE RANDOM STREAM purchase_events (
eventType string DEFAULT array_element(['purchase', 'checkout'], (rand() % 2) + 1),
userId string DEFAULT array_element(['user123', 'user456'], (rand() % 2) + 1),
amount int DEFAULT rand() % 1001,
location string DEFAULT array_element(['New York', 'London','Tokyo'], (rand() % 3) + 1),
timestamp datetime64(3) DEFAULT now64()
)
SETTINGS eps = 5;
-- Create view for real-time union of the two activity streams
CREATE VIEW unified_user_events AS
SELECT
eventType,
userId,
0.0 as amount,
location,
timestamp,
'login_events' as source_stream
FROM login_events
UNION ALL
SELECT
eventType,
userId,
amount,
location,
timestamp,
'purchase_events' as source_stream
FROM purchase_events;
-- CEP rule: detect suspicious activity using a hopping window (10-minute window, 5-second step)
CREATE MATERIALIZED VIEW sql_based_cep_fraud_detection AS
SELECT
userId,
count(*) as total_events,
count_if(eventType = 'login') as login_count,
count_if(eventType = 'purchase') as purchase_count,
sum(amount) as total_purchase_amount,
group_array(location) as all_locations,
group_array(eventType) as event_sequence,
group_array(timestamp) as time_sequence,
window_start as t_start,
window_end as t_end
FROM hop(unified_user_events, timestamp, 5s, 10m)
GROUP BY userId, window_start, window_end
HAVING
-- Multiple geographic locations
length(array_distinct(all_locations)) >= 2
-- High-value transactions
AND total_purchase_amount > 1000
-- Must have at least one purchase
AND purchase_count >= 1;
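To see what hop(unified_user_events, timestamp, 5s, 10m) means for a single event, here is a small JavaScript sketch of hopping-window assignment. It assumes windows are aligned to multiples of the 5-second step and cover the half-open range [start, start + 10m); hopWindowsFor is a hypothetical helper, not a Timeplus API.

```javascript
// Hypothetical helper: list every hopping window that contains time tMs,
// assuming windows start at multiples of stepMs and span [start, start + sizeMs).
function hopWindowsFor(tMs, stepMs, sizeMs) {
  const windows = [];
  // The latest window that can contain tMs starts at or before tMs.
  const latestStart = Math.floor(tMs / stepMs) * stepMs;
  // Walk back one step at a time while the window still covers tMs.
  for (let start = latestStart; start > tMs - sizeMs; start -= stepMs) {
    windows.push({ start, end: start + sizeMs });
  }
  return windows.reverse(); // oldest window first
}

const t = Date.parse("2025-06-27T10:07:00Z");
const windows = hopWindowsFor(t, 5 * 1000, 10 * 60 * 1000);
console.log(windows.length); // 120
console.log(new Date(windows[0].start).toISOString()); // 2025-06-27T09:57:05.000Z
```

With a 10-minute window and a 5-second step, every event lands in 120 overlapping windows, so one burst of suspicious purchases can satisfy the HAVING clause in many consecutive windows. Downstream consumers may want to deduplicate alerts per user.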
Using pure SQL for Complex Event Processing (CEP) has significant advantages: developers and analysts don’t need to learn a new pattern language. They can work with SQL, which they likely already know. Even non-technical stakeholders can understand and modify it, making it much easier to maintain.
But there are limitations too. SQL’s expressiveness can fall short when you need to describe complex sequence patterns, handle intricate state management, or work with recursive patterns and advanced algorithms. That’s where user-defined functions come in, filling the gaps when plain SQL isn’t enough.
User-Defined Functions
Because of these limitations, many database systems offer extension capabilities through user-defined functions (UDFs). Timeplus also provides several types of UDFs, including JavaScript UDFs, which we use below.
With UDFs written in a general-purpose programming language, we can greatly extend what SQL can do and are no longer limited when handling complex computations.
Here is a very simple example of a JavaScript UDF that detects a sequence pattern:
CREATE OR REPLACE AGGREGATE FUNCTION cep_simple_pattern(time datetime64(3), event string) RETURNS string LANGUAGE JAVASCRIPT AS $${
  has_customized_emit: true,
  initialize: function () {
    this.events = [];                // sliding window of the last 3 events
    this.pattern = ['A', 'B', 'A'];  // target sequence
    this.match_events = [];          // matches waiting to be emitted
  },
  process: function (Time, Event) {
    // Time and Event are arrays of column values
    for (let i = 0; i < Event.length; i++) {
      this.events.push({ time: Time[i], event: Event[i] });
      // keep only the last three events
      if (this.events.length > 3) {
        this.events.shift();
      }
      // check whether the last three events match the pattern
      if (this.events.length === 3 &&
          this.events[0].event === this.pattern[0] &&
          this.events[1].event === this.pattern[1] &&
          this.events[2].event === this.pattern[2]) {
        this.match_events.push(JSON.stringify(this.events));
      }
    }
    // with has_customized_emit, a non-zero return value triggers an emit
    return this.match_events.length;
  },
  finalize: function () {
    // return pending matches and reset for the next emit
    const result = this.match_events;
    this.match_events = [];
    return result;
  },
}$$;
The function cep_simple_pattern analyzes a sequence of timestamped events and detects occurrences of a specific pattern: A → B → A. The code:
Maintains a sliding window of the last 3 events
Looks for the exact sequence: A, then B, then A
When this pattern is found, it captures those 3 events as a match
How it works:
Initialize: Sets up empty arrays and defines the target pattern ['A', 'B', 'A']
Process: For each new event:
Adds it to the events history
Checks if the last 3 events match the A-B-A pattern
If matched, stores the matching events as JSON
Finalize: Returns all detected pattern matches and resets for next use
Note that has_customized_emit lets the UDF control when results are emitted: when it is set to true, the query emits results only when process returns a non-zero value.
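Because the UDF body is plain JavaScript, its matching logic can be exercised outside Timeplus. The harness below is a local approximation, not the actual Timeplus runtime: it assumes process receives one batch of column arrays at a time and, following the has_customized_emit behavior described above, calls finalize whenever process returns a non-zero value. runAggregate and simplePattern are hypothetical names; simplePattern mirrors the A → B → A detector with a three-event sliding window.

```javascript
// Hypothetical local harness that approximates a has_customized_emit UDAF:
// process() gets one batch of column arrays; a non-zero return triggers finalize().
function runAggregate(udf, batches) {
  const state = Object.create(udf); // fresh state object that inherits the methods
  state.initialize();
  const emitted = [];
  for (const [times, events] of batches) {
    if (state.process(times, events) !== 0) {
      emitted.push(...state.finalize());
    }
  }
  return emitted;
}

// Compact copy of the A -> B -> A detector.
const simplePattern = {
  initialize() {
    this.events = [];
    this.pattern = ["A", "B", "A"];
    this.match_events = [];
  },
  process(Time, Event) {
    for (let i = 0; i < Event.length; i++) {
      this.events.push({ time: Time[i], event: Event[i] });
      if (this.events.length > 3) this.events.shift(); // sliding window of 3
      if (this.events.length === 3 &&
          this.events.every((e, j) => e.event === this.pattern[j])) {
        this.match_events.push(JSON.stringify(this.events));
      }
    }
    return this.match_events.length;
  },
  finalize() {
    const result = this.match_events;
    this.match_events = [];
    return result;
  },
};

const emitted = runAggregate(simplePattern, [
  [[1, 2, 3], ["A", "B", "C"]],
  [[4, 5, 6], ["A", "B", "A"]],
]);
console.log(emitted.length); // 1
```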
You can test this CEP UDF using the following SQL:
CREATE RANDOM STREAM cep_test_stream
(
`time` datetime64(3) DEFAULT now64(),
`event` string DEFAULT array_element(['A', 'B', 'C'], (rand() % 3) + 1)
)
SETTINGS eps = 5;
SELECT cep_simple_pattern(time, event) FROM cep_test_stream;
[{"time":"2025-06-27T21:44:03.757Z","event":"A"},{"time":"2025-06-27T21:44:04.757Z","event":"B"},{"time":"2025-06-27T21:44:04.757Z","event":"A"}]
[{"time":"2025-06-27T21:44:05.757Z","event":"A"},{"time":"2025-06-27T21:44:06.757Z","event":"B"},{"time":"2025-06-27T21:44:06.757Z","event":"A"}]
[{"time":"2025-06-27T21:44:17.757Z","event":"A"},{"time":"2025-06-27T21:44:17.757Z","event":"B"},{"time":"2025-06-27T21:44:17.757Z","event":"A"}]
The random stream generates events drawn at random from A, B, and C at 5 events per second. When you run this real-time query, each time the event sequence matches the pattern A → B → A, the query emits the matching sequence as a result.
This UDF is very simple, but it is a good starting point for building real-time CEP applications. So what is the gap compared to a production-ready CEP system? A production system typically:
Supports multiple concurrent patterns instead of a single fixed pattern
Supports more pattern types beyond sequences, such as threshold, absence, alternating, or even custom user-defined patterns
Provides advanced state management, such as finite state machines (FSMs) or time windows, to manage states and state transitions
Here is a more advanced example for reference: https://gist.github.com/gangtao/44fce0d019be441f94e19503c0923cf7
This example extends the UDF to detect more types of patterns, using a finite state machine (FSM) to handle each one efficiently.
Sequence Patterns
Ordered event chains (A→B→A)
// States: WAITING_FOR_STEP_0, WAITING_FOR_STEP_1, ..., WAITING_FOR_STEP_N, COMPLETED
// Events: Matching event, Non-matching event, Timeout
State: { currentStep: 0, matchedEvents: [], lastEventTime: null }
Transitions:
- WAITING_FOR_STEP_0 + correct_event → WAITING_FOR_STEP_1
- WAITING_FOR_STEP_1 + correct_event → WAITING_FOR_STEP_2
- WAITING_FOR_STEP_N + correct_event → COMPLETED (emit pattern)
- ANY_STATE + wrong_event → RESET to WAITING_FOR_STEP_0
- ANY_STATE + timeout → RESET to WAITING_FOR_STEP_0
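The sequence transitions above map onto a small state machine whose only state is the index of the next expected step. This JavaScript sketch is written against those transitions, not copied from the gist. createSequenceMatcher is a hypothetical name, times are plain milliseconds, and the timeout is assumed to bound the whole match (measured from the first matched event). One deliberate deviation: after a reset caused by a wrong event, the same event is re-checked as a possible start of a new match, so A → A → B → A is still detected.

```javascript
// Sketch of the sequence FSM (hypothetical API; times in milliseconds).
function createSequenceMatcher(pattern, windowMs) {
  let currentStep = 0; // WAITING_FOR_STEP_<currentStep>
  let matched = [];
  const reset = () => { currentStep = 0; matched = []; };

  return function onEvent(time, event) {
    // ANY_STATE + timeout: the partial match is too old, start over.
    if (matched.length > 0 && time - matched[0].time > windowMs) reset();
    if (event !== pattern[currentStep]) {
      reset(); // ANY_STATE + wrong_event
      // Deviation: let the same event start a new match if it fits step 0.
      if (event !== pattern[0]) return null;
    }
    matched.push({ time, event });
    currentStep += 1; // WAITING_FOR_STEP_i + correct_event -> next step
    if (currentStep === pattern.length) {
      const result = matched; // COMPLETED: emit the pattern
      reset();
      return result;
    }
    return null;
  };
}

const onEvent = createSequenceMatcher(["A", "B", "A"], 5000);
onEvent(0, "A");
onEvent(1000, "B");
console.log(onEvent(2000, "A").length); // 3
```

Because the matcher resets after COMPLETED, matches do not overlap.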
Threshold Patterns
Count-based detection (≥3 A's within 5 seconds)
// States: COUNTING, THRESHOLD_REACHED
// Events: Target event, Other event, Window expiry
State: { eventCount: 0, eventTimes: [], matchedEvents: [] }
Transitions:
- COUNTING + target_event → COUNTING (increment count)
- COUNTING + target_event (count >= threshold) → THRESHOLD_REACHED (emit pattern)
- COUNTING + window_expiry → COUNTING (remove old events)
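Here is a sketch of the threshold machine, assuming millisecond timestamps and that counting restarts after each emit (the transitions above do not say what happens after THRESHOLD_REACHED). createThresholdMatcher is a hypothetical name; window expiry is handled by dropping stored target timestamps older than the window before each event is processed.

```javascript
// Sketch of the threshold FSM (hypothetical API; times in milliseconds).
function createThresholdMatcher(target, threshold, windowMs) {
  let eventTimes = []; // timestamps of target events still inside the window

  return function onEvent(time, event) {
    // COUNTING + window_expiry: forget target events older than the window.
    eventTimes = eventTimes.filter((t) => time - t <= windowMs);
    if (event !== target) return null; // other events leave the count unchanged
    eventTimes.push(time); // COUNTING + target_event: increment
    if (eventTimes.length >= threshold) {
      // THRESHOLD_REACHED: emit, then (assumption) start counting afresh.
      const matched = eventTimes.map((t) => ({ time: t, event: target }));
      eventTimes = [];
      return matched;
    }
    return null;
  };
}

// At least 3 A's within 5 seconds.
const onEvent = createThresholdMatcher("A", 3, 5000);
onEvent(0, "A");
onEvent(1000, "B");
onEvent(2000, "A");
onEvent(8000, "A"); // the A's at 0 and 2000 have expired by now
onEvent(9000, "A");
const hit = onEvent(10000, "A");
console.log(hit.length); // 3
```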
Absence Patterns
Negative logic (A without C occurring)
// States: IDLE, WAITING_FOR_ABSENCE, ABSENCE_CONFIRMED
// Events: Trigger event, Expected event, Timeout
State: { triggerEvent: null, triggerTime: null, waitingForExpected: false }
Transitions:
- IDLE + trigger_event → WAITING_FOR_ABSENCE
- WAITING_FOR_ABSENCE + expected_event → IDLE (reset)
- WAITING_FOR_ABSENCE + timeout → ABSENCE_CONFIRMED (emit pattern)
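An absence can only be confirmed once something advances the clock. This sketch assumes that model: the matcher checks for a timeout whenever it is called, and passing null as the event acts as a clock tick (a real engine would more likely rely on timers or watermarks). ABSENCE_CONFIRMED is folded into emitting a result and returning to IDLE. createAbsenceMatcher is a hypothetical name.

```javascript
// Sketch of the absence FSM (hypothetical API; times in milliseconds).
function createAbsenceMatcher(trigger, expected, windowMs) {
  let state = "IDLE";
  let triggerTime = null;

  // Pass event = null to advance the clock without a new event (a tick).
  return function onEvent(time, event) {
    let result = null;
    // WAITING_FOR_ABSENCE + timeout -> ABSENCE_CONFIRMED (emit), then IDLE.
    if (state === "WAITING_FOR_ABSENCE" && time - triggerTime > windowMs) {
      result = { trigger, triggerTime, missing: expected };
      state = "IDLE";
      triggerTime = null;
    }
    if (state === "WAITING_FOR_ABSENCE" && event === expected) {
      // WAITING_FOR_ABSENCE + expected_event -> IDLE (reset).
      state = "IDLE";
      triggerTime = null;
    } else if (state === "IDLE" && event === trigger) {
      // IDLE + trigger_event -> WAITING_FOR_ABSENCE.
      state = "WAITING_FOR_ABSENCE";
      triggerTime = time;
    }
    return result;
  };
}

// A without C within 5 seconds.
const onEvent = createAbsenceMatcher("A", "C", 5000);
onEvent(0, "A");
onEvent(2000, "C"); // C arrived in time: no alert
onEvent(3000, "A");
onEvent(4000, "B");
console.log(onEvent(9000, null)); // { trigger: 'A', triggerTime: 3000, missing: 'C' }
```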
Alternating Patterns
Repeating sequences (A-B-A-B...)
// States: EXPECTING_A, EXPECTING_B, PATTERN_BUILDING, COMPLETED
// Events: Expected event, Unexpected event, Timeout
State: {
sequence: [],
expectedNext: pattern.events[0], // Start with first event type
occurrences: 0
}
FSM Transitions:
- EXPECTING_A + event_A → EXPECTING_B (add to sequence)
- EXPECTING_B + event_B → EXPECTING_A (add to sequence, check completion)
- EXPECTING_X + wrong_event → RESET to EXPECTING_A
- PATTERN_BUILDING + sufficient_alternations → COMPLETED (emit pattern)
- ANY_STATE + timeout → RESET (clean old events)
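The alternating machine behaves like the sequence one, except that the expected event cycles through the pattern instead of stopping at its last step. In this sketch (hypothetical createAlternatingMatcher), EXPECTING_A and EXPECTING_B are represented by an index into the pattern, PATTERN_BUILDING is the accumulated sequence, and minLength, an assumed parameter, defines how many alternating events count as "sufficient alternations".

```javascript
// Sketch of the alternating FSM (hypothetical API; times in milliseconds).
function createAlternatingMatcher(pattern, minLength, windowMs) {
  let sequence = [];   // PATTERN_BUILDING: events collected so far
  let expectedIdx = 0; // for ["A", "B"]: 0 = EXPECTING_A, 1 = EXPECTING_B
  const reset = () => { sequence = []; expectedIdx = 0; };

  return function onEvent(time, event) {
    // ANY_STATE + timeout: drop a sequence that started too long ago.
    if (sequence.length > 0 && time - sequence[0].time > windowMs) reset();
    if (event !== pattern[expectedIdx]) {
      reset(); // EXPECTING_X + wrong_event: back to EXPECTING_A
      if (event !== pattern[0]) return null;
    }
    sequence.push({ time, event });
    expectedIdx = (expectedIdx + 1) % pattern.length;
    if (sequence.length >= minLength) {
      // Sufficient alternations: COMPLETED, emit and start over.
      const matched = sequence;
      reset();
      return matched;
    }
    return null;
  };
}

// A-B-A-B within 10 seconds.
const onEvent = createAlternatingMatcher(["A", "B"], 4, 10000);
onEvent(0, "A");
onEvent(1, "B");
onEvent(2, "A");
console.log(onEvent(3, "B").length); // 4
```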
Custom Patterns
User-defined complex conditions
// States: MONITORING, EVALUATING, MATCHED
// Events: Any event, Custom condition evaluation
State: {
lastProcessedIndex: 0 // Tracks position in event history
}
FSM Transitions:
- MONITORING + any_event → EVALUATING (run custom condition)
- EVALUATING + condition_true → MATCHED (emit pattern)
- EVALUATING + condition_false → MONITORING (continue monitoring)
- MATCHED → MONITORING (reset for next potential match)
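For custom patterns, the state machine only decides when to run a user-supplied condition; the condition itself is arbitrary code. This sketch assumes a condition receives the unconsumed events inside the time window and returns the matching events or null, and that clearing the history after a match plays the role of lastProcessedIndex above. The example condition (A followed by two or more B's and then C, with nothing in between) is illustrative and assumes single-character event names; createCustomMatcher and aThenTwoBsThenC are hypothetical names.

```javascript
// Sketch of the custom-pattern FSM (hypothetical API; times in milliseconds).
function createCustomMatcher(condition, windowMs) {
  let history = []; // unconsumed events inside the time window

  return function onEvent(time, event) {
    history.push({ time, event }); // MONITORING + any_event
    history = history.filter((e) => time - e.time <= windowMs); // time window check
    const match = condition(history); // EVALUATING: run the custom condition
    if (match) {
      history = []; // MATCHED -> MONITORING: consumed events are not reused
      return match;
    }
    return null; // condition_false -> keep MONITORING
  };
}

// Example condition: the newest events end with A, B, B (or more B's), C.
function aThenTwoBsThenC(events) {
  const s = events.map((e) => e.event).join(""); // assumes single-character names
  const m = /AB{2,}C$/.exec(s);
  return m ? events.slice(m.index) : null;
}

const onEvent = createCustomMatcher(aThenTwoBsThenC, 5000);
for (const [t, e] of [[0, "C"], [1000, "A"], [2000, "B"], [3000, "C"], [4000, "A"], [5000, "B"], [6000, "B"]]) {
  onEvent(t, e); // no match yet: A-B-C has only one B
}
console.log(onEvent(7000, "C").map((e) => e.event).join(" → ")); // A → B → B → C
```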
Additionally, all patterns support a time window length check.
Running a similar query with this UDF produces output like the following:
SELECT cep_advanced_pattern(time, event) FROM cep_test_stream;
{
"patternName": "threshold_A",
"patternDescription": "At least 3 A events within 5 seconds",
"matchType": "THRESHOLD_REACHED",
"events": [
{
"time": "2025-06-27T22:34:49.259Z",
"event": "A"
},
{
"time": "2025-06-27T22:34:49.259Z",
"event": "A"
},
{
"time": "2025-06-27T22:34:49.259Z",
"event": "A"
}
],
"timestamp": "2025-06-27T22:34:49.291Z",
"eventSequence": "A → A → A"
}
{
"patternName": "sequence_ABA",
"patternDescription": "Sequential pattern A->B->A",
"matchType": "SEQUENCE_COMPLETE",
"events": [
{
"time": "2025-06-27T22:34:50.251Z",
"event": "A"
},
{
"time": "2025-06-27T22:34:50.251Z",
"event": "B"
},
{
"time": "2025-06-27T22:34:50.251Z",
"event": "A"
}
],
"timestamp": "2025-06-27T22:34:50.252Z",
"eventSequence": "A → B → A"
}
{
"patternName": "complex_condition",
"patternDescription": "A followed by 2+ B events, then C",
"matchType": "CUSTOM: A followed by 2 B's and C",
"events": [
{
"time": "2025-06-27T22:39:50.251Z",
"event": "A"
},
{
"time": "2025-06-27T22:39:50.251Z",
"event": "B"
},
{
"time": "2025-06-27T22:39:51.251Z",
"event": "B"
},
{
"time": "2025-06-27T22:39:51.251Z",
"event": "C"
}
],
"timestamp": "2025-06-27T22:39:52.251Z",
"eventSequence": "A → B → B → C"
}
Summary
CEP is a critical technology in stream processing, with a wide range of use cases, including:
Real-Time Decision Making: CEP enables immediate responses to complex business situations. For example, a fraud detection system can correlate a user's location data, transaction patterns, and account behavior in real-time to identify suspicious activity within milliseconds of a transaction attempt.
Pattern Recognition Across Multiple Streams: Unlike traditional stream processing that handles events individually, CEP can correlate events from different sources. A supply chain system might combine sensor data from trucks, weather alerts, and traffic information to predict delivery delays and automatically reroute shipments.
Temporal Reasoning: CEP excels at time-based analysis, detecting patterns that unfold over specific time windows. This is crucial for applications like monitoring system health, where you need to identify when error rates spike across multiple services within a certain timeframe.
Reducing Information Overload: Instead of alerting on every individual event, CEP generates higher-level insights by combining related events into meaningful patterns. This reduces noise and helps operators focus on situations that truly require attention.
Complex event processing doesn't have to be hard. You don't need to learn complicated new languages or build everything yourself. Timeplus makes CEP easy by using SQL—something most developers already know. And when SQL isn't enough, you can add custom functions (UDFs) to handle the tricky stuff.
Want to catch fraud in real-time? Monitor your IoT devices? Track user behavior as it happens? Timeplus lets you do all this with simple SQL queries, and UDFs give you the power to solve even the most complex problems.
Ready to see how easy streaming data can be? Check out timeplus.com and https://play.demo.timeplus.com/ to see for yourself.