Over the past few weeks I developed and deployed a Cloud Function that is supposed to be called only by authorized users and service accounts, and the truth is that the documentation I found wasn’t really helpful.

First I created a service account and gave it the roles/cloudfunctions.invoker role. If you’re dealing with an exported service account key file, the code is pretty simple:

from google.oauth2 import service_account
from google.auth.transport.requests import AuthorizedSession

keypath = '.../my_sa_key.json'
base_url = ''
creds = service_account.IDTokenCredentials.from_service_account_file(
    keypath, target_audience=base_url)
authed_session = AuthorizedSession(creds)

# make authenticated request and print the response, status_code
resp = authed_session.get(base_url)

But in my case, the call to the Cloud Function is going to happen from within a Google service (Dataflow), where I don’t have access to the service account key file. So I tried to find out how I can use target_audience with the default credentials. …
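The excerpt stops before showing a solution, so the following is only a sketch of one option and not necessarily the approach the post ends up with. On Google Cloud compute (which includes Dataflow workers), the metadata server can issue an ID token for the attached service account when given an audience. The function names and the example URL are made up for illustration; the google-auth library also offers google.oauth2.id_token.fetch_id_token(request, audience) for the same purpose.

```python
import urllib.parse
import urllib.request

# Metadata server endpoint that issues ID tokens for the default service account.
METADATA_IDENTITY_URL = (
    "http://metadata.google.internal/computeMetadata/v1/"
    "instance/service-accounts/default/identity"
)


def build_identity_request(audience: str) -> urllib.request.Request:
    """Build the metadata-server request for an ID token with this audience."""
    query = urllib.parse.urlencode({"audience": audience})
    return urllib.request.Request(
        f"{METADATA_IDENTITY_URL}?{query}",
        headers={"Metadata-Flavor": "Google"},  # required by the metadata server
    )


def fetch_id_token(audience: str, timeout: float = 5.0) -> str:
    """Fetch the token; this only works when running on Google Cloud."""
    request = build_identity_request(audience)
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return response.read().decode("utf-8")


def call_function(url: str, timeout: float = 30.0) -> bytes:
    """Call the Cloud Function, using its URL as the token audience."""
    token = fetch_id_token(url)
    request = urllib.request.Request(
        url, headers={"Authorization": f"Bearer {token}"}
    )
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return response.read()
```

Outside Google Cloud the metadata host does not resolve, so only build_identity_request can be exercised locally.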

In my previous post, I explained how to stream data from Salesforce to PubSub in real-time. The next logical step would be to store the data somewhere, right?

One option could be, for example, to batch the data and write it to files in GCS. That’s a good start and guarantees that we won’t lose anything, but we won’t have an easy way to query the data. So in addition to GCS, we can insert the data into BigQuery.

So when we talk about PubSub and real-time pipelines, Dataflow is our friend. We can build a simple pipeline to read…


The working implementation is on Third Attempt


A couple of months ago, when we started building our data lake, one of the requirements was to try to get real-time data in. Most of our external and internal sources supported that. I had a bit more knowledge of Salesforce, so I started researching how we could stream data from Salesforce in real time.

Luckily I stumbled upon Salesforce Change Data Capture. At first glance it looked great; I thought, this is exactly what we need. So I started building a POC.

And thus started most of my problems. Since our…

In our adventures trying to build a data lake, we are using dynamically generated Spark clusters to ingest some data from MongoDB, our production database, into BigQuery. To do that, we use PySpark DataFrames, and since MongoDB doesn’t have schemas, we try to infer the schema from the data.

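# first pass: sample the collection and keep only the inferred schema
collection_schema = spark.read.format("mongo") \
    .option("database", db) \
    .option("collection", coll) \
    .option("sampleSize", 50000) \
    .load() \
    .schema

# second pass: load the data with the corrected schema
ingest_df = spark.read.format("mongo") \
    .option("database", db) \
    .option("collection", coll) \
    .load(schema=fix_spark_schema(collection_schema))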

Our fix_spark_schema method just converts NullType columns to String.

In the users collection, we have the groups field, which is an…

OK, so we’ve written our Dataflow template with Python. Now what? We want to schedule it to run daily, and we’re going to use Airflow for that.

The first thing we want, for security reasons, is to keep service accounts separate. In the previous post, we created a service account to generate the template and run the jobs. Now we need a new service account to trigger new Dataflow jobs.
The new service account needs to have the Dataflow Developer and Cloud Dataflow Service Agent roles. We’ll call this service account dataflow_python_runner.

In this article, we will try to transform a JSON file into a CSV file using Dataflow and Python.

First, we’ll need a service account. Give it the “Dataflow Worker” role, and don’t forget to export its key as JSON at the end so we can use it later.
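As a rough illustration of the JSON-to-CSV step this article describes, the per-record conversion could look something like the sketch below. It assumes newline-delimited JSON input and a fixed list of column names; the function name and the sample fields are hypothetical. In a Beam pipeline a function like this might be applied to each element with beam.Map.

```python
import csv
import io
import json
from typing import Sequence


def json_line_to_csv_row(line: str, fields: Sequence[str]) -> str:
    """Convert one JSON object (as a string) into a single CSV-formatted row."""
    record = json.loads(line)
    buffer = io.StringIO()
    # csv.writer takes care of quoting values that contain commas or quotes.
    csv.writer(buffer).writerow([record.get(name, "") for name in fields])
    # Drop the writer's line terminator so the caller controls line endings.
    return buffer.getvalue().rstrip("\r\n")
```

Keys missing from a record are written as empty cells, so every row ends up with the same number of columns.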

Alex Fragotsis

Data Engineer @ League Inc.
