Processing time series data
Modern software systems can collect LOTS of time series data. It could be an analytics platform tracking user interactions or an IoT system receiving measurements from sensors. How do we process this data in a timely and cost-effective way? We will explore two different approaches below.
API - queue - workers - DB
We will build an API that receives inbound messages and puts them in a queue. Workers running on different servers will then grab these messages from the queue, process them, and store the data in MongoDB. Since we are working with time series data, we will create daily collections for these events. Separating the API servers from the worker servers will let us scale them independently.
We will be using AWS Elastic Beanstalk with SQS. Our code will be a Ruby on Rails API with the Shoryuken gem for SQS integration. Here is a sample request that we will be receiving: /events?cid=123&aid=abc&...
Here is a high-level Terraform config file to build the AWS infrastructure (Terraform and Elastic Beanstalk are not the primary focus of this article).
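A minimal sketch of what such a config could look like; the resource names, queue name, and solution stack string are placeholders, and a second Worker-tier environment would host the Shoryuken workers:

```hcl
# Rough sketch only -- names and the solution stack are placeholders.
resource "aws_sqs_queue" "events" {
  name = "events"
}

resource "aws_elastic_beanstalk_application" "api" {
  name = "events-api"
}

resource "aws_elastic_beanstalk_environment" "api" {
  name                = "events-api-prod"
  application         = "${aws_elastic_beanstalk_application.api.name}"
  solution_stack_name = "64bit Amazon Linux 2018.03 v2.8.1 running Ruby 2.5 (Puma)"

  setting {
    namespace = "aws:autoscaling:asg"
    name      = "MinSize"
    value     = "2"
  }
}

# A second aws_elastic_beanstalk_environment with tier = "Worker"
# would run the Shoryuken workers against the queue above.
```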
We will specifically use a Rails API-only application, which is lighter and faster than a full Rails app. Code in the controllers should be as light as possible. We can add simple validations to ensure that the necessary parameters are passed in before we put the item on the queue.
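A minimal sketch of such a controller; the controller and job names, and which parameters are required, are assumptions:

```ruby
# app/controllers/events_controller.rb -- hypothetical controller name
class EventsController < ApplicationController
  def create
    # lightweight validation before enqueueing
    return head :unprocessable_entity if params[:cid].blank? || params[:aid].blank?

    # push the raw payload plus a timestamp onto SQS via ActiveJob
    EventJob.perform_later(params.permit(:cid, :aid).to_h.merge("timestamp" => Time.now.to_i))
    head :ok
  end
end
```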
We will create a job to process the events and push them into MongoDB. We will use the timestamp that is passed in with each job to ensure proper date attribution if there is a delay in data processing.
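Here is a rough sketch of such a job, assuming the mongo Ruby driver and the daily collection naming used above (all names are placeholders):

```ruby
# app/jobs/event_job.rb -- hypothetical job name
require "mongo"

class EventJob < ApplicationJob
  queue_as :events

  def perform(payload)
    # use the event's own timestamp, not the processing time, so delayed
    # messages still land in the correct daily collection
    event_time = Time.at(payload["timestamp"].to_i)
    collection = "events_#{event_time.strftime('%Y%m%d')}"

    mongo_client[collection].insert_one(
      "cid"        => payload["cid"],
      "aid"        => payload["aid"],
      "created_at" => event_time
    )
  end

  private

  def mongo_client
    @mongo_client ||= Mongo::Client.new(ENV.fetch("MONGODB_URL", "mongodb://localhost:27017/events"))
  end
end
```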
Here is the Shoryuken configuration with ActiveJob.
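A minimal version could look like this; the queue name and concurrency are assumptions:

```yaml
# config/shoryuken.yml
concurrency: 25
queues:
  - [events, 1]
```

ActiveJob is pointed at Shoryuken with `config.active_job.queue_adapter = :shoryuken` in the Rails configuration.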
Data in Mongo will look like this:
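Roughly like this (the values are made up; the field names follow the request parameters above):

```js
// document in the events_20180704 daily collection
{
  "_id": ObjectId("..."),
  "cid": "123",
  "aid": "abc",
  "created_at": ISODate("2018-07-04T12:34:56Z")
}
```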
Once the data is in MongoDB we can write additional code to create summaries and eventually delete the detailed records.
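For example, a rollup could be as simple as an aggregation per daily collection (a sketch assuming the schema above; the summary collection name is an assumption):

```ruby
require "mongo"

client = Mongo::Client.new(ENV.fetch("MONGODB_URL", "mongodb://localhost:27017/events"))

# group the day's events by aid and store the counts in a summary collection
docs = client["events_20180704"].aggregate([
  { "$group" => { "_id" => "$aid", "count" => { "$sum" => 1 } } }
]).map { |doc| { "aid" => doc["_id"], "count" => doc["count"], "date" => "2018-07-04" } }

client["daily_summaries"].insert_many(docs) unless docs.empty?

# once the summary is written, the detailed collection can be dropped
client["events_20180704"].drop
```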
There are several pros and cons to this approach. We MUST keep our API servers running at all times, otherwise we will lose data. But we can stop the workers and messages will simply pile up in SQS. With SQS we pay per use, so if we are processing billions of messages this could become expensive. To start, we can build this as one application and later separate it into microservices.
ELB - S3 logs - Logstash - Elasticsearch
An alternative approach is to extract the parameters from server logs. We will set up frontend Nginx web servers that simply serve a 1x1 pixel. AWS ELB will publish its logs to an S3 bucket every 5 minutes. From there the logs will be picked up by Logstash and processed into Elasticsearch. Then we will build our reports, implement rollup indexes, and snapshot data to a different S3 bucket for backup and archiving.
Sample line from ELB log file:
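It will look roughly like this (classic ELB access log format; all values here are made up):

```
2018-07-04T22:23:00.186641Z events-elb 192.168.131.39:2817 10.0.0.1:80 0.000073 0.001048 0.000057 200 200 0 43 "GET http://tracker.example.com:80/events?cid=123&aid=abc HTTP/1.1" "Mozilla/5.0" - -
```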
Logstash config
We will start with the Logstash S3 input plugin:
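A minimal sketch; the bucket name, region, and prefix are placeholders:

```
input {
  s3 {
    bucket   => "my-elb-logs"
    region   => "us-east-1"
    prefix   => "AWSLogs/"
    interval => 60
  }
}
```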
Then we configure the Elasticsearch output plugin, which will create daily indexes. stdout is commented out but can be used for debugging.
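Something like this; the host and index name are assumptions:

```
output {
  elasticsearch {
    hosts => ["localhost:9200"]
    # one index per day
    index => "events-%{+YYYY.MM.dd}"
  }
  # stdout { codec => rubydebug }   # uncomment for debugging
}
```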
For filtering we will first grok the log line and then remove the unnecessary fields:
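A sketch using the ELB_ACCESS_LOG pattern from logstash-patterns-core (linked below); which fields to drop depends on what the grok pattern extracts and what the reports need:

```
filter {
  grok {
    match => { "message" => "%{ELB_ACCESS_LOG}" }
  }
  mutate {
    # field names below are examples produced by the ELB pattern
    remove_field => ["message", "backendip", "backendport", "received_bytes"]
  }
}
```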
Ruby code
Now comes the hard part. We need to implement complex business logic to validate and transform our data. For greater control we will use the Logstash Ruby filter plugin.
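Inline, this could look something like the following (the validation itself is just a placeholder):

```
filter {
  ruby {
    code => "
      # drop events without a client id and stamp the rest
      cid = event.get('cid')
      event.cancel if cid.nil? || cid.to_s.empty?
      event.set('processed_at', Time.now.to_i)
    "
  }
}
```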
Placing code in a config file is not a great solution and it will be difficult to test. Fortunately, the latest version of the Ruby filter plugin supports referencing a separate Ruby script from the .conf file. This helps us test our code with automated tests.
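With the script-based form, the .conf file only points at the file (paths and parameter names here are assumptions):

```
filter {
  ruby {
    path          => "/etc/logstash/scripts/process_event.rb"
    script_params => { "required_fields" => ["cid", "aid"] }
  }
}
```

The script itself has to define register and filter methods, for example:

```ruby
# /etc/logstash/scripts/process_event.rb
def register(params)
  @required_fields = params["required_fields"] || []
end

def filter(event)
  # drop events that are missing any required field
  return [] if @required_fields.any? { |f| event.get(f).to_s.empty? }

  event.set("processed_at", Time.now.to_i)
  [event]
end
```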
Ruby scripts are nice, but Ruby objects are even better. Here is the next refactor: we can write additional classes to encapsulate common logic and inherit from them.
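A sketch of what that refactor might look like; class and method names are assumptions:

```ruby
# /etc/logstash/scripts/process_event.rb
class BaseEventProcessor
  def initialize(params)
    @required_fields = params["required_fields"] || []
  end

  def process(event)
    return [] unless valid?(event)
    transform(event)
    [event]
  end

  # validation lives in its own method so it can be unit tested
  def valid?(event)
    @required_fields.none? { |f| event.get(f).to_s.empty? }
  end

  def transform(event)
    event
  end
end

class AnalyticsEventProcessor < BaseEventProcessor
  def transform(event)
    event.set("processed_at", Time.now.to_i)
    event
  end
end

# entry points still required by the Ruby filter plugin
def register(params)
  @processor = AnalyticsEventProcessor.new(params)
end

def filter(event)
  @processor.process(event)
end
```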
We also moved the validation logic into a separate method. Now we can leverage Ruby unit testing frameworks such as RSpec. We will need to create a mock event object that responds to the get and set methods. Alternatively, we could still test this class via the tests provided by Logstash.
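A rough RSpec sketch using rspec-mocks for the event double; file paths and expectations are assumptions:

```ruby
# spec/analytics_event_processor_spec.rb
require_relative "../scripts/process_event"

RSpec.describe AnalyticsEventProcessor do
  let(:processor) { described_class.new("required_fields" => ["cid"]) }

  it "drops events with a missing cid" do
    event = double("event", get: "")
    expect(processor.process(event)).to eq([])
  end

  it "keeps and stamps valid events" do
    event = double("event")
    allow(event).to receive(:get).with("cid").and_return("123")
    allow(event).to receive(:set)

    expect(processor.process(event)).to eq([event])
    expect(event).to have_received(:set).with("processed_at", kind_of(Integer))
  end
end
```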
If we need to load external Ruby gems we cannot do it directly. One workaround is to install another Logstash plugin which uses that specific gem. For example, if we need to access Redis from our Ruby code, we can install either the Logstash Redis input or output plugin and then call Redis.new in the class.
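In the processor class above that might look like this (a sketch; the counter key is hypothetical, and it assumes one of the Redis plugins is installed so the redis gem is on Logstash's load path):

```ruby
require "redis"

class AnalyticsEventProcessor < BaseEventProcessor
  def transform(event)
    # hypothetical per-day counter kept in Redis
    redis.incr("events:#{Time.now.strftime('%Y%m%d')}")
    event.set("processed_at", Time.now.to_i)
    event
  end

  private

  def redis
    @redis ||= Redis.new(host: ENV.fetch("REDIS_HOST", "localhost"))
  end
end
```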
The next step would be to build a full-blown Logstash plugin (a Ruby gem), which gives us the greatest amount of flexibility, but that is beyond the scope of this post.
Links
- https://www.terraform.io/docs/providers/aws/r/elastic_beanstalk_environment.html
- http://guides.rubyonrails.org/api_app.html
- https://www.elastic.co/guide/en/logstash/current/plugins-filters-ruby.html
- https://www.elastic.co/blog/moving-ruby-code-out-of-logstash-pipeline
- https://github.com/logstash-plugins/logstash-patterns-core/blob/master/patterns/aws
- https://relishapp.com/rspec/rspec-mocks/v/3-7/docs