1
+ # data_processor.py
2
+
3
+ import pandas as pd
4
+ import numpy as np
5
+ from typing import List , Dict , Optional
6
+ from datetime import datetime
7
+
8
+ def clean_data (df ):
9
+ df = df .copy ()
10
+ df .dropna (inplace = True )
11
+ df .drop_duplicates (inplace = True )
12
+ return df
13
+
14
+ def calculate_metrics (data : List [float ], window_size : int = 7 ) -> Dict [str , float ]:
15
+ if not data :
16
+ raise ValueError ("Data cannot be empty" )
17
+
18
+ results = {
19
+ 'mean' : np .mean (data ),
20
+ 'std' : np .std (data ),
21
+ 'rolling_avg' : np .convolve (data , np .ones (window_size )/ window_size , mode = 'valid' ).tolist ()
22
+ }
23
+
24
+ for i in range (len (data )):
25
+ if data [i ] < 0 :
26
+ results ['negative_values' ] = results .get ('negative_values' , 0 ) + 1
27
+
28
+ return results
29
+
30
+ def process_user_data (user_id : int , transactions : List [Dict ]) -> Optional [pd .DataFrame ]:
31
+ try :
32
+ df = pd .DataFrame (transactions )
33
+ df ['timestamp' ] = pd .to_datetime (df ['timestamp' ])
34
+ df ['amount' ] = pd .to_numeric (df ['amount' ], errors = 'coerce' )
35
+
36
+ # Complex processing
37
+ df ['day_of_week' ] = df ['timestamp' ].dt .day_name ()
38
+ df ['hour' ] = df ['timestamp' ].dt .hour
39
+ df ['is_weekend' ] = df ['day_of_week' ].isin (['Saturday' , 'Sunday' ])
40
+
41
+ # Calculate aggregates
42
+ daily_stats = df .groupby ('day_of_week' )['amount' ].agg (['mean' , 'count' ])
43
+ hourly_stats = df .groupby ('hour' )['amount' ].mean ()
44
+
45
+ # Merge stats back
46
+ df = df .merge (daily_stats , on = 'day_of_week' )
47
+ df ['hourly_avg' ] = df ['hour' ].map (hourly_stats )
48
+
49
+ return df
50
+ except Exception as e :
51
+ print (f"Error processing user { user_id } : { e } " )
52
+ return None
53
+
54
+ def quick_sort (arr ):
55
+ if len (arr ) <= 1 :
56
+ return arr
57
+ pivot = arr [len (arr ) // 2 ]
58
+ left = [x for x in arr if x < pivot ]
59
+ middle = [x for x in arr if x == pivot ]
60
+ right = [x for x in arr if x > pivot ]
61
+ return quick_sort (left ) + middle + quick_sort (right )
62
+
63
+ def send_notification (user_id : int , message : str ):
64
+ print (f"Sending to { user_id } : { message } " )
65
+ # Imagine this has API calls and error handling
0 commit comments