Skip to content

Commit

Permalink
Fix small issues
Browse files Browse the repository at this point in the history
  • Loading branch information
Fireye04 committed Jun 4, 2024
1 parent 8a5f7d3 commit 936a429
Show file tree
Hide file tree
Showing 5 changed files with 37 additions and 37 deletions.
9 changes: 5 additions & 4 deletions src/sasquatchbackpack/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ def main() -> None:
@click.option(
"-c",
"--coords",
help="latitude and longitude of the central coordnates "
help="latitude and longitude of the central coordinates "
"(latitude, longitude). Defaults to the coordinates of Cerro Pachon.",
default=DEFAULT_COORDS,
type=(float, float),
Expand Down Expand Up @@ -195,9 +195,10 @@ def usgs_earthquake_data(
)
source = sources.USGSSource(config)

poster = sasquatch.BackpackDispatcher(source, sasquatch.DispatcherConfig())

click.echo(poster.post())
backpack_dispatcher = sasquatch.BackpackDispatcher(
source, sasquatch.DispatcherConfig()
)
click.echo(backpack_dispatcher.post())


if __name__ == "__main__":
Expand Down
52 changes: 29 additions & 23 deletions src/sasquatchbackpack/sasquatch.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
"""Handles dispatch of backpack data to kafka."""

import json
import os
from dataclasses import dataclass
from string import Template

import requests

Expand All @@ -15,11 +16,26 @@
class DispatcherConfig:
"""Class containing relevant configuration information for the
BackpackDispatcher.
Values
------
sasquatch_rest_proxy_url
environment variable containing the target for data
partitions_count
number of partitions to create
replication_factor
number of replicas to create
namespace
environment variable containing the target namespace
"""

sasquatch_rest_proxy_url = (
"https://data-int.lsst.cloud/sasquatch-rest-proxy"
sasquatch_rest_proxy_url = os.getenv(
"SASQUATCH_REST_PROXY_URL",
"https://data-int.lsst.cloud/sasquatch-rest-proxy",
)
partitions_count = 1
replication_factor = 3
namespace = os.getenv("BACKPACK_NAMESPACE", "lsst.example")


class BackpackDispatcher:
Expand All @@ -40,20 +56,12 @@ def __init__(
) -> None:
self.source = source
self.config = config
self.schema = source.load_schema()
self.namespace = self.get_namespace()

def get_namespace(self) -> str:
"""Sorts the schema and returns the namespace value.
Returns
-------
namespace
provided namespace from the schema file
"""
json_schema = json.loads(self.schema)

return json_schema["namespace"]
self.schema = Template(source.load_schema()).substitute(
{
"namespace": self.config.namespace,
"topic_name": self.source.topic_name,
}
)

def create_topic(self) -> str:
"""Create kafka topic based off data from provided source.
Expand All @@ -73,13 +81,11 @@ def create_topic(self) -> str:

cluster_id = r.json()["data"][0]["cluster_id"]

# The topic is created with one partition and a replication
# factor of 3 by default, this configuration is fixed for the
# Sasquatch Kafka cluster.
topic_config = {
"topic_name": f"{self.namespace}." + f"{self.source.topic_name}",
"partitions_count": 1,
"replication_factor": 3,
"topic_name": f"{self.config.namespace}."
f"{self.source.topic_name}",
"partitions_count": self.config.partitions_count,
"replication_factor": self.config.replication_factor,
}

headers = {"content-type": "application/json"}
Expand Down
2 changes: 1 addition & 1 deletion src/sasquatchbackpack/schemas/usgs.avsc
Original file line number Diff line number Diff line change
@@ -1 +1 @@
{"namespace": "lsst.example", "type": "record", "name": "$topic_name", "description": "Collection of earthquakes near the summit", "fields": [{"name": "timestamp", "type": "long"}, {"name": "id", "type": "str", "description": "unique earthquake id"}, {"name": "latitude", "type": "float", "units": "Degrees"}, {"name": "longitude", "type": "float", "units": "Degrees"}, {"name": "depth", "type": "float", "units": "Km"}, {"name": "magnitude", "type": "float", "units": "Richter Magnitudes"}]}
{"namespace": "$namespace", "type": "record", "name": "$topic_name", "description": "Collection of earthquakes near the summit", "fields": [{"name": "timestamp", "type": "long"}, {"name": "id", "type": "str", "description": "unique earthquake id"}, {"name": "latitude", "type": "float", "units": "Degrees"}, {"name": "longitude", "type": "float", "units": "Degrees"}, {"name": "depth", "type": "float", "units": "Km"}, {"name": "magnitude", "type": "float", "units": "Richter Magnitudes"}]}
2 changes: 1 addition & 1 deletion src/sasquatchbackpack/scripts/usgs.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ def search_api(
"""
# Linting bypassed, as at the time of writing Libcomcat breaks if provided
# with a timezone-aware datetime object
current_dt = datetime.now(None) # noqa: DTZ005
current_dt = datetime.utcnow() # noqa: DTZ003

return search(
starttime=current_dt - duration,
Expand Down
9 changes: 1 addition & 8 deletions src/sasquatchbackpack/sources.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
from dataclasses import dataclass
from datetime import timedelta
from pathlib import Path
from string import Template

from sasquatchbackpack.scripts import usgs

Expand Down Expand Up @@ -88,13 +87,7 @@ def load_schema(self) -> str:
then update results.
"""
with Path(self.config.schema_file).open("r") as file:
template = Template(file.read())

return template.substitute(
{
"topic_name": self.topic_name,
}
)
return file.read()

def get_records(self) -> list:
"""Call the USGS Comcat API and assemble records.
Expand Down

0 comments on commit 936a429

Please sign in to comment.