Streaming Data to BigQuery with Dataflow and Updating the Schema in Real-Time

Alex Fragotsis · Published in Inside League · Dec 26, 2021

Robert Delaunay, “Relief-disques,” 1936.

In our previous story, we saw how to stream data to BigQuery and add new columns when they are needed. That solution, though, isn’t really real-time; I think we can do better.

Another approach I’ve seen discussed online, but haven’t found any code samples for, is this: enable streaming inserts to BigQuery using Dataflow; if the incoming data contain new fields, the insert fails; we then collect all the failed rows, detect the new schema, update the table in BigQuery, and re-insert them.

A really simple pipeline that streams data to BigQuery looks like this:

import apache_beam as beam


def run(argv):
    with beam.Pipeline(options=pipeline_options) as pipeline:
        realtime_data = (
            pipeline
            | "Read PubSub Messages" >>
            beam.io.ReadFromPubSub(
                subscription=options.input_subscription,
                with_attributes=True)
            | f"Write to {options.bq_table}" >>
            beam.io.WriteToBigQuery(
                table=f"{options.bq_dataset}.{options.bq_table}",
                write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
        )
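The snippets in this post lean on an options object that carries the Pub/Sub subscription, the dataset, the table and the window size. Those flags aren’t shown here, so below is a minimal sketch of custom pipeline options that would cover them; the class name and defaults are my assumptions, based on the attributes the snippets access.

from apache_beam.options.pipeline_options import PipelineOptions


# Hypothetical options class; the flag names mirror the attributes used in the snippets.
class PubSubToBigQueryOptions(PipelineOptions):

    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_argument("--input_subscription",
                            help="Pub/Sub subscription to read from")
        parser.add_argument("--bq_dataset", help="Target BigQuery dataset")
        parser.add_argument("--bq_table", help="Target BigQuery table")
        parser.add_argument("--bq_window_size", type=float, default=1,
                            help="Window size in minutes for batching failed rows")


# Somewhere in run(): the standard flags plus our custom view of them.
pipeline_options = PipelineOptions(argv, streaming=True, save_main_session=True)
options = pipeline_options.view_as(PubSubToBigQueryOptions)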

Now, if the Pub/Sub message contains new fields that are missing from the BigQuery table, then, according to the documentation, beam.io.WriteToBigQuery will emit the failed rows to

beam.io.gcp.bigquery.BigQueryWriteFn.FAILED_ROWS

So all we have to do is read them, group them in a small window (I use 1 minute) just to catch any other messages that happen to arrive at the same time, and re-insert them into BigQuery:

(
    realtime_data[beam.io.gcp.bigquery.BigQueryWriteFn.FAILED_ROWS]
    | "Window" >>
    GroupWindowsIntoBatches(window_size=options.bq_window_size)
    | f"Failed Rows for {options.bq_table}" >>
    beam.ParDo(ModifyBadRows(options.bq_dataset, options.bq_table))
)
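One detail worth knowing: in the Python SDK versions I’ve used, each element on FAILED_ROWS is a (destination table, row) tuple, which is also why the GroupByKey inside the windowing transform ends up keyed by the table name. If you want to eyeball what lands there before wiring up the re-insert, a throwaway debug branch could look like this (the LogFailedRow DoFn is purely illustrative):

import logging

import apache_beam as beam


class LogFailedRow(beam.DoFn):
    """Illustrative helper: log each failed insert so we can inspect its shape."""

    def process(self, element):
        # Each element should be a (destination_table, row_dict) tuple.
        destination, row = element
        logging.warning("Insert into %s failed for row: %s", destination, row)
        yield element  # pass it through unchanged


(
    realtime_data[beam.io.gcp.bigquery.BigQueryWriteFn.FAILED_ROWS]
    | "Log Failed Rows" >> beam.ParDo(LogFailedRow())
)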

BUT!

Before we start testing, there are a few gotchas.

1. We need to change the beam.io.WriteToBigQuery retry strategy to never retry.

beam.io.WriteToBigQuery(
    table=f"{options.bq_dataset}.{options.bq_table}",
    write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
    insert_retry_strategy=beam.io.gcp.bigquery_tools.RetryStrategy.RETRY_NEVER
)
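For context, RetryStrategy lives in beam.io.gcp.bigquery_tools and, as far as I can tell, the connector defaults to retrying failed inserts, so rows that fail because of an unknown field would keep being retried instead of showing up on FAILED_ROWS. The available strategies are:

from apache_beam.io.gcp.bigquery_tools import RetryStrategy

RetryStrategy.RETRY_ALWAYS              # keep retrying failed inserts
RetryStrategy.RETRY_NEVER               # fail fast and emit to FAILED_ROWS
RetryStrategy.RETRY_ON_TRANSIENT_ERROR  # retry only transient errors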

2. The default GroupWindowsIntoBatches we find in Google’s documentation doesn’t work here. Messages coming from BigQueryWriteFn.FAILED_ROWS are not timestamped, so we need to timestamp them ourselves:

import time

import apache_beam as beam
from apache_beam.transforms import window


class GroupWindowsIntoBatches(beam.PTransform):

    def __init__(self, window_size):
        # Convert minutes into seconds.
        self.window_size = int(window_size * 60)

    def expand(self, pcoll):
        return (pcoll
                | 'Add Timestamps' >>
                beam.Map(lambda x: window.TimestampedValue(x, time.time()))
                | "Window into Fixed Intervals" >>
                beam.WindowInto(window.FixedWindows(self.window_size))
                | "Groupby" >> beam.GroupByKey()
                | "Abandon Dummy Key" >> beam.MapTuple(lambda _, val: val)
                )

Finally, to detect the new schema we use the BigQuery Schema Generator, as we did last time:

import logging

import apache_beam as beam
from google.cloud import bigquery
from bigquery_schema_generator.generate_schema import SchemaGenerator


class ModifyBadRows(beam.DoFn):

    def __init__(self, bq_dataset, bq_table):
        self.bq_dataset = bq_dataset
        self.bq_table = bq_table

    def start_bundle(self):
        self.client = bigquery.Client()

    def process(self, batch):
        logging.info(f"Got {len(batch)} bad rows")
        table_id = f"{self.bq_dataset}.{self.bq_table}"

        generator = SchemaGenerator(input_format='dict',
                                    quoted_values_are_strings=True)

        # Get the original schema to assist the deduce_schema function.
        # If the table doesn't exist, proceed with an empty original_schema_map.
        try:
            table_file_name = f"original_schema_{self.bq_table}.json"
            table = self.client.get_table(table_id)
            self.client.schema_to_json(table.schema, table_file_name)
            # Parse the saved schema file into a schema_map for the generator.
            original_schema_map = read_existing_schema_from_file(table_file_name)
        except Exception:
            logging.info(f"{table_id} table does not exist. Proceeding without the existing schema")
            original_schema_map = {}

        # Generate the new schema from the failed rows plus the existing schema.
        schema_map, error_logs = generator.deduce_schema(
            input_data=batch,
            schema_map=original_schema_map)
        schema = generator.flatten_schema(schema_map)

        job_config = bigquery.LoadJobConfig(
            source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
            schema_update_options=[
                bigquery.SchemaUpdateOption.ALLOW_FIELD_ADDITION
            ],
            write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
            schema=schema
        )

        try:
            load_job = self.client.load_table_from_json(
                batch,
                table_id,
                job_config=job_config,
            )  # Make an API request.

            load_job.result()  # Waits for the job to complete.
            if load_job.errors:
                logging.info(f"error_result = {load_job.error_result}")
                logging.info(f"errors = {load_job.errors}")
            else:
                logging.info(f"Loaded {len(batch)} rows.")

        except Exception as error:
            logging.info(f"Error {error} while loading the failed rows")
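If you want a feel for what deduce_schema and flatten_schema produce before plugging them into the DoFn, here’s a minimal local sketch (the sample rows and field names are made up):

from bigquery_schema_generator.generate_schema import SchemaGenerator

# Pretend these are two failed rows, one of which carries a brand-new field.
failed_rows = [
    {"user_id": "123", "plan": "premium"},
    {"user_id": "456", "plan": "basic", "referral_code": "abc"},  # new column
]

generator = SchemaGenerator(input_format='dict', quoted_values_are_strings=True)

# With an empty schema_map everything is deduced from the rows themselves;
# in the pipeline we seed it with the table's existing schema instead.
schema_map, error_logs = generator.deduce_schema(input_data=failed_rows, schema_map={})
schema = generator.flatten_schema(schema_map)

print(schema)
# Something like:
# [{'name': 'user_id', 'type': 'STRING', 'mode': 'NULLABLE'},
#  {'name': 'plan', 'type': 'STRING', 'mode': 'NULLABLE'},
#  {'name': 'referral_code', 'type': 'STRING', 'mode': 'NULLABLE'}]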

And that’s it! Now our pipeline streams the data to BigQuery in real time, and if we get a message that contains a field for which there is no column in BigQuery:

  • that insertion will fail,
  • we’ll gather all the failed rows and group them in a 1-minute window,
  • our pipeline will automatically detect the new schema,
  • update BigQuery, and
  • re-insert the failed rows,

and all that without stopping the pipeline at all; messages that arrive after the failed one keep getting inserted into BigQuery.
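Putting the pieces together, the body of run() ends up looking roughly like this. It’s only a condensed sketch of the snippets above (imports, and the step that parses the Pub/Sub messages into dicts, are omitted); the full version is in the repo linked below.

def run(argv=None):
    pipeline_options = PipelineOptions(argv, streaming=True, save_main_session=True)
    options = pipeline_options.view_as(PubSubToBigQueryOptions)  # hypothetical options class from earlier

    with beam.Pipeline(options=pipeline_options) as pipeline:
        realtime_data = (
            pipeline
            | "Read PubSub Messages" >> beam.io.ReadFromPubSub(
                subscription=options.input_subscription,
                with_attributes=True)
            # NOTE: the messages need to be parsed into dicts before WriteToBigQuery;
            # that step is omitted here, as in the snippets above.
            | f"Write to {options.bq_table}" >> beam.io.WriteToBigQuery(
                table=f"{options.bq_dataset}.{options.bq_table}",
                write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
                insert_retry_strategy=beam.io.gcp.bigquery_tools.RetryStrategy.RETRY_NEVER)
        )

        # Failed inserts (e.g. rows with unknown fields) come out here and get
        # re-loaded with an updated schema by ModifyBadRows.
        (
            realtime_data[beam.io.gcp.bigquery.BigQueryWriteFn.FAILED_ROWS]
            | "Window" >> GroupWindowsIntoBatches(window_size=options.bq_window_size)
            | f"Failed Rows for {options.bq_table}" >> beam.ParDo(
                ModifyBadRows(options.bq_dataset, options.bq_table))
        )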

You can find the full code here:
https://github.com/aFrag/PubsubToBigQuery/blob/main/StreamingPubsubToBQ.py
