Skip to content

Commit

Permalink
[BEAM-5492] Python Dataflow integration tests should export the pipel…
Browse files Browse the repository at this point in the history
…ine console output to Jenkins Test Result section (apache#17530)
  • Loading branch information
andoni-guzman authored May 11, 2022
1 parent df67c81 commit a181053
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 3 deletions.
9 changes: 7 additions & 2 deletions sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -1640,7 +1640,10 @@ def wait_until_finish(self, duration=None):
if not self.is_in_terminal_state():
if not self.has_job:
raise IOError('Failed to get the Dataflow job id.')

consoleUrl = (
"Console URL: https://console.cloud.google.com/"
f"dataflow/jobs/<RegionId>/{self.job_id()}"
"?project=<ProjectId>")
thread = threading.Thread(
target=DataflowRunner.poll_for_job_completion,
args=(self._runner, self, duration))
Expand All @@ -1657,13 +1660,15 @@ def wait_until_finish(self, duration=None):
# is_in_terminal_state.
terminated = self.is_in_terminal_state()
assert duration or terminated, (
'Job did not reach to a terminal state after waiting indefinitely.')
'Job did not reach to a terminal state after waiting indefinitely. '
'{}'.format(consoleUrl))

# TODO(BEAM-14291): Also run this check if wait_until_finish was called
# after the pipeline completed.
if terminated and self.state != PipelineState.DONE:
# TODO(BEAM-1290): Consider converting this to an error log based on
# the resolution of the issue.
_LOGGER.error(consoleUrl)
raise DataflowRuntimeException(
'Dataflow pipeline failed. State: %s, Error:\n%s' %
(self.state, getattr(self._runner, 'last_error_msg', None)),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ def run_pipeline(self, pipeline, options):
# TODO(markflyhigh)(BEAM-1890): Use print since Nose doesn't show logs
# in some cases.
print('Worker logs: %s' % self.build_console_url(options))
_LOGGER.info('Console log: ')
_LOGGER.info(self.build_console_url(options))

try:
self.wait_until_in_state(PipelineState.RUNNING)
Expand Down Expand Up @@ -84,7 +86,11 @@ def build_console_url(self, options):

def wait_until_in_state(self, expected_state, timeout=WAIT_IN_STATE_TIMEOUT):
"""Wait until Dataflow pipeline enters a certain state."""
consoleUrl = (
"Console URL: https://console.cloud.google.com/dataflow/"
f"<regionId>/{self.result.job_id()}?project=<projectId>")
if not self.result.has_job:
_LOGGER.error(consoleUrl)
raise IOError('Failed to get the Dataflow job id.')

start_time = time.time()
Expand All @@ -93,7 +99,7 @@ def wait_until_in_state(self, expected_state, timeout=WAIT_IN_STATE_TIMEOUT):
if self.result.is_in_terminal_state() or job_state == expected_state:
return job_state
time.sleep(5)

_LOGGER.error(consoleUrl)
raise RuntimeError(
'Timeout after %d seconds while waiting for job %s '
'enters expected state %s. Current state is %s.' %
Expand Down

0 comments on commit a181053

Please sign in to comment.