Skip to content

Commit

Permalink
Merge branch 'main' into ct-1944-use_mashumaro_jsonschema
Browse files Browse the repository at this point in the history
  • Loading branch information
gshank committed Aug 3, 2023
2 parents 00b7b54 + 991618d commit 21210bd
Show file tree
Hide file tree
Showing 13 changed files with 94 additions and 64 deletions.
7 changes: 7 additions & 0 deletions .changes/unreleased/Breaking Changes-20230725-171359.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
kind: Breaking Changes
body: Removed the FirstRunResultError and AfterFirstRunResultError event types, using
the existing RunResultError in their place.
time: 2023-07-25T17:13:59.441682-04:00
custom:
Author: peterallenwebb
Issue: "7963"
6 changes: 6 additions & 0 deletions .changes/unreleased/Fixes-20230802-141556.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Fixes
body: Fix retry not working with log-file-max-bytes
time: 2023-08-02T14:15:56.306027-07:00
custom:
Author: ChenyuLInx
Issue: "8297"
2 changes: 1 addition & 1 deletion .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ jobs:
CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }}

integration-report:
name: integration test suite
name: Integration Test Suite
runs-on: ubuntu-latest
needs: integration
steps:
Expand Down
5 changes: 2 additions & 3 deletions core/dbt/cli/flags.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ def args_to_context(args: List[str]) -> Context:
if len(args) == 1 and "," in args[0]:
args = args[0].split(",")
sub_command_name, sub_command, args = cli.resolve_command(cli_ctx, args)

# Handle source and docs group.
if isinstance(sub_command, Group):
sub_command_name, sub_command, args = sub_command.resolve_command(cli_ctx, args)
Expand Down Expand Up @@ -319,7 +318,6 @@ def command_params(command: CliCommand, args_dict: Dict[str, Any]) -> CommandPar

for k, v in args_dict.items():
k = k.lower()

# if a "which" value exists in the args dict, it should match the command provided
if k == WHICH_KEY:
if v != command.value:
Expand All @@ -344,7 +342,8 @@ def add_fn(x):

if k == "macro" and command == CliCommand.RUN_OPERATION:
add_fn(v)
elif v in (None, False):
# None is a Singleton, False is a Flyweight, only one instance of each.
elif v is None or v is False:
add_fn(f"--no-{spinal_cased}")
elif v is True:
add_fn(f"--{spinal_cased}")
Expand Down
26 changes: 24 additions & 2 deletions core/dbt/events/eventmgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from logging.handlers import RotatingFileHandler
import threading
import traceback
from typing import Any, Callable, List, Optional, TextIO
from typing import Any, Callable, List, Optional, TextIO, Protocol
from uuid import uuid4
from dbt.events.format import timestamp_to_datetime_string

Expand Down Expand Up @@ -206,7 +206,7 @@ def fire_event(self, e: BaseEvent, level: Optional[EventLevel] = None) -> None:
for callback in self.callbacks:
callback(msg)

def add_logger(self, config: LoggerConfig):
def add_logger(self, config: LoggerConfig) -> None:
logger = (
_JsonLogger(self, config)
if config.line_format == LineFormat.Json
Expand All @@ -218,3 +218,25 @@ def add_logger(self, config: LoggerConfig):
def flush(self):
for logger in self.loggers:
logger.flush()


class IEventManager(Protocol):
callbacks: List[Callable[[EventMsg], None]]
invocation_id: str

def fire_event(self, e: BaseEvent, level: Optional[EventLevel] = None) -> None:
...

def add_logger(self, config: LoggerConfig) -> None:
...


class TestEventManager(IEventManager):
def __init__(self):
self.event_history = []

def fire_event(self, e: BaseEvent, level: Optional[EventLevel] = None) -> None:
self.event_history.append((e, level))

def add_logger(self, config: LoggerConfig) -> None:
raise NotImplementedError()
9 changes: 7 additions & 2 deletions core/dbt/events/functions.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from dbt.constants import METADATA_ENV_PREFIX
from dbt.events.base_types import BaseEvent, EventLevel, EventMsg
from dbt.events.eventmgr import EventManager, LoggerConfig, LineFormat, NoFilter
from dbt.events.eventmgr import EventManager, LoggerConfig, LineFormat, NoFilter, IEventManager
from dbt.events.helpers import env_secrets, scrub_secrets
from dbt.events.types import Formatting, Note
from dbt.flags import get_flags, ENABLE_LEGACY_LOGGER
Expand Down Expand Up @@ -182,7 +182,7 @@ def cleanup_event_logger():
# Since dbt-rpc does not do its own log setup, and since some events can
# currently fire before logs can be configured by setup_event_logger(), we
# create a default configuration with default settings and no file output.
EVENT_MANAGER: EventManager = EventManager()
EVENT_MANAGER: IEventManager = EventManager()
EVENT_MANAGER.add_logger(
_get_logbook_log_config(False, True, False, False) # type: ignore
if ENABLE_LEGACY_LOGGER
Expand Down Expand Up @@ -295,3 +295,8 @@ def set_invocation_id() -> None:
# This is primarily for setting the invocation_id for separate
# commands in the dbt servers. It shouldn't be necessary for the CLI.
EVENT_MANAGER.invocation_id = str(uuid.uuid4())


def ctx_set_event_manager(event_manager: IEventManager):
global EVENT_MANAGER
EVENT_MANAGER = event_manager
20 changes: 1 addition & 19 deletions core/dbt/events/types.proto
Original file line number Diff line number Diff line change
Expand Up @@ -2245,25 +2245,7 @@ message CheckNodeTestFailureMsg {
CheckNodeTestFailure data = 2;
}

// Z028
message FirstRunResultError {
string msg = 1;
}

message FirstRunResultErrorMsg {
EventInfo info = 1;
FirstRunResultError data = 2;
}

// Z029
message AfterFirstRunResultError {
string msg = 1;
}

message AfterFirstRunResultErrorMsg {
EventInfo info = 1;
AfterFirstRunResultError data = 2;
}
// Skipped Z028, Z029

// Z030
message EndOfRunSummary {
Expand Down
20 changes: 1 addition & 19 deletions core/dbt/events/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -2171,25 +2171,7 @@ def message(self) -> str:
return f" See test failures:\n {border}\n {msg}\n {border}"


# FirstRunResultError and AfterFirstRunResultError are just splitting the message from the result
# object into multiple log lines
# TODO: is this reallly needed? See printer.py


class FirstRunResultError(ErrorLevel):
def code(self):
return "Z028"

def message(self) -> str:
return yellow(self.msg)


class AfterFirstRunResultError(ErrorLevel):
def code(self):
return "Z029"

def message(self) -> str:
return self.msg
# Skipped Z028, Z029


class EndOfRunSummary(InfoLevel):
Expand Down
2 changes: 1 addition & 1 deletion core/dbt/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -486,7 +486,7 @@ def __init__(self, thread_id, known: List):
self.thread_id = thread_id
self.known = known
super().__init__(
msg="connection never acquired for thread {self.thread_id}, have {self.known}"
msg=f"connection never acquired for thread {self.thread_id}, have {self.known}"
)


Expand Down
12 changes: 1 addition & 11 deletions core/dbt/task/printer.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@
RunResultErrorNoMessage,
SQLCompiledPath,
CheckNodeTestFailure,
FirstRunResultError,
AfterFirstRunResultError,
EndOfRunSummary,
)

Expand Down Expand Up @@ -118,15 +116,7 @@ def print_run_result_error(result, newline: bool = True, is_warning: bool = Fals
fire_event(CheckNodeTestFailure(relation_name=result.node.relation_name))

elif result.message is not None:
first = True
for line in result.message.split("\n"):
# TODO: why do we format like this? Is there a reason this needs to
# be split instead of sending it as a single log line?
if first:
fire_event(FirstRunResultError(msg=line))
first = False
else:
fire_event(AfterFirstRunResultError(msg=line))
fire_event(RunResultError(msg=result.message))


def print_run_end_messages(results, keyboard_interrupt: bool = False) -> None:
Expand Down
3 changes: 1 addition & 2 deletions tests/functional/compile/test_compile.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,8 @@ def test_default(self, project):
assert get_lines("first_model") == ["select 1 as fun"]
assert any("_test_compile as schema" in line for line in get_lines("second_model"))

@pytest.mark.skip("Investigate flaky test #7179")
def test_no_introspect(self, project):
with pytest.raises(DbtRuntimeError):
with pytest.raises(DbtRuntimeError, match="connection never acquired for thread"):
run_dbt(["compile", "--no-introspect"])


Expand Down
5 changes: 5 additions & 0 deletions tests/unit/test_cli_flags.py
Original file line number Diff line number Diff line change
Expand Up @@ -391,3 +391,8 @@ def test_from_dict__which_fails(self):
args_dict = {"which": "some bad command"}
with pytest.raises(DbtInternalError, match=r"does not match value of which"):
self._create_flags_from_dict(Command.RUN, args_dict)

def test_from_dict_0_value(self):
args_dict = {"log_file_max_bytes": 0}
flags = Flags.from_dict(Command.RUN, args_dict)
assert flags.LOG_FILE_MAX_BYTES == 0
41 changes: 37 additions & 4 deletions tests/unit/test_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import re
from typing import TypeVar

from dbt.contracts.results import TimingInfo
from dbt.contracts.results import TimingInfo, RunResult, RunStatus
from dbt.events import AdapterLogger, types
from dbt.events.base_types import (
BaseEvent,
Expand All @@ -14,11 +14,15 @@
WarnLevel,
msg_from_base_event,
)
from dbt.events.functions import msg_to_dict, msg_to_json
from dbt.events.eventmgr import TestEventManager, EventManager
from dbt.events.functions import msg_to_dict, msg_to_json, ctx_set_event_manager
from dbt.events.helpers import get_json_string_utcnow
from dbt.events.types import RunResultError
from dbt.flags import set_from_args
from argparse import Namespace

from dbt.task.printer import print_run_result_error

set_from_args(Namespace(WARN_ERROR=False), None)


Expand Down Expand Up @@ -388,8 +392,6 @@ def test_event_codes(self):
types.RunResultErrorNoMessage(status=""),
types.SQLCompiledPath(path=""),
types.CheckNodeTestFailure(relation_name=""),
types.FirstRunResultError(msg=""),
types.AfterFirstRunResultError(msg=""),
types.EndOfRunSummary(num_errors=0, num_warnings=0, keyboard_interrupt=False),
types.LogSkipBecauseError(schema="", relation="", index=0, total=0),
types.EnsureGitInstalled(),
Expand Down Expand Up @@ -485,3 +487,34 @@ def test_bad_serialization():
str(excinfo.value)
== "[Note]: Unable to parse dict {'param_event_doesnt_have': 'This should break'}"
)


def test_single_run_error():

try:
# Add a recording event manager to the context, so we can test events.
event_mgr = TestEventManager()
ctx_set_event_manager(event_mgr)

error_result = RunResult(
status=RunStatus.Error,
timing=[],
thread_id="",
execution_time=0.0,
node=None,
adapter_response=dict(),
message="oh no!",
failures=[],
)

print_run_result_error(error_result)
events = [e for e in event_mgr.event_history if isinstance(e[0], RunResultError)]

assert len(events) == 1
assert events[0][0].msg == "oh no!"

finally:
# Set an empty event manager unconditionally on exit. This is an early
# attempt at unit testing events, and we need to think about how it
# could be done in a thread safe way in the long run.
ctx_set_event_manager(EventManager())

0 comments on commit 21210bd

Please sign in to comment.