A hands-on guide to building real-time fraud detection using Timeplus
Machine learning is going real-time
The transition of machine learning into real-time represents a natural progression fueled by two key factors. Firstly, customers seek immediate decision-making capabilities, particularly in applications such as anomaly detection and personalized recommendations. Real-time prediction facilitates swift responses to evolving conditions, enhancing overall user experiences. Secondly, advancements in data infrastructure now support the processing of streaming data, leveraging technologies like Apache Kafka, Redpanda, Apache Pulsar, and AWS Kinesis. These technologies provide a robust foundation for constructing stream data pipelines essential for the implementation of real-time machine learning.
There are several challenges to building these real-time machine learning systems:
Support low-latency, fresh feature generation: Keeping features low-latency means that as soon as new data arrives, the related features are calculated in real time with a delay of a second or less. This usually requires a highly mature streaming feature pipeline.
Keep features consistent between training and serving: In most ML systems, the batch data pipeline used for training differs from the streaming data pipeline used for inference, as in the typical Lambda architecture. These pipelines are sometimes even written in different programming languages, for instance Python for training and Java or Scala for inference. Another common issue is ensuring the point-in-time correctness of training data, that is, preventing future data from being used for training, which would lead to data leakage.
Backfill historical data: When dealing with missing data or experimenting with new features, users often backfill historical data to train a new model. This requires the entire system to be able to replay that data and reconstruct all of the relevant features.
Manage the complexity of online and offline data infrastructures: To support real-time ML, users usually need to add a streaming system such as Apache Flink for real-time feature generation alongside their existing batch-based data processing for offline feature generation and model training. This makes the whole system complex, and Flink itself can be complex to manage. As a result, running real-time machine learning is feasible mainly for organizations with robust engineering teams, but not every player is Meta or Uber.
Why streaming data platforms can be a good tool for building real-time machine learning
To solve these challenges, a typical solution is to use a real-time feature platform. A feature platform helps users manage the complexity of real-time feature pipelines and storage, providing both batch and stream processing abstractions to support real-time machine learning. Leading vendors in this space include Tecton, Hopsworks, Claypot, Fennel, and Chalk.
Another option is to leverage a streaming data platform to build your own real-time feature store:
It is a simple system that unifies stream processing and historical data processing, so training and inference share the same data processing pipeline
It usually provides SQL-based interfaces that are easy for both data scientists and engineers to use, so you don't need to worry about the Java/Python battle: you can usually call SQL from either language
Through stream processing, streaming data platforms provide low-latency data pipelines that process data incrementally as it arrives
Backfilling historical data with a streaming data platform is easy: just run some SQL
“Point-in-time correctness” can be implemented using ASOF joins
Step-by-step guide: Implementing real-time feature pipelines for fraud detection using Timeplus Proton
So, how do you build your own real-time feature pipelines? I created this step-by-step guide on how I used Timeplus Proton (and other publicly available tools) to implement a real-time feature pipeline for fraud detection. Proton is our open source, unified streaming and historical data analytics database in a single binary.
Here is a high-level overview of the key components:
An online payment data generator which is based on a fraud detection dataset on Kaggle
A streaming feature pipeline/store backed by Timeplus’s streaming data platform
Two live streams, one for payments and one for labels
Real-time and historical views for features
A materialized view of all features used for training and inference data
Model training: PyCaret, a low-code ML tool for classification model training and inference
Model serving: FastAPI, a Python API framework
Online Payment Data
First, let’s take a look at the data.
The payment data is based on a fraud detection dataset from Kaggle. There are two streams: one for online payments and one for the label data that marks a transaction as fraud or not.
The streams are created using the following SQL:
CREATE STREAM IF NOT EXISTS online_payments
(
`id` string,
`type` enum8('PAYMENT' = 0, 'TRANSFER' = 1, 'CASH_OUT' = 2, 'CASH_IN' = 3, 'DEBIT' = 4),
`amount` float64,
`account_from` string,
`old_balance_from` float64,
`new_balance_from` float64,
`account_to` string,
`old_balance_to` float64,
`new_balance_to` float64
);
CREATE STREAM IF NOT EXISTS online_payments_label
(
`id` string,
`is_fraud` bool,
`type` string
);
The data generator produces the payment and label data and ingests it into these streams in Proton.
There are three different types of fraud being simulated:
Type 1: The payment starts with a small transfer amount and then transfers the remainder of the money. The fraudster wants to verify that transactions can go through successfully with the first payment.
Type 2: There will be several payments to send all money from one account to several different accounts in a very short period of time.
Type 3: All money from one account is used as payment to a merchant.
The generator randomly produces one of these three types of fraud transactions. For each fraud transaction, a label record is also saved to the label stream. Regular transactions get no label record.
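The generator itself is not shown here. As a rough illustration of the three patterns above, here is a minimal Python sketch of how such a generator might produce fraud transactions together with their label records. The `Payment` class, the `simulate_fraud` and `make_payment` functions, the account names, and the amounts are all hypothetical; this is not the generator used in this guide, and it ignores timestamps and balance columns.

```python
import random
import uuid
from dataclasses import dataclass


@dataclass
class Payment:
    # Hypothetical record type; the real stream also carries balance columns.
    id: str
    type: str
    amount: float
    account_from: str
    account_to: str


def make_payment(ptype, amount, account_from, account_to):
    return Payment(str(uuid.uuid4()), ptype, round(amount, 2), account_from, account_to)


def simulate_fraud(rng, account, balance, merchants, mules):
    """Hypothetical sketch: return (payments, labels) for one random fraud pattern."""
    kind = rng.choice(["type1", "type2", "type3"])
    if kind == "type1":
        # Type 1: a small probe transfer, then the remaining balance (same target assumed).
        probe = rng.uniform(1, 10)
        target = rng.choice(mules)
        payments = [
            make_payment("TRANSFER", probe, account, target),
            make_payment("TRANSFER", balance - probe, account, target),
        ]
    elif kind == "type2":
        # Type 2: split the whole balance across several different accounts.
        targets = rng.sample(mules, 3)
        payments = [make_payment("TRANSFER", balance / len(targets), account, t) for t in targets]
    else:
        # Type 3: spend the whole balance at a single merchant.
        payments = [make_payment("PAYMENT", balance, account, rng.choice(merchants))]
    # Only fraudulent transactions get a label record; regular ones get none.
    labels = [{"id": p.id, "is_fraud": True, "type": kind} for p in payments]
    return payments, labels


rng = random.Random(42)
payments, labels = simulate_fraud(rng, "acct-1", 1000.0, ["shop-1", "shop-2"], ["m-1", "m-2", "m-3", "m-4"])
```

Passing a seeded `random.Random` keeps runs reproducible, which helps when comparing feature pipelines on the same simulated data.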
Build Features
"Features" refer to the individual, measurable properties or characteristics of data that are used as input for a machine learning model.
In this example, we will build three different types of features as input to our fraud detection model.
Real-time Features
Real-time features are generated in real-time when the data arrives. We use the following SQL to create a view for real-time features.
CREATE VIEW IF NOT EXISTS v_fraud_reatime_features AS
WITH cte AS
(
SELECT
_tp_time,
id,
type,
account_from,
amount,
lag(amount) AS previous_amount,
lag(_tp_time) AS previous_transaction_time
FROM
default.online_payments
WHERE
_tp_time > earliest_timestamp()
PARTITION BY
account_from
)
SELECT
_tp_time,
id,
type,
account_from,
amount,
previous_amount,
previous_transaction_time,
if(previous_transaction_time > earliest_timestamp(), date_diff('second', previous_transaction_time, _tp_time), 0) AS time_to_last_transaction
FROM
cte
The transaction type and transaction amount are two simple features that need no processing; they come directly from the raw payment data.
The other two real-time features created by this SQL are:
amount of the current account's previous transaction
time elapsed from the current account's previous transaction
To build these two features, a `PARTITION BY` clause splits the stream into substreams by account_from, and the lag function then returns the previous value of the given field within that account's substream. This is useful whenever you need a related earlier value from a stream, and these features can help identify type 1 and type 2 fraud.
A good feature platform should be able to handle stateful processing. The SQL above is stateful: for each account_from, the query must remember that account's previous record. The feature is therefore derived not only from the current record but also from historical state.
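To make the stateful part concrete, the following minimal Python sketch emulates what `lag()` with `PARTITION BY account_from` computes: a per-account dictionary holds the previous amount and time, and each event reads and then updates only its own account's entry. The function name and event fields are assumptions for illustration, and the sketch assumes a value of 0 for the first event of an account, mirroring the `if(...)` fallback the SQL uses for `time_to_last_transaction`.

```python
from datetime import datetime, timedelta


def realtime_features(events):
    """Emulate lag() over PARTITION BY account_from for an ordered event stream.

    Each event is a dict with "time" (datetime), "account_from" and "amount".
    """
    last_seen = {}  # per-account state: account -> (previous amount, previous time)
    for e in events:
        prev = last_seen.get(e["account_from"])
        if prev is None:
            # First event for this account: assume 0 for both features.
            previous_amount, time_to_last = 0.0, 0
        else:
            previous_amount = prev[0]
            time_to_last = int((e["time"] - prev[1]).total_seconds())
        # Update state so the next event of this account sees this one as "previous".
        last_seen[e["account_from"]] = (e["amount"], e["time"])
        yield {**e, "previous_amount": previous_amount, "time_to_last_transaction": time_to_last}


t0 = datetime(2023, 11, 30, 18, 0, 0)
stream = [
    {"time": t0, "account_from": "A", "amount": 5.0},                           # small probe
    {"time": t0 + timedelta(seconds=3), "account_from": "B", "amount": 50.0},   # another account
    {"time": t0 + timedelta(seconds=10), "account_from": "A", "amount": 995.0},  # the remainder
]
for row in realtime_features(stream):
    print(row["account_from"], row["amount"], row["previous_amount"], row["time_to_last_transaction"])
```

The last event pairs a large amount with a small `previous_amount` 10 seconds earlier, which is the shape of type 1 fraud. Account B's event does not disturb A's state, which is exactly what partitioning guarantees.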
Near-Real Time Features
Not all features are real-time. Near-real-time features are built on a time window, such as 1 minute or 5 minutes, and capture user behavior over that period of time.
CREATE VIEW IF NOT EXISTS v_fraud_1m_features AS
SELECT
window_start,
account_from,
count(*) AS count,
max(amount) AS max_amount,
min(amount) AS min_amount,
avg(amount) AS avg_amount
FROM
tumble(default.online_payments, 60s)
WHERE
_tp_time > earliest_timestamp()
GROUP BY
window_start, account_from
CREATE VIEW IF NOT EXISTS v_fraud_5m_features AS
SELECT
window_start,
account_from,
count_distinct(account_to) AS target_counts
FROM
tumble(default.online_payments, 5m)
WHERE
_tp_time > earliest_timestamp()
GROUP BY
window_start, account_from
With the above SQL, we build the following 1-minute and 5-minute features using tumbling windows:
1m transaction count per account
1m transaction max amount per account
1m transaction min amount per account
1m transaction average amount per account
5m count of distinct target accounts per account
These features can be very helpful for identifying type 2 fraud.
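The windowed aggregates can be sketched in plain Python as follows. The sketch assumes epoch-aligned windows and computes the same per-account 1-minute and 5-minute aggregates in batch over a finite list of events, whereas Proton maintains them incrementally as the stream flows; `window_start` and `tumble_features` are hypothetical names.

```python
from collections import defaultdict
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)


def window_start(ts, size):
    """Align a timestamp to the start of its epoch-aligned tumbling window."""
    return ts - (ts - EPOCH) % size


def tumble_features(events):
    """Per-account 1m and 5m tumbling-window aggregates, computed in batch."""
    amounts_1m = defaultdict(list)  # (window_start, account) -> amounts
    targets_5m = defaultdict(set)   # (window_start, account) -> distinct target accounts
    for e in events:
        acct = e["account_from"]
        amounts_1m[(window_start(e["time"], timedelta(minutes=1)), acct)].append(e["amount"])
        targets_5m[(window_start(e["time"], timedelta(minutes=5)), acct)].add(e["account_to"])
    f1m = {
        key: {"count": len(a), "max_amount": max(a), "min_amount": min(a), "avg_amount": sum(a) / len(a)}
        for key, a in amounts_1m.items()
    }
    f5m = {key: {"target_counts": len(t)} for key, t in targets_5m.items()}
    return f1m, f5m


t0 = datetime(2023, 11, 30, 18, 0, 5)
events = [
    {"time": t0, "account_from": "A", "account_to": "X", "amount": 100.0},
    {"time": t0 + timedelta(seconds=20), "account_from": "A", "account_to": "Y", "amount": 300.0},
    {"time": t0 + timedelta(seconds=70), "account_from": "A", "account_to": "Z", "amount": 600.0},
]
f1m, f5m = tumble_features(events)
```

Here the first two events fall in the 18:00 one-minute window and the third in the 18:01 window, while all three share the 18:00 five-minute window, so account A reaches three distinct targets within five minutes.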
Historical Features
Historical features span the entire history of the data. In this example, we first aggregate each account's transactions over 1-day tumbling windows, and then compute averages over all of the daily results with a global aggregation.
CREATE VIEW IF NOT EXISTS v_fraud_1d_features AS
WITH agg1d AS
(
SELECT
window_start, account_from, count(*) AS count, max(amount) AS max_amount
FROM
tumble(default.online_payments, 1d)
WHERE
_tp_time > earliest_timestamp()
GROUP BY
window_start, account_from
)
SELECT
now64() as ts, account_from, avg(count) AS avg_count, avg(max_amount) AS avg_max_amount
FROM
agg1d
GROUP BY
account_from
With the above SQL, we created the following features:
average daily transaction count per account
average daily maximum transaction amount per account
These kinds of features can be very useful for identifying type 3 fraud, because a user's spending history usually shows that they don't spend all of their money at once.
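The two-stage aggregation (per-account daily windows first, then a global average over those daily results) can be sketched in Python like this. Grouping by calendar date is an assumption standing in for the 1-day tumbling window, and `historical_features` is a hypothetical name.

```python
from collections import defaultdict
from datetime import datetime


def historical_features(events):
    """Two-stage aggregation: per-account daily stats, then averages over all days."""
    # Stage 1: group amounts by (calendar day, account), standing in for tumble(..., 1d).
    daily = defaultdict(list)
    for e in events:
        daily[(e["time"].date(), e["account_from"])].append(e["amount"])
    # Stage 2: global aggregation over each account's daily (count, max_amount) results.
    per_account = defaultdict(list)
    for (_, account), amounts in daily.items():
        per_account[account].append((len(amounts), max(amounts)))
    return {
        account: {
            "avg_count": sum(c for c, _ in days) / len(days),
            "avg_max_amount": sum(m for _, m in days) / len(days),
        }
        for account, days in per_account.items()
    }


events = [
    {"time": datetime(2023, 12, 1, 9), "account_from": "A", "amount": 20.0},
    {"time": datetime(2023, 12, 1, 15), "account_from": "A", "amount": 40.0},
    {"time": datetime(2023, 12, 2, 10), "account_from": "A", "amount": 60.0},
]
features = historical_features(events)
# A new payment far above this account's avg_max_amount may hint at type 3 fraud.
```

Averaging the daily maxima (40 and 60) gives a typical "largest spend per day" for the account, which is a more stable baseline than the single largest payment ever seen.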
Combining all features using ASOF join
When training a machine learning model or making a prediction, features need to be assigned to each event. Because the features are generated at different time intervals, an ASOF join across all feature views selects the latest value from each view at the time of the event, as shown in the following diagram.
CREATE MATERIALIZED VIEW IF NOT EXISTS mv_fraud_all_features AS
SELECT
_tp_time AS time,
v_fraud_reatime_features.id AS id,
v_fraud_reatime_features.type AS type,
v_fraud_reatime_features.account_from AS account,
v_fraud_reatime_features.amount AS amount,
v_fraud_reatime_features.previous_amount AS previous_amount,
v_fraud_reatime_features.time_to_last_transaction AS time_to_last_transaction,
v_fraud_1m_features.count AS transaction_count_1m,
v_fraud_1m_features.max_amount AS max_transaction_amount_1m,
v_fraud_1m_features.avg_amount AS avg_transaction_amount_1m,
v_fraud_5m_features.target_counts AS distinct_transaction_target_count_5m,
v_fraud_1d_features.avg_count AS avg_transaction_count_1d,
v_fraud_1d_features.avg_max_amount AS avg_max_transaction_count_1d
FROM
v_fraud_reatime_features
ASOF LEFT JOIN v_fraud_1m_features ON (v_fraud_reatime_features.account_from = v_fraud_1m_features.account_from) AND (v_fraud_reatime_features._tp_time >= v_fraud_1m_features.window_start)
ASOF LEFT JOIN v_fraud_5m_features ON (v_fraud_reatime_features.account_from = v_fraud_5m_features.account_from) AND (v_fraud_reatime_features._tp_time >= v_fraud_5m_features.window_start)
ASOF LEFT JOIN v_fraud_1d_features ON (v_fraud_reatime_features.account_from = v_fraud_1d_features.account_from) AND (v_fraud_reatime_features._tp_time >= v_fraud_1d_features.ts)
SETTINGS
keep_versions = 100
We run the above query as a materialized view, which is a long-running background process. All features are calculated incrementally, in real time, as soon as a new event arrives in the online payments stream. This is the foundation of the low-latency feature pipeline used for training and inference in the next steps.
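The ASOF join semantics can be sketched in Python with `bisect`: for each event, pick the feature row for the same account with the greatest timestamp that is less than or equal to the event time, and keep the event even when no row qualifies (the LEFT part). This illustrates the join semantics under those assumptions, not how Proton implements it; `build_index` and `asof_left_join` are hypothetical names.

```python
import bisect
from datetime import datetime, timedelta


def build_index(rows):
    """Group (account, timestamp, features) rows into per-account lists sorted by time."""
    index = {}
    for account, ts, feats in sorted(rows, key=lambda r: (r[0], r[1])):
        keys, values = index.setdefault(account, ([], []))
        keys.append(ts)
        values.append(feats)
    return index


def asof_left_join(event, index):
    """Attach the latest feature row with timestamp <= event time for the same account."""
    keys, values = index.get(event["account_from"], ([], []))
    i = bisect.bisect_right(keys, event["time"])  # number of rows with ts <= event time
    # LEFT join: keep the event unchanged when no feature row qualifies.
    return {**event, **(values[i - 1] if i else {})}


t0 = datetime(2023, 12, 1, 12, 0, 0)
index_1m = build_index([
    ("A", t0, {"transaction_count_1m": 2}),
    ("A", t0 + timedelta(minutes=1), {"transaction_count_1m": 5}),
])
event = {"account_from": "A", "time": t0 + timedelta(seconds=90), "amount": 1.0}
joined = asof_left_join(event, index_1m)
```

Rows with a timestamp later than the event are never selected, which is what keeps the join point-in-time correct. If you reuse this lookup in a pure batch replay, keying feature rows by window end rather than window start is the stricter choice, since a window's aggregate can include events that arrived after the one being joined.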
Train the Model
Labeling and preparing the training dataset
In the previous step, we created a materialized view of all features. Because we are going to train a supervised classification model to identify fraud, we need a dataset with label information showing which transactions are fraudulent. We can simply join the feature materialized view with the label stream.
SELECT
*
FROM
table(mv_fraud_all_features) AS f
LEFT JOIN table(online_payments_label) AS l ON f.id = l.id
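Because only fraud transactions have label records, many rows on the left side of this join have no match. How the unmatched `is_fraud` column comes back (a default value or NULL) can depend on join settings, so a training pipeline may want to make the choice explicit. Here is a minimal Python sketch of the join, assuming unmatched rows mean "not fraud"; `label_features` is a hypothetical name.

```python
def label_features(feature_rows, label_rows):
    """LEFT JOIN feature rows to labels on id; unmatched rows are treated as not fraud."""
    labels = {row["id"]: row["is_fraud"] for row in label_rows}
    return [{**f, "is_fraud": labels.get(f["id"], False)} for f in feature_rows]


features = [{"id": "t1", "amount": 5.0}, {"id": "t2", "amount": 995.0}]
labels = [{"id": "t2", "is_fraud": True}]
training_rows = label_features(features, labels)
```

Every feature row survives the join, and only `t2` picks up a positive label.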
The label stream is an append-only log containing the transactions that have been marked as fraud. Sometimes we make a mistake and mark a non-fraud transaction as fraud, so a correction is needed. Because the stream is append-only, inserting a corrected label record leaves two records for the same transaction: the old, wrong label and the new label, which is the latest truth. When labels are updated, a new model needs to be trained on the corrected labels, so we want to use only the latest version of each label. This can be handled with a changelog-style approach that keeps only the latest label for each transaction.
SELECT
*
FROM
table(online_payments_label)
WHERE
id = 'b849c6e3-bc7a-4b20-a8da-35c589565879'
Query id: e4139011-89d0-48ad-881b-bac408feda6d
┌─id───────────────────────────────────┬─is_fraud─┬─type──┬────────────────_tp_time─┐
│ b849c6e3-bc7a-4b20-a8da-35c589565879 │ true │ type1 │ 2023-11-30 18:31:06.523 │
└──────────────────────────────────────┴──────────┴───────┴─────────────────────────┘
┌─id───────────────────────────────────┬─is_fraud─┬─type─┬────────────────_tp_time─┐
│ b849c6e3-bc7a-4b20-a8da-35c589565879 │ false │ │ 2023-12-12 20:01:12.908 │
└──────────────────────────────────────┴──────────┴──────┴─────────────────────────┘
Here, transaction b849c6e3-bc7a-4b20-a8da-35c589565879 was previously marked as fraud and later updated to not fraud.
We use the following query to get the latest version of all labels:
CREATE VIEW v_latest_labels AS
WITH ordered_label AS
(
SELECT
*
FROM
table(online_payments_label)
ORDER BY
_tp_time ASC
)
SELECT
id, latest(is_fraud) AS is_fraud
FROM
ordered_label
GROUP BY
id
If we query this view, only the latest version of each label is returned.
SELECT
*
FROM
v_latest_labels
WHERE
id = 'b849c6e3-bc7a-4b20-a8da-35c589565879'
Query id: 6e6425c9-2bd3-4894-bf54-95d7316b136b
┌─id───────────────────────────────────┬─is_fraud─┐
│ b849c6e3-bc7a-4b20-a8da-35c589565879 │ false │
└──────────────────────────────────────┴──────────┘
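The logic of `v_latest_labels` can be mirrored in a few lines of Python: sort the label records by `_tp_time` and let later records overwrite earlier ones for the same id. This is a sketch for illustration (`latest_labels` is a hypothetical name), using the two records from the example output above.

```python
from datetime import datetime


def latest_labels(label_rows):
    """Keep only the most recent label per transaction id, ordered by _tp_time."""
    latest = {}
    for row in sorted(label_rows, key=lambda r: r["_tp_time"]):
        latest[row["id"]] = row["is_fraud"]  # later records overwrite earlier ones
    return latest


tx = "b849c6e3-bc7a-4b20-a8da-35c589565879"
rows = [
    # Deliberately out of order: sorting by _tp_time is what picks the correction.
    {"id": tx, "is_fraud": False, "_tp_time": datetime(2023, 12, 12, 20, 1, 12, 908000)},
    {"id": tx, "is_fraud": True, "_tp_time": datetime(2023, 11, 30, 18, 31, 6, 523000)},
]
print(latest_labels(rows))
```

Ordering by event time rather than by arrival order is what makes the result independent of how the records happen to be read back.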
Now we have our query for the training dataset with the latest labels:
SELECT
*
FROM
table(mv_fraud_all_features) AS f
LEFT JOIN v_latest_labels AS l ON f.id = l.id
Training the Classification Model
With the training dataset ready, we can build a classification model for fraud detection. PyCaret is a low-code ML tool that can help us choose the best model. The following sample code:
Query the training dataset from Timeplus
Convert the query result into a pandas dataframe
Initialize the PyCaret environment and find/train the best classification model
Save the model to a local file
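import json
import os

import pandas as pd
from pycaret.classification import compare_models, save_model, setup
from timeplus import Environment, Query

# Assumed SDK setup: the environment variable names are placeholders;
# check the timeplus SDK docs for your deployment.
env = (
    Environment()
    .address(os.environ.get("TIMEPLUS_HOST"))
    .workspace(os.environ.get("TIMEPLUS_WORKSPACE"))
    .apikey(os.environ.get("TIMEPLUS_API_KEY"))
)

# Bounded historical query: all features joined with the latest labels.
sql = '''
SELECT
*
FROM
table(mv_fraud_all_features) as f
LEFT JOIN v_latest_labels as l ON f.id = l.id
LIMIT 100000
'''

query = (
    Query(env=env).sql(query=sql)
    .batching_policy(10000, 1000)
    .create()
)

# Collect column names and all result rows from the query's event stream.
query_header = query.header()
query_result = []
for event in query.result():
    if event.event == "message":
        query_result += json.loads(event.data)

columns = [f['name'] for f in query_header]
df = pd.DataFrame(query_result, columns=columns)

# Keep only the model features plus the label column.
df_train = df[['type', 'amount', 'previous_amount',
               'time_to_last_transaction', 'transaction_count_1m', 'max_transaction_amount_1m',
               'avg_transaction_amount_1m', 'distinct_transaction_target_count_5m', 'avg_transaction_count_1d',
               'avg_max_transaction_count_1d', 'is_fraud']]

# Set up PyCaret, compare candidate classifiers with cross-validation, and save the best one.
s = setup(data=df_train, target='is_fraud', session_id=123)
best_model = compare_models()
save_model(best_model, 'saved_model')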
With the above code, we trained a classification model using a historical query that returns a bounded set of data. You can change the time range or use other conditions to select a specific set of data for training.
Real-time Inference
With a streaming query, we can run inference in real time as soon as each event arrives. The following code shows how to load the trained model and run real-time inference:
import json
import pandas as pd
from timeplus import