-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathhw.py
94 lines (69 loc) · 2.84 KB
/
hw.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
"""
This is a lil mock task that is representative of one of the many types of tasks you'll do at maestro!
The task is to implement compute_agent_response_times.
See the docstr of that function for more info.
"""
import pickle as pkl
from dataclasses import dataclass
from datetime import datetime
@dataclass
class metricRow:
"""the function you are implementing should return a list of these,
these represent metric rows that would be loaded into our reporting
system to power our customers dashboards and reports.
"""
ticket_id: str
agent_id: str
metric_time: datetime
metric_val: float
def compute_metrics():
"""this is the main function that loads the sample data and executes
the compute_agent_response_times function on each block of tickets to
produce metric rows.
You dont need to change this function, but can swap metric functions
w/ the example to see how this works.
"""
with open("data.pkl", "rb") as f:
blocks = pkl.load(f)
assert len(blocks) == 10, "should have 10 ticket blocks in the file"
metrics = []
for block in blocks:
# metrics = example_metric_count_of_agent_comments(block)
metrics = compute_agent_response_times(block)
metrics.extend(metrics)
return metrics
def compute_agent_response_times(ticket_bock: dict) -> list[metricRow]:
"""
This function takes a "ticket block" which is a dict of a "ticket" and "comments".
This is a simplified version of data you will work with at maestro, this represents
1 customer service interaction - data our ETL system collects from our customers
tools.
This function need to implement the logic of a common type of 'metric' our customers use -
"agent response times".
This is basically just the delta in time from a customer's message til the agent's response.
However there are a few subleties:
- The clockdoesnt start till the agent joins. The first agent response time is the time
from the agent joining the chat, til thier next message.
- The customer or agent can send multiple chats right after each other, without
the other person messaging. In this case we only care about the time of the
first message.
"""
raise Exception("not implemented")
def example_metric_count_of_agent_comments(ticket_block: dict) -> list[metricRow]:
"""
example metric - just a count of each agents comment on the ticket block.
"""
return [
metricRow(
ticket_id=ticket_block["ticket"]["id"],
agent_id=comment["author_id"],
metric_time=comment["time"],
metric_val=1.0,
)
for comment in ticket_block["comments"]
if comment["author_type"] == "agent"
]
if __name__ == "__main__":
ms = compute_metrics()
for m in ms:
print(m)