Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BEAM-14383] Improve "FailedRows" errors returned by beam.io.WriteToBigQuery #17517

Merged
merged 5 commits into from
May 6, 2022

Conversation

Firlej
Copy link
Contributor

@Firlej Firlej commented Apr 30, 2022

WriteToBigQuery pipeline returns errors when trying to insert rows that do not match the BigQuery table schema. errors is a dictionary that cointains one FailedRows key. FailedRows is a list of tuples where each tuple has two elements: BigQuery table name and the row that didn't match the schema.

This can be verified by running the BigQueryIO deadletter pattern https://beam.apache.org/documentation/patterns/bigqueryio/

Using the template approach I can print the failed rows in a pipeline. When running the job, logger simultaneously prints out the reason why the rows were invalid.

The reason (for why the row is invalid) should also be included in the tuple in addition to the BigQuery table and the raw row. This way next pipeline could eg. insert the invalid rows into a different BigQuery table with a schema.

error_schema = (
    {
        'fields': [
            {'name': 'timestamp', 'type': 'TIMESTAMP', 'mode': 'REQUIRED'},
            {'name': 'table', 'type': 'STRING', 'mode': 'REQUIRED'},
            {'name': 'reason', 'type': 'STRING', 'mode': 'NULLABLE'},
            {'name': 'row_json', 'type': 'STRING', 'mode': 'REQUIRED'},
        ]
    }
)

The whole pipeline implementation could look something like this

with beam.Pipeline(options=pipeline_options) as p:

    errors = (
        p
        | "Read from Pub/Sub subscription" >> beam.io.gcp.pubsub.ReadFromPubSub(
            subscription=known_args.input_subscription,
            timestamp_attribute=None)
        | "UTF-8 bytes to string" >> beam.Map(lambda msg: msg.decode("utf-8"))
        | "Parse JSON messages" >> beam.Map(json.loads)
        | "WriteToBigQuery" >> beam.io.WriteToBigQuery(
            known_args.output_table,
            schema=schema,
            create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER,
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
            ignore_unknown_columns=False,
            insert_retry_strategy=RetryStrategy.RETRY_ON_TRANSIENT_ERROR
        )
    )
    
    result = (
        errors["FailedRows"]
        | 'ParseErrors' >> beam.Map(lambda err: {
                "timestamp": time.time_ns() / 1000000000,
                "table": err[0],
                "reason": None, # TODO to be replaced with `err[2]`
                "row_json": json.dumps(err[1]),
            })
        | "WriteErrorsToBigQuery" >> beam.io.WriteToBigQuery(
            known_args.output_table + "_error_records",
            schema=error_schema,
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
            ignore_unknown_columns=False,
            insert_retry_strategy=RetryStrategy.RETRY_ON_TRANSIENT_ERROR
        )
    )

During my reasearch I found a couple of alternate solutions, but they are more complex than they need to be. Thats why I explored the beam source code and found the solution to be an easy and simple change.


Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • Choose reviewer(s) and mention them in a comment (R: @username).
  • Format the pull request title like [BEAM-XXX] Fixes bug in ApproximateQuantiles, where you replace BEAM-XXX with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
  • Update CHANGES.md with noteworthy changes.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

See the Contributor Guide for more tips on how to make review process smoother.

To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md

GitHub Actions Tests Status (on master branch)

Build python source distribution and wheels
Python tests
Java tests

See CI.md for more information about GitHub Actions CI.

@asf-ci
Copy link

asf-ci commented Apr 30, 2022

Can one of the admins verify this patch?

1 similar comment
@asf-ci
Copy link

asf-ci commented Apr 30, 2022

Can one of the admins verify this patch?

@Firlej
Copy link
Contributor Author

Firlej commented Apr 30, 2022

R: @jrmccluskey

@codecov
Copy link

codecov bot commented Apr 30, 2022

Codecov Report

Merging #17517 (c1443d7) into master (83a8855) will increase coverage by 0.07%.
The diff coverage is 100.00%.

@@            Coverage Diff             @@
##           master   #17517      +/-   ##
==========================================
+ Coverage   73.83%   73.90%   +0.07%     
==========================================
  Files         690      691       +1     
  Lines       90830    91259     +429     
==========================================
+ Hits        67065    67446     +381     
- Misses      22556    22604      +48     
  Partials     1209     1209              
Flag Coverage Δ
python 83.68% <100.00%> (+0.03%) ⬆️

Flags with carried forward coverage won't be shown. Click here to find out more.

Impacted Files Coverage Δ
sdks/python/apache_beam/io/gcp/bigquery.py 63.87% <100.00%> (+0.10%) ⬆️
sdks/python/apache_beam/coders/row_coder.py 94.49% <0.00%> (-2.51%) ⬇️
sdks/python/apache_beam/runners/common.py 87.94% <0.00%> (-2.33%) ⬇️
sdks/python/apache_beam/io/gcp/gcsfilesystem.py 88.23% <0.00%> (-1.77%) ⬇️
sdks/python/apache_beam/dataframe/frame_base.py 89.54% <0.00%> (-0.83%) ⬇️
sdks/python/apache_beam/io/filesystem.py 88.76% <0.00%> (-0.65%) ⬇️
sdks/python/apache_beam/io/localfilesystem.py 90.97% <0.00%> (-0.50%) ⬇️
sdks/python/apache_beam/transforms/combiners.py 93.05% <0.00%> (-0.38%) ⬇️
sdks/python/apache_beam/io/gcp/gcsio.py 92.26% <0.00%> (-0.36%) ⬇️
...ks/python/apache_beam/runners/worker/sdk_worker.py 88.90% <0.00%> (-0.16%) ⬇️
... and 30 more

Continue to review full report at Codecov.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update 83a8855...c1443d7. Read the comment docs.

@jrmccluskey
Copy link
Contributor

R: @TheNeuralBit

@TheNeuralBit
Copy link
Member

@pabloem could you take a look at this Python BigQueryIO PR?

@Firlej would it be possible to add a test exercising this?

@TheNeuralBit
Copy link
Member

Run PythonLint PreCommit

@Firlej
Copy link
Contributor Author

Firlej commented May 4, 2022

@TheNeuralBit i've implemented a test based on the other tests in the sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py file.

@TheNeuralBit
Copy link
Member

Run Python 3.7 PostCommit

@pabloem
Copy link
Member

pabloem commented May 4, 2022

I haven't forgotten about this. It's in my queue : )

@TheNeuralBit
Copy link
Member

TheNeuralBit commented May 4, 2022

Thanks @Firlej! I triggers a CI check that should exercise this new test.

This looks good, I guess my only concern is this could be a breaking change for existing users, e.g. if they're unpacking the current result like destination, row = value. I think that's OK as long as we document this as a breaking change in CHANGES.md, but I think @chamikaramj or @pabloem should make that call.

@Firlej
Copy link
Contributor Author

Firlej commented May 5, 2022

@TheNeuralBit I updated CHANGES.md with a note describing the changes implemented here.

@pabloem
Copy link
Member

pabloem commented May 6, 2022

I'll agree with Brian, and say that we can accept this inl. It's onlybreaking one particular usage, and it adds to the functionality.

LGTM.

@pabloem pabloem merged commit 3587820 into apache:master May 6, 2022
@TheNeuralBit
Copy link
Member

It looks like we didn't get a green run on the Python PostComit before merging, the new test is failing at HEAD. I filed BEAM-14447 to track the failure. Could you take a look @Firlej?

If its not quick to diagnose and fix, we might just rollback this PR to preserve test signal. It's easy enough to roll it forward with a fix once we figure it out.

@pabloem
Copy link
Member

pabloem commented May 9, 2022 via email

@Firlej
Copy link
Contributor Author

Firlej commented May 9, 2022

I'm checking it RN

@Firlej
Copy link
Contributor Author

Firlej commented May 9, 2022

@pabloem I see you refactored and simplified the test. I guess it was needlessly complicated.

In hindsight I shouldn't have used a literal string. I don't understand why it threw a KeyError Exception though.

Additionally I see this line throws an error according to this testReport posted by @TheNeuralBit in BEAM-14447. The test exactly shows how this is a breaking change :D

@pabloem
Copy link
Member

pabloem commented May 9, 2022

ah makes sense. I'll fix that as well.

pabloem added a commit to pabloem/beam that referenced this pull request May 10, 2022
…iledRows" errors returned by beam.io.WriteToBigQuery"

This reverts commit 3587820.
pabloem added a commit that referenced this pull request May 10, 2022
…#17517 from [BEAM-14383] Improve "FailedRo…

[BEAM-14447] Revert "Merge pull request #17517 from [BEAM-14383] Improve "FailedRo…
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants