Skip to content

Commit b83f298

Browse files
committed
Improves support for Elasticsearch storage.
Requires Elasticsearch 7. Provides an index template to manage cloudkitty document mappings with the option to include user provided component templates. Uses dated indices and an index alias to allow for better management of data. Change-Id: Ie7301ebccc3a72876abcb5c4cc8c07ceaaa29e29
1 parent c540ac8 commit b83f298

File tree

8 files changed

+357
-37
lines changed

8 files changed

+357
-37
lines changed

cloudkitty/storage/v2/elasticsearch/__init__.py

+52-21
Original file line numberDiff line numberDiff line change
@@ -37,21 +37,35 @@
3737
default='http://localhost:9200'),
3838
cfg.StrOpt(
3939
'index_name',
40-
help='Elasticsearch index to use. Defaults to "cloudkitty".',
40+
help='Elasticsearch index to use. '
41+
'Defaults to "cloudkitty".',
4142
default='cloudkitty'),
42-
cfg.BoolOpt('insecure',
43-
help='Set to true to allow insecure HTTPS '
44-
'connections to Elasticsearch',
45-
default=False),
46-
cfg.StrOpt('cafile',
47-
help='Path of the CA certificate to trust for '
48-
'HTTPS connections.',
49-
default=None),
50-
cfg.IntOpt('scroll_duration',
51-
help="Duration (in seconds) for which the ES scroll contexts "
52-
"should be kept alive.",
53-
advanced=True,
54-
default=30, min=0, max=300),
43+
cfg.StrOpt(
44+
'template_name',
45+
help='Elasticsearch template name to use. '
46+
'Defaults to "cloudkitty_mapping".',
47+
default='cloudkitty_mapping'),
48+
cfg.ListOpt(
49+
'component_templates',
50+
help='List of Elasticsearch component template '
51+
'names to include in the index template. ',
52+
default=[]),
53+
cfg.BoolOpt(
54+
'insecure',
55+
help='Set to true to allow insecure HTTPS '
56+
'connections to Elasticsearch',
57+
default=False),
58+
cfg.StrOpt(
59+
'cafile',
60+
help='Path of the CA certificate to trust for '
61+
'HTTPS connections.',
62+
default=None),
63+
cfg.IntOpt(
64+
'scroll_duration',
65+
help="Duration (in seconds) for which the ES scroll contexts "
66+
"should be kept alive.",
67+
advanced=True,
68+
default=30, min=0, max=300)
5569
]
5670

5771
CONF.register_opts(elasticsearch_storage_opts, ELASTICSEARCH_STORAGE_GROUP)
@@ -100,14 +114,31 @@ def __init__(self, *args, **kwargs):
100114
verify=verify)
101115

102116
def init(self):
103-
r = self._conn.get_index()
104-
if r.status_code != 200:
105-
raise exceptions.IndexDoesNotExist(
106-
CONF.storage_elasticsearch.index_name)
107-
LOG.info('Creating mapping "_doc" on index {}...'.format(
117+
LOG.info('Creating index template for mapping.')
118+
index_pattern = "{}-*".format(CONF.storage_elasticsearch.index_name)
119+
component_templates = CONF.storage_elasticsearch.component_templates
120+
index_template = self._conn.build_index_template(
121+
index_pattern, component_templates, CLOUDKITTY_INDEX_MAPPING)
122+
self._conn.put_index_template(
123+
CONF.storage_elasticsearch.template_name, index_template)
124+
LOG.info('Index template for mapping created.')
125+
126+
# If index_name exists, test to ensure it is an alias
127+
if self._conn.exists_index():
128+
if not self._conn.is_index_alias():
129+
raise exceptions.IndexAliasAlreadyExists(
130+
CONF.storage_elasticsearch.index_name)
131+
LOG.info('Index alias already exists. Skipping creation.')
132+
133+
# Otherwise create a dated index with index_name as an alias
134+
else:
135+
LOG.info('Creating first index.')
136+
self._conn.put_first_index()
137+
138+
# Rollover index on startup
139+
LOG.info('Rolling over index {}'.format(
108140
CONF.storage_elasticsearch.index_name))
109-
self._conn.put_mapping(CLOUDKITTY_INDEX_MAPPING)
110-
LOG.info('Mapping created.')
141+
self._conn.post_index_rollover()
111142

112143
def push(self, dataframes, scope_id=None):
113144
for frame in dataframes:

cloudkitty/storage/v2/elasticsearch/client.py

+116-5
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,8 @@ def _build_should(filters):
9292
{'term': {'metadata.' + k: v}}]
9393
return should
9494

95-
def _build_composite(self, groupby):
95+
@staticmethod
96+
def _build_composite(groupby):
9697
if not groupby:
9798
return []
9899
sources = []
@@ -147,11 +148,121 @@ def _req(self, method, url, data, params, deserialize=True):
147148
self._log_query(url, data, output)
148149
return output
149150

151+
def _req_unsafe(self, method, url, data, params):
152+
"""Request without exception on invalid HTTP codes."""
153+
return method(url, data=data, params=params)
154+
155+
def _req_exists(self, url, data, params):
156+
r = self._sess.head(url, data=data, params=params)
157+
if r.status_code == 200:
158+
return True
159+
elif r.status_code == 404:
160+
return False
161+
else:
162+
raise exceptions.InvalidStatusCode(
163+
"200/404", r.status_code, r.text, None)
164+
165+
@staticmethod
166+
def build_index_template(index_pattern, component_templates, mapping):
167+
"""Build an index template for mapping."""
168+
# High priority template to avoid being overridden.
169+
template_priority = 500
170+
return {
171+
"index_patterns": [index_pattern],
172+
"priority": template_priority,
173+
"composed_of": component_templates,
174+
"template": {
175+
"mappings": mapping
176+
}
177+
}
178+
179+
def put_index_template(self, template_name, template):
180+
"""Does a PUT request against the ES template API.
181+
182+
Does a PUT request against `/_template/<template_name>`
183+
if es6 or `/_index_template/<template_name>` if es7.
184+
185+
:param template_name: index template name
186+
:type template_name: string
187+
:param template: index template
188+
:type template: dict
189+
:rtype: requests.models.Response
190+
"""
191+
url = '/'.join(
192+
(self._url, '_index_template', template_name))
193+
data = json.dumps(template)
194+
LOG.debug('Creating index template {} with data:\n{}'.format(
195+
template_name, data))
196+
return self._req(
197+
self._sess.put, url, data, None, deserialize=False)
198+
199+
def put_first_index(self):
200+
"""Does a PUT request against the ES index API.
201+
202+
Does a PUT request against `/<index_name>-{now/d}-000001`.
203+
204+
Creates a dated index with an alias for which it is the write index.
205+
206+
:rtype: requests.models.Response
207+
"""
208+
# Percent encode the / (%2F) in the date math for the index name
209+
index_string = "<{}{}>".format(self._index_name, "-{now%2Fd}-000001")
210+
url = '/'.join((self._url, index_string))
211+
aliases = {
212+
"aliases": {
213+
self._index_name: {
214+
"is_write_index": True
215+
}
216+
}
217+
}
218+
LOG.debug('Creating index {} with data:\n{}'.format(
219+
index_string, json.dumps(aliases)))
220+
return self._req(
221+
self._sess.put, url, json.dumps(aliases), None, deserialize=False)
222+
223+
def post_index_rollover(self):
224+
"""Does a POST request against the ES index API.
225+
226+
Does a POST request against `/<index_name>/_rollover`.
227+
228+
Performs a rollover of the index alias.
229+
230+
:rtype: requests.models.Response
231+
"""
232+
url = '/'.join((self._url, self._index_name, '_rollover'))
233+
self._req(self._sess.post, url, None, None, deserialize=False)
234+
235+
def exists_index(self):
236+
"""Does a HEAD request against the ES index API.
237+
238+
Does a HEAD request against `/<index_name>`.
239+
240+
Tests if an index or index alias exists.
241+
242+
:rtype: Boolean
243+
"""
244+
url = '/'.join((self._url, self._index_name))
245+
param = {"allow_no_indices": "false"}
246+
return self._req_exists(url, None, param)
247+
248+
def is_index_alias(self):
249+
"""Does a HEAD request against the ES alias API.
250+
251+
Does a HEAD request against `/_alias/<index_name>`.
252+
253+
Tests if an index alias exists.
254+
255+
:rtype: Boolean
256+
"""
257+
url = '/'.join((self._url, '_alias', self._index_name))
258+
return self._req_exists(url, None, None)
259+
150260
def put_mapping(self, mapping):
151261
"""Does a PUT request against ES's mapping API.
152262
153-
The PUT request will be done against
154-
`/<index_name>/_mapping/<mapping_name>`
263+
Does a PUT request against `/<index_name>/_mapping/<mapping_name>`
264+
265+
Creates or updates an index mapping.
155266
156267
:mapping: body of the request
157268
:type mapping: dict
@@ -168,7 +279,7 @@ def put_mapping(self, mapping):
168279
def get_index(self):
169280
"""Does a GET request against ES's index API.
170281
171-
The GET request will be done against `/<index_name>`
282+
Does a GET request against `/<index_name>`
172283
173284
:rtype: requests.models.Response
174285
"""
@@ -360,7 +471,7 @@ def total(self, begin, end, metric_types, filters, groupby,
360471

361472
must = self._build_must(begin, end, metric_types, filters)
362473
should = self._build_should(filters)
363-
composite = self._build_composite(groupby) if groupby else None
474+
composite = self._build_composite(groupby)
364475
if composite:
365476
composite['size'] = self._chunk_size
366477
query = self._build_query(must, should, composite)

cloudkitty/storage/v2/elasticsearch/exceptions.py

+8
Original file line numberDiff line numberDiff line change
@@ -30,3 +30,11 @@ def __init__(self, index_name):
3030
super(IndexDoesNotExist, self).__init__(
3131
"Elasticsearch index {} does not exist".format(index_name)
3232
)
33+
34+
35+
class IndexAliasAlreadyExists(BaseElasticsearchException):
36+
def __init__(self, index_name):
37+
super(IndexAliasAlreadyExists, self).__init__(
38+
"Elasticsearch index alias {} already exists as an index".format(
39+
index_name)
40+
)

cloudkitty/tests/storage/v2/elasticsearch/test_client.py

+132
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
from dateutil import tz
2121

2222
from cloudkitty import dataframe
23+
import cloudkitty.storage.v2.elasticsearch
2324
from cloudkitty.storage.v2.elasticsearch import client
2425
from cloudkitty.storage.v2.elasticsearch import exceptions
2526

@@ -181,6 +182,137 @@ def test_req_invalid_status_code(self):
181182
self.client._req,
182183
method_mock, None, None, None)
183184

185+
def test_req_unsafe(self):
186+
url = '/endpoint'
187+
data = {'1': 'one'}
188+
params = {'v'}
189+
resp_mock = mock.MagicMock()
190+
resp_mock.status_code = 400
191+
method_mock = mock.MagicMock()
192+
method_mock.return_value = resp_mock
193+
req_resp = self.client._req_unsafe(
194+
method_mock, url, data, params)
195+
method_mock.assert_called_once_with(
196+
url, data=data, params=params)
197+
self.assertEqual(req_resp, resp_mock)
198+
199+
def test_req_exists(self):
200+
url = '/endpoint'
201+
data = {'1': 'one'}
202+
params = {'v'}
203+
resp_mock = mock.MagicMock()
204+
resp_mock.status_code = 200
205+
with mock.patch.object(self.client._sess, 'head') as hmock:
206+
hmock.return_value = resp_mock
207+
self.client._req_exists(
208+
url, data=data, params=params)
209+
hmock.assert_called_once_with(
210+
url, data=data, params=params)
211+
212+
def test_req_exists_true(self):
213+
url = '/endpoint'
214+
resp_mock = mock.MagicMock()
215+
resp_mock.status_code = 200
216+
with mock.patch.object(self.client._sess, 'head') as hmock:
217+
hmock.return_value = resp_mock
218+
self.assertTrue(self.client._req_exists(
219+
url, data=None, params=None))
220+
221+
def test_req_exists_false(self):
222+
url = '/endpoint'
223+
resp_mock = mock.MagicMock()
224+
resp_mock.status_code = 404
225+
with mock.patch.object(self.client._sess, 'head') as hmock:
226+
hmock.return_value = resp_mock
227+
self.assertFalse(self.client._req_exists(
228+
url, data=None, params=None))
229+
230+
def test_req_exists_exception(self):
231+
url = '/endpoint'
232+
resp_mock = mock.MagicMock()
233+
resp_mock.status_code = 418 # I'm a teapot
234+
with mock.patch.object(self.client._sess, 'head') as hmock:
235+
hmock.return_value = resp_mock
236+
self.assertRaises(exceptions.InvalidStatusCode,
237+
self.client._req_exists,
238+
url, data=None, params=None)
239+
240+
def test_build_index_template(self):
241+
index_pattern = "cloudkitty-*"
242+
mapping = cloudkitty.storage.v2.elasticsearch.CLOUDKITTY_INDEX_MAPPING
243+
component_templates = ["cloudkitty_settings"]
244+
expected = {
245+
"index_patterns": ["cloudkitty-*"],
246+
"priority": 500,
247+
"composed_of": component_templates,
248+
"template": {
249+
"mappings": mapping
250+
}
251+
}
252+
self.assertEqual(
253+
self.client.build_index_template(
254+
index_pattern, component_templates, mapping), expected)
255+
256+
def test_put_index_template(self):
257+
template_name = 'test_template'
258+
template = {
259+
"index_patterns": ["index_name-*"],
260+
"priority": 500,
261+
"template": {
262+
"mappings": "fake_mapping"
263+
}
264+
}
265+
expected_data = \
266+
('{"index_patterns": ["index_name-*"], "priority": 500, '
267+
'"template": {"mappings": "fake_mapping"}}')
268+
with mock.patch.object(self.client, '_req') as rmock:
269+
self.client.put_index_template(
270+
template_name, template)
271+
rmock.assert_called_once_with(
272+
self.client._sess.put,
273+
'http://elasticsearch:9200/_index_template/test_template',
274+
expected_data, None, deserialize=False)
275+
276+
def test_put_first_index(self):
277+
expected_data = '{"aliases": {"index_name": {"is_write_index": true}}}'
278+
with mock.patch.object(self.client, '_req') as rmock:
279+
self.client.put_first_index()
280+
rmock.assert_called_once_with(
281+
self.client._sess.put,
282+
'http://elasticsearch:9200/<index_name-{now%2Fd}-000001>',
283+
expected_data, None, deserialize=False)
284+
285+
def test_post_index_rollover(self):
286+
with mock.patch.object(self.client, '_req') as rmock:
287+
self.client.post_index_rollover()
288+
rmock.assert_called_once_with(
289+
self.client._sess.post,
290+
'http://elasticsearch:9200/index_name/_rollover',
291+
None, None, deserialize=False)
292+
293+
def test_exists_index(self):
294+
expected_param = {"allow_no_indices": "false"}
295+
resp_mock = mock.MagicMock()
296+
resp_mock.status_code = 200
297+
with mock.patch.object(self.client._sess, 'head') as hmock:
298+
hmock.return_value = resp_mock
299+
r = self.client.exists_index()
300+
hmock.assert_called_once_with(
301+
'http://elasticsearch:9200/index_name',
302+
data=None, params=expected_param)
303+
self.assertTrue(r)
304+
305+
def test_is_index_alias(self):
306+
resp_mock = mock.MagicMock()
307+
resp_mock.status_code = 200
308+
with mock.patch.object(self.client._sess, 'head') as hmock:
309+
hmock.return_value = resp_mock
310+
r = self.client.is_index_alias()
311+
hmock.assert_called_once_with(
312+
'http://elasticsearch:9200/_alias/index_name',
313+
data=None, params=None)
314+
self.assertTrue(r)
315+
184316
def test_put_mapping(self):
185317
mapping = {'a': 'b'}
186318
with mock.patch.object(self.client, '_req') as rmock:

0 commit comments

Comments
 (0)