[Danny Berger](https://dpb587.me/ "Home")

# Data Processing with Concourse

October 19, 2016

Recently I needed to focus on a project that regularly processed datasets with typical extract, transform, and load stages. Historically it used [Amazon SQS](https://aws.amazon.com/sqs/) to queue up the tasks for each stage, some [supervisor](http://supervisord.org/)-managed processes to work off the queue, and [Amazon S3](https://aws.amazon.com/s3/) to store the results of each stage. The original implementation was struggling: it was inefficient, problems were difficult to detect, and replaying a stage whenever unexpected, bad data showed up was even harder.

Wanting to avoid large-scale solutions like [Hadoop](https://hadoop.apache.org/), I found [Pachyderm](https://www.pachyderm.io/), which seemed perfect. It is a newer, container-oriented solution which manages workloads via [Kubernetes](https://kubernetes.io/). I quickly had a cluster running in [Google Cloud Platform](https://cloud.google.com/), but once I got to the point of executing custom jobs I started running into issues with hanging jobs, containers that would not start, and pods with inexplicably high CPU usage. After spending some time debugging, I started thinking... I already have [a tool](https://concourse.ci/) which is really good at managing inputs, containers, and outputs, and at chaining those things together.

# Architecture

I named this approach "datapact". It represents a collection of configuration, data manipulations, and data results. The high-level components are...

**Concourse** - at the core, [Concourse](https://concourse.ci/) is used to chain together the various stages of data operations. The results of each stage are tracked, and the existing Concourse concepts are followed...

- pipeline - a common group of datasets and data operations
- job - a specific operation for a dataset
- resource - data management for the results of job executions

**Job receipts** - receipts record the results of a job execution. This includes which prior datasets were the input, the new data generated by the job, and some additional metadata.

**Git** - a [`git`](https://git-scm.com/) repository is a good way to store the execution receipts. It provides built-in concepts of commits and branches. More specifically...

- branches - each data transformation job has its own branch which stores its configuration, Docker environment, and job receipts
- commits - made whenever the data processing changes or updated job receipts are available

**S3** - a bucket is used to store blobs which are generated by the job executions.

# Example

As an example, let's say a network environment runs file integrity monitoring. Whenever there is a new base server image available we need to inspect it and eventually send results to our integrity server.

![Screenshot: datapact-pipeline](https://s3.dualstack.us-east-1.amazonaws.com/dpb587-website-us-east-1/asset/blog/2016-10-19-data-processing-with-concourse/datapact-pipeline.png)

## Receiving New Server Images

The very first responsibility is to download the server images into our datapact pipeline. Stages are branch-based, so we can start by creating an empty branch in our datapact repository.

```
$ git checkout --orphan aws-xen-centos-7
```

Each branch should document the container environment it needs to run in. The easiest way to do this is to create a `docker` directory which will contain a `Dockerfile` and an `execute` script.

```
$ mkdir docker
```

For our first job, the `Dockerfile` should install `curl` and `jq`.

```
$ cat > docker/Dockerfile <<EOF
FROM alpine:latest
RUN apk add --no-cache ca-certificates curl jq
ADD execute /opt/datapact/execute
EOF
```

And then the `execute` script will utilize those packages.

```
$ touch docker/execute
$ chmod +x docker/execute
```

In our specific case, the script will download the tarball of the base server image (aka "stemcell") and dump it into a `result` directory (the default location for all our transformed blobs).

```
$ curl -o result/stemcell.tgz "$stemcell_tarball"
```

Since this is the very first execution in the pipeline, we can give a seed for generating the receipt ID. This helps ensure a deterministic receipt ID, rather than a random or time-based one.

```
$ mkdir -p result/.datapact
$ jq -nc \
  --arg name "$stemcell_name" \
  --arg version "$stemcell_version" \
  '
    {
      "branch": ":concourse",
      "result": "\($name)/\($version)"
    }
  ' \
  > result/.datapact/input
```
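The post does not say how datapact turns the seed into a receipt ID. The 40-character hex receipt IDs shown later, for example in the `inputs` section, look like SHA-1 digests. This hypothetical sketch therefore assumes the ID is the SHA-1 of the seed's exact bytes. That choice makes the ID stable across re-runs: the same stemcell name and version always map to the same ID, whereas a random or time-based ID would change on every execution. `receipt_id_from_seed` is an illustrative name, not a datapact command.

```shell
#!/bin/sh
# Hypothetical helper: derive a deterministic receipt ID from a seed string.
# Assumption: the ID is the SHA-1 of the seed's exact bytes; datapact's real
# scheme is not documented in this post.
receipt_id_from_seed() {
  # printf '%s' avoids hashing an extra trailing newline
  printf '%s' "$1" | sha1sum | cut -d' ' -f1
}
```

Inside the job, it could be called as `receipt_id_from_seed "$(cat result/.datapact/input)"`. Command substitution strips the trailing newline that `jq -c` writes, so the hashed bytes are exactly the compact JSON seed.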

At this point we have defined what needs to happen and the environment in which it should happen. Next we need to document the input for our task - normally this is how we create dependencies between data branches. In our case, we do not have a previous data job, so instead we can utilize a raw Concourse [resource](http://concourse.ci/configuring-resources.html) to trigger it.

```
$ cat config/job.json
{ "input": [
  { "alias": "stemcell",
    "resource": {
      "type": "bosh-io-stemcell",
      "source": {
        "name": "bosh-aws-xen-centos-7-go_agent" } } } ] }
```

Now that we're done with our first task, we can safely commit it.

```
$ git add config/job.json docker/Dockerfile docker/execute
$ git commit -m 'Initial commit'
```

## Analyzing the Server Images

Once I have a server image, the next step may be to analyze it. For this example, I may want to reuse a container that is managed elsewhere (e.g. [`bosh-stemcell-metadata-scripts`](https://github.com/dpb587/bosh-stemcell-metadata-scripts)). In this case, I need to create a `config/job.json` file with a `task` section in a new branch called `aws-xen-centos-7-metadata`.

```
{ "task": {
    "image": {
      "source": {
        "repository": "dpb587/bosh-stemcell-metadata" } },
    "privileged": true,
    "run": {
      "path": "/bosh-stemcell-metadata/bin/analyze" },
    "env": {
      "analyzer": "filelist manifest packages",
      "tarball": "aws-xen-centos-7/stemcell.tgz" } },
  "input": [
    { "branch": "aws-xen-centos-7" } ] }
```

Specifically, I have told it where to find the container, what script to execute, a couple of environment settings, and that it needs to execute on all data which comes from the `aws-xen-centos-7` branch that we defined earlier.
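The `analyze` script itself lives in the linked repository and is not reproduced here. The function below is a simplified, hypothetical stand-in for one entry in the `analyzer` list, `filelist`. It lists the entries of the tarball named by the `tarball` setting and writes them to `result/filelist`. It only inspects the outer tarball, so it is much shallower than the real tool. The function name and the optional output-directory argument are illustrative.

```shell
#!/bin/sh
# Hypothetical "filelist" analyzer; NOT the real bosh-stemcell-metadata code.
# Writes a sorted list of the tarball's entries to <out_dir>/filelist.
analyze_filelist() {
  src=$1                # e.g. aws-xen-centos-7/stemcell.tgz
  out_dir=${2:-result}  # result/ is the default location for transformed blobs
  mkdir -p "$out_dir" || return 1
  # Capture the listing first so a missing or corrupt tarball fails the job.
  list=$(tar -tzf "$src") || return 1
  # Sorting keeps the output stable, so re-runs produce identical results.
  printf '%s\n' "$list" | sort > "$out_dir/filelist"
}
```

Inside the task container it could be called as `analyze_filelist "$tarball"`, using the `tarball` environment setting from the config above.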

## Building the Pipeline

To generate a datapact pipeline for Concourse, we rely on an additional configuration file. Typically this lives in its own branch.

```
$ git checkout --orphan master
```

The configuration must first include a list of the branches responsible for data manipulations, since not all repository branches may be relevant to the data tasks.

```
{ "branches_list": [
    { "branch": "aws-xen-centos-7" },
    { "branch": "aws-xen-centos-7-metadata"} ] }
```

The next section is where we define our base configuration used to `get` task inputs before they execute and `put` task results after they execute. This should contain the `git` repository where receipts are stored in addition to the S3 bucket information used for blob storage.

```
{ "datapact_job": {
  "source": {
    "uri": "git@github.com:dpb587/bosh-stemcell-metadata.git",
    "private_key": env.GIT_PRIVATE_KEY,
    "s3_bucket": "dpb587-bosh-stemcell-metadata-us-east-1",
    "s3_access_key_id": env.AWS_ACCESS_KEY_ID,
    "s3_secret_access_key": env.AWS_SECRET_ACCESS_KEY } } }
```

One other useful section defines the default Docker registry. By setting this, all datapact branches which have a `docker` directory will be watched and automatically rebuilt when the images need to change.

```
{ "docker_registry": {
  "source": {
    "repository": "dpb587/bosh-stemcell-metadata",
    "username": env.DOCKER_USERNAME,
    "password": env.DOCKER_PASSWORD } } }
```

The three sections are merged and saved as `datapact.json`. From the directory containing that file, we can run `datapact-pipeline-generate`, which dumps out a pipeline featuring:

- jobs for each branch configured to receive inputs, execute manipulations, and archive results
- datapact resources for each branch capable of watching, downloading, and uploading results
- Docker image resources for any branches which have documented their container environment
- any custom resources, resource types, groups, or jobs

The pipeline can then be set with:

```
$ # "my-target" and "datapact" are placeholder target and pipeline names
$ fly -t my-target set-pipeline -p datapact -c <( datapact-pipeline-generate )
```
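The post does not show how `datapact-pipeline-generate` decides which branches get a Docker image resource. One plausible check, sketched here as an assumption, is to look for `docker/Dockerfile` in the tip of each branch listed in `branches_list`, without checking anything out. `git cat-file -e <branch>:<path>` exits with status 0 only when that path exists in the commit. `branches_with_docker` is a hypothetical helper, not part of datapact.

```shell
#!/bin/sh
# Hypothetical helper: print each given branch whose tip commit contains
# docker/Dockerfile, i.e. branches that documented their container environment.
branches_with_docker() {
  repo=$1
  shift
  for branch in "$@"; do
    # "<rev>:<path>" names a file inside a commit; -e only tests existence
    if git -C "$repo" cat-file -e "$branch:docker/Dockerfile" 2>/dev/null; then
      printf '%s\n' "$branch"
    fi
  done
}
```

Suppose both branches from this example exist locally. Then `branches_with_docker . aws-xen-centos-7 aws-xen-centos-7-metadata` would be expected to print only `aws-xen-centos-7`, since the metadata branch reuses an image managed elsewhere.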

## Additional Configuration

At this point, the datapact pipeline is pretty simple. It has a job for building the Docker image and a job that triggers on new base server images from the built-in `bosh-io-stemcell` resource. Whenever a new base server image is found, the pipeline runs some metadata scripts and archives the results. Further tasks might watch the `aws-xen-centos-7-metadata` branch and upload results to the integrity server.

One complaint about the original system was how difficult it was to know when tasks failed. Since this is a regular Concourse pipeline, I could add a custom [Slack resource](https://github.com/cloudfoundry-community/slack-notification-resource) and give my jobs `on_failure` hooks that send a [Slack](https://slack.com/) message with a link to the build logs of the failure. I can also pull up the pipeline in my browser to quickly see whether the last runs are green or red.

# Receipt Files

The receipts contain data about how a particular execution happened and what the results were. Here are a few more details about the data they contain.

## Inputs

Every data execution is initiated by some sort of input. Occasionally this is time itself, but usually this is based on an external event. An `inputs` section might look like...

```
{ "inputs": [
  { "branch": "aws-xen-centos-7",
    "result": "460783c4f026dbfc580c9297ad5296f3f861c367",
    "metadata": [
      { "key": "version",
        "value": "3263.7" } ],
    "timestamp": "2016-10-12T21:12:52Z" } ] }
```

The `branch` references the Git branch of the datapact repository where the `result` (receipt ID) lives. The `timestamp` field indicates which execution of that result was used (in case it was later re-executed), and `metadata` is arbitrary data which came from the input.
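The timestamps are fixed-width ISO 8601 values in UTC, such as `2016-10-12T21:12:52Z`. For that format, plain string sorting matches chronological order, so a consumer can find the most recent execution without parsing dates. This minimal sketch assumes the timestamps arrive one per line. Both helper names are hypothetical.

```shell
#!/bin/sh
# Hypothetical helper: read UTC timestamps in the fixed-width
# YYYY-MM-DDTHH:MM:SSZ form (one per line) and print the newest one.
latest_timestamp() {
  # Byte-wise (C locale) ordering equals chronological order for this format.
  LC_ALL=C sort | tail -n 1
}

# Format the current time the same way, e.g. when writing a new receipt.
utc_timestamp() {
  date -u +%Y-%m-%dT%H:%M:%SZ
}
```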

## Outputs

Most executions will generate some sort of artifact. These are recorded in the `output` section, which might look like...

```
{ "output": [
  { "blob": "c570de39b72a0e66ed6e59fa2127461b9b16feec",
    "checksum": {
      "sha256": "0a3743ad21d8bb3193fbf07ac8c7856a4dda0e30cb6b8f4829d4f479d9e7e22c" },
    "filters": [ "gz" ],
    "path": "filelist.gz",
    "size": 1460203 } ] }
```

The `blob` references an object key in the S3 blobstore. The `checksum`s can be used to verify the integrity of the blobs when downloading. Since blobs will often be text files, `filters` may be used to further transform data when uploading and downloading blobs; in this case `gz` is used to significantly reduce the size. The `path` is the file name, and `size` is the size of the stored file.
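This hypothetical sketch shows the upload side for a single file with the `gz` filter: compress the file, then record the checksum and size of the compressed bytes. It assumes `checksum` and `size` describe the stored (filtered) blob. That matches the download snippet that follows, which hashes the downloaded file. It also uses the SHA-1 of the stored bytes as the `blob` key, which is purely an assumption, since the post doesn't say how keys are chosen. The `path` value is passed in by the caller. `describe_gz_output` is not a datapact command.

```shell
#!/bin/sh
# Hypothetical sketch: apply the "gz" filter to one file and print an
# "output" entry describing the stored (compressed) blob.
# Assumption: blob key = SHA-1 of the stored bytes (not documented).
describe_gz_output() {
  src=$1       # file produced by the job
  stored=$2    # where to write the filtered blob before uploading it
  out_path=$3  # value for "path"; must not need JSON escaping
  # -n omits the original name/timestamp so identical input gives identical blobs
  gzip -n -c "$src" > "$stored" || return 1
  blob=$(sha1sum "$stored" | cut -d' ' -f1)
  sha256=$(sha256sum "$stored" | cut -d' ' -f1)
  size=$(wc -c < "$stored" | tr -d ' ')
  printf '{"blob":"%s","checksum":{"sha256":"%s"},"filters":["gz"],"path":"%s","size":%s}\n' \
    "$blob" "$sha256" "$out_path" "$size"
}
```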

Typically your toolchain will read these receipt files and download objects with something like the following...

```
$ receipt=receipt.json  # placeholder: path to a receipt file from the datapact repository
$ url=$( jq -r '"https://s3.amazonaws.com/\(env.S3_BUCKET)/\(.branch)/\(.output[0].blob)"' "$receipt" )
$ path=$( jq -r '.output[0].path' "$receipt" )
$ wget -O "$path" "$url"
$ [[ "$( jq -r '.output[0].checksum.sha256' "$receipt" )" == "$( shasum -a 256 "$path" | cut -f1 -d' ' )" ]]
```
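The snippet above stops at the checksum comparison. A hypothetical helper that also undoes the `gz` filter might look like this. Like the snippet, it assumes the receipt's SHA-256 was computed over the stored (still compressed) bytes. It takes the expected checksum as an argument, so it does not depend on any particular JSON tool. `verify_and_unfilter` is an illustrative name.

```shell
#!/bin/sh
# Hypothetical helper: check a downloaded blob against the receipt's sha256,
# then reverse the "gz" filter into the destination file.
verify_and_unfilter() {
  blob=$1      # downloaded, still-filtered blob
  expected=$2  # .output[N].checksum.sha256 from the receipt
  dest=$3      # where to write the unfiltered data
  actual=$(sha256sum "$blob" | cut -d' ' -f1) || return 1
  if [ "$actual" != "$expected" ]; then
    echo "checksum mismatch for $blob" >&2
    return 1   # nothing is written to $dest on a mismatch
  fi
  gzip -dc "$blob" > "$dest"
}
```

For example: `verify_and_unfilter "$path" "$( jq -r '.output[0].checksum.sha256' receipt.json )" filelist`.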

## Metadata

There are often a few other fields in the receipt...

```
{ "branch": "aws-xen-centos-7",
  "metadata": [
    { "key": "buildenv",
      "value": "0aed24f371e44660b614eabef08f7ce31a99df9c" } ],
  "timestamp": "2016-10-12T21:12:52Z" }
```

The `branch` field documents the branch on which the job execution occurred; it is also used as the directory name in the S3 bucket. The `metadata` field may have additional information about the job execution environment (such as which container or code version was used). Finally, the `timestamp` indicates when the job execution finished. If a job is re-executed, the `timestamp` will be bumped even if the outputs are exactly the same.
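The receipt's `branch` names the bucket directory, and each output entry names a `blob`. So the object location can be assembled from the receipt alone. This small sketch mirrors the path-style URL used in the download example. The bucket, branch, and blob are passed in, and both helper names are hypothetical.

```shell
#!/bin/sh
# Hypothetical helpers: locate a receipt's blob in the S3 bucket using the
# <branch>/<blob> key layout described for receipts.
blob_key() {
  printf '%s/%s\n' "$1" "$2"   # branch, blob
}

blob_url() {
  # Path-style URL, as in the download example: https://s3.amazonaws.com/<bucket>/<key>
  printf 'https://s3.amazonaws.com/%s/%s\n' "$1" "$(blob_key "$2" "$3")"  # bucket, branch, blob
}
```

With the AWS CLI, the same key could be fetched with `aws s3 cp "s3://$bucket/$(blob_key "$branch" "$blob")" .`.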

# In Practice

If you're curious, one example of this is [dpb587/bosh-stemcell-metadata](https://dpb587.github.io/bosh-stemcell-metadata/), where a pipeline watches for new [BOSH stemcells](https://bosh.io/), analyzes some data, and updates a `gh-pages` branch with aggregated results. This allows me to easily pull down information for a specific version I'm interested in, both manually and from another Concourse pipeline that watches it.

A few other uses I've found for this are:

- pulling, transforming, and forwarding product datafeeds from vendors
- regularly downloading and generating reports for KPIs from third-party integrations
- running generic, cron-like jobs where I care about retaining logs and results over time

Currently, datapact is a private project, but I thought the idea of using Concourse for data transformation was particularly interesting, and the described architecture has helped me simplify the way I write automated data tasks.


Copyright © 2026 // [dpb587.me](https://dpb587.me/) is a [personal](https://dpb587.me/projects/website), [open source](https://github.com/dpb587/dpb587.me/blob/main/content/post/2016/data-processing-with-concourse-20161019.md) site.
