Redis Lists can be used as queues for jobs that move data from a primary data store to Elasticsearch. But what if we have time-series data that needs to stay in Redis AND be copied to Elasticsearch?

In a previous post we built a Ruby on Rails website for a nationwide retail chain. We used Redis Lists as queues to ETL data to Elasticsearch. We used Redis Sorted Sets to record which zipcodes were searched and how often, and counters (using Redis Strings) to track when searches occurred by day_of_week and hour_of_day. Our new requirement is to keep track of the exact time of each search and its parameters (zipcode and product/query). This will help our biz users determine which products to stock in different stores.

Redis Streams

This search data is usually recorded in our application logs as part of the GET request, so we could leverage Logstash to process it into Elasticsearch. But we will instead use Streams to record the data in Redis first. Streams are a new Redis data structure announced for the next major release (they ultimately shipped in Redis 5.0).

class StoreLocator
  def initialize zipcode:, query:
    @zipcode = zipcode
    @query = query
    ...
  end
  def perform
    # search code here
    record_stats
  end
private
  def record_stats
    # one stream per day; '*' lets Redis autogenerate the item ID
    key = "search_log:#{Time.now.strftime("%Y-%m-%d")}"
    REDIS_CLIENT.xadd(key, '*', 'zip', @zipcode, 'query', @query)
  end
end

We are using the new xadd command, which creates a new Redis stream (if it does not already exist) and adds an item to it with key/value pairs (zip and query). Keys include the current date so we get one stream per day, just like we rotate logs. The data in Redis looks like this:

# launch redis-cli
xrange search_log:2018-01-08 - +
1) 1) 1515465841276-0
   2) 1) "zip"
      2) "98168"
      3) "query"
      4) "Spilt Light"
2) 1) 1515465842278-0
   2) 1) "zip"
      2) "98114"
      3) "query"
      4) "Wake-up Volcano"
...

xrange is another new command that lets us get items from the stream. - and + give us all items from first to last. Stream IDs are autogenerated from the Unix epoch in milliseconds plus a sequence number. Alternatively we could have specified an explicit ID with REDIS_CLIENT.xadd(key, numeric_id, ...), as long as it is greater than the last ID already in the stream.
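For example, a quick sketch of both options (the IDs and field values below are made up):

# explicit ID: must be greater than the last ID already in the stream
REDIS_CLIENT.xadd('search_log:2018-01-08', '1515465843000-0', 'zip', '98101', 'query', 'Pike Roast')
# bounded range: only the items between two IDs (inclusive)
REDIS_CLIENT.xrange('search_log:2018-01-08', '1515465841276-0', '1515465843000-0')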

ETL to Elasticsearch

Since the amount of data is quite large we will keep only the last 7 days of searches in Redis and move the older records to Elasticsearch. Streams have a flexible schema where each item can carry different fields, which fits well into Elasticsearch indexes.
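To keep the daily indexes consistent we could define an index template up front. This is a minimal sketch assuming the elasticsearch-ruby client and the field names from our stream items:

# applies to every daily index matching the pattern
ES_CLIENT.indices.put_template name: 'search_log', body: {
  index_patterns: ['search_log:*'],
  mappings: {
    default: {
      properties: {
        'zip'        => { type: 'keyword' },
        'query'      => { type: 'text' },
        '@timestamp' => { type: 'date' }
      }
    }
  }
}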

xrange

We will create corresponding Elasticsearch indexes such as “search_log:YYYY-MM-DD” and loop through the stream items in batches, using the stream item ID as the Elasticsearch document ID.

ES_CLIENT = Elasticsearch::Client.new ...
# app/jobs/
class RedisElasticEtlJob < ApplicationJob
  def perform(num_days = 7)
    key = "search_log:#{(Time.now - num_days.days).strftime("%Y-%m-%d")}"
    count = 100
    starting_id = '-'
    loop do
      items = REDIS_CLIENT.xrange(key, starting_id, '+', 'count', count)
      items.each do |item|
        # => ["1515258610192-0", ["zip", "98134", "query", "Express Mug"]]
        hash = Hash[item.second.each_slice(2).to_a]
        # => {"zip"=>"98134", "query"=>"Express Mug"}
        hash['@timestamp'] = Time.strptime(item.first.to_i.to_s, '%Q')
        ES_CLIENT.index index: key, type: 'default', id: item.first, body: hash
      end
      break if items.count < count
      # resume after the last item: bump the sequence part of its ID by one
      last_id = items.last.first.split('-')
      starting_id = [last_id.first, (last_id.second.to_i + 1).to_s].join('-')
    end
  end
end

Alternatively we can use REDIS_CLIENT.xrevrange('search_log:YYYY-MM-DD', '+', '-') to retrieve items in reverse order. We can remove data from Redis either by setting a TTL on the individual stream keys or by manually calling REDIS_CLIENT.del('search_log:YYYY-MM-DD').
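The TTL option can be set right where we write the items. A minimal sketch (the 8-day window is an assumption, leaving a day of slack past the 7 days we query from Redis):

key = "search_log:#{Time.now.strftime("%Y-%m-%d")}"
REDIS_CLIENT.xadd(key, '*', 'zip', '98168', 'query', 'Spilt Light')
# resetting the TTL on every write is harmless; the daily stream expires on its own
REDIS_CLIENT.expire(key, 8.days.to_i)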

xread

What if we want a more real-time data pipeline from Redis to Elasticsearch? Instead of a scheduled daily job we can build a daemon that uses the new xread command.

class RedisElasticStreamConsumer
  def perform
    last_id = '$' # start with items that arrive after we connect
    loop do
      key = "search_log:#{Time.now.strftime("%Y-%m-%d")}"
      data = REDIS_CLIENT.xread('BLOCK', 5000, 'STREAMS', key, last_id)
      next if data.nil? # BLOCK timed out without new items
      # => [["search_log:2018-01-07", [["1515389726944-0", ["zip", "98178", "query", "Red Select"]]]]]
      data.first.second.each do |id, fields|
        hash = Hash[fields.each_slice(2).to_a]
        # => {"zip"=>"98178", "query"=>"Red Select"}
        hash['@timestamp'] = Time.strptime(id.to_i.to_s, '%Q')
        ES_CLIENT.index index: key, type: 'default', id: id, body: hash
        last_id = id # resume from the last item we processed, not '$'
      end
    end
  end
end

The challenge with this approach is that Redis can record items in the stream much faster than Elasticsearch can create documents (RAM vs disk). While the scheduled job approach above is less cutting edge, it may fit better into pre-existing ETL processes.
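One way to soften that mismatch, sketched here as an assumption rather than a full solution, is to buffer items and flush them with the Elasticsearch bulk API (BATCH_SIZE is a made-up tuning knob):

BATCH_SIZE = 500
buffer = []
# inside the consumer loop, instead of one ES_CLIENT.index call per item:
buffer << { index: { _index: key, _type: 'default', _id: id, data: hash } }
if buffer.size >= BATCH_SIZE
  ES_CLIENT.bulk body: buffer # one round trip for the whole batch
  buffer.clear
end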

Using the data

To access data we can encapsulate the logic for choosing Redis vs Elasticsearch as our data source in a separate class.

class SearchDataSelector
  def initialize date:
    @date = date
  end
  def perform
    # the last 7 days live in Redis, anything older is in Elasticsearch
    if @date > Date.today - 7.days
      redis_streams
    else
      elasticsearch
    end
  end
private
  def redis_streams
    REDIS_CLIENT.xrange("search_log:#{@date}", ...)
  end
  def elasticsearch
    ES_CLIENT.search(index: "search_log:#{@date}", ...)
  end
end

We would need to modify this code to work with a date range that requires data from both Redis and Elasticsearch. Separately, we can use Kibana to build interesting visualizations on the data in Elasticsearch. The Logstash mentioned above has input/output plugins for accessing data in Redis; currently they only support lists and channels, but hopefully they will soon integrate with streams.
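A rough sketch of that date-range variant, delegating each day to the class above (the class name is illustrative):

class SearchDataRangeSelector
  def initialize from:, to:
    @from = from
    @to = to
  end
  def perform
    # each day already knows whether it lives in Redis or Elasticsearch
    (@from..@to).flat_map { |date| SearchDataSelector.new(date: date).perform }
  end
end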

Other Streams commands

xlen

xlen search_log:2018-01-08 will return the number of items in the stream. We could loop through the date-stamped keys to build a table in the UI showing each date and its number of searches.
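For example, a small sketch of that loop (the counts shown are made up):

(0..6).map do |age|
  date = (Time.now - age.days).strftime("%Y-%m-%d")
  [date, REDIS_CLIENT.xlen("search_log:#{date}")]
end
# => [["2018-01-08", 1482], ["2018-01-07", 1391], ...]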

maxlen

We can also create capped streams with REDIS_CLIENT.xadd('last_1000_searches', 'maxlen', '~', 1000, '*', 'zip', zip, 'query', query). The ~ improves performance by allowing Redis to keep approximately the last 1000 items. This way we can enable our biz users to view the most recent searches.
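Reading that capped stream back for the UI could look like this sketch, newest items first:

recent = REDIS_CLIENT.xrevrange('last_1000_searches', '+', '-', 'count', 20)
recent.map do |id, fields|
  # attach the human-readable time recovered from the stream ID
  Hash[fields.each_slice(2).to_a].merge('at' => Time.strptime(id.to_i.to_s, '%Q'))
end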
