Loading complex JSON files in real time to BigQuery from PubSub using Dataflow and updating the schema
In my previous post, I explained how to stream data from Salesforce to PubSub in real-time. The next logical step would be to store the data somewhere, right?
One option is to batch the data and write it to files in GCS. That's a good start and guarantees that we won't lose anything, but it doesn't give us an easy way to query the data. So, in addition to GCS, we can insert the data into BigQuery.
When we talk about PubSub and real-time pipelines, Dataflow is our friend. We can build a simple pipeline that reads data from PubSub and writes it to BigQuery and to GCS simultaneously.
So let’s dive into the code.
The pipeline
import argparse

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions


def run(argv):
    parser = argparse.ArgumentParser()
    known_args, pipeline_args = parser.parse_known_args(argv)
    pipeline_options = PipelineOptions(pipeline_args, save_main_session=True, streaming=True)
    # JobOptions and the transforms used below are defined elsewhere in the project
    options = pipeline_options.view_as(JobOptions)
    with beam.Pipeline(options=pipeline_options) as pipeline:
        (
            pipeline
            | "Read PubSub Messages" >> beam.io.ReadFromPubSub(subscription=options.input_subscription)
            | "Add timestamps to messages" >> beam.ParDo(AddTimestamps())
            | f"Window into: {options.window_size}m" >> GroupWindowsIntoBatches(options.window_size)
            | "Export Raw Data to GCS" >> beam.ParDo(WriteToGCS(output_path=options.output_path, output_file_prefix=options.output_file_prefix))
            | "Write Raw Data to Big Query" >> beam.ParDo(WriteDataframeToBQ(project_id=options.project_id, bq_dataset=options.bigquery_dataset, bq_table=options.bigquery_table))
        )
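The windowing step is not shown in this post. As a rough intuition only, fixed windows bucket each message by its timestamp. The sketch below is plain Python, not Beam code, and the names window_start and group_into_windows are made up for illustration:

```python
from collections import defaultdict


def window_start(timestamp, window_size_s):
    """Return the start (in seconds) of the fixed window that contains timestamp."""
    return int(timestamp // window_size_s) * window_size_s


def group_into_windows(messages, window_size_s):
    """Group (timestamp, payload) pairs by the start of their fixed window."""
    batches = defaultdict(list)
    for timestamp, payload in messages:
        batches[window_start(timestamp, window_size_s)].append(payload)
    return dict(batches)


# Hypothetical messages: (seconds since some origin, payload)
messages = [(0.0, "a"), (599.9, "b"), (600.0, "c"), (1250.0, "d")]
batches = group_into_windows(messages, window_size_s=600)
for start, payloads in sorted(batches.items()):
    print(start, payloads)
```

With a 10-minute window (600 seconds), the first two messages share a batch and the other two each land in a later one. Every batch is then handed to the GCS and BigQuery steps as a unit.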
And here is the step that writes to BigQuery:
import logging

from google.cloud import bigquery


class WriteDataframeToBQ(beam.DoFn):

    def __init__(self, bq_dataset, bq_table, project_id):
        self.bq_dataset = bq_dataset
        self.bq_table = bq_table
        self.project_id = project_id

    def start_bundle(self):
        self.client = bigquery.Client()

    def process(self, df):
        # table where we're going to store the data
        table_id = f"{self.bq_dataset}.{self.bq_table}"
        job_config = bigquery.LoadJobConfig(
            autodetect=True,
            source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
            schema_update_options=[
                bigquery.SchemaUpdateOption.ALLOW_FIELD_ADDITION,
                bigquery.SchemaUpdateOption.ALLOW_FIELD_RELAXATION
            ],
            write_disposition=bigquery.WriteDisposition.WRITE_APPEND
        )
        # initialise so the except block can safely check it
        load_job = None
        try:
            load_job = self.client.load_table_from_dataframe(
                df,
                table_id,
                job_config=job_config,
            )  # Make an API request.
            load_job.result()  # Waits for the job to complete.
            if load_job.errors:
                logging.info(f"result={load_job.error_result}")
                logging.info(f"errors = {load_job.errors}")
            else:
                logging.info(f'Loaded {len(df)} rows.')
        except Exception as error:
            logging.info(f'Error: {error} with loading dataframe')
            if load_job and load_job.errors:
                logging.info(f"result={load_job.error_result}")
                logging.info(f"errors={load_job.errors}")
Now, the data that we get from Salesforce has a pretty straightforward schema. There are no nested fields, everything is on one level, and the values are simple.
{
  "id": "2003c000005c4dFEQD",
  "isdeleted": false,
  "casenumber": "01286593",
  "contactid": "0012c000004IURN4T4",
  "accountid": null,
  "assetid": null,
  "businesshoursid": "05m40000000UuckwABD",
  "parentid": null,
  "suppliedname": "Alex",
  "suppliedemail": "alextest@test.ca",
  "suppliedphone": "1234567890",
  "suppliedcompany": "my Super Awesome Company",
  "type": null,
  "recordtypeid": "8594t0000006ITcA5T",
  "status": "New",
  "reason": null,
  "origin": "Offline Chat Request",
  "subject": "Offline Chat Request from: Alex Test",
  "priority": "Medium",
  "description": "testing offline flow",
  "isclosed": false,
  "closeddate": null,
  "isescalated": false,
  "ownerid": "4512c000000ORHTDA4",
  "isclosedoncreate": false,
  "createddate": "2020-10-23T21:13:51.000+0000",
  "createdbyid": "4512c000000ORHTDA4",
  "lastmodifieddate": "2020-10-23T21:13:54.000+0000",
  "lastmodifiedbyid": "4512c000000ORHTDA4",
  "systemmodstamp": "2020-10-23T21:13:54.000+0000",
  "lastvieweddate": null,
  "lastreferenceddate": null
}
But if we try to use the same code for more complicated data, such as data that contains nested arrays with objects and repeated fields, unfortunately it isn't going to work.
Generate BQ Schema
So as a “proper engineer”, I don’t want to sit down and write the whole schema by myself. I would prefer to find an automated way, which would also handle the case of new (nested) field additions without worrying that they’re going to break my pipeline.
So here's where BigQuery Schema Generator comes into play. This package works really well: given a JSON object, it can generate the BigQuery schema, which we can then use in the job_config to fix the above error. Here's how we can do it:
from bigquery_schema_generator.generate_schema import SchemaGenerator
from google.cloud import bigquery

generator = SchemaGenerator(input_format='dict',
                            quoted_values_are_strings=True,
                            keep_nulls=True)

# convert dataframe to dict
df_dict = df.to_dict('records')

# generate the new schema
schema_map, errors = generator.deduce_schema(input_data=df_dict)
bq_schema = generator.flatten_schema(schema_map)
Now we can pass bq_schema to the job_config like this:
job_config = bigquery.LoadJobConfig(
    source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
    schema_update_options=[
        bigquery.SchemaUpdateOption.ALLOW_FIELD_ADDITION,
        bigquery.SchemaUpdateOption.ALLOW_FIELD_RELAXATION
    ],
    write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
    schema=bq_schema
)
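For intuition about what a schema for nested data looks like, here is a deliberately simplified toy in plain Python. It is not how the BigQuery Schema Generator package is implemented, and the record and the infer_field helper are made up for illustration. It only shows the general shape: nested objects become RECORD fields with their own sub-fields, and arrays become REPEATED fields.

```python
def infer_field(name, value):
    """Return a BigQuery-style field description for a single JSON value (toy rules)."""
    if isinstance(value, dict):
        return {
            "name": name,
            "type": "RECORD",
            "mode": "NULLABLE",
            "fields": [infer_field(k, v) for k, v in value.items()],
        }
    if isinstance(value, list):
        # Toy rule: only the first element is inspected. An empty array carries
        # no type information, so this sketch falls back to STRING.
        first = value[0] if value else None
        field = infer_field(name, first)
        field["mode"] = "REPEATED"
        return field
    if isinstance(value, bool):  # check bool before int: bool is a subclass of int
        bq_type = "BOOLEAN"
    elif isinstance(value, int):
        bq_type = "INTEGER"
    elif isinstance(value, float):
        bq_type = "FLOAT"
    else:
        bq_type = "STRING"
    return {"name": name, "type": bq_type, "mode": "NULLABLE"}


# Hypothetical nested record, not taken from the Salesforce data above
record = {
    "id": "abc",
    "isclosed": False,
    "contact": {"name": "Alex", "age": 30},
    "comments": [{"text": "hello", "votes": 2}],
}
toy_schema = [infer_field(k, v) for k, v in record.items()]
for field in toy_schema:
    print(field["name"], field["type"], field["mode"])
```

The real package handles many more cases (type conflicts between records, timestamps, nulls and so on), which is exactly why I prefer it over writing something like this by hand.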
Make it Better
This should work fine and we would be able to add new fields to our BigQuery table without any issues, but why don’t we go one step further?
We can help the deduce_schema method determine the schema by passing it the original schema from the table.
So before every insert, we can get the schema from the table and use it as an input to help generate the BQ schema. That way, the schema generation process starts from the original schema, not from scratch.
This adds some complexity to our code, but I think the generated schema ends up being more accurate, especially if we have missing and null fields in the messages we get from PubSub.
So first of all, we need to get the schema from the BigQuery table:
from bigquery_schema_generator.generate_schema import SchemaGenerator, read_existing_schema_from_file
from google.cloud import bigquery

client = bigquery.Client()
generator = SchemaGenerator(input_format='dict',
                            quoted_values_are_strings=True,
                            keep_nulls=True)
try:
    table_file_name = f"original_schema_{table_id}.json"
    table = client.get_table(table_id)
    # export the table schema to a file and read it back as a schema map
    client.schema_to_json(table.schema, table_file_name)
    original_schema_map = read_existing_schema_from_file(table_file_name)
except Exception:
    logging.info(f"{table_id} table does not exist. Proceeding without getting the schema")
    original_schema_map = {}
And now the only thing we need to change is to pass original_schema_map to deduce_schema in the code above:
schema_map, errors = generator.deduce_schema(input_data=df_dict,
                                             schema_map=original_schema_map)
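To see why starting from the existing schema helps with null fields, here is a toy model in plain Python. It is a simplification under my own assumptions: the real schema_map has a richer structure, and merge_schema is a made-up helper, not part of the library. The idea is simply that a null value in the new batch carries no type information, so the type already known from the table wins.

```python
def merge_schema(existing, deduced):
    """Merge two {field_name: type_or_None} maps, keeping already-known types."""
    merged = dict(existing)
    for name, bq_type in deduced.items():
        # Only fill in a type if we do not already know one for this field.
        if merged.get(name) is None:
            merged[name] = bq_type
    return merged


# Types already known from the table (hypothetical)
existing = {"id": "STRING", "closeddate": "TIMESTAMP"}
# Types deduced from a new batch: closeddate was null everywhere, priority is new
deduced = {"id": "STRING", "closeddate": None, "priority": "STRING"}

merged = merge_schema(existing, deduced)
print(merged)
```

Here closeddate keeps its TIMESTAMP type even though the batch only contained nulls for it, and the new priority field is added.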
So here's the final version of our WriteDataframeToBQ:
class WriteDataframeToBQ(beam.DoFn):

    def __init__(self, bq_dataset, bq_table, project_id):
        self.bq_dataset = bq_dataset
        self.bq_table = bq_table
        self.project_id = project_id

    def start_bundle(self):
        self.client = bigquery.Client()

    def process(self, df):
        # table where we're going to store the data
        table_id = f"{self.bq_dataset}.{self.bq_table}"

        # function to help with the json -> bq schema transformations
        generator = SchemaGenerator(input_format='dict', quoted_values_are_strings=True, keep_nulls=True)

        # Get original schema to assist the deduce_schema function.
        # If the table doesn't exist
        # proceed with empty original_schema_map
        try:
            table_file_name = f"original_schema_{table_id}.json"
            table = self.client.get_table(table_id)
            self.client.schema_to_json(table.schema, table_file_name)
            original_schema_map = read_existing_schema_from_file(table_file_name)
        except Exception:
            logging.info(f"{table_id} table does not exist. Proceeding without getting the schema")
            original_schema_map = {}

        # convert dataframe to dict
        json_text = df.to_dict('records')

        # generate the new schema, starting from the existing table schema (if any)
        schema_map, error_logs = generator.deduce_schema(
            input_data=json_text,
            schema_map=original_schema_map)
        bq_schema = generator.flatten_schema(schema_map)

        job_config = bigquery.LoadJobConfig(
            source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
            schema_update_options=[
                bigquery.SchemaUpdateOption.ALLOW_FIELD_ADDITION,
                bigquery.SchemaUpdateOption.ALLOW_FIELD_RELAXATION
            ],
            write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
            schema=bq_schema
        )

        # initialise so the except block can safely check it
        load_job = None
        try:
            load_job = self.client.load_table_from_json(
                json_text,
                table_id,
                job_config=job_config,
            )  # Make an API request.
            load_job.result()  # Waits for the job to complete.
            if load_job.errors:
                logging.info(f"result={load_job.error_result}")
                logging.info(f"errors={load_job.errors}")
            else:
                logging.info(f'Loaded {len(df)} rows.')
        except Exception as error:
            logging.info(f'Error: {error} with loading dataframe')
            if load_job and load_job.errors:
                logging.info(f"result={load_job.error_result}")
                logging.info(f"errors={load_job.errors}")
And that's it! Our Dataflow job can now update the BigQuery schema as we get new fields!
You can find the full code here:
https://github.com/aFrag/PubsubToBigQuery
Final thoughts
keep_nulls: I'm using keep_nulls=True in the SchemaGenerator because some of my PubSub messages contain empty arrays. Trying to have an empty array in the BQ schema would end up in an error. With keep_nulls, the generator appends a dummy field to the array, and when the actual data comes, it removes the dummy field and adds the correct ones.
Dataframes: Instead of converting the data frame to a dict with json_text = df.to_dict('records'), we could use the JSON directly. I'm not doing that because in the previous step I add all the messages to a data frame in order to drop duplicates, keep the latest record, do some cleanup, etc.
Schema File: In the final implementation we export the schema to the f"original_schema_{table_id}.json" file. I haven't tested how this is going to work if at some point multiple Dataflow workers try to update the schema simultaneously while holding different versions of it.
Query BigQuery for the schema: In the above implementation, we group the data into 10-minute windows and then insert it into BigQuery. That's why making an extra call to get the schema won't add a substantial delay to our process. If we need a more real-time approach, one option would be to write the incoming messages directly to BigQuery without getting the schema and, if that fails, move all the failed messages to another step that updates the schema and then inserts the failed data.
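The "insert first, fix the schema only on failure" idea from the last point could be sketched roughly as follows. This is untested against BigQuery: FakeTable is an in-memory stand-in I made up to keep the example runnable, and insert_with_fallback is a hypothetical helper, not part of any client library.

```python
class FakeTable:
    """In-memory stand-in for a table that rejects rows with unknown columns."""

    def __init__(self, columns):
        self.columns = set(columns)
        self.rows = []

    def insert_rows(self, rows):
        """Insert what fits the current schema and return the rows that failed."""
        failed = []
        for row in rows:
            if set(row) <= self.columns:
                self.rows.append(row)
            else:
                failed.append(row)
        return failed

    def update_schema(self, rows):
        """Add any columns found in rows that the table does not have yet."""
        for row in rows:
            self.columns |= set(row)


def insert_with_fallback(rows, insert_rows, update_schema):
    """Try a direct insert; on failure, update the schema and retry the failed rows once."""
    failed = insert_rows(rows)
    if not failed:
        return []
    update_schema(failed)
    return insert_rows(failed)


table = FakeTable(columns=["id", "status"])
rows = [
    {"id": "1", "status": "New"},
    {"id": "2", "status": "New", "priority": "Medium"},  # has a new field
]
still_failed = insert_with_fallback(rows, table.insert_rows, table.update_schema)
print(len(table.rows), sorted(table.columns), still_failed)
```

Only the rows that fail pay the cost of the schema lookup and update, so the common case stays fast.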
2021-05-18
I've updated the code because I was using the SchemaGenerator library incorrectly. I wasn't aware of the read_existing_schema_from_file function, and my code wasn't generating the correct schema.
References:
https://googleapis.dev/python/bigquery/latest/usage/pandas.html
https://pypi.org/project/bigquery-schema-generator/