Skip to content

Commit

Permalink
Refactor and use hub v2 endpoints (#28)
Browse files Browse the repository at this point in the history
* Refactor and use hub v2 endpoints

* Fix
  • Loading branch information
itssimon authored Jul 4, 2024
1 parent 58d9066 commit 89a4d30
Show file tree
Hide file tree
Showing 15 changed files with 195 additions and 189 deletions.
68 changes: 34 additions & 34 deletions apitally/client/asyncio.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ def __init__(self, client_id: str, env: str) -> None:
super().__init__(client_id=client_id, env=env)
self._stop_sync_loop = False
self._sync_loop_task: Optional[asyncio.Task[Any]] = None
self._requests_data_queue: asyncio.Queue[Tuple[float, Dict[str, Any]]] = asyncio.Queue()
self._sync_data_queue: asyncio.Queue[Tuple[float, Dict[str, Any]]] = asyncio.Queue()

def get_http_client(self) -> httpx.AsyncClient:
return httpx.AsyncClient(base_url=self.hub_url, timeout=REQUEST_TIMEOUT)
Expand All @@ -44,9 +44,9 @@ async def _run_sync_loop(self) -> None:
try:
time_start = time.perf_counter()
async with self.get_http_client() as client:
tasks = [self.send_requests_data(client)]
if not self._app_info_sent and not first_iteration:
tasks.append(self.send_app_info(client))
tasks = [self.send_sync_data(client)]
if not self._startup_data_sent and not first_iteration:
tasks.append(self.send_startup_data(client))
await asyncio.gather(*tasks)
time_elapsed = time.perf_counter() - time_start
await asyncio.sleep(self.sync_interval - time_elapsed)
Expand All @@ -60,52 +60,52 @@ def stop_sync_loop(self) -> None:
async def handle_shutdown(self) -> None:
if self._sync_loop_task is not None:
self._sync_loop_task.cancel()
# Send any remaining requests data before exiting
# Send any remaining data before exiting
async with self.get_http_client() as client:
await self.send_requests_data(client)
await self.send_sync_data(client)

def set_app_info(self, app_info: Dict[str, Any]) -> None:
self._app_info_sent = False
self._app_info_payload = self.get_info_payload(app_info)
asyncio.create_task(self._set_app_info_task())
def set_startup_data(self, data: Dict[str, Any]) -> None:
self._startup_data_sent = False
self._startup_data = self.add_uuids_to_data(data)
asyncio.create_task(self._set_startup_data_task())

async def _set_app_info_task(self) -> None:
async def _set_startup_data_task(self) -> None:
async with self.get_http_client() as client:
await self.send_app_info(client)
await self.send_startup_data(client)

async def send_app_info(self, client: httpx.AsyncClient) -> None:
if self._app_info_payload is not None:
await self._send_app_info(client, self._app_info_payload)
async def send_startup_data(self, client: httpx.AsyncClient) -> None:
if self._startup_data is not None:
await self._send_startup_data(client, self._startup_data)

async def send_requests_data(self, client: httpx.AsyncClient) -> None:
payload = self.get_requests_payload()
self._requests_data_queue.put_nowait((time.time(), payload))
async def send_sync_data(self, client: httpx.AsyncClient) -> None:
data = self.get_sync_data()
self._sync_data_queue.put_nowait((time.time(), data))

failed_items = []
while not self._requests_data_queue.empty():
payload_time, payload = self._requests_data_queue.get_nowait()
while not self._sync_data_queue.empty():
timestamp, data = self._sync_data_queue.get_nowait()
try:
if (time_offset := time.time() - payload_time) <= MAX_QUEUE_TIME:
payload["time_offset"] = time_offset
await self._send_requests_data(client, payload)
self._requests_data_queue.task_done()
if (time_offset := time.time() - timestamp) <= MAX_QUEUE_TIME:
data["time_offset"] = time_offset
await self._send_sync_data(client, data)
self._sync_data_queue.task_done()
except httpx.HTTPError:
failed_items.append((payload_time, payload))
failed_items.append((timestamp, data))
for item in failed_items:
self._requests_data_queue.put_nowait(item)
self._sync_data_queue.put_nowait(item)

@retry(raise_on_giveup=False)
async def _send_app_info(self, client: httpx.AsyncClient, payload: Dict[str, Any]) -> None:
logger.debug("Sending app info")
response = await client.post(url="/info", json=payload, timeout=REQUEST_TIMEOUT)
async def _send_startup_data(self, client: httpx.AsyncClient, data: Dict[str, Any]) -> None:
logger.debug("Sending startup data")
response = await client.post(url="/startup", json=data, timeout=REQUEST_TIMEOUT)
self._handle_hub_response(response)
self._app_info_sent = True
self._app_info_payload = None
self._startup_data_sent = True
self._startup_data = None

@retry()
async def _send_requests_data(self, client: httpx.AsyncClient, payload: Dict[str, Any]) -> None:
logger.debug("Sending requests data")
response = await client.post(url="/requests", json=payload)
async def _send_sync_data(self, client: httpx.AsyncClient, data: Dict[str, Any]) -> None:
logger.debug("Synchronizing data with hub")
response = await client.post(url="/sync", json=data)
self._handle_hub_response(response)

def _handle_hub_response(self, response: httpx.Response) -> None:
Expand Down
32 changes: 14 additions & 18 deletions apitally/client/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
logger = get_logger(__name__)

HUB_BASE_URL = os.getenv("APITALLY_HUB_BASE_URL") or "https://hub.apitally.io"
HUB_VERSION = "v1"
HUB_VERSION = "v2"
REQUEST_TIMEOUT = 10
MAX_QUEUE_TIME = 3600
SYNC_INTERVAL = 60
Expand Down Expand Up @@ -60,8 +60,8 @@ def __init__(self, client_id: str, env: str) -> None:
self.validation_error_counter = ValidationErrorCounter()
self.server_error_counter = ServerErrorCounter()

self._app_info_payload: Optional[Dict[str, Any]] = None
self._app_info_sent = False
self._startup_data: Optional[Dict[str, Any]] = None
self._startup_data_sent = False
self._started_at = time.time()

@classmethod
Expand All @@ -80,25 +80,21 @@ def sync_interval(self) -> float:
def hub_url(self) -> str:
return f"{HUB_BASE_URL}/{HUB_VERSION}/{self.client_id}/{self.env}"

def get_info_payload(self, app_info: Dict[str, Any]) -> Dict[str, Any]:
payload = {
def add_uuids_to_data(self, data: Dict[str, Any]) -> Dict[str, Any]:
data_with_uuids = {
"instance_uuid": self.instance_uuid,
"message_uuid": str(uuid4()),
}
payload.update(app_info)
return payload

def get_requests_payload(self) -> Dict[str, Any]:
requests = self.request_counter.get_and_reset_requests()
validation_errors = self.validation_error_counter.get_and_reset_validation_errors()
server_errors = self.server_error_counter.get_and_reset_server_errors()
return {
"instance_uuid": self.instance_uuid,
"message_uuid": str(uuid4()),
"requests": requests,
"validation_errors": validation_errors,
"server_errors": server_errors,
data_with_uuids.update(data)
return data_with_uuids

def get_sync_data(self) -> Dict[str, Any]:
data = {
"requests": self.request_counter.get_and_reset_requests(),
"validation_errors": self.validation_error_counter.get_and_reset_validation_errors(),
"server_errors": self.server_error_counter.get_and_reset_server_errors(),
}
return self.add_uuids_to_data(data)


@dataclass(frozen=True)
Expand Down
64 changes: 32 additions & 32 deletions apitally/client/threading.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ def __init__(self, client_id: str, env: str) -> None:
super().__init__(client_id=client_id, env=env)
self._thread: Optional[Thread] = None
self._stop_sync_loop = Event()
self._requests_data_queue: queue.Queue[Tuple[float, Dict[str, Any]]] = queue.Queue()
self._sync_data_queue: queue.Queue[Tuple[float, Dict[str, Any]]] = queue.Queue()

def start_sync_loop(self) -> None:
self._stop_sync_loop.clear()
Expand All @@ -63,63 +63,63 @@ def _run_sync_loop(self) -> None:
now = time.time()
if (now - last_sync_time) >= self.sync_interval:
with requests.Session() as session:
if not self._app_info_sent and last_sync_time > 0: # not on first sync
self.send_app_info(session)
self.send_requests_data(session)
if not self._startup_data_sent and last_sync_time > 0: # not on first sync
self.send_startup_data(session)
self.send_sync_data(session)
last_sync_time = now
time.sleep(1)
except Exception as e: # pragma: no cover
logger.exception(e)
finally:
# Send any remaining requests data before exiting
# Send any remaining data before exiting
with requests.Session() as session:
self.send_requests_data(session)
self.send_sync_data(session)

def stop_sync_loop(self) -> None:
self._stop_sync_loop.set()
if self._thread is not None:
self._thread.join()
self._thread = None

def set_app_info(self, app_info: Dict[str, Any]) -> None:
self._app_info_sent = False
self._app_info_payload = self.get_info_payload(app_info)
def set_startup_data(self, data: Dict[str, Any]) -> None:
self._startup_data_sent = False
self._startup_data = self.add_uuids_to_data(data)
with requests.Session() as session:
self.send_app_info(session)
self.send_startup_data(session)

def send_app_info(self, session: requests.Session) -> None:
if self._app_info_payload is not None:
self._send_app_info(session, self._app_info_payload)
def send_startup_data(self, session: requests.Session) -> None:
if self._startup_data is not None:
self._send_startup_data(session, self._startup_data)

def send_requests_data(self, session: requests.Session) -> None:
payload = self.get_requests_payload()
self._requests_data_queue.put_nowait((time.time(), payload))
def send_sync_data(self, session: requests.Session) -> None:
data = self.get_sync_data()
self._sync_data_queue.put_nowait((time.time(), data))

failed_items = []
while not self._requests_data_queue.empty():
payload_time, payload = self._requests_data_queue.get_nowait()
while not self._sync_data_queue.empty():
timestamp, data = self._sync_data_queue.get_nowait()
try:
if (time_offset := time.time() - payload_time) <= MAX_QUEUE_TIME:
payload["time_offset"] = time_offset
self._send_requests_data(session, payload)
self._requests_data_queue.task_done()
if (time_offset := time.time() - timestamp) <= MAX_QUEUE_TIME:
data["time_offset"] = time_offset
self._send_sync_data(session, data)
self._sync_data_queue.task_done()
except requests.RequestException:
failed_items.append((payload_time, payload))
failed_items.append((timestamp, data))
for item in failed_items:
self._requests_data_queue.put_nowait(item)
self._sync_data_queue.put_nowait(item)

@retry(raise_on_giveup=False)
def _send_app_info(self, session: requests.Session, payload: Dict[str, Any]) -> None:
logger.debug("Sending app info")
response = session.post(url=f"{self.hub_url}/info", json=payload, timeout=REQUEST_TIMEOUT)
def _send_startup_data(self, session: requests.Session, data: Dict[str, Any]) -> None:
logger.debug("Sending startup data")
response = session.post(url=f"{self.hub_url}/startup", json=data, timeout=REQUEST_TIMEOUT)
self._handle_hub_response(response)
self._app_info_sent = True
self._app_info_payload = None
self._startup_data_sent = True
self._startup_data = None

@retry()
def _send_requests_data(self, session: requests.Session, payload: Dict[str, Any]) -> None:
logger.debug("Sending requests data")
response = session.post(url=f"{self.hub_url}/requests", json=payload, timeout=REQUEST_TIMEOUT)
def _send_sync_data(self, session: requests.Session, data: Dict[str, Any]) -> None:
logger.debug("Synchronizing data with hub")
response = session.post(url=f"{self.hub_url}/sync", json=data, timeout=REQUEST_TIMEOUT)
self._handle_hub_response(response)

def _handle_hub_response(self, response: requests.Response) -> None:
Expand Down
20 changes: 10 additions & 10 deletions apitally/django.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,8 @@ def __init__(self, get_response: Callable[[HttpRequest], HttpResponse]) -> None:

self.client = ApitallyClient(client_id=self.config.client_id, env=self.config.env)
self.client.start_sync_loop()
self.client.set_app_info(
app_info=_get_app_info(
self.client.set_startup_data(
_get_startup_data(
app_version=self.config.app_version,
urlconfs=self.config.urlconfs,
)
Expand Down Expand Up @@ -177,20 +177,20 @@ def get_consumer(self, request: HttpRequest) -> Optional[str]:
return None


def _get_app_info(app_version: Optional[str], urlconfs: List[Optional[str]]) -> Dict[str, Any]:
app_info: Dict[str, Any] = {}
def _get_startup_data(app_version: Optional[str], urlconfs: List[Optional[str]]) -> Dict[str, Any]:
data: Dict[str, Any] = {}
try:
app_info["paths"] = _get_paths(urlconfs)
data["paths"] = _get_paths(urlconfs)
except Exception: # pragma: no cover
app_info["paths"] = []
data["paths"] = []
logger.exception("Failed to get paths")
try:
app_info["openapi"] = _get_openapi(urlconfs)
data["openapi"] = _get_openapi(urlconfs)
except Exception: # pragma: no cover
logger.exception("Failed to get OpenAPI schema")
app_info["versions"] = get_versions("django", "djangorestframework", "django-ninja", app_version=app_version)
app_info["client"] = "python:django"
return app_info
data["versions"] = get_versions("django", "djangorestframework", "django-ninja", app_version=app_version)
data["client"] = "python:django"
return data


def _get_openapi(urlconfs: List[Optional[str]]) -> Optional[str]:
Expand Down
32 changes: 19 additions & 13 deletions apitally/flask.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,16 +38,20 @@ def __init__(
self.patch_handle_exception()
self.client = ApitallyClient(client_id=client_id, env=env)
self.client.start_sync_loop()
self.delayed_set_app_info(app_version, openapi_url)
self.delayed_set_startup_data(app_version, openapi_url)

def delayed_set_app_info(self, app_version: Optional[str] = None, openapi_url: Optional[str] = None) -> None:
def delayed_set_startup_data(self, app_version: Optional[str] = None, openapi_url: Optional[str] = None) -> None:
# Short delay to allow app routes to be registered first
timer = Timer(1.0, self._delayed_set_app_info, kwargs={"app_version": app_version, "openapi_url": openapi_url})
timer = Timer(
1.0,
self._delayed_set_startup_data,
kwargs={"app_version": app_version, "openapi_url": openapi_url},
)
timer.start()

def _delayed_set_app_info(self, app_version: Optional[str] = None, openapi_url: Optional[str] = None) -> None:
app_info = _get_app_info(self.app, app_version, openapi_url)
self.client.set_app_info(app_info)
def _delayed_set_startup_data(self, app_version: Optional[str] = None, openapi_url: Optional[str] = None) -> None:
data = _get_startup_data(self.app, app_version, openapi_url)
self.client.set_startup_data(data)

def __call__(self, environ: WSGIEnvironment, start_response: StartResponse) -> Iterable[bytes]:
status_code = 200
Expand Down Expand Up @@ -122,15 +126,17 @@ def get_consumer(self) -> Optional[str]:
return None


def _get_app_info(app: Flask, app_version: Optional[str] = None, openapi_url: Optional[str] = None) -> Dict[str, Any]:
app_info: Dict[str, Any] = {}
def _get_startup_data(
app: Flask, app_version: Optional[str] = None, openapi_url: Optional[str] = None
) -> Dict[str, Any]:
data: Dict[str, Any] = {}
if openapi_url and (openapi := _get_openapi(app, openapi_url)):
app_info["openapi"] = openapi
data["openapi"] = openapi
if paths := _get_paths(app.url_map):
app_info["paths"] = paths
app_info["versions"] = get_versions("flask", app_version=app_version)
app_info["client"] = "python:flask"
return app_info
data["paths"] = paths
data["versions"] = get_versions("flask", app_version=app_version)
data["client"] = "python:flask"
return data


def _get_paths(url_map: Map) -> List[Dict[str, str]]:
Expand Down
4 changes: 2 additions & 2 deletions apitally/litestar.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,13 @@ def on_startup(self, app: Litestar) -> None:
elif openapi_config.path is not None:
self.openapi_path = openapi_config.path

app_info = {
data = {
"openapi": _get_openapi(app),
"paths": [route for route in _get_routes(app) if not self.filter_path(route["path"])],
"versions": get_versions("litestar", app_version=self.app_version),
"client": "python:litestar",
}
self.client.set_app_info(app_info)
self.client.set_startup_data(data)
self.client.start_sync_loop()

def after_exception(self, exception: Exception, scope: Scope) -> None:
Expand Down
Loading

0 comments on commit 89a4d30

Please sign in to comment.