Skip to content
This repository has been archived by the owner on Aug 30, 2022. It is now read-only.

Commit

Permalink
PB-431 send participant state to influxdb (#300)
Browse files Browse the repository at this point in the history
  • Loading branch information
Robert-Steiner authored and little-dude committed Feb 25, 2020
1 parent 4636525 commit 8cd3d71
Show file tree
Hide file tree
Showing 5 changed files with 193 additions and 75 deletions.
6 changes: 3 additions & 3 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,19 @@


@pytest.fixture()
def participant_metrics_sample():
def json_participant_metrics_sample():
"""Return a valid participant metric object."""
return json.dumps(
[
{
"measurement": "participant",
"time": 1234326435,
"time": 1582017483 * 1_000_000_000,
"tags": {"id": "127.0.0.1:1345"},
"fields": {"CPU_1": 90.8, "CPU_2": 90, "CPU_3": "23", "CPU_4": 0.00,},
},
{
"measurement": "participant",
"time": 3542626236,
"time": 1582017484 * 1_000_000_000,
"tags": {"id": "127.0.0.1:1345"},
"fields": {"CPU_1": 90.8, "CPU_2": 90, "CPU_3": "23", "CPU_4": 0.00,},
},
Expand Down
4 changes: 2 additions & 2 deletions tests/test_grpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,7 @@ def test_start_training_round_failed_precondition( # pylint: disable=unused-arg


@pytest.mark.integration
def test_end_training_round(coordinator_service, participant_metrics_sample):
def test_end_training_round(coordinator_service, json_participant_metrics_sample):
"""[summary]
.. todo:: Advance docstrings (https://xainag.atlassian.net/browse/XP-425)
Expand All @@ -369,7 +369,7 @@ def test_end_training_round(coordinator_service, participant_metrics_sample):
rendezvous(channel)
# call EndTrainingRound service method on coordinator
end_training_round(
channel, test_weights, number_samples, participant_metrics_sample
channel, test_weights, number_samples, json_participant_metrics_sample
)
# check local model received...

Expand Down
147 changes: 115 additions & 32 deletions tests/test_metric_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

from influxdb import InfluxDBClient
import pytest
from xain_proto.fl.coordinator_pb2 import State

from xain_fl.config import MetricsConfig
from xain_fl.coordinator.metrics_store import MetricsStore, MetricsStoreError
Expand All @@ -29,101 +30,183 @@ def invalid_json_participant_metrics_sample():
)


@pytest.fixture()
def participant_metrics_sample():
"""Return a valid metric object."""
return {"state": State.FINISHED}


@mock.patch.object(InfluxDBClient, "write_points", return_value=True)
def test_valid_participant_metrics(
write_points_mock, participant_metrics_sample,
def test_write_received_participant_metrics(
write_points_mock, json_participant_metrics_sample,
): # pylint: disable=redefined-outer-name,unused-argument
"""Check that write_points does not raise an exception on a valid metric object."""
"""Test test_write_received_participant_metrics method."""
metric_store = MetricsStore(
MetricsConfig(enable=True, host="", port=1, user="", password="", db_name="")
)

metric_store.write_participant_metrics(participant_metrics_sample)
write_points_mock.assert_called_once()
metric_store.write_received_participant_metrics(json_participant_metrics_sample)
write_points_mock.assert_called_with(
[
{
"measurement": "participant",
"time": 1582017483 * 1_000_000_000,
"tags": {"id": "127.0.0.1:1345"},
"fields": {"CPU_1": 90.8, "CPU_2": 90, "CPU_3": "23", "CPU_4": 0.00,},
},
{
"measurement": "participant",
"time": 1582017484 * 1_000_000_000,
"tags": {"id": "127.0.0.1:1345"},
"fields": {"CPU_1": 90.8, "CPU_2": 90, "CPU_3": "23", "CPU_4": 0.00,},
},
]
)


@mock.patch.object(InfluxDBClient, "write_points", side_effect=Exception())
def test_write_points_exception_handling_write_participant_metrics(
write_points_mock, participant_metrics_sample,
def test_write_received_participant_metrics_write_points_exception(
write_points_mock, json_participant_metrics_sample,
): # pylint: disable=redefined-outer-name,unused-argument
"""Check that raised exceptions of the write_points method are caught in the
write_participant_metrics method."""
"""Check that raised exceptions of the write_points method are re-raised as MetricsStoreError in
the write_received_participant_metrics method."""

metric_store = MetricsStore(
MetricsConfig(enable=True, host="", port=1, user="", password="", db_name="")
)
with pytest.raises(MetricsStoreError):
metric_store.write_participant_metrics(participant_metrics_sample)
metric_store.write_received_participant_metrics(json_participant_metrics_sample)


@mock.patch.object(InfluxDBClient, "write_points", return_value=True)
def test_invalid_json_exception_handling(write_points_mock):
"""Check that raised exceptions of the write_points method are caught in the
write_participant_metrics method."""
def test_write_received_participant_metrics_invalid_json_exception(write_points_mock):
"""Check that raised exceptions of the write_points method are re-raised as MetricsStoreError in
the write_received_participant_metrics method."""
metric_store = MetricsStore(
MetricsConfig(enable=True, host="", port=1, user="", password="", db_name="")
)
with pytest.raises(MetricsStoreError):
metric_store.write_participant_metrics('{"a": 1')
metric_store.write_received_participant_metrics('{"a": 1')
write_points_mock.assert_not_called()

with pytest.raises(MetricsStoreError):
metric_store.write_participant_metrics("{1: 1}")
metric_store.write_received_participant_metrics("{1: 1}")
write_points_mock.assert_not_called()


@mock.patch.object(InfluxDBClient, "write_points", return_value=True)
def test_empty_metrics_exception_handling(
def test_write_received_participant_metrics_empty_metrics_exception(
write_points_mock, empty_json_participant_metrics_sample,
): # pylint: disable=redefined-outer-name,unused-argument
"""Check that raised exceptions of the write_points method are caught in the
write_participant_metrics method."""
"""Check that raised exceptions of the write_points method are re-raised as MetricsStoreError in
the write_received_participant_metrics method."""
metric_store = MetricsStore(
MetricsConfig(enable=True, host="", port=1, user="", password="", db_name="")
)

with pytest.raises(MetricsStoreError):
metric_store.write_participant_metrics(empty_json_participant_metrics_sample)
metric_store.write_received_participant_metrics(
empty_json_participant_metrics_sample
)
write_points_mock.assert_not_called()


@mock.patch.object(InfluxDBClient, "write_points", return_value=True)
def test_invalid_schema_exception_handling(
def test_write_received_participant_metrics_invalid_schema_exception(
write_points_mock, invalid_json_participant_metrics_sample,
): # pylint: disable=redefined-outer-name,unused-argument
"""Check that raised exceptions of the write_points method are caught in the
write_participant_metrics method."""
"""Check that raised exceptions of the write_points method are re-raised as MetricsStoreError in
the write_received_participant_metrics method."""
metric_store = MetricsStore(
MetricsConfig(enable=True, host="", port=1, user="", password="", db_name="")
)

with pytest.raises(MetricsStoreError):
metric_store.write_participant_metrics(invalid_json_participant_metrics_sample)
metric_store.write_received_participant_metrics(
invalid_json_participant_metrics_sample
)
write_points_mock.assert_not_called()


@mock.patch("xain_fl.coordinator.metrics_store.time.time", return_value=1582017483.0)
@mock.patch.object(InfluxDBClient, "write_points", return_value=True)
def test_valid_coordinator_metrics(
write_points_mock, coordinator_metrics_sample,
def test_write_coordinator_metrics(
write_points_mock, time_mock, coordinator_metrics_sample,
): # pylint: disable=redefined-outer-name,unused-argument
"""Check that write_points does not raise an exception on a valid metric object."""
"""Test write_coordinator_metrics method."""
metric_store = MetricsStore(
MetricsConfig(enable=True, host="", port=1, user="", password="", db_name="")
)

metric_store.write_coordinator_metrics(coordinator_metrics_sample, tags={"1": "2"})
write_points_mock.assert_called_once()
metric_store.write_metrics(
"coordinator", coordinator_metrics_sample, tags={"meta_data": "1"}
)

write_points_mock.assert_called_with(
[
{
"measurement": "coordinator",
"time": 1582017483 * 1_000_000_000,
"tags": {"meta_data": "1"},
"fields": {
"state": State.ROUND,
"round": 2,
"number_of_selected_participants": 0,
},
}
]
)


@mock.patch.object(InfluxDBClient, "write_points", side_effect=Exception())
def test_write_points_exception_handling_write_coordinator_metrics(
def test_write_coordinator_metrics_write_points_exception(
write_points_mock, coordinator_metrics_sample,
): # pylint: disable=redefined-outer-name,unused-argument
"""Check that raised exceptions of the write_points method are caught in the
write_coordinator_metrics method."""
"""Check that raised exceptions of the write_points method are re-raised as MetricsStoreError in
the write_coordinator_metrics method."""

metric_store = MetricsStore(
MetricsConfig(enable=True, host="", port=1, user="", password="", db_name="")
)
with pytest.raises(MetricsStoreError):
metric_store.write_metrics("coordinator", coordinator_metrics_sample)


@mock.patch("xain_fl.coordinator.metrics_store.time.time", return_value=1582017483.0)
@mock.patch.object(InfluxDBClient, "write_points", return_value=True)
def test_write_participant_metrics(
write_points_mock, time_mock, participant_metrics_sample,
): # pylint: disable=redefined-outer-name,unused-argument
"""Test write_participant_metrics method."""
metric_store = MetricsStore(
MetricsConfig(enable=True, host="", port=1, user="", password="", db_name="")
)

metric_store.write_metrics(
"participant", participant_metrics_sample, tags={"id": "1234-1234-1234"}
)

write_points_mock.assert_called_with(
[
{
"measurement": "participant",
"time": 1582017483 * 1_000_000_000,
"tags": {"id": "1234-1234-1234"},
"fields": {"state": State.FINISHED,},
}
]
)


@mock.patch.object(InfluxDBClient, "write_points", side_effect=Exception())
def test_write_participant_metrics_write_points_exception(
write_points_mock, participant_metrics_sample,
): # pylint: disable=redefined-outer-name,unused-argument
"""Check that raised exceptions of the write_points method are re-raised as MetricsStoreError in
the write_participant_metrics method."""

metric_store = MetricsStore(
MetricsConfig(enable=True, host="", port=1, user="", password="", db_name="")
)
with pytest.raises(MetricsStoreError):
metric_store.write_coordinator_metrics(coordinator_metrics_sample)
metric_store.write_metrics("participant", participant_metrics_sample)
Loading

0 comments on commit 8cd3d71

Please sign in to comment.