Skip to content

Commit

Permalink
Apply telemetry leak fix
Browse files: browse the repository at this point in the history
Fix workflows
  • Loading branch information
farshidz committed Jul 26, 2024
1 parent cd86cea commit f6b8b2b
Show file tree
Hide file tree
Showing 8 changed files with 151 additions and 99 deletions.
9 changes: 8 additions & 1 deletion .github/workflows/arm64_local_os_CI.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,12 @@ on:
Otherwise, the specified docker image is tested. For example "marqoai/marqo:test"
push:
branches:
mainline
- 'releases/1.5.x'
paths-ignore:
- '**.md'
pull_request:
branches:
- 'releases/1.5.x'
paths-ignore:
- '**.md'

Expand Down Expand Up @@ -79,6 +84,7 @@ jobs:
uses: actions/checkout@v3
with:
repository: marqo-ai/marqo-api-tests
ref: 1.5.x

- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v2
Expand All @@ -93,6 +99,7 @@ jobs:
export MQ_API_TEST_BRANCH=$(echo "${GITHUB_REF}" | cut -d'/' -f3-)
CUSTOM_TEST_IMG="${{ github.event.inputs.image_to_test }}"
export MQ_API_TEST_IMG=${CUSTOM_TEST_IMG:-"marqo_docker_0"}
export MQ_PY_MARQO_BRANCH=git+https://github.com/marqo-ai/py-marqo.git
tox -e py3-arm64_local_os
Stop-Runner:
Expand Down
9 changes: 8 additions & 1 deletion .github/workflows/cuda_dind_os_CI.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,12 @@ on:
Otherwise, the specified docker image is tested. For example "marqoai/marqo:test"
push:
branches:
mainline
- 'releases/1.5.x'
paths-ignore:
- '**.md'
pull_request:
branches:
- 'releases/1.5.x'
paths-ignore:
- '**.md'

Expand Down Expand Up @@ -79,6 +84,7 @@ jobs:
uses: actions/checkout@v3
with:
repository: marqo-ai/marqo-api-tests
ref: 1.5.x

- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v2
Expand All @@ -93,6 +99,7 @@ jobs:
export MQ_API_TEST_BRANCH=$(echo "${GITHUB_REF}" | cut -d'/' -f3-)
CUSTOM_TEST_IMG="${{ github.event.inputs.image_to_test }}"
export MQ_API_TEST_IMG=${CUSTOM_TEST_IMG:-"marqo_docker_0"}
export MQ_PY_MARQO_BRANCH=git+https://github.com/marqo-ai/py-marqo.git
tox -e py3-cuda_dind_os
Stop-Runner:
Expand Down
27 changes: 17 additions & 10 deletions .github/workflows/dind_os_CI.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,12 @@ on:
Otherwise, the specified docker image is tested. For example "marqoai/marqo:test"
push:
branches:
mainline
- 'releases/1.5.x'
paths-ignore:
- '**.md'
pull_request:
branches:
- 'releases/1.5.x'
paths-ignore:
- '**.md'

Expand All @@ -40,7 +45,7 @@ jobs:
with:
mode: start
github-token: ${{ secrets.GH_PERSONAL_ACCESS_TOKEN }}
ec2-image-id: ${{ secrets.AMD_200GB_EC2_IMAGE_ID }}
ec2-image-id: ${{ secrets.AMD_200GB_EC2_IMAGE_ID }}
ec2-instance-type: t3.xlarge
subnet-id: ${{ secrets.AMD_SUBNET_ID }}
security-group-id: ${{ secrets.AMD_SECURITY_GROUP_ID }}
Expand All @@ -49,11 +54,11 @@ jobs:
name: Run Docker-in-Docker API Tests
needs: Start-Runner # required to start the main job when the runner is ready
runs-on: ${{ needs.start-runner.outputs.label }} # run the job on the newly created runner
environment: marqo-test-suite

environment: marqo-test-suite

steps:

- name: Checkout marqo repo
uses: actions/checkout@v3
with:
Expand All @@ -64,33 +69,35 @@ jobs:
with:
python-version: "3.8"
cache: "pip"

- name: Install Dependencies
run: |
#pip install -r requirements.txt
pip install tox==3.26
pip install flake8
# linting here

- name: Checkout marqo-api-tests repo
uses: actions/checkout@v3
with:
repository: marqo-ai/marqo-api-tests

ref: 1.5.x

- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v2

- name: Set up Environment
run: |
# Set up conf file
echo 'export MARQO_API_TESTS_ROOT="${{ github.workspace }}"' >> conf
- name: Run Integration Tests - dind_os
run: |
export MQ_API_TEST_BRANCH=$(echo "${GITHUB_REF}" | cut -d'/' -f3-)
CUSTOM_TEST_IMG="${{ github.event.inputs.image_to_test }}"
export MQ_API_TEST_IMG=${CUSTOM_TEST_IMG:-"marqo_docker_0"}
export MQ_PY_MARQO_BRANCH=git+https://github.com/marqo-ai/py-marqo.git
tox -e py3-dind_os
Stop-Runner:
Expand Down
6 changes: 5 additions & 1 deletion .github/workflows/largemodel_unit_test_CI.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,11 @@ name: largemodel_unit_test_CI
on:
workflow_call:
workflow_dispatch:

push:
branches:
- 'releases/1.5.x'
paths-ignore:
- '**.md'
permissions:
contents: read

Expand Down
4 changes: 0 additions & 4 deletions .github/workflows/run_all_tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,6 @@
name: Run all Marqo tests

on:
pull_request_review:
types: [submitted]
branches:
- mainline
workflow_dispatch:

permissions:
Expand Down
5 changes: 3 additions & 2 deletions .github/workflows/unit_test_200gb_CI.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,16 @@ on:
workflow_dispatch:
push:
branches:
mainline
- 'releases/1.5.x'
paths-ignore:
- '**.md'
pull_request:
branches:
- mainline
- 'releases/1.5.x'
paths-ignore:
- '**.md'


permissions:
contents: read

Expand Down
86 changes: 46 additions & 40 deletions src/marqo/tensor_search/telemetry.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.requests import Request
from starlette.responses import Response
from typing import Any, Callable, Dict, List, Optional, Union
import json
import time
from collections import defaultdict
from contextlib import contextmanager
import time
from contextvars import ContextVar
from marqo.tensor_search.models.add_docs_objects import AddDocsParams
from typing import Any, Callable, Dict, List, Optional, Union

from starlette.middleware.base import BaseHTTPMiddleware
from starlette.requests import Request
from starlette.responses import Response

from marqo.tensor_search.tensor_search_logging import get_logger

logger = get_logger(__name__)
Expand All @@ -31,7 +32,7 @@ def start(self) -> None:

def stop(self) -> float:
"""Stop the timer, and report the elapsed time
Return time is in Ms.
"""
if self.start_time is None:
Expand Down Expand Up @@ -68,7 +69,7 @@ def __init__(self):
self.timers: Dict[str, Timer] = defaultdict(Timer)

def increment_counter(self, k: str):
self.counter[k]+=1
self.counter[k] += 1

@contextmanager
def time(self, k: str, callback: Optional[Callable[[float], None]] = None):
Expand Down Expand Up @@ -106,7 +107,7 @@ def stop(self, k: str) -> float:
logger.warn(f"timer {k} stopped incorrectly. Time not recorded.")

def increment_counter(self, k: str, v: int = 1):
self.counter[k]+=v
self.counter[k] += v

def json(self):
return {
Expand All @@ -117,7 +118,7 @@ def json(self):

class RequestMetricsStore():
current_request: ContextVar[Request] = ContextVar('current_request')

METRIC_STORES: Dict[Request, RequestMetrics] = {}

@classmethod
Expand All @@ -132,7 +133,7 @@ def _get_request(cls) -> Request:
def for_request(cls, r: Optional[Request] = None) -> RequestMetrics:
if r is None:
r = cls._get_request()

return cls.METRIC_STORES[r]

@classmethod
Expand All @@ -147,19 +148,21 @@ def set_in_request(cls, r: Optional[Request] = None, metrics: Optional[RequestMe
@classmethod
def clear_metrics_for(cls, r: Request) -> None:
cls.METRIC_STORES.pop(r, None)
cls.current_request.set(None)


class TelemetryMiddleware(BaseHTTPMiddleware):
"""
Responsible for starting a request-level metric object, capturing telemetry and injecting
Responsible for starting a request-level metric object, capturing telemetry and injecting
it into the Response payload. Metrics are only returned if the `DEFAULT_TELEMETRY_QUERY_PARAM`
query parameter is provided and it is "true". Otherwise the request is not altered.
query parameter is provided and it is "true". Otherwise the request is not altered.
"""

DEFAULT_TELEMETRY_QUERY_PARAM = "telemetry"

def __init__(self, app, **options):
self.telemetry_flag: Optional[str] = options.pop("telemetery_flag", TelemetryMiddleware.DEFAULT_TELEMETRY_QUERY_PARAM)
def __init__(self, app, **options):
self.telemetry_flag: Optional[str] = options.pop("telemetery_flag",
TelemetryMiddleware.DEFAULT_TELEMETRY_QUERY_PARAM)
super().__init__(app, **options)

def telemetry_enabled_for_request(self, request: Request) -> bool:
Expand All @@ -182,37 +185,40 @@ async def dispatch(self, request: Request, call_next: Callable[[], Any]):
"""
RequestMetricsStore.set_in_request(request)
try:
response = await call_next(request)

# Early exit if opentelemetry is not to be injected into response.
if not self.telemetry_enabled_for_request(request):
return response

data = await self.get_response_json(response)

# Inject telemetry and fix content-length header
if isinstance(data, dict):
telemetry = RequestMetricsStore.for_request(request).json()
if len(telemetry["timesMs"]) == 0:
telemetry.pop("timesMs")
if len(telemetry["counter"]) == 0:
telemetry.pop("counter")
data["telemetry"] = telemetry
else:
get_logger(__name__).warning(
f"{self.telemetry_flag} set but response payload is not Dict. telemetry not returned"
)
get_logger(__name__).info(
f"Telemetry data={json.dumps(RequestMetricsStore.for_request(request).json(), indent=2)}")

response = await call_next(request)

# Early exit if opentelemetry is not to be injected into response.
if not self.telemetry_enabled_for_request(request):
return response

data = await self.get_response_json(response)

# Inject telemetry and fix content-length header
if isinstance(data, dict):
telemetry = RequestMetricsStore.for_request(request).json()
if len(telemetry["timesMs"]) == 0:
telemetry.pop("timesMs")
if len(telemetry["counter"]) == 0:
telemetry.pop("counter")
data["telemetry"] = telemetry
else:
get_logger(__name__).warning(
f"{self.telemetry_flag} set but response payload is not Dict. telemetry not returned"
)
get_logger(__name__).info(f"Telemetry data={json.dumps(RequestMetricsStore.for_request(request).json(), indent=2)}")

RequestMetricsStore.clear_metrics_for(request)
finally:
logger.debug('Clearing metrics for request')
RequestMetricsStore.clear_metrics_for(request)

body = json.dumps(data).encode()
response.headers["content-length"] = str(len(body))

return Response(
content=body,
status_code=response.status_code,
headers=dict(response.headers),
media_type=response.media_type
)
)
Loading

0 comments on commit f6b8b2b

Please sign in to comment.