
Real-Time Log Stream Analysis Using an Open-Source Streaming Database

Updated: Jan 29


server error log

Above is a snippet from my server's error log showing that something has gone wrong. Since this application is a critical service for my cloud customers, I need to identify the root cause and fix it the moment the error occurs. What I am looking for is a tool that can monitor these errors, run a quick analysis, and trigger immediate action.


As one of the pillars of observability, log analysis is one of the most important jobs in the IT operations and security domains, where huge volumes of logs are collected, stored, and processed. Many well-known tools can help you handle complex log analysis, including the ELK stack (Elasticsearch, Logstash, Kibana), Splunk, Sumo Logic, and Datadog.


log analysis vendors

A typical log analysis workflow includes:

  1. Data collection: collecting log data from different sources

  2. Data storage: saving and indexing the collected logs on server storage

  3. Data search/query: processing the log data using a search/query language

  4. Data presentation: visualizing query results as charts and dashboards

  5. Alerting: sending notifications of events based on a data query


log analysis data flow

Most of these tools are built on a document search architecture: the core of the product is a search engine that leverages an inverted index or bloom filter so users can quickly find specific information in huge amounts of data. When you have accumulated a large volume of logs and want to find something in them, these tools are the best choice. However, if you want to do log stream-based monitoring, such as sending an alert as soon as a specific error appears in the log, a search architecture is usually not the best fit: the search runs on top of an inverted index that takes time to build, so the data is not searchable until the related data structure, the index, has been fully built. This is where stream processing can help. Today, I am going to talk about how to monitor your log streams in real time using open-source toolchains whose key component is a streaming database, Proton.


Proton: A Streaming Database


Proton is a unified streaming and historical data processing engine in a single binary.


Proton high-level architecture

The above diagram shows the high-level architecture of Proton, with these core components:

  • Streaming storage, which is similar to Apache Kafka: an append-only log that handles real-time streaming data with very low latency and high scalability

  • Historical storage, built on top of ClickHouse, providing high-performance historical queries on columnar data

  • Unified query processing, which runs SQL-based, incremental, stateful stream processing just like Flink, but unifies the streaming and batch (historical) modes (see the sketch after this list)
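
To make the unified query modes concrete, here is a minimal sketch. It assumes a stream named `logs` with a single `raw` string column (similar to the ones created later in this post), and it uses the `table()` function for bounded, historical queries as documented by Timeplus; treat the exact syntax as an assumption if you are on a different version.

-- streaming (unbounded) query: keeps running and pushes new rows as they arrive
SELECT raw FROM logs;

-- historical (bounded) query: scans the data already stored and then returns,
-- like a regular database query
SELECT count() FROM table(logs);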


With stream processing, if you ingest the log stream into Proton as a stream, you can run an unbounded, push query: as soon as something new arrives from the log, the query processes the information incrementally, emits the result, and pushes it to the user.


The log snippet shown at the beginning of this blog is actually from a Proton server error log. In the following examples, I am going to use Proton combined with different open-source tools to monitor its own error log, so let the game begin!


Filebeat + Kafka + Proton


Filebeat is a Go-based, lightweight, open-source log shipper that belongs to the Elastic Stack. It can be used for forwarding and centralizing log data.


Filebeat Stack

Filebeat works by tailing the log file and sending newly generated log content to a downstream component such as Elasticsearch, Kafka, or Redis.


In this case, we configure Filebeat to send log data to a Kafka topic called `logs`, and then create an external stream in Proton on top of that topic.


You can run this stack using Docker Compose.


Collect log data using Filebeat and send to Kafka


The first step is to send the log stream data into Kafka using Filebeat. Here is the Filebeat configuration:


filebeat.inputs:
  - type: log
    enabled: true
    paths:
      - /var/log/proton-server/proton-server.err.log
    multiline:
      type: pattern
      pattern: '^(\d{4}\.\d{2}\.\d{2} \d{2}:\d{2}:\d{2}\.\d+)'
      negate: true
      match: after

output.kafka:
  hosts: ["kafka:9092"]
  topic: logs

With the above configuration, Filebeat reads logs from `/var/log/proton-server/proton-server.err.log`, splits multi-line logs into events using the regex pattern `^(\d{4}\.\d{2}\.\d{2} \d{2}:\d{2}:\d{2}\.\d+)`, which matches the leading timestamp (such as `2023.11.20 18:02:55.472620`) of every log line, and sends the log events to the Kafka topic called `logs`.


Create an external stream in Proton on the Kafka topic


Now the log data is being sent to the Kafka topic `logs` in real time. If you consume that topic from Kafka, here are some data samples:


{
"topic": "logs",
"value": "{\"@timestamp\":\"2023-11-23T00:41:56.655Z\",\"@metadata\":{\"beat\":\"filebeat\",\"type\":\"_doc\",\"version\":\"8.11.1\"},\"message\":\"2023.11.23 00:41:41.234843 [ 1 ] {} \u003cWarning\u003e Access(local directory): Recovering lists in directory /var/lib/proton/access/\",\"input\":{\"type\":\"log\"},\"ecs\":{\"version\":\"8.0.0\"},\"host\":{\"name\":\"83c6f2d59442\"},\"agent\":{\"name\":\"83c6f2d59442\",\"type\":\"filebeat\",\"version\":\"8.11.1\",\"ephemeral_id\":\"d88e8a06-63aa-4e62-87fe-3fd79be0b585\",\"id\":\"5dff8233-5b5d-49c7-ae02-8c325a1f6ac5\"},\"log\":{\"file\":{\"path\":\"/var/log/proton-server/proton-server.err.log\"},\"offset\":1268}}",
"timestamp": 1700700116655,
"partition": 0,
"offset": 9
}
{
"topic": "logs",
"value": "{\"@timestamp\":\"2023-11-23T00:41:56.655Z\",\"@metadata\":{\"beat\":\"filebeat\",\"type\":\"_doc\",\"version\":\"8.11.1\"},\"ecs\":{\"version\":\"8.0.0\"},\"host\":{\"name\":\"83c6f2d59442\"},\"agent\":{\"ephemeral_id\":\"d88e8a06-63aa-4e62-87fe-3fd79be0b585\",\"id\":\"5dff8233-5b5d-49c7-ae02-8c325a1f6ac5\",\"name\":\"83c6f2d59442\",\"type\":\"filebeat\",\"version\":\"8.11.1\"},\"log\":{\"file\":{\"path\":\"/var/log/proton-server/proton-server.err.log\"},\"offset\":1393},\"message\":\"2023.11.23 00:41:41.356154 [ 1 ] {} \u003cWarning\u003e TelemetryCollector: Please note that telemetry is enabled. This is used to collect the version and runtime environment information to Timeplus, Inc. You can disable it by setting telemetry_enabled to false in config.yaml\",\"input\":{\"type\":\"log\"}}",
"timestamp": 1700700116655,
"partition": 0,
"offset": 10
}

Let’s turn that sample data from a string into JSON for a better view:

{
  "@timestamp": "2023-11-23T00:41:56.655Z",
  "@metadata": {
    "beat": "filebeat",
    "type": "_doc",
    "version": "8.11.1"
  },
  "ecs": {
    "version": "8.0.0"
  },
  "host": {
    "name": "83c6f2d59442"
  },
  "agent": {
    "ephemeral_id": "d88e8a06-63aa-4e62-87fe-3fd79be0b585",
    "id": "5dff8233-5b5d-49c7-ae02-8c325a1f6ac5",
    "name": "83c6f2d59442",
    "type": "filebeat",
    "version": "8.11.1"
  },
  "log": {
    "file": {
      "path": "/var/log/proton-server/proton-server.err.log"
    },
    "offset": 1393
  },
  "message": "2023.11.23 00:41:41.356154 [ 1 ] {} <Warning> TelemetryCollector: Please note that 
  telemetry is enabled. This is used to collect the version and runtime environment information to 
  Timeplus, Inc. You can disable it by setting telemetry_enabled to false in config.yaml",
  "input": {
    "type": "log"
  }
}


In addition to the original server log message, Filebeat adds some extra metadata to the event, including timestamp, host, agent, and version.


To analyze that log stream, we can create a Proton external stream:

CREATE STREAM IF NOT EXISTS logs
(
  `raw` string
)
ENGINE = ExternalStream
SETTINGS type = 'kafka', brokers = 'kafka:9092', topic = 'logs'

With the above stream created, users can analyze that log data in real-time using Proton’s streaming SQL.


Using Proton streaming SQL to monitor the log stream


A Proton stream is similar to a database table. The difference is that a query on top of a stream is unbounded: as soon as new data arrives, the query processes it and pushes the result to the user immediately.
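
For example, a minimal monitoring query can filter the stream for error entries. In this stack the Kafka message is the JSON document produced by Filebeat, so the sketch below simply pattern-matches on the whole raw payload; the `Error` keyword is the log-level tag seen in the samples above.

-- unbounded push query: emits a row whenever a new error log event arrives
SELECT raw
FROM logs
WHERE raw LIKE '%Error%'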


In the demo above, I am running two Proton clients: one runs the streaming query `select * from logs`; when I type some invalid SQL into the other Proton client, such as `select abc`, which generates error logs, the first query immediately catches the error and pushes the result to the user.


With Proton streaming SQL, you can do much more real-time monitoring and analysis on that stream. As the log message is unstructured data, you can use grok patterns or regular expressions to extract information from the message and then use SQL to transform or aggregate the data based on your use case, as sketched below.
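
As a small sketch of that idea (not covering full grok parsing), the query below counts total events and error events per one-minute tumbling window. It assumes Proton's `tumble()` window function over the stream's default event time, which is how Timeplus documents it; the LIKE patterns are a stand-in for a real regex or grok extraction on the message field.

-- per-minute totals and error counts over the log stream
SELECT
  window_start,
  count() AS total_events,
  sum(if(raw LIKE '%Error%', 1, 0)) AS error_events
FROM tumble(logs, 1m)
GROUP BY window_start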


Pros and Cons


The Filebeat + Kafka + Proton stack is a very typical data pipeline, where Kafka is used to decouple the data producer from the data consumer. That brings flexibility at the cost of running three different components, so it has pros and cons.


Pros

  • Real-time log streaming

  • Flexible and scalable

  • Kafka decouples the log producer (Filebeat) from the log consumer (Proton); if new consumers are added or back pressure builds up, Kafka can absorb it

Cons

  • Relatively complex: three different components to deploy and operate


Vector + Proton


Vector is an open-source, high-performance data pipeline written in Rust, acquired by Datadog.


Vector Architecture

In the previous solution, Kafka was used as the log stream transporter, which has the benefit of isolating the log producer from the consumer. In cases where users don't really need such isolation, we can use Vector's HTTP sink to ingest log data directly into Proton, which simplifies the overall deployment.


Vector Stack

You can run this stack using Docker Compose.


Collect log data using Vector and send to Proton Stream


In this case, we collect the data from the log file and then send it to Proton using Proton's HTTP ingest API.


First, we need to create a stream in Proton.

CREATE STREAM IF NOT EXISTS logs(raw string)

Then, we configure Vector to read the log data, transform it into Proton's ingest payload format, and ingest it over HTTP. Here is the Vector configuration:


sources:
  logs:
    type: "file"
    multiline:
      start_pattern: '^(\d{4}\.\d{2}\.\d{2} \d{2}:\d{2}:\d{2}\.\d+)'
      mode: halt_before
      condition_pattern: '^(\d{4}\.\d{2}\.\d{2} \d{2}:\d{2}:\d{2}\.\d+)'
      timeout_ms: 1000
    include:
      - "/var/log/proton-server/proton-server.err.log"

transforms:
  payload:
    type: "lua"
    inputs:
      - "logs"
    version: "2"
    hooks:
      process: |-
        function (event, emit)
          event.log.message = {
            columns = {"raw"},
            data = {{event.log.message}}
          }
          emit(event)
        end

sinks:
  proton:
    type: http
    inputs:
      - "payload"
    compression: none
    method: post
    uri: http://proton:3218/proton/v1/ingest/streams/logs
    batch:
      max_events: 1
    encoding:
      codec: "text"

In this case, we use Vector's Lua transform to turn the raw log events into a Proton ingest payload, matching the format of the following sample curl ingest call:


curl  -X POST \
  'http://localhost:3218/proton/v1/ingest/streams/logs' \
  --header 'Content-Type: application/json' \
  --data-raw '{"columns":["raw"],"data":[["log message"]]}'

Similar to the previous demo, the log stream will catch any new event in real time, as it happens.


There is one limitation with this configuration: our transformation turns each log event into a separate Proton HTTP ingest call. This works when there are not many log events, but with a huge number of events, sending one HTTP ingest call per event will cause performance issues; it is better to send events in batches. It may be possible to leverage Vector's built-in transforms to support this, but debugging such a data pipeline is not easy and I haven't had time to work on this part yet.


Pros and Cons


With the Vector and Proton stack, we no longer need Kafka. But now we need to handle data transformation and batching in the Vector data pipeline configuration.


Pros

  • Real-time log streaming

  • High performance log collector


Cons

  • The Vector data pipeline is hard to debug, and batched ingestion is not easy to support


Proton Only


Kafka has been removed in the previous solution, but sometimes you want to get log stream analysis running quickly without too many moving parts. Is there an even simpler option? Yes: you can run Proton alone, using the external log stream feature it provides.


Proton Only Stack

This is the simplest stack I can think of to monitor your log streams in real time. Proton acts as the log collector, storage, and query processor, all in one box.

Just run `docker run -d --pull always --name proton ghcr.io/timeplus-io/proton:latest` and you're done! You have all you need to monitor your log streams.


In this case, we just need to create an external log stream in Proton with the related settings, as in the following SQL:


CREATE EXTERNAL STREAM proton_log(
  raw string
)
SETTINGS
   type='log',
   log_files='proton-server.log',
   log_dir='/var/log/proton-server',
   timestamp_regex='^(\d{4}\.\d{2}\.\d{2} \d{2}:\d{2}:\d{2}\.\d+)',
   row_delimiter='(\d{4}\.\d{2}\.\d{2} \d{2}:\d{2}:\d{2}\.\d+) \[ \d+ \] \{'
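
Once the external stream exists, you can monitor it with the same kind of streaming SQL as before. A minimal sketch that filters the server log for error and warning lines (the level keywords follow the format shown in the earlier samples):

-- push query: emits matching rows as soon as new lines are appended to the log file
SELECT raw
FROM proton_log
WHERE raw LIKE '%Error%' OR raw LIKE '%Warning%'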

Now, happy streaming with your logs!


Pros and Cons


We now have a super compact deployment for log streaming. It is fast, simple, and powerful; the only limitation is that since Proton runs on the same box where your logs live, performance and scalability are limited by the resources available on that box.


Pros

  • Real-time log streaming

  • Low-latency and fast response

  • Powerful event streaming analysis capabilities by streaming SQL

  • Low cost with simplified stacks

Cons

  • Limited scalability due to local resource constraints

With the Proton-only solution, Proton has to be deployed locally where your log files are located, which means it shares computing resources with the server that produces the logs. As Proton is designed to be high performance with a low footprint, this is good enough for most log analysis scenarios. If scalability is a concern, you can consider the previous two options.


Another thing to mention is that an external stream does not persist any data locally. If historical log analysis is required, you can create a materialized view to store the log data locally, as sketched below.
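
A minimal sketch of that idea, assuming Proton's `CREATE MATERIALIZED VIEW ... AS SELECT ...` syntax and the `proton_log` external stream from above (the view name is just an example):

-- the materialized view keeps running the streaming query in the background
-- and persists its results, so they remain queryable later
CREATE MATERIALIZED VIEW persisted_logs AS
SELECT raw
FROM proton_log;

-- later, run a bounded historical query against the persisted data
SELECT count() FROM table(persisted_logs);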



Summary


Log data is crucial for several reasons, spanning various domains such as IT, security, and business operations. While many mature tools exist to assist users with log analysis, those tools are not designed to handle log streams in real time with low latency. That's where a streaming database like Proton comes in, able to process log stream data as soon as events unfold.


In today’s blog, I demonstrated three different architecture options to monitor log streams using Proton, and you can choose which works best for you. Proton is also used as the core engine of our real-time streaming data platform Timeplus. Give it a try and let us know what you think!

