Loading complex JSON files in RealTime to BigQuery from PubSub using Dataflow and updating the schema

Alex Fragotsis
Published in Inside League · Jan 17, 2021


In my previous post, I explained how to stream data from Salesforce to PubSub in real-time. The next logical step would be to store the data somewhere, right?

One option could be, for example, to batch the data and write them to files in GCS. That's a good start and guarantees that we won't lose anything, but it doesn't give us an easy way to query the data. So, in addition to GCS, we can insert the data into BigQuery.

When we talk about PubSub and real-time pipelines, Dataflow is our friend. We can build a simple pipeline that reads data from PubSub and writes them to BigQuery and GCS simultaneously.

So let’s dive into the code.

The pipeline

def run(argv):
    parser = argparse.ArgumentParser()
    known_args, pipeline_args = parser.parse_known_args(argv)
    pipeline_options = PipelineOptions(pipeline_args, save_main_session=True, streaming=True)
    options = pipeline_options.view_as(JobOptions)

    with beam.Pipeline(options=pipeline_options) as pipeline:
        (
            pipeline
            | "Read PubSub Messages" >> beam.io.ReadFromPubSub(subscription=options.input_subscription)
            | "Add timestamps to messages" >> beam.ParDo(AddTimestamps())
            | f"Window into: {options.window_size}m" >> GroupWindowsIntoBatches(options.window_size)
            | "Export Raw Data to GCS" >> beam.ParDo(WriteToGCS(output_path=options.output_path, output_file_prefix=options.output_file_prefix))
            | "Write Raw Data to Big Query" >> beam.ParDo(WriteDataframeToBQ(project_id=options.project_id, bq_dataset=options.bigquery_dataset, bq_table=options.bigquery_table))
        )
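The pipeline references a JobOptions class, a custom PipelineOptions subclass that lives in the full repo (linked at the end) and isn't shown in this post. A minimal sketch, assuming the option names used above, could look like this:

from apache_beam.options.pipeline_options import PipelineOptions

class JobOptions(PipelineOptions):
    # Minimal sketch of the custom options the pipeline expects.
    # The option names mirror the ones referenced above; the help texts
    # and defaults are illustrative, not taken from the repo.
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_argument("--project_id", help="GCP project id")
        parser.add_argument("--input_subscription", help="PubSub subscription to read from")
        parser.add_argument("--window_size", type=int, default=10, help="Window size in minutes")
        parser.add_argument("--output_path", help="GCS path for the raw data files")
        parser.add_argument("--output_file_prefix", help="Prefix for the GCS files")
        parser.add_argument("--bigquery_dataset", help="Target BigQuery dataset")
        parser.add_argument("--bigquery_table", help="Target BigQuery table")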

and the step that writes to BigQuery:

class WriteDataframeToBQ(beam.DoFn):

    def __init__(self, bq_dataset, bq_table, project_id):
        self.bq_dataset = bq_dataset
        self.bq_table = bq_table
        self.project_id = project_id

    def start_bundle(self):
        self.client = bigquery.Client()

    def process(self, df):
        # table where we're going to store the data
        table_id = f"{self.bq_dataset}.{self.bq_table}"

        job_config = bigquery.LoadJobConfig(
            autodetect=True,
            source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
            schema_update_options=[
                bigquery.SchemaUpdateOption.ALLOW_FIELD_ADDITION,
                bigquery.SchemaUpdateOption.ALLOW_FIELD_RELAXATION
            ],
            write_disposition=bigquery.WriteDisposition.WRITE_APPEND
        )

        load_job = None
        try:
            load_job = self.client.load_table_from_dataframe(
                df,
                table_id,
                job_config=job_config,
            )  # Make an API request.

            load_job.result()  # Waits for the job to complete.
            if load_job.errors:
                logging.info(f"result={load_job.error_result}")
                logging.info(f"errors={load_job.errors}")
            else:
                logging.info(f"Loaded {len(df)} rows.")

        except Exception as error:
            logging.info(f"Error: {error} with loading dataframe")

            # load_job may still be None if the load never started
            if load_job and load_job.errors:
                logging.info(f"result={load_job.error_result}")
                logging.info(f"errors={load_job.errors}")

Now, the data we get from Salesforce have a pretty straightforward schema. There are no nested fields, everything is on one level, and the values themselves are simple.

{
"id": "2003c000005c4dFEQD",
"isdeleted": false,
"casenumber": "01286593",
"contactid": "0012c000004IURN4T4",
"accountid": null,
"assetid": null,
"businesshoursid": "05m40000000UuckwABD",
"parentid": null,
"suppliedname": Alex,
"suppliedemail": alextest@test.ca,
"suppliedphone": "1234567890",
"suppliedcompany": "my Super Awesome Company",
"type": null,
"recordtypeid": "8594t0000006ITcA5T",
"status": "New",
"reason": null,
"origin": "Offline Chat Request",
"subject": "Offline Chat Request from: Alex Test",
"priority": "Medium",
"description": "testing offline flow",
"isclosed": false,
"closeddate": null,
"isescalated": false,
"ownerid": "4512c000000ORHTDA4",
"isclosedoncreate": false,
"createddate": "2020-10-23T21:13:51.000+0000",
"createdbyid": "4512c000000ORHTDA4",
"lastmodifieddate": "2020-10-23T21:13:54.000+0000",
"lastmodifiedbyid": "4512c000000ORHTDA4",
"systemmodstamp": "2020-10-23T21:13:54.000+0000",
"lastvieweddate": null,
"lastreferenceddate": null,
}

But if we try to use the same code for more complicated data, data that contain nested arrays with objects and repeated fields, unfortunately this isn't going to work.
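For example, a message with a repeated array of objects and a nested record is enough to trip it up. Something like this made-up payload (the nested fields below are invented for illustration and are not part of the real Salesforce feed):

{
  "id": "2003c000005c4dFEQD",
  "casenumber": "01286593",
  "comments": [
    {"author": "Alex", "body": "first reply", "createddate": "2020-10-23T21:15:00.000+0000"},
    {"author": "Maria", "body": "second reply", "createddate": "2020-10-23T21:20:00.000+0000"}
  ],
  "history": {
    "statuschanges": ["New", "In Progress"],
    "lastagentid": null
  }
}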

The error I was getting when I tried to upload a more complicated data frame.

Generate BQ Schema

So as a “proper engineer”, I don’t want to sit down and write the whole schema by myself. I would prefer to find an automated way, which would also handle the case of new (nested) field additions without worrying that they’re going to break my pipeline.

So here's where BigQuery Schema Generator comes into play. This package works really well: given a JSON object, it can generate the BigQuery schema, which we can then use in the job_config to fix the above error. Here's how we can do it:

from bigquery_schema_generator.generate_schema import SchemaGenerator
from google.cloud import bigquery

generator = SchemaGenerator(input_format='dict',
                            quoted_values_are_strings=True,
                            keep_nulls=True)

# convert dataframe to dict
df_dict = df.to_dict('records')
# generate the new schema
schema_map, errors = generator.deduce_schema(input_data=df_dict)
bq_schema = generator.flatten_schema(schema_map)
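To get a feel for what the generator produces, here's a small, made-up record run through it. flatten_schema returns a list of plain dicts in BigQuery's JSON schema format, which LoadJobConfig accepts directly; the printed output below is approximate:

# Made-up record, only to illustrate the shape of the generated schema
sample_records = [{
    "id": "001",
    "comments": [{"author": "Alex", "body": "first reply"}]
}]

schema_map, errors = generator.deduce_schema(input_data=sample_records)
for field in generator.flatten_schema(schema_map):
    print(field)

# Roughly:
# {'mode': 'REPEATED', 'name': 'comments', 'type': 'RECORD', 'fields': [
#     {'mode': 'NULLABLE', 'name': 'author', 'type': 'STRING'},
#     {'mode': 'NULLABLE', 'name': 'body', 'type': 'STRING'}]}
# {'mode': 'NULLABLE', 'name': 'id', 'type': 'STRING'}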

Now we can use bq_schema in the job_config like this:

job_config = bigquery.LoadJobConfig(
    source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
    schema_update_options=[
        bigquery.SchemaUpdateOption.ALLOW_FIELD_ADDITION,
        bigquery.SchemaUpdateOption.ALLOW_FIELD_RELAXATION
    ],
    write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
    schema=bq_schema
)

Make it Better

This should work fine, and we would be able to add new fields to our BigQuery table without any issues, but why don't we go one step further?
We can help the deduce_schema method determine the schema by passing it the table's existing schema.
So, before every insert, we get the current schema from the table and feed it into the schema generation. That way the process starts from the original schema instead of from scratch.

This adds some complexity to our code, but I think the generated schema ends up more accurate, especially when the messages we get from PubSub have missing or null fields.

So, first of all, we need to get the schema from the BigQuery table:

from google.cloud import bigquery
from bigquery_schema_generator.generate_schema import SchemaGenerator, \
    read_existing_schema_from_file

client = bigquery.Client()
generator = SchemaGenerator(input_format='dict',
                            quoted_values_are_strings=True,
                            keep_nulls=True)

try:
    table_file_name = f"original_schema_{table_id}.json"
    table = client.get_table(table_id)
    client.schema_to_json(table.schema, table_file_name)
    original_schema_map = read_existing_schema_from_file(table_file_name)
except Exception:
    logging.info(f"{table_id} table does not exist. Proceeding without the original schema")
    original_schema_map = {}

and now the only thing we need to change is to pass original_schema_map to deduce_schema in the code above:

schema_map, errors = generator.deduce_schema(input_data=df_dict,
                                             schema_map=original_schema_map)

So here's the final version of our WriteDataframeToBQ:

class WriteDataframeToBQ(beam.DoFn):

    def __init__(self, bq_dataset, bq_table, project_id):
        self.bq_dataset = bq_dataset
        self.bq_table = bq_table
        self.project_id = project_id

    def start_bundle(self):
        self.client = bigquery.Client()

    def process(self, df):
        # table where we're going to store the data
        table_id = f"{self.bq_dataset}.{self.bq_table}"

        # helper for the json -> bq schema transformations
        generator = SchemaGenerator(input_format='dict', quoted_values_are_strings=True, keep_nulls=True)

        # Get the original schema to assist the deduce_schema function.
        # If the table doesn't exist, proceed with an empty original_schema_map
        try:
            table_file_name = f"original_schema_{table_id}.json"
            table = self.client.get_table(table_id)
            self.client.schema_to_json(table.schema, table_file_name)
            original_schema_map = read_existing_schema_from_file(table_file_name)
        except Exception:
            logging.info(f"{table_id} table does not exist. Proceeding without the original schema")
            original_schema_map = {}

        # convert dataframe to a list of dicts
        json_text = df.to_dict('records')

        # generate the new schema by combining the original table schema
        # with whatever we find in the incoming records
        schema_map, error_logs = generator.deduce_schema(
            input_data=json_text,
            schema_map=original_schema_map)
        bq_schema = generator.flatten_schema(schema_map)

        job_config = bigquery.LoadJobConfig(
            source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
            schema_update_options=[
                bigquery.SchemaUpdateOption.ALLOW_FIELD_ADDITION,
                bigquery.SchemaUpdateOption.ALLOW_FIELD_RELAXATION
            ],
            write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
            schema=bq_schema
        )

        load_job = None
        try:
            load_job = self.client.load_table_from_json(
                json_text,
                table_id,
                job_config=job_config,
            )  # Make an API request.

            load_job.result()  # Waits for the job to complete.
            if load_job.errors:
                logging.info(f"result={load_job.error_result}")
                logging.info(f"errors={load_job.errors}")
            else:
                logging.info(f"Loaded {len(df)} rows.")
        except Exception as error:
            logging.info(f"Error: {error} with loading dataframe")
            if load_job and load_job.errors:
                logging.info(f"result={load_job.error_result}")
                logging.info(f"errors={load_job.errors}")

and that’s it! Our Dataflow job can update the BigQuery schema as we get new fields!

You can find the full code here:
https://github.com/aFrag/PubsubToBigQuery

Final thoughts

keep_nulls: I'm using keep_nulls=True in the SchemaGenerator because some of my PubSub messages contain empty arrays, and trying to put an empty array into the BQ schema would end up in an error. With keep_nulls, the generator appends a dummy field to the array, and once actual data arrives the dummy field is replaced with the correct ones.
Dataframes: Instead of converting the data frame to a dict with json_text = df.to_dict('records'), we could use the JSON directly. I'm not doing that because in the previous step I put all the messages into a data frame so I can drop duplicates, keep the latest record, do some clean-up, etc.
Schema file: In the final implementation we export the schema to an f"original_schema_{table_id}.json" file. I haven't tested how this behaves if, at some point, multiple Dataflow workers try to update the schema simultaneously while holding different versions of it.
Query BigQuery for the schema: In the above implementation we group the data into 10-minute windows before inserting them into BigQuery, so the extra call to fetch the schema doesn't add a substantial delay to the process. If we need a more real-time approach, one option is to write the incoming messages directly to BigQuery without fetching the schema and, if that fails, route the failed messages to another step that updates the schema and then inserts them; a rough sketch of that idea follows below.
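Here's a minimal sketch of that fallback idea. It assumes a hypothetical helper, slow_path_with_schema_update, wrapping the SchemaGenerator + load-job logic from the DoFn above; insert_rows_json is the standard streaming-insert call from the BigQuery client.

from google.cloud import bigquery

def insert_with_schema_fallback(client, table_id, rows, slow_path_with_schema_update):
    # Try the fast streaming insert first.
    # `rows` is a list of dicts; `slow_path_with_schema_update` is a hypothetical
    # callable that runs the SchemaGenerator + load-job logic shown earlier.
    errors = client.insert_rows_json(table_id, rows)
    if errors:
        # insert_rows_json returns one entry per rejected row, including its index,
        # so we only retry the rows BigQuery refused (e.g. because of unknown fields).
        failed_indexes = {e["index"] for e in errors}
        failed_rows = [row for i, row in enumerate(rows) if i in failed_indexes]
        slow_path_with_schema_update(failed_rows)

This keeps the happy path low-latency, and only the occasional batch that introduces new fields pays for the extra schema round-trip.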

Update, 2021-05-18: I've updated the code because I was using the SchemaGenerator library wrong. I wasn't aware of the read_existing_schema_from_file function, and my code wasn't generating the correct schema.

References:

https://googleapis.dev/python/bigquery/latest/usage/pandas.html
https://pypi.org/project/bigquery-schema-generator/
