
In my previous post, I explained how to stream data from Salesforce to PubSub in real time. The next logical step would be to store the data somewhere, right?

One option, for example, is to batch the data and write it to files in GCS. That’s a good start and guarantees that we won’t lose anything, but it doesn’t give us an easy way to query the data. So, in addition to GCS, we can insert the data into BigQuery.

When it comes to PubSub and real-time pipelines, Dataflow is our friend. We can build a simple pipeline to read…
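As a rough idea of what such a pipeline looks like, here is a minimal Apache Beam sketch that reads from a Pub/Sub subscription and appends rows to an existing BigQuery table. It assumes each message is a UTF-8 JSON object whose keys already match the table’s columns; the function names and the subscription/table values are hypothetical, and the GCS branch is left out.

```python
import json


def parse_message(data: bytes) -> dict:
    """Decode one Pub/Sub message payload into a BigQuery row.

    Assumption: the message is a UTF-8 JSON object whose keys match
    the destination table's columns.
    """
    return json.loads(data.decode("utf-8"))


def run(subscription: str, table: str) -> None:
    """Stream messages from a Pub/Sub subscription into a BigQuery table."""
    # Imported here so parse_message can be used without Beam installed.
    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions

    # PipelineOptions() parses sys.argv, so --runner, --project, etc. come from the CLI.
    options = PipelineOptions()
    options.view_as(StandardOptions).streaming = True  # Pub/Sub is unbounded

    with beam.Pipeline(options=options) as pipeline:
        (
            pipeline
            | "ReadFromPubSub" >> beam.io.ReadFromPubSub(subscription=subscription)
            | "ParseJson" >> beam.Map(parse_message)
            | "WriteToBigQuery" >> beam.io.WriteToBigQuery(
                table,
                create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER,
                write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
            )
        )
```

Usage would be something like `run("projects/my-project/subscriptions/my-sub", "my-project:my_dataset.my_table")` (hypothetical names), launched with `--runner=DataflowRunner` and your project/region flags. `CREATE_NEVER` means the table has to exist already, which avoids passing a schema here.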



The working implementation is the third attempt.


A couple of months ago, when we started building our data lake, one of the requirements was to get real-time data in. Most of our external and internal sources supported that. I knew a bit more about Salesforce, so I started researching how we could stream data from Salesforce in real time.

Luckily, I stumbled upon Salesforce Change Data Capture. At first glance it looked great. This is exactly what we need, I thought. So I started building a POC.

And thus started most of my problems. Since our…


In our adventures building a data lake, we use dynamically generated Spark clusters to ingest data from MongoDB, our production database, into BigQuery. To do that, we use PySpark DataFrames, and since MongoDB doesn’t enforce schemas, we try to infer the schema from the data.

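# Pass 1: sample up to 50,000 documents to infer the collection's schema
collection_schema = spark.read.format("mongo") \
    .option("database", db) \
    .option("collection", coll) \
    .option("sampleSize", 50000) \
    .load() \
    .schema

# Pass 2: read the collection with the corrected schema
ingest_df = spark.read.format("mongo") \
    .option("database", db) \
    .option("collection", coll) \
    .load(schema=fix_spark_schema(collection_schema))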

Our fix_spark_schema method just converts NullType columns to String.

In the users collection, we have the groups field, which is an…


OK, so we’ve written our Dataflow template in Python. Now what? We want to schedule it to run daily, and we’re going to use Airflow for that.

For security reasons, the first thing we want is to keep service accounts separate. In the previous post, we created a service account to generate the template and run the jobs. Now we need a new service account to trigger new Dataflow jobs.
The new service account needs the Dataflow Developer and Cloud Dataflow Service Agent roles. We’ll call this service account dataflow_python_runner.


In this article, we will transform a JSON file into a CSV file using Dataflow and Python.

First, we need a service account with the “Dataflow Worker” role. Don’t forget to export its key as JSON at the end so we can use it later.
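To give a feel for the transformation, here is a minimal Beam sketch, assuming the input is newline-delimited JSON (one object per line) and a fixed, hypothetical column list. It is not the article’s pipeline, just one way to do the conversion; json_to_csv_line and COLUMNS are illustrative names.

```python
import csv
import io
import json

# Hypothetical column order for the output CSV.
COLUMNS = ["id", "name", "email"]


def json_to_csv_line(line: str, columns=COLUMNS) -> str:
    """Convert one JSON object (one input line) into one CSV row.

    Missing keys become empty fields; values containing commas or
    quotes are quoted by the csv module.
    """
    record = json.loads(line)
    buffer = io.StringIO()
    # lineterminator="" because WriteToText appends its own newline.
    csv.writer(buffer, lineterminator="").writerow(
        [record.get(column, "") for column in columns]
    )
    return buffer.getvalue()


def run(input_path: str, output_prefix: str) -> None:
    """Batch pipeline: newline-delimited JSON in, CSV files out."""
    # Imported here so json_to_csv_line can be used without Beam installed.
    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions

    # PipelineOptions() parses sys.argv, e.g. --runner=DataflowRunner.
    with beam.Pipeline(options=PipelineOptions()) as pipeline:
        (
            pipeline
            | "ReadJson" >> beam.io.ReadFromText(input_path)
            | "ToCsv" >> beam.Map(json_to_csv_line)
            | "WriteCsv" >> beam.io.WriteToText(
                output_prefix,
                file_name_suffix=".csv",
                header=",".join(COLUMNS),
            )
        )
```

Using the csv module instead of a plain `",".join(...)` keeps values that contain commas or quotes from breaking the row layout.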

Alex Fragotsis

Data Engineer @ League Inc.
