I am a Sr. Software Developer at Oracle Cloud. The opinions expressed here are my own and not necessarily those of my employer.
Redis for Data Engineering and Data Science
Recently I spoke at RedisDay Seattle about using Redis for Data Engineering and Data Science. In this article I want to revisit these ideas.
- Python Pandas
- docker-compose.yml
- Dockerfile
- Worker containers
- Web container for Jupyter Notebook
- Links
Python Pandas
Python Pandas is a popular library for data science tasks such as importing data from various sources and analyzing it.
import pandas as pd
df1 = pd.read_csv('file.csv')
df2 = pd.read_json('http://.../something.json')
df3 = pd.read_sql_query('select * from …', connection)
df1.aggregate(...)
The challenge is that our data acquisition is often much more complex than simply reading one file or running a single DB query. We often have to pull data from different sources, and if one of our queries or API requests fails we do not want to repeat the entire process from the beginning.
In this article we will explore how to use Redis for two purposes to build a simple yet more scalable system:
- As a job queue to run multiple data acquisition tasks in parallel.
- As a DB to temporarily store our datasets.
We will be using Docker and Docker Compose to manage our environment.
docker-compose.yml
We will start our environment with the docker-compose up --build -d --scale worker=2
command. This will bring up 1 Redis, 1 Web and 2 Worker containers.
version: '3.7'
services:
  redis:
    image: redis:5.0.7-alpine
    ports:
      - target: 6379
        published: 6379
    expose:
      - 6379
  web:
    build:
      context: .
      dockerfile: Dockerfile
    volumes:
      - ./:/opt/redis_data
    ports:
      - target: 5000
        published: 5000
      - target: 8888
        published: 8888
    env_file:
      - common.env
      - secrets.env
    environment:
      CONTAINER_TYPE: web
  worker:
    build:
      context: .
      dockerfile: Dockerfile
    volumes:
      - ./:/opt/redis_data
    env_file:
      - common.env
      - secrets.env
    environment:
      CONTAINER_TYPE: worker
Environment files
common.env
This file will contain common environment variables to be shared across containers.
REDIS_HOST=redis_data_redis_1
FLASK_ENV=development
FLASK_DEBUG=1
APP_ENV=dev
secrets.env
This file will NOT be committed to the repo; this is where we will store the token necessary to access the GitHub API. More info on creating personal access tokens is available in the GitHub documentation.
GITHUB_TOKEN=...
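For illustration, here is one way this token could be turned into request headers for the GitHub API. The GH_HEADERS name matches the worker code shown later, but this exact snippet is an assumption rather than code from the article.
import os

# Authorization header for the GitHub API, built from the token in secrets.env
GH_HEADERS = {'Authorization': f"token {os.environ.get('GITHUB_TOKEN')}"}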
Dockerfile
FROM python:3.6.8
ENV app_name=redis_data
ENV home_dir=/opt/${app_name}/
RUN mkdir -p ${home_dir}
WORKDIR ${home_dir}
COPY Pipfile* ./
RUN pip install --upgrade pip && pip install pipenv
RUN pipenv install --system --dev
COPY ./ ./
EXPOSE 5000
EXPOSE 8888
ENTRYPOINT ["./entrypoint.sh"]
entrypoint.sh
Depending on the CONTAINER_TYPE environment variable, this script starts either the Flask web server and Jupyter Notebook, or a background job worker using the RQ library. (A sketch of the Flask app that flask run launches follows the script.)
#!/bin/bash
# https://docs.docker.com/config/containers/multi-service_container/
set -m
if [ "$CONTAINER_TYPE" = 'web' ]
then
  flask run -h 0.0.0.0 -p 5000 &
  jupyter notebook --ip=0.0.0.0 --no-browser --allow-root --NotebookApp.token='' --NotebookApp.password='' &
elif [ "$CONTAINER_TYPE" = 'worker' ]
then
  rq worker -c rq_config &
fi
fg %1
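The Flask app itself is not shown in the article. As a minimal sketch, assuming the flask and flask-rq2 packages from the Pipfile and an app.py module that the Flask CLI can discover, it could look like this (the rq-dashboard package from the Pipfile could also be registered on this app):
# app.py -- hypothetical minimal Flask app for the web container
import os

from flask import Flask
from flask_rq2 import RQ

app = Flask(__name__)
# point flask-rq2 at the same Redis database the workers read from
app.config['RQ_REDIS_URL'] = f"redis://{os.environ.get('REDIS_HOST')}:6379/1"
RQ_CLIENT = RQ(app)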
To make sure the RQ worker runs properly we need to create an rq_config.py file:
import os
REDIS_URL = f"redis://{os.environ.get('REDIS_HOST')}:6379/1"
Pipfile
We will install various dependencies with pipenv and the following Pipfile:
[[source]]
name = "pypi"
url = "https://pypi.org/simple"
verify_ssl = true
[packages]
jupyter = "*"
pandas = "*"
requests = "*"
redis = "*"
flask = "*"
rq-dashboard = "*"
flask-rq2 = "*"
...
[requires]
python_version = "3.6.8"
Worker containers
We will jump into a Python shell and start running our background jobs with jobs.github_users.queue(). The code first queries the https://api.github.com/users?since=0 endpoint, then loops through the returned users and hits each user's URL (for example https://api.github.com/users/mojombo). We use the ?since parameter to paginate through the users endpoint and queue subsequent requests, and a counter to stop after 10 requests to the users endpoint. Overall we make 310 HTTP requests (10 pages of 30 users each, plus one request per user), so we do not want to start from the beginning in case one of them fails.
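The snippet below references RQ_CLIENT, GH_HEADERS and REDIS_CLIENT without defining them. A plausible module-level setup for jobs.py, assuming the hypothetical app.py sketched earlier, might look like this:
# jobs.py -- assumed setup for the job functions below
import logging
import os

import pandas as pd
import redis
import requests

from app import RQ_CLIENT  # flask-rq2 instance from the sketch above

logging.basicConfig(level=logging.INFO)

# GitHub API auth header built from the token in secrets.env
GH_HEADERS = {'Authorization': f"token {os.environ.get('GITHUB_TOKEN')}"}

# plain redis-py client used to store the per-user hashes
REDIS_CLIENT = redis.Redis(host=os.environ.get('REDIS_HOST'),
                           charset='utf-8', decode_responses=True)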
@RQ_CLIENT.job()
def github_users(since=0, counter=0):
    ''' get users data from github api '''
    req = requests.get(f'https://api.github.com/users?since={since}', headers=GH_HEADERS)
    df = pd.DataFrame(req.json())
    for _, v in df.iterrows():
        # queue a separate job for each user on this page
        github_each_user.queue(v['login'])
        since = v['id']
    # queue next job request until we have fetched 10 pages
    if counter + 1 < 10:
        github_users.queue(since=since, counter=counter+1)


@RQ_CLIENT.job()
def github_each_user(login):
    ''' get data for specific user '''
    req = requests.get(f'https://api.github.com/users/{login}', headers=GH_HEADERS)
    ds = pd.Series(req.json()).to_dict()
    keys = ['public_repos', 'public_gists', 'followers', 'following']
    ds2 = {key: ds[key] for key in keys}
    logging.info(ds2)
    # store the selected fields in a Redis hash keyed by the user login
    REDIS_CLIENT.hmset(login, ds2)
Each job is sent via the Redis queue thanks to the .queue(...) method and picked up by one of the worker containers. As the jobs complete they massage the data to extract ['public_repos', 'public_gists', 'followers', 'following'] and store it in Redis Hashes. The data in Redis will look like this:
127.0.0.1:6379> hgetall mojombo
1) "public_repos"
2) "61"
3) "public_gists"
4) "62"
...
Web container for Jupyter Notebook
This is where we transition from data engineering to data science. We can browse to http://localhost:8888 and use the Notebooks to do data analysis.
One limitation of Redis is that we cannot query by value, so we will use a Pandas DataFrame to pull the data out of Redis and hold it in Python memory. Then we can do regular Pandas aggregations (a few examples follow the snippet below).
import os
import pandas as pd
import redis

RC = redis.Redis(host=os.environ.get('REDIS_HOST'), charset='utf-8', decode_responses=True)
df = pd.DataFrame()
for key in RC.keys():
    # each key is a user login pointing to a hash of that user's stats
    value = RC.hgetall(key)
    value['login'] = key
    df = df.append(value, ignore_index=True)
for k in ['public_repos', 'public_gists', 'followers', 'following']:
    # hash values come back as strings, so convert them to integers
    df[k] = df[k].astype(int)
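With everything in a DataFrame we can run regular Pandas aggregations, for example:
# summary statistics for the numeric columns
print(df.describe())

# top 10 users by follower count
print(df.sort_values('followers', ascending=False).head(10))

# average number of public repos among the fetched users
print(df['public_repos'].mean())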
Overall this approach can be a good option for medium-scale data. Job processing can be stopped and restarted later. This solution does require a lot of memory, so it is probably not a good choice when we need to store large amounts of data for extended periods of time. But it can be very useful as a place to temporarily store data while we are processing it; once the aggregations are done we can flush Redis.
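When the analysis is finished, the temporary dataset can be dropped with the same client, for example:
# clear the current Redis database once the aggregations are done
RC.flushdb()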
Links
- Video of my presentation https://www.youtube.com/watch?v=Koh6piVaYh0
- Slides from my presentation http://bit.ly/36mQ8H2
- Code samples from my presentation https://github.com/dmitrypol/redis_data
- https://pandas.pydata.org/
- https://palletsprojects.com/p/flask/
- https://python-rq.org/