Airflow + Dataflow — scalable, secure and reliable data integration
Our workplace wants to integrate with a new CRM platform. One crucial task is getting our analysis into the CRM platform, for segmentation of users. However, our business has tens of millions of users. What works fine on a single machine for a quick demo won’t work at scale. We need to run this once a week, quickly, and keep costs down. As with everything, it needs to be reliable and secure by default.
We use a lot of Google managed services, among them Dataflow (managed Apache Beam) and Composer (managed Apache Airflow). Generally, we’re pretty happy with them. However, this took slightly longer than expected because of sparse documentation and examples, so I’m sharing my experience.
To execute this work quickly, it’s clear we need to parallelise it. To summarise Dataflow: Apache Beam is a framework for developing distributed data processing pipelines, and Google offers a managed service for running them called Dataflow. People often seem to regard this as a complex solution, but it’s effectively like cloud functions for distributed data processing: just provide your code, and the service will run and scale it for you. You need to write your pipeline within the Apache Beam framework. The SDK is available in Java, Python and Go, with varying levels of support for more advanced features.
Previously, running on one machine, there was state involved, but it was easy to handle as there was only one machine. We’d fetch 5000 records and, once they were ingested into the CRM platform, fetch the next 5000. But at scale we want to distribute that, so we need to be able to fetch lots of records and batch them together somehow. I did some extra preprocessing to assign every record to a batch of no more than roughly 5000 records. My data, ready to export, looks something like this:
It lives in a table on its own, dedicated to this particular batch export. I used BigQuery date-sharded tables for this. Although they’re not recommended for querying purposes, I find they can be handy for data management with imports/exports.
Having this file locally is normally handy for running your pipeline on some dummy data, as actually starting the dataflow service on GCP takes about 3 minutes to spin up VMs and has a cost associated with it.
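As a rough illustration of the batch assignment described above, here is a minimal plain-Python sketch. It assumes the records arrive as an ordered sequence and simply numbers the batches sequentially; the function name and the `batch_size` parameter are hypothetical, and the same idea could equally be done in SQL before the export.

```python
BATCH_SIZE = 5000  # maximum records per CRM request, per the text above


def assign_batches(records, batch_size=BATCH_SIZE):
    """Yield (batch_id, record) pairs with at most batch_size records per batch."""
    for index, record in enumerate(records):
        # Integer division groups consecutive records under the same batch id.
        yield index // batch_size, record


if __name__ == "__main__":
    # Tiny dummy run: 12 records in batches of 5 gives batch ids 0, 1 and 2.
    for batch_id, record in assign_batches(range(12), batch_size=5):
        print(batch_id, record)
```

Generating a small dummy dataset like this locally is one way to exercise the pipeline without waiting for Dataflow VMs to start.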
Separate the idea of the running pipeline from your code
Often when writing a dataflow pipeline, I’ve wanted to make it just slightly dynamic, meaning I can reuse the same pipeline repeatedly with different parameters. Dataflow templates are meant for this, and they also allow other users to run your pipeline from the Dataflow UI.
However, the guides for this are not immediately straightforward: https://cloud.google.com/dataflow/docs/guides/templates/creating-templates
I struggled with the differences between StaticValueProvider and RuntimeValueProvider (I believe the StaticValueProvider can be used to modify the shape of your DAG, and is available sooner in the execution). But once that’s understood, you can’t necessarily use these values wherever you want, as support is limited based on your language of choice. Our team does not have any java developers, so we used python, where only these IOs allow templated value providers:
This is a little inadequate for my needs. However, we can mentally separate the python file that defines the beam pipeline from the actual running instance of that pipeline. The python file is just a python file, and can do anything a python file can do, so long as it defines a pipeline at the end.
To that end, take a look at the following pipeline, with a few points to note:
There are a few noteworthy points, regarding data format, imports, pipeline options and metrics.
Reading a nested structure from BigQuery does not produce a nested python dict; the result is flattened instead. I added a step (“Convert flat name to nested struct”) to undo this. BigQuery also forces a schema, whereas my CRM platform will accept a union type, so I added a step (“Extract correct value type”) to merge the typed columns into one property. For the built-in beam transform GroupByKey to work, the elements need to have a certain structure, (key, value), so I added a step (“Format for batching”) to prepare for this. Once the records are in batches, the last step uses requests to send each batch to the CRM platform and process the response.
The imports at the top of the file are pretty sparse: only the beam import is needed (I added the arg parsing imports lower down to keep them together). However, each of the steps has its own imports. This is important, because it shows that the steps are quite separate from the python file that creates the pipeline. These imports need to be available to the dataflow workers, so if you’ve got a special requirement, you’ll need to supply a requirements file for dataflow to install on each worker before starting. You can use global imports if you set save_main_session=True in the pipeline options, but this can cause difficulties with state.
The pipeline options class reads any extra options specific to your pipeline from the command line. So far, that is just the input table.
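To make the three reshaping steps more concrete, here is a minimal sketch of them as plain Python functions, which in a Beam pipeline could each be wrapped in a `beam.Map`. The dotted key separator, the typed column names (`string_value`, `int_value` and so on) and the `batch_id` field are all assumptions for illustration, not the real schema.

```python
# Hypothetical typed columns that BigQuery's fixed schema would force us to use.
TYPED_COLUMNS = ("string_value", "int_value", "float_value", "bool_value")


def unflatten(row, sep="."):
    """'Convert flat name to nested struct': {'a.b': 1} becomes {'a': {'b': 1}}."""
    nested = {}
    for flat_key, value in row.items():
        parts = flat_key.split(sep)
        node = nested
        for part in parts[:-1]:
            node = node.setdefault(part, {})
        node[parts[-1]] = value
    return nested


def extract_value(attribute):
    """'Extract correct value type': keep whichever typed column is populated."""
    for column in TYPED_COLUMNS:
        if attribute.get(column) is not None:
            return {"name": attribute["name"], "value": attribute[column]}
    return {"name": attribute["name"], "value": None}


def format_for_batching(record):
    """'Format for batching': GroupByKey expects (key, value) tuples."""
    return record["batch_id"], record
```

Keeping each step a small pure function also makes it easy to unit test without starting a pipeline at all.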
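A step with its own import might look something like the sketch below. The payload shape (`batch_id`, `users`) is hypothetical, and the actual HTTP call is left out so the example stays self-contained; the point is only that the import lives inside the function that runs on the worker.

```python
def build_batch_request(element):
    """Turn one (batch_id, records) pair, as produced by GroupByKey, into a JSON body."""
    # Imported inside the step: the function is shipped to a worker and run there,
    # so it should not rely on imports made at the top of the pipeline file.
    import json

    batch_id, records = element
    # Grouped values may arrive as a lazy iterable, so materialise them first.
    payload = {"batch_id": batch_id, "users": list(records)}
    return json.dumps(payload, sort_keys=True)
```

Third-party libraries used this way (requests, for example) are the ones that would need to appear in the requirements file for the workers.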
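Beam's `PipelineOptions` subclasses declare their flags on an argparse parser, so the idea can be sketched with the standard library alone. This is a stand-in rather than the Beam class itself: it assumes a single `--input_table` flag and passes everything it does not recognise through untouched, which is roughly how runner flags would reach Dataflow.

```python
import argparse


def parse_pipeline_args(argv):
    """Split pipeline-specific options from the flags meant for the runner."""
    parser = argparse.ArgumentParser(description="CRM export pipeline options")
    parser.add_argument(
        "--input_table",
        required=True,
        help="BigQuery table holding the batched records to export",
    )
    # parse_known_args leaves unrecognised flags (runner, project, ...) alone.
    known, runner_args = parser.parse_known_args(argv)
    return known, runner_args


if __name__ == "__main__":
    opts, rest = parse_pipeline_args(
        ["--input_table=my-project:exports.crm_20200101", "--runner=DataflowRunner"]
    )
    print(opts.input_table, rest)
```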
Metrics is a useful way to get insight into your running pipeline. Our distribution metric for batch size recorded just over 87M records processed, and in the UI, looks like so:
Running the following would deploy the job to dataflow.
This has a few interesting flags. The streaming engine has been enabled. This means that google’s service does the intense work of shuffling data between nodes and storing state. Although this is a paid service, it means we can use smaller machines, with a lot less disk space, so the cost should balance out. Read more here: https://cloud.google.com/blog/products/data-analytics/introducing-cloud-dataflows-new-streaming-engine
The FlexRS goal has been set to cost optimised. Flexible resource scheduling allows for partial use of pre-emptible VMs, reducing the cost of dataflow batch job worker VMs by 40%, but it can delay the start of the job by up to 6 hours. That delayed start, however, causes an interesting problem later.
Finally, a few options worth noting that I haven’t used this time. If you require a connection over a private VPN to an on-premise network, this is possible too. You can specify the network and subnetwork to attach the VMs to, and specify the use of private IP addresses for workers (which could also be useful for NAT purposes). Lastly, the region specified here does not have to be the region the VMs get deployed into; it specifies where the cloud dataflow service will execute. You could put your VMs in a zone that is not in one of the supported regions (although for latency, they should ideally be close).
There does not seem to be a straightforward mechanism to inject secrets into dataflow. You shouldn’t hardcode them, but anything you pass as an input to dataflow will be visible in the UI in plaintext, which is not preferable either.
Secrets should be kept separately from the codebase: a change in the secret should not require a change in the codebase (see 12 factor app on config: https://12factor.net/config). So instead I’ve passed arguments to the python file that reference where to find the secret. The running python file is then able to get the value of that secret, and use it in the construction and submission of the job. Understanding that the python file and the job are separate, we have two options. We can use Google Cloud KMS (key management service) to encrypt a secret manually and store it in GCS, and have the python file read it from GCS and decrypt it using the key. Or we can use HashiCorp’s Vault to fetch the secret. Which you use depends on your infrastructure setup; I’m personally using Vault, and can recommend it. You could use other secret management solutions too, but these are 2 common ones. Note that these require additional libraries installed on the airflow worker running the python file, not on the dataflow workers running the pipeline.
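For orientation, the flags discussed in this section could be assembled roughly as follows. This is a sketch, not the exact command used here: the flag names are standard Beam Python pipeline options as far as I know, while the function name, bucket layout and argument values are placeholders.

```python
def dataflow_args(project, region, bucket, job_name, subnetwork=None):
    """Build the runner flags for a cost-optimised Dataflow batch job."""
    args = [
        "--runner=DataflowRunner",
        f"--project={project}",
        f"--region={region}",  # where the Dataflow service runs, not necessarily the VMs
        f"--job_name={job_name}",
        f"--temp_location=gs://{bucket}/tmp",
        "--flexrs_goal=COST_OPTIMIZED",  # flexible resource scheduling, delayed start
        "--enable_streaming_engine",  # offload shuffle and state to the service
    ]
    if subnetwork:
        # Optional private networking: attach workers to a subnetwork, no public IPs.
        args += [f"--subnetwork={subnetwork}", "--no_use_public_ips"]
    return args


if __name__ == "__main__":
    print(" ".join(dataflow_args("my-project", "europe-west1", "my-bucket", "crm-export")))
```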
Using KMS and GCS to store and retrieve secrets
This short python file demonstrates how to encrypt a variable using a Google Cloud KMS key, then store it as a file in a bucket, and do the reverse process (this assumes you’ve already created the KMS key and have the appropriate permissions).
We can apply that to our pipeline preparation steps like so (Note: your airflow worker needs decrypt access to the key):
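As a rough sketch of that round trip, the two functions below take an already constructed KMS client and GCS bucket as arguments. The assumption is that these come from the google-cloud-kms and google-cloud-storage libraries, for example `kms.KeyManagementServiceClient()` and `storage.Client().bucket("my-bucket")`; the function names, key name and blob path are hypothetical.

```python
def store_encrypted_secret(kms_client, bucket, key_name, blob_path, plaintext):
    """Encrypt plaintext with a KMS key and write the ciphertext to a GCS object."""
    # key_name has the form:
    # projects/<project>/locations/<location>/keyRings/<ring>/cryptoKeys/<key>
    response = kms_client.encrypt(
        request={"name": key_name, "plaintext": plaintext.encode("utf-8")}
    )
    bucket.blob(blob_path).upload_from_string(response.ciphertext)


def load_decrypted_secret(kms_client, bucket, key_name, blob_path):
    """Read the ciphertext back from GCS and decrypt it with the same KMS key."""
    ciphertext = bucket.blob(blob_path).download_as_bytes()
    response = kms_client.decrypt(
        request={"name": key_name, "ciphertext": ciphertext}
    )
    return response.plaintext.decode("utf-8")
```

Passing the clients in keeps the functions easy to test with fakes, and means only the bucket and path need to appear as (non-secret) arguments to the pipeline file.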
Using Hashicorp Vault to retrieve secrets
A few assumptions for the upcoming snippet:
- you’ve already got vault running
- you’re only going to run this on kubernetes
- you’ve added your kubernetes cluster as an auth mechanism
- you’ve got access to a role that can read the secret
- you’ve created the secret already, in a kv.v2 secret engine
You can then apply that to the pipeline preparation steps like so:
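Under the assumptions listed above, fetching the secret could look roughly like this. The sketch expects an `hvac.Client` to be passed in (for example `hvac.Client(url=...)`) and uses the Kubernetes auth method with the pod's service account token; the function name, role, path and mount point are placeholders.

```python
# Default location of the service account token inside a Kubernetes pod.
K8S_TOKEN_PATH = "/var/run/secrets/kubernetes.io/serviceaccount/token"


def read_vault_secret(client, role, secret_path, mount_point, jwt_path=K8S_TOKEN_PATH):
    """Log in to Vault via Kubernetes auth and return a kv v2 secret as a dict."""
    with open(jwt_path) as token_file:
        jwt = token_file.read().strip()
    # Exchange the service account token for a Vault token bound to the role.
    client.auth.kubernetes.login(role=role, jwt=jwt)
    response = client.secrets.kv.v2.read_secret_version(
        path=secret_path, mount_point=mount_point
    )
    # kv v2 wraps the payload twice: response data, then the secret's own data.
    return response["data"]["data"]
```

The pipeline file then only needs the Vault address, role and secret path as arguments, none of which are secret themselves.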
Automating execution with Composer (Managed Airflow)
Working with composer is sometimes delightful, and sometimes… less fun. There are some curious quirks around how it’s deployed and what happens when you try to make changes, but generally when it works well, it’s great. There are plenty of plugins offered, to perform common tasks working with GCS, BigQuery, Dataflow and much more.
Composer allows you to install extra packages. I’ve got the following:
I’ve failed to modify the environment many times, due to permission constraints, modification of composer’s GKE node pools, and API enablement. In short, changing the environment causes a job to be launched in the cluster, which must go on a node labelled with default-pool (I modified our cluster to add a new pool to increase capacity, which later caused the upgrade to fail). The upgrade kicks off a Google Cloud Build job, which also needs GCS, and basically there are about 50 IAM roles between “let me just click this” and “it’s working, at last!”
We strive to make our tasks idempotent, so normally if something needs re-running, or crashes, we don’t need to do any additional work, just hit re-run.
But when I deployed the dataflow job using the packaged DataFlowPythonOperator (https://airflow.apache.org/docs/stable/_api/airflow/contrib/operators/dataflow_operator/index.html#airflow.contrib.operators.dataflow_operator.DataFlowPythonOperator) I hit a snag.
The operator suffixes the job name with a random ID. This is a good idea, in case there are several pipelines with the same job name trying to run in airflow. But then the task waits for completion, and the flexible resource scheduling option means I could be waiting for hours for this task to complete. If the task is somehow terminated, it should be retried (we try to make everything retry, to make our systems more automated). But the retry would launch a second job under a new name. Our cost saving efforts would go out the window, and reliability would be poor.
So I needed to split the operator apart into 2 things, an operator to start the task, and a sensor. You can view the modified plugin (and a few others) here: https://github.com/Mark-McCracken/airflow_plugins
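The sensor half of that split boils down to a small polling check. The sketch below is not taken from the linked plugin: it assumes a hypothetical `get_job_state` callable that returns a Dataflow job state string for a given job name, and in Airflow the same logic would sit in the `poke` method of a sensor.

```python
# Terminal states after which waiting any longer is pointless.
FAILED_STATES = {"JOB_STATE_FAILED", "JOB_STATE_CANCELLED"}


def poke(get_job_state, job_name):
    """Return True once the job is done, False to keep waiting, raise on failure."""
    state = get_job_state(job_name)
    if state == "JOB_STATE_DONE":
        return True
    if state in FAILED_STATES:
        raise RuntimeError(f"Dataflow job {job_name} ended in state {state}")
    # Queued (FlexRS delayed start), pending or running: check again later.
    return False
```

Because the start task hands over the job name and finishes immediately, a retry of the sensor only repeats this check and never launches a second job.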
Building the DAG
A few points to note:
- We don’t use catchup, as we don’t have any value in backdating old attributes.
- The job completion task uses XComs to pull the job_name. XComs are normally advised against (“if two operators need to share information, like a filename or small amount of data, you should consider combining them into a single operator. If it absolutely can’t be avoided, [xcoms exist]”), but in this case it absolutely can’t be avoided.
- We use a lot of jinja templates so we can easily copy DAGs across airflow environments. We set the variables in the airflow environment. Note that these are NOT secrets, just configuration. Configuration should be easy to get, secrets should not be left lying around, and should be guarded.
The equivalent options using KMS for secrets would look like so:
Uploading this to your DAGs folder (hopefully via a continuous delivery pipeline) should have the DAG show up and run in your environment.
How much does this all cost? Take a look at the resource consumption from the dataflow UI:
And the pricing structure:
vCPU: $0.0354 * 8.019 = $0.284
Memory: $0.0025032 * 30.073 = $0.075
Standard Persistent Disk: $0.000054 * 100.242 = $0.005
Data processed: $0.011 * 12.33 = $0.136 (although I was billed only 50%, as the first 5TB is half price to encourage usage).
Totalling $0.50. Not bad for 87M records. Don’t think anyone from finance will complain!
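The arithmetic above can be double-checked with a few lines of Python. The unit prices and usage figures are copied from the list; the dictionary keys are just labels chosen for this sketch.

```python
# (unit price in USD, usage reported by the Dataflow UI) for each line item
LINE_ITEMS = {
    "vcpu": (0.0354, 8.019),
    "memory": (0.0025032, 30.073),
    "persistent_disk": (0.000054, 100.242),
    "data_processed": (0.011, 12.33),
}


def line_costs(items):
    """Multiply each unit price by its usage."""
    return {name: price * usage for name, (price, usage) in items.items()}


costs = line_costs(LINE_ITEMS)
total = sum(costs.values())
# The data processed line was billed at 50%, so subtract half of it.
discounted_total = total - 0.5 * costs["data_processed"]

print(f"list price total: ${total:.2f}")
print(f"with half-price data processing: ${discounted_total:.2f}")
```

At list price this agrees with the $0.50 quoted above; with the half-price data processing applied, the bill comes to about $0.43.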
The documentation for beam and airflow can sometimes be touch and go, or not obvious until you have some hands-on experience. Guidance on secrets management within beam was pretty difficult to find, but understanding the separation between the python file that defines the Beam pipeline and the running pipeline itself was key to getting this working well.
I was quite surprised at the internals of the Dataflow operator: it doesn’t call an API, but uses a command line subprocess. Digging into the internal code for these operators helped my understanding of airflow a lot, and made me more confident to create plugins for any other common needs.
The above examples allow for scalable processing of tens of millions of records in just 15 minutes, using proper security, with data processing insight, for under a dollar in cloud costs.
If you have any feedback or improvements, I’d love to hear!