Skip to content

Commit

Permalink
Merge pull request #46 from CybercentreCanada/feature/full_safelist
Browse files Browse the repository at this point in the history
Get the full safelist related to a specific tags
  • Loading branch information
cccs-sgaron authored Jul 24, 2021
2 parents b56c8e6 + a26dae5 commit 2d86798
Show file tree
Hide file tree
Showing 2 changed files with 107 additions and 2 deletions.
68 changes: 66 additions & 2 deletions assemblyline_service_server/api/v1/safelist.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
import yaml

from assemblyline_service_server.api.base import make_subapi_blueprint, make_api_response, api_login
from assemblyline_service_server.config import STORAGE
from flask import request

from assemblyline.common import forge
from assemblyline_service_server.api.base import api_login, make_api_response, make_subapi_blueprint
from assemblyline_service_server.config import STORAGE, config

SUB_API = 'safelist'
safelist_api = make_subapi_blueprint(SUB_API, api_version=1)
Expand Down Expand Up @@ -33,3 +37,63 @@ def exists(qhash, **_):
return make_api_response(safelist)

return make_api_response(None, "The hash was not found in the safelist.", 404)


@safelist_api.route("/", methods=["GET"])
@api_login()
def get_safelist_for_tags(**_):
"""
Get the safelist for a given list of tags
Variables:
tags => List of tag types (comma seperated)
Arguments:
None
Data Block:
None
API call example:
GET /api/v1/safelist/?tags=network.static.domain,network.dynamic.domain
Result example:
{
"match": { # List of direct matches by tag type
"network.static.domain": ["google.ca"],
"network.dynamic.domain": ["updates.microsoft.com"]
},
"regex": { # List of regular expressions by tag type
"network.static.domain": ["*.cyber.gc.ca"],
"network.dynamic.domain": ["*.cyber.gc.ca"]
}
}
"""
tag_types = request.args.get('tags', None)
if tag_types:
tag_types = tag_types.split(',')

with forge.get_cachestore('system', config=config, datastore=STORAGE) as cache:
tag_safelist_yml = cache.get('tag_safelist_yml')
if tag_safelist_yml:
tag_safelist_data = yaml.safe_load(tag_safelist_yml)
else:
tag_safelist_data = forge.get_tag_safelist_data()

if tag_types:
output = {
'match': {k: v for k, v in tag_safelist_data.get('match', {}).items() if k in tag_types or tag_types == []},
'regex': {k: v for k, v in tag_safelist_data.get('regex', {}).items() if k in tag_types or tag_types == []},
}
for tag in tag_types:
for sl in STORAGE.safelist.stream_search(f"type:tag AND enabled:true AND tag.type:{tag}", as_obj=False):
output['match'].setdefault(sl['tag']['type'], [])
output['match'][sl['tag']['type']].append(sl['tag']['value'])

else:
output = tag_safelist_data
for sl in STORAGE.safelist.stream_search("type:tag AND enabled:true", as_obj=False):
output['match'].setdefault(sl['tag']['type'], [])
output['match'][sl['tag']['type']].append(sl['tag']['value'])

return make_api_response(output)
41 changes: 41 additions & 0 deletions test/test_srv_safelist.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,3 +52,44 @@ def test_safelist_missing(client, storage):
resp = client.get(f'/api/v1/safelist/{invalid_hash}/', headers=headers)
assert resp.status_code == 404
assert resp.json['api_response'] is None


# noinspection PyUnusedLocal
def test_get_full_safelist(client, storage):
storage.safelist.search = {
"offset": 0,
"rows": 0,
"total": 0,
"items": []
}

resp = client.get('/api/v1/safelist/', headers=headers)
assert resp.status_code == 200
assert 'match' in resp.json['api_response']
assert 'regex' in resp.json['api_response']
assert isinstance(resp.json['api_response']['match'], dict)
assert isinstance(resp.json['api_response']['regex'], dict)


# noinspection PyUnusedLocal
def test_get_full_safelist_specific(client, storage):
storage.safelist.search = {
"offset": 0,
"rows": 0,
"total": 0,
"items": []
}

tag_type = "network.dynamic.domain"
resp = client.get(f'/api/v1/safelist/?tags={tag_type}', headers=headers)
assert resp.status_code == 200
assert 'match' in resp.json['api_response']
assert 'regex' in resp.json['api_response']
assert isinstance(resp.json['api_response']['match'], dict)
assert isinstance(resp.json['api_response']['regex'], dict)

for k in resp.json['api_response']['match']:
assert k == tag_type

for k in resp.json['api_response']['regex']:
assert k == tag_type

0 comments on commit 2d86798

Please sign in to comment.