How Timeplus Unifies Streaming and Historical Data Processing

Timeplus is a converged platform that unifies streaming and historical data processing. What does that mean from a technical perspective? In this blog, we’ll dive into the internal architecture and design of the Timeplus core engine, how it combines these two worlds, and the common challenges that come with doing so. We’ll also share the technical foundations of the core engine: streaming processing theory, database theory from academia, and industry practices from recent years.

 

The Foundations


The Timeplus core engine is technically a modern analytic DSMS (Data Stream Management System). Quite a few academic works and recent industry practices have had a profound impact on the core engine’s data model and query processing model.


As far as we can tell, the idea of a “streaming processing database” dates back about 30 years to the paper Tapestry: Continuous Queries over Append-only Databases [TAPESTRY-1992], which tackled two common challenges in traditional databases: scaling query processing (lots of concurrent queries) and query latency. The Tapestry paper explores SQL semantics for continuous, incremental query processing, and the fundamentals it presents still apply to every modern streaming processing database with a SQL interface. Roughly 10 years later, in the 2000s, three important academic works followed: STREAM: The Stanford Data Stream Management System [STREAM-2003], Aurora: A new model and architecture for data stream management [AURORA-2003], and Aurora’s second generation, The Design of the Borealis Stream Processing Engine [BOREALIS-2005]. The data model of the Timeplus core engine is an extension of theirs, and we will cover it in detail in a later section. If we draw a milestone timeline, these three works more or less conclude the first generation of streaming processing: their data model is relational, and most of them are academic PoC systems.


The second generation of streaming processing started roughly a decade later with Google MillWheel [MILLWHEEL-2013], Spark Streaming [SPARK-STREAM-2013], Storm @Twitter [STORM-2014], Flink [FINK-2015], Trill [TRILL-2015] and, more recently, Kafka Streams [KAFKA-STREAM-2018]. Although most of the fundamental problems remain the same, this generation brought big improvements in out-of-order data handling, query state management, query processing guarantees and scalability. From our perspective, the second generation is heavily influenced by the MapReduce paper [MAP-REDUCE-2004]. Most of these systems are built around data processing pipelines and, like the MapReduce framework, they require users to write code, deploy it to the system and run it. Their data and processing models drift away from those of regular databases, as these systems usually neither require a specific schema nor expose SQL (at least initially). At a high level, the Timeplus core engine adopts the watermark strategies of MillWheel and Flink to handle out-of-order events. Flink’s query state checkpointing, which builds on the distributed snapshot algorithm [DIST-SNAPSHOT-1985], influences the state management functionality of the core engine as well.


As streaming processing practitioner and expert Tyler Akidau presented in his DEBS keynote, Open Problems in Streaming Processing: A Call To Action [TYLER-DEBS-2019], we are reaching another turning point to rethink and re-innovate (interestingly enough, it comes another decade after the Google MillWheel paper):

  • Can we make streaming processing more accessible and more economical to users?

  • Can we have better interoperability between streaming data and historical data processing?

  • Can we combine several decades of database technologies like query optimization with streaming processing in one system?

  • Can we just use SQL [ONE-SQL-RULE-ALL-2019]?


From our point of view, the answer is definitely yes, and Timeplus is on this journey to answer Tyler’s call. This is the era of the third generation of streaming processing.


The Architecture


As mentioned at the very beginning of this blog, Timeplus unifies streaming and historical data processing into one single binary. The high level architecture of its core engine is depicted below.



Internally, the core engine separates the control plane from the data plane, as typical systems do. The data plane contains four components: the ingestion framework, the query processing pipeline, the stream store (NativeLog) and the historical store. The control plane contains metadata storage and node coordination functionality.


The basic input / output data flow of the system is:

  1. A user creates a stream and then ingests data into it through the Data Ingestion Framework, using Timeplus REST APIs or SQL `INSERT INTO` statements.

  2. The ingestion framework parses the data, does some lightweight processing such as timestamp recognition, and converts the data into a block in the Timeplus Data Format. The block is then appended to the streaming store (internally we call it NativeLog).

  3. A background process tails data from the streaming store and commits it to the historical data store.

  4. Users issue SQL `SELECT` statements to run pure real-time streaming queries, pure historical queries, or sometimes a combination of the two.
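
As a rough illustration of steps 2 and 3 (not the actual engine code), the flow can be modeled in a few lines of Python. The `Stream` class, its field names and the batch size are all hypothetical, and plain lists stand in for NativeLog and the historical store.

```python
from dataclasses import dataclass, field
from typing import Any


@dataclass
class Stream:
    """Toy model of one stream and its two stores (all names are hypothetical)."""
    log: list[dict[str, Any]] = field(default_factory=list)         # streaming store, append-only
    historical: list[dict[str, Any]] = field(default_factory=list)  # historical store
    committed_sn: int = -1  # last sequence number copied to the historical store

    def ingest(self, rows: list[dict[str, Any]], now: float) -> None:
        """Step 2: stamp each row and append it to the streaming store."""
        for row in rows:
            event = dict(row)
            event["sn"] = len(self.log)          # position in the log
            event.setdefault("event_time", now)  # stand-in for timestamp recognition
            self.log.append(event)

    def tail_and_commit(self, batch_size: int = 2) -> int:
        """Step 3: copy the next batch from the log into the historical store."""
        start = self.committed_sn + 1
        batch = self.log[start:start + batch_size]
        self.historical.extend(batch)
        if batch:
            self.committed_sn = batch[-1]["sn"]
        return len(batch)


stream = Stream()
stream.ingest([{"i": 1}, {"i": 2}, {"i": 3, "event_time": 5.0}], now=10.0)
copied = stream.tail_and_commit()
print(copied, stream.committed_sn, len(stream.log))  # 2 1 3
```

The point of the sketch is that ingestion only ever appends to the log, while the historical store lags behind by whatever the background process has not yet committed.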


The streaming data store and historical data store are optimized for different use cases: the former for real-time data ingestion and streaming processing, the latter for (range) scans over large amounts of historical data. They are therefore typically configured with different types of storage. For instance, the streaming store usually resides on NVMe SSD, while the historical data store can reside on HDD or cloud storage. Please also note that the streaming store is pluggable: the core engine also supports Kafka / Redpanda as the streaming store (internally we call it KafkaLog).


Having two data stores internally enables easier and better interoperability between streaming and historical data processing, such as backfilling large amounts of data or connecting historical insights with real-time analytics. Another important factor is that it simplifies users’ data stacks: users don’t need to glue different systems together. In contrast to a pure streaming query processing engine like Flink, we also think storage and query processing go hand in hand and together enable the most efficient optimizations: we control the internal data format, so we can avoid unnecessary data conversion among the streaming store, the historical store and the in-memory representation as much as possible. This is exactly why we introduced the Timeplus Data Format: an internal data representation that flows across the different components of the core engine with little conversion.


Although the internal architecture looks complex, one of the design principles we strictly enforce for the core engine is simplicity and efficiency: one single binary for all of the functionality. Users simply run the binary from the CLI and that’s it. This is one of the prerequisites for making streaming / historical data processing more accessible and economical, and it is especially true for edge deployments. Even for cloud deployments, we still benefit from this design. Compared to large-scale deployments of data processing solutions such as Spark or Flink [SCALE-COST-2015], we strive to keep the Timeplus core engine compact and efficient, and we are happy with this design choice so far. Readers curious about the performance Timeplus can achieve can refer to parts I and II of our `Need for Speed` blogs.


Data Model and Storage


The Timeplus core engine follows a relational data model, which requires a schema just as a traditional relational database does. Every event in Timeplus, as described below, has some built-in columns such as `Physical_Time`, `Index_Time` and `Sequence_Number`, plus user-defined columns of various types.

Event = {Physical_Time, Index_Time, Sequence_Number, Delta_Flag, k1, ..., kn, v1, ..., vn}

Events are usually persisted in blocks (batched together) in the Timeplus Data Format, which is columnar and carries quite a bit of metadata. This data model lets the engine exploit the SIMD capabilities of modern CPUs, as most modern OLAP systems do. Please note that although the data model is relational, it is still quite common for end users to ingest raw string data (a single string column) and then do schema-on-read query processing to extract fields / columns on the fly.
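
To make “columnar block” concrete, here is a minimal Python sketch that pivots a batch of row-oriented events into one list per column. It is only an illustration of the idea: the real Timeplus Data Format is not described in this post, and the lowercase column names, the `to_block` helper and the `delta_flag` value of 1 are placeholders of our own.

```python
from typing import Any

# Built-in columns from the event model above (lowercased for this sketch).
BUILT_IN = ("physical_time", "index_time", "sequence_number", "delta_flag")


def to_block(events: list[dict[str, Any]]) -> dict[str, list[Any]]:
    """Pivot row-oriented events into one list per column (a columnar block)."""
    user_columns = sorted({name for e in events for name in e} - set(BUILT_IN))
    columns = list(BUILT_IN) + user_columns
    return {col: [e.get(col) for e in events] for col in columns}


events = [
    {"physical_time": 100, "index_time": 101, "sequence_number": 0, "delta_flag": 1, "i": 7},
    {"physical_time": 105, "index_time": 106, "sequence_number": 1, "delta_flag": 1, "i": 9},
]
block = to_block(events)
# A whole column can now be scanned in one pass without touching the others.
print(sum(block["i"]))  # 16
```

Keeping each column contiguous is what makes vectorized (SIMD-friendly) processing possible: an aggregate over `i` reads only the values of `i`.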


Events are persisted in a `stream`. A stream in Timeplus follows the above data model (it is associated with a schema) and can be created via SQL or the REST API. For example: `CREATE STREAM my_stream(i int);`.


By default, every stream has two associated storages, the stream store and the historical store, and data in both stores share the same Timeplus Data Format. Users can think of a stream in Timeplus as a regular table in a traditional database: the stream store plays the role of the table’s WAL, the historical store plays the role of the table’s data part, and the Timeplus core engine makes the WAL part queryable in a streaming way. Internally we call the stream store NativeLog. Background threads tail the data from NativeLog, index it in batches into the historical store, and periodically sort, merge and index the data there. This process enables fast historical analysis as well as fast backfill from the historical store in fused streaming queries. The following diagram shows the two data stores of a stream and the above data flow process.



Please note that users can disable the historical store for a stream that has no historical analytic requirements. For example:

CREATE STREAM stream_only(i int) SETTINGS storage_type='streaming'

It is even possible to disable both persistent stores of a stream. This kind of stream is essentially pure in-memory. For example:

CREATE STREAM inmemory(i int) SETTINGS storage_type='memory'

A pure in-memory stream keeps only some of the latest data in memory and is volatile. It can be useful in approximate query processing scenarios where users don’t care about raw data persistence: as data gets ingested, it is computed on and the results are emitted in real time.


The Timeplus core engine also supports different modes / semantics for its data store. By default, it has append-only semantics. But users can create a stream with key-value, changelog etc semantics to support different use cases. We will cover these in more detail in future blogs.


Query Processing


End users interact with Timeplus through streaming SQL. Since we position Timeplus as a real-time, streaming-first analytic platform, we flip the default SQL query behavior: by default, a SQL query is a long-running streaming query that never ends and always waits for new data. If users want to query historical data, they can use a `table` query, which acts just like an ad-hoc query in a traditional database.


For instance,

SELECT * FROM my_stream

is an incremental streaming query that runs forever, while

SELECT * FROM table(my_stream)

is a run-once query that ends after reading a snapshot of the data as of the time the query was issued.
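
The difference between the two query modes can be sketched in Python as follows. This is a conceptual model only: `StreamingQuery` and `table_query` are hypothetical names, a list stands in for the stream, and the choice to start the streaming query at the current end of the log is an assumption of the sketch rather than a statement about the engine’s default.

```python
class StreamingQuery:
    """Long-running query: remembers its position and keeps returning new rows."""

    def __init__(self, log: list[int]) -> None:
        self._log = log
        self._position = len(log)  # assumption: only rows arriving from now on are seen

    def poll(self) -> list[int]:
        rows = self._log[self._position:]
        self._position = len(self._log)
        return rows


def table_query(log: list[int]) -> list[int]:
    """Run-once query: a snapshot of the rows present when the query is issued."""
    return list(log)


log = [1, 2]
streaming = StreamingQuery(log)
snapshot = table_query(log)

log.append(3)              # new data arrives after both queries were issued
first = streaming.poll()
log.append(4)
second = streaming.poll()
print(first, second, snapshot)  # [3] [4] [1, 2]
```

The streaming query has no notion of being finished: every `poll` may return more rows. The table query returns once, and later appends never change its result.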


Since Timeplus has both stream and historical stores for every regular stream, it can efficiently support different combinations of joins, such as streaming data joined with historical data, stream-to-stream joins, etc. One example is leveraging the historical data store for fast backfilling. A sample query is:

SELECT count(*) FROM my_stream WHERE _tp_time > '2022-10-01 01:00:00'

For this typical query, the core engine can scan the historical data store for fast backfill, since the historical data store has an index related to event time. Internally, the core engine processes the historical data up to a specific sequence number and then continues with the data from the streaming store starting exactly after that sequence number. The following diagram shows this process in general.
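
A minimal sketch of this handoff, assuming events are simple `(sequence_number, event_time)` pairs: the `fused_scan` function is hypothetical, and it scans linearly where the real engine would use its event time index.

```python
from typing import Iterator

Event = tuple[int, int]  # (sequence_number, event_time)


def fused_scan(historical: list[Event], log: list[Event], since: int) -> Iterator[Event]:
    """Backfill from the historical store, then continue from the streaming store."""
    last_sn = -1
    for sn, event_time in historical:
        last_sn = sn  # every historical row advances the handoff point
        if event_time > since:
            yield sn, event_time
    for sn, event_time in log:
        # Resume exactly after the last historical sequence number:
        # no gaps and no duplicates across the two stores.
        if sn > last_sn and event_time > since:
            yield sn, event_time


log = [(0, 10), (1, 20), (2, 30), (3, 40), (4, 50)]  # streaming store holds everything
historical = log[:3]                                  # background commit has reached sn 2
result = list(fused_scan(historical, log, since=15))
print(len(result), [sn for sn, _ in result])  # 4 [1, 2, 3, 4]
```

Rows 1 and 2 come from the historical store and rows 3 and 4 from the streaming store, which is why tracking the sequence number matters: it marks the exact point where one store stops and the other takes over.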


Like traditional databases, the Timeplus core engine applies optimizations such as AST-level optimizations and predicate / projection push down during the query planning phase, and leverages query DAG node reuse / fan-out, just-in-time (JIT) compilation and vectorization for fast and efficient processing during the query execution phase.


There are lots of other query processing capabilities in Timeplus like federated query, materialized views, data revision processing with changelog storage, substream processing, query state checkpointing / recovery etc. We will cover these implementations in future blogs.


 

Common Open Challenges


Streaming processing resolves the challenges of concurrent query processing and query latency. However, besides the challenge of unifying streaming and historical data processing, we also need to resolve the following problems:


  1. Accuracy / latency: unlike traditional database query processing, which acts on a bounded data set, streaming processing always trades off accuracy against latency, since fundamentally we never know whether an arbitrarily late event will still arrive in an unbounded stream.

  2. Fault tolerance: streaming processing is usually incremental, stateful and long running. This requires the system to recover stateful, long-running queries from faults.

  3. Reconfiguration: when a query is reconfigured, for example with higher or lower parallelism, its internal state usually needs to be rescaled.

  4. Data revision processing: supporting data mutation semantics such as OLTP-style delete and update in streaming processing can be challenging, since it involves data lineage and potentially large state management.

  5. Schema evolution: when a schema changes, how do we handle the change gracefully?

  6. Streaming SQL: there is no standard yet.
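
The accuracy / latency tradeoff in item 1 can be illustrated with a small watermark sketch. This is a generic textbook-style model, not the Timeplus implementation: the `tumbling_counts` function and its `allowed_lateness` parameter are hypothetical, and the watermark is simply the largest event time seen so far minus the allowed lateness.

```python
def tumbling_counts(
    events: list[int], window: int, allowed_lateness: int
) -> tuple[dict[int, int], int]:
    """Count events per tumbling window, closing windows as the watermark passes them.

    `events` holds event times in arrival order. Returns (counts, dropped).
    """
    counts: dict[int, int] = {}
    closed: set[int] = set()
    dropped = 0
    max_seen = None
    for event_time in events:
        max_seen = event_time if max_seen is None else max(max_seen, event_time)
        watermark = max_seen - allowed_lateness
        start = event_time - event_time % window
        if start in closed:
            dropped += 1  # the result for this window was already emitted
        else:
            counts[start] = counts.get(start, 0) + 1
        # Close every window that ends at or before the watermark.
        for open_start in list(counts):
            if open_start + window <= watermark:
                closed.add(open_start)
    return counts, dropped


arrivals = [1, 2, 11, 3, 21, 4]  # event times 3 and 4 arrive late
print(tumbling_counts(arrivals, window=10, allowed_lateness=0))   # ({0: 2, 10: 1, 20: 1}, 2)
print(tumbling_counts(arrivals, window=10, allowed_lateness=20))  # ({0: 4, 10: 1, 20: 1}, 0)
```

With no allowed lateness, the first window closes as soon as event time 11 arrives, so its count is emitted early but misses the two late events. With a lateness of 20, the count is complete, but the window stays open (and its result is delayed) for much longer.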


There are other challenges as well, such as elasticity and deep database optimizations like reusing internal state or sharing processing across different streaming processing DAG nodes.


Timeplus, as a modern streaming processing system of the third generation, is set to tackle these common challenges practically and serve real customer needs as one converged platform. Feedback and results so far have validated our design. We will discuss more details in future articles.


 

Summary


Timeplus is a unified platform that supports both streaming and historical data processing. It is built on the foundations of the past 30 years of academic research and the recent evolution of industry practices. There are still quite a few challenges to resolve. We are set to take on these challenges, and to iterate and innovate together with the industry.


References


[DIST-SNAPSHOT-1985]

https://dl.acm.org/doi/10.1145/214451.214456


[TAPESTRY-1992]

https://sigmodrecord.org/publications/sigmodRecord/9206/pdfs/141484.130333.pdf


[STREAM-2003]

http://ilpubs.stanford.edu:8090/641/1/2004-20.pdf


[AURORA-2003]

https://cs.brown.edu/~ugur/vldbj03.pdf


[BOREALIS-2005]

https://www.cidrdb.org/cidr2005/papers/P23.pdf


[MAP-REDUCE-2004]

https://static.googleusercontent.com/media/research.google.com/en//archive/mapreduce-osdi04.pdf


[MILLWHEEL-2013]

https://research.google/pubs/pub41378/


[SPARK-STREAM-2013]

https://people.csail.mit.edu/matei/papers/2013/sosp_spark_streaming.pdf


[STORM-2014]

https://cs.brown.edu/courses/cs227/archives/2015/papers/ss-storm.pdf


[FINK-2015]

https://asterios.katsifodimos.com/assets/publications/flink-deb.pdf


[TRILL-2015]

https://www.microsoft.com/en-us/research/wp-content/uploads/2016/02/trill-vldb2015.pdf


[SCALE-COST-2015]

Scalability! But at what COST?

https://www.usenix.org/system/files/conference/hotos15/hotos15-paper-mcsherry.pdf


[KAFKA-STREAM-2018]

https://dl.acm.org/doi/10.1145/3242153.3242155


[TYLER-DEBS-2019]

https://docs.google.com/presentation/d/1YtTEnOax5MDA8DazDa1ad-sP4zzM58KQK4HNAcxoONA/edit#slide=id.p


[ONE-SQL-RULE-ALL-2019]

https://arxiv.org/pdf/1905.12133.pdf

