Skip to content

Commit 1d4bd11

Browse files
authored
Feat: log handler (#2)
* add log handler that writes to questdb * remove accidental pycache commits
1 parent 0fd4faa commit 1d4bd11

11 files changed

+192
-6
lines changed

.gitignore

+1
Original file line numberDiff line numberDiff line change
@@ -2,3 +2,4 @@
22
poetry.lock
33
dist/
44
**/__pycache__/
5+
*.pyc

py_questdb/__init__.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
11
from py_questdb.db import QuestDB
22

3-
__all__ = ['QuestDB']
3+
__all__ = ["QuestDB"]
-148 Bytes
Binary file not shown.
-14.4 KB
Binary file not shown.
-3.73 KB
Binary file not shown.

py_questdb/db.py

+10-5
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,10 @@ def __init__(self, host: str = "localhost", port: int = 9000, username: str | No
3737
self.client.establish() # unclear if this blocks, API keeps changing!
3838
self.session = None
3939

40+
def flush(self) -> None:
41+
"""Flush the QuestDB client buffer."""
42+
self.client.flush()
43+
4044
def write(self, *messages: QuestMessage) -> None:
4145
"""
4246
Write multiple messages to QuestDB.
@@ -125,9 +129,7 @@ def parse_query_response(response: bytes) -> QuestDBResponse:
125129
return msgspec.json.decode(response, type=QuestDBResponse)
126130

127131
@staticmethod
128-
def parse_and_yield_query_response(
129-
response: QuestDBResponse, into_type: Type[T] | None
130-
) -> Iterable[T] | Iterable[dict]:
132+
def parse_and_yield_query_response(response: QuestDBResponse, into_type: Type[T] | None) -> Iterable[T] | Iterable[dict]:
131133
"""
132134
Parse and yield rows from a QuestDBResponse.
133135
@@ -143,8 +145,7 @@ def parse_and_yield_query_response(
143145
for row in response.dataset:
144146
converted_row = {
145147
field.name: converter(value) if value is not None else None
146-
for field, converter, value
147-
in zip(response.columns, type_converter, row)
148+
for field, converter, value in zip(response.columns, type_converter, row)
148149
}
149150

150151
yield into_type(**converted_row) if into_type else converted_row
@@ -238,6 +239,8 @@ def query_df_sync(self, query_string: str) -> pd.DataFrame:
238239

239240
async def close(self):
240241
"""Close the QuestDB connection and any open sessions."""
242+
self.flush()
243+
241244
if self.session:
242245
await self.session.close()
243246
self.client.close()
@@ -251,5 +254,7 @@ async def __aexit__(
251254
self, exc_type: Type[BaseException] | None, exc_val: BaseException | None, exc_tb: TracebackType | None
252255
) -> None:
253256
"""Async context manager exit."""
257+
self.flush()
258+
254259
if self.session:
255260
await self.close()

py_questdb/log_handler.py

+87
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
"""
2+
USAGE:
3+
questdb_client = QuestDB()
4+
handler = QuestDBLogHandler(questdb_client=questdb_client)
5+
logger.addHandler(handler)
6+
"""
7+
8+
import logging
9+
from logging.handlers import QueueHandler, QueueListener
10+
from multiprocessing import Queue
11+
from datetime import datetime
12+
from typing import Optional
13+
from py_questdb import QuestDB
14+
15+
16+
class QuestDBLogHandler(QueueHandler):
17+
def __init__(self, questdb_client: QuestDB, table_name: str = "_logs", level: Optional[str] = None):
18+
self.queue = Queue()
19+
super().__init__(self.queue)
20+
21+
if level:
22+
self.setLevel(level)
23+
24+
self.inner_handler = InnerQuestDBLogHandler(questdb_client=questdb_client, table_name=table_name)
25+
26+
self.listener = QueueListener(self.queue, self.inner_handler, respect_handler_level=True)
27+
self.listener.start()
28+
29+
def prepare(self, record: logging.LogRecord) -> logging.LogRecord:
30+
if record.exc_info:
31+
record.exc_info = logging._defaultFormatter.formatException(record.exc_info) # noqa
32+
return record
33+
34+
def close(self) -> None:
35+
try:
36+
if self.listener:
37+
self.listener.stop()
38+
self.listener = None
39+
except Exception as e:
40+
print(f"Error stopping QueueListener: {e}")
41+
finally:
42+
self.inner_handler.questdb_client.flush()
43+
super().close()
44+
45+
def __del__(self):
46+
self.close()
47+
48+
49+
class InnerQuestDBLogHandler(logging.Handler):
50+
def __init__(self, questdb_client: QuestDB, table_name: str = "_logs"):
51+
super().__init__()
52+
self.questdb_client = questdb_client
53+
self.table_name = table_name
54+
55+
def emit(self, record: logging.LogRecord):
56+
try:
57+
print(f"Received {record=}")
58+
log_message = self.format_record(record)
59+
print(f"Writing {log_message=}")
60+
self.questdb_client.client.row(**log_message)
61+
except Exception as e:
62+
print(f"Failed to write log to QuestDB: {e}")
63+
64+
def format_record(self, record: logging.LogRecord) -> dict:
65+
print(record.stack_info)
66+
return {
67+
"table_name": self.table_name,
68+
"symbols": {
69+
"level": record.levelname,
70+
"logger": record.name,
71+
"filename": record.filename,
72+
"funcName": record.funcName,
73+
"module": record.module,
74+
"processName": record.processName,
75+
"threadName": record.threadName,
76+
"pathname": record.pathname,
77+
},
78+
"columns": {
79+
"message": record.getMessage(),
80+
"lineno": record.lineno,
81+
"process": record.process,
82+
"thread": record.thread,
83+
"exc_info": record.exc_info,
84+
"stack_info": record.stack_info,
85+
},
86+
"at": datetime.fromtimestamp(record.created),
87+
}
-143 Bytes
Binary file not shown.
Binary file not shown.
Binary file not shown.

tests/test_log_handler.py

+93
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
import pytest
2+
import logging
3+
from unittest.mock import MagicMock, patch
4+
from py_questdb import QuestDB
5+
from py_questdb.log_handler import QuestDBLogHandler, InnerQuestDBLogHandler
6+
7+
8+
def setup_logging(questdb_client: QuestDB):
9+
handler = QuestDBLogHandler(questdb_client)
10+
root_logger = logging.getLogger()
11+
root_logger.setLevel(logging.DEBUG)
12+
root_logger.addHandler(handler)
13+
return handler
14+
15+
16+
def worker_process(worker_id):
17+
logger = logging.getLogger(f"worker_{worker_id}")
18+
if worker_id == 2:
19+
try:
20+
raise ValueError(f"This is an exception from worker {worker_id}")
21+
except ValueError as e:
22+
logger.error(f"This is an error message from worker {worker_id} {e}", exc_info=True, stack_info=True)
23+
logger.info(f"This is an info message from worker {worker_id}")
24+
25+
26+
@pytest.fixture
27+
def mock_questdb_client():
28+
mock_client = MagicMock()
29+
mock_questdb = MagicMock(spec=QuestDB)
30+
mock_questdb.client = mock_client
31+
return mock_questdb
32+
33+
34+
@pytest.fixture
35+
def questdb_log_handler(mock_questdb_client):
36+
return QuestDBLogHandler(mock_questdb_client)
37+
38+
39+
def test_questdb_log_handler_init(mock_questdb_client):
40+
handler = QuestDBLogHandler(mock_questdb_client)
41+
assert isinstance(handler.inner_handler, InnerQuestDBLogHandler)
42+
assert handler.listener is not None
43+
44+
45+
def test_questdb_log_handler_prepare(questdb_log_handler):
46+
record = logging.LogRecord(
47+
name="test_logger", level=logging.INFO, pathname="test_path", lineno=42, msg="Test message", args=(), exc_info=None
48+
)
49+
prepared_record = questdb_log_handler.prepare(record)
50+
assert prepared_record == record
51+
52+
53+
def test_questdb_log_handler_close(questdb_log_handler):
54+
with patch.object(questdb_log_handler.listener, "stop") as mock_stop:
55+
questdb_log_handler.close()
56+
mock_stop.assert_called_once()
57+
assert questdb_log_handler.listener is None
58+
59+
60+
def test_inner_questdb_log_handler_emit(mock_questdb_client):
61+
handler = InnerQuestDBLogHandler(mock_questdb_client)
62+
record = logging.LogRecord(
63+
name="test_logger", level=logging.INFO, pathname="test_path", lineno=42, msg="Test message", args=(), exc_info=None
64+
)
65+
handler.emit(record)
66+
mock_questdb_client.client.row.assert_called_once()
67+
68+
69+
def test_setup_logging(mock_questdb_client):
70+
with patch("logging.getLogger") as mock_get_logger:
71+
mock_root_logger = MagicMock()
72+
mock_get_logger.return_value = mock_root_logger
73+
handler = setup_logging(mock_questdb_client)
74+
assert isinstance(handler, QuestDBLogHandler)
75+
mock_root_logger.setLevel.assert_called_with(logging.DEBUG)
76+
mock_root_logger.addHandler.assert_called_with(handler)
77+
78+
79+
@patch("logging.getLogger")
80+
def test_worker_process(mock_get_logger):
81+
mock_logger = MagicMock()
82+
mock_get_logger.return_value = mock_logger
83+
84+
# Test normal case
85+
worker_process(1)
86+
mock_logger.info.assert_called_with("This is an info message from worker 1")
87+
88+
# Test exception case
89+
worker_process(2)
90+
mock_logger.error.assert_called()
91+
assert "This is an error message from worker 2" in mock_logger.error.call_args[0][0]
92+
assert mock_logger.error.call_args[1]["exc_info"] is True
93+
assert mock_logger.error.call_args[1]["stack_info"] is True

0 commit comments

Comments
 (0)