# File: digital_shadows_connector.py
#
# Licensed under Apache 2.0 (https://www.apache.org/licenses/LICENSE-2.0.txt)
#
import json
import phantom.app as phantom
from phantom.base_connector import BaseConnector
from digital_shadows_consts import DS_ACTION_NOT_SUPPORTED
from ds_databreach_connector import DSDataBreachConnector
from ds_incidents_connector import DSIncidentsConnector
from ds_intelligence_incidents_connector import DSIntelligenceIncidentsConnector
from ds_on_poll_connector import DSOnPollConnector
from ds_search_entities_connector import DSSearchEntitiesConnector
from ds_test_connectivity_connector import DSTestConnectivityConnector
class DigitalShadowsConnector(BaseConnector):
    """Top-level connector that routes each action to its helper connector.

    The helper connector classes are imported at the top of this file.
    Each one is constructed with this connector instance and implements
    one group of actions.
    """

    def __init__(self):
        super(DigitalShadowsConnector, self).__init__()

    def handle_action(self, param):
        """Dispatch the current action to the helper connector that implements it.

        Args:
            param: Action parameters. They are handed unchanged to the
                helper method; ``test_connectivity`` receives no parameters.

        Returns:
            Whatever the selected helper method returns. For an action
            identifier that is not in the routing table, the result of
            ``set_status(phantom.APP_ERROR, ...)``.
        """
        action_id = self.get_action_identifier()

        if param:
            # NOTE(review): the parameters are written to the progress output
            # verbatim. Presumably some actions carry user-identifying values
            # (e.g. get_data_breach_record_by_username); confirm whether
            # these should be redacted before being logged.
            self.save_progress("Ingesting handle action in: {}".format(param))

        # The connectivity test is the only action called without parameters.
        if action_id == 'test_connectivity':
            return DSTestConnectivityConnector(self).test_connectivity()

        # Allowlist of supported actions. Every helper method has the same
        # name as its action identifier, so the table only needs the class.
        handlers = {
            'get_incident_by_id': DSIncidentsConnector,
            'get_incident_review_by_id': DSIncidentsConnector,
            'get_incident_list': DSIncidentsConnector,
            'post_incident_review': DSIncidentsConnector,
            'get_intelligence_incident_by_id': DSIntelligenceIncidentsConnector,
            'get_intel_incident_ioc_by_id': DSIntelligenceIncidentsConnector,
            'get_intelligence_incident': DSIntelligenceIncidentsConnector,
            'get_data_breach': DSDataBreachConnector,
            'get_data_breach_by_id': DSDataBreachConnector,
            'get_data_breach_record': DSDataBreachConnector,
            'get_data_breach_record_by_id': DSDataBreachConnector,
            'get_data_breach_record_by_username': DSDataBreachConnector,
            'get_data_breach_record_reviews': DSDataBreachConnector,
            'post_breach_record_review': DSDataBreachConnector,
            'search_entities': DSSearchEntitiesConnector,
            'on_poll': DSOnPollConnector,
        }

        handler_cls = handlers.get(action_id)
        if handler_cls is None:
            # Unknown action: report it in the progress output and fail.
            unsupported = DS_ACTION_NOT_SUPPORTED.format(action_id)
            self.save_progress(unsupported)
            return self.set_status(phantom.APP_ERROR, unsupported)

        # getattr is only reached for identifiers present in the table above,
        # so an arbitrary action name can never select an arbitrary attribute.
        return getattr(handler_cls(self), action_id)(param)
if __name__ == '__main__':
    # Manual test harness: run one action described by a JSON file whose path
    # is given as the first command-line argument.
    import sys

    if len(sys.argv) < 2:
        print("No test json specified as input")
        exit(0)

    with open(sys.argv[1]) as json_file:
        test_input = json.load(json_file)

    # NOTE(review): the whole input document is echoed to stdout. If the test
    # JSON holds asset credentials they will appear in the terminal and in any
    # captured output; confirm before sharing logs.
    print(json.dumps(test_input, indent=4))

    connector = DigitalShadowsConnector()
    connector.print_progress_message = True

    # _handle_action takes the input as a JSON string. Its return value is
    # parsed as JSON below so that it can be pretty-printed.
    raw_result = connector._handle_action(json.dumps(test_input), None)
    print(json.dumps(json.loads(raw_result), indent=4))

    exit(0)