-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy pathtest_run_job.py
117 lines (104 loc) · 4.02 KB
/
test_run_job.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
import json
import logging
import logging.handlers
import os
import httpx
from .crud import (
get_scheduler_job,
)
dir_path = os.path.dirname(os.path.realpath(__file__))
logname = os.path.join(dir_path, "test_run_job.log")
logger = logging.getLogger("scheduler testlog")
logger.setLevel(logging.DEBUG)
handler = logging.FileHandler(filename=logname, encoding="utf-8", mode="a")
dt_fmt = "%Y-%m-%d %H:%M:%S"
formatter = logging.Formatter(
"[{asctime}] [{levelname}] {name}: {message}", dt_fmt, style="{"
)
handler.setFormatter(formatter)
logger.addHandler(handler)
async def test_job(job_id: str) -> str:
"""
A clone of what is actually run when run_cron_job.py is executed
This is used to execute the API call and log the result.
"""
try:
# print(f'[test_run_job]: jobid: {job_id} adminkey: {adminkey}')
jobinfo = await get_scheduler_job(job_id)
assert jobinfo, "Job not found"
# print(f'[test_run_job]: get scheduler job created: {jobinfo}')
body_json: dict = {}
json_headers = {}
method_name = jobinfo.selectedverb
url = jobinfo.url
assert url, "No URL found"
body = jobinfo.body
headers = jobinfo.headers
assert headers, "No headers found"
if body is None:
body_json = {}
elif len(body) > 0:
body_json = json.loads(body)
# await process_json_body(body)
# print(f' Length of body data {len(body)}')
for h in headers:
key = h.key
value = h.value
json_headers.update({key: value})
# print(f'key: {key} value: {value}')
# print(f'body_json: {body_json}')
# print(f'headers_json: {json_headers}')
logger.info(
"[test_run_job]: url: %s headers: %s body: %s", url, json_headers, body_json
)
response = None
# GET response
if method_name == "GET":
async with httpx.AsyncClient() as client:
response = await client.get(url, headers=json_headers, params=body_json)
logger.info(
"[test_run_job]: response status from api call: %s",
response.status_code,
)
logger.info(
"[test_run_job]: response text from api call: %s", response.text
)
# POST response
elif method_name == "POST":
async with httpx.AsyncClient() as client:
response = await client.post(url, headers=json_headers, data=body_json)
logger.info(
"[test_run_job]: response status from api call: "
f"{response.status_code}"
)
logger.info(
f"[test_run_job]: response text from api call: {response.text}"
)
# PUT response
elif method_name == "PUT":
async with httpx.AsyncClient() as client:
response = await client.put(url, headers=json_headers, data=body_json)
logger.info(
"[test_run_job]: response status from api call: "
f"{response.status_code}"
)
logger.info(
f"[test_run_job]: response text from api call: {response.text}"
)
# DELETE response
elif method_name == "DELETE":
async with httpx.AsyncClient() as client:
response = await client.delete(url, headers=json_headers)
logger.info(
"[test_run_job]: response status from api call: "
f"{response.status_code}"
)
logger.info(
f"[test_run_job]: response text from api call: {response.text}"
)
# return "testjob 1234"
assert response, "No response from API call"
return response.text
except Exception as e:
logger.error("[test_job]:Exception thrown in [test_job]: %s", e)
return str(e)