Data Processing with Concourse
Recently I needed to focus on a project that regularly processed datasets with typical extract, transform, and load stages. Historically it used Amazon SQS to queue up the tasks for each stage, some supervisor-managed processes to work off the queue, and Amazon S3 to store the results of each stage. The original implementation was struggling: it was inefficient, problems were difficult to detect, and it was even more difficult to replay a stage whenever unexpected, bad data showed up.
Avoiding large-scale solutions like Hadoop, I found Pachyderm, which seemed perfect. It is a newer, container-oriented solution which manages workloads via Kubernetes. I quickly had a cluster running in Google Cloud Platform, but once I got to the point of executing custom jobs I started running into issues with hanging jobs, non-starting containers, and pods with inexplicably high CPU. After spending some time debugging, I started thinking... I already have a tool which is really good at managing inputs, containers, and outputs, and chaining those things together.
Architecture
Overall, I termed this approach "datapact". It represents a collection of configuration, data manipulations, and data results. The high level components are...
Concourse – at the core, Concourse is used to chain together the various stages of data operations. The results of each stage are tracked, and the existing Concourse concepts are followed...
- pipeline – a common group of datasets and data operations
- job – a specific operation for a dataset
- resource – data management for the results of job executions
Job receipts – receipts record the results of a job execution. This includes which prior datasets were the input, the new data generated by the job, and some additional metadata.
Git – a git repository is a good method for storing the execution receipts. It provides built-in concepts of commits and branches. More specifically...
- branches – each data transformation job has its own branch which stores its configuration, Docker environment, and job receipts
- commits – made whenever the data processing changes or updated job receipts are available
S3 – a bucket is used to store blobs which are generated by the job executions.
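To make that concrete, here is a rough sketch of how a datapact repository and its blob bucket might be laid out (the branch names and file locations here are illustrative, not prescriptive):

git repository (receipts and configuration)
  some-dataset             # branch for one data job: config, docker environment, job receipts
  some-dataset-derived     # branch for a downstream job which consumes some-dataset results
  master                   # branch holding the pipeline-level datapact configuration

S3 bucket (blobs)
  some-dataset/<blob-id>           # objects keyed by branch name, referenced from receipts
  some-dataset-derived/<blob-id>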
Example
As an example, let's say a network environment runs file integrity monitoring. Whenever there is a new base server image available we need to inspect it and eventually send results to our integrity server.
Receiving New Server Images
The very first responsibility is to download the server images into our datapact pipeline. Stages are branch-based, so we can start by creating an empty branch in our datapact repository.
$ git checkout --orphan aws-xen-centos-7
Each branch should document the container environment it needs to run in. The easiest way to do this is to create a docker directory which will contain a Dockerfile and an execute script.
$ mkdir docker
For our first job, the Dockerfile should install curl and jq.
$ cat > docker/Dockerfile <<EOF
FROM alpine:latest
RUN apk add --no-cache ca-certificates curl jq
ADD execute /opt/datapact/execute
EOF
And then the execute script will utilize those packages.
$ touch docker/execute
$ chmod +x docker/execute
In our specific case, the script will download the tarball of the base server image (aka "stemcell") and dump it into a result directory (the default location for all our transformed blobs).
$ curl -o result/stemcell.tgz "$stemcell_tarball"
Since this is the very first execution in the pipeline, we can give a seed for generating the receipt ID. This helps ensure a deterministic receipt ID, rather than random or time-based.
$ jq -nc \
    --arg name "$stemcell_name" \
    --arg version "$stemcell_version" \
    '
    {
      "branch": ":concourse",
      "result": "\($name)/\($version)"
    }
    ' \
    > result/.datapact/input
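Putting those pieces together, the full docker/execute script might look something like the following (a sketch assembled from the snippets above; whether the harness pre-creates result and result/.datapact is my assumption, so the mkdir is defensive):

#!/bin/sh
# docker/execute - download the stemcell tarball and seed the receipt input
set -eu

# defensive; the datapact runner may already create these directories
mkdir -p result/.datapact

# fetch the base server image into the default output directory
curl -o result/stemcell.tgz "$stemcell_tarball"

# seed a deterministic receipt ID from the stemcell name and version
jq -nc \
  --arg name "$stemcell_name" \
  --arg version "$stemcell_version" \
  '{ "branch": ":concourse", "result": "\($name)/\($version)" }' \
  > result/.datapact/input

With execute in place, the image can also be sanity-checked locally with docker build docker/ before wiring it into the pipeline (purely a convenience, not part of datapact itself).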
At this point we have defined what needs to happen and the environment in which it should happen. Next we need to document the input for our task – normally this is how we create dependencies between data branches. In our case, we do not have a previous data job, so instead we can utilize a raw Concourse resource to trigger it.
$ cat config/job.json
{ "input": [
{ "alias": "stemcell",
"resource": {
"type": "bosh-io-stemcell",
"source": {
"name": "bosh-aws-xen-centos-7-go_agent" } } } ] }
Now that we're done with our first task, we can safely commit it.
$ git add config/job.json docker/Dockerfile docker/execute
$ git commit -m 'Initial commit'
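For the pipeline to eventually see this work, the branch also needs to be pushed (assuming the usual origin remote is what the pipeline watches):

$ git push origin aws-xen-centos-7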
Analyzing the Server Images
Once I have a server image, the next step may be to analyze it. For this example, I may want to reuse a container that is managed elsewhere (e.g. bosh-stemcell-metadata-scripts). In this case, I need to create a config/job.json file with a task section in a new branch called aws-xen-centos-7-metadata.
{ "task": {
"image": {
"source": {
"repository": "dpb587/bosh-stemcell-metadata" } },
"privileged": true,
"run": {
"path": "/bosh-stemcell-metadata/bin/analyze" },
"env": {
"analyzer": "filelist manifest packages",
"tarball": "aws-xen-centos-7/stemcell.tgz" } },
"input": [
{ "branch": "aws-xen-centos-7" } ] }
Specifically, I have told it where to find the container, what script to execute, a couple of environment settings, and that it needs to execute on all data which comes from the aws-xen-centos-7 branch that we defined earlier.
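Mirroring the first branch, this one might be created and committed along these lines (the git rm clears the files carried over in the index when using --orphan; the exact steps are up to you):

$ git checkout --orphan aws-xen-centos-7-metadata
$ git rm -rf .
$ mkdir config
$ # write the config/job.json shown above into place
$ git add config/job.json
$ git commit -m 'Add metadata job'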
Building the Pipeline
To generate a datapact pipeline for Concourse, we rely on an additional configuration file. Typically this lives in its own branch.
$ git checkout --orphan master
The configuration first must include a list of branches which are responsible for data manipulations since not all repository branches may be relevant to the data tasks.
{ "branches_list": [
{ "branch": "aws-xen-centos-7" },
{ "branch": "aws-xen-centos-7-metadata"} ] }
The next section is where we define our base configuration used to get task inputs before they execute and put task results after they execute. This should contain the git repository where receipts are stored, in addition to the S3 bucket information used for blob storage.
{ "datapact_job": {
"source": {
"uri": "git@github.com:dpb587/bosh-stemcell-metadata.git",
"private_key": env.GIT_PRIVATE_KEY,
"s3_bucket": "dpb587-bosh-stemcell-metadata-us-east-1",
"s3_access_key_id": env.AWS_ACCESS_KEY_ID,
"s3_secret_access_key": env.AWS_SECRETE_ACCESS_KEY } }
One other useful section defines the default Docker registry. By setting this, all datapact branches which have a docker directory will be watched and automatically rebuilt when the images need to change.
{ "docker_registry": {
"source": {
"repository": "dpb587/bosh-stemcell-metadata",
"username": env.DOCKER_USERNAME,
"password": env.DOCKER_PASSWORD } } }
The three sections are merged and saved as datapact.json. From that directory we can run datapact-pipeline-generate, which dumps out a pipeline featuring:
- jobs for each branch, configured to receive inputs, execute manipulations, and archive results
- datapact resources for each branch capable of watching, downloading, and uploading results
- Docker image resources for any branches which have documented their container environment
- any custom resources, resource types, groups, or jobs
The pipeline can then be set with:
$ fly -t main set-pipeline -p datapact -c <( datapact-pipeline-generate )
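For reference, the merged datapact.json assembled from the three snippets above would look roughly like this (the exact schema is whatever the datapact tooling expects; the env.* references are kept as shown earlier, and how they get resolved is not covered here):

{ "branches_list": [
    { "branch": "aws-xen-centos-7" },
    { "branch": "aws-xen-centos-7-metadata" } ],
  "datapact_job": {
    "source": {
      "uri": "git@github.com:dpb587/bosh-stemcell-metadata.git",
      "private_key": env.GIT_PRIVATE_KEY,
      "s3_bucket": "dpb587-bosh-stemcell-metadata-us-east-1",
      "s3_access_key_id": env.AWS_ACCESS_KEY_ID,
      "s3_secret_access_key": env.AWS_SECRET_ACCESS_KEY } },
  "docker_registry": {
    "source": {
      "repository": "dpb587/bosh-stemcell-metadata",
      "username": env.DOCKER_USERNAME,
      "password": env.DOCKER_PASSWORD } } }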
Additional Configuration
At this point, the datapact pipeline is pretty simple. It has a job for building the Docker image and a job for triggering off the new base server images via the bosh-io-stemcell built-in resource. Whenever new base server images are found, it runs some metadata scripts and archives the results. Further tasks might watch the aws-xen-centos-7-metadata branch and upload results to the integrity server.
One complaint with the original system was the difficulty of knowing when tasks failed. Since this is a regular Concourse pipeline, I could add a custom Slack resource and wrap my jobs with on_failure handling that posts a Slack message with a link to the build logs of the failure. I can also manually pull up the pipeline in my browser to quickly see whether the last runs are green or red.
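As a rough sketch of what that could look like in the generated pipeline (using the community cfcommunity/slack-notification-resource resource type; the webhook URL is a placeholder, how datapact injects custom resources and on_failure hooks is its own detail, and the build-metadata variables in the message should be checked against that resource's documentation):

{ "resource_types": [
    { "name": "slack-notification",
      "type": "docker-image",
      "source": {
        "repository": "cfcommunity/slack-notification-resource" } } ],
  "resources": [
    { "name": "notify-slack",
      "type": "slack-notification",
      "source": {
        "url": "https://hooks.slack.com/services/REPLACE_ME" } } ] }

Each generated job would then carry an on_failure step such as:

{ "on_failure": {
    "put": "notify-slack",
    "params": {
      "text": "datapact job failed: $ATC_EXTERNAL_URL/builds/$BUILD_ID" } } }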
Receipt Files
The receipts contain data about how a particular execution happened and what the results were. Here are a few more details about the data they contain.
Inputs
Every data execution is initiated by some sort of input. Occasionally this is time itself, but usually this is based on an external event. An inputs section might look like...
{ "inputs": [
{ "branch": "aws-xen-centos-7",
"result": "460783c4f026dbfc580c9297ad5296f3f861c367",
"metadata": [
{ "key": "version",
"value": "3263.7" } ],
"timestamp": "2016-10-12T21:12:52Z" } ] }
The branch simply references the Git branch of the datapact repository where the result (receipt ID) lives. The timestamp field indicates which execution of that result was used (in case later executions are performed), and metadata is arbitrary data which came from the input.
Outputs
Most executions will generate some sort of artifact. These are recorded in the output section, which might look like...
{ "output": [
{ "blob": "c570de39b72a0e66ed6e59fa2127461b9b16feec",
"checksum": {
"sha256": "0a3743ad21d8bb3193fbf07ac8c7856a4dda0e30cb6b8f4829d4f479d9e7e22c" },
"filters": [ "gz" ],
"path": "filelist.gz",
"size": 1460203 } ] }
The blob references an object key in the S3 blobstore. The checksums can be used to verify the integrity of the blobs when downloading. Since blobs will often be text files, filters may be used to further transform data when uploading and downloading blobs – in this case gz is used to significantly reduce the size. The path indicates what the file name is, and size the stored file size.
Typically your toolchain will rely on these receipt files and download objects with something like the following...
$ url=$( jq -r '"https://s3.amazonaws.com/\(env.S3_BUCKET)/\(.branch)/\(.output[0].blob)"' )
$ path=$( jq -r '.output[0].path' )
$ wget -O "$path" "$url"
$ [[ "$( jq -r '.output[0].checksum.sha256' )" == "$( shasum -a 256 "$path" | cut -f1 -d' ' )" ]]
Metadata
There are often a few other fields in the receipt...
{ "branch": "aws-xen-centos-7",
"metadata": [
{ "key": "buildenv",
"value": "0aed24f371e44660b614eabef08f7ce31a99df9c" } ],
"timestamp": "2016-10-12T21:12:52Z" }
The branch field documents which branch the job execution occurred on. This branch is used for the directory name in the S3 bucket. The metadata field may have additional information about the job execution environment (such as which container or code version was used). Finally, the timestamp is used as an indicator of when the job execution finished. In case a job is re-executed, the timestamp will be bumped even if the outputs are exactly the same.
In Practice
If you're curious, one example of this is dpb587/bosh-stemcell-metadata, where a pipeline watches for new BOSH stemcells, analyzes some data, and updates a gh-pages branch with aggregated results. This allows me to easily pull down information for a specific version I'm interested in (both manually and via another Concourse pipeline which watches it).
A couple other uses I've found for this are:
- pulling, transforming, and forwarding product datafeeds from vendors
- regularly downloading and generating reports for KPIs from third-party integrations
- running generic, cron-like jobs where I care about retaining logs and results over time
Currently, datapact is a private project, but I thought the idea of using Concourse for data transformation was particularly interesting, and the described architecture has helped me simplify the way I write automated data tasks.