
Complex Event Processing Made Easy with Streaming SQL + UDF

  • Writer: Gang Tao

How to Build CEP Applications with Timeplus


Complex Event Processing (CEP) is a technology that enables real-time analysis and detection of patterns, relationships, and trends across multiple streams of data events. It goes beyond simple event handling by correlating events from different sources, identifying meaningful patterns, and triggering actions based on complex event combinations.


CEP systems can detect several kinds of patterns, such as:

  • sequences – "A followed by B within 10 minutes"

  • spatial relationships – "events occurring in the same geographic region"

  • statistical patterns – "more than 100 transactions per second from the same user" 


They operate on continuous data streams with low latency, making them ideal for scenarios requiring immediate responses.


CEP is widely used in financial trading for algorithmic decision-making, in IoT systems for predictive maintenance, in cybersecurity for threat detection, and in business process monitoring for identifying bottlenecks or compliance violations.


In this blog, I'll walk you through a simple CEP example using an e-commerce fraud detection scenario. Let's say we want to detect potential credit card fraud by identifying suspicious purchasing patterns.



Sample Data Streams


The sample events below show consecutive activity from a single user, which we will use to identify fraudulent behavior with CEP.



Stream 1: Purchase Events

{eventType: "purchase", userId: "user123", amount: 50, location: "New York", timestamp: "2025-06-27T10:00:00Z"}
{eventType: "purchase", userId: "user123", amount: 1200, location: "London", timestamp: "2025-06-27T10:05:00Z"}
{eventType: "purchase", userId: "user123", amount: 800, location: "Tokyo", timestamp: "2025-06-27T10:07:00Z"}

Stream 2: Login Events

{eventType: "login", userId: "user123", location: "New York", timestamp: "2025-06-27T09:55:00Z"}
{eventType: "login", userId: "user123", location: "Berlin", timestamp: "2025-06-27T10:01:00Z"}


CEP Rules Definition


We define the fraud detection rule in pseudo business language to describe the pattern we want CEP to detect.

RULE: "Suspicious Geographic Activity"
IF (
  User makes purchases from 2+ different continents 
  WITHIN 10 minutes
  AND total purchase amount > $1000
) 
THEN trigger fraud alert
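
To make the rule concrete, here is a minimal JavaScript sketch that evaluates it against the sample purchase events. The city-to-continent table and the function name isSuspiciousGeographicActivity are my own assumptions for illustration, since the sample events carry only a city name.

```javascript
// Hypothetical lookup table: the sample events only contain city names,
// so a city-to-continent mapping is assumed here.
const CONTINENT_BY_CITY = {
  "New York": "North America",
  "London": "Europe",
  "Berlin": "Europe",
  "Tokyo": "Asia",
};

const WINDOW_MS = 10 * 60 * 1000; // WITHIN 10 minutes
const AMOUNT_THRESHOLD = 1000;    // total purchase amount > $1000

// Returns true when the purchases (all from one user) satisfy the rule.
function isSuspiciousGeographicActivity(purchases) {
  if (purchases.length === 0) return false;
  const times = purchases.map((p) => Date.parse(p.timestamp));
  const spanMs = Math.max(...times) - Math.min(...times);
  const continents = new Set(
    purchases.map((p) => CONTINENT_BY_CITY[p.location]).filter(Boolean)
  );
  const total = purchases.reduce((sum, p) => sum + p.amount, 0);
  return spanMs <= WINDOW_MS && continents.size >= 2 && total > AMOUNT_THRESHOLD;
}

const samplePurchases = [
  { userId: "user123", amount: 50, location: "New York", timestamp: "2025-06-27T10:00:00Z" },
  { userId: "user123", amount: 1200, location: "London", timestamp: "2025-06-27T10:05:00Z" },
  { userId: "user123", amount: 800, location: "Tokyo", timestamp: "2025-06-27T10:07:00Z" },
];

console.log(isSuspiciousGeographicActivity(samplePurchases)); // true
```

The three sample purchases span 7 minutes, cover three continents, and total $2050, so the rule fires.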


Processing Steps


Step 1: Event Ingestion
  • CEP engine receives events from both streams in real-time

  • Events are tagged with timestamps and indexed by userId


Step 2: Pattern Matching
  • Engine tracks purchase locations per user within sliding time windows

  • For user123: detects purchases in New York (10:00), London (10:05), Tokyo (10:07)

  • Calculates geographic distances and time differences


Step 3: Rule Evaluation
  • Checks if pattern matches defined rule:

    ✓ 3 different continents (North America, Europe, Asia)

    ✓ Within 7 minutes (less than 10-minute window)

    ✓ Total amount: $2050 (exceeds $1000 threshold)
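
Steps 2 and 3 can be sketched as a small per-user sliding window in JavaScript. This is only an illustration of the idea, not how any particular CEP engine is implemented. It assumes events arrive in timestamp order, and the names purchasesByUser, onPurchase, and the city-to-continent table are hypothetical.

```javascript
// Hypothetical city-to-continent table (not part of the sample data).
const CITY_TO_CONTINENT = {
  "New York": "North America",
  "London": "Europe",
  "Tokyo": "Asia",
};
const SLIDING_WINDOW_MS = 10 * 60 * 1000;

// userId -> purchases seen within the last 10 minutes, oldest first
const purchasesByUser = new Map();

// Ingests one purchase (assumed to arrive in timestamp order) and
// returns the checks from Step 3 for that user's current window.
function onPurchase(event) {
  const now = Date.parse(event.timestamp);
  const recent = purchasesByUser.get(event.userId) || [];
  recent.push(event);
  // Evict purchases that have fallen out of the sliding window.
  while (now - Date.parse(recent[0].timestamp) > SLIDING_WINDOW_MS) {
    recent.shift();
  }
  purchasesByUser.set(event.userId, recent);

  const continents = new Set(recent.map((p) => CITY_TO_CONTINENT[p.location]));
  const totalAmount = recent.reduce((sum, p) => sum + p.amount, 0);
  return {
    continents: continents.size,
    spanMinutes: (now - Date.parse(recent[0].timestamp)) / 60000,
    totalAmount,
    matched: continents.size >= 2 && totalAmount > 1000,
  };
}

const results = [
  { userId: "user123", amount: 50, location: "New York", timestamp: "2025-06-27T10:00:00Z" },
  { userId: "user123", amount: 1200, location: "London", timestamp: "2025-06-27T10:05:00Z" },
  { userId: "user123", amount: 800, location: "Tokyo", timestamp: "2025-06-27T10:07:00Z" },
].map(onPurchase);

const last = results[results.length - 1];
console.log(last); // continents: 3, spanMinutes: 7, totalAmount: 2050, matched: true
```

With these sample values the condition is already satisfied after the London purchase (two continents, $1250). The Tokyo purchase extends the same match.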


Step 4: Complex Event Generation
{
  eventType: "FRAUD_ALERT",
  userId: "user123",
  reason: "Geographic anomaly detected",
  triggeringEvents: [purchase_ny, purchase_london, purchase_tokyo],
  riskScore: 0.95,
  timestamp: "2025-06-27T10:07:00Z"
}

Step 5: Action Execution
  • Sends alert to fraud investigation team

  • Temporarily freezes the user's account

  • Logs incident for compliance reporting


Several open-source and commercial tools are available for building CEP applications. In this post, I will show how you can implement CEP with Timeplus.



Streaming SQL Solution


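Less complex CEP rules can be expressed purely in SQL. Based on the previous example, we can run the following streaming SQL to detect potential fraud. For simplicity, the query treats each distinct location as a separate region instead of mapping locations to continents: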


-- Create separate streams for different event types
CREATE RANDOM STREAM login_events (
  eventType string DEFAULT array_element(['login', 'logout'], (rand() % 2) + 1),
  userId string DEFAULT array_element(['user123', 'user456'], (rand() % 2) + 1),
  location string DEFAULT array_element(['New York', 'Berlin','Vancouver'], (rand() % 3) + 1),
  timestamp datetime64(3) DEFAULT now64()
)
SETTINGS eps = 5;

CREATE RANDOM STREAM purchase_events (
  eventType string DEFAULT array_element(['purchase', 'checkout'], (rand() % 2) + 1),
  userId string DEFAULT array_element(['user123', 'user456'], (rand() % 2) + 1),
  amount int DEFAULT rand() % 1001,
  location string DEFAULT array_element(['New York', 'London','Tokyo'], (rand() % 3) + 1),
  timestamp datetime64(3) DEFAULT now64()
)
SETTINGS eps = 5;


-- Create view for real-time union of the two activity streams
CREATE VIEW unified_user_events AS
SELECT
   eventType,
   userId,
   0.0 as amount,
   location,
   timestamp,
   eventType as source_stream
FROM login_events
UNION ALL
SELECT
   eventType,
   userId,
   amount,
   location,
   timestamp,
   eventType as source_stream
FROM purchase_events;


-- CEP rule: detect suspicious activity using a hopping window (10 min window, 5 sec step)
CREATE MATERIALIZED VIEW sql_based_cep_fraud_detection AS
SELECT
   userId,
   count(*) as total_events,
   count_if(eventType = 'login') as login_count,
   count_if(eventType = 'purchase') as purchase_count,
   sum(amount) as total_purchase_amount,
   group_array(location) as all_locations,
   group_array(eventType) as event_sequence,
   group_array(timestamp) as time_sequence,
   window_start as t_start,
   window_end as t_end
FROM hop(unified_user_events, timestamp, 5s, 10m)
GROUP BY userId, window_start, window_end
HAVING
   -- Multiple geographic locations
   length(array_distinct(all_locations)) >= 2
   -- High-value transactions
   AND total_purchase_amount > 1000
   -- Must have at least one purchase
   AND purchase_count >= 1;
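
One consequence of the hopping window is worth making concrete. With a 5-second step and a 10-minute window, every event belongs to many overlapping windows, so the same suspicious activity can satisfy the HAVING clause in many consecutive windows. The JavaScript sketch below counts those windows. The function name hopWindows and the epoch-aligned window boundaries are my assumptions for illustration; Timeplus may align its windows differently.

```javascript
// Returns every hopping window [start, end) that contains the timestamp.
// Assumption: window starts are aligned to multiples of the step since the epoch.
function hopWindows(tsMs, stepMs, sizeMs) {
  const windows = [];
  // Latest aligned window start that is not after the event.
  const lastStart = Math.floor(tsMs / stepMs) * stepMs;
  // A window contains the event while start > ts - size.
  for (let start = lastStart; start > tsMs - sizeMs; start -= stepMs) {
    windows.push({ start, end: start + sizeMs });
  }
  return windows;
}

const eventTime = Date.parse("2025-06-27T10:07:00Z");
const windows = hopWindows(eventTime, 5 * 1000, 10 * 60 * 1000);

console.log(windows.length); // 120
```

In practice this means a downstream consumer of the materialized view may want to deduplicate alerts per user.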

Using pure SQL for Complex Event Processing (CEP) has significant advantages: developers and analysts don’t need to learn a new pattern language. They can work with SQL, which they likely already know. Even non-technical stakeholders can understand and modify it, making it much easier to maintain.


But there are limitations too. SQL’s expressiveness can fall short when you need to describe complex sequence patterns, handle intricate state management, or work with recursive patterns and advanced algorithms. That’s where user-defined functions come in, filling the gaps when plain SQL isn’t enough.

 


User-Defined Function


Because of these limitations, many database systems offer extensibility through user-defined functions (UDFs). Timeplus also provides several types of user-defined functions, including the JavaScript UDFs used below.


With programming-language-based UDFs, we can greatly extend the functionality of SQL and are no longer limited when handling complex computations.


Here is a very simple example of using a JavaScript UDF to detect a sequence pattern:

CREATE OR REPLACE AGGREGATE FUNCTION cep_simple_pattern(time datetime64(3), event string) RETURNS string LANGUAGE JAVASCRIPT AS $${
 has_customized_emit: true,

 initialize: function () {
   this.events = [];
   this.pattern = ['A', 'B', 'A'];
   this.match_events = [];
 },

 process: function (Time, Event) {
   console.log(Time, Event);

   for (let i = 0; i < Event.length; i++) {
       const event = {
           time: Time[i],
           event: Event[i]
       }
       this.events.push(event);

       // a simple pattern detection: keep only the last three events
       if (this.events.length > 3) {
           this.events.shift();
       }
       if (this.events.length === 3) {
           // check if the last three events match the pattern
           if (this.events[0].event === this.pattern[0] &&
               this.events[1].event === this.pattern[1] &&
               this.events[2].event === this.pattern[2]) {
               this.match_events.push(JSON.stringify(this.events));
           }
       }
   }

   return this.match_events.length;
 },

 finalize: function () {
   const result = this.match_events;
   this.match_events = [];
   return result;
 },
}$$;

The function cep_simple_pattern analyzes a sequence of timestamped events and detects occurrences of a specific pattern: A → B → A. The code:

  • Examines the three most recent events

  • Looks for the exact sequence: A, then B, then A

  • When this pattern is found, it captures those 3 events as a match


How it works:

  1. Initialize: Sets up empty arrays and defines the target pattern ['A', 'B', 'A']

  2. Process: For each new event:

    1. Adds it to the events history

    2. Checks if the last 3 events match the A-B-A pattern

    3. If matched, stores the matching events as JSON

  3. Finalize: Returns all detected pattern matches and resets for next use


Note that has_customized_emit lets the UDF control when results are emitted. When it is set to true, the query emits a result only when process returns a non-zero value.
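
To get a feel for this emit behavior without a running server, the UDF logic can be exercised in plain Node.js. The sketch below restates the pattern check in condensed form and drives it with a tiny loop. The runBatches helper is a hypothetical stand-in for the engine, based only on the behavior described above (finalize is called when process returns a non-zero value). It is not Timeplus code.

```javascript
// Condensed restatement of the UDF logic as a plain object.
const udf = {
  initialize: function () {
    this.events = [];
    this.pattern = ['A', 'B', 'A'];
    this.match_events = [];
  },
  process: function (Time, Event) {
    for (let i = 0; i < Event.length; i++) {
      this.events.push({ time: Time[i], event: Event[i] });
      if (this.events.length > 3) this.events.shift(); // keep the last three
      if (this.events.length === 3 &&
          this.events.every((e, j) => e.event === this.pattern[j])) {
        this.match_events.push(JSON.stringify(this.events));
      }
    }
    return this.match_events.length; // number of pending results
  },
  finalize: function () {
    const result = this.match_events;
    this.match_events = [];
    return result;
  },
};

// Hypothetical stand-in for the engine loop: emit only when process() is non-zero.
function runBatches(fn, batches) {
  const emitted = [];
  fn.initialize();
  for (const batch of batches) {
    const pending = fn.process(batch.map((r) => r.time), batch.map((r) => r.event));
    if (pending > 0) emitted.push(...fn.finalize());
  }
  return emitted;
}

const emitted = runBatches(udf, [
  [{ time: 1, event: 'A' }, { time: 2, event: 'C' }],
  [{ time: 3, event: 'A' }, { time: 4, event: 'B' }],
  [{ time: 5, event: 'A' }, { time: 6, event: 'B' }, { time: 7, event: 'A' }],
]);

console.log(emitted.length); // 2
```

The first two batches produce nothing because process returns 0. The third batch completes two overlapping A, B, A matches, and both are emitted together.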


You can test this CEP UDF with the following SQL:

CREATE RANDOM STREAM cep_test_stream
(
 `time` datetime64(3) DEFAULT now64(),
 `event` string DEFAULT array_element(['A', 'B', 'C'], (rand() % 3) + 1)
)
SETTINGS eps = 5;

SELECT cep_simple_pattern(time, event) FROM cep_test_stream;

[{"time":"2025-06-27T21:44:03.757Z","event":"A"},{"time":"2025-06-27T21:44:04.757Z","event":"B"},{"time":"2025-06-27T21:44:04.757Z","event":"A"}]
[{"time":"2025-06-27T21:44:05.757Z","event":"A"},{"time":"2025-06-27T21:44:06.757Z","event":"B"},{"time":"2025-06-27T21:44:06.757Z","event":"A"}]
[{"time":"2025-06-27T21:44:17.757Z","event":"A"},{"time":"2025-06-27T21:44:17.757Z","event":"B"},{"time":"2025-06-27T21:44:17.757Z","event":"A"}]

The random stream generates events chosen randomly from A, B, and C at 5 events per second. While the streaming query runs, each time the event sequence follows the pattern A, B, A, the matching sequence is emitted as a result.


This UDF is very simple, but it is a good starting point for building real-time CEP applications. So what is the gap between it and a production-ready CEP system? A production-ready system needs:


  • Support for multiple concurrent patterns instead of a single fixed pattern

  • Support for more pattern types: besides sequences, there are threshold, absence, alternating, and even custom-defined patterns

  • Advanced state management, such as a finite state machine (FSM) or time windows to manage states and state transitions


Here, I have a more advanced example for you to reference: https://gist.github.com/gangtao/44fce0d019be441f94e19503c0923cf7 


As this example demonstrates, the UDF can be extended to support more advanced pattern detection, using a finite state machine (FSM) to track the progress of each pattern.


Sequence Patterns

Ordered event chains (A→B→A)

// States: WAITING_FOR_STEP_0, WAITING_FOR_STEP_1, ..., WAITING_FOR_STEP_N, COMPLETED
// Events: Matching event, Non-matching event, Timeout

State: { currentStep: 0, matchedEvents: [], lastEventTime: null }

Transitions:
- WAITING_FOR_STEP_0 + correct_event → WAITING_FOR_STEP_1
- WAITING_FOR_STEP_1 + correct_event → WAITING_FOR_STEP_2  
- WAITING_FOR_STEP_N + correct_event → COMPLETED (emit pattern)
- ANY_STATE + wrong_event → RESET to WAITING_FOR_STEP_0
- ANY_STATE + timeout → RESET to WAITING_FOR_STEP_0
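
The sequence transitions above can be sketched as a small standalone JavaScript state machine. This is my own minimal illustration, not the code from the gist. The name createSequenceMatcher is hypothetical, and letting a "wrong" event start a new attempt when it equals the first pattern step is a design choice I am assuming.

```javascript
// Builds a matcher for an ordered pattern such as ['A', 'B', 'A'].
function createSequenceMatcher(pattern, timeoutMs) {
  const fresh = () => ({ currentStep: 0, matchedEvents: [], lastEventTime: null });
  let state = fresh();

  // Feed one event; returns the matched events when the sequence completes, else null.
  return function onEvent(ev) {
    // ANY_STATE + timeout -> RESET to WAITING_FOR_STEP_0
    if (state.lastEventTime !== null && ev.time - state.lastEventTime > timeoutMs) {
      state = fresh();
    }
    if (ev.event !== pattern[state.currentStep]) {
      // ANY_STATE + wrong_event -> RESET to WAITING_FOR_STEP_0
      state = fresh();
      // Assumption: the same event may still begin a new attempt.
      if (ev.event !== pattern[0]) return null;
    }
    // WAITING_FOR_STEP_k + correct_event -> WAITING_FOR_STEP_k+1
    state.matchedEvents.push(ev);
    state.currentStep += 1;
    state.lastEventTime = ev.time;
    if (state.currentStep === pattern.length) {
      // Last step reached -> COMPLETED (emit pattern), then start over.
      const matched = state.matchedEvents;
      state = fresh();
      return matched;
    }
    return null;
  };
}

const feedSequence = createSequenceMatcher(['A', 'B', 'A'], 5000);
const sequenceResults = [
  { time: 0, event: 'A' },
  { time: 1000, event: 'A' },
  { time: 2000, event: 'B' },
  { time: 3000, event: 'A' },
].map(feedSequence);

console.log(sequenceResults[3].map((e) => e.event).join(' → ')); // A → B → A
```

Unlike the simple "last three events" check, this strict reset-on-completion version does not report overlapping matches.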

Threshold Patterns

Count-based detection (≥3 A's within 5 seconds)

// States: COUNTING, THRESHOLD_REACHED
// Events: Target event, Other event, Window expiry

State: { eventCount: 0, eventTimes: [], matchedEvents: [] }

Transitions:
- COUNTING + target_event (count < threshold) → COUNTING (increment count)
- COUNTING + target_event (count >= threshold) → THRESHOLD_REACHED (emit pattern)
- COUNTING + window_expiry → COUNTING (remove old events)
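
A threshold pattern can be sketched along the same lines. The JavaScript below is a minimal illustration under my own assumptions: createThresholdMatcher is a hypothetical name, window expiry is checked whenever a new event arrives, and counting starts afresh after a match is emitted.

```javascript
// Emits when at least `threshold` target events fall within `windowMs`.
function createThresholdMatcher(target, threshold, windowMs) {
  let matchedEvents = []; // target events still inside the window

  return function onEvent(ev) {
    // COUNTING + window_expiry -> COUNTING (remove old events)
    matchedEvents = matchedEvents.filter((e) => ev.time - e.time <= windowMs);
    if (ev.event !== target) return null; // other events do not change the count

    // COUNTING + target_event -> COUNTING (increment count)
    matchedEvents.push(ev);
    if (matchedEvents.length >= threshold) {
      // THRESHOLD_REACHED (emit pattern)
      const matched = matchedEvents;
      matchedEvents = []; // assumption: count from zero after emitting
      return matched;
    }
    return null;
  };
}

const feedThreshold = createThresholdMatcher('A', 3, 5000);
const thresholdResults = [
  { time: 0, event: 'A' },
  { time: 1000, event: 'B' },
  { time: 2000, event: 'A' },
  { time: 7000, event: 'A' },
  { time: 8000, event: 'A' },
  { time: 9000, event: 'A' },
].map(feedThreshold);

const thresholdMatch = thresholdResults[5];
console.log(thresholdMatch.map((e) => e.time)); // [ 7000, 8000, 9000 ]
```

The first two A events never reach the threshold because they expire from the 5-second window before a third A arrives.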

Absence Patterns

Negative logic (A without C occurring)

// States: IDLE, WAITING_FOR_ABSENCE, ABSENCE_CONFIRMED
// Events: Trigger event, Expected event, Timeout

State: { triggerEvent: null, triggerTime: null, waitingForExpected: false }

Transitions:
- IDLE + trigger_event → WAITING_FOR_ABSENCE
- WAITING_FOR_ABSENCE + expected_event → IDLE (reset)
- WAITING_FOR_ABSENCE + timeout → ABSENCE_CONFIRMED (emit pattern)
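
Here is a minimal JavaScript sketch of the absence transitions. It rests on one simplifying assumption: the timeout is only noticed when the next event arrives, whereas a real engine would more likely rely on a timer or watermark. The name createAbsenceMatcher is hypothetical.

```javascript
// Emits the trigger event when `expected` does not follow it within `timeoutMs`.
function createAbsenceMatcher(trigger, expected, timeoutMs) {
  const idle = () => ({ triggerEvent: null, triggerTime: null, waitingForExpected: false });
  let state = idle();

  return function onEvent(ev) {
    let confirmed = null;
    if (state.waitingForExpected && ev.time - state.triggerTime > timeoutMs) {
      // WAITING_FOR_ABSENCE + timeout -> ABSENCE_CONFIRMED (emit pattern)
      confirmed = state.triggerEvent;
      state = idle();
    }
    if (state.waitingForExpected && ev.event === expected) {
      // WAITING_FOR_ABSENCE + expected_event -> IDLE (reset)
      state = idle();
    } else if (!state.waitingForExpected && ev.event === trigger) {
      // IDLE + trigger_event -> WAITING_FOR_ABSENCE
      state = { triggerEvent: ev, triggerTime: ev.time, waitingForExpected: true };
    }
    return confirmed;
  };
}

const feedAbsence = createAbsenceMatcher('A', 'C', 5000);
const absenceResults = [
  { time: 0, event: 'A' },    // trigger
  { time: 2000, event: 'C' }, // expected event arrives in time, so reset
  { time: 3000, event: 'A' }, // trigger again
  { time: 9000, event: 'B' }, // more than 5 s later and no C was seen
].map(feedAbsence);

console.log(absenceResults[3]); // { time: 3000, event: 'A' }
```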

Alternating Patterns

Repeating sequences (A-B-A-B...)

// States: EXPECTING_A, EXPECTING_B, PATTERN_BUILDING, COMPLETED
// Events: Expected event, Unexpected event, Timeout

State: { 
  sequence: [], 
  expectedNext: pattern.events[0], // Start with first event type
  occurrences: 0 
}

FSM Transitions:
- EXPECTING_A + event_A → EXPECTING_B (add to sequence)
- EXPECTING_B + event_B → EXPECTING_A (add to sequence, check completion)
- EXPECTING_X + wrong_event → RESET to EXPECTING_A
- PATTERN_BUILDING + sufficient_alternations → COMPLETED (emit pattern)
- ANY_STATE + timeout → RESET (clean old events)
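
The alternating transitions can be sketched in JavaScript as follows. The state fields mirror the ones listed above, while createAlternatingMatcher and the minOccurrences setting (the number of complete A-B cycles required) are hypothetical names I am introducing for illustration.

```javascript
// pattern: { events: ['A', 'B'], minOccurrences: 2 } matches A-B-A-B.
function createAlternatingMatcher(pattern, timeoutMs) {
  const fresh = () => ({ sequence: [], expectedNext: pattern.events[0], occurrences: 0 });
  let state = fresh();

  return function onEvent(ev) {
    const last = state.sequence[state.sequence.length - 1];
    // ANY_STATE + timeout -> RESET
    if (last && ev.time - last.time > timeoutMs) state = fresh();

    if (ev.event !== state.expectedNext) {
      // EXPECTING_X + wrong_event -> RESET to EXPECTING_A
      state = fresh();
      // Assumption: the same event may still begin a new run.
      if (ev.event !== state.expectedNext) return null;
    }
    // EXPECTING_A + event_A -> EXPECTING_B, and the other way around
    state.sequence.push(ev);
    const nextIndex = state.sequence.length % pattern.events.length;
    state.expectedNext = pattern.events[nextIndex];
    if (nextIndex === 0) state.occurrences += 1; // one full cycle completed

    if (state.occurrences >= pattern.minOccurrences) {
      // sufficient_alternations -> COMPLETED (emit pattern)
      const matched = state.sequence;
      state = fresh();
      return matched;
    }
    return null;
  };
}

const feedAlternating = createAlternatingMatcher({ events: ['A', 'B'], minOccurrences: 2 }, 5000);
const alternatingResults = [
  { time: 0, event: 'A' },
  { time: 1000, event: 'B' },
  { time: 2000, event: 'B' }, // breaks the alternation, so reset
  { time: 3000, event: 'A' },
  { time: 4000, event: 'B' },
  { time: 5000, event: 'A' },
  { time: 6000, event: 'B' },
].map(feedAlternating);

console.log(alternatingResults[6].map((e) => e.event).join('-')); // A-B-A-B
```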

Custom Patterns

User-defined complex conditions

// States: MONITORING, EVALUATING, MATCHED
// Events: Any event, Custom condition evaluation

State: { 
  lastProcessedIndex: 0  // Tracks position in event history
}

FSM Transitions:
- MONITORING + any_event → EVALUATING (run custom condition)
- EVALUATING + condition_true → MATCHED (emit pattern)
- EVALUATING + condition_false → MONITORING (continue monitoring)
- MATCHED → MONITORING (reset for next potential match)
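
For custom patterns, the condition itself is just a function over recent events. The JavaScript sketch below shows one possible shape. The names createCustomMatcher and aThenBsThenC are hypothetical, the regular-expression trick assumes single-character event names, and a real implementation would also prune the event history.

```javascript
// Runs a user-supplied condition over the unprocessed events within the time window.
function createCustomMatcher(condition, windowMs) {
  const history = [];
  const state = { lastProcessedIndex: 0 }; // position in the event history

  return function onEvent(ev) {
    history.push(ev);
    // MONITORING + any_event -> EVALUATING (run custom condition)
    const candidates = history
      .slice(state.lastProcessedIndex)
      .filter((e) => ev.time - e.time <= windowMs);
    const matched = condition(candidates);
    if (matched) {
      // EVALUATING + condition_true -> MATCHED, then back to MONITORING
      state.lastProcessedIndex = history.length;
      return matched;
    }
    return null; // EVALUATING + condition_false -> MONITORING
  };
}

// Example condition: "A followed by 2+ B events, then C", ending at the newest event.
// Assumption: every event name is a single character.
function aThenBsThenC(events) {
  const seq = events.map((e) => e.event).join('');
  const m = /AB{2,}C$/.exec(seq);
  return m ? events.slice(m.index) : null;
}

const feedCustom = createCustomMatcher(aThenBsThenC, 10000);
const customResults = [
  { time: 0, event: 'C' },
  { time: 1000, event: 'A' },
  { time: 2000, event: 'B' },
  { time: 3000, event: 'B' },
  { time: 4000, event: 'C' },
  { time: 5000, event: 'B' },
].map(feedCustom);

console.log(customResults[4].map((e) => e.event).join(' → ')); // A → B → B → C
```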

Additionally, all patterns support a time window length check.


Running a similar query with this UDF produces output like the following:

SELECT cep_advanced_pattern(time, event) FROM cep_test_stream;

{
	"patternName": "threshold_A",
	"patternDescription": "At least 3 A events within 5 seconds",
	"matchType": "THRESHOLD_REACHED",
	"events": [
		{
			"time": "2025-06-27T22:34:49.259Z",
			"event": "A"
		},
		{
			"time": "2025-06-27T22:34:49.259Z",
			"event": "A"
		},
		{
			"time": "2025-06-27T22:34:49.259Z",
			"event": "A"
		}
	],
	"timestamp": "2025-06-27T22:34:49.291Z",
	"eventSequence": "A → A → A"
}
{
	"patternName": "sequence_ABA",
	"patternDescription": "Sequential pattern A->B->A",
	"matchType": "SEQUENCE_COMPLETE",
	"events": [
		{
			"time": "2025-06-27T22:34:50.251Z",
			"event": "A"
		},
		{
			"time": "2025-06-27T22:34:50.251Z",
			"event": "B"
		},
		{
			"time": "2025-06-27T22:34:50.251Z",
			"event": "A"
		}
	],
	"timestamp": "2025-06-27T22:34:50.252Z",
	"eventSequence": "A → B → A"
}
{
	"patternName": "complex_condition",
	"patternDescription": "A followed by 2+ B events, then C",
	"matchType": "CUSTOM: A followed by 2 B's and C",
	"events": [
		{
			"time": "2025-06-27T22:39:50.251Z",
			"event": "A"
		},
		{
			"time": "2025-06-27T22:39:50.251Z",
			"event": "B"
		},
		{
			"time": "2025-06-27T22:39:51.251Z",
			"event": "B"
		},
		{
			"time": "2025-06-27T22:39:51.251Z",
			"event": "C"
		}
	],
	"timestamp": "2025-06-27T22:39:52.251Z",
	"eventSequence": "A → B → B → C"
}


Summary


CEP is a critical technology in stream processing. Its key capabilities include:

  • Real-Time Decision Making: CEP enables immediate responses to complex business situations. For example, a fraud detection system can correlate a user's location data, transaction patterns, and account behavior in real-time to identify suspicious activity within milliseconds of a transaction attempt.

  • Pattern Recognition Across Multiple Streams: Unlike traditional stream processing that handles events individually, CEP can correlate events from different sources. A supply chain system might combine sensor data from trucks, weather alerts, and traffic information to predict delivery delays and automatically reroute shipments.

  • Temporal Reasoning: CEP excels at time-based analysis, detecting patterns that unfold over specific time windows. This is crucial for applications like monitoring system health, where you need to identify when error rates spike across multiple services within a certain timeframe.

  • Reducing Information Overload: Instead of alerting on every individual event, CEP generates higher-level insights by combining related events into meaningful patterns. This reduces noise and helps operators focus on situations that truly require attention.


Complex event processing doesn't have to be hard. You don't need to learn complicated new languages or build everything yourself. Timeplus makes CEP easy by using SQL—something most developers already know. And when SQL isn't enough, you can add custom functions (UDFs) to handle the tricky stuff.


Want to catch fraud in real-time? Monitor your IoT devices? Track user behavior as it happens? Timeplus lets you do all this with simple SQL queries, and UDFs give you the power to solve even the most complex problems.


Ready to see how easy streaming data can be? Check out timeplus.com and https://play.demo.timeplus.com/ to see for yourself.

