-
Notifications
You must be signed in to change notification settings - Fork 33
Expand file tree
/
Copy pathdatabase.py
More file actions
105 lines (74 loc) · 3.04 KB
/
database.py
File metadata and controls
105 lines (74 loc) · 3.04 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
import logging
from hashlib import shake_128
from typing import Optional, List, Dict, Union
import os
from flask import current_app
from sqlalchemy.dialects import postgresql
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.sql.elements import TextClause
from sqlalchemy.sql.selectable import Select
# debdeps: python3-clickhouse-driver
from clickhouse_driver import Client as Clickhouse
import clickhouse_driver.errors
# query_time = Summary("query", "query", ["hash", ], registry=metrics.registry)
Base = declarative_base()
log = logging.getLogger()
def _gen_application_name(): # pragma: no cover
try:
machine_id = "/etc/machine-id"
with open(machine_id) as fd:
mid = fd.read(8)
except FileNotFoundError:
mid = "macos"
pid = os.getpid()
return f"api-{mid}-{pid}"
def query_hash(q: str) -> str:
"""Short hash used to identify query statements.
Allows correlating query statements between API logs and metrics
"""
return shake_128(q.encode()).hexdigest(4)
# # Clickhouse
def init_clickhouse_db(app) -> None:
"""Initializes Clickhouse session"""
url = app.config["CLICKHOUSE_URL"]
app.logger.info("Connecting to Clickhouse")
app.click = Clickhouse.from_url(url)
Query = Union[str, TextClause, Select]
def _run_query(query: Query, query_params: dict, query_prio=3):
settings = {"priority": query_prio, "max_execution_time": 28}
if isinstance(query, (Select, TextClause)):
query = str(query.compile(dialect=postgresql.dialect()))
try:
q = current_app.click.execute(
query, query_params, with_column_types=True, settings=settings
)
except clickhouse_driver.errors.ServerException as e:
log.info(e.message)
raise Exception("Database query error")
rows, coldata = q
colnames, coltypes = tuple(zip(*coldata))
return colnames, rows
def query_click(query: Query, query_params: dict, query_prio=3) -> List[Dict]:
colnames, rows = _run_query(query, query_params, query_prio=query_prio)
return [dict(zip(colnames, row)) for row in rows]
def query_click_one_row(
query: Query, query_params: dict, query_prio=3
) -> Optional[dict]:
colnames, rows = _run_query(query, query_params, query_prio=query_prio)
for row in rows:
return dict(zip(colnames, row))
return None
def insert_click(query, rows: list) -> int:
assert isinstance(rows, list)
settings = {"priority": 1, "max_execution_time": 300} # query_prio
return current_app.click.execute(query, rows, types_check=True, settings=settings)
def optimize_table(tblname: str) -> None:
settings = {"priority": 1, "max_execution_time": 300} # query_prio
sql = f"OPTIMIZE TABLE {tblname} FINAL"
current_app.click.execute(sql, {}, settings=settings)
def raw_query(query: Query, query_params: dict, query_prio=1):
settings = {"priority": query_prio, "max_execution_time": 300}
q = current_app.click.execute(
query, query_params, with_column_types=True, settings=settings
)
return q