-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdatabus.py
105 lines (90 loc) · 3.81 KB
/
databus.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
import logging
import json
import inspect
# this is needed because we use it in the xml.
import random
import gevent
import gevent.event
from lxml import etree
logger = logging.getLogger(__name__)
class Databus(object):
def __init__(self):
self._data = {}
self._observer_map = {}
self.initialized = gevent.event.Event()
# the idea here is that we can store both values and functions in the key value store
# functions could be used if a profile wants to simulate a sensor, or the function
# could interface with a real sensor
def get_value(self, key):
logger.debug('DataBus: Get value from key: [%s]', key)
assert key in self._data
item = self._data[key]
if getattr(item, "get_value", None):
# this could potentially generate a context switch, but as long the called method
# does not "callback" the databus we should be fine
logger.debug('(K, V): (%s, %s)' % (key, item.get_value()))
return item.get_value()
elif hasattr(item, '__call__'):
return item()
else:
# guaranteed to not generate context switch
logger.debug('(K, V): (%s, %s)' % (key, item))
return item
def set_value(self, key, value):
logger.debug('DataBus: Storing key: [%s] value: [%s]', key, value)
self._data[key] = value
# notify observers
if key in self._observer_map:
gevent.spawn(self.notify_observers, key)
def notify_observers(self, key):
for cb in self._observer_map[key]:
cb(key)
def observe_value(self, key, callback):
assert hasattr(callback, '__call__')
assert len(inspect.getargspec(callback)[0]) # depreciated in py3.5, un-depreciated in py3.6
if key not in self._observer_map:
self._observer_map[key] = []
self._observer_map[key].append(callback)
def initialize(self, config_file):
self.reset()
assert self.initialized.isSet() is False
logger.debug('Initializing databus using %s.', config_file)
dom = etree.parse(config_file)
entries = dom.xpath('//core/databus/key_value_mappings/*')
for entry in entries:
key = entry.attrib['name']
value = entry.xpath('./value/text()')[0].strip()
value_type = str(entry.xpath('./value/@type')[0])
assert key not in self._data
logging.debug('Initializing %s with %s as a %s.', key, value, value_type)
if value_type == 'value':
self.set_value(key, eval(value))
elif value_type == 'function':
namespace, _classname = value.rsplit('.', 1)
params = entry.xpath('./value/@param')
module = __import__(namespace, fromlist=[_classname])
_class = getattr(module, _classname)
if len(params) > 0:
# eval param to list
params = eval(params[0])
self.set_value(key, _class(*(tuple(params))))
else:
self.set_value(key, _class())
else:
raise Exception('Unknown value type: {0}'.format(value_type))
self.initialized.set()
def get_shapshot(self):
# takes a snapshot of the internal honeypot state and returns it as json.
snapsnot = {}
for key in list(self._data.keys()):
snapsnot[key] = self.get_value(key)
return json.dumps(snapsnot)
def reset(self):
logger.debug('Resetting databus.')
# if the class has a stop method call it.
for value in list(self._data.values()):
if getattr(value, "stop", None):
value.stop()
self._data.clear()
self._observer_map.clear()
self.initialized.clear()