I am a Sr. Software Developer at Oracle Cloud. The opinions expressed here are my own and not necessarily those of my employer.
Elasticsearch and Redis streams
Redis Lists can be used as queues for jobs that move data from a primary data store to Elasticsearch. What if we have time-series data that needs to stay in Redis AND be copied to Elasticsearch?
In a previous post we built a Ruby on Rails website for a nationwide retail chain. We used Redis Lists as queues to ETL data to Elasticsearch. We had Redis SortedSets to record which zipcodes were searched and how often. And we had counters (using Redis Strings) of when searches occurred by day_of_week and hour_of_day. Our new requirement is to keep track of the exact time of each search and its parameters (zipcode and product/query). This will help our biz users determine which products to stock in different stores.
Redis Streams
This search data is usually recorded in our application logs as part of the GET request. We could leverage Logstash to process it into Elasticsearch. But we will now use Streams to record data in Redis first. Streams are a new Redis data structure. They were expected in the 4.0.x timeframe and became generally available in Redis 5.0.
We are using the new xadd command, which creates a new Redis stream and adds an item to it with key/value pairs (zip and query). Stream keys are date-stamped so that we get one stream per day, just like we rotate logs. Data in Redis looks like this:
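The write that produces such entries can be sketched as follows. This is only an illustration: the helper names search_stream_key and record_search are hypothetical, and the client is assumed to expose the hash-based xadd(key, entry) call found in redis-rb 4.1 and later rather than the positional form used elsewhere in this post.

```ruby
require 'date'

# Hypothetical helper: one stream per calendar day, e.g. "search_log:2018-01-08".
def search_stream_key(date)
  "search_log:#{date.strftime('%Y-%m-%d')}"
end

# Appends one search to that day's stream and returns whatever the client
# returns (for a real Redis client, the autogenerated stream ID).
# Assumption: `client` responds to xadd(key, entry_hash) as redis-rb >= 4.1 does.
def record_search(client, zip:, query:, date: Date.today)
  entry = { 'zip' => zip.to_s, 'query' => query.to_s }
  client.xadd(search_stream_key(date), entry)
end
```

Each entry then carries an autogenerated ID plus the zip and query fields.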
xrange is another new command that allows us to get the items from the stream. - and + give us all items from first to last. Stream IDs are autogenerated based on the Unix epoch in milliseconds plus a sequence number. Alternatively we could have specified the ID in REDIS_CLIENT.xadd(key, numeric_id, ...).
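Because the first half of an autogenerated ID is a millisecond timestamp, the exact time of a search can be recovered from the ID alone. A minimal sketch, where parse_stream_id is a hypothetical helper name and the ID is assumed to have the usual "milliseconds-sequence" shape:

```ruby
# Splits a stream ID such as "1515369600123-3" into its timestamp and
# sequence number. Raises if either half is not a base-10 integer.
def parse_stream_id(id)
  millis, sequence = id.split('-', 2).map { |part| Integer(part, 10) }
  # A Rational keeps the millisecond part exact when building the Time.
  { time: Time.at(Rational(millis, 1000)).utc, sequence: sequence }
end
```

The sequence number only distinguishes items added within the same millisecond.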
ETL to Elasticsearch
Since the amount of data is quite large, we will keep the last 7 days of searches in Redis and move the older records to Elasticsearch. Streams have a flexible schema with different fields, which fits well into Elasticsearch indexes.
xrange
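We will create corresponding Elasticsearch indexes such as “search_log:YYYY-MM-DD” and loop through the stream items in batches. (Elasticsearch 7.0 and later no longer accept ":" in index names, so newer clusters need a different separator, for example “search_log-YYYY-MM-DD”.) We will also specify the stream item ID as the Elasticsearch document ID.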
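The batching step can be sketched without touching either datastore. The helpers below are hypothetical; they assume the entries arrive as [id, fields] pairs (the shape redis-rb returns from xrange) and produce actions in the format accepted by the bulk method of the elasticsearch-ruby client.

```ruby
# Turns stream entries into bulk "index" actions, reusing each stream ID as
# the document ID so re-running the job overwrites documents rather than
# duplicating them.
# Assumption: `entries` is an array of [id, fields_hash] pairs.
def bulk_actions(index_name, entries)
  entries.map do |id, fields|
    { index: { _index: index_name, _id: id, data: fields } }
  end
end

# Groups the actions into arrays of at most `batch_size` items.
def bulk_batches(index_name, entries, batch_size: 500)
  entries.each_slice(batch_size).map { |slice| bulk_actions(index_name, slice) }
end
```

Each batch could then be sent as the body of one bulk request, which keeps the number of round trips to Elasticsearch small.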
Alternatively we can use REDIS_CLIENT.xrevrange('search_log:YYYY-MM-DD', '+', '-') to retrieve items in reverse order. We can remove data from Redis either by setting a TTL on the different stream keys or by manually calling REDIS_CLIENT.del('search_log:YYYY-MM-DD').
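For the manual route, a small guard keeps the job from deleting a day that is still inside the retention window. This is a sketch under assumptions: purge_if_archived is a hypothetical name, "last 7 days" is read as today plus the six previous days, and the client only needs to respond to del(key).

```ruby
require 'date'

# Deletes a day's stream once it is at least `keep_days` old and returns
# true; returns false (and deletes nothing) for days still being kept.
# Assumption: the caller has already copied the stream to Elasticsearch.
def purge_if_archived(client, date, today: Date.today, keep_days: 7)
  return false unless (today - date) >= keep_days

  client.del("search_log:#{date.strftime('%Y-%m-%d')}")
  true
end
```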
xread
What if we want a more real-time data pipeline from Redis to Elasticsearch? Instead of a scheduled daily job we can build a daemon that uses the new xread command.
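One iteration of such a daemon might look like the sketch below. The name consume_batch is hypothetical, and the client is assumed to behave like redis-rb, whose xread(keys, ids, count:, block:) returns a hash of stream key to [id, fields] pairs and an empty hash when the blocking call times out.

```ruby
# Reads entries newer than `last_id`, yields each one to the caller's block,
# and returns the ID to resume from on the next call.
# Assumption: `client.xread` follows the redis-rb signature and return shape.
def consume_batch(client, stream_key, last_id, count: 100, block_ms: 5_000)
  result = client.xread(stream_key, last_id, count: count, block: block_ms) || {}
  entries = result.fetch(stream_key, [])
  entries.each { |id, fields| yield id, fields }
  # Keep the previous position when nothing new arrived.
  entries.empty? ? last_id : entries.last.first
end
```

A daemon would call this in a loop, starting from '0-0' to replay the whole stream or '$' to see only new items, and index each yielded entry into Elasticsearch.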
The challenge with this approach is that Redis will be able to record items in the stream much faster than Elasticsearch can create documents (RAM vs disk). While the scheduled job approach above is less cutting edge, it may fit better into pre-existing ETL processes.
Using the data
To access the data we can encapsulate the logic for choosing Redis vs Elasticsearch as our data source in a separate class.
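A minimal sketch of such a class follows. SearchLogSource and store_for are hypothetical names, the 7-day window matches the retention described earlier, and the class only decides which store to ask for a single day; the actual queries are left out.

```ruby
require 'date'

# Decides which datastore should answer a query for a given day.
class SearchLogSource
  REDIS_RETENTION_DAYS = 7 # assumption: today plus the six previous days

  def initialize(today: Date.today)
    @today = today
  end

  # Returns :redis for days still inside the retention window and
  # :elasticsearch for anything older.
  def store_for(date)
    (@today - date) < REDIS_RETENTION_DAYS ? :redis : :elasticsearch
  end
end
```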
We would need to modify this code to work with a date range that requires data from both Redis and Elasticsearch. Separately, we can use Kibana to build interesting visualizations on the data in Elasticsearch. Logstash, mentioned above, has input/output plugins for accessing data in Redis. Currently they only support lists and channels, but hopefully they will soon integrate with streams.
Other Streams commands
xlen
xlen search_log:2018-01-08 will return the number of items in the stream. We could loop through the date-stamped keys to build a table in the UI showing the date and number of searches.
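That loop could be sketched as below. daily_search_counts is a hypothetical helper, and the client is assumed to respond to xlen(key) as redis-rb does. Redis reports a length of 0 for a key that does not exist, so days without searches need no special handling.

```ruby
require 'date'

# Builds [[date_string, count], ...] for the `days` most recent days,
# newest first, by asking for the length of each day's stream.
def daily_search_counts(client, today: Date.today, days: 7)
  (0...days).map do |offset|
    day = (today - offset).strftime('%Y-%m-%d')
    [day, client.xlen("search_log:#{day}")]
  end
end
```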
maxlen
We can also create capped streams with REDIS.xadd('last_1000_searches', 'maxlen', '~', 1000, '*', 'zip', zip, 'query', query). The ~ makes the cap approximate: Redis may keep slightly more than 1000 items, which lets it trim the stream more efficiently. This way we can enable our biz users to view the most recent searches.
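With a client that takes options as keywords, the same capped write might be expressed as in this sketch. record_recent_search is a hypothetical name, and the maxlen: and approximate: options are assumed to behave as in redis-rb 4.1 and later, where approximate: true corresponds to the ~ modifier.

```ruby
# Appends a search to the capped stream, asking Redis to keep roughly the
# most recent `cap` items.
# Assumption: `client.xadd` accepts maxlen: and approximate: like redis-rb.
def record_recent_search(client, zip:, query:, cap: 1000)
  entry = { 'zip' => zip.to_s, 'query' => query.to_s }
  client.xadd('last_1000_searches', entry, maxlen: cap, approximate: true)
end
```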
Links
- https://brandur.org/redis-streams
- https://hackernoon.com/introduction-to-redis-streams-133f1c375cd3
- https://github.com/redis/redis-rcp/blob/master/RCP11.md
- https://gist.github.com/antirez/68e67f3251d10f026861be2d0fe0d2f4