Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BEAM-14546] Fix errant pass for empty collections in Count #17813

Merged
merged 6 commits into from
Jun 3, 2022

Conversation

jrmccluskey
Copy link
Contributor

@jrmccluskey jrmccluskey commented Jun 2, 2022

Adds a NonEmpty() passert utility to check that a given PCollection has at least one element, then uses it to ensure that a collection passed to Count() with a non-zero expected number of elements has at least one element to avoid an erroneous passing test.

Discovered and fixed an instance of erroneous passing with the synthetic steps tests, and applies the same fix to passert.Sum.


Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • Choose reviewer(s) and mention them in a comment (R: @username).
  • Format the pull request title like [BEAM-XXX] Fixes bug in ApproximateQuantiles, where you replace BEAM-XXX with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
  • Update CHANGES.md with noteworthy changes.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

See the Contributor Guide for more tips on how to make review process smoother.

To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md

GitHub Actions Tests Status (on master branch)

Build python source distribution and wheels
Python tests
Java tests

See CI.md for more information about GitHub Actions CI.

@github-actions github-actions bot added the go label Jun 2, 2022
@codecov
Copy link

codecov bot commented Jun 2, 2022

Codecov Report

Merging #17813 (b3955fc) into master (999bcea) will increase coverage by 0.02%.
The diff coverage is 86.20%.

@@            Coverage Diff             @@
##           master   #17813      +/-   ##
==========================================
+ Coverage   74.09%   74.11%   +0.02%     
==========================================
  Files         697      697              
  Lines       91980    92036      +56     
==========================================
+ Hits        68148    68212      +64     
+ Misses      22583    22574       -9     
- Partials     1249     1250       +1     
Flag Coverage Δ
go 50.95% <86.20%> (+0.13%) ⬆️

Flags with carried forward coverage won't be shown. Click here to find out more.

Impacted Files Coverage Δ
sdks/go/pkg/beam/io/synthetic/step.go 81.81% <71.42%> (ø)
sdks/go/pkg/beam/testing/passert/passert.shims.go 62.02% <82.35%> (+4.79%) ⬆️
sdks/go/pkg/beam/testing/passert/count.go 79.16% <100.00%> (+2.97%) ⬆️
sdks/go/pkg/beam/testing/passert/hash.go 28.00% <100.00%> (+28.00%) ⬆️
sdks/go/pkg/beam/testing/passert/passert.go 81.11% <100.00%> (+2.36%) ⬆️
sdks/go/pkg/beam/testing/passert/sum.go 100.00% <100.00%> (ø)
sdks/go/pkg/beam/core/runtime/exec/fn.go 69.55% <0.00%> (ø)
sdks/go/pkg/beam/core/runtime/exec/sdf.go 70.84% <0.00%> (+0.23%) ⬆️
sdks/go/pkg/beam/core/runtime/exec/pardo.go 59.43% <0.00%> (+0.32%) ⬆️
... and 1 more

Continue to review full report at Codecov.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update 999bcea...b3955fc. Read the comment docs.

@asf-ci
Copy link

asf-ci commented Jun 2, 2022

Can one of the admins verify this patch?

1 similar comment
@asf-ci
Copy link

asf-ci commented Jun 2, 2022

Can one of the admins verify this patch?

@github-actions
Copy link
Contributor

github-actions bot commented Jun 2, 2022

Assigning reviewers. If you would like to opt out of this review, comment assign to next reviewer:

R: @damccorm for label go.

Available commands:

  • stop reviewer notifications - opt out of the automated review tooling
  • remind me after tests pass - tag the comment author after tests pass
  • waiting on author - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)

The PR bot will only process comments in the main thread (not review comments).

if err := ptest.Run(p); err != nil {
t.Errorf("Pipeline failed: %v", err)
}
})
}
}

func TestCount_Bad(t *testing.T) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we simplify and combine these tests into 1 by adding a expectErr variable to the tests struct?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could, although I don't see too much value in bundling them into one suite. The setup isn't particularly long or complicated to test the function, so deduplicating it doesn't add much value IMO. Totally cool doing it if you feel strongly though.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd argue its worth doing since more/duplicated code => more opportunities for bugs to slip in when updates are needed and more for a future developer (maybe us) to understand (code is a liability). I'm not going to block on it though, its not very important

@@ -30,6 +30,10 @@ func Count(s beam.Scope, col beam.PCollection, name string, count int) {
if typex.IsKV(col.Type()) {
col = beam.DropKey(s, col)
}

if count > 0 {
NonEmpty(s, col)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to add the same thing to Hash and Sum? I think an empty pcollection would silently pass for both of those as well

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sum did need it, confirmed via unit test. Will add some short validation for Hash

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a small "fail on empty" check for Hash.

@damccorm
Copy link
Contributor

damccorm commented Jun 2, 2022

The portable failure looks legit as well:

11:01:07 2022/06/02 14:58:41 2022-06-02 14:58:41. (26): Traceback (most recent call last):
11:01:07   File "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_GoPortable_Commit/src/sdks/python/apache_beam/runners/portability/local_job_service.py", line 275, in _run_job
11:01:07     self.result = self._invoke_runner()
11:01:07   File "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_GoPortable_Commit/src/sdks/python/apache_beam/runners/portability/local_job_service.py", line 297, in _invoke_runner
11:01:07     return fn_runner.FnApiRunner(
11:01:07   File "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_GoPortable_Commit/src/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 208, in run_via_runner_api
11:01:07     return self.run_stages(stage_context, stages)
11:01:07   File "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_GoPortable_Commit/src/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 408, in run_stages
11:01:07     bundle_results = self._execute_bundle(
11:01:07   File "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_GoPortable_Commit/src/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 736, in _execute_bundle
11:01:07     self._run_bundle(
11:01:07   File "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_GoPortable_Commit/src/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 965, in _run_bundle
11:01:07     result, splits = bundle_manager.process_bundle(
11:01:07   File "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_GoPortable_Commit/src/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 1314, in process_bundle
11:01:07     raise RuntimeError(result.error)
11:01:07 RuntimeError: process bundle failed for instruction bundle_15 using plan 2 : while executing Process for Plan[2]:
11:01:07 2: ParDo[passert.nonEmptyFn] Out:[]
11:01:07 1: DataSource[S[passert.Count(out)/passert.NonEmpty/Impulse@localhost:38365], i0] Coder:W;c0_windowed<bytes;c0>!GWC Out:2
11:01:07 	caused by:
11:01:07 DoFn[UID:2, PID:passert.Count(out)/passert.NonEmpty/passert.nonEmptyFn, Name: github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert.nonEmptyFn] failed:
11:01:07 PCollection is empty, want non-empty collection
11:01:07 2022/06/02 14:58:41 2022-06-02 14:58:41. (27): Error running pipeline.
11:01:07 Traceback (most recent call last):
11:01:07   File "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_GoPortable_Commit/src/sdks/python/apache_beam/runners/portability/local_job_service.py", line 275, in _run_job
11:01:07     self.result = self._invoke_runner()
11:01:07   File "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_GoPortable_Commit/src/sdks/python/apache_beam/runners/portability/local_job_service.py", line 297, in _invoke_runner
11:01:07     return fn_runner.FnApiRunner(
11:01:07   File "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_GoPortable_Commit/src/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 208, in run_via_runner_api
11:01:07     return self.run_stages(stage_context, stages)
11:01:07   File "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_GoPortable_Commit/src/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 408, in run_stages
11:01:07     bundle_results = self._execute_bundle(
11:01:07   File "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_GoPortable_Commit/src/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 736, in _execute_bundle
11:01:07     self._run_bundle(
11:01:07   File "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_GoPortable_Commit/src/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 965, in _run_bundle
11:01:07     result, splits = bundle_manager.process_bundle(
11:01:07   File "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_GoPortable_Commit/src/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 1314, in process_bundle
11:01:07     raise RuntimeError(result.error)
11:01:07 RuntimeError: process bundle failed for instruction bundle_15 using plan 2 : while executing Process for Plan[2]:
11:01:07 2: ParDo[passert.nonEmptyFn] Out:[]
11:01:07 1: DataSource[S[passert.Count(out)/passert.NonEmpty/Impulse@localhost:38365], i0] Coder:W;c0_windowed<bytes;c0>!GWC Out:2
11:01:07 	caused by:
11:01:07 DoFn[UID:2, PID:passert.Count(out)/passert.NonEmpty/passert.nonEmptyFn, Name: github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert.nonEmptyFn] failed:
11:01:07 PCollection is empty, want non-empty collection
11:01:07 2022/06/02 14:58:41 Job state: FAILED
11:01:07     ptest.go:108: Failed to execute job: job go-testsimplepipeline-e0c742f9-4ac7-4f8f-9eff-73c54d6d8617 failed
11:01:07 --- FAIL: TestSimplePipeline (12.70s)

Not sure if the test is broken (and erroneously passing) or if something here is broken

@jrmccluskey
Copy link
Contributor Author

It's strange, from what I can tell the problem lies in the synthetic code since Count() gets used heavily in a number of places and we have both unit testing and the other integration tests passing. Let me see if that's reproducible on other runners

@jrmccluskey
Copy link
Contributor Author

Run Go Flink ValidatesRunner

@jrmccluskey
Copy link
Contributor Author

Run Go PostCommit

@github-actions github-actions bot added io and removed io labels Jun 2, 2022
@jrmccluskey
Copy link
Contributor Author

Found the problem, the synthetic StepCfg struct was not exported so the number of elements to emit in the synthetic tests was getting set to 0, producing empty PCollections

@jrmccluskey
Copy link
Contributor Author

Run Go Flink ValidatesRunner

if err := ptest.Run(p); err != nil {
t.Errorf("Pipeline failed: %v", err)
}
})
}
}

func TestCount_Bad(t *testing.T) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd argue its worth doing since more/duplicated code => more opportunities for bugs to slip in when updates are needed and more for a future developer (maybe us) to understand (code is a liability). I'm not going to block on it though, its not very important

@github-actions github-actions bot added io and removed io labels Jun 2, 2022
@github-actions
Copy link
Contributor

github-actions bot commented Jun 2, 2022

R: @youngoli for final approval

@github-actions github-actions bot added io and removed io labels Jun 2, 2022
@jrmccluskey
Copy link
Contributor Author

R: @lostluck since they have context

@github-actions
Copy link
Contributor

github-actions bot commented Jun 2, 2022

Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control

Copy link
Contributor

@lostluck lostluck left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM Thanks!

@jrmccluskey
Copy link
Contributor Author

Run GoPortable PreCommit

@lostluck lostluck merged commit c072c36 into apache:master Jun 3, 2022
@jrmccluskey jrmccluskey deleted the theCount branch June 15, 2022 18:33
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants