-
Notifications
You must be signed in to change notification settings - Fork 19
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
PR-2 Plugin configuration from airflow.cfg #32
base: main
Are you sure you want to change the base?
Changes from all commits
610b67a
ac8a1d9
489fe0f
b9a0cd6
dd60f57
48260cd
017fcf3
9093841
1a4e363
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -123,3 +123,5 @@ dmypy.json | |
|
||
# Pyre type checker | ||
.pyre/ | ||
|
||
docker/ |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,12 +18,69 @@ | |
import threading | ||
|
||
from airflow.plugins_manager import AirflowPlugin | ||
from airflow.configuration import AirflowConfigParser | ||
from newrelic_telemetry_sdk import Harvester as _Harvester | ||
from newrelic_telemetry_sdk import MetricBatch, MetricClient | ||
|
||
ENV_SERVICE_NAME = "NEW_RELIC_SERVICE_NAME" | ||
ENV_INSERT_KEY = "NEW_RELIC_INSERT_KEY" | ||
ENV_HOST = "NEW_RELIC_HOST" | ||
|
||
PROP_HOST = "host" | ||
PROP_SERVICE_NAME = "service_name" | ||
PROP_INSERT_KEY = "insert_key" | ||
PROP_HARVESTER_INTERVAL = "harvester_interval" | ||
|
||
_logger = logging.getLogger(__name__) | ||
|
||
|
||
def get_config():
    """Load the New Relic plugin settings and metric dimensions.

    The ``[newrelic]`` and ``[newrelic.dimensions]`` sections are read from
    ``$AIRFLOW_HOME/airflow.cfg`` (``AIRFLOW_HOME`` falls back to
    ``/opt/airflow``).  Any setting the file does not provide is then filled
    in from the ``NEW_RELIC_*`` environment variables or a built-in default.

    Reading the file is best effort: if it cannot be read or parsed, a
    warning is logged and only the environment/defaults are used.

    Returns:
        tuple: ``(nr_config, nr_dimensions)``.  ``nr_config`` always holds
        the service name, host (possibly ``None``) and harvester interval;
        the insert key is present only when configured in the file or the
        environment.  ``nr_dimensions`` always holds ``"service.name"``.
    """
    config_location = os.environ.get("AIRFLOW_HOME", "/opt/airflow") + "/airflow.cfg"

    nr_config = {}
    nr_dimensions = {}
    try:
        with open(config_location, mode="r") as file:
            raw_config = file.read()

        airflow_config = AirflowConfigParser(
            default_config=raw_config.encode("UTF-8").decode()
        )
        section = airflow_config.getsection("newrelic")
        if section is not None:
            nr_config = section

        section = airflow_config.getsection("newrelic.dimensions")
        if section is not None:
            nr_dimensions = section

    except (IOError, OSError):
        # Missing or unreadable file: an expected situation when the plugin
        # is configured purely through environment variables.
        _logger.warning(
            "Could not find airflow config at %s, using default from environment",
            config_location,
        )
    except Exception:
        # The file exists but could not be decoded or parsed.  Keep going
        # with whatever was loaded so far, but record the real cause instead
        # of claiming the file is missing.
        _logger.warning(
            "Could not load airflow config at %s, using default from environment",
            config_location,
            exc_info=True,
        )

    # Set default configs
    if PROP_INSERT_KEY not in nr_config and ENV_INSERT_KEY in os.environ:
        nr_config[PROP_INSERT_KEY] = os.environ.get(ENV_INSERT_KEY)

    if PROP_SERVICE_NAME not in nr_config:
        nr_config[PROP_SERVICE_NAME] = os.environ.get(ENV_SERVICE_NAME, "Airflow")

    if PROP_HOST not in nr_config:
        nr_config[PROP_HOST] = os.environ.get(ENV_HOST)

    if PROP_HARVESTER_INTERVAL not in nr_config:
        nr_config[PROP_HARVESTER_INTERVAL] = 5

    # The service name is always reported as a dimension, overriding any
    # "service.name" entry that came from the dimensions section.
    nr_dimensions["service.name"] = nr_config[PROP_SERVICE_NAME]

    return nr_config, nr_dimensions
|
||
|
||
config, dimensions = get_config() | ||
|
||
|
||
class Harvester(_Harvester): | ||
IMMEDIATE_FLUSH_PREFIXES = ("ti_", "dagrun.duration.") | ||
|
||
|
@@ -57,15 +114,14 @@ def harvester(cls): | |
if harvester: | ||
return harvester | ||
|
||
insert_key = os.environ["NEW_RELIC_INSERT_KEY"] | ||
host = os.environ.get("NEW_RELIC_HOST", None) | ||
client = MetricClient(insert_key, host=host) | ||
client = MetricClient(config[PROP_INSERT_KEY], host=config[PROP_HOST]) | ||
|
||
service_name = os.environ.get("NEW_RELIC_SERVICE_NAME", "Airflow") | ||
batch = MetricBatch({"service.name": service_name}) | ||
batch = MetricBatch(dimensions) | ||
_logger.info("PID: %d -- Using New Relic Stats Recorder", pid) | ||
|
||
harvester = cls._harvesters[pid] = Harvester(client, batch) | ||
harvester = cls._harvesters[pid] = Harvester( | ||
client, batch, harvest_interval=config[PROP_HARVESTER_INTERVAL] | ||
) | ||
harvester.start() | ||
|
||
atexit.register(harvester.stop) | ||
|
@@ -121,7 +177,7 @@ def validate(cls): | |
except ImportError: | ||
pass | ||
|
||
if "NEW_RELIC_INSERT_KEY" in os.environ and not cls.patched: | ||
if PROP_INSERT_KEY in config and not cls.patched: | ||
Comment on lines
-124
to
+180
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This is not equivalent if we swap config strategies. We will need to check both conditions. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. At this point the config should already contain the value from the environment variable (if it is not defined directly in airflow.cfg). |
||
cls.patched = True | ||
_logger.info("Using NewRelicStatsLogger") | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,10 @@ | ||
[newrelic] | ||
host = metric-api.foo.newrelic.com | ||
service_name = name-from-file | ||
harvester_interval = 99 | ||
insert_key = my-secret-key | ||
|
||
[newrelic.dimensions] | ||
foo = bar | ||
new = relic | ||
some = thing |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,62 @@ | ||
# Copyright 2019 New Relic, Inc. | ||
# | ||
# Licensed under the Apache License, Version 2.0 (the "License"); | ||
# you may not use this file except in compliance with the License. | ||
# You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
|
||
import os | ||
|
||
from newrelic_airflow_plugin.newrelic_plugin import ( | ||
ENV_HOST, | ||
ENV_INSERT_KEY, | ||
ENV_SERVICE_NAME, | ||
PROP_HARVESTER_INTERVAL, | ||
PROP_HOST, | ||
PROP_INSERT_KEY, | ||
PROP_SERVICE_NAME, | ||
get_config, | ||
) | ||
|
||
|
||
def test_env_configuration():
    """Settings come from the NEW_RELIC_* environment variables.

    NOTE(review): presumably relies on ``AIRFLOW_HOME`` not pointing at a
    config file with a ``[newrelic]`` section -- confirm in CI.
    """
    os.environ.update(
        {
            ENV_SERVICE_NAME: "test-name",
            ENV_INSERT_KEY: "foo-bar-baz",
            ENV_HOST: "metric-api.eu.newrelic.com",
        }
    )

    settings, dims = get_config()

    expected_settings = (
        (PROP_SERVICE_NAME, "test-name"),
        (PROP_HOST, "metric-api.eu.newrelic.com"),
        (PROP_INSERT_KEY, "foo-bar-baz"),
        (PROP_HARVESTER_INTERVAL, 5),
    )
    for prop, expected in expected_settings:
        assert settings[prop] == expected
    assert dims["service.name"] == "test-name"
|
||
|
||
def test_file_configuration():
    """Values from ``resources/airflow.cfg`` win over the environment.

    ``AIRFLOW_HOME`` is pointed at the ``resources`` directory next to this
    test module for the duration of the test and restored afterwards, even
    when an assertion fails.
    """
    airflow_home = os.environ.get("AIRFLOW_HOME")

    os.environ[ENV_SERVICE_NAME] = "test-name"
    os.environ[ENV_INSERT_KEY] = "foo-bar-baz"
    os.environ[ENV_HOST] = "metric-api.eu.newrelic.com"
    os.environ["AIRFLOW_HOME"] = (
        os.path.dirname(os.path.realpath(__file__)) + "/resources"
    )

    try:
        config, dimensions = get_config()
        assert config[PROP_SERVICE_NAME] == "name-from-file"
        assert config[PROP_HOST] == "metric-api.foo.newrelic.com"
        assert config[PROP_INSERT_KEY] == "my-secret-key"
        assert config[PROP_HARVESTER_INTERVAL] == 99

        assert dimensions["foo"] == "bar"
        assert dimensions["some"] == "thing"
        assert dimensions["service.name"] == "name-from-file"
    finally:
        # os.environ only accepts strings: when AIRFLOW_HOME was not set
        # before the test, remove it again instead of assigning None.
        if airflow_home is None:
            os.environ.pop("AIRFLOW_HOME", None)
        else:
            os.environ["AIRFLOW_HOME"] = airflow_home
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This seems like a mistake. Why are we adding another dependency?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi @TimPansino, the email validator is a sub-dependency that comes with apache-airflow. Unfortunately, it is not pinned to a version compatible with Python 2.7. I thought it would be better to pin that particular dependency instead of apache-airflow.