Skip to content

feat: support metrics log stream #28

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Feb 28, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion src/otaclient_iot_logging_server/_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,19 @@

from __future__ import annotations

from enum import Enum
from queue import Queue
from typing import Literal, TypedDict

from typing_extensions import NotRequired, TypeAlias

LogsQueue: TypeAlias = "Queue[tuple[str, LogMessage]]"
# LogQueue is a queue of LogGroupType, ecu_id, and LogMessage
LogsQueue: TypeAlias = "Queue[tuple[LogGroupType, str, LogMessage]]"
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

change the data format of the queue.



class LogGroupType(Enum):
LOG = "LOG"
METRICS = "METRICS"


class LogMessage(TypedDict):
Expand Down
8 changes: 4 additions & 4 deletions src/otaclient_iot_logging_server/_log_setting.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,9 @@
import contextlib
import logging
import time
from queue import Queue

from otaclient_iot_logging_server import package_name as root_package_name
from otaclient_iot_logging_server._common import LogMessage
from otaclient_iot_logging_server._common import LogGroupType, LogMessage, LogsQueue
from otaclient_iot_logging_server.configs import server_cfg


Expand All @@ -30,7 +29,7 @@ class _LogTeeHandler(logging.Handler):

def __init__(
self,
queue: Queue[tuple[str, LogMessage]],
queue: LogsQueue,
logstream_suffix: str,
) -> None:
super().__init__()
Expand All @@ -41,6 +40,7 @@ def emit(self, record: logging.LogRecord) -> None:
with contextlib.suppress(Exception):
self._queue.put_nowait(
(
LogGroupType.LOG, # always put local log into log group
self._logstream_suffix,
LogMessage(
timestamp=int(time.time()) * 1000, # milliseconds
Expand All @@ -51,7 +51,7 @@ def emit(self, record: logging.LogRecord) -> None:


def config_logging(
queue: Queue[tuple[str, LogMessage]],
queue: LogsQueue,
*,
log_format: str,
level: str,
Expand Down
99 changes: 64 additions & 35 deletions src/otaclient_iot_logging_server/aws_iot_logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,12 @@
import awscrt.exceptions
from typing_extensions import NoReturn

from otaclient_iot_logging_server._common import LogEvent, LogMessage, LogsQueue
from otaclient_iot_logging_server._common import (
LogEvent,
LogGroupType,
LogMessage,
LogsQueue,
)
from otaclient_iot_logging_server._utils import retry
from otaclient_iot_logging_server.boto3_session import get_session
from otaclient_iot_logging_server.configs import server_cfg
Expand Down Expand Up @@ -67,41 +72,53 @@ def __init__(
self._exc_types = client.exceptions

self._session_config = session_config
self._log_group_name = session_config.aws_cloudwatch_log_group
self._otaclient_logs_log_group = (
session_config.aws_cloudwatch_otaclient_logs_log_group
)
self._otaclient_logs_metrics_group = (
session_config.aws_cloudwatch_otaclient_metrics_log_group
)
self._interval = interval
self._queue: LogsQueue = queue
# NOTE: add this limitation to ensure all of the log_streams in a merge
# will definitely have entries less than MAX_LOGS_PER_PUT
self._max_logs_per_merge = min(max_logs_per_merge, self.MAX_LOGS_PER_PUT)

@retry(max_retry=16, backoff_factor=2, backoff_max=32)
def _create_log_group(self):
def _create_log_groups(self):
# TODO: (20240214) should we let the edge side iot_logging_server
# create the log group?
log_group_name, client = self._log_group_name, self._client
log_group_names = [
self._otaclient_logs_log_group,
self._otaclient_logs_metrics_group,
]
client = self._client
exc_types = self._exc_types
try:
client.create_log_group(logGroupName=log_group_name)
logger.info(f"{log_group_name=} has been created")
except exc_types.ResourceAlreadyExistsException as e:
logger.debug(
f"{log_group_name=} already existed, skip creating: {e.response}"
)
except ValueError as e:
if e.__cause__ and isinstance(e.__cause__, awscrt.exceptions.AwsCrtError):
logger.error(
(f"failed to create mtls connection to remote: {e.__cause__}")
for log_group_name in log_group_names:
try:
client.create_log_group(logGroupName=log_group_name)
logger.info(f"{log_group_name=} has been created")
except exc_types.ResourceAlreadyExistsException as e:
logger.debug(
f"{log_group_name=} already existed, skip creating: {e.response}"
)
raise e.__cause__ from None
logger.error(f"failed to create {log_group_name=}: {e!r}")
raise
except Exception as e:
logger.error(f"failed to create {log_group_name=}: {e!r}")
raise
except ValueError as e:
if e.__cause__ and isinstance(
e.__cause__, awscrt.exceptions.AwsCrtError
):
logger.error(
(f"failed to create mtls connection to remote: {e.__cause__}")
)
raise e.__cause__ from None
logger.error(f"failed to create {log_group_name=}: {e!r}")
raise
except Exception as e:
logger.error(f"failed to create {log_group_name=}: {e!r}")
raise
Comment on lines +97 to +117
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Create both AWS CloudWatch log groups for LOG and METRICS.


@retry(max_retry=16, backoff_factor=2, backoff_max=32)
def _create_log_stream(self, log_stream_name: str):
log_group_name, client = self._log_group_name, self._client
def _create_log_stream(self, log_group_name: str, log_stream_name: str):
client = self._client
exc_types = self._exc_types
try:
client.create_log_stream(
Expand All @@ -126,7 +143,9 @@ def _create_log_stream(self, log_stream_name: str):
raise

@retry(backoff_factor=2)
def put_log_events(self, log_stream_name: str, message_list: list[LogMessage]):
def put_log_events(
self, log_group_name: str, log_stream_name: str, message_list: list[LogMessage]
):
"""
Ref:
https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/logs/client/put_log_events.html
Expand All @@ -137,7 +156,7 @@ def put_log_events(self, log_stream_name: str, message_list: list[LogMessage]):
See the documentation for more details.
"""
request = LogEvent(
logGroupName=self._log_group_name,
logGroupName=log_group_name,
logStreamName=log_stream_name,
logEvents=message_list,
)
Expand All @@ -148,44 +167,54 @@ def put_log_events(self, log_stream_name: str, message_list: list[LogMessage]):
# logger.debug(f"successfully uploaded: {response}")
except exc_types.ResourceNotFoundException as e:
logger.debug(f"{log_stream_name=} not found: {e!r}")
self._create_log_stream(log_stream_name)
self._create_log_stream(log_group_name, log_stream_name)
raise
except Exception as e:
# NOTE: for unhandled exception, we just log it and ignore,
# leave for the developer to properly handle it
# in the future!
logger.error(
f"put_log_events failure: {e!r}\n"
f"log_group_name={self._log_group_name}, \n"
f"log_group_name={log_group_name}, \n"
f"log_stream_name={log_stream_name}"
)

def thread_main(self) -> NoReturn:
"""Main entry for running this iot_logger in a thread."""
# unconditionally create log_group and log_stream, do nothing if existed.
self._create_log_group()
self._create_log_groups()

while True:
# merge LogMessages into the same source, identified by
# log_stream_suffix.
message_dict: dict[str, list[LogMessage]] = defaultdict(list)
# log_group_type and log_stream_suffix.
message_dict: dict[(LogGroupType, str), list[LogMessage]] = defaultdict(
list
)

_merge_count = 0
while _merge_count < self._max_logs_per_merge:
_queue = self._queue
try:
log_stream_suffix, message = _queue.get_nowait()
log_group_type, log_stream_suffix, message = _queue.get_nowait()
_merge_count += 1

message_dict[log_stream_suffix].append(message)
message_dict[(log_group_type, log_stream_suffix)].append(message)
except Empty:
break

for log_stream_suffix, logs in message_dict.items():
for (log_group_type, log_stream_suffix), logs in message_dict.items():
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice one, I just know that we can directly unpack tuple here!

# get the log_group_name based on the log_group_type
log_group_name = (
self._otaclient_logs_metrics_group
if log_group_type == LogGroupType.METRICS
else self._otaclient_logs_log_group
)

with contextlib.suppress(Exception):
self.put_log_events(
log_group_name,
get_log_stream_name(
self._session_config.thing_name, log_stream_suffix
self._session_config.thing_name,
log_stream_suffix,
),
logs,
)
Expand Down
10 changes: 9 additions & 1 deletion src/otaclient_iot_logging_server/greengrass_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -231,12 +231,20 @@ def aws_role_alias(self) -> str:

@computed_field
@property
def aws_cloudwatch_log_group(self) -> str:
def aws_cloudwatch_otaclient_logs_log_group(self) -> str:
return (
f"/aws/greengrass/edge/{self.region}/"
f"{self.account_id}/{self.profile}-edge-otaclient"
)

@computed_field
@property
def aws_cloudwatch_otaclient_metrics_log_group(self) -> str:
return (
f"/aws/greengrass/edge/{self.region}/"
f"{self.account_id}/{self.profile}-edge-otaclient-metrics"
)

@computed_field
@property
def aws_credential_refresh_url(self) -> str:
Expand Down
22 changes: 19 additions & 3 deletions src/otaclient_iot_logging_server/servicer.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,13 @@
from aiohttp import web
from aiohttp.web import Request

from otaclient_iot_logging_server._common import LogMessage, LogsQueue
from otaclient_iot_logging_server._common import LogGroupType, LogMessage, LogsQueue
from otaclient_iot_logging_server.ecu_info import ECUInfo
from otaclient_iot_logging_server.v1._types import (
ErrorCode,
HealthCheckResponse,
LogLevel,
LogType,
PutLogRequest,
PutLogResponse,
ServiceStatus,
Expand Down Expand Up @@ -59,9 +60,18 @@ def __init__(
"no ecu_info.yaml presented, logging upload filtering is DISABLED"
)

def convert_from_log_type_to_log_group_type(self, log_type):
"""
Convert input log type to log group type
"""
if log_type == LogType.METRICS:
return LogGroupType.METRICS
return LogGroupType.LOG

async def _put_log(
self,
ecu_id: str,
log_type: LogType = LogType.LOG,
timestamp: int = None,
level: LogLevel = LogLevel.UNSPECIFIC,
message: str = "",
Expand All @@ -78,6 +88,7 @@ async def _put_log(
if self._allowed_ecus and ecu_id not in self._allowed_ecus:
return ErrorCode.NOT_ALLOWED_ECU_ID

_logging_group_type = self.convert_from_log_type_to_log_group_type(log_type)
if timestamp is None:
timestamp = int(time.time()) * 1000 # milliseconds
_logging_msg = LogMessage(
Expand All @@ -86,7 +97,7 @@ async def _put_log(
)
# logger.debug(f"receive log from {ecu_id}: {_logging_msg}")
try:
self._queue.put_nowait((ecu_id, _logging_msg))
self._queue.put_nowait((_logging_group_type, ecu_id, _logging_msg))
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

put group type to queue, then aws_iot_logger thread will get and handle it.

except Full:
logger.debug(f"message dropped: {_logging_msg}")
return ErrorCode.SERVER_QUEUE_FULL
Expand Down Expand Up @@ -123,11 +134,16 @@ async def grpc_put_log(self, request: PutLogRequest) -> PutLogResponse:
put log message from gRPC request
"""
_ecu_id = request.ecu_id
_log_type = request.log_type
_timestamp = request.timestamp
_level = request.level
_message = request.message

_code = await self._put_log(
ecu_id=_ecu_id, timestamp=_timestamp, level=_level, message=_message
ecu_id=_ecu_id,
log_type=_log_type,
timestamp=_timestamp,
level=_level,
message=_message,
)
return PutLogResponse(code=_code)
7 changes: 4 additions & 3 deletions tests/test__log_setting.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
from queue import Queue

import otaclient_iot_logging_server._log_setting
from otaclient_iot_logging_server._common import LogsQueue
from otaclient_iot_logging_server._common import LogGroupType, LogsQueue
from otaclient_iot_logging_server._log_setting import _LogTeeHandler # type: ignore

MODULE = otaclient_iot_logging_server._log_setting.__name__
Expand All @@ -39,5 +39,6 @@ def test_server_logger():
logger.removeHandler(_handler)
# ------ check result ------ #
_log = _queue.get_nowait()
assert _log[0] == suffix
assert _log[1]
assert _log[0] == LogGroupType.LOG
assert _log[1] == suffix
assert _log[2]
Loading
Loading