Elasticsearch and Redis are powerful technologies with different strengths. They are very flexible and can be used for a variety of purposes. We will explore different ways to integrate them.

ELK is Elasticsearch, Logstash and Kibana. Elasticsearch stores data in indexes and supports powerful searching capabilities. Logstash is an ETL pipeline to move data to and from different data sources (including Redis). Kibana helps us build rich dashboards and do adhoc searches. These tools are used not just by developers but by data analysts and devops engineers who often have different skillset.

Redis has speed and powerful data structures. It can almost function as an extension of application memory but shared across processes / servers. The downside is that records can ONLY be looked up by key. Our applications can easily store all kinds of interesting data in Redis. But if this data needs to be extracted and aggregated in different ways that requires writing code. There is no easy way to do adhoc analysis (like writing SQL queries).

Search for products

We are building a website for a nationwide chain of stores. The first requirement is enabling users to search for various products (in our case coffee brands which were generated using randomly). We will use Ruby on Rails with searchkick library to simplify Elasticsearch integration. We set callbacks: :async option. If we configure Sidekiq it will use Redis to queue a background job to update the documents in products index when record in primary DB is modified.

# app/models/
class Product < ApplicationRecord
  searchkick callbacks: :async, index_name: 'products'
  # specify searchable fields
  def search_data
    {
      name:         name,
      description:  description,
      price:        price,
    }
  end  
end

We are also caching JSON output of the ProductSearch.new.perform method call in Redis using query param to generate cache_key.

# app/controllers/
class API::ProductSearchController
  def show
    ProductSearch.new.perform params[:query]
  end
end
# app/services/
class ProductSearch
  def perform query
    cache_key = [self.class.name, __method__, query]
    Rails.cache.fetch(cache_key, expires_in: 1.hour) do
      Product.search(query, fields: [name, description], ...).to_json
    end
  end
end

The downside is that any changes to products will take up to an hour to appear in cached search results. We can build a callback so when a record is updated in primary DB it not only updates the index but also flushes cache. To keep things simple we will delete all Redis keys matching ProductSearch:perform:* pattern but this needs to be improved for scaling. To be honest this caching technique might be more trouble than it’s worth.

class Product < ApplicationRecord
  after_save :flush_search_cache
private
  def flush_search_cache
    cache_keys = REDIS_CLIENT.keys 'ProductSearch:perform:*'
    cache_keys.each do |key|
      Rails.cache.delete key
      # or force regeneration with Rails.cache.fetch(key, force: true)
    end
  end
end

Search by zipcode

Another important feature is enabling users to find stores by zipcode. Both Redis and Elasticsearch support geolocation searches. We need to map zipcodes to lon/lat coordinates. Here is a free data source

Redis geo

CSV.foreach("data/zip_lon_lat.csv", headers: true) do |row|
  REDIS_CLIENT.geoadd 'zip_lon_lat', row['LON'].to_f, row['LAT'].to_f, row['ZIP'].to_s
end
# data in Redis
{"db":0,"key":"zip_lon_lat","ttl":-1,"type":"zset","value":[
  ["96799",1.147461274922875e+15],
  ["96737",1.184332805872164e+15],
  ...
  ["96950",4.103298315066677e+15]],"size":858113}

One option is to use Redis to find zipcodes w/in 5 mile radius and then query primary DB for stores in those zipcodes.

class StoreLocator
  def initialize zipcode
    @zipcode = zipcode
    @distance = 5
  end
  def perform
    zipcodes = REDIS_CLIENT.georadiusbymember('zip_lon_lat', @zipcode, @distance, 'mi')
    Store.where(zipcode: zipcodes)
  end
end
# georadiusbymember returns for 90210
["90073", "90024", "90095", "90067", "90212", "90077", "90210", "90211",
"90048", "90036", "90069", "90046", "91403", "91423", "91604", "91607", "91602",
"91608", "90025", "90034", "90064", "90035", "90049"]

Elasticsearch geo

Alternatively we can use Elasticsearch geo search. We need to create an index and specify lon/lat for each zipcode. Since we already have lon/lat stored in Redis we can use it for quick lookup (vs parsing CSV file).

class Store < ApplicationRecord
  searchkick callbacks: :async, index_name: 'stores'
  def search_data
    {
      zipcode:  zipcode,
      lon:      lon_lat.try(:first),
      lat:      lon_lat.try(:second),
    }
  end
  def lon_lat
    REDIS_CLIENT.geopos('zip_lon_lat', @zipcode).first
  end
end

We run Store.reindex, verify that data shows up in Elasticsearch and modify StoreLocator.

class StoreLocator
  def perform
    lon_lat = REDIS_CLIENT.geopos('zip_lon_lat', @zipcode).try(:first)
    lat = lon_lat.try(:second)
    lon = lon_lat.try(:first)
    Store.search("*",
      where: { location: {near: {lat: lat, lon: lon}, within: "#{@distance}mi"} }
    )
  end
end

Each document looks like this in Elasticsearch:

{
  "_index": "stores",
  ...
  "_source": {
    "zipcode": "98116",
    "location": {
      "lat": 47.57424607502233,
      "lon": -122.40022391080856
    }
  }
}

Now we can take advantage of rich Elasticsearch querying capabilities including geo queries, get the IDs of matching stores and display data from the primary DB.

Search by product AND geo

Now our users want to know which stores in specific area sell particular products. And they are not sure how to exactly spell the product name. We also want to make our indexes more powerful first class objects, not just something related to model.

Fist we create a model mapping which products are available in which stores. Then we will integrate with chewy library which is a little different than searchkick we used before.

# app/models/
class ProductStore < ApplicationRecord
  belongs_to :product
  belongs_to :store
  update_index('product_store#product_store') { self }
end
# app/chewy/
class ProductStoreIndex < Chewy::Index
  define_type ProductStore.includes(:product, :store) do
    field :product_name,  type: 'text', value: ->{ product.name }
    field :store_zipcode, type: 'text', value: ->{ store.zipcode }
    field :store_location, type: 'geo_point' do
      field :lon, value: ->{ store.lon_lat.try(:first) }
      field :lat, value: ->{ store.lon_lat.try(:second) }
    end    
  end
end

update_index method ensures that Elasticsearch documents get updated when we update DB records. Chewy supports async updates to indexes via background jobs. Data in Elasticsearch looks like this:

{
  "_index": "product_store",
  ...
  "_source": {
    "product_name": "American Cowboy",
    "store_zipcode": "98174",
    "store_location": {
      "lat": "47.6045689442515112",
      "lon": "-122.33535736799240112"
    }
  }
}

We modify our search code

class StoreLocator
  def initialize zipcode:, query:
    @zipcode = zipcode
    @query = query
    @distance = 5
    lon_lat = REDIS_CLIENT.geopos('zip_lon_lat', @zipcode).first
    @lat = lon_lat.try(:second)
    @lon = lon_lat.try(:first)    
  end
  def perform
    ProductStoreIndex
      .query(fuzzy: {product_name: @query})
      .filter(geo_distance: {
        distance: "#{@distance}mi",
        store_location: {lat: @lat, lon: @lon}
        })
      .order(_geo_distance: {store_location: {lat: @lat, lon: @lon} })
  end
end

Now we can do StoreLocator.new(zipcode: 98174, query: 'kowboy').perform to find stores near 98174 zipcode that sell American Cowboy coffee.

Autocomplete

This problem can also be solved with both Elasticsearch and Redis.

Redis

To keep data in-sync between primary DB and Redis autocomplete keys we will implement a separate class and leverage it from model callbacks. We will begin our keys with the first 2 letters of each term and store up to last latter. We will also be using Sorted Set scores to give higher weight to more common terms.

# app/models/
class Product < ApplicationRecord
  after_save      { AutocompleteRedis.new.add    name }
  before_destroy  { AutocompleteRedis.new.remove name }
end
# app/services/
class Autocomplete
  def initialize
    @namespace = 'autocomplete'
  end
  def search prefix:, num: 10
    REDIS_CLIENT.zrevrange "#{@namespace}:#{prefix.try(:downcase)}", 0, num - 1
  end
  def add_all klass, method
    klass.titleize.constantize.all.each do |object|
      add object.send(method)
    end
  end
  def add term
    add_remove term, 'add'
  end
  def remove_all klass, method
    klass.titleize.constantize.all.each do |object|
      remove object.send(method)
    end
  end
  def remove term
    add_remove term, 'remove'
  end
private
  def add_remove term, type
    term.downcase!
    first_letter = term[0]
    1.upto(term.length - 2) do |i|
      prefix = first_letter + term[1, i]
      if type == 'add'
        REDIS_CLIENT.incrby("#{@namespace}:#{prefix}", 1, term)
      elsif type == 'remove'
        REDIS_CLIENT.zrem("#{@namespace}:#{prefix}", term)
      end
    end
  end
end

We can add / remove all keys by running AutocompleteRedis.new.add_all('product', 'name') (or remove_all). Data in Redis will be stored in multiple sorted sets.

{"db":0,"key":"autocomplete:am","ttl":-1,"type":"zset","value":
  [["american cowboy",1.0],["american select",1.0]],...}
{"db":0,"key":"autocomplete:ame","ttl":-1,"type":"zset","value":
  [["american cowboy",1.0],["american select",1.0]],"...}
...
{"db":0,"key":"autocomplete:bl","ttl":-1,"type":"zset","value":
  [["blacktop light",1.0],["bluebery treat",1.0]],"...}
{"db":0,"key":"autocomplete:blu ","ttl":-1,"type":"zset","value":
  [["bluebery treat",1.0]],"...}

We can call AutocompleteRedis.new.search prefix: 'am' and get back JSON ["american select", "american cowboy"].

Elasticsearch

We will build a special index in Elasticsearch using Chewy library. Read here about filter and analyzer configuration.

# app/models/
class Product < ApplicationRecord
  update_index('autocomplete#product') { self }
end
# app/chewy/
class AutocompleteIndex < Chewy::Index
  settings analysis: {
    filter: {
      autocomplete_filter: {
          type:     "edge_ngram",
          min_gram: 1,
          max_gram: 20
      }
    },
    analyzer: {
        autocomplete: {
            type:      "custom",
            tokenizer: "standard",
            filter: [
                "lowercase",
                "autocomplete_filter"
            ]
        }
    }
  }
  define_type Product do
    field :name, type: 'text', analyzer: 'autocomplete'
  end
end

Now AutocompleteIndex.query(match: {name: 'am'}) returns American Cowboy, American Select AND Old America products. Elasticsearch is able to use the second word in the product name to match against.

ETL

Until now we were moving data between primary DB and Redis or Elasticsearch. Now we will ETL data between Redis and Elasticsearch.

Redis to Elasticsearch

The next requirement is to record which zipcodes are searched most often and when searches are performed (by hour_of_day and day_of_week). To capture data in Redis we will use leaderboard library to track searches and minuteman to count when those searches occur.

# config/initializers/redis.rb
Minuteman.configure do |config|
  config.patterns = {
    # custom config
    hour_of_day: -> (time) { 'hour_of_day:' + time.strftime("%H") },
    day_of_week: -> (time) { 'day_of_week:' + time.strftime("%a") },
  }
end
# app/service/
class StoreLocator
  def perform
    Leaderboard.new('ldbr:zipcode_searched').change_score_for(zipcode, 1)
    Minuteman.add("search_by_zip", Time.now)
    ...
  end
end

This will be very fast and data in Redis will be stored like this:

{"db":0,"key":"ldbr:zipcode_searched","ttl":-1,"type":"zset",
  "value":[["98113",11.0],...,["98184",55.0]]...}
#
{"db":0,"key":"Minuteman::Counter::search_by_zip:day_of_week:Sun","ttl":-1,
  "type":"string","value":"24"...}
{"db":0,"key":"Minuteman::Counter::search_by_zip:hour_of_day:06","ttl":-1,
  "type":"string","value":"11"...}

But our internal business users do not want to look at raw data. Our choice is writing custom dashboard or pulling data into Elasticsearch and leveraging Kibana. Once it’s in Elasticsearch we can also combine it with other data sources. We will use elasticsearch-ruby library directly since this data does not related to our application models.

ES_CLIENT = Elasticsearch::Client.new
# app/jobs/
class RedisElasticEtlJob < ApplicationJob
  def perform
    zipcode_searched
    hour_of_day
    day_of_week
  end
private
  def zipcode_searched
    Leaderboard.new('ldbr:zipcode_searched').all_members.each do |zipcode|
      ES_CLIENT.index index: 'zipcode_searched', id: zipcode[:member],
        body: { count: zipcode[:score] }
    end
  end
  def hour_of_day
    time = Time.now
    count = Minuteman.count("search_by_zip").hour_of_day(time).count
    ES_CLIENT.index index: 'hour_of_day', id: time.hour,
      body: { count: count}
  end
  def day_of_week
    time = Time.now
    count = Minuteman.count("search_by_zip").day_of_week(time).count
    ES_CLIENT.index index: 'day_of_week', id: time.strftime('%a'),
      body: { count: count }
  end
end

We are specifying our aggregation metrics (zipcode, hour_of_day, day_of_week) as the ID of Elasticsearch document.

Elasticsearch to Redis

In our Elasticsearch cluster we have captured data from logs that contain IP and UserAgent. Combination of IP and UserAgent can be used to fairly uniquely identify users. Our next business requirement is to implement functionality where our website displays slightly different UI to users that we believe have visited our site before.

Now we will leverage Logstash with various plugins as our ETL pipeline. We will be using elasticsearch input plugin, redis output plugin and ruby filter plugin to transform the data into format expected by ActiveJob background job framework and pushing it straight into a Redis List data structure.

input {
  elasticsearch {
    hosts => "localhost"
    index => "logs"
  }
}
filter {
  ruby {
    code => "
      event.set('jid', SecureRandom.hex(12))
      event.set('created_at', Time.now.to_f)
      event.set('enqueued_at', Time.now.to_f)
      event.set('class', 'ActiveJob::QueueAdapters::SidekiqAdapter::JobWrapper')
      event.set('wrapped', 'UniqVisJob')
      event.set('queue', 'low')
      args = [{
        'job_class' => 'UniqVisJob',
        'job_id' => SecureRandom.uuid,
        'provider_job_id' => 'null',
        'queue_name' => 'low',
        'priority' => 'null',
        'arguments' => [ event.get('client_ip'), event.get('user_agent') ],
        'executions' => 0,
        'locale' => 'en'
      }]
      event.set('args', args)
      "
    add_field => {
      "retry" => true
    }
    remove_field => [ "@version", "@timestamp", 'client_ip', 'user_agent' ]
  }
}
output {
  redis {
    data_type => "list"
    key => "queue:low"
    db => 0
  }
}

Now we create a very simple job ran via Sidekiq that will hash IP & UA and also set Redis keys to expire in a week.

class UniqVisJob < ApplicationJob
  queue_as :low
  def perform ip, ua
    key = "uniq_vis:" + Digest::MurmurHash1.hexdigest("#{ip}:#{ua}")
    REDIS_CLIENT.setex key, 3600*24*7, 1
  end
end

Data in Redis will look like this

{"db":0,"key":"uniq_vis:cbfb868b","ttl":..,"type":"string","value":"1","size":1}
{"db":0,"key":"uniq_vis:b68ef58c","ttl":..,"type":"string","value":"1","size":1}

Read here on manually creating messages for Sidekiq and using Ruby in Logstash

In future posts I will cover other technologies such as RediSearch module and Elasticsearch Kibana dashboard.

  • https://dev.maxmind.com/geoip/geoip2/geolite2/
  • https://github.com/yhirose/maxminddb
  • https://code.tutsplus.com/tutorials/geospatial-search-in-rails-using-elasticsearch–cms-22921
  • https://github.com/etehtsea/oxblood - supports GEO* commands
  • https://cristian.regolo.cc/2015/07/07/introducing-the-geo-api-in-redis.html
  • https://www.elastic.co/blog/found-fuzzy-search
  • https://github.com/sethherr/soulheart - library for autocomplete
  • https://stackoverflow.com/questions/29572654/how-to-view-redis-data-inside-rails-application-using-soulmate