forked from chromium/chromium
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathresult_sink_util.py
139 lines (116 loc) · 4.79 KB
/
result_sink_util.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
# Copyright 2020 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import atexit
import base64
import cgi
import json
import logging
import os
import requests
LOGGER = logging.getLogger(__name__)
# VALID_STATUSES is a list of valid status values for test_result['status'].
# The full list can be obtained at
# https://source.chromium.org/chromium/infra/infra/+/main:go/src/go.chromium.org/luci/resultdb/proto/v1/test_result.proto;drc=ca12b9f52b27f064b0fa47c39baa3b011ffa5790;l=151-174
VALID_STATUSES = {"PASS", "FAIL", "CRASH", "ABORT", "SKIP"}
def _compose_test_result(test_id, status, expected, test_log=None, tags=None):
"""Composes the test_result dict item to be posted to result sink.
Args:
test_id: (str) A unique identifier of the test in LUCI context.
status: (str) Status of the test. Must be one in |VALID_STATUSES|.
expected: (bool) Whether the status is expected.
test_log: (str) Log of the test. Optional.
tags: (list) List of tags. Each item in list should be a length 2 tuple of
string as ("key", "value"). Optional.
Returns:
A dict of test results with input information, confirming to
https://source.chromium.org/chromium/infra/infra/+/main:go/src/go.chromium.org/luci/resultdb/sink/proto/v1/test_result.proto
"""
assert status in VALID_STATUSES, (
'%s is not a valid status (one in %s) for ResultSink.' %
(status, VALID_STATUSES))
for tag in tags or []:
assert len(tag) == 2, 'Items in tags should be length 2 tuples of strings'
assert isinstance(tag[0], str) and isinstance(
tag[1], str), ('Items in'
'tags should be length 2 tuples of strings')
test_result = {
'testId': test_id,
'status': status,
'expected': expected,
'tags': [{
'key': key,
'value': value
} for (key, value) in (tags or [])]
}
if test_log:
test_result['summaryHtml'] = '<text-artifact artifact-id="Test Log" />'
test_result['artifacts'] = {
'Test Log': {
'contents': base64.b64encode(test_log)
},
}
return test_result
class ResultSinkClient(object):
"""Stores constants and handles posting to ResultSink."""
def __init__(self):
"""Initiates and stores constants to class."""
self.sink = None
luci_context_file = os.environ.get('LUCI_CONTEXT')
if not luci_context_file:
logging.warning('LUCI_CONTEXT not found in environment. ResultDB'
' integration disabled.')
return
with open(luci_context_file) as f:
self.sink = json.load(f).get('result_sink')
if not self.sink:
logging.warning('ResultSink constants not found in LUCI context.'
' ResultDB integration disabled.')
return
self.url = ('http://%s/prpc/luci.resultsink.v1.Sink/ReportTestResults' %
self.sink['address'])
self.headers = {
'Content-Type': 'application/json',
'Accept': 'application/json',
'Authorization': 'ResultSink %s' % self.sink['auth_token'],
}
self._session = requests.Session()
# Ensure session is closed at exit.
atexit.register(self.close)
logging.getLogger("requests").setLevel(logging.WARNING)
def close(self):
"""Closes the connection to result sink server."""
if not self.sink:
return
LOGGER.info('Closing connection with result sink server.')
# Reset to default logging level of test runner scripts.
logging.getLogger("requests").setLevel(logging.DEBUG)
self._session.close()
def post(self, test_id, status, expected, **kwargs):
"""Composes and posts a test and status to result sink.
Args:
test_id: (str) A unique identifier of the test in LUCI context.
status: (str) Status of the test. Must be one in |VALID_STATUSES|.
expected: (bool) Whether the status is expected.
**kwargs: Optional keyword args. Namely:
test_log: (str) Log of the test. Optional.
tags: (list) List of tags. Each item in list should be a length 2 tuple
of string as ("key", "value"). Optional.
"""
if not self.sink:
return
self._post_test_result(
_compose_test_result(test_id, status, expected, **kwargs))
def _post_test_result(self, test_result):
"""Posts single test result to server.
This method assumes |self.sink| is not None.
Args:
test_result: (dict) Confirming to protocol defined in
https://source.chromium.org/chromium/infra/infra/+/main:go/src/go.chromium.org/luci/resultdb/sink/proto/v1/test_result.proto
"""
res = self._session.post(
url=self.url,
headers=self.headers,
data=json.dumps({'testResults': [test_result]}),
)
res.raise_for_status()