Skip to content

Commit 3e59bb5

Browse files
committed
feat: support metrics log stream
1 parent d0318df commit 3e59bb5

8 files changed

+141
-60
lines changed

src/otaclient_iot_logging_server/_common.py

+8-1
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,19 @@
1515

1616
from __future__ import annotations
1717

18+
from enum import Enum
1819
from queue import Queue
1920
from typing import Literal, TypedDict
2021

2122
from typing_extensions import NotRequired, TypeAlias
2223

23-
LogsQueue: TypeAlias = "Queue[tuple[str, LogMessage]]"
24+
# LogsQueue is a queue of (LogGroupType, ecu_id, LogMessage) tuples
25+
LogsQueue: TypeAlias = "Queue[tuple[LogGroupType, str, LogMessage]]"
26+
27+
28+
class LogGroupType(Enum):
    """Destination category for a queued log entry.

    Entries tagged ``LOG`` are uploaded to the regular otaclient CloudWatch
    log group; entries tagged ``METRICS`` are uploaded to the dedicated
    metrics log group.
    """

    LOG = "LOG"
    METRICS = "METRICS"
2431

2532

2633
class LogMessage(TypedDict):

src/otaclient_iot_logging_server/_log_setting.py

+4-4
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,9 @@
1818
import contextlib
1919
import logging
2020
import time
21-
from queue import Queue
2221

2322
from otaclient_iot_logging_server import package_name as root_package_name
24-
from otaclient_iot_logging_server._common import LogMessage
23+
from otaclient_iot_logging_server._common import LogGroupType, LogMessage, LogsQueue
2524
from otaclient_iot_logging_server.configs import server_cfg
2625

2726

@@ -30,7 +29,7 @@ class _LogTeeHandler(logging.Handler):
3029

3130
def __init__(
3231
self,
33-
queue: Queue[tuple[str, LogMessage]],
32+
queue: LogsQueue,
3433
logstream_suffix: str,
3534
) -> None:
3635
super().__init__()
@@ -41,6 +40,7 @@ def emit(self, record: logging.LogRecord) -> None:
4140
with contextlib.suppress(Exception):
4241
self._queue.put_nowait(
4342
(
43+
LogGroupType.LOG, # always put local log into log group
4444
self._logstream_suffix,
4545
LogMessage(
4646
timestamp=int(time.time()) * 1000, # milliseconds
@@ -51,7 +51,7 @@ def emit(self, record: logging.LogRecord) -> None:
5151

5252

5353
def config_logging(
54-
queue: Queue[tuple[str, LogMessage]],
54+
queue: LogsQueue,
5555
*,
5656
log_format: str,
5757
level: str,

src/otaclient_iot_logging_server/aws_iot_logger.py

+56-34
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,12 @@
2626
import awscrt.exceptions
2727
from typing_extensions import NoReturn
2828

29-
from otaclient_iot_logging_server._common import LogEvent, LogMessage, LogsQueue
29+
from otaclient_iot_logging_server._common import (
30+
LogEvent,
31+
LogGroupType,
32+
LogMessage,
33+
LogsQueue,
34+
)
3035
from otaclient_iot_logging_server._utils import retry
3136
from otaclient_iot_logging_server.boto3_session import get_session
3237
from otaclient_iot_logging_server.configs import server_cfg
@@ -68,40 +73,45 @@ def __init__(
6873

6974
self._session_config = session_config
7075
self._log_group_name = session_config.aws_cloudwatch_log_group
76+
self._metrics_group_name = session_config.aws_cloudwatch_metrics_log_group
7177
self._interval = interval
7278
self._queue: LogsQueue = queue
7379
# NOTE: add this limitation to ensure all of the log_streams in a merge
7480
# will definitely have entries less than MAX_LOGS_PER_PUT
7581
self._max_logs_per_merge = min(max_logs_per_merge, self.MAX_LOGS_PER_PUT)
7682

7783
@retry(max_retry=16, backoff_factor=2, backoff_max=32)
def _create_log_groups(self):
    """Ensure both CloudWatch log groups (logs and metrics) exist.

    Already-existing groups are tolerated; mtls connection failures are
    unwrapped and re-raised so the retry decorator can back off on them.
    """
    # TODO: (20240214) should we let the edge side iot_logging_server
    # create the log group?
    client, exc_types = self._client, self._exc_types
    for log_group_name in (self._log_group_name, self._metrics_group_name):
        try:
            client.create_log_group(logGroupName=log_group_name)
            logger.info(f"{log_group_name=} has been created")
        except exc_types.ResourceAlreadyExistsException as e:
            # idempotent: a pre-existing group is fine
            logger.debug(
                f"{log_group_name=} already existed, skip creating: {e.response}"
            )
        except ValueError as e:
            # a ValueError chained from an AwsCrtError indicates the mtls
            # connection to remote could not be established
            if e.__cause__ and isinstance(
                e.__cause__, awscrt.exceptions.AwsCrtError
            ):
                logger.error(
                    (f"failed to create mtls connection to remote: {e.__cause__}")
                )
                raise e.__cause__ from None
            logger.error(f"failed to create {log_group_name=}: {e!r}")
            raise
        except Exception as e:
            logger.error(f"failed to create {log_group_name=}: {e!r}")
            raise
101111

102112
@retry(max_retry=16, backoff_factor=2, backoff_max=32)
103-
def _create_log_stream(self, log_stream_name: str):
104-
log_group_name, client = self._log_group_name, self._client
113+
def _create_log_stream(self, log_group_name: str, log_stream_name: str):
114+
client = self._client
105115
exc_types = self._exc_types
106116
try:
107117
client.create_log_stream(
@@ -126,7 +136,9 @@ def _create_log_stream(self, log_stream_name: str):
126136
raise
127137

128138
@retry(backoff_factor=2)
129-
def put_log_events(self, log_stream_name: str, message_list: list[LogMessage]):
139+
def put_log_events(
140+
self, log_group_name: str, log_stream_name: str, message_list: list[LogMessage]
141+
):
130142
"""
131143
Ref:
132144
https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/logs/client/put_log_events.html
@@ -137,7 +149,7 @@ def put_log_events(self, log_stream_name: str, message_list: list[LogMessage]):
137149
See the documentation for more details.
138150
"""
139151
request = LogEvent(
140-
logGroupName=self._log_group_name,
152+
logGroupName=log_group_name,
141153
logStreamName=log_stream_name,
142154
logEvents=message_list,
143155
)
@@ -148,44 +160,54 @@ def put_log_events(self, log_stream_name: str, message_list: list[LogMessage]):
148160
# logger.debug(f"successfully uploaded: {response}")
149161
except exc_types.ResourceNotFoundException as e:
150162
logger.debug(f"{log_stream_name=} not found: {e!r}")
151-
self._create_log_stream(log_stream_name)
163+
self._create_log_stream(log_group_name, log_stream_name)
152164
raise
153165
except Exception as e:
154166
# NOTE: for unhandled exception, we just log it and ignore,
155167
# leave for the developer to properly handle it
156168
# in the future!
157169
logger.error(
158170
f"put_log_events failure: {e!r}\n"
159-
f"log_group_name={self._log_group_name}, \n"
171+
f"log_group_name={log_group_name}, \n"
160172
f"log_stream_name={log_stream_name}"
161173
)
162174

163175
def thread_main(self) -> NoReturn:
164176
"""Main entry for running this iot_logger in a thread."""
165177
# unconditionally create log_group and log_stream, do nothing if existed.
166-
self._create_log_group()
178+
self._create_log_groups()
167179

168180
while True:
169181
# merge LogMessages into the same source, identified by
170-
# log_stream_suffix.
171-
message_dict: dict[str, list[LogMessage]] = defaultdict(list)
182+
# log_group_type and log_stream_suffix.
183+
message_dict: dict[
184+
(log_group_type, log_stream_suffix), list[LogMessage]
185+
] = defaultdict(list)
172186

173187
_merge_count = 0
174188
while _merge_count < self._max_logs_per_merge:
175189
_queue = self._queue
176190
try:
177-
log_stream_suffix, message = _queue.get_nowait()
191+
log_group_type, log_stream_suffix, message = _queue.get_nowait()
178192
_merge_count += 1
179-
180-
message_dict[log_stream_suffix].append(message)
193+
message_dict[(log_group_type, log_stream_suffix)].append(message)
181194
except Empty:
182195
break
183196

184-
for log_stream_suffix, logs in message_dict.items():
197+
for (log_group_type, log_stream_suffix), logs in message_dict.items():
198+
# get the log_group_name based on the log_group_type
199+
log_group_name = (
200+
self._metrics_group_name
201+
if log_group_type == LogGroupType.METRICS
202+
else self._log_group_name
203+
)
204+
185205
with contextlib.suppress(Exception):
186206
self.put_log_events(
207+
log_group_name,
187208
get_log_stream_name(
188-
self._session_config.thing_name, log_stream_suffix
209+
self._session_config.thing_name,
210+
log_stream_suffix,
189211
),
190212
logs,
191213
)

src/otaclient_iot_logging_server/greengrass_config.py

+8
Original file line numberDiff line numberDiff line change
@@ -237,6 +237,14 @@ def aws_cloudwatch_log_group(self) -> str:
237237
f"{self.account_id}/{self.profile}-edge-otaclient"
238238
)
239239

240+
@computed_field
@property
def aws_cloudwatch_metrics_log_group(self) -> str:
    """CloudWatch log group name used for the metrics log stream.

    Mirrors ``aws_cloudwatch_log_group`` (same region/account/profile
    scheme) with a ``-metrics`` suffix on the final path component.
    """
    return (
        f"/aws/greengrass/edge/{self.region}/"
        f"{self.account_id}/{self.profile}-edge-otaclient-metrics"
    )
247+
240248
@computed_field
241249
@property
242250
def aws_credential_refresh_url(self) -> str:

src/otaclient_iot_logging_server/servicer.py

+19-3
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,13 @@
2323
from aiohttp import web
2424
from aiohttp.web import Request
2525

26-
from otaclient_iot_logging_server._common import LogMessage, LogsQueue
26+
from otaclient_iot_logging_server._common import LogGroupType, LogMessage, LogsQueue
2727
from otaclient_iot_logging_server.ecu_info import ECUInfo
2828
from otaclient_iot_logging_server.v1._types import (
2929
ErrorCode,
3030
HealthCheckResponse,
3131
LogLevel,
32+
LogType,
3233
PutLogRequest,
3334
PutLogResponse,
3435
ServiceStatus,
@@ -59,9 +60,18 @@ def __init__(
5960
"no ecu_info.yaml presented, logging upload filtering is DISABLED"
6061
)
6162

63+
def convert_from_log_type_to_log_group_type(self, log_type):
    """Map a request ``LogType`` to the ``LogGroupType`` used for routing.

    ``LogType.METRICS`` maps to ``LogGroupType.METRICS``; any other value
    falls back to ``LogGroupType.LOG``.
    """
    return (
        LogGroupType.METRICS
        if log_type == LogType.METRICS
        else LogGroupType.LOG
    )
70+
6271
async def _put_log(
6372
self,
6473
ecu_id: str,
74+
log_type: LogType = LogType.LOG,
6575
timestamp: int = None,
6676
level: LogLevel = LogLevel.UNSPECIFIC,
6777
message: str = "",
@@ -78,6 +88,7 @@ async def _put_log(
7888
if self._allowed_ecus and ecu_id not in self._allowed_ecus:
7989
return ErrorCode.NOT_ALLOWED_ECU_ID
8090

91+
_logging_group_type = self.convert_from_log_type_to_log_group_type(log_type)
8192
if timestamp is None:
8293
timestamp = int(time.time()) * 1000 # milliseconds
8394
_logging_msg = LogMessage(
@@ -86,7 +97,7 @@ async def _put_log(
8697
)
8798
# logger.debug(f"receive log from {ecu_id}: {_logging_msg}")
8899
try:
89-
self._queue.put_nowait((ecu_id, _logging_msg))
100+
self._queue.put_nowait((_logging_group_type, ecu_id, _logging_msg))
90101
except Full:
91102
logger.debug(f"message dropped: {_logging_msg}")
92103
return ErrorCode.SERVER_QUEUE_FULL
@@ -123,11 +134,16 @@ async def grpc_put_log(self, request: PutLogRequest) -> PutLogResponse:
123134
put log message from gRPC request
124135
"""
125136
_ecu_id = request.ecu_id
137+
_log_type = request.log_type
126138
_timestamp = request.timestamp
127139
_level = request.level
128140
_message = request.message
129141

130142
_code = await self._put_log(
131-
ecu_id=_ecu_id, timestamp=_timestamp, level=_level, message=_message
143+
ecu_id=_ecu_id,
144+
log_type=_log_type,
145+
timestamp=_timestamp,
146+
level=_level,
147+
message=_message,
132148
)
133149
return PutLogResponse(code=_code)

tests/test__log_setting.py

+4-3
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
from queue import Queue
2020

2121
import otaclient_iot_logging_server._log_setting
22-
from otaclient_iot_logging_server._common import LogsQueue
22+
from otaclient_iot_logging_server._common import LogGroupType, LogsQueue
2323
from otaclient_iot_logging_server._log_setting import _LogTeeHandler # type: ignore
2424

2525
MODULE = otaclient_iot_logging_server._log_setting.__name__
@@ -39,5 +39,6 @@ def test_server_logger():
3939
logger.removeHandler(_handler)
4040
# ------ check result ------ #
4141
_log = _queue.get_nowait()
42-
assert _log[0] == suffix
43-
assert _log[1]
42+
assert _log[0] == LogGroupType.LOG
43+
assert _log[1] == suffix
44+
assert _log[2]

0 commit comments

Comments
 (0)