Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CT-352] catch and retry malformed json #4982

Merged
merged 14 commits into from
Apr 5, 2022
7 changes: 7 additions & 0 deletions .changes/unreleased/Fixes-20220331-143923.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
kind: Fixes
body: Catch more cases to retry package retrieval for deps pointing to the hub. Also start to cache the package requests.
time: 2022-03-31T14:39:23.952705-05:00
custom:
Author: emmyoop
Issue: "4849"
PR: "4982"
113 changes: 79 additions & 34 deletions core/dbt/clients/registry.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
import functools
import requests
from dbt.events.functions import fire_event
from dbt.events.types import RegistryProgressMakingGETRequest, RegistryProgressGETResponse
from dbt.events.types import (
RegistryProgressMakingGETRequest,
RegistryProgressGETResponse,
RegistryIndexProgressMakingGETRequest,
RegistryIndexProgressGETResponse,
)
from dbt.utils import memoized, _connection_exception_retry as connection_exception_retry
from dbt import deprecations
import os
Expand All @@ -12,55 +17,52 @@
DEFAULT_REGISTRY_BASE_URL = "https://hub.getdbt.com/"


def _get_url(url, registry_base_url=None):
def _get_url(name, registry_base_url=None):
if registry_base_url is None:
registry_base_url = DEFAULT_REGISTRY_BASE_URL
url = "api/v1/{}.json".format(name)

return "{}{}".format(registry_base_url, url)


def _get_with_retries(path, registry_base_url=None):
get_fn = functools.partial(_get, path, registry_base_url)
def _get_with_retries(package_name, registry_base_url=None):
get_fn = functools.partial(_get_cached, package_name, registry_base_url)
return connection_exception_retry(get_fn, 5)


def _get(path, registry_base_url=None):
url = _get_url(path, registry_base_url)
def _get(package_name, registry_base_url=None):
url = _get_url(package_name, registry_base_url)
fire_event(RegistryProgressMakingGETRequest(url=url))
resp = requests.get(url, timeout=30)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can this line throw at all?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. But we're catching all exceptions from requests in the retry logic.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lets add a comment to that effect then.

fire_event(RegistryProgressGETResponse(url=url, resp_code=resp.status_code))
resp.raise_for_status()

# It is unexpected for the content of the response to be None so if it is, raising this error
# will cause this function to retry (if called within _get_with_retries) and hopefully get
# a response. This seems to happen when there's an issue with the Hub.
# The response should always be a dictionary. Anything else is unexpected, raise error.
# Raising this error will cause this function to retry (if called within _get_with_retries)
# and hopefully get a valid response. This seems to happen when there's an issue with the Hub.
# Since we control what we expect the HUB to return, this is safe.
# See https://github.com/dbt-labs/dbt-core/issues/4577
if resp.json() is None:
raise requests.exceptions.ContentDecodingError(
"Request error: The response is None", response=resp
)
return resp.json()


def index(registry_base_url=None):
return _get_with_retries("api/v1/index.json", registry_base_url)


index_cached = memoized(index)


def packages(registry_base_url=None):
return _get_with_retries("api/v1/packages.json", registry_base_url)
# and https://github.com/dbt-labs/dbt-core/issues/4849
response = resp.json()

if not isinstance(response, dict): # This will also catch Nonetype
error_msg = (
f"Request error: The response type of {type(response)} is not valid: {resp.text}"
)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's put what we expected in this error message

raise requests.exceptions.ContentDecodingError(error_msg, response=resp)

def package(name, registry_base_url=None):
response = _get_with_retries("api/v1/{}.json".format(name), registry_base_url)
expected_keys = ["name", "versions"]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

since we seem to only use this as a set, maybe we could just create it as a set using {} instead of []

expected_version_keys = ["name", "packages", "downloads"]
if not set(expected_keys).issubset(response) and not set(expected_version_keys).issubset(
response["versions"]
):
error_msg = f"Request error: Expected the response to contain keys {expected_keys} but one or more are missing: {resp.text}"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nit] we can use set diff to discover and explicitly report these keys. There's only 3 keys right now so if we don't want to bother I'm fine with that too.

raise requests.exceptions.ContentDecodingError(error_msg, response=resp)

# Either redirectnamespace or redirectname in the JSON response indicate a redirect
# redirectnamespace redirects based on package ownership
# redirectname redirects based on package name
# Both can be present at the same time, or neither. Fails gracefully to old name

if ("redirectnamespace" in response) or ("redirectname" in response):

if ("redirectnamespace" in response) and response["redirectnamespace"] is not None:
Expand All @@ -74,15 +76,58 @@ def package(name, registry_base_url=None):
use_name = response["name"]

new_nwo = use_namespace + "/" + use_name
deprecations.warn("package-redirect", old_name=name, new_name=new_nwo)
deprecations.warn("package-redirect", old_name=package_name, new_name=new_nwo)

return response


_get_cached = memoized(_get)


def package(package_name, registry_base_url=None):
# returns a dictionary of metadata for all versions of a package
response = _get_with_retries(package_name, registry_base_url)
return response["versions"]


def package_version(package_name, version, registry_base_url=None):
# returns the metadata of a specific version of a package
response = package(package_name, registry_base_url)
return response[version]


def get_available_versions(package_name):
# returns a list of all available versions of a package
response = package(package_name)
return list(response)


def _get_index(registry_base_url=None):

url = _get_url("index", registry_base_url)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like these functions and how they abstract over retries. Can we add type hints to them?

fire_event(RegistryIndexProgressMakingGETRequest(url=url))
resp = requests.get(url, timeout=30)
fire_event(RegistryIndexProgressGETResponse(url=url, resp_code=resp.status_code))
resp.raise_for_status()

# The response should be a list. Anything else is unexpected, raise an error.
# Raising this error will cause this function to retry and hopefully get a valid response.

response = resp.json()

if not isinstance(response, list): # This will also catch Nonetype
error_msg = (
f"Request error: The response type of {type(response)} is not valid: {resp.text}"
)
raise requests.exceptions.ContentDecodingError(error_msg, response=resp)

return response


def package_version(name, version, registry_base_url=None):
return _get_with_retries("api/v1/{}/{}.json".format(name, version), registry_base_url)
def index(registry_base_url=None):
# this returns a list of all packages on the Hub
get_index_fn = functools.partial(_get_index, registry_base_url)
return connection_exception_retry(get_index_fn, 5)


def get_available_versions(name):
response = package(name)
return list(response["versions"])
index_cached = memoized(index)
23 changes: 23 additions & 0 deletions core/dbt/events/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,25 @@ def message(self) -> str:
return f" Checked out at {self.end_sha}."


@dataclass
class RegistryIndexProgressMakingGETRequest(DebugLevel):
url: str
code: str = "M022"

def message(self) -> str:
return f"Making package index registry request: GET {self.url}"


@dataclass
class RegistryIndexProgressGETResponse(DebugLevel):
url: str
resp_code: int
code: str = "M023"

def message(self) -> str:
return f"Response from registry index: GET {self.url} {self.resp_code}"


@dataclass
class RegistryProgressMakingGETRequest(DebugLevel):
url: str
Expand Down Expand Up @@ -2422,6 +2441,10 @@ def message(self) -> str:
GitNothingToDo(sha="")
GitProgressUpdatedCheckoutRange(start_sha="", end_sha="")
GitProgressCheckedOutAt(end_sha="")
RegistryIndexProgressMakingGETRequest(url="")
RegistryIndexProgressGETResponse(url="", resp_code=1234)
RegistryProgressMakingGETRequest(url="")
RegistryProgressGETResponse(url="", resp_code=1234)
SystemErrorRetrievingModTime(path="")
SystemCouldNotWrite(path="", reason="", exc=Exception(""))
SystemExecutingCmd(cmd=[""])
Expand Down
2 changes: 2 additions & 0 deletions test/unit/test_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,8 @@ def MockNode():
PrintDebugStackTrace(),
MainReportArgs(args={}),
RegistryProgressMakingGETRequest(url=''),
RegistryIndexProgressMakingGETRequest(url=""),
RegistryIndexProgressGETResponse(url="", resp_code=1),
DepsUTD(),
PartialParsingNotEnabled(),
SQlRunnerException(exc=Exception('')),
Expand Down
1 change: 0 additions & 1 deletion tests/functional/permission/test_permissions.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ def test_create_schema_permissions(
project,
):
# now it should work!
# breakpoint()
project.run_sql("grant create on database {} to noaccess".format(project.database))
project.run_sql(
'grant usage, create on schema "{}" to noaccess'.format(project.test_schema)
Expand Down