Skip to content

Commit

Permalink
Allow logging to be controlled externally
Browse files Browse the repository at this point in the history
By inheriting from a parent logger and using a LoggingAdapter for custom
formatting of messages.
  • Loading branch information
markwaddle committed Apr 23, 2024
1 parent 5518d41 commit 8ea32fa
Show file tree
Hide file tree
Showing 7 changed files with 130 additions and 78 deletions.
94 changes: 77 additions & 17 deletions node_engine/libs/log.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
# Copyright (c) Microsoft. All rights reserved.

import logging
import logging.handlers
from enum import IntEnum, unique
from typing import Any
from node_engine.libs.logging import console_log_handler
from node_engine.libs.logging.event_logger_handler import EventLoggerHandler

from node_engine.libs.logging.status_logger_handler import StatusLoggerHandler
from node_engine.libs.logging.flow_log_handler import FlowLogHandler
from node_engine.models.flow_definition import FlowDefinition
from node_engine.models.flow_executor import FlowExecutor

Expand Down Expand Up @@ -34,28 +33,89 @@ def active(cls, set_level, level) -> Any:
return set_level <= level


class Log(logging.Logger):
flow_logger = logging.getLogger("node_engine.flow")


class FlowLoggerAdapter(logging.LoggerAdapter):
"""Logger adapter that adds flow information to log messages."""

def __init__(
self,
namespace: str,
logger: Any,
flow_definition: FlowDefinition,
executor: FlowExecutor,
level=LogLevel.DEBUG,
):
super().__init__(namespace, level.value)
self.addHandler(StatusLoggerHandler(namespace, flow_definition))
self.addHandler(
EventLoggerHandler(namespace, flow_definition, executor=executor)
component_label: str,
) -> None:
super().__init__(logger)
self._flow_definition = flow_definition
self._executor = executor
self._component_label = component_label

def process(self, msg, kwargs):
if self._component_label:
return (
"s:%s | f:%s | c:%s | %s"
% (
self._flow_definition.session_id,
self._flow_definition.key,
self._component_label,
msg,
),
kwargs,
)

return (
"s:%s | f:%s | %s"
% (self._flow_definition.session_id, self._flow_definition.key, msg),
kwargs,
)
self.addHandler(console_log_handler.new(flow_definition))

def __call__(self, message, *args, **kwargs) -> None:
self.info(message, *args, **kwargs)

def active(self, level) -> Any:
return LogLevel.active(self.level, level)

class FlowLogger(logging.Logger):
"""Logger that forwards log records to FlowLogHandler."""

def __init__(
self,
name: str,
flow_definition: FlowDefinition,
executor: FlowExecutor,
parent: logging.Logger,
level=LogLevel.DEBUG,
):
super().__init__(name, level.value)
self.parent = parent
self.addHandler(FlowLogHandler(flow_definition, executor))


def get_flow_logger(
namespace: str,
flow_definition: FlowDefinition,
executor: FlowExecutor,
) -> FlowLoggerAdapter:
namespace = ".".join(["node_engine", "flow", namespace])
logger = FlowLogger(namespace, flow_definition, executor, flow_logger)
return FlowLoggerAdapter(
logger,
flow_definition=flow_definition,
executor=executor,
component_label="",
)


# We pass around the string representation of the log level, but
def get_log_level(str) -> int:
return LogLevel[str.upper()].value
def get_component_logger(
component_class: type,
component_key: str,
flow_definition: FlowDefinition,
executor: FlowExecutor,
) -> FlowLoggerAdapter:
namespace = "node_engine.flow.component"
logger = FlowLogger(namespace, flow_definition, executor, flow_logger)
return FlowLoggerAdapter(
logger,
flow_definition=flow_definition,
executor=executor,
component_label=f"{component_class.__name__}:{component_key}",
)
13 changes: 2 additions & 11 deletions node_engine/libs/logging/console_log_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,10 @@

import rich.logging

from node_engine.models.flow_definition import FlowDefinition


def new(flow_definition: FlowDefinition) -> rich.logging.RichHandler:
console_handler = rich.logging.RichHandler(
def new() -> rich.logging.RichHandler:
return rich.logging.RichHandler(
level=logging.INFO,
log_time_format="[%X]",
keywords=(rich.logging.RichHandler.KEYWORDS or []) + ["node_engine"],
)
console_handler.setFormatter(
logging.Formatter(
f"node_engine:%(name)s | s:{flow_definition.session_id} | f:{flow_definition.key}"
f" | %(message)s"
)
)
return console_handler
Original file line number Diff line number Diff line change
Expand Up @@ -9,32 +9,44 @@
from node_engine.models.flow_definition import FlowDefinition
from node_engine.models.flow_event import FlowEvent
from node_engine.models.flow_executor import FlowExecutor
from node_engine.models.log_item import LevelEnum, LogItem


class EventLoggerHandler(logging.Handler):
class FlowLogHandler(logging.Handler):
def __init__(
self,
namespace: str,
flow_definition: FlowDefinition,
executor: FlowExecutor,
level=logging.DEBUG,
flow_executor: FlowExecutor,
) -> None:
super().__init__()
self.namespace = namespace
self.flow_definition = flow_definition
self.level = level
self.runtime = executor

self.flow_executor = flow_executor
self.background_tasks: set[asyncio.Task] = set()

def emit(self, record) -> None:
self.emit_status_log(record)
self.emit_log_event(record)

def emit_status_log(self, record) -> None:
try:
self.flow_definition.status.log.append(
LogItem(
namespace=record.name,
level=LevelEnum[record.levelname.lower()],
message=record.getMessage(),
)
)
except Exception as e:
print("Exception:", e)

def emit_log_event(self, record) -> None:
context = Context(self.flow_definition)

if not context.get("stream_log"):
return

data: dict[str, Any] = {
"namespace": self.namespace,
"namespace": record.name or "",
"level": record.levelname.lower(),
"message": record.getMessage(),
}
Expand All @@ -48,7 +60,7 @@ def emit(self, record) -> None:
data=json.dumps(data),
)

task = asyncio.create_task(self.runtime.emit(message))
task = asyncio.create_task(self.flow_executor.emit(message))
# Add task to the set. This creates a strong reference.
self.background_tasks.add(task)
# To prevent keeping references to finished tasks forever,
Expand Down
28 changes: 0 additions & 28 deletions node_engine/libs/logging/status_logger_handler.py

This file was deleted.

8 changes: 4 additions & 4 deletions node_engine/libs/node_engine_component.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,9 @@
from abc import ABC, abstractmethod
from typing import Any

from node_engine.libs import debug_collector
from node_engine.libs import debug_collector, log
from node_engine.libs.component_config import ComponentConfig
from node_engine.libs.context import Context
from node_engine.libs.log import Log
from node_engine.libs.telemetry import Telemetry
from node_engine.libs.utility import continue_flow, exit_flow_with_error
from node_engine.models.flow_definition import FlowDefinition
Expand All @@ -32,8 +31,9 @@ def __init__(
self.tunnel_authorization = tunnel_authorization
self.code = None
self.runtime = executor
self.log = Log(
f"{self.__class__.__name__}:{component_key}",
self.log = log.get_component_logger(
component_class=self.__class__,
component_key=component_key,
flow_definition=flow_definition,
executor=executor,
)
Expand Down
9 changes: 5 additions & 4 deletions node_engine/libs/runtime.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
# Copyright (c) Microsoft. All rights reserved.

import asyncio
import logging
import traceback

from node_engine.libs import debug_collector
from node_engine.libs.log import Log
from node_engine.libs.log import get_flow_logger
from node_engine.libs.node_engine_component import NodeEngineComponent
from node_engine.libs.registry import Registry
from node_engine.libs.utility import continue_flow, exit_flow_with_error
Expand Down Expand Up @@ -41,7 +42,7 @@ async def invoke(
self, flow_definition: FlowDefinition, tunnel_authorization: str | None = None
) -> FlowDefinition:

log = Log("runtime", flow_definition, executor=self)
log = get_flow_logger("runtime", flow_definition, executor=self)
log("Invoking flow")

# Ensure there is at least one component in the flow.
Expand Down Expand Up @@ -134,7 +135,7 @@ async def __execute_next(
"""
Executes the next component in the flow.
"""
log = Log("runtime", flow_definition, executor=self)
log = get_flow_logger("runtime", flow_definition, executor=self)

# Find the next component to execute by key
flow_component = None
Expand Down Expand Up @@ -212,7 +213,7 @@ def exit_flow_with_error(
self,
message: str,
flow_definition: FlowDefinition,
log: Log,
log: logging.Logger | logging.LoggerAdapter,
component: NodeEngineComponent | None = None,
):
debug_information = debug_collector.collect(
Expand Down
24 changes: 20 additions & 4 deletions node_engine/start.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,24 @@
# Copyright (c) Microsoft. All rights reserved.

import argparse
import logging
import os

import uvicorn
from fastapi import FastAPI

from node_engine.libs.logging import console_log_handler

from . import service

logging.basicConfig(
level=logging.DEBUG,
format="%(name)35s | %(message)s",
handlers=[console_log_handler.new()],
)

logger = logging.getLogger(__name__)


def main():
parser = argparse.ArgumentParser(
Expand All @@ -29,7 +40,7 @@ def main():
"--registry-root",
dest="registry_root",
type=str,
default=".",
default="./examples",
help="root directory for registry and component discovery",
)
args = parser.parse_args()
Expand All @@ -48,10 +59,15 @@ def main():
app = FastAPI()
service.init(app, registry_root)

print(f"Starting node_engine service on {host}:{port}...")
print(f"- Registry root: {registry_root}")
logger.info("Starting node_engine service on %s:%s...", host, port)
logger.info("Registry root: %s", registry_root)

uvicorn.run(app, host=host, port=port)
uvicorn.run(
app,
host=host,
port=port,
log_config={"version": 1, "disable_existing_loggers": False},
)


if __name__ == "__main__":
Expand Down

0 comments on commit 8ea32fa

Please sign in to comment.