-
Notifications
You must be signed in to change notification settings - Fork 3.1k
/
Copy pathreport.py
137 lines (111 loc) · 4.54 KB
/
report.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
import dataclasses
import json
import logging
import pprint
from dataclasses import dataclass
from datetime import datetime, timedelta
from enum import Enum
from typing import Any, Optional, runtime_checkable
import humanfriendly
import pydantic
from pydantic import BaseModel
from typing_extensions import Literal, Protocol
from datahub.ingestion.api.report_helpers import format_datetime_relative
from datahub.utilities.lossy_collections import LossyList
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# The severity names accepted wherever a LogLevel is expected.
LogLevel = Literal["ERROR", "WARNING", "INFO", "DEBUG"]
@runtime_checkable
class SupportsAsObj(Protocol):
    """Structural type for any object that exposes an ``as_obj()`` method.

    The protocol is runtime-checkable, so ``isinstance(x, SupportsAsObj)``
    can be used to test for the presence of that method.
    """

    def as_obj(self) -> dict:
        """Return a dict representation of the object."""
        ...
@dataclass
class Report(SupportsAsObj):
    """Base class for reports that can be rendered as plain Python data.

    ``as_obj`` serialises every entry of the instance ``__dict__`` whose value
    is not None and whose name does not start with an underscore;
    ``as_string`` and ``as_json`` are text renderings of that dict.
    """

    @staticmethod
    def to_str(some_val: Any) -> str:
        """Return the member name for an Enum, otherwise ``str(some_val)``."""
        if isinstance(some_val, Enum):
            return some_val.name
        return str(some_val)

    @staticmethod
    def to_pure_python_obj(some_val: Any) -> Any:
        """A cheap way to generate a dictionary.

        Converts ``some_val`` into plain Python data. The checks run in the
        order listed, and the first match wins:

        * objects with an ``as_obj`` method -> the result of ``as_obj()``
        * pydantic models -> their ``.dict()``, converted recursively
        * dataclass instances -> ``dataclasses.asdict`` (returned as-is)
        * lists -> converted element-wise, dropping None elements
        * timedelta -> ``humanfriendly.format_timespan``
        * datetime -> ``format_datetime_relative``, or ``str(value)`` if
          that formatting raises
        * dicts -> keys through ``to_str``, values converted recursively,
          dropping entries whose value is None
        * int / float / bool -> returned unchanged
        * anything else -> ``to_str``
        """
        if isinstance(some_val, SupportsAsObj):
            return some_val.as_obj()
        elif isinstance(some_val, pydantic.BaseModel):
            return Report.to_pure_python_obj(some_val.dict())
        elif dataclasses.is_dataclass(some_val) and not isinstance(some_val, type):
            # The `is_dataclass` function returns `True` for both instances and classes.
            # We need an extra check to ensure an instance was passed in.
            # https://docs.python.org/3/library/dataclasses.html#dataclasses.is_dataclass
            return dataclasses.asdict(some_val)
        elif isinstance(some_val, list):
            return [Report.to_pure_python_obj(v) for v in some_val if v is not None]
        elif isinstance(some_val, timedelta):
            return humanfriendly.format_timespan(some_val)
        elif isinstance(some_val, datetime):
            try:
                return format_datetime_relative(some_val)
            except Exception:
                # We don't want to fail reporting because we were unable to
                # pretty print a timestamp, so fall back to the plain string
                # form of the value itself (not of the `datetime` class).
                return str(some_val)
        elif isinstance(some_val, dict):
            return {
                Report.to_str(k): Report.to_pure_python_obj(v)
                for k, v in some_val.items()
                if v is not None
            }
        elif isinstance(some_val, (int, float, bool)):
            return some_val
        else:
            # fall through option
            return Report.to_str(some_val)

    def compute_stats(self) -> None:
        """A hook to compute derived stats.

        Called by ``as_obj`` before serialising; the base implementation
        does nothing.
        """
        pass

    def as_obj(self) -> dict:
        """Return the report as a dict of plain Python values.

        Runs ``compute_stats`` first, then converts each instance attribute
        with ``to_pure_python_obj``.
        """
        self.compute_stats()
        return {
            str(key): Report.to_pure_python_obj(value)
            for (key, value) in self.__dict__.items()
            # ignore nulls and fields starting with _
            if value is not None and not str(key).startswith("_")
        }

    def as_string(self) -> str:
        """Return the report pretty-printed, keeping attribute order."""
        return pprint.pformat(self.as_obj(), width=150, sort_dicts=False)

    def as_json(self) -> str:
        """Return the report serialised as a JSON string."""
        return json.dumps(self.as_obj())

    # TODO add helper method for warning / failure status + counts?
class ReportAttribute(BaseModel):
    """Pydantic model holding a log severity and optional help text."""

    severity: LogLevel = "DEBUG"
    help: Optional[str] = None

    @property
    def logger_sev(self) -> int:
        """Numeric ``logging`` level that corresponds to ``self.severity``."""
        level_names = ("DEBUG", "INFO", "WARNING", "ERROR")
        numeric_by_name = {name: getattr(logging, name) for name in level_names}
        return numeric_by_name[self.severity]

    def log(self, msg: str) -> None:
        """Emit ``msg`` on the module logger at this attribute's severity."""
        numeric_level = self.logger_sev
        # stacklevel=3 attributes the record to the frame two levels above
        # this method, i.e. the caller of whichever method invoked `log`.
        logger.log(level=numeric_level, msg=msg, stacklevel=3)
class EntityFilterReport(ReportAttribute):
    """Records which entities were processed and which were filtered out."""

    type: str
    processed_entities: LossyList[str] = pydantic.Field(default_factory=LossyList)
    dropped_entities: LossyList[str] = pydantic.Field(default_factory=LossyList)

    def processed(self, entity: str, type: Optional[str] = None) -> None:
        """Log ``entity`` as processed and add it to ``processed_entities``.

        ``type`` overrides the report's own ``type`` in the log message when
        it is given and truthy.
        """
        entity_kind = type or self.type
        self.log(f"Processed {entity_kind} {entity}")
        self.processed_entities.append(entity)

    def dropped(self, entity: str, type: Optional[str] = None) -> None:
        """Log ``entity`` as filtered and add it to ``dropped_entities``.

        ``type`` overrides the report's own ``type`` in the log message when
        it is given and truthy.
        """
        entity_kind = type or self.type
        self.log(f"Filtered {entity_kind} {entity}")
        self.dropped_entities.append(entity)

    def as_obj(self) -> dict:
        """Return both entity lists under the keys "filtered" and "processed"."""
        filtered_view = self.dropped_entities.as_obj()
        processed_view = self.processed_entities.as_obj()
        return {"filtered": filtered_view, "processed": processed_view}

    @staticmethod
    def field(type: str, severity: LogLevel = "DEBUG") -> "EntityFilterReport":
        """A helper to create a dataclass field."""

        def _build_report() -> "EntityFilterReport":
            # Each default-factory call yields a fresh report instance.
            return EntityFilterReport(type=type, severity=severity)

        return dataclasses.field(default_factory=_build_report)