-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcratedb.py
242 lines (198 loc) · 8.51 KB
/
cratedb.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
"""
Description: This file contains the class CrateDBConnection to connect
to a CrateDB and query data from it.
Author: Martin Altenburger
"""
import pandas as pd
import crate.client
class CrateDBConnection():
    """
    Class for a connection to a CrateDB

    A new connection is opened for every query and closed again afterwards,
    also when the query fails.

    Args:
        - crate_db_url: URL of the CrateDB
        - crate_db_user: Name of the User of CrateDB
        - crate_db_pw: Password of the User of CrateDB
        - crate_db_ssl: Verify the SSL-Cert?
    """

    def __init__(
        self,
        crate_db_url:str,
        crate_db_user:str = None,
        crate_db_pw:str = None,
        crate_db_ssl:bool = False
        ) -> None:
        self.crate_db_url = crate_db_url
        self.crate_db_user = crate_db_user
        self.crate_db_pw = crate_db_pw
        self.crate_db_ssl = crate_db_ssl

    def _connect(self):
        """
        Open a new connection to the CrateDB.

        Credentials and the SSL verification flag are only handed to the
        client when a user name is configured.
        """
        if self.crate_db_user is None:
            return crate.client.connect(self.crate_db_url)

        return crate.client.connect(self.crate_db_url,
                                    username=self.crate_db_user,
                                    password=self.crate_db_pw,
                                    verify_ssl_cert=self.crate_db_ssl)

    def _run_with_cursor(self, action):
        """
        Call action(cursor) on a fresh connection and return its result.

        Cursor and connection are closed even if the action raises.
        """
        connection = self._connect()
        try:
            cursor = connection.cursor()
            try:
                return action(cursor)
            finally:
                cursor.close()
        finally:
            connection.close()

    @staticmethod
    def _quote_identifier(name) -> str:
        """
        Wrap a name in double quotes, doubling embedded double quotes.
        """
        return '"' + str(name).replace('"', '""') + '"'

    @staticmethod
    def _existing_columns(cursor,
                          service:str,
                          entity_type:str,
                          attributes:list
                          ) -> list:
        """
        Return "time_index" and the requested attributes that exist as
        columns of the table, in the order they were requested.
        """
        # dict.fromkeys removes duplicates while keeping the request order
        requested = list(dict.fromkeys(
            ["time_index", *(str(attribute) for attribute in attributes)]
        ))
        placeholders = ", ".join("?" for _ in requested)

        # The data queries reference the table as unquoted mt<service>.et<type>,
        # which CrateDB folds to lower case, so the lookup uses the lower-cased
        # names. The schema filter keeps same-named tables of other services
        # from reporting columns that the queried table does not have.
        sql_query = (
            "SELECT column_name FROM information_schema.columns "
            "WHERE table_schema = ? AND table_name = ? "
            f"AND column_name IN ({placeholders})"
        )
        cursor.execute(
            sql_query,
            (f"mt{service}".lower(), f"et{entity_type}".lower(), *requested)
        )
        found = {row[0] for row in cursor.fetchall()}

        return [name for name in requested if name in found]

    @staticmethod
    def _raw_query(service:str,
                   entity_type:str,
                   columns:list,
                   entity:str,
                   from_date:str,
                   to_date:str,
                   limit:int
                   ) -> tuple:
        """
        Build the statement and the bound parameters for get_data.

        Values are bound as parameters. Schema and table name cannot be bound
        and are inserted into the statement text as they are.
        """
        select_list = ", ".join(
            CrateDBConnection._quote_identifier(column) for column in columns
        )
        sql_query = (
            f"SELECT {select_list} FROM mt{service}.et{entity_type} "
            "WHERE entity_id = ? AND time_index > ? AND time_index < ? "
            f"LIMIT {int(limit)}"
        )
        # str() keeps the textual form the values had when they were written
        # into the statement as quoted literals
        return sql_query, (str(entity), str(from_date), str(to_date))

    @staticmethod
    def _grouped_query(service:str,
                       entity_type:str,
                       value_columns:list,
                       entity:str,
                       from_date:str,
                       to_date:str,
                       limit:int
                       ) -> tuple:
        """
        Build the statement and the bound parameters for get_data_grouped.

        value_columns must not be empty and must not contain "time_index".
        Schema and table name are inserted into the statement text as they are.
        """
        quoted = [
            CrateDBConnection._quote_identifier(column) for column in value_columns
        ]
        averages = ", ".join(f"AVG({column}) AS {column}" for column in quoted)
        # keep only rows in which at least one requested attribute has a value
        not_null = " OR ".join(f"{column} IS NOT NULL" for column in quoted)

        sql_query = (
            f"SELECT date_trunc('second', time_index) AS time_index, {averages} "
            f"FROM mt{service}.et{entity_type} "
            "WHERE entity_id = ? AND time_index > ? AND time_index < ? "
            f"AND ({not_null}) "
            f"GROUP BY time_index ORDER BY time_index LIMIT {int(limit)}"
        )
        return sql_query, (str(entity), str(from_date), str(to_date))

    @staticmethod
    def _frame_from_rows(rows:list, column_names:list, **mean_kwargs) -> pd.DataFrame:
        """
        Turn fetched rows into a dataframe indexed by a UTC "datetime" index.

        The "time_index" values are read as milliseconds. Rows that share a
        timestamp are merged by their mean; mean_kwargs are passed on to that
        mean(). An empty dataframe is returned when there are no rows.
        """
        if not rows:
            return pd.DataFrame()

        df = pd.DataFrame(rows, columns=column_names)
        df["time_index"] = pd.to_datetime(df["time_index"], unit='ms').dt.tz_localize('UTC')
        df = df.rename(columns={"time_index": "datetime"})
        df = df.set_index(keys="datetime", drop=True)

        return df.groupby(by='datetime').mean(**mean_kwargs)

    def get_data(self,
                 service:str,
                 entity:str,
                 entity_type:str,
                 attributes:list,
                 from_date:str,
                 to_date:str,
                 limit:int = 100000
                 ) -> pd.DataFrame:
        """
        Function to query data from cratedb

        Only attributes that exist as columns of the table are queried. If none
        of the requested columns (including time_index) exists, or the query
        returns no rows, an empty dataframe is returned.

        Args:
            - service: Name of the Fiware Service
            - entity: ID of the entity
            - entity_type: type of the entity
            - attributes: list of attribute names
            - from_date: timestamp from which data is to be retrieved
                (Milliseconds or Datetime (%Y-%m-%dT%H:%M:%S%z))
            - to_date: timestamp up to which data is to be retrieved
                (Milliseconds or Datetime (%Y-%m-%dT%H:%M:%S%z))
            - limit: maximal number of datapoints
        Return:
            - dataframe with time index in utc and attributes as columns

        Note: service and entity_type become part of the table name in the
        statement text and must come from a trusted source.
        """
        def query(cursor):
            # check which column exists
            columns = self._existing_columns(cursor, service, entity_type, attributes)
            if not columns:
                return pd.DataFrame()

            # query existing columns
            sql_query, parameters = self._raw_query(
                service, entity_type, columns, entity, from_date, to_date, limit
            )
            cursor.execute(sql_query, parameters)
            rows = cursor.fetchall()
            names = [desc[0] for desc in cursor.description] if rows else []

            return self._frame_from_rows(rows, names)

        return self._run_with_cursor(query)

    def get_data_grouped(self,
                         service:str,
                         entity:str,
                         entity_type:str,
                         attributes:list,
                         from_date:str,
                         to_date:str,
                         limit:int = 1000000
                         ) -> pd.DataFrame:
        """
        Function to query data from cratedb in fiware-style with grouped data,
        query is faster and the response includes no null values

        The values are averaged per second in the database. If none of the
        requested attributes exists as a column, or the query returns no rows,
        an empty dataframe is returned.

        Args:
            - service: Name of the Fiware Service
            - entity: ID of the entity
            - entity_type: type of the entity
            - attributes: list of attribute names
            - from_date: timestamp from which data is to be retrieved
                (Milliseconds or Datetime (%Y-%m-%dT%H:%M:%S%z))
            - to_date: timestamp up to which data is to be retrieved
                (Milliseconds or Datetime (%Y-%m-%dT%H:%M:%S%z))
            - limit: maximal number of datapoints
        Return:
            - dataframe with time index in utc and attributes as columns

        Note: service and entity_type become part of the table name in the
        statement text and must come from a trusted source.
        """
        def query(cursor):
            # check which column exists
            columns = self._existing_columns(cursor, service, entity_type, attributes)
            value_columns = [column for column in columns if column != "time_index"]
            if not value_columns:
                # without any attribute column the statement would be invalid
                return pd.DataFrame()

            # query existing columns
            sql_query, parameters = self._grouped_query(
                service, entity_type, value_columns, entity, from_date, to_date, limit
            )
            cursor.execute(sql_query, parameters)
            rows = cursor.fetchall()
            names = [desc[0] for desc in cursor.description] if rows else []

            return self._frame_from_rows(rows, names, numeric_only=False)

        return self._run_with_cursor(query)

    def get_columns(self,
                    service:str,
                    entity_type:str,
                    ) -> pd.DataFrame:
        """
        Function to query columns from special table

        Args:
            - service: Name of the Fiware Service (like table_schema)
            - entity_type: type of the entity (like table_name)
        Return:
            - dataframe with column names and column datatypes
        """
        def query(cursor):
            # get columns
            table = self._quote_identifier(f"et{entity_type}")
            schema = self._quote_identifier(f"mt{service}")
            cursor.execute(f"SHOW COLUMNS FROM {table} IN {schema} ")
            rows = cursor.fetchall()
            names = [desc[0] for desc in (cursor.description or [])]

            # passing the names to the constructor also works for zero rows
            return pd.DataFrame(rows, columns=names)

        return self._run_with_cursor(query)