forked from Dmaina5054/InfluxLarkbot

queryinflux.py
import os
import time

from dotenv import load_dotenv
from influxdb_client import InfluxDBClient
from redis import Redis

from cachealert import cachealert

load_dotenv()
token = os.getenv('token')
url = os.getenv('url')

# Shared Redis connection (db=1) used by the alert cache.
redisclient = Redis(db=1)
def queryInflux(bucket):
    """Query the last 5 minutes of ping data in `bucket` and cache
    hosts reporting 100% packet loss as building alerts."""
    print(bucket)
    bldgkeys = []  # collect alerts per call rather than in a module-level list
    with InfluxDBClient(url=url, token=token, org='AH', debug=False) as client:
        query_api = client.query_api()
        # Query using a stream so records are consumed as they arrive.
        records = query_api.query_stream(f'''
            from(bucket: "{bucket}")
              |> range(start: -5m, stop: now())
              |> filter(fn: (r) => r["_measurement"] == "ping")
              |> filter(fn: (r) => r["_field"] == "percent_packet_loss")
              |> filter(fn: (r) => r["_value"] >= 100)
              |> aggregateWindow(every: 5m, fn: mean, createEmpty: false)
              |> yield(name: "mean")''')
        # query_stream() returns a generator, which is always truthy,
        # so iterate directly instead of guarding with `if records:`.
        for record in records:
            time.sleep(1)  # throttle so downstream alerting is not flooded
            try:
                if record["name"]:
                    rec = f'{record["host"]} {record["name"]} Building'
                    bldgkeys.append(rec)
            except KeyError as e:
                # Records missing the "name" tag are skipped.
                print(e)
    # Cache the alerts so repeated outages are not re-sent (see sketch below).
    cachealert(bldgkeys)
    return None
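
The `cachealert` helper is imported from a sibling `cachealert` module that is not shown in this file. A minimal sketch of what it might look like, assuming the intent is to deduplicate alerts through the same Redis db=1 connection using keys with a TTL, and that `sendalert` from `larkconn` is a plain function that delivers the alert text to Lark (the module names come from this repo's imports; the key scheme and 15-minute TTL below are assumptions):

# cachealert.py -- hypothetical sketch; the real module is not shown here.
from redis import Redis

from larkconn import sendalert

redisclient = Redis(db=1)  # same Redis database the main script connects to
ALERT_TTL = 900  # assumed: suppress duplicate alerts for 15 minutes


def cachealert(bldgkeys):
    """Send each new building alert once, using Redis keys as a dedupe cache."""
    for key in bldgkeys:
        # set(..., nx=True) only succeeds if the key does not exist yet,
        # so an alert already in the cache is silently skipped.
        if redisclient.set(f'alert:{key}', 1, ex=ALERT_TTL, nx=True):
            sendalert(key)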
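
The original file also imported `Celery`, which suggests the query is meant to run on a schedule rather than once. A minimal sketch of wiring `queryInflux` into a Celery beat schedule, assuming a local Redis broker and a comma-separated bucket list in the environment (the app name, broker URL, task module, and `buckets` variable are all assumptions, not part of this repo):

# tasks.py -- hypothetical scheduling sketch, separate from queryinflux.py.
import os

from celery import Celery

from queryinflux import queryInflux

app = Celery('influxlarkbot', broker='redis://localhost:6379/0')

# assumed: comma-separated bucket names, e.g. buckets=siteA,siteB
buckets = os.getenv('buckets', '').split(',')


@app.task
def poll_buckets():
    for bucket in buckets:
        if bucket:
            queryInflux(bucket)


# Poll every five minutes to match the Flux range(start: -5m) window.
app.conf.beat_schedule = {
    'poll-influx-buckets': {
        'task': 'tasks.poll_buckets',
        'schedule': 300.0,
    },
}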