Log stats #1423

Open

wants to merge 5 commits into base: main
7 changes: 6 additions & 1 deletion lmdeploy/cli/serve.py
@@ -123,6 +123,10 @@ def add_parser_api_server():
type=str,
default='',
help='Qos policy config path')
parser.add_argument('--log-stats',
action='store_true',
default=False,
help='Whether to log stats to Prometheus')
# common args
ArgumentHelper.backend(parser)
ArgumentHelper.log_level(parser)
@@ -294,7 +298,8 @@ def api_server(args):
log_level=args.log_level.upper(),
api_keys=args.api_keys,
ssl=args.ssl,
qos_config_path=args.qos_config_path)
qos_config_path=args.qos_config_path,
log_stats=args.log_stats)

@staticmethod
def api_client(args):
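As a side note, a small self-contained sketch (not part of the PR) of how the boolean switch parses once it is defined with action='store_true':

import argparse

parser = argparse.ArgumentParser()
# Mirrors the flag added in add_parser_api_server(): absent -> False, present -> True.
parser.add_argument('--log-stats',
                    action='store_true',
                    default=False,
                    help='Whether to log stats to Prometheus')

print(parser.parse_args([]).log_stats)               # False
print(parser.parse_args(['--log-stats']).log_stats)  # True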
104 changes: 71 additions & 33 deletions lmdeploy/serve/async_engine.py
@@ -3,6 +3,7 @@
import dataclasses
import os
import random
import time
from argparse import ArgumentError
from contextlib import asynccontextmanager
from itertools import count
@@ -14,6 +15,7 @@
PytorchEngineConfig, Response,
TurbomindEngineConfig)
from lmdeploy.model import MODELS, ChatTemplateConfig, best_match_model
from lmdeploy.serve.metrics import IterTimer, Metrics, Stats
from lmdeploy.tokenizer import DetokenizeState
from lmdeploy.utils import _stop_words, get_logger

@@ -172,6 +174,7 @@ def __init__(self,
PytorchEngineConfig]] = None,
chat_template_config: Optional[ChatTemplateConfig] = None,
tp: int = 1,
log_stats: bool = False,
**kwargs) -> None:
logger.info(
f'input backend={backend}, backend_config={backend_config}')
@@ -226,6 +229,11 @@ def __init__(self,
self.gens_set = set()
for i in range(self.instance_num):
self.gens_set.add(self.engine.create_instance())
self.log_stats = log_stats
if self.log_stats:
self.stats = Stats(now=time.time())
self.metrics = Metrics()
self.metrics.info(self.backend_config)

def _build_turbomind(
self,
@@ -322,6 +330,12 @@ def __call__(self,
adapter_name=adapter_name,
**kwargs)

async def handle_exception(self, session_id: int):
if self.log_stats:
self.stats.request_failure += 1
self.stats.request_total += 1
await self.stop_session(session_id)

async def stop_session(self, session_id: int):
"""Stop a session by a session_id."""
if str(session_id) in self.id2generator:
@@ -345,14 +359,16 @@ async def safe_run(self, session_id: Optional[int] = None):
try:
yield
except (Exception, asyncio.CancelledError) as e: # noqa
await self.stop_session(session_id)
await self.handle_exception(session_id)
raise e
if str(session_id) in self.id2generator:
self.gens_set.add(self.id2generator[str(session_id)])
self.running_session_ids.discard(session_id)

async def get_generator(self, stop: bool, session_id: int):
"""Only return the model instance if it is available."""
if self.log_stats:
start = time.time()
if stop:
return self.engine.create_instance()
# wait while no generator is available or the same session_id is running
@@ -361,6 +377,8 @@ async def get_generator(self, stop: bool, session_id: int):
generator = self.gens_set.pop()
self.id2generator[str(session_id)] = generator
self.running_session_ids.add(session_id)
if self.log_stats:
self.stats.duration_queue += time.time() - start
return generator

def batch_infer(self,
@@ -548,32 +566,39 @@ async def generate(
do_preprocess (bool): whether to pre-process the messages. Defaults to
True, which means chat_template will be applied.
"""
if str(session_id) not in self.id2step:
self.id2step[str(session_id)] = 0
if step != 0:
self.id2step[str(session_id)] = step
if gen_config is None:
gen_config = GenerationConfig()
if type(gen_config) is GenerationConfig:
gen_config = EngineGenerationConfig.From(gen_config,
self.tokenizer)
if gen_config.stop_words is None:
gen_config.stop_words = self.stop_words
# set random if it is not set and sequence_start is True
if gen_config.random_seed is None and sequence_start:
gen_config.random_seed = random.getrandbits(64)
prompt = messages

prompt_input = await self._get_prompt_input(prompt, do_preprocess,
sequence_start,
adapter_name)
async def preprocess(gen_config):
if self.log_stats:
start = time.time()
if str(session_id) not in self.id2step:
self.id2step[str(session_id)] = 0
if step != 0:
self.id2step[str(session_id)] = step
if gen_config is None:
gen_config = GenerationConfig()
if type(gen_config) is GenerationConfig:
gen_config = EngineGenerationConfig.From(
gen_config, self.tokenizer)
if gen_config.stop_words is None:
gen_config.stop_words = self.stop_words
# set random if it is not set and sequence_start is True
if gen_config.random_seed is None and sequence_start:
gen_config.random_seed = random.getrandbits(64)
prompt = messages

prompt_input = await self._get_prompt_input(
prompt, do_preprocess, sequence_start, adapter_name)
if gen_config.max_new_tokens is None:
gen_config.max_new_tokens = max(
128, self.session_len - self.id2step[str(session_id)] -
len(prompt_input['input_ids']))
if self.log_stats:
self.stats.duration_preprocess += time.time() - start
return prompt_input, gen_config

prompt_input, gen_config = await preprocess(gen_config)
prompt = prompt_input['prompt']
input_ids = prompt_input['input_ids']
if gen_config.max_new_tokens is None:
# for interactive endpoint, will try maximum possible token num
gen_config.max_new_tokens = max(
128, self.session_len - self.id2step[str(session_id)] -
len(input_ids))
finish_reason = None
logger.info(f'prompt={prompt!r}, '
f'gen_config={gen_config}, '
@@ -595,23 +620,31 @@ async def generate(
await self.end_session(session_id)
else:
generator = await self.get_generator(False, session_id)
iterator = generator.async_stream_infer(
session_id=session_id,
**prompt_input,
gen_config=gen_config,
adapter_name=adapter_name,
stream_output=stream_response,
sequence_start=sequence_start,
sequence_end=sequence_end,
step=self.id2step[str(session_id)])
if self.log_stats:
iterator = IterTimer(iterator)
async with self.safe_run(session_id):
state = DetokenizeState()
async for outputs in generator.async_stream_infer(
session_id=session_id,
**prompt_input,
gen_config=gen_config,
adapter_name=adapter_name,
stream_output=stream_response,
sequence_start=sequence_start,
sequence_end=sequence_end,
step=self.id2step[str(session_id)]):
async for outputs in iterator:
_, res, tokens = outputs
# decode res
if self.log_stats:
start = time.perf_counter()
response, state = self.tokenizer.detokenize_incrementally(
res,
state,
skip_special_tokens=gen_config.skip_special_tokens)
if self.log_stats:
self.stats.duration_postprocess += time.perf_counter(
) - start
# response, history token len,
# input token len, gen token len
yield GenOut(response, self.id2step[str(session_id)],
@@ -633,6 +666,11 @@ async def generate(
# TODO modify pytorch or turbomind api
if self.backend == 'pytorch' and sequence_end:
await self.end_session(session_id)
if self.log_stats:
self.stats.duration_infer += iterator.get_duration()
self.stats.request_success += 1
self.stats.request_total += 1
self.metrics.log(self.stats)

def chat(self,
prompt: str,
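Taken together, the changes to generate() above time four phases of a request: preprocessing (inside the new preprocess() closure), queueing (inside get_generator()), engine inference (via IterTimer) and detokenization. A condensed sketch of that flow, illustrative only; the engine object is a stand-in for an AsyncEngine built with log_stats enabled, and preprocessing is omitted since it is timed the same way inside preprocess():

import time

from lmdeploy.serve.metrics import IterTimer
from lmdeploy.tokenizer import DetokenizeState


async def generate_with_stats(engine, session_id, prompt_input, gen_config):
    """Condensed view of the stats accounting added to AsyncEngine.generate()."""
    stats, metrics = engine.stats, engine.metrics

    # Queueing: get_generator() itself adds the wait time to stats.duration_queue.
    generator = await engine.get_generator(False, session_id)

    # Inference: IterTimer accumulates the time spent waiting on the engine.
    iterator = IterTimer(generator.async_stream_infer(
        session_id=session_id, **prompt_input, gen_config=gen_config))

    state = DetokenizeState()
    async for _, res, _tokens in iterator:
        # Postprocessing: incremental detokenization is timed separately.
        start = time.perf_counter()
        _response, state = engine.tokenizer.detokenize_incrementally(
            res, state, skip_special_tokens=gen_config.skip_special_tokens)
        stats.duration_postprocess += time.perf_counter() - start

    stats.duration_infer += iterator.get_duration()
    stats.request_success += 1
    stats.request_total += 1
    metrics.log(stats)  # refresh system stats and push gauges to Prometheus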
180 changes: 180 additions & 0 deletions lmdeploy/serve/metrics.py
@@ -0,0 +1,180 @@
# Copyright (c) OpenMMLab. All rights reserved.
import dataclasses
import time
from dataclasses import dataclass
from typing import Dict, List, Optional

import psutil
import pynvml
from prometheus_client import REGISTRY, Gauge, Info, disable_created_metrics

disable_created_metrics()


class IterTimer:
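"""Wrap a (sync or async) iterable and accumulate the time spent inside
__next__/__anext__, so the engine's generation time can be separated from
detokenization and other per-token work."""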

def __init__(self, iterable):
self._iterable = iterable
self._duration = 0

def __iter__(self):
return self

def __next__(self):
start = time.perf_counter()
item = next(self._iterable)
self._duration += (time.perf_counter() - start)
return item

def get_duration(self):
return self._duration

def __aiter__(self):
return self

async def __anext__(self):
start = time.perf_counter()
item = await self._iterable.__anext__()
self._duration += (time.perf_counter() - start)
return item


@dataclass
class Stats:
"""Created by LLMEngine for use by StatLogger."""
now: float

# request stats
request_success: int = 0
request_failure: int = 0
request_total: int = 0
request_responding: int = 0
request_waiting: int = 0

# latency stats
duration_queue: float = 0
duration_infer: float = 0
duration_preprocess: float = 0
duration_postprocess: float = 0

# system status
cpu_utilization: Optional[float] = None
cpu_memory_used_bytes: Optional[float] = None
gpu_utilization: Optional[Dict] = None
gpu_memory_used_bytes: Optional[Dict] = None

def refresh(self):
"""Fresh system status."""
p = psutil.Process()
self.cpu_utilization = p.cpu_percent()
self.cpu_memory_used_bytes = p.memory_info().rss
pynvml.nvmlInit()
self.gpu_memory_used_bytes = {}
self.gpu_utilization = {}
for i in range(pynvml.nvmlDeviceGetCount()):
handle = pynvml.nvmlDeviceGetHandleByIndex(int(i))
mem_info = pynvml.nvmlDeviceGetMemoryInfo(handle)
utilization = pynvml.nvmlDeviceGetUtilizationRates(handle)
self.gpu_memory_used_bytes[str(i)] = str(mem_info.used)
self.gpu_utilization[str(i)] = str(utilization.gpu)


class Metrics:
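"""Prometheus metrics for the api_server: backend config info, system
usage, request counters and cumulative latency gauges, updated from a
Stats snapshot on every call to log()."""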

def __init__(self, labelnames: Optional[List[str]] = []):
# Unregister any existing lmdeploy collectors
for collector in list(REGISTRY._collector_to_names):
if hasattr(collector, '_name') and 'lmdeploy' in collector._name:
REGISTRY.unregister(collector)

# Config Information
self.info_backend_config = Info(
name='lmdeploy:backend_config',
documentation='information of backend_config')

# System stats
self.info_gpu_utilization = Info(
name='lmdeploy:gpu_utilization',
documentation='GPU utilization. 1 means 100 percent usage.')
self.info_gpu_memory_used_bytes = Info(
name='lmdeploy:gpu_memory_used_bytes',
documentation='GPU memory used bytes.')
self.gauge_cpu_utilization = Gauge(
name='lmdeploy:cpu_utilization',
documentation='CPU utilization. 1 means 100 percent usage.',
labelnames=labelnames)
self.gauge_cpu_memory_used_bytes = Gauge(
name='lmdeploy:cpu_memory_used_bytes',
documentation='CPU memory used bytes.',
labelnames=labelnames)

# requests
self.gauge_request_success = Gauge(
name='lmdeploy:request_success',
documentation='Number of successful requests.',
labelnames=labelnames)
self.gauge_request_failure = Gauge(
name='lmdeploy:request_failure',
documentation='Number of failed requests.',
labelnames=labelnames)
self.gauge_request_total = Gauge(
name='lmdeploy:request_total',
documentation='Number of total requests.',
labelnames=labelnames)

# latency metrics
self.gauge_duration_queue = Gauge(
name='lmdeploy:duration_queue',
documentation=  # noqa
'Cumulative time requests spend waiting in the queue, in s.',
labelnames=labelnames,
)
self.gauge_duration_infer = Gauge(
name='lmdeploy:duration_infer',
documentation='Cumulative inference time in s.',
labelnames=labelnames,
)
self.gauge_duration_preprocess = Gauge(
name='lmdeploy:duration_preprocess',
documentation='Cumulative duration of processing inputs in s.',
labelnames=labelnames,
)
self.gauge_duration_postprocess = Gauge(
name='lmdeploy:duration_postprocess',
documentation='Cumulative duration of processing outputs in s.',
labelnames=labelnames,
)

def info(self, backend_config: object) -> None:
config_dict = {
key: str(value)
for key, value in dataclasses.asdict(backend_config).items()
}
self.info_backend_config.info(config_dict)

def log(self, stats: Stats) -> None:
"""Called by LLMEngine.

Logs to prometheus and tracked stats every iteration. Logs to Stdout
every self.local_interval seconds.
"""

# Log to prometheus.
stats.refresh()
# Info gpu stats
self.info_gpu_utilization.info(stats.gpu_utilization)
self.info_gpu_memory_used_bytes.info(stats.gpu_memory_used_bytes)
# Set system stat gauges.
self.gauge_cpu_utilization.set(stats.cpu_utilization)
self.gauge_cpu_memory_used_bytes.set(stats.cpu_memory_used_bytes)

# Add to request counters.
self.gauge_request_total.set(stats.request_total)
self.gauge_request_success.set(stats.request_success)
self.gauge_request_failure.set(stats.request_failure)

# duration gauges
self.gauge_duration_infer.set(stats.duration_infer)
self.gauge_duration_queue.set(stats.duration_queue)
self.gauge_duration_preprocess.set(stats.duration_preprocess)
self.gauge_duration_postprocess.set(stats.duration_postprocess)
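The PR wires these metrics into AsyncEngine but does not show how they are exported over HTTP. A minimal sketch of one way to scrape them, assuming prometheus_client's standalone exporter and a port chosen here purely for illustration (in the api_server they would more likely be mounted on the existing FastAPI app); note that log() calls Stats.refresh(), so psutil, pynvml and an NVIDIA driver must be available:

import time

from prometheus_client import start_http_server

from lmdeploy.serve.metrics import Metrics, Stats

# Expose the default registry at http://localhost:8000/metrics.
start_http_server(8000)

stats = Stats(now=time.time())
metrics = Metrics()

# Pretend one request completed successfully.
stats.duration_queue += 0.01
stats.duration_infer += 0.25
stats.request_success += 1
stats.request_total += 1

metrics.log(stats)  # refreshes CPU/GPU stats and updates the gauges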