diff --git a/src/otaclient_iot_logging_server/_common.py b/src/otaclient_iot_logging_server/_common.py
index 438e4188..ea0e0e81 100644
--- a/src/otaclient_iot_logging_server/_common.py
+++ b/src/otaclient_iot_logging_server/_common.py
@@ -15,12 +15,19 @@

 from __future__ import annotations

+from enum import Enum
 from queue import Queue
 from typing import Literal, TypedDict

 from typing_extensions import NotRequired, TypeAlias

-LogsQueue: TypeAlias = "Queue[tuple[str, LogMessage]]"
+# LogsQueue is a queue of (LogGroupType, ecu_id, LogMessage) entries
+LogsQueue: TypeAlias = "Queue[tuple[LogGroupType, str, LogMessage]]"
+
+
+class LogGroupType(Enum):
+    LOG = "LOG"
+    METRICS = "METRICS"


 class LogMessage(TypedDict):
diff --git a/src/otaclient_iot_logging_server/_log_setting.py b/src/otaclient_iot_logging_server/_log_setting.py
index f1c0ee52..39ba1fbf 100644
--- a/src/otaclient_iot_logging_server/_log_setting.py
+++ b/src/otaclient_iot_logging_server/_log_setting.py
@@ -18,10 +18,9 @@
 import contextlib
 import logging
 import time
-from queue import Queue

 from otaclient_iot_logging_server import package_name as root_package_name
-from otaclient_iot_logging_server._common import LogMessage
+from otaclient_iot_logging_server._common import LogGroupType, LogMessage, LogsQueue
 from otaclient_iot_logging_server.configs import server_cfg


@@ -30,7 +29,7 @@ class _LogTeeHandler(logging.Handler):

     def __init__(
         self,
-        queue: Queue[tuple[str, LogMessage]],
+        queue: LogsQueue,
         logstream_suffix: str,
     ) -> None:
         super().__init__()
@@ -41,6 +40,7 @@ def emit(self, record: logging.LogRecord) -> None:
         with contextlib.suppress(Exception):
             self._queue.put_nowait(
                 (
+                    LogGroupType.LOG,  # always put local logs into the LOG group
                     self._logstream_suffix,
                     LogMessage(
                         timestamp=int(time.time()) * 1000,  # milliseconds
@@ -51,7 +51,7 @@ def emit(self, record: logging.LogRecord) -> None:


 def config_logging(
-    queue: Queue[tuple[str, LogMessage]],
+    queue: LogsQueue,
     *,
     log_format: str,
     level: str,
diff --git a/src/otaclient_iot_logging_server/aws_iot_logger.py b/src/otaclient_iot_logging_server/aws_iot_logger.py
index bed7b743..1317cb85 100644
--- a/src/otaclient_iot_logging_server/aws_iot_logger.py
+++ b/src/otaclient_iot_logging_server/aws_iot_logger.py
@@ -26,7 +26,12 @@
 import awscrt.exceptions
 from typing_extensions import NoReturn

-from otaclient_iot_logging_server._common import LogEvent, LogMessage, LogsQueue
+from otaclient_iot_logging_server._common import (
+    LogEvent,
+    LogGroupType,
+    LogMessage,
+    LogsQueue,
+)
 from otaclient_iot_logging_server._utils import retry
 from otaclient_iot_logging_server.boto3_session import get_session
 from otaclient_iot_logging_server.configs import server_cfg
@@ -67,7 +72,12 @@ def __init__(
         self._exc_types = client.exceptions

         self._session_config = session_config
-        self._log_group_name = session_config.aws_cloudwatch_log_group
+        self._otaclient_logs_log_group = (
+            session_config.aws_cloudwatch_otaclient_logs_log_group
+        )
+        self._otaclient_logs_metrics_group = (
+            session_config.aws_cloudwatch_otaclient_metrics_log_group
+        )
         self._interval = interval
         self._queue: LogsQueue = queue
         # NOTE: add this limitation to ensure all of the log_streams in a merge
@@ -75,33 +85,40 @@ def __init__(
         self._max_logs_per_merge = min(max_logs_per_merge, self.MAX_LOGS_PER_PUT)

     @retry(max_retry=16, backoff_factor=2, backoff_max=32)
-    def _create_log_group(self):
+    def _create_log_groups(self):
         # TODO: (20240214) should we let the edge side iot_logging_server
         #       create the log group?
-        log_group_name, client = self._log_group_name, self._client
+        log_group_names = [
+            self._otaclient_logs_log_group,
+            self._otaclient_logs_metrics_group,
+        ]
+        client = self._client
         exc_types = self._exc_types
-        try:
-            client.create_log_group(logGroupName=log_group_name)
-            logger.info(f"{log_group_name=} has been created")
-        except exc_types.ResourceAlreadyExistsException as e:
-            logger.debug(
-                f"{log_group_name=} already existed, skip creating: {e.response}"
-            )
-        except ValueError as e:
-            if e.__cause__ and isinstance(e.__cause__, awscrt.exceptions.AwsCrtError):
-                logger.error(
-                    (f"failed to create mtls connection to remote: {e.__cause__}")
+        for log_group_name in log_group_names:
+            try:
+                client.create_log_group(logGroupName=log_group_name)
+                logger.info(f"{log_group_name=} has been created")
+            except exc_types.ResourceAlreadyExistsException as e:
+                logger.debug(
+                    f"{log_group_name=} already existed, skip creating: {e.response}"
                 )
-                raise e.__cause__ from None
-            logger.error(f"failed to create {log_group_name=}: {e!r}")
-            raise
-        except Exception as e:
-            logger.error(f"failed to create {log_group_name=}: {e!r}")
-            raise
+            except ValueError as e:
+                if e.__cause__ and isinstance(
+                    e.__cause__, awscrt.exceptions.AwsCrtError
+                ):
+                    logger.error(
+                        (f"failed to create mtls connection to remote: {e.__cause__}")
+                    )
+                    raise e.__cause__ from None
+                logger.error(f"failed to create {log_group_name=}: {e!r}")
+                raise
+            except Exception as e:
+                logger.error(f"failed to create {log_group_name=}: {e!r}")
+                raise

     @retry(max_retry=16, backoff_factor=2, backoff_max=32)
-    def _create_log_stream(self, log_stream_name: str):
-        log_group_name, client = self._log_group_name, self._client
+    def _create_log_stream(self, log_group_name: str, log_stream_name: str):
+        client = self._client
         exc_types = self._exc_types
         try:
             client.create_log_stream(
@@ -126,7 +143,9 @@ def _create_log_stream(self, log_stream_name: str):
             raise

     @retry(backoff_factor=2)
-    def put_log_events(self, log_stream_name: str, message_list: list[LogMessage]):
+    def put_log_events(
+        self, log_group_name: str, log_stream_name: str, message_list: list[LogMessage]
+    ):
         """
         Ref:
             https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/logs/client/put_log_events.html
@@ -137,7 +156,7 @@ def put_log_events(self, log_stream_name: str, message_list: list[LogMessage]):
         See the documentation for more details.
         """
         request = LogEvent(
-            logGroupName=self._log_group_name,
+            logGroupName=log_group_name,
             logStreamName=log_stream_name,
             logEvents=message_list,
         )
@@ -148,7 +167,7 @@ def put_log_events(self, log_stream_name: str, message_list: list[LogMessage]):
             # logger.debug(f"successfully uploaded: {response}")
         except exc_types.ResourceNotFoundException as e:
             logger.debug(f"{log_stream_name=} not found: {e!r}")
-            self._create_log_stream(log_stream_name)
+            self._create_log_stream(log_group_name, log_stream_name)
             raise
         except Exception as e:
             # NOTE: for unhandled exception, we just log it and ignore,
@@ -156,36 +175,46 @@ def put_log_events(self, log_stream_name: str, message_list: list[LogMessage]):
             #       in the future!
             logger.error(
                 f"put_log_events failure: {e!r}\n"
-                f"log_group_name={self._log_group_name}, \n"
+                f"log_group_name={log_group_name}, \n"
                 f"log_stream_name={log_stream_name}"
             )

     def thread_main(self) -> NoReturn:
         """Main entry for running this iot_logger in a thread."""
         # unconditionally create log_group and log_stream, do nothing if existed.
-        self._create_log_group()
+        self._create_log_groups()

         while True:
             # merge LogMessages into the same source, identified by
-            # log_stream_suffix.
-            message_dict: dict[str, list[LogMessage]] = defaultdict(list)
+            # log_group_type and log_stream_suffix.
+            message_dict: dict[tuple[LogGroupType, str], list[LogMessage]] = (
+                defaultdict(list)
+            )

             _merge_count = 0
             while _merge_count < self._max_logs_per_merge:
                 _queue = self._queue
                 try:
-                    log_stream_suffix, message = _queue.get_nowait()
+                    log_group_type, log_stream_suffix, message = _queue.get_nowait()
                     _merge_count += 1
-
-                    message_dict[log_stream_suffix].append(message)
+                    message_dict[(log_group_type, log_stream_suffix)].append(message)
                 except Empty:
                     break

-            for log_stream_suffix, logs in message_dict.items():
+            for (log_group_type, log_stream_suffix), logs in message_dict.items():
+                # get the log_group_name based on the log_group_type
+                log_group_name = (
+                    self._otaclient_logs_metrics_group
+                    if log_group_type == LogGroupType.METRICS
+                    else self._otaclient_logs_log_group
+                )
+
                 with contextlib.suppress(Exception):
                     self.put_log_events(
+                        log_group_name,
                         get_log_stream_name(
-                            self._session_config.thing_name, log_stream_suffix
+                            self._session_config.thing_name,
+                            log_stream_suffix,
                         ),
                         logs,
                     )
diff --git a/src/otaclient_iot_logging_server/greengrass_config.py b/src/otaclient_iot_logging_server/greengrass_config.py
index 11536c02..59661032 100644
--- a/src/otaclient_iot_logging_server/greengrass_config.py
+++ b/src/otaclient_iot_logging_server/greengrass_config.py
@@ -231,12 +231,20 @@ def aws_role_alias(self) -> str:

     @computed_field
     @property
-    def aws_cloudwatch_log_group(self) -> str:
+    def aws_cloudwatch_otaclient_logs_log_group(self) -> str:
         return (
             f"/aws/greengrass/edge/{self.region}/"
             f"{self.account_id}/{self.profile}-edge-otaclient"
         )

+    @computed_field
+    @property
+    def aws_cloudwatch_otaclient_metrics_log_group(self) -> str:
+        return (
+            f"/aws/greengrass/edge/{self.region}/"
+            f"{self.account_id}/{self.profile}-edge-otaclient-metrics"
+        )
+
     @computed_field
     @property
     def aws_credential_refresh_url(self) -> str:
diff --git a/src/otaclient_iot_logging_server/servicer.py b/src/otaclient_iot_logging_server/servicer.py
index 5303c32e..91a77b7f 100644
--- a/src/otaclient_iot_logging_server/servicer.py
+++ b/src/otaclient_iot_logging_server/servicer.py
@@ -23,12 +23,13 @@
 from aiohttp import web
 from aiohttp.web import Request

-from otaclient_iot_logging_server._common import LogMessage, LogsQueue
+from otaclient_iot_logging_server._common import LogGroupType, LogMessage, LogsQueue
 from otaclient_iot_logging_server.ecu_info import ECUInfo
 from otaclient_iot_logging_server.v1._types import (
     ErrorCode,
     HealthCheckResponse,
     LogLevel,
+    LogType,
     PutLogRequest,
     PutLogResponse,
     ServiceStatus,
@@ -59,9 +60,18 @@ def __init__(
                 "no ecu_info.yaml presented, logging upload filtering is DISABLED"
             )

+    def convert_from_log_type_to_log_group_type(self, log_type):
+        """
+        Convert input log type to log group type
+        """
+        if log_type == LogType.METRICS:
+            return LogGroupType.METRICS
+        return LogGroupType.LOG
+
     async def _put_log(
         self,
         ecu_id: str,
+        log_type: LogType = LogType.LOG,
         timestamp: int = None,
         level: LogLevel = LogLevel.UNSPECIFIC,
         message: str = "",
@@ -78,6 +88,7 @@ async def _put_log(
         if self._allowed_ecus and ecu_id not in self._allowed_ecus:
             return ErrorCode.NOT_ALLOWED_ECU_ID

+        _logging_group_type = self.convert_from_log_type_to_log_group_type(log_type)
         if timestamp is None:
             timestamp = int(time.time()) * 1000  # milliseconds
         _logging_msg = LogMessage(
@@ -86,7 +97,7 @@ async def _put_log(
         )
         # logger.debug(f"receive log from {ecu_id}: {_logging_msg}")
         try:
-            self._queue.put_nowait((ecu_id, _logging_msg))
+            self._queue.put_nowait((_logging_group_type, ecu_id, _logging_msg))
         except Full:
             logger.debug(f"message dropped: {_logging_msg}")
             return ErrorCode.SERVER_QUEUE_FULL
@@ -123,11 +134,16 @@ async def grpc_put_log(self, request: PutLogRequest) -> PutLogResponse:
         put log message from gRPC request
         """
         _ecu_id = request.ecu_id
+        _log_type = request.log_type
         _timestamp = request.timestamp
         _level = request.level
         _message = request.message
         _code = await self._put_log(
-            ecu_id=_ecu_id, timestamp=_timestamp, level=_level, message=_message
+            ecu_id=_ecu_id,
+            log_type=_log_type,
+            timestamp=_timestamp,
+            level=_level,
+            message=_message,
         )
         return PutLogResponse(code=_code)

diff --git a/tests/test__log_setting.py b/tests/test__log_setting.py
index 67a45500..78cc9f4d 100644
--- a/tests/test__log_setting.py
+++ b/tests/test__log_setting.py
@@ -19,7 +19,7 @@
 from queue import Queue

 import otaclient_iot_logging_server._log_setting
-from otaclient_iot_logging_server._common import LogsQueue
+from otaclient_iot_logging_server._common import LogGroupType, LogsQueue
 from otaclient_iot_logging_server._log_setting import _LogTeeHandler  # type: ignore

 MODULE = otaclient_iot_logging_server._log_setting.__name__
@@ -39,5 +39,6 @@ def test_server_logger():
     logger.removeHandler(_handler)
     # ------ check result ------ #
     _log = _queue.get_nowait()
-    assert _log[0] == suffix
-    assert _log[1]
+    assert _log[0] == LogGroupType.LOG
+    assert _log[1] == suffix
+    assert _log[2]
diff --git a/tests/test_aws_iot_logger.py b/tests/test_aws_iot_logger.py
index 378efc53..33bb8f8b 100644
--- a/tests/test_aws_iot_logger.py
+++ b/tests/test_aws_iot_logger.py
@@ -28,7 +28,7 @@
 from pytest_mock import MockerFixture

 import otaclient_iot_logging_server.aws_iot_logger
-from otaclient_iot_logging_server._common import LogMessage, LogsQueue
+from otaclient_iot_logging_server._common import LogGroupType, LogMessage, LogsQueue
 from otaclient_iot_logging_server.aws_iot_logger import (
     AWSIoTLogger,
     get_log_stream_name,
@@ -76,10 +76,13 @@ def generate_random_msgs(
 ) -> list[tuple[str, LogMessage]]:
     _res: list[tuple[str, LogMessage]] = []
     for _ in range(msg_num):
-        _ecu, *_ = random.sample(ecus_list, 1)
+        _ecu_id, *_ = random.sample(ecus_list, 1)
+        _log_group_type = random.choice(list(LogGroupType))
         _msg = os.urandom(msg_len).hex()
         _timestamp = int(time.time()) * 1000  # milliseconds
-        _res.append((_ecu, LogMessage(timestamp=_timestamp, message=_msg)))
+        _res.append(
+            (_log_group_type, _ecu_id, LogMessage(timestamp=_timestamp, message=_msg))
+        )
     return _res


@@ -90,16 +93,27 @@ class TestAWSIoTLogger:
     class _TestFinished(Exception):
         pass

-    def _mocked_put_log_events(self, _ecu_id: str, _logs: list[LogMessage]):
-        self._test_result[_ecu_id] = _logs
+    def _mocked_put_log_events(
+        self, _log_group_name: str, _ecu_id: str, _logs: list[LogMessage]
+    ):
+        self._test_result[(_log_group_name, _ecu_id)] = _logs

     @pytest.fixture
     def prepare_test_data(self):
+        self._otaclient_logs_log_group = "some_log_group_name"  # place holder
+        self._otaclient_logs_metrics_group = "some_metrics_group_name"  # place holder
+
         _msgs = generate_random_msgs(self.MSG_LEN, self.MSG_NUM)
         # prepare result for test_thread_main
-        _merged_msgs: dict[str, list[LogMessage]] = defaultdict(list)
-        for _ecu_id, _log_msg in _msgs:
-            _merged_msgs[_ecu_id].append(_log_msg)
+        _merged_msgs: dict[tuple[str, str], list[LogMessage]] = defaultdict(list)
+        for _log_group_type, _ecu_id, _log_msg in _msgs:
+            # get the log_group_name based on the log_group_type
+            _log_group_name = (
+                self._otaclient_logs_metrics_group
+                if _log_group_type == LogGroupType.METRICS
+                else self._otaclient_logs_log_group
+            )
+            _merged_msgs[(_log_group_name, _ecu_id)].append(_log_msg)
         self._merged_msgs = _merged_msgs
         # prepare the queue for test
         _queue: LogsQueue = Queue()
@@ -123,7 +137,7 @@ def setup_test(self, prepare_test_data, mocker: MockerFixture):
         self._session_config = mocker.MagicMock()  # place holder
         # for holding test results
         # mocked_send_messages will record each calls in this dict
-        self._test_result: dict[str, list[LogMessage]] = {}
+        self._test_result: dict[tuple[str, str], list[LogMessage]] = {}
         # mock get_log_stream_name to let it returns the log_stream_suffix
         # as it, make the test easier.
         # see get_log_stream_name signature for more details
@@ -132,8 +146,8 @@ def setup_test(self, prepare_test_data, mocker: MockerFixture):

     def test_thread_main(self, mocker: MockerFixture):
         func_to_test = AWSIoTLogger.thread_main
-        self._create_log_group = mocked__create_log_group = mocker.MagicMock(
-            spec=AWSIoTLogger._create_log_group
+        self._create_log_groups = mocked__create_log_groups = mocker.MagicMock(
+            spec=AWSIoTLogger._create_log_groups
         )

         # ------ execution ------ #
@@ -142,6 +156,6 @@ def test_thread_main(self, mocker: MockerFixture):
         logger.info("execution finished")

         # ------ check result ------ #
-        mocked__create_log_group.assert_called_once()
+        mocked__create_log_groups.assert_called_once()
         # confirm the send_messages mock receives the expecting calls.
         assert self._merged_msgs == self._test_result
diff --git a/tests/test_log_proxy_server.py b/tests/test_log_proxy_server.py
index da45e80a..c76310d8 100644
--- a/tests/test_log_proxy_server.py
+++ b/tests/test_log_proxy_server.py
@@ -32,7 +32,7 @@
 from pytest_mock import MockerFixture

 import otaclient_iot_logging_server.log_proxy_server as log_server_module
-from otaclient_iot_logging_server._common import LogsQueue
+from otaclient_iot_logging_server._common import LogGroupType, LogsQueue
 from otaclient_iot_logging_server.ecu_info import parse_ecu_info
 from otaclient_iot_logging_server.servicer import OTAClientIoTLoggingServerServicer
 from otaclient_iot_logging_server.v1 import _types
@@ -198,7 +198,9 @@ async def test_http_server(
         # ensure the all msgs are sent in order to the queue by the server.
         logger.info("checking all the received messages...")
         for item in self._msgs:
-            _ecu_id, _log_msg = self._queue.get_nowait()
+            _log_group_type, _ecu_id, _log_msg = self._queue.get_nowait()
+            # log type is always LOG for messages received over HTTP
+            assert _log_group_type == LogGroupType.LOG
             assert _ecu_id == item.ecu_id
             assert _log_msg["message"] == item.message
         assert self._queue.empty()
@@ -253,6 +255,14 @@ async def send_put_log_msg(item):
             _response = await stub.PutLog(_req)
             assert _response.code == pb2.ErrorCode.NO_FAILURE

+        def convert_from_log_type_to_log_group_type(log_type):
+            """
+            Convert input log type to log group type
+            """
+            if log_type == _types.LogType.METRICS:
+                return LogGroupType.METRICS
+            return LogGroupType.LOG
+
         for item in self._msgs:
             await send_put_log_msg(item)

@@ -260,8 +270,11 @@ async def send_put_log_msg(item):
         # ensure the all msgs are sent in order to the queue by the server.
logger.info("checking all the received messages...") for item in self._msgs: - _ecu_id, _log_msg = self._queue.get_nowait() + _log_group_type, _ecu_id, _log_msg = self._queue.get_nowait() assert _ecu_id == item.ecu_id + assert _log_group_type == convert_from_log_type_to_log_group_type( + item.log_type + ) assert _log_msg["message"] == item.message assert self._queue.empty()