Data Processing with Concourse

October 19, 2016

Recently I needed to focus on a project that regularly processed datasets with typical extract, transform, and load stages. Historically it used Amazon SQS to queue up the tasks for each stage, some supervisor-managed processes to work off the queue, and Amazon S3 to store the results of each stage. The original implementation was struggling: it was inefficient, it was difficult to detect problems, and it was even more difficult to replay a stage whenever unexpected, bad data showed up.

Avoiding large-scale solutions like Hadoop, I found Pachyderm, which seemed perfect. It is a newer, container-oriented solution which manages workloads via Kubernetes. I quickly had a cluster running in Google Cloud Platform, but once I got to the point of executing custom jobs I started running into issues around hanging jobs, non-starting containers, and pods with inexplicably high CPU usage. After spending some time debugging, I started thinking... I already have a tool which is really good at managing inputs, containers, and outputs, and at chaining those things together.

Architecture

Overall, I termed this approach "datapact". It represents a collection of configuration, data manipulations, and data results. The high-level components are...

Concourse – at the core, Concourse is used to chain together the various stages of data operations. The results of each stage are tracked, and the existing Concourse concepts are followed...

  • pipeline – a common group of datasets and data operations
  • job – a specific operation for a dataset
  • resource – data management for the results of job executions

Job receipts – receipts record the results of a job execution. This includes which prior datasets were the input, the new data generated by the job, and some additional metadata.

Git – the git repository is a good method for storing the execution receipts. It provides built-in concepts of commits and branches. More specifically...

  • branches – each data transformation job has its own branch which stores its configuration, Docker environment, and job receipts
  • commits – made whenever the data processing changes or updated job receipts are available

S3 – a bucket is used to store blobs which are generated by the job executions.

Example

As an example, let's say a network environment runs file integrity monitoring. Whenever there is a new base server image available we need to inspect it and eventually send results to our integrity server.

Screenshot: datapact-pipeline

Receiving New Server Images

The very first responsibility is to download the server images into our datapact pipeline. Stages are branch-based, so we can start by creating an empty branch in our datapact repository.

$ git checkout --orphan aws-xen-centos-7

Each branch should document the container environment it needs to run in. The easiest way to do this is to create a docker directory which will contain a Dockerfile and an execute script.

$ mkdir docker

For our first job, Dockerfile should install curl and jq.

$ cat > docker/Dockerfile <<EOF
FROM alpine:latest
RUN apk add --no-cache ca-certificates curl jq
ADD execute /opt/datapact/execute
EOF

And then the execute script will utilize those packages.

$ touch docker/execute
$ chmod +x docker/execute

In our specific case, the script will download the tarball of the base server image (aka "stemcell") and dump it into a result directory (the default location for all our transformed blobs).

$ curl -o result/stemcell.tgz "$stemcell_tarball"

Since this is the very first execution in the pipeline, we can give a seed for generating the receipt ID. This helps ensure a deterministic receipt ID, rather than random or time-based.

$ jq -nc \
  --arg name "$stemcell_name" \
  --arg version "$stemcell_version" \
  '
    {
      "branch": ":concourse",
      "result": "\($name)/\($version)"
    }
  ' \
  > result/.datapact/input

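To illustrate why a seed gives a stable identifier, here is a minimal Python sketch. It assumes the receipt ID is a SHA-1 digest of the seed's canonical JSON form. The actual derivation used by datapact is not described in this post, so both the `receipt_id` helper and the hashing scheme are hypothetical.

```python
import hashlib
import json


def receipt_id(seed: dict) -> str:
    """Derive a stable 40-character ID from a seed document (assumed scheme)."""
    # Sorting keys and fixing separators makes the serialized form canonical,
    # so logically equal seeds always hash to the same value.
    canonical = json.dumps(seed, sort_keys=True, separators=(",", ":"))
    return hashlib.sha1(canonical.encode("utf-8")).hexdigest()


seed = {"branch": ":concourse", "result": "bosh-aws-xen-centos-7-go_agent/3263.7"}
reordered = {"result": "bosh-aws-xen-centos-7-go_agent/3263.7", "branch": ":concourse"}

# The same seed yields the same ID regardless of key order or when it runs.
assert receipt_id(seed) == receipt_id(reordered)
print(receipt_id(seed))
```

A random or time-based ID would differ on every run, which would make replays look like brand new results.
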
At this point we have defined what needs to happen and the environment in which it should happen. Next we need to document the input for our task – normally this is how we create dependencies between data branches. In our case, we do not have a previous data job, so instead we can utilize a raw Concourse resource to trigger it.

$ cat config/job.json
{ "input": [
  { "alias": "stemcell",
    "resource": {
      "type": "bosh-io-stemcell",
      "source": {
        "name": "bosh-aws-xen-centos-7-go_agent" } } } ] }

Now that we're done with our first task, we can safely commit it.

$ git add config/job.json docker/Dockerfile docker/execute
$ git commit -m 'Initial commit'

Analyzing the Server Images

Once I have a server image, the next step may be to analyze it. For this example, I may want to reuse a container that is managed elsewhere (e.g. bosh-stemcell-metadata-scripts). In this case, I need to create a config/job.json file with a task section in a new branch called aws-xen-centos-7-metadata.

{ "task": {
    "image": {
      "source": {
        "repository": "dpb587/bosh-stemcell-metadata" } },
    "privileged": true,
    "run": {
      "path": "/bosh-stemcell-metadata/bin/analyze" },
    "env": {
      "analyzer": "filelist manifest packages",
      "tarball": "aws-xen-centos-7/stemcell.tgz" } },
  "input": [
    { "branch": "aws-xen-centos-7" } ] }

Specifically, I have told it where to find the container, what script to execute, a couple of environment settings, and that it needs to execute on all data which comes from the aws-xen-centos-7 branch that we defined earlier.
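
The input entries are what tie branches together. As a rough illustration, the following Python sketch derives an execution order from per-branch configurations shaped like the ones above. The `job_order` helper is hypothetical, and inputs backed by a raw Concourse resource are treated as external triggers rather than branch dependencies.

```python
from graphlib import TopologicalSorter  # Python 3.9+


def job_order(configs: dict) -> list:
    """Return branch names ordered so each runs after the branches it reads."""
    graph = {}
    for branch, config in configs.items():
        # Only inputs that name another branch create a dependency; raw
        # resources (such as bosh-io-stemcell) are external triggers.
        graph[branch] = {
            item["branch"] for item in config.get("input", []) if "branch" in item
        }
    return list(TopologicalSorter(graph).static_order())


configs = {
    "aws-xen-centos-7-metadata": {"input": [{"branch": "aws-xen-centos-7"}]},
    "aws-xen-centos-7": {
        "input": [{"alias": "stemcell", "resource": {"type": "bosh-io-stemcell"}}]
    },
}

print(job_order(configs))  # ['aws-xen-centos-7', 'aws-xen-centos-7-metadata']
```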

Building the Pipeline

To generate a datapact pipeline for Concourse, we rely on an additional configuration file. Typically this lives in its own branch.

$ git checkout --orphan master

The configuration first must include a list of branches which are responsible for data manipulations since not all repository branches may be relevant to the data tasks.

{ "branches_list": [
    { "branch": "aws-xen-centos-7" },
    { "branch": "aws-xen-centos-7-metadata" } ] }

The next section is where we define our base configuration used to get task inputs before they execute and put task results after they execute. This should contain the git repository where receipts are stored in addition to the S3 bucket information used for blob storage.

{ "datapact_job": {
  "source": {
    "uri": "git@github.com:dpb587/bosh-stemcell-metadata.git",
    "private_key": env.GIT_PRIVATE_KEY,
    "s3_bucket": "dpb587-bosh-stemcell-metadata-us-east-1",
    "s3_access_key_id": env.AWS_ACCESS_KEY_ID,
    "s3_secret_access_key": env.AWS_SECRET_ACCESS_KEY } } }

One other useful section defines the default Docker registry. By setting this, all datapact branches which have a docker directory will be watched and automatically rebuilt when the images need to change.

{ "docker_registry": {
  "source": {
    "repository": "dpb587/bosh-stemcell-metadata",
    "username": env.DOCKER_USERNAME,
    "password": env.DOCKER_PASSWORD } } }

The three sections are merged and saved as datapact.json. From that directory we can run datapact-pipeline-generate which dumps out a pipeline featuring:

  • jobs for each branch configured to receive inputs, execute manipulations, and archive results
  • datapact resources for each branch capable of watching, downloading, and uploading results
  • Docker image resources for any branches which have documented their container environment
  • any custom resources, resource types, groups, or jobs

The pipeline can then be set with:

$ fly set-pipeline -c <( datapact-pipeline-generate )

Additional Configuration

At this point, the datapact pipeline is pretty simple. It has a job for building the Docker image and a job for triggering off the new base server images via the bosh-io-stemcell built-in resource. Whenever new base server images are found, it runs some metadata scripts and archives the results. Further tasks might watch the aws-xen-centos-7-metadata branch and upload results to the integrity server.

One complaint with the original system was the difficulty of knowing when tasks failed. Since this is a regular Concourse pipeline, I could add a custom Slack resource and wrap my jobs to execute on_failure handling with a Slack message and a link to the build logs of the failure. I can also manually pull up the pipeline in my browser to quickly see whether the last runs are green or red.
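
As a rough sketch of that wrapping, the Python below adds an on_failure hook to every job in a pipeline document before it is handed to fly. The resource name `slack-alert`, the `text` parameter, and the `with_failure_alerts` helper are assumptions for illustration. The actual parameters depend on whichever Slack resource is in use.

```python
import copy


def with_failure_alerts(pipeline: dict, resource: str = "slack-alert") -> dict:
    """Return a copy of the pipeline where every job notifies on failure."""
    wrapped = copy.deepcopy(pipeline)
    for job in wrapped.get("jobs", []):
        # Leave jobs alone if they already define their own failure handling.
        job.setdefault("on_failure", {
            "put": resource,
            "params": {
                # Concourse exposes these build metadata variables to put
                # steps; whether they are expanded inside "text" depends on
                # the Slack resource being used (assumption).
                "text": "Job $BUILD_JOB_NAME failed: "
                        "$ATC_EXTERNAL_URL/teams/$BUILD_TEAM_NAME/pipelines/"
                        "$BUILD_PIPELINE_NAME/jobs/$BUILD_JOB_NAME/builds/$BUILD_NAME",
            },
        })
    return wrapped


pipeline = {"jobs": [{"name": "aws-xen-centos-7", "plan": []},
                     {"name": "aws-xen-centos-7-metadata", "plan": []}]}
alerted = with_failure_alerts(pipeline)
print(alerted["jobs"][0]["on_failure"]["put"])
```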

Receipt Files

The receipts contain data about how a particular execution happened and what the results were. Here are a few more details about the data it contains.

Inputs

Every data execution is initiated by some sort of input. Occasionally this is time itself, but usually this is based on an external event. An inputs section might look like...

{ "inputs": [
  { "branch": "aws-xen-centos-7",
    "result": "460783c4f026dbfc580c9297ad5296f3f861c367",
    "metadata": [
      { "key": "version",
        "value": "3263.7" } ],
    "timestamp": "2016-10-12T21:12:52Z" } ] }

The branch simply references the Git branch of the datapact repository where the result (receipt ID) lives. The timestamp field indicates which result was used (in case later executions are performed), and metadata is arbitrary data which came from the input.

Outputs

Most executions will generate some sort of artifact. These are recorded in the outputs section which might look like...

{ "output": [
  { "blob": "c570de39b72a0e66ed6e59fa2127461b9b16feec",
    "checksum": {
      "sha256": "0a3743ad21d8bb3193fbf07ac8c7856a4dda0e30cb6b8f4829d4f479d9e7e22c" },
    "filters": [ "gz" ],
    "path": "filelist.gz",
    "size": 1460203 } ] }

The blob references an object key in the S3 blobstore. The checksums can be used to verify the integrity of the blobs when downloading. Since blobs will often be text files, filters may be used to further transform data when uploading and downloading blobs – in this case gz is used to significantly reduce the size. The path indicates the file name, and size is the stored file size.

Typically your toolchain will rely on these receipt files and download objects with something like the following...

$ receipt=receipt.json  # hypothetical path to a receipt file from the branch
$ url=$( jq -r '"https://s3.amazonaws.com/\(env.S3_BUCKET)/\(.branch)/\(.output[0].blob)"' "$receipt" )
$ path=$( jq -r '.output[0].path' "$receipt" )
$ wget -O "$path" "$url"
$ [[ "$( jq -r '.output[0].checksum.sha256' "$receipt" )" == "$( shasum -a 256 "$path" | cut -f1 -d' ' )" ]]
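
The same check can be done in-process. This Python sketch verifies a blob against a receipt entry and then undoes the gz filter. It assumes the checksum and size cover the stored (compressed) bytes and that filters are reversed in the opposite order from the one listed; `verify_and_decode` is a hypothetical helper, and a small in-memory blob stands in for the S3 download.

```python
import gzip
import hashlib


def verify_and_decode(entry: dict, stored: bytes) -> bytes:
    """Check a blob against its receipt entry, then undo any known filters."""
    digest = hashlib.sha256(stored).hexdigest()
    if digest != entry["checksum"]["sha256"]:
        raise ValueError(f"checksum mismatch for {entry['path']}")
    if len(stored) != entry["size"]:
        raise ValueError(f"size mismatch for {entry['path']}")
    data = stored
    # Assumption: filters were applied in listed order on upload, so they
    # are undone in reverse order here.
    for name in reversed(entry.get("filters", [])):
        if name == "gz":
            data = gzip.decompress(data)
        else:
            raise ValueError(f"unsupported filter: {name}")
    return data


# Build a small stand-in blob instead of downloading from S3.
stored = gzip.compress(b"/etc/passwd\n/etc/hosts\n")
entry = {
    "checksum": {"sha256": hashlib.sha256(stored).hexdigest()},
    "filters": ["gz"],
    "path": "filelist.gz",
    "size": len(stored),
}
print(verify_and_decode(entry, stored).decode())
```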

Metadata

There are often a few other fields in the receipt...

{ "branch": "aws-xen-centos-7",
  "metadata": [
    { "key": "buildenv",
      "value": "0aed24f371e44660b614eabef08f7ce31a99df9c" } ],
  "timestamp": "2016-10-12T21:12:52Z" }

The branch field documents which branch the job execution occurred on. This branch is used for the directory name in the S3 bucket. The metadata field may have additional information about the job execution environment (such as which container or code version was used). Finally, the timestamp indicates when the job execution finished. If a job is re-executed, the timestamp will be bumped even if the outputs are exactly the same.
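
Because a re-run bumps the timestamp even when nothing else changes, a consumer may want to compare receipts by content instead. A minimal Python sketch, assuming receipts are JSON objects with the output and timestamp fields shown above; `same_outputs` is a hypothetical helper, and the checksum and second timestamp are shortened, made-up values for illustration.

```python
def same_outputs(old: dict, new: dict) -> bool:
    """Report whether two receipts describe identical artifacts."""
    def fingerprint(receipt: dict) -> set:
        # A path plus its sha256 identifies an artifact; timestamps are ignored.
        return {
            (item["path"], item["checksum"]["sha256"])
            for item in receipt.get("output", [])
        }
    return fingerprint(old) == fingerprint(new)


first = {
    "branch": "aws-xen-centos-7",
    "timestamp": "2016-10-12T21:12:52Z",
    "output": [{"path": "filelist.gz", "checksum": {"sha256": "0a3743ad"}}],
}
# A re-execution with a later timestamp but the same artifact.
rerun = dict(first, timestamp="2016-10-13T08:00:00Z")

print(same_outputs(first, rerun))  # True
```

Downstream jobs could use a check like this to skip work when a replay produced nothing new.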

In Practice

If you're curious, one example of this is dpb587/bosh-stemcell-metadata where a pipeline watches for new BOSH stemcells, analyzes some data, and updates a gh-pages branch with aggregated results. This allows me to easily pull down information for a specific version I'm interested in (either manually or from another Concourse pipeline that watches it).

A few other uses I've found for this are:

  • pulling, transforming, and forwarding product datafeeds from vendors
  • regularly downloading and generating reports for KPIs from third-party integrations
  • running generic, cron-like jobs where I care about retaining logs and results over time

Currently, datapact is a private project, but I thought the idea of using Concourse for data transformation was particularly interesting, and the described architecture has helped me simplify the way I write automated data tasks.