Schedule Dataflow Templates with Airflow

Alex Fragotsis
Jul 4, 2020

Ok, so, we’ve written our Dataflow Template with Python, now what? We want to schedule it to run daily and we’re going to use Airflow for that.

The first thing we want, for security reasons, is to keep service accounts separate. In the previous post, we created a service account to generate the template and run the jobs. Now we need a new service account whose only purpose is to trigger new Dataflow jobs.
The new service account needs the Dataflow Developer and Cloud Dataflow Service Agent roles. We'll call this service account dataflow-python-runner.

Now that we have all service accounts in place, here’s how it works.

  • On the VM that runs Airflow, we'll set dataflow-python-runner as the default service account. This account should have minimal access to our Google project. Ideally we'd give it only the Dataflow Service Agent role, but because Airflow runs some extra commands, we also need to grant the Dataflow Developer role, or create a custom role that grants only the specific permissions we need.
  • In Airflow, using the serviceAccountEmail field, we tell Dataflow to run the job as dataflow-python-test. This service account should have all the permissions needed to actually run a Dataflow job, as well as any other access the pipeline needs, e.g. to buckets.

So dataflow-python-runner is the service account that triggers the job, and dataflow-python-test is the service account that runs the job.

If, for example, you have data in a non-public bucket that this job needs to access, it's the dataflow-python-test account that needs the read permissions, not the runner.
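As an illustration, here's a minimal sketch of granting that read access with the google-cloud-storage Python client; the bucket name is a placeholder, and doing the same thing through the Cloud Console or gcloud works just as well.

from google.cloud import storage

PROJECT_ID = "MY_PROJECT_ID"
WORKER_SA = f"serviceAccount:dataflow-python-test@{PROJECT_ID}.iam.gserviceaccount.com"

client = storage.Client(project=PROJECT_ID)
bucket = client.bucket("my-private-input-bucket")  # placeholder bucket name

# Fetch the bucket's IAM policy and add a read-only binding for the worker account.
policy = bucket.get_iam_policy(requested_policy_version=3)
policy.bindings.append({
    "role": "roles/storage.objectViewer",
    "members": {WORKER_SA},
})
bucket.set_iam_policy(policy)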

Now, in the Airflow code, we just use the DataflowTemplateOperator to trigger the template we already have.

from airflow.contrib.operators.dataflow_operator import DataflowTemplateOperator

job = DataflowTemplateOperator(
    task_id='alex_test_job',
    template="gs://FULL_PATH_TO_TEMPLATE",
    job_name='alex_job_test_from_airflow',
    dataflow_default_options={
        "project": "MY_PROJECT_ID",
        "stagingLocation": "gs://FULL_PATH_TO_STAGING_FOLDER",
        "tempLocation": "gs://FULL_PATH_TO_TEMP_FOLDER",
        # dataflow-python-test is the account the Dataflow workers run as
        "serviceAccountEmail": "dataflow-python-test@{project_id}.iam.gserviceaccount.com",
    },
)
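Since the goal is a daily run, here's a minimal sketch of how this operator might sit inside a DAG with a daily schedule. The DAG id dataflow_jobs matches the test command further down; the start date, default_args and catchup setting are assumptions you'd adjust to your setup.

from datetime import datetime, timedelta

from airflow import DAG
from airflow.contrib.operators.dataflow_operator import DataflowTemplateOperator

# Assumed defaults; tune owner/retries to your environment.
default_args = {
    "owner": "airflow",
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}

with DAG(
    dag_id="dataflow_jobs",
    default_args=default_args,
    start_date=datetime(2020, 7, 1),  # assumed start date
    schedule_interval="@daily",       # run once a day
    catchup=False,                    # don't backfill old runs
) as dag:

    job = DataflowTemplateOperator(
        task_id="alex_test_job",
        template="gs://FULL_PATH_TO_TEMPLATE",
        job_name="alex_job_test_from_airflow",
        dataflow_default_options={
            "project": "MY_PROJECT_ID",
            "stagingLocation": "gs://FULL_PATH_TO_STAGING_FOLDER",
            "tempLocation": "gs://FULL_PATH_TO_TEMP_FOLDER",
            "serviceAccountEmail": "dataflow-python-test@{project_id}.iam.gserviceaccount.com",
        },
    )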

The task then shows up in the Airflow interface.

Now that the task is ready to run, we can run a test command in Airflow:

airflow test dataflow_jobs alex_test_job 2020-07-04

If we set up all the variables correctly, we can see the job in the Google Cloud Console, on the Dataflow Jobs page. If you want to be sure about the permissions, try removing or adding permissions on the service accounts and see what errors come up.

Also, if you click on the job, on the bottom left you can see the pipeline options. All the fields we set on the Airflow task in dataflow_default_options should be there; if you misspell a field, it won't show up here.
