Airflow + Dataflow — scalable, secure and reliable data integration

Our workplace wants to integrate with a new CRM platform. One crucial task is getting our analysis into the CRM platform, for segmentation of users. However our business has tens of millions of users. What works fine on a single machine for a quick demo, won’t work at scale. We need to run this once a week, quickly, and keep costs down. As with everything, it needs to be reliable and secure by default.

We use a lot of google managed services, among them dataflow (managed apache beam) and composer (managed apache airflow). Generally, we’re pretty happy with them, however this took slightly longer than expected because of sparse documentation and examples, so I’m sharing my experience.

Using Dataflow

Previously running on one machine, there was a state involved, but it was easy to handle as there was only one machine. We’d fetch 5000 records, and once they were ingested into the CRM platform, fetch the next 5000. But at scale, we want to distribute that, so we need to be able to fetch lots of records, and batch them together somehow. I did some extra preprocessing to assign every record to a batch, of approximately no larger than 5000. My data, ready to export, looks something like this:

Sample data from Bigquery to export

It lives in a table on it’s own, dedicated to this particular batch export. I used bigquery date sharded tables for this. Although they’re not recommended for querying purposes, I find they can be handy for data management with imports/exports.

Having this file locally is normally handy for running your pipeline on some dummy data, as actually starting the dataflow service on GCP takes about 3 minutes to spin up VMs and has a cost associated with it.

Separate the idea of the running pipeline from your code

However, the guides for this are not immediately straightforward: https://cloud.google.com/dataflow/docs/guides/templates/creating-templates

I struggled with the differences between StaticValueProvider and RuntimeValueProvider (I believe the StaticValueProvider can be used to modify the shape of your DAG, and is available sooner in the execution). But once that’s understood, you can’t necessarily use these values wherever you want, as support is limited based on your language of choice. Our team does not have any java developers, so we used python, where only these IOs allow templated value providers: textio, avroio, tfrecordio

This is a little inadequate for my needs. However, we can mentally separate the concept of the python file that defines the beam pipeline, and the actual running instance of that pipeline. The python file, is just a python file, and can do anything a python file can do, so long as at the end of that file, we define a pipeline.

To that end, take a look at the following pipeline, with a few points to note:

Full pipeline definition file

There are a few noteworthy points, regarding data format, imports, pipeline options and metrics.

The data format when reading a nested structure from bigquery does not produce a python dict, but instead flattens. I added a step (“Convert flat name to nested struct”) to undo this. BigQuery also forces a schema, whereas my CRM platform will accept a union type, so I added a step (“Extract correct value type”) to merge those into one property. In order for the provided beam transform, GroupByKey to work, the elements need to have a certain structure: (key, value), so I added a step (“Format for batching”) to prepare for this. After they’re in batches, the last step is using requests to format the request to send the batch to the CRM platform and process the response.

The imports at the top of the file are pretty sparse, just need the beam import (I added the arg parsing imports lower to keep it all together). However, looking in each of the steps, they have their own imports. This is important, it shows that the steps are quite separate from the python file that creates the pipeline. These imports need to be available to the dataflow workers — if you’ve got a special requirement, you’ll need to list a requirements file for dataflow to install on each worker before starting. You can use global imports if you use the argument save_main_session=True in the pipeline options, but this can cause difficulties with state.

Pipeline options specifies a class to read any extra options in from the command line, that are specific to your pipeline. In this case so far, just the input table.

Metrics is a useful way to get insight into your running pipeline. Our distribution metric for batch size recorded just over 87M records processed, and in the UI, looks like so:

Counters showing distribution of batch size

Deploying

Deployment shell script

This has a few interesting flags. The streaming engine has been enabled. This means that google’s service does the intense work of shuffling data between nodes and storing state. Although this is a paid service, it means we can use smaller machines, with a lot less disk space, so the cost should balance out. Read more here: https://cloud.google.com/blog/products/data-analytics/introducing-cloud-dataflows-new-streaming-engine

FlexRS Goal has been set to Cost Optimised. Flexible resource scheduling allows for partial use of pre-emptible VMs, to reduce the cost of dataflow batch job worker VMs by 40%, but causes a delayed start by up to 6 hours. The delayed start however, causes an interesting problem later.

Finally a few points worth noting that I haven’t used this time, if you require connection over a private VPN to an on-premise network, this is possible too. You can specify the network and subnetwork to attach the VMs to, and specify the use of private IP addresses for workers (which could also be useful for NAT purposes). Lastly, the region specified here, does not have to be the region the VMs get deployed into, it is specifying where the cloud dataflow service will execute. You could put your VMs in a zone that is not in one of the supported regions (although for latency, they should ideally be close).

Using secrets

Secrets should be kept separately to the codebase — a change in the secret should not mean a change is required in the codebase (see 12 factor app on config: https://12factor.net/config). So instead I’ve passed arguments to the python file that reference where to find the secret, this running python file is then able to get the value of that secret, and use it in the construction and submission of the job. Understanding that the python file and the job are separate, we can use google cloud KMS (key management service) to encrypt a secret manually and store it in GCS, and have the python file read from GCS and decrypt it using the key, or we could use hashicorp’s Vault, to fetch the secret. Which you use depends on your infrastructure setup, I’m personally using Vault, and can recommend it. You could use other secret management solutions too, but these are 2 common ones (Note that these require additional libraries installed on the airflow worker running the python file, not the dataflow workers running the pipeline).

Using KMS and GCS to store and retrieve secrets

Demo usage of google cloud KMS and GCS solution

We can apply that to our pipeline preparation steps like so (Note: your airflow worker needs decrypt access to the key):

Pipeline preparation using KMS andGCS

Using Hashicorp Vault to retrieve secrets

  • you’ve already got vault running
  • you’re only going to run this on kubernetes
  • you’ve added your kubernetes cluster as an auth mechanism
  • you’ve got access to a role that can read the secret
  • you’ve created the secret already, in a kv.v2 secret engine

You can then apply that to the pipeline preparation steps like so:

Pipeline preparation using Hashicorp Vault and hvac python library

Automating execution with Composer (Managed Airflow)

Composer allows you to install extra packages. I’ve got the following:

google-cloud
google-cloud-bigquery
google-cloud-error-reporting
google-cloud-core==1.3.0
apache-beam==2.19.0
apache_beam[gcp]
google-cloud-storage>=1.27.0
google-resumable-media==0.5.0
google-cloud-kms>=1.3.0
hvac

I’ve failed to modify the environment for a change many times, due to permission restraints, modification of composer’s GKE node pools, and API enablement. In short, changing the environment causes a job to be launched in the cluster, which must go on a node labelled with default-pool (I modified our cluster to add a new pool to increase capacity, which later caused the upgrade to fail). The upgrade kicks off a google cloud build job, which also needs GCS, and basically there are about 50 IAM roles between “let me just click this” and “it’s working, at last!”

Idempotence

But when I deployed the dataflow using the packaged DataflowPythonOperator (https://airflow.apache.org/docs/stable/_api/airflow/contrib/operators/dataflow_operator/index.html#airflow.contrib.operators.dataflow_operator.DataFlowPythonOperator) I hit a snag.

The operator suffixes the job name with a random idea. Good idea, incase there are several pipelines with the same job name trying to run in airflow. But then the task waits for completion. However the flexible resource scheduling option means I could be waiting for hours for this task to complete. If the task is somehow terminated, it should be retried (we try to make everything retry, to make our systems more automated). But this would give 2 job names. Our cost saving efforts would go out the window, and reliablity would be poor.

So I needed to split the operator apart into 2 things, an operator to start the task, and a sensor. You can view the modified plugin (and a few others) here: https://github.com/Mark-McCracken/airflow_plugins

Building the DAG

DAG to run job

A few points to note:

  • We don’t use catchup, as we don’t have any value in backdating old attributes.
  • The job completion task uses xcoms to pull the job_name. Although XComs are normally advised against, (“if two operators need to share information, like a filename or small amount of data, you should consider combining them into a single operator. If it absolutely can’t be avoided, [xcoms exist]” ), in this case, it absolutely can’t be avoided.
  • We use a lot of jinja templates so we can easily copy DAGs across airflow environments. We set the variables in the airflow environment. Note that these are NOT secrets, just configuration. Configuration should be easy to get, secrets should not be left lying around, and should be guarded.

The equivalent options using KMS for secrets would look like so:

Start dataflow options using google KMS

Uploading this to your DAGs folder (hopefully via a continuous delivery pipeline) should have the DAG show up and run in your environment.

Cost

Resource metrics for dataflow job

And the pricing structure:

Price structure for dataflow

vCPU = $0.0354*8.019 = $0.284

Memory: $0.0025032 * 30.073 = $0.075

Standard Persistent Disk: $0.000054 * 100.242 = $0.005

Data processed: $0.011 * 12.33 = $0.136 (although I was billed only 50%, the first 5TB is half price to encourage usage).

Totalling $0.50. Not bad for 87M records. Don’t think anyone from finance will complain!

Conclusion

I was quite surprised at the internals of the Dataflow Operator, it doesn’t call an API, but uses a command line subprocess. I found it helped my understanding of airflow a lot to dig into the internal code for these operators, and made me more confident to create plugins for any other common needs.

The above examples allow for scalable processing of hundreds of millions of records, in just 15 minutes, using proper security, with data processing insight, for under a dollar in cloud costs.

If you have any feedback or improvements, I’d love to hear!